forked from GzGod/Grass2.0
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
160 lines (147 loc) · 5.62 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import argparse
import asyncio
import json
import os
import uuid
from datetime import datetime, timezone

import aiohttp
from colorama import *
from fake_useragent import UserAgent
# Short aliases for the colorama escape sequences interpolated into the
# console messages printed by this script.
green = Fore.LIGHTGREEN_EX
red = Fore.LIGHTRED_EX
magenta = Fore.LIGHTMAGENTA_EX
white = Fore.LIGHTWHITE_EX
black = Fore.LIGHTBLACK_EX
# Appended after each message so colours do not bleed into later output.
reset = Style.RESET_ALL
yellow = Fore.LIGHTYELLOW_EX
class Grass:
    """Hold one websocket connection to ``proxy2.wynd.network`` for a user.

    Each instance owns its own ``aiohttp.ClientSession``. :meth:`start`
    connects (optionally through ``proxy``), answers the server's first
    message with an AUTH payload for ``userid`` and then keeps sending
    PING/PONG messages at a fixed interval.
    """

    def __init__(self, userid, proxy):
        """Store the credentials and open the HTTP session.

        Args:
            userid: User id sent in the AUTH payload.
            proxy: Proxy URL used for the websocket, or ``None`` for a
                direct connection.
        """
        self.userid = userid
        self.proxy = proxy
        self.ses = aiohttp.ClientSession()

    def log(self, msg):
        """Print ``msg`` prefixed with the current UTC time."""
        # strftime always yields "YYYY-MM-DD HH:MM:SS"; splitting isoformat()
        # on "." leaves a trailing "+00:00" whenever the microsecond is 0.
        now = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        print(f"{black}[{now}] {reset}{msg}{reset}")

    @staticmethod
    async def ipinfo(proxy=None):
        """Return the body of ``https://api.ipify.org/`` as text.

        Args:
            proxy: Optional proxy URL for the request.
        """
        async with aiohttp.ClientSession() as client:
            # The context manager releases the response even if reading fails.
            async with client.get("https://api.ipify.org/", proxy=proxy) as result:
                return await result.text()

    async def start(self):
        """Connect, authenticate and ping the server until retries run out.

        Any exception raised while connected is logged and counts as one
        retry; a successful AUTH resets the counter. The coroutine returns
        when the retry limit is hit or when the server's first message has
        no ``id``. The session is closed on every exit path, including
        cancellation.
        """
        max_retry = 10
        retry = 1
        try:
            # The browser id is a UUID derived from the proxy string; with no
            # proxy configured, the text returned by ipinfo() is used instead.
            proxy = self.proxy
            if proxy is None:
                proxy = await Grass.ipinfo()
            browser_id = str(uuid.uuid5(uuid.NAMESPACE_URL, proxy))
            useragent = UserAgent().random
            headers = {
                "Host": "proxy2.wynd.network:4650",
                "Connection": "Upgrade",
                "Pragma": "no-cache",
                "Cache-Control": "no-cache",
                "User-Agent": useragent,
                "Upgrade": "websocket",
                "Origin": "chrome-extension://lkbnfiajjmbhnfledhphioinpickokdi",
                "Sec-WebSocket-Version": "13",
                "Accept-Language": "en-US,en;q=0.9,id;q=0.8",
            }
            while True:
                try:
                    if retry >= max_retry:
                        self.log(f"{yellow}max retrying reacted, skip the proxy !")
                        return
                    async with self.ses.ws_connect(
                        "wss://proxy2.wynd.network:4650/",
                        headers=headers,
                        proxy=self.proxy,
                        timeout=1000,
                        autoclose=False,
                    ) as wss:
                        # The server speaks first; its message id must be
                        # echoed back in the AUTH reply.
                        res = await wss.receive_json()
                        auth_id = res.get("id")
                        if auth_id is None:
                            self.log(f"{red}auth id is None")
                            return None
                        auth_data = {
                            "id": auth_id,
                            "origin_action": "AUTH",
                            "result": {
                                "browser_id": browser_id,
                                "user_id": self.userid,
                                "user_agent": useragent,
                                "timestamp": int(datetime.now().timestamp()),
                                "device_type": "extension",
                                "version": "4.26.2",
                                "extension_id": "lkbnfiajjmbhnfledhphioinpickokdi",
                            },
                        }
                        await wss.send_json(auth_data)
                        self.log(f"{green}成功连接 {white}到服务器!")
                        retry = 1
                        while True:
                            ping_data = {
                                "id": str(uuid.uuid4()),
                                "version": "1.0.0",
                                "action": "PING",
                                "data": {},
                            }
                            await wss.send_json(ping_data)
                            self.log(f"{white}发送 {green}ping {white}到服务器 !")
                            pong_data = {"id": "F3X", "origin_action": "PONG"}
                            await wss.send_json(pong_data)
                            self.log(f"{white}发送 {magenta}pong {white}到服务器 !")
                            # you can edit the countdown in code below
                            await countdown(120)
                except KeyboardInterrupt:
                    exit()
                except Exception as e:
                    self.log(f"{red}error : {white}{e}")
                    retry += 1
        finally:
            # Single cleanup point: runs on normal return, on exit() and on
            # task cancellation alike.
            await self.ses.close()
async def countdown(t):
    """Display an in-place ``HH:MM:SS`` countdown lasting ``t`` seconds.

    Once per second the remaining time is printed on the same console line
    (carriage return, no newline), followed by a one-second sleep. Nothing
    is printed when ``t`` is zero or negative.
    """
    for remaining in reversed(range(1, t + 1)):
        total_minutes, secs = divmod(remaining, 60)
        hrs, mins = divmod(total_minutes, 60)
        clock = f"{hrs:02d}:{mins:02d}:{secs:02d}"
        print(f"waiting for {clock} ", flush=True, end="\r")
        await asyncio.sleep(1)
async def main():
    """Parse the command line, load the inputs and run one worker per proxy.

    The user id is read from ``userid.txt`` and the proxies (one per line)
    from the file given by ``--proxy``/``-P`` (default ``proxies.txt``).
    The script exits after printing a message when the user id is empty or
    the proxy file does not exist. When the proxy file has no entries, a
    single worker is started with ``proxy=None``.
    """
    arg = argparse.ArgumentParser()
    arg.add_argument(
        "--proxy", "-P", default="proxies.txt", help="Custom proxy input file "
    )
    args = arg.parse_args()
    os.system("cls" if os.name == "nt" else "clear")
    print(
        f"""
{red} 本脚本由推特用户雪糕战神@Hy78516012开源使用 无任何收费!!!
{white}Gihub : {green}github.com/Gzgod
{white}我的推特 :{green}雪糕战神@Hy78516012
{green}Get some grass !
"""
    )
    with open("userid.txt", "r") as userid_file:
        # Strip so a trailing newline is not sent as part of the user id and
        # a whitespace-only file is reported as empty.
        userid = userid_file.read().strip()
    if not userid:
        print(f"{red}错误 : {white}请先输入您的用户ID!")
        exit()
    if not os.path.exists(args.proxy):
        print(f"{red}{args.proxy} 未找到,请确保 {args.proxy} 可用!")
        exit()
    with open(args.proxy, "r") as proxy_file:
        # Skip blank lines so they do not turn into "" proxies.
        proxies = [line.strip() for line in proxy_file if line.strip()]
    if not proxies:
        proxies = [None]
    tasks = [Grass(userid, proxy).start() for proxy in proxies]
    await asyncio.gather(*tasks)
if __name__ == "__main__":
    # asyncio comes from the module-level imports, so main() and countdown()
    # also work when this file is imported rather than run as a script.
    try:
        if os.name == "nt":
            # On Windows, switch to the selector event loop policy before
            # the loop is created.
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        asyncio.run(main())
    except KeyboardInterrupt:
        exit()