-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathpem_extract.py
88 lines (74 loc) · 2.54 KB
/
pem_extract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python
from __future__ import print_function
import hashlib
import re
import shlex
import subprocess
import sys
class OpenSSLWrapper(object):
"""Class containing helper methods for using OpenSSL."""
@staticmethod
def _exec_openssl(args, indata):
args = shlex.split(args)
proc = subprocess.Popen(
['openssl'] + args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, close_fds=True)
stdout, _ = proc.communicate(indata)
return stdout.strip()
@classmethod
def openssl_hash(cls, pemdata):
options = 'x509 -noout -hash'
return cls._exec_openssl(options, pemdata)
@classmethod
def openssl_fingerprint(cls, pemdata):
options = 'x509 -noout -fingerprint'
return cls._exec_openssl(options, pemdata).split('=', 1)[-1]
class PEMExtractor(object):
"""Hunt for PEM files within another file or file-like object."""
_pem_re = re.compile(
r'-----BEGIN ([^-]+)-----'
r'[A-Za-z0-9+/\n\r]+=*[\r\n]*'
r'-----END \1-----')
def __init__(self, source, block_size=5*1024*1024):
self.block_size = block_size
if isinstance(source, basestring):
self.source = open(source, 'rb')
else:
try:
getattr(source, 'read')
except AttributeError:
raise ValueError(
'source must be a filename or file-like object.')
self.source = source
def walk(self, callback, unique=True):
chunk = ''
seen = set()
while True:
tail = len(chunk)
tmp = self.source.read(self.block_size)
if not tmp:
break
chunk += tmp
for m in self._pem_re.finditer(chunk):
tail = max(tail, m.end())
cert = m.group()
fp = OpenSSLWrapper.openssl_fingerprint(cert)
if not unique or fp not in seen:
callback(cert)
seen.add(fp)
chunk = chunk[tail:]
def save_certs(self):
def save_single_cert(cert):
cert_hash = OpenSSLWrapper.openssl_hash(cert)
filename = '{}.pem'.format(cert_hash)
with open(filename, 'wb') as fp:
fp.write(cert)
print('Wrote {}'.format(filename))
self.walk(save_single_cert)
if __name__ == '__main__':
try:
source = sys.argv[1]
except IndexError:
source = sys.stdin
extractor = PEMExtractor(source)
extractor.save_certs()