-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmosrs-auth
executable file
·203 lines (163 loc) · 5.87 KB
/
mosrs-auth
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#!/usr/bin/env python3
import argparse
from configparser import ConfigParser
from getpass import getpass
import subprocess
from hashlib import md5
import os
import os.path
import logging
from subprocess import Popen, PIPE
from binascii import hexlify
import urllib.request
from urllib.parse import unquote, quote
import textwrap
# gpg-agent cache id under which main() presets the password for Rosie
rosie_key = 'rosie:https:code.metoffice.gov.uk'
# Subversion realm string written into the svn.simple auth record
svn_realm = b'<https://code.metoffice.gov.uk:443> Met Office Code'
# MD5 hex digest of the realm: used both as the svn.simple file name and as
# the gpg-agent cache id for the Subversion password
svn_key = md5(svn_realm).hexdigest()
# Made absolute so the "source {script_dir}/gpg-agent-setup" hint is still a
# usable path when the script is started through a relative path
script_dir = os.path.dirname(os.path.abspath(__file__))
class GPGError(Exception):
    """
    Raised when gpg-agent answers a command with a status other than OK
    """
def get_passphrase(cache_id, prompt="X"):
    """
    Fetch the passphrase held by gpg-agent under `cache_id`

    `prompt` is URL-quoted and passed to the agent as the prompt text.
    Returns the URL-unquoted passphrase as a string.
    https://www.gnupg.org/documentation/manuals/gnupg/Agent-GET_005fPASSPHRASE.html
    """
    command = "GET_PASSPHRASE --data %s X %s X\n"%(cache_id, quote(prompt))
    reply_lines = send(command)
    # The first two characters of the reply line are dropped (presumably the
    # "D " data marker); the remainder is the escaped passphrase
    escaped = reply_lines[0][2:]
    return unquote(escaped)
def clear_passphrase(cache_id):
    """
    Ask gpg-agent to forget the passphrase cached under `cache_id`
    https://www.gnupg.org/documentation/manuals/gnupg/Agent-GET_005fPASSPHRASE.html
    """
    command = "CLEAR_PASSPHRASE %s\n"%cache_id
    send(command)
def preset_passphrase(keygrip, passphrase):
    """
    Add a passphrase to the cache for `keygrip`
    https://www.gnupg.org/documentation/manuals/gnupg/Agent-PRESET_005fPASSPHRASE.html

    `passphrase` may be a str (encoded as UTF-8) or bytes; it is sent to the
    agent hex-encoded.

    Raises ValueError if `passphrase` is None.
    Raises GPGError if the agent rejects the command; the passphrase is
    removed from the error so it cannot end up on the terminal or in logs.
    """
    if passphrase is None:
        # Explicit check rather than `assert`, which is stripped under -O
        raise ValueError("passphrase must not be None")
    if isinstance(passphrase, str):
        passphrase = passphrase.encode('utf-8')
    # Only -1 is allowed for timeout
    timeout = -1
    try:
        send("PRESET_PASSPHRASE %s %s %s\n"%(keygrip, timeout, passphrase.hex()))
    except GPGError as err:
        # The failing command text carries the hex-encoded passphrase, so
        # re-raise with it redacted and hide the original from the traceback
        redacted = "PRESET_PASSPHRASE %s %s <redacted>\n"%(keygrip, timeout)
        raise GPGError(redacted, *err.args[1:]) from None
def send(message):
    """
    Run `gpg-connect-agent`, feed it `message` and return the reply lines

    The reply's status is validated by `_check_return`; the returned list
    omits the last two elements of the newline-split output.
    Raises Exception if `gpg-connect-agent` exits with a non-zero code.
    """
    proc = Popen(
        ['gpg-connect-agent'],
        stdin=PIPE,
        stdout=PIPE,
        bufsize=0,
        encoding='ascii',
    )
    # stderr is not piped, so only the captured stdout is used
    output, _ = proc.communicate(message)
    if proc.returncode != 0:
        raise Exception("ERROR connecting to gpg-agent. Try removing the file '~/.gpg-agent-info' and relogging")
    _check_return(message, output)
    reply_lines = output.split('\n')
    return reply_lines[:-2]
def _check_return(message,stdout):
"""
Check status returned on last line
"""
result = stdout.split('\n')[-2]
if result != "OK":
raise GPGError(message,result)
def get_username():
    """
    Return the `username` option of the [metofficesharedrepos] section in
    ~/.subversion/servers
    """
    servers_file = os.path.expanduser('~/.subversion/servers')
    servers = ConfigParser()
    servers.read(servers_file)
    return servers.get('metofficesharedrepos', 'username')
def get_password():
    """
    Return the MOSRS password stored in gpg-agent under `svn_key`, using
    "MOSRS Password" as the prompt text
    """
    return get_passphrase(svn_key, prompt="MOSRS Password")
def check_password(user, password):
    """
    Verify the MOSRS credentials and register the account with Subversion

    Steps:
      1. Request the Rosie 'hello' page using HTTP basic auth.
      2. Write the svn.simple auth record for the Met Office realm, telling
         Subversion to fetch the password from gpg-agent.
      3. Run `svn info` non-interactively against the um repository.

    Raises whatever urllib raises on a failed request, Exception(status) on a
    non-200 reply, and subprocess.CalledProcessError if `svn info` fails.
    """
    auth_handler = urllib.request.HTTPBasicAuthHandler()
    auth_handler.add_password(realm="Met Office Code", uri='https://code.metoffice.gov.uk', user=user, passwd=password)
    # Use a private opener instead of install_opener() so the credentials are
    # not attached to every later urlopen() call in the process
    opener = urllib.request.build_opener(auth_handler)
    # Context manager guarantees the HTTP response is closed
    with opener.open('https://code.metoffice.gov.uk/rosie/u/hello') as response:
        status = response.status
    if status != 200:
        raise Exception(status)

    # Create the svn auth file
    auth_dir = os.path.expanduser('~/.subversion/auth/svn.simple')
    os.makedirs(auth_dir, exist_ok=True)
    # Lengths in the svn hash format count bytes, not characters
    user_length = len(user.encode('utf-8'))
    record = '\n'.join([
        'K 8',
        'passtype',
        'V 9',
        'gpg-agent',
        'K 15',
        'svn:realmstring',
        f'V {len(svn_realm)}',
        svn_realm.decode('ascii'),
        'K 8',
        'username',
        f'V {user_length}',
        user,
        'END',
        '',  # trailing newline after END
    ])
    with open(os.path.join(auth_dir, svn_key), 'w', encoding='utf-8') as f:
        f.write(record)

    subprocess.run(['svn','info','https://code.metoffice.gov.uk/svn/um', '--non-interactive'], stdout=subprocess.DEVNULL, check=True)
def check_host_independent():
    """
    Warn when gpg-agent looks tied to a single host

    No warning is printed when either:
      - GPG_AGENT_INFO is set and does not start with ~/.gnupg/S.gpg-agent
      - the directory /run/user/$UID exists
    """
    home_socket = os.path.expanduser('~/.gnupg/S.gpg-agent')
    configured = os.environ.get('GPG_AGENT_INFO', None)

    # GPG_AGENT_INFO set to something other than default
    uses_custom_agent = configured is not None and not configured.startswith(home_socket)
    # Running outside of singularity, socket will be placed under /run/user
    if uses_custom_agent or os.path.isdir(f'/run/user/{os.getuid()}'):
        return

    print(f"""WARNING: gpg-agent may break if it is being run on multiple ARE nodes simultaneously.
Consider adding to ~/.bashrc to avoid this:
source {script_dir}/gpg-agent-setup
""")
def main():
    """
    Command-line entry point: store and verify the MOSRS password

    Reads the username from ~/.subversion/servers, optionally clears cached
    passwords (--force), obtains the password from gpg-agent, optionally
    prints it (--print-password), then verifies it. On success the password
    is also preset for Rosie; on any verification error both cached
    passwords are cleared and the error is printed.
    """
    # Imported here because the module only imports ConfigParser by name
    from configparser import Error as ConfigParserError

    parser = argparse.ArgumentParser(description="Store your MOSRS password in gpg-agent")
    parser.add_argument('--force', action='store_true', help="forget any previous password")
    parser.add_argument('--print-password', action='store_true', help="print stored password")
    args = parser.parse_args()

    # Only configuration problems mean "unknown user"; Ctrl-C and other
    # unrelated failures are no longer swallowed by a bare except
    try:
        user = get_username()
    except ConfigParserError:
        print("""ERROR: Unknown user
Add to ~/.subversion/servers (use the existing [groups] if it already exists):
[groups]
metofficesharedrepos = code*.metoffice.gov.uk
[metofficesharedrepos]
username = unknown # set to your username
store-plaintext-passwords = no
""")
        return

    print(f"MOSRS username {user} - change in ~/.subversion/servers")
    # The project is informational only, so a missing PROJECT variable must
    # not be reported as an unknown user
    project = os.environ.get('PROJECT', '(PROJECT not set)')
    print(f"Current project {project}")

    # check_host_independent()

    if args.force:
        # Clear an existing passphrase
        clear_passphrase(rosie_key)
        clear_passphrase(svn_key)

    # Request the passphrase, prompting if needed
    password = get_password()
    if args.print_password:
        print(password)

    try:
        # Check the passphrase
        print("Checking stored password...")
        check_password(user, password)
        # Also save the passphrase for rose
        preset_passphrase(rosie_key, password)
        print("Authentication confirmed")
    except Exception as e:
        print("ERROR checking password")
        # Clear the bad passphrase
        clear_passphrase(rosie_key)
        clear_passphrase(svn_key)
        print(e)
if __name__ == '__main__':
    # Export the target of this process's stdin link as GPG_TTY
    # (presumably so gpg-agent's pinentry can prompt on this terminal)
    stdin_target = os.readlink('/proc/self/fd/0')
    os.environ['GPG_TTY'] = stdin_target
    main()