Package cm_api :: Package endpoints :: Module services
[hide private]
[frames] | [no frames]

Source Code for Module cm_api.endpoints.services

   1  # Licensed to Cloudera, Inc. under one 
   2  # or more contributor license agreements.  See the NOTICE file 
   3  # distributed with this work for additional information 
   4  # regarding copyright ownership.  Cloudera, Inc. licenses this file 
   5  # to you under the Apache License, Version 2.0 (the 
   6  # "License"); you may not use this file except in compliance 
   7  # with the License.  You may obtain a copy of the License at 
   8  # 
   9  #     http://www.apache.org/licenses/LICENSE-2.0 
  10  # 
  11  # Unless required by applicable law or agreed to in writing, software 
  12  # distributed under the License is distributed on an "AS IS" BASIS, 
  13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  14  # See the License for the specific language governing permissions and 
  15  # limitations under the License. 
  16   
  17  try: 
  18    import json 
  19  except ImportError: 
  20    import simplejson as json 
  21  import logging 
  22   
  23  from cm_api.endpoints.types import * 
  24  from cm_api.endpoints import roles, role_config_groups 
  25   
__docformat__ = "epytext"

# URL path template for the service collection of a cluster
# (one argument: the cluster name).
SERVICES_PATH = "/clusters/%s/services"
# URL path template for a single service
# (two arguments: the cluster name, then the service name).
SERVICE_PATH = "/clusters/%s/services/%s"
# Key of a service-config payload under which the per-role-type
# configurations are listed.
ROLETYPES_CFG_KEY = 'roleTypeConfigs'

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
  33   
  34   
def create_service(resource_root, name, service_type,
                   cluster_name="default"):
  """
  Create a new service inside a cluster.

  @param resource_root: The root Resource object.
  @param name: Service name
  @param service_type: Service type
  @param cluster_name: Cluster name
  @return: An ApiService object
  """
  new_service = ApiService(resource_root, name, service_type)
  payload = json.dumps(ApiList([new_service]).to_json_dict())
  response = resource_root.post(SERVICES_PATH % (cluster_name,), data=payload)
  # The request carries a one-element list and the server answers with the
  # list of created services; hand back its only entry.
  created = ApiList.from_json_dict(ApiService, response, resource_root)
  return created[0]
51
def get_service(resource_root, name, cluster_name="default"):
  """
  Fetch a single service, identified by its name.

  @param resource_root: The root Resource object.
  @param name: Service name
  @param cluster_name: Cluster name
  @return: An ApiService object
  """
  services_url = SERVICES_PATH % (cluster_name,)
  service_url = "%s/%s" % (services_url, name)
  return _get_service(resource_root, service_url)
61
def _get_service(resource_root, path):
  """
  GET the resource at C{path} and decode the response as a service.

  @param resource_root: The root Resource object.
  @param path: API path of the service to fetch.
  @return: An ApiService object
  """
  return ApiService.from_json_dict(resource_root.get(path), resource_root)
65
def get_all_services(resource_root, cluster_name="default", view=None):
  """
  Get all services

  @param resource_root: The root Resource object.
  @param cluster_name: Cluster name
  @param view: View to materialize; sent as the "view" query parameter
               when truthy, otherwise no query parameters are sent.
  @return: A list of ApiService objects.
  """
  # Conditional expression instead of the fragile "a and b or c" idiom.
  params = dict(view=view) if view else None
  dic = resource_root.get(SERVICES_PATH % (cluster_name,), params=params)
  return ApiList.from_json_dict(ApiService, dic, resource_root)
76
def delete_service(resource_root, name, cluster_name="default"):
  """
  Remove a service, identified by its name.

  @param resource_root: The root Resource object.
  @param name: Service name
  @param cluster_name: Cluster name
  @return: The deleted ApiService object
  """
  services_url = SERVICES_PATH % (cluster_name,)
  response = resource_root.delete("%s/%s" % (services_url, name))
  return ApiService.from_json_dict(response, resource_root)
87 88
class ApiService(BaseApiObject):
  # Attribute table of a service object, keyed by JSON attribute name.
  # NOTE(review): entries mapped to None look like plain read/write
  # attributes and ROAttr() entries like read-only ones (optionally decoded
  # into the given type, e.g. ApiClusterRef) -- confirm against
  # cm_api.endpoints.types.
  _ATTRIBUTES = {
    'name'                        : None,
    'type'                        : None,
    'displayName'                 : None,
    'serviceState'                : ROAttr(),
    'healthSummary'               : ROAttr(),
    'healthChecks'                : ROAttr(),
    'clusterRef'                  : ROAttr(ApiClusterRef),
    'configStale'                 : ROAttr(),
    'serviceUrl'                  : ROAttr(),
    'maintenanceMode'             : ROAttr(),
    'maintenanceOwners'           : ROAttr(),
  }
def __init__(self, resource_root, name=None, type=None):
  """
  Build a service object from its name and type.

  @param resource_root: The root Resource object.
  @param name: Service name
  @param type: Service type
  """
  # locals() is evaluated before any other local is bound, so it holds
  # exactly self, resource_root, name and type. Keep this call as the first
  # statement; introducing a local above it would change what is passed.
  # NOTE(review): presumably BaseApiObject.init copies these entries onto
  # the instance -- confirm in cm_api.endpoints.types.
  BaseApiObject.init(self, resource_root, locals())
106
def __str__(self):
  """Return a short description naming the service and its cluster."""
  details = (self.name, self._get_cluster_name())
  return "<ApiService>: %s (cluster: %s)" % details
110
def _get_cluster_name(self):
  """
  Return the name of the cluster this service belongs to.

  @return: The cluster name, or None when the object has no (or an empty)
           cluster reference.
  """
  if not hasattr(self, 'clusterRef'):
    return None
  if not self.clusterRef:
    return None
  return self.clusterRef.clusterName
115
def _path(self):
  """
  Return the API path for this service.

  A service without a cluster reference is taken to be the Cloudera
  Management Services instance, which lives at a fixed path.
  """
  if not self._get_cluster_name():
    return '/cm/service'
  return SERVICE_PATH % (self._get_cluster_name(), self.name)
127
def _cmd(self, cmd, data=None, params=None):
  """
  POST a service-level command and decode the reply.

  @param cmd: Command name, appended to the service's "/commands/" path.
  @param data: Request body (optional).
  @param params: Query parameters (optional).
  @return: An ApiCommand object built from the response.
  """
  command_url = "%s/commands/%s" % (self._path(), cmd)
  response = self._get_resource_root().post(
      command_url, data=data, params=params)
  return ApiCommand.from_json_dict(response, self._get_resource_root())
132
def _role_cmd(self, cmd, roles):
  """
  POST a role-level command for a set of roles and decode the reply.

  @param cmd: Command name, appended to the service's "/roleCommands/" path.
  @param roles: Role names the command applies to.
  @return: A list of ApiCommand objects built from the response.
  """
  command_url = "%s/roleCommands/%s" % (self._path(), cmd)
  body = json.dumps({ ApiList.LIST_KEY : roles })
  response = self._get_resource_root().post(command_url, data=body)
  return ApiList.from_json_dict(
      ApiCommand, response, self._get_resource_root())
138
def _parse_svc_config(self, json_dic, view = None):
  """
  Parse a json-decoded ApiServiceConfig dictionary into a 2-tuple.

  @param json_dic: The json dictionary with the config data.
  @param view: View to materialize.
  @return: 2-tuple (service config dictionary, role type configurations)
  """
  full_view = (view == 'full')
  svc_config = json_to_config(json_dic, full_view)
  rt_configs = { }
  # "in" replaces dict.has_key(), which no longer exists on Python 3.
  if ROLETYPES_CFG_KEY in json_dic:
    for rt_config in json_dic[ROLETYPES_CFG_KEY]:
      rt_configs[rt_config['roleType']] = json_to_config(rt_config, full_view)

  return (svc_config, rt_configs)
155
def get_commands(self, view=None):
  """
  Retrieve a list of running commands for this service.

  @param view: View to materialize ('full' or 'summary')
  @return: A list of running commands.
  """
  # Conditional expression instead of the fragile "a and b or c" idiom.
  params = dict(view=view) if view else None
  resp = self._get_resource_root().get(
      self._path() + '/commands', params=params)
  return ApiList.from_json_dict(ApiCommand, resp, self._get_resource_root())
167
def get_running_activities(self):
  """
  Fetch the service's "/activities" resource without any query.

  @return: A list of ApiActivity objects.
  """
  response = self._get_resource_root().get("%s/activities" % (self._path(),))
  return ApiList.from_json_dict(
      ApiActivity, response, self._get_resource_root())
172
def query_activities(self, query_str=None):
  """
  Fetch the service's activities, optionally restricted by a query.

  @param query_str: Query string, sent as the "query" parameter when
                    non-empty.
  @return: A list of ApiActivity objects.
  """
  activities_url = "%s/activities" % (self._path(),)
  query_args = { }
  if query_str:
    query_args.update(query=query_str)
  response = self._get_resource_root().get(activities_url, params=query_args)
  return ApiList.from_json_dict(
      ApiActivity, response, self._get_resource_root())
180
def get_activity(self, job_id):
  """
  Fetch a single activity of this service.

  @param job_id: Identifier appended to the "/activities/" path.
  @return: An ApiActivity object.
  """
  activity_url = "%s/activities/%s" % (self._path(), job_id)
  response = self._get_resource_root().get(activity_url)
  return ApiActivity.from_json_dict(response, self._get_resource_root())
185
def get_impala_queries(self, start_time, end_time, filter_str="", limit=100,
    offset=0):
  """
  Returns a list of queries that satisfy the filter

  @type start_time: datetime.datetime
  @param start_time: Queries must have ended after this time
  @type end_time: datetime.datetime
  @param end_time: Queries must have started before this time
  @param filter_str: A filter to apply to the queries. For example:
                     'user = root and queryDuration > 5s'
  @param limit: The maximum number of results to return
  @param offset: The offset into the return list
  """
  queries_url = "%s/impalaQueries" % (self._path(),)
  root = self._get_resource_root()
  query_args = {
    'from': start_time.isoformat(),
    'to': end_time.isoformat(),
    'filter': filter_str,
    'limit': limit,
    'offset': offset,
  }
  response = root.get(queries_url, params=query_args)
  return ApiImpalaQueryResponse.from_json_dict(
      response, self._get_resource_root())
206
def cancel_impala_query(self, query_id):
  """
  Cancel the query.

  @param query_id: Id of the query to cancel.
  @return: The warning message, if any.
  """
  # A real one-element tuple: "(query_id)" is just query_id, which makes the
  # "%" operator misbehave whenever the id itself is a tuple.
  path = self._path() + "/impalaQueries/%s" % (query_id,) + "/cancel"
  return ApiImpalaCancelResponse.from_json_dict(
      self._get_resource_root().post(path),
      self._get_resource_root())
217
def get_query_details(self, query_id, format='text'):
  """
  Get the query details

  @param query_id: Id of the query to look up.
  @param format: The format of the response ('text' or 'thrift_encoded')
  @return: The details text
  """
  # A real one-element tuple: "(query_id)" is just query_id, which makes the
  # "%" operator misbehave whenever the id itself is a tuple.
  path = self._path() + "/impalaQueries/%s" % (query_id,)
  return ApiImpalaQueryDetailsResponse.from_json_dict(
      self._get_resource_root().get(path, params=dict(format=format)),
      self._get_resource_root())
229
def get_config(self, view = None):
  """
  Retrieve the service's configuration.

  Retrieves both the service configuration and role type configuration
  for each of the service's supported role types. The role type
  configurations are returned as a dictionary, whose keys are the
  role type name, and values are the respective configuration dictionaries.

  The 'summary' view contains strings as the dictionary values. The full
  view contains ApiConfig instances as the values.

  @param view: View to materialize ('full' or 'summary')
  @return: 2-tuple (service config dictionary, role type configurations)
  """
  path = self._path() + '/config'
  # Conditional expression instead of the fragile "a and b or c" idiom.
  params = dict(view=view) if view else None
  resp = self._get_resource_root().get(path, params=params)
  return self._parse_svc_config(resp, view)
249
def update_config(self, svc_config, **rt_configs):
  """
  Update the service's configuration.

  @param svc_config: Dictionary with service configuration to update.
  @param rt_configs: Dict of role type configurations to update.
  @return: 2-tuple (service config dictionary, role type configurations)
  """
  path = self._path() + '/config'

  if svc_config:
    data = config_to_api_list(svc_config)
  else:
    data = { }
  if rt_configs:
    rt_list = [ ]
    # items() works on Python 2 and 3 alike; iteritems() is Python 2 only.
    for rt, cfg in rt_configs.items():
      rt_data = config_to_api_list(cfg)
      rt_data['roleType'] = rt
      rt_list.append(rt_data)
    data[ROLETYPES_CFG_KEY] = rt_list

  resp = self._get_resource_root().put(path, data = json.dumps(data))
  return self._parse_svc_config(resp)
274
def create_role(self, role_name, role_type, host_id):
  """
  Add a new role to this service.

  @param role_name: Role name
  @param role_type: Role type
  @param host_id: ID of the host to assign the role to
  @return: An ApiRole object
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  # Note the helper takes the role type before the role name.
  return roles.create_role(
      root, self.name, role_type, role_name, host_id, cluster)
286
def delete_role(self, name):
  """
  Remove one of this service's roles, identified by name.

  @param name: Role name
  @return: The deleted ApiRole object
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  return roles.delete_role(root, self.name, name, cluster)
296
def get_role(self, name):
  """
  Fetch one of this service's roles, identified by name.

  @param name: Role name
  @return: An ApiRole object
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  return roles.get_role(root, self.name, name, cluster)
306
def get_all_roles(self, view = None):
  """
  List every role of this service.

  @param view: View to materialize ('full' or 'summary')
  @return: A list of ApiRole objects.
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  return roles.get_all_roles(root, self.name, cluster, view)
316
def get_roles_by_type(self, role_type, view = None):
  """
  List the roles of this service that have a given type.

  @param role_type: Role type
  @param view: View to materialize ('full' or 'summary')
  @return: A list of ApiRole objects.
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  return roles.get_roles_by_type(root, self.name, role_type, cluster, view)
327
def get_role_types(self):
  """
  List the role types of this service.

  @return: A list of role types (strings)
  """
  role_types_url = "%s/roleTypes" % (self._path(),)
  response = self._get_resource_root().get(role_types_url)
  # The payload is a list wrapper; return only the wrapped items.
  return response[ApiList.LIST_KEY]
336
def get_all_role_config_groups(self):
  """
  List the role configuration groups of this service.

  @return: A list of ApiRoleConfigGroup objects.
  @since: API v3
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  return role_config_groups.get_all_role_config_groups(
      root, self.name, cluster)
346
def get_role_config_group(self, name):
  """
  Fetch one role configuration group of this service by name.

  @param name: The name of the role config group.
  @return: An ApiRoleConfigGroup object.
  @since: API v3
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  return role_config_groups.get_role_config_group(
      root, self.name, name, cluster)
357
def create_role_config_group(self, name, display_name, role_type):
  """
  Add a new role config group to this service.

  @param name: The name of the new group.
  @param display_name: The display name of the new group.
  @param role_type: The role type of the new group.
  @return: New ApiRoleConfigGroup object.
  @since: API v3
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  return role_config_groups.create_role_config_group(
      root, self.name, name, display_name, role_type, cluster)
371
def update_role_config_group(self, name, apigroup):
  """
  Replace the definition of an existing role config group.

  @param name: Role config group name.
  @param apigroup: The updated role config group.
  @return: The updated ApiRoleConfigGroup object.
  @since: API v3
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  return role_config_groups.update_role_config_group(
      root, self.name, name, apigroup, cluster)
384
def delete_role_config_group(self, name):
  """
  Remove a role config group, identified by name.

  @param name: Role config group name.
  @return: The deleted ApiRoleConfigGroup object.
  @since: API v3
  """
  root = self._get_resource_root()
  cluster = self._get_cluster_name()
  return role_config_groups.delete_role_config_group(
      root, self.name, name, cluster)
395
def get_metrics(self, from_time=None, to_time=None, metrics=None, view=None):
  """
  Fetch metric readings for this service.

  @param from_time: A datetime; start of the period to query (optional).
  @param to_time: A datetime; end of the period to query (default = now).
  @param metrics: List of metrics to query (default = all).
  @param view: View to materialize ('full' or 'summary')
  @return: List of metrics and their readings.
  """
  root = self._get_resource_root()
  metrics_url = "%s/metrics" % (self._path(),)
  return root.get_metrics(metrics_url, from_time, to_time, metrics, view)
408
def start(self):
  """
  Submit the "start" command for this service.

  @return: Reference to the submitted command.
  """
  submitted = self._cmd('start')
  return submitted
416
def stop(self):
  """
  Submit the "stop" command for this service.

  @return: Reference to the submitted command.
  """
  submitted = self._cmd('stop')
  return submitted
424
def restart(self):
  """
  Submit the "restart" command for this service.

  @return: Reference to the submitted command.
  """
  submitted = self._cmd('restart')
  return submitted
432
def start_roles(self, *role_names):
  """
  Submit the "start" role command for the given roles.

  @param role_names: names of the roles to start.
  @return: List of submitted commands.
  """
  submitted = self._role_cmd('start', role_names)
  return submitted
441
def stop_roles(self, *role_names):
  """
  Submit the "stop" role command for the given roles.

  @param role_names: names of the roles to stop.
  @return: List of submitted commands.
  """
  submitted = self._role_cmd('stop', role_names)
  return submitted
450
def restart_roles(self, *role_names):
  """
  Submit the "restart" role command for the given roles.

  @param role_names: names of the roles to restart.
  @return: List of submitted commands.
  """
  submitted = self._role_cmd('restart', role_names)
  return submitted
459
def bootstrap_hdfs_stand_by(self, *role_names):
  """
  Bootstrap HDFS stand-by NameNodes.

  Their state is initialized by syncing it with the respective HA partner.

  @param role_names: NameNodes to bootstrap.
  @return: List of submitted commands.
  """
  submitted = self._role_cmd('hdfsBootstrapStandBy', role_names)
  return submitted
470
def create_beeswax_warehouse(self):
  """
  DEPRECATED: use create_hive_warehouse on the Hive service. Deprecated since v3.

  Creates the Beeswax role's warehouse for a Hue service.

  @return: Reference to the submitted command.
  """
  submitted = self._cmd('hueCreateHiveWarehouse')
  return submitted
480
def create_hbase_root(self):
  """
  Submit the command that creates the root directory of an HBase service.

  @return: Reference to the submitted command.
  """
  submitted = self._cmd('hbaseCreateRoot')
  return submitted
488
def create_hdfs_tmp(self):
  """
  Submit the command that creates the /tmp directory in HDFS with
  appropriate ownership and permissions.

  @return: Reference to the submitted command
  @since: API v2
  """
  submitted = self._cmd('hdfsCreateTmpDir')
  return submitted
497
def refresh(self, *role_names):
  """
  Run the "refresh" role command on a set of roles.

  @param role_names: Names of the roles to refresh.
  @return: Whatever the role-command helper returns for 'refresh' (a list
           of submitted commands).
  """
  submitted = self._role_cmd('refresh', role_names)
  return submitted
506
def decommission(self, *role_names):
  """
  Submit the service-level "decommission" command for a set of roles.

  @param role_names: Names of the roles to decommission.
  @return: Reference to the submitted command.
  """
  # The role names travel in the request body, wrapped as an API list.
  body = json.dumps({ ApiList.LIST_KEY : role_names })
  return self._cmd('decommission', body)
516
def recommission(self, *role_names):
  """
  Submit the service-level "recommission" command for a set of roles.

  @param role_names: Names of the roles to recommission.
  @return: Reference to the submitted command.
  @since: API v2
  """
  # The role names travel in the request body, wrapped as an API list.
  body = json.dumps({ ApiList.LIST_KEY : role_names })
  return self._cmd('recommission', body)
527
def deploy_client_config(self, *role_names):
  """
  Deploy client configuration to the hosts where roles are running.

  @param role_names: Names of the roles whose hosts get the configuration.
  @return: Reference to the submitted command.
  """
  # The role names travel in the request body, wrapped as an API list.
  body = json.dumps({ ApiList.LIST_KEY : role_names })
  return self._cmd('deployClientConfig', body)
537
def disable_hdfs_auto_failover(self, nameservice):
  """
  Turn off auto-failover for a highly available HDFS nameservice.

  @param nameservice: Affected nameservice.
  @return: Reference to the submitted command.
  """
  # The body is the JSON encoding of the bare nameservice value.
  body = json.dumps(nameservice)
  return self._cmd('hdfsDisableAutoFailover', data = body)
546
def disable_hdfs_ha(self, active_name, secondary_name,
    start_dependent_services=True, deploy_client_configs=True,
    disable_quorum_storage=False):
  """
  Turn off high availability for an HDFS NameNode.

  @param active_name: Name of the NameNode to keep.
  @param secondary_name: Name of (existing) SecondaryNameNode to link to
                         remaining NameNode.
  @param start_dependent_services: whether to re-start dependent services.
  @param deploy_client_configs: whether to re-deploy client configurations.
  @param disable_quorum_storage: whether to disable Quorum-based Storage.
                                 Available since API v2. Quorum-based
                                 Storage will be disabled for all
                                 nameservices that have it enabled.
  @return: Reference to the submitted command.
  @raise AttributeError: if Quorum-based Storage is requested against an
                         API version older than 2.
  """
  args = {
    'activeName': active_name,
    'secondaryName': secondary_name,
    'startDependentServices': start_dependent_services,
    'deployClientConfigs': deploy_client_configs,
  }

  api_version = self._get_resource_root().version
  if not api_version < 2:
    # The flag is always sent from API v2 on, even when it is False.
    args['disableQuorumStorage'] = disable_quorum_storage
  elif disable_quorum_storage:
    raise AttributeError("Quorum-based Storage requires at least API version 2 available in Cloudera Manager 4.1.")

  return self._cmd('hdfsDisableHa', data = json.dumps(args))
579
def enable_hdfs_auto_failover(self, nameservice, active_fc_name,
    standby_fc_name, zk_service):
  """
  Turn on auto-failover for an HDFS nameservice.

  @param nameservice: Nameservice for which to enable auto-failover.
  @param active_fc_name: Name of failover controller to create for active node.
  @param standby_fc_name: Name of failover controller to create for stand-by node.
  @param zk_service: ZooKeeper service to use.
  @return: Reference to the submitted command.
  """
  args = {
    'nameservice': nameservice,
    'activeFCName': active_fc_name,
    'standByFCName': standby_fc_name,
    # The ZooKeeper service is referenced by its cluster and service names.
    'zooKeeperService': {
      'clusterName': zk_service.clusterRef.clusterName,
      'serviceName': zk_service.name,
    },
  }
  return self._cmd('hdfsEnableAutoFailover', data = json.dumps(args))
601
def enable_hdfs_ha(self, active_name, active_shared_path, standby_name,
    standby_shared_path, nameservice, start_dependent_services=True,
    deploy_client_configs=True, enable_quorum_storage=False):
  """
  Turn on high availability for an HDFS NameNode.

  @param active_name: name of active NameNode.
  @param active_shared_path: shared edits path for active NameNode.
                             Ignored if Quorum-based Storage is being enabled.
  @param standby_name: name of stand-by NameNode.
  @param standby_shared_path: shared edits path for stand-by NameNode.
                              Ignored if Quorum-based Storage is being enabled.
  @param nameservice: nameservice for the HA pair.
  @param start_dependent_services: whether to re-start dependent services.
  @param deploy_client_configs: whether to re-deploy client configurations.
  @param enable_quorum_storage: whether to enable Quorum-based Storage.
                                Available since API v2. Quorum-based Storage
                                will be enabled for all nameservices except
                                those configured with NFS High Availability.
  @return: Reference to the submitted command.
  @raise AttributeError: if Quorum-based Storage is requested against an
                         API version older than 2, or if it is not requested
                         and one of the shared paths is None.
  """
  args = {
    'activeName': active_name,
    'standByName': standby_name,
    'nameservice': nameservice,
    'startDependentServices': start_dependent_services,
    'deployClientConfigs': deploy_client_configs,
  }

  if not enable_quorum_storage:
    # Without Quorum-based Storage both shared edits paths are mandatory.
    if active_shared_path is None or standby_shared_path is None:
      raise AttributeError("Active and standby shared paths must be specified if not enabling Quorum-based Storage")
    args['activeSharedEditsPath'] = active_shared_path
    args['standBySharedEditsPath'] = standby_shared_path
  else:
    # The API version is consulted only when the feature is requested.
    if self._get_resource_root().version < 2:
      raise AttributeError("Quorum-based Storage is not supported prior to Cloudera Manager 4.1.")
    args['enableQuorumStorage'] = enable_quorum_storage

  return self._cmd('hdfsEnableHa', data = json.dumps(args))
644
def enable_jt_ha(self, new_jt_host_id, force_init_znode=True, zk_service_name=None):
  """
  Turn on high availability for a MR JobTracker.

  @param zk_service_name: Name of the ZooKeeper service to use for auto-failover.
         If MapReduce service depends on a ZooKeeper service then that ZooKeeper
         service will be used for auto-failover and in that case this parameter
         can be omitted.
  @param new_jt_host_id: id of the host where the second JobTracker
                         will be added.
  @param force_init_znode: Initialize the ZNode used for auto-failover even if
                           it already exists. This can happen if JobTracker HA
                           was enabled before and then disabled. Disable operation
                           doesn't delete this ZNode. Defaults to true.
  @return: Reference to the submitted command.
  """
  args = {
    'newJtHostId': new_jt_host_id,
    'forceInitZNode': force_init_znode,
    'zkServiceName': zk_service_name,
  }
  body = json.dumps(args)
  return self._cmd('enableJtHa', data = body)
667
def disable_jt_ha(self, active_name):
  """
  Turn off high availability for a MR JobTracker active-standby pair.

  @param active_name: name of the JobTracker that will be active after
                      the disable operation. The other JobTracker and
                      Failover Controllers will be removed.
  @return: Reference to the submitted command.
  """
  body = json.dumps({ 'activeName': active_name })
  return self._cmd('disableJtHa', data = body)
681
def failover_hdfs(self, active_name, standby_name, force=False):
  """
  Initiate a failover of an HDFS NameNode HA pair.

  This will make the given stand-by NameNode active, and vice-versa.

  @param active_name: name of currently active NameNode.
  @param standby_name: name of NameNode currently in stand-by.
  @param force: whether to force failover.
  @return: Reference to the submitted command.
  """
  # Previously this dict was built but never handed to _cmd, so "force" was
  # silently dropped; the old '"true" and force or "false"' expression also
  # produced the boolean True instead of the string "true".
  # NOTE(review): sending "force" as a query parameter follows the Cloudera
  # Manager API documentation for hdfsFailover -- confirm against the
  # targeted server version.
  params = { "force" : "true" if force else "false" }
  args = { ApiList.LIST_KEY : [ active_name, standby_name ] }
  return self._cmd('hdfsFailover', data = json.dumps(args), params = params)
696
def format_hdfs(self, *namenodes):
  """
  Submit the format command for NameNode instances of an HDFS service.

  @param namenodes: Name of NameNode instances to format.
  @return: List of submitted commands.
  """
  submitted = self._role_cmd('hdfsFormat', namenodes)
  return submitted
705
def init_hdfs_auto_failover(self, *controllers):
  """
  Submit the command that initializes HDFS failover controller metadata.

  Only one controller per nameservice needs to be initialized.

  @param controllers: Name of failover controller instances to initialize.
  @return: List of submitted commands.
  """
  submitted = self._role_cmd('hdfsInitializeAutoFailover', controllers)
  return submitted
716
def init_hdfs_shared_dir(self, *namenodes):
  """
  Submit the command that initializes a NameNode's shared edits directory.

  @param namenodes: Name of NameNode instances.
  @return: List of submitted commands.
  """
  submitted = self._role_cmd('hdfsInitializeSharedDir', namenodes)
  return submitted
725
def roll_edits_hdfs(self, nameservice=None):
  """
  Roll the edits of an HDFS NameNode or Nameservice.

  @param nameservice: Nameservice whose edits should be rolled.
                      Required only with a federated HDFS.
  @return: Reference to the submitted command.
  @since: API v3
  """
  # An empty JSON object is sent when no nameservice is given.
  args = { 'nameservice': nameservice } if nameservice else { }
  return self._cmd('hdfsRollEdits', data = json.dumps(args))
740
def cleanup_zookeeper(self, *servers):
  """
  Clean up a ZooKeeper service or roles.

  Without server role names the command is issued for the whole service and
  cleans up all the server roles that are currently running.

  @param servers: ZK server role names (optional).
  @return: Command reference (for service command) or list of command
           references (for role commands).
  """
  if not servers:
    return self._cmd('zooKeeperCleanup')
  return self._role_cmd('zooKeeperCleanup', servers)
756
def init_zookeeper(self, *servers):
  """
  Initialize a ZooKeeper service or roles.

  Without server role names the command is issued for the whole service and
  initializes all the configured server roles.

  @param servers: ZK server role names (optional).
  @return: Command reference (for service command) or list of command
           references (for role commands).
  """
  if not servers:
    return self._cmd('zooKeeperInit')
  return self._role_cmd('zooKeeperInit', servers)
772
def sync_hue_db(self, *servers):
  """
  Submit the command that synchronizes the Hue server's database.

  @param servers: Name of Hue Server roles to synchronize.
  @return: List of submitted commands.
  """
  submitted = self._role_cmd('hueSyncDb', servers)
  return submitted
781
def enter_maintenance_mode(self):
  """
  Put the service in maintenance mode.

  When the command reports success, this object is refreshed from the
  server so that it reflects the new state.

  @return: Reference to the completed command.
  @since: API v2
  """
  result = self._cmd('enterMaintenanceMode')
  if not result.success:
    return result
  refreshed = _get_service(self._get_resource_root(), self._path())
  self._update(refreshed)
  return result
793
def exit_maintenance_mode(self):
  """
  Take the service out of maintenance mode.

  When the command reports success, this object is refreshed from the
  server so that it reflects the new state.

  @return: Reference to the completed command.
  @since: API v2
  """
  result = self._cmd('exitMaintenanceMode')
  if not result.success:
    return result
  refreshed = _get_service(self._get_resource_root(), self._path())
  self._update(refreshed)
  return result
805
def rolling_restart(self, slave_batch_size=None,
                    slave_fail_count_threshold=None,
                    sleep_seconds=None,
                    stale_configs_only=None,
                    unupgraded_only=None,
                    restart_role_types=None,
                    restart_role_names=None):
  """
  Rolling restart the roles of a service. The sequence is:
    1. Restart all the non-slave roles
    2. If slaves are present restart them in batches of size specified
    3. Perform any post-command needed after rolling restart

  @param slave_batch_size: Number of slave roles to restart at a time
         Must be greater than 0. Default is 1.
         For HDFS, this number should be less than the replication factor (default 3)
         to ensure data availability during rolling restart.
  @param slave_fail_count_threshold: The threshold for number of slave batches that
         are allowed to fail to restart before the entire command is considered failed.
         Must be >= 0. Default is 0.
  @param sleep_seconds: Number of seconds to sleep between restarts of slave role batches.
         Must be >=0. Default is 0.
  @param stale_configs_only: Restart roles with stale configs only. Default is false.
  @param unupgraded_only: Restart roles that haven't been upgraded yet. Default is false.
  @param restart_role_types: Role types to restart. If not specified, all startable roles are restarted.
  @param restart_role_names: List of specific roles to restart.
         If none are specified, then all roles of specified role types are restarted.
  @return: Reference to the submitted command.
  @since: API v3
  """
  candidates = (
    ('slaveBatchSize', slave_batch_size),
    ('slaveFailCountThreshold', slave_fail_count_threshold),
    ('sleepSeconds', sleep_seconds),
    ('staleConfigsOnly', stale_configs_only),
    ('unUpgradedOnly', unupgraded_only),
    ('restartRoleTypes', restart_role_types),
    ('restartRoleNames', restart_role_names),
  )
  # Only truthy values are sent; anything falsy (None, 0, False, empty list)
  # is left out of the request body.
  args = dict()
  for key, value in candidates:
    if value:
      args[key] = value

  return self._cmd('rollingRestart', data = json.dumps(args))
853
def create_replication_schedule(self,
    start_time, end_time, interval_unit, interval, paused, arguments,
    alert_on_start=False, alert_on_success=False, alert_on_fail=False,
    alert_on_abort=False):
  """
  Create a new replication schedule for this service.

  The replication argument type varies per service type. The following types
  are recognized:
    - HDFS: ApiHdfsReplicationArguments
    - Hive: ApiHiveReplicationArguments

  @type  start_time: datetime.datetime
  @param start_time: The time at which the schedule becomes active and first executes.
  @type  end_time: datetime.datetime
  @param end_time: The time at which the schedule will expire.
  @type  interval_unit: str
  @param interval_unit: The unit of time the `interval` represents. Ex. MINUTE, HOUR,
                        DAY. See the server documentation for a full list of values.
  @type  interval: int
  @param interval: The number of time units to wait until triggering the next replication.
  @type  paused: bool
  @param paused: Should the schedule be paused? Useful for on-demand replication.
  @param arguments: service type-specific arguments for the replication job.
  @param alert_on_start: whether to generate alerts when the job is started.
  @param alert_on_success: whether to generate alerts when the job succeeds.
  @param alert_on_fail: whether to generate alerts when the job fails.
  @param alert_on_abort: whether to generate alerts when the job is aborted.
  @return: The newly created schedule.
  @raise TypeError: if the arguments object does not match the service type,
                    or the service type does not support replication.
  @since: API v3
  """
  self._require_min_api_version(3)

  schedule = ApiReplicationSchedule(self._get_resource_root(),
    startTime=start_time, endTime=end_time, intervalUnit=interval_unit, interval=interval,
    paused=paused, alertOnStart=alert_on_start, alertOnSuccess=alert_on_success,
    alertOnFail=alert_on_fail, alertOnAbort=alert_on_abort)

  # Exceptions are raised with call syntax, which is valid on both Python 2
  # and 3 (the "raise X, msg" form is a syntax error on Python 3).
  if self.type == 'HDFS':
    if not isinstance(arguments, ApiHdfsReplicationArguments):
      raise TypeError('Unexpected type for HDFS replication argument.')
    schedule.hdfsArguments = arguments
  elif self.type == 'HIVE':
    if not isinstance(arguments, ApiHiveReplicationArguments):
      raise TypeError('Unexpected type for Hive replication argument.')
    schedule.hiveArguments = arguments
  else:
    # %-formatting keeps the intended message even when self.type is None,
    # where string concatenation would fail with an unrelated error text.
    raise TypeError('Replication is not supported for service type %s' % (self.type,))

  # The schedule is posted as a one-element list and the server replies with
  # a list as well; decode its first item.
  data = json.dumps(ApiList([schedule]).to_json_dict())
  resp = self._get_resource_root().post("%s/replications" % self._path(), data=data)
  data = resp[ApiList.LIST_KEY][0]

  return ApiReplicationSchedule.from_json_dict(data, self._get_resource_root())
908
def get_replication_schedules(self):
  """
  Fetch every replication schedule defined under this service.

  Issues a GET on this service's "replications" sub-resource and converts
  the response into a list of ApiReplicationSchedule objects.

  @return: A list of replication schedules.
  @since: API v3
  """
  self._require_min_api_version(3)
  root = self._get_resource_root()
  listing = root.get("%s/replications" % self._path())
  return ApiList.from_json_dict(
      ApiReplicationSchedule, listing, self._get_resource_root())
919
def get_replication_schedule(self, schedule_id):
  """
  Retrieve a single replication schedule.

  @param schedule_id: The id of the schedule to retrieve.
  @return: The requested schedule.
  @since: API v3
  """
  self._require_min_api_version(3)
  # Format the id with %s, as the delete/update/trigger methods do, so that
  # an id supplied as a string is accepted here as well.
  resp = self._get_resource_root().get("%s/replications/%s" %
      (self._path(), schedule_id))
  return ApiReplicationSchedule.from_json_dict(resp, self._get_resource_root())
932
def delete_replication_schedule(self, schedule_id):
  """
  Remove a replication schedule from this service.

  Issues a DELETE on the schedule's URL and converts the response into an
  ApiReplicationSchedule.

  @param schedule_id: The id of the schedule to delete.
  @return: The deleted replication schedule.
  @since: API v3
  """
  self._require_min_api_version(3)
  root = self._get_resource_root()
  target = "%s/replications/%s" % (self._path(), schedule_id)
  removed = root.delete(target)
  return ApiReplicationSchedule.from_json_dict(
      removed, self._get_resource_root())
945
def update_replication_schedule(self, schedule_id, schedule):
  """
  Replace an existing replication schedule with a modified one.

  The schedule is serialized to JSON and sent with a PUT to the schedule's
  URL; the response is converted into an ApiReplicationSchedule.

  @param schedule_id: The id of the schedule to update.
  @param schedule: The modified schedule.
  @return: The updated replication schedule.
  @since: API v3
  """
  self._require_min_api_version(3)
  payload = json.dumps(schedule.to_json_dict())
  root = self._get_resource_root()
  target = "%s/replications/%s" % (self._path(), schedule_id)
  updated = root.put(target, data=payload)
  return ApiReplicationSchedule.from_json_dict(
      updated, self._get_resource_root())
960
def trigger_replication_schedule(self, schedule_id, dry_run=False):
  """
  Run a replication schedule right away. The start and end dates on the
  schedule will be ignored.

  Issues a POST on the schedule's "run" URL, passing `dry_run` as the
  "dryRun" request parameter.

  @param schedule_id: The id of the schedule to trigger.
  @param dry_run: Whether to execute a dry run.
  @return: The command corresponding to the replication job.
  @since: API v3
  """
  self._require_min_api_version(3)
  root = self._get_resource_root()
  run_url = "%s/replications/%s/run" % (self._path(), schedule_id)
  query = {'dryRun': dry_run}
  submitted = root.post(run_url, params=query)
  return ApiCommand.from_json_dict(submitted, self._get_resource_root())
975
def install_oozie_sharelib(self):
  """
  Install the Oozie ShareLib by submitting the 'installOozieShareLib'
  command. Oozie must be stopped before running this command.

  @return: Reference to the submitted command.
  @since: API v3
  """
  command_name = 'installOozieShareLib'
  return self._cmd(command_name)
985
987 """ 988 Creates the Hive metastore tables in the configured database, if it 989 hasn't been done yet. 990 991 @return: Reference to the submitted command. 992 @since: API v3 993 """ 994 return self._cmd('hiveCreateMetastoreDatabaseTables')
995
def create_hive_warehouse(self):
  """
  Create the Hive warehouse directory in HDFS by submitting the
  'hiveCreateHiveWarehouse' command.

  @return: Reference to the submitted command.
  @since: API v3
  """
  command_name = 'hiveCreateHiveWarehouse'
  return self._cmd(command_name)
1004
1006 """ 1007 Create the Hive Metastore Database. Only works with embedded postgresql 1008 database. This command should usually be followed by a call to 1009 hiveCreateMetastoreDatabaseTables. 1010 1011 @return: Reference to the submitted command. 1012 @since: API v4 1013 """ 1014 self._require_min_api_version(4) 1015 return self._cmd('hiveCreateMetastoreDatabase')
1016
1018 """ 1019 Update Hive Metastore to point to a NameNode's Nameservice name instead of 1020 hostname. Only available when all Hive Metastore Servers are stopped and 1021 HDFS has High Availability. 1022 1023 Back up the Hive Metastore Database before running this command. 1024 1025 @return: Reference to the submitted command. 1026 @since: API v4 1027 """ 1028 self._require_min_api_version(4) 1029 return self._cmd('hiveUpdateMetastoreNamenodes')
1030
class ApiServiceSetupInfo(ApiService):
  """
  Description of a service to set up: its name, type, configuration and the
  roles to create along with it. Instances are built without a resource
  root.
  """
  _ATTRIBUTES = {
    'name'    : None,
    'type'    : None,
    'config'  : Attr(ApiConfig),
    'roles'   : Attr(roles.ApiRole),
  }

  def __init__(self, name=None, type=None,
               config=None, roles=None):
    """
    @param name: Service name
    @param type: Service type
    @param config: (Optional) Initial service configuration
    @param roles: (Optional) Initial list of roles
    """
    # The BaseApiObject expects a resource_root, which we don't care about
    resource_root = None
    # Unfortunately, the json key is called "type". So our input arg
    # needs to be called "type" as well, despite it being a python keyword.
    # NOTE: locals() is handed over as-is, so every local name defined above
    # this line is part of what BaseApiObject.init receives; do not add or
    # rename locals here without checking that method.
    BaseApiObject.init(self, None, locals())

  def set_config(self, config):
    """
    Set the service configuration.

    The given dictionary is converted with config_to_api_list and merged
    into the existing configuration (created empty if not yet set).

    @param config: A dictionary of config key/value
    """
    if self.config is None:
      self.config = { }
    self.config.update(config_to_api_list(config))

  def add_role_type_info(self, role_type, config):
    """
    Add a role type setup info.

    The converted configuration is tagged with its role type and appended to
    the list stored under the role-type key of the service configuration.

    @param role_type: Role type
    @param config: A dictionary of role type configuration
    """
    rt_config = config_to_api_list(config)
    rt_config['roleType'] = role_type

    if self.config is None:
      self.config = { }
    # "in" replaces dict.has_key(), which no longer exists in Python 3.
    if ROLETYPES_CFG_KEY not in self.config:
      self.config[ROLETYPES_CFG_KEY] = [ ]
    self.config[ROLETYPES_CFG_KEY].append(rt_config)

  def add_role_info(self, role_name, role_type, host_id, config=None):
    """
    Add a role info. The role will be created along with the service setup.

    @param role_name: Role name
    @param role_type: Role type
    @param host_id: The host where the role should run
    @param config: (Optional) A dictionary of role config values
    """
    if self.roles is None:
      self.roles = [ ]
    # Explicit conditional instead of "cond and x or None": the and/or form
    # would silently replace a falsy conversion result with None.
    api_config_list = None
    if config is not None:
      api_config_list = config_to_api_list(config)
    self.roles.append({
        'name' : role_name,
        'type' : role_type,
        'hostRef' : { 'hostId' : host_id },
        'config' : api_config_list })
1090