
[RFC] Notification/Standard broadcasting v2#745

Draft
kurtjd wants to merge 1 commit into OpenDevicePartnership:v0.2.0 from kurtjd:notification-v2

Conversation

Contributor

@kurtjd kurtjd commented Mar 11, 2026

This RFC is another attempt at designing a notification system (see #741) that is hopefully more closely aligned with what everyone is thinking. It also attempts a form of standardization for service message broadcasting, based on some offline discussions I had with Billy.

At a high level, basically what I'm thinking is:

  • Have a wrapper around a PubSubChannel, which I call SinglePublisherChannel, that allows only a single DynPublisher (intended to be consumed by a single service) but up to SUBS DynSubscribers. The dyn variants were used since the cost of dynamic dispatch seems minimal, and this helps keep the CAP and SUBS generics from bleeding into everything that uses this broadcasting system.
  • In the service-messages crates, add a new ServiceMessage enum type (e.g. TimeAlarmServiceMessage) that is used by the service's broadcasting channel.
  • During service setup on a platform, we create a new SinglePublisherChannel for every service that wants to broadcast messages. Those services will have a field in their structs for a DynPublisher that we pass into their constructor. Services that want to subscribe to another service will then need to update their structs to hold a DynSubscriber for each service they want to subscribe to (e.g. the power policy service might want to hold a DynSubscriber<TimeAlarmMessage> to receive timer-expired messages).

Then for notifications:

  • We piggyback off the broadcasting system by similarly holding a DynSubscriber for each service provided to the relay macro in the generated relay struct.
  • We then add a wait_notification method to the Relay trait which the macro implements by waiting for a message to be received on any of the DynSubscribers it holds.
  • To determine whether any of these messages is a notification, the RelayServiceHandler trait now has an is_notification method that each service needs to implement to match a message variant to a notification.
  • This is just a bool, since the relay service will discard the payload (if any). Instead, the macro now requires an additional notification id associated with each service. If the relay handler receives a notification, it discards the payload and returns the notification id associated with the originating service; the relay service can then use this id directly without needing to know anything about which service produced it.

I've updated the time-alarm-service and espi-service to show how these might be used, and also updated the time-alarm example to show how it would all be integrated together. The eSPI service needed some small changes because wait_for_notification takes &mut self, but I will look into a way to make wait_for_notification take only &self.

@RobertZ2011 and @williampMSFT curious to see if this is closer to how you were envisioning message broadcasting. I've looked at the existing broadcaster/immediate.rs to see if it could be reworked with the intrusive list removed, but to be honest I don't know the context behind its design, and it's not clear to me yet exactly what it is trying to solve over a raw PubSubChannel broadcaster. But I'm open to seeing if we can make use of that instead.

@kurtjd kurtjd self-assigned this Mar 11, 2026
Copilot AI review requested due to automatic review settings March 11, 2026 23:04
Contributor

Copilot AI left a comment


Pull request overview

This RFC introduces a v2 approach for service message broadcasting and relay notifications by standardizing on a single-publisher pubsub channel per service, adding per-service message types, and extending the relay handler macro/traits to wait for notifications across subscribed services.

Changes:

  • Add SinglePublisherChannel (wrapper over embassy_sync::PubSubChannel) and new per-service *Message types for broadcasting.
  • Introduce odp-service-common with a (control_handle, Runner) service pattern and spawn_service! helper macro; migrate time-alarm and eSPI services accordingly.
  • Extend relay traits + impl_odp_mctp_relay_handler! to accept per-service subscribers and implement wait_for_notification() using is_notification().

Reviewed changes

Copilot reviewed 25 out of 29 changed files in this pull request and generated 8 comments.

File Description
time-alarm-service/tests/tad_test.rs Updates tests to new (Service, Runner) init/run pattern (currently missing message_publisher init param).
time-alarm-service/src/task.rs Removes legacy “run the service task” wrapper.
time-alarm-service/src/lib.rs Refactors into Resources/Runner/control-handle + adds DynPublisher for broadcasting TimeAlarmMessage.
time-alarm-service/Cargo.toml Adds dependency on odp-service-common.
time-alarm-service-messages/src/lib.rs Adds TimeAlarmMessage enum for broadcast/notification classification.
thermal-service/src/lib.rs Adds ThermalMessage + implements is_notification/MessageType.
thermal-service-messages/src/lib.rs Adds ThermalMessage type.
odp-service-common/src/runnable_service.rs Introduces Service/ServiceRunner traits and spawn_service! macro.
odp-service-common/src/lib.rs New crate root exporting runnable_service.
odp-service-common/Cargo.toml New crate manifest for shared service infrastructure.
examples/rt685s-evk/src/bin/time_alarm.rs Updates example to create SinglePublisherChannel, spawn service via spawn_service!, and pass subscriber into relay handler.
examples/rt685s-evk/Cargo.toml Adds odp-service-common dependency.
examples/rt685s-evk/Cargo.lock Lockfile update reflecting new crate + dependency revisions.
examples/rt633/Cargo.lock Lockfile update reflecting dependency revisions.
espi-service/src/task.rs Legacy task helper signature adjusted but now stale vs new runner-based design.
espi-service/src/lib.rs Stops exporting task module; re-exports new implementation.
espi-service/src/espi_service.rs Migrates to Resources/Runner pattern and integrates wait_for_notification() into main select loop.
espi-service/Cargo.toml Replaces message-crate dependencies with odp-service-common.
embedded-service/src/relay/mod.rs Extends relay traits/macro: adds MessageType, is_notification, and wait_for_notification with per-service subscribers + notification ids.
embedded-service/src/lib.rs Re-exports embassy_sync in _macro_internal for macro expansion.
embedded-service/src/broadcaster/single_publisher.rs Adds SinglePublisherChannel wrapper and unit tests.
embedded-service/src/broadcaster/mod.rs Exposes new single_publisher broadcaster module.
docs/api-guidelines.md Adds repository-wide API guidelines documenting external resources + runner-based concurrency patterns.
debug-service/src/debug_service.rs Adds DebugMessage and relay MessageType/is_notification implementation.
debug-service-messages/src/lib.rs Adds DebugMessage type.
battery-service/src/lib.rs Adds BatteryMessage and relay MessageType/is_notification implementation.
battery-service-messages/src/lib.rs Adds BatteryMessage type.
Cargo.toml Adds odp-service-common to workspace members and workspace deps.
Cargo.lock Lockfile update reflecting new crate + dependency adjustments.


Copilot AI review requested due to automatic review settings March 12, 2026 17:44
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 13 out of 15 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

embedded-service/src/relay/mod.rs:214

  • The macro docs/example for impl_odp_mctp_relay_handler! still describe new() as taking only service handler references, but the generated new() now also requires a DynSubscriber per service. Update the docs and example call so users can construct the relay handler correctly with subscribers.
    /// This macro will emit a type with the name you specify that is generic over a lifetime for the hardware (probably 'static in production code),
    /// implements the `RelayHandler` trait, and has a single constructor method `new` that takes as arguments references to the service handler
    /// types that you specify that have the 'hardware lifetime'.
    ///
    /// The macro takes the following inputs once:
    ///   relay_type_name: The name of the relay type to generate. This is arbitrary. The macro will emit a type with this name.
    ///
    /// Followed by a list of any number of service entries, which are specified by the following inputs:
    ///   service_name:            A name to assign to generated identifiers associated with the service, e.g. "Battery".
    ///                            This can be arbitrary.
    ///   service_id:              A unique u8 that addresses that service on the EC.
    ///   service_notification_id: A unique u8 identifying notifications from this service, distinct from service_id.
    ///   service_handler_type:    A type that implements the RelayServiceHandler trait, which will be used to process messages
    ///                            for this service.
    ///
    /// Example usage:
    ///
    /// ```ignore
    ///
    ///     impl_odp_mctp_relay_handler!(
    ///         MyRelayHandlerType;
    ///         Battery,   0x9, 0, battery_service::Service<'static>;
    ///         TimeAlarm, 0xB, 1, time_alarm_service::Service<'static>;
    ///     );
    ///
    ///     let relay_handler = MyRelayHandlerType::new(battery_service_instance, time_alarm_service_instance);
    ///


//! Single-publisher broadcast channel.
//!
//! A wrapper around [`embassy_sync::pubsub::PubSubChannel`] that enforces exactly one publisher
//! with any number of subscribers. The inner channel is hardcoded with `PUBS = 1`, so attempting

Copilot AI Mar 12, 2026


Doc comment says "any number of subscribers", but the channel is still limited by the SUBS const generic (maximum subscribers). Consider rewording to avoid implying it’s unbounded.

Suggested change
//! with any number of subscribers. The inner channel is hardcoded with `PUBS = 1`, so attempting
//! with up to `SUBS` subscribers. The inner channel is hardcoded with `PUBS = 1`, so attempting

Comment on lines +520 to +540
$(
if let core::task::Poll::Ready(wait_result) = [<$service_name:snake _fut>].as_mut().poll(cx) {
match wait_result {
$crate::_macro_internal::embassy_sync::pubsub::WaitResult::Message(msg) => {
if <$service_handler_type as $crate::relay::mctp::RelayServiceHandler>::is_notification(&msg) {
return core::task::Poll::Ready(Some($service_notification_id));
} else {
return core::task::Poll::Ready(None);
}
}
$crate::_macro_internal::embassy_sync::pubsub::WaitResult::Lagged(count) => {
// Revisit: This can only happen if other services use a `publish_immediate` on their channel, which can result in older messages getting discarded.
// We really don't want notifications potentially getting lost, so we could change `SinglePublisherChannel` to not allow immediate publishing,
// or we could just keep the burden on services so they have the flexibility to use `publish_immediate` if they want to at the risk of their own notifications being lost.
$crate::error!("[Relay] {} subscriber lagged by {} messages, notifications may have been lost", stringify!($service_name), count);
return core::task::Poll::Ready(None);
}
}
}
)+
core::task::Poll::Pending

Copilot AI Mar 12, 2026


wait_for_notification can starve later services: the poll loop returns immediately when the first polled subscriber becomes Ready, even if that message is not a notification (or is Lagged). If an earlier service publishes frequent non-notification messages, notifications from later services may never be observed. Consider polling all subscriber futures in the same poll_fn call and only returning Ready(None) after checking every service, or otherwise implementing fairness/round-robin so one noisy service can’t block others.

Suggested change
$(
if let core::task::Poll::Ready(wait_result) = [<$service_name:snake _fut>].as_mut().poll(cx) {
match wait_result {
$crate::_macro_internal::embassy_sync::pubsub::WaitResult::Message(msg) => {
if <$service_handler_type as $crate::relay::mctp::RelayServiceHandler>::is_notification(&msg) {
return core::task::Poll::Ready(Some($service_notification_id));
} else {
return core::task::Poll::Ready(None);
}
}
$crate::_macro_internal::embassy_sync::pubsub::WaitResult::Lagged(count) => {
// Revisit: This can only happen if other services use a `publish_immediate` on their channel, which can result in older messages getting discarded.
// We really don't want notifications potentially getting lost, so we could change `SinglePublisherChannel` to not allow immediate publishing,
// or we could just keep the burden on services so they have the flexibility to use `publish_immediate` if they want to at the risk of their own notifications being lost.
$crate::error!("[Relay] {} subscriber lagged by {} messages, notifications may have been lost", stringify!($service_name), count);
return core::task::Poll::Ready(None);
}
}
}
)+
core::task::Poll::Pending
// Track the first notification we see in this poll, and whether
// any futures are still pending or produced non-notification messages.
let mut notification_id: core::option::Option<u8> = core::option::Option::None;
let mut any_pending = false;
let mut saw_non_notification_or_lagged = false;
$(
match [<$service_name:snake _fut>].as_mut().poll(cx) {
core::task::Poll::Ready(wait_result) => {
match wait_result {
$crate::_macro_internal::embassy_sync::pubsub::WaitResult::Message(msg) => {
if <$service_handler_type as $crate::relay::mctp::RelayServiceHandler>::is_notification(&msg) {
notification_id = core::option::Option::Some($service_notification_id);
} else {
saw_non_notification_or_lagged = true;
}
}
$crate::_macro_internal::embassy_sync::pubsub::WaitResult::Lagged(count) => {
// Revisit: This can only happen if other services use a `publish_immediate` on their channel, which can result in older messages getting discarded.
// We really don't want notifications potentially getting lost, so we could change `SinglePublisherChannel` to not allow immediate publishing,
// or we could just keep the burden on services so they have the flexibility to use `publish_immediate` if they want to at the risk of their own notifications being lost.
$crate::error!("[Relay] {} subscriber lagged by {} messages, notifications may have been lost", stringify!($service_name), count);
saw_non_notification_or_lagged = true;
}
}
}
core::task::Poll::Pending => {
any_pending = true;
}
}
)+
if let core::option::Option::Some(id) = notification_id {
core::task::Poll::Ready(core::option::Option::Some(id))
} else if any_pending {
core::task::Poll::Pending
} else if saw_non_notification_or_lagged {
// At least one message was received, but none were notifications.
core::task::Poll::Ready(core::option::Option::None)
} else {
// No futures were pending and no messages were observed; be conservative.
core::task::Poll::Pending
}

@williampMSFT
Contributor

Yeah, I like where this design is going :) I have a couple notes on implementation, will start posting those, but I think the broad shape of this is good


fn is_notification(message: &Self::MessageType) -> bool {
matches!(message, TimeAlarmMessage::TimerExpired)
}
Contributor

@williampMSFT williampMSFT Mar 12, 2026


I think in the specific case of the time-alarm service, notifications may not be consumed by the relay service - rather, I think there's some not-yet-written power management service that's going to need to listen to these and power on the device when the timer expires, so I think the correct implementation here might just be "return false"

[edit] actually, may want to do an explicit match and return false in all cases so future changes have to make an explicit decision if they add another variant

Contributor Author


Ah, gotcha, thanks. I threw this in as an example but can definitely change it to return false on the existing variants.

#[derive(Clone, Copy, PartialEq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TimeAlarmMessage {
TimerExpired,
Contributor


TimerExpired

We may want to have the specific timer as an associated type on this variant

Contributor Author


Sounds good! Yeah if this overall design is agreed upon I can spend a bit more time coming up with more useful Service messages.

/// Message type for the Thermal service.
#[derive(Clone, Copy, PartialEq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct ThermalMessage;
Contributor


nit: for future extensibility, it may be better to declare an enum with a single variant now and match on it in the is_notification method; that way, if someone adds a new variant, they have to make an explicit decision on whether or not it should trigger a notification, or else the is_notification implementation won't compile because not all variants are matched

embedded_services::init().await;
info!("services initialized");

// All services will basically need to create a SinglePublisherChannel for themsevles.
Contributor


themsevles

nit: sp: "themselves"

//!
//! A wrapper around [`embassy_sync::pubsub::PubSubChannel`] that enforces exactly one publisher
//! with any number of subscribers. The inner channel is hardcoded with `PUBS = 1`, so attempting
//! to create a second publisher returns [`embassy_sync::pubsub::Error::MaximumPublishersReached`].
Contributor

@williampMSFT williampMSFT Mar 12, 2026


We may want to do some sort of type-state thing to enforce this? That way we can catch misuse at compile time

Contributor Author

@kurtjd kurtjd Mar 12, 2026


Was thinking about this since you mentioned it the other day but couldn't really think of a good way to do that in practice. Well I suppose when we call publisher() it could consume self and return a tuple (the Channel in a new type state, as well as the DynPublisher). Maybe something like:

struct PublisherExists;
struct NoPublisherExists;

pub struct SinglePublisherChannel<MODE, T: Clone, const CAP: usize, const SUBS: usize> {
    inner: PubSubChannel<GlobalRawMutex, T, CAP, SUBS, 1>,
    _phantom: PhantomData<MODE>,
}

impl<T: Clone, const CAP: usize, const SUBS: usize> SinglePublisherChannel<NoPublisherExists, T, CAP, SUBS> {
    /// Obtain the single publisher for this channel.
    pub fn publisher(self) -> (SinglePublisherChannel<PublisherExists, T, CAP, SUBS>, DynPublisher<'_, T>) {
        let new_channel = SinglePublisherChannel { inner: self.inner, _phantom: PhantomData };
        let publisher = new_channel.inner.dyn_publisher();
        (new_channel, publisher)
    }
}

Probably some wrong syntax there since writing it off the top of my head but to give you an idea of an approach.

let _relay_handler = EspiRelayHandler::new(&time_service);
// Here we pass a subscriber into the relay handler so it can listen for notifications from the time alarm service
// and then relay those to the host SoC over eSPI when they occur
let _relay_handler = EspiRelayHandler::new(&time_service, time_alarm_subscriber);
Contributor


&time_service, time_alarm_subscriber

I haven't fully thought this out yet, but at first glance it seems like it might be nice for ergonomics to have part of the relayable trait be 'give me a subscriber handle'? That way the user doesn't have to pass both foo_service and foo_service_subscriber explicitly and if they're running two instances of a particular service they can't mix up the publishing handles?

I guess the tradeoff is that it makes it less clear that they're spending a slot in their subscriber budget to do relaying, but I think that'd come up pretty quickly in testing, so it might be worth it?

Contributor Author


Hm I think you make a good point. I'll experiment and see if I can come up with a slightly better approach.

// async fn process_notification_to_host(&self, espi: &mut espi::Espi<'_>, notification: &NotificationMsg) {
// espi.irq_push(notification.offset).await;
// info!("espi: Notification id {} sent to Host!", notification.offset);
// }
Contributor

@williampMSFT williampMSFT Mar 12, 2026


Full disclosure, I have never observed this code actually working :) When I got here I found that nobody was actually firing messages that wound up triggering this so I commented it out as-written when I got rid of NotificationMsg, but I have no idea if it's actually correct, might need to test

Contributor Author

@kurtjd kurtjd Mar 12, 2026


Yeah, it was actually working to an extent as of sometime last year. At least, the VWire interrupt (or whatever it's actually called) was picked up by secure services and could be bubbled up to ec-test-app. There were just some weird issues with it, I think causing some form of deadlock, that I never fully resolved, so we'll definitely need to revisit this once we start making use of notifications from host-side apps.

&'a self,
message: HostRequest,
) -> impl core::future::Future<Output = HostResult> + Send + 'a {
) -> impl core::future::Future<Output = HostResult> + 'a {
Contributor


impl core::future::Future<Output = HostResult> + 'a

nit: I think if you get rid of the Send bound, you can actually replace this with just async fn process_request(...) -> HostResult (ditto for wait_for_notification). The lint is only on the trait definition, not on the trait impl

Contributor Author


Oh good point, thanks!

);
)+

let result = core::future::poll_fn(|cx| {
Contributor


This pin/poll_fn looks a bit odd to me - can't we just select over the list of subscribers?

Contributor Author


Yeah, it does seem odd, and I had the same hunch initially. The issue is that the select macro is only provided for up to 5 futures, I think? So if we have more than 5 services provided in the macro we'd need something like select_slice or select_array, which is difficult because each subscriber is a different type and can't easily be stored together in a slice, to the best of my knowledge.

) -> impl core::future::Future<Output = Self::ResultEnumType> + 'a;

/// Wait for a notification from any service and return the associated service ID.
fn wait_for_notification<'a>(&'a mut self) -> impl core::future::Future<Output = u8> + 'a;
Contributor


&'a mut self

I think the mut-ness self on this and process_request should probably agree? I think if we want to commit to always being single-tasked then probably make them both mut and we can revisit multi-task support if we ever decide we need it
