001 /* 002 * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved. 003 * 004 * Cloudera, Inc. licenses this file to you under the Apache License, 005 * Version 2.0 (the "License"). You may not use this file except in 006 * compliance with the License. You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 011 * CONDITIONS OF ANY KIND, either express or implied. See the License for 012 * the specific language governing permissions and limitations under the 013 * License. 014 */ 015 package com.cloudera.lib.service.scheduler; 016 017 import com.cloudera.lib.lang.RunnableCallable; 018 import com.cloudera.lib.server.BaseService; 019 import com.cloudera.lib.server.Server; 020 import com.cloudera.lib.server.ServiceException; 021 import com.cloudera.lib.service.Instrumentation; 022 import com.cloudera.lib.service.Scheduler; 023 import com.cloudera.lib.util.Check; 024 import org.slf4j.Logger; 025 import org.slf4j.LoggerFactory; 026 027 import java.text.MessageFormat; 028 import java.util.concurrent.Callable; 029 import java.util.concurrent.ScheduledExecutorService; 030 import java.util.concurrent.ScheduledThreadPoolExecutor; 031 import java.util.concurrent.TimeUnit; 032 033 public class SchedulerService extends BaseService implements Scheduler { 034 private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class); 035 036 private static final String INST_GROUP = "scheduler"; 037 038 public static final String PREFIX = "scheduler"; 039 040 public static final String CONF_THREADS = "threads"; 041 042 private ScheduledExecutorService scheduler; 043 044 public SchedulerService() { 045 super(PREFIX); 046 } 047 048 @Override 049 public void init() throws ServiceException { 050 int threads = getServiceConfig().getInt(CONF_THREADS, 5); 051 scheduler = new ScheduledThreadPoolExecutor(threads); 052 LOG.debug("Scheduler started"); 053 } 054 055 @Override 056 public void destroy() { 057 try { 058 long limit = System.currentTimeMillis() + 30 * 1000; 059 scheduler.shutdownNow(); 060 while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) { 061 LOG.debug("Waiting for scheduler to shutdown"); 062 if (System.currentTimeMillis() > limit) { 063 LOG.warn("Gave up waiting for scheduler to shutdown"); 064 break; 065 } 066 } 067 if (scheduler.isTerminated()) { 068 LOG.debug("Scheduler shutdown"); 069 } 070 } 071 catch (InterruptedException ex) { 072 LOG.warn(ex.getMessage(), ex); 073 } 074 } 075 076 @Override 077 public Class[] getServiceDependencies() { 078 return new Class[]{Instrumentation.class}; 079 } 080 081 @Override 082 public Class getInterface() { 083 return Scheduler.class; 084 } 085 086 @Override 087 public void schedule(final Callable<?> callable, long delay, long interval, TimeUnit unit) { 088 Check.notNull(callable, "callable"); 089 if (!scheduler.isShutdown()) { 090 LOG.debug("Scheduling callable [{}], interval [{}] seconds, delay [{}] in [{}]", 091 new Object[]{callable, delay, interval, unit}); 092 Runnable r = new Runnable() { 093 public void run() { 094 String instrName = callable.getClass().getSimpleName(); 095 Instrumentation instr = getServer().get(Instrumentation.class); 096 if (getServer().getStatus() == Server.Status.HALTED) { 097 LOG.debug("Skipping [{}], server status [{}]", callable, getServer().getStatus()); 098 instr.incr(INST_GROUP, instrName + ".skips", 1); 099 } 100 else { 101 LOG.debug("Executing [{}]", callable); 102 instr.incr(INST_GROUP, instrName + ".execs", 1); 103 Instrumentation.Cron cron = instr.createCron().start(); 104 try { 105 callable.call(); 106 } 107 catch (Exception ex) { 108 instr.incr(INST_GROUP, instrName + ".fails", 1); 109 LOG.error("Error executing [{}], {}", new Object[]{callable, ex.getMessage(), ex}); 110 } 111 finally { 112 instr.addCron(INST_GROUP, instrName, cron.stop()); 113 } 114 } 115 } 116 }; 117 scheduler.scheduleWithFixedDelay(r, delay, interval, unit); 118 } 119 else { 120 throw new IllegalStateException( 121 MessageFormat.format("Scheduler shutting down, ignoring scheduling of [{}]", callable)); 122 } 123 } 124 125 @Override 126 public void schedule(Runnable runnable, long delay, long interval, TimeUnit unit) { 127 schedule((Callable<?>) new RunnableCallable(runnable), delay, interval, unit); 128 } 129 130 }