001 /*
002 * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
003 *
004 * Cloudera, Inc. licenses this file to you under the Apache License,
005 * Version 2.0 (the "License"). You may not use this file except in
006 * compliance with the License. You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
011 * CONDITIONS OF ANY KIND, either express or implied. See the License for
012 * the specific language governing permissions and limitations under the
013 * License.
014 */
015 package com.cloudera.lib.service.instrumentation;
016
017 import com.cloudera.lib.server.BaseService;
018 import com.cloudera.lib.server.ServiceException;
019 import com.cloudera.lib.service.Instrumentation;
020 import com.cloudera.lib.service.Scheduler;
021 import org.json.simple.JSONAware;
022 import org.json.simple.JSONObject;
023 import org.json.simple.JSONStreamAware;
024
025 import java.io.IOException;
026 import java.io.Writer;
027 import java.util.ArrayList;
028 import java.util.LinkedHashMap;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.concurrent.ConcurrentHashMap;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicLong;
034 import java.util.concurrent.locks.Lock;
035 import java.util.concurrent.locks.ReentrantLock;
036
037 public class InstrumentationService extends BaseService implements Instrumentation {
038 public static final String PREFIX = "instrumentation";
039 public static final String CONF_TIMERS_SIZE = "timers.size";
040
041 private int timersSize;
042 private Lock counterLock;
043 private Lock timerLock;
044 private Lock variableLock;
045 private Lock samplerLock;
046 private Map<String, VariableHolder> jvmVariables;
047 private Map<String, Map<String, AtomicLong>> counters;
048 private Map<String, Map<String, Timer>> timers;
049 private Map<String, Map<String, VariableHolder>> variables;
050 private Map<String, Map<String, Sampler>> samplers;
051 private List<Sampler> samplersList;
052 private Map<String, Map<String, ?>> all;
053
054 public InstrumentationService() {
055 super(PREFIX);
056 }
057
058 @Override
059 @SuppressWarnings("unchecked")
060 public void init() throws ServiceException {
061 timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10);
062 counterLock = new ReentrantLock();
063 timerLock = new ReentrantLock();
064 variableLock = new ReentrantLock();
065 samplerLock = new ReentrantLock();
066 jvmVariables = new ConcurrentHashMap<String, VariableHolder>();
067 counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>();
068 timers = new ConcurrentHashMap<String, Map<String, Timer>>();
069 variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>();
070 samplers = new ConcurrentHashMap<String, Map<String, Sampler>>();
071 samplersList = new ArrayList<Sampler>();
072 all = new LinkedHashMap<String, Map<String, ?>>();
073 all.put("os-env", System.getenv());
074 all.put("sys-props", (Map<String, ?>) (Map) System.getProperties());
075 all.put("jvm", jvmVariables);
076 all.put("counters", (Map) counters);
077 all.put("timers", (Map) timers);
078 all.put("variables", (Map) variables);
079 all.put("samplers", (Map) samplers);
080
081 jvmVariables.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
082 public Long getValue() {
083 return Runtime.getRuntime().freeMemory();
084 }
085 }));
086 jvmVariables.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
087 public Long getValue() {
088 return Runtime.getRuntime().maxMemory();
089 }
090 }));
091 jvmVariables.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
092 public Long getValue() {
093 return Runtime.getRuntime().totalMemory();
094 }
095 }));
096 }
097
098 @Override
099 public void postInit() throws ServiceException {
100 Scheduler scheduler = getServer().get(Scheduler.class);
101 if (scheduler != null) {
102 scheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS);
103 }
104 }
105
106 @Override
107 public Class getInterface() {
108 return Instrumentation.class;
109 }
110
111 @SuppressWarnings("unchecked")
112 private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) {
113 boolean locked = false;
114 try {
115 Map<String, T> groupMap = map.get(group);
116 if (groupMap == null) {
117 lock.lock();
118 locked = true;
119 groupMap = map.get(group);
120 if (groupMap == null) {
121 groupMap = new ConcurrentHashMap<String, T>();
122 map.put(group, groupMap);
123 }
124 }
125 T element = groupMap.get(name);
126 if (element == null) {
127 if (!locked) {
128 lock.lock();
129 locked = true;
130 }
131 element = groupMap.get(name);
132 if (element == null) {
133 try {
134 if (klass == Timer.class) {
135 element = (T) new Timer(timersSize);
136 }
137 else {
138 element = klass.newInstance();
139 }
140 }
141 catch (Exception ex) {
142 throw new RuntimeException(ex);
143 }
144 groupMap.put(name, element);
145 }
146 }
147 return element;
148 }
149 finally {
150 if (locked) {
151 lock.unlock();
152 }
153 }
154 }
155
156 static class Cron implements Instrumentation.Cron {
157 long start;
158 long lapStart;
159 long own;
160 long total;
161
162 public Cron start() {
163 if (total != 0) {
164 throw new IllegalStateException("Cron already used");
165 }
166 if (start == 0) {
167 start = System.currentTimeMillis();
168 lapStart = start;
169 }
170 else if (lapStart == 0) {
171 lapStart = System.currentTimeMillis();
172 }
173 return this;
174 }
175
176 public Cron stop() {
177 if (total != 0) {
178 throw new IllegalStateException("Cron already used");
179 }
180 if (lapStart > 0) {
181 own += System.currentTimeMillis() - lapStart;
182 lapStart = 0;
183 }
184 return this;
185 }
186
187 void end() {
188 stop();
189 total = System.currentTimeMillis() - start;
190 }
191
192 }
193
194 static class Timer implements JSONAware, JSONStreamAware {
195 static final int LAST_TOTAL = 0;
196 static final int LAST_OWN = 1;
197 static final int AVG_TOTAL = 2;
198 static final int AVG_OWN = 3;
199
200 Lock lock = new ReentrantLock();
201 private long[] own;
202 private long[] total;
203 private int last;
204 private boolean full;
205 private int size;
206
207 public Timer(int size) {
208 this.size = size;
209 own = new long[size];
210 total = new long[size];
211 for (int i = 0; i < size; i++) {
212 own[i] = -1;
213 total[i] = -1;
214 }
215 last = -1;
216 }
217
218 long[] getValues() {
219 lock.lock();
220 try {
221 long[] values = new long[4];
222 values[LAST_TOTAL] = total[last];
223 values[LAST_OWN] = own[last];
224 int limit = (full) ? size : (last + 1);
225 for (int i = 0; i < limit; i++) {
226 values[AVG_TOTAL] += total[i];
227 values[AVG_OWN] += own[i];
228 }
229 values[AVG_TOTAL] = values[AVG_TOTAL] / limit;
230 values[AVG_OWN] = values[AVG_OWN] / limit;
231 return values;
232 }
233 finally {
234 lock.unlock();
235 }
236 }
237
238 void addCron(Cron cron) {
239 cron.end();
240 lock.lock();
241 try {
242 last = (last + 1) % size;
243 full = full || last == (size - 1);
244 total[last] = cron.total;
245 own[last] = cron.own;
246 }
247 finally {
248 lock.unlock();
249 }
250 }
251
252 @SuppressWarnings("unchecked")
253 private JSONObject getJSON() {
254 long[] values = getValues();
255 JSONObject json = new JSONObject();
256 json.put("lastTotal", values[0]);
257 json.put("lastOwn", values[1]);
258 json.put("avgTotal", values[2]);
259 json.put("avgOwn", values[3]);
260 return json;
261 }
262
263 @Override
264 public String toJSONString() {
265 return getJSON().toJSONString();
266 }
267
268 @Override
269 public void writeJSONString(Writer out) throws IOException {
270 getJSON().writeJSONString(out);
271 }
272
273 }
274
275 @Override
276 public Cron createCron() {
277 return new Cron();
278 }
279
280 @Override
281 public void incr(String group, String name, long count) {
282 AtomicLong counter = getToAdd(group, name, AtomicLong.class, counterLock, counters);
283 counter.addAndGet(count);
284 }
285
286 @Override
287 public void addCron(String group, String name, Instrumentation.Cron cron) {
288 Timer timer = getToAdd(group, name, Timer.class, timerLock, timers);
289 timer.addCron((Cron) cron);
290 }
291
292 static class VariableHolder<E> implements JSONAware, JSONStreamAware {
293 Variable<E> var;
294
295 public VariableHolder() {
296 }
297
298 public VariableHolder(Variable<E> var) {
299 this.var = var;
300 }
301
302 @SuppressWarnings("unchecked")
303 private JSONObject getJSON() {
304 JSONObject json = new JSONObject();
305 json.put("value", var.getValue());
306 return json;
307 }
308
309 @Override
310 public String toJSONString() {
311 return getJSON().toJSONString();
312 }
313
314 @Override
315 public void writeJSONString(Writer out) throws IOException {
316 out.write(toJSONString());
317 }
318
319 }
320
321 @Override
322 public void addVariable(String group, String name, Variable<?> variable) {
323 VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables);
324 holder.var = variable;
325 }
326
327 static class Sampler implements JSONAware, JSONStreamAware {
328 Variable<Long> variable;
329 long[] values;
330 private AtomicLong sum;
331 private int last;
332 private boolean full;
333
334 void init(int size, Variable<Long> variable) {
335 this.variable = variable;
336 values = new long[size];
337 sum = new AtomicLong();
338 last = 0;
339 }
340
341 void sample() {
342 int index = last;
343 long valueGoingOut = values[last];
344 full = full || last == (values.length - 1);
345 last = (last + 1) % values.length;
346 values[index] = variable.getValue();
347 sum.addAndGet(-valueGoingOut + values[index]);
348 }
349
350 double getRate() {
351 return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last));
352 }
353
354 @SuppressWarnings("unchecked")
355 private JSONObject getJSON() {
356 JSONObject json = new JSONObject();
357 json.put("sampler", getRate());
358 json.put("size", (full) ? values.length : last);
359 return json;
360 }
361
362 @Override
363 public String toJSONString() {
364 return getJSON().toJSONString();
365 }
366
367 @Override
368 public void writeJSONString(Writer out) throws IOException {
369 out.write(toJSONString());
370 }
371 }
372
373 @Override
374 public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) {
375 Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers);
376 samplerLock.lock();
377 try {
378 sampler.init(samplingSize, variable);
379 samplersList.add(sampler);
380 }
381 finally {
382 samplerLock.unlock();
383 }
384 }
385
386 class SamplersRunnable implements Runnable {
387
388 @Override
389 public void run() {
390 samplerLock.lock();
391 try {
392 for (Sampler sampler : samplersList) {
393 sampler.sample();
394 }
395 }
396 finally {
397 samplerLock.unlock();
398 }
399 }
400 }
401
402 @Override
403 public Map<String, Map<String, ?>> getSnapshot() {
404 return all;
405 }
406
407
408 }