/*
 * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */
package com.cloudera.lib.service.instrumentation;

import com.cloudera.lib.server.BaseService;
import com.cloudera.lib.server.ServiceException;
import com.cloudera.lib.service.Instrumentation;
import com.cloudera.lib.service.Scheduler;
import org.json.simple.JSONAware;
import org.json.simple.JSONObject;
import org.json.simple.JSONStreamAware;

import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * {@link Instrumentation} implementation that keeps counters, timers,
 * variables and samplers in memory, indexed by group name and then by
 * element name, and exposes all of them through {@link #getSnapshot()}.
 */
public class InstrumentationService extends BaseService implements Instrumentation {
  public static final String PREFIX = "instrumentation";
  public static final String CONF_TIMERS_SIZE = "timers.size";

  private int timersSize;
  private Lock counterLock;
  private Lock timerLock;
  private Lock variableLock;
  private Lock samplerLock;
  private Map<String, VariableHolder> jvmVariables;
  private Map<String, Map<String, AtomicLong>> counters;
  private Map<String, Map<String, Timer>> timers;
  private Map<String, Map<String, VariableHolder>> variables;
  private Map<String, Map<String, Sampler>> samplers;
  private List<Sampler> samplersList;
  private Map<String, Map<String, ?>> all;

  /**
   * Creates the service, registering it under the {@link #PREFIX} prefix.
   */
  public InstrumentationService() {
    super(PREFIX);
  }

  /**
   * Reads the {@link #CONF_TIMERS_SIZE} setting (default 10), creates the
   * locks and the per-kind maps, assembles the snapshot map and registers the
   * <code>free.memory</code>, <code>max.memory</code> and
   * <code>total.memory</code> JVM variables.
   *
   * @throws ServiceException declared by the overridden method; not thrown
   * by the code in this method itself.
   */
  @Override
  @SuppressWarnings("unchecked")
  public void init() throws ServiceException {
    timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10);
    counterLock = new ReentrantLock();
    timerLock = new ReentrantLock();
    variableLock = new ReentrantLock();
    samplerLock = new ReentrantLock();
    jvmVariables = new ConcurrentHashMap<String, VariableHolder>();
    counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>();
    timers = new ConcurrentHashMap<String, Map<String, Timer>>();
    variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>();
    samplers = new ConcurrentHashMap<String, Map<String, Sampler>>();
    samplersList = new ArrayList<Sampler>();

    // The snapshot holds the live maps, not copies, so later registrations
    // show up in it; LinkedHashMap keeps the sections in insertion order.
    all = new LinkedHashMap<String, Map<String, ?>>();
    all.put("os-env", System.getenv());
    all.put("sys-props", (Map<String, ?>) (Map) System.getProperties());
    all.put("jvm", jvmVariables);
    all.put("counters", (Map) counters);
    all.put("timers", (Map) timers);
    all.put("variables", (Map) variables);
    all.put("samplers", (Map) samplers);

    jvmVariables.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
      public Long getValue() {
        return Runtime.getRuntime().freeMemory();
      }
    }));
    jvmVariables.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
      public Long getValue() {
        return Runtime.getRuntime().maxMemory();
      }
    }));
    jvmVariables.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
      public Long getValue() {
        return Runtime.getRuntime().totalMemory();
      }
    }));
  }

  /**
   * Hands a {@link SamplersRunnable} to the server's {@link Scheduler}, if
   * the server provides one; without a scheduler samplers are never sampled.
   *
   * @throws ServiceException declared by the overridden method; not thrown
   * by the code in this method itself.
   */
  @Override
  public void postInit() throws ServiceException {
    Scheduler scheduler = getServer().get(Scheduler.class);
    if (scheduler != null) {
      // NOTE(review): presumably (task, initial delay, interval, unit), i.e.
      // run immediately and then every second - confirm against Scheduler.
      scheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS);
    }
  }

  /**
   * Returns the interface this service implements.
   *
   * @return <code>Instrumentation.class</code>.
   */
  @Override
  public Class getInterface() {
    return Instrumentation.class;
  }

  /**
   * Returns the element registered under <code>group</code>/<code>name</code>
   * in <code>map</code>, creating the group map and/or the element if they
   * are missing.
   * <p/>
   * Lookups are done first without holding <code>lock</code>; the lock is
   * taken only when something appears to be missing, and the lookup is then
   * repeated under the lock before anything is created.
   *
   * @param group group name.
   * @param name element name within the group.
   * @param klass element class; {@link Timer} is built with the configured
   * timers size, any other class through its no-argument constructor.
   * @param lock lock guarding creation for this kind of element.
   * @param map group-name to (element-name to element) map.
   * @return the existing or newly created element.
   * @throws RuntimeException wrapping any exception raised while
   * instantiating the element.
   */
  @SuppressWarnings("unchecked")
  private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) {
    boolean locked = false;
    try {
      Map<String, T> groupMap = map.get(group);
      if (groupMap == null) {
        lock.lock();
        locked = true;
        groupMap = map.get(group);
        if (groupMap == null) {
          groupMap = new ConcurrentHashMap<String, T>();
          map.put(group, groupMap);
        }
      }
      T element = groupMap.get(name);
      if (element == null) {
        if (!locked) {
          lock.lock();
          locked = true;
        }
        element = groupMap.get(name);
        if (element == null) {
          try {
            if (klass == Timer.class) {
              element = (T) new Timer(timersSize);
            }
            else {
              element = klass.newInstance();
            }
          }
          catch (Exception ex) {
            throw new RuntimeException(ex);
          }
          groupMap.put(name, element);
        }
      }
      return element;
    }
    finally {
      if (locked) {
        lock.unlock();
      }
    }
  }

  /**
   * Stopwatch that tracks two durations, both in milliseconds: the "own"
   * time, accumulated between each {@link #start()}/{@link #stop()} pair,
   * and the "total" time, from the first {@link #start()} until the cron is
   * ended.
   */
  static class Cron implements Instrumentation.Cron {
    long start;
    long lapStart;
    long own;
    long total;

    /**
     * Starts the cron, or resumes it if it is currently stopped. Calling it
     * while a lap is already running has no effect.
     *
     * @return this cron.
     * @throws IllegalStateException if the cron has already been ended with
     * a non-zero total.
     */
    public Cron start() {
      if (total != 0) {
        throw new IllegalStateException("Cron already used");
      }
      if (start == 0) {
        start = System.currentTimeMillis();
        lapStart = start;
      }
      else if (lapStart == 0) {
        lapStart = System.currentTimeMillis();
      }
      return this;
    }

    /**
     * Stops the running lap, if any, adding its duration to the own time.
     *
     * @return this cron.
     * @throws IllegalStateException if the cron has already been ended with
     * a non-zero total.
     */
    public Cron stop() {
      if (total != 0) {
        throw new IllegalStateException("Cron already used");
      }
      if (lapStart > 0) {
        own += System.currentTimeMillis() - lapStart;
        lapStart = 0;
      }
      return this;
    }

    /**
     * Stops the cron and computes its total time.
     */
    void end() {
      stop();
      // A cron that was never started has start == 0; subtracting that from
      // the current time would record the milliseconds elapsed since the
      // epoch as the duration, so such a cron reports a total of zero.
      total = (start == 0) ? 0 : System.currentTimeMillis() - start;
    }

  }

  /**
   * Circular buffer holding the own/total times of the most recent crons,
   * reported as the last values plus the averages over the stored entries.
   */
  static class Timer implements JSONAware, JSONStreamAware {
    static final int LAST_TOTAL = 0;
    static final int LAST_OWN = 1;
    static final int AVG_TOTAL = 2;
    static final int AVG_OWN = 3;

    Lock lock = new ReentrantLock();
    private long[] own;
    private long[] total;
    private int last;
    private boolean full;
    private int size;

    /**
     * Creates a timer that remembers the last <code>size</code> crons.
     *
     * @param size number of entries to keep, must be at least 1.
     * @throws IllegalArgumentException if <code>size</code> is less than 1.
     */
    public Timer(int size) {
      // A zero size would otherwise surface later as a modulo-by-zero in
      // addCron(); fail at construction instead.
      if (size < 1) {
        throw new IllegalArgumentException("Timer size must be greater than zero: " + size);
      }
      this.size = size;
      own = new long[size];
      total = new long[size];
      for (int i = 0; i < size; i++) {
        own[i] = -1;
        total[i] = -1;
      }
      last = -1;
    }

    /**
     * Returns the timer values, indexed by {@link #LAST_TOTAL},
     * {@link #LAST_OWN}, {@link #AVG_TOTAL} and {@link #AVG_OWN}.
     *
     * @return a new 4-element array; every element is -1 if no cron has been
     * added yet.
     */
    long[] getValues() {
      lock.lock();
      try {
        long[] values = new long[4];
        int limit = (full) ? size : (last + 1);
        if (limit == 0) {
          // No cron recorded yet (last is still -1): indexing with 'last' or
          // dividing by 'limit' would throw. Report -1, the same "no data"
          // marker the buffers are initialized with.
          for (int i = 0; i < values.length; i++) {
            values[i] = -1;
          }
          return values;
        }
        values[LAST_TOTAL] = total[last];
        values[LAST_OWN] = own[last];
        for (int i = 0; i < limit; i++) {
          values[AVG_TOTAL] += total[i];
          values[AVG_OWN] += own[i];
        }
        values[AVG_TOTAL] = values[AVG_TOTAL] / limit;
        values[AVG_OWN] = values[AVG_OWN] / limit;
        return values;
      }
      finally {
        lock.unlock();
      }
    }

    /**
     * Ends the given cron and stores its times in the next buffer slot,
     * overwriting the oldest entry once the buffer has wrapped around.
     *
     * @param cron cron to record.
     * @throws IllegalStateException if the cron has already been ended with
     * a non-zero total.
     */
    void addCron(Cron cron) {
      cron.end();
      lock.lock();
      try {
        last = (last + 1) % size;
        full = full || last == (size - 1);
        total[last] = cron.total;
        own[last] = cron.own;
      }
      finally {
        lock.unlock();
      }
    }

    /**
     * Builds the JSON representation of the current timer values.
     *
     * @return JSON object with the <code>lastTotal</code>,
     * <code>lastOwn</code>, <code>avgTotal</code> and <code>avgOwn</code>
     * keys.
     */
    @SuppressWarnings("unchecked")
    private JSONObject getJSON() {
      long[] values = getValues();
      JSONObject json = new JSONObject();
      json.put("lastTotal", values[LAST_TOTAL]);
      json.put("lastOwn", values[LAST_OWN]);
      json.put("avgTotal", values[AVG_TOTAL]);
      json.put("avgOwn", values[AVG_OWN]);
      return json;
    }

    @Override
    public String toJSONString() {
      return getJSON().toJSONString();
    }

    @Override
    public void writeJSONString(Writer out) throws IOException {
      getJSON().writeJSONString(out);
    }

  }

  /**
   * Creates a new, not yet started, cron.
   *
   * @return a new cron.
   */
  @Override
  public Cron createCron() {
    return new Cron();
  }

  /**
   * Adds <code>count</code> to the counter <code>group</code>/<code>name</code>,
   * creating the counter if it does not exist.
   *
   * @param group counter group.
   * @param name counter name.
   * @param count amount to add.
   */
  @Override
  public void incr(String group, String name, long count) {
    AtomicLong counter = getToAdd(group, name, AtomicLong.class, counterLock, counters);
    counter.addAndGet(count);
  }

  /**
   * Ends the cron and records its times in the timer
   * <code>group</code>/<code>name</code>, creating the timer if it does not
   * exist.
   *
   * @param group timer group.
   * @param name timer name.
   * @param cron cron to record; it is cast to this class' {@link Cron}.
   */
  @Override
  public void addCron(String group, String name, Instrumentation.Cron cron) {
    Timer timer = getToAdd(group, name, Timer.class, timerLock, timers);
    timer.addCron((Cron) cron);
  }

  /**
   * JSON-serializable wrapper around a {@link Instrumentation.Variable};
   * the variable is evaluated each time the holder is serialized.
   */
  static class VariableHolder<E> implements JSONAware, JSONStreamAware {
    Variable<E> var;

    /**
     * Creates a holder with no variable set.
     */
    public VariableHolder() {
    }

    /**
     * Creates a holder for the given variable.
     *
     * @param var variable to wrap.
     */
    public VariableHolder(Variable<E> var) {
      this.var = var;
    }

    /**
     * Builds the JSON representation of the variable's current value.
     *
     * @return JSON object with a single <code>value</code> key, whose value
     * is <code>null</code> if no variable has been set.
     */
    @SuppressWarnings("unchecked")
    private JSONObject getJSON() {
      JSONObject json = new JSONObject();
      // addVariable() registers the holder before assigning its variable, so
      // a snapshot taken in between can see a holder whose var is not set.
      Variable<E> current = var;
      json.put("value", (current != null) ? current.getValue() : null);
      return json;
    }

    @Override
    public String toJSONString() {
      return getJSON().toJSONString();
    }

    @Override
    public void writeJSONString(Writer out) throws IOException {
      out.write(toJSONString());
    }

  }

  /**
   * Registers <code>variable</code> under <code>group</code>/<code>name</code>,
   * replacing the variable of an existing holder with the same group and
   * name.
   *
   * @param group variable group.
   * @param name variable name.
   * @param variable variable to expose in the snapshot.
   */
  @Override
  public void addVariable(String group, String name, Variable<?> variable) {
    VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables);
    holder.var = variable;
  }

  /**
   * Sliding window over the most recent values of a variable, reporting
   * their average.
   */
  static class Sampler implements JSONAware, JSONStreamAware {
    Variable<Long> variable;
    long[] values;
    // Created eagerly: getToAdd() publishes the sampler before init() runs,
    // and getRate() must not fail if the snapshot is read in between.
    private final AtomicLong sum = new AtomicLong();
    private int last;
    private boolean full;

    /**
     * Sets the variable to sample and (re)starts the sampler with an empty
     * window of the given size.
     *
     * @param size number of samples kept in the window.
     * @param variable variable providing the sampled values.
     */
    void init(int size, Variable<Long> variable) {
      this.variable = variable;
      values = new long[size];
      sum.set(0);
      last = 0;
      // Without this reset a re-initialized sampler would keep dividing by
      // the full window length while the new window is still filling up.
      full = false;
    }

    /**
     * Takes one sample, replacing the oldest value in the window and
     * adjusting the running sum accordingly.
     */
    void sample() {
      int index = last;
      long valueGoingOut = values[last];
      full = full || last == (values.length - 1);
      last = (last + 1) % values.length;
      values[index] = variable.getValue();
      sum.addAndGet(-valueGoingOut + values[index]);
    }

    /**
     * Returns the average of the samples currently in the window.
     *
     * @return the running sum divided by the number of samples taken so far,
     * capped at the window size; the divisor is 1 while there are no samples.
     */
    double getRate() {
      return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last));
    }

    /**
     * Builds the JSON representation of the sampler.
     *
     * @return JSON object with the <code>sampler</code> (average) and
     * <code>size</code> (number of samples in the window) keys.
     */
    @SuppressWarnings("unchecked")
    private JSONObject getJSON() {
      JSONObject json = new JSONObject();
      json.put("sampler", getRate());
      json.put("size", (full) ? values.length : last);
      return json;
    }

    @Override
    public String toJSONString() {
      return getJSON().toJSONString();
    }

    @Override
    public void writeJSONString(Writer out) throws IOException {
      out.write(toJSONString());
    }
  }

  /**
   * Registers a sampler for <code>variable</code> under
   * <code>group</code>/<code>name</code>. If a sampler with that group and
   * name already exists it is re-initialized with the new size and variable.
   *
   * @param group sampler group.
   * @param name sampler name.
   * @param samplingSize number of samples kept in the window, must be at
   * least 1.
   * @param variable variable to sample, must not be <code>null</code>.
   * @throws IllegalArgumentException if <code>samplingSize</code> is less
   * than 1 or <code>variable</code> is <code>null</code>.
   */
  @Override
  public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) {
    // Validate before registering anything: a sampler with an empty window
    // or without a variable would only fail later, inside the periodic
    // sampling task that is shared by all samplers.
    if (samplingSize < 1) {
      throw new IllegalArgumentException("samplingSize must be greater than zero: " + samplingSize);
    }
    if (variable == null) {
      throw new IllegalArgumentException("variable cannot be null");
    }
    Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers);
    samplerLock.lock();
    try {
      sampler.init(samplingSize, variable);
      // getToAdd() returns the existing instance for a repeated group/name;
      // adding it again would make every tick sample it more than once.
      if (!samplersList.contains(sampler)) {
        samplersList.add(sampler);
      }
    }
    finally {
      samplerLock.unlock();
    }
  }

  /**
   * Task that takes one sample of every registered sampler while holding
   * the samplers lock.
   */
  class SamplersRunnable implements Runnable {

    @Override
    public void run() {
      samplerLock.lock();
      try {
        for (Sampler sampler : samplersList) {
          sampler.sample();
        }
      }
      finally {
        samplerLock.unlock();
      }
    }
  }

  /**
   * Returns the instrumentation snapshot: a map with the
   * <code>os-env</code>, <code>sys-props</code>, <code>jvm</code>,
   * <code>counters</code>, <code>timers</code>, <code>variables</code> and
   * <code>samplers</code> sections.
   *
   * @return the service's own snapshot map (not a copy), backed by the live
   * instrumentation maps.
   */
  @Override
  public Map<String, Map<String, ?>> getSnapshot() {
    return all;
  }


}