Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 0 additions & 119 deletions examples/lowlevel_api.py

This file was deleted.

33 changes: 33 additions & 0 deletions examples/simple_async_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio

import ezmsg.core as ez

TOPIC = "/TEST"


async def main(host: str = "127.0.0.1", port: int = 12345) -> None:
async with ez.GraphContext((host, port), auto_start=True) as ctx:
pub = await ctx.publisher(TOPIC)
try:
print("Publisher Task Launched")
count = 0
while True:
await pub.broadcast(f"{count=}")
await asyncio.sleep(0.1)
count += 1
except asyncio.CancelledError:
pass
finally:
print("Publisher Task Concluded")


if __name__ == "__main__":
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")

args = parser.parse_args()

asyncio.run(main(host=args.host, port=args.port))
35 changes: 35 additions & 0 deletions examples/simple_async_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio

import ezmsg.core as ez

PORT = 12345
TOPIC = "/TEST"

Comment on lines +5 to +7
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PORT = 12345 is unused (the function uses the port parameter instead). This will fail Ruff unused-variable checks; remove the constant or use it as the default for the port parameter.

Copilot uses AI. Check for mistakes.

async def main(host: str = "127.0.0.1", port: int = 12345) -> None:
async with ez.GraphContext((host, port), auto_start=True) as ctx:
sub = await ctx.subscriber(TOPIC)
try:
print("Subscriber Task Launched")
while True:
async with sub.recv_zero_copy() as msg:
# Uncomment if you want to witness backpressure!
# await asyncio.sleep(1.0)
print(msg)
except asyncio.CancelledError:
pass
finally:
print("Subscriber Task Concluded")
print("Detached")


if __name__ == "__main__":
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")

args = parser.parse_args()

asyncio.run(main(host=args.host, port=args.port))
35 changes: 35 additions & 0 deletions examples/simple_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import time

import ezmsg.core as ez

TOPIC = "/TEST"

def main(host: str = "127.0.0.1", port: int = 12345) -> None:
with ez.sync.init((host, port), auto_start=True) as ctx:
pub = ctx.create_publisher(TOPIC, force_tcp=True)

print("Publisher Task Launched")
count = 0
try:
while True:
output = f"{count=}"
pub.publish(output)
print(output)
time.sleep(0.1)
count += 1
except KeyboardInterrupt:
pass
print("Publisher Task Concluded")

print("Done")


if __name__ == "__main__":
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")
args = parser.parse_args()

main(host=args.host, port=args.port)
30 changes: 30 additions & 0 deletions examples/simple_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import time
import ezmsg.core as ez

Comment on lines +1 to +3
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time is imported but only used in commented-out code, which will trigger Ruff/pyflakes unused-import checks. Remove the import or use it unconditionally in the example (e.g., behind a flag).

Copilot uses AI. Check for mistakes.
TOPIC = "/TEST"


def main(host: str = "127.0.0.1", port: int = 12345) -> None:
with ez.sync.init((host, port), auto_start=True) as ctx:
print("Subscriber Task Launched")

def on_message(msg: str) -> None:
# Uncomment if you want to witness backpressure!
# time.sleep(1.0)
print(msg)

ctx.create_subscription(TOPIC, callback=on_message)
ez.sync.spin(ctx)

print("Subscriber Task Concluded")


if __name__ == "__main__":
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver")
parser.add_argument("--port", default=12345, type=int, help="port for graphserver")
args = parser.parse_args()

main(host=args.host, port=args.port)
6 changes: 6 additions & 0 deletions src/ezmsg/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
"NormalTermination",
"GraphServer",
"GraphContext",
"sync",
"SyncContext",
"SyncPublisher",
"SyncSubscriber",
"run_command",
"Publisher",
"Subscriber",
Expand All @@ -45,6 +49,8 @@
from .backendprocess import Complete, NormalTermination
from .graphserver import GraphServer
from .graphcontext import GraphContext
from . import sync
from .sync import SyncContext, SyncPublisher, SyncSubscriber
from .command import run_command
from .pubclient import Publisher
from .subclient import Subscriber
Expand Down
4 changes: 1 addition & 3 deletions src/ezmsg/core/backendprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Any

from .stream import Stream, InputStream, OutputStream
from .unit import Unit, TIMEIT_ATTR, SUBSCRIBES_ATTR, ZERO_COPY_ATTR
from .unit import Unit, TIMEIT_ATTR, SUBSCRIBES_ATTR
from .messagechannel import LeakyQueue

from .graphcontext import GraphContext
Expand Down Expand Up @@ -374,8 +374,6 @@ async def wrapped_task(msg: Any = None) -> None:
result = call_fn(msg)
if inspect.isasyncgen(result):
async for stream, obj in result:
if obj and getattr(task, ZERO_COPY_ATTR, False) and obj is msg:
obj = deepcopy(obj)
await pub_fn(stream, obj)

elif asyncio.iscoroutine(result):
Expand Down
10 changes: 9 additions & 1 deletion src/ezmsg/core/graphcontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class GraphContext:

:param graph_service: Optional graph service instance to use
:type graph_service: GraphService | None
:param auto_start: Whether to auto-start a GraphServer if connection fails.
If None, defaults to auto-start only when graph_address is not provided
and no environment override is set.
:type auto_start: bool | None

.. note::
The GraphContext is typically managed automatically by the ezmsg runtime
Expand All @@ -40,11 +44,13 @@ class GraphContext:
def __init__(
self,
graph_address: AddressType | None = None,
auto_start: bool | None = None,
) -> None:
self._clients = set()
self._edges = set()
self._graph_address = graph_address
self._graph_server = None
self._auto_start = auto_start

@property
def graph_address(self) -> AddressType | None:
Expand Down Expand Up @@ -130,7 +136,9 @@ async def resume(self) -> None:
await GraphService(self.graph_address).resume()

async def _ensure_servers(self) -> None:
self._graph_server = await GraphService(self.graph_address).ensure()
self._graph_server = await GraphService(self.graph_address).ensure(
auto_start=self._auto_start
)

async def _shutdown_servers(self) -> None:
if self._graph_server is not None:
Expand Down
10 changes: 7 additions & 3 deletions src/ezmsg/core/graphserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def start(self, address: AddressType | None = None) -> None: # type: ignore[ove
self._loop = asyncio.new_event_loop()
super().start()
self._server_up.wait()
logger.info(f'Started GraphServer at {address}')
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The startup log message uses the address argument, which is None when binding to an ephemeral port, and may not reflect the actual bound address. Prefer logging self.address (after _server_up.wait()) so the message always reports the real listening endpoint.

Suggested change
logger.info(f'Started GraphServer at {address}')
logger.info(f"Started GraphServer at {self.address}")

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very good catch!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementing this locally actually caused a few tests to fail. I'm investigating.


def stop(self) -> None:
self._shutdown.set()
Expand Down Expand Up @@ -453,15 +454,18 @@ def create_server(self) -> GraphServer:
self._address = server.address
return server

async def ensure(self) -> GraphServer | None:
async def ensure(self, auto_start: bool | None = None) -> GraphServer | None:
"""
Try connecting to an existing server. If none is listening and no explicit
address/environment is set, start one and return it. If an existing one is
found, return None.
found, return None. If auto_start is provided, it overrides the default
behavior.
"""
server = None
ensure_server = False
if self._address is None:
if auto_start is not None:
ensure_server = auto_start
elif self._address is None:
# Only auto-start if env var not forcing a location
ensure_server = self.ADDR_ENV not in os.environ

Expand Down
Loading