-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathutils.py
350 lines (293 loc) · 9.04 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
import boto3
import datetime
import json
from zipfile import ZipFile
from collections import OrderedDict
import uuid
import time
from urlparse import urlparse
from threading import Thread
import httplib
from Queue import Queue
import uuid
import boto3
import json
import random
import os
import sys
import copy
import decimal
from conf import *
def fstr(f):
"""
Convert a float number to string
"""
ctx = decimal.Context()
ctx.prec = 20
d1 = ctx.create_decimal(repr(f))
return format(d1, 'f')
def run_cmd(cmd):
"""
The simplest way to run an external command
"""
return os.popen(cmd).read()
def zip_code(zip_name, code_path):
"""
Zip the source function files to a deployment package
"""
with ZipFile(zip_name, 'w') as lambda_zip:
if not os.path.isdir(code_path):
lambda_zip.write(code_path)
else:
for root, dirs, fs in os.walk(code_path):
for f in fs:
abs_path = os.path.join(root, f)
lambda_zip.write(abs_path, f)
def get_config_basic():
"""
Get the credentials and basic setting from the config file
"""
aws_id = CONGIF["creds"]["aws_id"]
aws_key = CONGIF["creds"]["aws_key"]
region = CONGIF["func"]["region"]
roles = [CONGIF["func"]["role_1"], CONGIF["func"]["role_2"]]
return aws_id, aws_key, region, roles
def get_default_req(sleep_tm):
"""
Construct the basic request
By default all the parameters in the request are set to False to skip
the tests, except sleep_tm and stat
Args:
sleep_tm: the function start tasks after
current time + sleep_tm
"""
d = copy.deepcopy(PARA_TEMP)
for k in d:
d[k] = False
d["stat"] = dict(argv=1)
d["sleep"] = (time.time() + sleep_tm) * 1000
return {"cmds": d}
def append_io_req(req, rd, size, cnt):
"""
Construct a request for the IO throughput test
Args:
req: the basic request
rd: no. of rounds
size: the size of data to write each time
cnt: the times to write in each round
Example:
append_io_req(rd=5, size=1kB, cnt=1000)
write 1kB 1000 times using dd, repeat 5 rounds
"""
req["cmds"]["io"] = dict(rd=rd, size=size, cnt=cnt)
return req
def append_net_req(req, server_ip, port):
"""
Construct a request for the network throughput test
By default the port number starts from 5000
Args:
req: the basic request
port_offset: the offset of the port number;
server_ip: the IP of the iperf server
Example:
append_net_req(port_offset=5, server_ip="1.1.1.1")
run network throughput tests using iperf with
server at 1.1.1.1:5005
"""
req["cmds"]["net"] = dict(port=port, server_ip=server_ip)
return req
def append_cpu_req(req, n):
"""
Construct a request for the CPU test
Args:
req: the basic request
n: the CPU test will calculate n! and record the time
"""
req["cmds"]["cpu"] = dict(n=n)
return req
def append_cpuu_req(req, n):
"""
Construct a request for the CPU utilization test
Args:
req: the basic request
n: the CPU utilization test will record n timestamps
"""
req["cmds"]["cpuu"] = dict(n=n)
return req
# if runtime not in ['java8']:
# zip_file_name = '%s_lambda.zip' % func_name
# with ZipFile(zip_file_name, 'w') as lambda_zip:
# lambda_zip.write(src_file)
# _handler = '%s.handler' % src_file.split(".")[0]
# else:
# cmd = "gradle build"
# run_cmd(cmd)
# zip_file_name = "build/distributions/code.zip"
# _handler = "example.Target::handleRequest"
class FuncOp():
"""
The class for function operation
"""
def __init__(
self,
aws_id,
aws_key,
region,
role,
runtime,
memory,
func_name):
self.aws_id = aws_id
self.aws_key = aws_key
self.region = region
self.role = role
self.runtime = runtime
self.memory = memory
self.func_name = func_name
def get_func_name(self):
return self.func_name
def set_func_role(self, role):
self.role = role
def set_func_runtime(self, runtime):
self.runtime = runtime
def set_func_memory(self, memory):
self.memory = memory
def set_func_name(self, name):
self.func_name = name
def get_client(self):
"""
run this everytime to get a new connection
should not use a persistent connection
"""
session = boto3.Session(aws_access_key_id=self.aws_id,
aws_secret_access_key=self.aws_key,
region_name=self.region)
client = session.client(service_name='lambda')
return client
def del_function(self):
try:
client = self.get_client()
client.delete_function(FunctionName=self.func_name)
return True
except Exception as e:
# print str(e)
return False
def create_function(self, src_file, func_handler):
"""
Create a new function
Args:
src_file: the DIRECTORY for the code
all the files under the directory will be zipped
func_handler: the name of the function entry point
"""
try:
client = self.get_client()
with open(src_file) as zip_blob:
response = client.create_function(
Code={'ZipFile': zip_blob.read()},
Description='',
FunctionName=self.func_name,
Handler=func_handler,
MemorySize=self.memory,
Publish=True,
Role=self.role,
Runtime=self.runtime,
Timeout=300,
)
return True
except Exception as e:
print str(e)
return False
def update_function(self, key, value):
assert key in ["role", "memory", "env"]
client = self.get_client()
if key == "role":
client.update_function_configuration(
FunctionName=self.func_name, Role=value)
elif key == "memory":
client.update_function_configuration(
FunctionName=self.func_name, MemorySize=value)
elif key == "env":
client.update_function_configuration(
FunctionName=self.func_name, Environment=value)
return True
def dump_meta(self):
"""
The basic information to record
"""
return "{}#{}#{}#{}".format(
self.region,
self.runtime,
self.memory,
self.func_name)
def send_one_request(self, req_para={}):
client = self.get_client()
tm_st = time.time() * 1000
resp = client.invoke(
FunctionName=self.func_name,
InvocationType='RequestResponse',
Payload=json.dumps(req_para)
)
tm_ed = time.time() * 1000
try:
resp = json.loads(resp['Payload'].read())
except Exception as e:
print str(e), resp
if not resp:
resp = "ERROR"
out = "{}#{}#{}#{}".format(
resp, fstr(tm_st), fstr(tm_ed), fstr(
tm_ed - tm_st))
out = "{}#{}".format(self.dump_meta(), out)
return out
class Worker():
"""
A queue-based multiple threading framework for sending
parallel requests
"""
def __init__(self, fout, rd_id, work_no, func):
self.fout = fout
self.work_no = work_no
self.rd_id = rd_id
self.func = func
self.subrd_id = 0
self.q = Queue(10000)
self.task_no = 0
def set_rdid(self, _id):
self.rd_id = _id
def set_subrdid(self, _id):
self.subrd_id = _id
def clear_queue(self):
with self.q.mutex:
self.q.queue.clear()
def run_task(self, task):
while True:
work_id, para = self.q.get()
res = task(para)
_entry = "{}#{}#{}#{}#{}\n".format(
self.rd_id, self.subrd_id, self.task_no, work_id, res)
open(self.fout, "a").write(_entry)
# print res
self.q.task_done()
def init(self):
for i in range(self.work_no):
t = Thread(target=self.run_task, args=(self.task,))
t.daemon = True
t.start()
def add_tasks(self, para_list):
self.task_no = len(para_list)
self.subrd_id += 1
try:
for i in xrange(self.task_no):
para = para_list[i]
work_id = i
self.q.put((work_id, para))
self.q.join()
except KeyboardInterrupt:
sys.exit(1)
def task(self, para):
"""
Customized your task here
"""
res = self.func(*para)
return res