Skip to content

Commit

Permalink
Merge pull request #216 from GaloisInc/glguy/blocking-io
Browse files Browse the repository at this point in the history
Using blocking network IO to avoid busy-wait
  • Loading branch information
glguy authored Sep 4, 2024
2 parents 8da9e41 + 2b3d235 commit c34f0aa
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 85 deletions.
5 changes: 5 additions & 0 deletions python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# argo-client next (TBA)
+ Use blocking IO to reduce CPU load when receiving replies
+ wait_for_reply_to now consumes the reply waited for. previously the reply was held in memory indefinitely
+ Removes RemoteSocketProcess.buffer_replies method. Replies are processed during wait_for_reply_to

# argo-client v0.0.12 (15 May 2024)
+ Bump `mypy` to `mypy-1.10`, and update its dependencies to support the bump. This allows for Python 3.12 support.

Expand Down
106 changes: 41 additions & 65 deletions python/argo_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,32 +158,22 @@ def __init__(self, command: str, *,
self.persist = persist
super().__init__(command, environment=environment)


def buffer_replies(self) -> None:
"""Read any replies that the server has sent, and add their byte
representation to the internal buffer, freeing up space in
the pipe or socket.
"""
try:
arrived = self.socket.recv(4096)
while arrived != b'':
self.buf.extend(arrived)
arrived = self.socket.recv(4096)
return None
except BlockingIOError:
return None

def get_one_reply(self) -> Optional[str]:
"""If a complete reply has been buffered, parse it from the buffer and
return it as a bytestring."""
self.buffer_replies()
try:
(msg, rest) = netstring.decode(self.buf)
self.buf = bytearray(rest)
self._log_rx(msg)
return msg
except (ValueError, IndexError):
return None
"""Return the next message if there is one. Block until the message
is ready or the socket has closed. Return None if the socket closes
and there are no buffered messages ready."""
while True:
got = netstring.decode(self.buf)
if got is None:
arrived = self.socket.recv(4096)
if arrived == '':
return None
self.buf.extend(arrived)
else:
(msg, rest) = got
self.buf = bytearray(rest)
self._log_rx(msg)
return msg

def send_one_message(self, message: str, expecting_response : bool = True) -> None:
self._log_tx(message)
Expand Down Expand Up @@ -233,7 +223,6 @@ def setup(self) -> None:

self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.socket.connect(("localhost", self.port))
self.socket.setblocking(False)



Expand Down Expand Up @@ -261,33 +250,23 @@ def __init__(self, host: str, port: int, ipv6: bool=True):
def setup(self) -> None:
self.socket = socket.socket(socket.AF_INET6 if self.ipv6 else socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
self.socket.setblocking(False)

def buffer_replies(self) -> None:
"""Read any replies that the server has sent, and add their byte
representation to the internal buffer, freeing up space in
the pipe or socket.
"""
try:
arrived = self.socket.recv(4096)
while arrived != b'':
self.buf.extend(arrived)
arrived = self.socket.recv(4096)
return None
except BlockingIOError:
return None

def get_one_reply(self) -> Optional[str]:
"""If a complete reply has been buffered, parse it from the buffer and
return it as a bytestring."""
self.buffer_replies()
try:
(msg, rest) = netstring.decode(self.buf)
self.buf = bytearray(rest)
self._log_rx(msg)
return msg
except (ValueError, IndexError):
return None
"""Return the next message if there is one. Block until the message
is ready or the socket has closed. Return None if the socket closes
and there are no buffered messages ready."""
while True:
got = netstring.decode(self.buf)
if got is None:
arrived = self.socket.recv(4096)
if arrived == '':
return None
self.buf.extend(arrived)
else:
(msg, rest) = got
self.buf = bytearray(rest)
self._log_rx(msg)
return msg

def send_one_message(self, message: str, *, expecting_response : bool = True) -> None:
self._log_tx(message)
Expand Down Expand Up @@ -425,16 +404,6 @@ def get_id(self) -> int:
this connection."""
return self.ids.get()

def _process_replies(self) -> None:
"""Remove all pending replies from the internal buffer, parse them
into JSON, and add them to the internal collection of replies.
"""
reply_bytes = self.process.get_one_reply()
while reply_bytes is not None:
the_reply = json.loads(reply_bytes)
self.replies[the_reply['id']] = the_reply
reply_bytes = self.process.get_one_reply()

def send_command(self, method: str, params: dict, *, timeout : Optional[float] = None) -> int:
"""Send a message to the server with the given JSONRPC command
method and parameters. The return value is the unique request
Expand Down Expand Up @@ -478,11 +447,18 @@ def send_notification(self, method: str, params: dict) -> None:
def wait_for_reply_to(self, request_id: int) -> Any:
"""Block until a reply is received for the given
``request_id``. Return the reply."""
self._process_replies()
while request_id not in self.replies:
self._process_replies()

return self.replies[request_id] #self.replies.pop(request_id) # delete reply while returning it
if request_id in self.replies:
return self.replies.pop(request_id)

while True:
reply_bytes = self.process.get_one_reply()
if reply_bytes is not None:
the_reply = json.loads(reply_bytes)
if the_reply['id'] == request_id:
return the_reply
else:
self.replies[the_reply['id']] = the_reply

def logging(self, on : bool, *, dest : TextIO = sys.stderr) -> None:
"""Whether to log received and transmitted JSON."""
Expand Down
58 changes: 39 additions & 19 deletions python/argo_client/netstring.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
as a lightweight transport layer for JSON RPC.
"""

from typing import Tuple
from typing import Optional, Tuple

def encode(string : str) -> bytes:
"""Encode a ``str`` into a netstring.
Expand All @@ -13,29 +13,49 @@ def encode(string : str) -> bytes:
bytestring = string.encode()
return str(len(bytestring)).encode() + b':' + bytestring + b','

def decode(netstring : bytes) -> Tuple[str, bytes]:
class InvalidNetstring(Exception):
"""Exception for malformed netstrings"""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)

def decode(netstring : bytes) -> Optional[Tuple[str, bytes]]:
"""Decode the first valid netstring from a bytestring, returning its
string contents and the remainder of the bytestring.
Returns None when the bytes are a prefix of a valid netstring.
Raises InvalidNetstring when the bytes are not a prefix of a valid
netstring.
>>> decode(b'5:hello,more')
('hello', b'more')
"""

i = 0
length_bytes = bytearray(b'')
while chr(netstring[i]).isdigit():
length_bytes.append(netstring[i])
i += 1
if chr(netstring[i]).encode() != b':':
raise ValueError("Malformed netstring, missing :")
length = int(length_bytes.decode())
i += 1
out = bytearray(b'')
for j in range(0, length):
out.append(netstring[i])
i += 1
if chr(netstring[i]).encode() != b',':
raise ValueError("Malformed netstring, missing ,")
i += 1
return (out.decode(), netstring[i:])
colon = netstring.find(b':')
if colon == -1 and len(netstring) >= 10 or colon >= 10:
# Avoid cases where the incomplete length is already too
# long or the length is complete but is too long.
# A minimum ten-digit length will be approximately 1GB or more
# which is larger than we should need to handle for this API
raise InvalidNetstring("message length too long")

if colon == -1:
# incomplete length, wait for more bytes
return None

lengthstring = netstring[0:colon]
if colon == 0 or not lengthstring.isdigit():
raise InvalidNetstring("invalid format, malformed message length")

length = int(lengthstring)
comma = colon + length + 1
if len(netstring) <= comma:
# incomplete message, wait for more bytes
return None

if netstring[comma] != 44: # comma
raise InvalidNetstring("invalid format, missing comma")

return (netstring[colon + 1 : comma].decode(), netstring[comma+1:])
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "argo-client"
version = "0.0.12"
version = "0.0.13"
readme = "README.md"
keywords = ["JSON", "RPC"]
description = "A JSON RPC client library."
Expand Down

0 comments on commit c34f0aa

Please sign in to comment.