-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstatus.py
executable file
·377 lines (301 loc) · 11.8 KB
/
status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Status page handler for mapreduce framework."""
import calendar
import os
import time

from google.appengine.api import validation
from google.appengine.api import yaml_builder
from google.appengine.api import yaml_errors
from google.appengine.api import yaml_listener
from google.appengine.api import yaml_object
from google.appengine.ext import db

from mapreduce import base_handler
from mapreduce import errors
from mapreduce import model
# TODO(user): a list of features we'd like to have in the status page:
# - show a sparkline of entities/sec on the index page
# - shard bar chart should color finished shards differently
# File names accepted for the mapreduce configuration file. Each directory
# examined by _find_mapreduce_yaml is probed for these names in list order.
MR_YAML_NAMES = ["mapreduce.yaml", "mapreduce.yml"]
class BadStatusParameterError(Exception):
    """A parameter passed to a status handler was invalid.

    Raised by GetJobDetailHandler when the 'mapreduce_id' request parameter
    is missing or empty.
    """
class UserParam(validation.Validated):
    """A user-supplied parameter to a mapreduce job.

    Describes one entry of a `params` list in mapreduce.yaml. When the
    configuration is converted with MapReduceYaml.to_dict, `default` takes
    precedence over `value` if both are set.
    """

    # Attribute name -> validator. The raw strings look like regular
    # expressions for the validation framework to match values against.
    ATTRIBUTES = {
        # Letters, digits, underscores and dots only.
        "name": r"[a-zA-Z0-9_\.]+",
        "default": validation.Optional(r".*"),
        "value": validation.Optional(r".*"),
    }
class MapperInfo(validation.Validated):
    """Configuration parameters for the mapper part of the job.

    Describes the `mapper` section of one mapreduce.yaml entry. `handler` and
    `input_reader` are not wrapped in validation.Optional; every other
    attribute is.
    """

    ATTRIBUTES = {
        # Non-empty strings naming the mapper handler and the input reader.
        "handler": r".+",
        "input_reader": r".+",
        "output_writer": validation.Optional(r".+"),
        # Mapper-level parameters, each validated as a UserParam.
        "params": validation.Optional(validation.Repeated(UserParam)),
        "params_validator": validation.Optional(r".+"),
    }
class MapreduceInfo(validation.Validated):
    """Mapreduce description in mapreduce.yaml.

    Describes one entry of the top-level `mapreduce` list: a job name, its
    mapper configuration and optional job-level parameters.
    """

    ATTRIBUTES = {
        "name": r".+",
        # Nested mapper configuration, validated as a MapperInfo.
        "mapper": MapperInfo,
        # Job-level parameters, distinct from the mapper's own `params`.
        "params": validation.Optional(validation.Repeated(UserParam)),
        "params_validator": validation.Optional(r".+"),
    }
class MapReduceYaml(validation.Validated):
    """Root class for mapreduce.yaml.

    File format:

    mapreduce:
    - name: <mapreduce_name>
      mapper:
        input_reader: google.appengine.ext.mapreduce.DatastoreInputReader
        handler: path_to_my.MapperFunction
        params:
        - name: foo
          default: bar
        - name: blah
          default: stuff
        params_validator: path_to_my.ValidatorFunction

    Where
      name: The name of the mapreduce. Used for UI purposes.
      handler: Full <module_name>.<function_name/class_name> of the mapper
        handler. See MapreduceSpec class documentation for full handler
        specification.
      input_reader: Full <module_name>.<function_name/class_name> of the
        InputReader sub-class to use for the mapper job.
      params: A list of optional parameter names and optional default values
        that may be supplied or overridden by the user running the job.
      params_validator: Full <module_name>.<function_name/class_name> of
        a callable to validate the mapper_params after they are input by the
        user running the job.
    """

    ATTRIBUTES = {
        "mapreduce": validation.Optional(validation.Repeated(MapreduceInfo))
    }

    @staticmethod
    def to_dict(mapreduce_yaml):
        """Converts a MapReduceYaml file into a JSON-encodable list.

        For use in user-visible UI and internal methods for interfacing with
        user code (like param validation).

        Args:
          mapreduce_yaml: The Python representation of the mapreduce.yaml
            document.

        Returns:
          A list of configuration dictionaries, one per configured mapreduce.
          The list is empty when the document has no `mapreduce` entries.
        """
        def param_defaults(params):
            # For each parameter `default` wins over `value` when both are set.
            return dict((param.name, param.default or param.value)
                        for param in params)

        all_configs = []
        # The `mapreduce` attribute is Optional, so it may be None when the
        # section is missing; treat that the same as an empty list.
        for config in mapreduce_yaml.mapreduce or []:
            mapper = config.mapper
            out = {
                "name": config.name,
                "mapper_input_reader": mapper.input_reader,
                "mapper_handler": mapper.handler,
            }
            # Optional settings are only emitted when they are set.
            if mapper.params_validator:
                out["mapper_params_validator"] = mapper.params_validator
            if mapper.params:
                out["mapper_params"] = param_defaults(mapper.params)
            if config.params:
                out["params"] = param_defaults(config.params)
            if mapper.output_writer:
                out["mapper_output_writer"] = mapper.output_writer
            all_configs.append(out)
        return all_configs
# N.B. Sadly, we currently don't have an ability to determine the
# application root dir at run time. We need to walk up the directory structure
# to find it.
def find_mapreduce_yaml(status_file=__file__):
    """Locate the mapreduce.yaml file by walking up directory trees.

    The search starts in the directory containing status.py and climbs
    towards the root. If nothing is found there, the same upward search is
    repeated from the current working directory.

    Args:
      status_file: location of status.py, overridable for testing purposes.

    Returns:
      the path of mapreduce.yaml file or None if not found.
    """
    # Shared between both searches so no directory is inspected twice.
    visited = set()
    module_dir = os.path.dirname(status_file)
    return (_find_mapreduce_yaml(module_dir, visited) or
            _find_mapreduce_yaml(os.getcwd(), visited))
def _find_mapreduce_yaml(start, checked):
"""Traverse the directory tree identified by start until a directory already
in checked is encountered or the path of mapreduce.yaml is found.
Checked is present both to make loop termination easy to reason about and so
that the same directories do not get rechecked.
Args:
start: the path to start in and work upward from
checked: the set of already examined directories
Returns:
the path of mapreduce.yaml file or None if not found.
"""
dir = start
while dir not in checked:
checked.add(dir)
for mr_yaml_name in MR_YAML_NAMES:
yaml_path = os.path.join(dir, mr_yaml_name)
if os.path.exists(yaml_path):
return yaml_path
dir = os.path.dirname(dir)
return None
def parse_mapreduce_yaml(contents):
    """Parses mapreduce.yaml file contents.

    Args:
      contents: mapreduce.yaml file contents.

    Returns:
      MapReduceYaml object with all the data from original file.

    Raises:
      errors.BadYamlError: when contents is not a valid mapreduce.yaml file,
        contains no YAML document, or defines two mapreduces with the same
        name.
      errors.MultipleDocumentsInMrYaml: when contents holds more than one
        YAML document.
    """
    try:
        builder = yaml_object.ObjectBuilder(MapReduceYaml)
        handler = yaml_builder.BuilderHandler(builder)
        listener = yaml_listener.EventListener(handler)
        listener.Parse(contents)
        mr_info = handler.GetResults()
    except (ValueError, yaml_errors.EventError) as e:
        raise errors.BadYamlError(e)

    if not mr_info:
        raise errors.BadYamlError("No configs found in mapreduce.yaml")
    if len(mr_info) > 1:
        raise errors.MultipleDocumentsInMrYaml(
            "Found %d YAML documents" % len(mr_info))

    jobs = mr_info[0]
    # The `mapreduce` attribute is Optional, so it may be None when the
    # document has no such section; an empty list has no duplicates.
    job_list = jobs.mapreduce or []
    job_names = set(j.name for j in job_list)
    if len(job_list) != len(job_names):
        raise errors.BadYamlError(
            "Overlapping mapreduce names; names must be unique")

    return jobs
def get_mapreduce_yaml(parse=parse_mapreduce_yaml):
    """Finds mapreduce.yaml on disk and returns its parsed contents.

    Args:
      parse: Callable applied to the file contents. Used for testing.

    Returns:
      MapReduceYaml object.

    Raises:
      errors.MissingYamlError: when no mapreduce.yaml file can be located.
      errors.BadYamlError: when contents is not a valid mapreduce.yaml file.
    """
    yaml_path = find_mapreduce_yaml()
    if not yaml_path:
        raise errors.MissingYamlError()
    # The file is closed even when parsing raises.
    with open(yaml_path) as yaml_file:
        return parse(yaml_file.read())
class ResourceHandler(base_handler.BaseHandler):
    """Handler for static resources."""

    # Requested resource name -> (file name under static/, content type).
    # Only names listed here can be served, so the request path never reaches
    # the file system directly.
    _RESOURCE_MAP = {
        "status": ("overview.html", "text/html"),
        "detail": ("detail.html", "text/html"),
        "base.css": ("base.css", "text/css"),
        "jquery.js": ("jquery-1.6.1.min.js", "text/javascript"),
        "jquery-json.js": ("jquery.json-2.2.min.js", "text/javascript"),
        "status.js": ("status.js", "text/javascript"),
    }

    def get(self, relative):
        """Serves the static file registered under the given name.

        Args:
          relative: resource name to look up in _RESOURCE_MAP.

        Responds with 404 when the name is not a known resource.
        """
        if relative not in self._RESOURCE_MAP:
            self.response.set_status(404)
            self.response.out.write("Resource not found.")
            return

        real_path, content_type = self._RESOURCE_MAP[relative]
        path = os.path.join(os.path.dirname(__file__), "static", real_path)
        # Read first and close the file deterministically instead of leaving
        # the handle to the garbage collector.
        with open(path) as resource_file:
            body = resource_file.read()

        # Cache-Control directives are comma-separated; a semicolon makes the
        # header malformed and the max-age directive may be ignored.
        self.response.headers["Cache-Control"] = "public, max-age=300"
        self.response.headers["Content-Type"] = content_type
        self.response.out.write(body)
class ListConfigsHandler(base_handler.GetJsonHandler):
    """Returns the configs from mapreduce.yaml as JSON so users can start jobs."""

    def handle(self):
        """Stores the list of configured mapreduces under the "configs" key."""
        mapreduce_yaml = get_mapreduce_yaml()
        configs = MapReduceYaml.to_dict(mapreduce_yaml)
        self.json_response["configs"] = configs
class ListJobsHandler(base_handler.GetJsonHandler):
    """Lists running and completed mapreduce jobs for an overview as JSON."""

    @staticmethod
    def _timestamp_ms(moment):
        """Returns a datetime as integer milliseconds since the Unix epoch.

        utctimetuple() yields UTC fields, so they must be converted with
        calendar.timegm. time.mktime would interpret them as local time and
        skew the result on any host whose time zone is not UTC.
        """
        return int(calendar.timegm(moment.utctimetuple()) * 1000)

    def handle(self):
        """Writes one page of jobs, ordered by key, into the JSON response.

        Request parameters:
          cursor: optional string form of the key to start the page at.
          count: maximum number of jobs to return; defaults to 50.

        The response holds the jobs under "jobs" and, when more jobs remain,
        the key to pass as the next `cursor` under "cursor".
        """
        cursor = self.request.get("cursor")
        count = int(self.request.get("count", "50"))

        query = model.MapreduceState.all()
        if cursor:
            query.filter("__key__ >=", db.Key(cursor))
        query.order("__key__")

        # Fetch one extra entity: if it exists there is another page, and its
        # key becomes the cursor that page starts from.
        jobs_list = query.fetch(count + 1)
        if len(jobs_list) == (count + 1):
            self.json_response["cursor"] = str(jobs_list[-1].key())
            jobs_list = jobs_list[:-1]

        all_jobs = []
        for job in jobs_list:
            out = {
                # Data shared between overview and detail pages.
                "name": job.mapreduce_spec.name,
                "mapreduce_id": job.mapreduce_spec.mapreduce_id,
                "active": job.active,
                "start_timestamp_ms": self._timestamp_ms(job.start_time),
                "updated_timestamp_ms":
                    self._timestamp_ms(job.last_poll_time),

                # Specific to overview page.
                "chart_url": job.sparkline_url,
                "chart_width": job.chart_width,
                "active_shards": job.active_shards,
                "shards": job.mapreduce_spec.mapper.shard_count,
            }
            # Only reported once the job has a result.
            if job.result_status:
                out["result_status"] = job.result_status
            all_jobs.append(out)

        self.json_response["jobs"] = all_jobs
class GetJobDetailHandler(base_handler.GetJsonHandler):
    """Retrieves the details of a mapreduce job as JSON."""

    @staticmethod
    def _timestamp_ms(moment):
        """Returns a datetime as integer milliseconds since the Unix epoch.

        utctimetuple() yields UTC fields, so they must be converted with
        calendar.timegm. time.mktime would interpret them as local time and
        skew the result on any host whose time zone is not UTC.
        """
        return int(calendar.timegm(moment.utctimetuple()) * 1000)

    def handle(self):
        """Writes the job's spec, counters, status and shards to the response.

        Request parameters:
          mapreduce_id: key name of the MapreduceState to describe.

        Raises:
          BadStatusParameterError: when 'mapreduce_id' is missing or empty.
          KeyError: when no job exists with the given ID.
        """
        mapreduce_id = self.request.get("mapreduce_id")
        if not mapreduce_id:
            raise BadStatusParameterError("'mapreduce_id' was invalid")
        job = model.MapreduceState.get_by_key_name(mapreduce_id)
        if job is None:
            raise KeyError("Could not find job with ID %r" % mapreduce_id)

        # Later updates overwrite earlier keys, so the order is significant.
        self.json_response.update(job.mapreduce_spec.to_json())
        self.json_response.update(job.counters_map.to_json())
        self.json_response.update({
            # Shared with overview page.
            "active": job.active,
            "start_timestamp_ms": self._timestamp_ms(job.start_time),
            "updated_timestamp_ms": self._timestamp_ms(job.last_poll_time),

            # Specific to detail page.
            "chart_url": job.chart_url,
        })
        self.json_response["result_status"] = job.result_status

        shards_list = model.ShardState.find_by_mapreduce_state(job)
        shards_list.sort(key=lambda x: x.shard_number)
        all_shards = []
        for shard in shards_list:
            out = {
                "active": shard.active,
                "result_status": shard.result_status,
                "shard_number": shard.shard_number,
                "shard_id": shard.shard_id,
                "updated_timestamp_ms":
                    self._timestamp_ms(shard.update_time),
                "shard_description": shard.shard_description,
                "last_work_item": shard.last_work_item,
            }
            # Per-shard counters are merged into the same dictionary.
            out.update(shard.counters_map.to_json())
            all_shards.append(out)
        self.json_response["shards"] = all_shards