Skip to content

Commit

Permalink
py/stream: multicast fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jordens committed Jan 20, 2025
1 parent 53f84e9 commit a36e294
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
2 changes: 1 addition & 1 deletion hitl/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def _main():
try:
logger.info("Testing stream reception")
_transport, stream = await StabilizerStream.open(
args.port, args.ip, args.broker
args.port, addr=args.ip, bind=get_local_ip(args.broker)
)
loss = await measure(stream, args.duration)
if loss > args.max_loss:
Expand Down
40 changes: 31 additions & 9 deletions py/stabilizer/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,25 @@ def to_traces(self):
]


class ThermostatEem:
"""Thermostat-EEM format"""

format_id = 3

def __init__(self, header, body):
self.header = header
self.body = body

def size(self):
"""Return the data size of the frame in bytes"""
return len(self.body)

def to_si(self):
return np.frombuffer(
self.body, np.dtype([("input", "<f4", (4, 4)), ("output", "<f4", (4,))])
)


class Frame:
"""Stream frame constisting of a header and multiple data batches"""

Expand All @@ -88,6 +107,7 @@ class Frame:
header = namedtuple("Header", "magic format_id batches sequence")
parsers = {
AdcDac.format_id: AdcDac,
ThermostatEem.format_id: ThermostatEem,
}

@classmethod
Expand All @@ -107,26 +127,28 @@ class StabilizerStream(asyncio.DatagramProtocol):
"""Stabilizer streaming receiver protocol"""

@classmethod
async def open(cls, port=9293, addr="0.0.0.0", broker=None, maxsize=1):
async def open(cls, port=9293, addr="0.0.0.0", bind=None, maxsize=1):
"""Open a UDP socket and start receiving frames"""
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# Increase the OS UDP receive buffer size to 4 MiB so that latency
# spikes don't impact much. Achieving 4 MiB may require increasing
# Increase the OS UDP receive buffer size to 16 MiB so that latency
# spikes don't impact much. Achieving 16 MiB may require increasing
# the max allowed buffer size, e.g. via
# `sudo sysctl net.core.rmem_max=26214400` but nowadays the default
# max appears to be ~ 50 MiB already.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 << 20)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16 << 20)

# We need to specify which interface to receive broadcasts from, or Windows may choose the
# We need to specify which interface to receive multicasts from, or Windows may choose the
# wrong one. Thus, use the broker address to figure out our local address for the interface
# of interest.
if ipaddress.ip_address(addr).is_multicast:
group = socket.inet_aton(addr)
iface = socket.inet_aton(get_local_ip(broker))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, group + iface)
sock.setsockopt(
socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP,
socket.inet_aton(addr) + socket.inet_aton(bind),
)
sock.bind(("", port))
else:
sock.bind((addr, port))
Expand Down Expand Up @@ -212,7 +234,7 @@ async def main():

logging.basicConfig(level=logging.INFO)
_transport, stream = await StabilizerStream.open(
args.port, args.host, args.broker, args.maxsize
args.port, args.host, get_local_ip(args.broker), args.maxsize
)
await measure(stream, args.duration)

Expand Down

0 comments on commit a36e294

Please sign in to comment.