/*
 * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */
package com.cloudera.lib.service.hadoop;

import com.cloudera.lib.server.BaseService;
import com.cloudera.lib.server.ServiceException;
import com.cloudera.lib.service.Hadoop;
import com.cloudera.lib.service.HadoopException;
import com.cloudera.lib.service.Instrumentation;
import com.cloudera.lib.util.Check;
import com.cloudera.lib.util.XConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class HadoopService extends BaseService implements Hadoop {
  private static final Logger LOG = LoggerFactory.getLogger(HadoopService.class);

  public static final String PREFIX = "hadoop";

  private static final String INSTRUMENTATION_GROUP = "hadoop";

  public static final String AUTHENTICATION_TYPE = "authentication.type";
  public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
  public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";

  public static final String JOB_TRACKER_WHITELIST = "job.tracker.whitelist";
  public static final String NAME_NODE_WHITELIST = "name.node.whitelist";

  private static final String HADOOP_CONF_PREFIX = "conf:";

  private static final String JOB_TRACKER_PROPERTY = "mapred.job.tracker";
  private static final String NAME_NODE_PROPERTY = "fs.default.name";

  private Collection<String> jobTrackerWhitelist;
  private Collection<String> nameNodeWhitelist;

  XConfiguration serviceHadoopConf;

  private AtomicInteger unmanagedFileSystems = new AtomicInteger();

  public HadoopService() {
    super(PREFIX);
  }

  @Override
  protected void init() throws ServiceException {
    LOG.info("Using Hadoop JARs version [{}]", VersionInfo.getVersion());
    String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
    if (security.equals("kerberos")) {
      String defaultName = getServer().getName();
      String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
      keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
      if (keytab.length() == 0) {
        throw new ServiceException(HadoopException.ERROR.H01, KERBEROS_KEYTAB);
      }
      String principal = defaultName + "/localhost@LOCALHOST";
      principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
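      // The built-in default, "<servername>/localhost@LOCALHOST", is only
      // suitable for local testing; real deployments are expected to set
      // authentication.kerberos.principal (and authentication.kerberos.keytab)
      // to an actual service principal, e.g.
      // "myserver/host.example.com@EXAMPLE.COM" (illustrative value only).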
      if (principal.length() == 0) {
        throw new ServiceException(HadoopException.ERROR.H01, KERBEROS_PRINCIPAL);
      }
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      try {
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
      }
      catch (IOException ex) {
        throw new ServiceException(HadoopException.ERROR.H02, ex.getMessage(), ex);
      }
      LOG.info("Using Hadoop Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
    }
    else if (security.equals("simple")) {
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "simple");
      UserGroupInformation.setConfiguration(conf);
      LOG.info("Using Hadoop simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
    }
    else {
      throw new ServiceException(HadoopException.ERROR.H09, security);
    }

    serviceHadoopConf = new XConfiguration();
    for (Map.Entry entry : getServiceConfig()) {
      String name = (String) entry.getKey();
      if (name.startsWith(HADOOP_CONF_PREFIX)) {
        name = name.substring(HADOOP_CONF_PREFIX.length());
        String value = (String) entry.getValue();
        serviceHadoopConf.set(name, value);
      }
    }
    setRequiredServiceHadoopConf(serviceHadoopConf);

    LOG.debug("Hadoop default configuration:");
    for (Map.Entry entry : serviceHadoopConf) {
      LOG.debug(" {} = {}", entry.getKey(), entry.getValue());
    }

    jobTrackerWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(JOB_TRACKER_WHITELIST));
    nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
  }

  @Override
  public void postInit() throws ServiceException {
    super.postInit();
    Instrumentation instrumentation = getServer().get(Instrumentation.class);
    instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
      @Override
      public Integer getValue() {
        return unmanagedFileSystems.get();
      }
    });
    instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
      @Override
      public Long getValue() {
        return (long) unmanagedFileSystems.get();
      }
    });
  }

  private Set<String> toLowerCase(Collection<String> collection) {
    Set<String> set = new HashSet<String>();
    for (String value : collection) {
      set.add(value.toLowerCase());
    }
    return set;
  }

  @Override
  public Class getInterface() {
    return Hadoop.class;
  }

  @Override
  public Class[] getServiceDependencies() {
    return new Class[]{Instrumentation.class};
  }

  protected UserGroupInformation getUGI(String user) throws IOException {
    return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
  }

  protected void setRequiredServiceHadoopConf(Configuration conf) {
    conf.set("fs.hdfs.impl.disable.cache", "true");
  }

  protected JobConf createHadoopConf(Configuration conf) {
    JobConf hadoopConf = new JobConf();
    XConfiguration.copy(serviceHadoopConf, hadoopConf);
    XConfiguration.copy(conf, hadoopConf);
    return hadoopConf;
  }

  protected JobConf createJobTrackerConf(Configuration conf) {
    return createHadoopConf(conf);
  }

  protected Configuration createNameNodeConf(Configuration conf) {
    return createHadoopConf(conf);
  }
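
  // FileSystem.get() normally returns a JVM-wide cached instance keyed by URI
  // scheme/authority and UGI. setRequiredServiceHadoopConf() disables that
  // cache via "fs.hdfs.impl.disable.cache", so each instance created here is
  // private to one request and closeFileSystem() cannot close a FileSystem
  // another caller is still using.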
  protected FileSystem createFileSystem(Configuration namenodeConf) throws IOException {
    return FileSystem.get(namenodeConf);
  }

  protected void closeFileSystem(FileSystem fs) throws IOException {
    fs.close();
  }

  protected JobClient createJobClient(JobConf jobtrackerConf) throws IOException {
    return new JobClient(jobtrackerConf);
  }

  protected void closeJobClient(JobClient jobClient) throws IOException {
    jobClient.close();
  }

  protected void validateJobtracker(String jobtracker) throws HadoopException {
    if (jobTrackerWhitelist.size() > 0 && !jobTrackerWhitelist.contains("*")) {
      if (!jobTrackerWhitelist.contains(jobtracker.toLowerCase())) {
        throw new HadoopException(HadoopException.ERROR.H05, jobtracker, "not in whitelist");
      }
    }
  }

  protected void validateNamenode(String namenode) throws HadoopException {
    if (nameNodeWhitelist.size() > 0 && !nameNodeWhitelist.contains("*")) {
      if (!nameNodeWhitelist.contains(namenode.toLowerCase())) {
        throw new HadoopException(HadoopException.ERROR.H05, namenode, "not in whitelist");
      }
    }
  }

  protected void checkJobTrackerHealth(JobClient jobClient) throws HadoopException {
  }

  protected void checkNameNodeHealth(FileSystem fileSystem) throws HadoopException {
  }

  @Override
  public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
    throws HadoopException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (conf.get(NAME_NODE_PROPERTY) == null || conf.getTrimmed(NAME_NODE_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, NAME_NODE_PROPERTY);
    }
    try {
      validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        public T run() throws Exception {
          Configuration namenodeConf = createNameNodeConf(conf);
          FileSystem fs = createFileSystem(namenodeConf);
          Instrumentation instrumentation = getServer().get(Instrumentation.class);
          Instrumentation.Cron cron = instrumentation.createCron();
          try {
            checkNameNodeHealth(fs);
            cron.start();
            return executor.execute(fs);
          }
          finally {
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
            closeFileSystem(fs);
          }
        }
      });
    }
    catch (HadoopException ex) {
      throw ex;
    }
    catch (Exception ex) {
      throw new HadoopException(HadoopException.ERROR.H03, ex);
    }
  }

  @Override
  public <T> T execute(String user, final Configuration conf, final JobClientExecutor<T> executor)
    throws HadoopException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (conf.get(JOB_TRACKER_PROPERTY) == null || conf.getTrimmed(JOB_TRACKER_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, JOB_TRACKER_PROPERTY);
    }
    if (conf.get(NAME_NODE_PROPERTY) == null || conf.getTrimmed(NAME_NODE_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, NAME_NODE_PROPERTY);
    }
    try {
      validateJobtracker(new URI(conf.get(JOB_TRACKER_PROPERTY)).getAuthority());
      validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
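        // Runs as the proxy user. The nested try/finally blocks below ensure
        // the JobClient is closed even if creating the FileSystem or running
        // the executor fails.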
        public T run() throws Exception {
          JobConf jobtrackerConf = createJobTrackerConf(conf);
          Configuration namenodeConf = createNameNodeConf(conf);
          JobClient jobClient = createJobClient(jobtrackerConf);
          try {
            checkJobTrackerHealth(jobClient);
            FileSystem fs = createFileSystem(namenodeConf);
            Instrumentation instrumentation = getServer().get(Instrumentation.class);
            Instrumentation.Cron cron = instrumentation.createCron();
            try {
              checkNameNodeHealth(fs);
              cron.start();
              return executor.execute(jobClient, fs);
            }
            finally {
              cron.stop();
              instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
              closeFileSystem(fs);
            }
          }
          finally {
            closeJobClient(jobClient);
          }
        }
      });
    }
    catch (HadoopException ex) {
      throw ex;
    }
    catch (Exception ex) {
      throw new HadoopException(HadoopException.ERROR.H04, ex);
    }
  }

  public FileSystem createFileSystemInternal(String user, final Configuration conf)
    throws IOException, HadoopException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    try {
      validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        public FileSystem run() throws Exception {
          Configuration namenodeConf = createNameNodeConf(conf);
          return createFileSystem(namenodeConf);
        }
      });
    }
    catch (IOException ex) {
      throw ex;
    }
    catch (HadoopException ex) {
      throw ex;
    }
    catch (Exception ex) {
      throw new HadoopException(HadoopException.ERROR.H08, ex.getMessage(), ex);
    }
  }

  @Override
  public FileSystem createFileSystem(String user, final Configuration conf) throws IOException, HadoopException {
    unmanagedFileSystems.incrementAndGet();
    return createFileSystemInternal(user, conf);
  }

  @Override
  public void releaseFileSystem(FileSystem fs) throws IOException {
    unmanagedFileSystems.decrementAndGet();
    closeFileSystem(fs);
  }

  @Override
  public Configuration getDefaultConfiguration() {
    Configuration conf = new XConfiguration();
    XConfiguration.copy(serviceHadoopConf, conf);
    return conf;
  }

}
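
/*
 * Illustrative usage sketch (assumes a started Server with this service
 * loaded; "server", the user name and the URIs below are hypothetical
 * placeholders):
 *
 *   Hadoop hadoop = server.get(Hadoop.class);
 *   Configuration conf = new Configuration(false);
 *   conf.set("fs.default.name", "hdfs://namenode-host:8020");
 *   FileStatus[] files = hadoop.execute("alice", conf,
 *     new Hadoop.FileSystemExecutor<FileStatus[]>() {
 *       public FileStatus[] execute(FileSystem fs) throws IOException {
 *         return fs.listStatus(new Path("/user/alice"));
 *       }
 *     });
 *
 * The executor body runs as proxy user "alice" inside ugi.doAs(), the
 * FileSystem is created and closed by the service, and the call is timed
 * under the "hadoop" instrumentation group.
 */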