nros_node/executor/handles.rs
1//! Entity wrapper types for the embedded executor.
2
3use core::marker::PhantomData;
4
5use nros_core::{CdrReader, CdrWriter, Deserialize, RosAction, RosMessage, RosService, Serialize};
6use nros_rmw::{Publisher, ServiceClientTrait, ServiceServerTrait, Subscriber, TransportError};
7
8use crate::session;
9
10use super::types::{DEFAULT_TX_BUF, NodeError};
11
/// Default polling interval, in milliseconds, for synchronous wait loops.
///
/// Used both as the per-iteration `spin_once` budget and as the divisor
/// that converts a caller's timeout into an iteration count for
/// `WaitBudget`.
const DEFAULT_SPIN_INTERVAL_MS: u64 = 10;
14
15/// Check whether the given budget has been exhausted.
16///
17/// `std` builds measure wall-clock against `Instant::now()`; `no_std`
18/// builds count iterations and exhaust after `max_iterations` calls.
19///
20/// **Phase 89.8**: the plain `max_iters` approach is insufficient on
21/// multi-threaded zpico backends (POSIX/Zephyr/NuttX). There,
22/// `executor.spin_once(10ms)` waits on a condvar that zenoh-pico's
23/// background tasks signal on any inbound frame (keep-alives,
24/// discovery gossip, interest messages). Each signal returns the
25/// spin well before the 10 ms budget, so a nominal
26/// `1000 × 10 ms = 10 s` iteration count collapses to milliseconds
27/// of real time and the wait returns `Timeout` long before the
28/// awaited reply can arrive.
29///
30/// Same class of bug 89.2 fixed in `nros-c`'s blocking service call
31/// and 89.3 fixed in `nros-cpp`'s action-client helpers. The
32/// maintainer explicitly flagged this `Promise::wait` / `wait_next`
33/// path in the 89.2 commit: *"Promise::wait in nros-node has the
34/// same structural bug but currently passes all tests. Left on
35/// max_spins until a test surfaces it."* The NuttX Rust action
36/// E2E is that test.
struct WaitBudget {
    /// Wall-clock deadline. `None` means `now + timeout` was not
    /// representable (e.g. `Duration::MAX` used as "wait forever"), in
    /// which case the budget never expires.
    #[cfg(feature = "std")]
    deadline: Option<std::time::Instant>,
    // Pure no_std fallback: iteration-count budget (no wall clock).
    #[cfg(not(feature = "std"))]
    remaining: u64,
}

impl WaitBudget {
    /// Create a budget.
    ///
    /// `std` builds honour `_timeout` and ignore `_max_iterations`;
    /// `no_std` builds do the opposite. Both arguments are always taken
    /// so call sites stay identical across feature combinations.
    fn new(_max_iterations: u64, _timeout: core::time::Duration) -> Self {
        #[cfg(feature = "std")]
        {
            Self {
                // `Instant + Duration` panics on overflow; `checked_add`
                // turns an unrepresentable deadline into "no deadline".
                deadline: std::time::Instant::now().checked_add(_timeout),
            }
        }
        #[cfg(not(feature = "std"))]
        {
            Self {
                remaining: _max_iterations,
            }
        }
    }

    /// Consume one unit of budget.
    ///
    /// Returns `true` while the caller may keep waiting and `false` once
    /// the budget is exhausted (deadline passed on `std`, iteration
    /// count used up on `no_std`).
    fn tick(&mut self) -> bool {
        #[cfg(feature = "std")]
        {
            match self.deadline {
                Some(deadline) => std::time::Instant::now() < deadline,
                None => true,
            }
        }
        #[cfg(not(feature = "std"))]
        {
            match self.remaining.checked_sub(1) {
                Some(left) => {
                    self.remaining = left;
                    true
                }
                None => false,
            }
        }
    }
}
77
/// Number of bytes in a ROS 2 GoalId UUID.
///
/// CDR encoding: a fixed `uint8[16]` array — ROS 2 `unique_identifier_msgs/UUID`
/// — with **no** length prefix (fixed-size arrays are unprefixed in CDR).
const GOAL_UUID_SIZE: usize = 16;
83
84// ============================================================================
85// EmbeddedPublisher
86// ============================================================================
87
/// Typed publisher handle for messages of type `M`.
///
/// Publish paths, all byte-oriented at the wire:
///
/// - [`publish`](Self::publish) / [`publish_with_buffer`](Self::publish_with_buffer)
///   — accept `&M: RosMessage`, CDR-encode into a stack buffer, then
///   call [`Publisher::publish_raw`](nros_rmw::Publisher::publish_raw).
/// - [`publish_raw`](Self::publish_raw) — accepts pre-encoded CDR bytes
///   for callers that already produced the wire payload.
/// - [`publish_streamed`](Self::publish_streamed) — lets the backend pull
///   the payload from a caller-supplied chunk-writer closure.
///
/// **No typed `loan()` exists.** Loan/borrow live exclusively on
/// [`EmbeddedRawPublisher`] / [`RawSubscription`]. `try_loan(len)`
/// requires the byte length up front, which CDR ser/de can only
/// discover after encoding — the two APIs are incompatible by
/// construction. See `docs/design/0010-zero-copy-raw-api.md` decision D7.
pub struct EmbeddedPublisher<M> {
    /// Backend publisher handle that every publish call is forwarded to.
    pub(crate) handle: session::RmwPublisher,
    /// Phase 108 — registered event closures kept alive for the
    /// publisher's lifetime; freed in `Drop`.
    pub(crate) event_regs: EventRegs,
    /// Ties the handle to message type `M` without storing a value of it.
    pub(crate) _phantom: PhantomData<M>,
}
110
111impl<M> Drop for EmbeddedPublisher<M> {
112 fn drop(&mut self) {
113 drop_event_regs(&mut self.event_regs);
114 }
115}
116
117impl<M: RosMessage> EmbeddedPublisher<M> {
118 /// Publish a message using the default buffer size.
119 pub fn publish(&self, msg: &M) -> Result<(), NodeError> {
120 self.publish_with_buffer::<DEFAULT_TX_BUF>(msg)
121 }
122
123 /// Publish a message with a custom buffer size.
124 pub fn publish_with_buffer<const BUF: usize>(&self, msg: &M) -> Result<(), NodeError> {
125 let mut buffer = [0u8; BUF];
126 let mut writer =
127 CdrWriter::new_with_header(&mut buffer).map_err(|_| NodeError::BufferTooSmall)?;
128 msg.serialize(&mut writer)
129 .map_err(|_| NodeError::Serialization)?;
130 let len = writer.position();
131 self.handle
132 .publish_raw(&buffer[..len])
133 .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
134 }
135
136 /// Publish raw CDR-encoded data (must include CDR header).
137 pub fn publish_raw(&self, data: &[u8]) -> Result<(), NodeError> {
138 self.handle
139 .publish_raw(data)
140 .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
141 }
142
143 /// Phase 124.E.1 — streamed publish. Use a closure-driven writer
144 /// to serialise straight into the backend's outbound buffer
145 /// without a per-publisher staging copy. Saves on RAM-constrained
146 /// nodes that publish multi-KB payloads.
147 ///
148 /// The `writer` closure receives a [`StreamWriter`] mutable
149 /// reference and uses [`StreamWriter::write`] / [`extend`] /
150 /// [`reserved_len`] to fill the slot in chunks. The total
151 /// payload length must be declared up-front via
152 /// [`StreamWriter::reserve_total`]; the backend allocates that
153 /// many bytes in its outbound buffer before any chunks land.
154 ///
155 /// Backends without a native stream slot fall through to a
156 /// stack-allocated staging buffer (capped at 4 KiB) + a single
157 /// `publish_raw` — same observable result, just no zero-staging
158 /// win for the big-message case.
159 pub fn publish_streamed<F>(&self, total_len: usize, writer: F) -> Result<(), NodeError>
160 where
161 F: FnMut(&mut [u8]) -> usize,
162 {
163 // Wrap the closure in a `*mut c_void` so it survives the
164 // crossing into the C callback contract the vtable expects.
165 // The closure is consumed by reference, so the lifetime is
166 // bounded by this function's frame — no escape.
167 use nros_rmw::Publisher;
168 struct Ctx<W> {
169 writer: W,
170 total: usize,
171 }
172 unsafe extern "C" fn size_cb<W>(
173 out_total_len: *mut usize,
174 user_ctx: *mut core::ffi::c_void,
175 ) {
176 unsafe {
177 let ctx = &*(user_ctx as *const Ctx<W>);
178 *out_total_len = ctx.total;
179 }
180 }
181 unsafe extern "C" fn chunk_cb<W: FnMut(&mut [u8]) -> usize>(
182 out_buf: *mut u8,
183 cap: usize,
184 out_written: *mut usize,
185 user_ctx: *mut core::ffi::c_void,
186 ) {
187 unsafe {
188 let ctx = &mut *(user_ctx as *mut Ctx<W>);
189 let slot = core::slice::from_raw_parts_mut(out_buf, cap);
190 let n = (ctx.writer)(slot);
191 *out_written = n;
192 }
193 }
194 let mut ctx = Ctx {
195 writer,
196 total: total_len,
197 };
198 let ctx_ptr = &mut ctx as *mut Ctx<F> as *mut core::ffi::c_void;
199 // SAFETY: `ctx` lives until this call returns, and both callbacks
200 // only cast `user_ctx` back to that stack-local `Ctx<F>`.
201 unsafe {
202 self.handle
203 .publish_streamed(size_cb::<F>, chunk_cb::<F>, ctx_ptr)
204 }
205 .map_err(NodeError::Transport)
206 }
207
208 /// Phase 108.B — manually assert this publisher's liveliness.
209 /// Required for publishers configured with
210 /// [`QosLivelinessPolicy::ManualByTopic`] /
211 /// [`QosLivelinessPolicy::ManualByNode`]. No-op for AUTOMATIC /
212 /// NONE kinds. Returns `Err(Unsupported)` if the backend doesn't
213 /// implement manual liveliness.
214 pub fn assert_liveliness(&self) -> Result<(), NodeError> {
215 use nros_rmw::Publisher as _;
216 self.handle
217 .assert_liveliness()
218 .map_err(NodeError::Transport)
219 }
220
221 // ====================================================================
222 // Phase 108 — status events
223 // ====================================================================
224 //
225 // Publisher-side: `LivelinessLost` and `OfferedDeadlineMissed`.
226 // Returns `NodeError::Transport(TransportError::Unsupported)` if
227 // the active backend doesn't generate the event for this entity.
228
229 /// `true` if the active backend can fire the named event for this
230 /// publisher.
231 #[cfg(feature = "alloc")]
232 pub fn supports_event(&self, kind: nros_rmw::EventKind) -> bool {
233 use nros_rmw::Publisher as _;
234 self.handle.supports_event(kind)
235 }
236
237 /// Register a callback for `LivelinessLost`. Fires when this
238 /// publisher misses its own liveliness assertion deadline.
239 #[cfg(feature = "alloc")]
240 pub fn on_liveliness_lost<F>(&mut self, cb: F) -> Result<(), NodeError>
241 where
242 F: FnMut(nros_rmw::CountStatus) + Send + 'static,
243 {
244 register_pub_event::<F, _>(
245 &mut self.handle,
246 &mut self.event_regs,
247 nros_rmw::EventKind::LivelinessLost,
248 0,
249 cb,
250 |payload, f| {
251 if let nros_rmw::EventPayload::LivelinessLost(s) = payload {
252 f(*s);
253 }
254 },
255 )
256 }
257
258 /// Register a callback for `OfferedDeadlineMissed`. Fires when
259 /// this publisher promised `deadline` and falls behind.
260 #[cfg(feature = "alloc")]
261 pub fn on_offered_deadline_missed<F>(
262 &mut self,
263 deadline: core::time::Duration,
264 cb: F,
265 ) -> Result<(), NodeError>
266 where
267 F: FnMut(nros_rmw::CountStatus) + Send + 'static,
268 {
269 register_pub_event::<F, _>(
270 &mut self.handle,
271 &mut self.event_regs,
272 nros_rmw::EventKind::OfferedDeadlineMissed,
273 deadline.as_millis().min(u32::MAX as u128) as u32,
274 cb,
275 |payload, f| {
276 if let nros_rmw::EventPayload::OfferedDeadlineMissed(s) = payload {
277 f(*s);
278 }
279 },
280 )
281 }
282}
283
/// Maximum number of event callbacks that can be registered per entity.
///
/// Subscribers can hold up to 3 (LivelinessChanged +
/// RequestedDeadlineMissed + MessageLost); publishers up to 2
/// (LivelinessLost + OfferedDeadlineMissed). One vec type fits both —
/// the extra slot is simply unused on publishers.
#[cfg(feature = "alloc")]
pub(crate) const MAX_EVENTS_PER_ENTITY: usize = 3;
290
/// One row of the per-entity event-callback registry. Stores enough to
/// type-erase the boxed closure for `Drop`-time deallocation.
///
/// The row is `Copy`, so ownership of `ctx` is by convention only:
/// running `drop_fn` on two copies of the same row would double-free.
/// `drop_event_regs` avoids this by draining the registry via `pop`.
#[cfg(feature = "alloc")]
#[derive(Clone, Copy)]
pub(crate) struct EventReg {
    /// `Box::into_raw`-derived pointer; valid for the entity's lifetime.
    pub(crate) ctx: *mut core::ffi::c_void,
    /// Type-erased destructor. Calls `Box::from_raw` w/ the originating
    /// monomorphic type, dropping the closure + freeing the heap slot.
    pub(crate) drop_fn: unsafe fn(*mut core::ffi::c_void),
}
302
/// Fixed-capacity registry of event-callback rows (`alloc` builds):
/// at most `MAX_EVENTS_PER_ENTITY` entries, stored inline.
#[cfg(feature = "alloc")]
pub(crate) type EventRegs = heapless::Vec<EventReg, MAX_EVENTS_PER_ENTITY>;

/// Empty placeholder for no-alloc builds — keeps struct layout stable
/// across feature combinations without paying any space.
#[cfg(not(feature = "alloc"))]
#[derive(Default, Clone, Copy)]
pub(crate) struct EventRegs;
311
/// Initial (empty) value for an entity's `event_regs` field.
///
/// Chosen at compile time so call sites look the same under every
/// feature combination: an empty `heapless::Vec` when `alloc` is on, the
/// unit-struct constructor otherwise (clippy's
/// `default_constructed_unit_structs` lint rejects
/// `EventRegs::default()` on the unit-struct branch).
#[cfg(feature = "alloc")]
#[inline]
pub(crate) fn empty_event_regs() -> EventRegs {
    EventRegs::new()
}
322#[cfg(not(feature = "alloc"))]
323#[inline]
324pub(crate) fn empty_event_regs() -> EventRegs {
325 EventRegs
326}
327
/// Free every boxed closure recorded in `regs`, leaving the registry
/// empty. Rows are released last-registered-first.
#[cfg(feature = "alloc")]
pub(crate) fn drop_event_regs(regs: &mut EventRegs) {
    while let Some(EventReg { ctx, drop_fn }) = regs.pop() {
        // SAFETY: `ctx` came from `Box::into_raw` of exactly the
        // monomorphic type `drop_fn` was instantiated for, and popping
        // guarantees each row is released once and only once.
        unsafe { drop_fn(ctx) };
    }
}
337
/// No-alloc variant: nothing is ever registered, so there is nothing to
/// free. Exists so entity `Drop` impls compile unchanged in every build.
#[cfg(not(feature = "alloc"))]
#[inline]
pub(crate) fn drop_event_regs(_regs: &mut EventRegs) {}
341
/// Box a publisher-side event closure, register it with the backend and
/// record it in `regs` so the entity's `Drop` can free it.
///
/// - `kind` / `deadline_ms` are forwarded to the backend unchanged.
/// - `user_cb` is the user's closure; `dispatch` extracts the matching
///   payload variant and invokes `user_cb` with it.
///
/// Returns `Transport(Unsupported)` when `regs` is already full (nothing
/// is allocated in that case), or the backend's error if it rejects the
/// registration (the freshly boxed state is freed before returning).
#[cfg(feature = "alloc")]
fn register_pub_event<F, D>(
    handle: &mut session::RmwPublisher,
    regs: &mut EventRegs,
    kind: nros_rmw::EventKind,
    deadline_ms: u32,
    user_cb: F,
    dispatch: D,
) -> Result<(), NodeError>
where
    F: FnMut(nros_rmw::CountStatus) + Send + 'static,
    D: Fn(nros_rmw::EventPayload<'_>, &mut F) + 'static,
{
    use nros_rmw::Publisher as _;

    if regs.is_full() {
        return Err(NodeError::Transport(TransportError::Unsupported));
    }

    let raw_state =
        alloc::boxed::Box::into_raw(alloc::boxed::Box::new(EventClosureState { user_cb, dispatch }));
    let user_ctx = raw_state as *mut core::ffi::c_void;

    // SAFETY: the trampoline casts `user_ctx` back to the boxed
    // `EventClosureState<F, D>`. Ownership of the box is tracked in
    // `regs`; entity `Drop` releases it through `drop_event_state::<F, D>`.
    let outcome = unsafe {
        handle.register_event_callback(kind, deadline_ms, event_trampoline::<F, D>, user_ctx)
    };

    if let Err(e) = outcome {
        // SAFETY: the backend refused the registration, so nothing else
        // holds `raw_state`; turn it back into a box and free it.
        unsafe { drop(alloc::boxed::Box::from_raw(raw_state)) };
        return Err(NodeError::Transport(e));
    }

    // The `is_full` check above guarantees this push succeeds.
    let _ = regs.push(EventReg {
        ctx: user_ctx,
        drop_fn: drop_event_state::<F, D>,
    });
    Ok(())
}
388
/// Heap-allocated state behind a count-status event registration:
/// the user's closure plus the dispatcher that feeds it.
#[cfg(feature = "alloc")]
struct EventClosureState<F, D> {
    /// User callback, invoked with the status extracted by `dispatch`.
    user_cb: F,
    /// Picks the expected payload variant and calls `user_cb` with it.
    dispatch: D,
}
394
/// C-ABI entry point handed to the backend for count-status events.
///
/// Recovers the boxed `EventClosureState<F, D>` from `user_ctx`, decodes
/// the raw payload for `kind`, and lets `dispatch` forward it to the
/// user's closure.
///
/// # Safety
///
/// `user_ctx` must be the pointer produced when this `<F, D>` pair was
/// registered and must not have been freed; `kind` / `payload_ptr` must
/// satisfy the contract of `nros_rmw::payload_from_raw`.
#[cfg(feature = "alloc")]
unsafe extern "C" fn event_trampoline<F, D>(
    kind: nros_rmw::EventKind,
    payload_ptr: *const core::ffi::c_void,
    user_ctx: *mut core::ffi::c_void,
) where
    F: FnMut(nros_rmw::CountStatus) + Send + 'static,
    D: Fn(nros_rmw::EventPayload<'_>, &mut F) + 'static,
{
    // SAFETY: per the function contract, `user_ctx` points at a live
    // `EventClosureState<F, D>`.
    let EventClosureState { user_cb, dispatch } =
        unsafe { &mut *(user_ctx as *mut EventClosureState<F, D>) };
    // SAFETY: `kind` and `payload_ptr` come straight from the backend.
    let payload = unsafe { nros_rmw::payload_from_raw(kind, payload_ptr) };
    (*dispatch)(payload, user_cb);
}
408
/// Type-erased destructor for a boxed `EventClosureState<F, D>`.
///
/// # Safety
///
/// `ctx` must have been produced by `Box::into_raw` on an
/// `EventClosureState<F, D>` with these exact type parameters, and must
/// not have been freed already.
#[cfg(feature = "alloc")]
unsafe fn drop_event_state<F, D>(ctx: *mut core::ffi::c_void)
where
    F: FnMut(nros_rmw::CountStatus) + Send + 'static,
    D: Fn(nros_rmw::EventPayload<'_>, &mut F) + 'static,
{
    let state_ptr = ctx.cast::<EventClosureState<F, D>>();
    // SAFETY: guaranteed by the caller, see the function contract.
    let boxed = unsafe { alloc::boxed::Box::from_raw(state_ptr) };
    drop(boxed);
}
423
/// Subscriber-side counterpart of the publisher event registration for
/// events that carry a `CountStatus`.
///
/// Boxes `user_cb` + `dispatch`, registers the trampoline with the
/// backend, and records the box in `regs` for `Drop`-time cleanup.
/// Returns `Transport(Unsupported)` when `regs` is full, or the
/// backend's error (after freeing the box) if registration is rejected.
#[cfg(feature = "alloc")]
fn register_sub_event_count<F, D>(
    handle: &mut session::RmwSubscriber,
    regs: &mut EventRegs,
    kind: nros_rmw::EventKind,
    deadline_ms: u32,
    user_cb: F,
    dispatch: D,
) -> Result<(), NodeError>
where
    F: FnMut(nros_rmw::CountStatus) + Send + 'static,
    D: Fn(nros_rmw::EventPayload<'_>, &mut F) + 'static,
{
    use nros_rmw::Subscriber as _;

    if regs.is_full() {
        return Err(NodeError::Transport(TransportError::Unsupported));
    }

    let boxed_state = alloc::boxed::Box::new(EventClosureState { user_cb, dispatch });
    let state_ptr = alloc::boxed::Box::into_raw(boxed_state);
    let user_ctx = state_ptr as *mut core::ffi::c_void;

    // SAFETY: the trampoline casts `user_ctx` back to the same
    // `EventClosureState<F, D>`; the box stays alive until it is freed
    // through `regs` or on the error path below.
    let registration = unsafe {
        handle.register_event_callback(kind, deadline_ms, event_trampoline::<F, D>, user_ctx)
    };

    match registration {
        Err(e) => {
            // SAFETY: the backend did not keep the pointer, so this is
            // the only owner of the allocation.
            unsafe { drop(alloc::boxed::Box::from_raw(state_ptr)) };
            Err(NodeError::Transport(e))
        }
        Ok(()) => {
            // Cannot fail: capacity was checked before allocating.
            let _ = regs.push(EventReg {
                ctx: user_ctx,
                drop_fn: drop_event_state::<F, D>,
            });
            Ok(())
        }
    }
}
464
/// Register a `LivelinessChanged` callback on a subscriber.
///
/// Boxes `user_cb`, registers `liveliness_trampoline::<F>` with the
/// backend (deadline argument fixed at `0`), and records the box in
/// `regs` for `Drop`-time cleanup. Returns `Transport(Unsupported)` when
/// `regs` is full, or the backend's error (after freeing the box) if
/// registration is rejected.
#[cfg(feature = "alloc")]
fn register_sub_event_liveliness<F>(
    handle: &mut session::RmwSubscriber,
    regs: &mut EventRegs,
    user_cb: F,
) -> Result<(), NodeError>
where
    F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
{
    use nros_rmw::Subscriber as _;

    if regs.is_full() {
        return Err(NodeError::Transport(TransportError::Unsupported));
    }

    let state_ptr =
        alloc::boxed::Box::into_raw(alloc::boxed::Box::new(LivelinessClosureState { user_cb }));
    let user_ctx = state_ptr as *mut core::ffi::c_void;

    // SAFETY: the trampoline casts `user_ctx` back to the same
    // `LivelinessClosureState<F>`; the box stays alive until it is freed
    // through `regs` or on the error path below.
    let registration = unsafe {
        handle.register_event_callback(
            nros_rmw::EventKind::LivelinessChanged,
            0,
            liveliness_trampoline::<F>,
            user_ctx,
        )
    };

    if let Err(e) = registration {
        // SAFETY: the backend refused the registration, so nothing else
        // references the allocation.
        unsafe { drop(alloc::boxed::Box::from_raw(state_ptr)) };
        return Err(NodeError::Transport(e));
    }

    // Cannot fail: capacity was checked before allocating.
    let _ = regs.push(EventReg {
        ctx: user_ctx,
        drop_fn: drop_liveliness_state::<F>,
    });
    Ok(())
}
506
/// Type-erased destructor for a boxed `LivelinessClosureState<F>`.
///
/// # Safety
///
/// `ctx` must have been produced by `Box::into_raw` on a
/// `LivelinessClosureState<F>` with this exact `F`, and must not have
/// been freed already.
#[cfg(feature = "alloc")]
unsafe fn drop_liveliness_state<F>(ctx: *mut core::ffi::c_void)
where
    F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
{
    let state_ptr = ctx.cast::<LivelinessClosureState<F>>();
    // SAFETY: guaranteed by the caller, see the function contract.
    let boxed = unsafe { alloc::boxed::Box::from_raw(state_ptr) };
    drop(boxed);
}
518
/// Heap-allocated state behind a `LivelinessChanged` registration.
#[cfg(feature = "alloc")]
struct LivelinessClosureState<F> {
    /// User callback, invoked with each `LivelinessChangedStatus`.
    user_cb: F,
}
523
/// C-ABI entry point handed to the backend for `LivelinessChanged`.
///
/// Recovers the boxed `LivelinessClosureState<F>` from `user_ctx`,
/// decodes the raw payload, and calls the user's closure when the
/// payload is the `LivelinessChanged` variant. Other variants are
/// ignored.
///
/// # Safety
///
/// `user_ctx` must be the pointer produced when this `F` was registered
/// and must not have been freed; `kind` / `payload_ptr` must satisfy the
/// contract of `nros_rmw::payload_from_raw`.
#[cfg(feature = "alloc")]
unsafe extern "C" fn liveliness_trampoline<F>(
    kind: nros_rmw::EventKind,
    payload_ptr: *const core::ffi::c_void,
    user_ctx: *mut core::ffi::c_void,
) where
    F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
{
    // SAFETY: per the function contract, `user_ctx` points at a live
    // `LivelinessClosureState<F>`.
    let state = unsafe { &mut *(user_ctx as *mut LivelinessClosureState<F>) };
    // SAFETY: `kind` and `payload_ptr` come straight from the backend.
    let payload = unsafe { nros_rmw::payload_from_raw(kind, payload_ptr) };
    let nros_rmw::EventPayload::LivelinessChanged(status) = payload else {
        return;
    };
    (state.user_cb)(*status);
}
538
539// ============================================================================
540// EmbeddedRawPublisher — typeless publisher for non-ROS message wire formats
541// ============================================================================
542
/// Default size of each per-publisher arena slot, in bytes.
///
/// This is the default for the `TX_BUF` const parameter of
/// `EmbeddedRawPublisher`.
pub const DEFAULT_LOAN_BUF: usize = 1024;
545
546use core::cell::UnsafeCell;
547// portable-atomic AtomicBool — resolves to native on targets that support it,
548// software fallback on those that don't (e.g. some Xtensa ESP32 SoCs). Use
549// portable-atomic's Ordering too so the type sees the matching trait
550// implementation across all targets.
551use portable_atomic::{AtomicBool, Ordering};
552
/// Typeless publisher handle. Use when the wire format is not ROS CDR
/// (e.g. PX4 uORB raw POD bytes, custom binary protocols).
///
/// Two publish paths:
///
/// - [`publish_raw`](Self::publish_raw): user supplies a `&[u8]`, backend
///   memcpys into its outbound buffer. One copy.
/// - [`try_loan`](Self::try_loan): backend (or per-publisher arena fallback)
///   hands user a `&mut [u8]` slice. User writes in place. [`PublishLoan::commit`]
///   triggers the wire write. Zero-copy on backends with native lending
///   (Phase 99: zenoh-pico `unstable-zenoh-api`, XRCE-DDS); single-memcpy
///   fallback on backends without (uORB).
///
/// The const-generic `TX_BUF` sizes the inline arena slot (default
/// [`DEFAULT_LOAN_BUF`]). Loans larger than `TX_BUF` return
/// `LoanError::TooLarge`.
pub struct EmbeddedRawPublisher<const TX_BUF: usize = DEFAULT_LOAN_BUF> {
    /// Backend publisher handle that every publish call is forwarded to.
    pub(crate) handle: session::RmwPublisher,
    /// Single-slot arena: writable buffer + busy flag. SLOTS=1 in v1
    /// (concurrent loans on the same publisher return WouldBlock).
    /// Unused when the `rmw-lending` feature is on — `try_loan`
    /// dispatches to the backend's `SlotLending` instead.
    // The field exists in every build; `dead_code` only fires when
    // `rmw-lending` bypasses it.
    #[allow(dead_code)]
    pub(crate) arena: TxArena<TX_BUF>,
    /// Phase 108 — registered event closures.
    pub(crate) event_regs: EventRegs,
}
580
581impl<const TX_BUF: usize> Drop for EmbeddedRawPublisher<TX_BUF> {
582 fn drop(&mut self) {
583 drop_event_regs(&mut self.event_regs);
584 }
585}
586
/// Single-slot per-publisher arena. Concurrent `try_loan` calls on the
/// same publisher race on the busy flag; loser gets `WouldBlock`.
///
/// `waker` lets `loan().await` register a waker before returning
/// `Pending`; `release()` wakes it so the next `try_loan` succeeds
/// without polling the executor loop. Phase 99.H' — replaces the
/// earlier `wake_by_ref + Pending` busy yield with an event-driven
/// wake, and gives `LoanFuture::Drop` a place to release a pending
/// reservation cleanly.
///
/// NOTE(review): `AtomicWaker` keeps only the most recently registered
/// waker, so with several tasks awaiting the same publisher only the
/// last one to poll is woken by `release()` — confirm this is acceptable.
#[allow(dead_code)]
pub(crate) struct TxArena<const TX_BUF: usize> {
    /// `true` while the slot is loaned out; claimed via compare-exchange.
    busy: AtomicBool,
    /// Backing storage for the loan; only touched by the current holder.
    buf: UnsafeCell<[u8; TX_BUF]>,
    /// Waker of the task (if any) waiting for the slot to become free.
    waker: atomic_waker::AtomicWaker,
}
602
// SAFETY: Sync-ness of the arena is enforced by the `busy` flag — only
// the thread that won the CAS may access `buf`, and only until commit/
// discard releases the slot. The explicit impl is needed because
// `UnsafeCell` is never `Sync` on its own.
unsafe impl<const TX_BUF: usize> Sync for TxArena<TX_BUF> {}
607
608#[allow(dead_code)] // unused when `rmw-lending` is on
609impl<const TX_BUF: usize> TxArena<TX_BUF> {
610 pub(crate) const fn new() -> Self {
611 Self {
612 busy: AtomicBool::new(false),
613 buf: UnsafeCell::new([0u8; TX_BUF]),
614 waker: atomic_waker::AtomicWaker::new(),
615 }
616 }
617
618 /// Try to claim the arena slot. Returns a raw pointer + len pair on
619 /// success; caller wraps it in a `PublishLoan`. Returns `false` if
620 /// the slot is already loaned.
621 ///
622 /// `&self` returning `&mut` is sound because the `busy` flag
623 /// gates exclusivity at runtime — the CAS in this function is
624 /// the only writer, and `release()` is only callable through the
625 /// loan's `Drop`.
626 #[allow(clippy::mut_from_ref)]
627 fn try_claim(&self, len: usize) -> Result<&mut [u8], LoanError> {
628 if len > TX_BUF {
629 return Err(LoanError::TooLarge);
630 }
631 if self
632 .busy
633 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
634 .is_err()
635 {
636 return Err(LoanError::WouldBlock);
637 }
638 // SAFETY: we just won the busy CAS; we hold exclusive access
639 // until release(). Lifetime is tied to `&self` for the loan.
640 let buf_ref: &mut [u8; TX_BUF] = unsafe { &mut *self.buf.get() };
641 Ok(&mut buf_ref[..len])
642 }
643
644 fn release(&self) {
645 self.busy.store(false, Ordering::Release);
646 // Wake any pending `LoanFuture` waiting on this arena. Cheap
647 // no-op if no one is waiting.
648 self.waker.wake();
649 }
650}
651
652impl<const TX_BUF: usize> EmbeddedRawPublisher<TX_BUF> {
653 /// Construct an [`EmbeddedRawPublisher`] from a backend-allocated
654 /// `RmwPublisher` handle. Public so external extension crates
655 /// (e.g. `nros-px4` for typed uORB wrappers) can wrap a handle
656 /// they obtained directly from the active session via
657 /// [`crate::Node::session_mut`] + a backend-specific create method.
658 ///
659 /// Most users should not call this — use [`crate::Node::create_publisher`]
660 /// or [`crate::Node::create_publisher_raw`] instead.
661 pub fn new(handle: session::RmwPublisher) -> Self {
662 Self {
663 handle,
664 arena: TxArena::new(),
665 event_regs: empty_event_regs(),
666 }
667 }
668
669 /// Phase 108.A — `true` if the active backend can fire the named
670 /// event for this raw publisher.
671 #[cfg(feature = "alloc")]
672 pub fn supports_event(&self, kind: nros_rmw::EventKind) -> bool {
673 use nros_rmw::Publisher as _;
674 self.handle.supports_event(kind)
675 }
676
677 /// Publish a pre-encoded byte slice. The byte format depends entirely
678 /// on the active RMW backend:
679 ///
680 /// - **zenoh / XRCE-DDS / DDS**: CDR-encoded payload including the
681 /// 4-byte CDR header.
682 /// - **uORB**: raw POD struct bytes (no header). Length must equal
683 /// `size_of::<T::Msg>()` for the registered topic.
684 pub fn publish_raw(&self, data: &[u8]) -> Result<(), NodeError> {
685 self.handle
686 .publish_raw(data)
687 .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
688 }
689
690 /// Phase 128.F.4 — raw publish with a wire-level attachment block.
691 ///
692 /// `attachment` rides alongside the payload on backends that
693 /// natively support it (zenoh-pico, Cyclone DDS). Backends without
694 /// native support silently discard `attachment` and fall back to
695 /// the regular [`publish_raw`](Self::publish_raw) path — the
696 /// default `Publisher::publish_raw_with_attachment` body in
697 /// `nros-rmw` does this delegation.
698 ///
699 /// Primary use case: cross-RMW bridges stamp the source backend's
700 /// RMW name as `bridge_origin` so a paired return bridge can drop
701 /// echoed frames deterministically.
702 pub fn publish_raw_with_attachment(
703 &self,
704 data: &[u8],
705 attachment: &[u8],
706 ) -> Result<(), NodeError> {
707 self.handle
708 .publish_raw_with_attachment(data, attachment)
709 .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
710 }
711
712 /// Phase 108.B — manually assert this publisher's liveliness.
713 /// Required for `QosLivelinessPolicy::ManualByTopic` /
714 /// `ManualByNode`. No-op for AUTOMATIC / NONE.
715 pub fn assert_liveliness(&self) -> Result<(), NodeError> {
716 use nros_rmw::Publisher as _;
717 self.handle
718 .assert_liveliness()
719 .map_err(NodeError::Transport)
720 }
721
722 /// Reserve a writable slot of `len` bytes. Caller writes into the
723 /// returned [`PublishLoan`] and calls [`PublishLoan::commit`] to
724 /// publish. Never blocks; returns [`LoanError::WouldBlock`] when the
725 /// slot is already in use (arena fallback) or the backend's outbound
726 /// stream is full (lending path), and [`LoanError::TooLarge`] when
727 /// `len` exceeds the publisher's slot capacity.
728 ///
729 /// With the `rmw-lending` cargo feature on, this dispatches to the
730 /// active backend's [`SlotLending::try_lend_slot`](nros_rmw::SlotLending::try_lend_slot)
731 /// — zero-copy on backends that natively lend (zenoh-pico via
732 /// `z_bytes_from_static_buf`, XRCE-DDS via `uxr_prepare_output_stream`).
733 /// Without `rmw-lending`, the arena fallback is used: caller fills a
734 /// per-publisher inline slot, [`commit`](PublishLoan::commit) calls
735 /// the backend's `publish_raw` (single memcpy into the backend's
736 /// outbound buffer, same as `publish_raw` directly).
737 #[cfg(not(feature = "rmw-lending"))]
738 pub fn try_loan(&self, len: usize) -> Result<PublishLoan<'_, TX_BUF>, LoanError> {
739 let slice = self.arena.try_claim(len)?;
740 Ok(PublishLoan {
741 publisher: self,
742 slice,
743 committed: false,
744 })
745 }
746
747 /// `rmw-lending` variant — see the no-lending [`try_loan`] for the docs.
748 #[cfg(feature = "rmw-lending")]
749 pub fn try_loan(&self, len: usize) -> Result<PublishLoan<'_, TX_BUF>, LoanError> {
750 use nros_rmw::SlotLending;
751 match self.handle.try_lend_slot(len) {
752 Ok(Some(slot)) => Ok(PublishLoan {
753 publisher: self,
754 backend_slot: Some(slot),
755 committed: false,
756 }),
757 Ok(None) => Err(LoanError::WouldBlock),
758 Err(e) => Err(LoanError::Backend(e)),
759 }
760 }
761
762 /// Sync blocking loan with timeout. Spins the executor until the
763 /// arena slot is free or `timeout` elapses.
764 ///
765 /// Useful when you publish from a sync context that owns the
766 /// executor and want to block on a busy arena (rare — single-slot
767 /// arena means contention only when concurrent task tries the same
768 /// publisher, in which case the offending other task should have
769 /// committed promptly).
770 pub fn loan_with_timeout(
771 &self,
772 len: usize,
773 executor: &mut super::Executor,
774 timeout: core::time::Duration,
775 ) -> Result<PublishLoan<'_, TX_BUF>, LoanError> {
776 if len > TX_BUF {
777 return Err(LoanError::TooLarge);
778 }
779 let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
780 let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
781 let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
782 let mut budget = WaitBudget::new(max_spins, timeout);
783 loop {
784 match self.try_loan(len) {
785 Ok(loan) => return Ok(loan),
786 Err(LoanError::WouldBlock) => {
787 executor.spin_once(spin_interval);
788 if !budget.tick() {
789 return Err(LoanError::WouldBlock);
790 }
791 }
792 Err(other) => return Err(other),
793 }
794 }
795 }
796
797 /// Async-await on a free loan slot. Returns the loan as soon as
798 /// the arena's busy flag clears (no-lending path) or as soon as
799 /// the backend's outbound stream has room (lending path).
800 ///
801 /// Phase 99.H': cancellation-safe Future. Registers the task's
802 /// waker on the arena's [`AtomicWaker`] before checking
803 /// [`try_loan`]; another task's `commit` / `discard` calls
804 /// [`TxArena::release`] which wakes us. Dropping the future before
805 /// it resolves removes nothing from any wait queue (single-slot
806 /// AtomicWaker semantics: only the latest registration matters)
807 /// and explicitly wakes another waiter so the next task in line
808 /// gets a poll. No `PublishLoan` is materialised on cancel paths.
809 pub fn loan(&self, len: usize) -> LoanFuture<'_, TX_BUF> {
810 LoanFuture {
811 publisher: self,
812 len,
813 registered: false,
814 }
815 }
816}
817
/// Future returned by [`EmbeddedRawPublisher::loan`]. Phase 99.H'
/// cancellation-safe variant: if dropped before resolving, it wakes
/// the next pending waiter so the busy-flag-clear signal isn't lost
/// to the cancelled task.
#[must_use = "futures do nothing unless polled"]
pub struct LoanFuture<'a, const TX_BUF: usize> {
    /// Publisher whose slot is being awaited.
    publisher: &'a EmbeddedRawPublisher<TX_BUF>,
    /// Requested loan length in bytes, passed to `try_loan` on each poll.
    len: usize,
    /// Set as soon as `poll` has registered a waker — i.e. on every
    /// poll, whether it ends up `Pending` or `Ready` — so `Drop` knows a
    /// registration happened and forwards a wake.
    registered: bool,
}
830
831impl<'a, const TX_BUF: usize> core::future::Future for LoanFuture<'a, TX_BUF> {
832 type Output = Result<PublishLoan<'a, TX_BUF>, LoanError>;
833
834 fn poll(
835 self: core::pin::Pin<&mut Self>,
836 cx: &mut core::task::Context<'_>,
837 ) -> core::task::Poll<Self::Output> {
838 // SAFETY: LoanFuture is `Unpin` for all practical purposes —
839 // it holds only `&publisher`, `len`, and a bool. Move out of
840 // Pin for the body.
841 let this = self.get_mut();
842
843 // Register-then-check: closes the race where another task's
844 // `release` fires between `try_loan` returning WouldBlock and
845 // the waker landing. The arena's AtomicWaker stores the
846 // latest waker; we update it on every poll so a `select!` /
847 // re-poll under a different waker observes the right one.
848 loan_register_waker(this.publisher, cx.waker());
849 this.registered = true;
850
851 match this.publisher.try_loan(this.len) {
852 Ok(loan) => core::task::Poll::Ready(Ok(loan)),
853 Err(LoanError::WouldBlock) => core::task::Poll::Pending,
854 Err(other) => core::task::Poll::Ready(Err(other)),
855 }
856 }
857}
858
859impl<'a, const TX_BUF: usize> Drop for LoanFuture<'a, TX_BUF> {
860 fn drop(&mut self) {
861 // If we registered a waker but never resolved, the busy flag
862 // may have just cleared and we'd swallow the wake. Forward it
863 // to the next waiter so the line keeps moving. Cheap no-op
864 // when no one else is waiting.
865 if self.registered {
866 loan_wake_next(self.publisher);
867 }
868 }
869}
870
// Indirection so the `rmw-lending` build (which does not use the arena)
// can stub these. With `rmw-lending` on, the lending Future variant relies
// on the executor's drive_io spin to drain the backend stream — there's
// no arena-level wake source, so waker registration degrades to a
// self-wake and the wake-next helper to a no-op.
875#[cfg(not(feature = "rmw-lending"))]
876fn loan_register_waker<const TX_BUF: usize>(
877 pub_: &EmbeddedRawPublisher<TX_BUF>,
878 waker: &core::task::Waker,
879) {
880 pub_.arena.waker.register(waker);
881}
882
883#[cfg(not(feature = "rmw-lending"))]
884fn loan_wake_next<const TX_BUF: usize>(pub_: &EmbeddedRawPublisher<TX_BUF>) {
885 pub_.arena.waker.wake();
886}
887
/// Lending build: there is no arena-level wake source, so instead of
/// storing the waker it is fired immediately.
#[cfg(feature = "rmw-lending")]
fn loan_register_waker<const TX_BUF: usize>(
    _pub_: &EmbeddedRawPublisher<TX_BUF>,
    waker: &core::task::Waker,
) {
    // Self-wake: the runtime re-polls after the next executor tick, which
    // drains the backend's outbound stream via `drive_io`.
    core::task::Waker::wake_by_ref(waker);
}
898
/// Lending build: intentionally does nothing — the arena carries no
/// `AtomicWaker` in this configuration, so there is no waiter to forward to.
#[cfg(feature = "rmw-lending")]
fn loan_wake_next<const TX_BUF: usize>(_pub_: &EmbeddedRawPublisher<TX_BUF>) {}
903
904/// Error type for [`EmbeddedRawPublisher::try_loan`].
905#[derive(Debug, Clone, PartialEq, Eq)]
906pub enum LoanError {
907 /// Requested length exceeds the publisher's arena slot capacity.
908 TooLarge,
909 /// Arena slot already in use; another publish is in flight on this
910 /// publisher. Retry after the other loan commits or discards.
911 WouldBlock,
912 /// Backend rejected the publish at commit time.
913 Backend(TransportError),
914}
915
916impl From<TransportError> for LoanError {
917 fn from(e: TransportError) -> Self {
918 LoanError::Backend(e)
919 }
920}
921
922/// Writable loan into a [`EmbeddedRawPublisher`]'s slot.
923///
924/// User fills `as_mut()` then calls [`commit`](Self::commit) to publish,
925/// or [`discard`](Self::discard) to release the slot without publishing.
926/// Dropping without either silently discards (slot freed); a
927/// `#[must_use]` warning catches accidental drops at compile time.
928///
929/// Two backings, selected at compile time by the `rmw-lending` feature:
930///
931/// - **Arena (default)**: per-publisher inline `[u8; TX_BUF]` slot. On
932/// commit, `publish_raw` memcpys into the backend's outbound buffer.
933/// - **Backend lending (`rmw-lending`)**: slot owned by the backend
934/// (zenoh-pico's static buffer aliased via `z_bytes_from_static_buf`,
935/// XRCE's `ucdrBuffer` reservation). True zero-copy publish.
936#[must_use = "PublishLoan must be committed or discarded; dropping silently rolls back"]
937#[cfg(not(feature = "rmw-lending"))]
938pub struct PublishLoan<'a, const TX_BUF: usize> {
939 publisher: &'a EmbeddedRawPublisher<TX_BUF>,
940 slice: &'a mut [u8],
941 committed: bool,
942}
943
#[must_use = "PublishLoan must be committed or discarded; dropping silently rolls back"]
#[cfg(feature = "rmw-lending")]
pub struct PublishLoan<'a, const TX_BUF: usize> {
    // Publisher whose backend handle will flush the slot on commit.
    publisher: &'a EmbeddedRawPublisher<TX_BUF>,
    /// Backend-owned slot. Wrapped in `Option` so that `commit` can
    /// `take()` it out without going through Drop's release path; it
    /// stays `Some(_)` until `commit` or `discard` runs.
    backend_slot: Option<<session::RmwPublisher as nros_rmw::SlotLending>::Slot<'a>>,
    // Set by `commit` and `discard`.
    committed: bool,
}
954
955#[cfg(not(feature = "rmw-lending"))]
956impl<'a, const TX_BUF: usize> PublishLoan<'a, TX_BUF> {
957 /// Mutable view into the loaned bytes. Caller writes message data here.
958 #[allow(clippy::should_implement_trait)]
959 pub fn as_mut(&mut self) -> &mut [u8] {
960 self.slice
961 }
962
963 /// Commit the loan: hand the bytes to the backend's `publish_raw`,
964 /// then release the arena slot. Returns the backend's publish error
965 /// if any (slot is released regardless).
966 pub fn commit(mut self) -> Result<(), LoanError> {
967 let res = self
968 .publisher
969 .handle
970 .publish_raw(self.slice)
971 .map_err(|_| LoanError::Backend(TransportError::PublishFailed));
972 self.committed = true;
973 // Drop runs and releases the slot.
974 res
975 }
976
977 /// Discard the loan without publishing. Equivalent to dropping, but
978 /// explicit (no #[must_use] warning).
979 pub fn discard(mut self) {
980 self.committed = true; // Suppress Drop's "discard" log if any.
981 drop(self);
982 }
983}
984
#[cfg(feature = "rmw-lending")]
impl<'a, const TX_BUF: usize> PublishLoan<'a, TX_BUF> {
    /// The backend-lent bytes, writable.
    ///
    /// # Panics
    ///
    /// Panics if the slot has already been taken. That cannot be observed
    /// through this API: only `commit` and `discard` take the slot, and
    /// both consume `self` by value.
    #[allow(clippy::should_implement_trait)]
    pub fn as_mut(&mut self) -> &mut [u8] {
        let slot = self
            .backend_slot
            .as_mut()
            .expect("PublishLoan slot already consumed");
        slot.as_mut()
    }

    /// Publish by handing the slot to the backend's `commit_slot`. The
    /// bytes go to the wire without an additional user-side memcpy.
    ///
    /// # Errors
    ///
    /// A backend failure is returned as [`LoanError::Backend`].
    pub fn commit(mut self) -> Result<(), LoanError> {
        use nros_rmw::SlotLending;
        // This is the one and only `take`: `commit` owns `self`, so the
        // slot is necessarily still present.
        let slot = self
            .backend_slot
            .take()
            .expect("PublishLoan slot already consumed");
        self.committed = true;
        let flushed = self.publisher.handle.commit_slot(slot);
        match flushed {
            Ok(()) => Ok(()),
            Err(cause) => Err(LoanError::Backend(cause)),
        }
    }

    /// Abandon the loan without publishing. The backend-owned slot is
    /// released by its own Drop.
    pub fn discard(mut self) {
        self.committed = true;
        // Taking the slot out and dropping it releases it right here.
        let released = self.backend_slot.take();
        drop(released);
    }
}
1026
1027#[cfg(not(feature = "rmw-lending"))]
1028impl<'a, const TX_BUF: usize> Drop for PublishLoan<'a, TX_BUF> {
1029 fn drop(&mut self) {
1030 // Slot always returned to the free pool; whether the bytes were
1031 // actually published is encoded in `committed`. Future telemetry
1032 // hook could log uncommitted drops in debug builds.
1033 self.publisher.arena.release();
1034 }
1035}
1036
1037// rmw-lending variant relies on the backend's `Slot` Drop impl to release
1038// the underlying buffer/stream slot. No explicit nros-side Drop needed.
1039
1040// ============================================================================
1041// Subscription
1042// ============================================================================
1043
1044/// Typed subscription handle with internal receive buffer.
1045///
1046/// Two methods, both byte-oriented at the wire:
1047///
1048/// - [`try_recv`](Self::try_recv) / [`recv`](Self::recv) — pull bytes
1049/// from the backend, CDR-decode into `M: RosMessage`, hand back
1050/// ownership of the typed message.
1051/// - [`try_recv_raw`](Self::try_recv_raw) — copy bytes into the
1052/// subscription's internal buffer and return the length, leaving CDR
1053/// decoding to the caller.
1054///
1055/// **No typed `borrow()` exists.** Borrow lives exclusively on
1056/// [`RawSubscription`]. `RecvView` is `&[u8]` semantics; CDR decoding
1057/// into a typed `M` requires owning the bytes (or running the decoder
1058/// in place), which the borrow contract doesn't fit. See
1059/// `docs/design/0010-zero-copy-raw-api.md` decision D7.
1060pub struct Subscription<M, const RX_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE }> {
1061 pub(crate) handle: session::RmwSubscriber,
1062 pub(crate) buffer: [u8; RX_BUF],
1063 /// Phase 108 — registered event closures.
1064 pub(crate) event_regs: EventRegs,
1065 pub(crate) _phantom: PhantomData<M>,
1066}
1067
1068impl<M, const RX_BUF: usize> Drop for Subscription<M, RX_BUF> {
1069 fn drop(&mut self) {
1070 drop_event_regs(&mut self.event_regs);
1071 }
1072}
1073
1074impl<M: RosMessage, const RX_BUF: usize> Subscription<M, RX_BUF> {
1075 /// Try to receive a typed message (non-blocking).
1076 pub fn try_recv(&mut self) -> Result<Option<M>, NodeError> {
1077 match self
1078 .handle
1079 .try_recv_raw(&mut self.buffer)
1080 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?
1081 {
1082 Some(len) => {
1083 let mut reader = CdrReader::new_with_header(&self.buffer[..len])
1084 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
1085 let msg = M::deserialize(&mut reader)
1086 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
1087 Ok(Some(msg))
1088 }
1089 None => Ok(None),
1090 }
1091 }
1092
1093 /// Try to receive raw CDR-encoded data (non-blocking).
1094 pub fn try_recv_raw(&mut self) -> Result<Option<usize>, NodeError> {
1095 self.handle
1096 .try_recv_raw(&mut self.buffer)
1097 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))
1098 }
1099
1100 /// Get the receive buffer (valid after `try_recv_raw`).
1101 pub fn buffer(&self) -> &[u8] {
1102 &self.buffer
1103 }
1104
1105 // ====================================================================
1106 // Phase 108 — status events
1107 // ====================================================================
1108 //
1109 // Subscriber-side: `LivelinessChanged`, `RequestedDeadlineMissed`,
1110 // `MessageLost`. Returns
1111 // `NodeError::Transport(TransportError::Unsupported)` if the
1112 // active backend doesn't generate the event for this entity.
1113
1114 /// `true` if the active backend can fire the named event for this
1115 /// subscriber.
1116 #[cfg(feature = "alloc")]
1117 pub fn supports_event(&self, kind: nros_rmw::EventKind) -> bool {
1118 use nros_rmw::Subscriber as _;
1119 self.handle.supports_event(kind)
1120 }
1121
1122 /// Register a callback for `LivelinessChanged`. Fires when a
1123 /// tracked publisher's liveliness state changes.
1124 #[cfg(feature = "alloc")]
1125 pub fn on_liveliness_changed<F>(&mut self, cb: F) -> Result<(), NodeError>
1126 where
1127 F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
1128 {
1129 register_sub_event_liveliness::<F>(&mut self.handle, &mut self.event_regs, cb)
1130 }
1131
1132 /// Register a callback for `RequestedDeadlineMissed`. Fires when
1133 /// an expected sample doesn't arrive within `deadline`.
1134 #[cfg(feature = "alloc")]
1135 pub fn on_requested_deadline_missed<F>(
1136 &mut self,
1137 deadline: core::time::Duration,
1138 cb: F,
1139 ) -> Result<(), NodeError>
1140 where
1141 F: FnMut(nros_rmw::CountStatus) + Send + 'static,
1142 {
1143 register_sub_event_count::<F, _>(
1144 &mut self.handle,
1145 &mut self.event_regs,
1146 nros_rmw::EventKind::RequestedDeadlineMissed,
1147 deadline.as_millis().min(u32::MAX as u128) as u32,
1148 cb,
1149 |payload, f| {
1150 if let nros_rmw::EventPayload::RequestedDeadlineMissed(s) = payload {
1151 f(*s);
1152 }
1153 },
1154 )
1155 }
1156
1157 /// Register a callback for `MessageLost`. Fires when the backend
1158 /// drops a sample (overflow, etc.).
1159 #[cfg(feature = "alloc")]
1160 pub fn on_message_lost<F>(&mut self, cb: F) -> Result<(), NodeError>
1161 where
1162 F: FnMut(nros_rmw::CountStatus) + Send + 'static,
1163 {
1164 register_sub_event_count::<F, _>(
1165 &mut self.handle,
1166 &mut self.event_regs,
1167 nros_rmw::EventKind::MessageLost,
1168 0,
1169 cb,
1170 |payload, f| {
1171 if let nros_rmw::EventPayload::MessageLost(s) = payload {
1172 f(*s);
1173 }
1174 },
1175 )
1176 }
1177
1178 /// Check if data is available without consuming it.
1179 pub fn has_data(&self) -> bool {
1180 self.handle.has_data()
1181 }
1182
1183 /// Process the received message in-place without copying.
1184 pub fn process_in_place(&mut self, f: impl FnOnce(&M)) -> Result<bool, NodeError> {
1185 let mut deser_err = false;
1186 let processed = self
1187 .handle
1188 .process_raw_in_place(|raw| {
1189 match CdrReader::new_with_header(raw).and_then(|mut r| M::deserialize(&mut r)) {
1190 Ok(msg) => f(&msg),
1191 Err(_) => deser_err = true,
1192 }
1193 })
1194 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
1195
1196 if deser_err {
1197 return Err(NodeError::Transport(TransportError::DeserializationError));
1198 }
1199 Ok(processed)
1200 }
1201
1202 /// Async: wait for the next message (no `futures` dependency needed).
1203 ///
1204 /// Requires a background task running `executor.spin_async()` to drive
1205 /// I/O. Returns `Ok(msg)` on the next received message, or `Err` if the
1206 /// transport reports an error.
1207 ///
1208 /// When the `stream` feature is enabled, prefer `StreamExt::next()` /
1209 /// `TryStreamExt::try_next()` for combinator support.
1210 ///
1211 /// # Example
1212 ///
1213 /// ```ignore
1214 /// let mut sub = node.create_subscription::<Int32>("/topic")?;
1215 /// loop {
1216 /// let msg = sub.recv().await?;
1217 /// /* handle msg */
1218 /// }
1219 /// ```
1220 pub async fn recv(&mut self) -> Result<M, NodeError> {
1221 core::future::poll_fn(|cx| {
1222 // Register the waker FIRST, then check for data. This ordering
1223 // closes the race window where a subscriber callback fires
1224 // between `try_recv` returning `None` and the waker being
1225 // registered — the wake would otherwise be delivered to the
1226 // previous waker (or nowhere) and the task would hang.
1227 self.handle.register_waker(cx.waker());
1228 match self.try_recv() {
1229 Ok(Some(msg)) => core::task::Poll::Ready(Ok(msg)),
1230 Ok(None) => core::task::Poll::Pending,
1231 Err(e) => core::task::Poll::Ready(Err(e)),
1232 }
1233 })
1234 .await
1235 }
1236
1237 /// Sync: wait for the next message, spinning the executor.
1238 ///
1239 /// Returns `Ok(Some(msg))` if a message arrives within `timeout_ms`,
1240 /// or `Ok(None)` on timeout. Unlike [`Promise::wait()`], timeout is
1241 /// not an error — the caller typically retries in a loop.
1242 ///
1243 /// # Example
1244 ///
1245 /// ```ignore
1246 /// while let Some(msg) = sub.wait_next(&mut executor, core::time::Duration::from_millis(1000))? {
1247 /// /* handle msg */
1248 /// }
1249 /// ```
1250 pub fn wait_next(
1251 &mut self,
1252 executor: &mut super::Executor,
1253 timeout: core::time::Duration,
1254 ) -> Result<Option<M>, NodeError> {
1255 let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
1256 let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
1257 let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
1258 let mut budget = WaitBudget::new(max_spins, timeout);
1259 loop {
1260 executor.spin_once(spin_interval);
1261 if let Some(msg) = self.try_recv()? {
1262 return Ok(Some(msg));
1263 }
1264 if !budget.tick() {
1265 return Ok(None);
1266 }
1267 }
1268 }
1269}
1270
1271// ============================================================================
1272// RawSubscription — typeless subscription for non-ROS message wire formats
1273// ============================================================================
1274
1275/// Typeless subscription handle. Counterpart of [`EmbeddedRawPublisher`].
1276///
1277/// The user owns the decoding step: call [`try_recv_raw`](Self::try_recv_raw)
1278/// to fill an internal buffer with bytes whose format depends on the active
1279/// RMW backend, then interpret them however is appropriate (memcpy, custom
1280/// parser, …).
1281pub struct RawSubscription<const RX_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE }> {
1282 pub(crate) handle: session::RmwSubscriber,
1283 pub(crate) buffer: [u8; RX_BUF],
1284 /// Phase 108 — registered event closures.
1285 pub(crate) event_regs: EventRegs,
1286}
1287
1288impl<const RX_BUF: usize> Drop for RawSubscription<RX_BUF> {
1289 fn drop(&mut self) {
1290 drop_event_regs(&mut self.event_regs);
1291 }
1292}
1293
1294impl<const RX_BUF: usize> RawSubscription<RX_BUF> {
1295 /// Construct a [`RawSubscription`] from a backend-allocated
1296 /// `RmwSubscriber` handle. Public so external extension crates
1297 /// (e.g. `nros-px4` for typed uORB wrappers) can wrap a handle
1298 /// they obtained directly from the active session via
1299 /// [`crate::Node::session_mut`] + a backend-specific create method.
1300 ///
1301 /// Most users should not call this — use
1302 /// [`crate::Node::create_subscription`] or
1303 /// [`crate::Node::create_subscription_raw`] instead.
1304 pub fn new(handle: session::RmwSubscriber) -> Self {
1305 Self {
1306 handle,
1307 buffer: [0u8; RX_BUF],
1308 event_regs: empty_event_regs(),
1309 }
1310 }
1311
1312 /// Try to receive raw bytes (non-blocking). Returns `Ok(Some(len))`
1313 /// with the message length on success; the bytes live in
1314 /// [`buffer`](Self::buffer) until the next call.
1315 pub fn try_recv_raw(&mut self) -> Result<Option<usize>, NodeError> {
1316 self.handle
1317 .try_recv_raw(&mut self.buffer)
1318 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))
1319 }
1320
1321 /// Phase 128.F.4 — raw receive that also surfaces the incoming
1322 /// sample's wire-level attachment block.
1323 ///
1324 /// Returns `Ok(Some((payload_len, attachment_len)))`. The payload
1325 /// lives in [`buffer`](Self::buffer); the attachment is written
1326 /// into caller-supplied `att_buf`. `attachment_len == 0` means
1327 /// the incoming sample carried no attachment.
1328 ///
1329 /// Backends without native attachment support delegate to
1330 /// [`try_recv_raw`](Self::try_recv_raw) and always report
1331 /// `attachment_len == 0` (default `Subscriber` trait body in
1332 /// `nros-rmw`).
1333 pub fn try_recv_raw_with_attachment(
1334 &mut self,
1335 att_buf: &mut [u8],
1336 ) -> Result<Option<(usize, usize)>, NodeError> {
1337 self.handle
1338 .try_recv_raw_with_attachment(&mut self.buffer, att_buf)
1339 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))
1340 }
1341
1342 /// Phase 252 / issue 0073 — raw receive that also returns the E2E
1343 /// [`IntegrityStatus`](nros_rmw::IntegrityStatus) (CRC + sequence gap/dup) for
1344 /// the C/C++ `nros_subscription_try_recv_validated` path. The validator lives
1345 /// in the backend handle (`try_recv_validated`), so no typed message is needed;
1346 /// the payload lives in [`buffer`](Self::buffer). `crc_valid == None` when the
1347 /// wire sample carried no CRC (e.g. a publisher built without `safety-e2e`).
1348 #[cfg(feature = "safety-e2e")]
1349 pub fn try_recv_validated(
1350 &mut self,
1351 ) -> Result<Option<(usize, nros_rmw::IntegrityStatus)>, NodeError> {
1352 use nros_rmw::Subscriber;
1353 self.handle
1354 .try_recv_validated(&mut self.buffer)
1355 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))
1356 }
1357
1358 /// Phase 124.D.1 — burst-take. Drain up to `max_msgs` queued
1359 /// samples into the caller-supplied contiguous block in one
1360 /// call, with the i-th sample at
1361 /// `buf[i * per_msg_cap .. i * per_msg_cap + out_lens[i]]`.
1362 /// Returns the number of messages delivered.
1363 ///
1364 /// Backends without a native batch take inherit the
1365 /// `Subscriber::try_recv_sequence` default body which loop-drives
1366 /// `try_recv_raw` — same shape, same observable result; the
1367 /// batched API just lets sensor loops commit to the call shape
1368 /// regardless of backend support.
1369 pub fn try_recv_sequence(
1370 &mut self,
1371 buf: &mut [u8],
1372 per_msg_cap: usize,
1373 max_msgs: usize,
1374 out_lens: &mut [usize],
1375 ) -> Result<usize, NodeError> {
1376 use nros_rmw::Subscriber;
1377 self.handle
1378 .try_recv_sequence(buf, per_msg_cap, max_msgs, out_lens)
1379 .map_err(NodeError::Transport)
1380 }
1381
1382 /// Phase 122.3.c.6.e — register a `Waker` that fires when a new
1383 /// message arrives. Mirror of the existing service-server /
1384 /// service-client wake plumbing. No-op on backends that don't
1385 /// support waking — caller falls back to polling.
1386 pub fn register_waker(&self, waker: &core::task::Waker) {
1387 use nros_rmw::Subscriber;
1388 self.handle.register_waker(waker);
1389 }
1390
1391 /// Phase 108.A — `true` if the active backend can fire the named
1392 /// event for this raw subscription.
1393 #[cfg(feature = "alloc")]
1394 pub fn supports_event(&self, kind: nros_rmw::EventKind) -> bool {
1395 use nros_rmw::Subscriber as _;
1396 self.handle.supports_event(kind)
1397 }
1398
1399 /// Phase 108.A — register a callback for `LivelinessChanged`.
1400 #[cfg(feature = "alloc")]
1401 pub fn on_liveliness_changed<F>(&mut self, cb: F) -> Result<(), NodeError>
1402 where
1403 F: FnMut(nros_rmw::LivelinessChangedStatus) + Send + 'static,
1404 {
1405 register_sub_event_liveliness::<F>(&mut self.handle, &mut self.event_regs, cb)
1406 }
1407
1408 /// Phase 108.A — register a callback for `RequestedDeadlineMissed`.
1409 #[cfg(feature = "alloc")]
1410 pub fn on_requested_deadline_missed<F>(
1411 &mut self,
1412 deadline: core::time::Duration,
1413 cb: F,
1414 ) -> Result<(), NodeError>
1415 where
1416 F: FnMut(nros_rmw::CountStatus) + Send + 'static,
1417 {
1418 register_sub_event_count::<F, _>(
1419 &mut self.handle,
1420 &mut self.event_regs,
1421 nros_rmw::EventKind::RequestedDeadlineMissed,
1422 deadline.as_millis().min(u32::MAX as u128) as u32,
1423 cb,
1424 |payload, f| {
1425 if let nros_rmw::EventPayload::RequestedDeadlineMissed(s) = payload {
1426 f(*s);
1427 }
1428 },
1429 )
1430 }
1431
1432 /// Phase 108.A — register a callback for `MessageLost`.
1433 #[cfg(feature = "alloc")]
1434 pub fn on_message_lost<F>(&mut self, cb: F) -> Result<(), NodeError>
1435 where
1436 F: FnMut(nros_rmw::CountStatus) + Send + 'static,
1437 {
1438 register_sub_event_count::<F, _>(
1439 &mut self.handle,
1440 &mut self.event_regs,
1441 nros_rmw::EventKind::MessageLost,
1442 0,
1443 cb,
1444 |payload, f| {
1445 if let nros_rmw::EventPayload::MessageLost(s) = payload {
1446 f(*s);
1447 }
1448 },
1449 )
1450 }
1451
1452 /// Get the receive buffer (valid after [`try_recv_raw`](Self::try_recv_raw)).
1453 pub fn buffer(&self) -> &[u8] {
1454 &self.buffer
1455 }
1456
1457 /// Check if data is available without consuming it.
1458 pub fn has_data(&self) -> bool {
1459 self.handle.has_data()
1460 }
1461
1462 /// Try to borrow the next available message in place. Returns
1463 /// `Ok(None)` if no message is ready; never blocks.
1464 ///
1465 /// The returned [`RecvView`] borrows the subscriber's internal
1466 /// receive buffer. Lifetime is tied to `&mut self` — only one view
1467 /// can be live at a time, and the next `try_borrow` / `try_recv_raw`
1468 /// call invalidates the previous view's bytes.
1469 ///
1470 /// View is `!Send + !Sync` to discourage holding it across `.await`
1471 /// or thread boundaries (would block subsequent receives on the
1472 /// same subscriber).
1473 #[cfg(not(feature = "rmw-lending"))]
1474 pub fn try_borrow(&mut self) -> Result<Option<RecvView<'_>>, NodeError> {
1475 match self.try_recv_raw()? {
1476 Some(len) => Ok(Some(RecvView {
1477 bytes: &self.buffer[..len],
1478 _marker: core::marker::PhantomData,
1479 })),
1480 None => Ok(None),
1481 }
1482 }
1483
1484 /// `rmw-lending` variant — dispatches to the backend's
1485 /// [`SlotBorrowing::try_borrow`](nros_rmw::SlotBorrowing::try_borrow)
1486 /// for true zero-copy receive (zenoh-pico's static buffer borrowed
1487 /// directly via `z_bytes_get_contiguous_view`, XRCE's slot borrowed
1488 /// in place). The bytes never touch `self.buffer`.
1489 #[cfg(feature = "rmw-lending")]
1490 pub fn try_borrow(&mut self) -> Result<Option<RecvView<'_>>, NodeError> {
1491 use nros_rmw::SlotBorrowing;
1492 match self.handle.try_borrow() {
1493 Ok(Some(view)) => Ok(Some(RecvView {
1494 view: Some(view),
1495 _marker: core::marker::PhantomData,
1496 })),
1497 Ok(None) => Ok(None),
1498 Err(e) => Err(NodeError::Transport(e)),
1499 }
1500 }
1501
1502 /// Async-await on the next message, returning a [`RecvView`].
1503 /// Mirrors the `Subscription::recv` pattern but typeless.
1504 ///
1505 /// Backend wake source: `Subscriber::register_waker`. Same race-
1506 /// safe register-then-check ordering as `Subscription::recv`.
1507 pub async fn borrow(&mut self) -> Result<RecvView<'_>, NodeError> {
1508 // Wait for `has_data` to flip true via the backend's
1509 // AtomicWaker, *without* holding any borrow that `try_borrow`
1510 // would need afterwards. Borrow `&self.handle` immutably
1511 // inside poll_fn so the borrow checker can prove `&mut self`
1512 // is free by the time we return Ok(view) below.
1513 //
1514 // Phase 99.H' cancellation safety: there is no reservation
1515 // taken inside poll. Dropping the future before it resolves
1516 // simply abandons whatever waker registration the backend
1517 // accepted; the next call to `borrow().await` (or
1518 // `try_borrow`) re-registers. No leaked state.
1519 {
1520 let handle = &self.handle;
1521 core::future::poll_fn(|cx| {
1522 // Register-then-check: closes the race where a backend
1523 // callback fires between has_data returning false and
1524 // the waker landing.
1525 handle.register_waker(cx.waker());
1526 if handle.has_data() {
1527 core::task::Poll::Ready(())
1528 } else {
1529 core::task::Poll::Pending
1530 }
1531 })
1532 .await;
1533 }
1534 // has_data was true at some point; in the single-threaded
1535 // executor there's no other reader, so try_borrow returns Some.
1536 // A spurious wake (very unlikely on shipping backends) returns
1537 // WouldBlock and the caller can retry.
1538 match self.try_borrow()? {
1539 Some(view) => Ok(view),
1540 None => Err(NodeError::Transport(TransportError::WouldBlock)),
1541 }
1542 }
1543
1544 /// Sync blocking borrow with timeout. Spins the executor until a
1545 /// message is available or `timeout` elapses.
1546 ///
1547 /// Returns `Ok(Some(view))` on success, `Ok(None)` on timeout.
1548 /// The view's lifetime is tied to `&mut self`.
1549 pub fn borrow_with_timeout(
1550 &mut self,
1551 executor: &mut super::Executor,
1552 timeout: core::time::Duration,
1553 ) -> Result<Option<RecvView<'_>>, NodeError> {
1554 let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
1555 let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
1556 let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
1557 let mut budget = WaitBudget::new(max_spins, timeout);
1558 loop {
1559 executor.spin_once(spin_interval);
1560 if self.has_data() {
1561 return self.try_borrow();
1562 }
1563 if !budget.tick() {
1564 return Ok(None);
1565 }
1566 }
1567 }
1568}
1569
1570// ============================================================================
1571// RawServiceServer / RawServiceClient (Phase 122.3.c — L1 polling, typeless)
1572// ============================================================================
1573
1574/// Typeless service-server handle. L1 counterpart of
1575/// [`EmbeddedServiceServer`] for callers that own their own scheduler
1576/// (RTIC, embassy, FreeRTOS-task-per-entity) and the C / C++ FFI
1577/// shims.
1578///
1579/// Holds the transport handle plus an inline request buffer. The
1580/// caller polls [`try_recv_request_raw`](Self::try_recv_request_raw)
1581/// and sends replies via [`send_reply_raw`](Self::send_reply_raw)
1582/// with raw CDR bytes.
1583pub struct RawServiceServer<
1584 const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1585 const RESP_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1586> {
1587 pub(crate) handle: session::RmwServiceServer,
1588 pub(crate) req_buffer: [u8; REQ_BUF],
1589 pub(crate) _phantom_resp: PhantomData<[u8; RESP_BUF]>,
1590}
1591
1592impl<const REQ_BUF: usize, const RESP_BUF: usize> RawServiceServer<REQ_BUF, RESP_BUF> {
1593 /// Phase 122.3.c.6.e — register a `Waker` that fires when a new
1594 /// request arrives. Mirror of the existing subscriber /
1595 /// service-client wake plumbing. No-op on backends that don't
1596 /// support waking — caller falls back to polling.
1597 pub fn register_waker(&self, waker: &core::task::Waker) {
1598 use nros_rmw::ServiceServerTrait;
1599 self.handle.register_waker(waker);
1600 }
1601
1602 /// Construct a [`RawServiceServer`] from a backend-allocated
1603 /// `RmwServiceServer` handle. Public so external crates and the
1604 /// C / C++ FFI shims can wrap a handle obtained directly from
1605 /// [`crate::Node::session_mut`].
1606 pub fn new(handle: session::RmwServiceServer) -> Self {
1607 Self {
1608 handle,
1609 req_buffer: [0u8; REQ_BUF],
1610 _phantom_resp: PhantomData,
1611 }
1612 }
1613
1614 /// Try to receive a service request (non-blocking).
1615 ///
1616 /// Returns `Ok(Some((len, sequence_number)))` when a request is
1617 /// available — the raw CDR bytes live in
1618 /// [`req_buffer`](Self::req_buffer) at `&req_buffer()[..len]`
1619 /// until the next call. The sequence number is required by
1620 /// [`send_reply_raw`](Self::send_reply_raw).
1621 pub fn try_recv_request_raw(&mut self) -> Result<Option<(usize, i64)>, NodeError> {
1622 match self.handle.try_recv_request(&mut self.req_buffer) {
1623 Ok(Some(req)) => Ok(Some((req.data.len(), req.sequence_number))),
1624 Ok(None) => Ok(None),
1625 Err(_) => Err(NodeError::Transport(TransportError::ServiceRequestFailed)),
1626 }
1627 }
1628
1629 /// Borrow the inline request buffer. Valid after a successful
1630 /// [`try_recv_request_raw`](Self::try_recv_request_raw) call.
1631 pub fn req_buffer(&self) -> &[u8] {
1632 &self.req_buffer
1633 }
1634
1635 /// Send a reply with raw CDR bytes. `sequence_number` must match
1636 /// the value returned by the most recent
1637 /// [`try_recv_request_raw`](Self::try_recv_request_raw).
1638 pub fn send_reply_raw(&mut self, sequence_number: i64, data: &[u8]) -> Result<(), NodeError> {
1639 self.handle
1640 .send_reply(sequence_number, data)
1641 .map_err(|_| NodeError::ServiceReplyFailed)
1642 }
1643}
1644
1645/// Typeless service-client handle. L1 counterpart of
1646/// [`EmbeddedServiceClient`] for the same audience as
1647/// [`RawServiceServer`].
1648pub struct RawServiceClient<
1649 const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1650 const REPLY_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1651> {
1652 pub(crate) handle: session::RmwServiceClient,
1653 pub(crate) reply_buffer: [u8; REPLY_BUF],
1654 pub(crate) _phantom_req: PhantomData<[u8; REQ_BUF]>,
1655}
1656
1657impl<const REQ_BUF: usize, const REPLY_BUF: usize> RawServiceClient<REQ_BUF, REPLY_BUF> {
1658 /// Phase 122.3.c.6.e — register a `Waker` that fires when the
1659 /// reply to a previously-sent request lands.
1660 pub fn register_waker(&self, waker: &core::task::Waker) {
1661 use nros_rmw::ServiceClientTrait;
1662 self.handle.register_waker(waker);
1663 }
1664
1665 /// Construct from a backend-allocated handle. Same audience as
1666 /// [`RawServiceServer::new`].
1667 pub fn new(handle: session::RmwServiceClient) -> Self {
1668 Self {
1669 handle,
1670 reply_buffer: [0u8; REPLY_BUF],
1671 _phantom_req: PhantomData,
1672 }
1673 }
1674
1675 /// Send a raw CDR request. Non-blocking; the reply arrives via
1676 /// [`try_recv_reply_raw`](Self::try_recv_reply_raw).
1677 pub fn send_request_raw(&mut self, request: &[u8]) -> Result<(), NodeError> {
1678 self.handle
1679 .send_request_raw(request)
1680 .map_err(|_| NodeError::ServiceRequestFailed)
1681 }
1682
1683 /// Phase 124.G.3 — graph-aware "is the matching server up?"
1684 /// probe. Mirrors [`Client::server_available`] for the raw API.
1685 pub fn server_available(&self) -> Result<bool, NodeError> {
1686 use nros_rmw::ServiceClientTrait;
1687 self.handle.server_available().map_err(NodeError::Transport)
1688 }
1689
1690 /// Try to receive a reply (non-blocking). Returns
1691 /// `Ok(Some(len))` with the reply length on success; bytes
1692 /// live in [`reply_buffer`](Self::reply_buffer) until the next
1693 /// call.
1694 pub fn try_recv_reply_raw(&mut self) -> Result<Option<usize>, NodeError> {
1695 self.handle
1696 .try_recv_reply_raw(&mut self.reply_buffer)
1697 .map_err(|_| NodeError::Transport(TransportError::ServiceRequestFailed))
1698 }
1699
1700 /// Borrow the inline reply buffer. Valid after a successful
1701 /// [`try_recv_reply_raw`](Self::try_recv_reply_raw) call.
1702 pub fn reply_buffer(&self) -> &[u8] {
1703 &self.reply_buffer
1704 }
1705}
1706
/// Immutable window onto the bytes received by a [`RawSubscription`].
///
/// `!Send + !Sync`, so it cannot be carried across `.await` points or
/// threads. Dropping it releases any backend lock and lets the next
/// message advance.
///
/// The `rmw-lending` feature picks one of two backings at compile time.
/// Without lending, the view points into `RawSubscription::buffer`, which
/// `try_recv_raw` filled by memcpy. With lending, it wraps the backend's own
/// [`SlotBorrowing::View`](nros_rmw::SlotBorrowing::View): the receive path
/// makes no copies, and the backend's Drop takes care of releasing the
/// buffer lock.
#[cfg(not(feature = "rmw-lending"))]
pub struct RecvView<'a> {
    // The message bytes, borrowed from the subscription's buffer.
    bytes: &'a [u8],
    // The raw-pointer marker is what makes the type `!Send + !Sync`.
    _marker: core::marker::PhantomData<*const ()>,
}
1723
1724#[cfg(feature = "rmw-lending")]
1725pub struct RecvView<'a> {
1726 /// `Option` for symmetry with `PublishLoan::backend_slot`. Always
1727 /// `Some(_)` until the view is dropped.
1728 view: Option<<session::RmwSubscriber as nros_rmw::SlotBorrowing>::View<'a>>,
1729 _marker: core::marker::PhantomData<*const ()>,
1730}
1731
1732#[cfg(not(feature = "rmw-lending"))]
1733impl<'a> core::ops::Deref for RecvView<'a> {
1734 type Target = [u8];
1735 fn deref(&self) -> &[u8] {
1736 self.bytes
1737 }
1738}
1739
1740#[cfg(feature = "rmw-lending")]
1741impl<'a> core::ops::Deref for RecvView<'a> {
1742 type Target = [u8];
1743 fn deref(&self) -> &[u8] {
1744 // SAFETY-invariant: `view` is `Some` for the whole life of a
1745 // `RecvView` — only `Drop` takes it, after which no method (incl.
1746 // this `deref`) is reachable.
1747 self.view
1748 .as_ref()
1749 .expect("RecvView accessed after drop")
1750 .as_ref()
1751 }
1752}
1753
1754#[cfg(not(feature = "rmw-lending"))]
1755impl<'a> AsRef<[u8]> for RecvView<'a> {
1756 fn as_ref(&self) -> &[u8] {
1757 self.bytes
1758 }
1759}
1760
1761#[cfg(feature = "rmw-lending")]
1762impl<'a> AsRef<[u8]> for RecvView<'a> {
1763 fn as_ref(&self) -> &[u8] {
1764 // SAFETY-invariant: `view` is `Some` until `Drop`; see the `Deref`
1765 // impl above.
1766 self.view
1767 .as_ref()
1768 .expect("RecvView accessed after drop")
1769 .as_ref()
1770 }
1771}
1772
/// `Stream` adapter: each poll registers the task's waker with the backend
/// handle and then attempts one non-blocking receive.
///
/// The stream never yields `None`; an empty receive reports `Pending`.
#[cfg(feature = "stream")]
impl<M: RosMessage + Unpin, const RX_BUF: usize> futures_core::Stream for Subscription<M, RX_BUF> {
    type Item = Result<M, NodeError>;

    fn poll_next(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Option<Self::Item>> {
        use core::task::Poll;

        let subscription = self.get_mut();
        // Register-then-check: see Subscription::recv for rationale.
        subscription.handle.register_waker(cx.waker());

        // `transpose` turns `Ok(None)` into `None` (nothing yet) and keeps
        // both a message and an error as a ready item.
        match subscription.try_recv().transpose() {
            Some(item) => Poll::Ready(Some(item)),
            None => Poll::Pending,
        }
    }
}
1791
1792// ============================================================================
1793// EmbeddedServiceServer
1794// ============================================================================
1795
1796/// Typed service server handle with internal buffers.
1797pub struct EmbeddedServiceServer<
1798 Svc: RosService,
1799 const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1800 const REPLY_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1801> {
1802 pub(crate) handle: session::RmwServiceServer,
1803 pub(crate) req_buffer: [u8; REQ_BUF],
1804 pub(crate) reply_buffer: [u8; REPLY_BUF],
1805 pub(crate) _phantom: PhantomData<Svc>,
1806}
1807
1808impl<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>
1809 EmbeddedServiceServer<Svc, REQ_BUF, REPLY_BUF>
1810{
1811 /// Handle an incoming service request.
1812 ///
1813 /// Returns `Ok(true)` if a request was handled, `Ok(false)` if none available.
1814 pub fn handle_request(
1815 &mut self,
1816 handler: impl FnOnce(&Svc::Request) -> Svc::Reply,
1817 ) -> Result<bool, NodeError> {
1818 self.handle
1819 .handle_request::<Svc>(&mut self.req_buffer, &mut self.reply_buffer, handler)
1820 .map_err(|_| NodeError::ServiceReplyFailed)
1821 }
1822
1823 /// Handle a request with a heap-allocated reply (for large response types).
1824 ///
1825 /// Used by parameter services and lifecycle services (large response structs
1826 /// that overflow the stack). Returns `Ok(true)` if a request was handled,
1827 /// `Ok(false)` if none available.
1828 #[cfg(any(feature = "param-services", feature = "lifecycle-services"))]
1829 pub fn handle_request_boxed(
1830 &mut self,
1831 handler: impl FnOnce(&Svc::Request) -> alloc::boxed::Box<Svc::Reply>,
1832 ) -> Result<bool, NodeError> {
1833 self.handle
1834 .handle_request_boxed::<Svc>(&mut self.req_buffer, &mut self.reply_buffer, handler)
1835 .map_err(|_| NodeError::ServiceReplyFailed)
1836 }
1837
1838 /// Check if a request is available.
1839 pub fn has_request(&self) -> bool {
1840 self.handle.has_request()
1841 }
1842}
1843
1844// ============================================================================
1845// ServiceClientCallback (RFC-0041, Phase 239.1)
1846// ============================================================================
1847
1848/// Send handle for a **callback-based** typed service client.
1849///
1850/// Returned by `create_client_with_callback`: the reply is delivered to the
1851/// registered closure at `spin_once` (no `Promise` poll). This handle only
1852/// **sends** — it holds a `*mut` to the arena entry's
1853/// [`ServiceClientSendHeader`](super::arena::ServiceClientSendHeader) (pinned in
1854/// the executor arena, like a guard-condition flag), so a single outstanding
1855/// request is gated by `hdr.pending`.
1856///
1857/// # Safety / lifetime
1858/// Valid only while the owning executor lives (the arena backs the header). Do
1859/// not use after the executor is dropped.
1860pub struct ServiceClientCallback<
1861 Svc: RosService,
1862 const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1863 const REPLY_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1864> {
1865 hdr: *mut super::arena::ServiceClientSendHeader<REPLY_BUF>,
1866 _phantom: PhantomData<Svc>,
1867}
1868
1869impl<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>
1870 ServiceClientCallback<Svc, REQ_BUF, REPLY_BUF>
1871{
1872 /// Wrap an arena-resident send header. `hdr` must point at a live
1873 /// `ServiceClientCallbackEntry`'s header for the executor's lifetime.
1874 pub(crate) fn new(hdr: *mut super::arena::ServiceClientSendHeader<REPLY_BUF>) -> Self {
1875 Self {
1876 hdr,
1877 _phantom: PhantomData,
1878 }
1879 }
1880
1881 /// Send a typed request. The reply is delivered to the registered callback
1882 /// at a later `spin_once`. Returns `RequestInFlight` if a prior request has
1883 /// not yet been answered (single outstanding request).
1884 pub fn call(&mut self, request: &Svc::Request) -> Result<(), NodeError> {
1885 let hdr = unsafe { &mut *self.hdr };
1886 if hdr.pending {
1887 return Err(NodeError::RequestInFlight);
1888 }
1889 let mut buf = [0u8; REQ_BUF];
1890 let mut writer =
1891 CdrWriter::new_with_header(&mut buf).map_err(|_| NodeError::BufferTooSmall)?;
1892 request
1893 .serialize(&mut writer)
1894 .map_err(|_| NodeError::Serialization)?;
1895 let req_len = writer.position();
1896 hdr.handle
1897 .send_request_raw(&buf[..req_len])
1898 .map_err(|_| NodeError::ServiceRequestFailed)?;
1899 hdr.pending = true;
1900 Ok(())
1901 }
1902}
1903
1904// ============================================================================
1905// ActionClientCallback (RFC-0041, Phase 239.2)
1906// ============================================================================
1907
1908/// Send handle for a **callback-based** typed action client.
1909///
1910/// Returned by `create_action_client_with_callbacks`: goal-response, feedback,
1911/// and result are delivered to the registered closures at `spin_once` (no
1912/// `Promise` poll). This handle only **sends** — it holds a `*mut` to the arena
1913/// entry's [`ActionClientCore`](super::action_core::ActionClientCore) (offset 0,
1914/// pinned in the executor arena, like a guard-condition flag).
1915///
1916/// # Safety / lifetime
1917/// Valid only while the owning executor lives.
1918pub struct ActionClientCallback<
1919 A: RosAction,
1920 const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1921 const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1922 const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1923> {
1924 core: *mut super::action_core::ActionClientCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
1925 _phantom: PhantomData<A>,
1926}
1927
1928impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
1929 ActionClientCallback<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
1930{
1931 /// Wrap an arena-resident core. `core` must point at a live
1932 /// `ActionClientCallbackEntry`'s core for the executor's lifetime.
1933 pub(crate) fn new(
1934 core: *mut super::action_core::ActionClientCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
1935 ) -> Self {
1936 Self {
1937 core,
1938 _phantom: PhantomData,
1939 }
1940 }
1941
1942 /// Send a typed goal. Returns its `GoalId`; acceptance arrives via the
1943 /// registered goal-response callback.
1944 pub fn send_goal(&mut self, goal: &A::Goal) -> Result<nros_core::GoalId, NodeError> {
1945 let core = unsafe { &mut *self.core };
1946 let mut buf = [0u8; GOAL_BUF];
1947 let mut writer =
1948 CdrWriter::new_with_header(&mut buf).map_err(|_| NodeError::BufferTooSmall)?;
1949 goal.serialize(&mut writer)
1950 .map_err(|_| NodeError::Serialization)?;
1951 let len = writer.position();
1952 core.send_goal_raw(&buf[..len])
1953 }
1954
1955 /// Request the result for `goal_id`; the result arrives via the registered
1956 /// result callback.
1957 pub fn get_result(&mut self, goal_id: &nros_core::GoalId) -> Result<(), NodeError> {
1958 let core = unsafe { &mut *self.core };
1959 core.send_get_result_request(goal_id)
1960 }
1961}
1962
1963// ============================================================================
1964// EmbeddedServiceClient
1965// ============================================================================
1966
1967/// Typed service client handle with internal buffers.
1968pub struct EmbeddedServiceClient<
1969 Svc: RosService,
1970 const REQ_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1971 const REPLY_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1972> {
1973 pub(crate) handle: session::RmwServiceClient,
1974 pub(crate) req_buffer: [u8; REQ_BUF],
1975 pub(crate) reply_buffer: [u8; REPLY_BUF],
1976 /// Phase 84.D3: set after a successful `send_request`, cleared on a
1977 /// successful `Promise::try_recv`. Guards against "drop Promise
1978 /// without awaiting, then `call()` again" which would otherwise
1979 /// deliver the stale reply to the new caller.
1980 pub(crate) in_flight: bool,
1981 pub(crate) _phantom: PhantomData<Svc>,
1982}
1983
1984impl<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>
1985 EmbeddedServiceClient<Svc, REQ_BUF, REPLY_BUF>
1986{
1987 /// Call the service (non-blocking). Returns a [`Promise`] that can be polled.
1988 ///
1989 /// Use with `Executor::spin_once()` to drive I/O while waiting:
1990 ///
1991 /// ```ignore
1992 /// let mut promise = client.call(&request)?;
1993 /// loop {
1994 /// executor.spin_once(core::time::Duration::from_millis(10));
1995 /// if let Some(reply) = promise.try_recv()? {
1996 /// break;
1997 /// }
1998 /// }
1999 /// ```
2000 ///
2001 /// # Errors
2002 ///
2003 /// Returns [`NodeError::RequestInFlight`] if a previous call's reply
2004 /// has not been received. This prevents the old hazard where dropping
2005 /// a [`Promise`] without awaiting its reply left the stale reply
2006 /// queued to land on the next [`call`](Self::call). Resolve by
2007 /// polling the existing promise to completion or calling
2008 /// [`reset_in_flight`](Self::reset_in_flight).
2009 pub fn call(&mut self, request: &Svc::Request) -> Result<Promise<'_, Svc::Reply>, NodeError> {
2010 if self.in_flight {
2011 return Err(NodeError::RequestInFlight);
2012 }
2013
2014 // Serialize request into req_buffer
2015 let mut writer = CdrWriter::new_with_header(&mut self.req_buffer)
2016 .map_err(|_| NodeError::BufferTooSmall)?;
2017 request
2018 .serialize(&mut writer)
2019 .map_err(|_| NodeError::Serialization)?;
2020 let req_len = writer.position();
2021
2022 // Send the request (non-blocking)
2023 self.handle
2024 .send_request_raw(&self.req_buffer[..req_len])
2025 .map_err(|_| NodeError::ServiceRequestFailed)?;
2026
2027 self.in_flight = true;
2028
2029 Ok(Promise {
2030 handle: &mut self.handle,
2031 reply_buffer: &mut self.reply_buffer,
2032 parse: cdr_deserialize_reply::<Svc>,
2033 in_flight_flag: &mut self.in_flight,
2034 })
2035 }
2036
2037 /// Explicitly clear the in-flight flag (Phase 84.D3).
2038 ///
2039 /// Call this if a previous [`Promise`] was dropped without completing
2040 /// and you want to abandon the pending reply. The next
2041 /// [`call`](Self::call) will proceed but may still observe the stale
2042 /// reply if one is in the transport's queue — callers that need strict
2043 /// correctness should drain / ignore one extra `try_recv` first.
2044 pub fn reset_in_flight(&mut self) {
2045 self.in_flight = false;
2046 }
2047
2048 /// Block until at least one matching service server is discoverable on
2049 /// the network, or `timeout` elapses.
2050 ///
2051 /// Returns `Ok(true)` if a matching server reported back inside the
2052 /// budget; `Ok(false)` on timeout (no server visible). Mirrors
2053 /// `rclcpp::ClientBase::wait_for_service` and
2054 /// `rclpy.client.Client.wait_for_service`.
2055 ///
2056 /// On the Zenoh backend this issues a `z_liveliness_get` against the
2057 /// matching server's wildcarded liveliness keyexpr; the executor is
2058 /// spun cooperatively while the query is in flight so other
2059 /// subscribers / timers continue to make progress. Backends without
2060 /// liveliness discovery answer `Ok(true)` immediately (default trait
2061 /// impl in `nros-rmw`), so the call is a no-op cost when discovery
2062 /// isn't supported.
2063 ///
2064 /// Recommended usage — gate the first `call()` on this:
2065 ///
2066 /// ```ignore
2067 /// let mut client = node.create_client::<AddTwoInts>("/add_two_ints")?;
2068 /// if !client.wait_for_service(&mut executor, Duration::from_secs(5))? {
2069 /// return Err(NodeError::Timeout);
2070 /// }
2071 /// let mut promise = client.call(&request)?;
2072 /// ```
2073 ///
2074 /// Once the server is observed, the result is latched: subsequent
2075 /// `service_is_ready` checks return `true` without another round
2076 /// trip. This matches `rclcpp`'s snapshot semantic — discovery isn't
2077 /// re-proven on every call.
2078 pub fn wait_for_service(
2079 &mut self,
2080 executor: &mut super::Executor,
2081 timeout: core::time::Duration,
2082 ) -> Result<bool, NodeError> {
2083 // Already proven once — don't re-query.
2084 if self.handle.is_server_ready() {
2085 return Ok(true);
2086 }
2087 let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2088 let max_spins = (timeout.as_millis() as u64 / DEFAULT_SPIN_INTERVAL_MS).max(1);
2089 let mut budget = WaitBudget::new(max_spins, timeout);
2090 // Per-query budget. A liveliness_get is a single-shot probe of the
2091 // router's current token list; if the server hasn't declared its
2092 // token yet when our query arrives, the router replies "no
2093 // matching tokens" and the query terminates. We loop, re-issuing
2094 // shorter probes until either a matching token is observed or the
2095 // outer wall-clock budget expires. 1 s per probe balances "see
2096 // freshly-declared tokens quickly" against "burn fewer FFI
2097 // round-trips on a healthy network".
2098 const PROBE_TIMEOUT_MS: u32 = 1000;
2099 loop {
2100 self.handle
2101 .start_server_discovery(PROBE_TIMEOUT_MS)
2102 .map_err(|_| NodeError::ServiceRequestFailed)?;
2103 // Drain this probe to completion (token reply or empty FINAL).
2104 loop {
2105 executor.spin_once(spin_interval);
2106 match self
2107 .handle
2108 .poll_server_discovery()
2109 .map_err(|_| NodeError::ServiceRequestFailed)?
2110 {
2111 Some(true) => return Ok(true),
2112 Some(false) => break, // probe finished empty — re-issue
2113 None => {} // still in flight
2114 }
2115 if !budget.tick() {
2116 return Ok(false);
2117 }
2118 }
2119 if !budget.tick() {
2120 return Ok(false);
2121 }
2122 }
2123 }
2124
2125 /// Snapshot whether a matching service server is currently visible.
2126 ///
2127 /// Non-blocking. Matches `rclcpp::ClientBase::service_is_ready` and
2128 /// `rclpy.client.Client.service_is_ready`. Backends without liveliness
2129 /// discovery return `true` (assume always reachable).
2130 pub fn service_is_ready(&self) -> bool {
2131 self.handle.is_server_ready()
2132 }
2133
2134 /// Phase 124.C.3 — graph-aware server-availability probe.
2135 ///
2136 /// Returns `Ok(true)` / `Ok(false)` when the backend can answer
2137 /// (zenoh queryable interest, DDS built-in topic reader), or
2138 /// `Err(NodeError::Transport(Unsupported))` when it can't (XRCE
2139 /// agent without participant enumeration). Distinct from
2140 /// [`service_is_ready`](Self::service_is_ready) — that one
2141 /// collapses "no" and "don't know" into the same `false`.
2142 ///
2143 /// Used to gate the first `call_raw` so a startup-ordering race
2144 /// (client opens before server's discovery announcement lands)
2145 /// doesn't surface as a request-side timeout.
2146 pub fn server_available(&self) -> Result<bool, NodeError> {
2147 use nros_rmw::ServiceClientTrait;
2148 self.handle.server_available().map_err(NodeError::Transport)
2149 }
2150}
2151
2152// ============================================================================
2153// Promise
2154// ============================================================================
2155
2156/// A pending reply from a non-blocking service or action call.
2157///
2158/// Poll with [`try_recv()`](Promise::try_recv) to check for the reply.
2159/// Implements [`Future`](core::future::Future) for use with async executors.
2160pub struct Promise<'a, T> {
2161 pub(crate) handle: &'a mut session::RmwServiceClient,
2162 pub(crate) reply_buffer: &'a mut [u8],
2163 pub(crate) parse: fn(&[u8]) -> Result<T, NodeError>,
2164 /// Phase 84.D3: cleared on a successful `try_recv` so the client's
2165 /// next `call()` can proceed. If the `Promise` is dropped before the
2166 /// reply is consumed, the flag stays set — forcing the user to
2167 /// explicitly acknowledge the abandoned call via
2168 /// `reset_in_flight()`.
2169 pub(crate) in_flight_flag: &'a mut bool,
2170}
2171
2172impl<T> Promise<'_, T> {
2173 /// Try to receive the reply (non-blocking).
2174 ///
2175 /// Returns `Ok(Some(reply))` if the reply has arrived,
2176 /// `Ok(None)` if still pending.
2177 pub fn try_recv(&mut self) -> Result<Option<T>, NodeError> {
2178 // Phase 120: NoData (no reply yet) is the steady-state polling
2179 // condition — map to Ok(None) instead of ServiceRequestFailed.
2180 match match self.handle.try_recv_reply_raw(self.reply_buffer) {
2181 Ok(opt) => opt,
2182 Err(TransportError::NoData) => return Ok(None),
2183 Err(e) => return Err(NodeError::Transport(e)),
2184 } {
2185 Some(len) => {
2186 let reply = (self.parse)(&self.reply_buffer[..len])?;
2187 // Reply consumed — allow the client to issue another call.
2188 *self.in_flight_flag = false;
2189 Ok(Some(reply))
2190 }
2191 None => Ok(None),
2192 }
2193 }
2194}
2195
2196impl<T> Promise<'_, T> {
2197 /// Block until the reply arrives, spinning the executor.
2198 ///
2199 /// Internally calls `executor.spin_once()` in a loop until the reply
2200 /// arrives or `timeout_ms` is exhausted. This is equivalent to the
2201 /// manual spin+poll loop pattern but more ergonomic for simple use cases.
2202 ///
2203 /// No borrow conflict: `executor` and `self` (which borrows the standalone
2204 /// client) are disjoint objects.
2205 ///
2206 /// # Errors
2207 ///
2208 /// Returns [`NodeError::Timeout`] if the reply does not arrive within
2209 /// `timeout_ms` milliseconds.
2210 pub fn wait(
2211 &mut self,
2212 executor: &mut super::Executor,
2213 timeout: core::time::Duration,
2214 ) -> Result<T, NodeError> {
2215 let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2216 let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
2217 let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
2218 let mut budget = WaitBudget::new(max_spins, timeout);
2219 // Always spin at least once so a zero-timeout still polls.
2220 loop {
2221 executor.spin_once(spin_interval);
2222 if let Some(result) = self.try_recv()? {
2223 return Ok(result);
2224 }
2225 if !budget.tick() {
2226 return Err(NodeError::Timeout);
2227 }
2228 }
2229 }
2230}
2231
2232impl<T> core::future::Future for Promise<'_, T> {
2233 type Output = Result<T, NodeError>;
2234
2235 fn poll(
2236 self: core::pin::Pin<&mut Self>,
2237 cx: &mut core::task::Context<'_>,
2238 ) -> core::task::Poll<Self::Output> {
2239 let this = self.get_mut();
2240 // Register-then-check (closes the race where a reply lands
2241 // between try_recv returning None and the waker registering).
2242 this.handle.register_waker(cx.waker());
2243 match this.try_recv() {
2244 Ok(Some(reply)) => core::task::Poll::Ready(Ok(reply)),
2245 Ok(None) => core::task::Poll::Pending,
2246 Err(e) => core::task::Poll::Ready(Err(e)),
2247 }
2248 }
2249}
2250
2251impl<T> Promise<'_, T> {
2252 /// Async poll-until-ready helper for environments where the
2253 /// backend's `register_waker` path can't deliver a wake.
2254 ///
2255 /// The plain `.await` (via the `Future` impl above) parks the
2256 /// caller until the backend's listener fires the stored Waker.
2257 /// That works on `std` builds where the backend has a
2258 /// background-thread pool actively polling its listener tasks,
2259 /// but it deadlocks on the `nostd-runtime` DDS path (and any
2260 /// other cooperative backend whose listener future only runs
2261 /// when something actively drives the runtime). The 160.B.1
2262 /// trace pinned this to DDS's nostd runtime: listener
2263 /// futures only advance inside `runtime.block_on(...)`, and the
2264 /// parked `.await` consumer never issues such a call.
2265 ///
2266 /// `poll_until_ready(yield_fn)` instead actively polls
2267 /// `try_recv()` on each turn and awaits the caller-supplied
2268 /// yield future between attempts. The yield gives the executor
2269 /// a chance to run other ready tasks (typically a `spin_task`
2270 /// that drives the backend runtime via `executor.spin_once()`),
2271 /// which in turn pumps the listener future. On the `std` path
2272 /// this devolves to a fast poll-then-yield loop with no
2273 /// correctness penalty; on `nostd-runtime` it's the only shape
2274 /// that completes.
2275 ///
2276 /// # Example
2277 ///
2278 /// ```ignore
2279 /// let reply = client
2280 /// .call(&req)?
2281 /// .poll_until_ready(|| embassy_time::Timer::after_millis(5))
2282 /// .await?;
2283 /// ```
2284 pub async fn poll_until_ready<F, Fut>(&mut self, mut yield_fn: F) -> Result<T, NodeError>
2285 where
2286 F: FnMut() -> Fut,
2287 Fut: core::future::Future<Output = ()>,
2288 {
2289 loop {
2290 match self.try_recv()? {
2291 Some(reply) => return Ok(reply),
2292 None => yield_fn().await,
2293 }
2294 }
2295 }
2296}
2297
2298/// Deserialize a CDR-encoded service reply.
2299fn cdr_deserialize_reply<Svc: RosService>(data: &[u8]) -> Result<Svc::Reply, NodeError> {
2300 let mut reader = CdrReader::new_with_header(data).map_err(|_| NodeError::Deserialization)?;
2301 Svc::Reply::deserialize(&mut reader).map_err(|_| NodeError::Deserialization)
2302}
2303
2304// ============================================================================
2305// Action types
2306// ============================================================================
2307
2308/// Active goal tracking for action server.
2309#[derive(Clone)]
2310pub struct ActiveGoal<A: RosAction> {
2311 /// Goal ID.
2312 pub goal_id: nros_core::GoalId,
2313 /// Current status.
2314 pub status: nros_core::GoalStatus,
2315 /// The goal data.
2316 pub goal: A::Goal,
2317}
2318
2319/// Completed goal with result.
2320pub struct CompletedGoal<A: RosAction> {
2321 /// Goal ID.
2322 pub goal_id: nros_core::GoalId,
2323 /// Final status.
2324 pub status: nros_core::GoalStatus,
2325 /// The result data.
2326 pub result: A::Result,
2327}
2328
2329// ============================================================================
2330// ActionServer
2331// ============================================================================
2332
2333/// Typed action server with goal state management.
2334///
2335/// Wraps [`ActionServerCore`](super::action_core::ActionServerCore) for
2336/// raw-bytes protocol handling, adding typed goal/feedback/result
2337/// serialization at the boundary.
2338pub struct ActionServer<
2339 A: RosAction,
2340 const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2341 const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2342 const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2343 const MAX_GOALS: usize = 4,
2344> {
2345 pub(crate) core:
2346 super::action_core::ActionServerCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF, MAX_GOALS>,
2347 /// Typed goal data parallel to `core.active_goals`.
2348 pub(crate) typed_goals: heapless::Vec<A::Goal, MAX_GOALS>,
2349 /// Completed goals with typed results.
2350 pub(crate) completed_goals: heapless::Vec<CompletedGoal<A>, MAX_GOALS>,
2351}
2352
2353impl<
2354 A: RosAction,
2355 const GOAL_BUF: usize,
2356 const RESULT_BUF: usize,
2357 const FEEDBACK_BUF: usize,
2358 const MAX_GOALS: usize,
2359> ActionServer<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF, MAX_GOALS>
2360{
2361 /// Try to accept a new goal.
2362 ///
2363 /// Checks for incoming send_goal requests. If one is available, calls the
2364 /// handler to decide acceptance. Returns the goal ID if accepted.
2365 pub fn try_accept_goal(
2366 &mut self,
2367 goal_handler: impl FnOnce(&nros_core::GoalId, &A::Goal) -> nros_core::GoalResponse,
2368 ) -> Result<Option<nros_core::GoalId>, NodeError>
2369 where
2370 A::Goal: Clone,
2371 {
2372 let raw_req = self.core.try_recv_goal_request()?;
2373 let raw_req = match raw_req {
2374 Some(r) => r,
2375 None => return Ok(None),
2376 };
2377
2378 // Deserialize the goal from the buffer at the offset captured
2379 // by the core (DDS prepends an 8-byte seq prefix; zenoh uses 0).
2380 let buf = self.core.goal_buffer();
2381 let start = raw_req.data_offset;
2382 let end = start + raw_req.data_len;
2383 let mut reader = CdrReader::new_with_header(&buf[start..end])
2384 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
2385 // Skip past the GoalId — a fixed `uint8[16]` UUID, no length prefix
2386 // (ROS 2 `unique_identifier_msgs/UUID`; see action_core::read_goal_id).
2387 for _ in 0..GOAL_UUID_SIZE {
2388 let _ = reader.read_u8();
2389 }
2390 let goal = A::Goal::deserialize(&mut reader)
2391 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
2392
2393 let response = goal_handler(&raw_req.goal_id, &goal);
2394 let accepted = response.is_accepted();
2395
2396 if accepted {
2397 self.core
2398 .accept_goal(raw_req.goal_id, raw_req.sequence_number)?;
2399 let _ = self.typed_goals.push(goal);
2400 Ok(Some(raw_req.goal_id))
2401 } else {
2402 self.core.reject_goal(raw_req.sequence_number)?;
2403 Ok(None)
2404 }
2405 }
2406
2407 /// Publish feedback for a goal.
2408 pub fn publish_feedback(
2409 &mut self,
2410 goal_id: &nros_core::GoalId,
2411 feedback: &A::Feedback,
2412 ) -> Result<(), NodeError> {
2413 // Serialize feedback into a temp buffer (without CDR header or GoalId)
2414 let mut tmp = [0u8; FEEDBACK_BUF];
2415 let mut writer = CdrWriter::new(&mut tmp);
2416 feedback
2417 .serialize(&mut writer)
2418 .map_err(|_| NodeError::Serialization)?;
2419 let feedback_len = writer.position();
2420
2421 self.core
2422 .publish_feedback_raw(goal_id, &tmp[..feedback_len])
2423 }
2424
2425 /// Set a goal's status.
2426 ///
2427 /// Also publishes the updated `GoalStatusArray` on the status topic.
2428 pub fn set_goal_status(&mut self, goal_id: &nros_core::GoalId, status: nros_core::GoalStatus) {
2429 self.core.set_goal_status(goal_id, status);
2430 }
2431
2432 /// Complete a goal and store the result.
2433 ///
2434 /// Also publishes the updated `GoalStatusArray` on the status topic.
2435 pub fn complete_goal(
2436 &mut self,
2437 goal_id: &nros_core::GoalId,
2438 status: nros_core::GoalStatus,
2439 result: A::Result,
2440 ) {
2441 // Serialize result for the core slab
2442 let mut tmp = [0u8; RESULT_BUF];
2443 let mut writer = CdrWriter::new(&mut tmp);
2444 let result_len = match result.serialize(&mut writer) {
2445 Ok(()) => writer.position(),
2446 Err(_) => 0,
2447 };
2448
2449 // Remove typed goal parallel to core's active_goals removal
2450 if let Some(pos) = self
2451 .core
2452 .active_goals()
2453 .iter()
2454 .position(|g| g.goal_id.uuid == goal_id.uuid)
2455 {
2456 self.typed_goals.swap_remove(pos);
2457 }
2458
2459 self.core
2460 .complete_goal_raw(goal_id, status, &tmp[..result_len]);
2461
2462 let _ = self.completed_goals.push(CompletedGoal {
2463 goal_id: *goal_id,
2464 status,
2465 result,
2466 });
2467 }
2468
2469 /// Try to handle a cancel_goal request.
2470 pub fn try_handle_cancel(
2471 &mut self,
2472 cancel_handler: impl FnOnce(
2473 &nros_core::GoalId,
2474 nros_core::GoalStatus,
2475 ) -> nros_core::CancelResponse,
2476 ) -> Result<Option<(nros_core::GoalId, nros_core::CancelResponse)>, NodeError> {
2477 self.core.try_handle_cancel(cancel_handler)
2478 }
2479
2480 /// Try to handle a get_result request.
2481 pub fn try_handle_get_result(&mut self) -> Result<Option<nros_core::GoalId>, NodeError>
2482 where
2483 A::Result: Clone + Default,
2484 {
2485 // Serialize default result for non-completed goals
2486 let mut default_buf = [0u8; RESULT_BUF];
2487 let mut writer = CdrWriter::new(&mut default_buf);
2488 let default_len = match A::Result::default().serialize(&mut writer) {
2489 Ok(()) => writer.position(),
2490 Err(_) => 0,
2491 };
2492
2493 self.core
2494 .try_handle_get_result_raw(&default_buf[..default_len])
2495 }
2496
2497 /// Drain all pending server-side work in one call.
2498 ///
2499 /// Calls `try_accept_goal`, `try_handle_cancel`, and
2500 /// `try_handle_get_result` in sequence. Invoke this on every
2501 /// `spin_once` iteration in manual-poll code — otherwise clients
2502 /// will hang on `get_result` because `create_action_server()`
2503 /// servers are not arena-registered.
2504 ///
2505 /// The two callbacks may be called zero or one times per `poll()`:
2506 /// * `on_goal` fires when a new goal arrives.
2507 /// * `on_cancel` fires when a cancel request arrives.
2508 ///
2509 /// Get-result requests are drained unconditionally (no callback
2510 /// needed — the result is pulled from the goal's stored state).
2511 ///
2512 /// # Example
2513 /// ```ignore
2514 /// let mut server = node.create_action_server::<Fibonacci>("/fibonacci")?;
2515 /// loop {
2516 /// executor.spin_once(Duration::from_millis(10));
2517 /// server.poll(
2518 /// |id, goal| {
2519 /// /* accept or reject based on `goal` */
2520 /// GoalResponse::AcceptAndExecute
2521 /// },
2522 /// |_id, _status| CancelResponse::Accept,
2523 /// )?;
2524 /// }
2525 /// ```
2526 pub fn poll<GF, CF>(&mut self, mut on_goal: GF, mut on_cancel: CF) -> Result<(), NodeError>
2527 where
2528 GF: FnMut(&nros_core::GoalId, &A::Goal) -> nros_core::GoalResponse,
2529 CF: FnMut(&nros_core::GoalId, nros_core::GoalStatus) -> nros_core::CancelResponse,
2530 A::Goal: Clone,
2531 A::Result: Clone + Default,
2532 {
2533 let _ = self.try_accept_goal(|id, goal| on_goal(id, goal))?;
2534 let _ = self.try_handle_cancel(|id, status| on_cancel(id, status))?;
2535 let _ = self.try_handle_get_result()?;
2536 Ok(())
2537 }
2538
2539 /// Get a reference to an active goal.
2540 pub fn get_goal(&self, goal_id: &nros_core::GoalId) -> Option<ActiveGoal<A>>
2541 where
2542 A::Goal: Clone,
2543 {
2544 self.core
2545 .active_goals()
2546 .iter()
2547 .enumerate()
2548 .find(|(_, g)| g.goal_id.uuid == goal_id.uuid)
2549 .map(|(i, raw)| ActiveGoal {
2550 goal_id: raw.goal_id,
2551 status: raw.status,
2552 goal: self.typed_goals[i].clone(),
2553 })
2554 }
2555
2556 /// Get the number of active goals.
2557 pub fn active_goal_count(&self) -> usize {
2558 self.core.active_goal_count()
2559 }
2560}
2561
2562// ============================================================================
2563// ActionClient
2564// ============================================================================
2565
2566/// Typed action client handle.
2567///
2568/// Wraps [`ActionClientCore`](super::action_core::ActionClientCore) for
2569/// raw-bytes protocol handling, adding typed goal/feedback/result
2570/// serialization at the boundary.
2571pub struct ActionClient<
2572 A: RosAction,
2573 const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2574 const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2575 const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2576> {
2577 pub(crate) core: super::action_core::ActionClientCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
2578 pub(crate) _phantom: PhantomData<A>,
2579}
2580
2581impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
2582 ActionClient<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
2583{
2584 /// Send a goal (non-blocking). Returns the goal ID and a [`Promise`] for acceptance.
2585 ///
2586 /// The promise resolves to `true` if accepted, `false` if rejected.
2587 pub fn send_goal(
2588 &mut self,
2589 goal: &A::Goal,
2590 ) -> Result<(nros_core::GoalId, Promise<'_, bool>), NodeError> {
2591 if self.core.in_flight_send_goal {
2592 return Err(NodeError::RequestInFlight);
2593 }
2594
2595 // Serialize goal into a temp buffer (without CDR header or GoalId)
2596 let mut tmp = [0u8; GOAL_BUF];
2597 let mut writer = CdrWriter::new(&mut tmp);
2598 goal.serialize(&mut writer)
2599 .map_err(|_| NodeError::Serialization)?;
2600 let goal_len = writer.position();
2601
2602 let goal_id = self.core.send_goal_raw(&tmp[..goal_len])?;
2603 self.core.in_flight_send_goal = true;
2604
2605 Ok((
2606 goal_id,
2607 Promise {
2608 handle: &mut self.core.send_goal_client,
2609 reply_buffer: &mut self.core.result_buffer,
2610 parse: parse_goal_accepted,
2611 in_flight_flag: &mut self.core.in_flight_send_goal,
2612 },
2613 ))
2614 }
2615
2616 /// Try to receive feedback (non-blocking).
2617 pub fn try_recv_feedback(
2618 &mut self,
2619 ) -> Result<Option<(nros_core::GoalId, A::Feedback)>, NodeError> {
2620 let (goal_id, len) = match self.core.try_recv_feedback_raw()? {
2621 Some(v) => v,
2622 None => return Ok(None),
2623 };
2624
2625 // Deserialize feedback from the core's feedback buffer (after GoalId)
2626 let mut reader = CdrReader::new_with_header(&self.core.feedback_buffer[..len])
2627 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
2628 // Skip GoalId — a fixed `uint8[16]` UUID, no length prefix
2629 // (ROS 2 `unique_identifier_msgs/UUID`; see action_core::read_goal_id).
2630 for _ in 0..GOAL_UUID_SIZE {
2631 let _ = reader.read_u8();
2632 }
2633
2634 let feedback = A::Feedback::deserialize(&mut reader)
2635 .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
2636
2637 Ok(Some((goal_id, feedback)))
2638 }
2639
2640 /// Cancel a goal (non-blocking). Returns a [`Promise`] for the cancel response.
2641 pub fn cancel_goal(
2642 &mut self,
2643 goal_id: &nros_core::GoalId,
2644 ) -> Result<Promise<'_, nros_core::CancelResponse>, NodeError> {
2645 if self.core.in_flight_cancel {
2646 return Err(NodeError::RequestInFlight);
2647 }
2648 self.core.send_cancel_request(goal_id)?;
2649 self.core.in_flight_cancel = true;
2650
2651 Ok(Promise {
2652 handle: &mut self.core.cancel_goal_client,
2653 reply_buffer: &mut self.core.result_buffer,
2654 parse: parse_cancel_response,
2655 in_flight_flag: &mut self.core.in_flight_cancel,
2656 })
2657 }
2658
2659 /// Get the result of a completed goal (non-blocking). Returns a [`Promise`].
2660 pub fn get_result(
2661 &mut self,
2662 goal_id: &nros_core::GoalId,
2663 ) -> Result<Promise<'_, (nros_core::GoalStatus, A::Result)>, NodeError> {
2664 if self.core.in_flight_get_result {
2665 return Err(NodeError::RequestInFlight);
2666 }
2667 self.core.send_get_result_request(goal_id)?;
2668 self.core.in_flight_get_result = true;
2669
2670 Ok(Promise {
2671 handle: &mut self.core.get_result_client,
2672 reply_buffer: &mut self.core.result_buffer,
2673 parse: parse_result_response::<A>,
2674 in_flight_flag: &mut self.core.in_flight_get_result,
2675 })
2676 }
2677
2678 /// Explicitly clear the "send_goal reply in flight" flag (Phase 84.D3).
2679 pub fn reset_send_goal_in_flight(&mut self) {
2680 self.core.in_flight_send_goal = false;
2681 }
2682
2683 /// Block until the action server's send-goal queryable is discoverable
2684 /// on the network, or `timeout` elapses.
2685 ///
2686 /// Returns `Ok(true)` on discovery, `Ok(false)` on timeout. Mirrors
2687 /// `rclcpp_action::Client::wait_for_action_server`.
2688 ///
2689 /// Implementation: probes the action's `send_goal` service-server
2690 /// liveliness keyexpr via the same primitive as
2691 /// [`Client::wait_for_service`]. Once that service is reachable the
2692 /// remaining four action entities (cancel queryable + feedback /
2693 /// status / result publishers) are also reachable in practice — they
2694 /// were declared by the same server in one batch.
2695 pub fn wait_for_action_server(
2696 &mut self,
2697 executor: &mut super::Executor,
2698 timeout: core::time::Duration,
2699 ) -> Result<bool, NodeError> {
2700 if self.core.send_goal_client.is_server_ready() {
2701 return Ok(true);
2702 }
2703 let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2704 let max_spins = (timeout.as_millis() as u64 / DEFAULT_SPIN_INTERVAL_MS).max(1);
2705 let mut budget = WaitBudget::new(max_spins, timeout);
2706 // See `Client::wait_for_service` for the re-probe rationale: a
2707 // single liveliness_get samples the router's current token list
2708 // and terminates; we loop with shorter per-probe timeouts so the
2709 // outer budget covers servers that come up after we start
2710 // waiting.
2711 const PROBE_TIMEOUT_MS: u32 = 1000;
2712 loop {
2713 self.core
2714 .send_goal_client
2715 .start_server_discovery(PROBE_TIMEOUT_MS)
2716 .map_err(|_| NodeError::ServiceRequestFailed)?;
2717 loop {
2718 executor.spin_once(spin_interval);
2719 match self
2720 .core
2721 .send_goal_client
2722 .poll_server_discovery()
2723 .map_err(|_| NodeError::ServiceRequestFailed)?
2724 {
2725 Some(true) => return Ok(true),
2726 Some(false) => break,
2727 None => {}
2728 }
2729 if !budget.tick() {
2730 return Ok(false);
2731 }
2732 }
2733 if !budget.tick() {
2734 return Ok(false);
2735 }
2736 }
2737 }
2738
2739 /// Snapshot whether the action server is currently visible.
2740 /// Mirrors `rclcpp_action::Client::action_server_is_ready`.
2741 pub fn action_server_is_ready(&self) -> bool {
2742 self.core.send_goal_client.is_server_ready()
2743 }
2744
2745 /// Explicitly clear the "cancel reply in flight" flag (Phase 84.D3).
2746 pub fn reset_cancel_in_flight(&mut self) {
2747 self.core.in_flight_cancel = false;
2748 }
2749
2750 /// Explicitly clear the "get_result reply in flight" flag (Phase 84.D3).
2751 pub fn reset_get_result_in_flight(&mut self) {
2752 self.core.in_flight_get_result = false;
2753 }
2754
2755 /// Create a feedback stream (receives feedback for all goals).
2756 ///
2757 /// The stream borrows `&mut self` exclusively. Drop it before calling
2758 /// `get_result()` or `cancel_goal()`.
2759 pub fn feedback_stream(&mut self) -> FeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF> {
2760 FeedbackStream { client: self }
2761 }
2762
2763 /// Create a goal-filtered feedback stream.
2764 ///
2765 /// Only yields feedback for the given `goal_id`, returning `A::Feedback`
2766 /// directly (without the `GoalId` wrapper).
2767 pub fn feedback_stream_for(
2768 &mut self,
2769 goal_id: nros_core::GoalId,
2770 ) -> GoalFeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF> {
2771 GoalFeedbackStream {
2772 client: self,
2773 goal_id,
2774 }
2775 }
2776}
2777
2778// ============================================================================
2779// FeedbackStream
2780// ============================================================================
2781
2782/// A stream of feedback messages from an action server.
2783///
2784/// Created by [`ActionClient::feedback_stream()`]. Receives feedback for
2785/// all active goals. The stream never self-terminates — use combinators
2786/// like `take_while` or `break` to stop.
2787///
2788/// Three access modes:
2789/// - **Async (`Stream`)**: Enable the `stream` feature for
2790/// `futures_core::Stream` + `StreamExt` combinators
2791/// - **Async (no deps)**: Use `next()` in
2792/// `while let` loops (always available)
2793/// - **Sync**: Use [`wait_next()`](FeedbackStream::wait_next) which
2794/// drives the executor internally
2795pub struct FeedbackStream<
2796 'a,
2797 A: RosAction,
2798 const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2799 const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2800 const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2801> {
2802 client: &'a mut ActionClient<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
2803}
2804
2805impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
2806 FeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
2807{
2808 /// Async: wait for the next feedback message (no `futures` dependency needed).
2809 ///
2810 /// Requires a background task running `executor.spin_async()` to drive
2811 /// I/O. Returns `None` only on error.
2812 ///
2813 /// When the `stream` feature is enabled, prefer `StreamExt::next()` or
2814 /// `TryStreamExt::try_next()` for combinator support.
2815 ///
2816 /// # Example
2817 ///
2818 /// ```ignore
2819 /// let mut stream = client.feedback_stream();
2820 /// while let Some(result) = stream.recv().await {
2821 /// let (goal_id, feedback) = result?;
2822 /// // process feedback...
2823 /// }
2824 /// ```
2825 pub async fn recv(&mut self) -> Option<Result<(nros_core::GoalId, A::Feedback), NodeError>> {
2826 core::future::poll_fn(|cx| {
2827 // Register-then-check (closes the AtomicWaker race).
2828 self.client
2829 .core
2830 .feedback_subscriber
2831 .register_waker(cx.waker());
2832 match self.client.try_recv_feedback() {
2833 Ok(Some(item)) => core::task::Poll::Ready(Some(Ok(item))),
2834 Ok(None) => core::task::Poll::Pending,
2835 Err(e) => core::task::Poll::Ready(Some(Err(e))),
2836 }
2837 })
2838 .await
2839 }
2840
2841 /// Sync: wait for the next feedback message, spinning the executor.
2842 ///
2843 /// Returns `Ok(Some(feedback))` if a message arrives within `timeout_ms`,
2844 /// or `Ok(None)` on timeout. Unlike [`Promise::wait()`], timeout is not
2845 /// an error — the caller typically retries in a loop.
2846 pub fn wait_next(
2847 &mut self,
2848 executor: &mut super::Executor,
2849 timeout: core::time::Duration,
2850 ) -> Result<Option<(nros_core::GoalId, A::Feedback)>, NodeError> {
2851 let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2852 let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
2853 let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
2854 let mut budget = WaitBudget::new(max_spins, timeout);
2855 loop {
2856 executor.spin_once(spin_interval);
2857 if let Some(item) = self.client.try_recv_feedback()? {
2858 return Ok(Some(item));
2859 }
2860 if !budget.tick() {
2861 return Ok(None);
2862 }
2863 }
2864 }
2865}
2866
/// `futures_core::Stream` adapter, available with the `stream` feature.
///
/// Yields one item per received feedback message and never ends: the
/// stream produces `Some(Ok(..))` or `Some(Err(..))`, or stays pending.
#[cfg(feature = "stream")]
impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
    futures_core::Stream for FeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
{
    type Item = Result<(nros_core::GoalId, A::Feedback), NodeError>;

    fn poll_next(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Option<Self::Item>> {
        use core::task::Poll;

        let stream = self.get_mut();
        // The waker must be registered before looking for data so that a
        // message arriving in between still triggers a wake-up.
        stream
            .client
            .core
            .feedback_subscriber
            .register_waker(cx.waker());

        // `Ok(None)` (nothing available) becomes `None`; a message or an
        // error becomes `Some(..)` and is handed out as the next item.
        match stream.client.try_recv_feedback().transpose() {
            Some(outcome) => Poll::Ready(Some(outcome)),
            None => Poll::Pending,
        }
    }
}
2890
2891// ============================================================================
2892// GoalFeedbackStream
2893// ============================================================================
2894
2895/// A goal-filtered feedback stream.
2896///
2897/// Created by [`ActionClient::feedback_stream_for()`]. Only yields feedback
2898/// messages matching the specified goal ID.
2899pub struct GoalFeedbackStream<
2900 'a,
2901 A: RosAction,
2902 const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2903 const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2904 const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2905> {
2906 client: &'a mut ActionClient<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>,
2907 goal_id: nros_core::GoalId,
2908}
2909
2910impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
2911 GoalFeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
2912{
2913 /// Async: wait for the next feedback message for this goal (no `futures` dependency needed).
2914 ///
2915 /// When the `stream` feature is enabled, prefer `StreamExt::next()` or
2916 /// `TryStreamExt::try_next()` for combinator support.
2917 pub async fn recv(&mut self) -> Option<Result<A::Feedback, NodeError>> {
2918 core::future::poll_fn(|cx| {
2919 // Register-then-check (closes the AtomicWaker race). The
2920 // waker is registered once for both the "no data" and
2921 // "wrong goal" branches that fall through to Pending.
2922 self.client
2923 .core
2924 .feedback_subscriber
2925 .register_waker(cx.waker());
2926 match self.client.try_recv_feedback() {
2927 Ok(Some((id, feedback))) if id.uuid == self.goal_id.uuid => {
2928 core::task::Poll::Ready(Some(Ok(feedback)))
2929 }
2930 // Feedback for a different goal — keep waiting.
2931 Ok(Some(_)) => core::task::Poll::Pending,
2932 Ok(None) => core::task::Poll::Pending,
2933 Err(e) => core::task::Poll::Ready(Some(Err(e))),
2934 }
2935 })
2936 .await
2937 }
2938
2939 /// Sync: wait for the next feedback message for this goal, spinning the executor.
2940 pub fn wait_next(
2941 &mut self,
2942 executor: &mut super::Executor,
2943 timeout: core::time::Duration,
2944 ) -> Result<Option<A::Feedback>, NodeError> {
2945 let spin_interval = core::time::Duration::from_millis(DEFAULT_SPIN_INTERVAL_MS);
2946 let timeout_ms = timeout.as_millis().min(u64::MAX as u128) as u64;
2947 let max_spins = (timeout_ms / DEFAULT_SPIN_INTERVAL_MS).max(1);
2948 let mut budget = WaitBudget::new(max_spins, timeout);
2949 loop {
2950 executor.spin_once(spin_interval);
2951 if let Some((id, feedback)) = self.client.try_recv_feedback()?
2952 && id.uuid == self.goal_id.uuid
2953 {
2954 return Ok(Some(feedback));
2955 }
2956 if !budget.tick() {
2957 return Ok(None);
2958 }
2959 }
2960 }
2961}
2962
/// `futures_core::Stream` adapter, available with the `stream` feature.
///
/// Yields only feedback whose goal ID matches the stream's goal; feedback
/// for other goals is consumed and discarded. The stream never ends.
#[cfg(feature = "stream")]
impl<A: RosAction, const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
    futures_core::Stream for GoalFeedbackStream<'_, A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
{
    type Item = Result<A::Feedback, NodeError>;

    fn poll_next(
        self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Register-then-check (closes the AtomicWaker race).
        this.client
            .core
            .feedback_subscriber
            .register_waker(cx.waker());
        // Only report `Pending` after the source has been seen empty.
        // Stopping right after discarding a foreign-goal message could
        // strand a matching message already queued behind it, since no
        // new wake-up is guaranteed for data that arrived before this poll.
        loop {
            match this.client.try_recv_feedback() {
                Ok(Some((id, feedback))) if id.uuid == this.goal_id.uuid => {
                    return core::task::Poll::Ready(Some(Ok(feedback)));
                }
                // Feedback for a different goal — discard and look again.
                Ok(Some(_)) => {}
                Ok(None) => return core::task::Poll::Pending,
                Err(e) => return core::task::Poll::Ready(Some(Err(e))),
            }
        }
    }
}
2989
2990/// Parse a goal acceptance response (bool).
2991fn parse_goal_accepted(data: &[u8]) -> Result<bool, NodeError> {
2992 let mut reader =
2993 CdrReader::new_with_header(data).map_err(|_| NodeError::ServiceRequestFailed)?;
2994 let accepted = reader.read_u8().unwrap_or(0) != 0;
2995 Ok(accepted)
2996}
2997
2998/// Parse a cancel response.
2999fn parse_cancel_response(data: &[u8]) -> Result<nros_core::CancelResponse, NodeError> {
3000 let mut reader =
3001 CdrReader::new_with_header(data).map_err(|_| NodeError::ServiceRequestFailed)?;
3002 let return_code = reader.read_i8().unwrap_or(2);
3003 Ok(nros_core::CancelResponse::from_i8(return_code).unwrap_or_default())
3004}
3005
3006/// Parse an action result response (status + result).
3007fn parse_result_response<A: RosAction>(
3008 data: &[u8],
3009) -> Result<(nros_core::GoalStatus, A::Result), NodeError> {
3010 let mut reader =
3011 CdrReader::new_with_header(data).map_err(|_| NodeError::ServiceRequestFailed)?;
3012 let status_code = reader.read_i8().unwrap_or(0);
3013 let status = nros_core::GoalStatus::from_i8(status_code).unwrap_or_default();
3014 let result =
3015 A::Result::deserialize(&mut reader).map_err(|_| NodeError::ServiceRequestFailed)?;
3016 Ok((status, result))
3017}