[RFC] Notification/Standard broadcasting v2 #745
kurtjd wants to merge 1 commit into OpenDevicePartnership:v0.2.0 from
Conversation
Pull request overview
This RFC introduces a v2 approach for service message broadcasting and relay notifications by standardizing on a single-publisher pubsub channel per service, adding per-service message types, and extending the relay handler macro/traits to wait for notifications across subscribed services.
Changes:
- Add `SinglePublisherChannel` (wrapper over `embassy_sync::PubSubChannel`) and new per-service `*Message` types for broadcasting.
- Introduce `odp-service-common` with a `(control_handle, Runner)` service pattern and `spawn_service!` helper macro; migrate time-alarm and eSPI services accordingly.
- Extend relay traits + `impl_odp_mctp_relay_handler!` to accept per-service subscribers and implement `wait_for_notification()` using `is_notification()`.
Reviewed changes
Copilot reviewed 25 out of 29 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| time-alarm-service/tests/tad_test.rs | Updates tests to new (Service, Runner) init/run pattern (currently missing message_publisher init param). |
| time-alarm-service/src/task.rs | Removes legacy “run the service task” wrapper. |
| time-alarm-service/src/lib.rs | Refactors into Resources/Runner/control-handle + adds DynPublisher for broadcasting TimeAlarmMessage. |
| time-alarm-service/Cargo.toml | Adds dependency on odp-service-common. |
| time-alarm-service-messages/src/lib.rs | Adds TimeAlarmMessage enum for broadcast/notification classification. |
| thermal-service/src/lib.rs | Adds ThermalMessage + implements is_notification/MessageType. |
| thermal-service-messages/src/lib.rs | Adds ThermalMessage type. |
| odp-service-common/src/runnable_service.rs | Introduces Service/ServiceRunner traits and spawn_service! macro. |
| odp-service-common/src/lib.rs | New crate root exporting runnable_service. |
| odp-service-common/Cargo.toml | New crate manifest for shared service infrastructure. |
| examples/rt685s-evk/src/bin/time_alarm.rs | Updates example to create SinglePublisherChannel, spawn service via spawn_service!, and pass subscriber into relay handler. |
| examples/rt685s-evk/Cargo.toml | Adds odp-service-common dependency. |
| examples/rt685s-evk/Cargo.lock | Lockfile update reflecting new crate + dependency revisions. |
| examples/rt633/Cargo.lock | Lockfile update reflecting dependency revisions. |
| espi-service/src/task.rs | Legacy task helper signature adjusted but now stale vs new runner-based design. |
| espi-service/src/lib.rs | Stops exporting task module; re-exports new implementation. |
| espi-service/src/espi_service.rs | Migrates to Resources/Runner pattern and integrates wait_for_notification() into main select loop. |
| espi-service/Cargo.toml | Replaces message-crate dependencies with odp-service-common. |
| embedded-service/src/relay/mod.rs | Extends relay traits/macro: adds MessageType, is_notification, and wait_for_notification with per-service subscribers + notification ids. |
| embedded-service/src/lib.rs | Re-exports embassy_sync in _macro_internal for macro expansion. |
| embedded-service/src/broadcaster/single_publisher.rs | Adds SinglePublisherChannel wrapper and unit tests. |
| embedded-service/src/broadcaster/mod.rs | Exposes new single_publisher broadcaster module. |
| docs/api-guidelines.md | Adds repository-wide API guidelines documenting external resources + runner-based concurrency patterns. |
| debug-service/src/debug_service.rs | Adds DebugMessage and relay MessageType/is_notification implementation. |
| debug-service-messages/src/lib.rs | Adds DebugMessage type. |
| battery-service/src/lib.rs | Adds BatteryMessage and relay MessageType/is_notification implementation. |
| battery-service-messages/src/lib.rs | Adds BatteryMessage type. |
| Cargo.toml | Adds odp-service-common to workspace members and workspace deps. |
| Cargo.lock | Lockfile update reflecting new crate + dependency adjustments. |
Pull request overview
Copilot reviewed 13 out of 15 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
embedded-service/src/relay/mod.rs:214
- The macro docs/example for `impl_odp_mctp_relay_handler!` still describe `new()` as taking only service handler references, but the generated `new()` now also requires a `DynSubscriber` per service. Update the docs and example call so users can construct the relay handler correctly with subscribers.
/// This macro will emit a type with the name you specify that is generic over a lifetime for the hardware (probably 'static in production code),
/// implements the `RelayHandler` trait, and has a single constructor method `new` that takes as arguments references to the service handler
/// types that you specify that have the 'hardware lifetime'.
///
/// The macro takes the following inputs once:
/// relay_type_name: The name of the relay type to generate. This is arbitrary. The macro will emit a type with this name.
///
/// Followed by a list of any number of service entries, which are specified by the following inputs:
/// service_name: A name to assign to generated identifiers associated with the service, e.g. "Battery".
/// This can be arbitrary.
/// service_id: A unique u8 that addresses that service on the EC.
/// service_notification_id: A unique u8 identifying notifications from this service, distinct from service_id.
/// service_handler_type: A type that implements the RelayServiceHandler trait, which will be used to process messages
/// for this service.
///
/// Example usage:
///
/// ```ignore
///
/// impl_odp_mctp_relay_handler!(
/// MyRelayHandlerType;
/// Battery, 0x9, 0, battery_service::Service<'static>;
/// TimeAlarm, 0xB, 1, time_alarm_service::Service<'static>;
/// );
///
/// let relay_handler = MyRelayHandlerType::new(battery_service_instance, time_alarm_service_instance);
///
/// ```
```rust
//! Single-publisher broadcast channel.
//!
//! A wrapper around [`embassy_sync::pubsub::PubSubChannel`] that enforces exactly one publisher
//! with any number of subscribers. The inner channel is hardcoded with `PUBS = 1`, so attempting
```
Doc comment says "any number of subscribers", but the channel is still limited by the SUBS const generic (maximum subscribers). Consider rewording to avoid implying it’s unbounded.
Suggested change:

```diff
- //! with any number of subscribers. The inner channel is hardcoded with `PUBS = 1`, so attempting
+ //! with up to `SUBS` subscribers. The inner channel is hardcoded with `PUBS = 1`, so attempting
```
```rust
$(
    if let core::task::Poll::Ready(wait_result) = [<$service_name:snake _fut>].as_mut().poll(cx) {
        match wait_result {
            $crate::_macro_internal::embassy_sync::pubsub::WaitResult::Message(msg) => {
                if <$service_handler_type as $crate::relay::mctp::RelayServiceHandler>::is_notification(&msg) {
                    return core::task::Poll::Ready(Some($service_notification_id));
                } else {
                    return core::task::Poll::Ready(None);
                }
            }
            $crate::_macro_internal::embassy_sync::pubsub::WaitResult::Lagged(count) => {
                // Revisit: This can only happen if other services use `publish_immediate` on their channel, which can result in older messages getting discarded.
                // We really don't want notifications potentially getting lost, so we could change `SinglePublisherChannel` to not allow immediate publishing,
                // or we could keep the burden on services so they have the flexibility to use `publish_immediate` at the risk of their own notifications being lost.
                $crate::error!("[Relay] {} subscriber lagged by {} messages, notifications may have been lost", stringify!($service_name), count);
                return core::task::Poll::Ready(None);
            }
        }
    }
)+
core::task::Poll::Pending
```
wait_for_notification can starve later services: the poll loop returns immediately when the first polled subscriber becomes Ready, even if that message is not a notification (or is Lagged). If an earlier service publishes frequent non-notification messages, notifications from later services may never be observed. Consider polling all subscriber futures in the same poll_fn call and only returning Ready(None) after checking every service, or otherwise implementing fairness/round-robin so one noisy service can’t block others.
Suggested change:

```rust
// Track the first notification we see in this poll, and whether
// any futures are still pending or produced non-notification messages.
let mut notification_id: core::option::Option<u8> = core::option::Option::None;
let mut any_pending = false;
let mut saw_non_notification_or_lagged = false;
$(
    match [<$service_name:snake _fut>].as_mut().poll(cx) {
        core::task::Poll::Ready(wait_result) => {
            match wait_result {
                $crate::_macro_internal::embassy_sync::pubsub::WaitResult::Message(msg) => {
                    if <$service_handler_type as $crate::relay::mctp::RelayServiceHandler>::is_notification(&msg) {
                        notification_id = core::option::Option::Some($service_notification_id);
                    } else {
                        saw_non_notification_or_lagged = true;
                    }
                }
                $crate::_macro_internal::embassy_sync::pubsub::WaitResult::Lagged(count) => {
                    // Revisit: This can only happen if other services use `publish_immediate` on their channel,
                    // which can result in older messages getting discarded.
                    $crate::error!("[Relay] {} subscriber lagged by {} messages, notifications may have been lost", stringify!($service_name), count);
                    saw_non_notification_or_lagged = true;
                }
            }
        }
        core::task::Poll::Pending => {
            any_pending = true;
        }
    }
)+
if let core::option::Option::Some(id) = notification_id {
    core::task::Poll::Ready(core::option::Option::Some(id))
} else if any_pending {
    core::task::Poll::Pending
} else if saw_non_notification_or_lagged {
    // At least one message was received, but none were notifications.
    core::task::Poll::Ready(core::option::Option::None)
} else {
    // No futures were pending and no messages were observed; be conservative.
    core::task::Poll::Pending
}
```
Yeah, I like where this design is going :) I have a couple notes on implementation, will start posting those, but I think the broad shape of this is good.
```rust
fn is_notification(message: &Self::MessageType) -> bool {
    matches!(message, TimeAlarmMessage::TimerExpired)
}
```
I think in the specific case of the time-alarm service, notifications may not be consumed by the relay service - rather, I think there's some not-yet-written power management service that's going to need to listen to these and power on the device when the timer expires, so I think the correct implementation here might just be "return false"
[edit] actually, may want to do an explicit match and return false in all cases so future changes have to make an explicit decision if they add another variant
Ah gotchya, thanks. Threw this in as an example but can definitely change it so it will return false on existing variants.
```rust
#[derive(Clone, Copy, PartialEq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TimeAlarmMessage {
    TimerExpired,
}
```
Sounds good! Yeah if this overall design is agreed upon I can spend a bit more time coming up with more useful Service messages.
```rust
/// Message type for the Thermal service.
#[derive(Clone, Copy, PartialEq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct ThermalMessage;
```
nit: for future extensibility, it may be better to declare an enum with a single variant now and match on it in the is_notification method; that way, if someone adds a new variant, they have to make an explicit decision on whether or not it should trigger a notification, or else the is_notification implementation won't compile because not all variants are matched
```rust
embedded_services::init().await;
info!("services initialized");

// All services will basically need to create a SinglePublisherChannel for themselves.
```
```rust
//!
//! A wrapper around [`embassy_sync::pubsub::PubSubChannel`] that enforces exactly one publisher
//! with any number of subscribers. The inner channel is hardcoded with `PUBS = 1`, so attempting
//! to create a second publisher returns [`embassy_sync::pubsub::Error::MaximumPublishersReached`].
```
We may want to do some sort of type-state thing to enforce this? That way we can catch misuse at compile time
Was thinking about this since you mentioned it the other day but couldn't really think of a good way to do that in practice. Well I suppose when we call publisher() it could consume self and return a tuple (the Channel in a new type state, as well as the DynPublisher). Maybe something like:
```rust
struct PublisherExists;
struct NoPublisherExists;

pub struct SinglePublisherChannel<MODE, T: Clone, const CAP: usize, const SUBS: usize> {
    inner: PubSubChannel<GlobalRawMutex, T, CAP, SUBS, 1>,
    _phantom: PhantomData<MODE>,
}

impl<T: Clone, const CAP: usize, const SUBS: usize> SinglePublisherChannel<NoPublisherExists, T, CAP, SUBS> {
    /// Obtain the single publisher for this channel.
    pub fn publisher(self) -> (SinglePublisherChannel<PublisherExists, T, CAP, SUBS>, DynPublisher<'_, T>) {
        let new_channel = SinglePublisherChannel { inner: self.inner, _phantom: PhantomData };
        (new_channel, new_channel.inner.dyn_publisher())
    }
}
```

Probably some wrong syntax there since I'm writing it off the top of my head, but that gives you an idea of an approach.
```diff
- let _relay_handler = EspiRelayHandler::new(&time_service);
+ // Here we pass a subscriber into the relay handler so it can listen for notifications from the time alarm service
+ // and then relay those to the host SoC over eSPI when they occur
+ let _relay_handler = EspiRelayHandler::new(&time_service, time_alarm_subscriber);
```
I haven't fully thought this out yet, but at first glance it seems like it might be nice for ergonomics to have part of the relayable trait be 'give me a subscriber handle'? That way the user doesn't have to pass both foo_service and foo_service_subscriber explicitly and if they're running two instances of a particular service they can't mix up the publishing handles?
I guess the tradeoff is that it makes it less clear that they're spending a slot in their subscriber budget to do relaying, but I think that'd come up pretty quickly in testing, so it might be worth it?
Hm I think you make a good point. I'll experiment and see if I can come up with a slightly better approach.
```rust
// async fn process_notification_to_host(&self, espi: &mut espi::Espi<'_>, notification: &NotificationMsg) {
//     espi.irq_push(notification.offset).await;
//     info!("espi: Notification id {} sent to Host!", notification.offset);
// }
```
Full disclosure, I have never observed this code actually working :) When I got here I found that nobody was actually firing messages that wound up triggering this so I commented it out as-written when I got rid of NotificationMsg, but I have no idea if it's actually correct, might need to test
Yeah it was actually working to an extent from sometime last year. At least, the VWire interrupt (or whatever it's actually called) was picked up by secure services and could be bubbled up to ec-test-app. There were just some weird issues with this I think causing some form of deadlock that I never fully resolved so will definitely need to revisit once we start making use of notifications from host-side apps.
```diff
      &'a self,
      message: HostRequest,
- ) -> impl core::future::Future<Output = HostResult> + Send + 'a {
+ ) -> impl core::future::Future<Output = HostResult> + 'a {
```
Oh good point, thanks!
```diff
- );
+ )+

  let result = core::future::poll_fn(|cx| {
```
This pin/poll_fn looks a bit odd to me - can't we just select over the list of subscribers?
Yeah, it does seem odd and I had the same hunch initially. The issue is the select macro only supports up to like 5 futures, I think? So if we have more than 5 services provided in the macro we'd need to use something like `select_slice` or `select_array`, which I think is difficult because each subscriber is a different type and can't easily be stored together in a slice, to the best of my knowledge.
```rust
) -> impl core::future::Future<Output = Self::ResultEnumType> + 'a;

/// Wait for a notification from any service and return the associated service ID.
fn wait_for_notification<'a>(&'a mut self) -> impl core::future::Future<Output = u8> + 'a;
```
This RFC is another attempt at designing a notification system (see #741) that's hopefully a bit more closely aligned to what everyone's thinking. Additionally, I attempted a form of standardization for service message broadcasting based on some discussions I had offline with Billy.

At a high level, basically what I'm thinking is:

- A wrapper around `PubSubChannel` I call `SinglePublisherChannel` that only allows a single `DynPublisher` (intended to be consumed by a single service) but `SUBS` number of `DynSubscriber`s. The dyn variants were used since the cost of dynamic dispatch seems minimal, and this helps prevent the `CAPS` and `SUBS` generics from bleeding into everything using this broadcasting system.
- A per-service `ServiceMessage` enum type (e.g. `TimeAlarmServiceMessage`) that is used by the service's broadcasting channel.
- A `SinglePublisherChannel` for every service that wants to broadcast messages. Those services will have a field in their structs for a `DynPublisher` that we pass into their constructor. Services that want to subscribe to another service will then need to update their structs to hold a field for a `DynSubscriber` of each service they want to subscribe to (e.g. the power policy service might want to hold a `DynSubscriber<TimeAlarmMessage>` to receive timer-expired messages).

Then for notifications:

- A `DynSubscriber` for each service provided in the relay macro is stored in the generated relay struct.
- A `wait_for_notification` method was added to the `Relay` trait, which the macro implements by waiting for a message to be received on any of the `DynSubscriber`s it holds.
- `RelayServiceTrait` now has an `is_notification` method that each service needs to implement to match a message variant to a notification.

I've updated the time-alarm-service and espi-service to show how these might be used, and also updated the time-alarm example to show how it would all be integrated together. The eSPI service had some little changes due to the fact that `wait_for_notification` needs an `&mut self`, but I will look into a way to make `wait_for_notification` take only an `&self`.

@RobertZ2011 and @williampMSFT, curious to see if this is closer to how you were envisioning message broadcasting. I've looked at the existing `broadcaster/immediate.rs` to see if it could be reworked with the intrusive list removed, but to be honest, I don't know the context behind its design, and it's not clear to me yet exactly what it is trying to solve over a raw `PubSubChannel` broadcaster. But I'm open to seeing if we can make use of that instead.