Skip to content

Commit

Permalink
refactor: use default selector in server and client for teamwork
Browse files Browse the repository at this point in the history
  • Loading branch information
daxartio committed Nov 26, 2023
1 parent aee1e99 commit d0f971f
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 196 deletions.
23 changes: 3 additions & 20 deletions sportorg/common/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,10 @@ def subscribe(self, name, call, priority=0):
return self.add(name, priority).subscribe(call)

def produce(self, name, *args, **kwargs):
logging.debug(
str(datetime.datetime.now()) + ' Broker.produce started for ' + name
)
if name not in self._consumers:
logging.debug(
str(datetime.datetime.now())
+ ' Broker.produce finished (no consumers) for '
+ name
)
return None

if not isinstance(self._consumers[name], list):
logging.debug(
str(datetime.datetime.now())
+ ' Broker.produce finished (no consumers) for '
+ name
)
return None

result = []
Expand All @@ -67,16 +54,12 @@ def produce(self, name, *args, **kwargs):
r = method(*args, **kwargs)
except AttributeError:
self._logger.error(
'Class `{}` does not implement `{}`'.format(
cls.__class__.__name__, method_name
)
'Class `%s` does not implement `%s`',
cls.__class__.__name__,
method_name,
)
r = None

if r:
result.append(r)

logging.debug(
str(datetime.datetime.now()) + ' Broker.produce finished for ' + name
)
return result if len(result) else None
44 changes: 21 additions & 23 deletions sportorg/modules/teamwork/client.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import queue
import select
import selectors
import socket
from threading import Event, Thread
from typing import Tuple
from typing import Tuple, cast

import orjson

from .command import Command
from .packet_header import Header, Operations
from .server import Command


class ClientSender:
def __init__(self, in_queue: queue.Queue):
self._in_queue = in_queue

def send(self, conn: socket.socket) -> None:
def __call__(self, conn: socket.socket) -> None:
try:
while True:
cmd = self._in_queue.get_nowait()
Expand All @@ -32,7 +32,7 @@ def __init__(self, out_queue: queue.Queue):
self._hdr = Header()
self._is_new_pack = True

def read(self, conn: socket.socket) -> None:
def __call__(self, conn: socket.socket) -> None:
data = conn.recv(self.MSG_SIZE)
if not data:
return
Expand Down Expand Up @@ -76,39 +76,37 @@ def __init__(
self._out_queue = out_queue
self._stop_event = stop_event
self._logger = logger
self._client_started = Event()
self._started = Event()

def join_client(self) -> None:
self._client_started.wait()
def wait(self) -> None:
self._started.wait()

def run(self) -> None:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
selector = selectors.DefaultSelector()
try:
s.connect(self._addr)
s.settimeout(5)
s.setblocking(False)
selector.register(s, selectors.EVENT_READ | selectors.EVENT_WRITE)
self._logger.info('Client started')
self._client_started.set()
self._started.set()
sender = ClientSender(self._in_queue)
receiver = ClientReceiver(self._out_queue)
sockets = [s]
while True:
if self._stop_event.is_set():
break
rread, rwrite, err = select.select(sockets, sockets, [], 1)
if rread:
receiver.read(s)
if rwrite:
sender.send(s)

except ConnectionRefusedError as e:
self._logger.exception(e)
self._stop_event.set()
return
events = selector.select(timeout=1)
if not events:
continue
for key, mask in events:
if mask & selectors.EVENT_READ:
receiver(cast(socket.socket, key.fileobj))
if mask & selectors.EVENT_WRITE:
sender(cast(socket.socket, key.fileobj))
except Exception as e:
self._logger.exception(e)
self._stop_event.set()
return

s.close()
finally:
selector.close()
self._logger.info('Client stopped')
29 changes: 29 additions & 0 deletions sportorg/modules/teamwork/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import socket
from typing import Optional

import orjson

from .packet_header import Header, ObjectTypes, Operations


class Command:
def __init__(
self,
data=None,
op=Operations.Update.name,
sender: Optional[socket.socket] = None,
):
self.data = data
self.header = Header(data, op)
self.next_cmd_obj_type = ObjectTypes.Unknown.value
self._sender = sender

def __repr__(self) -> str:
return str(self.data)

def is_sender(self, sender: socket.socket) -> bool:
return self._sender is sender

def get_packet(self) -> bytes:
pack_data = orjson.dumps(self.data)
return self.header.pack_header(len(pack_data)) + pack_data
Loading

0 comments on commit d0f971f

Please sign in to comment.