/*
 * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */
package com.cloudera.lib.service.hadoop;

import com.cloudera.lib.server.BaseService;
import com.cloudera.lib.server.ServiceException;
import com.cloudera.lib.service.Hadoop;
import com.cloudera.lib.service.HadoopException;
import com.cloudera.lib.service.Instrumentation;
import com.cloudera.lib.util.Check;
import com.cloudera.lib.util.XConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

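/**
 * Server service that gives access to Hadoop {@link FileSystem} and
 * {@link JobClient} instances on behalf of proxied users, using either
 * Kerberos or simple/pseudo authentication as configured.
 */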
public class HadoopService extends BaseService implements Hadoop {
  private static final Logger LOG = LoggerFactory.getLogger(HadoopService.class);

  public static final String PREFIX = "hadoop";

  private static final String INSTRUMENTATION_GROUP = "hadoop";

  public static final String AUTHENTICATION_TYPE = "authentication.type";
  public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
  public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";

  public static final String JOB_TRACKER_WHITELIST = "job.tracker.whitelist";
  public static final String NAME_NODE_WHITELIST = "name.node.whitelist";

  private static final String HADOOP_CONF_PREFIX = "conf:";

  private static final String JOB_TRACKER_PROPERTY = "mapred.job.tracker";
  private static final String NAME_NODE_PROPERTY = "fs.default.name";
  public HadoopService() {
    super(PREFIX);
  }

  private Collection<String> jobTrackerWhitelist;
  private Collection<String> nameNodeWhitelist;

  XConfiguration serviceHadoopConf;

  private final AtomicInteger unmanagedFileSystems = new AtomicInteger();

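  /**
   * Initializes the service: configures Hadoop security (Kerberos login from
   * a keytab, or simple/pseudo authentication), collects all service
   * configuration properties prefixed with "conf:" into the service's default
   * Hadoop configuration, and loads the JobTracker and NameNode whitelists.
   *
   * @throws ServiceException thrown if the keytab or principal is empty, the
   * Kerberos login fails, or the authentication type is neither "kerberos"
   * nor "simple".
   */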
  @Override
  protected void init() throws ServiceException {
    LOG.info("Using Hadoop JARs version [{}]", VersionInfo.getVersion());
    String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
    if (security.equals("kerberos")) {
      String defaultName = getServer().getName();
      String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
      keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
      if (keytab.length() == 0) {
        throw new ServiceException(HadoopException.ERROR.H01, KERBEROS_KEYTAB);
      }
      String principal = defaultName + "/localhost@LOCALHOST";
      principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
      if (principal.length() == 0) {
        throw new ServiceException(HadoopException.ERROR.H01, KERBEROS_PRINCIPAL);
      }
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      try {
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
      }
      catch (IOException ex) {
        throw new ServiceException(HadoopException.ERROR.H02, ex.getMessage(), ex);
      }
      LOG.info("Using Hadoop Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
    }
    else if (security.equals("simple")) {
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "simple");
      UserGroupInformation.setConfiguration(conf);
      LOG.info("Using Hadoop simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
    }
    else {
      throw new ServiceException(HadoopException.ERROR.H09, security);
    }

    serviceHadoopConf = new XConfiguration();
    for (Map.Entry<String, String> entry : getServiceConfig()) {
      String name = entry.getKey();
      if (name.startsWith(HADOOP_CONF_PREFIX)) {
        name = name.substring(HADOOP_CONF_PREFIX.length());
        serviceHadoopConf.set(name, entry.getValue());
      }
    }
    setRequiredServiceHadoopConf(serviceHadoopConf);

    LOG.debug("Hadoop default configuration:");
    for (Map.Entry<String, String> entry : serviceHadoopConf) {
      LOG.debug(" {} = {}", entry.getKey(), entry.getValue());
    }

    jobTrackerWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(JOB_TRACKER_WHITELIST));
    nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
  }

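  /**
   * Registers instrumentation for the service: a variable and a sampler,
   * both named "unmanaged.fs", tracking the number of unmanaged file systems
   * currently handed out via {@link #createFileSystem(String, Configuration)}.
   */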
  @Override
  public void postInit() throws ServiceException {
    super.postInit();
    Instrumentation instrumentation = getServer().get(Instrumentation.class);
    instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
      @Override
      public Integer getValue() {
        return unmanagedFileSystems.get();
      }
    });
    instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
      @Override
      public Long getValue() {
        return (long) unmanagedFileSystems.get();
      }
    });
  }

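  /**
   * Returns a new set with all values of the given collection lower-cased,
   * used to make the whitelist checks case-insensitive.
   */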
  private Set<String> toLowerCase(Collection<String> collection) {
    Set<String> set = new HashSet<String>();
    for (String value : collection) {
      set.add(value.toLowerCase());
    }
    return set;
  }

  @Override
  public Class getInterface() {
    return Hadoop.class;
  }

  @Override
  public Class[] getServiceDependencies() {
    return new Class[]{Instrumentation.class};
  }

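  /**
   * Creates a proxy-user {@link UserGroupInformation} for the given user,
   * with the service's login user as the real user. For this to work, the
   * Hadoop cluster must be configured to allow the service's user to
   * impersonate other users (Hadoop proxyuser settings).
   */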
  protected UserGroupInformation getUGI(String user) throws IOException {
    return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
  }

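  /**
   * Forces configuration settings the service requires regardless of user
   * input. Disabling the HDFS FileSystem cache means each request gets its
   * own FileSystem instance, so closing it after the request cannot affect
   * instances in use by other requests.
   */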
  protected void setRequiredServiceHadoopConf(Configuration conf) {
    conf.set("fs.hdfs.impl.disable.cache", "true");
  }

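  /**
   * Creates a JobConf seeded with the service's default Hadoop configuration
   * and then overlaid with the given per-request configuration; per-request
   * values take precedence over the service defaults.
   */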
  protected JobConf createHadoopConf(Configuration conf) {
    JobConf hadoopConf = new JobConf();
    XConfiguration.copy(serviceHadoopConf, hadoopConf);
    XConfiguration.copy(conf, hadoopConf);
    return hadoopConf;
  }

  protected JobConf createJobTrackerConf(Configuration conf) {
    return createHadoopConf(conf);
  }

  protected Configuration createNameNodeConf(Configuration conf) {
    return createHadoopConf(conf);
  }

  protected FileSystem createFileSystem(Configuration namenodeConf) throws IOException {
    return FileSystem.get(namenodeConf);
  }

  protected void closeFileSystem(FileSystem fs) throws IOException {
    fs.close();
  }

  protected JobClient createJobClient(JobConf jobtrackerConf) throws IOException {
    return new JobClient(jobtrackerConf);
  }

  protected void closeJobClient(JobClient jobClient) throws IOException {
    jobClient.close();
  }

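  // For both validations below, an empty whitelist or a whitelist containing
  // "*" allows any host; otherwise the authority must be in the whitelist.
  // The comparison is case-insensitive, the whitelists are lower-cased at init().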
  protected void validateJobtracker(String jobtracker) throws HadoopException {
    if (!jobTrackerWhitelist.isEmpty() && !jobTrackerWhitelist.contains("*") &&
        !jobTrackerWhitelist.contains(jobtracker.toLowerCase())) {
      throw new HadoopException(HadoopException.ERROR.H05, jobtracker, "not in whitelist");
    }
  }

  protected void validateNamenode(String namenode) throws HadoopException {
    if (!nameNodeWhitelist.isEmpty() && !nameNodeWhitelist.contains("*") &&
        !nameNodeWhitelist.contains(namenode.toLowerCase())) {
      throw new HadoopException(HadoopException.ERROR.H05, namenode, "not in whitelist");
    }
  }

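  // Health-check hooks invoked before running an executor. The default
  // implementations are no-ops; subclasses may override them to fail fast
  // with a HadoopException when the JobTracker or NameNode is unhealthy.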
  protected void checkJobTrackerHealth(JobClient jobClient) throws HadoopException {
  }

  protected void checkNameNodeHealth(FileSystem fileSystem) throws HadoopException {
  }

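  /**
   * Runs the given executor as the specified proxy user against the NameNode
   * indicated by the "fs.default.name" property of the configuration. A
   * FileSystem is created for the call and closed when it completes, and the
   * execution time is recorded as a Cron under this service's
   * instrumentation group.
   *
   * @throws HadoopException thrown if "fs.default.name" is missing, the
   * NameNode is not whitelisted, or the execution fails.
   */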
  @Override
  public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
    throws HadoopException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (conf.get(NAME_NODE_PROPERTY) == null || conf.getTrimmed(NAME_NODE_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, NAME_NODE_PROPERTY);
    }
    try {
      validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        @Override
        public T run() throws Exception {
          Configuration namenodeConf = createNameNodeConf(conf);
          FileSystem fs = createFileSystem(namenodeConf);
          Instrumentation instrumentation = getServer().get(Instrumentation.class);
          Instrumentation.Cron cron = instrumentation.createCron();
          try {
            checkNameNodeHealth(fs);
            cron.start();
            return executor.execute(fs);
          }
          finally {
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
            closeFileSystem(fs);
          }
        }
      });
    }
    catch (HadoopException ex) {
      throw ex;
    }
    catch (Exception ex) {
      throw new HadoopException(HadoopException.ERROR.H03, ex);
    }
  }

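  /**
   * Runs the given executor as the specified proxy user with both a
   * JobClient (from "mapred.job.tracker") and a FileSystem (from
   * "fs.default.name"). Both are created for the call and closed when it
   * completes, and the execution time is recorded as a Cron under this
   * service's instrumentation group.
   *
   * @throws HadoopException thrown if either property is missing, either
   * host is not whitelisted, or the execution fails.
   */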
  @Override
  public <T> T execute(String user, final Configuration conf, final JobClientExecutor<T> executor)
    throws HadoopException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (conf.get(JOB_TRACKER_PROPERTY) == null || conf.getTrimmed(JOB_TRACKER_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, JOB_TRACKER_PROPERTY);
    }
    if (conf.get(NAME_NODE_PROPERTY) == null || conf.getTrimmed(NAME_NODE_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, NAME_NODE_PROPERTY);
    }
    try {
      validateJobtracker(new URI(conf.get(JOB_TRACKER_PROPERTY)).getAuthority());
      validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        @Override
        public T run() throws Exception {
          JobConf jobtrackerConf = createJobTrackerConf(conf);
          Configuration namenodeConf = createNameNodeConf(conf);
          JobClient jobClient = createJobClient(jobtrackerConf);
          try {
            checkJobTrackerHealth(jobClient);
            FileSystem fs = createFileSystem(namenodeConf);
            Instrumentation instrumentation = getServer().get(Instrumentation.class);
            Instrumentation.Cron cron = instrumentation.createCron();
            try {
              checkNameNodeHealth(fs);
              cron.start();
              return executor.execute(jobClient, fs);
            }
            finally {
              cron.stop();
              instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
              closeFileSystem(fs);
            }
          }
          finally {
            closeJobClient(jobClient);
          }
        }
      });
    }
    catch (HadoopException ex) {
      throw ex;
    }
    catch (Exception ex) {
      throw new HadoopException(HadoopException.ERROR.H04, ex);
    }
  }

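  /**
   * Creates a FileSystem as the specified proxy user, after validating the
   * NameNode against the whitelist. Unlike the execute methods, the returned
   * FileSystem is not closed here; the caller is responsible for it.
   */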
  public FileSystem createFileSystemInternal(String user, final Configuration conf)
    throws IOException, HadoopException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    try {
      validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
          Configuration namenodeConf = createNameNodeConf(conf);
          return createFileSystem(namenodeConf);
        }
      });
    }
    catch (IOException ex) {
      throw ex;
    }
    catch (HadoopException ex) {
      throw ex;
    }
    catch (Exception ex) {
      throw new HadoopException(HadoopException.ERROR.H08, ex.getMessage(), ex);
    }
  }

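  /**
   * Creates an unmanaged FileSystem for the given user and increments the
   * unmanaged file-system counter exposed via instrumentation; callers must
   * hand the instance back via {@link #releaseFileSystem(FileSystem)}.
   */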
  @Override
  public FileSystem createFileSystem(String user, final Configuration conf) throws IOException, HadoopException {
    unmanagedFileSystems.incrementAndGet();
    return createFileSystemInternal(user, conf);
  }

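  /**
   * Closes a FileSystem obtained from
   * {@link #createFileSystem(String, Configuration)} and decrements the
   * unmanaged file-system counter.
   */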
  @Override
  public void releaseFileSystem(FileSystem fs) throws IOException {
    unmanagedFileSystems.decrementAndGet();
    closeFileSystem(fs);
  }

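  /**
   * Returns a copy of the service's default Hadoop configuration; the copy
   * can be modified by the caller without affecting the service.
   */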
  @Override
  public Configuration getDefaultConfiguration() {
    Configuration conf = new XConfiguration();
    XConfiguration.copy(serviceHadoopConf, conf);
    return conf;
  }

}