Feature: Synchronous Low‑Level API for ezmsg (ROS2‑style ergonomics)#215

Open
griffinmilsap wants to merge 3 commits into dev from feature/sync-lowlevel-api

@griffinmilsap
Collaborator

Summary

  • Adds a synchronous low‑level API (SyncContext, SyncPublisher, SyncSubscriber, init, spin, spin_once) so users can publish/subscribe without asyncio.
  • Adds ROS2‑style usage examples for both sync and async APIs.
  • Improves GraphServer auto‑start control via GraphContext(auto_start=...) and GraphService.ensure(auto_start=...).
  • Adds tests for the sync wrapper and a small perf script to quantify overhead.

Motivation

Many users find asyncio intimidating; the goal is to let them use ezmsg with a simple, synchronous API similar to ROS2 (with ez.sync.init(...), spin(), spin_once()), while preserving ezmsg’s backpressure semantics and zero‑copy safety.
This wrapper is explicitly for ergonomics, not peak throughput.

Implementation Details

New Sync API (src/ezmsg/core/sync.py)

  • SyncContext wraps GraphContext and runs an asyncio event loop in a background thread using new_threaded_event_loop.
  • create_publisher / create_subscription call the underlying async API via asyncio.run_coroutine_threadsafe.
  • spin() / spin_once() pull messages directly via recv_zero_copy() and only release backpressure after the user callback returns.
    • This preserves backpressure and avoids the prior “queueing” behavior that could release SHM‑backed messages too early.
  • spin_once() returns a boolean for “did work”.
  • Handles CacheMiss gracefully when a publisher exits under backpressure (stale notifications).
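The ordering described above (acquire the message on the loop thread, run the user callback on the calling thread, release only afterwards) can be sketched with the standard library alone. This is a minimal illustration, not the code in sync.py: ToyChannel, start_loop_thread and this spin_once are hypothetical stand-ins, and the toy recv_zero_copy only records when the lease is taken and released.

```python
import asyncio
import threading
from contextlib import asynccontextmanager


class ToyChannel:
    """Hypothetical stand-in for a subscriber; not ezmsg's real class."""

    def __init__(self):
        self.events = []

    @asynccontextmanager
    async def recv_zero_copy(self):
        # Entering the context models taking a lease on a shared message.
        self.events.append("lease acquired")
        try:
            yield "payload"
        finally:
            # Leaving the context models releasing backpressure.
            self.events.append("lease released")


def start_loop_thread():
    """Start an asyncio loop in a daemon thread and return (loop, thread)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def spin_once(loop, channel, callback):
    """Receive one message and hold the lease until callback returns."""
    cm = channel.recv_zero_copy()
    # Enter the async context manager on the loop thread.
    msg = asyncio.run_coroutine_threadsafe(cm.__aenter__(), loop).result()
    try:
        callback(msg)  # user code runs on the calling thread
    finally:
        # Exit on the loop thread only after the callback has finished.
        asyncio.run_coroutine_threadsafe(
            cm.__aexit__(None, None, None), loop
        ).result()
    return True
```

Calling spin_once(loop, channel, print) with a loop from start_loop_thread() leaves channel.events in acquire, release order with the callback in between, which is the property the PR relies on to keep SHM-backed messages valid during the callback.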

GraphServer Auto‑Start Control

  • GraphService.ensure(auto_start: bool | None = None)
    • None preserves the existing behavior: auto‑start only when no address is specified and there is no env override.
    • True / False overrides.
  • GraphContext(..., auto_start=...) passes through to GraphService.ensure.
  • ez.sync.init(..., auto_start=...) mirrors GraphContext defaulting to None.
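The tri‑state rule above can be written down as a small decision function. This is a sketch of the described semantics only; should_auto_start and its two boolean inputs are hypothetical names, and how GraphService.ensure actually detects an address or an env override is not shown in this PR description.

```python
from typing import Optional


def should_auto_start(
    auto_start: Optional[bool],
    address_specified: bool,
    env_override: bool,
) -> bool:
    """Decide whether to start a GraphServer (hypothetical helper)."""
    if auto_start is not None:
        # An explicit True / False always wins.
        return auto_start
    # None keeps the legacy rule: only auto-start when the caller gave no
    # address and no environment override is in effect.
    return not address_specified and not env_override
```

With this shape, passing auto_start=True as the examples below do forces a server start even though an explicit (host, port) address is given.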

Examples

  • examples/simple_publisher.py
import time
import ezmsg.core as ez

TOPIC = "/TEST"

def main(host: str = "127.0.0.1", port: int = 12345) -> None:
    with ez.sync.init((host, port), auto_start=True) as ctx:
        pub = ctx.create_publisher(TOPIC, force_tcp=True)

        print("Publisher Task Launched")
        count = 0
        try:
            while True:
                output = f"{count=}"
                pub.publish(output)
                print(output)
                time.sleep(0.1)
                count += 1
        except KeyboardInterrupt:
            pass
        print("Publisher Task Concluded")

    print("Done")
 
if __name__ == '__main__':
    main()
  • examples/simple_subscriber.py
import time
import ezmsg.core as ez

TOPIC = "/TEST"

def main(host: str = "127.0.0.1", port: int = 12345) -> None:
    with ez.sync.init((host, port), auto_start=True) as ctx:
        print("Subscriber Task Launched")

        def on_message(msg: str) -> None:
            # Uncomment if you want to witness backpressure!
            # time.sleep(1.0)
            print(msg)

        ctx.create_subscription(TOPIC, callback=on_message)
        ez.sync.spin(ctx)

    print("Subscriber Task Concluded")

if __name__ == '__main__':
    main()
  • examples/simple_async_publisher.py
  • examples/simple_async_subscriber.py

Tests

  • tests/test_sync_api.py

Perf Script

  • tests/perf_sync_overhead.py

Threads and Concurrency Model

  • The sync wrapper runs one background asyncio loop thread for all async work.
  • The calling thread runs user code (spin, spin_once, callbacks).
  • Every publish() / recv() call crosses threads via run_coroutine_threadsafe.
    • This is safe and preserves backpressure, but introduces measurable overhead.
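To get a feel for the per‑call cost of a thread hop in isolation, the following standard‑library sketch times a trivial coroutine awaited inside a loop against the same coroutine submitted from another thread with run_coroutine_threadsafe. It is independent of ezmsg and of tests/perf_sync_overhead.py; noop, time_direct and time_threadsafe are names made up for this illustration, and absolute numbers will vary by machine.

```python
import asyncio
import threading
import time


async def noop():
    """Trivial coroutine so that only scheduling cost is measured."""
    return None


async def _direct(n):
    start = time.perf_counter()
    for _ in range(n):
        await noop()
    return time.perf_counter() - start


def time_direct(n):
    """Seconds to await noop() n times inside a running loop."""
    return asyncio.run(_direct(n))


def time_threadsafe(n):
    """Seconds to run noop() n times via a background loop thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        start = time.perf_counter()
        for _ in range(n):
            # Each call hands work to the loop thread and blocks on the result.
            asyncio.run_coroutine_threadsafe(noop(), loop).result()
        return time.perf_counter() - start
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```

Comparing time_threadsafe(10_000) with time_direct(10_000) gives a rough lower bound on the overhead any per‑message cross‑thread design has to pay.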

Performance

Measured using tests/perf_sync_overhead.py (local macOS example):

async: 0.0520s total, 5.20 us/msg, 192,203 msg/s
sync : 1.1506s total, 115.06 us/msg, 8,691 msg/s
overhead: 2111.5%

Interpretation:

  • Sync wrapper is significantly slower for micro‑messages due to per‑message thread hops.
  • The sync API is intended for ergonomics, not maximum throughput.
  • For high‑rate paths, the async API remains recommended.
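The reported overhead is consistent with the ratio of the two throughput figures. The sketch below reproduces it from the numbers quoted above; that the perf script computes it this way is an assumption, and overhead_percent is a name introduced here.

```python
def overhead_percent(async_rate: float, sync_rate: float) -> float:
    """Extra time per message of the sync path, as a percentage of async."""
    return (async_rate / sync_rate - 1.0) * 100.0


async_rate = 192_203  # msg/s, from the async line above
sync_rate = 8_691     # msg/s, from the sync line above

print(f"async: {1e6 / async_rate:.2f} us/msg")  # 5.20 us/msg
print(f"sync : {1e6 / sync_rate:.2f} us/msg")   # 115.06 us/msg
print(f"overhead: {overhead_percent(async_rate, sync_rate):.1f}%")  # 2111.5%
```

Put differently, the sync wrapper takes about 22 times as long per message in this micro‑benchmark.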

Files Changed / Added

  • Added src/ezmsg/core/sync.py
  • Updated src/ezmsg/core/__init__.py
  • Updated src/ezmsg/core/graphcontext.py
  • Updated src/ezmsg/core/graphserver.py
  • Added examples/simple_publisher.py
  • Added examples/simple_subscriber.py
  • Added examples/simple_async_publisher.py
  • Added examples/simple_async_subscriber.py
  • Added tests/test_sync_api.py
  • Added tests/perf_sync_overhead.py
  • Removed examples/lowlevel.py (replaced by split examples)

Contributor

@KonradPilch KonradPilch left a comment

Great addition Griffin.

I couldn't see anything I'd change. I've run the copilot check just in case.

Copilot AI left a comment

Pull request overview

Adds a synchronous “ROS2-style” low-level API layer on top of ezmsg’s async pub/sub, plus examples/tests, and introduces explicit control over GraphServer auto-start behavior via auto_start.

Changes:

  • Introduces ezmsg.core.sync (SyncContext, SyncPublisher, SyncSubscriber, init, spin, spin_once) backed by a background asyncio loop thread.
  • Extends GraphServer bring-up behavior with GraphContext(auto_start=...) and GraphService.ensure(auto_start=...).
  • Adds split sync/async low-level examples, sync API tests, and a perf comparison script.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.

Show a summary per file
File | Description
src/ezmsg/core/sync.py | New synchronous wrapper API around GraphContext/Publisher/Subscriber, including callback-based spinning.
src/ezmsg/core/graphserver.py | Adds auto_start override to GraphService.ensure and logs GraphServer startup.
src/ezmsg/core/graphcontext.py | Plumbs auto_start through GraphContext to GraphService.ensure.
src/ezmsg/core/__init__.py | Exposes the new sync API/module from ezmsg.core.
examples/simple_publisher.py | New sync publisher example using ez.sync.init.
examples/simple_subscriber.py | New sync subscriber example using spin().
examples/simple_async_publisher.py | New async publisher example (replacement for prior combined example).
examples/simple_async_subscriber.py | New async subscriber example (replacement for prior combined example).
examples/lowlevel_api.py | Removes the previous combined low-level example in favor of split examples.
tests/test_sync_api.py | Adds tests for sync autostart/spin_once, backpressure behavior, and CacheMiss handling.
tests/perf_sync_overhead.py | Adds a standalone script to measure sync-wrapper overhead vs async usage.

self._loop = asyncio.new_event_loop()
super().start()
self._server_up.wait()
logger.info(f'Started GraphServer at {address}')

Copilot AI Feb 18, 2026

The startup log message uses the address argument, which is None when binding to an ephemeral port, and may not reflect the actual bound address. Prefer logging self.address (after _server_up.wait()) so the message always reports the real listening endpoint.

Suggested change
- logger.info(f'Started GraphServer at {address}')
+ logger.info(f"Started GraphServer at {self.address}")

Copilot uses AI. Check for mistakes.
Collaborator Author

This is a very good catch!

Contributor

Implementing this locally actually caused a few tests to fail. I'm investigating.

Comment on lines +47 to +53
        self._pub.pause()

    def resume(self) -> None:
        self._pub.resume()

    def close(self) -> None:
        self._pub.close()

Copilot AI Feb 18, 2026

SyncPublisher.pause()/resume()/close() call methods on the underlying async Publisher directly from the calling thread. Those methods mutate asyncio primitives and cancel asyncio tasks owned by the background loop, which is not thread-safe and can raise runtime errors or cause missed wakeups. Marshal these operations onto the loop thread (e.g., loop.call_soon_threadsafe for sync methods, or provide async wrappers and invoke via run_coroutine_threadsafe).

Suggested change
-         self._pub.pause()
-     def resume(self) -> None:
-         self._pub.resume()
-     def close(self) -> None:
-         self._pub.close()
+         self._loop.call_soon_threadsafe(self._pub.pause)
+     def resume(self) -> None:
+         self._loop.call_soon_threadsafe(self._pub.resume)
+     def close(self) -> None:
+         self._loop.call_soon_threadsafe(self._pub.close)

Comment on lines +103 to +105
    def close(self) -> None:
        self._sub.close()


Copilot AI Feb 18, 2026

SyncSubscriber.close() calls Subscriber.close() directly from the calling thread, but Subscriber.close() cancels an asyncio task created on the background loop. Task cancellation is not thread-safe across event loops/threads. Schedule close onto the loop thread (e.g., loop.call_soon_threadsafe) and consider offering a synchronous wait_closed() pattern to ensure teardown completes.
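One way to combine "schedule onto the loop thread" with "wait until it has actually run" is a small helper built on call_soon_threadsafe and a concurrent.futures.Future. This is a sketch of the general pattern, not code from this PR; call_on_loop is a hypothetical name, and it must not be called from the loop thread itself or it will block forever.

```python
import asyncio
import threading
from concurrent.futures import Future


def call_on_loop(loop, fn, *args, timeout=None):
    """Run fn(*args) on the loop's thread and block until it has finished.

    Hypothetical helper; do not call it from the loop thread.
    """
    done: Future = Future()

    def runner():
        try:
            done.set_result(fn(*args))
        except BaseException as exc:  # hand any failure back to the caller
            done.set_exception(exc)

    # call_soon_threadsafe is the documented way to schedule a plain
    # callback onto a loop that is running in another thread.
    loop.call_soon_threadsafe(runner)
    return done.result(timeout)
```

A synchronous close() could then be written as call_on_loop(self._loop, self._sub.close), assuming the wrapper keeps a reference to the background loop as the suggested change above does. Note that this only waits for close() to be invoked on the loop thread; waiting for the cancelled task to finish would still need a separate wait_closed()-style step.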

Comment on lines +382 to +404
while True:
    remaining = None if deadline is None else max(0.0, deadline - loop.time())
    tasks = [asyncio.create_task(_recv_entry(entry)) for entry in entries]
    try:
        done, pending = await asyncio.wait(
            tasks, timeout=remaining, return_when=asyncio.FIRST_COMPLETED
        )
        if not done:
            for task in pending:
                try:
                    task.cancel()
                except RuntimeError:
                    pass
            await asyncio.gather(*pending, return_exceptions=True)
            return None

        for task in pending:
            try:
                task.cancel()
            except RuntimeError:
                pass
        await asyncio.gather(*pending, return_exceptions=True)

Copilot AI Feb 18, 2026

_recv_any() cancels and gathers pending recv_zero_copy() tasks without handling the case where a “pending” task actually completed between asyncio.wait() returning and cancellation. In that race, the task result contains an acquired zero-copy context manager (cm) whose __aexit__ is never called, leaking backpressure/message leases. When gathering pending tasks, inspect the results and explicitly call __aexit__ on any successfully entered context managers, or restructure to guarantee that only one task can ever successfully enter before the others are cleaned up.
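The "inspect results and release" option can be sketched as follows. This is an illustration of the cleanup pattern under simplified assumptions, not a patch for _recv_any(): Lease and first_lease are hypothetical names, each acquirer is assumed to return an object with an async release() method, and at least one acquirer is assumed to be passed.

```python
import asyncio


class Lease:
    """Hypothetical stand-in for an entered zero-copy context manager."""

    def __init__(self, name):
        self.name = name
        self.released = False

    async def release(self):
        self.released = True


async def first_lease(acquirers, timeout=None):
    """Return the first acquired lease and release every other one.

    acquirers is a non-empty list of coroutines that each return a Lease.
    """
    tasks = [asyncio.ensure_future(a) for a in acquirers]
    done, pending = await asyncio.wait(
        tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    # After this gather, every task is finished one way or another.
    await asyncio.gather(*pending, return_exceptions=True)

    winner = None
    for task in tasks:
        # Skip tasks that were cancelled or failed; they hold no lease.
        if task.cancelled() or task.exception() is not None:
            continue
        lease = task.result()
        if winner is None and task in done:
            winner = lease
        else:
            # This task acquired a lease that nobody will consume, either
            # because it finished alongside the winner or because it
            # completed despite the cancel request. Release it explicitly.
            await lease.release()
    return winner
```

The key point is that cleanup looks at every task's outcome after the gather, rather than assuming that a task listed as pending never produced a result.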

return self._graph_address
return self._graph_context.graph_address

def __enter__(self) -> "SyncContext":

Copilot AI Feb 18, 2026

SyncContext instances aren’t safely reusable after shutdown(): _shutdown_requested and _closed are never reset, so re-entering the same instance (or calling spin/spin_once after a shutdown) will immediately no-op or skip cleanup paths. Either document that SyncContext is single-use and raise if __enter__ is called after shutdown, or reset the shutdown/closed state (and clear callback subscriptions) in __enter__.

Suggested change
-     def __enter__(self) -> "SyncContext":
+     def __enter__(self) -> "SyncContext":
+         # SyncContext instances are single-use: they cannot be re-entered after shutdown.
+         if self._closed:
+             raise RuntimeError(
+                 "SyncContext instances cannot be reused after shutdown; "
+                 "create a new SyncContext instead."
+             )

Collaborator Author

Also very good catch.

Contributor

Should we leave the SyncContext single-use or make it reusable by resetting _closed/_shutdown_requested? It's not clear to me what is better from a typical use case.

t2 = time.monotonic()

assert t2 - t1 >= 0.15
done.wait(2.0)

Copilot AI Feb 18, 2026

This test can pass even if no messages are ever received: done.wait(2.0) isn’t asserted, and the received contents/order aren’t validated. Consider asserting that done.wait(...) returns True and that received matches the expected messages so the test reliably detects subscription/buffering regressions.

Suggested change
- done.wait(2.0)
+ assert done.wait(2.0), "Timed out waiting for messages to be received"
+ assert received == ["one", "two"]

Comment on lines +1 to +3
import time
import ezmsg.core as ez


Copilot AI Feb 18, 2026

time is imported but only used in commented-out code, which will trigger Ruff/pyflakes unused-import checks. Remove the import or use it unconditionally in the example (e.g., behind a flag).

Comment on lines +5 to +7
PORT = 12345
TOPIC = "/TEST"


Copilot AI Feb 18, 2026

PORT = 12345 is unused (the function uses the port parameter instead). This will fail Ruff unused-variable checks; remove the constant or use it as the default for the port parameter.
