diff --git a/sportorg/gui/main_window.py b/sportorg/gui/main_window.py index 95eb5aa9..1bc9c997 100644 --- a/sportorg/gui/main_window.py +++ b/sportorg/gui/main_window.py @@ -147,7 +147,7 @@ def teamwork(self, command): # logging.info(repr(command.data)) # if 'object' in command.data and command.data['object'] in # ['ResultManual', 'ResultSportident', 'ResultSFR', 'ResultSportiduino']: - if command.header.objType in [ + if command.header.obj_type in [ ObjectTypes.Result.value, ObjectTypes.ResultManual.value, ObjectTypes.ResultSportident.value, diff --git a/sportorg/models/memory.py b/sportorg/models/memory.py index c7e20cee..7b8ad384 100644 --- a/sportorg/models/memory.py +++ b/sportorg/models/memory.py @@ -6,7 +6,6 @@ from abc import abstractmethod from datetime import date from enum import Enum, IntEnum -from typing import Dict from typing import Optional import dateutil.parser diff --git a/sportorg/modules/teamwork/client.py b/sportorg/modules/teamwork/client.py index 5b02383a..d6657d00 100644 --- a/sportorg/modules/teamwork/client.py +++ b/sportorg/modules/teamwork/client.py @@ -1,6 +1,8 @@ import queue +import select import socket -from threading import Event, Thread, main_thread +from threading import Event, Thread +from typing import Tuple import orjson @@ -8,99 +10,68 @@ from .server import Command -class ClientSenderThread(Thread): - def __init__(self, conn, in_queue, stop_event, logger): - super().__init__(daemon=True) - self.setName(self.__class__.__name__) - self.conn = conn +class ClientSender: + def __init__(self, in_queue: queue.Queue): self._in_queue = in_queue - self._stop_event = stop_event - self._logger = logger - - def run(self): - self._logger.debug('Client sender start') - while True: - try: - cmd = self._in_queue.get(timeout=5) - self.conn.sendall(cmd.get_packet()) - except queue.Empty: - if not main_thread().is_alive() or self._stop_event.is_set(): - break - except ConnectionResetError as e: - self._logger.debug(str(e)) - break - except Exception as e: - self._logger.debug(str(e)) - break - self.conn.close() - self._logger.debug('Client sender shutdown') - self._stop_event.set() + def send(self, conn: socket.socket) -> None: + try: + while True: + cmd = self._in_queue.get_nowait() + conn.sendall(cmd.get_packet()) + except queue.Empty: + return -class ClientReceiverThread(Thread): - def __init__(self, conn, out_queue, stop_event, logger): - super().__init__() - self.setName(self.__class__.__name__) - self.conn = conn - self._out_queue = out_queue - self._stop_event = stop_event - self._logger = logger - def run(self): - full_data = b'' - self.conn.settimeout(5) - self._logger.debug('Client receiver start') - hdr = Header() - is_new_pack = True +class ClientReceiver: + MSG_SIZE = 1024 + def __init__(self, out_queue: queue.Queue): + self._out_queue = out_queue + self._full_data = b'' + self._hdr = Header() + self._is_new_pack = True + + def read(self, conn: socket.socket) -> None: + data = conn.recv(self.MSG_SIZE) + if not data: + return + self._full_data += data while True: - try: - data = self.conn.recv(1024) - if not data: + # getting Header + if self._is_new_pack: + if len(self._full_data) >= self._hdr.header_size: + self._hdr.unpack_header(self._full_data[: self._hdr.header_size]) + self._full_data = self._full_data[self._hdr.header_size :] + self._is_new_pack = False + else: break - full_data += data - while True: - # getting Header - if is_new_pack: - if len(full_data) >= hdr.header_size: - hdr.unpack_header(full_data[: hdr.header_size]) - full_data = full_data[hdr.header_size :] - is_new_pack = False - else: - break - # Getting JSON data - else: - if len(full_data) >= hdr.size: - command = Command( - orjson.loads(full_data[: hdr.size].decode()), - Operations(hdr.op_type).name, - ) - self._out_queue.put(command) # for local - full_data = full_data[hdr.size :] - is_new_pack = True - else: - break - except socket.timeout: - if not main_thread().is_alive() or self._stop_event.is_set(): + # Getting JSON data + else: + if len(self._full_data) >= self._hdr.size: + command = Command( + orjson.loads(self._full_data[: self._hdr.size].decode()), + Operations(self._hdr.op_type).name, + ) + self._out_queue.put_nowait(command) + self._full_data = self._full_data[self._hdr.size :] + self._is_new_pack = True + else: break - except ConnectionAbortedError as e: - self._logger.exception(e) - break - except ConnectionResetError as e: - self._logger.exception(e) - break - except Exception as e: - self._logger.exception(e) - break - self._logger.debug('Client receiver shutdown') - self._stop_event.set() class ClientThread(Thread): - def __init__(self, addr, in_queue, out_queue, stop_event, logger): + def __init__( + self, + addr: Tuple[str, int], + in_queue: queue.Queue, + out_queue: queue.Queue, + stop_event: Event, + logger, + ): super().__init__() - self.setName(self.__class__.__name__) - self.addr = addr + self.setName('Teamwork Client') + self._addr = addr self._in_queue = in_queue self._out_queue = out_queue self._stop_event = stop_event @@ -113,27 +84,31 @@ def join_client(self) -> None: def run(self) -> None: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: - s.connect(self.addr) - self._logger.info('Client start') - sender = ClientSenderThread( - s, self._in_queue, self._stop_event, self._logger - ) - sender.start() - receiver = ClientReceiverThread( - s, self._out_queue, self._stop_event, self._logger - ) - receiver.start() + s.connect(self._addr) + s.settimeout(5) + s.setblocking(False) + self._logger.info('Client started') self._client_started.set() - - sender.join() - receiver.join() + sender = ClientSender(self._in_queue) + receiver = ClientReceiver(self._out_queue) + sockets = [s] + while True: + if self._stop_event.is_set(): + break + rread, rwrite, err = select.select(sockets, sockets, [], 1) + if rread: + receiver.read(s) + if rwrite: + sender.send(s) except ConnectionRefusedError as e: self._logger.exception(e) + self._stop_event.set() return except Exception as e: self._logger.exception(e) + self._stop_event.set() return s.close() - self._logger.info('Client shutdown') + self._logger.info('Client stopped') diff --git a/sportorg/modules/teamwork/server.py b/sportorg/modules/teamwork/server.py index 1aed50a9..9aedd588 100644 --- a/sportorg/modules/teamwork/server.py +++ b/sportorg/modules/teamwork/server.py @@ -1,8 +1,9 @@ -import json import socket from queue import Empty, Queue from threading import Event, Thread, main_thread +import orjson + from .packet_header import Header, ObjectTypes, Operations @@ -22,7 +23,7 @@ def exclude(self, addr): return self def get_packet(self): - pack_data = json.dumps(self.data).encode() + pack_data = orjson.dumps(self.data) return self.header.pack_header(len(pack_data)) + pack_data @@ -50,7 +51,7 @@ def __init__(self, conn, in_queue, out_queue, stop_event, logger): def run(self): with self.connect.conn: - self._logger.debug('Server receiver start') + self._logger.debug('Server receiver started') self._logger.info('Connected by {}'.format(self.connect.addr)) full_data = b'' self.connect.conn.settimeout(5) @@ -75,7 +76,7 @@ def run(self): else: if len(full_data) >= hdr.size: command = Command( - json.loads(full_data[: hdr.size].decode()), + orjson.loads(full_data[: hdr.size].decode()), Operations(hdr.op_type).name, self.connect.addr, ) @@ -166,7 +167,7 @@ def run(self) -> None: s.listen(1) s.settimeout(5) - self._logger.info('Server start') + self._logger.info('Server started') conns_queue = Queue() # type: ignore sender = ServerSenderThread( @@ -199,4 +200,4 @@ def run(self) -> None: sender.join() for srt in connections: srt.join() - self._logger.info('Server shutdown') + self._logger.info('Server stopped') diff --git a/tests/test_teamwork.py b/tests/test_teamwork.py index bfb4ed03..51d897b9 100644 --- a/tests/test_teamwork.py +++ b/tests/test_teamwork.py @@ -33,7 +33,7 @@ def test_teamwork(): 'Create', ) ) - result = client_out_queue.get(timeout=5) + result = client_out_queue.get(timeout=10) assert result.data == { 'object': 'Person', 'id': 'c24eef6c-a33b-4581-a6d1-78294711aef1',