-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathTMoohIManager.py
185 lines (164 loc) · 6.44 KB
/
TMoohIManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import time
import json
import random
import threading
import jsonpatch
from collections import deque
import urllib.request as urllib2
import MoohLog
import TMoohIUser
import TMoohIConnection
from MoohLog import eventmessage
from TMoohIErrors import RateLimitError, TooManyChannelsError, NotConnectedError
from TMoohIStatTrack import TMoohIStatTrack
# This is the main manager for anything TMI/twitch API related. It will also bootstrap all the connections that have to be created when the server boots.
# Its parent is the main TMoohI class.
class TMoohIManager(TMoohIStatTrack):
def __init__(self,parent):
self.quitting = False
self.started = time.time()
self.cachedJSONresponses = dict()
self.users = {}
self.parent = parent
self.logger = parent.logger
self.stats = { "users":self.users, "queue":self.getResendQueueLength, "since":time.time(), "build": self.parent.BuildInfo.__dict__ }
# contains all the messages that couldnt be sent at the given point in time as a tuple (user,client,message)
self.joinqueue = deque()
self._createdconnections = []
# times at which we created a connection or joined a channel
self._conn_join_times = []
self._connectionIDcounter = {}
self._queuethread = threading.Thread(target=self.handleJoinQueue)
self._queuethread.daemon = True
self._queuethread.start()
self._updatestatusthread = threading.Thread(target=self.updateStatus)
self._updatestatusthread.daemon = True
self._updatestatusthread.start()
def quit(self):
self.quitting = True
for userkey,usr in self.users.items():
usr.quit()
def TMIConnectionFactory(self,user):
now = time.time()
self._conn_join_times = [i for i in self._conn_join_times if i>now-10]
if len(self._conn_join_times)>self.parent.config["connections-per-10"]:
raise RateLimitError('Creating connection for user ' + user.nick)
else:
self._conn_join_times.append(now)
#create a connection
try:
self._connectionIDcounter[user.nick] += 1
except Exception:
self._connectionIDcounter[user.nick] = 1
return TMoohIConnection.TMoohIConnection(user, self.parent.config["server"],"%s/%d"%(user.nick,self._connectionIDcounter[user.nick]))
def connect(self, client):
for userkey,usr in self.users.items():
if usr.nick == client.nick and usr.oauth == client.oauth:
usr.welcome(client)
return usr
usr = TMoohIUser.TMoohIUser(self,client.nick,client.oauth)
self.users[usr.key] = usr
usr.welcome(client)
return usr
def disconnect(self, client):
if client.user:
try:
client.user.clients.remove(client)
except Exception:
self.logger.exception()
def getJSON(self,url,cooldown=3600):
try:
if time.time() < self.cachedJSONresponses[url][0]+cooldown:
self.logger.debug(eventmessage("manager","JSON response from cache: %s"%(url,)))
return self.cachedJSONresponses[url][1]
except KeyError:
pass
self.logger.debug(eventmessage("manager","Downloading JSON from %s"%(url,)))
res = urllib2.urlopen(url)
jsdata = res.read().decode("utf-8")
data = json.loads(jsdata)
self.cachedJSONresponses[url] = (time.time(), data)
return data
def join(self, user, channelinfo):
# try joining this channel
for conn in user.connections:
try:
if len(self._conn_join_times) < self.parent.config["connections-per-10"]:
conn.join(channelinfo)
self._conn_join_times.append(time.time())
self.logger.debug(eventmessage("manager","Channel %s joined on connection %s"%(channelinfo.name,conn.connid)))
break
except (TooManyChannelsError, NotConnectedError):
pass
else:
self.logger.debug(eventmessage("manager","Channel %s could not be joined, enqueueing"%(channelinfo.name,)))
self.joinqueue.append({"user":user,"channelinfo":channelinfo})
def handleJoinQueue(self):
while not self.quitting:
try:
# in each iteration, handle the joinQueue
now = time.time()
self._conn_join_times = [i for i in self._conn_join_times if i>now-10]
# check all users on connection deficit
try:
for userkey,user in self.users.items():
if self.quitting:
return
if len(self._conn_join_times) < self.parent.config["connections-per-10"]:
if user.getTotalChannels() >= self.parent.config["capacity-target"] * user.getCapacity():
self.logger.debug(eventmessage("manager","Requesting new connection for %s because of exceeded capacity"%(user.key,)))
# request new connection (in a non-GIL interpreter, wrap this in try-except)
user.connections.append(self.TMIConnectionFactory(user))
# handle message queues
user.handleMessageQueue()
except RuntimeError:
# Dict changed size during iteration. Nbd, well check again in .1 secs anyways.
pass
# check join queue
iterator = 0
while iterator < len(self.joinqueue):
if self.quitting:
return
if len(self._conn_join_times) >= self.parent.config["connections-per-10"]:
break
# dequeue a channel and try to join it
channeljoininfo = self.joinqueue.pop()
user = channeljoininfo["user"]
channelinfo = channeljoininfo["channelinfo"]
self.logger.debug(eventmessage("manager","Dequeing channel %s for %s from join queue"%(channelinfo.name,user.key)))
# try joining this channel
seed = random.randint(0,len(user.connections))
for index in range(len(user.connections)):
try:
conn = user.connections[(index+seed)%len(user.connections)]
conn.join(channelinfo)
self._conn_join_times.append(time.time())
self.logger.debug(eventmessage("manager","Channel %s joined on connection %s"%(channelinfo.name,conn.connid)))
break
except (TooManyChannelsError, NotConnectedError):
pass
else:
# put it back into the deque
self.joinqueue.append(channeljoininfo)
iterator += 1
self.logger.debug(eventmessage("manager","Channel %s could not be joined, requeueing"%(channelinfo.name,)))
time.sleep(0.1)
except Exception:
self.logger.exception()
def getResendQueueLength(self):
return len(self.joinqueue)
def getUptime(self):
return time.time()-self.started
def updateStatus(self):
cnt = 0
while not self.quitting:
if cnt%10==0:
try:
serialized = self.serialize()
patch = jsonpatch.JsonPatch.from_diff(self.parent.websocketserver.neweststatus, serialized)
self.parent.websocketserver.neweststatus = serialized
self.logger.log(1,MoohLog.statusmessage(patch.patch,"patch"))
except Exception:
self.logger.exception()
cnt += 1
time.sleep(1)