# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Accesses files on Google Cloud Storage via Google Cloud Storage Client API.

References:
  https://developers.google.com/appengine/docs/python/googlecloudstorageclient/
  https://developers.google.com/storage/docs/accesscontrol#Signed-URLs
"""

import base64
import collections
import datetime
import logging
import time
import urllib

import Crypto.Hash.SHA256 as SHA256
import Crypto.PublicKey.RSA as RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS1_v1_5

# The app engine headers are located locally, so don't worry about not finding
# them.
# pylint: disable=F0401
import webapp2
# pylint: enable=F0401

import cloudstorage

# Export some exceptions for users of this module.
# pylint: disable=W0611
from cloudstorage.errors import (
    AuthorizationError,
    FatalError,
    ForbiddenError,
    NotFoundError,
    TransientError)

import config

from components import utils


# The limit is 32 megs but it's a tad on the large side. Use 512kb chunks
# instead to not clog memory when there are multiple concurrent requests being
# served.
CHUNK_SIZE = 512 * 1024

# Return value for get_file_info call.
FileInfo = collections.namedtuple('FileInfo', ['size'])


def list_files(bucket, subdir=None, batch_size=100):
  """Yields filenames and stats of files inside subdirectory of a bucket.

  It always lists directories recursively.

  Arguments:
    bucket: a bucket to list.
    subdir: subdirectory to list files from or None for an entire bucket.
    batch_size: maximum number of entries to fetch per listing request.

  Yields:
    Tuples of (filename, stats), where filename is relative to the bucket root
    directory.
  """
  # When listing an entire bucket, gcs expects /<bucket> without ending '/'.
  path_prefix = '/%s/%s' % (bucket, subdir) if subdir else '/%s' % bucket
  bucket_prefix = '/%s/' % bucket
  marker = None
  retry_params = _make_retry_params()
  while True:
    files_stats = cloudstorage.listbucket(
        path_prefix=path_prefix,
        marker=marker,
        max_keys=batch_size,
        retry_params=retry_params)
    # |files_stats| is an iterable, need to iterate through it to figure out
    # whether it's empty or not.
    empty = True
    for stat in files_stats:
      # Restart next listing from the last fetched file.
      marker = stat.filename
      # pylint: disable=C0301
      # https://developers.google.com/appengine/docs/python/googlecloudstorageclient/gcsfilestat_class
      if stat.is_dir:
        continue
      empty = False
      assert stat.filename.startswith(bucket_prefix)
      yield stat.filename[len(bucket_prefix):], stat
    # Last batch was empty -> listed all files.
    if empty:
      break
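

# Example usage (illustrative sketch only; the bucket and subdirectory names
# below are hypothetical, not part of this module):
#
#   for name, stat in list_files('my-bucket', subdir='logs'):
#     logging.info('%s is %d bytes', name, stat.st_size)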


def delete_file(bucket, filename, ignore_missing=False):
  """Deletes one file stored in GS.

  Arguments:
    bucket: a bucket that contains the files.
    filename: file path to delete (relative to a bucket root).
    ignore_missing: if True, will silently skip missing files, otherwise will
        print a warning to log.
  """
  retry_params = _make_retry_params()
  max_tries = 4
  for i in xrange(max_tries+1):
    try:
      cloudstorage.delete(
          '/%s/%s' % (bucket, filename), retry_params=retry_params)
      return
    except cloudstorage.errors.NotFoundError:
      if not ignore_missing:
        logging.warning(
            'Trying to delete a GS file that\'s not there: /%s/%s',
            bucket, filename)
      return
    except cloudstorage.errors.TransientError as e:
      if i == max_tries:
        raise
      time.sleep(1 + i * 2)
      continue
    except cloudstorage.errors.FatalError as e:
      if 'But got status 429' in e.message:
        if i == max_tries:
          raise
        # There's a bug in cloudstorage.check_status() that mishandles HTTP
        # 429.
        time.sleep(1 + i * 2)
        continue
      raise
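

# Example usage (sketch; names are hypothetical). Deleting a temporary file
# that may have already been cleaned up, without logging a warning:
#
#   delete_file('my-bucket', 'tmp/upload.partial', ignore_missing=True)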


def delete_files(bucket, filenames, ignore_missing=False):
  """Deletes multiple files stored in GS.

  Arguments:
    bucket: a bucket that contains the files.
    filenames: list of file paths to delete (relative to a bucket root).
    ignore_missing: if True, will silently skip missing files, otherwise will
        print a warning to log.

  Returns:
    An empty list so this function can be used with functions that expect
    the RPC to return a Future.
  """
  # Sadly Google Cloud Storage client library doesn't support batch deletes,
  # so do it one by one.
  for filename in filenames:
    delete_file(bucket, filename, ignore_missing)
  return []


def get_file_info(bucket, filename):
  """Returns information about stored file.

  Arguments:
    bucket: a bucket that contains the file.
    filename: path to a file relative to bucket root.

  Returns:
    FileInfo object or None if no such file.
  """
  try:
    stat = cloudstorage.stat(
        '/%s/%s' % (bucket, filename), retry_params=_make_retry_params())
    return FileInfo(size=stat.st_size)
  except cloudstorage.errors.NotFoundError:
    return None


def read_file(bucket, filename, chunk_size=CHUNK_SIZE):
  """Reads a file and yields its content in chunks of a given size.

  Arguments:
    bucket: a bucket that contains the file.
    filename: name of the file to read.
    chunk_size: maximum size of a chunk to read and yield.

  Yields:
    Chunks of a file (as str objects).
  """
  path = '/%s/%s' % (bucket, filename)
  bytes_read = 0
  data = None
  file_ref = None
  try:
    with cloudstorage.open(
        path,
        read_buffer_size=chunk_size,
        retry_params=_make_retry_params()) as file_ref:
      while True:
        data = file_ref.read(chunk_size)
        if not data:
          break
        bytes_read += len(data)
        yield data
        # Remove reference to a buffer so it can be GC'ed.
        data = None
  except Exception as exc:
    logging.warning(
        'Exception while reading \'%s\', read %d bytes: %s %s',
        path, bytes_read, exc.__class__.__name__, exc)
    raise
  finally:
    # Remove lingering references to |data| and |file_ref| so they get GC'ed
    # sooner. Otherwise this function's frame object keeps references to them.
    # A frame object is around as long as there are references to this
    # generator instance somewhere.
    data = None
    file_ref = None
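

# Example usage (sketch; the handler and file names are hypothetical). A
# webapp2 handler can stream a stored file to the client chunk by chunk
# instead of buffering the whole object in memory:
#
#   for chunk in read_file('my-bucket', 'archives/data.bin'):
#     self.response.out.write(chunk)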


def write_file(bucket, filename, content):
  """Stores the given content as a file in Google Storage.

  Overwrites a file if it exists.

  Arguments:
    bucket: a bucket to store a file to.
    filename: name of the file to write.
    content: iterable that produces chunks of a content to write.

  Returns:
    True if successfully written a file, False on error.
  """
  written = 0
  last_chunk_size = 0
  try:
    with cloudstorage.open(
        '/%s/%s' % (bucket, filename), 'w',
        retry_params=_make_retry_params()) as f:
      for chunk in content:
        last_chunk_size = len(chunk)
        f.write(chunk)
        written += last_chunk_size
    return True
  except cloudstorage.errors.Error as exc:
    logging.error(
        'Failed to write to a GS file.\n'
        '\'/%s/%s\', wrote %d bytes, failed at writing %d bytes: %s %s',
        bucket, filename, written, last_chunk_size, exc.__class__.__name__, exc)
    # Delete an incomplete file.
    delete_file(bucket, filename, ignore_missing=True)
    return False
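

# Example usage (sketch; names are hypothetical). |content| is any iterable of
# str chunks, so a small payload can be passed as a single-element list:
#
#   if not write_file('my-bucket', 'results/report.json', [report_blob]):
#     logging.error('Failed to upload the report')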


def _make_retry_params():
  """RetryParams structure configured to store access token in Datastore."""
  # Note that 'cloudstorage.set_default_retry_params' function stores retry
  # params in per-request thread local storage, which means it needs to be
  # called for each request. Since we are wrapping all cloudstorage library
  # calls anyway, it's more convenient just to pass RetryParams explicitly,
  # instead of making it a default for request with 'set_default_retry_params'.
  return cloudstorage.RetryParams(save_access_token=True)


class URLSigner(object):
  """Object that can generate signed Google Storage URLs."""

  # Default expiration time for signed links.
  DEFAULT_EXPIRATION = datetime.timedelta(hours=4)

  # Google Storage URL template for a signed link.
  GS_URL = 'https://%(bucket)s.storage.googleapis.com/%(filename)s?%(query)s'

  # True if switched to a local dev mode.
  DEV_MODE_ENABLED = False

  @staticmethod
  def switch_to_dev_mode():
    """Enables GS mock for a local dev server.

    Returns:
      List of webapp2.Routes objects to add to the application.
    """
    assert utils.is_local_dev_server(), 'Must not be run in production'
    if not URLSigner.DEV_MODE_ENABLED:
      # Replace GS_URL with a mocked one.
      URLSigner.GS_URL = (
          'http://%s/_gcs_mock/' % config.get_local_dev_server_host())
      URLSigner.GS_URL += '%(bucket)s/%(filename)s?%(query)s'
      URLSigner.DEV_MODE_ENABLED = True

    class LocalStorageHandler(webapp2.RequestHandler):
      """Handles requests to a mock GS implementation."""

      def get(self, bucket, filepath):
        """Read a file from a mocked GS, return 404 if not found."""
        try:
          with cloudstorage.open('/%s/%s' % (bucket, filepath), 'r') as f:
            self.response.out.write(f.read())
          self.response.headers['Content-Type'] = 'application/octet-stream'
        except cloudstorage.errors.NotFoundError:
          self.abort(404)

      def put(self, bucket, filepath):
        """Stores a file in a mocked GS."""
        with cloudstorage.open('/%s/%s' % (bucket, filepath), 'w') as f:
          f.write(self.request.body)

    endpoint = r'/_gcs_mock/<bucket:[a-z0-9\.\-_]+>/<filepath:.*>'
    return [webapp2.Route(endpoint, LocalStorageHandler)]

  def __init__(self, bucket, client_id, private_key):
    self.bucket = str(bucket)
    self.client_id = str(client_id)
    self.private_key = URLSigner.load_private_key(private_key)

  @staticmethod
  def load_private_key(private_key):
    """Converts base64 *.der private key into RSA key instance."""
    # Empty private key is ok in a dev mode.
    if URLSigner.DEV_MODE_ENABLED and not private_key:
      return None
    binary = base64.b64decode(private_key)
    return RSA.importKey(binary)

  def generate_signature(self, data_to_sign):
    """Signs |data_to_sign| with a private key and returns a signature."""
    # Signatures are not used in a dev mode.
    if self.DEV_MODE_ENABLED:
      return 'fakesig'
    # Sign it with RSA-SHA256.
    signer = PKCS1_v1_5.new(self.private_key)
    signature = base64.b64encode(signer.sign(SHA256.new(data_to_sign)))
    return signature

  def get_signed_url(self, filename, http_verb, expiration=DEFAULT_EXPIRATION,
                     content_type='', content_md5=''):
    """Returns signed URL that can be used by clients to access a file."""
    # Prepare data to sign.
    filename = str(filename)
    expires = str(int(time.time() + expiration.total_seconds()))
    data_to_sign = '\n'.join([
        http_verb,
        content_md5,
        content_type,
        expires,
        '/%s/%s' % (self.bucket, filename),
    ])
    # Construct final URL.
    query_params = urllib.urlencode([
        ('GoogleAccessId', self.client_id),
        ('Expires', expires),
        ('Signature', self.generate_signature(data_to_sign)),
    ])
    return self.GS_URL % {
        'bucket': self.bucket,
        'filename': filename,
        'query': query_params}

  def get_download_url(self, filename, expiration=DEFAULT_EXPIRATION):
    """Returns signed URL that can be used to download a file."""
    return self.get_signed_url(filename, 'GET', expiration=expiration)

  def get_upload_url(self, filename, expiration=DEFAULT_EXPIRATION,
                     content_type='', content_md5=''):
    """Returns signed URL that can be used to upload a file."""
    return self.get_signed_url(
        filename, 'PUT', expiration=expiration,
        content_type=content_type, content_md5=content_md5)
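

# Example usage (sketch; the bucket, account and key below are hypothetical).
# |client_id| is the value placed in the GoogleAccessId query parameter (the
# service account email) and |private_key| is that account's key as a
# base64-encoded *.der blob, as described in load_private_key above:
#
#   signer = URLSigner(
#       'my-bucket', 'svc@example.iam.gserviceaccount.com', key_base64)
#   download_url = signer.get_download_url('archives/data.bin')
#   upload_url = signer.get_upload_url(
#       'archives/new.bin', content_type='application/octet-stream')
#
# Note that content_type and content_md5 are part of data_to_sign in
# get_signed_url, so a client using upload_url must send the same Content-Type
# (and Content-MD5, if given) headers with its PUT request.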