Skip to main content

nros_node/executor/
handles.rs

1//! Entity wrapper types for the embedded executor.
2
3use core::marker::PhantomData;
4
5use nros_core::{CdrReader, CdrWriter, Deserialize, RosAction, RosMessage, RosService, Serialize};
6use nros_rmw::{Publisher, ServiceClientTrait, ServiceServerTrait, Subscriber, TransportError};
7
8use crate::session;
9
10use super::types::{DEFAULT_TX_BUF, NodeError};
11
/// Interval, in milliseconds, handed to each executor spin by the
/// synchronous wait loops in this module.
const DEFAULT_SPIN_INTERVAL_MS: u64 = 10;
14
/// Budget that bounds how long a synchronous wait loop keeps going.
///
/// `std` builds measure wall-clock time against a deadline derived from
/// `Instant::now()`; `no_std` builds count iterations and are exhausted
/// once `max_iterations` ticks have been granted.
///
/// **Phase 89.8**: the plain `max_iters` approach is insufficient on
/// multi-threaded zpico backends (POSIX/Zephyr/NuttX). There,
/// `executor.spin_once(10ms)` waits on a condvar that zenoh-pico's
/// background tasks signal on any inbound frame (keep-alives,
/// discovery gossip, interest messages). Each signal returns the
/// spin well before the 10 ms budget, so a nominal
/// `1000 × 10 ms = 10 s` iteration count collapses to milliseconds
/// of real time and the wait returns `Timeout` long before the
/// awaited reply can arrive.
///
/// Same class of bug 89.2 fixed in `nros-c`'s blocking service call
/// and 89.3 fixed in `nros-cpp`'s action-client helpers. The
/// maintainer explicitly flagged this `Promise::wait` / `wait_next`
/// path in the 89.2 commit: *"Promise::wait in nros-node has the
/// same structural bug but currently passes all tests. Left on
/// max_spins until a test surfaces it."* The NuttX Rust action
/// E2E is that test.
struct WaitBudget {
    /// Wall-clock instant at which the budget runs out. `None` means the
    /// requested timeout was too large to be represented as an `Instant`,
    /// in which case the budget never expires.
    #[cfg(feature = "std")]
    deadline: Option<std::time::Instant>,
    // Pure no_std fallback: iteration-count budget (no wall clock).
    #[cfg(not(feature = "std"))]
    remaining: u64,
}

impl WaitBudget {
    /// Create a budget.
    ///
    /// `_timeout` is the wall-clock limit used by `std` builds;
    /// `_max_iterations` is the tick count used by `no_std` builds. Only
    /// one of the two is read in any given build, hence the underscores.
    fn new(_max_iterations: u64, _timeout: core::time::Duration) -> Self {
        #[cfg(feature = "std")]
        {
            Self {
                // `Instant + Duration` panics on overflow (for example with
                // `Duration::MAX`); `checked_add` turns that case into an
                // unbounded budget instead of a crash.
                deadline: std::time::Instant::now().checked_add(_timeout),
            }
        }
        #[cfg(not(feature = "std"))]
        {
            Self {
                remaining: _max_iterations,
            }
        }
    }

    /// Consume one step of the budget.
    ///
    /// Returns `true` while the caller may keep waiting and `false` once
    /// the budget is exhausted (deadline reached on `std`, iteration
    /// counter at zero on `no_std`).
    fn tick(&mut self) -> bool {
        #[cfg(feature = "std")]
        {
            match self.deadline {
                Some(deadline) => std::time::Instant::now() < deadline,
                // Unrepresentable deadline: never time out.
                None => true,
            }
        }
        #[cfg(not(feature = "std"))]
        {
            match self.remaining.checked_sub(1) {
                Some(left) => {
                    self.remaining = left;
                    true
                }
                None => false,
            }
        }
    }
}
77
/// Number of bytes in the UUID of a ROS 2 GoalId.
///
/// On the wire this is the fixed `uint8[16]` array of ROS 2
/// `unique_identifier_msgs/UUID`. CDR writes fixed-size arrays **without**
/// a length prefix, so exactly this many bytes are encoded.
const GOAL_UUID_SIZE: usize = 16;
83
84// ============================================================================
85// EmbeddedPublisher
86// ============================================================================
87
88/// Typed publisher handle.
89///
90/// Two methods, both byte-oriented at the wire:
91///
92/// - [`publish`](Self::publish) / [`publish_with_buffer`](Self::publish_with_buffer)
93///   — accept `&M: RosMessage`, CDR-encode into a stack buffer, then
94///   call [`Publisher::publish_raw`](nros_rmw::Publisher::publish_raw).
95/// - [`publish_raw`](Self::publish_raw) — accepts pre-encoded CDR bytes
96///   for callers that already produced the wire payload.
97///
98/// **No typed `loan()` exists.** Loan/borrow live exclusively on
99/// [`EmbeddedRawPublisher`] / [`RawSubscription`]. `try_loan(len)`
100/// requires the byte length up front, which CDR ser/de can only
101/// discover after encoding — the two APIs are incompatible by
102/// construction. See `docs/design/0010-zero-copy-raw-api.md` decision D7.
103pub struct EmbeddedPublisher<M> {
104    pub(crate) handle: session::RmwPublisher,
105    /// Phase 108 — registered event closures kept alive for the
106    /// publisher's lifetime; freed in `Drop`.
107    pub(crate) event_regs: EventRegs,
108    pub(crate) _phantom: PhantomData<M>,
109}
110
111impl<M> Drop for EmbeddedPublisher<M> {
112    fn drop(&mut self) {
113        drop_event_regs(&mut self.event_regs);
114    }
115}
116
117impl<M: RosMessage> EmbeddedPublisher<M> {
118    /// Publish a message using the default buffer size.
119    pub fn publish(&self, msg: &M) -> Result<(), NodeError> {
120        self.publish_with_buffer::<DEFAULT_TX_BUF>(msg)
121    }
122
123    /// Publish a message with a custom buffer size.
124    pub fn publish_with_buffer<const BUF: usize>(&self, msg: &M) -> Result<(), NodeError> {
125        let mut buffer = [0u8; BUF];
126        let mut writer =
127            CdrWriter::new_with_header(&mut buffer).map_err(|_| NodeError::BufferTooSmall)?;
128        msg.serialize(&mut writer)
129            .map_err(|_| NodeError::Serialization)?;
130        let len = writer.position();
131        self.handle
132            .publish_raw(&buffer[..len])
133            .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
134    }
135
136    /// Publish raw CDR-encoded data (must include CDR header).
137    pub fn publish_raw(&self, data: &[u8]) -> Result<(), NodeError> {
138        self.handle
139            .publish_raw(data)
140            .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
141    }
142
143    /// Phase 124.E.1 — streamed publish. Use a closure-driven writer
144    /// to serialise straight into the backend's outbound buffer
145    /// without a per-publisher staging copy. Saves on RAM-constrained
146    /// nodes that publish multi-KB payloads.
147    ///
148    /// The `writer` closure receives a [`StreamWriter`] mutable
149    /// reference and uses [`StreamWriter::write`] / [`extend`] /
150    /// [`reserved_len`] to fill the slot in chunks. The total
151    /// payload length must be declared up-front via
152    /// [`StreamWriter::reserve_total`]; the backend allocates that
153    /// many bytes in its outbound buffer before any chunks land.
154    ///
155    /// Backends without a native stream slot fall through to a
156    /// stack-allocated staging buffer (capped at 4 KiB) + a single
157    /// `publish_raw` — same observable result, just no zero-staging
158    /// win for the big-message case.
159    pub fn publish_streamed<F>(&self, total_len: usize, writer: F) -> Result<(), NodeError>
160    where
161        F: FnMut(&mut [u8]) -> usize,
162    {
163        // Wrap the closure in a `*mut c_void` so it survives the
164        // crossing into the C callback contract the vtable expects.
165        // The closure is consumed by reference, so the lifetime is
166        // bounded by this function's frame — no escape.
167        use nros_rmw::Publisher;
168        struct Ctx<W> {
169            writer: W,
170            total: usize,
171        }
172        unsafe extern "C" fn size_cb<W>(
173            out_total_len: *mut usize,
174            user_ctx: *mut core::ffi::c_void,
175        ) {
176            unsafe {
177                let ctx = &*(user_ctx as *const Ctx<W>);
178                *out_total_len = ctx.total;
179            }
180        }
181        unsafe extern "C" fn chunk_cb<W: FnMut(&mut [u8]) -> usize>(
182            out_buf: *mut u8,
183            cap: usize,
184            out_written: *mut usize,
185            user_ctx: *mut core::ffi::c_void,
186        ) {
187            unsafe {
188                let ctx = &mut *(user_ctx as *mut Ctx<W>);
189                let slot = core::slice::from_raw_parts_mut(out_buf, cap);
190                let n = (ctx.writer)(slot);
191                *out_written = n;
192            }
193        }
194        let mut ctx = Ctx {
195            writer,
196            total: total_len,
197        };
198        let ctx_ptr = &mut ctx as *mut Ctx<F> as *mut core::ffi::c_void;
199        // SAFETY: `ctx` lives until this call returns, and both callbacks
200        // only cast `user_ctx` back to that stack-local `Ctx<F>`.
201        unsafe {
202            self.handle
203                .publish_streamed(size_cb::<F>, chunk_cb::<F>, ctx_ptr)
204        }
205        .map_err(NodeError::Transport)
206    }
207
208    /// Phase 108.B — manually assert this publisher's liveliness.
209    /// Required for publishers configured with
210    /// [`QosLivelinessPolicy::ManualByTopic`] /
211    /// [`QosLivelinessPolicy::ManualByNode`]. No-op for AUTOMATIC /
212    /// NONE kinds. Returns `Err(Unsupported)` if the backend doesn't
213    /// implement manual liveliness.
214    pub fn assert_liveliness(&self) -> Result<(), NodeError> {
215        use nros_rmw::Publisher as _;
216        self.handle
217            .assert_liveliness()
218            .map_err(NodeError::Transport)
219    }
220
221    // ====================================================================
222    // Phase 108 — status events
223    // ====================================================================
224    //
225    // Publisher-side: `LivelinessLost` and `OfferedDeadlineMissed`.
226    // Returns `NodeError::Transport(TransportError::Unsupported)` if
227    // the active backend doesn't generate the event for this entity.
228
229    /// `true` if the active backend can fire the named event for this
230    /// publisher.
231    #[cfg(feature = "alloc")]
232    pub fn supports_event(&self, kind: nros_rmw::EventKind) -> bool {
233        use nros_rmw::Publisher as _;
234        self.handle.supports_event(kind)
235    }
236
237    /// Register a callback for `LivelinessLost`. Fires when this
238    /// publisher misses its own liveliness assertion deadline.
239    #[cfg(feature = "alloc")]
240    pub fn on_liveliness_lost<F>(&mut self, cb: F) -> Result<(), NodeError>
241    where
242        F: FnMut(nros_rmw::CountStatus) + Send + 'static,
243    {
244        register_pub_event::<F, _>(
245            &mut self.handle,
246            &mut self.event_regs,
247            nros_rmw::EventKind::LivelinessLost,
248            0,
249            cb,
250            |payload, f| {
251                if let nros_rmw::EventPayload::LivelinessLost(s) = payload {
252                    f(*s);
253                }
254            },
255        )
256    }
257
258    /// Register a callback for `OfferedDeadlineMissed`. Fires when
259    /// this publisher promised `deadline` and falls behind.
260    #[cfg(feature = "alloc")]
261    pub fn on_offered_deadline_missed<F>(
262        &mut self,
263        deadline: core::time::Duration,
264        cb: F,
265    ) -> Result<(), NodeError>
266    where
267        F: FnMut(nros_rmw::CountStatus) + Send + 'static,
268    {
269        register_pub_event::<F, _>(
270            &mut self.handle,
271            &mut self.event_regs,
272            nros_rmw::EventKind::OfferedDeadlineMissed,
273            deadline.as_millis().min(u32::MAX as u128) as u32,
274            cb,
275            |payload, f| {
276                if let nros_rmw::EventPayload::OfferedDeadlineMissed(s) = payload {
277                    f(*s);
278                }
279            },
280        )
281    }
282}
283
/// Upper bound on event callbacks a single entity may register.
///
/// Subscribers need up to 3 slots (LivelinessChanged,
/// RequestedDeadlineMissed, MessageLost) and publishers up to 2
/// (LivelinessLost, OfferedDeadlineMissed). A single capacity serves
/// both; publishers simply leave the spare slot unused.
#[cfg(feature = "alloc")]
pub(crate) const MAX_EVENTS_PER_ENTITY: usize = 3;
290
/// A single entry in an entity's event-callback registry.
///
/// It pairs the type-erased closure state with the function that knows
/// how to free it, so `Drop` can release the allocation without knowing
/// the closure's concrete type.
#[cfg(feature = "alloc")]
#[derive(Clone, Copy)]
pub(crate) struct EventReg {
    /// Pointer produced by `Box::into_raw`; stays valid for as long as
    /// the owning entity lives.
    pub(crate) ctx: *mut core::ffi::c_void,
    /// Type-erased destructor: rebuilds the `Box` with the original
    /// monomorphic type, which drops the closure and frees its heap slot.
    pub(crate) drop_fn: unsafe fn(*mut core::ffi::c_void),
}
302
/// Per-entity registry of event callbacks: a fixed-capacity vector with
/// room for `MAX_EVENTS_PER_ENTITY` entries.
#[cfg(feature = "alloc")]
pub(crate) type EventRegs = heapless::Vec<EventReg, MAX_EVENTS_PER_ENTITY>;

/// Stand-in used by builds without `alloc`. It lets entity structs keep
/// an `event_regs` field in every feature combination; being a unit
/// struct, it takes up no space.
#[cfg(not(feature = "alloc"))]
#[derive(Default, Clone, Copy)]
pub(crate) struct EventRegs;
311
312/// Empty initial value for the `event_regs` field. Selected at compile
313/// time so call sites are uniform across feature combinations
314/// (`heapless::Vec::new()` for `alloc`; unit-struct constructor
315/// otherwise — clippy's `default_constructed_unit_structs` lint
316/// rejects the `EventRegs::default()` form on the unit-struct branch).
317#[cfg(feature = "alloc")]
318#[inline]
319pub(crate) fn empty_event_regs() -> EventRegs {
320    heapless::Vec::new()
321}
322#[cfg(not(feature = "alloc"))]
323#[inline]
324pub(crate) fn empty_event_regs() -> EventRegs {
325    EventRegs
326}
327
328#[cfg(feature = "alloc")]
329pub(crate) fn drop_event_regs(regs: &mut EventRegs) {
330    while let Some(reg) = regs.pop() {
331        // SAFETY: `reg.ctx` was obtained from `Box::into_raw` of the
332        // monomorphic type that `reg.drop_fn` knows about. Each reg is
333        // visited exactly once because we drain via `pop`.
334        unsafe { (reg.drop_fn)(reg.ctx) };
335    }
336}
337
338#[cfg(not(feature = "alloc"))]
339#[inline]
340pub(crate) fn drop_event_regs(_regs: &mut EventRegs) {}
341
/// Registers a count-status event callback on a publisher handle.
///
/// `user_cb` and `dispatch` are boxed together; the raw pointer to that
/// box is given to the backend as the callback context and recorded in
/// `regs`, so the owning entity can free it on drop.
///
/// # Errors
///
/// - `NodeError::Transport(TransportError::Unsupported)` when `regs` has
///   no free slot.
/// - `NodeError::Transport(e)` when the backend rejects the registration;
///   the boxed state is freed again before returning.
#[cfg(feature = "alloc")]
fn register_pub_event<F, D>(
    handle: &mut session::RmwPublisher,
    regs: &mut EventRegs,
    kind: nros_rmw::EventKind,
    deadline_ms: u32,
    user_cb: F,
    dispatch: D,
) -> Result<(), NodeError>
where
    F: FnMut(nros_rmw::CountStatus) + Send + 'static,
    D: Fn(nros_rmw::EventPayload<'_>, &mut F) + 'static,
{
    use alloc::boxed::Box;
    use nros_rmw::Publisher as _;

    // Refuse before allocating anything if the registry is at capacity.
    if regs.is_full() {
        return Err(NodeError::Transport(TransportError::Unsupported));
    }

    let raw_state: *mut EventClosureState<F, D> =
        Box::into_raw(Box::new(EventClosureState { user_cb, dispatch }));
    let user_ctx = raw_state.cast::<core::ffi::c_void>();

    // SAFETY: the trampoline casts `user_ctx` back to the boxed
    // `EventClosureState<F, D>` it was created from. Ownership of the box
    // is tracked in `regs`; the entity's `Drop` walks the registry and
    // frees it through `drop_event_state::<F, D>`.
    let outcome = unsafe {
        handle.register_event_callback(kind, deadline_ms, event_trampoline::<F, D>, user_ctx)
    };

    if let Err(cause) = outcome {
        // SAFETY: the backend rejected the registration, so nothing else
        // holds `raw_state`; take the box back and free it.
        unsafe {
            drop(Box::from_raw(raw_state));
        }
        return Err(NodeError::Transport(cause));
    }

    // Cannot fail: the `is_full` check above left at least one free slot.
    let _ = regs.push(EventReg {
        ctx: user_ctx,
        drop_fn: drop_event_state::<F, D>,
    });
    Ok(())
}
388
/// Heap-allocated state behind a count-status event registration: the
/// user's callback plus the adapter that picks the matching payload
/// variant and calls it.
#[cfg(feature = "alloc")]
struct EventClosureState<F, D> {
    /// Callback supplied by the user.
    user_cb: F,
    /// Adapter invoked by the trampoline with the decoded payload and
    /// `user_cb`.
    dispatch: D,
}
394
/// C-ABI entry point registered with the backend for count-status
/// events. It rebuilds the typed payload from the raw pointer and runs
/// the stored `dispatch` adapter with the user callback.
///
/// # Safety
///
/// `user_ctx` must point to a live `EventClosureState<F, D>` that is not
/// accessed elsewhere during the call, and `kind` / `payload_ptr` must
/// satisfy the requirements of `nros_rmw::payload_from_raw`.
#[cfg(feature = "alloc")]
unsafe extern "C" fn event_trampoline<F, D>(
    kind: nros_rmw::EventKind,
    payload_ptr: *const core::ffi::c_void,
    user_ctx: *mut core::ffi::c_void,
) where
    F: FnMut(nros_rmw::CountStatus) + Send + 'static,
    D: Fn(nros_rmw::EventPayload<'_>, &mut F) + 'static,
{
    let typed_ctx = user_ctx.cast::<EventClosureState<F, D>>();
    // SAFETY: per this function's contract `user_ctx` is the boxed state
    // created at registration time and still alive.
    let state = unsafe { &mut *typed_ctx };
    // SAFETY: `kind` and `payload_ptr` are passed through unchanged from
    // the backend, which is responsible for their consistency.
    let payload = unsafe { nros_rmw::payload_from_raw(kind, payload_ptr) };
    (state.dispatch)(payload, &mut state.user_cb);
}
408
/// Type-erased destructor for an `EventClosureState<F, D>`; stored in
/// `EventReg::drop_fn`.
///
/// # Safety
///
/// `ctx` must have been produced by `Box::into_raw` on a
/// `Box<EventClosureState<F, D>>` and must not have been freed yet.
#[cfg(feature = "alloc")]
unsafe fn drop_event_state<F, D>(ctx: *mut core::ffi::c_void)
where
    F: FnMut(nros_rmw::CountStatus) + Send + 'static,
    D: Fn(nros_rmw::EventPayload<'_>, &mut F) + 'static,
{
    let typed_ctx = ctx.cast::<EventClosureState<F, D>>();
    // SAFETY: guaranteed by the caller, see the function-level contract.
    let boxed = unsafe { alloc::boxed::Box::from_raw(typed_ctx) };
    drop(boxed);
}
423
/// Registers a count-status event callback on a subscriber handle.
///
/// `user_cb` and `dispatch` are boxed together; the raw pointer to that
/// box is given to the backend as the callback context and recorded in
/// `regs`, so the owning entity can free it on drop.
///
/// # Errors
///
/// - `NodeError::Transport(TransportError::Unsupported)` when `regs` has
///   no free slot.
/// - `NodeError::Transport(e)` when the backend rejects the registration;
///   the boxed state is freed again before returning.
#[cfg(feature = "alloc")]
fn register_sub_event_count<F, D>(
    handle: &mut session::RmwSubscriber,
    regs: &mut EventRegs,
    kind: nros_rmw::EventKind,
    deadline_ms: u32,
    user_cb: F,
    dispatch: D,
) -> Result<(), NodeError>
where
    F: FnMut(nros_rmw::CountStatus) + Send + 'static,
    D: Fn(nros_rmw::EventPayload<'_>, &mut F) + 'static,
{
    use alloc::boxed::Box;
    use nros_rmw::Subscriber as _;

    // Refuse before allocating anything if the registry is at capacity.
    if regs.is_full() {
        return Err(NodeError::Transport(TransportError::Unsupported));
    }

    let raw_state: *mut EventClosureState<F, D> =
        Box::into_raw(Box::new(EventClosureState { user_cb, dispatch }));
    let user_ctx = raw_state.cast::<core::ffi::c_void>();

    // SAFETY: the trampoline casts `user_ctx` back to the boxed
    // `EventClosureState<F, D>` it was created from; the box is tracked
    // in `regs` and freed through `drop_event_state::<F, D>`.
    let outcome = unsafe {
        handle.register_event_callback(kind, deadline_ms, event_trampoline::<F, D>, user_ctx)
    };

    if let Err(cause) = outcome {
        // SAFETY: the backend rejected the registration, so nothing else
        // holds `raw_state`; take the box back and free it.
        unsafe {
            drop(Box::from_raw(raw_state));
        }
        return Err(NodeError::Transport(cause));
    }

    // Cannot fail: the `is_full` check above left at least one free slot.
    let _ = regs.push(EventReg {
        ctx: user_ctx,
        drop_fn: drop_event_state::<F, D>,
    });
    Ok(())
}
464
/// Registers a `LivelinessChanged` callback on a subscriber handle.
///
/// `user_cb` is boxed; the raw pointer to that box is given to the
/// backend as the callback context and recorded in `regs`, so the owning
/// entity can free it on drop. The event is registered with a deadline
/// argument of 0.
///
/// # Errors
///
/// - `NodeError::Transport(TransportError::Unsupported)` when `regs` has
///   no free slot.
/// - `NodeError::Transport(e)` when the backend rejects the registration;
///   the boxed state is freed again before returning.
#[cfg(feature = "alloc")]
fn register_sub_event_liveliness<F>(
    handle: &mut session::RmwSubscriber,
    regs: &mut EventRegs,
    user_cb: F,
) -> Result<(), NodeError>
where
    F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
{
    use alloc::boxed::Box;
    use nros_rmw::Subscriber as _;

    // Refuse before allocating anything if the registry is at capacity.
    if regs.is_full() {
        return Err(NodeError::Transport(TransportError::Unsupported));
    }

    let raw_state: *mut LivelinessClosureState<F> =
        Box::into_raw(Box::new(LivelinessClosureState { user_cb }));
    let user_ctx = raw_state.cast::<core::ffi::c_void>();

    // SAFETY: the trampoline casts `user_ctx` back to the boxed
    // `LivelinessClosureState<F>` it was created from; the box is tracked
    // in `regs` and freed through `drop_liveliness_state::<F>`.
    let outcome = unsafe {
        handle.register_event_callback(
            nros_rmw::EventKind::LivelinessChanged,
            0,
            liveliness_trampoline::<F>,
            user_ctx,
        )
    };

    if let Err(cause) = outcome {
        // SAFETY: the backend rejected the registration, so nothing else
        // holds `raw_state`; take the box back and free it.
        unsafe {
            drop(Box::from_raw(raw_state));
        }
        return Err(NodeError::Transport(cause));
    }

    // Cannot fail: the `is_full` check above left at least one free slot.
    let _ = regs.push(EventReg {
        ctx: user_ctx,
        drop_fn: drop_liveliness_state::<F>,
    });
    Ok(())
}
506
/// Type-erased destructor for a `LivelinessClosureState<F>`; stored in
/// `EventReg::drop_fn`.
///
/// # Safety
///
/// `ctx` must have been produced by `Box::into_raw` on a
/// `Box<LivelinessClosureState<F>>` and must not have been freed yet.
#[cfg(feature = "alloc")]
unsafe fn drop_liveliness_state<F>(ctx: *mut core::ffi::c_void)
where
    F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
{
    let typed_ctx = ctx.cast::<LivelinessClosureState<F>>();
    // SAFETY: guaranteed by the caller, see the function-level contract.
    let boxed = unsafe { alloc::boxed::Box::from_raw(typed_ctx) };
    drop(boxed);
}
518
/// Heap-allocated state behind a `LivelinessChanged` registration.
#[cfg(feature = "alloc")]
struct LivelinessClosureState<F> {
    /// Callback supplied by the user.
    user_cb: F,
}
523
/// C-ABI entry point registered with the backend for
/// `LivelinessChanged`. It rebuilds the typed payload from the raw
/// pointer and calls the user callback when the payload is the
/// `LivelinessChanged` variant; any other variant is ignored.
///
/// # Safety
///
/// `user_ctx` must point to a live `LivelinessClosureState<F>` that is
/// not accessed elsewhere during the call, and `kind` / `payload_ptr`
/// must satisfy the requirements of `nros_rmw::payload_from_raw`.
#[cfg(feature = "alloc")]
unsafe extern "C" fn liveliness_trampoline<F>(
    kind: nros_rmw::EventKind,
    payload_ptr: *const core::ffi::c_void,
    user_ctx: *mut core::ffi::c_void,
) where
    F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
{
    let typed_ctx = user_ctx.cast::<LivelinessClosureState<F>>();
    // SAFETY: per this function's contract `user_ctx` is the boxed state
    // created at registration time and still alive.
    let state = unsafe { &mut *typed_ctx };
    // SAFETY: `kind` and `payload_ptr` are passed through unchanged from
    // the backend, which is responsible for their consistency.
    let payload = unsafe { nros_rmw::payload_from_raw(kind, payload_ptr) };
    match payload {
        nros_rmw::EventPayload::LivelinessChanged(status) => (state.user_cb)(*status),
        _ => {}
    }
}
538
539// ============================================================================
540// EmbeddedRawPublisher — typeless publisher for non-ROS message wire formats
541// ============================================================================
542
/// Size in bytes of a publisher's inline arena slot when no explicit
/// `TX_BUF` is chosen.
pub const DEFAULT_LOAN_BUF: usize = 1024;
545
546use core::cell::UnsafeCell;
547// portable-atomic AtomicBool — resolves to native on targets that support it,
548// software fallback on those that don't (e.g. some Xtensa ESP32 SoCs). Use
549// portable-atomic's Ordering too so the type sees the matching trait
550// implementation across all targets.
551use portable_atomic::{AtomicBool, Ordering};
552
553/// Typeless publisher handle. Use when the wire format is not ROS CDR
554/// (e.g. PX4 uORB raw POD bytes, custom binary protocols).
555///
556/// Two publish paths:
557///
558/// - [`publish_raw`](Self::publish_raw): user supplies a `&[u8]`, backend
559///   memcpys into its outbound buffer. One copy.
560/// - [`try_loan`](Self::try_loan): backend (or per-publisher arena fallback)
561///   hands user a `&mut [u8]` slice. User writes in place. [`PublishLoan::commit`]
562///   triggers the wire write. Zero-copy on backends with native lending
563///   (Phase 99: zenoh-pico `unstable-zenoh-api`, XRCE-DDS); single-memcpy
564///   fallback on backends without (uORB).
565///
566/// The const-generic `TX_BUF` sizes the inline arena slot (default
567/// [`DEFAULT_LOAN_BUF`]). Loans larger than `TX_BUF` return
568/// `LoanError::TooLarge`.
569pub struct EmbeddedRawPublisher<const TX_BUF: usize = DEFAULT_LOAN_BUF> {
570    pub(crate) handle: session::RmwPublisher,
571    /// Single-slot arena: writable buffer + busy flag. SLOTS=1 in v1
572    /// (concurrent loans on the same publisher return WouldBlock).
573    /// Unused when the `rmw-lending` feature is on — `try_loan`
574    /// dispatches to the backend's `SlotLending` instead.
575    #[allow(dead_code)]
576    pub(crate) arena: TxArena<TX_BUF>,
577    /// Phase 108 — registered event closures.
578    pub(crate) event_regs: EventRegs,
579}
580
581impl<const TX_BUF: usize> Drop for EmbeddedRawPublisher<TX_BUF> {
582    fn drop(&mut self) {
583        drop_event_regs(&mut self.event_regs);
584    }
585}
586
587/// Single-slot per-publisher arena. Concurrent `try_loan` calls on the
588/// same publisher race on the busy flag; loser gets `WouldBlock`.
589///
590/// `waker` lets `loan().await` register a waker before returning
591/// `Pending`; `release()` wakes it so the next `try_loan` succeeds
592/// without polling the executor loop. Phase 99.H' — replaces the
593/// earlier `wake_by_ref + Pending` busy yield with an event-driven
594/// wake, and gives `LoanFuture::Drop` a place to release a pending
595/// reservation cleanly.
596#[allow(dead_code)]
597pub(crate) struct TxArena<const TX_BUF: usize> {
598    busy: AtomicBool,
599    buf: UnsafeCell<[u8; TX_BUF]>,
600    waker: atomic_waker::AtomicWaker,
601}
602
603// SAFETY: Sync-ness of the arena is enforced by the `busy` flag — only
604// the thread that won the CAS may access `buf`, and only until commit/
605// discard releases the slot.
606unsafe impl<const TX_BUF: usize> Sync for TxArena<TX_BUF> {}
607
608#[allow(dead_code)] // unused when `rmw-lending` is on
609impl<const TX_BUF: usize> TxArena<TX_BUF> {
610    pub(crate) const fn new() -> Self {
611        Self {
612            busy: AtomicBool::new(false),
613            buf: UnsafeCell::new([0u8; TX_BUF]),
614            waker: atomic_waker::AtomicWaker::new(),
615        }
616    }
617
618    /// Try to claim the arena slot. Returns a raw pointer + len pair on
619    /// success; caller wraps it in a `PublishLoan`. Returns `false` if
620    /// the slot is already loaned.
621    ///
622    /// `&self` returning `&mut` is sound because the `busy` flag
623    /// gates exclusivity at runtime — the CAS in this function is
624    /// the only writer, and `release()` is only callable through the
625    /// loan's `Drop`.
626    #[allow(clippy::mut_from_ref)]
627    fn try_claim(&self, len: usize) -> Result<&mut [u8], LoanError> {
628        if len > TX_BUF {
629            return Err(LoanError::TooLarge);
630        }
631        if self
632            .busy
633            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
634            .is_err()
635        {
636            return Err(LoanError::WouldBlock);
637        }
638        // SAFETY: we just won the busy CAS; we hold exclusive access
639        // until release(). Lifetime is tied to `&self` for the loan.
640        let buf_ref: &mut [u8; TX_BUF] = unsafe { &mut *self.buf.get() };
641        Ok(&mut buf_ref[..len])
642    }
643
644    fn release(&self) {
645        self.busy.store(false, Ordering::Release);
646        // Wake any pending `LoanFuture` waiting on this arena. Cheap
647        // no-op if no one is waiting.
648        self.waker.wake();
649    }
650}
651
652impl<const TX_BUF: usize> EmbeddedRawPublisher<TX_BUF> {
653    /// Construct an [`EmbeddedRawPublisher`] from a backend-allocated
654    /// `RmwPublisher` handle. Public so external extension crates
655    /// (e.g. `nros-px4` for typed uORB wrappers) can wrap a handle
656    /// they obtained directly from the active session via
657    /// [`crate::Node::session_mut`] + a backend-specific create method.
658    ///
659    /// Most users should not call this — use [`crate::Node::create_publisher`]
660    /// or [`crate::Node::create_publisher_raw`] instead.
661    pub fn new(handle: session::RmwPublisher) -> Self {
662        Self {
663            handle,
664            arena: TxArena::new(),
665            event_regs: empty_event_regs(),
666        }
667    }
668
669    /// Phase 108.A — `true` if the active backend can fire the named
670    /// event for this raw publisher.
671    #[cfg(feature = "alloc")]
672    pub fn supports_event(&self, kind: nros_rmw::EventKind) -> bool {
673        use nros_rmw::Publisher as _;
674        self.handle.supports_event(kind)
675    }
676
677    /// Publish a pre-encoded byte slice. The byte format depends entirely
678    /// on the active RMW backend:
679    ///
680    /// - **zenoh / XRCE-DDS / DDS**: CDR-encoded payload including the
681    ///   4-byte CDR header.
682    /// - **uORB**: raw POD struct bytes (no header). Length must equal
683    ///   `size_of::<T::Msg>()` for the registered topic.
684    pub fn publish_raw(&self, data: &[u8]) -> Result<(), NodeError> {
685        self.handle
686            .publish_raw(data)
687            .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
688    }
689
690    /// Phase 128.F.4 — raw publish with a wire-level attachment block.
691    ///
692    /// `attachment` rides alongside the payload on backends that
693    /// natively support it (zenoh-pico, Cyclone DDS). Backends without
694    /// native support silently discard `attachment` and fall back to
695    /// the regular [`publish_raw`](Self::publish_raw) path — the
696    /// default `Publisher::publish_raw_with_attachment` body in
697    /// `nros-rmw` does this delegation.
698    ///
699    /// Primary use case: cross-RMW bridges stamp the source backend's
700    /// RMW name as `bridge_origin` so a paired return bridge can drop
701    /// echoed frames deterministically.
702    pub fn publish_raw_with_attachment(
703        &self,
704        data: &[u8],
705        attachment: &[u8],
706    ) -> Result<(), NodeError> {
707        self.handle
708            .publish_raw_with_attachment(data, attachment)
709            .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
710    }
711
712    /// Phase 108.B — manually assert this publisher's liveliness.
713    /// Required for `QosLivelinessPolicy::ManualByTopic` /
714    /// `ManualByNode`. No-op for AUTOMATIC / NONE.
715    pub fn assert_liveliness(&self) -> Result<(), NodeError> {
716        use nros_rmw::Publisher as _;
717        self.handle
718            .assert_liveliness()
719            .map_err(NodeError::Transport)
720    }
721
722    /// Reserve a writable slot of `len` bytes. Caller writes into the
723    /// returned [`PublishLoan`] and calls [`PublishLoan::commit`] to
724    /// publish. Never blocks; returns [`LoanError::WouldBlock`] when the
725    /// slot is already in use (arena fallback) or the backend's outbound
726    /// stream is full (lending path), and [`LoanError::TooLarge`] when
727    /// `len` exceeds the publisher's slot capacity.
728    ///
729    /// With the `rmw-lending` cargo feature on, this dispatches to the
730    /// active backend's [`SlotLending::try_lend_slot`](nros_rmw::SlotLending::try_lend_slot)
731    /// — zero-copy on backends that natively lend (zenoh-pico via
732    /// `z_bytes_from_static_buf`, XRCE-DDS via `uxr_prepare_output_stream`).
733    /// Without `rmw-lending`, the arena fallback is used: caller fills a
734    /// per-publisher inline slot, [`commit`](PublishLoan::commit) calls
735    /// the backend's `publish_raw` (single memcpy into the backend's
736    /// outbound buffer, same as `publish_raw` directly).
737    #[cfg(not(feature = "rmw-lending"))]
738    pub fn try_loan(&self, len: usize) -> Result<PublishLoan<'_, TX_BUF>, LoanError> {
739        let slice = self.arena.try_claim(len)?;
740        Ok(PublishLoan {
741            publisher: self,
742            slice,
743            committed: false,
744        })
745    }
746
747    /// `rmw-lending` variant — see the no-lending [`try_loan`] for the docs.
748    #[cfg(feature = "rmw-lending")]
749    pub fn try_loan(&self, len: usize) -> Result<PublishLoan<'_, TX_BUF>, LoanError> {
750        use nros_rmw::SlotLending;
751        match self.handle.try_lend_slot(len) {
752            Ok(Some(slot)) => Ok(PublishLoan {
753                publisher: self,
754                backend_slot: Some(slot),
755                committed: false,
756            }),
757            Ok(None) => Err(LoanError::WouldBlock),
758            Err(e) => Err(LoanError::Backend(e)),
759        }
760    }
761
762    /// Sync blocking loan with timeout. Spins the executor until the
763    /// arena slot is free or `timeout` elapses.
764    ///
765    /// Useful when you publish from a sync context that owns the
766    /// executor and want to block on a busy arena (rare — single-slot
767    /// arena means contention only when concurrent task tries the same
768    /// publisher, in which case the offending other task should have
769    /// committed promptly).
770    pub fn loan_with_timeout(
771        &self,
772        len: usize,
773        executor: &mut super::Executor,
774        timeout: core::time::Duration,
775    ) -> Result<PublishLoan<'_, TX_BUF>, LoanError> {
776        if len > TX_BUF {
777            return Err(LoanError::TooLarge);
778        }
779        let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
780        let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
781        let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
782        let mut budget = WaitBudget::new(max_spins, timeout);
783        loop {
784            match self.try_loan(len) {
785                Ok(loan) => return Ok(loan),
786                Err(LoanError::WouldBlock) => {
787                    executor.spin_once(spin_interval);
788                    if !budget.tick() {
789                        return Err(LoanError::WouldBlock);
790                    }
791                }
792                Err(other) => return Err(other),
793            }
794        }
795    }
796
797    /// Async-await on a free loan slot. Returns the loan as soon as
798    /// the arena's busy flag clears (no-lending path) or as soon as
799    /// the backend's outbound stream has room (lending path).
800    ///
801    /// Phase 99.H': cancellation-safe Future. Registers the task's
802    /// waker on the arena's [`AtomicWaker`] before checking
803    /// [`try_loan`]; another task's `commit` / `discard` calls
804    /// [`TxArena::release`] which wakes us. Dropping the future before
805    /// it resolves removes nothing from any wait queue (single-slot
806    /// AtomicWaker semantics: only the latest registration matters)
807    /// and explicitly wakes another waiter so the next task in line
808    /// gets a poll. No `PublishLoan` is materialised on cancel paths.
809    pub fn loan(&self, len: usize) -> LoanFuture<'_, TX_BUF> {
810        LoanFuture {
811            publisher: self,
812            len,
813            registered: false,
814        }
815    }
816}
817
818/// Future returned by [`EmbeddedRawPublisher::loan`]. Phase 99.H'
819/// cancellation-safe variant: if dropped before resolving, it wakes
820/// the next pending waiter so the busy-flag-clear signal isn't lost
821/// to the cancelled task.
822#[must_use = "futures do nothing unless polled"]
823pub struct LoanFuture<'a, const TX_BUF: usize> {
824    publisher: &'a EmbeddedRawPublisher<TX_BUF>,
825    len: usize,
826    /// Set on the first `Pending` return so `Drop` knows whether a
827    /// waker was registered (and thus another waiter may need a wake).
828    registered: bool,
829}
830
831impl<'a, const TX_BUF: usize> core::future::Future for LoanFuture<'a, TX_BUF> {
832    type Output = Result<PublishLoan<'a, TX_BUF>, LoanError>;
833
834    fn poll(
835        self: core::pin::Pin<&mut Self>,
836        cx: &mut core::task::Context<'_>,
837    ) -> core::task::Poll<Self::Output> {
838        // SAFETY: LoanFuture is `Unpin` for all practical purposes —
839        // it holds only `&publisher`, `len`, and a bool. Move out of
840        // Pin for the body.
841        let this = self.get_mut();
842
843        // Register-then-check: closes the race where another task's
844        // `release` fires between `try_loan` returning WouldBlock and
845        // the waker landing. The arena's AtomicWaker stores the
846        // latest waker; we update it on every poll so a `select!` /
847        // re-poll under a different waker observes the right one.
848        loan_register_waker(this.publisher, cx.waker());
849        this.registered = true;
850
851        match this.publisher.try_loan(this.len) {
852            Ok(loan) => core::task::Poll::Ready(Ok(loan)),
853            Err(LoanError::WouldBlock) => core::task::Poll::Pending,
854            Err(other) => core::task::Poll::Ready(Err(other)),
855        }
856    }
857}
858
859impl<'a, const TX_BUF: usize> Drop for LoanFuture<'a, TX_BUF> {
860    fn drop(&mut self) {
861        // If we registered a waker but never resolved, the busy flag
862        // may have just cleared and we'd swallow the wake. Forward it
863        // to the next waiter so the line keeps moving. Cheap no-op
864        // when no one else is waiting.
865        if self.registered {
866            loan_wake_next(self.publisher);
867        }
868    }
869}
870
871// Indirection so the `rmw-lending` build (which has no arena) can
872// stub these. With `rmw-lending` on, the lending Future variant uses
873// the executor's drive_io spin to drain the backend stream — there's
874// no arena-level wake source, so the helpers degrade to a self-wake.
875#[cfg(not(feature = "rmw-lending"))]
876fn loan_register_waker<const TX_BUF: usize>(
877    pub_: &EmbeddedRawPublisher<TX_BUF>,
878    waker: &core::task::Waker,
879) {
880    pub_.arena.waker.register(waker);
881}
882
883#[cfg(not(feature = "rmw-lending"))]
884fn loan_wake_next<const TX_BUF: usize>(pub_: &EmbeddedRawPublisher<TX_BUF>) {
885    pub_.arena.waker.wake();
886}
887
/// Lending build: there is no arena to park the waker on, so wake the
/// task immediately instead of storing anything.
#[cfg(feature = "rmw-lending")]
fn loan_register_waker<const TX_BUF: usize>(
    _pub_: &EmbeddedRawPublisher<TX_BUF>,
    waker: &core::task::Waker,
) {
    // Self-wake: the runtime re-polls after the next executor tick,
    // which drains the backend's outbound stream via `drive_io`.
    waker.wake_by_ref();
}
898
/// Lending build: intentionally does nothing.
#[cfg(feature = "rmw-lending")]
fn loan_wake_next<const TX_BUF: usize>(_pub_: &EmbeddedRawPublisher<TX_BUF>) {
    // The lending build has no AtomicWaker on the arena, hence no
    // waiter to forward a wake to.
}
903
904/// Error type for [`EmbeddedRawPublisher::try_loan`].
905#[derive(Debug, Clone, PartialEq, Eq)]
906pub enum LoanError {
907    /// Requested length exceeds the publisher's arena slot capacity.
908    TooLarge,
909    /// Arena slot already in use; another publish is in flight on this
910    /// publisher. Retry after the other loan commits or discards.
911    WouldBlock,
912    /// Backend rejected the publish at commit time.
913    Backend(TransportError),
914}
915
916impl From<TransportError> for LoanError {
917    fn from(e: TransportError) -> Self {
918        LoanError::Backend(e)
919    }
920}
921
922/// Writable loan into a [`EmbeddedRawPublisher`]'s slot.
923///
924/// User fills `as_mut()` then calls [`commit`](Self::commit) to publish,
925/// or [`discard`](Self::discard) to release the slot without publishing.
926/// Dropping without either silently discards (slot freed); a
927/// `#[must_use]` warning catches accidental drops at compile time.
928///
929/// Two backings, selected at compile time by the `rmw-lending` feature:
930///
931/// - **Arena (default)**: per-publisher inline `[u8; TX_BUF]` slot. On
932///   commit, `publish_raw` memcpys into the backend's outbound buffer.
933/// - **Backend lending (`rmw-lending`)**: slot owned by the backend
934///   (zenoh-pico's static buffer aliased via `z_bytes_from_static_buf`,
935///   XRCE's `ucdrBuffer` reservation). True zero-copy publish.
936#[must_use = "PublishLoan must be committed or discarded; dropping silently rolls back"]
937#[cfg(not(feature = "rmw-lending"))]
938pub struct PublishLoan<'a, const TX_BUF: usize> {
939    publisher: &'a EmbeddedRawPublisher<TX_BUF>,
940    slice: &'a mut [u8],
941    committed: bool,
942}
943
/// `rmw-lending` build of `PublishLoan`: the writable bytes live in a
/// slot lent by the backend.
#[must_use = "PublishLoan must be committed or discarded; dropping silently rolls back"]
#[cfg(feature = "rmw-lending")]
pub struct PublishLoan<'a, const TX_BUF: usize> {
    /// Publisher the loan was taken from; `commit` hands the slot back
    /// to its backend handle.
    publisher: &'a EmbeddedRawPublisher<TX_BUF>,
    /// Held in an `Option` so that `commit` can move the slot out with
    /// `take()` without triggering Drop's release path. Stays
    /// `Some(_)` until `commit` or `discard` runs.
    backend_slot: Option<<session::RmwPublisher as nros_rmw::SlotLending>::Slot<'a>>,
    /// Set by `commit` and `discard`.
    committed: bool,
}
954
955#[cfg(not(feature = "rmw-lending"))]
956impl<'a, const TX_BUF: usize> PublishLoan<'a, TX_BUF> {
957    /// Mutable view into the loaned bytes. Caller writes message data here.
958    #[allow(clippy::should_implement_trait)]
959    pub fn as_mut(&mut self) -> &mut [u8] {
960        self.slice
961    }
962
963    /// Commit the loan: hand the bytes to the backend's `publish_raw`,
964    /// then release the arena slot. Returns the backend's publish error
965    /// if any (slot is released regardless).
966    pub fn commit(mut self) -> Result<(), LoanError> {
967        let res = self
968            .publisher
969            .handle
970            .publish_raw(self.slice)
971            .map_err(|_| LoanError::Backend(TransportError::PublishFailed));
972        self.committed = true;
973        // Drop runs and releases the slot.
974        res
975    }
976
977    /// Discard the loan without publishing. Equivalent to dropping, but
978    /// explicit (no #[must_use] warning).
979    pub fn discard(mut self) {
980        self.committed = true; // Suppress Drop's "discard" log if any.
981        drop(self);
982    }
983}
984
#[cfg(feature = "rmw-lending")]
impl<'a, const TX_BUF: usize> PublishLoan<'a, TX_BUF> {
    /// Writable view of the bytes lent by the backend.
    #[allow(clippy::should_implement_trait)]
    pub fn as_mut(&mut self) -> &mut [u8] {
        // Invariant: `backend_slot` is `Some` for the whole life of a
        // `PublishLoan`. Only `commit` / `discard` take it, and both
        // consume `self` by value, so a `&mut self` method can never
        // observe `None`.
        match self.backend_slot.as_mut() {
            Some(slot) => slot.as_mut(),
            None => panic!("PublishLoan slot already consumed"),
        }
    }

    /// Publish by handing the slot to the backend's `commit_slot` for
    /// flushing. The slot's bytes go to the wire without an extra
    /// user-side memcpy.
    pub fn commit(mut self) -> Result<(), LoanError> {
        use nros_rmw::SlotLending;
        // Invariant: this is the first and only `take` — `commit`
        // consumes `self`, so the slot is still `Some` here.
        let slot = match self.backend_slot.take() {
            Some(slot) => slot,
            None => panic!("PublishLoan slot already consumed"),
        };
        self.committed = true;
        match self.publisher.handle.commit_slot(slot) {
            Ok(flushed) => Ok(flushed),
            Err(cause) => Err(LoanError::Backend(cause)),
        }
    }

    /// Give the loan back without publishing. The backend-owned slot
    /// is released by its own Drop.
    pub fn discard(mut self) {
        self.committed = true;
        // Move the slot out and drop it right away, releasing it.
        let slot = self.backend_slot.take();
        drop(slot);
    }
}
1026
1027#[cfg(not(feature = "rmw-lending"))]
1028impl<'a, const TX_BUF: usize> Drop for PublishLoan<'a, TX_BUF> {
1029    fn drop(&mut self) {
1030        // Slot always returned to the free pool; whether the bytes were
1031        // actually published is encoded in `committed`. Future telemetry
1032        // hook could log uncommitted drops in debug builds.
1033        self.publisher.arena.release();
1034    }
1035}
1036
1037// rmw-lending variant relies on the backend's `Slot` Drop impl to release
1038// the underlying buffer/stream slot. No explicit nros-side Drop needed.
1039
1040// ============================================================================
1041// Subscription
1042// ============================================================================
1043
1044/// Typed subscription handle with internal receive buffer.
1045///
1046/// Two methods, both byte-oriented at the wire:
1047///
1048/// - [`try_recv`](Self::try_recv) / [`recv`](Self::recv) — pull bytes
1049///   from the backend, CDR-decode into `M: RosMessage`, hand back
1050///   ownership of the typed message.
1051/// - [`try_recv_raw`](Self::try_recv_raw) — copy bytes into the
1052///   subscription's internal buffer and return the length, leaving CDR
1053///   decoding to the caller.
1054///
1055/// **No typed `borrow()` exists.** Borrow lives exclusively on
1056/// [`RawSubscription`]. `RecvView` is `&[u8]` semantics; CDR decoding
1057/// into a typed `M` requires owning the bytes (or running the decoder
1058/// in place), which the borrow contract doesn't fit. See
1059/// `docs/design/0010-zero-copy-raw-api.md` decision D7.
1060pub struct Subscription<M, const RX_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE }> {
1061    pub(crate) handle: session::RmwSubscriber,
1062    pub(crate) buffer: [u8; RX_BUF],
1063    /// Phase 108 — registered event closures.
1064    pub(crate) event_regs: EventRegs,
1065    pub(crate) _phantom: PhantomData<M>,
1066}
1067
1068impl<M, const RX_BUF: usize> Drop for Subscription<M, RX_BUF> {
1069    fn drop(&mut self) {
1070        drop_event_regs(&mut self.event_regs);
1071    }
1072}
1073
1074impl<M: RosMessage, const RX_BUF: usize> Subscription<M, RX_BUF> {
1075    /// Try to receive a typed message (non-blocking).
1076    pub fn try_recv(&mut self) -> Result<Option<M>, NodeError> {
1077        match self
1078            .handle
1079            .try_recv_raw(&mut self.buffer)
1080            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?
1081        {
1082            Some(len) => {
1083                let mut reader = CdrReader::new_with_header(&self.buffer[..len])
1084                    .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
1085                let msg = M::deserialize(&mut reader)
1086                    .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
1087                Ok(Some(msg))
1088            }
1089            None => Ok(None),
1090        }
1091    }
1092
1093    /// Try to receive raw CDR-encoded data (non-blocking).
1094    pub fn try_recv_raw(&mut self) -> Result<Option<usize>, NodeError> {
1095        self.handle
1096            .try_recv_raw(&mut self.buffer)
1097            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))
1098    }
1099
1100    /// Get the receive buffer (valid after `try_recv_raw`).
1101    pub fn buffer(&self) -> &[u8] {
1102        &self.buffer
1103    }
1104
1105    // ====================================================================
1106    // Phase 108 — status events
1107    // ====================================================================
1108    //
1109    // Subscriber-side: `LivelinessChanged`, `RequestedDeadlineMissed`,
1110    // `MessageLost`. Returns
1111    // `NodeError::Transport(TransportError::Unsupported)` if the
1112    // active backend doesn't generate the event for this entity.
1113
1114    /// `true` if the active backend can fire the named event for this
1115    /// subscriber.
1116    #[cfg(feature = "alloc")]
1117    pub fn supports_event(&self, kind: nros_rmw::EventKind) -> bool {
1118        use nros_rmw::Subscriber as _;
1119        self.handle.supports_event(kind)
1120    }
1121
1122    /// Register a callback for `LivelinessChanged`. Fires when a
1123    /// tracked publisher's liveliness state changes.
1124    #[cfg(feature = "alloc")]
1125    pub fn on_liveliness_changed<F>(&mut self, cb: F) -> Result<(), NodeError>
1126    where
1127        F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
1128    {
1129        register_sub_event_liveliness::<F>(&mut self.handle, &mut self.event_regs, cb)
1130    }
1131
1132    /// Register a callback for `RequestedDeadlineMissed`. Fires when
1133    /// an expected sample doesn't arrive within `deadline`.
1134    #[cfg(feature = "alloc")]
1135    pub fn on_requested_deadline_missed<F>(
1136        &mut self,
1137        deadline: core::time::Duration,
1138        cb: F,
1139    ) -> Result<(), NodeError>
1140    where
1141        F: FnMut(nros_rmw::CountStatus) + Send + 'static,
1142    {
1143        register_sub_event_count::<F, _>(
1144            &mut self.handle,
1145            &mut self.event_regs,
1146            nros_rmw::EventKind::RequestedDeadlineMissed,
1147            deadline.as_millis().min(u32::MAX as u128) as u32,
1148            cb,
1149            |payload, f| {
1150                if let nros_rmw::EventPayload::RequestedDeadlineMissed(s) = payload {
1151                    f(*s);
1152                }
1153            },
1154        )
1155    }
1156
1157    /// Register a callback for `MessageLost`. Fires when the backend
1158    /// drops a sample (overflow, etc.).
1159    #[cfg(feature = "alloc")]
1160    pub fn on_message_lost<F>(&mut self, cb: F) -> Result<(), NodeError>
1161    where
1162        F: FnMut(nros_rmw::CountStatus) + Send + 'static,
1163    {
1164        register_sub_event_count::<F, _>(
1165            &mut self.handle,
1166            &mut self.event_regs,
1167            nros_rmw::EventKind::MessageLost,
1168            0,
1169            cb,
1170            |payload, f| {
1171                if let nros_rmw::EventPayload::MessageLost(s) = payload {
1172                    f(*s);
1173                }
1174            },
1175        )
1176    }
1177
1178    /// Check if data is available without consuming it.
1179    pub fn has_data(&self) -> bool {
1180        self.handle.has_data()
1181    }
1182
1183    /// Process the received message in-place without copying.
1184    pub fn process_in_place(&mut self, f: impl FnOnce(&M)) -> Result<bool, NodeError> {
1185        let mut deser_err = false;
1186        let processed = self
1187            .handle
1188            .process_raw_in_place(|raw| {
1189                match CdrReader::new_with_header(raw).and_then(|mut r| M::deserialize(&mut r)) {
1190                    Ok(msg) => f(&msg),
1191                    Err(_) => deser_err = true,
1192                }
1193            })
1194            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
1195
1196        if deser_err {
1197            return Err(NodeError::Transport(TransportError::DeserializationError));
1198        }
1199        Ok(processed)
1200    }
1201
1202    /// Async: wait for the next message (no `futures` dependency needed).
1203    ///
1204    /// Requires a background task running `executor.spin_async()` to drive
1205    /// I/O. Returns `Ok(msg)` on the next received message, or `Err` if the
1206    /// transport reports an error.
1207    ///
1208    /// When the `stream` feature is enabled, prefer `StreamExt::next()` /
1209    /// `TryStreamExt::try_next()` for combinator support.
1210    ///
1211    /// # Example
1212    ///
1213    /// ```ignore
1214    /// let mut sub = node.create_subscription::<Int32>("/topic")?;
1215    /// loop {
1216    ///     let msg = sub.recv().await?;
1217    ///     /* handle msg */
1218    /// }
1219    /// ```
1220    pub async fn recv(&mut self) -> Result<M, NodeError> {
1221        core::future::poll_fn(|cx| {
1222            // Register the waker FIRST, then check for data. This ordering
1223            // closes the race window where a subscriber callback fires
1224            // between `try_recv` returning `None` and the waker being
1225            // registered — the wake would otherwise be delivered to the
1226            // previous waker (or nowhere) and the task would hang.
1227            self.handle.register_waker(cx.waker());
1228            match self.try_recv() {
1229                Ok(Some(msg)) => core::task::Poll::Ready(Ok(msg)),
1230                Ok(None) => core::task::Poll::Pending,
1231                Err(e) => core::task::Poll::Ready(Err(e)),
1232            }
1233        })
1234        .await
1235    }
1236
1237    /// Sync: wait for the next message, spinning the executor.
1238    ///
1239    /// Returns `Ok(Some(msg))` if a message arrives within `timeout_ms`,
1240    /// or `Ok(None)` on timeout. Unlike [`Promise::wait()`], timeout is
1241    /// not an error — the caller typically retries in a loop.
1242    ///
1243    /// # Example
1244    ///
1245    /// ```ignore
1246    /// while let Some(msg) = sub.wait_next(&mut executor, core::time::Duration::from_millis(1000))? {
1247    ///     /* handle msg */
1248    /// }
1249    /// ```
1250    pub fn wait_next(
1251        &mut self,
1252        executor: &mut super::Executor,
1253        timeout: core::time::Duration,
1254    ) -> Result<Option<M>, NodeError> {
1255        let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
1256        let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
1257        let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
1258        let mut budget = WaitBudget::new(max_spins, timeout);
1259        loop {
1260            executor.spin_once(spin_interval);
1261            if let Some(msg) = self.try_recv()? {
1262                return Ok(Some(msg));
1263            }
1264            if !budget.tick() {
1265                return Ok(None);
1266            }
1267        }
1268    }
1269}
1270
1271// ============================================================================
1272// RawSubscription — typeless subscription for non-ROS message wire formats
1273// ============================================================================
1274
1275/// Typeless subscription handle. Counterpart of [`EmbeddedRawPublisher`].
1276///
1277/// The user owns the decoding step: call [`try_recv_raw`](Self::try_recv_raw)
1278/// to fill an internal buffer with bytes whose format depends on the active
1279/// RMW backend, then interpret them however is appropriate (memcpy, custom
1280/// parser, …).
1281pub struct RawSubscription<const RX_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE }> {
1282    pub(crate) handle: session::RmwSubscriber,
1283    pub(crate) buffer: [u8; RX_BUF],
1284    /// Phase 108 — registered event closures.
1285    pub(crate) event_regs: EventRegs,
1286}
1287
1288impl<const RX_BUF: usize> Drop for RawSubscription<RX_BUF> {
1289    fn drop(&mut self) {
1290        drop_event_regs(&mut self.event_regs);
1291    }
1292}
1293
1294impl<const RX_BUF: usize> RawSubscription<RX_BUF> {
1295    /// Construct a [`RawSubscription`] from a backend-allocated
1296    /// `RmwSubscriber` handle. Public so external extension crates
1297    /// (e.g. `nros-px4` for typed uORB wrappers) can wrap a handle
1298    /// they obtained directly from the active session via
1299    /// [`crate::Node::session_mut`] + a backend-specific create method.
1300    ///
1301    /// Most users should not call this — use
1302    /// [`crate::Node::create_subscription`] or
1303    /// [`crate::Node::create_subscription_raw`] instead.
1304    pub fn new(handle: session::RmwSubscriber) -> Self {
1305        Self {
1306            handle,
1307            buffer: [0u8; RX_BUF],
1308            event_regs: empty_event_regs(),
1309        }
1310    }
1311
1312    /// Try to receive raw bytes (non-blocking). Returns `Ok(Some(len))`
1313    /// with the message length on success; the bytes live in
1314    /// [`buffer`](Self::buffer) until the next call.
1315    pub fn try_recv_raw(&mut self) -> Result<Option<usize>, NodeError> {
1316        self.handle
1317            .try_recv_raw(&mut self.buffer)
1318            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))
1319    }
1320
1321    /// Phase 128.F.4 — raw receive that also surfaces the incoming
1322    /// sample's wire-level attachment block.
1323    ///
1324    /// Returns `Ok(Some((payload_len, attachment_len)))`. The payload
1325    /// lives in [`buffer`](Self::buffer); the attachment is written
1326    /// into caller-supplied `att_buf`. `attachment_len == 0` means
1327    /// the incoming sample carried no attachment.
1328    ///
1329    /// Backends without native attachment support delegate to
1330    /// [`try_recv_raw`](Self::try_recv_raw) and always report
1331    /// `attachment_len == 0` (default `Subscriber` trait body in
1332    /// `nros-rmw`).
1333    pub fn try_recv_raw_with_attachment(
1334        &mut self,
1335        att_buf: &mut [u8],
1336    ) -> Result<Option<(usize, usize)>, NodeError> {
1337        self.handle
1338            .try_recv_raw_with_attachment(&mut self.buffer, att_buf)
1339            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))
1340    }
1341
1342    /// Phase 252 / issue 0073 — raw receive that also returns the E2E
1343    /// [`IntegrityStatus`](nros_rmw::IntegrityStatus) (CRC + sequence gap/dup) for
1344    /// the C/C++ `nros_subscription_try_recv_validated` path. The validator lives
1345    /// in the backend handle (`try_recv_validated`), so no typed message is needed;
1346    /// the payload lives in [`buffer`](Self::buffer). `crc_valid == None` when the
1347    /// wire sample carried no CRC (e.g. a publisher built without `safety-e2e`).
1348    #[cfg(feature = "safety-e2e")]
1349    pub fn try_recv_validated(
1350        &mut self,
1351    ) -> Result<Option<(usize, nros_rmw::IntegrityStatus)>, NodeError> {
1352        use nros_rmw::Subscriber;
1353        self.handle
1354            .try_recv_validated(&mut self.buffer)
1355            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))
1356    }
1357
1358    /// Phase 124.D.1 — burst-take. Drain up to `max_msgs` queued
1359    /// samples into the caller-supplied contiguous block in one
1360    /// call, with the i-th sample at
1361    /// `buf[i * per_msg_cap .. i * per_msg_cap + out_lens[i]]`.
1362    /// Returns the number of messages delivered.
1363    ///
1364    /// Backends without a native batch take inherit the
1365    /// `Subscriber::try_recv_sequence` default body which loop-drives
1366    /// `try_recv_raw` — same shape, same observable result; the
1367    /// batched API just lets sensor loops commit to the call shape
1368    /// regardless of backend support.
1369    pub fn try_recv_sequence(
1370        &mut self,
1371        buf: &mut [u8],
1372        per_msg_cap: usize,
1373        max_msgs: usize,
1374        out_lens: &mut [usize],
1375    ) -> Result<usize, NodeError> {
1376        use nros_rmw::Subscriber;
1377        self.handle
1378            .try_recv_sequence(buf, per_msg_cap, max_msgs, out_lens)
1379            .map_err(NodeError::Transport)
1380    }
1381
1382    /// Phase 122.3.c.6.e — register a `Waker` that fires when a new
1383    /// message arrives. Mirror of the existing service-server /
1384    /// service-client wake plumbing. No-op on backends that don't
1385    /// support waking — caller falls back to polling.
1386    pub fn register_waker(&self, waker: &core::task::Waker) {
1387        use nros_rmw::Subscriber;
1388        self.handle.register_waker(waker);
1389    }
1390
1391    /// Phase 108.A — `true` if the active backend can fire the named
1392    /// event for this raw subscription.
1393    #[cfg(feature = "alloc")]
1394    pub fn supports_event(&self, kind: nros_rmw::EventKind) -> bool {
1395        use nros_rmw::Subscriber as _;
1396        self.handle.supports_event(kind)
1397    }
1398
1399    /// Phase 108.A — register a callback for `LivelinessChanged`.
1400    #[cfg(feature = "alloc")]
1401    pub fn on_liveliness_changed<F>(&mut self, cb: F) -> Result<(), NodeError>
1402    where
1403        F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
1404    {
1405        register_sub_event_liveliness::<F>(&mut self.handle, &mut self.event_regs, cb)
1406    }
1407
1408    /// Phase 108.A — register a callback for `RequestedDeadlineMissed`.
1409    #[cfg(feature = "alloc")]
1410    pub fn on_requested_deadline_missed<F>(
1411        &mut self,
1412        deadline: core::time::Duration,
1413        cb: F,
1414    ) -> Result<(), NodeError>
1415    where
1416        F: FnMut(nros_rmw::CountStatus) + Send + 'static,
1417    {
1418        register_sub_event_count::<F, _>(
1419            &mut self.handle,
1420            &mut self.event_regs,
1421            nros_rmw::EventKind::RequestedDeadlineMissed,
1422            deadline.as_millis().min(u32::MAX as u128) as u32,
1423            cb,
1424            |payload, f| {
1425                if let nros_rmw::EventPayload::RequestedDeadlineMissed(s) = payload {
1426                    f(*s);
1427                }
1428            },
1429        )
1430    }
1431
1432    /// Phase 108.A — register a callback for `MessageLost`.
1433    #[cfg(feature = "alloc")]
1434    pub fn on_message_lost<F>(&mut self, cb: F) -> Result<(), NodeError>
1435    where
1436        F: FnMut(nros_rmw::CountStatus) + Send + 'static,
1437    {
1438        register_sub_event_count::<F, _>(
1439            &mut self.handle,
1440            &mut self.event_regs,
1441            nros_rmw::EventKind::MessageLost,
1442            0,
1443            cb,
1444            |payload, f| {
1445                if let nros_rmw::EventPayload::MessageLost(s) = payload {
1446                    f(*s);
1447                }
1448            },
1449        )
1450    }
1451
1452    /// Get the receive buffer (valid after [`try_recv_raw`](Self::try_recv_raw)).
1453    pub fn buffer(&self) -> &[u8] {
1454        &self.buffer
1455    }
1456
1457    /// Check if data is available without consuming it.
1458    pub fn has_data(&self) -> bool {
1459        self.handle.has_data()
1460    }
1461
1462    /// Try to borrow the next available message in place. Returns
1463    /// `Ok(None)` if no message is ready; never blocks.
1464    ///
1465    /// The returned [`RecvView`] borrows the subscriber's internal
1466    /// receive buffer. Lifetime is tied to `&mut self` — only one view
1467    /// can be live at a time, and the next `try_borrow` / `try_recv_raw`
1468    /// call invalidates the previous view's bytes.
1469    ///
1470    /// View is `!Send + !Sync` to discourage holding it across `.await`
1471    /// or thread boundaries (would block subsequent receives on the
1472    /// same subscriber).
1473    #[cfg(not(feature = "rmw-lending"))]
1474    pub fn try_borrow(&mut self) -> Result<Option<RecvView<'_>>, NodeError> {
1475        match self.try_recv_raw()? {
1476            Some(len) => Ok(Some(RecvView {
1477                bytes: &self.buffer[..len],
1478                _marker: core::marker::PhantomData,
1479            })),
1480            None => Ok(None),
1481        }
1482    }
1483
1484    /// `rmw-lending` variant — dispatches to the backend's
1485    /// [`SlotBorrowing::try_borrow`](nros_rmw::SlotBorrowing::try_borrow)
1486    /// for true zero-copy receive (zenoh-pico's static buffer borrowed
1487    /// directly via `z_bytes_get_contiguous_view`, XRCE's slot borrowed
1488    /// in place). The bytes never touch `self.buffer`.
1489    #[cfg(feature = "rmw-lending")]
1490    pub fn try_borrow(&mut self) -> Result<Option<RecvView<'_>>, NodeError> {
1491        use nros_rmw::SlotBorrowing;
1492        match self.handle.try_borrow() {
1493            Ok(Some(view)) => Ok(Some(RecvView {
1494                view: Some(view),
1495                _marker: core::marker::PhantomData,
1496            })),
1497            Ok(None) => Ok(None),
1498            Err(e) => Err(NodeError::Transport(e)),
1499        }
1500    }
1501
1502    /// Async-await on the next message, returning a [`RecvView`].
1503    /// Mirrors the `Subscription::recv` pattern but typeless.
1504    ///
1505    /// Backend wake source: `Subscriber::register_waker`. Same race-
1506    /// safe register-then-check ordering as `Subscription::recv`.
1507    pub async fn borrow(&mut self) -> Result<RecvView<'_>, NodeError> {
1508        // Wait for `has_data` to flip true via the backend's
1509        // AtomicWaker, *without* holding any borrow that `try_borrow`
1510        // would need afterwards. Borrow `&self.handle` immutably
1511        // inside poll_fn so the borrow checker can prove `&mut self`
1512        // is free by the time we return Ok(view) below.
1513        //
1514        // Phase 99.H' cancellation safety: there is no reservation
1515        // taken inside poll. Dropping the future before it resolves
1516        // simply abandons whatever waker registration the backend
1517        // accepted; the next call to `borrow().await` (or
1518        // `try_borrow`) re-registers. No leaked state.
1519        {
1520            let handle = &self.handle;
1521            core::future::poll_fn(|cx| {
1522                // Register-then-check: closes the race where a backend
1523                // callback fires between has_data returning false and
1524                // the waker landing.
1525                handle.register_waker(cx.waker());
1526                if handle.has_data() {
1527                    core::task::Poll::Ready(())
1528                } else {
1529                    core::task::Poll::Pending
1530                }
1531            })
1532            .await;
1533        }
1534        // has_data was true at some point; in the single-threaded
1535        // executor there's no other reader, so try_borrow returns Some.
1536        // A spurious wake (very unlikely on shipping backends) returns
1537        // WouldBlock and the caller can retry.
1538        match self.try_borrow()? {
1539            Some(view) => Ok(view),
1540            None => Err(NodeError::Transport(TransportError::WouldBlock)),
1541        }
1542    }
1543
1544    /// Sync blocking borrow with timeout. Spins the executor until a
1545    /// message is available or `timeout` elapses.
1546    ///
1547    /// Returns `Ok(Some(view))` on success, `Ok(None)` on timeout.
1548    /// The view's lifetime is tied to `&mut self`.
1549    pub fn borrow_with_timeout(
1550        &mut self,
1551        executor: &mut super::Executor,
1552        timeout: core::time::Duration,
1553    ) -> Result<Option<RecvView<'_>>, NodeError> {
1554        let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
1555        let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
1556        let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
1557        let mut budget = WaitBudget::new(max_spins, timeout);
1558        loop {
1559            executor.spin_once(spin_interval);
1560            if self.has_data() {
1561                return self.try_borrow();
1562            }
1563            if !budget.tick() {
1564                return Ok(None);
1565            }
1566        }
1567    }
1568}
1569
1570// ============================================================================
1571// RawServiceServer / RawServiceClient (Phase 122.3.c — L1 polling, typeless)
1572// ============================================================================
1573
/// Typeless service-server handle. L1 counterpart of
/// [`EmbeddedServiceServer`] for callers that own their own scheduler
/// (RTIC, embassy, FreeRTOS-task-per-entity) and the C / C++ FFI
/// shims.
///
/// Holds the transport handle plus an inline request buffer. The
/// caller polls [`try_recv_request_raw`](Self::try_recv_request_raw)
/// and sends replies via [`send_reply_raw`](Self::send_reply_raw)
/// with raw CDR bytes.
///
/// `REQ_BUF` sizes the inline request buffer. `RESP_BUF` is carried in the
/// type only; no reply storage lives in this struct.
pub struct RawServiceServer<
    const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
    const RESP_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Backend service-server handle that every method delegates to.
    pub(crate) handle: session::RmwServiceServer,
    /// Inline storage handed to the backend by `try_recv_request_raw` to
    /// receive the incoming request bytes.
    pub(crate) req_buffer: [u8; REQ_BUF],
    /// Zero-sized marker whose only job is to mention the `RESP_BUF` const
    /// parameter.
    pub(crate) _phantom_resp: PhantomData<[u8; RESP_BUF]>,
}
1591
1592impl<const REQ_BUF: usize, const RESP_BUF: usize> RawServiceServer<REQ_BUF, RESP_BUF> {
1593    /// Phase 122.3.c.6.e — register a `Waker` that fires when a new
1594    /// request arrives. Mirror of the existing subscriber /
1595    /// service-client wake plumbing. No-op on backends that don't
1596    /// support waking — caller falls back to polling.
1597    pub fn register_waker(&self, waker: &core::task::Waker) {
1598        use nros_rmw::ServiceServerTrait;
1599        self.handle.register_waker(waker);
1600    }
1601
1602    /// Construct a [`RawServiceServer`] from a backend-allocated
1603    /// `RmwServiceServer` handle. Public so external crates and the
1604    /// C / C++ FFI shims can wrap a handle obtained directly from
1605    /// [`crate::Node::session_mut`].
1606    pub fn new(handle: session::RmwServiceServer) -> Self {
1607        Self {
1608            handle,
1609            req_buffer: [0u8; REQ_BUF],
1610            _phantom_resp: PhantomData,
1611        }
1612    }
1613
1614    /// Try to receive a service request (non-blocking).
1615    ///
1616    /// Returns `Ok(Some((len, sequence_number)))` when a request is
1617    /// available — the raw CDR bytes live in
1618    /// [`req_buffer`](Self::req_buffer) at `&req_buffer()[..len]`
1619    /// until the next call. The sequence number is required by
1620    /// [`send_reply_raw`](Self::send_reply_raw).
1621    pub fn try_recv_request_raw(&mut self) -> Result<Option<(usize, i64)>, NodeError> {
1622        match self.handle.try_recv_request(&mut self.req_buffer) {
1623            Ok(Some(req)) => Ok(Some((req.data.len(), req.sequence_number))),
1624            Ok(None) => Ok(None),
1625            Err(_) => Err(NodeError::Transport(TransportError::ServiceRequestFailed)),
1626        }
1627    }
1628
1629    /// Borrow the inline request buffer. Valid after a successful
1630    /// [`try_recv_request_raw`](Self::try_recv_request_raw) call.
1631    pub fn req_buffer(&self) -> &[u8] {
1632        &self.req_buffer
1633    }
1634
1635    /// Send a reply with raw CDR bytes. `sequence_number` must match
1636    /// the value returned by the most recent
1637    /// [`try_recv_request_raw`](Self::try_recv_request_raw).
1638    pub fn send_reply_raw(&mut self, sequence_number: i64, data: &[u8]) -> Result<(), NodeError> {
1639        self.handle
1640            .send_reply(sequence_number, data)
1641            .map_err(|_| NodeError::ServiceReplyFailed)
1642    }
1643}
1644
/// Typeless service-client handle. L1 counterpart of
/// [`EmbeddedServiceClient`] for the same audience as
/// [`RawServiceServer`].
///
/// `REPLY_BUF` sizes the inline reply buffer. `REQ_BUF` is carried in the
/// type only; requests are passed in by the caller as byte slices.
pub struct RawServiceClient<
    const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
    const REPLY_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Backend service-client handle that every method delegates to.
    pub(crate) handle: session::RmwServiceClient,
    /// Inline storage handed to the backend by `try_recv_reply_raw` to
    /// receive the reply bytes.
    pub(crate) reply_buffer: [u8; REPLY_BUF],
    /// Zero-sized marker whose only job is to mention the `REQ_BUF` const
    /// parameter.
    pub(crate) _phantom_req: PhantomData<[u8; REQ_BUF]>,
}
1656
1657impl<const REQ_BUF: usize, const REPLY_BUF: usize> RawServiceClient<REQ_BUF, REPLY_BUF> {
1658    /// Phase 122.3.c.6.e — register a `Waker` that fires when the
1659    /// reply to a previously-sent request lands.
1660    pub fn register_waker(&self, waker: &core::task::Waker) {
1661        use nros_rmw::ServiceClientTrait;
1662        self.handle.register_waker(waker);
1663    }
1664
1665    /// Construct from a backend-allocated handle. Same audience as
1666    /// [`RawServiceServer::new`].
1667    pub fn new(handle: session::RmwServiceClient) -> Self {
1668        Self {
1669            handle,
1670            reply_buffer: [0u8; REPLY_BUF],
1671            _phantom_req: PhantomData,
1672        }
1673    }
1674
1675    /// Send a raw CDR request. Non-blocking; the reply arrives via
1676    /// [`try_recv_reply_raw`](Self::try_recv_reply_raw).
1677    pub fn send_request_raw(&mut self, request: &[u8]) -> Result<(), NodeError> {
1678        self.handle
1679            .send_request_raw(request)
1680            .map_err(|_| NodeError::ServiceRequestFailed)
1681    }
1682
1683    /// Phase 124.G.3 — graph-aware "is the matching server up?"
1684    /// probe. Mirrors [`Client::server_available`] for the raw API.
1685    pub fn server_available(&self) -> Result<bool, NodeError> {
1686        use nros_rmw::ServiceClientTrait;
1687        self.handle.server_available().map_err(NodeError::Transport)
1688    }
1689
1690    /// Try to receive a reply (non-blocking). Returns
1691    /// `Ok(Some(len))` with the reply length on success; bytes
1692    /// live in [`reply_buffer`](Self::reply_buffer) until the next
1693    /// call.
1694    pub fn try_recv_reply_raw(&mut self) -> Result<Option<usize>, NodeError> {
1695        self.handle
1696            .try_recv_reply_raw(&mut self.reply_buffer)
1697            .map_err(|_| NodeError::Transport(TransportError::ServiceRequestFailed))
1698    }
1699
1700    /// Borrow the inline reply buffer. Valid after a successful
1701    /// [`try_recv_reply_raw`](Self::try_recv_reply_raw) call.
1702    pub fn reply_buffer(&self) -> &[u8] {
1703        &self.reply_buffer
1704    }
1705}
1706
/// Read-only view into a [`RawSubscription`]'s receive buffer.
///
/// `!Send + !Sync`: cannot cross `.await` or threads. Drop releases
/// any backend lock + lets the next message advance.
///
/// Two backings, selected at compile time by the `rmw-lending` feature:
/// the no-lending variant points at `RawSubscription::buffer` (filled by
/// `try_recv_raw`'s memcpy); the lending variant holds the backend's
/// own [`SlotBorrowing::View`](nros_rmw::SlotBorrowing::View) — zero
/// copies on the receive path, with the backend's Drop taking care of
/// releasing the buffer lock.
///
/// This definition is the no-lending variant (compiled when the
/// `rmw-lending` feature is off).
#[cfg(not(feature = "rmw-lending"))]
pub struct RecvView<'a> {
    /// The received message bytes, borrowed for `'a`.
    bytes: &'a [u8],
    /// Raw-pointer marker: `*const ()` is neither `Send` nor `Sync`, which
    /// is what makes the view `!Send + !Sync`.
    _marker: core::marker::PhantomData<*const ()>,
}
1723
/// Read-only view of a received message, lending variant (compiled when the
/// `rmw-lending` feature is on).
///
/// Instead of a slice into a subscription-owned buffer, this variant holds
/// the backend's own `SlotBorrowing::View` for the subscriber type. The
/// `*const ()` marker makes the view `!Send + !Sync`.
#[cfg(feature = "rmw-lending")]
pub struct RecvView<'a> {
    /// `Option` for symmetry with `PublishLoan::backend_slot`. Always
    /// `Some(_)` until the view is dropped.
    view: Option<<session::RmwSubscriber as nros_rmw::SlotBorrowing>::View<'a>>,
    /// Raw-pointer marker: `*const ()` is neither `Send` nor `Sync`.
    _marker: core::marker::PhantomData<*const ()>,
}
1731
1732#[cfg(not(feature = "rmw-lending"))]
1733impl<'a> core::ops::Deref for RecvView<'a> {
1734    type Target = [u8];
1735    fn deref(&self) -> &[u8] {
1736        self.bytes
1737    }
1738}
1739
/// Lending build: dereferencing a view exposes the bytes of the
/// backend-owned view it wraps.
#[cfg(feature = "rmw-lending")]
impl core::ops::Deref for RecvView<'_> {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        // SAFETY-invariant: `view` stays `Some` for the entire life of a
        // `RecvView`. Only `Drop` takes it, and after that no method
        // (including this one) can be reached.
        let backend_view = self
            .view
            .as_ref()
            .expect("RecvView accessed after drop");
        backend_view.as_ref()
    }
}
1753
1754#[cfg(not(feature = "rmw-lending"))]
1755impl<'a> AsRef<[u8]> for RecvView<'a> {
1756    fn as_ref(&self) -> &[u8] {
1757        self.bytes
1758    }
1759}
1760
/// Lending build: `AsRef<[u8]>` exposes the bytes of the wrapped
/// backend-owned view.
#[cfg(feature = "rmw-lending")]
impl AsRef<[u8]> for RecvView<'_> {
    fn as_ref(&self) -> &[u8] {
        // SAFETY-invariant: `view` is `Some` until `Drop` takes it, so this
        // lookup cannot fail while the view is still reachable.
        let backend_view = self
            .view
            .as_ref()
            .expect("RecvView accessed after drop");
        backend_view.as_ref()
    }
}
1772
/// `Stream` adapter (behind the `stream` feature): each item is the outcome
/// of one `try_recv`. The stream never yields `None`; it is `Pending`
/// whenever no message is available.
#[cfg(feature = "stream")]
impl<M: RosMessage + Unpin, const RX_BUF: usize> futures_core::Stream for Subscription<M, RX_BUF> {
    type Item = Result<M, NodeError>;

    fn poll_next(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Option<Self::Item>> {
        use core::task::Poll;

        let subscription = self.get_mut();
        // Register-then-check: see Subscription::recv for rationale.
        subscription.handle.register_waker(cx.waker());
        // `Ok(None)` (nothing queued) becomes `Pending`; a message or an
        // error is surfaced as the next stream item.
        match subscription.try_recv().transpose() {
            Some(item) => Poll::Ready(Some(item)),
            None => Poll::Pending,
        }
    }
}
1791
1792// ============================================================================
1793// EmbeddedServiceServer
1794// ============================================================================
1795
/// Typed service server handle with internal buffers.
///
/// `Svc` selects the request / reply types; `REQ_BUF` and `REPLY_BUF` size
/// the two inline buffers.
pub struct EmbeddedServiceServer<
    Svc: RosService,
    const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
    const REPLY_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Backend service-server handle that does the actual request handling.
    pub(crate) handle: session::RmwServiceServer,
    /// Inline buffer passed to the backend as the request buffer.
    pub(crate) req_buffer: [u8; REQ_BUF],
    /// Inline buffer passed to the backend as the reply buffer.
    pub(crate) reply_buffer: [u8; REPLY_BUF],
    /// Zero-sized marker tying the handle to the service type `Svc`.
    pub(crate) _phantom: PhantomData<Svc>,
}
1807
1808impl<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>
1809    EmbeddedServiceServer<Svc, REQ_BUF, REPLY_BUF>
1810{
1811    /// Handle an incoming service request.
1812    ///
1813    /// Returns `Ok(true)` if a request was handled, `Ok(false)` if none available.
1814    pub fn handle_request(
1815        &mut self,
1816        handler: impl FnOnce(&Svc::Request) -> Svc::Reply,
1817    ) -> Result<bool, NodeError> {
1818        self.handle
1819            .handle_request::<Svc>(&mut self.req_buffer, &mut self.reply_buffer, handler)
1820            .map_err(|_| NodeError::ServiceReplyFailed)
1821    }
1822
1823    /// Handle a request with a heap-allocated reply (for large response types).
1824    ///
1825    /// Used by parameter services and lifecycle services (large response structs
1826    /// that overflow the stack). Returns `Ok(true)` if a request was handled,
1827    /// `Ok(false)` if none available.
1828    #[cfg(any(feature = "param-services", feature = "lifecycle-services"))]
1829    pub fn handle_request_boxed(
1830        &mut self,
1831        handler: impl FnOnce(&Svc::Request) -> alloc::boxed::Box<Svc::Reply>,
1832    ) -> Result<bool, NodeError> {
1833        self.handle
1834            .handle_request_boxed::<Svc>(&mut self.req_buffer, &mut self.reply_buffer, handler)
1835            .map_err(|_| NodeError::ServiceReplyFailed)
1836    }
1837
1838    /// Check if a request is available.
1839    pub fn has_request(&self) -> bool {
1840        self.handle.has_request()
1841    }
1842}
1843
1844// ============================================================================
1845// ServiceClientCallback (RFC-0041, Phase 239.1)
1846// ============================================================================
1847
/// Send handle for a **callback-based** typed service client.
///
/// Returned by `create_client_with_callback`: the reply is delivered to the
/// registered closure at `spin_once` (no `Promise` poll). This handle only
/// **sends** — it holds a `*mut` to the arena entry's
/// [`ServiceClientSendHeader`](super::arena::ServiceClientSendHeader) (pinned in
/// the executor arena, like a guard-condition flag), so a single outstanding
/// request is gated by `hdr.pending`.
///
/// `REQ_BUF` sizes the stack scratch buffer `call` serializes into;
/// `REPLY_BUF` parameterizes the header type.
///
/// # Safety / lifetime
/// Valid only while the owning executor lives (the arena backs the header). Do
/// not use after the executor is dropped.
pub struct ServiceClientCallback<
    Svc: RosService,
    const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
    const REPLY_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Raw pointer to the arena-resident send header; dereferenced by `call`.
    hdr: *mut super::arena::ServiceClientSendHeader<REPLY_BUF>,
    /// Zero-sized marker tying the handle to the service type `Svc`.
    _phantom: PhantomData<Svc>,
}
1868
impl<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>
    ServiceClientCallback<Svc, REQ_BUF, REPLY_BUF>
{
    /// Wrap an arena-resident send header. `hdr` must point at a live
    /// `ServiceClientCallbackEntry`'s header for the executor's lifetime.
    ///
    /// The pointer is stored as-is; it is not validated here and is
    /// dereferenced on every [`call`](Self::call).
    pub(crate) fn new(hdr: *mut super::arena::ServiceClientSendHeader<REPLY_BUF>) -> Self {
        Self {
            hdr,
            _phantom: PhantomData,
        }
    }

    /// Send a typed request. The reply is delivered to the registered callback
    /// at a later `spin_once`. Returns `RequestInFlight` if a prior request has
    /// not yet been answered (single outstanding request).
    ///
    /// # Errors
    ///
    /// - `NodeError::RequestInFlight` — `hdr.pending` is already set.
    /// - `NodeError::BufferTooSmall` — the CDR writer could not be created
    ///   over the `REQ_BUF`-byte scratch buffer.
    /// - `NodeError::Serialization` — serializing `request` failed.
    /// - `NodeError::ServiceRequestFailed` — the backend rejected the send.
    pub fn call(&mut self, request: &Svc::Request) -> Result<(), NodeError> {
        // SAFETY: relies on the contract stated on `new` — `hdr` points at a
        // header that stays live for the executor's lifetime.
        // NOTE(review): nothing here enforces that contract, nor that no other
        // reference to the header is active while this `&mut` exists —
        // confirm against the arena / `spin_once` code.
        let hdr = unsafe { &mut *self.hdr };
        if hdr.pending {
            return Err(NodeError::RequestInFlight);
        }
        // Serialize into a stack-local scratch buffer of `REQ_BUF` bytes.
        let mut buf = [0u8; REQ_BUF];
        let mut writer =
            CdrWriter::new_with_header(&mut buf).map_err(|_| NodeError::BufferTooSmall)?;
        request
            .serialize(&mut writer)
            .map_err(|_| NodeError::Serialization)?;
        // Only the bytes actually written are sent.
        let req_len = writer.position();
        hdr.handle
            .send_request_raw(&buf[..req_len])
            .map_err(|_| NodeError::ServiceRequestFailed)?;
        // Mark the request outstanding only after the send succeeded, so a
        // failed send does not block later calls.
        hdr.pending = true;
        Ok(())
    }
}
1903
1904// ============================================================================
1905// ActionClientCallback (RFC-0041, Phase 239.2)
1906// ============================================================================
1907
/// Send handle for a **callback-based** typed action client.
///
/// Returned by `create_action_client_with_callbacks`: goal-response, feedback,
/// and result are delivered to the registered closures at `spin_once` (no
/// `Promise` poll). This handle only **sends** — it holds a `*mut` to the arena
/// entry's [`ActionClientCore`](super::action_core::ActionClientCore) (offset 0,
/// pinned in the executor arena, like a guard-condition flag).
///
/// `GOAL_BUF` also sizes the stack scratch buffer `send_goal` serializes
/// into; all three buffer sizes parameterize the core type.
///
/// # Safety / lifetime
/// Valid only while the owning executor lives.
pub struct ActionClientCallback<
    A: RosAction,
    const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
    const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
    const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Raw pointer to the arena-resident action-client core; dereferenced by
    /// `send_goal` and `get_result`.
    core: *mut super::action_core::ActionClientCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
    /// Zero-sized marker tying the handle to the action type `A`.
    _phantom: PhantomData<A>,
}
1927
impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
    ActionClientCallback<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
{
    /// Wrap an arena-resident core. `core` must point at a live
    /// `ActionClientCallbackEntry`'s core for the executor's lifetime.
    ///
    /// The pointer is stored as-is; it is not validated here and is
    /// dereferenced by [`send_goal`](Self::send_goal) and
    /// [`get_result`](Self::get_result).
    pub(crate) fn new(
        core: *mut super::action_core::ActionClientCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
    ) -> Self {
        Self {
            core,
            _phantom: PhantomData,
        }
    }

    /// Send a typed goal. Returns its `GoalId`; acceptance arrives via the
    /// registered goal-response callback.
    ///
    /// # Errors
    ///
    /// - `NodeError::BufferTooSmall` — the CDR writer could not be created
    ///   over the `GOAL_BUF`-byte scratch buffer.
    /// - `NodeError::Serialization` — serializing `goal` failed.
    /// - Whatever `send_goal_raw` on the core returns is passed through.
    pub fn send_goal(&mut self, goal: &A::Goal) -> Result<nros_core::GoalId, NodeError> {
        // SAFETY: relies on the contract stated on `new` — `core` points at a
        // core that stays live for the executor's lifetime.
        // NOTE(review): nothing here enforces that contract, nor that no other
        // reference to the core is active while this `&mut` exists — confirm
        // against the arena / `spin_once` code.
        let core = unsafe { &mut *self.core };
        // Serialize into a stack-local scratch buffer of `GOAL_BUF` bytes.
        let mut buf = [0u8; GOAL_BUF];
        let mut writer =
            CdrWriter::new_with_header(&mut buf).map_err(|_| NodeError::BufferTooSmall)?;
        goal.serialize(&mut writer)
            .map_err(|_| NodeError::Serialization)?;
        // Only the bytes actually written are sent.
        let len = writer.position();
        core.send_goal_raw(&buf[..len])
    }

    /// Request the result for `goal_id`; the result arrives via the registered
    /// result callback.
    ///
    /// # Errors
    ///
    /// Whatever `send_get_result_request` on the core returns is passed
    /// through.
    pub fn get_result(&mut self, goal_id: &nros_core::GoalId) -> Result<(), NodeError> {
        // SAFETY: same contract as in `send_goal` — `core` must still point at
        // the live arena-resident core (see `new`).
        let core = unsafe { &mut *self.core };
        core.send_get_result_request(goal_id)
    }
}
1962
1963// ============================================================================
1964// EmbeddedServiceClient
1965// ============================================================================
1966
/// Typed service client handle with internal buffers.
///
/// `Svc` selects the request / reply types; `REQ_BUF` and `REPLY_BUF` size
/// the two inline buffers.
pub struct EmbeddedServiceClient<
    Svc: RosService,
    const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
    const REPLY_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Backend service-client handle used for sending, discovery and
    /// readiness queries.
    pub(crate) handle: session::RmwServiceClient,
    /// Inline buffer `call` serializes the outgoing request into.
    pub(crate) req_buffer: [u8; REQ_BUF],
    /// Inline buffer lent to the `Promise` returned by `call` to receive the
    /// reply.
    pub(crate) reply_buffer: [u8; REPLY_BUF],
    /// Phase 84.D3: set after a successful `send_request`, cleared on a
    /// successful `Promise::try_recv`. Guards against "drop Promise
    /// without awaiting, then `call()` again" which would otherwise
    /// deliver the stale reply to the new caller.
    pub(crate) in_flight: bool,
    /// Zero-sized marker tying the handle to the service type `Svc`.
    pub(crate) _phantom: PhantomData<Svc>,
}
1983
1984impl<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>
1985    EmbeddedServiceClient<Svc, REQ_BUF, REPLY_BUF>
1986{
1987    /// Call the service (non-blocking). Returns a [`Promise`] that can be polled.
1988    ///
1989    /// Use with `Executor::spin_once()` to drive I/O while waiting:
1990    ///
1991    /// ```ignore
1992    /// let mut promise = client.call(&request)?;
1993    /// loop {
1994    ///     executor.spin_once(core::time::Duration::from_millis(10));
1995    ///     if let Some(reply) = promise.try_recv()? {
1996    ///         break;
1997    ///     }
1998    /// }
1999    /// ```
2000    ///
2001    /// # Errors
2002    ///
2003    /// Returns [`NodeError::RequestInFlight`] if a previous call's reply
2004    /// has not been received. This prevents the old hazard where dropping
2005    /// a [`Promise`] without awaiting its reply left the stale reply
2006    /// queued to land on the next [`call`](Self::call). Resolve by
2007    /// polling the existing promise to completion or calling
2008    /// [`reset_in_flight`](Self::reset_in_flight).
2009    pub fn call(&mut self, request: &Svc::Request) -> Result<Promise<'_, Svc::Reply>, NodeError> {
2010        if self.in_flight {
2011            return Err(NodeError::RequestInFlight);
2012        }
2013
2014        // Serialize request into req_buffer
2015        let mut writer = CdrWriter::new_with_header(&mut self.req_buffer)
2016            .map_err(|_| NodeError::BufferTooSmall)?;
2017        request
2018            .serialize(&mut writer)
2019            .map_err(|_| NodeError::Serialization)?;
2020        let req_len = writer.position();
2021
2022        // Send the request (non-blocking)
2023        self.handle
2024            .send_request_raw(&self.req_buffer[..req_len])
2025            .map_err(|_| NodeError::ServiceRequestFailed)?;
2026
2027        self.in_flight = true;
2028
2029        Ok(Promise {
2030            handle: &mut self.handle,
2031            reply_buffer: &mut self.reply_buffer,
2032            parse: cdr_deserialize_reply::<Svc>,
2033            in_flight_flag: &mut self.in_flight,
2034        })
2035    }
2036
    /// Explicitly clear the in-flight flag (Phase 84.D3).
    ///
    /// Call this if a previous [`Promise`] was dropped without completing
    /// and you want to abandon the pending reply. The next
    /// [`call`](Self::call) will proceed but may still observe the stale
    /// reply if one is in the transport's queue — callers that need strict
    /// correctness should drain / ignore one extra `try_recv` first.
    ///
    /// Only the local flag is touched; this method does not talk to the
    /// transport.
    pub fn reset_in_flight(&mut self) {
        self.in_flight = false;
    }
2047
2048    /// Block until at least one matching service server is discoverable on
2049    /// the network, or `timeout` elapses.
2050    ///
2051    /// Returns `Ok(true)` if a matching server reported back inside the
2052    /// budget; `Ok(false)` on timeout (no server visible). Mirrors
2053    /// `rclcpp::ClientBase::wait_for_service` and
2054    /// `rclpy.client.Client.wait_for_service`.
2055    ///
2056    /// On the Zenoh backend this issues a `z_liveliness_get` against the
2057    /// matching server's wildcarded liveliness keyexpr; the executor is
2058    /// spun cooperatively while the query is in flight so other
2059    /// subscribers / timers continue to make progress. Backends without
2060    /// liveliness discovery answer `Ok(true)` immediately (default trait
2061    /// impl in `nros-rmw`), so the call is a no-op cost when discovery
2062    /// isn't supported.
2063    ///
2064    /// Recommended usage — gate the first `call()` on this:
2065    ///
2066    /// ```ignore
2067    /// let mut client = node.create_client::<AddTwoInts>("/add_two_ints")?;
2068    /// if !client.wait_for_service(&mut executor, Duration::from_secs(5))? {
2069    ///     return Err(NodeError::Timeout);
2070    /// }
2071    /// let mut promise = client.call(&request)?;
2072    /// ```
2073    ///
2074    /// Once the server is observed, the result is latched: subsequent
2075    /// `service_is_ready` checks return `true` without another round
2076    /// trip. This matches `rclcpp`'s snapshot semantic — discovery isn't
2077    /// re-proven on every call.
2078    pub fn wait_for_service(
2079        &mut self,
2080        executor: &mut super::Executor,
2081        timeout: core::time::Duration,
2082    ) -> Result<bool, NodeError> {
2083        // Already proven once — don't re-query.
2084        if self.handle.is_server_ready() {
2085            return Ok(true);
2086        }
2087        let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2088        let max_spins = (timeout.as_millis() as u64 / DEFAULT_SPIN_INTERVAL_MS).max(1);
2089        let mut budget = WaitBudget::new(max_spins, timeout);
2090        // Per-query budget. A liveliness_get is a single-shot probe of the
2091        // router's current token list; if the server hasn't declared its
2092        // token yet when our query arrives, the router replies "no
2093        // matching tokens" and the query terminates. We loop, re-issuing
2094        // shorter probes until either a matching token is observed or the
2095        // outer wall-clock budget expires. 1 s per probe balances "see
2096        // freshly-declared tokens quickly" against "burn fewer FFI
2097        // round-trips on a healthy network".
2098        const PROBE_TIMEOUT_MS: u32 = 1000;
2099        loop {
2100            self.handle
2101                .start_server_discovery(PROBE_TIMEOUT_MS)
2102                .map_err(|_| NodeError::ServiceRequestFailed)?;
2103            // Drain this probe to completion (token reply or empty FINAL).
2104            loop {
2105                executor.spin_once(spin_interval);
2106                match self
2107                    .handle
2108                    .poll_server_discovery()
2109                    .map_err(|_| NodeError::ServiceRequestFailed)?
2110                {
2111                    Some(true) => return Ok(true),
2112                    Some(false) => break, // probe finished empty — re-issue
2113                    None => {}            // still in flight
2114                }
2115                if !budget.tick() {
2116                    return Ok(false);
2117                }
2118            }
2119            if !budget.tick() {
2120                return Ok(false);
2121            }
2122        }
2123    }
2124
    /// Snapshot whether a matching service server is currently visible.
    ///
    /// Non-blocking. Matches `rclcpp::ClientBase::service_is_ready` and
    /// `rclpy.client.Client.service_is_ready`. Backends without liveliness
    /// discovery return `true` (assume always reachable).
    ///
    /// Thin wrapper over the backend handle's `is_server_ready`, the same
    /// query `wait_for_service` uses for its early-return check.
    pub fn service_is_ready(&self) -> bool {
        self.handle.is_server_ready()
    }
2133
    /// Phase 124.C.3 — graph-aware server-availability probe.
    ///
    /// Returns `Ok(true)` / `Ok(false)` when the backend can answer
    /// (zenoh queryable interest, DDS built-in topic reader), or
    /// `Err(NodeError::Transport(Unsupported))` when it can't (XRCE
    /// agent without participant enumeration). Distinct from
    /// [`service_is_ready`](Self::service_is_ready) — that one
    /// collapses "no" and "don't know" into the same `false`.
    ///
    /// Used to gate the first `call_raw` so a startup-ordering race
    /// (client opens before server's discovery announcement lands)
    /// doesn't surface as a request-side timeout.
    ///
    /// # Errors
    ///
    /// Any backend error is passed through wrapped in
    /// `NodeError::Transport`.
    pub fn server_available(&self) -> Result<bool, NodeError> {
        // NOTE(review): the doc above mentions `call_raw`, which is not
        // defined in the visible part of this impl (the send method here is
        // `call`) — confirm the intended name.
        use nros_rmw::ServiceClientTrait;
        self.handle.server_available().map_err(NodeError::Transport)
    }
2150}
2151
2152// ============================================================================
2153// Promise
2154// ============================================================================
2155
/// A pending reply from a non-blocking service or action call.
///
/// Poll with [`try_recv()`](Promise::try_recv) to check for the reply.
/// Implements [`Future`](core::future::Future) for use with async executors.
///
/// The promise borrows everything it needs from the client for `'a`; it
/// owns no buffers itself.
pub struct Promise<'a, T> {
    /// Backend service-client handle the reply is polled from.
    pub(crate) handle: &'a mut session::RmwServiceClient,
    /// Buffer the backend writes the raw reply bytes into.
    pub(crate) reply_buffer: &'a mut [u8],
    /// Decoder applied to the received bytes to produce the typed reply `T`.
    pub(crate) parse: fn(&[u8]) -> Result<T, NodeError>,
    /// Phase 84.D3: cleared on a successful `try_recv` so the client's
    /// next `call()` can proceed. If the `Promise` is dropped before the
    /// reply is consumed, the flag stays set — forcing the user to
    /// explicitly acknowledge the abandoned call via
    /// `reset_in_flight()`.
    pub(crate) in_flight_flag: &'a mut bool,
}
2171
2172impl<T> Promise<'_, T> {
2173    /// Try to receive the reply (non-blocking).
2174    ///
2175    /// Returns `Ok(Some(reply))` if the reply has arrived,
2176    /// `Ok(None)` if still pending.
2177    pub fn try_recv(&mut self) -> Result<Option<T>, NodeError> {
2178        // Phase 120: NoData (no reply yet) is the steady-state polling
2179        // condition — map to Ok(None) instead of ServiceRequestFailed.
2180        match match self.handle.try_recv_reply_raw(self.reply_buffer) {
2181            Ok(opt) => opt,
2182            Err(TransportError::NoData) => return Ok(None),
2183            Err(e) => return Err(NodeError::Transport(e)),
2184        } {
2185            Some(len) => {
2186                let reply = (self.parse)(&self.reply_buffer[..len])?;
2187                // Reply consumed — allow the client to issue another call.
2188                *self.in_flight_flag = false;
2189                Ok(Some(reply))
2190            }
2191            None => Ok(None),
2192        }
2193    }
2194}
2195
2196impl<T> Promise<'_, T> {
2197    /// Block until the reply arrives, spinning the executor.
2198    ///
2199    /// Internally calls `executor.spin_once()` in a loop until the reply
2200    /// arrives or `timeout_ms` is exhausted. This is equivalent to the
2201    /// manual spin+poll loop pattern but more ergonomic for simple use cases.
2202    ///
2203    /// No borrow conflict: `executor` and `self` (which borrows the standalone
2204    /// client) are disjoint objects.
2205    ///
2206    /// # Errors
2207    ///
2208    /// Returns [`NodeError::Timeout`] if the reply does not arrive within
2209    /// `timeout_ms` milliseconds.
2210    pub fn wait(
2211        &mut self,
2212        executor: &mut super::Executor,
2213        timeout: core::time::Duration,
2214    ) -> Result<T, NodeError> {
2215        let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2216        let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
2217        let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
2218        let mut budget = WaitBudget::new(max_spins, timeout);
2219        // Always spin at least once so a zero-timeout still polls.
2220        loop {
2221            executor.spin_once(spin_interval);
2222            if let Some(result) = self.try_recv()? {
2223                return Ok(result);
2224            }
2225            if !budget.tick() {
2226                return Err(NodeError::Timeout);
2227            }
2228        }
2229    }
2230}
2231
2232impl<T> core::future::Future for Promise<'_, T> {
2233    type Output = Result<T, NodeError>;
2234
2235    fn poll(
2236        self: core::pin::Pin<&mut Self>,
2237        cx: &mut core::task::Context<'_>,
2238    ) -> core::task::Poll<Self::Output> {
2239        let this = self.get_mut();
2240        // Register-then-check (closes the race where a reply lands
2241        // between try_recv returning None and the waker registering).
2242        this.handle.register_waker(cx.waker());
2243        match this.try_recv() {
2244            Ok(Some(reply)) => core::task::Poll::Ready(Ok(reply)),
2245            Ok(None) => core::task::Poll::Pending,
2246            Err(e) => core::task::Poll::Ready(Err(e)),
2247        }
2248    }
2249}
2250
2251impl<T> Promise<'_, T> {
2252    /// Async poll-until-ready helper for environments where the
2253    /// backend's `register_waker` path can't deliver a wake.
2254    ///
2255    /// The plain `.await` (via the `Future` impl above) parks the
2256    /// caller until the backend's listener fires the stored Waker.
2257    /// That works on `std` builds where the backend has a
2258    /// background-thread pool actively polling its listener tasks,
2259    /// but it deadlocks on the `nostd-runtime` DDS path (and any
2260    /// other cooperative backend whose listener future only runs
2261    /// when something actively drives the runtime). The 160.B.1
2262    /// trace pinned this to DDS's nostd runtime: listener
2263    /// futures only advance inside `runtime.block_on(...)`, and the
2264    /// parked `.await` consumer never issues such a call.
2265    ///
2266    /// `poll_until_ready(yield_fn)` instead actively polls
2267    /// `try_recv()` on each turn and awaits the caller-supplied
2268    /// yield future between attempts. The yield gives the executor
2269    /// a chance to run other ready tasks (typically a `spin_task`
2270    /// that drives the backend runtime via `executor.spin_once()`),
2271    /// which in turn pumps the listener future. On the `std` path
2272    /// this devolves to a fast poll-then-yield loop with no
2273    /// correctness penalty; on `nostd-runtime` it's the only shape
2274    /// that completes.
2275    ///
2276    /// # Example
2277    ///
2278    /// ```ignore
2279    /// let reply = client
2280    ///     .call(&req)?
2281    ///     .poll_until_ready(|| embassy_time::Timer::after_millis(5))
2282    ///     .await?;
2283    /// ```
2284    pub async fn poll_until_ready<F, Fut>(&mut self, mut yield_fn: F) -> Result<T, NodeError>
2285    where
2286        F: FnMut() -> Fut,
2287        Fut: core::future::Future<Output = ()>,
2288    {
2289        loop {
2290            match self.try_recv()? {
2291                Some(reply) => return Ok(reply),
2292                None => yield_fn().await,
2293            }
2294        }
2295    }
2296}
2297
2298/// Deserialize a CDR-encoded service reply.
2299fn cdr_deserialize_reply<Svc: RosService>(data: &[u8]) -> Result<Svc::Reply, NodeError> {
2300    let mut reader = CdrReader::new_with_header(data).map_err(|_| NodeError::Deserialization)?;
2301    Svc::Reply::deserialize(&mut reader).map_err(|_| NodeError::Deserialization)
2302}
2303
2304// ============================================================================
2305// Action types
2306// ============================================================================
2307
2308/// Active goal tracking for action server.
2309#[derive(Clone)]
2310pub struct ActiveGoal<A: RosAction> {
2311    /// Goal ID.
2312    pub goal_id: nros_core::GoalId,
2313    /// Current status.
2314    pub status: nros_core::GoalStatus,
2315    /// The goal data.
2316    pub goal: A::Goal,
2317}
2318
2319/// Completed goal with result.
2320pub struct CompletedGoal<A: RosAction> {
2321    /// Goal ID.
2322    pub goal_id: nros_core::GoalId,
2323    /// Final status.
2324    pub status: nros_core::GoalStatus,
2325    /// The result data.
2326    pub result: A::Result,
2327}
2328
2329// ============================================================================
2330// ActionServer
2331// ============================================================================
2332
2333/// Typed action server with goal state management.
2334///
2335/// Wraps [`ActionServerCore`](super::action_core::ActionServerCore) for
2336/// raw-bytes protocol handling, adding typed goal/feedback/result
2337/// serialization at the boundary.
2338pub struct ActionServer<
2339    A: RosAction,
2340    const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2341    const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2342    const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2343    const MAX_GOALS: usize = 4,
2344> {
2345    pub(crate) core:
2346        super::action_core::ActionServerCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF, MAX_GOALS>,
2347    /// Typed goal data parallel to `core.active_goals`.
2348    pub(crate) typed_goals: heapless::Vec<A::Goal, MAX_GOALS>,
2349    /// Completed goals with typed results.
2350    pub(crate) completed_goals: heapless::Vec<CompletedGoal<A>, MAX_GOALS>,
2351}
2352
2353impl<
2354    A: RosAction,
2355    const GOAL_BUF: usize,
2356    const RESULT_BUF: usize,
2357    const FEEDBACK_BUF: usize,
2358    const MAX_GOALS: usize,
2359> ActionServer<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF, MAX_GOALS>
2360{
2361    /// Try to accept a new goal.
2362    ///
2363    /// Checks for incoming send_goal requests. If one is available, calls the
2364    /// handler to decide acceptance. Returns the goal ID if accepted.
2365    pub fn try_accept_goal(
2366        &mut self,
2367        goal_handler: impl FnOnce(&nros_core::GoalId, &A::Goal) -> nros_core::GoalResponse,
2368    ) -> Result<Option<nros_core::GoalId>, NodeError>
2369    where
2370        A::Goal: Clone,
2371    {
2372        let raw_req = self.core.try_recv_goal_request()?;
2373        let raw_req = match raw_req {
2374            Some(r) => r,
2375            None => return Ok(None),
2376        };
2377
2378        // Deserialize the goal from the buffer at the offset captured
2379        // by the core (DDS prepends an 8-byte seq prefix; zenoh uses 0).
2380        let buf = self.core.goal_buffer();
2381        let start = raw_req.data_offset;
2382        let end = start + raw_req.data_len;
2383        let mut reader = CdrReader::new_with_header(&buf[start..end])
2384            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
2385        // Skip past the GoalId — a fixed `uint8[16]` UUID, no length prefix
2386        // (ROS 2 `unique_identifier_msgs/UUID`; see action_core::read_goal_id).
2387        for _ in 0..GOAL_UUID_SIZE {
2388            let _ = reader.read_u8();
2389        }
2390        let goal = A::Goal::deserialize(&mut reader)
2391            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
2392
2393        let response = goal_handler(&raw_req.goal_id, &goal);
2394        let accepted = response.is_accepted();
2395
2396        if accepted {
2397            self.core
2398                .accept_goal(raw_req.goal_id, raw_req.sequence_number)?;
2399            let _ = self.typed_goals.push(goal);
2400            Ok(Some(raw_req.goal_id))
2401        } else {
2402            self.core.reject_goal(raw_req.sequence_number)?;
2403            Ok(None)
2404        }
2405    }
2406
2407    /// Publish feedback for a goal.
2408    pub fn publish_feedback(
2409        &mut self,
2410        goal_id: &nros_core::GoalId,
2411        feedback: &A::Feedback,
2412    ) -> Result<(), NodeError> {
2413        // Serialize feedback into a temp buffer (without CDR header or GoalId)
2414        let mut tmp = [0u8; FEEDBACK_BUF];
2415        let mut writer = CdrWriter::new(&mut tmp);
2416        feedback
2417            .serialize(&mut writer)
2418            .map_err(|_| NodeError::Serialization)?;
2419        let feedback_len = writer.position();
2420
2421        self.core
2422            .publish_feedback_raw(goal_id, &tmp[..feedback_len])
2423    }
2424
2425    /// Set a goal's status.
2426    ///
2427    /// Also publishes the updated `GoalStatusArray` on the status topic.
2428    pub fn set_goal_status(&mut self, goal_id: &nros_core::GoalId, status: nros_core::GoalStatus) {
2429        self.core.set_goal_status(goal_id, status);
2430    }
2431
2432    /// Complete a goal and store the result.
2433    ///
2434    /// Also publishes the updated `GoalStatusArray` on the status topic.
2435    pub fn complete_goal(
2436        &mut self,
2437        goal_id: &nros_core::GoalId,
2438        status: nros_core::GoalStatus,
2439        result: A::Result,
2440    ) {
2441        // Serialize result for the core slab
2442        let mut tmp = [0u8; RESULT_BUF];
2443        let mut writer = CdrWriter::new(&mut tmp);
2444        let result_len = match result.serialize(&mut writer) {
2445            Ok(()) => writer.position(),
2446            Err(_) => 0,
2447        };
2448
2449        // Remove typed goal parallel to core's active_goals removal
2450        if let Some(pos) = self
2451            .core
2452            .active_goals()
2453            .iter()
2454            .position(|g| g.goal_id.uuid == goal_id.uuid)
2455        {
2456            self.typed_goals.swap_remove(pos);
2457        }
2458
2459        self.core
2460            .complete_goal_raw(goal_id, status, &tmp[..result_len]);
2461
2462        let _ = self.completed_goals.push(CompletedGoal {
2463            goal_id: *goal_id,
2464            status,
2465            result,
2466        });
2467    }
2468
2469    /// Try to handle a cancel_goal request.
2470    pub fn try_handle_cancel(
2471        &mut self,
2472        cancel_handler: impl FnOnce(
2473            &nros_core::GoalId,
2474            nros_core::GoalStatus,
2475        ) -> nros_core::CancelResponse,
2476    ) -> Result<Option<(nros_core::GoalId, nros_core::CancelResponse)>, NodeError> {
2477        self.core.try_handle_cancel(cancel_handler)
2478    }
2479
2480    /// Try to handle a get_result request.
2481    pub fn try_handle_get_result(&mut self) -> Result<Option<nros_core::GoalId>, NodeError>
2482    where
2483        A::Result: Clone + Default,
2484    {
2485        // Serialize default result for non-completed goals
2486        let mut default_buf = [0u8; RESULT_BUF];
2487        let mut writer = CdrWriter::new(&mut default_buf);
2488        let default_len = match A::Result::default().serialize(&mut writer) {
2489            Ok(()) => writer.position(),
2490            Err(_) => 0,
2491        };
2492
2493        self.core
2494            .try_handle_get_result_raw(&default_buf[..default_len])
2495    }
2496
2497    /// Drain all pending server-side work in one call.
2498    ///
2499    /// Calls `try_accept_goal`, `try_handle_cancel`, and
2500    /// `try_handle_get_result` in sequence. Invoke this on every
2501    /// `spin_once` iteration in manual-poll code — otherwise clients
2502    /// will hang on `get_result` because `create_action_server()`
2503    /// servers are not arena-registered.
2504    ///
2505    /// The two callbacks may be called zero or one times per `poll()`:
2506    ///   * `on_goal` fires when a new goal arrives.
2507    ///   * `on_cancel` fires when a cancel request arrives.
2508    ///
2509    /// Get-result requests are drained unconditionally (no callback
2510    /// needed — the result is pulled from the goal's stored state).
2511    ///
2512    /// # Example
2513    /// ```ignore
2514    /// let mut server = node.create_action_server::<Fibonacci>("/fibonacci")?;
2515    /// loop {
2516    ///     executor.spin_once(Duration::from_millis(10));
2517    ///     server.poll(
2518    ///         |id, goal| {
2519    ///             /* accept or reject based on `goal` */
2520    ///             GoalResponse::AcceptAndExecute
2521    ///         },
2522    ///         |_id, _status| CancelResponse::Accept,
2523    ///     )?;
2524    /// }
2525    /// ```
2526    pub fn poll<GF, CF>(&mut self, mut on_goal: GF, mut on_cancel: CF) -> Result<(), NodeError>
2527    where
2528        GF: FnMut(&nros_core::GoalId, &A::Goal) -> nros_core::GoalResponse,
2529        CF: FnMut(&nros_core::GoalId, nros_core::GoalStatus) -> nros_core::CancelResponse,
2530        A::Goal: Clone,
2531        A::Result: Clone + Default,
2532    {
2533        let _ = self.try_accept_goal(|id, goal| on_goal(id, goal))?;
2534        let _ = self.try_handle_cancel(|id, status| on_cancel(id, status))?;
2535        let _ = self.try_handle_get_result()?;
2536        Ok(())
2537    }
2538
2539    /// Get a reference to an active goal.
2540    pub fn get_goal(&self, goal_id: &nros_core::GoalId) -> Option<ActiveGoal<A>>
2541    where
2542        A::Goal: Clone,
2543    {
2544        self.core
2545            .active_goals()
2546            .iter()
2547            .enumerate()
2548            .find(|(_, g)| g.goal_id.uuid == goal_id.uuid)
2549            .map(|(i, raw)| ActiveGoal {
2550                goal_id: raw.goal_id,
2551                status: raw.status,
2552                goal: self.typed_goals[i].clone(),
2553            })
2554    }
2555
2556    /// Get the number of active goals.
2557    pub fn active_goal_count(&self) -> usize {
2558        self.core.active_goal_count()
2559    }
2560}
2561
2562// ============================================================================
2563// ActionClient
2564// ============================================================================
2565
2566/// Typed action client handle.
2567///
2568/// Wraps [`ActionClientCore`](super::action_core::ActionClientCore) for
2569/// raw-bytes protocol handling, adding typed goal/feedback/result
2570/// serialization at the boundary.
2571pub struct ActionClient<
2572    A: RosAction,
2573    const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2574    const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2575    const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2576> {
2577    pub(crate) core: super::action_core::ActionClientCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
2578    pub(crate) _phantom: PhantomData<A>,
2579}
2580
2581impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
2582    ActionClient<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
2583{
2584    /// Send a goal (non-blocking). Returns the goal ID and a [`Promise`] for acceptance.
2585    ///
2586    /// The promise resolves to `true` if accepted, `false` if rejected.
2587    pub fn send_goal(
2588        &mut self,
2589        goal: &A::Goal,
2590    ) -> Result<(nros_core::GoalId, Promise<'_, bool>), NodeError> {
2591        if self.core.in_flight_send_goal {
2592            return Err(NodeError::RequestInFlight);
2593        }
2594
2595        // Serialize goal into a temp buffer (without CDR header or GoalId)
2596        let mut tmp = [0u8; GOAL_BUF];
2597        let mut writer = CdrWriter::new(&mut tmp);
2598        goal.serialize(&mut writer)
2599            .map_err(|_| NodeError::Serialization)?;
2600        let goal_len = writer.position();
2601
2602        let goal_id = self.core.send_goal_raw(&tmp[..goal_len])?;
2603        self.core.in_flight_send_goal = true;
2604
2605        Ok((
2606            goal_id,
2607            Promise {
2608                handle: &mut self.core.send_goal_client,
2609                reply_buffer: &mut self.core.result_buffer,
2610                parse: parse_goal_accepted,
2611                in_flight_flag: &mut self.core.in_flight_send_goal,
2612            },
2613        ))
2614    }
2615
2616    /// Try to receive feedback (non-blocking).
2617    pub fn try_recv_feedback(
2618        &mut self,
2619    ) -> Result<Option<(nros_core::GoalId, A::Feedback)>, NodeError> {
2620        let (goal_id, len) = match self.core.try_recv_feedback_raw()? {
2621            Some(v) => v,
2622            None => return Ok(None),
2623        };
2624
2625        // Deserialize feedback from the core's feedback buffer (after GoalId)
2626        let mut reader = CdrReader::new_with_header(&self.core.feedback_buffer[..len])
2627            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
2628        // Skip GoalId — a fixed `uint8[16]` UUID, no length prefix
2629        // (ROS 2 `unique_identifier_msgs/UUID`; see action_core::read_goal_id).
2630        for _ in 0..GOAL_UUID_SIZE {
2631            let _ = reader.read_u8();
2632        }
2633
2634        let feedback = A::Feedback::deserialize(&mut reader)
2635            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
2636
2637        Ok(Some((goal_id, feedback)))
2638    }
2639
2640    /// Cancel a goal (non-blocking). Returns a [`Promise`] for the cancel response.
2641    pub fn cancel_goal(
2642        &mut self,
2643        goal_id: &nros_core::GoalId,
2644    ) -> Result<Promise<'_, nros_core::CancelResponse>, NodeError> {
2645        if self.core.in_flight_cancel {
2646            return Err(NodeError::RequestInFlight);
2647        }
2648        self.core.send_cancel_request(goal_id)?;
2649        self.core.in_flight_cancel = true;
2650
2651        Ok(Promise {
2652            handle: &mut self.core.cancel_goal_client,
2653            reply_buffer: &mut self.core.result_buffer,
2654            parse: parse_cancel_response,
2655            in_flight_flag: &mut self.core.in_flight_cancel,
2656        })
2657    }
2658
2659    /// Get the result of a completed goal (non-blocking). Returns a [`Promise`].
2660    pub fn get_result(
2661        &mut self,
2662        goal_id: &nros_core::GoalId,
2663    ) -> Result<Promise<'_, (nros_core::GoalStatus, A::Result)>, NodeError> {
2664        if self.core.in_flight_get_result {
2665            return Err(NodeError::RequestInFlight);
2666        }
2667        self.core.send_get_result_request(goal_id)?;
2668        self.core.in_flight_get_result = true;
2669
2670        Ok(Promise {
2671            handle: &mut self.core.get_result_client,
2672            reply_buffer: &mut self.core.result_buffer,
2673            parse: parse_result_response::<A>,
2674            in_flight_flag: &mut self.core.in_flight_get_result,
2675        })
2676    }
2677
2678    /// Explicitly clear the "send_goal reply in flight" flag (Phase 84.D3).
2679    pub fn reset_send_goal_in_flight(&mut self) {
2680        self.core.in_flight_send_goal = false;
2681    }
2682
2683    /// Block until the action server's send-goal queryable is discoverable
2684    /// on the network, or `timeout` elapses.
2685    ///
2686    /// Returns `Ok(true)` on discovery, `Ok(false)` on timeout. Mirrors
2687    /// `rclcpp_action::Client::wait_for_action_server`.
2688    ///
2689    /// Implementation: probes the action's `send_goal` service-server
2690    /// liveliness keyexpr via the same primitive as
2691    /// [`Client::wait_for_service`]. Once that service is reachable the
2692    /// remaining four action entities (cancel queryable + feedback /
2693    /// status / result publishers) are also reachable in practice — they
2694    /// were declared by the same server in one batch.
2695    pub fn wait_for_action_server(
2696        &mut self,
2697        executor: &mut super::Executor,
2698        timeout: core::time::Duration,
2699    ) -> Result<bool, NodeError> {
2700        if self.core.send_goal_client.is_server_ready() {
2701            return Ok(true);
2702        }
2703        let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2704        let max_spins = (timeout.as_millis() as u64 / DEFAULT_SPIN_INTERVAL_MS).max(1);
2705        let mut budget = WaitBudget::new(max_spins, timeout);
2706        // See `Client::wait_for_service` for the re-probe rationale: a
2707        // single liveliness_get samples the router's current token list
2708        // and terminates; we loop with shorter per-probe timeouts so the
2709        // outer budget covers servers that come up after we start
2710        // waiting.
2711        const PROBE_TIMEOUT_MS: u32 = 1000;
2712        loop {
2713            self.core
2714                .send_goal_client
2715                .start_server_discovery(PROBE_TIMEOUT_MS)
2716                .map_err(|_| NodeError::ServiceRequestFailed)?;
2717            loop {
2718                executor.spin_once(spin_interval);
2719                match self
2720                    .core
2721                    .send_goal_client
2722                    .poll_server_discovery()
2723                    .map_err(|_| NodeError::ServiceRequestFailed)?
2724                {
2725                    Some(true) => return Ok(true),
2726                    Some(false) => break,
2727                    None => {}
2728                }
2729                if !budget.tick() {
2730                    return Ok(false);
2731                }
2732            }
2733            if !budget.tick() {
2734                return Ok(false);
2735            }
2736        }
2737    }
2738
2739    /// Snapshot whether the action server is currently visible.
2740    /// Mirrors `rclcpp_action::Client::action_server_is_ready`.
2741    pub fn action_server_is_ready(&self) -> bool {
2742        self.core.send_goal_client.is_server_ready()
2743    }
2744
2745    /// Explicitly clear the "cancel reply in flight" flag (Phase 84.D3).
2746    pub fn reset_cancel_in_flight(&mut self) {
2747        self.core.in_flight_cancel = false;
2748    }
2749
2750    /// Explicitly clear the "get_result reply in flight" flag (Phase 84.D3).
2751    pub fn reset_get_result_in_flight(&mut self) {
2752        self.core.in_flight_get_result = false;
2753    }
2754
2755    /// Create a feedback stream (receives feedback for all goals).
2756    ///
2757    /// The stream borrows `&mut self` exclusively. Drop it before calling
2758    /// `get_result()` or `cancel_goal()`.
2759    pub fn feedback_stream(&mut self) -> FeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF> {
2760        FeedbackStream { client: self }
2761    }
2762
2763    /// Create a goal-filtered feedback stream.
2764    ///
2765    /// Only yields feedback for the given `goal_id`, returning `A::Feedback`
2766    /// directly (without the `GoalId` wrapper).
2767    pub fn feedback_stream_for(
2768        &mut self,
2769        goal_id: nros_core::GoalId,
2770    ) -> GoalFeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF> {
2771        GoalFeedbackStream {
2772            client: self,
2773            goal_id,
2774        }
2775    }
2776}
2777
2778// ============================================================================
2779// FeedbackStream
2780// ============================================================================
2781
2782/// A stream of feedback messages from an action server.
2783///
2784/// Created by [`ActionClient::feedback_stream()`]. Receives feedback for
2785/// all active goals. The stream never self-terminates — use combinators
2786/// like `take_while` or `break` to stop.
2787///
2788/// Three access modes:
2789/// - **Async (`Stream`)**: Enable the `stream` feature for
2790///   `futures_core::Stream` + `StreamExt` combinators
2791/// - **Async (no deps)**: Use `next()` in
2792///   `while let` loops (always available)
2793/// - **Sync**: Use [`wait_next()`](FeedbackStream::wait_next) which
2794///   drives the executor internally
2795pub struct FeedbackStream<
2796    'a,
2797    A: RosAction,
2798    const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2799    const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2800    const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2801> {
2802    client: &'a mut ActionClient<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
2803}
2804
2805impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
2806    FeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
2807{
2808    /// Async: wait for the next feedback message (no `futures` dependency needed).
2809    ///
2810    /// Requires a background task running `executor.spin_async()` to drive
2811    /// I/O. Returns `None` only on error.
2812    ///
2813    /// When the `stream` feature is enabled, prefer `StreamExt::next()` or
2814    /// `TryStreamExt::try_next()` for combinator support.
2815    ///
2816    /// # Example
2817    ///
2818    /// ```ignore
2819    /// let mut stream = client.feedback_stream();
2820    /// while let Some(result) = stream.recv().await {
2821    ///     let (goal_id, feedback) = result?;
2822    ///     // process feedback...
2823    /// }
2824    /// ```
2825    pub async fn recv(&mut self) -> Option<Result<(nros_core::GoalId, A::Feedback), NodeError>> {
2826        core::future::poll_fn(|cx| {
2827            // Register-then-check (closes the AtomicWaker race).
2828            self.client
2829                .core
2830                .feedback_subscriber
2831                .register_waker(cx.waker());
2832            match self.client.try_recv_feedback() {
2833                Ok(Some(item)) => core::task::Poll::Ready(Some(Ok(item))),
2834                Ok(None) => core::task::Poll::Pending,
2835                Err(e) => core::task::Poll::Ready(Some(Err(e))),
2836            }
2837        })
2838        .await
2839    }
2840
2841    /// Sync: wait for the next feedback message, spinning the executor.
2842    ///
2843    /// Returns `Ok(Some(feedback))` if a message arrives within `timeout_ms`,
2844    /// or `Ok(None)` on timeout. Unlike [`Promise::wait()`], timeout is not
2845    /// an error — the caller typically retries in a loop.
2846    pub fn wait_next(
2847        &mut self,
2848        executor: &mut super::Executor,
2849        timeout: core::time::Duration,
2850    ) -> Result<Option<(nros_core::GoalId, A::Feedback)>, NodeError> {
2851        let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2852        let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
2853        let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
2854        let mut budget = WaitBudget::new(max_spins, timeout);
2855        loop {
2856            executor.spin_once(spin_interval);
2857            if let Some(item) = self.client.try_recv_feedback()? {
2858                return Ok(Some(item));
2859            }
2860            if !budget.tick() {
2861                return Ok(None);
2862            }
2863        }
2864    }
2865}
2866
#[cfg(feature = "stream")]
impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
    futures_core::Stream for FeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
{
    type Item = Result<(nros_core::GoalId, A::Feedback), NodeError>;

    /// Poll for the next feedback message.
    ///
    /// Yields `Some(Ok(..))` for a message and `Some(Err(..))` for a
    /// receive error; this implementation never yields `None`.
    fn poll_next(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Option<Self::Item>> {
        use core::task::Poll;

        let stream = self.get_mut();
        // Store the waker first, then look for data, so a message
        // arriving in between still wakes this task.
        stream
            .client
            .core
            .feedback_subscriber
            .register_waker(cx.waker());
        match stream.client.try_recv_feedback().transpose() {
            Some(item) => Poll::Ready(Some(item)),
            None => Poll::Pending,
        }
    }
}
2890
2891// ============================================================================
2892// GoalFeedbackStream
2893// ============================================================================
2894
2895/// A goal-filtered feedback stream.
2896///
2897/// Created by [`ActionClient::feedback_stream_for()`]. Only yields feedback
2898/// messages matching the specified goal ID.
2899pub struct GoalFeedbackStream<
2900    'a,
2901    A: RosAction,
2902    const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2903    const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2904    const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2905> {
2906    client: &'a mut ActionClient<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
2907    goal_id: nros_core::GoalId,
2908}
2909
2910impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
2911    GoalFeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
2912{
2913    /// Async: wait for the next feedback message for this goal (no `futures` dependency needed).
2914    ///
2915    /// When the `stream` feature is enabled, prefer `StreamExt::next()` or
2916    /// `TryStreamExt::try_next()` for combinator support.
2917    pub async fn recv(&mut self) -> Option<Result<A::Feedback, NodeError>> {
2918        core::future::poll_fn(|cx| {
2919            // Register-then-check (closes the AtomicWaker race). The
2920            // waker is registered once for both the "no data" and
2921            // "wrong goal" branches that fall through to Pending.
2922            self.client
2923                .core
2924                .feedback_subscriber
2925                .register_waker(cx.waker());
2926            match self.client.try_recv_feedback() {
2927                Ok(Some((id, feedback))) if id.uuid == self.goal_id.uuid => {
2928                    core::task::Poll::Ready(Some(Ok(feedback)))
2929                }
2930                // Feedback for a different goal — keep waiting.
2931                Ok(Some(_)) => core::task::Poll::Pending,
2932                Ok(None) => core::task::Poll::Pending,
2933                Err(e) => core::task::Poll::Ready(Some(Err(e))),
2934            }
2935        })
2936        .await
2937    }
2938
2939    /// Sync: wait for the next feedback message for this goal, spinning the executor.
2940    pub fn wait_next(
2941        &mut self,
2942        executor: &mut super::Executor,
2943        timeout: core::time::Duration,
2944    ) -> Result<Option<A::Feedback>, NodeError> {
2945        let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2946        let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
2947        let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
2948        let mut budget = WaitBudget::new(max_spins, timeout);
2949        loop {
2950            executor.spin_once(spin_interval);
2951            if let Some((id, feedback)) = self.client.try_recv_feedback()?
2952                && id.uuid == self.goal_id.uuid
2953            {
2954                return Ok(Some(feedback));
2955            }
2956            if !budget.tick() {
2957                return Ok(None);
2958            }
2959        }
2960    }
2961}
2962
#[cfg(feature = "stream")]
impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
    futures_core::Stream for GoalFeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
{
    type Item = Result<A::Feedback, NodeError>;

    /// Poll for the next feedback message addressed to this stream's goal.
    ///
    /// Feedback for other goals is consumed and discarded. Yields
    /// `Some(Ok(..))` for a matching message and `Some(Err(..))` for a
    /// receive error; this implementation never yields `None`.
    fn poll_next(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Register-then-check (closes the AtomicWaker race).
        this.client
            .core
            .feedback_subscriber
            .register_waker(cx.waker());
        // Keep reading until a matching message, an error, or "no data".
        // Returning `Pending` straight after discarding another goal's
        // feedback could leave further queued messages unread with no
        // wake-up scheduled; `Pending` is therefore returned only once
        // nothing is left to read.
        loop {
            match this.client.try_recv_feedback() {
                Ok(Some((id, feedback))) if id.uuid == this.goal_id.uuid => {
                    return core::task::Poll::Ready(Some(Ok(feedback)));
                }
                // Feedback for a different goal — discard, try the next one.
                Ok(Some(_)) => continue,
                Ok(None) => return core::task::Poll::Pending,
                Err(e) => return core::task::Poll::Ready(Some(Err(e))),
            }
        }
    }
}
2989
2990/// Parse a goal acceptance response (bool).
2991fn parse_goal_accepted(data: &[u8]) -> Result<bool, NodeError> {
2992    let mut reader =
2993        CdrReader::new_with_header(data).map_err(|_| NodeError::ServiceRequestFailed)?;
2994    let accepted = reader.read_u8().unwrap_or(0) != 0;
2995    Ok(accepted)
2996}
2997
2998/// Parse a cancel response.
2999fn parse_cancel_response(data: &[u8]) -> Result<nros_core::CancelResponse, NodeError> {
3000    let mut reader =
3001        CdrReader::new_with_header(data).map_err(|_| NodeError::ServiceRequestFailed)?;
3002    let return_code = reader.read_i8().unwrap_or(2);
3003    Ok(nros_core::CancelResponse::from_i8(return_code).unwrap_or_default())
3004}
3005
3006/// Parse an action result response (status + result).
3007fn parse_result_response<A: RosAction>(
3008    data: &[u8],
3009) -> Result<(nros_core::GoalStatus, A::Result), NodeError> {
3010    let mut reader =
3011        CdrReader::new_with_header(data).map_err(|_| NodeError::ServiceRequestFailed)?;
3012    let status_code = reader.read_i8().unwrap_or(0);
3013    let status = nros_core::GoalStatus::from_i8(status_code).unwrap_or_default();
3014    let result =
3015        A::Result::deserialize(&mut reader).map_err(|_| NodeError::ServiceRequestFailed)?;
3016    Ok((status, result))
3017}