-
Notifications
You must be signed in to change notification settings - Fork 852
/
Copy patharctic_optimal_positions.py
65 lines (50 loc) · 1.94 KB
/
arctic_optimal_positions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""
Read and write data from mongodb for individual futures contracts
"""
from typing import Union
from syscore.exceptions import missingData
from sysdata.arctic.arctic_connection import arcticData
from sysdata.production.optimal_positions import optimalPositionData
from syslogging.logger import *
from sysobjects.production.tradeable_object import (
instrumentStrategy,
listOfInstrumentStrategies,
)
import pandas as pd
OPTIMAL_POSITION_COLLECTION = "optimal_positions"
class arcticOptimalPositionData(optimalPositionData):
def __init__(self, mongo_db=None, log=get_logger("arcticOptimalPositionData")):
super().__init__(log=log)
self._arctic_connection = arcticData(
OPTIMAL_POSITION_COLLECTION, mongo_db=mongo_db
)
def __repr__(self):
return repr(self._arctic_connection)
@property
def arctic_connection(self):
return self._arctic_connection
def get_list_of_instrument_strategies_with_optimal_position(
self,
) -> listOfInstrumentStrategies:
raw_list_of_instrument_strategies = self.arctic_connection.get_keynames()
list_of_instrument_strategies = [
instrumentStrategy.from_key(key)
for key in raw_list_of_instrument_strategies
]
return listOfInstrumentStrategies(list_of_instrument_strategies)
def get_optimal_position_as_df_for_instrument_strategy(
self, instrument_strategy: instrumentStrategy
) -> pd.DataFrame:
try:
ident = instrument_strategy.key
df_result = self.arctic_connection.read(ident)
except:
raise missingData
return df_result
def write_optimal_position_as_df_for_instrument_strategy_without_checking(
self,
instrument_strategy: instrumentStrategy,
optimal_positions_as_df: pd.DataFrame,
):
ident = instrument_strategy.key
self.arctic_connection.write(ident, optimal_positions_as_df)