-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdemo_local.py
203 lines (168 loc) · 7.52 KB
/
demo_local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 25 16:10:00 2018
@author: Ash
"""
import scipy as sp
import numpy as np
import numpy.random as rnd
import sys
import distree as dst
import distree.schedulers as SL
import anytree as atr
class Distree_Demo(dst.Distree):
    """Demo task tree built on `dst.Distree`.

    Each task loads its taskdata file, advances a dummy time step, and, while
    the new time is still below `t_max`, schedules two child tasks. The
    updated taskdata is then written back over the file it was loaded from.

    NOTE(review): `self.data_path` and `self.schedule_task` are provided by
    the base class, which is not visible here -- confirm their contracts.
    """

    # Order in which taskdata entries are stored after (task_id, parent_id).
    # save_task_data and load_task_data must agree on this layout.
    _TASKDATA_FIELDS = (
        'parent_path',
        'branch_num',
        't_0',
        't_1',
        't_max',
        'state',
        'coeff',
        'num_children',
    )

    def get_taskdata_path(self, task_id):
        """Return `self.data_path` followed by '<task_id>.npy'."""
        return self.data_path + '%s.npy' % task_id

    def save_task_data(self, taskdata_path, data, task_id, parent_id):
        """Write one task's ids and taskdata to `taskdata_path`.

        Currently, this just dumps the data in an object array and uses
        numpy's save().
        TODO: Use a better-structured data format

        Args:
            taskdata_path: Destination file. `np.save` appends '.npy' if the
                name does not already end with it.
            data: Dict holding every key listed in `_TASKDATA_FIELDS`.
            task_id: Identifier of this task (may be `None`).
            parent_id: Identifier of the parent task (may be `None`).

        Raises:
            KeyError: If `data` lacks one of the expected keys.
        """
        dlist = [task_id, parent_id]
        dlist.extend(data[key] for key in self._TASKDATA_FIELDS)
        np.save(taskdata_path, np.array(dlist, dtype=object))

    def load_task_data(self, taskdata_path):
        """Read a file written by `save_task_data`.

        Returns:
            A `(taskdata, task_id, parent_id)` tuple, where `taskdata` is a
            dict keyed by `_TASKDATA_FIELDS`.
        """
        # The file holds an object array, which numpy stores via pickle and
        # refuses to load unless allow_pickle=True is passed.
        # SECURITY: unpickling can execute arbitrary code -- only load
        # taskdata files produced by this program.
        d = np.load(taskdata_path, allow_pickle=True)
        task_id = d[0]
        parent_id = d[1]
        taskdata = {
            key: d[index]
            for index, key in enumerate(self._TASKDATA_FIELDS, start=2)
        }
        return taskdata, task_id, parent_id

    def branch_path(self, parent_path, branch_num):
        """Return `parent_path` extended by '/<branch_num>'."""
        return parent_path + '/%u' % branch_num

    def run_task(self, taskdata_path):
        """Run the task described by `taskdata_path` in the current process.

        The result is saved back to `taskdata_path`, overwriting the input.
        """
        # Load data saved by parent task. The taskdata file contains
        # everything needed to run the task: initial values, parameters, and
        # so on. It also contains the task_id and the parent_id, which may be
        # `None`.
        taskdata, task_id, parent_id = self.load_task_data(taskdata_path)

        # Do the task. This is just a dummy task: it 'evolves time' from t_0
        # to t_0 + 1 and rescales the state by random factors.
        t_1 = taskdata['t_0'] + 1
        state = rnd.rand(4) * taskdata['state']

        # Update some of the taskdata structure based on the task we ran.
        taskdata['t_1'] = t_1
        taskdata['state'] = state

        # If the simulation is not over, there will be children!
        if t_1 < taskdata['t_max']:
            taskdata = self.branch(taskdata, task_id, parent_id)

        # Save the final taskdata, overwriting the initial data file(s).
        self.save_task_data(taskdata_path, taskdata, task_id, parent_id)

    def branch(self, taskdata, task_id, parent_id):
        """Schedule the children of a finished task.

        Args:
            taskdata: Taskdata of the finished task; its 'num_children' entry
                is updated in place.
            task_id: Id of the finished task, passed to `schedule_task` as
                the children's parent.
            parent_id: Id of the finished task's own parent (unused here).

        Returns:
            The same `taskdata` dict, with 'num_children' set.
        """
        t_1 = taskdata['t_1']
        t_max = taskdata['t_max']
        state = taskdata['state']
        coeff = taskdata['coeff']

        # Determine the number of children and record it on the parent.
        num_children = 2
        taskdata['num_children'] = num_children

        # Path of this task, which is the parent path of every child.
        child_parent_path = self.branch_path(
            taskdata['parent_path'], taskdata['branch_num'])

        # Create taskdata for, and schedule, the children.
        for child_branch in range(num_children):
            child_taskdata = {
                'parent_path': child_parent_path,
                'branch_num': child_branch,
                't_0': t_1,
                't_1': None,
                't_max': t_max,
                # a new state for each child
                'state': state * (child_branch + 1),
                'coeff': coeff / (child_branch + 1),
                'num_children': None,
            }
            # This will assign a new task_id to each child, add their tasks
            # to the log, and schedule them to be run. How they are run is up
            # to the scheduler.
            child_id, child_path = self.schedule_task(task_id, child_taskdata)
            # NOTE: We could add more child info to the parent taskdata here

        return taskdata
#Build an anytree from saved data by parsing the log file.
def build_tree(dtree):
    """Reconstruct the task tree as anytree nodes from `dtree`'s log file.

    Every log line is split on tabs into a task id, a parent id and a
    taskdata path. The taskdata file is loaded through
    `dtree.load_task_data` and its ids are checked against the logged ones.
    The first entry becomes the root; each later entry is attached under the
    node found by resolving its 'parent_path' from the root.

    Returns:
        The root node, or `None` when the log contains no lines.
    """
    resolver = atr.Resolver('name')
    root = None

    with open(dtree.log_path, "r") as log:
        for entry in log:
            logged_id, logged_parent, data_path = entry.strip().split("\t")
            data, stored_id, stored_parent = dtree.load_task_data(data_path)

            # The log and the taskdata file must describe the same task.
            assert logged_id == str(stored_id)
            assert logged_parent == str(stored_parent)

            where = data['parent_path']
            branch = data['branch_num']
            attrs = dict(task_id=stored_id,
                         parent_id=stored_parent,
                         num_children=data['num_children'],
                         data=data)

            if root is not None:
                # Look the parent up by its path of node names.
                # (alternatively, keep a node dictionary with id's as keys)
                above = resolver.get(root, where)
                atr.Node('%u' % branch, parent=above, **attrs)
                continue

            # Check that this really is a top-level node.
            assert stored_parent == "" or stored_parent is None
            root = atr.Node('%u' % branch, **attrs)

    return root
if __name__ == "__main__":
    # Create a scheduler and tell it what script to run to schedule tasks.
    sched = SL.Sched_Local(sys.argv[0], scriptargs=['--child'])

    # Create the tree object, telling it where the logfile lives, where the
    # taskdata files are to be stored, and giving it the scheduler to use.
    dtree = Distree_Demo('logfile.txt', '', sched)

    # NOTE: This script is designed so that it can schedule the root job and
    #       also child jobs, depending on the supplied command-line arguments:
    #         (no arguments)        create 'root.npy' and schedule a root job
    #         --show                print the tree rebuilt from saved data
    #         <taskdata>            schedule a root job from an existing file
    #         <taskdata> --child    run that task in the current process
    args = sys.argv[1:]

    if not args:
        # Save a simple initial taskdata file and schedule a root job!
        init_task_data = {
            'parent_path': '',
            'branch_num': 0,
            't_0': 0,
            't_1': None,
            't_max': 4,
            'state': rnd.rand(4),
            'coeff': 1.0,
            'num_children': None,
        }
        dtree.save_task_data('root.npy', init_task_data, None, None)
        # The following schedules a job (it will be run in a different
        # process).
        dtree.schedule_task_from_file('root.npy')
    elif len(args) == 1:
        if args[0] == '--show':
            # Print the tree from saved data.
            top = build_tree(dtree)
            print(atr.RenderTree(top))
        else:
            # Assume the single argument is a taskdata file to be used for a
            # root job.
            dtree.schedule_task_from_file(args[0])
    elif len(args) == 2 and args[1] == '--child':
        # The first argument is a taskdata file for a child job. This means
        # the task should be run in the current process, rather than be
        # scheduled for later.
        dtree.run_task(args[0])
    else:
        # Any other combination, including two arguments without '--child',
        # is reported rather than silently ignored.
        print("I don't know what you want from me.")