forked from HuJK/DN42-AutoPeer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDN42AutoPeer.py
executable file
·1860 lines (1775 loc) · 87.4 KB
/
DN42AutoPeer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python3
import os
import re
import jwt
import time
import pgpy
import yaml
import json
import shlex
import errno
import shutil
import random
import string
import socket
import base64
import pathlib
import asyncio
import hashlib
import OpenSSL
import textwrap
import requests
import datetime
import ipaddress
import traceback
import nacl.public
import tornado.web
import tornado.gen
from git import Repo
import tornado.ioloop
import multiprocessing
from urllib import parse
import tornado.httpclient
import requests.packages.urllib3
from Crypto.PublicKey import RSA
from ipaddress import IPv4Network , IPv6Network , IPv4Interface , IPv6Interface, IPv4Address , IPv6Address
from ipaddress import IPv6Network
from subprocess import Popen, PIPE, STDOUT
from tornado.httpclient import HTTPClientError
import DN42whois
from DN42GIT import DN42GIT
requests.packages.urllib3.disable_warnings()
import argparse
import secrets  # stdlib CSPRNG, used for generating the JWT signing secret

# ---------------------------------------------------------------------------
# Command-line handling and start-up configuration.
#
# -e/--envfile may be given several times; every "KEY=VALUE" line of each
# file is copied into os.environ (lines without "=" are ignored).
# -c/--config and -p/--parms override the default YAML file locations.
# ---------------------------------------------------------------------------
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--envfile", action='append', help="env file with KEY=VALUE lines (may be repeated)", type=str)
parser.add_argument("-c", "--config", action='store', help="path to the config YAML file", type=str)
parser.add_argument("-p", "--parms", action='store', help="path to the parameters YAML file", type=str)
args = parser.parse_args()

envs = {}
if args.envfile:
    for efs in args.envfile:
        # Close the file deterministically instead of leaking the handle.
        with open(efs) as env_file:
            es = env_file.read().split("\n")
        for e in es:
            if "=" in e:
                # Split on the first "=" only so values may contain "=".
                k, v = e.split("=", 1)
                os.environ[k] = v

confpath = "my_config.yaml"
parmpath = "my_parameters.yaml"
peerHostDisplayText = "(Peer endpoint hidden, authenticate to show)"
if args.config:
    confpath = args.config
if args.parms:
    parmpath = args.parms

print("Starting...")
os.environ['GIT_SSH_COMMAND'] = "ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no"

# NOTE(review): yaml.Loader can construct arbitrary Python objects. That is
# acceptable only while these files are operator-controlled; consider
# yaml.SafeLoader if the files never use Python-specific tags.
with open(parmpath) as parm_file:
    my_paramaters = yaml.load(parm_file.read(), Loader=yaml.Loader)
with open(confpath) as conf_file:
    my_config = yaml.load(conf_file.read(), Loader=yaml.Loader)

# Generate and persist a JWT signing secret on first start.  A missing key is
# treated like an explicit null, and the secret comes from the `secrets`
# CSPRNG because it authenticates the tokens handed out to peers.
if my_config.get("jwt_secret") is None:
    my_config["jwt_secret"] = ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(32))
    with open(confpath, "w") as conf_file:
        conf_file.write(yaml.dump(my_config, sort_keys=False))
jwt_secret = my_config["jwt_secret"]
def try_read_env(params, pkey, ekey, ValType=str, default=None):
    """Override ``params[pkey]`` from environment variable ``ekey`` if it is set.

    Args:
        params: Mapping that is updated in place.
        pkey: Key inside ``params`` to set.
        ekey: Name of the environment variable to read.
        ValType: How to convert the raw string.  ``bool`` means a
            case-insensitive comparison with ``"true"``; the string
            ``"json"`` means ``json.loads``; anything else is called as a
            converter, e.g. ``str`` or ``int``.
        default: Value stored when neither the environment nor ``params``
            provides ``pkey``.  ``None`` means "no default".

    Raises:
        ValueError: ``pkey`` is absent from both sources and ``default`` is
            ``None``.
    """
    if ekey in os.environ:
        raw = os.environ[ekey]
        if ValType == bool:
            params[pkey] = raw.lower() == "true"
        elif ValType == "json":
            params[pkey] = json.loads(raw)
        else:
            params[pkey] = ValType(raw)
        # NOTE(review): this echoes every loaded value to stdout, including
        # secrets such as a private key passed through the environment.
        print(f"Load {pkey} from env, val: {params[pkey]}")
    if pkey not in params:
        # Identity check: falsy defaults such as False or 0 are valid values.
        if default is None:
            raise ValueError("default for " + pkey + " are not set")
        params[pkey] = default
# Speed limiting is enabled only when WG_SPEED_LIMIT parses as a positive int.
use_speed_limit = False
if "WG_SPEED_LIMIT" in os.environ:
    try:
        if int(os.environ["WG_SPEED_LIMIT"]) > 0:
            use_speed_limit = True
    except ValueError:
        # A non-numeric value leaves the limit disabled.
        pass

# Environment overrides for my_parameters.yaml.  The address keys are read
# twice on purpose: the later DN42AP_MY_* variables win over DN42_* when both
# are set.
try_read_env(my_paramaters, "myIPV4", 'DN42_IPV4')
try_read_env(my_paramaters, "myIPV6", 'DN42_IPV6')
try_read_env(my_paramaters, "myIPV4LL", 'DN42_IPV4_LL')
try_read_env(my_paramaters, "myIPV6LL", 'DN42_IPV6_LL')
try_read_env(my_paramaters, "myIPV4", 'DN42AP_MY_IPV4')
try_read_env(my_paramaters, "myIPV6", 'DN42AP_MY_IPV6')
try_read_env(my_paramaters, "myIPV4LL", 'DN42AP_MY_IPV4_LL')
try_read_env(my_paramaters, "myIPV6LL", 'DN42AP_MY_IPV6_LL')
try_read_env(my_paramaters, "myHost", 'DN42AP_ENDPOINT')
try_read_env(my_paramaters, "myHostDisplay", 'DN42AP_HOST_DISPLAY')
try_read_env(my_paramaters, "myASN", 'DN42_E_AS')
try_read_env(my_paramaters, "myContact", 'DN42_CONTACT')
try_read_env(my_paramaters, "allowExtNh", 'DN42AP_ALLOW_ENH', bool, False)

# Environment overrides for my_config.yaml.
try_read_env(my_config, "myHostHidden", 'DN42AP_HOST_HIDDEN', bool, False)
try_read_env(my_config, "peerEndpointHidden", 'DN42AP_PEER_ENDPOINT_HIDDEN', bool, False)
try_read_env(my_config, "registerAdminOnly", 'DN42AP_REGISTER_ADMINONLY', bool, False)
try_read_env(my_config, "html_title", 'DN42AP_TITLE')
try_read_env(my_config, "git_repo_url", 'DN42AP_GIT_REPO_URL')
try_read_env(my_config, "listen_host", 'DN42AP_LISTEN_HOST')
try_read_env(my_config, "listen_port", 'DN42AP_PORT')
try_read_env(my_config, "myWG_Pri_Key", 'WG_PRIVKEY')
try_read_env(my_config, "urlprefix", 'DN42AP_URLPREFIX')
try_read_env(my_config, "wgconfpath", 'DN42AP_WGCONFPATH')
try_read_env(my_config, "bdconfpath", 'DN42AP_BIRDCONFPATH')
try_read_env(my_config, "gitsyncpath", 'DN42AP_GIT_SYNC_PATH')
try_read_env(my_config, "admin_mnt", 'DN42AP_ADMIN')
try_read_env(my_config, "register_redirect", 'DN42AP_REGISTER_REDIRECT')
try_read_env(my_config, "wg_port_search_range", 'DN42AP_PORT_RANGE', str)
try_read_env(my_config, "dn42_whois_server", "DN42AP_WHOIS_SERVER", "json")
try_read_env(my_config, "dn42repo_base", "DN42AP_REPO_BASE", str)
try_read_env(my_config, "init_device", 'DN42AP_INIT_DEVICE', bool)
try_read_env(my_config, "reset_wgconf_interval", 'DN42AP_RESET_WGCONF', int, 0)

RRstate_repo = DN42GIT(my_config["gitsyncpath"])

# Optional command read from DN42AP_REMOTE_COMMAND; empty string when unset.
use_remote_command = ""
if "DN42AP_REMOTE_COMMAND" in os.environ:
    use_remote_command = os.environ['DN42AP_REMOTE_COMMAND']
def es2none(p):
    """Return ``None`` for the empty string and ``p`` unchanged otherwise."""
    return None if p == "" else p
# Normalise "not configured" (empty string) to None for these keys.
my_paramaters["myIPV4"] = es2none(my_paramaters["myIPV4"])
my_paramaters["myIPV6"] = es2none(my_paramaters["myIPV6"])
my_paramaters["myIPV6LL"] = es2none(my_paramaters["myIPV6LL"])
my_paramaters["myHost"] = es2none(my_paramaters["myHost"])

# Make sure the ASN carries the "AS" prefix.  YAML parses a bare number as an
# int, which has no .startswith(), so integers are converted to text first.
if isinstance(my_paramaters["myASN"], int):
    my_paramaters["myASN"] = str(my_paramaters["myASN"])
my_paramaters["myASN"] = my_paramaters["myASN"] if my_paramaters["myASN"].startswith("AS") else "AS" + my_paramaters["myASN"]

# Node name from the environment; empty string when NODE_NAME is unset.
node_name = os.environ.get('NODE_NAME', "")

wgconfpath = my_config["wgconfpath"]
bdconfpath = my_config["bdconfpath"]
# Per-peer YAML files live under <wgconfpath>/peerinfo.
pathlib.Path(wgconfpath + "/peerinfo").mkdir(parents=True, exist_ok=True)

# Form/parameter keys copied through from client requests.
client_valid_keys = [
    "peer_plaintext", "peer_pub_key_pgp", "peer_signature",
    "peerASN", "peerName",
    "hasIPV4", "peerIPV4", "hasIPV4LL", "peerIPV4LL",
    "hasIPV6", "peerIPV6", "hasIPV6LL", "peerIPV6LL",
    "MP_BGP", "Ext_Nh",
    "hasHost", "peerHost",
    "peerWG_Pub_Key", "peerWG_PS_Key",
    "peerContact", "PeerID",
    "myIPV4", "myIPV6", "myIPV4LL", "myIPV6LL",
    "customDevice", "customDeviceSetup", "myWG_Pri_Key",
    "transitMode", "myWG_MTU", "birdAddConf",
]
# Subset of the keys above that is reserved for the admin; the signature page
# does not echo these back as hidden form fields.
client_valid_keys_admin_only = [
    "customDevice", "customDeviceSetup", "myWG_Pri_Key", "peerName",
    "birdAddConf", "transitMode", "myWG_MTU",
]

dn42repo_base = my_config["dn42repo_base"]
DN42_valid_ipv4s = my_config["DN42_valid_ipv4s"]
DN42_valid_ipv6s = my_config["DN42_valid_ipv6s"]
valid_ipv4_lilo = my_config["valid_ipv4_linklocal"]
valid_ipv6_lilo = my_config["valid_ipv6_linklocal"]
# Union of the configured DN42 prefixes and both link-local ranges.
wg_allowed_ips = DN42_valid_ipv4s + DN42_valid_ipv6s + [valid_ipv4_lilo, valid_ipv6_lilo]

whois = DN42whois.whois(*my_config["dn42_whois_server"])
whois_query = whois.query
# HTML snippets shown on the signature page, keyed by registry auth method.
# Each value is a str.format template; "{text2sign}" is the message to sign
# and "{fingerprint}" (PGP variants only) selects the signing key.

# The two SSH hints differ only in the private key file name.
_ssh_hint_template = (
    '<h4>Paste following command to your terminal to get your signature.</h4>\n'
    '<code>\n'
    'echo -n "{{text2sign}}" | ssh-keygen -Y sign -n dn42ap -f ~/.ssh/{key_file}\n'
    '</code>'
)

# Both PGP-based methods share the same instructions.
_pgp_hint = (
    '<h4>Paste following command to your terminal to get your PGP public key and signature.</h4>\n'
    '<code>\n'
    '# Export PGP public key<br>\n'
    'gpg --armor --export --fingerprint {fingerprint}<br>\n'
    '<br>\n'
    '# sign message with your PGP private key<br>\n'
    'echo -n "{text2sign}" | gpg --clearsign --detach-sign -u {fingerprint}<br>\n'
    '<br>\n'
    '# Done. You can copy the signature now<br>\n'
    '</code>'
)

method_hint = {
    "ssh-rsa": _ssh_hint_template.format(key_file="id_rsa"),
    "ssh-ed25519": _ssh_hint_template.format(key_file="id_ed25519"),
    "pgp-fingerprint": _pgp_hint,
    "PGPKEY": _pgp_hint,
}
async def get_signature_html(baseURL, paramaters):
    """Render the "please sign this message" page for a peer.

    Looks up the maintainer and admin contact of ``paramaters["peerASN"]``,
    collects their registered auth methods, issues a 30-minute JWT as the
    text to sign and returns an HTML page that lists the methods (with the
    matching command-line hint from ``method_hint``) plus a form carrying the
    submitted values forward as hidden fields.

    Args:
        baseURL: Prefix used for the registry links
            (``{baseURL}/data/mntner/...`` and ``{baseURL}/data/person/...``).
        paramaters: Request parameters.  ``peerASN`` and ``peer_pub_key_pgp``
            are read; ``peer_pub_key_pgp`` is filled in on the caller's dict
            when it is empty and a PGP method is available.

    Returns:
        The complete HTML document as a string.
    """
    import html  # local import so this block does not rely on file-level imports

    def esc(value):
        # Registry content and request values are not trusted: escape them
        # for both element and attribute context.
        return html.escape(str(value), quote=True)

    peerASN = paramaters["peerASN"]
    peerMNT, peerADM = await get_info_from_asn(peerASN)
    try:
        peerADMname = (await get_person_info(peerADM))["person"][0]
    except Exception:
        # The greeting is cosmetic; fall back to an empty name.
        peerADMname = ""
    methods = await get_auth_method(peerMNT, peerADM)
    text2sign = jwt.encode({'ASN': peerASN, "exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=30)}, jwt_secret, algorithm='HS256')

    # Group the (method, value) pairs by whether a signing hint exists.
    methods_class = {"Supported": {}, "Unsupported": {}}
    for m, v in methods:
        if m in method_hint:
            methods_class["Supported"].setdefault(m, []).append(v)
            if m in {"PGPKEY", "pgp-fingerprint"}:
                # Pre-fill the public key field from the registry when the
                # peer did not paste one.
                if paramaters["peer_pub_key_pgp"] == "":
                    paramaters["peer_pub_key_pgp"] = await try_get_pub_key(v)
        else:
            methods_class["Unsupported"].setdefault(m, []).append(v)

    retstr = f"""<!DOCTYPE html>
<html>
<head>
<title>{ my_config["html_title"] }</title>
<a href="{ my_config["git_repo_url"] }" class="github-corner" aria-label="View source on GitHub"><svg width="80" height="80" viewBox="0 0 250 250" style="fill:#64CEAA; color:#fff; position: absolute; top: 0; border: 0; right: 0;" aria-hidden="true"><path d="M0,0 L115,115 L130,115 L142,142 L250,250 L250,0 Z"></path><path d="M128.3,109.0 C113.8,99.7 119.0,89.6 119.0,89.6 C122.0,82.7 120.5,78.6 120.5,78.6 C119.2,72.0 123.4,76.3 123.4,76.3 C127.3,80.9 125.5,87.3 125.5,87.3 C122.9,97.6 130.6,101.9 134.4,103.2" fill="currentColor" style="transform-origin: 130px 106px;" class="octo-arm"></path><path d="M115.0,115.0 C114.9,115.1 118.7,116.5 119.8,115.4 L133.7,101.6 C136.9,99.2 139.9,98.4 142.2,98.6 C133.8,88.0 127.5,74.4 143.8,58.0 C148.5,53.4 154.0,51.2 159.7,51.0 C160.3,49.4 163.2,43.6 171.4,40.1 C171.4,40.1 176.1,42.5 178.8,56.2 C183.1,58.6 187.2,61.8 190.9,65.4 C194.5,69.0 197.7,73.2 200.1,77.6 C213.8,80.2 216.3,84.9 216.3,84.9 C212.7,93.1 206.9,96.0 205.4,96.6 C205.1,102.4 203.0,107.8 198.3,112.5 C181.9,128.9 168.3,122.5 157.7,114.1 C157.9,116.9 156.7,120.9 152.7,124.9 L141.0,136.5 C139.8,137.7 141.6,141.9 141.8,141.8 Z" fill="currentColor" class="octo-body"></path></svg></a><style>.github-corner:hover .octo-arm{{animation:octocat-wave 560ms ease-in-out}}@keyframes octocat-wave{{0%,100%{{transform:rotate(0)}}20%,60%{{transform:rotate(-25deg)}}40%,80%{{transform:rotate(10deg)}}}}@media (max-width:500px){{.github-corner:hover .octo-arm{{animation:none}}.github-corner .octo-arm{{animation:octocat-wave 560ms ease-in-out}}}}</style>
<style type="text/css">
code {{display: block; /* fixes a strange ie margin bug */font-family: Courier New;font-size: 11pt;overflow:auto;background: #f0f0f0 url() left top repeat-y;border: 10px solid white;padding: 10px 10px 10px 21px;max-height:1000px;line-height: 1.2em;}}
table {{
table-layout: fixed;
width: 100%;
}}
table td {{
word-wrap: break-word; /* All browsers since IE 5.5+ */
overflow-wrap: break-word; /* Renamed property in CSS3 draft spec */
}}
textarea {{
width: 100%;
height:87px;
}}
input[type="text"] {{
width: 100%;
}}
</style>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
</head>
<body>
<h2>{ my_config["html_title"] }</h2>
<h3>Dear { esc(peerADMname) }:</h3>
"""
    if not methods_class["Supported"]:
        retstr += f"""<h4> Sorry, we couldn't find any available authentication method in your <a href="{baseURL}/data/mntner/{esc(peerMNT)}" target="_blank">mntner</a> object or <a href="{baseURL}/data/person/{esc(peerADM)}" target="_blank"> admin contact</a> in the DN42 registry.</h4><h4>Please <a href="{my_paramaters["myContact"]}" target="_blank">contact me</a> to peer manually.</h4>"""
    else:
        retstr += f"""<h4> Please sign our message with your private key registered in your <a href="{baseURL}/data/mntner/{esc(peerMNT)}" target="_blank">mntner object</a> or <a href="{baseURL}/data/person/{esc(peerADM)}" target="_blank"> admin contact</a> in the DN42 registry.</h4>"""

    # Supported methods: one table of allowed keys plus the signing hint.
    retstr += "<h3><font color='red'><b>Supported</b></font> auth method: </h3>" if methods_class["Supported"] else ""
    for m, v in methods_class["Supported"].items():
        retstr += f"""<table class="table"><tr><td><b>Allowed {esc(m)}(s): </b></td></tr>"""
        for v_item in v:
            retstr += f"""<tr><td>{esc(v_item)}</td></tr>"""
        retstr += "</table>"
        # The hint uses the first registered value as the fingerprint.
        retstr += method_hint[m].format(text2sign=esc(text2sign), fingerprint=esc(v[0]))

    # Unsupported methods are listed for information only.
    retstr += "<h4>Unsupported auth method: </h4>" if methods_class["Unsupported"] else ""
    for m, v in methods_class["Unsupported"].items():
        retstr += f"""<table class="table"><tr><td><b>{esc(m)}</b></td></tr>"""
        for v_item in v:
            retstr += f"""<tr><td>{esc(v_item)}</td></tr>"""
        retstr += "</table>"

    retstr += f"""
<br>
<form action="action_page.php" method="post">\n"""
    # Carry the whitelisted request values forward as hidden fields; the
    # admin-only keys are never echoed back to the client.
    paramaters = {valid_key: paramaters[valid_key] for valid_key in client_valid_keys if valid_key in paramaters}
    paramaters["peer_plaintext"] = text2sign
    for k, v in paramaters.items():
        if k in client_valid_keys_admin_only:
            continue
        if v is None:
            v = ""
        elif v == True:  # noqa: E712 - equality kept; NOTE(review): 1 also maps to "on"
            v = "on"
        retstr += f'<input type="hidden" name="{k}" value="{esc(v)}">\n'
    retstr += """<input type="submit" name="action" value="OK" />
</form>
</body>
</html>
"""
    return retstr
def wgpri2pub(pri):
    """Derive the base64 public key for a base64 private key.

    Any failure is reported in-band: the function then returns the text
    ``"Wireguard Key: "`` followed by the exception message instead of
    raising.
    """
    try:
        raw_private = base64.b64decode(pri)
        key_pair = nacl.public.PrivateKey(raw_private)
        public_b64 = base64.b64encode(bytes(key_pair.public_key))
        return public_b64.decode("ascii")
    except Exception as err:
        return "Wireguard Key: " + str(err)
async def get_html(paramaters, action="OK", peerSuccess=False):
    """Render the main peering page (authentication, registration, management).

    Args:
        paramaters: Mapping holding every peer*/has*/my* value read below; a
            missing key raises ``KeyError``.
        action: Name of the submitted form action.  Only ``"Show"`` is
            inspected here: together with ``my_config["peerEndpointHidden"]``
            it decides whether the peer endpoint needs a valid signature to
            be displayed.
        peerSuccess: When true the headings read "Peer success! ...".

    Returns:
        The complete HTML document as a string.

    Notes:
        Every value interpolated into the page is HTML-escaped right before
        rendering.  NOTE(review): this assumes callers pass raw, unescaped
        values - confirm against the request handlers.
    """
    import html  # local import so this block does not rely on file-level imports

    def esc(value):
        # None is passed through because the template tests "!= None".
        return None if value is None else html.escape(str(value), quote=True)

    peer_plaintext = paramaters["peer_plaintext"]
    peer_pub_key_pgp = paramaters["peer_pub_key_pgp"]
    peer_signature = paramaters["peer_signature"]
    peerASN = paramaters["peerASN"]
    hasIPV4 = paramaters["hasIPV4"]
    hasIPV4Disabled = ""
    peerIPV4 = paramaters["peerIPV4"]
    hasIPV6 = paramaters["hasIPV6"]
    hasIPV6Disabled = ""
    peerIPV6 = paramaters["peerIPV6"]
    hasIPV4LL = paramaters["hasIPV4LL"]
    hasIPV4LLDisabled = ""
    peerIPV4LL = paramaters["peerIPV4LL"]
    hasIPV6LL = paramaters["hasIPV6LL"]
    hasIPV6LLDisabled = ""
    peerIPV6LL = paramaters["peerIPV6LL"]
    MP_BGP = paramaters["MP_BGP"]
    MP_BGP_Disabled = ""
    Ext_Nh = paramaters["Ext_Nh"]
    Ext_Nh_Disabled = ""
    hasHost = paramaters["hasHost"]
    hasHost_Readonly = ""
    peerHost = paramaters["peerHost"]
    peerHostDisplay = peerHostDisplayText
    peerWG_Pub_Key = paramaters["peerWG_Pub_Key"]
    peerWG_PS_Key = paramaters["peerWG_PS_Key"]
    peerContact = paramaters["peerContact"]
    PeerID = paramaters["PeerID"]
    myASN = paramaters["myASN"]
    myHost = paramaters["myHost"]
    myIPV4 = paramaters["myIPV4"]
    myIPV6 = paramaters["myIPV6"]
    myIPV4LL = paramaters["myIPV4LL"]
    myIPV6LL = paramaters["myIPV6LL"]
    myWG_Pub_Key = paramaters["myWG_Pub_Key"]
    myContact = paramaters["myContact"]

    # Treat "not configured" (None) like the empty string from here on.
    if myIPV4 is None:
        myIPV4 = ""
    if myIPV6 is None:
        myIPV6 = ""
    if myIPV4LL is None:
        myIPV4LL = ""
    if myIPV6LL is None:
        myIPV6LL = ""
    if myHost is None:
        myHost = ""

    # Address families this node does not offer are unchecked, disabled and
    # their text field explains why.
    if myIPV4 == "":
        hasIPV4 = False
        hasIPV4Disabled = "disabled"
        peerIPV4 = "Sorry, I don't support IPv4 address."
    if myIPV6 == "":
        hasIPV6 = False
        hasIPV6Disabled = "disabled"
        peerIPV6 = "Sorry, I don't support IPv6 address."
    if myIPV4LL == "":
        hasIPV4LL = False
        hasIPV4LLDisabled = "disabled"
        peerIPV4LL = "Sorry, I don't support IPv4 link local address."
    if myIPV6LL == "":
        hasIPV6LL = False
        hasIPV6LLDisabled = "disabled"
        peerIPV6LL = "Sorry, I don't support IPv6 link local address."
    # Same truth table as the original
    # `not (myIPV4!="") and (myIPV6!="" or myIPV6LL!="")`: `not` binds to the
    # first parenthesis only.
    if myIPV4 == "" and (myIPV6 != "" or myIPV6LL != ""):
        MP_BGP_Disabled = "disabled"
        Ext_Nh_Disabled = "disabled"

    # Peer endpoint: when hiding is enabled and the action is "Show", reveal
    # it only after a successful signature check; otherwise show it as-is.
    if my_config["peerEndpointHidden"] and action == "Show":
        if peer_signature != "" and peer_signature is not None:
            try:
                await verify_user_signature(paramaters["peerASN"], paramaters["peer_plaintext"], paramaters["peer_pub_key_pgp"], peer_signature)
                peerHostDisplay = peerHost
            except Exception:
                # Verification failed: keep the placeholder text.
                pass
    else:
        peerHostDisplay = peerHost

    if myHost == "":
        # No endpoint on this side, so the peer must provide one: force the
        # checkbox on and block attempts to untick it.
        hasHost = True
        hasHost_Readonly = 'onclick="alert(\'Sorry, I don\\\'t have a public IP so that your endpoint can\\\'t be null.\');return false;";'
        myHostDisplay = paramaters["myHostDisplay"]
    else:
        if PeerID is None:
            myHostDisplay = "(Register to get the endpoint) :"
        else:
            myHostDisplay = "(Authenticate to show the endpoint) :"
        if my_config["myHostHidden"]:
            # Hidden endpoint: reveal only after a successful signature check.
            if peer_signature != "" and peer_signature is not None:
                try:
                    await verify_user_signature(paramaters["peerASN"], paramaters["peer_plaintext"], paramaters["peer_pub_key_pgp"], peer_signature)
                    myHostDisplay = myHost + ":"
                except Exception:
                    pass
        else:
            myHostDisplay = myHost + ":"
        # NOTE(review): the source this was reconstructed from had lost its
        # indentation; the port suffix is assumed to belong to this branch
        # (both placeholders above end in ":") - confirm against upstream.
        if PeerID is None:
            myHostDisplay += f" [{my_config['wg_port_search_range']}]"
        else:
            myHostDisplay += str(PeerID)

    # Update/Delete are enabled only when a peerinfo file exists for PeerID.
    edit_btn_disabled = "disabled"
    try:
        with open(wgconfpath + "/peerinfo/" + str(paramaters["PeerID"]) + ".yaml") as peer_file:
            yaml.load(peer_file.read(), Loader=yaml.SafeLoader)
        edit_btn_disabled = ""
    except FileNotFoundError:
        pass

    # Client-side logic keeping the address-family checkboxes consistent.
    jsscripts = """
prevVars={
v4: "hasIPV4",
v6: "hasIPV6LL",
nov4: "v4only"
}
function getV4() {
return document.getElementsByName("hasIPV4")[0].checked || document.getElementsByName("hasIPV4LL")[0].checked
}
function getV6() {
return document.getElementsByName("hasIPV6")[0].checked || document.getElementsByName("hasIPV6LL")[0].checked
}
function onV4() {
if (document.getElementsByName("hasIPV4")[0].checked == true){
document.getElementsByName("hasIPV4LL")[0].checked = false;
document.getElementsByName("Ext_Nh")[0].checked = false;
prevVars.v4 = "hasIPV4"
} else {
if( (getV4() || getV6()) == false){
alert("We can't establish BGP session without any IP.")
document.getElementsByName("hasIPV4")[0].checked = true;
return false;
}
if (prevVars.nov4 == "enh"){
document.getElementsByName("Ext_Nh")[0].checked = true
} else {
document.getElementsByName("MP_BGP")[0].checked = false
}
}
}
function onV4LL() {
if (document.getElementsByName("hasIPV4LL")[0].checked == true){
document.getElementsByName("hasIPV4")[0].checked = false;
document.getElementsByName("Ext_Nh")[0].checked = false;
prevVars.v4 = "hasIPV4LL"
} else {
if( (getV4() || getV6()) == false){
alert("We can't establish BGP session without any IP.")
document.getElementsByName("hasIPV4LL")[0].checked = true;
return false;
}
if (prevVars.nov4 == "enh"){
document.getElementsByName("Ext_Nh")[0].checked = true
} else {
document.getElementsByName("MP_BGP")[0].checked = false
}
}
}
function onV6() {
if (document.getElementsByName("hasIPV6")[0].checked == true){
document.getElementsByName("hasIPV6LL")[0].checked = false;
document.getElementsByName("MP_BGP")[0].disabled = false;
document.getElementsByName("Ext_Nh")[0].disabled = false;
prevVars.v6 = "hasIPV6"
} else {
if( (getV4() || getV6()) == false){
alert("We can't establish BGP session without any IP.")
document.getElementsByName("hasIPV6")[0].checked = true;
return false;
}
if (getV6() == false){
document.getElementsByName("MP_BGP")[0].checked = false;
document.getElementsByName("Ext_Nh")[0].checked = false
document.getElementsByName("MP_BGP")[0].disabled = true;
document.getElementsByName("Ext_Nh")[0].disabled = true;
prevVars.nov4 = "v4only"
}
}
}
function onV6LL() {
if (document.getElementsByName("hasIPV6LL")[0].checked == true){
document.getElementsByName("hasIPV6")[0].checked = false;
document.getElementsByName("MP_BGP")[0].disabled = false;
document.getElementsByName("Ext_Nh")[0].disabled = false;
prevVars.v6 = "hasIPV6LL"
} else {
if( (getV4() || getV6()) == false){
alert("We can't establish BGP session without any IP.")
document.getElementsByName("hasIPV6LL")[0].checked = true;
return false;
}
if (getV6() == false){
document.getElementsByName("MP_BGP")[0].checked = false;
document.getElementsByName("Ext_Nh")[0].checked = false
document.getElementsByName("MP_BGP")[0].disabled = true;
document.getElementsByName("Ext_Nh")[0].disabled = true;
prevVars.nov4 = "v4only"
}
}
}
function onMPBGP() {
if (document.getElementsByName("MP_BGP")[0].checked == true){
document.getElementsByName(prevVars.v4)[0].checked = true;
}
if (document.getElementsByName("MP_BGP")[0].checked == false){
document.getElementsByName("Ext_Nh")[0].checked = false;
prevVars.nov4 = "v4only"
}
}
function onENH() {
if (document.getElementsByName("Ext_Nh")[0].checked == true){
document.getElementsByName("hasIPV4")[0].checked = false;
document.getElementsByName("hasIPV4LL")[0].checked = false;
document.getElementsByName("MP_BGP")[0].checked = true;
prevVars.nov4 = "enh"
} else {
document.getElementsByName(prevVars.v4)[0].checked = true;
}
}
"""

    # Escape everything that is interpolated as text or attribute value.
    # This happens last so the signature checks and the peerinfo path above
    # still see the raw values.
    peerASN = esc(peerASN)
    peer_plaintext = esc(peer_plaintext)
    peer_pub_key_pgp = esc(peer_pub_key_pgp)
    peer_signature = esc(peer_signature)
    peerIPV4 = esc(peerIPV4)
    peerIPV4LL = esc(peerIPV4LL)
    peerIPV6 = esc(peerIPV6)
    peerIPV6LL = esc(peerIPV6LL)
    peerHostDisplay = esc(peerHostDisplay)
    peerWG_Pub_Key = esc(peerWG_Pub_Key)
    peerWG_PS_Key = esc(peerWG_PS_Key)
    peerContact = esc(peerContact)
    PeerID = esc(PeerID)
    myASN = esc(myASN)
    myIPV4 = esc(myIPV4)
    myIPV6 = esc(myIPV6)
    myIPV4LL = esc(myIPV4LL)
    myIPV6LL = esc(myIPV6LL)
    myHostDisplay = esc(myHostDisplay)
    myWG_Pub_Key = esc(myWG_Pub_Key)
    myContact = esc(myContact)

    return f"""
<!DOCTYPE html>
<html>
<head>
<title>{ my_config["html_title"] }</title>
<a href="{ my_config["git_repo_url"] }" class="github-corner" aria-label="View source on GitHub"><svg width="80" height="80" viewBox="0 0 250 250" style="fill:#64CEAA; color:#fff; position: absolute; top: 0; border: 0; right: 0;" aria-hidden="true"><path d="M0,0 L115,115 L130,115 L142,142 L250,250 L250,0 Z"></path><path d="M128.3,109.0 C113.8,99.7 119.0,89.6 119.0,89.6 C122.0,82.7 120.5,78.6 120.5,78.6 C119.2,72.0 123.4,76.3 123.4,76.3 C127.3,80.9 125.5,87.3 125.5,87.3 C122.9,97.6 130.6,101.9 134.4,103.2" fill="currentColor" style="transform-origin: 130px 106px;" class="octo-arm"></path><path d="M115.0,115.0 C114.9,115.1 118.7,116.5 119.8,115.4 L133.7,101.6 C136.9,99.2 139.9,98.4 142.2,98.6 C133.8,88.0 127.5,74.4 143.8,58.0 C148.5,53.4 154.0,51.2 159.7,51.0 C160.3,49.4 163.2,43.6 171.4,40.1 C171.4,40.1 176.1,42.5 178.8,56.2 C183.1,58.6 187.2,61.8 190.9,65.4 C194.5,69.0 197.7,73.2 200.1,77.6 C213.8,80.2 216.3,84.9 216.3,84.9 C212.7,93.1 206.9,96.0 205.4,96.6 C205.1,102.4 203.0,107.8 198.3,112.5 C181.9,128.9 168.3,122.5 157.7,114.1 C157.9,116.9 156.7,120.9 152.7,124.9 L141.0,136.5 C139.8,137.7 141.6,141.9 141.8,141.8 Z" fill="currentColor" class="octo-body"></path></svg></a><style>.github-corner:hover .octo-arm{{animation:octocat-wave 560ms ease-in-out}}@keyframes octocat-wave{{0%,100%{{transform:rotate(0)}}20%,60%{{transform:rotate(-25deg)}}40%,80%{{transform:rotate(10deg)}}}}@media (max-width:500px){{.github-corner:hover .octo-arm{{animation:none}}.github-corner .octo-arm{{animation:octocat-wave 560ms ease-in-out}}}}</style>
<style type="text/css">
code {{display: block; /* fixes a strange ie margin bug */font-family: Courier New;font-size: 11pt;overflow:auto;background: #f0f0f0 url() left top repeat-y;border: 10px solid white;padding: 10px 10px 10px 21px;max-height:1000px;line-height: 1.2em;}}
html {{
height: 100%;
width: 100%;
display: flex;
justify-content: center;
}}
body {{
height: 100%;
width: 100%;
max-width: 1000px;
}}
table {{
table-layout: fixed;
width: 100%;
}}
table td {{
word-wrap: break-word; /* All browsers since IE 5.5+ */
overflow-wrap: break-word; /* Renamed property in CSS3 draft spec */
}}
textarea {{
width: 100%;
height:87px;
}}
input[type="text"] {{
width: 100%;
}}
</style>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
</head>
<body>
<h1>{ my_config["html_title"] }</h1>
<h3>{"Peer success! " if peerSuccess else "Please fill "}Your Info</h3>
<script>
{jsscripts}
</script>
<form action="action_page.php" method="post" class="markdown-body">
<h2>Authentication</h2>
<table class="table">
<tr><td>Your ASN</td><td><input type="text" value="{peerASN if peerASN != None else ""}" name="peerASN" /></td></tr>
<tr><td>Plain text to sign</td><td><input type="text" value="{peer_plaintext}" name="peer_plaintext" readonly/></td></tr>
<tr><td>Your PGP public key<br>(leave it blank if you don't use it)</td><td><textarea name="peer_pub_key_pgp">{peer_pub_key_pgp}</textarea></td></tr>
<tr><td>Your signature</td><td><textarea name="peer_signature">{peer_signature}</textarea></td></tr>
<tr><td>Fill your ASN, Click the button, Follow the instruction</td><td><input type="submit" name="action" value="Get Signature" /></td></tr>
</table>
<h2>Registration</h2>
<table class="table">
<tr><td><h5>BGP Session Info:</h5></td><td> </td></tr>
<tr><td><input type="checkbox" name="hasIPV4" onclick="onV4()" {"checked" if hasIPV4 else ""} {hasIPV4Disabled}>DN42 IPv4</td><td><input type="text" value="{peerIPV4 if peerIPV4 != None else ""}" name="peerIPV4" {hasIPV4Disabled} /></td></tr>
<tr><td><input type="checkbox" name="hasIPV4LL" onclick="onV4LL()" {"checked" if hasIPV4LL else ""} {hasIPV4LLDisabled}>IPv4 Link local</td><td><input type="text" value="{peerIPV4LL if peerIPV4LL != None else ""}" name="peerIPV4LL" {hasIPV4LLDisabled} /></td></tr>
<tr><td><input type="checkbox" name="hasIPV6" onclick="onV6()" {"checked" if hasIPV6 else ""} {hasIPV6Disabled}>DN42 IPv6</td><td><input type="text" value="{peerIPV6 if peerIPV6 != None else ""}" name="peerIPV6" {hasIPV6Disabled} /></td></tr>
<tr><td><input type="checkbox" name="hasIPV6LL" onclick="onV6LL()" {"checked" if hasIPV6LL else ""} {hasIPV6LLDisabled}>IPv6 Link local</td><td><input type="text" value="{peerIPV6LL if peerIPV6LL != None else ""}" name="peerIPV6LL" {hasIPV6LLDisabled} /></td></tr>
<tr><td><input type="checkbox" name="MP_BGP" onclick="onMPBGP()" {"checked" if MP_BGP else ""} {MP_BGP_Disabled} >Multiprotocol BGP</td><td></td></tr>
<tr><td><input type="checkbox" name="Ext_Nh" onclick="onENH()" {"checked" if Ext_Nh else ""} {Ext_Nh_Disabled} >Extended next hop</td><td></td></tr>
<tr><td><h5>Wireguard Connection Info:</h5></td><td> </td></tr>
<tr><td><input type="checkbox" name="hasHost" {"checked" if hasHost else ""} {hasHost_Readonly}>Your Clearnet Endpoint (domain or ip:port)</td><td><input type="text" value="{peerHostDisplay if peerHost != None else ""}" name="peerHost" /></td></tr>
<tr><td>Your Wireguard Public Key</td><td><input type="text" value="{peerWG_Pub_Key}" name="peerWG_Pub_Key" /></td></tr>
<tr><td>Your Wireguard Pre-Shared Key (Optional)</td><td><input type="text" value="{peerWG_PS_Key}" name="peerWG_PS_Key" /></td></tr>
<tr><td>Your Telegram ID or e-mail</td><td><input type="text" value="{peerContact}" name="peerContact" /></td></tr>
<tr><td>Register a new peer and get the peer ID</td><td><input type="submit" name="action" value="Register" /></td></tr>
</table>
<h2>Management</h2>
<table class="table">
<tr><td>Your Peer ID</td><td><input type="text" value="{PeerID if PeerID != None else ""}" name="PeerID" /></td></tr>
<tr><td></td><td><input type="submit" name="action" value="Show" /><input type="submit" name="action" value="Update" {edit_btn_disabled}/><input type="submit" name="action" value="Delete" {edit_btn_disabled} /></td></tr>
</table>
<h3>{"Peer success! " if peerSuccess else "This is "}My Info</h3>
<table>
<tr><td>My ASN</td><td><input type="text" value="{myASN}" readonly /></td></tr>
<tr><td>DN42 IPv4</td><td><input type="text" name="myIPV4" value="{myIPV4}" /></td></tr>
<tr><td>DN42 IPv6</td><td><input type="text" name="myIPV6" value="{myIPV6}" /></td></tr>
<tr><td>IPv4 Link local</td><td><input type="text" name="myIPV4LL" value="{myIPV4LL}" {hasIPV4LLDisabled} /></td></tr>
<tr><td>IPv6 Link local</td><td><input type="text" name="myIPV6LL" value="{myIPV6LL}" {hasIPV6LLDisabled} /></td></tr>
<tr><td>Connectrion Info: </td><td> </td></tr>
<tr><td>My Clearnet Endpoint</td><td><input type="text" value="{myHostDisplay}" readonly /></td></tr>
<tr><td>My WG Public Key</td><td><input type="text" value="{myWG_Pub_Key}" readonly /></td></tr>
<tr><td>My Contact</td><td><input type="text" value="{myContact}" readonly /></td></tr>
</table>
</form>
</body>
</html>
"""
def remove_empty_line(s):
    """Normalise line endings to "\\n" and drop every empty line.

    "\\r\\n" and bare "\\r" both count as line breaks.  Lines that contain
    only whitespace are kept, because only zero-length lines are removed.
    """
    unified = s.replace("\r\n", "\n").replace("\r", "\n")
    return "\n".join(line for line in unified.split("\n") if len(line) > 0)
async def get_info_from_asn(asn):
    """Query ``aut-num/<asn>`` and return its first ``mnt-by`` and ``admin-c``.

    Returns:
        A ``(maintainer, admin_contact)`` tuple.  A ``KeyError`` or
        ``IndexError`` propagates when the record lacks either attribute.
    """
    record = DN42whois.proc_data(await whois_query("aut-num/" + asn))
    maintainer = record["mnt-by"][0]
    admin_contact = record["admin-c"][0]
    return maintainer, admin_contact
async def get_mntner_info(mntner):
    """Query ``mntner/<mntner>`` and return the parsed record.

    The result always has an ``"auth"`` entry; it is an empty list when the
    record does not declare any.
    """
    raw_record = await whois_query("mntner/" + mntner)
    record = DN42whois.proc_data(raw_record)
    record.setdefault("auth", [])
    return record
async def get_person_info(person):
    """Query ``person/<person>`` and return the parsed record.

    The result always has an ``"auth"`` list.  When the record carries a
    ``pgp-fingerprint`` attribute, its first value is added to that list as
    ``"pgp-fingerprint <value>"``.
    """
    raw_record = await whois_query("person/" + person)
    record = DN42whois.proc_data(raw_record)
    record.setdefault("auth", [])
    if "pgp-fingerprint" in record:
        record["auth"].append("pgp-fingerprint " + record["pgp-fingerprint"][0])
    return record
async def get_auth_method(mnt, admin):
    """Collect the authentication methods registered for a maintainer/person.

    *mnt* and *admin* are registry handles; either may be ``None`` to skip
    that lookup. Returns a list of ``[method, key]`` pairs, de-duplicated and
    in first-seen order. Auth entries without a space (no key part) are
    dropped.

    Entries starting with ``PGPKEY`` are resolved through the registry
    key-cert: the pair becomes ``["PGPKEY", <full fingerprint>]``. If the key
    cannot be fetched the entry is skipped; if it cannot be parsed the pair
    becomes ``["PGPKEY_Error", <message with spaces replaced by "_">]``.
    """
    entries = []
    if mnt is not None:
        entries += (await get_mntner_info(mnt))["auth"]
    if admin is not None:
        entries += (await get_person_info(admin))["auth"]

    # A dict is used as an insertion-ordered set of "method key" strings.
    seen = {}
    for entry in entries:
        if not entry.startswith("PGPKEY"):
            seen[entry] = False
            continue
        method, key_id = entry.split("-", 1)
        armored_key = await try_get_pub_key(key_id)
        if armored_key == "":
            continue
        try:
            key = pgpy.PGPKey.from_blob(remove_empty_line(armored_key).encode("utf8"))[0]
            described = method + " " + key.fingerprint.replace(" ", "")
        except Exception as e:
            described = method + "_Error " + str(e).replace(" ", "_")
        seen[described] = False

    pairs = [described.split(" ", 1) for described in seen]
    return [pair for pair in pairs if len(pair) == 2]
async def try_get_pub_key(pgpsig):
    """Fetch the armored PGP public key for a key id from the registry.

    Only the last 8 characters of *pgpsig* are used to query
    ``key-cert/PGPKEY-<id>``. The ``certif:`` lines of the answer are joined
    into one text block with empty lines removed.

    Returns the key text, or ``""`` when *pgpsig* is shorter than 8
    characters or the lookup/parsing fails for any reason.
    """
    if len(pgpsig) < 8:
        return ""
    key_id = pgpsig[-8:]
    try:
        record = await whois_query("key-cert/PGPKEY-" + key_id)
        # Split on the first colon only: the payload itself may contain
        # colons (armor headers such as "Version: ..." or "Comment: https://..."),
        # and splitting on every colon would truncate those lines.
        certif_lines = [
            line.split(":", 1)[1].lstrip()
            for line in record.split("\n")
            if line.startswith("certif:")
        ]
        return remove_empty_line("\n".join(certif_lines))
    except Exception:
        # Deliberate best effort: callers treat "" as "no key available".
        return ""
def verify_signature_pgp(plaintext, fg, pub_key, raw_signature):
    """Verify a PGP signature over *plaintext* against an expected fingerprint.

    Parameters:
        plaintext: the text that was signed.
        fg: the expected key fingerprint (spaces are ignored).
        pub_key: armored public key text to verify with.
        raw_signature: armored signature text.

    Returns ``True`` on success.

    Raises:
        ValueError: if the fingerprint of *pub_key* differs from *fg*, or the
            signature does not verify. Errors raised by pgpy while parsing or
            verifying propagate unchanged.
    """
    pub = pgpy.PGPKey.from_blob(remove_empty_line(pub_key).encode("utf8"))[0]
    # Fingerprints are hex strings: compare without spaces and ignoring letter
    # case so that a lowercase fingerprint is not rejected for formatting.
    fg_in = fg.replace(" ", "").upper()
    fg_p = pub.fingerprint.replace(" ", "").upper()
    if fg_in != fg_p:
        raise ValueError("fingerprint not match")
    sig = pgpy.PGPSignature.from_blob(remove_empty_line(raw_signature).encode("utf8"))
    if not pub.verify(plaintext, sig):
        raise ValueError("signature verification failed")
    return True
def verify_signature_pgpn8(plaintext, fg, pub_key, raw_signature):
    """Verify a PGP signature for a ``PGPKEY`` auth entry.

    The checks are exactly those of ``verify_signature_pgp`` (fingerprint of
    *pub_key* must equal *fg*, then the signature must verify), so this
    delegates to it instead of keeping a second copy of the same code.

    Returns ``True`` on success; raises ``ValueError`` on a fingerprint
    mismatch or a failed verification.
    """
    return verify_signature_pgp(plaintext, fg, pub_key, raw_signature)
def _verify_signature_ssh(key_type, plaintext, pub_key, raw_signature):
    """Run ``ssh-keygen -Y verify`` for one key of type *key_type*.

    The signature and a one-line allowed-signers file
    (``<identity> <key_type> <pub_key>``) are written to randomly named
    temporary files under ``ssh/``; *plaintext* is fed to ssh-keygen on stdin
    with namespace ``dn42ap``. The temporary files are always removed, even
    when writing them or launching ssh-keygen fails.

    Returns ``True`` when ssh-keygen's stdout starts with ``b"Good"``;
    otherwise raises ``ValueError`` carrying the captured stdout.
    """
    sess = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
    pathlib.Path("ssh").mkdir(parents=True, exist_ok=True)
    sigfile_path = "ssh/tmp" + sess + ".sig"
    pubfile_path = "ssh/tmp" + sess + ".pub"
    try:
        # Context managers guarantee the data is flushed and the handles are
        # closed before ssh-keygen reads the files.
        with open(sigfile_path, "w") as sigfile:
            sigfile.write(raw_signature)
        with open(pubfile_path, "w") as pubfile:
            pubfile.write(sess + " " + key_type + " " + pub_key)
        command = [
            "ssh-keygen", "-Y", "verify",
            "-f", pubfile_path,
            "-n", "dn42ap",
            "-I", sess,
            "-s", sigfile_path,
        ]
        p = Popen(command, stdout=PIPE, stdin=PIPE, stderr=PIPE)
        stdout_data = p.communicate(input=plaintext.encode())[0]
    finally:
        for path in (sigfile_path, pubfile_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                # The file was never created because an earlier step failed.
                pass
    if stdout_data.startswith(b"Good"):
        return True
    raise ValueError(stdout_data)


def verify_signature_ssh_rsa(plaintext, pub_key, raw_signature):
    """Verify an OpenSSH signature over *plaintext* made with an ``ssh-rsa`` key.

    *pub_key* is the base64 key body (without the ``ssh-rsa`` prefix) and
    *raw_signature* the armored SSH signature text. Returns ``True`` on
    success, raises ``ValueError`` with ssh-keygen's stdout otherwise.
    """
    return _verify_signature_ssh("ssh-rsa", plaintext, pub_key, raw_signature)


def verify_signature_ssh_ed25519(plaintext, pub_key, raw_signature):
    """Verify an OpenSSH signature over *plaintext* made with an ``ssh-ed25519`` key.

    *pub_key* is the base64 key body (without the ``ssh-ed25519`` prefix) and
    *raw_signature* the armored SSH signature text. Returns ``True`` on
    success, raises ``ValueError`` with ssh-keygen's stdout otherwise.
    """
    return _verify_signature_ssh("ssh-ed25519", plaintext, pub_key, raw_signature)
def removern(strin):
    """Normalise "\\r\\n" and bare "\\r" line endings to "\\n".

    Works on exact ``str`` and ``bytes`` objects; any other value (including
    ``None``) is returned unchanged.
    """
    kind = type(strin)
    if kind is str:
        cr, lf = "\r", "\n"
    elif kind is bytes:
        cr, lf = b"\r", b"\n"
    else:
        return strin
    return strin.replace(cr + lf, lf).replace(cr, lf)
def verify_signature(plaintext, pub_key, pub_key_pgp, raw_signature, method):
    """Dispatch signature verification to the verifier for *method*.

    For the PGP methods (``"pgp-fingerprint"``, ``"PGPKEY"``) *pub_key* is
    passed on as the expected fingerprint and *pub_key_pgp* as the public key
    text. For the SSH methods (``"ssh-rsa"``, ``"ssh-ed25519"``) *pub_key* is
    the key itself and *pub_key_pgp* is ignored.

    Returns whatever the selected verifier returns; raises
    ``NotImplementedError`` for any other *method*.
    """
    if method == "ssh-ed25519":
        return verify_signature_ssh_ed25519(plaintext, pub_key, raw_signature)
    if method == "ssh-rsa":
        return verify_signature_ssh_rsa(plaintext, pub_key, raw_signature)
    if method == "PGPKEY":
        return verify_signature_pgpn8(plaintext, pub_key, pub_key_pgp, raw_signature)
    if method == "pgp-fingerprint":
        return verify_signature_pgp(plaintext, pub_key, pub_key_pgp, raw_signature)
    raise NotImplementedError("method not implement")
async def verify_user_signature(peerASN, plaintext, pub_key_pgp, raw_signature):
    """Check that *raw_signature* over *plaintext* was made by an authorised key.

    *plaintext* must be a JWT (HS256, signed with ``jwt_secret``) whose
    ``ASN`` claim equals *peerASN*. The signature is tried against every auth
    method registered for the ASN's maintainer/admin, and then against the
    auth methods of the configured admin maintainer
    (``my_config["admin_mnt"]``).

    Returns the maintainer name whose credential verified the signature
    (the peer's maintainer or the admin maintainer).

    Raises:
        An ``AuthenticationError`` class created on the fly as a subclass of
        the type of the underlying exception, so existing ``except`` clauses
        for that type still match. When no credential verifies, the message
        is a YAML dump listing the input and the failure of every attempt.
    """
    try:
        plaintext = removern(plaintext)
        pub_key_pgp = removern(pub_key_pgp)
        # removern already normalises line endings and passes None through,
        # so the emptiness checks below also work for a missing value.
        raw_signature = removern(raw_signature)
        if plaintext == "" or plaintext is None:
            raise ValueError('Plain text to sign can\'t be null, please click the button "Get Signature" first.')
        if raw_signature == "" or raw_signature is None:
            raise ValueError('Signature can\'t be null, please click the button "Get Signature" and follow the instruction.')
        sig_info = jwt.decode(plaintext.encode("utf8"), jwt_secret, algorithms=["HS256"])
        if sig_info["ASN"] != peerASN:
            raise ValueError("JWT verification failed. You are not the mntner of " + sig_info["ASN"])

        # verify user signature
        mntner, admin = await get_info_from_asn(peerASN)
        authes = await get_auth_method(mntner, admin)
        authresult = [{"Your input": {"plaintext": plaintext, "signature": raw_signature, "pub_key_pgp": pub_key_pgp}}]
        for method, pub_key in authes:
            try:
                if verify_signature(plaintext, pub_key, pub_key_pgp, raw_signature, method):
                    return mntner
            except Exception as e:
                authresult.append({"Source": "User credential", "Method": method, "Result": type(e).__name__ + ": " + str(e), "Content": pub_key})

        # verify admin signature
        mntner_admin = my_config["admin_mnt"]
        try:
            authes_admin = await get_auth_method(mntner_admin, None)
            for method, pub_key in authes_admin:
                try:
                    if verify_signature(plaintext, pub_key, pub_key_pgp, raw_signature, method):
                        return mntner_admin
                except Exception as e:
                    authresult.append({"Source": "Admin credential", "Method": method, "Result": type(e).__name__ + ": " + str(e), "Content": pub_key})
        except Exception:
            # Best effort: a failed lookup of the admin maintainer must not
            # hide the per-credential results collected so far.
            pass
        raise ValueError(yaml.dump(authresult, sort_keys=False, default_style='|'))
    except Exception as e:
        # Re-raise under a name that marks the failure as an authentication
        # error while staying an instance of the original exception type.
        class AuthenticationError(type(e)):
            pass
        AuthenticationError.__name__ = "AuthenticationError: " + type(e).__name__
        raise AuthenticationError(str(e))
def get_err_page(paramaters,title,error,big_title="Server Error", tab_title = None,redirect=None):
    """Render the HTML message page shown for errors (and other notices).

    Parameters:
        paramaters: mapping of request parameters. Only keys listed in
            ``client_valid_keys`` and not in ``client_valid_keys_admin_only``
            are echoed back as hidden form inputs so the user's input
            survives the round trip.
        title: heading shown above the message.
        error: the message; converted with ``str()``, newlines become
            ``<br>`` and spaces become ``&nbsp;`` so multi-line, indented
            text keeps its layout.
        big_title: text of the page banner.
        tab_title: browser tab title; defaults to *title*.
        redirect: when given, the OK button is a link to this URL instead of
            a form submit button.

    Returns the complete page as a string.
    """
    import html  # local import: only needed here, keeps the block self-contained

    if tab_title is None:
        tab_title = title
    # NOTE(review): title, big_title, tab_title, error and redirect are
    # interpolated without HTML escaping. If any of them can carry
    # user-controlled text this is an injection point; confirm against the
    # callers before escaping, since some may intentionally pass markup.
    error_html = str(error).replace(chr(10), "<br>").replace(" ", "&nbsp;")
    parts = [f"""
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=iso-8859-1"/>
<title>{ tab_title }</title>
<style type="text/css">
<!--
body{{margin:0;font-size:.7em;font-family:Verdana, Arial, Helvetica, sans-serif;background:#EEEEEE;}}
fieldset{{padding:0 15px 10px 15px;}}
h1{{font-size:2.4em;margin:0;color:#FFF;}}
h2{{font-size:1.7em;margin:0;color:#CC0000;}}
h3{{font-size:1.2em;margin:10px 0 0 0;color:#000000;}}
#header{{width:96%;margin:0 0 0 0;padding:6px 2% 6px 2%;font-family:"trebuchet MS", Verdana, sans-serif;color:#FFF;
background-color:#555555;}}
#content{{margin:0 0 0 2%;position:relative;}}
.content-container{{background:#FFF;width:96%;margin-top:8px;padding:10px;position:relative;}}
-->
</style>
</head>
<body>
<div id="header"><h1>{big_title}</h1></div>
<div id="content">
<div class="content-container"><fieldset>
<h2>{ title }</h2>
<h3>{error_html}</h3>
<h3></h3>
<form action="action_page.php" method="post">\n"""]
    paramaters = { valid_key: paramaters[valid_key] for valid_key in client_valid_keys if valid_key in paramaters}
    for k, v in paramaters.items():
        if k in client_valid_keys_admin_only:
            continue
        if v is None:
            v = ""
        elif v is True:
            # Identity check: only a real boolean True is a ticked checkbox;
            # "== True" would also turn the integer 1 into "on".
            v = "on"
        # Values come from the client: escape them so quotes or angle
        # brackets cannot break out of the attribute.
        safe_value = html.escape(str(v), quote=True)
        parts.append(f'<input type="hidden" name="{k}" value="{safe_value}">\n')
    if redirect is None:
        parts.append('<input type="submit" name="action" value="OK" />')
    else:
        parts.append(f"""<a href="{redirect}">
<input type="button" value="OK" />
</a>""")
    parts.append("""
</form>
</fieldset></div>
</div>
</body>
</html>
""")
    return "".join(parts)
def check_wg_key(wgkey):
    """Validate that *wgkey* is a base64 string decoding to exactly 32 bytes.

    Returns ``None`` when the key is acceptable.

    Raises:
        ValueError: if the text is longer than 64 characters, contains a
            character outside the base64 alphabet (letters, digits, ``+``,
            ``/``, ``=``), cannot be decoded, or does not decode to 32 bytes.
    """
    wg_keylen = 32
    if len(wgkey) > 2 * wg_keylen:
        raise ValueError(f"Wireguard key {wgkey} too long")
    allowed = set(string.ascii_letters) | set(string.digits) | set("+/=")
    bad_chars = {ch for ch in wgkey if ch not in allowed}
    if bad_chars:
        raise ValueError(f"Wireguard key {wgkey} contains invalid character: {bad_chars}")
    decoded = base64.b64decode(wgkey)
    if len(decoded) != 32:
        raise ValueError(f"Wireguard key {wgkey} are not {wg_keylen} bytes len")
def check_valid_ip_range(af, IPranges, ip, name, only_ip = True):
    """Check that *ip* lies inside at least one network of *IPranges*.

    Parameters:
        af: ``"IPv4"`` or ``"IPv6"``; selects the address classes used.
        IPranges: iterable of network strings (host bits are tolerated).
        ip: address, or address/prefix when *only_ip* is false.
        name: human-readable range name used in the error message.
        only_ip: when true, *ip* must be a single address without a "/".

    Returns ``True`` on the first containing range.

    Raises:
        ValueError: for an unknown *af*, a prefix where a single address is
            required, or when no range contains *ip*.
    """
    if af == "IPv4":
        net_cls, iface_cls = IPv4Network, IPv4Interface
    elif af == "IPv6":
        net_cls, iface_cls = IPv6Network, IPv6Interface
    else:
        raise ValueError("Unknown af:", af)

    if only_ip:
        if "/" in ip:
            raise ValueError(ip + " is not a valid IPv4 or IPv6 address")
        if net_cls(ip).num_addresses != 1:
            raise ValueError(ip + " contains more than one IP")

    # The candidate network is rebuilt per range on purpose so that parsing
    # errors surface in the same order as the ranges are examined.
    if any(
        net_cls(candidate, strict=False).supernet_of(iface_cls(ip).network)
        for candidate in IPranges
    ):
        return True
    raise ValueError(ip + " are not in " + name + " range: " + str(IPranges))
async def check_asn_ip(admin,mntner,asn,af,ip,only_ip=True):
    """Check that *ip* is a DN42 address the peer is entitled to use.

    *ip* must first fall inside the allowed DN42 ranges for *af*
    (``DN42_valid_ipv4s`` / ``DN42_valid_ipv6s``). Its whois record is then
    accepted when any of these holds:

    * *asn* is listed as ``origin``;
    * *mntner* is listed as ``mnt-by`` (a match on ``"DN42-MNT"`` is never
      accepted);
    * *admin* is listed as ``admin-c``.

    Returns ``True`` when accepted.

    Raises:
        ValueError: unknown *af*, or *ip* outside the allowed ranges.
        PermissionError: the whois record ties *ip* to someone else (or to
            nobody, when the record lacks the relevant attributes).
    """
    if af == "IPv4":
        allowed = DN42_valid_ipv4s
        descr = "DN42 IPv4"
    elif af == "IPv6":
        allowed = DN42_valid_ipv6s
        descr = "DN42 IPv6"
    else:
        raise ValueError("Unknown af:",af)
    check_valid_ip_range(af,IPranges=allowed,ip=ip,name=descr,only_ip=only_ip)
    peerIP_info = DN42whois.proc_data(await whois_query(ip))

    def values_of(key):
        # A record may lack an attribute entirely; treat that as "no values"
        # so the ownership checks fall through to PermissionError instead of
        # failing with KeyError.
        return peerIP_info[key] if key in peerIP_info else []

    origins = values_of("origin")
    maintainers = values_of("mnt-by")
    admins = values_of("admin-c")

    if asn in origins:
        return True
    if mntner in maintainers and mntner != "DN42-MNT":
        return True
    if admin in admins:
        return True

    originASN = origins[0] if len(origins) > 0 else "nobody"
    ipowner = admins[0] if len(admins) > 0 else None
    raise PermissionError("IP " + ip + f" owned by {originASN}({ipowner}) instead of {asn}({admin})")
async def check_reg_paramater(paramaters,skip_check=None,git_pull=True,allow_invalid_as=False,allowed_custom_myip=[]):
if (paramaters["hasIPV4"] or paramaters["hasIPV4LL"] or paramaters["hasIPV6"] or paramaters["hasIPV6LL"]) == False:
raise ValueError("You can't peer without any IP.")
if paramaters["peerASN"] == "AS" + paramaters["myASN"]:
raise ValueError("You can't peer with my ASN.")
try:
mntner,admin = await get_info_from_asn(paramaters["peerASN"])
except FileNotFoundError as e:
if allow_invalid_as:
mntner,admin = ["DN42-MNT","BURBLE-DN42"]
else:
raise e
######################### hasIPV4
if paramaters["hasIPV4"]:
if paramaters["myIPV4"] == None:
raise NotImplementedError("Sorry, I don't have IPv4 address.")
await check_asn_ip(admin,mntner,paramaters['peerASN'],"IPv4",paramaters["peerIPV4"],only_ip=True)
if paramaters["myIPV4"] == my_paramaters["myIPV4"] or paramaters["myIPV4"] in allowed_custom_myip:
pass
else:
if "/" not in paramaters["myIPV4"]:
await check_asn_ip(admin,mntner,paramaters['peerASN'],"IPv4",paramaters["myIPV4"],only_ip=True)
else:
await check_asn_ip(admin,mntner,paramaters['peerASN'],"IPv4",paramaters["myIPV4"],only_ip=False)
check_valid_ip_range("IPv4",[paramaters["myIPV4"]],paramaters["peerIPV4"],"allocated IPv4 to me")
else:
paramaters["peerIPV4"] = None
######################### hasIPV6
if paramaters["hasIPV6"]:
if paramaters["myIPV6"] == None:
raise NotImplementedError("Sorry, I don't have IPv6 address.")
await check_asn_ip(admin,mntner,paramaters['peerASN'],"IPv6",paramaters["peerIPV6"],only_ip=True)
if paramaters["myIPV6"] == my_paramaters["myIPV6"] or paramaters["myIPV6"] in allowed_custom_myip:
pass
else:
if "/" not in paramaters["myIPV6"]:
await check_asn_ip(admin,mntner,paramaters['peerASN'],"IPv6",paramaters["myIPV6"],only_ip=True)
else:
await check_asn_ip(admin,mntner,paramaters['peerASN'],"IPv6",paramaters["myIPV6"],only_ip=False)
check_valid_ip_range("IPv6",[paramaters["myIPV6"]],paramaters["peerIPV6"],"allocated IPv6 to me")
else:
paramaters["peerIPV6"] = None
######################### hasIPV4LL
if paramaters["hasIPV4LL"]:
if paramaters["myIPV4LL"] == None:
raise NotImplementedError("Sorry, I don't have IPv4 link-local address.")
check_valid_ip_range("IPv4",[valid_ipv4_lilo],paramaters["peerIPV4LL"],"link-local ipv4")
check_valid_ip_range("IPv4",[valid_ipv4_lilo],paramaters["myIPV4LL"].split("/")[0],"link-local ipv4")
paramaters["myIPV4LL"] = paramaters["myIPV4LL"].split("/")[0] + "/" + valid_ipv4_lilo.split("/")[1]
else:
paramaters["peerIPV4LL"] = None
######################### hasIPV6LL
if paramaters["hasIPV6LL"]:
if paramaters["myIPV6LL"] == None:
raise NotImplementedError("Sorry, I don't have IPv6 link-local address.")
check_valid_ip_range("IPv6",[valid_ipv6_lilo],paramaters["peerIPV6LL"],"link-local ipv6")
check_valid_ip_range("IPv6",[valid_ipv6_lilo],paramaters["myIPV6LL"].split("/")[0],"link-local ipv6")
paramaters["myIPV6LL"] = paramaters["myIPV6LL"].split("/")[0] + "/" + valid_ipv6_lilo.split("/")[1]
else:
paramaters["peerIPV6LL"] = None
if paramaters["MP_BGP"]:
if not (paramaters["hasIPV6"] or paramaters["hasIPV6LL"]):
raise ValueError("Value Error. You need a IPv6 address to use multiprotocol BGP.")
if not paramaters["Ext_Nh"]:
if not (paramaters["hasIPV4"] or paramaters["hasIPV4LL"]):
raise ValueError("Value Error. You need a IPv4 address to enable multiprotocol BGP unless you support extended next hop.")
if paramaters["Ext_Nh"]:
if not (paramaters["hasIPV6"] or paramaters["hasIPV6LL"]):
raise ValueError("Value Error. You need a IPv6 address to use extended next hop.")
if not paramaters["MP_BGP"]:
raise ValueError("Value Error. You need enable multiprotocol BGP to use extended next hop.")
if paramaters["allowExtNh"] == False:
raise NotImplementedError("Sorry, I don't support extended next hop.")
if paramaters["peerWG_PS_Key"] == "":
paramaters["peerWG_PS_Key"] == None
if paramaters["customDevice"] == None:
if paramaters["hasHost"]:
if paramaters["peerHost"] == None and (my_paramaters["myHost"] == None):
raise ValueError("Sorry, I don't have a public IP so that your endpoint can't be null.")
if paramaters["peerHost"] == None or ":" not in paramaters["peerHost"]:
raise ValueError("Parse Error, Host must looks like address:port.")
hostaddr,port = paramaters["peerHost"].rsplit(":",1)
port = int(port)
if hostaddr[0] == "[" and hostaddr[-1] == "]":
hostaddr = hostaddr[1:-1]
elif ":" in hostaddr:
raise ValueError(f"Parse Error, IPv6 Address as endpoint, it should be like [{hostaddr}]:{port}.")
addrinfo = socket.getaddrinfo(hostaddr,port)
else:
paramaters["peerHost"] = None
peerKey = paramaters["peerWG_Pub_Key"]
if peerKey == None or len(peerKey) == 0:
raise ValueError('"Your WG Public Key" can\'t be null.')
if peerKey == paramaters["myWG_Pub_Key"]:
raise ValueError('You can\'t use my wireguard public key as your wireguard public key.')
check_wg_key(peerKey)
check_wg_key(paramaters["myWG_Pri_Key"])
else: