Skip to main content

nros_node/executor/
handoff.rs

1//! Phase 104.E.3 — cross-priority handoff queue.
2//!
3//! Bridges that span priority boundaries (sub callback at
4//! priority A, pub callback at priority B) need a bounded
5//! handoff queue between the two so the high-priority sub
6//! doesn't block on the lower-priority pub's transport drain.
7//! The existing pattern is `Arc<Mutex<heapless::Vec<M, N>>>` +
8//! a timer-driven pub; this module wraps it in a small
9//! `Handoff<M, N>` type so bridge code stays terse.
10//!
11//! Optional sugar — the manual pattern remains supported. The
12//! spec (Phase 104.E.3) explicitly lists this as "optional"
13//! convenience to avoid forcing every bridge to use the same
14//! shape.
15//!
16//! ```ignore
17//! use nros_node::executor::handoff::Handoff;
18//! use std::sync::Arc;
19//!
20//! // Shared bounded queue, N = 32, message type M.
21//! let q: Arc<Handoff<MyMsg, 32>> = Arc::new(Handoff::new());
22//!
23//! // High-priority ingress: push into the queue inside the
24//! // sub callback. `push` is non-blocking — overflow returns
25//! // `Err(msg)` so the high-pri side never stalls.
26//! let q_pub = Arc::clone(&q);
27//! executor.register_subscription::<MyMsg, _>(topic, move |msg: &MyMsg| {
28//!     let _ = q_pub.push(msg.clone());  // drop on overflow
29//! })?;
30//!
31//! // Low-priority egress: timer drains the queue + publishes.
32//! let q_sub = Arc::clone(&q);
33//! let pub_out = ...;
34//! executor.register_timer(period, move || {
35//!     while let Some(msg) = q_sub.pop() {
36//!         let _ = pub_out.publish(&msg);
37//!     }
38//! })?;
39//! ```
40//!
//! Cross-priority safety: every `push` / `pop` takes the
//! internal mutex for the duration of one queue operation
//! (O(1) for `push`; O(N) for `pop`, which shifts the
//! remaining elements forward). On PiCAS-aware dispatchers
//! (Phase 110.F) the mutex inherits the holder's effective
//! priority, so the low-pri drain doesn't priority-invert the
//! high-pri push.
47//!
48//! `std`-gated for now — the `alloc`-only path needs a
49//! lock-free SPSC queue (heapless::spsc requires a `.split()`
50//! call that doesn't compose with Arc-sharing across
51//! callbacks). Tracked under follow-up if no_std bridges
52//! become a use case.
53
54#![cfg(feature = "std")]
55
56use std::sync::Mutex;
57
58use heapless::Vec;
59
/// Bounded FIFO between two callbacks running on different
/// `SchedContext`s. Generic over message type `M` (must be
/// `Send` for cross-thread executors) and capacity `N`.
///
/// Constructed via [`Handoff::new`]; share between callbacks
/// via `std::sync::Arc<Handoff<M, N>>`.
///
/// Storage is a `heapless::Vec<M, N>` behind a
/// `std::sync::Mutex`, so at most `N` messages are queued at
/// any time. The derived `Debug` impl is only available when
/// `M: Debug`.
#[derive(Debug)]
pub struct Handoff<M, const N: usize> {
    /// Queued messages in arrival order: new messages are
    /// appended at the back and the oldest one sits at index 0.
    inner: Mutex<Vec<M, N>>,
}
70
71impl<M, const N: usize> Default for Handoff<M, N> {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77impl<M, const N: usize> Handoff<M, N> {
78    /// Empty queue.
79    pub fn new() -> Self {
80        Self {
81            inner: Mutex::new(Vec::new()),
82        }
83    }
84
85    /// Push a message. Non-blocking on the high-priority side —
86    /// returns `Err(msg)` when the queue is full so the caller
87    /// can decide whether to drop, overwrite, or escalate.
88    /// O(1) under the internal mutex.
89    pub fn push(&self, msg: M) -> Result<(), M> {
90        let Ok(mut guard) = self.inner.lock() else {
91            return Err(msg);
92        };
93        guard.push(msg)
94    }
95
96    /// Pop one message. Returns `None` when the queue is empty.
97    /// O(N) under the internal mutex (shifts the tail of the
98    /// `heapless::Vec`); switch to a true ring buffer if the
99    /// dispatcher's bench shows this as a hotspot.
100    pub fn pop(&self) -> Option<M> {
101        let mut guard = self.inner.lock().ok()?;
102        if guard.is_empty() {
103            None
104        } else {
105            Some(guard.remove(0))
106        }
107    }
108
109    /// Current depth. Useful for monitoring + telemetry.
110    pub fn len(&self) -> usize {
111        self.inner.lock().map(|g| g.len()).unwrap_or(0)
112    }
113
114    /// True when empty.
115    pub fn is_empty(&self) -> bool {
116        self.len() == 0
117    }
118
119    /// True when at capacity. Caller's `push` will return
120    /// `Err(msg)` on the next call until a `pop` drains a slot.
121    pub fn is_full(&self) -> bool {
122        self.len() >= N
123    }
124
125    /// Compile-time capacity.
126    pub const fn capacity(&self) -> usize {
127        N
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages leave the queue in the order they entered it,
    /// and the queue reports `None` once drained.
    #[test]
    fn push_pop_fifo() {
        let queue = Handoff::<i32, 4>::new();
        assert!(queue.is_empty());
        assert_eq!(queue.capacity(), 4);

        for value in 1..=3 {
            queue.push(value).unwrap();
        }
        assert_eq!(queue.len(), 3);

        for expected in 1..=3 {
            assert_eq!(queue.pop(), Some(expected));
        }
        assert_eq!(queue.pop(), None);
    }

    /// A push into a full queue hands the message back, and
    /// succeeds again once a slot has been freed.
    #[test]
    fn overflow_returns_err_msg() {
        let queue = Handoff::<i32, 2>::new();
        queue.push(1).unwrap();
        queue.push(2).unwrap();
        assert!(queue.is_full());
        assert_eq!(queue.push(3), Err(3));

        // Free one slot, then the rejected value fits.
        let _ = queue.pop();
        queue.push(3).unwrap();
        assert_eq!(queue.len(), 2);
    }

    /// One producer thread and the test thread share the queue
    /// through an `Arc`; whatever gets through must stay ordered.
    #[test]
    fn shared_across_threads() {
        use std::{sync::Arc, thread, time::Duration};

        let shared: Arc<Handoff<u32, 8>> = Arc::new(Handoff::new());

        let producer = {
            let sender = Arc::clone(&shared);
            thread::spawn(move || {
                for value in 0..16u32 {
                    // Overflow is allowed here; the value is dropped.
                    let _ = sender.push(value);
                    thread::sleep(Duration::from_micros(50));
                }
            })
        };

        let mut received: Vec<u32, 32> = Vec::new();
        for _ in 0..200 {
            while let Some(value) = shared.pop() {
                let _ = received.push(value);
            }
            thread::sleep(Duration::from_micros(100));
        }
        producer.join().unwrap();

        // Best-effort drain: pushes that hit the bounded cap were
        // rejected, so only require that something arrived and
        // that arrivals are strictly increasing.
        assert!(!received.is_empty());
        assert!(received.windows(2).all(|pair| pair[0] < pair[1]));
    }
}