diff --git a/dxfeed/core/DXFeedPy.pyx b/dxfeed/core/DXFeedPy.pyx index 6d7a2ac..deb1d1d 100644 --- a/dxfeed/core/DXFeedPy.pyx +++ b/dxfeed/core/DXFeedPy.pyx @@ -1,6 +1,5 @@ # distutils: language = c++ # cython: always_allow_keywords=True -from libcpp.deque cimport deque as cppdq from dxfeed.core.utils.helpers cimport * from dxfeed.core.utils.helpers import * @@ -12,6 +11,7 @@ from datetime import datetime import pandas as pd from typing import Optional, Union, Iterable from warnings import warn +from weakref import WeakSet # for importing variables import dxfeed.core.listeners.listener as lis @@ -53,24 +53,29 @@ cdef class ConnectionClass: Data structure that contains connection """ cdef clib.dxf_connection_t connection - # sub_ptr_list contains pointers to all subscriptions related to current connection - cdef cppdq[clib.dxf_subscription_t *] sub_ptr_list - # each subscription has its own index in a list - cdef int subs_order + cdef object __sub_refs def __init__(self): - self.subs_order = 0 + self.__sub_refs = WeakSet() def __dealloc__(self): dxf_close_connection(self) + def get_sub_refs(self): + """ + Method to get list of references to all subscriptions related to current connection + + Returns + ------- + :list + List of weakref objects. Empty list if no refs + """ + return list(self.__sub_refs) + cpdef SubscriptionClass make_new_subscription(self, data_len: int): cdef SubscriptionClass out = SubscriptionClass(data_len) out.connection = self.connection - self.sub_ptr_list.push_back(&out.subscription) # append pointer to new subscription - out.subscription_order = self.subs_order # assign each subscription an index - self.subs_order += 1 - out.con_sub_list_ptr = &self.sub_ptr_list # reverse pointer to pointers list + self.__sub_refs.add(out) return out @@ -80,9 +85,8 @@ cdef class SubscriptionClass: """ cdef clib.dxf_connection_t connection cdef clib.dxf_subscription_t subscription - cdef int subscription_order # index in list of subscription pointers - cdef cppdq[clib.dxf_subscription_t *] *con_sub_list_ptr # pointer to list of subscription pointers cdef dxf_event_listener_t listener + cdef object __weakref__ # Weak referencing enabling cdef object event_type_str cdef list columns cdef object data @@ -105,9 +109,7 @@ cdef class SubscriptionClass: self.listener = NULL def __dealloc__(self): - if self.subscription: # if connection is not closed - clib.dxf_close_subscription(self.subscription) - self.con_sub_list_ptr[0][self.subscription_order] = NULL + dxf_close_subscription(self) def get_data(self): """ @@ -187,7 +189,7 @@ def dxf_create_connection_auth_bearer(address: Union[str, unicode, bytes], address = address.encode('utf-8') token = token.encode('utf-8') clib.dxf_create_connection_auth_bearer(address, token, - NULL, NULL, NULL, NULL, NULL, &cc.connection) + NULL, NULL, NULL, NULL, NULL, &cc.connection) error_code = process_last_error(verbose=False) if error_code: raise RuntimeError(f"In underlying C-API library error {error_code} occurred!") @@ -229,7 +231,7 @@ def dxf_create_subscription(ConnectionClass cc, event_type: str, candle_time: Op candle_time = datetime.strptime(candle_time, '%Y-%m-%d %H:%M:%S') if candle_time else datetime.utcnow() timestamp = int((candle_time - datetime(1970, 1, 1)).total_seconds()) * 1000 - 5000 except ValueError: - raise Exception("Inapropriate date format, should be %Y-%m-%d %H:%M:%S") + raise Exception("Inappropriate date format, should be %Y-%m-%d %H:%M:%S") if event_type == 'Candle': clib.dxf_create_subscription_timed(sc.connection, et_type_int, timestamp, &sc.subscription) @@ -368,16 +370,11 @@ def dxf_close_connection(ConnectionClass cc): cc: ConnectionClass Variable with connection information """ - - # close all subscriptions before closing the connection - for i in range(cc.sub_ptr_list.size()): - if cc.sub_ptr_list[i]: # subscription should not be closed previously - clib.dxf_close_subscription(cc.sub_ptr_list[i][0]) - cc.sub_ptr_list[i][0] = NULL # mark subscription as closed - - cc.sub_ptr_list.clear() - if cc.connection: + related_subs = cc.get_sub_refs() + for sub in related_subs: + dxf_close_subscription(sub) + clib.dxf_close_connection(cc.connection) cc.connection = NULL @@ -393,7 +390,6 @@ def dxf_close_subscription(SubscriptionClass sc): if sc.subscription: clib.dxf_close_subscription(sc.subscription) sc.subscription = NULL - sc.con_sub_list_ptr[0][sc.subscription_order] = NULL def dxf_get_current_connection_status(ConnectionClass cc, return_str: bool=True): """ diff --git a/pyproject.toml b/pyproject.toml index d7b39a0..3d2ddf3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dxfeed" -version = "0.2.0" +version = "0.2.1" description = "DXFeed Python API via C API" authors = ["Index Management Team "] build = "build.py" diff --git a/tests/test_dxfeedpy.py b/tests/test_dxfeedpy.py index 8dcfc27..6bf4d75 100644 --- a/tests/test_dxfeedpy.py +++ b/tests/test_dxfeedpy.py @@ -1,6 +1,7 @@ import dxfeed as dx import pytest + class ValueStorage(object): # config demo_address = 'demo.dxfeed.com:7300' event_types = ['Trade', 'Quote', 'Summary', 'Profile', 'Order', 'TimeAndSale', 'Candle', 'TradeETH', 'SpreadOrder', @@ -39,8 +40,7 @@ def test_fail_create_subscription_with_no_connection(): @pytest.mark.xfail() -def test_fail_to_use_subscription_without_connection(): - connection = dx.dxf_create_connection(ValueStorage.demo_address) +def test_fail_to_use_subscription_without_connection(connection): sub = dx.dxf_create_subscription(cc=connection, event_type='Trade') dx.dxf_close_connection(connection) dx.dxf_add_symbols(sc=sub, symbols=['AAPL']) @@ -99,9 +99,7 @@ def test_symbol_clearing(connection): @pytest.mark.parametrize('sub_type', ValueStorage.event_types) -def test_default_listeners(sub_type): - # fixture usage causes errors with, probably, threads - connection = dx.dxf_create_connection(ValueStorage.demo_address) +def test_default_listeners(connection, sub_type): con_err_status = dx.process_last_error(verbose=False) sub = dx.dxf_create_subscription(connection, sub_type) sub_err_status = dx.process_last_error(verbose=False) @@ -110,6 +108,5 @@ def test_default_listeners(sub_type): dx.dxf_detach_listener(sub) drop_lis_err_status = dx.process_last_error(verbose=False) dx.dxf_close_subscription(sub) - dx.dxf_close_connection(connection) assert (con_err_status, sub_err_status, add_lis_err_status, drop_lis_err_status) == (0, 0, 0, 0)