-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathversion_cache.py
66 lines (58 loc) · 2.15 KB
/
version_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import rethinkdb as r
class VersionCache:
    """In-memory mirror of the RethinkDB ``versions`` table.

    The table is loaded into a list and then kept current from a changefeed,
    so lookups are answered from memory instead of querying the database on
    every request. Entries are kept sorted by ``release_id`` in ascending
    order, which is what ``get_latest`` relies on.
    """

    def __init__(self, conn, loop):
        """Start mirroring the ``versions`` table in the background.

        Args:
            conn: RethinkDB connection passed to every ``run()`` call.
            loop: Event loop on which the synchronisation task is scheduled.
        """
        self.loop = loop
        self.conn = conn
        # Create the cache before scheduling the task that fills it.
        self._cache = []
        # Keep a reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected.
        self._task = self.loop.create_task(self._handle_db())

    async def _handle_db(self):
        """Load the table, then apply changefeed events to the cache forever.

        When a query fails with ``ReqlOpFailedError`` the table is reloaded
        and the changefeed reopened, so changes that happened while the feed
        was down are not lost.
        """
        while True:
            try:
                rows = await r.table("versions").coerce_to("array").run(self.conn)
                self._cache = sorted(rows, key=lambda row: row['release_id'])
                feed = await r.table("versions").changes().run(self.conn)
                while await feed.fetch_next():
                    self._apply_change(await feed.next())
            except r.ReqlOpFailedError:
                # The operation failed; loop back around and resynchronise.
                continue

    def _apply_change(self, change):
        """Apply one changefeed event (``old_val``/``new_val``) to the cache."""
        old = change['old_val']
        new = change['new_val']
        if old and new:
            # Update: merge the new field values into the matching entry.
            for entry in self._cache:
                if entry['id'] == old['id']:
                    entry.update(new)
        elif old:
            # Delete: filter in place rather than removing while iterating,
            # which would skip the element after each removed one.
            self._cache[:] = [
                entry for entry in self._cache if entry['id'] != old['id']
            ]
            return
        elif new:
            # Insert.
            self._cache.append(new)
        else:
            # Neither value present: nothing to apply.
            return
        # Inserts and updates can break the ordering established at load time.
        self._cache.sort(key=lambda row: row['release_id'])

    def since_version_id(self, version_id):
        """Return the cached versions from position ``version_id`` onwards.

        ``version_id`` is used as a zero-based offset into the cache; it is
        not compared against any field of the cached entries.

        Args:
            version_id: Number of leading cache entries to skip. Zero or a
                negative value returns every entry.

        Returns:
            A new list holding the remaining entries in cache order.
        """
        return self._cache[max(version_id, 0):]

    def get_latest(self):
        """Return the newest beta entry and the newest non-beta entry.

        The cache is scanned from its end (highest ``release_id``) and the
        first entry of each kind that is found is kept.

        Returns:
            A ``(beta, release)`` tuple. Either element is ``None`` when the
            cache holds no entry of that kind.
        """
        beta_res = None
        release_res = None
        for item in reversed(self._cache):
            if item['beta']:
                if beta_res is None:
                    beta_res = item
            elif release_res is None:
                release_res = item
            if beta_res is not None and release_res is not None:
                break
        return (beta_res, release_res)