nros_node/executor/handoff.rs
1//! Phase 104.E.3 — cross-priority handoff queue.
2//!
3//! Bridges that span priority boundaries (sub callback at
4//! priority A, pub callback at priority B) need a bounded
5//! handoff queue between the two so the high-priority sub
6//! doesn't block on the lower-priority pub's transport drain.
7//! The existing pattern is `Arc<Mutex<heapless::Vec<M, N>>>` +
8//! a timer-driven pub; this module wraps it in a small
9//! `Handoff<M, N>` type so bridge code stays terse.
10//!
11//! Optional sugar — the manual pattern remains supported. The
12//! spec (Phase 104.E.3) explicitly lists this as "optional"
13//! convenience to avoid forcing every bridge to use the same
14//! shape.
15//!
16//! ```ignore
17//! use nros_node::executor::handoff::Handoff;
18//! use std::sync::Arc;
19//!
20//! // Shared bounded queue, N = 32, message type M.
21//! let q: Arc<Handoff<MyMsg, 32>> = Arc::new(Handoff::new());
22//!
23//! // High-priority ingress: push into the queue inside the
24//! // sub callback. `push` is non-blocking — overflow returns
25//! // `Err(msg)` so the high-pri side never stalls.
26//! let q_pub = Arc::clone(&q);
27//! executor.register_subscription::<MyMsg, _>(topic, move |msg: &MyMsg| {
28//! let _ = q_pub.push(msg.clone()); // drop on overflow
29//! })?;
30//!
31//! // Low-priority egress: timer drains the queue + publishes.
32//! let q_sub = Arc::clone(&q);
33//! let pub_out = ...;
34//! executor.register_timer(period, move || {
35//! while let Some(msg) = q_sub.pop() {
36//! let _ = pub_out.publish(&msg);
37//! }
38//! })?;
39//! ```
40//!
41//! Cross-priority safety: every `push` / `pop` takes the
42//! internal mutex for the duration of one queue slot
43//! operation (O(1)). On PiCAS-aware dispatchers (Phase 110.F)
44//! the mutex inherits the holder's effective priority, so
45//! the low-pri drain doesn't priority-invert the high-pri
46//! push.
47//!
48//! `std`-gated for now — the `alloc`-only path needs a
49//! lock-free SPSC queue (heapless::spsc requires a `.split()`
50//! call that doesn't compose with Arc-sharing across
51//! callbacks). Tracked under follow-up if no_std bridges
52//! become a use case.
53
54#![cfg(feature = "std")]
55
56use std::sync::Mutex;
57
58use heapless::Vec;
59
60/// Bounded FIFO between two callbacks running on different
61/// `SchedContext`s. Generic over message type `M` (must be
62/// `Send` for cross-thread executors) and capacity `N`.
63///
64/// Constructed via [`Handoff::new`]; share between callbacks
65/// via `std::sync::Arc<Handoff<M, N>>`.
66#[derive(Debug)]
67pub struct Handoff<M, const N: usize> {
68 inner: Mutex<Vec<M, N>>,
69}
70
71impl<M, const N: usize> Default for Handoff<M, N> {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77impl<M, const N: usize> Handoff<M, N> {
78 /// Empty queue.
79 pub fn new() -> Self {
80 Self {
81 inner: Mutex::new(Vec::new()),
82 }
83 }
84
85 /// Push a message. Non-blocking on the high-priority side —
86 /// returns `Err(msg)` when the queue is full so the caller
87 /// can decide whether to drop, overwrite, or escalate.
88 /// O(1) under the internal mutex.
89 pub fn push(&self, msg: M) -> Result<(), M> {
90 let Ok(mut guard) = self.inner.lock() else {
91 return Err(msg);
92 };
93 guard.push(msg)
94 }
95
96 /// Pop one message. Returns `None` when the queue is empty.
97 /// O(N) under the internal mutex (shifts the tail of the
98 /// `heapless::Vec`); switch to a true ring buffer if the
99 /// dispatcher's bench shows this as a hotspot.
100 pub fn pop(&self) -> Option<M> {
101 let mut guard = self.inner.lock().ok()?;
102 if guard.is_empty() {
103 None
104 } else {
105 Some(guard.remove(0))
106 }
107 }
108
109 /// Current depth. Useful for monitoring + telemetry.
110 pub fn len(&self) -> usize {
111 self.inner.lock().map(|g| g.len()).unwrap_or(0)
112 }
113
114 /// True when empty.
115 pub fn is_empty(&self) -> bool {
116 self.len() == 0
117 }
118
119 /// True when at capacity. Caller's `push` will return
120 /// `Err(msg)` on the next call until a `pop` drains a slot.
121 pub fn is_full(&self) -> bool {
122 self.len() >= N
123 }
124
125 /// Compile-time capacity.
126 pub const fn capacity(&self) -> usize {
127 N
128 }
129}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134
135 #[test]
136 fn push_pop_fifo() {
137 let q: Handoff<i32, 4> = Handoff::new();
138 assert!(q.is_empty());
139 assert_eq!(q.capacity(), 4);
140 q.push(1).unwrap();
141 q.push(2).unwrap();
142 q.push(3).unwrap();
143 assert_eq!(q.len(), 3);
144 assert_eq!(q.pop(), Some(1));
145 assert_eq!(q.pop(), Some(2));
146 assert_eq!(q.pop(), Some(3));
147 assert_eq!(q.pop(), None);
148 }
149
150 #[test]
151 fn overflow_returns_err_msg() {
152 let q: Handoff<i32, 2> = Handoff::new();
153 q.push(1).unwrap();
154 q.push(2).unwrap();
155 assert!(q.is_full());
156 assert_eq!(q.push(3), Err(3));
157 // Drain + retry succeeds.
158 q.pop();
159 q.push(3).unwrap();
160 assert_eq!(q.len(), 2);
161 }
162
163 #[test]
164 fn shared_across_threads() {
165 use std::{sync::Arc, thread};
166
167 let q: Arc<Handoff<u32, 8>> = Arc::new(Handoff::new());
168 let q_prod = Arc::clone(&q);
169 let producer = thread::spawn(move || {
170 for i in 0..16u32 {
171 let _ = q_prod.push(i);
172 thread::sleep(std::time::Duration::from_micros(50));
173 }
174 });
175 let mut drained: Vec<u32, 32> = Vec::new();
176 for _ in 0..200 {
177 while let Some(v) = q.pop() {
178 let _ = drained.push(v);
179 }
180 thread::sleep(std::time::Duration::from_micros(100));
181 }
182 producer.join().unwrap();
183 // Best-effort drain — some pushes may have hit the
184 // bounded cap and returned Err. We assert at least
185 // SOME drained + monotonic order on whatever made it
186 // through.
187 assert!(!drained.is_empty());
188 for w in drained.windows(2) {
189 assert!(w[0] < w[1]);
190 }
191 }
192}