001    /*
002     * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
003     *
004     * Cloudera, Inc. licenses this file to you under the Apache License,
005     * Version 2.0 (the "License"). You may not use this file except in
006     * compliance with the License. You may obtain a copy of the License at
007     *
008     *     http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
011     * CONDITIONS OF ANY KIND, either express or implied. See the License for
012     * the specific language governing permissions and limitations under the
013     * License.
014     */
015    package com.cloudera.lib.service.instrumentation;
016    
017    import com.cloudera.lib.server.BaseService;
018    import com.cloudera.lib.server.ServiceException;
019    import com.cloudera.lib.service.Instrumentation;
020    import com.cloudera.lib.service.Scheduler;
021    import org.json.simple.JSONAware;
022    import org.json.simple.JSONObject;
023    import org.json.simple.JSONStreamAware;
024    
025    import java.io.IOException;
026    import java.io.Writer;
027    import java.util.ArrayList;
028    import java.util.LinkedHashMap;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.concurrent.ConcurrentHashMap;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicLong;
034    import java.util.concurrent.locks.Lock;
035    import java.util.concurrent.locks.ReentrantLock;
036    
037    public class InstrumentationService extends BaseService implements Instrumentation {
038      public static final String PREFIX = "instrumentation";
039      public static final String CONF_TIMERS_SIZE = "timers.size";
040    
041      private int timersSize;
042      private Lock counterLock;
043      private Lock timerLock;
044      private Lock variableLock;
045      private Lock samplerLock;
046      private Map<String, VariableHolder> jvmVariables;
047      private Map<String, Map<String, AtomicLong>> counters;
048      private Map<String, Map<String, Timer>> timers;
049      private Map<String, Map<String, VariableHolder>> variables;
050      private Map<String, Map<String, Sampler>> samplers;
051      private List<Sampler> samplersList;
052      private Map<String, Map<String, ?>> all;
053    
054      public InstrumentationService() {
055        super(PREFIX);
056      }
057    
058      @Override
059      @SuppressWarnings("unchecked")
060      public void init() throws ServiceException {
061        timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10);
062        counterLock = new ReentrantLock();
063        timerLock = new ReentrantLock();
064        variableLock = new ReentrantLock();
065        samplerLock = new ReentrantLock();
066        jvmVariables = new ConcurrentHashMap<String, VariableHolder>();
067        counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>();
068        timers = new ConcurrentHashMap<String, Map<String, Timer>>();
069        variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>();
070        samplers = new ConcurrentHashMap<String, Map<String, Sampler>>();
071        samplersList = new ArrayList<Sampler>();
072        all = new LinkedHashMap<String, Map<String, ?>>();
073        all.put("os-env", System.getenv());
074        all.put("sys-props", (Map<String, ?>) (Map) System.getProperties());
075        all.put("jvm", jvmVariables);
076        all.put("counters", (Map) counters);
077        all.put("timers", (Map) timers);
078        all.put("variables", (Map) variables);
079        all.put("samplers", (Map) samplers);
080    
081        jvmVariables.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
082          public Long getValue() {
083            return Runtime.getRuntime().freeMemory();
084          }
085        }));
086        jvmVariables.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
087          public Long getValue() {
088            return Runtime.getRuntime().maxMemory();
089          }
090        }));
091        jvmVariables.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
092          public Long getValue() {
093            return Runtime.getRuntime().totalMemory();
094          }
095        }));
096      }
097    
098      @Override
099      public void postInit() throws ServiceException {
100        Scheduler scheduler = getServer().get(Scheduler.class);
101        if (scheduler != null) {
102          scheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS);
103        }
104      }
105    
106      @Override
107      public Class getInterface() {
108        return Instrumentation.class;
109      }
110    
111      @SuppressWarnings("unchecked")
112      private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) {
113        boolean locked = false;
114        try {
115          Map<String, T> groupMap = map.get(group);
116          if (groupMap == null) {
117            lock.lock();
118            locked = true;
119            groupMap = map.get(group);
120            if (groupMap == null) {
121              groupMap = new ConcurrentHashMap<String, T>();
122              map.put(group, groupMap);
123            }
124          }
125          T element = groupMap.get(name);
126          if (element == null) {
127            if (!locked) {
128              lock.lock();
129              locked = true;
130            }
131            element = groupMap.get(name);
132            if (element == null) {
133              try {
134                if (klass == Timer.class) {
135                  element = (T) new Timer(timersSize);
136                }
137                else {
138                  element = klass.newInstance();
139                }
140              }
141              catch (Exception ex) {
142                throw new RuntimeException(ex);
143              }
144              groupMap.put(name, element);
145            }
146          }
147          return element;
148        }
149        finally {
150          if (locked) {
151            lock.unlock();
152          }
153        }
154      }
155    
156      static class Cron implements Instrumentation.Cron {
157        long start;
158        long lapStart;
159        long own;
160        long total;
161    
162        public Cron start() {
163          if (total != 0) {
164            throw new IllegalStateException("Cron already used");
165          }
166          if (start == 0) {
167            start = System.currentTimeMillis();
168            lapStart = start;
169          }
170          else if (lapStart == 0) {
171            lapStart = System.currentTimeMillis();
172          }
173          return this;
174        }
175    
176        public Cron stop() {
177          if (total != 0) {
178            throw new IllegalStateException("Cron already used");
179          }
180          if (lapStart > 0) {
181            own += System.currentTimeMillis() - lapStart;
182            lapStart = 0;
183          }
184          return this;
185        }
186    
187        void end() {
188          stop();
189          total = System.currentTimeMillis() - start;
190        }
191    
192      }
193    
194      static class Timer implements JSONAware, JSONStreamAware {
195        static final int LAST_TOTAL = 0;
196        static final int LAST_OWN = 1;
197        static final int AVG_TOTAL = 2;
198        static final int AVG_OWN = 3;
199    
200        Lock lock = new ReentrantLock();
201        private long[] own;
202        private long[] total;
203        private int last;
204        private boolean full;
205        private int size;
206    
207        public Timer(int size) {
208          this.size = size;
209          own = new long[size];
210          total = new long[size];
211          for (int i = 0; i < size; i++) {
212            own[i] = -1;
213            total[i] = -1;
214          }
215          last = -1;
216        }
217    
218        long[] getValues() {
219          lock.lock();
220          try {
221            long[] values = new long[4];
222            values[LAST_TOTAL] = total[last];
223            values[LAST_OWN] = own[last];
224            int limit = (full) ? size : (last + 1);
225            for (int i = 0; i < limit; i++) {
226              values[AVG_TOTAL] += total[i];
227              values[AVG_OWN] += own[i];
228            }
229            values[AVG_TOTAL] = values[AVG_TOTAL] / limit;
230            values[AVG_OWN] = values[AVG_OWN] / limit;
231            return values;
232          }
233          finally {
234            lock.unlock();
235          }
236        }
237    
238        void addCron(Cron cron) {
239          cron.end();
240          lock.lock();
241          try {
242            last = (last + 1) % size;
243            full = full || last == (size - 1);
244            total[last] = cron.total;
245            own[last] = cron.own;
246          }
247          finally {
248            lock.unlock();
249          }
250        }
251    
252        @SuppressWarnings("unchecked")
253        private JSONObject getJSON() {
254          long[] values = getValues();
255          JSONObject json = new JSONObject();
256          json.put("lastTotal", values[0]);
257          json.put("lastOwn", values[1]);
258          json.put("avgTotal", values[2]);
259          json.put("avgOwn", values[3]);
260          return json;
261        }
262    
263        @Override
264        public String toJSONString() {
265          return getJSON().toJSONString();
266        }
267    
268        @Override
269        public void writeJSONString(Writer out) throws IOException {
270          getJSON().writeJSONString(out);
271        }
272    
273      }
274    
275      @Override
276      public Cron createCron() {
277        return new Cron();
278      }
279    
280      @Override
281      public void incr(String group, String name, long count) {
282        AtomicLong counter = getToAdd(group, name, AtomicLong.class, counterLock, counters);
283        counter.addAndGet(count);
284      }
285    
286      @Override
287      public void addCron(String group, String name, Instrumentation.Cron cron) {
288        Timer timer = getToAdd(group, name, Timer.class, timerLock, timers);
289        timer.addCron((Cron) cron);
290      }
291    
292      static class VariableHolder<E> implements JSONAware, JSONStreamAware {
293        Variable<E> var;
294    
295        public VariableHolder() {
296        }
297    
298        public VariableHolder(Variable<E> var) {
299          this.var = var;
300        }
301    
302        @SuppressWarnings("unchecked")
303        private JSONObject getJSON() {
304          JSONObject json = new JSONObject();
305          json.put("value", var.getValue());
306          return json;
307        }
308    
309        @Override
310        public String toJSONString() {
311          return getJSON().toJSONString();
312        }
313    
314        @Override
315        public void writeJSONString(Writer out) throws IOException {
316          out.write(toJSONString());
317        }
318    
319      }
320    
321      @Override
322      public void addVariable(String group, String name, Variable<?> variable) {
323        VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables);
324        holder.var = variable;
325      }
326    
327      static class Sampler implements JSONAware, JSONStreamAware {
328        Variable<Long> variable;
329        long[] values;
330        private AtomicLong sum;
331        private int last;
332        private boolean full;
333    
334        void init(int size, Variable<Long> variable) {
335          this.variable = variable;
336          values = new long[size];
337          sum = new AtomicLong();
338          last = 0;
339        }
340    
341        void sample() {
342          int index = last;
343          long valueGoingOut = values[last];
344          full = full || last == (values.length - 1);
345          last = (last + 1) % values.length;
346          values[index] = variable.getValue();
347          sum.addAndGet(-valueGoingOut + values[index]);
348        }
349    
350        double getRate() {
351          return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last));
352        }
353    
354        @SuppressWarnings("unchecked")
355        private JSONObject getJSON() {
356          JSONObject json = new JSONObject();
357          json.put("sampler", getRate());
358          json.put("size", (full) ? values.length : last);
359          return json;
360        }
361    
362        @Override
363        public String toJSONString() {
364          return getJSON().toJSONString();
365        }
366    
367        @Override
368        public void writeJSONString(Writer out) throws IOException {
369          out.write(toJSONString());
370        }
371      }
372    
373      @Override
374      public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) {
375        Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers);
376        samplerLock.lock();
377        try {
378          sampler.init(samplingSize, variable);
379          samplersList.add(sampler);
380        }
381        finally {
382          samplerLock.unlock();
383        }
384      }
385    
386      class SamplersRunnable implements Runnable {
387    
388        @Override
389        public void run() {
390          samplerLock.lock();
391          try {
392            for (Sampler sampler : samplersList) {
393              sampler.sample();
394            }
395          }
396          finally {
397            samplerLock.unlock();
398          }
399        }
400      }
401    
402      @Override
403      public Map<String, Map<String, ?>> getSnapshot() {
404        return all;
405      }
406    
407    
408    }