001    /*
002     * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
003     *
004     * Cloudera, Inc. licenses this file to you under the Apache License,
005     * Version 2.0 (the "License"). You may not use this file except in
006     * compliance with the License. You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
011     * CONDITIONS OF ANY KIND, either express or implied. See the License for
012     * the specific language governing permissions and limitations under the
013     * License.
014     */
015    package com.cloudera.lib.service.scheduler;
016    
017    import com.cloudera.lib.lang.RunnableCallable;
018    import com.cloudera.lib.server.BaseService;
019    import com.cloudera.lib.server.Server;
020    import com.cloudera.lib.server.ServiceException;
021    import com.cloudera.lib.service.Instrumentation;
022    import com.cloudera.lib.service.Scheduler;
023    import com.cloudera.lib.util.Check;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    
027    import java.text.MessageFormat;
028    import java.util.concurrent.Callable;
029    import java.util.concurrent.ScheduledExecutorService;
030    import java.util.concurrent.ScheduledThreadPoolExecutor;
031    import java.util.concurrent.TimeUnit;
032    
033    public class SchedulerService extends BaseService implements Scheduler {
034      private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class);
035    
036      private static final String INST_GROUP = "scheduler";
037    
038      public static final String PREFIX = "scheduler";
039    
040      public static final String CONF_THREADS = "threads";
041    
042      private ScheduledExecutorService scheduler;
043    
044      public SchedulerService() {
045        super(PREFIX);
046      }
047    
048      @Override
049      public void init() throws ServiceException {
050        int threads = getServiceConfig().getInt(CONF_THREADS, 5);
051        scheduler = new ScheduledThreadPoolExecutor(threads);
052        LOG.debug("Scheduler started");
053      }
054    
055      @Override
056      public void destroy() {
057        try {
058          long limit = System.currentTimeMillis() + 30 * 1000;
059          scheduler.shutdownNow();
060          while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
061            LOG.debug("Waiting for scheduler to shutdown");
062            if (System.currentTimeMillis() > limit) {
063              LOG.warn("Gave up waiting for scheduler to shutdown");
064              break;
065            }
066          }
067          if (scheduler.isTerminated()) {
068            LOG.debug("Scheduler shutdown");
069          }
070        }
071        catch (InterruptedException ex) {
072          LOG.warn(ex.getMessage(), ex);
073        }
074      }
075    
076      @Override
077      public Class[] getServiceDependencies() {
078        return new Class[]{Instrumentation.class};
079      }
080    
081      @Override
082      public Class getInterface() {
083        return Scheduler.class;
084      }
085    
086      @Override
087      public void schedule(final Callable<?> callable, long delay, long interval, TimeUnit unit) {
088        Check.notNull(callable, "callable");
089        if (!scheduler.isShutdown()) {
090          LOG.debug("Scheduling callable [{}], interval [{}] seconds, delay [{}] in [{}]",
091                    new Object[]{callable, delay, interval, unit});
092          Runnable r = new Runnable() {
093            public void run() {
094              String instrName = callable.getClass().getSimpleName();
095              Instrumentation instr = getServer().get(Instrumentation.class);
096              if (getServer().getStatus() == Server.Status.HALTED) {
097                LOG.debug("Skipping [{}], server status [{}]", callable, getServer().getStatus());
098                instr.incr(INST_GROUP, instrName + ".skips", 1);
099              }
100              else {
101                LOG.debug("Executing [{}]", callable);
102                instr.incr(INST_GROUP, instrName + ".execs", 1);
103                Instrumentation.Cron cron = instr.createCron().start();
104                try {
105                  callable.call();
106                }
107                catch (Exception ex) {
108                  instr.incr(INST_GROUP, instrName + ".fails", 1);
109                  LOG.error("Error executing [{}], {}", new Object[]{callable, ex.getMessage(), ex});
110                }
111                finally {
112                  instr.addCron(INST_GROUP, instrName, cron.stop());
113                }
114              }
115            }
116          };
117          scheduler.scheduleWithFixedDelay(r, delay, interval, unit);
118        }
119        else {
120          throw new IllegalStateException(
121            MessageFormat.format("Scheduler shutting down, ignoring scheduling of [{}]", callable));
122        }
123      }
124    
125      @Override
126      public void schedule(Runnable runnable, long delay, long interval, TimeUnit unit) {
127        schedule((Callable<?>) new RunnableCallable(runnable), delay, interval, unit);
128      }
129    
130    }