-
Notifications
You must be signed in to change notification settings - Fork 852
/
Copy pathmongo_lock_data.py
51 lines (36 loc) · 1.56 KB
/
mongo_lock_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from syscore.constants import arg_not_supplied
from sysdata.production.locks import lockData, lock_off, lock_on
from sysdata.mongodb.mongo_generic import mongoDataWithSingleKey
from syslogging.logger import *
LOCK_STATUS_COLLECTION = "locks"
LOCK_DICT_KEY = "lock"
class mongoLockData(lockData):
"""
Read and write data class to get lock data
"""
def __init__(self, mongo_db=arg_not_supplied, log=get_logger("mongoLockData")):
super().__init__(log=log)
self._mongo_data = mongoDataWithSingleKey(
LOCK_STATUS_COLLECTION, "instrument_code", mongo_db=mongo_db
)
def __repr__(self):
return "mongoLockData %s" % str(self.mongo_data)
@property
def mongo_data(self):
return self._mongo_data
def _get_lock_for_instrument_no_checking(self, instrument_code: str) -> str:
result = self.mongo_data.get_result_dict_for_key(instrument_code)
lock = result[LOCK_DICT_KEY]
return lock
def add_lock_for_instrument(self, instrument_code: str):
self.mongo_data.add_data(instrument_code, {LOCK_DICT_KEY: lock_on})
def remove_lock_for_instrument(self, instrument_code):
self.mongo_data.delete_data_without_any_warning(instrument_code)
def get_list_of_locked_instruments(self):
all_instruments = self.mongo_data.get_list_of_keys()
all_instruments_with_locks = [
instrument_code
for instrument_code in all_instruments
if self.is_instrument_locked(instrument_code)
]
return all_instruments_with_locks