This repository has been archived by the owner on Aug 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathsort-artifacts
executable file
·347 lines (310 loc) · 12.3 KB
/
sort-artifacts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
#!/usr/bin/python -tt
#
# (c) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import argparse
import atexit
import os.path
import pprint
import shutil
import subprocess
import sys
import tempfile
import pexpect
import rpm
class Distro(object):
"""
A representation of a distribution that uses rpm packages
"""
def __init__(self, topdir, distpaths, arches, disttag,
skip_debuginfo=True):
self.topdir = topdir
self.distpaths = distpaths
self.arches = arches
self.disttag = disttag
self.skip_debuginfo = skip_debuginfo
@property
def distpath(self):
return os.path.join(self.topdir, self.distpaths[0])
def create_dirs(self):
"""
Create the directory structure for this distro underneath the
top-level directory. Directories that already exist will be
left alone.
"""
if not os.path.isdir(self.distpath):
os.makedirs(self.distpath)
for arch in self.arches:
archdir = os.path.join(self.distpath, arch)
if not os.path.isdir(archdir):
os.mkdir(archdir)
for altpath in self.distpaths[1:]:
# Create a relative symlink for each alternate path for this distro
altpath = os.path.join(self.topdir, altpath)
if not os.path.exists(altpath):
relpath = os.path.relpath(self.distpath,
os.path.dirname(altpath.rstrip('/')))
os.symlink(relpath, altpath)
def __find_dests_for_package(self, path):
"""
Find all destinations for an rpm package based on its dist tag
and architecture.
"""
headers = get_rpm_headers(path)
if not headers[rpm.RPMTAG_RELEASE].endswith(self.disttag):
return []
if (self.skip_debuginfo and
headers[rpm.RPMTAG_NAME].endswith('-debuginfo')):
return []
dests = []
for arch in self.arches:
archpath = os.path.join(self.distpath, arch)
if headers[rpm.RPMTAG_SOURCEPACKAGE]:
# Source packages have arches, too, so we need to handle
# them specially. We also don't want to copy noarch
# packages into the source package repository.
if arch == 'source':
dests.append(os.path.join(archpath,
os.path.basename(path)))
elif (headers[rpm.RPMTAG_ARCH] in (arch, 'noarch') and
arch != 'source'):
dests.append(os.path.join(archpath,
os.path.basename(path)))
return dests
def copy_package(self, path, signer=None):
"""
Find all destinations for an rpm package based on its dist tag
and architecture, then optionally sign it and copy it to the
appropriate destinations.
This method will attempt to use hard links where possible.
"""
dests = self.__find_dests_for_package(path)
if signer:
path = signer.sign_rpm(path)
for dest in dests:
if not os.path.exists(self.distpath):
self.create_dirs()
try:
os.link(path, dest)
continue
except OSError:
pass
# All dests are probably on the same filesystem, so see if
# we can use a hard link from the first copy.
if dest != dests[0] and os.path.isfile(dests[0]):
try:
os.link(dests[0], dest)
continue
except OSError:
pass
shutil.copy2(path, dest)
return dests
def create_repo_md(self):
for arch in self.arches:
path = os.path.join(self.distpath, arch)
if os.path.isdir(path):
subprocess.check_call((
'createrepo_c', '--update', '--no-database',
'--retain-old-md-by-age', '6h', path))
class Tarball(object):
"""
Representation of a tar file
"""
def __init__(self, topdir):
self.topdir = topdir
self.dest = os.path.join(self.topdir, 'source')
def copy_tarfile(self, path, signer=None):
destfile = self.dest + "/" + os.path.basename(path)
if not os.path.isdir(self.dest):
os.makedirs(self.dest)
if signer:
ascpath = signer.create_detached_signature(path)
try:
os.link(path, destfile)
except OSError:
pass
if not os.path.isfile(destfile):
shutil.copy2(path, destfile)
if signer:
if not ascpath:
raise Exception(
'GPG signature not created properly for file {0}'
.format(path))
if os.path.exists(ascpath):
os.chmod(ascpath, 0o644)
shutil.move(ascpath, self.dest)
return self.dest
def get_rpm_headers(filename):
rpmts = rpm.ts()
rpmts.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
fileno = os.open(filename, os.O_RDONLY)
try:
headers = rpmts.hdrFromFdno(fileno)
finally:
os.close(fileno)
return headers
class GPGKeyFileContextManager(object):
"""
A context manager that uses a file containing a private key to
create a temporary keystore
"""
def __init__(self, privkey_filename):
self.privkey_filename = privkey_filename
self.__previous_gnupghome = None
self.__tempdir = None
with self:
popen = subprocess.Popen(
('gpg', '--list-secret-keys', '--with-colons'),
stdout=subprocess.PIPE)
line = popen.stdout.readline()
popen.communicate()
self.key_id = line.split(':')[4]
def __enter__(self):
try:
self.__tempdir = tempfile.mkdtemp()
self.__previous_gnupghome = os.environ.get('GNUPGHOME')
os.environ['GNUPGHOME'] = self.__tempdir
with open(os.devnull) as devnull:
subprocess.check_call(('gpg', '-q', '--batch', '--import',
self.privkey_filename), stdin=devnull)
except:
self.__exit__(*sys.exc_info())
raise
return self
def __exit__(self, exc_type, exc_val, trace):
if self.__tempdir:
shutil.rmtree(self.__tempdir)
self.__tempdir = None
if self.__previous_gnupghome:
os.environ['GNUPGHOME'] = self.__previous_gnupghome
self.__previous_gnupghome = None
else:
del os.environ['GNUPGHOME']
class GPGKeyIdContextManager(object):
"""
A context manager that refers to a key that is already available
to the system
"""
def __init__(self, key_id):
self.key_id = key_id
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, trace):
pass
class GPGSigner(object):
def __init__(self, contextmgr, password=''):
self.contextmgr = contextmgr
self.password = password
self.__tempdir = tempfile.mkdtemp()
atexit.register(self.__remove_tempdir)
def sign_rpm(self, path):
filename = os.path.basename(path)
dest = os.path.join(self.__tempdir, filename)
shutil.copy2(path, dest)
with self.contextmgr:
args = ['--resign', '-D',
'_gpg_name {0}'.format(self.contextmgr.key_id), dest]
try:
child = pexpect.spawn('rpmsign', args)
child.expect('Enter pass phrase: ')
child.sendline(self.password)
answer = child.expect([r'Pass phrase is good\.',
'Pass phrase check failed'])
child.expect(pexpect.EOF)
child.close()
except pexpect.ExceptionPexpect as exc:
msg = str(exc).splitlines()[0]
raise RuntimeError(msg)
if (not os.WIFEXITED(child.status) or
os.WEXITSTATUS(child.status) != 0 or answer != 0):
print "child.status is {0}".format(child.status)
raise RuntimeError('rpmsign exited with error {0}'
.format(child.status))
return dest
def create_detached_signature(self, path):
filename = os.path.basename(path)
dest = os.path.join(self.__tempdir, filename)
shutil.copy2(path, dest)
with self.contextmgr:
subprocess.check_call(
('gpg', '-q', '--batch', '-u', self.contextmgr.key_id,
'--detach-sign', '--armor', dest))
return dest + '.asc'
def __remove_tempdir(self):
if self.__tempdir:
shutil.rmtree(self.__tempdir)
def find_packages(path):
if os.path.isfile(path):
yield path
else:
for dirpath, _, filenames in os.walk(path):
for filename in filenames:
if filename.endswith('.rpm'):
yield os.path.join(dirpath, filename)
def find_tarfiles(path):
extensions = ['tar.gz', 'tar.gz.asc', 'tar.bz2', 'tar.bz2.asc',
'tar.xz', 'tar.xz.asc']
if os.path.isfile(path) and any(path.endswith(ext) for ext in extensions):
yield path
else:
for dirpath, _, filenames in os.walk(path):
for filename in filenames:
if filename.endswith(tuple(extensions)):
yield os.path.join(dirpath, filename)
def main():
parser = argparse.ArgumentParser(
description='Sort code artifacts into a directory structure')
parser.add_argument('paths', metavar='PATH', nargs='+',
help='one or more file paths to search for artifacts')
parser.add_argument('-d', '--dest', required=True,
help='the top-level directory to sort artifacts into')
parser.add_argument('-p', '--package-set', required=True, action='append',
metavar='DISTTAG:PATH,...:ARCH,...',
help=('combination of dist tag, destination file '
'path(s), and architecture(s) to sort packages '
'into'))
parser.add_argument('-s', '--sign', metavar='KEY:PASSWORD')
parser.add_argument('-v', '--verbose', action='store_true',
help='show where packages were stored')
parser.add_argument('--skip-debuginfo', action='store_true',
help='skip debuginfo packages')
args = parser.parse_args()
distros = []
tarballs = []
signer = None
if args.sign:
if ":" in args.sign:
signer = GPGSigner(GPGKeyIdContextManager(args.sign.split(':')[0]))
else:
signer = GPGSigner(GPGKeyFileContextManager(args.sign))
for package_set in args.package_set:
disttag, paths, arches = package_set.split(':')
distros.append(Distro(args.dest, paths.split(','), arches.split(','),
disttag))
tarballs = Tarball(args.dest)
path_map = {}
for distro in distros:
for path in args.paths:
for package_path in find_packages(path):
path_map[package_path] = distro.copy_package(
package_path, signer)
distro.create_repo_md()
for path in args.paths:
for tarfile_path in find_tarfiles(path):
path_map[tarfile_path] = tarballs.copy_tarfile(tarfile_path,
signer)
if args.verbose:
pprint.pprint(path_map)
if __name__ == '__main__':
main()