-
Notifications
You must be signed in to change notification settings - Fork 9
Feature: Synchronous Low‑Level API for ezmsg (ROS2‑style ergonomics) #215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
30995d7
406eb98
f7705b4
d552644
5c17e7c
f778edd
efa6b7e
698906b
911d7bb
bbea28c
0174559
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| import asyncio | ||
|
|
||
| import ezmsg.core as ez | ||
|
|
||
| TOPIC = "/TEST" | ||
|
|
||
|
|
||
| async def main(host: str = "127.0.0.1", port: int = 12345) -> None: | ||
| async with ez.GraphContext((host, port), auto_start=True) as ctx: | ||
| pub = await ctx.publisher(TOPIC) | ||
| try: | ||
| print("Publisher Task Launched") | ||
| count = 0 | ||
| while True: | ||
| await pub.broadcast(f"{count=}") | ||
| await asyncio.sleep(0.1) | ||
| count += 1 | ||
| except asyncio.CancelledError: | ||
| pass | ||
| finally: | ||
| print("Publisher Task Concluded") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| from argparse import ArgumentParser | ||
|
|
||
| parser = ArgumentParser() | ||
| parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver") | ||
| parser.add_argument("--port", default=12345, type=int, help="port for graphserver") | ||
|
|
||
| args = parser.parse_args() | ||
|
|
||
| asyncio.run(main(host=args.host, port=args.port)) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| import asyncio | ||
|
|
||
| import ezmsg.core as ez | ||
|
|
||
| PORT = 12345 | ||
| TOPIC = "/TEST" | ||
|
|
||
|
|
||
| async def main(host: str = "127.0.0.1", port: int = 12345) -> None: | ||
| async with ez.GraphContext((host, port), auto_start=True) as ctx: | ||
| sub = await ctx.subscriber(TOPIC) | ||
| try: | ||
| print("Subscriber Task Launched") | ||
| while True: | ||
| async with sub.recv_zero_copy() as msg: | ||
| # Uncomment if you want to witness backpressure! | ||
| # await asyncio.sleep(1.0) | ||
| print(msg) | ||
| except asyncio.CancelledError: | ||
| pass | ||
| finally: | ||
| print("Subscriber Task Concluded") | ||
| print("Detached") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| from argparse import ArgumentParser | ||
|
|
||
| parser = ArgumentParser() | ||
| parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver") | ||
| parser.add_argument("--port", default=12345, type=int, help="port for graphserver") | ||
|
|
||
| args = parser.parse_args() | ||
|
|
||
| asyncio.run(main(host=args.host, port=args.port)) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| import time | ||
|
|
||
| import ezmsg.core as ez | ||
|
|
||
| TOPIC = "/TEST" | ||
|
|
||
| def main(host: str = "127.0.0.1", port: int = 12345) -> None: | ||
| with ez.sync.init((host, port), auto_start=True) as ctx: | ||
| pub = ctx.create_publisher(TOPIC, force_tcp=True) | ||
|
|
||
| print("Publisher Task Launched") | ||
| count = 0 | ||
| try: | ||
| while True: | ||
| output = f"{count=}" | ||
| pub.publish(output) | ||
| print(output) | ||
| time.sleep(0.1) | ||
| count += 1 | ||
| except KeyboardInterrupt: | ||
| pass | ||
| print("Publisher Task Concluded") | ||
|
|
||
| print("Done") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| from argparse import ArgumentParser | ||
|
|
||
| parser = ArgumentParser() | ||
| parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver") | ||
| parser.add_argument("--port", default=12345, type=int, help="port for graphserver") | ||
| args = parser.parse_args() | ||
|
|
||
| main(host=args.host, port=args.port) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| import time | ||
| import ezmsg.core as ez | ||
|
|
||
|
Comment on lines
+1
to
+3
|
||
| TOPIC = "/TEST" | ||
|
|
||
|
|
||
| def main(host: str = "127.0.0.1", port: int = 12345) -> None: | ||
| with ez.sync.init((host, port), auto_start=True) as ctx: | ||
| print("Subscriber Task Launched") | ||
|
|
||
| def on_message(msg: str) -> None: | ||
| # Uncomment if you want to witness backpressure! | ||
| # time.sleep(1.0) | ||
| print(msg) | ||
|
|
||
| ctx.create_subscription(TOPIC, callback=on_message) | ||
| ez.sync.spin(ctx) | ||
|
|
||
| print("Subscriber Task Concluded") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| from argparse import ArgumentParser | ||
|
|
||
| parser = ArgumentParser() | ||
| parser.add_argument("--host", default="127.0.0.1", help="hostname for graphserver") | ||
| parser.add_argument("--port", default=12345, type=int, help="port for graphserver") | ||
| args = parser.parse_args() | ||
|
|
||
| main(host=args.host, port=args.port) | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -95,6 +95,7 @@ def start(self, address: AddressType | None = None) -> None: # type: ignore[ove | |||||
| self._loop = asyncio.new_event_loop() | ||||||
| super().start() | ||||||
| self._server_up.wait() | ||||||
| logger.info(f'Started GraphServer at {address}') | ||||||
|
||||||
| logger.info(f'Started GraphServer at {address}') | |
| logger.info(f"Started GraphServer at {self.address}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a very good catch!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementing this locally actually caused a few tests to fail. I'm investigating.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PORT = 12345is unused (the function uses theportparameter instead). This will fail Ruff unused-variable checks; remove the constant or use it as the default for theportparameter.