/*
 * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */
package com.cloudera.lib.service.hadoop;

import com.cloudera.lib.server.BaseService;
import com.cloudera.lib.server.ServiceException;
import com.cloudera.lib.service.Hadoop;
import com.cloudera.lib.service.HadoopException;
import com.cloudera.lib.service.Instrumentation;
import com.cloudera.lib.util.Check;
import com.cloudera.lib.util.XConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

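/**
 * {@link Hadoop} service implementation. It configures Hadoop security
 * (simple or Kerberos), builds a service-wide default Hadoop configuration
 * from service properties prefixed with <code>conf:</code>, enforces
 * optional JobTracker/NameNode whitelists, and runs filesystem and
 * job-client operations on behalf of end users via proxy-user impersonation.
 */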
public class HadoopService extends BaseService implements Hadoop {
  private static final Logger LOG = LoggerFactory.getLogger(HadoopService.class);

  public static final String PREFIX = "hadoop";

  private static final String INSTRUMENTATION_GROUP = "hadoop";

  public static final String AUTHENTICATION_TYPE = "authentication.type";
  public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
  public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";

  public static final String JOB_TRACKER_WHITELIST = "job.tracker.whitelist";
  public static final String NAME_NODE_WHITELIST = "name.node.whitelist";

  private static final String HADOOP_CONF_PREFIX = "conf:";

  private static final String JOB_TRACKER_PROPERTY = "mapred.job.tracker";
  private static final String NAME_NODE_PROPERTY = "fs.default.name";

  public HadoopService() {
    super(PREFIX);
  }

  private Collection<String> jobTrackerWhitelist;
  private Collection<String> nameNodeWhitelist;

  XConfiguration serviceHadoopConf;

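  // Number of FileSystem instances handed out via createFileSystem() and not
  // yet returned through releaseFileSystem(); exposed via instrumentation.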
  private final AtomicInteger unmanagedFileSystems = new AtomicInteger();

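  /**
   * Initializes the service: sets up simple or Kerberos authentication (per
   * the <code>authentication.type</code> property), loads the
   * <code>conf:</code> prefixed defaults and reads the JobTracker and
   * NameNode whitelists.
   */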
  @Override
  protected void init() throws ServiceException {
    LOG.info("Using Hadoop JARs version [{}]", VersionInfo.getVersion());
    String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
    if (security.equals("kerberos")) {
      String defaultName = getServer().getName();
      String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
      keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
      if (keytab.length() == 0) {
        throw new ServiceException(HadoopException.ERROR.H01, KERBEROS_KEYTAB);
      }
      String principal = defaultName + "/localhost@LOCALHOST";
      principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
      if (principal.length() == 0) {
        throw new ServiceException(HadoopException.ERROR.H01, KERBEROS_PRINCIPAL);
      }
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      try {
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
      }
      catch (IOException ex) {
        throw new ServiceException(HadoopException.ERROR.H02, ex.getMessage(), ex);
      }
      LOG.info("Using Hadoop Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
    }
    else if (security.equals("simple")) {
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "simple");
      UserGroupInformation.setConfiguration(conf);
      LOG.info("Using Hadoop simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
    }
    else {
      throw new ServiceException(HadoopException.ERROR.H09, security);
    }

    serviceHadoopConf = new XConfiguration();
    for (Map.Entry<String, String> entry : getServiceConfig()) {
      String name = entry.getKey();
      if (name.startsWith(HADOOP_CONF_PREFIX)) {
        name = name.substring(HADOOP_CONF_PREFIX.length());
        serviceHadoopConf.set(name, entry.getValue());
      }
    }
    setRequiredServiceHadoopConf(serviceHadoopConf);

    LOG.debug("Hadoop default configuration:");
    for (Map.Entry<String, String> entry : serviceHadoopConf) {
      LOG.debug("  {} = {}", entry.getKey(), entry.getValue());
    }

    jobTrackerWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(JOB_TRACKER_WHITELIST));
    nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
  }

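  /**
   * Registers instrumentation for the unmanaged FileSystem count, both as a
   * live variable and as a sampled value.
   */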
  @Override
  public void postInit() throws ServiceException {
    super.postInit();
    Instrumentation instrumentation = getServer().get(Instrumentation.class);
    instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
      @Override
      public Integer getValue() {
        return unmanagedFileSystems.get();
      }
    });
    instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
      @Override
      public Long getValue() {
        return (long) unmanagedFileSystems.get();
      }
    });
  }

  private Set<String> toLowerCase(Collection<String> collection) {
    Set<String> set = new HashSet<String>();
    for (String value : collection) {
      set.add(value.toLowerCase());
    }
    return set;
  }

  @Override
  public Class getInterface() {
    return Hadoop.class;
  }

  @Override
  public Class[] getServiceDependencies() {
    return new Class[]{Instrumentation.class};
  }

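  /**
   * Returns a proxy-user UGI for <code>user</code>, authenticated through
   * the service's login user, so operations execute with the end user's
   * identity.
   */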
  protected UserGroupInformation getUGI(String user) throws IOException {
    return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
  }

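  // The HDFS FileSystem cache must be disabled: each request creates and
  // closes its own FileSystem instance, and closing a cached instance would
  // break other requests sharing it.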
  protected void setRequiredServiceHadoopConf(Configuration conf) {
    conf.set("fs.hdfs.impl.disable.cache", "true");
  }

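  // Layers the per-request configuration over the service-wide defaults;
  // per-request values take precedence.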
  protected JobConf createHadoopConf(Configuration conf) {
    JobConf hadoopConf = new JobConf();
    XConfiguration.copy(serviceHadoopConf, hadoopConf);
    XConfiguration.copy(conf, hadoopConf);
    return hadoopConf;
  }

  protected JobConf createJobTrackerConf(Configuration conf) {
    return createHadoopConf(conf);
  }

  protected Configuration createNameNodeConf(Configuration conf) {
    return createHadoopConf(conf);
  }

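  // FileSystem/JobClient creation and disposal are protected hooks so
  // subclasses (e.g. tests) can substitute their own implementations.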
  protected FileSystem createFileSystem(Configuration namenodeConf) throws IOException {
    return FileSystem.get(namenodeConf);
  }

  protected void closeFileSystem(FileSystem fs) throws IOException {
    fs.close();
  }

  protected JobClient createJobClient(JobConf jobtrackerConf) throws IOException {
    return new JobClient(jobtrackerConf);
  }

  protected void closeJobClient(JobClient jobClient) throws IOException {
    jobClient.close();
  }

  protected void validateJobtracker(String jobtracker) throws HadoopException {
    if (!jobTrackerWhitelist.isEmpty() && !jobTrackerWhitelist.contains("*")) {
      if (!jobTrackerWhitelist.contains(jobtracker.toLowerCase())) {
        throw new HadoopException(HadoopException.ERROR.H05, jobtracker, "not in whitelist");
      }
    }
  }

  protected void validateNamenode(String namenode) throws HadoopException {
    if (!nameNodeWhitelist.isEmpty() && !nameNodeWhitelist.contains("*")) {
      if (!nameNodeWhitelist.contains(namenode.toLowerCase())) {
        throw new HadoopException(HadoopException.ERROR.H05, namenode, "not in whitelist");
      }
    }
  }

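  // Health-check hooks: no-ops by default, overridable to fail fast before
  // an operation runs.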
  protected void checkJobTrackerHealth(JobClient jobClient) throws HadoopException {
  }

  protected void checkNameNodeHealth(FileSystem fileSystem) throws HadoopException {
  }

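  /**
   * Executes a filesystem operation as <code>user</code>, timing it with an
   * instrumentation cron and closing the FileSystem when done.
   */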
  @Override
  public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
    throws HadoopException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (conf.get(NAME_NODE_PROPERTY) == null || conf.getTrimmed(NAME_NODE_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, NAME_NODE_PROPERTY);
    }
    try {
      validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        public T run() throws Exception {
          Configuration namenodeConf = createNameNodeConf(conf);
          FileSystem fs = createFileSystem(namenodeConf);
          Instrumentation instrumentation = getServer().get(Instrumentation.class);
          Instrumentation.Cron cron = instrumentation.createCron();
          try {
            checkNameNodeHealth(fs);
            cron.start();
            return executor.execute(fs);
          }
          finally {
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
            closeFileSystem(fs);
          }
        }
      });
    }
    catch (HadoopException ex) {
      throw ex;
    }
    catch (Exception ex) {
      throw new HadoopException(HadoopException.ERROR.H03, ex);
    }
  }

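  /**
   * Executes a MapReduce client operation as <code>user</code>, providing a
   * JobClient and a FileSystem and closing both when done.
   */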
  @Override
  public <T> T execute(String user, final Configuration conf, final JobClientExecutor<T> executor)
    throws HadoopException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (conf.get(JOB_TRACKER_PROPERTY) == null || conf.getTrimmed(JOB_TRACKER_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, JOB_TRACKER_PROPERTY);
    }
    if (conf.get(NAME_NODE_PROPERTY) == null || conf.getTrimmed(NAME_NODE_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, NAME_NODE_PROPERTY);
    }
    try {
      validateJobtracker(new URI(conf.get(JOB_TRACKER_PROPERTY)).getAuthority());
      validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        public T run() throws Exception {
          JobConf jobtrackerConf = createJobTrackerConf(conf);
          Configuration namenodeConf = createNameNodeConf(conf);
          JobClient jobClient = createJobClient(jobtrackerConf);
          try {
            checkJobTrackerHealth(jobClient);
            FileSystem fs = createFileSystem(namenodeConf);
            Instrumentation instrumentation = getServer().get(Instrumentation.class);
            Instrumentation.Cron cron = instrumentation.createCron();
            try {
              checkNameNodeHealth(fs);
              cron.start();
              return executor.execute(jobClient, fs);
            }
            finally {
              cron.stop();
              instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
              closeFileSystem(fs);
            }
          }
          finally {
            closeJobClient(jobClient);
          }
        }
      });
    }
    catch (HadoopException ex) {
      throw ex;
    }
    catch (Exception ex) {
      throw new HadoopException(HadoopException.ERROR.H04, ex);
    }
  }

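  /**
   * Creates a FileSystem as <code>user</code> without tracking it; the
   * caller is responsible for closing it.
   */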
  public FileSystem createFileSystemInternal(String user, final Configuration conf)
    throws IOException, HadoopException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    // Guard against a missing NameNode property, matching the execute()
    // methods, rather than failing later with a NullPointerException in URI.
    if (conf.get(NAME_NODE_PROPERTY) == null || conf.getTrimmed(NAME_NODE_PROPERTY).length() == 0) {
      throw new HadoopException(HadoopException.ERROR.H06, NAME_NODE_PROPERTY);
    }
    try {
      validateNamenode(new URI(conf.get(NAME_NODE_PROPERTY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        public FileSystem run() throws Exception {
          Configuration namenodeConf = createNameNodeConf(conf);
          return createFileSystem(namenodeConf);
        }
      });
    }
    catch (IOException ex) {
      throw ex;
    }
    catch (HadoopException ex) {
      throw ex;
    }
    catch (Exception ex) {
      throw new HadoopException(HadoopException.ERROR.H08, ex.getMessage(), ex);
    }
  }

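  /**
   * Creates a FileSystem tracked as unmanaged until it is returned through
   * {@link #releaseFileSystem}.
   */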
  @Override
  public FileSystem createFileSystem(String user, final Configuration conf) throws IOException, HadoopException {
    FileSystem fs = createFileSystemInternal(user, conf);
    // Count only successfully created instances so a failed creation does not
    // skew the unmanaged.fs instrumentation.
    unmanagedFileSystems.incrementAndGet();
    return fs;
  }

  @Override
  public void releaseFileSystem(FileSystem fs) throws IOException {
    unmanagedFileSystems.decrementAndGet();
    closeFileSystem(fs);
  }

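  /**
   * Returns a copy of the service-wide default Hadoop configuration.
   */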
  @Override
  public Configuration getDefaultConfiguration() {
    Configuration conf = new XConfiguration();
    XConfiguration.copy(serviceHadoopConf, conf);
    return conf;
  }

}