-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathexchangeworker.py
112 lines (93 loc) · 4.23 KB
/
exchangeworker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from __future__ import annotations
import abc
import logging
import urllib.parse
from datetime import datetime, timedelta
from typing import List, Callable, Union, Dict
import aiohttp.client
from aiohttp import ClientResponse
from typing import NamedTuple
from requests import Request, Response, Session
from api.dbmodels.client import Client
from api.dbmodels.balance import Balance
class Cached(NamedTuple):
url: str
response: dict
expires: datetime
class ExchangeWorker:
__tablename__ = 'client'
_ENDPOINT = ''
_cache: Dict[str, Cached] = {}
exchange: str = ''
required_extra_args: List[str] = []
def __init__(self, client: Client, session: aiohttp.ClientSession):
self.client = client
self.client_id = client.id
self.exchange = client.exchange
# Client information has to be stored locally because SQL Objects aren't allowed to live in multiple threads
self._api_key = client.api_key
self._api_secret = client.api_secret
self._subaccount = client.subaccount
self._extra_kwargs = client.extra_kwargs
self._session = session
self._identifier = id
self._last_fetch = datetime.fromtimestamp(0)
async def get_balance(self, session, time: datetime = None, force=False):
if not time:
time = datetime.now()
if force or (time - self._last_fetch > timedelta(seconds=30) and not self.client.rekt_on):
self._last_fetch = time
try:
balance = await self._get_balance(time)
except Exception:
logging.exception(
f'Exception occured while fetching balance for client with id {self.client_id} ({self.exchange})')
return Balance(amount=0.0, currency='$', time=time, extra_currencies={}, error=f'Internal {self.exchange} implementation error.')
if not balance.time:
balance.time = time
balance.client_id = self.client_id
return balance
elif self.client.rekt_on:
return Balance(amount=0.0, currency='$', extra_currencies={}, error=None, time=time)
else:
return None
@abc.abstractmethod
async def _get_balance(self, time: datetime):
logging.error(f'Exchange {self.exchange} does not implement _get_balance')
raise NotImplementedError(f'Exchange {self.exchange} does not implement _get_balance')
@abc.abstractmethod
def _sign_request(self, method: str, path: str, headers=None, params=None, data=None, **kwargs):
logging.error(f'Exchange {self.exchange} does not implement _sign_request')
@abc.abstractmethod
async def _process_response(self, response: ClientResponse):
logging.error(f'Exchange {self.exchange} does not implement _process_response')
async def _request(self, method: str, path: str, headers=None, params=None, data=None, sign=True, cache=False, **kwargs):
headers = headers or {}
params = params or {}
url = self._ENDPOINT + path
if cache:
cached = ExchangeWorker._cache.get(url)
if cached and datetime.now() < cached.expires:
return cached.response
if sign:
self._sign_request(method, path, headers, params, data)
async with self._session.request(method, url, headers=headers, params=params, data=data, **kwargs) as resp:
resp = await self._process_response(resp)
if cache:
ExchangeWorker._cache[url] = Cached(
url=url,
response=resp,
expires=datetime.now() + timedelta(seconds=5)
)
return resp
async def _get(self, path: str, **kwargs):
return await self._request('GET', path, **kwargs)
async def _post(self, path: str, **kwargs):
return await self._request('POST', path, **kwargs)
async def _put(self, path: str, **kwargs):
return await self._request('PUT', path, **kwargs)
def _query_string(self, params: Dict):
query_string = urllib.parse.urlencode(params)
return f"?{query_string}" if query_string else ""
def __repr__(self):
return f'<Worker exchange={self.exchange} client_id={self.client_id}>'