From 8e0322b73d45c809ffce7a9ff9e211bacbfd48b4 Mon Sep 17 00:00:00 2001 From: Ivan Mishalkin Date: Fri, 8 May 2020 17:45:16 +0300 Subject: [PATCH 1/6] [EN-1414] repetition in close subscription removed --- dxfeed/core/DXFeedPy.pyx | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dxfeed/core/DXFeedPy.pyx b/dxfeed/core/DXFeedPy.pyx index 6d7a2ac..84026ec 100644 --- a/dxfeed/core/DXFeedPy.pyx +++ b/dxfeed/core/DXFeedPy.pyx @@ -105,9 +105,7 @@ cdef class SubscriptionClass: self.listener = NULL def __dealloc__(self): - if self.subscription: # if connection is not closed - clib.dxf_close_subscription(self.subscription) - self.con_sub_list_ptr[0][self.subscription_order] = NULL + dxf_close_subscription(self) def get_data(self): """ From 056a79b6e74cb3358ef3ad91d1e11f7852636dc0 Mon Sep 17 00:00:00 2001 From: Ivan Mishalkin Date: Fri, 8 May 2020 17:50:29 +0300 Subject: [PATCH 2/6] [EN-1414] fixture usage added to all related tests --- tests/test_dxfeedpy.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/test_dxfeedpy.py b/tests/test_dxfeedpy.py index 8dcfc27..6bf4d75 100644 --- a/tests/test_dxfeedpy.py +++ b/tests/test_dxfeedpy.py @@ -1,6 +1,7 @@ import dxfeed as dx import pytest + class ValueStorage(object): # config demo_address = 'demo.dxfeed.com:7300' event_types = ['Trade', 'Quote', 'Summary', 'Profile', 'Order', 'TimeAndSale', 'Candle', 'TradeETH', 'SpreadOrder', @@ -39,8 +40,7 @@ def test_fail_create_subscription_with_no_connection(): @pytest.mark.xfail() -def test_fail_to_use_subscription_without_connection(): - connection = dx.dxf_create_connection(ValueStorage.demo_address) +def test_fail_to_use_subscription_without_connection(connection): sub = dx.dxf_create_subscription(cc=connection, event_type='Trade') dx.dxf_close_connection(connection) dx.dxf_add_symbols(sc=sub, symbols=['AAPL']) @@ -99,9 +99,7 @@ def test_symbol_clearing(connection): @pytest.mark.parametrize('sub_type', ValueStorage.event_types) -def test_default_listeners(sub_type): - # fixture usage causes errors with, probably, threads - connection = dx.dxf_create_connection(ValueStorage.demo_address) +def test_default_listeners(connection, sub_type): con_err_status = dx.process_last_error(verbose=False) sub = dx.dxf_create_subscription(connection, sub_type) sub_err_status = dx.process_last_error(verbose=False) @@ -110,6 +108,5 @@ def test_default_listeners(sub_type): dx.dxf_detach_listener(sub) drop_lis_err_status = dx.process_last_error(verbose=False) dx.dxf_close_subscription(sub) - dx.dxf_close_connection(connection) assert (con_err_status, sub_err_status, add_lis_err_status, drop_lis_err_status) == (0, 0, 0, 0) From 3e7d4df7b524a9d5a57815531662e511cd93804b Mon Sep 17 00:00:00 2001 From: Ivan Mishalkin Date: Wed, 20 May 2020 15:42:22 +0300 Subject: [PATCH 3/6] [EN-1414] test passed with weakreferencing --- dxfeed/core/DXFeedPy.pyx | 43 ++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/dxfeed/core/DXFeedPy.pyx b/dxfeed/core/DXFeedPy.pyx index 84026ec..0474bb5 100644 --- a/dxfeed/core/DXFeedPy.pyx +++ b/dxfeed/core/DXFeedPy.pyx @@ -1,6 +1,5 @@ # distutils: language = c++ # cython: always_allow_keywords=True -from libcpp.deque cimport deque as cppdq from dxfeed.core.utils.helpers cimport * from dxfeed.core.utils.helpers import * @@ -12,6 +11,7 @@ from datetime import datetime import pandas as pd from typing import Optional, Union, Iterable from warnings import warn +from weakref import WeakSet # for importing variables import dxfeed.core.listeners.listener as lis @@ -53,24 +53,30 @@ cdef class ConnectionClass: Data structure that contains connection """ cdef clib.dxf_connection_t connection - # sub_ptr_list contains pointers to all subscriptions related to current connection - cdef cppdq[clib.dxf_subscription_t *] sub_ptr_list - # each subscription has its own index in a list - cdef int subs_order + cdef object __sub_refs def __init__(self): - self.subs_order = 0 + self.__sub_refs = WeakSet() def __dealloc__(self): dxf_close_connection(self) + def get_sub_refs(self): + """ + Method to get list of references to all subscriptions related to current connection + + Returns + ------- + :list + List of weakref objects. Empty list if no refs + """ + refs = list(self.__sub_refs) if self.__sub_refs else list() + return refs + cpdef SubscriptionClass make_new_subscription(self, data_len: int): cdef SubscriptionClass out = SubscriptionClass(data_len) out.connection = self.connection - self.sub_ptr_list.push_back(&out.subscription) # append pointer to new subscription - out.subscription_order = self.subs_order # assign each subscription an index - self.subs_order += 1 - out.con_sub_list_ptr = &self.sub_ptr_list # reverse pointer to pointers list + self.__sub_refs.add(out) return out @@ -80,9 +86,8 @@ cdef class SubscriptionClass: """ cdef clib.dxf_connection_t connection cdef clib.dxf_subscription_t subscription - cdef int subscription_order # index in list of subscription pointers - cdef cppdq[clib.dxf_subscription_t *] *con_sub_list_ptr # pointer to list of subscription pointers cdef dxf_event_listener_t listener + cdef object __weakref__ # Weak referencing enabling cdef object event_type_str cdef list columns cdef object data @@ -185,7 +190,7 @@ def dxf_create_connection_auth_bearer(address: Union[str, unicode, bytes], address = address.encode('utf-8') token = token.encode('utf-8') clib.dxf_create_connection_auth_bearer(address, token, - NULL, NULL, NULL, NULL, NULL, &cc.connection) + NULL, NULL, NULL, NULL, NULL, &cc.connection) error_code = process_last_error(verbose=False) if error_code: raise RuntimeError(f"In underlying C-API library error {error_code} occurred!") @@ -366,14 +371,9 @@ def dxf_close_connection(ConnectionClass cc): cc: ConnectionClass Variable with connection information """ - - # close all subscriptions before closing the connection - for i in range(cc.sub_ptr_list.size()): - if cc.sub_ptr_list[i]: # subscription should not be closed previously - clib.dxf_close_subscription(cc.sub_ptr_list[i][0]) - cc.sub_ptr_list[i][0] = NULL # mark subscription as closed - - cc.sub_ptr_list.clear() + related_subs = cc.get_sub_refs() + for sub in related_subs: + dxf_close_subscription(sub) if cc.connection: clib.dxf_close_connection(cc.connection) @@ -391,7 +391,6 @@ def dxf_close_subscription(SubscriptionClass sc): if sc.subscription: clib.dxf_close_subscription(sc.subscription) sc.subscription = NULL - sc.con_sub_list_ptr[0][sc.subscription_order] = NULL def dxf_get_current_connection_status(ConnectionClass cc, return_str: bool=True): """ From 941cca0a868fbdd202667d719d3d86528c2446bd Mon Sep 17 00:00:00 2001 From: Ivan Mishalkin Date: Wed, 20 May 2020 16:15:40 +0300 Subject: [PATCH 4/6] [EN-1414] version bump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f44e133..b81c281 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dxfeed" -version = "0.2.0" +version = "0.2.1" description = "DXFeed Python API via C API" authors = ["Index Management Team "] build = "build.py" From 4ada855f472c0558ea932da0fbb20ddd743c5bd4 Mon Sep 17 00:00:00 2001 From: Ivan Mishalkin Date: Wed, 20 May 2020 17:07:31 +0300 Subject: [PATCH 5/6] [EN-1414] better get subs references method --- dxfeed/core/DXFeedPy.pyx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dxfeed/core/DXFeedPy.pyx b/dxfeed/core/DXFeedPy.pyx index 0474bb5..fddb1f3 100644 --- a/dxfeed/core/DXFeedPy.pyx +++ b/dxfeed/core/DXFeedPy.pyx @@ -70,8 +70,7 @@ cdef class ConnectionClass: :list List of weakref objects. Empty list if no refs """ - refs = list(self.__sub_refs) if self.__sub_refs else list() - return refs + return list(self.__sub_refs) cpdef SubscriptionClass make_new_subscription(self, data_len: int): cdef SubscriptionClass out = SubscriptionClass(data_len) From 0e269a0072dfdb94987f809b1da034859739d102 Mon Sep 17 00:00:00 2001 From: Ivan Mishalkin Date: Wed, 20 May 2020 18:59:27 +0300 Subject: [PATCH 6/6] [EN-1414] fix extra logs in pytest --- dxfeed/core/DXFeedPy.pyx | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dxfeed/core/DXFeedPy.pyx b/dxfeed/core/DXFeedPy.pyx index fddb1f3..deb1d1d 100644 --- a/dxfeed/core/DXFeedPy.pyx +++ b/dxfeed/core/DXFeedPy.pyx @@ -231,7 +231,7 @@ def dxf_create_subscription(ConnectionClass cc, event_type: str, candle_time: Op candle_time = datetime.strptime(candle_time, '%Y-%m-%d %H:%M:%S') if candle_time else datetime.utcnow() timestamp = int((candle_time - datetime(1970, 1, 1)).total_seconds()) * 1000 - 5000 except ValueError: - raise Exception("Inapropriate date format, should be %Y-%m-%d %H:%M:%S") + raise Exception("Inappropriate date format, should be %Y-%m-%d %H:%M:%S") if event_type == 'Candle': clib.dxf_create_subscription_timed(sc.connection, et_type_int, timestamp, &sc.subscription) @@ -370,11 +370,11 @@ def dxf_close_connection(ConnectionClass cc): cc: ConnectionClass Variable with connection information """ - related_subs = cc.get_sub_refs() - for sub in related_subs: - dxf_close_subscription(sub) - if cc.connection: + related_subs = cc.get_sub_refs() + for sub in related_subs: + dxf_close_subscription(sub) + clib.dxf_close_connection(cc.connection) cc.connection = NULL