Skip to content

Commit

Permalink
Merge pull request #25 from ezmsg-org/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
cboulay authored Nov 26, 2024
2 parents b39b85c + fbd2450 commit b163402
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 128 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/python-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
rm liblsl-1.16.0-OSX_arm64.tar.bz2
- name: Install uv
uses: astral-sh/setup-uv@v2
uses: astral-sh/setup-uv@v3
with:
enable-cache: true
cache-dependency-glob: "uv.lock"
Expand All @@ -57,7 +57,7 @@ jobs:
run: uv python install ${{ matrix.python-version }}

- name: Install the project
run: uv sync --all-extras --dev
run: uv sync --all-extras

- name: Ruff check
uses: astral-sh/ruff-action@v1
Expand Down
14 changes: 5 additions & 9 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ dynamic = ["version"]

[project.optional-dependencies]
test = [
"ezmsg-sigproc>=1.5.0",
"flake8>=7.1.1",
"pytest-cov>=5.0.0",
"pytest>=8.3.3",
"ezmsg-sigproc>=1.5.0",
]
dev = [
"ruff>=0.6.6",
"typer>=0.13.0",
]

[build-system]
Expand All @@ -33,11 +37,3 @@ version-file = "src/ezmsg/lsl/__version__.py"

[tool.hatch.build.targets.wheel]
packages = ["src/ezmsg"]

[tool.uv]
dev-dependencies = [
"ruff>=0.6.6",
"typer>=0.13.0",
]

[tool.uv.sources]
9 changes: 8 additions & 1 deletion src/ezmsg/lsl/inlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._msg_template: typing.Optional[AxisArray] = None
self._fetch_buffer: typing.Optional[npt.NDArray] = None
self._clock_sync = ClockSync()
self._clock_sync = ClockSync(run_thread=False)

def _reset_resolver(self) -> None:
self.STATE.resolver = pylsl.ContinuousResolver(pred=None, forget_after=30.0)
Expand Down Expand Up @@ -218,6 +218,13 @@ def shutdown(self) -> None:
del self.STATE.resolver
self.STATE.resolver = None

@ez.task
async def update_clock(self) -> None:
while True:
if self.STATE.inlet is not None:
self._clock_sync.run_once()
await asyncio.sleep(0.1)

@ez.subscriber(INPUT_SETTINGS)
async def on_settings(self, msg: LSLInletSettings) -> None:
# The message may be full LSLInletSettings, a dict of settings, just the info, or dict of just info.
Expand Down
10 changes: 9 additions & 1 deletion src/ezmsg/lsl/outlet.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import typing

import ezmsg.core as ez
Expand Down Expand Up @@ -52,12 +53,19 @@ class LSLOutletUnit(ez.Unit):

async def initialize(self) -> None:
self._stream_created = False
self._clock_sync = ClockSync()
self._clock_sync = ClockSync(run_thread=False)

def shutdown(self) -> None:
del self.STATE.outlet
self.STATE.outlet = None

@ez.task
async def update_clock(self) -> None:
while True:
if self.STATE.outlet is not None:
self._clock_sync.run_once()
await asyncio.sleep(0.1)

@ez.subscriber(INPUT_SIGNAL, zero_copy=True)
async def lsl_outlet(self, msg: AxisArray) -> None:
if self.STATE.outlet is None:
Expand Down
34 changes: 24 additions & 10 deletions src/ezmsg/lsl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,41 @@ def __new__(cls, *args, **kwargs):
cls._instance = super().__new__(cls)
return cls._instance

def __init__(self, alpha: float = 0.1, min_interval: float = 0.1):
def __init__(self, alpha: float = 0.1, min_interval: float = 0.1, run_thread: bool = True):
if not hasattr(self, "_initialized"):
self._alpha = alpha
self._interval = min_interval

self._initialized = True
self._last_time = time.time() - 1e9
self._running = False
self._thread: typing.Optional[threading.Thread] = None
# Do first burst so we have a real offset even before the thread starts.
xs, ys = collect_timestamp_pairs(100)
self._offset: float = np.mean(ys - xs)

self._thread = threading.Thread(target=self._run)
self._thread.daemon = True
self._initialized = True
self._running = True
self._thread.start()
if run_thread:
self.start()

def run_once(self, n: int = 4, force: bool = False):
if force or (time.time() - self._last_time) > self._interval:
xs, ys = collect_timestamp_pairs(n)
offset = np.mean(ys - xs)
self._offset = (1 - self._alpha) * self._offset + self._alpha * offset
self._last_time = time.time()

def _run(self):
while self._running:
time.sleep(self._interval)
xs, ys = collect_timestamp_pairs(4)
offset = np.mean(ys - xs)
self._offset = (1 - self._alpha) * self._offset + self._alpha * offset
self.run_once(4, True)

def start(self):
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.daemon = True
self._thread.start()

def stop(self):
self._running = False

@property
def offset(self) -> float:
Expand Down
49 changes: 46 additions & 3 deletions tests/test_inlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
import pylsl
import pytest
import ezmsg.core as ez
from ezmsg.sigproc.synth import Clock, ClockSettings
from ezmsg.util.debuglog import DebugLog, DebugLogSettings
from ezmsg.util.messages.axisarray import AxisArray
from ezmsg.util.messagelogger import MessageLogger
from ezmsg.util.messagecodec import message_log
from ezmsg.util.terminate import TerminateOnTotal
from ezmsg.util.terminate import TerminateOnTotal, TerminateOnTimeout, TerminateOnTimeoutSettings, \
TerminateOnTotalSettings

from ezmsg.lsl.units import LSLInfo, LSLInletSettings, LSLInletUnit

Expand All @@ -38,7 +41,10 @@ class DummyOutlet(ez.Unit):
@ez.task
async def run_dummy(self) -> None:
info = pylsl.StreamInfo(
name="dummy", type="dummy", channel_count=self.SETTINGS.n_chans, nominal_srate=self.SETTINGS.rate
name="dummy",
type="dummy",
channel_count=self.SETTINGS.n_chans,
nominal_srate=self.SETTINGS.rate,
)
outlet = pylsl.StreamOutlet(info)
eff_rate = self.SETTINGS.rate or 100.0
Expand All @@ -54,8 +60,45 @@ async def run_dummy(self) -> None:
n_pushed += n_interval


def test_inlet_collection():
"""The primary purpose of this test is to verify that LSLInletUnit can be included in a collection."""

class LSLTestSystemSettings(ez.Settings):
stream_name: str = "dummy"
stream_type: str = "dummy"

class LSLTestSystem(ez.Collection):
SETTINGS = LSLTestSystemSettings

DUMMY = DummyOutlet()
INLET = LSLInletUnit()
LOGGER = DebugLog()
TERM = TerminateOnTotal()

def configure(self) -> None:
self.INLET.apply_settings(
LSLInletSettings(
LSLInfo(
name=self.SETTINGS.stream_name, type=self.SETTINGS.stream_type
)
)
)
self.LOGGER.apply_settings(DebugLogSettings(name="test_inlet_collection"))
self.TERM.apply_settings(TerminateOnTotalSettings(total=10))

def network(self) -> ez.NetworkDefinition:
return (
(self.INLET.OUTPUT_SIGNAL, self.LOGGER.INPUT),
(self.LOGGER.OUTPUT, self.TERM.INPUT_MESSAGE),
)

# This next line raises an error if the ClockSync object runs its own thread.
system = LSLTestSystem()
ez.run(SYSTEM=system)


@pytest.mark.parametrize("rate", [100.0, 0.0])
def test_inlet_system(rate: float):
def test_inlet_comps_conns(rate: float):
n_messages = 20
file_path = Path(tempfile.gettempdir())
file_path = file_path / Path("test_inlet_system.txt")
Expand Down
16 changes: 12 additions & 4 deletions tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import time

import numpy as np
import pytest

from ezmsg.lsl.util import ClockSync, collect_timestamp_pairs


def test_clock_sync():
@pytest.mark.parametrize("own_thread", [True, False])
def test_clock_sync(own_thread: bool):
tol = 10e-3 # 1 msec

clock_sync = ClockSync()
# Let it run a bit to get a stable estimate.
time.sleep(1.0)
clock_sync = ClockSync(run_thread=own_thread)
if own_thread:
# Let it run a bit to get a stable estimate.
time.sleep(1.0)
clock_sync.stop()
else:
for ix in range(10):
clock_sync.run_once()
time.sleep(0.1)

offsets = []
for _ in range(10):
Expand Down
Loading

0 comments on commit b163402

Please sign in to comment.