Package cm_api_tests :: Module test_replication
[hide private]
[frames | no frames]

Source Code for Module cm_api_tests.test_replication

  1  # Licensed to Cloudera, Inc. under one 
  2  # or more contributor license agreements.  See the NOTICE file 
  3  # distributed with this work for additional information 
  4  # regarding copyright ownership.  Cloudera, Inc. licenses this file 
  5  # to you under the Apache License, Version 2.0 (the 
  6  # "License"); you may not use this file except in compliance 
  7  # with the License.  You may obtain a copy of the License at 
  8  # 
  9  #     http://www.apache.org/licenses/LICENSE-2.0 
 10  # 
 11  # Unless required by applicable law or agreed to in writing, software 
 12  # distributed under the License is distributed on an "AS IS" BASIS, 
 13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 14  # See the License for the specific language governing permissions and 
 15  # limitations under the License. 
 16   
 17  import datetime 
 18  import random 
 19  import unittest 
 20  try: 
 21    import json 
 22  except ImportError: 
 23    import simplejson as json 
 24   
 25  from cm_api.endpoints.types import * 
 26  from cm_api.endpoints.services import ApiService 
 27  from cm_api_tests import utils 
 28   
class TestReplicationTypes(unittest.TestCase):
  """Checks that replication-related API types deserialize from JSON.

  Each test feeds a literal JSON document through the deserializer and
  compares the resulting attributes against the values in the document.
  """

  def test_hdfs_arguments(self):
    # Every field of the HDFS replication arguments document is asserted.
    RAW = '''{
      "sourceService" : {
        "peerName" : "vst2",
        "clusterName" : "Cluster 1 - CDH4",
        "serviceName" : "HDFS-1"
      },
      "sourcePath" : "/data",
      "destinationPath" : "/copy/data2",
      "mapreduceServiceName" : "MAPREDUCE-1",
      "schedulerPoolName" : "medium",
      "userName" : "systest",
      "dryRun" : false,
      "abortOnError" : true,
      "removeMissingFiles" : false,
      "preserveReplicationCount" : true,
      "preserveBlockSize" : true,
      "preservePermissions" : false
    }'''
    args = utils.deserialize(RAW, ApiHdfsReplicationArguments)
    self.assertEqual('vst2', args.sourceService.peerName)
    self.assertEqual('Cluster 1 - CDH4', args.sourceService.clusterName)
    self.assertEqual('HDFS-1', args.sourceService.serviceName)
    self.assertEqual('/data', args.sourcePath)
    self.assertEqual('/copy/data2', args.destinationPath)
    self.assertEqual('MAPREDUCE-1', args.mapreduceServiceName)
    self.assertEqual('medium', args.schedulerPoolName)
    self.assertEqual('systest', args.userName)
    self.assertFalse(args.dryRun)
    self.assertTrue(args.abortOnError)
    self.assertFalse(args.removeMissingFiles)
    self.assertTrue(args.preserveBlockSize)
    self.assertFalse(args.preservePermissions)
    self.assertTrue(args.preserveReplicationCount)

  def test_hive_arguments(self):
    # Besides scalar fields, this checks that the nested "hdfsArguments"
    # object and the "tableFilters" list come back as typed objects.
    RAW = '''{
      "sourceService" : {
        "peerName" : "vst2",
        "clusterName" : "Cluster 1 - CDH4",
        "serviceName" : "HIVE-1"
      },
      "force" : true,
      "replicateData" : true,
      "hdfsArguments" : {
        "mapreduceServiceName" : "MAPREDUCE-1",
        "dryRun" : false,
        "abortOnError" : false,
        "removeMissingFiles" : false,
        "preserveReplicationCount" : false,
        "preserveBlockSize" : false,
        "preservePermissions" : false
      },
      "tableFilters" : [
        { "database" : "db1", "tableName" : "table1" }
      ],
      "dryRun" : false
    }'''
    args = utils.deserialize(RAW, ApiHiveReplicationArguments)
    self.assertEqual('vst2', args.sourceService.peerName)
    self.assertEqual('Cluster 1 - CDH4', args.sourceService.clusterName)
    self.assertEqual('HIVE-1', args.sourceService.serviceName)
    self.assertTrue(args.force)
    self.assertTrue(args.replicateData)
    self.assertIsInstance(args.hdfsArguments, ApiHdfsReplicationArguments)
    self.assertIsInstance(args.tableFilters, list)
    self.assertEqual(1, len(args.tableFilters))
    self.assertIsInstance(args.tableFilters[0], ApiHiveTable)
    self.assertEqual("db1", args.tableFilters[0].database)
    self.assertEqual("table1", args.tableFilters[0].tableName)

  def test_schedule(self):
    # A schedule document carrying one history entry (a finished Hive
    # replication command) plus the Hive arguments of the schedule.
    RAW = '''{
      "id" : 39,
      "startTime" : "2012-12-10T23:11:31.041Z",
      "interval" : 1,
      "intervalUnit" : "DAY",
      "paused" : false,
      "nextRun" : "2013-01-15T23:11:31.041Z",
      "history" : [ {
        "id" : 738,
        "name" : "HiveReplicationCommand",
        "startTime" : "2013-01-15T18:28:24.895Z",
        "endTime" : "2013-01-15T18:30:49.446Z",
        "active" : false,
        "success" : true,
        "resultMessage" : "Hive Replication Finished Successfully.",
        "resultDataUrl" : "/cmf/command/738/download",
        "serviceRef" : {
          "clusterName" : "Cluster 1 - CDH4",
          "serviceName" : "HIVE-1"
        },
        "hiveResult" : {
          "tables" : [ {
            "database" : "default",
            "tableName" : "repl_test_1"
          }, {
            "database" : "default",
            "tableName" : "sample_07"
          }, {
            "database" : "default",
            "tableName" : "sample_08"
          } ],
          "errors" : [ ],
          "dataReplicationResult" : {
            "progress" : 100,
            "numFilesCopied" : 0,
            "numBytesCopied" : 0,
            "numFilesSkipped" : 3,
            "numBytesSkipped" : 92158,
            "numFilesDeleted" : 0,
            "numFilesCopyFailed" : 0,
            "numBytesCopyFailed" : 0,
            "dryRun" : false
          },
          "dryRun" : false
        }
      } ],
      "alertOnStart" : false,
      "alertOnSuccess" : false,
      "alertOnFail" : false,
      "alertOnAbort" : false,
      "hiveArguments" : {
        "sourceService" : {
          "peerName" : "vst2",
          "clusterName" : "Cluster 1 - CDH4",
          "serviceName" : "HIVE-1"
        },
        "force" : true,
        "replicateData" : true,
        "hdfsArguments" : {
          "mapreduceServiceName" : "MAPREDUCE-1",
          "dryRun" : false,
          "abortOnError" : false,
          "removeMissingFiles" : false,
          "preserveReplicationCount" : false,
          "preserveBlockSize" : false,
          "preservePermissions" : false
        },
        "dryRun" : false
      }
    }'''
    sched = utils.deserialize(RAW, ApiReplicationSchedule)
    self.assertEqual(39, sched.id)
    # Timestamps are compared as datetime objects, not as raw strings.
    self.assertEqual(self._parse_time("2012-12-10T23:11:31.041Z"), sched.startTime)
    self.assertEqual('DAY', sched.intervalUnit)
    self.assertEqual(1, sched.interval)
    self.assertFalse(sched.paused)
    self.assertEqual(self._parse_time("2013-01-15T23:11:31.041Z"), sched.nextRun)
    self.assertFalse(sched.alertOnStart)
    self.assertIsNotNone(sched.hiveArguments)

    self.assertEqual(1, len(sched.history))
    self.assertIsInstance(sched.history[0], ApiReplicationCommand)
    self.assertEqual('default', sched.history[0].hiveResult.tables[0].database)
    self.assertEqual(92158, sched.history[0].hiveResult.dataReplicationResult.numBytesSkipped)

  def test_peers(self):
    # Peers are built directly with from_json_dict rather than through
    # utils.deserialize; None is passed as the second argument.
    RAW = '''{
      "name" : "peer1",
      "url" : "http://peer1",
      "username" : "user1",
      "password" : "pwd"
    }'''
    peer = ApiCmPeer.from_json_dict(json.loads(RAW), None)
    self.assertEqual("peer1", peer.name)
    self.assertEqual("http://peer1", peer.url)
    self.assertEqual("user1", peer.username)
    self.assertEqual("pwd", peer.password)

  def _parse_time(self, tstr):
    """Parse a 'YYYY-MM-DDTHH:MM:SS.ffffffZ' timestamp string.

    @param tstr: Timestamp text with fractional seconds and a literal 'Z'.
    @return: A naive datetime.datetime (the 'Z' is matched literally, no
             timezone is attached).
    """
    return datetime.datetime.strptime(tstr, "%Y-%m-%dT%H:%M:%S.%fZ")
203 204
class TestReplicationRequests(unittest.TestCase):
  """Drives the replication schedule calls of ApiService against a mock.

  Requests go to a utils.MockResource created per test instance, so no
  real server is contacted.
  """

  def __init__(self, methodName='runTest'):
    # Keep the same default as unittest.TestCase so the class can still be
    # instantiated without naming a test method.
    super(TestReplicationRequests, self).__init__(methodName)
    self.resource = utils.MockResource(self)

  def test_replication_crud(self):
    # Walks create / list / get / update / delete of a replication schedule.
    # Each expect() call is presumed to declare the request the following
    # service call must issue (see cm_api_tests.utils.MockResource).
    service = ApiService(self.resource, 'hdfs1', 'HDFS')
    # Written through __dict__ rather than by plain attribute assignment.
    service.__dict__['clusterRef'] = ApiClusterRef(self.resource, clusterName='cluster1')

    hdfs_args = ApiHdfsReplicationArguments(self.resource)
    # NOTE(review): unlike ApiClusterRef above, no resource is passed as the
    # first argument here - confirm against the ApiServiceRef signature.
    hdfs_args.sourceService = ApiServiceRef('cluster2', 'hdfs2')
    hdfs_args.sourcePath = '/src'
    hdfs_args.destinationPath = '/dst'

    return_sched = ApiReplicationSchedule(self.resource,
        interval=2, intervalUnit='DAY')
    return_sched.hdfsArguments = hdfs_args
    return_sched.__dict__['id'] = 1
    return_list = ApiList([ return_sched ]).to_json_dict()

    # Create: the mocked POST answers with a list holding one schedule.
    self.resource.expect("POST",
        "/clusters/cluster1/services/hdfs1/replications",
        retdata=return_list)

    sched = service.create_replication_schedule(
        None, None, 'DAY', 2, True, hdfs_args, alert_on_fail=True)
    self.assertEqual(return_sched.intervalUnit, sched.intervalUnit)
    self.assertEqual(return_sched.interval, sched.interval)
    self.assertIsInstance(sched.hdfsArguments, ApiHdfsReplicationArguments)

    # List all schedules.
    self.resource.expect("GET",
        "/clusters/cluster1/services/hdfs1/replications",
        retdata=return_list)
    service.get_replication_schedules()

    # Fetch a single schedule by id.
    self.resource.expect("GET",
        "/clusters/cluster1/services/hdfs1/replications/1",
        retdata=return_sched.to_json_dict())
    service.get_replication_schedule(1)

    # Update the schedule.
    self.resource.expect("PUT",
        "/clusters/cluster1/services/hdfs1/replications/1",
        retdata=return_sched.to_json_dict())
    service.update_replication_schedule(1, return_sched)

    # Delete the schedule.
    self.resource.expect("DELETE",
        "/clusters/cluster1/services/hdfs1/replications/1",
        retdata=return_sched.to_json_dict())
    service.delete_replication_schedule(1)


# Run the test suite when this module is executed as a script.
if __name__ == '__main__':
  unittest.main()