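"""Tests for cm_api replication types and for the replication schedule
calls on ApiService."""
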
import datetime
import random
import unittest
try:
  import json
except ImportError:
  import simplejson as json

from cm_api.endpoints.types import *
from cm_api.endpoints.services import ApiService
from cm_api_tests import utils


class TestReplicationTypes(unittest.TestCase):

  def test_hdfs_arguments(self):
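    # Deserialize a raw JSON payload into ApiHdfsReplicationArguments and
    # check that each field comes through.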
    RAW = '''{
      "sourceService" : {
        "peerName" : "vst2",
        "clusterName" : "Cluster 1 - CDH4",
        "serviceName" : "HDFS-1"
      },
      "sourcePath" : "/data",
      "destinationPath" : "/copy/data2",
      "mapreduceServiceName" : "MAPREDUCE-1",
      "schedulerPoolName" : "medium",
      "userName" : "systest",
      "dryRun" : false,
      "abortOnError" : true,
      "removeMissingFiles" : false,
      "preserveReplicationCount" : true,
      "preserveBlockSize" : true,
      "preservePermissions" : false
    }'''
    args = utils.deserialize(RAW, ApiHdfsReplicationArguments)
    self.assertEqual('vst2', args.sourceService.peerName)
    self.assertEqual('Cluster 1 - CDH4', args.sourceService.clusterName)
    self.assertEqual('HDFS-1', args.sourceService.serviceName)
    self.assertEqual('/data', args.sourcePath)
    self.assertEqual('/copy/data2', args.destinationPath)
    self.assertEqual('MAPREDUCE-1', args.mapreduceServiceName)
    self.assertEqual('medium', args.schedulerPoolName)
    self.assertEqual('systest', args.userName)
    self.assertFalse(args.dryRun)
    self.assertTrue(args.abortOnError)
    self.assertFalse(args.removeMissingFiles)
    self.assertTrue(args.preserveBlockSize)
    self.assertFalse(args.preservePermissions)
    self.assertTrue(args.preserveReplicationCount)

  def test_hive_arguments(self):
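    # Hive replication arguments nest ApiHdfsReplicationArguments and a list
    # of ApiHiveTable filters.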
    RAW = '''{
      "sourceService" : {
        "peerName" : "vst2",
        "clusterName" : "Cluster 1 - CDH4",
        "serviceName" : "HIVE-1"
      },
      "force" : true,
      "replicateData" : true,
      "hdfsArguments" : {
        "mapreduceServiceName" : "MAPREDUCE-1",
        "dryRun" : false,
        "abortOnError" : false,
        "removeMissingFiles" : false,
        "preserveReplicationCount" : false,
        "preserveBlockSize" : false,
        "preservePermissions" : false
      },
      "tableFilters" : [
        { "database" : "db1", "tableName" : "table1" }
      ],
      "dryRun" : false
    }'''
    args = utils.deserialize(RAW, ApiHiveReplicationArguments)
    self.assertEqual('vst2', args.sourceService.peerName)
    self.assertEqual('Cluster 1 - CDH4', args.sourceService.clusterName)
    self.assertEqual('HIVE-1', args.sourceService.serviceName)
    self.assertTrue(args.force)
    self.assertTrue(args.replicateData)
    self.assertIsInstance(args.hdfsArguments, ApiHdfsReplicationArguments)
    self.assertIsInstance(args.tableFilters, list)
    self.assertEqual(1, len(args.tableFilters))
    self.assertIsInstance(args.tableFilters[0], ApiHiveTable)
    self.assertEqual("db1", args.tableFilters[0].database)
    self.assertEqual("table1", args.tableFilters[0].tableName)

  def test_schedule(self):
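    # A schedule payload carries its command history; the single entry here
    # is a Hive replication command with nested table and data-copy results.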
    RAW = '''{
      "id" : 39,
      "startTime" : "2012-12-10T23:11:31.041Z",
      "interval" : 1,
      "intervalUnit" : "DAY",
      "paused" : false,
      "nextRun" : "2013-01-15T23:11:31.041Z",
      "history" : [ {
        "id" : 738,
        "name" : "HiveReplicationCommand",
        "startTime" : "2013-01-15T18:28:24.895Z",
        "endTime" : "2013-01-15T18:30:49.446Z",
        "active" : false,
        "success" : true,
        "resultMessage" : "Hive Replication Finished Successfully.",
        "resultDataUrl" : "/cmf/command/738/download",
        "serviceRef" : {
          "clusterName" : "Cluster 1 - CDH4",
          "serviceName" : "HIVE-1"
        },
        "hiveResult" : {
          "tables" : [ {
            "database" : "default",
            "tableName" : "repl_test_1"
          }, {
            "database" : "default",
            "tableName" : "sample_07"
          }, {
            "database" : "default",
            "tableName" : "sample_08"
          } ],
          "errors" : [ ],
          "dataReplicationResult" : {
            "progress" : 100,
            "numFilesCopied" : 0,
            "numBytesCopied" : 0,
            "numFilesSkipped" : 3,
            "numBytesSkipped" : 92158,
            "numFilesDeleted" : 0,
            "numFilesCopyFailed" : 0,
            "numBytesCopyFailed" : 0,
            "dryRun" : false
          },
          "dryRun" : false
        }
      } ],
      "alertOnStart" : false,
      "alertOnSuccess" : false,
      "alertOnFail" : false,
      "alertOnAbort" : false,
      "hiveArguments" : {
        "sourceService" : {
          "peerName" : "vst2",
          "clusterName" : "Cluster 1 - CDH4",
          "serviceName" : "HIVE-1"
        },
        "force" : true,
        "replicateData" : true,
        "hdfsArguments" : {
          "mapreduceServiceName" : "MAPREDUCE-1",
          "dryRun" : false,
          "abortOnError" : false,
          "removeMissingFiles" : false,
          "preserveReplicationCount" : false,
          "preserveBlockSize" : false,
          "preservePermissions" : false
        },
        "dryRun" : false
      }
    }'''
    sched = utils.deserialize(RAW, ApiReplicationSchedule)
    self.assertEqual(39, sched.id)
    self.assertEqual(self._parse_time("2012-12-10T23:11:31.041Z"),
        sched.startTime)
    self.assertEqual('DAY', sched.intervalUnit)
    self.assertEqual(1, sched.interval)
    self.assertFalse(sched.paused)
    self.assertEqual(self._parse_time("2013-01-15T23:11:31.041Z"),
        sched.nextRun)
    self.assertFalse(sched.alertOnStart)
    self.assertIsNotNone(sched.hiveArguments)

    self.assertEqual(1, len(sched.history))
    self.assertIsInstance(sched.history[0], ApiReplicationCommand)
    self.assertEqual('default', sched.history[0].hiveResult.tables[0].database)
    self.assertEqual(92158,
        sched.history[0].hiveResult.dataReplicationResult.numBytesSkipped)

  def test_peers(self):
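    # ApiCmPeer is built straight from a parsed JSON dict; the second
    # argument (the resource root) is not needed here, so it is None.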
    RAW = '''{
      "name" : "peer1",
      "url" : "http://peer1",
      "username" : "user1",
      "password" : "pwd"
    }'''
    peer = ApiCmPeer.from_json_dict(json.loads(RAW), None)
    self.assertEqual("peer1", peer.name)
    self.assertEqual("http://peer1", peer.url)
    self.assertEqual("user1", peer.username)
    self.assertEqual("pwd", peer.password)

  def _parse_time(self, tstr):
    return datetime.datetime.strptime(tstr, "%Y-%m-%dT%H:%M:%S.%fZ")


class TestReplicationRequests(unittest.TestCase):

  def __init__(self, methodName):
    super(TestReplicationRequests, self).__init__(methodName)
    self.resource = utils.MockResource(self)

  def test_replication_crud(self):
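    # Walk the schedule CRUD calls against a mocked resource root; each
    # expectation pins the HTTP verb and path the call under test must issue.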
    service = ApiService(self.resource, 'hdfs1', 'HDFS')
    service.__dict__['clusterRef'] = ApiClusterRef(
        self.resource, clusterName='cluster1')

    hdfs_args = ApiHdfsReplicationArguments(self.resource)
    hdfs_args.sourceService = ApiServiceRef('cluster2', 'hdfs2')
    hdfs_args.sourcePath = '/src'
    hdfs_args.destinationPath = '/dst'

    return_sched = ApiReplicationSchedule(self.resource,
        interval=2, intervalUnit='DAY')
    return_sched.hdfsArguments = hdfs_args
    return_sched.__dict__['id'] = 1
    return_list = ApiList([return_sched]).to_json_dict()

    self.resource.expect("POST",
        "/clusters/cluster1/services/hdfs1/replications",
        retdata=return_list)

    sched = service.create_replication_schedule(
        None, None, 'DAY', 2, True, hdfs_args, alert_on_fail=True)
    self.assertEqual(return_sched.intervalUnit, sched.intervalUnit)
    self.assertEqual(return_sched.interval, sched.interval)
    self.assertIsInstance(sched.hdfsArguments, ApiHdfsReplicationArguments)

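    # Each expect() arms the mock for the next request; list, fetch, update
    # and delete should all hit the same /replications endpoints.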
    self.resource.expect("GET",
        "/clusters/cluster1/services/hdfs1/replications",
        retdata=return_list)
    service.get_replication_schedules()

    self.resource.expect("GET",
        "/clusters/cluster1/services/hdfs1/replications/1",
        retdata=return_sched.to_json_dict())
    service.get_replication_schedule(1)

    self.resource.expect("PUT",
        "/clusters/cluster1/services/hdfs1/replications/1",
        retdata=return_sched.to_json_dict())
    service.update_replication_schedule(1, return_sched)

    self.resource.expect("DELETE",
        "/clusters/cluster1/services/hdfs1/replications/1",
        retdata=return_sched.to_json_dict())
    service.delete_replication_schedule(1)

if __name__ == '__main__':
  unittest.main()