nros_node/executor/spin.rs
1//! Executor struct and core spin methods.
2
3use core::{marker::PhantomData, mem::MaybeUninit};
4
5use nros_core::{BorrowedMessage, RosMessage, RosService};
6use nros_rmw::{QosSettings, ServiceInfo, Session, TopicInfo, TransportError};
7
8use crate::{session, timer::TimerDuration};
9
10#[cfg(feature = "safety-e2e")]
11use super::arena::{
12 SubSafetyEntry, sub_safety_has_data, sub_safety_pre_sample, sub_safety_try_process,
13};
14#[cfg(feature = "rmw-cffi")]
15use super::types::ExecutorConfig;
16#[cfg(feature = "std")]
17use super::types::SpinOptions;
18use super::{
19 arena::{
20 BufferStrategy, CallbackMeta, EntryKind, GuardConditionEntry, ServiceClientCallbackEntry,
21 ServiceClientRawArenaEntry, ServiceClientSendHeader, SrvEntry, SrvRawEntry,
22 SubBufferedBorrowedEntry, SubBufferedEntry, SubBufferedRawCEntry, SubBufferedRawEntry,
23 SubBufferedRawInfoCEntry, SubBufferedRawInfoEntry, SubInfoEntry, SubInplaceEntry,
24 TimerEntry, TimerHeader, always_ready, buffered_region_size, drop_entry, guard_has_data,
25 guard_try_process, no_pre_sample, service_client_callback_try_process,
26 service_client_raw_try_process, srv_has_data, srv_raw_has_data, srv_raw_try_process,
27 srv_try_process, sub_buffered_borrowed_has_data, sub_buffered_borrowed_try_process,
28 sub_buffered_has_data, sub_buffered_raw_c_has_data, sub_buffered_raw_c_try_process,
29 sub_buffered_raw_has_data, sub_buffered_raw_info_c_has_data,
30 sub_buffered_raw_info_c_try_process, sub_buffered_raw_info_has_data,
31 sub_buffered_raw_info_try_process, sub_buffered_raw_try_process, sub_buffered_try_process,
32 sub_info_has_data, sub_info_pre_sample, sub_info_try_process, sub_inplace_has_data,
33 sub_inplace_try_process, timer_try_process,
34 },
35 node::NodeHandle,
36 spsc_ring::SpscRing,
37 triple_buffer::TripleBuffer,
38 types::{
39 ExecutorSemantics, GuardConditionHandle, HandleId, InvocationMode, NodeError,
40 RawResponseCallback, RawServiceCallback, RawSubscriptionCallback,
41 RawSubscriptionInfoCallback, ReadinessSnapshot, SpinOnceResult, SpinPeriodPollingResult,
42 Trigger,
43 },
44};
45
46// ============================================================================
47// Executor::open() factory method
48// ============================================================================
49
50#[cfg(feature = "rmw-cffi")]
51impl Executor {
52 /// Open a new executor session using the active RMW backend.
53 ///
54 /// Phase 115.M.4 — auto-registers the cffi vtable for whichever
55 /// backend the build was configured for, mirroring the C++ side's
56 /// `#ifdef NROS_RMW_<NAME>` fan-out in `<nros/node.hpp>`. The
57 /// runtime's atomic vtable slot is idempotent: a re-call of any
58 /// backend's `register()` is a no-op, so the fan-out below is safe
59 /// to invoke on every `Executor::open` (cheaper than a `Once` and
60 /// doesn't pull in `std::sync` for no_std targets).
61 ///
62 /// Connects to the middleware at the locator specified in `config`.
63 ///
64 /// # Example
65 ///
66 /// ```ignore
67 /// let config = ExecutorConfig::from_env().node_name("my_node");
68 /// let mut executor = Executor::open(&config)?;
69 /// ```
70 pub fn open(config: &ExecutorConfig<'_>) -> Result<Self, NodeError> {
71 use nros_rmw::Rmw;
72
73 // Phase 128.A.3 / 249 P4b.1 — manifest-driven backend selection.
74 //
75 // Every linked backend self-registered via its `.init_array`
76 // ctor before `main` (RFC-0042 §D3.3), so the registry is
77 // already populated — no runtime section walk.
78 //
79 // 1. Consult `$NROS_RMW` (when std/env is available) for
80 // explicit override, mirroring ROS 2's `RMW_IMPLEMENTATION`.
81 // 2. With no selector, pick the unique registered backend.
82 // Zero registered → `NoBackend`; more than one →
83 // `Ambiguous` (user must set `$NROS_RMW` or use
84 // `Executor::open_multi`).
85 let selector = read_rmw_selector_env();
86 // `as_deref()` on `Option<Vec<u8>>` yields `Option<&[u8]>`;
87 // on the no_std `Option<&'static [u8]>` variant it's a
88 // no-op the lint catches but the std signature still
89 // requires the call. Allowed locally.
90 #[allow(clippy::needless_option_as_deref)]
91 let sel_ref = selector.as_deref();
92 match nros_rmw_cffi::resolve_backend(sel_ref) {
93 nros_rmw_cffi::BackendResolution::Single(_) => {}
94 // Map every non-`Single` outcome to a transport
95 // ConnectionFailed for now; the more granular ret codes
96 // (NO_BACKEND / AMBIGUOUS / UNKNOWN) are exposed to C
97 // callers via `nros_init`'s return value (Phase 128.C.2).
98 _ => return Err(NodeError::Transport(TransportError::ConnectionFailed)),
99 }
100
101 let rmw_config = nros_rmw::RmwConfig {
102 locator: config.locator,
103 mode: config.mode,
104 domain_id: config.domain_id,
105 node_name: config.node_name,
106 namespace: config.namespace,
107 properties: &[],
108 };
109 let session = if let Some(name) = sel_ref {
110 // Selector path: route to the specific named backend so
111 // the env-var-disambiguated outcome matches what the
112 // resolver above identified.
113 nros_rmw_cffi::CffiRmw::open_with_rmw(
114 core::str::from_utf8(name).unwrap_or(""),
115 &rmw_config,
116 )
117 } else {
118 nros_rmw_cffi::CffiRmw.open(&rmw_config)
119 }
120 .map_err(|_| NodeError::Transport(TransportError::ConnectionFailed))?;
121 let mut executor = Self::from_session(session);
122 #[cfg(not(feature = "std"))]
123 {
124 executor.clock_us_fn = config.clock_us;
125 executor.last_spin_end_us = config.clock_us.map(|clock| clock());
126 }
127 executor.set_node_identity(config.node_name, config.namespace);
128 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
129 executor.install_wake_signal_on_primary();
130 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
131 executor.install_wake_signal_on_primary_alloc();
132 Ok(executor)
133 }
134
135 /// Phase 128.F.1 — explicit per-backend session declaration for
136 /// bridge mode. `specs[0]` becomes the primary session; `specs[1..]`
137 /// open as extras keyed by RMW name. After construction, every
138 /// `create_node_on(name, rmw)` call dispatches to whichever
139 /// session was opened under that RMW name (or, when the rmw name
140 /// matches the primary, the primary session itself).
141 ///
142 /// Single-backend callers should keep using
143 /// [`open`](Self::open) — this entry costs an extra
144 /// `open_with_rmw` per spec and adds no value when only one
145 /// backend is linked.
146 ///
147 /// `$NROS_RMW` env is ignored: bridge mode wants explicit names.
148 #[cfg(feature = "rmw-cffi")]
149 pub fn open_multi(specs: &[SessionSpec<'_>]) -> Result<Self, NodeError> {
150 // Phase 249 P4b.1 — backends self-registered via their
151 // `.init_array` ctor before `main`; no runtime section walk.
152 let primary = specs
153 .first()
154 .ok_or(NodeError::Transport(TransportError::ConnectionFailed))?;
155 let primary_session =
156 nros_rmw_cffi::CffiRmw::open_with_rmw(primary.rmw, &primary.to_rmw_config())
157 .map_err(NodeError::Transport)?;
158 let mut executor = Self::from_session(primary_session);
159 executor.set_node_identity("", "/");
160 // Phase 156 — see `Executor::open` for primary-identity
161 // recording rationale.
162 let _ = executor.primary_rmw_name.push_str(primary.rmw);
163 let _ = executor.primary_locator.push_str(primary.locator);
164 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
165 executor.install_wake_signal_on_primary();
166 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
167 executor.install_wake_signal_on_primary_alloc();
168
169 for spec in specs.iter().skip(1) {
170 let session = nros_rmw_cffi::CffiRmw::open_with_rmw(spec.rmw, &spec.to_rmw_config())
171 .map_err(NodeError::Transport)?;
172 executor
173 .extra_sessions
174 .push(session)
175 .map_err(|_| NodeError::NodeTableFull)?;
176 #[cfg(feature = "std")]
177 {
178 let idx = executor.extra_sessions.len() - 1;
179 executor.install_wake_signal_on_extra(idx);
180 }
181 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
182 {
183 let idx = executor.extra_sessions.len() - 1;
184 executor.install_wake_signal_on_extra_alloc(idx);
185 }
186 }
187
188 Ok(executor)
189 }
190
191 /// Phase 104.C.1 — open the Executor against a specific RMW
192 /// backend by name. Selects from the named registry (Phase
193 /// 104.B.2). `rmw_name` must match one of the names a backend
194 /// registered under (`"zenoh"`, `"cyclonedds"`, `"xrce"`, …).
195 ///
196 /// Equivalent to [`Executor::open`] when the registry has exactly
197 /// one backend (the default-backend fast path). Use this entry
198 /// point in multi-backend builds where `Executor::open` would
199 /// pick the first-registered slot.
200 ///
201 /// Single-Executor multi-Node multi-RMW (the long-term Design X
202 /// from `docs/roadmap/phase-104-multi-backend-bridges.md`) is
203 /// follow-up work — Phase 104.C.2 + C.3.
204 #[cfg(feature = "rmw-cffi")]
205 pub fn open_with_rmw(rmw_name: &str, config: &ExecutorConfig<'_>) -> Result<Self, NodeError> {
206 if !nros_rmw_cffi::backend_registered() {
207 return Err(NodeError::Transport(TransportError::ConnectionFailed));
208 }
209
210 let rmw_config = nros_rmw::RmwConfig {
211 locator: config.locator,
212 mode: config.mode,
213 domain_id: config.domain_id,
214 node_name: config.node_name,
215 namespace: config.namespace,
216 properties: &[],
217 };
218 let session = nros_rmw_cffi::CffiRmw::open_with_rmw(rmw_name, &rmw_config)
219 .map_err(|_| NodeError::Transport(TransportError::ConnectionFailed))?;
220 let mut executor = Self::from_session(session);
221 #[cfg(not(feature = "std"))]
222 {
223 executor.clock_us_fn = config.clock_us;
224 executor.last_spin_end_us = config.clock_us.map(|clock| clock());
225 }
226 executor.set_node_identity(config.node_name, config.namespace);
227 // Phase 156 — record primary identity for the session-
228 // cache hit path. See `Executor::open` for the rationale.
229 let _ = executor.primary_rmw_name.push_str(rmw_name);
230 let _ = executor.primary_locator.push_str(config.locator);
231 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
232 executor.install_wake_signal_on_primary();
233 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
234 executor.install_wake_signal_on_primary_alloc();
235 Ok(executor)
236 }
237}
238
239/// Phase 128.F.1 — per-backend session declaration for
240/// [`Executor::open_multi`]. Each spec names an RMW backend (must
241/// match one a backend registered under via
242/// `nros_rmw_cffi_register_named` / the `RMW_INIT_ENTRIES` linker
243/// section) and the locator + domain id to open against it.
244#[cfg(feature = "rmw-cffi")]
// `Debug` is derived so specs can be logged and used in assertion messages;
// every field is a plain `&str` / `u32`, so the derive is free of surprises.
#[derive(Clone, Copy, Debug)]
pub struct SessionSpec<'cfg> {
    /// Name of the RMW backend to open; `Executor::open_multi` hands it to
    /// `CffiRmw::open_with_rmw`.
    pub rmw: &'cfg str,
    /// Locator forwarded to the backend as `RmwConfig::locator`.
    pub locator: &'cfg str,
    /// Domain id forwarded to the backend as `RmwConfig::domain_id`.
    pub domain_id: u32,
    /// Node name forwarded to the backend as `RmwConfig::node_name`.
    pub node_name: &'cfg str,
    /// Namespace forwarded to the backend as `RmwConfig::namespace`.
    pub namespace: &'cfg str,
}
253
254#[cfg(feature = "rmw-cffi")]
255impl<'cfg> SessionSpec<'cfg> {
256 /// Minimal spec — just RMW name + locator. Domain id defaults to
257 /// 0; node name and namespace are empty.
258 pub const fn new(rmw: &'cfg str, locator: &'cfg str) -> Self {
259 Self {
260 rmw,
261 locator,
262 domain_id: 0,
263 node_name: "",
264 namespace: "",
265 }
266 }
267
268 pub const fn domain_id(mut self, domain_id: u32) -> Self {
269 self.domain_id = domain_id;
270 self
271 }
272
273 pub const fn node_name(mut self, name: &'cfg str) -> Self {
274 self.node_name = name;
275 self
276 }
277
278 pub const fn namespace(mut self, ns: &'cfg str) -> Self {
279 self.namespace = ns;
280 self
281 }
282
283 fn to_rmw_config(self) -> nros_rmw::RmwConfig<'cfg> {
284 nros_rmw::RmwConfig {
285 locator: self.locator,
286 mode: nros_rmw::SessionMode::Client,
287 domain_id: self.domain_id,
288 node_name: self.node_name,
289 namespace: self.namespace,
290 properties: &[],
291 }
292 }
293}
294
295// Phase 128.A.3 — selector for the single-backend resolution path.
296//
297// On hosted (`std`) builds, read `$NROS_RMW`; mirrors ROS 2's
298// `RMW_IMPLEMENTATION`. Returns the name as a byte vector so the
299// caller can pass it to `nros_rmw_cffi::resolve_backend` and (when
300// `Some`) to `CffiRmw::open_with_rmw`.
301//
302// On `no_std` / bare-metal builds, environment variables are not
303// available; resolution always falls through to the single-backend
304// or ambiguous path. Embedded users with multiple backends use the
305// bridge surface `Executor::open_multi` instead.
306#[cfg(all(feature = "std", feature = "rmw-cffi"))]
/// Reads the `$NROS_RMW` backend selector on hosted builds.
///
/// Returns the variable's raw encoded bytes, or `None` when the variable is
/// unset or set to the empty string.
fn read_rmw_selector_env() -> Option<alloc::vec::Vec<u8>> {
    std::env::var_os("NROS_RMW")
        // An empty value counts as "no selector".
        .filter(|raw| !raw.is_empty())
        .map(|raw| raw.as_encoded_bytes().to_vec())
}
315
316#[cfg(all(not(feature = "std"), feature = "rmw-cffi"))]
/// `no_std` counterpart of the `$NROS_RMW` selector lookup.
///
/// There is no environment to consult here, so no selector is ever reported
/// and the caller always resolves without one.
fn read_rmw_selector_env() -> Option<&'static [u8]> {
    None
}
320
321// ============================================================================
322// SessionStore — owned or borrowed session
323// ============================================================================
324
/// Session storage: owned or borrowed via raw pointer.
///
/// The C API creates a session in `nros_support_init()` before the
/// executor. `Borrowed` lets the executor use that session without owning it.
///
/// Both variants are reached through the `Deref` / `DerefMut` impls on this
/// type, which yield the underlying `session::ConcreteSession`.
// NOTE(review): the lint allowance carries no stated reason; presumably
// `Owned` stores the session inline and dwarfs the pointer-sized `Borrowed`
// variant — confirm.
#[allow(clippy::large_enum_variant)]
pub(crate) enum SessionStore {
    /// Session stored inline and owned by this value.
    Owned(session::ConcreteSession),
    /// Raw pointer to a session owned elsewhere. It is dereferenced without
    /// any check, so it must stay valid (and free of conflicting references)
    /// for as long as this value is used.
    Borrowed(*mut session::ConcreteSession),
}
334
335impl core::ops::Deref for SessionStore {
336 type Target = session::ConcreteSession;
337 fn deref(&self) -> &session::ConcreteSession {
338 match self {
339 SessionStore::Owned(s) => s,
340 SessionStore::Borrowed(ptr) => unsafe { &**ptr },
341 }
342 }
343}
344
345impl core::ops::DerefMut for SessionStore {
346 fn deref_mut(&mut self) -> &mut session::ConcreteSession {
347 match self {
348 SessionStore::Owned(s) => s,
349 SessionStore::Borrowed(ptr) => unsafe { &mut **ptr },
350 }
351 }
352}
353
/// Phase 228.E — an opaque, `Send` handle to an [`Executor`]'s RMW session.
///
/// In the per-tier model the boot executor opens the one session and hands each
/// spawned tier task a handle (not a borrow) so the task opens its own
/// [`Executor`] over that *same* session across the RTOS task boundary. Wrapping
/// the `pub(crate)` session pointer lets board crates (`nros-board-posix`,
/// `nros-board-freertos`, …) name + move the handle without naming the session
/// type. Obtain via [`Executor::session_handle`]; consume via
/// [`Executor::open_with_session_handle`].
///
/// The handle is a bare raw pointer: it carries no lifetime and does not keep
/// the session alive on its own.
#[cfg(any(has_rmw, test))]
pub struct SessionHandle(
    // Pointer to the shared session; private so it can only be produced and
    // consumed inside this crate.
    *mut session::ConcreteSession,
);

// SAFETY: the per-tier model deliberately shares one session across RTOS tasks;
// concurrent access is serialized by the RMW backend's internal locks (the RTOS
// targets build zenoh-pico `Z_FEATURE_MULTI_THREAD=1` — RFC-0032 §5.0). The
// boot executor owns the session and outlives every tier task.
// NOTE(review): neither guarantee is enforced by the type system; both are
// obligations on the code that creates and distributes handles.
#[cfg(any(has_rmw, test))]
unsafe impl Send for SessionHandle {}
372
373/// Phase 228.C — pure callback-group filter decision. `None` = wildcard (accept
374/// every group); `Some` = accept only listed groups. Backs
375/// [`Executor::group_active`]; split out so the logic is unit-testable without a
376/// live session.
377pub(crate) fn group_filter_accepts<const N: usize, const M: usize>(
378 active: &Option<heapless::Vec<heapless::String<N>, M>>,
379 group: &str,
380) -> bool {
381 match active {
382 None => true,
383 Some(v) => v.iter().any(|g| g.as_str() == group),
384 }
385}
386
#[cfg(test)]
mod group_filter_tests {
    use super::group_filter_accepts;

    type Groups = heapless::Vec<heapless::String<32>, { crate::config::MAX_NODES }>;

    /// Builds an active-group filter that lists exactly one group name.
    fn only(name: &str) -> Option<Groups> {
        let mut entry: heapless::String<32> = heapless::String::new();
        entry.push_str(name).unwrap();
        let mut groups: Groups = heapless::Vec::new();
        groups.push(entry).unwrap();
        Some(groups)
    }

    /// With no filter installed every group name is accepted.
    #[test]
    fn wildcard_accepts_all() {
        let wildcard: Option<Groups> = None;
        assert!(group_filter_accepts(&wildcard, "anything"));
    }

    /// With a filter installed only the listed names are accepted.
    #[test]
    fn set_accepts_only_listed_groups() {
        let active = only("ctrl");
        assert!(group_filter_accepts(&active, "ctrl"));
        assert!(!group_filter_accepts(&active, "telem"));
    }
}
410
411// ============================================================================
412// Executor
413// ============================================================================
414
// The paragraph below documents `Executor` (the struct is defined further
// down in this file), not `WakeCtx`. It is kept as plain comments so rustdoc
// does not attach it to the wrong item.
//
// Backend-agnostic executor that owns a session.
//
// Provides `create_node()` for entity creation and `drive_io()` for polling.
//
// # Callback Mode
//
// The executor supports arena-based callback registration via the
// `node_mut(id).subscription(t)` builder and
// [`register_service()`](Self::register_service), with dispatch via
// [`spin_once()`](Self::spin_once). No heap allocation is needed.
//
// The sizes are set via `NROS_EXECUTOR_MAX_CBS` (default 4) and
// `NROS_EXECUTOR_ARENA_SIZE` (default 4096) environment variables at build time.

/// Phase 124.B.2 — opaque context handed to the runtime wake
/// callback. Backends store the raw pointer + invoke the callback;
/// the callback decodes back to `&WakeCtx`.
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
pub(crate) struct WakeCtx {
    /// Wake flag; the wake callback stores `true` here before notifying.
    pub(crate) flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Condition variable the wake callback notifies (`notify_all`).
    pub(crate) cv: std::sync::Arc<std::sync::Condvar>,
    /// Mutex paired with `cv` on the waiting side; the wake callback never
    /// locks it.
    #[allow(dead_code)] // Held by spin_once's wait predicate (124.B.4).
    pub(crate) mu: std::sync::Arc<std::sync::Mutex<()>>,
    /// Phase 130.3 — Zephyr+std uses the k_sem wake primitive; the
    /// runtime cb signals both this and the std cv so a future
    /// migration to a single primitive flips one branch instead
    /// of two.
    pub(crate) node_wake: Option<std::sync::Arc<super::node_wake::NodeWake>>,
}
444
/// Phase 124.B.2 — runtime wake callback.
///
/// This is the symbol backends invoke from their asynchronous wake path
/// (datagram arrival, worker-thread enqueue, etc.). In order, it:
///
/// 1. returns immediately when `ctx` is null;
/// 2. records the wake-latency probe timestamp (only with the
///    `wake-latency-probe` feature);
/// 3. stores `true` into the context's wake flag (`SeqCst`);
/// 4. notifies every waiter on the context's condvar, without taking the
///    paired mutex;
/// 5. signals the optional `NodeWake` (Phase 130.3 — the primitive Zephyr+std
///    waits on instead of the std condvar).
///
/// RT-context contract:
///
/// * **Thread-safe**: callable from any thread; no mutex is held during
///   `notify_all`. The waiter re-checks the wake flag under `wake_mu` in its
///   `wait_timeout_while` predicate.
/// * **NOT async-signal-safe on POSIX**: `pthread_cond_signal` isn't on the
///   POSIX async-signal-safe function list. For a POSIX signal-handler wake,
///   use a `signalfd` + select pattern in a thread that owns the wake duty.
/// * **RTOS ISR**: the per-RTOS platform layer wraps the cv with an ISR-safe
///   primitive (`xSemaphoreGiveFromISR`, `tx_event_flags_set` from ISR,
///   `k_sem_give` from ISR on Zephyr). A backend's ISR caller routes through
///   the platform's `signal_from_isr` API instead of this cb directly.
/// * **Bounded execution time**: O(1) — atomic store + cv notify. No
///   allocation, no contended lock.
///
/// # Safety
///
/// `ctx` must be null or point at a `WakeCtx` that stays alive for the whole
/// call.
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
pub(crate) unsafe extern "C" fn nros_rmw_runtime_wake_cb(ctx: *mut core::ffi::c_void) {
    use std::sync::atomic::Ordering;

    let wake_ptr = ctx.cast::<WakeCtx>();
    if wake_ptr.is_null() {
        return;
    }
    // Phase 141.B.2 — capture T0 at cb entry. No-op when the
    // probe feature is off or no cycle reader is installed.
    #[cfg(feature = "wake-latency-probe")]
    super::wake_probe::on_wake();
    // SAFETY: `ctx` points at a `WakeCtx` owned by an Executor that is still
    // alive at the time of the call. Executor::drop must clear the callback
    // via `set_wake_callback(None, _)` on all sessions before dropping
    // wake_ctx; this happens in the `install_wake_*` teardown path.
    let wake: &WakeCtx = unsafe { &*wake_ptr };

    // Flag first, notification second: a waiter that wakes up must already
    // be able to observe the flag.
    wake.flag.store(true, Ordering::SeqCst);
    // Lock-free notify — `mu` is deliberately not taken here.
    // NOTE(review): the original rationale states the waiter cannot miss the
    // signal. Because `mu` is not held, a store + notify landing after the
    // waiter evaluated its predicate but before it parked is only picked up
    // when the wait's timeout expires — confirm that bound is acceptable.
    wake.cv.notify_all();
    // Phase 130.3 — Zephyr+std waits on `NodeWake` (k_sem) instead of the
    // std cv. Signal both so the cb keeps working whichever wait primitive
    // spin_once is using.
    if let Some(node_wake) = &wake.node_wake {
        node_wake.signal();
    }
}
497
/// Phase 124.B.7.c — POSIX signalfd worker.
///
/// Despite the "signalfd" in the phase title, the descriptor used here is a
/// Linux `eventfd`.
///
/// Owns a Linux `eventfd` plus a worker thread that `read()`s the
/// fd and forwards via `wake_ctx.cv.notify_all()`. The eventfd
/// write side is async-signal-safe per the kernel contract
/// (`write(2)` to an eventfd is permitted from signal handlers),
/// closing the gap that `pthread_cond_signal` leaves open on POSIX.
///
/// Lifecycle:
///   * Constructed lazily in `Executor::signal_fd()` on first
///     caller request.
///   * `Drop` writes a shutdown sentinel + joins the worker.
///
/// Caller flow (signal handler):
///   1. Get fd via `Executor::signal_fd()` before installing the
///      handler.
///   2. Handler does `eventfd_write(fd, 1)` (equivalently,
///      `write(fd, &1u64, 8)`).
///   3. Worker thread reads the fd, signals wake_cv. spin_once
///      blocked in cv.wait_timeout_while sees flag=true and exits.
#[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
pub struct WakeSignalFd {
    /// The eventfd descriptor: read by the worker, written by callers (and by
    /// `Drop` to unblock the worker), closed in `Drop`.
    fd: core::ffi::c_int,
    /// Shutdown request shared with the worker; set in `Drop` and checked by
    /// the worker after every `read`.
    shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Join handle of the worker thread; taken and joined in `Drop`.
    worker: Option<std::thread::JoinHandle<()>>,
}
524
#[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
impl WakeSignalFd {
    /// Creates the eventfd and spawns the `nros-wakefd` forwarding worker.
    ///
    /// `wake_ctx_ptr` is the `*const WakeCtx` produced by
    /// `Executor::wake_ctx_ptr` — the same value the runtime wake cb decodes.
    /// The worker blocks in `read` on the eventfd; for every successful read
    /// (while shutdown has not been requested) it sets the wake flag and
    /// notifies the condvar of that `WakeCtx`.
    ///
    /// # Errors
    ///
    /// Returns the OS error when `eventfd` fails. When the worker thread
    /// cannot be spawned, the eventfd is closed and an error describing the
    /// spawn failure is returned.
    fn new(wake_ctx_ptr: *const WakeCtx) -> Result<Self, std::io::Error> {
        use std::sync::{
            Arc,
            atomic::{AtomicBool, Ordering},
        };

        // SAFETY: plain FFI call that takes no pointers.
        let fd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC) };
        if fd < 0 {
            return Err(std::io::Error::last_os_error());
        }

        let shutdown = Arc::new(AtomicBool::new(false));
        let worker_shutdown = Arc::clone(&shutdown);

        // The pointer travels into the thread as a `usize` so the closure is
        // `Send`.
        // SAFETY (for the dereference in the worker): the pointer is valid
        // for the Executor's lifetime — `WakeCtx` is owned by the Executor's
        // `wake_ctx: Arc<WakeCtx>` field, which outlives this worker thread
        // because `Drop` joins it.
        let ctx_addr = wake_ctx_ptr as usize;
        let forward_wakes = move || {
            let ctx = ctx_addr as *const WakeCtx;
            loop {
                let mut counter = [0u8; 8];
                // SAFETY: `counter` is a live, writable 8-byte buffer and
                // `fd` stays open until `Drop` has joined this thread.
                let n =
                    unsafe { libc::read(fd, counter.as_mut_ptr() as *mut core::ffi::c_void, 8) };
                // Shutdown wins regardless of how the read ended (data,
                // EINTR or EOF).
                if worker_shutdown.load(Ordering::Acquire) {
                    return;
                }
                if n <= 0 {
                    // EINTR / EOF without a shutdown request: read again.
                    continue;
                }
                // Same flag + condvar effect as nros_rmw_runtime_wake_cb. It
                // is not called directly because it dereferences ctx as
                // &WakeCtx, which would race with Executor drop without a
                // guarantee — the shutdown check above plus Drop's join
                // provide it.
                unsafe {
                    let wake = &*ctx;
                    wake.flag.store(true, Ordering::SeqCst);
                    wake.cv.notify_all();
                }
            }
        };

        let spawned = std::thread::Builder::new()
            .name("nros-wakefd".into())
            .spawn(forward_wakes);
        let worker = match spawned {
            Ok(handle) => handle,
            Err(e) => {
                // No worker owns the descriptor; release it before failing.
                unsafe { libc::close(fd) };
                return Err(std::io::Error::other(alloc::format!(
                    "spawn nros-wakefd worker: {e}"
                )));
            }
        };

        Ok(Self {
            fd,
            shutdown,
            worker: Some(worker),
        })
    }

    /// Returns the writable eventfd.
    ///
    /// The caller (typically a POSIX signal handler) writes any non-zero
    /// 8-byte value to trigger a wake. `write(2)` on an eventfd is
    /// async-signal-safe per the `eventfd(2)` man page.
    pub fn fd(&self) -> core::ffi::c_int {
        self.fd
    }
}
594
/// Stops the worker thread and releases the eventfd.
#[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
impl Drop for WakeSignalFd {
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        // Request shutdown first, so the worker sees it once it wakes up.
        self.shutdown.store(true, Ordering::Release);

        // Bump the eventfd counter by one to unblock the worker's `read`.
        // The result is ignored: this is best-effort teardown.
        let wake_token = 1u64.to_ne_bytes();
        // SAFETY: `wake_token` is a live 8-byte buffer and `self.fd` is still
        // open (it is only closed at the end of this function).
        unsafe {
            libc::write(
                self.fd,
                wake_token.as_ptr().cast::<core::ffi::c_void>(),
                wake_token.len(),
            );
        }

        if let Some(handle) = self.worker.take() {
            // A panicked worker is not propagated out of `drop`.
            let _ = handle.join();
        }

        // Close only after the join: the worker reads from this descriptor.
        unsafe { libc::close(self.fd) };
    }
}
611
/// Phase 124.B.7.b — ISR / interrupt-context wake callback.
///
/// Same semantics as [`nros_rmw_runtime_wake_cb`] but constrained to
/// async-signal-safe / ISR-safe primitives.
///
/// Per-platform routing:
///
/// * **POSIX (std)**: `pthread_cond_signal` is NOT on the POSIX
///   async-signal-safe function list. Calling from a SIGUSR1
///   handler is technically UB. Real fix (Phase 124.B.7.c) routes
///   via `signalfd`/`eventfd` + a runtime worker thread; until that
///   lands, signal-handler callers MUST use
///   `nros_guard_condition_trigger` from a **separate thread** (not
///   from the handler itself), OR set the wake_flag and rely on
///   the next poll deadline. This cb currently aliases the regular
///   `wake_cb` and is safe only from non-signal-handler ISR-like
///   contexts (e.g. timer thread, kernel callback).
///
/// * **RTOS no_std (Zephyr/FreeRTOS/ThreadX)**: routes through the
///   platform-cffi `condvar_signal_from_isr` slot. Each backend
///   uses its ISR-safe variant — `xSemaphoreGiveFromISR`,
///   `tx_semaphore_put`, `k_condvar_signal`.
///
/// `ctx` semantics identical to [`nros_rmw_runtime_wake_cb`].
///
/// NOTE(review): `WakeSignalFd` in this file looks like the B.7.c
/// eventfd worker the POSIX bullet is waiting for (feature
/// `signal-fd-wake`, Linux only) — confirm, and refresh the "until that
/// lands" wording once its exposure is wired up.
///
/// # Safety
///
/// Same requirement as [`nros_rmw_runtime_wake_cb`], to which `ctx` is
/// forwarded unchanged.
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
#[allow(dead_code)] // Public exposure pending B.7.c signalfd worker.
pub(crate) unsafe extern "C" fn nros_rmw_runtime_wake_cb_from_isr(ctx: *mut core::ffi::c_void) {
    // Today: alias regular wake_cb. POSIX signal-handler safety
    // pending B.7.c (signalfd worker-thread forward). Documented in
    // the contract above so callers know the boundary.
    unsafe { nros_rmw_runtime_wake_cb(ctx) };
}
644
/// Phase 216 follow-up — per-Node dispatch trampoline registered with
/// [`Executor::register_dispatch_slot`].
///
/// The board-side dispatch task (RTIC `__nros_run` / Embassy
/// `__nros_run_task`) dequeues a `nros_platform::SignaledCallback`
/// envelope and forwards `(cb_id, ctx_ptr)` into
/// [`Executor::dispatch_callback`]; that method linear-scans this
/// slot table and invokes every registered `on_callback` with the
/// owning Node's per-Node `state` blob. Each Node's
/// `__nros_node_<pkg>_on_callback` self-filters on its own
/// `CallbackId` tag set, so a slot whose Node doesn't own this
/// callback is a cheap no-op string compare.
///
/// The shape mirrors the per-pkg `__nros_node_<pkg>_on_callback`
/// extern "C" trampoline emitted by the `nros::node!()` macro
/// (see `packages/core/nros-macros/src/lib.rs` Phase 216.A.5).
///
/// # Why not `linkme`
///
/// `linkme::distributed_slice` hangs on bare-metal Cortex-M /
/// RISC-V because `cortex_m_rt`'s link script doesn't provide the
/// `__start_/__stop_` section anchors in a shape that lets the
/// iterator terminate (see
/// `packages/core/nros-rmw-cffi/src/section.rs` Phase 142). Since
/// stm32f4 RTIC / Embassy boards are the whole point of Phase 216,
/// the registry uses the explicit `register()` pattern from Phase
/// 104.A.
// `Copy`: the slot is only a pointer plus a fn pointer, so duplicating it
// duplicates no ownership.
#[derive(Clone, Copy)]
pub struct DispatchSlot {
    /// Owning Node's `State` blob — produced by the macro-emitted
    /// `i()` and round-tripped through
    /// `nros::__private_node_state_into_raw`. Opaque to the
    /// executor.
    pub state: *mut core::ffi::c_void,
    /// Per-Node `extern "C"` trampoline; signature matches the
    /// `__nros_node_<pkg>_on_callback` symbol the `nros::node!()`
    /// macro emits.
    ///
    /// Arguments, in order: the slot's `state`, the callback id as a
    /// pointer + length pair, and the callback's context pointer.
    pub on_callback: unsafe extern "C" fn(
        state: *mut core::ffi::c_void,
        cb_id_ptr: *const u8,
        cb_id_len: usize,
        ctx: *mut core::ffi::c_void,
    ),
}

// SAFETY: `DispatchSlot` carries two raw pointers (`state` + an
// extern "C" fn pointer). The fn pointer is `Send`/`Sync` by
// definition; the `state` pointer's `Send`/`Sync` story matches the
// owning `Executor` (which is `unsafe impl Send`). Treating the
// slot itself as `Send` keeps the existing `Executor` Send impl
// intact — see `unsafe impl Send for Executor {}` later in this
// file.
// NOTE(review): the rationale above argues for `Send` only, yet `Sync` is
// asserted as well — confirm that access to `state` through a shared slot
// reference is sound.
unsafe impl Send for DispatchSlot {}
unsafe impl Sync for DispatchSlot {}
699
/// Phase 258 (Track 2, 2a) — executor-owned component tick slot.
///
/// The layering-clean half of the W0-B `install` seam's tick fix
/// (phase-257 D2). A `nros`-layer `install`/`register_node_borrowed`
/// builds an `Arc<ComponentCell>` (the typed/poll-driven component
/// state) and enrolls it here via [`Executor::enroll_component`]; the
/// executor then drives `tick` on every enrolled slot at the tail of
/// each [`spin_once`](Executor::spin_once) — so `install`'d nodes
/// (C, C++, **and Rust owned-spin**) tick, closing the
/// service-client/action poll gap that the callback-`Arc`-only
/// lifetime left open.
///
/// Like [`DispatchSlot`] the executor only sees raw pointers + `extern
/// "C"` fn pointers (no `nros` dep — `nros-node` is the lower layer):
///
/// * `state` — a *leaked* `Arc<ComponentCell>` (via `Arc::into_raw`),
///   re-borrowed by the `nros`-side `tick`/`drop` fns. Unlike a
///   pub/sub/timer component (kept alive by the executor's per-entity
///   callback `Arc` clones), a poll-only component has no callbacks, so
///   the slot must own a clone of the cell — hence the paired `drop`.
/// * `tick` — `nros`-side `extern "C"` fn that casts `state` back to
///   `&ComponentCell`, casts `exec_ctx` back to `*mut Executor`, and
///   runs that one cell's tick (mirrors `ExecutorNodeRuntime::run_ticks`).
/// * `drop` — `nros`-side `extern "C"` fn run on `Executor::drop` that
///   reconstitutes + drops the leaked `Arc`, so the executor owns the
///   cell's lifetime.
///
/// Kept a SEPARATE registry from [`DispatchSlot`] on purpose: framework
/// dispatch (RTIC / Embassy) is interrupt-driven, name-keyed, and has no
/// tick/own concern — mixing the two risks that path.
// NOTE(review): the slot is `Copy` while it is documented as owning the
// leaked `Arc`; a copied slot duplicates `state` without adding a reference,
// so `drop` must run for exactly one copy — confirm callers uphold this.
#[derive(Clone, Copy)]
pub struct ComponentSlot {
    /// Leaked `Arc<ComponentCell>` (opaque to the executor). Owned by
    /// this slot — dropped via `drop` on `Executor::drop`.
    pub state: *mut core::ffi::c_void,
    /// `nros`-side tick trampoline: `(state, exec_ctx)` where `exec_ctx`
    /// is `*mut Executor`. Drives one component's `tick`.
    pub tick: unsafe extern "C" fn(state: *mut core::ffi::c_void, exec_ctx: *mut core::ffi::c_void),
    /// `nros`-side drop trampoline: reconstitutes + drops the leaked
    /// `Arc<ComponentCell>` at `state`. Run once on `Executor::drop`.
    pub drop: unsafe extern "C" fn(state: *mut core::ffi::c_void),
}

// SAFETY: same story as `DispatchSlot` — here one raw pointer (`state`) +
// two extern "C" fn pointers (`tick`, `drop`). The `state` pointer's
// Send/Sync matches the owning `Executor` (`unsafe impl Send for Executor`);
// the fn pointers are Send/Sync by definition.
unsafe impl Send for ComponentSlot {}
unsafe impl Sync for ComponentSlot {}
749
/// Callback executor state.
///
/// Bundles the RMW session (owned or borrowed), the inline callback arena
/// and its per-callback metadata, the scheduling-context tables, the
/// node / dispatch / component registries, and the wake + timer
/// bookkeeping described on the individual fields below.
pub struct Executor {
    /// Primary RMW session. `SessionStore::Owned` when built by
    /// `from_session`, `SessionStore::Borrowed` when built by
    /// `from_session_ptr`.
    pub(crate) session: SessionStore,
    /// Inline byte storage, kept inside the struct so an `Executor` can be
    /// placed in `static` storage without an allocator. Starts
    /// uninitialised; the bytes are only accessed through typed
    /// `ptr::write` / `ptr::read` via the dispatch function pointers stored
    /// in `entries`.
    pub(crate) arena: [MaybeUninit<u8>; crate::config::ARENA_SIZE],
    /// NOTE(review): presumably the number of `arena` bytes handed out so
    /// far — confirm against the registration path. Constructors start it
    /// at `0`.
    pub(crate) arena_used: usize,
    /// Per-callback metadata slots, indexed by `HandleId`. `None` marks a
    /// slot with no registered callback.
    pub(crate) entries: [Option<CallbackMeta>; crate::config::MAX_CBS],
    /// Phase 110.B — registered scheduling contexts. Slot 0 is
    /// auto-populated with a `Fifo` SC at construction; every entry
    /// without an explicit binding maps to it via
    /// `sched_context_bindings`.
    pub(crate) sched_contexts: [Option<super::sched_context::SchedContext>; crate::config::MAX_SC],
    /// Per-entry SC binding parallel to `entries`. Defaults to
    /// `SchedContextId(0)` (the auto-created Fifo SC).
    pub(crate) sched_context_bindings:
        [super::sched_context::SchedContextId; crate::config::MAX_CBS],
    /// Phase 110.E — user-space sporadic-server budget state per
    /// Sporadic-class SC. Slot indices match `sched_contexts`; non-
    /// Sporadic slots stay `None`.
    pub(crate) sporadic_states:
        [Option<super::sched_context::SporadicState>; crate::config::MAX_SC],
    /// Phase 110.E.b — atomic sporadic state + opaque platform-timer
    /// handle for ISR-driven refill. Populated by
    /// `register_sporadic_timer`; dropped on Executor `Drop` via the
    /// stored `destroy_fn`.
    #[cfg(feature = "alloc")]
    pub(crate) sporadic_atomic_states: [Option<(
        portable_atomic_util::Arc<super::sched_context::AtomicSporadicState>,
        OpaqueTimerHandle,
    )>; crate::config::MAX_SC],
    /// Phase 110.G — major-frame length for time-triggered dispatch.
    /// `0` (default) disables the TT gate entirely; non-zero enables
    /// gating per
    /// `SchedContext.tt_window_offset_us / tt_window_duration_us`.
    pub(crate) major_frame_us: u32,
    /// Phase 110.F — per-OS-priority worker pool. Lazily populated
    /// on first dispatch routing to a non-zero `os_pri`. Lives
    /// behind `feature = "scheduler-os-priority"` + `feature =
    /// "std"` because workers need `std::thread` + `mpsc`.
    #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
    pub(crate) os_priority_workers: std::collections::HashMap<u8, OsPriorityWorker>,
    /// Phase 110.F — caller-supplied `apply_policy` function pointer
    /// each worker invokes at startup to elevate its OS priority.
    /// `None` = the worker pool is disabled; entries bound to non-
    /// zero `os_pri` SCs fall back to the cooperative path.
    /// Mirrors `Executor::open_threaded`'s `apply_policy: fn(...)`
    /// shape — keeps Executor non-generic over Platform.
    #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
    pub(crate) os_priority_apply_policy:
        Option<fn(nros_platform_api::SchedPolicy) -> Result<(), nros_platform_api::SchedError>>,
    /// Trigger condition. Constructors default it to `Trigger::Any`.
    pub(crate) trigger: Trigger,
    /// Executor semantics flavour. Constructors default it to
    /// `ExecutorSemantics::RclcppExecutor`.
    pub(crate) semantics: ExecutorSemantics,
    /// Node name for entities created via `register_subscription`/`register_service`.
    /// Empty means unset — no liveliness tokens will be declared.
    pub(crate) node_name: heapless::String<64>,
    /// Phase 228.C — per-tier callback-group filter. `None` = wildcard (register
    /// every callback — the single-tier degenerate case + today's behaviour).
    /// `Some(groups)` = this tier's executor accepts only callbacks whose
    /// `.callback_group()` is in the set; others are skipped at registration.
    pub(crate) active_groups:
        Option<heapless::Vec<heapless::String<32>, { crate::config::MAX_NODES }>>,
    /// Node namespace (default: "/").
    pub(crate) namespace: heapless::String<64>,
    /// Phase 104.C.2 — rclcpp-style `add_node` table. Holds the
    /// per-Node metadata (name, namespace, rmw, locator, default
    /// SchedContext) for every Node attached to this Executor. The
    /// implicit "primary" Node (NodeId(0)) mirrors `node_name` +
    /// `namespace` above and is auto-populated on first use.
    pub(crate) nodes: heapless::Vec<super::node_record::NodeRecord, { crate::config::MAX_NODES }>,
    /// Phase 216 follow-up — per-Node dispatch trampoline registry.
    ///
    /// Populated by [`Executor::register_dispatch_slot`]; walked by
    /// [`Executor::dispatch_callback`] each time the board-side
    /// dispatch task hands off a `SignaledCallback` envelope.
    /// Sized by `MAX_NODES` because the upper-bound is one slot per
    /// Node pkg deployed on this executor (the same upper bound used
    /// by `nodes` and `extra_sessions`). `MAX_NODES` is driven by the
    /// `NROS_EXECUTOR_MAX_NODES` build-script env var (default 4);
    /// boards that deploy more Node pkgs raise it at build time.
    ///
    /// Default is `heapless::Vec::new()` (empty) — Nodes register
    /// themselves explicitly via the `register_dispatch_slot` API.
    /// The fallback shape avoids the `linkme` hazard on bare-metal
    /// Cortex-M / RISC-V (see `DispatchSlot` doc).
    pub(crate) dispatch_slots: heapless::Vec<DispatchSlot, { crate::config::MAX_NODES }>,
    /// Phase 258 (Track 2, 2a) — executor-owned component tick registry.
    /// Enrolled by [`Executor::enroll_component`] (from `nros`'s
    /// `install`/`register_node_borrowed`); each slot's `tick` runs at the
    /// tail of [`spin_once`](Self::spin_once); each slot's `drop` runs on
    /// `Executor::drop`. Bounded `MAX_NODES` (matches `dispatch_slots` /
    /// `nodes`). See [`ComponentSlot`] for why it's separate from
    /// `dispatch_slots`.
    pub(crate) component_slots: heapless::Vec<ComponentSlot, { crate::config::MAX_NODES }>,
    /// Phase 104.C.3 — extra sessions opened by `node_builder.rmw()`
    /// calls that named a backend different from the Executor's
    /// primary session. Indexed by `NodeRecord.session_idx`
    /// (1..=N maps to `extra_sessions[N-1]`; idx 0 is the primary
    /// `self.session`). Sized by `NROS_EXECUTOR_MAX_NODES` since one
    /// extra session per Node is the worst case.
    pub(crate) extra_sessions:
        heapless::Vec<session::ConcreteSession, { crate::config::MAX_NODES }>,
    /// Phase 156 — primary session's rmw name + locator, captured
    /// at `open*` time so `NodeBuilder::resolve_session_slot`'s
    /// cache lookup can detect when a `.rmw(name).locator(loc)`
    /// matches the primary (slot 0) instead of falling through to
    /// `CffiRmw::open_with_rmw` and trying to open a SECOND
    /// session against the same backend. zenoh-pico's global state
    /// is a process singleton; opening twice fails. Empty when
    /// constructed via `from_session(_ptr)` without `open*`
    /// recording the metadata; in that case the cache check
    /// degrades to "always miss" (today's behaviour).
    pub(crate) primary_rmw_name: heapless::String<32>,
    /// Locator half of the primary-session identity; written together
    /// with `primary_rmw_name` by `set_primary_identity`.
    pub(crate) primary_locator: heapless::String<128>,
    /// NOTE(review): presumably the shared stop-request flag for the std
    /// spin loops — confirm against the spin methods. Constructors start it
    /// at `false`.
    #[cfg(feature = "std")]
    pub(crate) halt_flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Phase 104.C.6 — shared executor wake flag. Any source of work
    /// (foreign thread handing off a callback, signal handler, future
    /// per-session vtable wake hook) sets this; `spin_once` swaps it to
    /// `false` on entry and, if it was `true`, polls every session with
    /// a 0-ms timeout instead of blocking. Lets one notification wake
    /// the executor regardless of which session the user is currently
    /// blocked on (the multi-RMW bridge case).
    #[cfg(feature = "std")]
    pub(crate) wake_flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Phase 124.B.2 — wake condvar paired with `wake_flag`. The
    /// runtime-supplied wake callback (`nros_rmw_runtime_wake_cb` in
    /// nros-rmw-cffi) writes `wake_flag = true` AND signals
    /// `wake_cv` atomically under `wake_mu`. `spin_once` blocks on
    /// the cv with a deadline instead of calling `drive_io` with the
    /// user's timeout — sub-poll-period wake latency.
    ///
    /// Poll-only backends (NULL `set_wake_callback` slot) leave the
    /// cb uninstalled; the cv wait still fires on its deadline,
    /// then drive_io(0) drains whatever the backend's internal
    /// poll has buffered.
    #[cfg(feature = "std")]
    #[allow(dead_code)] // Wired by spin_once after 124.B.4.
    pub(crate) wake_cv: std::sync::Arc<std::sync::Condvar>,
    /// Mutex paired with `wake_cv` (the `wake_mu` its documentation
    /// refers to). Guards no data of its own (`Mutex<()>`).
    #[cfg(feature = "std")]
    #[allow(dead_code)]
    pub(crate) wake_mu: std::sync::Arc<std::sync::Mutex<()>>,
    /// Phase 130.3 — Zephyr+std uses `nros_platform_wake_*` (k_sem)
    /// instead of `std::sync::Condvar` because Zephyr's libc
    /// `pthread_cond_timedwait` hangs past its deadline. `None`
    /// when the platform provider didn't link a wake primitive
    /// (e.g. test builds with `rmw-cffi` but no `platform-*`
    /// feature); spin_once falls back to driving the transport
    /// for the full timeout in that case.
    #[cfg(all(feature = "std", feature = "rmw-cffi"))]
    pub(crate) node_wake: Option<std::sync::Arc<super::node_wake::NodeWake>>,
    /// Phase 130.4 — true when at least one session's backend
    /// installed the wake callback. Drives whether `spin_once`
    /// uses the wake-primitive wait (`NodeWake` / `Condvar`) or
    /// just `drive_io(timeout_ms)`. Poll-only backends
    /// (XRCE-DDS-Client, current Cyclone/dust-DDS shims) leave
    /// this `false`; the wait then becomes a no-op sleep that
    /// starves reliable retransmission (Phase 127.C.4 root
    /// cause: server's `send_reply` flushes 100 ms once, then a
    /// blind `wait_ms(100)` sleeps with zero session activity, so
    /// the agent's ACK arrives into a stalled session and reliable
    /// redelivery never fires).
    #[cfg(all(feature = "std", feature = "rmw-cffi"))]
    pub(crate) has_async_wake: bool,
    /// Phase 124.B.2 — opaque context Arc handed to backends via
    /// `set_wake_callback`. Lazy-allocated on first install; stays
    /// alive for the Executor's lifetime so the raw pointer stored
    /// in backends remains valid.
    #[cfg(all(feature = "std", feature = "rmw-cffi"))]
    pub(crate) wake_ctx: Option<std::sync::Arc<WakeCtx>>,
    // Phase 141.A.3 — alloc-mode (no_std RTOS) mirror of the wake
    // state above. Same semantics: `wake_flag_alloc` is set SeqCst
    // by the runtime cb + cleared by spin_once on entry;
    // `node_wake_alloc` is the kernel-native binary semaphore
    // (lifted to alloc cfg in e36ee8cf) the cb signals;
    // `wake_ctx_alloc` is the Arc handed to backends via
    // `set_wake_callback(Some(cb), Arc::as_ptr(ctx) as *mut _)`.
    // `has_async_wake_alloc` is `true` after the first session
    // accepts the wake-cb install (`supports_wake_callback`).
    // Drives the no_std spin_once wait branch to block on
    // `node_wake_alloc.wait_ms(deadline)` instead of relying on
    // `drive_io`'s transport-blocking recv for the full timeout.
    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
    pub(crate) wake_flag_alloc: portable_atomic_util::Arc<portable_atomic::AtomicBool>,
    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
    pub(crate) node_wake_alloc: Option<portable_atomic_util::Arc<super::node_wake::NodeWake>>,
    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
    pub(crate) wake_ctx_alloc: Option<portable_atomic_util::Arc<super::wake_alloc::WakeCtxAlloc>>,
    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
    pub(crate) has_async_wake_alloc: bool,
    /// Phase 124.B.7.c — lazily-allocated POSIX signalfd worker.
    /// Owned by the Executor; spawned on first `signal_fd()` call.
    /// Drop joins the worker thread and closes the fd.
    #[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
    pub(crate) signal_fd: Option<WakeSignalFd>,
    /// Boxed parameter-service state. `None` at construction.
    #[cfg(feature = "param-services")]
    pub(crate) params: Option<alloc::boxed::Box<crate::parameter_services::ParamState>>,
    /// Boxed lifecycle-service runtime state. `None` at construction.
    #[cfg(feature = "lifecycle-services")]
    pub(crate) lifecycle:
        Option<alloc::boxed::Box<crate::lifecycle_services::LifecycleRuntimeState>>,
    /// Sub-millisecond wall-clock residual carried across `spin_once` calls
    /// so timers tick at true wall-clock rate even when `drive_io` returns
    /// in well under 1 ms (e.g. zenoh-pico condvar wakeups under load).
    #[cfg(feature = "std")]
    pub(crate) spin_residual_us: u64,
    /// Sub-millisecond residual for no_std wall-clock timer accounting.
    /// (Same name and type as the `std` arm above; exactly one of the two
    /// declarations is compiled in.)
    #[cfg(not(feature = "std"))]
    pub(crate) spin_residual_us: u64,
    /// Wall-clock instant at which the previous `spin_once` exited. The
    /// timer delta on the next call is measured from this point so any
    /// time the caller spent between `spin_once` invocations (e.g. an
    /// explicit `thread::sleep`) counts toward timer accumulation just
    /// like time spent inside `drive_io`.
    #[cfg(feature = "std")]
    pub(crate) last_spin_end: Option<std::time::Instant>,
    /// Monotonic clock endpoint for no_std timer accounting.
    #[cfg(not(feature = "std"))]
    pub(crate) last_spin_end_us: Option<u64>,
    /// Optional platform clock hook supplied by `ExecutorConfig`.
    #[cfg(not(feature = "std"))]
    pub(crate) clock_us_fn: Option<fn() -> u64>,
}
969
970impl Executor {
971 /// Create an executor from an already-opened session.
972 // The `arena` field intentionally lives inline so embedded callers can
973 // place an `Executor` in `static` storage without an allocator. The
974 // construction expression is large but is RVO'd into its destination
975 // by the optimiser; the clippy lint flags it because it can't prove
976 // the move-elision.
977 #[allow(clippy::large_stack_arrays)]
978 pub fn from_session(session: session::ConcreteSession) -> Self {
979 // SAFETY: MaybeUninit::uninit() is always safe; these bytes are only
980 // accessed through properly-typed ptr::write / ptr::read via the
981 // dispatch function pointers stored in `entries`.
982 Self {
983 session: SessionStore::Owned(session),
984 arena: [MaybeUninit::uninit(); crate::config::ARENA_SIZE],
985 arena_used: 0,
986 entries: [None; crate::config::MAX_CBS],
987 sched_contexts: {
988 let mut s = [None; crate::config::MAX_SC];
989 s[0] = Some(super::sched_context::SchedContext::default());
990 s
991 },
992 sched_context_bindings: [super::sched_context::SchedContextId(0);
993 crate::config::MAX_CBS],
994 sporadic_states: [None; crate::config::MAX_SC],
995 #[cfg(feature = "alloc")]
996 sporadic_atomic_states: [const { None }; crate::config::MAX_SC],
997 major_frame_us: 0,
998 #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
999 os_priority_workers: std::collections::HashMap::new(),
1000 #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
1001 os_priority_apply_policy: None,
1002 trigger: Trigger::Any,
1003 semantics: ExecutorSemantics::RclcppExecutor,
1004 node_name: heapless::String::new(),
1005 active_groups: None,
1006 nodes: heapless::Vec::new(),
1007 dispatch_slots: heapless::Vec::new(),
1008 component_slots: heapless::Vec::new(),
1009 extra_sessions: heapless::Vec::new(),
1010 primary_rmw_name: heapless::String::new(),
1011 primary_locator: heapless::String::new(),
1012 namespace: {
1013 let mut ns = heapless::String::new();
1014 let _ = ns.push_str("/");
1015 ns
1016 },
1017 #[cfg(feature = "std")]
1018 halt_flag: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1019 #[cfg(feature = "std")]
1020 wake_flag: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1021 #[cfg(feature = "std")]
1022 wake_cv: std::sync::Arc::new(std::sync::Condvar::new()),
1023 #[cfg(feature = "std")]
1024 wake_mu: std::sync::Arc::new(std::sync::Mutex::new(())),
1025 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1026 node_wake: super::node_wake::NodeWake::new().map(std::sync::Arc::new),
1027 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1028 wake_ctx: None,
1029 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1030 has_async_wake: false,
1031 // Phase 141.A.3 — alloc-mode wake state init. Constructed
1032 // eagerly (NodeWake allocation) so the runtime cb can be
1033 // installed lazily on first session without a fallible
1034 // alloc inside spin_once. `None` when the platform
1035 // provider reports the primitive unavailable (matches
1036 // the std-RTOS path's `node_wake: Option<...>`).
1037 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1038 wake_flag_alloc: portable_atomic_util::Arc::new(portable_atomic::AtomicBool::new(
1039 false,
1040 )),
1041 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1042 node_wake_alloc: super::node_wake::NodeWake::new().map(portable_atomic_util::Arc::new),
1043 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1044 wake_ctx_alloc: None,
1045 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1046 has_async_wake_alloc: false,
1047 #[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
1048 signal_fd: None,
1049 #[cfg(feature = "param-services")]
1050 params: None,
1051 #[cfg(feature = "lifecycle-services")]
1052 lifecycle: None,
1053 #[cfg(feature = "std")]
1054 spin_residual_us: 0,
1055 #[cfg(not(feature = "std"))]
1056 spin_residual_us: 0,
1057 // Initialise the spin endpoint to construction time so the
1058 // very first `spin_once` credits time the caller spent
1059 // *before* it (e.g. setup, an explicit pre-spin sleep) just
1060 // like time spent between later calls.
1061 #[cfg(feature = "std")]
1062 last_spin_end: Some(std::time::Instant::now()),
1063 #[cfg(not(feature = "std"))]
1064 last_spin_end_us: None,
1065 #[cfg(not(feature = "std"))]
1066 clock_us_fn: None,
1067 }
1068 }
1069
1070 /// Create an executor from a borrowed session pointer.
1071 ///
1072 /// # Safety
1073 /// - `session_ptr` must point to a valid, initialized session that lives at
1074 /// least as long as this executor.
1075 /// - The caller must not move or drop the session while the executor exists.
1076 // See `from_session` for the lint rationale.
1077 #[allow(clippy::large_stack_arrays)]
1078 pub unsafe fn from_session_ptr(session_ptr: *mut session::ConcreteSession) -> Self {
1079 Self {
1080 session: SessionStore::Borrowed(session_ptr),
1081 arena: [MaybeUninit::uninit(); crate::config::ARENA_SIZE],
1082 arena_used: 0,
1083 entries: [None; crate::config::MAX_CBS],
1084 sched_contexts: {
1085 let mut s = [None; crate::config::MAX_SC];
1086 s[0] = Some(super::sched_context::SchedContext::default());
1087 s
1088 },
1089 sched_context_bindings: [super::sched_context::SchedContextId(0);
1090 crate::config::MAX_CBS],
1091 sporadic_states: [None; crate::config::MAX_SC],
1092 #[cfg(feature = "alloc")]
1093 sporadic_atomic_states: [const { None }; crate::config::MAX_SC],
1094 major_frame_us: 0,
1095 #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
1096 os_priority_workers: std::collections::HashMap::new(),
1097 #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
1098 os_priority_apply_policy: None,
1099 trigger: Trigger::Any,
1100 semantics: ExecutorSemantics::RclcppExecutor,
1101 node_name: heapless::String::new(),
1102 active_groups: None,
1103 nodes: heapless::Vec::new(),
1104 dispatch_slots: heapless::Vec::new(),
1105 component_slots: heapless::Vec::new(),
1106 extra_sessions: heapless::Vec::new(),
1107 primary_rmw_name: heapless::String::new(),
1108 primary_locator: heapless::String::new(),
1109 namespace: {
1110 let mut ns = heapless::String::new();
1111 let _ = ns.push_str("/");
1112 ns
1113 },
1114 #[cfg(feature = "std")]
1115 halt_flag: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1116 #[cfg(feature = "std")]
1117 wake_flag: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1118 #[cfg(feature = "std")]
1119 wake_cv: std::sync::Arc::new(std::sync::Condvar::new()),
1120 #[cfg(feature = "std")]
1121 wake_mu: std::sync::Arc::new(std::sync::Mutex::new(())),
1122 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1123 node_wake: super::node_wake::NodeWake::new().map(std::sync::Arc::new),
1124 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1125 wake_ctx: None,
1126 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1127 has_async_wake: false,
1128 // Phase 141.A.3 — alloc-mode wake state init. Constructed
1129 // eagerly (NodeWake allocation) so the runtime cb can be
1130 // installed lazily on first session without a fallible
1131 // alloc inside spin_once. `None` when the platform
1132 // provider reports the primitive unavailable (matches
1133 // the std-RTOS path's `node_wake: Option<...>`).
1134 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1135 wake_flag_alloc: portable_atomic_util::Arc::new(portable_atomic::AtomicBool::new(
1136 false,
1137 )),
1138 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1139 node_wake_alloc: super::node_wake::NodeWake::new().map(portable_atomic_util::Arc::new),
1140 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1141 wake_ctx_alloc: None,
1142 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1143 has_async_wake_alloc: false,
1144 #[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
1145 signal_fd: None,
1146 #[cfg(feature = "param-services")]
1147 params: None,
1148 #[cfg(feature = "lifecycle-services")]
1149 lifecycle: None,
1150 #[cfg(feature = "std")]
1151 spin_residual_us: 0,
1152 #[cfg(not(feature = "std"))]
1153 spin_residual_us: 0,
1154 // Initialise the spin endpoint to construction time so the
1155 // very first `spin_once` credits time the caller spent
1156 // *before* it (e.g. setup, an explicit pre-spin sleep) just
1157 // like time spent between later calls.
1158 #[cfg(feature = "std")]
1159 last_spin_end: Some(std::time::Instant::now()),
1160 #[cfg(not(feature = "std"))]
1161 last_spin_end_us: None,
1162 #[cfg(not(feature = "std"))]
1163 clock_us_fn: None,
1164 }
1165 }
1166
1167 /// Phase 228.B (RFC-0015) — construct a tier task's executor that **shares**
1168 /// a session opened once by the orchestration `main()`.
1169 ///
1170 /// In the per-tier execution model `main()` opens one RMW session, then
1171 /// spawns one RTOS task per priority tier; each task calls this to get an
1172 /// [`Executor`] over the *same* session (the `Borrowed` session store — this
1173 /// executor neither owns nor closes it), registers its tier's callback
1174 /// groups, and spins. Thin alias over [`Executor::from_session_ptr`].
1175 ///
1176 /// # Safety
1177 /// `session` must outlive every executor/task built from it (the
1178 /// orchestration `main()` holds it and never returns / WFIs), and must not
1179 /// be mutated except through these executors' spin calls.
1180 pub unsafe fn open_with_session(session: *mut session::ConcreteSession) -> Self {
1181 unsafe { Self::from_session_ptr(session) }
1182 }
1183
1184 /// Raw pointer to this executor's RMW session, for the per-tier model:
1185 /// the boot task opens the one session via [`Executor::open`] (the RMW
1186 /// session is a process-wide singleton — opening twice fails), then hands
1187 /// this pointer to each spawned tier task's
1188 /// [`Executor::open_with_session`]. The boot task's executor owns the
1189 /// session and outlives every borrower, so the pointer stays valid for the
1190 /// program's life. Works for both `Owned` and `Borrowed` stores.
1191 ///
1192 /// # Safety
1193 /// The returned pointer aliases `self.session`. Callers must keep `self`
1194 /// alive (not moved/dropped) for as long as any tier executor uses the
1195 /// pointer, and must only touch the session through executor spin calls
1196 /// (the RMW backend serializes concurrent access through its own locks).
1197 pub fn session_ptr(&mut self) -> *mut session::ConcreteSession {
1198 &mut *self.session as *mut session::ConcreteSession
1199 }
1200
1201 /// Opaque, `Send` form of [`session_ptr`](Self::session_ptr) — the per-tier
1202 /// model hands this to each spawned tier task (it can cross the RTOS task /
1203 /// thread boundary, which a bare `*mut` cannot). See [`SessionHandle`].
1204 ///
1205 /// # Safety
1206 /// Same contract as [`session_ptr`](Self::session_ptr): `self` (the session
1207 /// owner) must outlive every executor built from the handle.
1208 pub fn session_handle(&mut self) -> SessionHandle {
1209 SessionHandle(self.session_ptr())
1210 }
1211
1212 /// Open an [`Executor`] over the session a [`SessionHandle`] refers to (the
1213 /// `Borrowed` store — neither owns nor closes it). The tier-task counterpart
1214 /// to [`session_handle`](Self::session_handle).
1215 ///
1216 /// # Safety
1217 /// The handle's session must still be alive (its owning executor not moved
1218 /// or dropped); access only through executor spin calls.
1219 pub unsafe fn open_with_session_handle(handle: SessionHandle) -> Self {
1220 unsafe { Self::open_with_session(handle.0) }
1221 }
1222
1223 /// Phase 228.C — set this tier executor's active callback-group filter. The
1224 /// generated per-tier task calls this before registering nodes; afterwards
1225 /// only callbacks whose `.callback_group()` is in `groups` register here.
1226 /// An empty slice (or never calling it) leaves the wildcard — register all
1227 /// callbacks (the single-tier degenerate case + today's behaviour).
1228 pub fn set_active_groups(&mut self, groups: &[&str]) {
1229 if groups.is_empty() {
1230 self.active_groups = None;
1231 return;
1232 }
1233 let mut v = heapless::Vec::new();
1234 for g in groups {
1235 let mut s = heapless::String::new();
1236 if s.push_str(g).is_ok() {
1237 let _ = v.push(s);
1238 }
1239 }
1240 self.active_groups = Some(v);
1241 }
1242
1243 /// Phase 228.C — whether a callback in `group` should register in this
1244 /// executor under the current filter. Wildcard (`None`) accepts everything.
1245 pub fn group_active(&self, group: &str) -> bool {
1246 group_filter_accepts(&self.active_groups, group)
1247 }
1248
1249 /// Set the node name and namespace used for liveliness tokens.
1250 ///
1251 /// Called by `open()` to propagate config values. When `register_subscription`
1252 /// or `register_service` creates entities, these values are attached to the
1253 /// Phase 156 — record the primary session's backend identity
1254 /// (rmw name + locator) so `NodeBuilder::resolve_session_slot`
1255 /// can detect when a `.rmw(name)` matches the primary instead
1256 /// of opening a SECOND backend session against the same
1257 /// singleton (zenoh-pico's `g_session` is process-wide;
1258 /// opening twice fails). `Executor::open*` calls this
1259 /// automatically; the C surface (`nros_executor_init`) calls
1260 /// it manually because it constructs via `from_session_ptr`
1261 /// which doesn't know the open metadata. Empty strings = "no
1262 /// primary identity tracked"; the cache check degrades to
1263 /// always-miss.
1264 pub fn set_primary_identity(&mut self, rmw_name: &str, locator: &str) {
1265 self.primary_rmw_name.clear();
1266 let _ = self.primary_rmw_name.push_str(rmw_name);
1267 self.primary_locator.clear();
1268 let _ = self.primary_locator.push_str(locator);
1269 }
1270
1271 /// `TopicInfo`/`ServiceInfo` so the zenoh backend can declare liveliness.
1272 pub fn set_node_identity(&mut self, node_name: &str, namespace: &str) {
1273 self.node_name.clear();
1274 let _ = self.node_name.push_str(node_name);
1275 if !namespace.is_empty() {
1276 self.namespace.clear();
1277 let _ = self.namespace.push_str(namespace);
1278 }
1279 }
1280
1281 // =========================================================================
1282 // Phase 110.B — SchedContext API
1283 // =========================================================================
1284
1285 /// Identifier of the auto-created default `Fifo`-class scheduling
1286 /// context. Every callback registered without an explicit
1287 /// [`bind_handle_to_sched_context`] binds to this SC.
1288 pub fn default_sched_context_id(&self) -> super::sched_context::SchedContextId {
1289 super::sched_context::SchedContextId(0)
1290 }
1291
1292 /// Register a new scheduling context. Returns a [`SchedContextId`]
1293 /// callers pass to [`bind_handle_to_sched_context`] to attach
1294 /// callbacks. Phase 110.B.
1295 pub fn create_sched_context(
1296 &mut self,
1297 sc: super::sched_context::SchedContext,
1298 ) -> Result<super::sched_context::SchedContextId, NodeError> {
1299 // Slot 0 is reserved for the default Fifo SC; search 1..MAX_SC.
1300 for (i, slot) in self.sched_contexts.iter_mut().enumerate().skip(1) {
1301 if slot.is_none() {
1302 *slot = Some(sc);
1303 // Phase 110.E — Sporadic-class SCs get a sibling
1304 // `SporadicState` entry that the spin_once dispatch
1305 // path consults each cycle to refill the budget at
1306 // period boundaries and skip dispatch when budget
1307 // is exhausted.
1308 if matches!(sc.class, super::sched_context::SchedClass::Sporadic) {
1309 let budget = sc.budget_us.get().map(|nz| nz.get()).unwrap_or(u32::MAX);
1310 let period = sc.period_us.get().map(|nz| nz.get()).unwrap_or(u32::MAX);
1311 self.sporadic_states[i] =
1312 Some(super::sched_context::SporadicState::new(budget, period));
1313 }
1314 return Ok(super::sched_context::SchedContextId(i as u8));
1315 }
1316 }
1317 Err(NodeError::NoSchedContextSlot)
1318 }
1319
1320 /// Bind a registered callback to a scheduling context. The next
1321 /// `spin_once` cycle dispatches the callback through that SC's
1322 /// queue (FIFO bitmap or EDF heap). Phase 110.B.
1323 pub fn bind_handle_to_sched_context(
1324 &mut self,
1325 handle: HandleId,
1326 sc_id: super::sched_context::SchedContextId,
1327 ) -> Result<(), NodeError> {
1328 let i = handle.0;
1329 if i >= crate::config::MAX_CBS {
1330 return Err(NodeError::InvalidSchedContextBinding);
1331 }
1332 if self.entries[i].is_none() {
1333 return Err(NodeError::InvalidSchedContextBinding);
1334 }
1335 let sc_idx = sc_id.0 as usize;
1336 if sc_idx >= crate::config::MAX_SC || self.sched_contexts[sc_idx].is_none() {
1337 return Err(NodeError::InvalidSchedContextBinding);
1338 }
1339 self.sched_context_bindings[i] = sc_id;
1340 Ok(())
1341 }
1342
1343 /// Phase 110.F — opt in to per-callback OS-priority dispatch.
1344 /// Once registered, every `spin_once` cycle routes ready entries
1345 /// whose bound SC has `os_pri > 0` onto a worker thread the OS
1346 /// scheduler has elevated to that numeric priority. Workers are
1347 /// spawned lazily on first use and self-halt when the Executor
1348 /// drops.
1349 ///
1350 /// `apply_policy` is the same `fn(SchedPolicy) -> Result<(),
1351 /// SchedError>` shape `open_threaded` takes — keeps the
1352 /// Executor non-generic over Platform.
1353 ///
1354 /// Calling this with `apply_policy = noop` is fine for testing
1355 /// (workers spawn but don't actually elevate priority); real
1356 /// hard-RT use needs `CAP_SYS_NICE` on Linux or the equivalent
1357 /// kernel config on RTOSes.
1358 #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
1359 pub fn register_os_priority_dispatcher(
1360 &mut self,
1361 apply_policy: fn(
1362 nros_platform_api::SchedPolicy,
1363 ) -> Result<(), nros_platform_api::SchedError>,
1364 ) {
1365 self.os_priority_apply_policy = Some(apply_policy);
1366 }
1367
1368 /// Phase 110.G — enable time-triggered dispatch by setting the
1369 /// executor's major-frame length. Once set, every `spin_once`
1370 /// cycle gates dispatch through each entry's bound SC's
1371 /// `tt_window_offset_us` / `tt_window_duration_us` fields:
1372 /// dispatch only fires when the current monotonic time falls
1373 /// inside the window `[off, off + duration) mod major_frame`.
1374 ///
1375 /// `major_frame_us = 0` disables the TT gate (default state).
1376 /// Setting a non-zero major frame after callbacks are already
1377 /// registered is allowed — TT gates take effect on the next
1378 /// `spin_once` cycle.
1379 pub fn register_time_triggered_dispatcher(&mut self, major_frame_us: u32) {
1380 self.major_frame_us = major_frame_us;
1381 }
1382
1383 /// Phase 110.G — apply a declarative cyclic schedule.
1384 ///
1385 /// One-shot helper that wraps the underlying primitives:
1386 /// validates the schedule (`major_frame > 0`, no overlapping
1387 /// windows, every window fits inside the major frame), sets the
1388 /// executor's major-frame length, then materialises one
1389 /// `SchedContext` per window with `class = TimeTriggered` +
1390 /// the window's offset / duration. Returns the per-window
1391 /// [`SchedContextId`] array so callers can immediately
1392 /// `bind_handle_to_sched_context(handle, sc_id)` for their
1393 /// subscription / timer handles.
1394 ///
1395 /// `N` is the schedule's *declared* maximum window count;
1396 /// `schedule.window_count` gates how many SCs are actually
1397 /// created. Unused trailing slots return
1398 /// `SchedContextId::default()` (sentinel — callers must respect
1399 /// `window_count`).
1400 pub fn apply_time_triggered_schedule<const N: usize>(
1401 &mut self,
1402 schedule: &super::sched_context::TimeTriggeredSchedule<N>,
1403 ) -> Result<
1404 [super::sched_context::SchedContextId; N],
1405 super::sched_context::TimeTriggeredScheduleError,
1406 > {
1407 schedule.validate()?;
1408 self.major_frame_us = schedule.major_frame_us;
1409 // SC slot 0 is the auto-created default; reusing it as a
1410 // sentinel for unused trailing slots is safe because the
1411 // caller respects `schedule.window_count`.
1412 let mut ids: [super::sched_context::SchedContextId; N] =
1413 [super::sched_context::SchedContextId(0); N];
1414 for (i, window) in schedule.windows[..schedule.window_count].iter().enumerate() {
1415 // Deprecation note on `SchedClass::TimeTriggered`: TT
1416 // is implemented as a per-SC *window gate* on top of
1417 // the existing class-based dispatch (Fifo here keeps
1418 // the EDF / Sporadic budgets out of the picture for
1419 // pure cyclic schedules). The window-gate fields set
1420 // below are what `spin_once`'s 110.G runtime gate
1421 // actually reads.
1422 let sc = super::sched_context::SchedContext {
1423 tt_window_offset_us: super::sched_context::OptUs::from_us(window.offset_us),
1424 tt_window_duration_us: super::sched_context::OptUs::from_us(window.duration_us),
1425 ..super::sched_context::SchedContext::new_fifo()
1426 };
1427 ids[i] = self.create_sched_context(sc).map_err(|_| {
1428 super::sched_context::TimeTriggeredScheduleError::WindowCountOverflow
1429 })?;
1430 }
1431 Ok(ids)
1432 }
1433
/// Phase 110.E.b — attach an ISR-driven refill timer to an existing
/// Sporadic scheduling context.
///
/// Intended flow: the caller creates a periodic timer through their
/// platform's `PlatformTimer::create_periodic`, using the returned
/// `Arc<AtomicSporadicState>` as `user_data` and
/// `atomic_sporadic_refill_thunk` as the callback, and passes the
/// resulting platform handle in here wrapped as
/// `OpaqueTimerHandle::new(handle, destroy_fn)`.
///
/// The executor keeps a clone of the `Arc` together with `timer` in its
/// per-SC table so both can be cleaned up when the executor is dropped.
/// The state is seeded from the SC's `budget_us` / `period_us`; a field
/// that is unset falls back to `u32::MAX`.
///
/// # Errors
/// `NodeError::InvalidSchedContextBinding` when `sc_id` is out of range,
/// names an empty slot, or names an SC whose class is not `Sporadic`.
#[cfg(feature = "alloc")]
pub fn register_sporadic_timer(
    &mut self,
    sc_id: super::sched_context::SchedContextId,
    timer: OpaqueTimerHandle,
) -> Result<portable_atomic_util::Arc<super::sched_context::AtomicSporadicState>, NodeError>
{
    let slot = sc_id.0 as usize;
    if slot >= crate::config::MAX_SC {
        return Err(NodeError::InvalidSchedContextBinding);
    }
    let Some(sc) = self.sched_contexts[slot].as_ref() else {
        return Err(NodeError::InvalidSchedContextBinding);
    };
    if !matches!(sc.class, super::sched_context::SchedClass::Sporadic) {
        return Err(NodeError::InvalidSchedContextBinding);
    }

    let budget_us = sc.budget_us.get().map_or(u32::MAX, |nz| nz.get());
    let period_us = sc.period_us.get().map_or(u32::MAX, |nz| nz.get());

    let shared = portable_atomic_util::Arc::new(
        super::sched_context::AtomicSporadicState::new(budget_us, period_us),
    );
    // One reference stays with the executor (next to the timer handle),
    // the other goes back to the caller.
    let retained = portable_atomic_util::Arc::clone(&shared);
    self.sporadic_atomic_states[slot] = Some((retained, timer));
    Ok(shared)
}
1470
1471 /// Inspect a registered scheduling context. Phase 110.B.
1472 pub fn sched_context(
1473 &self,
1474 sc_id: super::sched_context::SchedContextId,
1475 ) -> Option<&super::sched_context::SchedContext> {
1476 self.sched_contexts.get(sc_id.0 as usize)?.as_ref()
1477 }
1478
1479 /// Phase 104.C.2 — start a rclcpp-style Node builder for this
1480 /// Executor. The returned [`NodeBuilder`](super::node_record::NodeBuilder)
1481 /// is chainable:
1482 ///
1483 /// ```ignore
1484 /// let id = exec.node_builder("ingress")
1485 /// .rmw("zenoh")
1486 /// .locator("tcp/127.0.0.1:7447")
1487 /// .sched(my_sc_id)
1488 /// .build()?;
1489 /// ```
1490 ///
1491 /// In Phase 104.C.2 the Node table is storage-only — all
1492 /// registered Nodes share the Executor's primary session. Per-
1493 /// Node session binding (the bridge feature) lands in Phase
1494 /// 104.C.3 when the session cache is wired.
1495 pub fn node_builder<'a, 'cfg>(
1496 &'a mut self,
1497 name: &'cfg str,
1498 ) -> super::node_record::NodeBuilder<'a, 'cfg> {
1499 super::node_record::NodeBuilder {
1500 executor: self,
1501 name,
1502 namespace: None,
1503 rmw_name: None,
1504 locator: None,
1505 domain_id: None,
1506 sched: None,
1507 session_idx: None,
1508 }
1509 }
1510
1511 /// Return the Node table — Phase 104.C.2 read accessor.
1512 pub fn nodes(&self) -> &[super::node_record::NodeRecord] {
1513 &self.nodes
1514 }
1515
1516 /// Borrow a Node's metadata by id, returning `None` if the id
1517 /// is out of range.
1518 pub fn node(&self, id: super::node_record::NodeId) -> Option<&super::node_record::NodeRecord> {
1519 self.nodes.get(id.index())
1520 }
1521
1522 /// Phase 189.M1 — an executor-borrowing node handle for the entity builders
1523 /// (`exec.node_mut(id).subscription(t)...` / `.create_subscription(...)`).
1524 /// A short-lived `&mut Executor` borrow — use one at a time; entity handles
1525 /// are owned and outlive it (see `NodeCtx`).
1526 pub fn node_mut(&mut self, id: super::node_record::NodeId) -> super::node::NodeCtx<'_> {
1527 super::node::NodeCtx::new(self, id)
1528 }
1529
1530 /// Phase 104.C.3 — resolve a session-slot index to a mutable
1531 /// session reference. Slot 0 = the Executor's primary session;
1532 /// slots 1..=N = the `extra_sessions` vec opened by
1533 /// `node_builder.rmw(name)` calls that named a backend
1534 /// different from the primary.
1535 pub(crate) fn session_at_mut(&mut self, idx: u8) -> Option<&mut session::ConcreteSession> {
1536 if idx == 0 {
1537 Some(&mut *self.session)
1538 } else {
1539 self.extra_sessions.get_mut((idx - 1) as usize)
1540 }
1541 }
1542
1543 /// Phase 104.C.9.b — resolve the per-Node session for direct
1544 /// entity creation paths (C++ FFI publisher / subscription /
1545 /// service that bypass the `register_*_on` arena dispatch).
1546 /// Returns `None` when `node_id` is out of range or the Node's
1547 /// `session_idx` lands outside the executor's session table.
1548 pub fn node_session_mut(
1549 &mut self,
1550 node_id: super::node_record::NodeId,
1551 ) -> Option<&mut session::ConcreteSession> {
1552 let session_idx = self.nodes.get(node_id.index())?.session_idx;
1553 self.session_at_mut(session_idx)
1554 }
1555
1556 /// Phase 189.M1 — create a typed publisher bound to a node's session.
1557 /// Backs `node.publisher(t).typed::<M>().build()` on the
1558 /// executor-borrowing [`NodeCtx`](super::node::NodeCtx); the returned
1559 /// handle is owned and outlives the `NodeCtx`.
1560 pub fn create_publisher_on<M: crate::cyclonedds_register::MessageForRmw>(
1561 &mut self,
1562 node_id: super::node_record::NodeId,
1563 topic_name: &str,
1564 qos: QosSettings,
1565 ) -> Result<crate::executor::handles::EmbeddedPublisher<M>, NodeError> {
1566 // Phase 212.K.7.6.b — register `M`'s cyclonedds descriptor before
1567 // creating the underlying publisher handle. No-op for other RMWs.
1568 crate::cyclonedds_register::register_type::<M>()?;
1569 let handle = self.create_raw_publisher_handle_on(
1570 node_id,
1571 topic_name,
1572 <M as RosMessage>::TYPE_NAME,
1573 <M as RosMessage>::TYPE_HASH,
1574 qos,
1575 )?;
1576 Ok(crate::executor::handles::EmbeddedPublisher {
1577 handle,
1578 event_regs: crate::executor::handles::empty_event_regs(),
1579 _phantom: PhantomData,
1580 })
1581 }
1582
1583 /// Phase 189.M1 — create a generic (type-erased) publisher bound to a
1584 /// node's session. Backs `node.publisher(t).generic(ty, hash).build()`;
1585 /// the bridge re-publishes through this handle on the dest session.
1586 pub fn create_publisher_raw_on(
1587 &mut self,
1588 node_id: super::node_record::NodeId,
1589 topic_name: &str,
1590 type_name: &str,
1591 type_hash: &str,
1592 qos: QosSettings,
1593 ) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
1594 let handle =
1595 self.create_raw_publisher_handle_on(node_id, topic_name, type_name, type_hash, qos)?;
1596 Ok(crate::executor::handles::EmbeddedRawPublisher {
1597 handle,
1598 arena: crate::executor::handles::TxArena::new(),
1599 event_regs: crate::executor::handles::empty_event_regs(),
1600 })
1601 }
1602
1603 /// Shared prelude for the publisher-on-node paths: resolve the node's
1604 /// identity + session slot, build the [`TopicInfo`], validate QoS, and
1605 /// create the backend publisher handle. Mirrors
1606 /// `register_subscription_buffered_raw_on`'s session resolution so a
1607 /// bridge's source sub + dest pub agree on topic construction.
1608 fn create_raw_publisher_handle_on(
1609 &mut self,
1610 node_id: super::node_record::NodeId,
1611 topic_name: &str,
1612 type_name: &str,
1613 type_hash: &str,
1614 qos: QosSettings,
1615 ) -> Result<session::RmwPublisher, NodeError> {
1616 let (node_name, ns, session_idx) = {
1617 let r = self
1618 .nodes
1619 .get(node_id.index())
1620 .ok_or(NodeError::InvalidSchedContextBinding)?;
1621 (r.name.clone(), r.namespace.clone(), r.session_idx)
1622 };
1623 let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
1624 if !node_name.is_empty() {
1625 topic = topic.with_node_name(&node_name);
1626 }
1627 let session = self
1628 .session_at_mut(session_idx)
1629 .ok_or(NodeError::BackendMismatch)?;
1630 qos.validate_against(Session::supported_qos_policies(session))
1631 .map_err(NodeError::Transport)?;
1632 session
1633 .create_publisher(&topic, qos)
1634 .map_err(|_| NodeError::Transport(TransportError::PublisherCreationFailed))
1635 }
1636
/// Phase 124.B.1 — hook the executor's wake callback into the primary
/// session.
///
/// Best-effort: a backend that does not override
/// `Session::set_wake_callback` (poll-only XRCE, bare-metal) ignores the
/// call and keeps being drained at the executor's deadline-bound cv-wait
/// boundary. `has_async_wake` is raised only when the session reports
/// wake-callback support; it is never lowered here.
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
fn install_wake_signal_on_primary(&mut self) {
    use nros_rmw::Session as _;

    let wake_ctx = self.wake_ctx_ptr();
    // SAFETY: `wake_ctx` points at executor-owned wake state that outlives
    // the session callback installation and is cleared on executor drop.
    unsafe {
        self.session
            .set_wake_callback(Some(nros_rmw_runtime_wake_cb), wake_ctx);
    }

    let backend_wakes = self.session.supports_wake_callback();
    if backend_wakes {
        self.has_async_wake = true;
    }
}
1656
/// Phase 124.B.1 — hook the executor's wake callback into one of the
/// extra sessions opened by `node_builder.rmw(...)`.
///
/// Called from `NodeBuilder::build()` immediately after
/// `extra_sessions.push(...)`; `idx` indexes `extra_sessions` directly.
/// If no session exists at `idx` the callback is not installed (the wake
/// context is still created). `has_async_wake` is raised when that
/// session reports wake-callback support.
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
pub(crate) fn install_wake_signal_on_extra(&mut self, idx: usize) {
    use nros_rmw::Session as _;

    let wake_ctx = self.wake_ctx_ptr();
    let Some(extra) = self.extra_sessions.get_mut(idx) else {
        return;
    };
    // SAFETY: same executor-owned wake state as the primary session;
    // the extra session is owned by this executor.
    unsafe {
        extra.set_wake_callback(Some(nros_rmw_runtime_wake_cb), wake_ctx);
    }
    if extra.supports_wake_callback() {
        self.has_async_wake = true;
    }
}
1675
/// Phase 124.B.7.c — POSIX signal-handler-safe wake fd.
///
/// Returns a Linux `eventfd` that callers (typically POSIX signal
/// handlers) can `write(fd, &1u64, 8)` to from any context, including
/// signal handlers. A runtime-owned worker thread reads the fd and
/// signals `wake_cv`, unblocking `spin_once`.
///
/// The worker thread is spawned lazily on the first call and joined on
/// Executor drop; later calls return the same fd. Linux-only and gated
/// behind `feature = "signal-fd-wake"`; binaries that don't install
/// signal handlers shouldn't enable it.
///
/// Returns the raw fd. The Executor retains ownership; do not `close()`
/// it from the caller.
///
/// # Errors
/// Propagates the `std::io::Error` from `WakeSignalFd::new` when the fd
/// cannot be set up on the first call.
#[cfg(all(feature = "signal-fd-wake", feature = "rmw-cffi", target_os = "linux"))]
pub fn signal_fd(&mut self) -> std::io::Result<core::ffi::c_int> {
    // Resolve the wake context first: it lazily allocates the shared
    // `WakeCtx` the worker will signal through.
    let ctx_ptr = self.wake_ctx_ptr() as *const WakeCtx;
    if let Some(existing) = self.signal_fd.as_ref() {
        return Ok(existing.fd());
    }
    let created = WakeSignalFd::new(ctx_ptr)?;
    let fd = created.fd();
    self.signal_fd = Some(created);
    Ok(fd)
}

/// Phase 124.B.2 — opaque context pointer handed to the runtime wake
/// callback.
///
/// The pointer addresses a `WakeCtx` bundling clones of the executor's
/// wake flag, condvar and mutex (plus the optional node wake); the
/// callback decodes it as `*const WakeCtx`.
///
/// Lifetime: tied to the Executor instance. The `WakeCtx` is allocated
/// lazily on the first call and kept in `self.wake_ctx`, so the pointer
/// stays valid for as long as the Executor holds that allocation. Every
/// later call returns the same pointer.
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
fn wake_ctx_ptr(&mut self) -> *mut core::ffi::c_void {
    if let Some(existing) = self.wake_ctx.as_ref() {
        return std::sync::Arc::as_ptr(existing) as *mut core::ffi::c_void;
    }
    let ctx = std::sync::Arc::new(WakeCtx {
        flag: std::sync::Arc::clone(&self.wake_flag),
        cv: std::sync::Arc::clone(&self.wake_cv),
        mu: std::sync::Arc::clone(&self.wake_mu),
        node_wake: self.node_wake.as_ref().map(std::sync::Arc::clone),
    });
    // `Arc::as_ptr` addresses the heap allocation, so the pointer is
    // unaffected by moving the `Arc` into `self.wake_ctx` below.
    let raw = std::sync::Arc::as_ptr(&ctx) as *mut core::ffi::c_void;
    self.wake_ctx = Some(ctx);
    raw
}
1721
// Phase 141.A.3 — alloc-mode (no_std RTOS) counterpart of
// `install_wake_signal_on_primary` / `install_wake_signal_on_extra` /
// `wake_ctx_ptr`. The install contract is the same best-effort one:
// a backend that doesn't override `Session::set_wake_callback` ignores
// the call and keeps being drained on the executor's deadline-bound
// wait.
//
// Returns `None` when no node wake is configured; otherwise the raw
// pointer of the lazily-created, executor-owned `WakeCtxAlloc`.
#[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
fn wake_ctx_alloc_ptr(&mut self) -> Option<*mut core::ffi::c_void> {
    // No NodeWake means there is no kernel primitive to signal: skip the
    // install and let spin_once fall back to drive_io for the full
    // timeout. Mirrors the std-RTOS path's
    // `if let Some(wake) = self.node_wake.as_ref()` predicate.
    let node_wake = self.node_wake_alloc.as_ref()?;

    if let Some(existing) = self.wake_ctx_alloc.as_ref() {
        return Some(portable_atomic_util::Arc::as_ptr(existing) as *mut core::ffi::c_void);
    }

    let ctx = portable_atomic_util::Arc::new(super::wake_alloc::WakeCtxAlloc {
        flag: portable_atomic_util::Arc::clone(&self.wake_flag_alloc),
        node_wake: portable_atomic_util::Arc::clone(node_wake),
    });
    // The pointer addresses the Arc's allocation, so it is unaffected by
    // moving the Arc into `self.wake_ctx_alloc`.
    let raw = portable_atomic_util::Arc::as_ptr(&ctx) as *mut core::ffi::c_void;
    self.wake_ctx_alloc = Some(ctx);
    Some(raw)
}
1746
// Alloc-mode install of the wake callback on the primary session.
// Does nothing when `wake_ctx_alloc_ptr` yields no context.
#[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
fn install_wake_signal_on_primary_alloc(&mut self) {
    use nros_rmw::Session as _;

    if let Some(wake_ctx) = self.wake_ctx_alloc_ptr() {
        // SAFETY: `wake_ctx` is the raw pointer of an Arc<WakeCtxAlloc>
        // owned by the Executor (`self.wake_ctx_alloc`); the Arc
        // lives as long as the Executor and is cleared on drop.
        unsafe {
            self.session
                .set_wake_callback(Some(super::wake_alloc::nros_rmw_runtime_wake_cb), wake_ctx);
        }
        let backend_wakes = self.session.supports_wake_callback();
        if backend_wakes {
            self.has_async_wake_alloc = true;
        }
    }
}
1764
// Alloc-mode install of the wake callback on the extra session at
// `idx` (an index into `extra_sessions`). Does nothing when there is no
// wake context or no session at that index.
#[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
pub(crate) fn install_wake_signal_on_extra_alloc(&mut self, idx: usize) {
    use nros_rmw::Session as _;

    let Some(wake_ctx) = self.wake_ctx_alloc_ptr() else {
        return;
    };
    let Some(extra) = self.extra_sessions.get_mut(idx) else {
        return;
    };
    // SAFETY: same wake state as the primary session; the
    // extra session is owned by this Executor.
    unsafe {
        extra.set_wake_callback(Some(super::wake_alloc::nros_rmw_runtime_wake_cb), wake_ctx);
    }
    if extra.supports_wake_callback() {
        self.has_async_wake_alloc = true;
    }
}
1782
1783 /// Phase 104.C.4 — apply a Node's default SchedContext to a
1784 /// freshly-registered handle. Called from every `_inner`
1785 /// register variant after the entry slot is committed. No-op
1786 /// when `node_id` is None (legacy path), when the Node is
1787 /// out of range, or when the Node's `default_sched` is the
1788 /// auto-created Fifo slot (0) which matches the executor's
1789 /// default binding already.
1790 ///
1791 /// Handles can still override per-call via
1792 /// `bind_handle_to_sched_context(handle, sc_id)` post-register.
1793 pub(crate) fn apply_node_default_sched(
1794 &mut self,
1795 slot: usize,
1796 node_id: Option<super::node_record::NodeId>,
1797 ) {
1798 let Some(id) = node_id else { return };
1799 let Some(rec) = self.nodes.get(id.index()) else {
1800 return;
1801 };
1802 let sc = rec.default_sched;
1803 if sc.0 == 0 {
1804 return;
1805 }
1806 if slot >= crate::config::MAX_CBS {
1807 return;
1808 }
1809 let sc_idx = sc.0 as usize;
1810 if sc_idx >= crate::config::MAX_SC || self.sched_contexts[sc_idx].is_none() {
1811 return;
1812 }
1813 self.sched_context_bindings[slot] = sc;
1814 }
1815
1816 /// Phase 104.C.3.2 — scoped Node-handle access. The closure
1817 /// receives a [`Node`] bound to the requested [`NodeId`]'s
1818 /// session + identity. Use the standard `Node::create_publisher`,
1819 /// `create_subscription`, etc. APIs inside.
1820 ///
1821 /// rclcpp-aligned bridge pattern:
1822 ///
1823 /// ```ignore
1824 /// let node_in = exec.node_builder("ingress").rmw("zenoh").build()?;
1825 /// let node_out = exec.node_builder("egress").rmw("xrce").build()?;
1826 ///
1827 /// let pub_out = exec.with_node(node_out, |n| {
1828 /// n.create_publisher::<Int32>("/fwd")
1829 /// })??;
1830 ///
1831 /// exec.with_node(node_in, |n| {
1832 /// n.create_subscription_buffered::<Int32, _, 1024>(
1833 /// "/src", qos(), move |m| { let _ = pub_out.publish(m); }
1834 /// )
1835 /// })??;
1836 /// ```
1837 ///
1838 /// The closure can return any type; double-`?` unwraps the
1839 /// outer `Result<R, NodeError>` from `with_node` and the inner
1840 /// result returned by the closure.
1841 /// Phase 104.C.3.3.d — flat-Result variant of
1842 /// [`with_node`](Self::with_node). When the closure already
1843 /// returns `Result<R, NodeError>`, this avoids the double-`?`:
1844 ///
1845 /// ```ignore
1846 /// // Without `with_node_try`:
1847 /// let pub_ = exec.with_node(id, |n| n.create_publisher(...))??;
1848 ///
1849 /// // With `with_node_try`:
1850 /// let pub_ = exec.with_node_try(id, |n| n.create_publisher(...))?;
1851 /// ```
1852 pub fn with_node_try<R>(
1853 &mut self,
1854 id: super::node_record::NodeId,
1855 f: impl FnOnce(&mut NodeHandle<'_>) -> Result<R, NodeError>,
1856 ) -> Result<R, NodeError> {
1857 self.with_node(id, f)?
1858 }
1859
1860 pub fn with_node<R>(
1861 &mut self,
1862 id: super::node_record::NodeId,
1863 f: impl FnOnce(&mut NodeHandle<'_>) -> R,
1864 ) -> Result<R, NodeError> {
1865 let (name, ns, session_idx) = {
1866 let r = self
1867 .nodes
1868 .get(id.index())
1869 .ok_or(NodeError::InvalidSchedContextBinding)?;
1870 (r.name.clone(), r.namespace.clone(), r.session_idx)
1871 };
1872 let session = self
1873 .session_at_mut(session_idx)
1874 .ok_or(NodeError::BackendMismatch)?;
1875 // SAFETY: short-lived scoped reference. `Node::new` takes
1876 // `&mut ConcreteSession`; lifetime is bound to this fn's
1877 // body via the closure's borrow of `node`.
1878 let mut node = NodeHandle::new(name, ns, session, 0);
1879 Ok(f(&mut node))
1880 }
1881
1882 /// Find a registered executor node by final name and namespace.
1883 pub fn node_id_by_name(
1884 &self,
1885 name: &str,
1886 namespace: &str,
1887 ) -> Option<super::node_record::NodeId> {
1888 self.nodes
1889 .iter()
1890 .enumerate()
1891 .find(|(_, node)| node.name.as_str() == name && node.namespace.as_str() == namespace)
1892 .map(|(index, _)| super::node_record::NodeId::from_raw(index as u8))
1893 }
1894
1895 /// Create a node on this executor.
1896 pub fn create_node(&mut self, name: &str) -> Result<NodeHandle<'_>, NodeError> {
1897 if name.len() > 64 {
1898 return Err(NodeError::NameTooLong);
1899 }
1900
1901 let mut node_name = heapless::String::<64>::new();
1902 node_name
1903 .push_str(name)
1904 .map_err(|_| NodeError::NameTooLong)?;
1905
1906 Ok(NodeHandle::new(
1907 node_name,
1908 self.namespace.clone(),
1909 &mut self.session,
1910 0,
1911 ))
1912 }
1913
/// Phase 128.F.2 — bridge-mode node factory.
///
/// Registers a Node bound to the RMW backend named `rmw` through
/// `node_builder().rmw(rmw).build()` (which opens, or reuses, an extra
/// session for that backend) and returns a [`NodeHandle`] borrowing that
/// session. Intended for binaries that deliberately link more than one
/// backend and need a Node to speak a specific one.
///
/// The single-backend common case should stay on
/// [`create_node`](Self::create_node): this entry costs an extra session
/// lookup and serves no purpose when only one backend is registered.
///
/// # Errors
/// * `NodeError::NameTooLong` — `name` is longer than 64 bytes.
/// * Any error returned by the node builder's `build()`.
/// * `NodeError::NodeTableFull` — the freshly built id cannot be read
///   back, or its session slot does not resolve to a session.
#[cfg(feature = "rmw-cffi")]
pub fn create_node_on(&mut self, name: &str, rmw: &str) -> Result<NodeHandle<'_>, NodeError> {
    if name.len() > 64 {
        return Err(NodeError::NameTooLong);
    }

    // Registering the Node opens an extra session under `rmw` when none
    // exists yet for that backend.
    let id = self.node_builder(name).rmw(rmw).build()?;
    let session_idx = match self.node(id) {
        Some(record) => record.session_idx,
        None => return Err(NodeError::NodeTableFull),
    };

    let mut node_name = heapless::String::<64>::new();
    if node_name.push_str(name).is_err() {
        return Err(NodeError::NameTooLong);
    }

    let namespace = self.namespace.clone();
    let Some(session) = self.session_at_mut(session_idx) else {
        return Err(NodeError::NodeTableFull);
    };
    Ok(NodeHandle::new(node_name, namespace, session, 0))
}
1945
1946 /// Drive transport I/O (poll network, dispatch callbacks).
1947 #[allow(dead_code)]
1948 pub(crate) fn drive_io(&mut self, timeout_ms: i32) -> Result<(), NodeError> {
1949 self.session
1950 .drive_io(timeout_ms)
1951 .map_err(|_| NodeError::Transport(TransportError::PollFailed))
1952 }
1953
1954 /// Close the underlying session.
1955 pub fn close(&mut self) -> Result<(), NodeError> {
1956 self.session
1957 .close()
1958 .map_err(|_| NodeError::Transport(TransportError::ConnectionFailed))
1959 }
1960
1961 /// Phase 216 follow-up — register a per-Node dispatch trampoline.
1962 ///
1963 /// The board-side Entry pkg (or the macro-emitted
1964 /// `register_dispatch(executor)` wrapper, once wired) calls this
1965 /// once per deployed Node pkg, handing in the
1966 /// `__nros_node_<pkg>_on_callback` symbol + the Node's per-pkg
1967 /// `state` blob. [`Executor::dispatch_callback`] then linear-scans
1968 /// the registered slots when the dispatch task hands off a
1969 /// `SignaledCallback`.
1970 ///
1971 /// Returns `Err(())` when the registry is full (`MAX_NODES`
1972 /// entries — raise via `NROS_EXECUTOR_MAX_NODES` at build time).
1973 ///
1974 /// # Safety
1975 ///
1976 /// `state` must outlive the executor (the typical shape is a
1977 /// `*mut State` produced by
1978 /// `nros::__private_node_state_into_raw` from the
1979 /// macro-emitted `i()`; that pointer's lifetime IS the
1980 /// `Executor`'s by construction). `on_callback` must be safe to
1981 /// invoke with `(state, cb_id_ptr, cb_id_len, ctx)` matching the
1982 /// per-Node `__nros_node_<pkg>_on_callback` ABI emitted by the
1983 /// `nros::node!()` macro (Phase 216.A.5).
1984 #[allow(clippy::result_unit_err)]
1985 pub fn register_dispatch_slot(
1986 &mut self,
1987 state: *mut core::ffi::c_void,
1988 on_callback: unsafe extern "C" fn(
1989 *mut core::ffi::c_void,
1990 *const u8,
1991 usize,
1992 *mut core::ffi::c_void,
1993 ),
1994 ) -> Result<(), ()> {
1995 self.dispatch_slots
1996 .push(DispatchSlot { state, on_callback })
1997 .map_err(|_| ())
1998 }
1999
2000 /// Phase 216 follow-up — current registered dispatch-slot count.
2001 /// Diagnostic / test surface.
2002 pub fn dispatch_slot_count(&self) -> usize {
2003 self.dispatch_slots.len()
2004 }
2005
2006 /// Phase 258 (Track 2, 2a) — enroll a component into the executor-owned
2007 /// tick registry. Called by `nros`'s `install`/`register_node_borrowed`
2008 /// after it builds the `Arc<ComponentCell>`: `state` is the leaked
2009 /// `Arc<ComponentCell>` (the slot takes ownership), `tick`/`drop` are the
2010 /// `nros`-side trampolines (see [`ComponentSlot`]). The slot's `tick`
2011 /// runs at the tail of every [`spin_once`](Self::spin_once); its `drop`
2012 /// runs once on `Executor::drop`.
2013 ///
2014 /// Returns `Err(())` when the registry is full (`MAX_NODES` — raise via
2015 /// `NROS_EXECUTOR_MAX_NODES` at build time). On error the caller still
2016 /// owns `state` (the slot was not stored) and must drop it.
2017 ///
2018 /// # Safety
2019 /// `state` must be a `*mut` produced by leaking the component cell the
2020 /// `tick`/`drop` trampolines expect (an `Arc<ComponentCell>` via
2021 /// `Arc::into_raw` in the canonical `nros` caller), and must remain valid
2022 /// until the matching `drop` runs. `tick` must be safe to invoke with
2023 /// `(state, exec_ctx = *mut Executor)` each spin; `drop` must be safe to
2024 /// invoke exactly once with `state`.
2025 #[allow(clippy::result_unit_err)]
2026 pub unsafe fn enroll_component(
2027 &mut self,
2028 state: *mut core::ffi::c_void,
2029 tick: unsafe extern "C" fn(*mut core::ffi::c_void, *mut core::ffi::c_void),
2030 drop: unsafe extern "C" fn(*mut core::ffi::c_void),
2031 ) -> Result<(), ()> {
2032 self.component_slots
2033 .push(ComponentSlot { state, tick, drop })
2034 .map_err(|_| ())
2035 }
2036
2037 /// Phase 258 (Track 2, 2a) — current enrolled component-slot count.
2038 /// Diagnostic / test surface.
2039 pub fn component_slot_count(&self) -> usize {
2040 self.component_slots.len()
2041 }
2042
2043 /// Phase 216 final dispatch hook — stable entry point the
2044 /// framework's dispatch task (RTIC `__nros_run` /
2045 /// Embassy `__nros_run_task`) calls for each `SignaledCallback`
2046 /// envelope it dequeues from the board-side SPSC / Embassy
2047 /// channel.
2048 ///
2049 /// ## Signature shape
2050 ///
2051 /// `nros-node` sits below `nros` in the dep graph, so the typed
2052 /// `nros::CallbackId<'_>` / `nros::CallbackCtx<'_>` types
2053 /// referenced in the Phase 216 design notes cannot appear in the
2054 /// signature here. The macro emit translates the dequeued
2055 /// envelope to the layer-clean `(cb_id: &str, ctx: *mut c_void)`
2056 /// pair before calling this method; the per-Node `on_callback`
2057 /// trampoline ABI (Phase 216.A.5,
2058 /// `__nros_node_<pkg>_on_callback(state, cb_id_ptr, cb_id_len,
2059 /// ctx)`) uses the same untyped shape on the other side of the
2060 /// fence, so the round-trip stays type-consistent.
2061 ///
2062 /// ## Body — linear scan of the dispatch registry
2063 ///
2064 /// Each registered [`DispatchSlot`] holds an
2065 /// `__nros_node_<pkg>_on_callback` fn pointer + the owning Node's
2066 /// `state` blob. The macro-emitted trampoline body
2067 /// `match`es on `CallbackId` tags the Node declared and is a
2068 /// no-op for non-matching `cb_id`s — at most one Node per
2069 /// `cb_id` actually acts, the rest are cheap string-compare
2070 /// no-ops. This mirrors the strategy
2071 /// `ExecutorNodeRuntime::dispatch_callback` uses in
2072 /// `packages/core/nros/src/node_runtime.rs:470`.
2073 ///
2074 /// ## What's NOT auto-wired today
2075 ///
2076 /// The `nros::node!()` macro doesn't yet emit a
2077 /// `register_dispatch(executor)` wrapper that pushes the per-pkg
2078 /// `(state, on_callback)` into this registry. Until that wiring
2079 /// lands (Phase 216 follow-up — see commit msg), downstream
2080 /// consumers (board's `init_hardware`, or the codegen-emitted
2081 /// `run_plan`) must call
2082 /// [`Executor::register_dispatch_slot`] explicitly with the
2083 /// `__nros_node_<pkg>_on_callback` symbol + a `state` blob from
2084 /// the macro-emitted `i()`.
2085 //
2086 // `ctx` is an opaque FFI cookie forwarded verbatim to each slot's
2087 // `on_callback`; this fn never dereferences it (the registered callback
2088 // does, under the `register_dispatch_slot` safety contract), so it is sound
2089 // to call from safe code.
2090 #[allow(clippy::not_unsafe_ptr_arg_deref)]
2091 pub fn dispatch_callback(&mut self, cb_id: &str, ctx: *mut core::ffi::c_void) {
2092 let cb_id_ptr = cb_id.as_ptr();
2093 let cb_id_len = cb_id.len();
2094 // Snapshot pointer + length to avoid an outstanding borrow
2095 // across the unsafe fn calls below; each `DispatchSlot` is
2096 // `Copy`, so iterating by value sidesteps any aliasing
2097 // worry the borrow checker would flag if a slot's
2098 // `on_callback` re-entered the executor.
2099 for slot in self.dispatch_slots.iter().copied() {
2100 // SAFETY: caller of `register_dispatch_slot` guaranteed
2101 // `state` outlives the executor + `on_callback` matches
2102 // the per-Node `__nros_node_<pkg>_on_callback` ABI;
2103 // `cb_id_ptr`/`cb_id_len` describe the live `&str` the
2104 // caller passed in.
2105 unsafe {
2106 (slot.on_callback)(slot.state, cb_id_ptr, cb_id_len, ctx);
2107 }
2108 }
2109 }
2110
2111 /// Get a reference to the underlying session.
2112 pub fn session(&self) -> &session::ConcreteSession {
2113 &self.session
2114 }
2115
2116 /// Get a mutable reference to the underlying session.
2117 pub fn session_mut(&mut self) -> &mut session::ConcreteSession {
2118 &mut self.session
2119 }
2120
2121 /// Phase 124.F.3 — session-level connectivity probe. Wire-level
2122 /// round-trip "is the peer / agent / router still reachable?"
2123 /// — cheaper than the service-availability probe (no discovery
2124 /// state required).
2125 ///
2126 /// Returns `Ok(())` on reply within `timeout_ms`,
2127 /// `Err(NodeError::Transport(Timeout))` on no reply,
2128 /// `Err(NodeError::Transport(Unsupported))` when the active
2129 /// backend can't probe.
2130 ///
2131 /// Mirrors micro-ROS's `rmw_uros_ping_agent`. Useful for
2132 /// reconnect-on-link-loss patterns: bare-metal code can call
2133 /// `ping(100)` periodically and tear down / re-open the session
2134 /// on timeout.
2135 pub fn ping(&mut self, timeout_ms: i32) -> Result<(), NodeError> {
2136 use nros_rmw::Session;
2137 self.session
2138 .ping_session(timeout_ms)
2139 .map_err(NodeError::Transport)
2140 }
2141
/// Get a mutable reference to an action client core in the arena by entry index.
///
/// Returns `None` when `entry_index` is out of range, the slot is empty,
/// or the slot's recorded kind is not `EntryKind::ActionClient`.
///
/// # Safety
/// The caller must ensure that `entry_index` refers to an `ActionClientRawArenaEntry`.
/// The arena bytes are reinterpreted with all three buffer-size
/// parameters set to `DEFAULT_RX_BUF_SIZE`, so the entry must have been
/// stored with exactly that instantiation.
pub unsafe fn action_client_core_mut(
    &mut self,
    entry_index: usize,
) -> Option<&mut super::action_core::ActionClientCore> {
    // Metadata lookup: bail out on a bad index or an empty slot.
    let meta = self.entries.get(entry_index)?.as_ref()?;
    // The kind tag is the only runtime check; it cannot verify the
    // entry's const-generic buffer sizes.
    if !matches!(meta.kind, EntryKind::ActionClient) {
        return None;
    }
    let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
    // SAFETY: relies on the caller's contract above — `meta.offset` is
    // taken to be the byte offset of an `ActionClientRawArenaEntry` with
    // default buffer sizes inside the arena.
    unsafe {
        let entry_ptr = arena_ptr.add(meta.offset)
            as *mut super::arena::ActionClientRawArenaEntry<
                { crate::config::DEFAULT_RX_BUF_SIZE },
                { crate::config::DEFAULT_RX_BUF_SIZE },
                { crate::config::DEFAULT_RX_BUF_SIZE },
            >;
        Some(&mut (*entry_ptr).core)
    }
}
2165
/// Get a mutable reference to a service-client arena entry (Phase 82).
///
/// Returns `None` if `entry_index` doesn't refer to a service client
/// entry (bad index, empty slot, or a kind other than
/// `EntryKind::ServiceClient`). The default reply buffer size is assumed
/// because the C API always uses the default — the entry was registered
/// via `register_service_client_raw_sized::<DEFAULT_RX_BUF_SIZE>`.
///
/// # Safety
/// `entry_index` must refer to a `ServiceClientRawArenaEntry`.
/// The arena bytes are reinterpreted with the buffer-size parameter set
/// to `DEFAULT_RX_BUF_SIZE`, so the entry must have been stored with
/// exactly that instantiation.
pub unsafe fn service_client_entry_mut(
    &mut self,
    entry_index: usize,
) -> Option<&mut super::arena::ServiceClientRawArenaEntry<{ crate::config::DEFAULT_RX_BUF_SIZE }>>
{
    // Metadata lookup: bail out on a bad index or an empty slot.
    let meta = self.entries.get(entry_index)?.as_ref()?;
    // The kind tag is the only runtime check; it cannot verify the
    // entry's const-generic buffer size.
    if !matches!(meta.kind, EntryKind::ServiceClient) {
        return None;
    }
    let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
    // SAFETY: relies on the caller's contract above — `meta.offset` is
    // taken to be the byte offset of a default-sized
    // `ServiceClientRawArenaEntry` inside the arena.
    unsafe {
        let entry_ptr = arena_ptr.add(meta.offset)
            as *mut super::arena::ServiceClientRawArenaEntry<
                { crate::config::DEFAULT_RX_BUF_SIZE },
            >;
        Some(&mut *entry_ptr)
    }
}
2193
2194 /// Set the executor-level trigger condition.
2195 ///
2196 /// Controls which handles must be ready before `spin_once` dispatches
2197 /// callbacks. Defaults to [`Trigger::AnyReady`](crate::Trigger).
2198 pub fn set_trigger(&mut self, trigger: Trigger) {
2199 self.trigger = trigger;
2200 }
2201
2202 /// Set the executor data communication semantics.
2203 ///
2204 /// Choose between `Direct` (process in place) and `LET`
2205 /// (snapshot-then-process) semantics. See [`ExecutorSemantics`].
2206 pub fn set_semantics(&mut self, semantics: ExecutorSemantics) {
2207 self.semantics = semantics;
2208 }
2209
2210 /// Set the invocation mode for a specific handle.
2211 ///
2212 /// Controls whether the callback fires on every spin
2213 /// ([`Always`](InvocationMode::Always)) or only when new data
2214 /// arrives ([`OnNewData`](InvocationMode::OnNewData), the default).
2215 pub fn set_invocation(&mut self, id: HandleId, mode: InvocationMode) {
2216 if let Some(Some(meta)) = self.entries.get_mut(id.0) {
2217 meta.invocation = mode;
2218 }
2219 }
2220
2221 // ========================================================================
2222 // Arena-based callback registration
2223 // ========================================================================
2224
2225 /// Bump-allocate space for `T` in the arena. Returns the byte offset.
2226 pub(crate) fn arena_alloc<T>(&mut self) -> Result<usize, NodeError> {
2227 let align = core::mem::align_of::<T>();
2228 let size = core::mem::size_of::<T>();
2229 let aligned_offset = (self.arena_used + align - 1) & !(align - 1);
2230 let new_used = aligned_offset + size;
2231 if new_used > crate::config::ARENA_SIZE {
2232 return Err(NodeError::BufferTooSmall);
2233 }
2234 self.arena_used = new_used;
2235 Ok(aligned_offset)
2236 }
2237
2238 /// Bump-allocate space for `T` plus `trailing_bytes` extra bytes.
2239 ///
2240 /// Returns `(entry_offset, trailing_offset)`. The trailing region starts
2241 /// immediately after `T` (aligned to 8 bytes).
2242 pub(crate) fn arena_alloc_with_trailing<T>(
2243 &mut self,
2244 trailing_bytes: usize,
2245 ) -> Result<(usize, usize), NodeError> {
2246 let align = core::mem::align_of::<T>();
2247 let entry_size = core::mem::size_of::<T>();
2248 let entry_offset = self.arena_used.next_multiple_of(align);
2249 // Trailing region starts on an 8-byte (u64) boundary after the entry.
2250 let trailing_offset =
2251 (entry_offset + entry_size).next_multiple_of(core::mem::align_of::<u64>());
2252 let new_used = trailing_offset + trailing_bytes;
2253 if new_used > crate::config::ARENA_SIZE {
2254 return Err(NodeError::BufferTooSmall);
2255 }
2256 self.arena_used = new_used;
2257 Ok((entry_offset, trailing_offset))
2258 }
2259
2260 /// Find the next free entry slot index.
2261 pub(crate) fn next_entry_slot(&self) -> Result<usize, NodeError> {
2262 self.entries
2263 .iter()
2264 .position(|e| e.is_none())
2265 .ok_or(NodeError::BufferTooSmall)
2266 }
2267
2268 /// Typed buffered subscription core (the `node_mut(id).subscription(t)
2269 /// .typed::<M>()` builder lowers here). Routes the typed subscription
2270 /// through the [`NodeId`]'s session + identity (rclcpp `add_node` pattern).
2271 pub(crate) fn register_subscription_buffered_on<M, F, const RX_BUF: usize>(
2272 &mut self,
2273 node_id: super::node_record::NodeId,
2274 topic_name: &str,
2275 qos: QosSettings,
2276 callback: F,
2277 ) -> Result<HandleId, NodeError>
2278 where
2279 M: crate::cyclonedds_register::MessageForRmw + 'static,
2280 F: FnMut(&M) + 'static,
2281 {
2282 type Entry<M, F> = SubBufferedEntry<M, F>;
2283
2284 // Phase 212.K.7.6.b — see `create_publisher_on`.
2285 crate::cyclonedds_register::register_type::<M>()?;
2286
2287 let slot = self.next_entry_slot()?;
2288 let (node_name, ns, session_idx) = {
2289 let r = self
2290 .nodes
2291 .get(node_id.index())
2292 .ok_or(NodeError::InvalidSchedContextBinding)?;
2293 (r.name.clone(), r.namespace.clone(), r.session_idx)
2294 };
2295 let mut topic = TopicInfo::new(
2296 topic_name,
2297 <M as RosMessage>::TYPE_NAME,
2298 <M as RosMessage>::TYPE_HASH,
2299 )
2300 .with_namespace(&ns)
2301 // Phase 231 (RFC-0038) — hand the backend the receive-buffer size so it
2302 // can size-class its receive storage (zenoh-pico: small vs large).
2303 .with_rx_buffer_hint(RX_BUF);
2304 if !node_name.is_empty() {
2305 topic = topic.with_node_name(&node_name);
2306 }
2307 let handle = {
2308 let session = self
2309 .session_at_mut(session_idx)
2310 .ok_or(NodeError::BackendMismatch)?;
2311 session
2312 .create_subscriber(&topic, qos)
2313 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2314 };
2315
2316 // Phase 231 Wave 0.2 (RFC-0038) — in-place dispatch when the backend
2317 // advertises it: deserialize straight from the borrowed receive slot,
2318 // no arena buffer (copy #1 removed). Else the buffered path below.
2319 {
2320 use nros_rmw::Subscriber as _;
2321 if handle.supports_process_in_place() {
2322 let entry_offset = self.arena_alloc::<SubInplaceEntry<M, F>>()?;
2323 unsafe {
2324 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2325 let entry_ptr = arena_ptr.add(entry_offset) as *mut SubInplaceEntry<M, F>;
2326 core::ptr::write(
2327 entry_ptr,
2328 SubInplaceEntry {
2329 handle,
2330 callback,
2331 _phantom: PhantomData,
2332 },
2333 );
2334 }
2335 self.entries[slot] = Some(CallbackMeta {
2336 offset: entry_offset,
2337 kind: EntryKind::Subscription,
2338 try_process: sub_inplace_try_process::<M, F>,
2339 has_data: sub_inplace_has_data::<M, F>,
2340 pre_sample: no_pre_sample,
2341 invocation: InvocationMode::OnNewData,
2342 drop_fn: drop_entry::<SubInplaceEntry<M, F>>,
2343 });
2344 self.apply_node_default_sched(slot, Some(node_id));
2345 return Ok(HandleId(slot));
2346 }
2347 }
2348
2349 let (_slot_count, trailing_bytes) = buffered_region_size(qos.depth, RX_BUF);
2350
2351 let (entry_offset, trailing_offset) =
2352 self.arena_alloc_with_trailing::<Entry<M, F>>(trailing_bytes)?;
2353
2354 let buf_ptr = unsafe { (self.arena.as_mut_ptr() as *mut u8).add(trailing_offset) };
2355
2356 let buffer = if qos.depth <= 1 {
2357 BufferStrategy::Triple(unsafe { TripleBuffer::init(buf_ptr, RX_BUF) })
2358 } else {
2359 BufferStrategy::Ring(unsafe { SpscRing::init(buf_ptr, RX_BUF, qos.depth as usize) })
2360 };
2361
2362 unsafe {
2363 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2364 let entry_ptr = arena_ptr.add(entry_offset) as *mut Entry<M, F>;
2365 core::ptr::write(
2366 entry_ptr,
2367 Entry {
2368 handle,
2369 buffer,
2370 callback,
2371 _phantom: PhantomData,
2372 },
2373 );
2374 }
2375
2376 self.entries[slot] = Some(CallbackMeta {
2377 offset: entry_offset,
2378 kind: EntryKind::Subscription,
2379 try_process: sub_buffered_try_process::<M, F>,
2380 has_data: sub_buffered_has_data::<M, F>,
2381 pre_sample: no_pre_sample,
2382 invocation: InvocationMode::OnNewData,
2383 drop_fn: drop_entry::<Entry<M, F>>,
2384 });
2385 // Phase 104.C.4 — apply Node's default SchedContext.
2386 self.apply_node_default_sched(slot, Some(node_id));
2387 Ok(HandleId(slot))
2388 }
2389
2390 /// Generic (type-erased) buffered subscription core (the
2391 /// `node_mut(id).subscription(t).generic(ty, hash)` builder lowers here).
2392 /// Routes the subscriber creation through the [`NodeId`]'s
2393 /// session + identity (rclcpp `add_node` pattern).
2394 ///
2395 /// Use this in bridge code where two Nodes bind to different RMW
2396 /// backends:
2397 ///
2398 /// ```ignore
2399 /// let node_in = exec.node_builder("ingress").rmw("zenoh").build()?;
2400 /// let pub_out = exec.with_node(node_out, |n| {
2401 /// n.create_publisher_raw("/fwd", TYPE, HASH)
2402 /// })??;
2403 /// exec.register_subscription_buffered_raw_on::<_, 1024>(
2404 /// node_in, "/src", TYPE, HASH, qos(),
2405 /// move |bytes: &[u8]| { let _ = pub_out.publish_raw(bytes); },
2406 /// )?;
2407 /// ```
2408 pub(crate) fn register_subscription_buffered_raw_on<F, const RX_BUF: usize>(
2409 &mut self,
2410 node_id: super::node_record::NodeId,
2411 topic_name: &str,
2412 type_name: &str,
2413 type_hash: &str,
2414 qos: QosSettings,
2415 callback: F,
2416 ) -> Result<HandleId, NodeError>
2417 where
2418 F: FnMut(&[u8]) + 'static,
2419 {
2420 // Pull the Node's identity + session slot out first so the
2421 // mutable session borrow doesn't conflict with the arena
2422 // alloc inside `add_arena_subscription_callback`.
2423 let (node_name, ns, session_idx) = {
2424 let r = self
2425 .nodes
2426 .get(node_id.index())
2427 .ok_or(NodeError::InvalidSchedContextBinding)?;
2428 (r.name.clone(), r.namespace.clone(), r.session_idx)
2429 };
2430 let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
2431 if !node_name.is_empty() {
2432 topic = topic.with_node_name(&node_name);
2433 }
2434 let handle = {
2435 let session = self
2436 .session_at_mut(session_idx)
2437 .ok_or(NodeError::BackendMismatch)?;
2438 session
2439 .create_subscriber(&topic, qos)
2440 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2441 };
2442 let handle_id = self.add_arena_subscription_callback::<F, RX_BUF>(handle, qos, callback)?;
2443 // Phase 104.C.4 — apply Node's default SchedContext.
2444 self.apply_node_default_sched(handle_id.0, Some(node_id));
2445 Ok(handle_id)
2446 }
2447
2448 /// Register a borrowed (zero-copy) buffered subscription (Phase 229.6,
2449 /// issue 0007 / RFC-0033 `borrowed` mode).
2450 ///
2451 /// `B` is the code-generated borrowed-message marker (e.g. `ImageBorrow`)
2452 /// implementing [`BorrowedMessage`](nros_core::BorrowedMessage); the
2453 /// callback receives `&B::View<'a>` — a lifetime-carrying message whose
2454 /// unbounded sequence/string fields borrow directly from the receive buffer
2455 /// (no `heapless::Vec` copy). The view is valid only for the callback's
2456 /// duration.
2457 ///
2458 /// **Triple-buffer only.** A borrowed view must reference exactly one
2459 /// well-defined buffer slot for the callback's duration; an SPSC ring
2460 /// (`qos.depth > 1`) keeps several samples in flight with no single such
2461 /// slot. `qos.depth > 1` is therefore rejected with
2462 /// [`TransportError::Unsupported`].
2463 pub(crate) fn register_subscription_buffered_borrowed_on<B, F, const RX_BUF: usize>(
2464 &mut self,
2465 node_id: super::node_record::NodeId,
2466 topic_name: &str,
2467 qos: QosSettings,
2468 callback: F,
2469 ) -> Result<HandleId, NodeError>
2470 where
2471 B: nros_core::BorrowedMessage + 'static,
2472 F: for<'a> FnMut(&B::View<'a>) + 'static,
2473 {
2474 type Entry<B, F> = SubBufferedBorrowedEntry<B, F>;
2475
2476 // Borrowed views require a single well-defined slot (triple buffer).
2477 if qos.depth > 1 {
2478 return Err(NodeError::Transport(TransportError::Unsupported));
2479 }
2480
2481 let slot = self.next_entry_slot()?;
2482 let (node_name, ns, session_idx) = {
2483 let r = self
2484 .nodes
2485 .get(node_id.index())
2486 .ok_or(NodeError::InvalidSchedContextBinding)?;
2487 (r.name.clone(), r.namespace.clone(), r.session_idx)
2488 };
2489 let mut topic = TopicInfo::new(
2490 topic_name,
2491 <B as BorrowedMessage>::TYPE_NAME,
2492 <B as BorrowedMessage>::TYPE_HASH,
2493 )
2494 .with_namespace(&ns);
2495 if !node_name.is_empty() {
2496 topic = topic.with_node_name(&node_name);
2497 }
2498 let handle = {
2499 let session = self
2500 .session_at_mut(session_idx)
2501 .ok_or(NodeError::BackendMismatch)?;
2502 session
2503 .create_subscriber(&topic, qos)
2504 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2505 };
2506
2507 let (_slot_count, trailing_bytes) = buffered_region_size(qos.depth, RX_BUF);
2508 let (entry_offset, trailing_offset) =
2509 self.arena_alloc_with_trailing::<Entry<B, F>>(trailing_bytes)?;
2510 let buf_ptr = unsafe { (self.arena.as_mut_ptr() as *mut u8).add(trailing_offset) };
2511
2512 // depth <= 1 guaranteed above → always triple buffer.
2513 let buffer = BufferStrategy::Triple(unsafe { TripleBuffer::init(buf_ptr, RX_BUF) });
2514
2515 unsafe {
2516 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2517 let entry_ptr = arena_ptr.add(entry_offset) as *mut Entry<B, F>;
2518 core::ptr::write(
2519 entry_ptr,
2520 Entry {
2521 handle,
2522 buffer,
2523 callback,
2524 _phantom: PhantomData,
2525 },
2526 );
2527 }
2528
2529 self.entries[slot] = Some(CallbackMeta {
2530 offset: entry_offset,
2531 kind: EntryKind::Subscription,
2532 try_process: sub_buffered_borrowed_try_process::<B, F>,
2533 has_data: sub_buffered_borrowed_has_data::<B, F>,
2534 pre_sample: no_pre_sample,
2535 invocation: InvocationMode::OnNewData,
2536 drop_fn: drop_entry::<Entry<B, F>>,
2537 });
2538 self.apply_node_default_sched(slot, Some(node_id));
2539 Ok(HandleId(slot))
2540 }
2541
2542 /// Register a raw (type-erased) buffered subscription whose callback
2543 /// also receives a [`RawMessageInfo`](nros_core::RawMessageInfo)
2544 /// carrying the sample's wire **attachment** (Phase 189.M1).
2545 ///
2546 /// Backs the `node.subscription(t).generic(..).message_info().build(cb)`
2547 /// builder — the cross-RMW bridge reads the `bridge_origin` tag from
2548 /// `info.attachment()` for echo suppression. One sample per
2549 /// `spin_once`; the attachment is staged in a flat per-entry buffer
2550 /// (cap [`RAW_INFO_ATT_CAP`](super::arena::RAW_INFO_ATT_CAP)).
2551 pub fn register_subscription_buffered_raw_info_on<F, const RX_BUF: usize>(
2552 &mut self,
2553 node_id: super::node_record::NodeId,
2554 topic_name: &str,
2555 type_name: &str,
2556 type_hash: &str,
2557 qos: QosSettings,
2558 callback: F,
2559 ) -> Result<HandleId, NodeError>
2560 where
2561 F: FnMut(&[u8], &nros_core::RawMessageInfo) + 'static,
2562 {
2563 type Entry<F, const N: usize> = SubBufferedRawInfoEntry<F, N>;
2564
2565 let slot = self.next_entry_slot()?;
2566 let (node_name, ns, session_idx) = {
2567 let r = self
2568 .nodes
2569 .get(node_id.index())
2570 .ok_or(NodeError::InvalidSchedContextBinding)?;
2571 (r.name.clone(), r.namespace.clone(), r.session_idx)
2572 };
2573 let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
2574 if !node_name.is_empty() {
2575 topic = topic.with_node_name(&node_name);
2576 }
2577 let handle = {
2578 let session = self
2579 .session_at_mut(session_idx)
2580 .ok_or(NodeError::BackendMismatch)?;
2581 session
2582 .create_subscriber(&topic, qos)
2583 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2584 };
2585
2586 let offset = self.arena_alloc::<Entry<F, RX_BUF>>()?;
2587 unsafe {
2588 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2589 let entry_ptr = arena_ptr.add(offset) as *mut Entry<F, RX_BUF>;
2590 core::ptr::write(
2591 entry_ptr,
2592 Entry {
2593 handle,
2594 buffer: [0u8; RX_BUF],
2595 att: [0u8; super::arena::RAW_INFO_ATT_CAP],
2596 callback,
2597 },
2598 );
2599 }
2600
2601 self.entries[slot] = Some(CallbackMeta {
2602 offset,
2603 kind: EntryKind::Subscription,
2604 try_process: sub_buffered_raw_info_try_process::<F, RX_BUF>,
2605 has_data: sub_buffered_raw_info_has_data::<F, RX_BUF>,
2606 pre_sample: no_pre_sample,
2607 invocation: InvocationMode::OnNewData,
2608 drop_fn: drop_entry::<Entry<F, RX_BUF>>,
2609 });
2610 self.apply_node_default_sched(slot, Some(node_id));
2611 Ok(HandleId(slot))
2612 }
2613
2614 /// Phase 250 (Wave 2) — register a generic (type-erased) raw subscription
2615 /// that surfaces E2E [`IntegrityStatus`](nros_rmw::IntegrityStatus) (CRC +
2616 /// sequence gap/dup) alongside the raw CDR bytes
2617 /// (`FnMut(&[u8], &IntegrityStatus)`). The type-erased analog of
2618 /// [`register_subscription_with_safety_sized_inner`]: the validator lives in
2619 /// the `RmwSubscriber` (`try_recv_validated`), so the subscriber is created
2620 /// plainly and no `register_type::<M>()` is needed (the declarative `Node`
2621 /// path is generic). Used by the declarative runtime's `.safety()` opt-in.
2622 #[cfg(feature = "safety-e2e")]
2623 pub fn register_subscription_buffered_raw_safety_on<F, const RX_BUF: usize>(
2624 &mut self,
2625 node_id: super::node_record::NodeId,
2626 topic_name: &str,
2627 type_name: &str,
2628 type_hash: &str,
2629 qos: QosSettings,
2630 callback: F,
2631 ) -> Result<HandleId, NodeError>
2632 where
2633 F: FnMut(&[u8], &nros_rmw::IntegrityStatus) + 'static,
2634 {
2635 use super::arena::{
2636 SubBufferedRawSafetyEntry, sub_buffered_raw_safety_has_data,
2637 sub_buffered_raw_safety_try_process,
2638 };
2639 type Entry<F, const N: usize> = SubBufferedRawSafetyEntry<F, N>;
2640
2641 let slot = self.next_entry_slot()?;
2642 let (node_name, ns, session_idx) = {
2643 let r = self
2644 .nodes
2645 .get(node_id.index())
2646 .ok_or(NodeError::InvalidSchedContextBinding)?;
2647 (r.name.clone(), r.namespace.clone(), r.session_idx)
2648 };
2649 let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
2650 if !node_name.is_empty() {
2651 topic = topic.with_node_name(&node_name);
2652 }
2653 let handle = {
2654 let session = self
2655 .session_at_mut(session_idx)
2656 .ok_or(NodeError::BackendMismatch)?;
2657 session
2658 .create_subscriber(&topic, qos)
2659 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2660 };
2661
2662 let offset = self.arena_alloc::<Entry<F, RX_BUF>>()?;
2663 unsafe {
2664 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2665 let entry_ptr = arena_ptr.add(offset) as *mut Entry<F, RX_BUF>;
2666 core::ptr::write(
2667 entry_ptr,
2668 Entry {
2669 handle,
2670 buffer: [0u8; RX_BUF],
2671 callback,
2672 },
2673 );
2674 }
2675
2676 self.entries[slot] = Some(CallbackMeta {
2677 offset,
2678 kind: EntryKind::Subscription,
2679 try_process: sub_buffered_raw_safety_try_process::<F, RX_BUF>,
2680 has_data: sub_buffered_raw_safety_has_data::<F, RX_BUF>,
2681 pre_sample: no_pre_sample,
2682 invocation: InvocationMode::OnNewData,
2683 drop_fn: drop_entry::<Entry<F, RX_BUF>>,
2684 });
2685 self.apply_node_default_sched(slot, Some(node_id));
2686 Ok(HandleId(slot))
2687 }
2688
2689 /// Register a raw byte-shaped callback against a pre-built
2690 /// `RmwSubscriber` handle.
2691 ///
2692 /// Backend-agnostic primitive — the caller is responsible for
2693 /// obtaining the handle by whatever route the active backend
2694 /// supports:
2695 ///
2696 /// - **Generic ROS-typed flow**: call `Session::create_subscriber`
2697 /// on `self.session_mut()` with a [`TopicInfo`]. The
2698 /// `node_mut(id).subscription(t).generic(ty, hash)` builder is the
2699 /// convenience wrapper for this path.
2700 /// - **Backend-specific flow** (e.g. uORB needs `&'static orb_metadata`):
2701 /// reach into the concrete session via [`Self::session_mut`] and
2702 /// call its backend-specific create method, then hand the handle
2703 /// here. `nros-px4::uorb::create_subscription_with_callback` is
2704 /// the example.
2705 ///
2706 /// The arena-store + vtable wiring is identical to
2707 /// `register_subscription_buffered_raw`; the only thing that varies is
2708 /// where the handle came from. Callback fires on every message
2709 /// delivery during [`spin_once`](Self::spin_once); bytes are
2710 /// passed as `&[u8]`.
2711 pub fn add_arena_subscription_callback<F, const RX_BUF: usize>(
2712 &mut self,
2713 handle: session::RmwSubscriber,
2714 qos: QosSettings,
2715 callback: F,
2716 ) -> Result<HandleId, NodeError>
2717 where
2718 F: FnMut(&[u8]) + 'static,
2719 {
2720 type Entry<F> = SubBufferedRawEntry<F>;
2721
2722 let slot = self.next_entry_slot()?;
2723 let (_slot_count, trailing_bytes) = buffered_region_size(qos.depth, RX_BUF);
2724
2725 let (entry_offset, trailing_offset) =
2726 self.arena_alloc_with_trailing::<Entry<F>>(trailing_bytes)?;
2727
2728 let buf_ptr = unsafe { (self.arena.as_mut_ptr() as *mut u8).add(trailing_offset) };
2729
2730 let buffer = if qos.depth <= 1 {
2731 BufferStrategy::Triple(unsafe { TripleBuffer::init(buf_ptr, RX_BUF) })
2732 } else {
2733 BufferStrategy::Ring(unsafe { SpscRing::init(buf_ptr, RX_BUF, qos.depth as usize) })
2734 };
2735
2736 unsafe {
2737 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2738 let entry_ptr = arena_ptr.add(entry_offset) as *mut Entry<F>;
2739 core::ptr::write(
2740 entry_ptr,
2741 Entry {
2742 handle,
2743 buffer,
2744 callback,
2745 },
2746 );
2747 }
2748
2749 self.entries[slot] = Some(CallbackMeta {
2750 offset: entry_offset,
2751 kind: EntryKind::Subscription,
2752 try_process: sub_buffered_raw_try_process::<F>,
2753 has_data: sub_buffered_raw_has_data::<F>,
2754 pre_sample: no_pre_sample,
2755 invocation: InvocationMode::OnNewData,
2756 drop_fn: drop_entry::<Entry<F>>,
2757 });
2758 Ok(HandleId(slot))
2759 }
2760
2761 pub(crate) fn register_subscription_with_info_sized_inner<M, F, const RX_BUF: usize>(
2762 &mut self,
2763 node_id: Option<super::node_record::NodeId>,
2764 topic_name: &str,
2765 qos: QosSettings,
2766 callback: F,
2767 ) -> Result<HandleId, NodeError>
2768 where
2769 M: crate::cyclonedds_register::MessageForRmw + 'static,
2770 F: FnMut(&M, Option<&nros_core::MessageInfo>) + 'static,
2771 {
2772 type Entry<M, F, const N: usize> = SubInfoEntry<M, F, N>;
2773
2774 // Phase 212.K.7.6.b — see `create_publisher_on`.
2775 crate::cyclonedds_register::register_type::<M>()?;
2776
2777 let slot = self.next_entry_slot()?;
2778 let (node_name, ns, session_idx) = match node_id {
2779 Some(id) => {
2780 let r = self
2781 .nodes
2782 .get(id.index())
2783 .ok_or(NodeError::InvalidSchedContextBinding)?;
2784 (r.name.clone(), r.namespace.clone(), r.session_idx)
2785 }
2786 None => (self.node_name.clone(), self.namespace.clone(), 0u8),
2787 };
2788 let mut topic = TopicInfo::new(
2789 topic_name,
2790 <M as RosMessage>::TYPE_NAME,
2791 <M as RosMessage>::TYPE_HASH,
2792 )
2793 .with_namespace(&ns);
2794 if !node_name.is_empty() {
2795 topic = topic.with_node_name(&node_name);
2796 }
2797 let handle = {
2798 let session = self
2799 .session_at_mut(session_idx)
2800 .ok_or(NodeError::BackendMismatch)?;
2801 session
2802 .create_subscriber(&topic, qos)
2803 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2804 };
2805
2806 let offset = self.arena_alloc::<Entry<M, F, RX_BUF>>()?;
2807
2808 unsafe {
2809 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2810 let entry_ptr = arena_ptr.add(offset) as *mut Entry<M, F, RX_BUF>;
2811 core::ptr::write(
2812 entry_ptr,
2813 Entry {
2814 handle,
2815 buffer: [0u8; RX_BUF],
2816 sampled_len: 0,
2817 callback,
2818 _phantom: PhantomData,
2819 },
2820 );
2821 }
2822
2823 self.entries[slot] = Some(CallbackMeta {
2824 offset,
2825 kind: EntryKind::Subscription,
2826 try_process: sub_info_try_process::<M, F, RX_BUF>,
2827 has_data: sub_info_has_data::<M, F, RX_BUF>,
2828 pre_sample: sub_info_pre_sample::<M, F, RX_BUF>,
2829 invocation: InvocationMode::OnNewData,
2830 drop_fn: drop_entry::<Entry<M, F, RX_BUF>>,
2831 });
2832 self.apply_node_default_sched(slot, node_id);
2833 Ok(HandleId(slot))
2834 }
2835
2836 #[cfg(feature = "safety-e2e")]
2837 pub(crate) fn register_subscription_with_safety_sized_inner<M, F, const RX_BUF: usize>(
2838 &mut self,
2839 node_id: Option<super::node_record::NodeId>,
2840 topic_name: &str,
2841 qos: QosSettings,
2842 callback: F,
2843 ) -> Result<HandleId, NodeError>
2844 where
2845 M: crate::cyclonedds_register::MessageForRmw + 'static,
2846 F: FnMut(&M, &nros_rmw::IntegrityStatus) + 'static,
2847 {
2848 type Entry<M, F, const N: usize> = SubSafetyEntry<M, F, N>;
2849
2850 // Phase 212.K.7.6.b — see `create_publisher_on`.
2851 crate::cyclonedds_register::register_type::<M>()?;
2852
2853 let slot = self.next_entry_slot()?;
2854 let (node_name, ns, session_idx) = match node_id {
2855 Some(id) => {
2856 let r = self
2857 .nodes
2858 .get(id.index())
2859 .ok_or(NodeError::InvalidSchedContextBinding)?;
2860 (r.name.clone(), r.namespace.clone(), r.session_idx)
2861 }
2862 None => (self.node_name.clone(), self.namespace.clone(), 0u8),
2863 };
2864 let mut topic = TopicInfo::new(
2865 topic_name,
2866 <M as RosMessage>::TYPE_NAME,
2867 <M as RosMessage>::TYPE_HASH,
2868 )
2869 .with_namespace(&ns);
2870 if !node_name.is_empty() {
2871 topic = topic.with_node_name(&node_name);
2872 }
2873 let handle = {
2874 let session = self
2875 .session_at_mut(session_idx)
2876 .ok_or(NodeError::BackendMismatch)?;
2877 session
2878 .create_subscriber(&topic, qos)
2879 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2880 };
2881
2882 let offset = self.arena_alloc::<Entry<M, F, RX_BUF>>()?;
2883
2884 unsafe {
2885 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2886 let entry_ptr = arena_ptr.add(offset) as *mut Entry<M, F, RX_BUF>;
2887 core::ptr::write(
2888 entry_ptr,
2889 Entry {
2890 handle,
2891 buffer: [0u8; RX_BUF],
2892 sampled_len: 0,
2893 callback,
2894 _phantom: PhantomData,
2895 },
2896 );
2897 }
2898
2899 self.entries[slot] = Some(CallbackMeta {
2900 offset,
2901 kind: EntryKind::Subscription,
2902 try_process: sub_safety_try_process::<M, F, RX_BUF>,
2903 has_data: sub_safety_has_data::<M, F, RX_BUF>,
2904 pre_sample: sub_safety_pre_sample::<M, F, RX_BUF>,
2905 invocation: InvocationMode::OnNewData,
2906 drop_fn: drop_entry::<Entry<M, F, RX_BUF>>,
2907 });
2908 self.apply_node_default_sched(slot, node_id);
2909 Ok(HandleId(slot))
2910 }
2911
2912 /// Register a service callback with the default buffer size.
2913 ///
2914 /// The callback is stored in the arena and invoked during [`spin_once()`](Self::spin_once).
2915 pub fn register_service<Svc, F>(
2916 &mut self,
2917 service_name: &str,
2918 callback: F,
2919 ) -> Result<HandleId, NodeError>
2920 where
2921 Svc: RosService + 'static,
2922 Svc::Request: crate::cyclonedds_register::MessageForRmw,
2923 Svc::Reply: crate::cyclonedds_register::MessageForRmw,
2924 F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
2925 {
2926 self.register_service_sized::<Svc, F, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, callback)
2927 }
2928
2929 /// Register a service callback with custom request/reply buffer sizes.
2930 pub fn register_service_sized<Svc, F, const REQ_BUF: usize, const REPLY_BUF: usize>(
2931 &mut self,
2932 service_name: &str,
2933 callback: F,
2934 ) -> Result<HandleId, NodeError>
2935 where
2936 Svc: RosService + 'static,
2937 Svc::Request: crate::cyclonedds_register::MessageForRmw,
2938 Svc::Reply: crate::cyclonedds_register::MessageForRmw,
2939 F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
2940 {
2941 type Entry<Svc, F, const RQ: usize, const RP: usize> = SrvEntry<Svc, F, RQ, RP>;
2942
2943 // Phase 212.K.7.7.b — register both halves of the service round-trip
2944 // under cyclonedds. No-op for other RMWs. Mirrors the K.7.6.b hook
2945 // on `Node::create_service_sized`.
2946 crate::cyclonedds_register::register_type::<Svc::Request>()?;
2947 crate::cyclonedds_register::register_type::<Svc::Reply>()?;
2948
2949 let slot = self.next_entry_slot()?;
2950 let node_name: heapless::String<64> = self.node_name.clone();
2951 let ns: heapless::String<64> = self.namespace.clone();
2952 let mut info = ServiceInfo::new(service_name, Svc::SERVICE_NAME, Svc::SERVICE_HASH)
2953 .with_namespace(&ns);
2954 if !node_name.is_empty() {
2955 info = info.with_node_name(&node_name);
2956 }
2957 let handle = self
2958 .session
2959 .create_service_server(&info, QosSettings::services_default())
2960 .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?;
2961
2962 let offset = self.arena_alloc::<Entry<Svc, F, REQ_BUF, REPLY_BUF>>()?;
2963
2964 // SAFETY: same guarantees as register_subscription_sized.
2965 unsafe {
2966 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2967 let entry_ptr = arena_ptr.add(offset) as *mut Entry<Svc, F, REQ_BUF, REPLY_BUF>;
2968 core::ptr::write(
2969 entry_ptr,
2970 Entry {
2971 handle,
2972 req_buffer: [0u8; REQ_BUF],
2973 reply_buffer: [0u8; REPLY_BUF],
2974 callback,
2975 _phantom: PhantomData,
2976 },
2977 );
2978 }
2979
2980 self.entries[slot] = Some(CallbackMeta {
2981 offset,
2982 kind: EntryKind::Service,
2983 try_process: srv_try_process::<Svc, F, REQ_BUF, REPLY_BUF>,
2984 has_data: srv_has_data::<Svc, F, REQ_BUF, REPLY_BUF>,
2985 pre_sample: no_pre_sample,
2986 invocation: InvocationMode::OnNewData,
2987 drop_fn: drop_entry::<Entry<Svc, F, REQ_BUF, REPLY_BUF>>,
2988 });
2989 Ok(HandleId(slot))
2990 }
2991
2992 /// Phase 104.C.3.3.a — Node-aware variant of
2993 /// [`register_service_sized`](Self::register_service_sized).
2994 pub fn register_service_sized_on<Svc, F, const REQ_BUF: usize, const REPLY_BUF: usize>(
2995 &mut self,
2996 node_id: super::node_record::NodeId,
2997 service_name: &str,
2998 qos: QosSettings,
2999 callback: F,
3000 ) -> Result<HandleId, NodeError>
3001 where
3002 Svc: RosService + 'static,
3003 Svc::Request: crate::cyclonedds_register::MessageForRmw,
3004 Svc::Reply: crate::cyclonedds_register::MessageForRmw,
3005 F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
3006 {
3007 type Entry<Svc, F, const RQ: usize, const RP: usize> = SrvEntry<Svc, F, RQ, RP>;
3008
3009 // Phase 212.K.7.7.b — see `register_service_sized`.
3010 crate::cyclonedds_register::register_type::<Svc::Request>()?;
3011 crate::cyclonedds_register::register_type::<Svc::Reply>()?;
3012
3013 let slot = self.next_entry_slot()?;
3014 let (node_name, ns, session_idx) = {
3015 let r = self
3016 .nodes
3017 .get(node_id.index())
3018 .ok_or(NodeError::InvalidSchedContextBinding)?;
3019 (r.name.clone(), r.namespace.clone(), r.session_idx)
3020 };
3021 let mut info = ServiceInfo::new(service_name, Svc::SERVICE_NAME, Svc::SERVICE_HASH)
3022 .with_namespace(&ns);
3023 if !node_name.is_empty() {
3024 info = info.with_node_name(&node_name);
3025 }
3026 let handle = {
3027 let session = self
3028 .session_at_mut(session_idx)
3029 .ok_or(NodeError::BackendMismatch)?;
3030 // Phase 193.5 — validate against the backend's supported policies
3031 // (no silent downgrade); request/reply effectively requires RELIABLE.
3032 qos.validate_against(session.supported_qos_policies())
3033 .map_err(NodeError::Transport)?;
3034 session
3035 .create_service_server(&info, qos)
3036 .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?
3037 };
3038
3039 let offset = self.arena_alloc::<Entry<Svc, F, REQ_BUF, REPLY_BUF>>()?;
3040 unsafe {
3041 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3042 let entry_ptr = arena_ptr.add(offset) as *mut Entry<Svc, F, REQ_BUF, REPLY_BUF>;
3043 core::ptr::write(
3044 entry_ptr,
3045 Entry {
3046 handle,
3047 req_buffer: [0u8; REQ_BUF],
3048 reply_buffer: [0u8; REPLY_BUF],
3049 callback,
3050 _phantom: PhantomData,
3051 },
3052 );
3053 }
3054
3055 self.entries[slot] = Some(CallbackMeta {
3056 offset,
3057 kind: EntryKind::Service,
3058 try_process: srv_try_process::<Svc, F, REQ_BUF, REPLY_BUF>,
3059 has_data: srv_has_data::<Svc, F, REQ_BUF, REPLY_BUF>,
3060 pre_sample: no_pre_sample,
3061 invocation: InvocationMode::OnNewData,
3062 drop_fn: drop_entry::<Entry<Svc, F, REQ_BUF, REPLY_BUF>>,
3063 });
3064 self.apply_node_default_sched(slot, Some(node_id));
3065 Ok(HandleId(slot))
3066 }
3067
3068 /// Phase 104.C.3.3.a — Node-aware variant of
3069 /// [`register_service`](Self::register_service).
3070 pub fn register_service_on<Svc, F>(
3071 &mut self,
3072 node_id: super::node_record::NodeId,
3073 service_name: &str,
3074 callback: F,
3075 ) -> Result<HandleId, NodeError>
3076 where
3077 Svc: RosService + 'static,
3078 Svc::Request: crate::cyclonedds_register::MessageForRmw,
3079 Svc::Reply: crate::cyclonedds_register::MessageForRmw,
3080 F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
3081 {
3082 self.register_service_sized_on::<
3083 Svc,
3084 F,
3085 { crate::config::DEFAULT_RX_BUF_SIZE },
3086 { crate::config::DEFAULT_RX_BUF_SIZE },
3087 >(node_id, service_name, QosSettings::services_default(), callback)
3088 }
3089
3090 // ========================================================================
3091 // Timer registration
3092 // ========================================================================
3093
3094 /// Register a repeating timer callback.
3095 ///
3096 /// The callback fires every `period` milliseconds during [`spin_once()`](Self::spin_once).
3097 /// The timer delta is approximated by the `timeout_ms` argument to `spin_once`.
3098 pub fn register_timer<F>(
3099 &mut self,
3100 period: TimerDuration,
3101 callback: F,
3102 ) -> Result<HandleId, NodeError>
3103 where
3104 F: FnMut() + 'static,
3105 {
3106 let slot = self.next_entry_slot()?;
3107 let offset = self.arena_alloc::<TimerEntry<F>>()?;
3108
3109 unsafe {
3110 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3111 let entry_ptr = arena_ptr.add(offset) as *mut TimerEntry<F>;
3112 core::ptr::write(
3113 entry_ptr,
3114 TimerEntry {
3115 period_ms: period.as_millis(),
3116 elapsed_ms: 0,
3117 oneshot: false,
3118 fired: false,
3119 cancelled: false,
3120 callback,
3121 },
3122 );
3123 }
3124
3125 self.entries[slot] = Some(CallbackMeta {
3126 offset,
3127 kind: EntryKind::Timer,
3128 try_process: timer_try_process::<F>,
3129 has_data: always_ready,
3130 pre_sample: no_pre_sample,
3131 invocation: InvocationMode::Always,
3132 drop_fn: drop_entry::<TimerEntry<F>>,
3133 });
3134 Ok(HandleId(slot))
3135 }
3136
3137 /// Register a one-shot timer callback.
3138 ///
3139 /// The callback fires once after `delay` milliseconds, then becomes inert.
3140 pub fn register_timer_oneshot<F>(
3141 &mut self,
3142 delay: TimerDuration,
3143 callback: F,
3144 ) -> Result<HandleId, NodeError>
3145 where
3146 F: FnMut() + 'static,
3147 {
3148 let slot = self.next_entry_slot()?;
3149 let offset = self.arena_alloc::<TimerEntry<F>>()?;
3150
3151 unsafe {
3152 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3153 let entry_ptr = arena_ptr.add(offset) as *mut TimerEntry<F>;
3154 core::ptr::write(
3155 entry_ptr,
3156 TimerEntry {
3157 period_ms: delay.as_millis(),
3158 elapsed_ms: 0,
3159 oneshot: true,
3160 fired: false,
3161 cancelled: false,
3162 callback,
3163 },
3164 );
3165 }
3166
3167 self.entries[slot] = Some(CallbackMeta {
3168 offset,
3169 kind: EntryKind::Timer,
3170 try_process: timer_try_process::<F>,
3171 has_data: always_ready,
3172 pre_sample: no_pre_sample,
3173 invocation: InvocationMode::Always,
3174 drop_fn: drop_entry::<TimerEntry<F>>,
3175 });
3176 Ok(HandleId(slot))
3177 }
3178
3179 // ========================================================================
3180 // Raw callback registration (for C API)
3181 // ========================================================================
3182
3183 /// The kept C-FFI subscription core (Phase 189.M2.b): registers a
3184 /// raw `RawSubscriptionCallback` fn-ptr + `context` against an
3185 /// optional node's session. The Rust ergonomic surface is the
3186 /// `node.subscription(t)` builder (closures); this is the single
3187 /// primitive the `nros-c` thin wrapper lowers to. `node_id == None`
3188 /// is the legacy single-node path.
3189 #[allow(clippy::too_many_arguments)]
3190 pub fn add_arena_subscription_c_callback<const RX_BUF: usize>(
3191 &mut self,
3192 node_id: Option<super::node_record::NodeId>,
3193 topic_name: &str,
3194 type_name: &str,
3195 type_hash: &str,
3196 qos: QosSettings,
3197 callback: RawSubscriptionCallback,
3198 context: *mut core::ffi::c_void,
3199 ) -> Result<HandleId, NodeError> {
3200 let slot = self.next_entry_slot()?;
3201 let (node_name, ns, session_idx) = match node_id {
3202 Some(id) => {
3203 let r = self
3204 .nodes
3205 .get(id.index())
3206 .ok_or(NodeError::InvalidSchedContextBinding)?;
3207 (r.name.clone(), r.namespace.clone(), r.session_idx)
3208 }
3209 None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3210 };
3211 let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
3212 if !node_name.is_empty() {
3213 topic = topic.with_node_name(&node_name);
3214 }
3215 let handle = {
3216 let session = self
3217 .session_at_mut(session_idx)
3218 .ok_or(NodeError::BackendMismatch)?;
3219 session
3220 .create_subscriber(&topic, qos)
3221 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
3222 };
3223
3224 let (_slot_count, trailing_bytes) = buffered_region_size(qos.depth, RX_BUF);
3225
3226 let (entry_offset, trailing_offset) =
3227 self.arena_alloc_with_trailing::<SubBufferedRawCEntry>(trailing_bytes)?;
3228
3229 let buf_ptr = unsafe { (self.arena.as_mut_ptr() as *mut u8).add(trailing_offset) };
3230
3231 let buffer = if qos.depth <= 1 {
3232 BufferStrategy::Triple(unsafe { TripleBuffer::init(buf_ptr, RX_BUF) })
3233 } else {
3234 BufferStrategy::Ring(unsafe { SpscRing::init(buf_ptr, RX_BUF, qos.depth as usize) })
3235 };
3236
3237 unsafe {
3238 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3239 let entry_ptr = arena_ptr.add(entry_offset) as *mut SubBufferedRawCEntry;
3240 core::ptr::write(
3241 entry_ptr,
3242 SubBufferedRawCEntry {
3243 handle,
3244 buffer,
3245 callback,
3246 context,
3247 },
3248 );
3249 }
3250
3251 self.entries[slot] = Some(CallbackMeta {
3252 offset: entry_offset,
3253 kind: EntryKind::Subscription,
3254 try_process: sub_buffered_raw_c_try_process,
3255 has_data: sub_buffered_raw_c_has_data,
3256 pre_sample: no_pre_sample,
3257 invocation: InvocationMode::OnNewData,
3258 drop_fn: drop_entry::<SubBufferedRawCEntry>,
3259 });
3260 self.apply_node_default_sched(slot, node_id);
3261 Ok(HandleId(slot))
3262 }
3263
3264 /// Phase 189.M3.4 — register a raw C-fn-ptr subscription whose callback
3265 /// also receives the sample's wire **attachment**
3266 /// ([`RawSubscriptionInfoCallback`]: `(data, len, attachment, att_len,
3267 /// context)`) — the C analog of the Rust
3268 /// `node.subscription(t).generic(..).message_info()` builder. Backs the C
3269 /// FFI `nros_executor_register_subscription_raw_with_info`. Flat per-entry
3270 /// payload + attachment buffers (cap [`RAW_INFO_ATT_CAP`](super::arena::RAW_INFO_ATT_CAP));
3271 /// one sample per `spin_once`.
3272 #[allow(clippy::too_many_arguments)]
3273 pub fn add_arena_subscription_c_info_callback<const RX_BUF: usize>(
3274 &mut self,
3275 node_id: Option<super::node_record::NodeId>,
3276 topic_name: &str,
3277 type_name: &str,
3278 type_hash: &str,
3279 qos: QosSettings,
3280 callback: RawSubscriptionInfoCallback,
3281 context: *mut core::ffi::c_void,
3282 ) -> Result<HandleId, NodeError> {
3283 type Entry<const N: usize> = SubBufferedRawInfoCEntry<N>;
3284
3285 let slot = self.next_entry_slot()?;
3286 let (node_name, ns, session_idx) = match node_id {
3287 Some(id) => {
3288 let r = self
3289 .nodes
3290 .get(id.index())
3291 .ok_or(NodeError::InvalidSchedContextBinding)?;
3292 (r.name.clone(), r.namespace.clone(), r.session_idx)
3293 }
3294 None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3295 };
3296 let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
3297 if !node_name.is_empty() {
3298 topic = topic.with_node_name(&node_name);
3299 }
3300 let handle = {
3301 let session = self
3302 .session_at_mut(session_idx)
3303 .ok_or(NodeError::BackendMismatch)?;
3304 session
3305 .create_subscriber(&topic, qos)
3306 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
3307 };
3308
3309 let offset = self.arena_alloc::<Entry<RX_BUF>>()?;
3310 unsafe {
3311 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3312 let entry_ptr = arena_ptr.add(offset) as *mut Entry<RX_BUF>;
3313 core::ptr::write(
3314 entry_ptr,
3315 Entry {
3316 handle,
3317 buffer: [0u8; RX_BUF],
3318 att: [0u8; super::arena::RAW_INFO_ATT_CAP],
3319 callback,
3320 context,
3321 },
3322 );
3323 }
3324
3325 self.entries[slot] = Some(CallbackMeta {
3326 offset,
3327 kind: EntryKind::Subscription,
3328 try_process: sub_buffered_raw_info_c_try_process::<RX_BUF>,
3329 has_data: sub_buffered_raw_info_c_has_data::<RX_BUF>,
3330 pre_sample: no_pre_sample,
3331 invocation: InvocationMode::OnNewData,
3332 drop_fn: drop_entry::<Entry<RX_BUF>>,
3333 });
3334 self.apply_node_default_sched(slot, node_id);
3335 Ok(HandleId(slot))
3336 }
3337
3338 /// Register a raw (untyped) service callback.
3339 ///
3340 /// Register a raw (untyped) service callback with the default buffer size.
3341 ///
3342 /// The callback receives and produces CDR bytes without typed
3343 /// deserialization/serialization. Used by the C API wrapper.
3344 pub fn register_service_raw(
3345 &mut self,
3346 service_name: &str,
3347 service_type: &str,
3348 service_hash: &str,
3349 callback: RawServiceCallback,
3350 context: *mut core::ffi::c_void,
3351 ) -> Result<HandleId, NodeError> {
3352 self.register_service_raw_sized::<{ crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(
3353 service_name,
3354 service_type,
3355 service_hash,
3356 QosSettings::services_default(),
3357 callback,
3358 context,
3359 )
3360 }
3361
3362 /// Register a raw (untyped) service callback with custom buffer sizes + QoS.
3363 ///
3364 /// `REQ_BUF` and `REPLY_BUF` set the stack-allocated CDR buffers
3365 /// for the request and reply respectively. Increase for services
3366 /// with large payloads (e.g., parameter services). `qos` applies to both
3367 /// the request + reply endpoints (Phase 193.2c).
3368 #[allow(clippy::too_many_arguments)]
3369 pub fn register_service_raw_sized<const REQ_BUF: usize, const REPLY_BUF: usize>(
3370 &mut self,
3371 service_name: &str,
3372 service_type: &str,
3373 service_hash: &str,
3374 qos: QosSettings,
3375 callback: RawServiceCallback,
3376 context: *mut core::ffi::c_void,
3377 ) -> Result<HandleId, NodeError> {
3378 self.register_service_raw_sized_inner::<REQ_BUF, REPLY_BUF>(
3379 None,
3380 service_name,
3381 service_type,
3382 service_hash,
3383 qos,
3384 callback,
3385 context,
3386 )
3387 }
3388
3389 /// Phase 104.C.3.3.a — Node-aware variant of
3390 /// [`register_service_raw_sized`]. C-FFI path.
3391 #[allow(clippy::too_many_arguments)]
3392 pub fn register_service_raw_sized_on<const REQ_BUF: usize, const REPLY_BUF: usize>(
3393 &mut self,
3394 node_id: super::node_record::NodeId,
3395 service_name: &str,
3396 service_type: &str,
3397 service_hash: &str,
3398 qos: QosSettings,
3399 callback: RawServiceCallback,
3400 context: *mut core::ffi::c_void,
3401 ) -> Result<HandleId, NodeError> {
3402 self.register_service_raw_sized_inner::<REQ_BUF, REPLY_BUF>(
3403 Some(node_id),
3404 service_name,
3405 service_type,
3406 service_hash,
3407 qos,
3408 callback,
3409 context,
3410 )
3411 }
3412
3413 #[allow(clippy::too_many_arguments)]
3414 fn register_service_raw_sized_inner<const REQ_BUF: usize, const REPLY_BUF: usize>(
3415 &mut self,
3416 node_id: Option<super::node_record::NodeId>,
3417 service_name: &str,
3418 service_type: &str,
3419 service_hash: &str,
3420 qos: QosSettings,
3421 callback: RawServiceCallback,
3422 context: *mut core::ffi::c_void,
3423 ) -> Result<HandleId, NodeError> {
3424 let slot = self.next_entry_slot()?;
3425 let (node_name, ns, session_idx) = match node_id {
3426 Some(id) => {
3427 let r = self
3428 .nodes
3429 .get(id.index())
3430 .ok_or(NodeError::InvalidSchedContextBinding)?;
3431 (r.name.clone(), r.namespace.clone(), r.session_idx)
3432 }
3433 None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3434 };
3435 let mut info =
3436 ServiceInfo::new(service_name, service_type, service_hash).with_namespace(&ns);
3437 if !node_name.is_empty() {
3438 info = info.with_node_name(&node_name);
3439 }
3440 let handle = {
3441 let session = self
3442 .session_at_mut(session_idx)
3443 .ok_or(NodeError::BackendMismatch)?;
3444 // Phase 193.5 — validate against the backend's supported policies
3445 // (no silent downgrade); request/reply effectively requires RELIABLE.
3446 qos.validate_against(session.supported_qos_policies())
3447 .map_err(NodeError::Transport)?;
3448 session
3449 .create_service_server(&info, qos)
3450 .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?
3451 };
3452
3453 let offset = self.arena_alloc::<SrvRawEntry<REQ_BUF, REPLY_BUF>>()?;
3454
3455 unsafe {
3456 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3457 let entry_ptr = arena_ptr.add(offset) as *mut SrvRawEntry<REQ_BUF, REPLY_BUF>;
3458 core::ptr::write(
3459 entry_ptr,
3460 SrvRawEntry {
3461 handle,
3462 req_buffer: [0u8; REQ_BUF],
3463 reply_buffer: [0u8; REPLY_BUF],
3464 callback,
3465 context,
3466 },
3467 );
3468 }
3469
3470 self.entries[slot] = Some(CallbackMeta {
3471 offset,
3472 kind: EntryKind::Service,
3473 try_process: srv_raw_try_process::<REQ_BUF, REPLY_BUF>,
3474 has_data: srv_raw_has_data::<REQ_BUF, REPLY_BUF>,
3475 pre_sample: no_pre_sample,
3476 invocation: InvocationMode::OnNewData,
3477 drop_fn: drop_entry::<SrvRawEntry<REQ_BUF, REPLY_BUF>>,
3478 });
3479 self.apply_node_default_sched(slot, node_id);
3480 Ok(HandleId(slot))
3481 }
3482
3483 // ========================================================================
3484 // Raw service client registration (Phase 82)
3485 // ========================================================================
3486
3487 /// Register a raw (untyped) service client with the default reply
3488 /// buffer size.
3489 ///
3490 /// The client is owned by the executor's arena. Each `spin_once`
3491 /// dispatch polls the in-flight reply slot via `try_recv_reply_raw`
3492 /// and fires the registered callback when the response arrives.
3493 /// Used by the C API thin wrapper — see Phase 82.
3494 pub fn register_service_client_raw(
3495 &mut self,
3496 service_name: &str,
3497 service_type: &str,
3498 service_hash: &str,
3499 callback: Option<RawResponseCallback>,
3500 context: *mut core::ffi::c_void,
3501 ) -> Result<HandleId, NodeError> {
3502 self.register_service_client_raw_sized::<{ crate::config::DEFAULT_RX_BUF_SIZE }>(
3503 service_name,
3504 service_type,
3505 service_hash,
3506 QosSettings::services_default(),
3507 callback,
3508 context,
3509 )
3510 }
3511
3512 /// Register a raw service client with a custom reply buffer size + QoS.
3513 ///
3514 /// `qos` applies to the client's request + reply endpoints (Phase 193.3b);
3515 /// defaults to [`QosSettings::services_default`] via the convenience
3516 /// wrapper.
3517 #[allow(clippy::too_many_arguments)]
3518 pub fn register_service_client_raw_sized<const REPLY_BUF: usize>(
3519 &mut self,
3520 service_name: &str,
3521 service_type: &str,
3522 service_hash: &str,
3523 qos: QosSettings,
3524 callback: Option<RawResponseCallback>,
3525 context: *mut core::ffi::c_void,
3526 ) -> Result<HandleId, NodeError> {
3527 self.register_service_client_raw_sized_inner::<REPLY_BUF>(
3528 None,
3529 service_name,
3530 service_type,
3531 service_hash,
3532 qos,
3533 callback,
3534 context,
3535 )
3536 }
3537
3538 /// Phase 104.C.3.3.a — Node-aware variant of
3539 /// [`register_service_client_raw_sized`]. Routes the client
3540 /// creation through the named Node's session.
3541 #[allow(clippy::too_many_arguments)]
3542 pub fn register_service_client_raw_sized_on<const REPLY_BUF: usize>(
3543 &mut self,
3544 node_id: super::node_record::NodeId,
3545 service_name: &str,
3546 service_type: &str,
3547 service_hash: &str,
3548 qos: QosSettings,
3549 callback: Option<RawResponseCallback>,
3550 context: *mut core::ffi::c_void,
3551 ) -> Result<HandleId, NodeError> {
3552 self.register_service_client_raw_sized_inner::<REPLY_BUF>(
3553 Some(node_id),
3554 service_name,
3555 service_type,
3556 service_hash,
3557 qos,
3558 callback,
3559 context,
3560 )
3561 }
3562
3563 #[allow(clippy::too_many_arguments)]
3564 fn register_service_client_raw_sized_inner<const REPLY_BUF: usize>(
3565 &mut self,
3566 node_id: Option<super::node_record::NodeId>,
3567 service_name: &str,
3568 service_type: &str,
3569 service_hash: &str,
3570 qos: QosSettings,
3571 callback: Option<RawResponseCallback>,
3572 context: *mut core::ffi::c_void,
3573 ) -> Result<HandleId, NodeError> {
3574 let slot = self.next_entry_slot()?;
3575 let (node_name, ns, session_idx) = match node_id {
3576 Some(id) => {
3577 let r = self
3578 .nodes
3579 .get(id.index())
3580 .ok_or(NodeError::InvalidSchedContextBinding)?;
3581 (r.name.clone(), r.namespace.clone(), r.session_idx)
3582 }
3583 None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3584 };
3585 let mut info =
3586 ServiceInfo::new(service_name, service_type, service_hash).with_namespace(&ns);
3587 if !node_name.is_empty() {
3588 info = info.with_node_name(&node_name);
3589 }
3590 let handle = {
3591 let session = self
3592 .session_at_mut(session_idx)
3593 .ok_or(NodeError::BackendMismatch)?;
3594 // Phase 193.5 — validate against the backend's supported policies
3595 // (no silent downgrade); request/reply effectively requires RELIABLE.
3596 qos.validate_against(session.supported_qos_policies())
3597 .map_err(NodeError::Transport)?;
3598 session
3599 .create_service_client(&info, qos)
3600 .map_err(|_| NodeError::Transport(TransportError::ServiceClientCreationFailed))?
3601 };
3602
3603 let offset = self.arena_alloc::<ServiceClientRawArenaEntry<REPLY_BUF>>()?;
3604 unsafe {
3605 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3606 let entry_ptr = arena_ptr.add(offset) as *mut ServiceClientRawArenaEntry<REPLY_BUF>;
3607 core::ptr::write(
3608 entry_ptr,
3609 ServiceClientRawArenaEntry {
3610 handle,
3611 reply_buffer: [0u8; REPLY_BUF],
3612 pending: false,
3613 reply_ready: core::sync::atomic::AtomicBool::new(false),
3614 callback,
3615 context,
3616 },
3617 );
3618 }
3619
3620 self.entries[slot] = Some(CallbackMeta {
3621 offset,
3622 kind: EntryKind::ServiceClient,
3623 try_process: service_client_raw_try_process::<REPLY_BUF>,
3624 has_data: always_ready,
3625 pre_sample: no_pre_sample,
3626 invocation: InvocationMode::Always,
3627 drop_fn: drop_entry::<ServiceClientRawArenaEntry<REPLY_BUF>>,
3628 });
3629 self.apply_node_default_sched(slot, node_id);
3630 Ok(HandleId(slot))
3631 }
3632
3633 /// RFC-0041 / Phase 239.1 — register a **typed callback** service client.
3634 /// The reply is eager-drained at `spin_once` and dispatched to `callback` as
3635 /// a deserialized `Svc::Reply`. Returns the scheduling [`HandleId`] and a
3636 /// `*mut` to the arena entry's send header (used to build the typed
3637 /// [`ServiceClientCallback`](super::handles::ServiceClientCallback)).
3638 #[allow(clippy::too_many_arguments)]
3639 pub(crate) fn register_service_client_callback<Svc, F, const REPLY_BUF: usize>(
3640 &mut self,
3641 node_id: Option<super::node_record::NodeId>,
3642 service_name: &str,
3643 service_type: &str,
3644 service_hash: &str,
3645 qos: QosSettings,
3646 callback: F,
3647 ) -> Result<(HandleId, *mut ServiceClientSendHeader<REPLY_BUF>), NodeError>
3648 where
3649 Svc: nros_core::RosService + 'static,
3650 F: FnMut(&Svc::Reply) + 'static,
3651 {
3652 let slot = self.next_entry_slot()?;
3653 let (node_name, ns, session_idx) = match node_id {
3654 Some(id) => {
3655 let r = self
3656 .nodes
3657 .get(id.index())
3658 .ok_or(NodeError::InvalidSchedContextBinding)?;
3659 (r.name.clone(), r.namespace.clone(), r.session_idx)
3660 }
3661 None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3662 };
3663 let mut info =
3664 ServiceInfo::new(service_name, service_type, service_hash).with_namespace(&ns);
3665 if !node_name.is_empty() {
3666 info = info.with_node_name(&node_name);
3667 }
3668 let handle = {
3669 let session = self
3670 .session_at_mut(session_idx)
3671 .ok_or(NodeError::BackendMismatch)?;
3672 qos.validate_against(session.supported_qos_policies())
3673 .map_err(NodeError::Transport)?;
3674 session
3675 .create_service_client(&info, qos)
3676 .map_err(|_| NodeError::Transport(TransportError::ServiceClientCreationFailed))?
3677 };
3678
3679 let offset = self.arena_alloc::<ServiceClientCallbackEntry<Svc, F, REPLY_BUF>>()?;
3680 let hdr_ptr = unsafe {
3681 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3682 let entry_ptr =
3683 arena_ptr.add(offset) as *mut ServiceClientCallbackEntry<Svc, F, REPLY_BUF>;
3684 core::ptr::write(
3685 entry_ptr,
3686 ServiceClientCallbackEntry {
3687 hdr: ServiceClientSendHeader {
3688 handle,
3689 reply_buffer: [0u8; REPLY_BUF],
3690 pending: false,
3691 reply_ready: core::sync::atomic::AtomicBool::new(false),
3692 },
3693 callback,
3694 _phantom: core::marker::PhantomData,
3695 },
3696 );
3697 &mut (*entry_ptr).hdr as *mut ServiceClientSendHeader<REPLY_BUF>
3698 };
3699
3700 self.entries[slot] = Some(CallbackMeta {
3701 offset,
3702 kind: EntryKind::ServiceClient,
3703 try_process: service_client_callback_try_process::<Svc, F, REPLY_BUF>,
3704 has_data: always_ready,
3705 pre_sample: no_pre_sample,
3706 invocation: InvocationMode::Always,
3707 drop_fn: drop_entry::<ServiceClientCallbackEntry<Svc, F, REPLY_BUF>>,
3708 });
3709 self.apply_node_default_sched(slot, node_id);
3710 Ok((HandleId(slot), hdr_ptr))
3711 }
3712
3713 // ========================================================================
3714 // Guard condition registration
3715 // ========================================================================
3716
3717 /// Register a guard condition with a callback.
3718 ///
3719 /// Returns both the [`HandleId`] for trigger configuration and a
3720 /// [`GuardConditionHandle`] for triggering from other threads.
3721 pub fn register_guard_condition<F>(
3722 &mut self,
3723 callback: F,
3724 ) -> Result<(HandleId, GuardConditionHandle), NodeError>
3725 where
3726 F: FnMut() + 'static,
3727 {
3728 let slot = self.next_entry_slot()?;
3729 let offset = self.arena_alloc::<GuardConditionEntry<F>>()?;
3730
3731 unsafe {
3732 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3733 let entry_ptr = arena_ptr.add(offset) as *mut GuardConditionEntry<F>;
3734 core::ptr::write(
3735 entry_ptr,
3736 GuardConditionEntry {
3737 flag: portable_atomic::AtomicBool::new(false),
3738 callback,
3739 },
3740 );
3741
3742 // Create a handle pointing to the flag in the arena
3743 let flag_ptr = &(*entry_ptr).flag as *const portable_atomic::AtomicBool;
3744 #[allow(unused_mut)]
3745 let mut guard_handle = GuardConditionHandle::new(flag_ptr);
3746 // Phase 124.B.5 — wire the wake callback so trigger()
3747 // also signals the executor's wake_cv.
3748 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
3749 {
3750 let ctx = self.wake_ctx_ptr();
3751 guard_handle.set_wake_cb(nros_rmw_runtime_wake_cb, ctx);
3752 }
3753
3754 self.entries[slot] = Some(CallbackMeta {
3755 offset,
3756 kind: EntryKind::GuardCondition,
3757 try_process: guard_try_process::<F>,
3758 has_data: guard_has_data::<F>,
3759 pre_sample: no_pre_sample,
3760 invocation: InvocationMode::OnNewData,
3761 drop_fn: drop_entry::<GuardConditionEntry<F>>,
3762 });
3763
3764 Ok((HandleId(slot), guard_handle))
3765 }
3766 }
3767
3768 // ========================================================================
3769 // Timer control methods
3770 // ========================================================================
3771
3772 /// Cancel a timer. A cancelled timer will not fire but still accumulates
3773 /// elapsed time. The timer can be restarted with [`reset_timer()`](Self::reset_timer).
3774 pub fn cancel_timer(&mut self, id: HandleId) -> Result<(), NodeError> {
3775 let meta = self
3776 .entries
3777 .get(id.0)
3778 .and_then(|e| e.as_ref())
3779 .ok_or(NodeError::BufferTooSmall)?;
3780 if !matches!(meta.kind, EntryKind::Timer) {
3781 return Err(NodeError::BufferTooSmall);
3782 }
3783 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3784 // SAFETY: meta.offset points to a valid TimerEntry<F> which shares
3785 // layout with TimerHeader for its initial fields (both #[repr(C)]).
3786 let header = unsafe { &mut *(arena_ptr.add(meta.offset) as *mut TimerHeader) };
3787 header.cancelled = true;
3788 Ok(())
3789 }
3790
3791 /// Reset a timer. Clears the cancelled state and resets the elapsed time
3792 /// to zero, so the timer starts a fresh period.
3793 pub fn reset_timer(&mut self, id: HandleId) -> Result<(), NodeError> {
3794 let meta = self
3795 .entries
3796 .get(id.0)
3797 .and_then(|e| e.as_ref())
3798 .ok_or(NodeError::BufferTooSmall)?;
3799 if !matches!(meta.kind, EntryKind::Timer) {
3800 return Err(NodeError::BufferTooSmall);
3801 }
3802 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3803 let header = unsafe { &mut *(arena_ptr.add(meta.offset) as *mut TimerHeader) };
3804 header.cancelled = false;
3805 header.elapsed_ms = 0;
3806 Ok(())
3807 }
3808
3809 /// Check if a timer is cancelled.
3810 pub fn timer_is_cancelled(&self, id: HandleId) -> bool {
3811 let meta = match self.entries.get(id.0).and_then(|e| e.as_ref()) {
3812 Some(m) if matches!(m.kind, EntryKind::Timer) => m,
3813 _ => return false,
3814 };
3815 let arena_ptr = self.arena.as_ptr() as *const u8;
3816 let header = unsafe { &*(arena_ptr.add(meta.offset) as *const TimerHeader) };
3817 header.cancelled
3818 }
3819
3820 /// Get the period of a timer in milliseconds, or `None` if the handle
3821 /// is not a valid timer.
3822 pub fn timer_period_ms(&self, id: HandleId) -> Option<u64> {
3823 let meta = self
3824 .entries
3825 .get(id.0)
3826 .and_then(|e| e.as_ref())
3827 .filter(|m| matches!(m.kind, EntryKind::Timer))?;
3828 let arena_ptr = self.arena.as_ptr() as *const u8;
3829 let header = unsafe { &*(arena_ptr.add(meta.offset) as *const TimerHeader) };
3830 Some(header.period_ms)
3831 }
3832
3833 // ========================================================================
3834 // spin_once (three-phase: readiness -> trigger -> dispatch)
3835 // ========================================================================
3836
3837 /// Drive I/O and dispatch registered callbacks once.
3838 ///
3839 /// Three-phase execution:
3840 /// 1. **Readiness scan** — query each handle's `has_data()`.
3841 /// 2. **Trigger evaluation** — check if the executor-level trigger passes.
3842 /// 3. **Dispatch** — invoke callbacks according to their `InvocationMode`.
3843 ///
3844 /// Returns a [`SpinOnceResult`] with counts of processed items and errors.
3845 ///
3846 /// # Arguments
3847 /// * `timeout` — upper bound on the I/O wait. Saturated at
3848 /// `i32::MAX` ms (~24 days) for the underlying transport call.
3849 ///
3850 /// Phase 84.D7: unified on `core::time::Duration`. The previous
3851 /// `timeout_ms: i32` signature had a latent footgun where
3852 /// `spin_once(-1)` silently froze timers while still polling I/O;
3853 /// `Duration` has no negative sentinel.
3854 pub fn spin_once(&mut self, timeout: core::time::Duration) -> SpinOnceResult {
3855 let timeout_ms = timeout.as_millis().min(i32::MAX as u128) as i32;
3856
3857 // Phase 110.0 — cap against the backend's next internal-event
3858 // deadline (lease keepalive, heartbeat, ACK-NACK timeout, ...).
3859 // Default backend impl returns `None`, so this is a no-op
3860 // unless the active backend opts in.
3861 #[allow(unused_variables)]
3862 let timeout_ms = match self.session.next_deadline_ms() {
3863 Some(next) => timeout_ms.min(next.min(i32::MAX as u32) as i32),
3864 None => timeout_ms,
3865 };
3866
3867 // Wall-clock-accurate timer accumulation. Measure real time
3868 // since the previous `spin_once` exited (or, on the first call,
3869 // since `drive_io` started). Two failure modes the requested
3870 // `timeout_ms` doesn't capture:
3871 // 1. `drive_io` returns early — e.g. zenoh-pico's condvar wakes
3872 // on data arrival, well under 1 ms.
3873 // 2. The caller spends time outside `spin_once` (explicit sleep,
3874 // ROS-2 cooperative scheduling, etc.) and that time should
3875 // still count toward timers.
3876 // Crediting the requested timeout to timers in either case ticks
3877 // them faster than wall-clock — observed as a 30 Hz control loop
3878 // overshooting to >200 Hz under sustained traffic. Carry the
3879 // sub-ms remainder across calls so precision is preserved.
3880 #[cfg(feature = "std")]
3881 let spin_start = std::time::Instant::now();
3882 #[cfg(not(feature = "std"))]
3883 let spin_start_us = self.clock_us_fn.map(|clock| clock());
3884
3885 // Phase 104.C.6 — shared executor wake. Swap-and-clear the
3886 // wake flag; if it was set before this `spin_once` entered,
3887 // skip the blocking wait on the primary session and poll
3888 // every session non-blockingly. Lets a wake signal from any
3889 // thread (or, post-104.C.6.b, any backend's vtable hook)
3890 // pre-empt whichever session the executor would otherwise
3891 // sleep on. Cost on the no-wake path is one atomic swap.
3892 #[cfg(feature = "std")]
3893 #[allow(unused_variables)]
3894 let was_woken = self
3895 .wake_flag
3896 .swap(false, std::sync::atomic::Ordering::SeqCst);
3897
3898 // Phase 124.B.4 — condvar-blocked wait.
3899 //
3900 // RT contract:
3901 // * cv.wait_timeout_while: bounded by `timeout_ms`.
3902 // Predicate is O(1) — one atomic swap + Instant::now.
3903 // No allocation. PI-mutex consideration: wake_mu held
3904 // only during predicate check (microseconds);
3905 // contended worst-case = notify_all execution time
3906 // (~10s of µs).
3907 // * Backend's `set_wake_callback`-installed cb is called
3908 // on async data arrival from its transport-notify path
3909 // (worker thread, ISR-safe variant via 124.B.7). The
3910 // runtime cb writes wake_flag + signals wake_cv,
3911 // unblocking this loop sub-poll-period.
3912 // * Poll-only backends (XRCE, bare-metal) leave the slot
3913 // NULL; the cv wait still fires on its deadline, then
3914 // drive_io(0) drains whatever the backend's internal
3915 // poll has buffered. Equivalent to their pre-124
3916 // behaviour minus the blocking wait inside drive_io.
3917 //
3918 // Lost-wakeup safe: SeqCst flag write happens-before
3919 // notify, and the waiter checks the flag under wake_mu in
3920 // the predicate. If wake fires between drain and cv.wait
3921 // entry, the predicate sees flag=true on first eval and
3922 // exits immediately.
3923 // Phase 130.4 — only sleep in the wake-primitive wait when a
3924 // backend actually installed `set_wake_callback`. Poll-only
3925 // backends (XRCE, current Cyclone / dust-DDS) leave the
3926 // vtable slot NULL → `has_async_wake == false` → drive_io
3927 // for the caller's full timeout instead of sleeping in a
3928 // never-signaled wait that starves reliable retransmission
3929 // (Phase 127.C.4 root cause: server's send_reply flushes
3930 // 100 ms once, then NodeWake.wait_ms(100) sleeps 100 ms
3931 // with zero session activity, so the agent's ACK arrives
3932 // into a stalled session and reliable redelivery never
3933 // fires). RTOS std builds with an event-driven backend
3934 // installed still use `NodeWake` (kernel-native binary
3935 // semaphore — honors its deadline, dodges Zephyr's libc
3936 // `pthread_cond_timedwait` hang); POSIX/macOS std keep
3937 // the existing `std::Condvar` path.
3938 // Phase 248 (C2) — platform-agnostic wake wait. The choice of
3939 // wait primitive is made at runtime from the platform vtable's
3940 // wake probe, NOT a compile-time per-RTOS `cfg`:
3941 //
3942 // * `node_wake.is_some()` → a kernel-native binary semaphore
3943 // (`nros_platform_wake_*`) is linked; block on it. It honors
3944 // its deadline (dodging e.g. Zephyr's libc
3945 // `pthread_cond_timedwait` hang) and `nros_rmw_runtime_wake_cb`
3946 // signals it on transport arrival.
3947 // * `node_wake.is_none()` → no platform wake primitive; fall
3948 // back to the std `Condvar` wake pair.
3949 //
3950 // Either way, only sleep in the wake-primitive/cv wait when a
3951 // backend actually installed `set_wake_callback`
3952 // (`has_async_wake`); poll-only backends (XRCE, current Cyclone)
3953 // leave the slot NULL and get a full-timeout `drive_io` so
3954 // reliable retransmission isn't starved (Phase 127.C.4).
3955 #[cfg(all(feature = "std", feature = "rmw-cffi"))]
3956 let primary_drive_timeout_ms = if let Some(wake) = self.node_wake.as_ref() {
3957 if !was_woken && self.has_async_wake {
3958 let _ = wake.wait_ms(timeout_ms as u32);
3959 // Clear any pending flag the cb set while we were
3960 // waiting; mirrors the std cv predicate's flag drain.
3961 let _ = self
3962 .wake_flag
3963 .swap(false, std::sync::atomic::Ordering::SeqCst);
3964 0
3965 } else {
3966 timeout_ms
3967 }
3968 } else {
3969 if !was_woken && self.has_async_wake {
3970 let dur = core::time::Duration::from_millis(timeout_ms as u64);
3971 // SAFETY-invariant: `wake_mu` guards `()` — it is purely the
3972 // companion mutex for `wake_cv`, protecting no shared state. A
3973 // poison (another thread panicked while holding it) cannot have
3974 // corrupted anything, so recover the guard rather than aborting
3975 // this hot spin loop.
3976 let g = self.wake_mu.lock().unwrap_or_else(|e| e.into_inner());
3977 let _ = self.wake_cv.wait_timeout_while(g, dur, |_| {
3978 !self
3979 .wake_flag
3980 .swap(false, std::sync::atomic::Ordering::SeqCst)
3981 });
3982 }
3983 // drive_io is non-blocking when the cv-wait above ran;
3984 // full-timeout otherwise so the transport's blocking recv
3985 // yields the thread instead of busy-spinning.
3986 if self.has_async_wake { 0 } else { timeout_ms }
3987 };
3988
3989 // std builds without rmw-cffi (mock-session tests, future
3990 // alternative backends) keep the original "drive_io is
3991 // non-blocking" assumption.
3992 #[cfg(all(feature = "std", not(feature = "rmw-cffi")))]
3993 let primary_drive_timeout_ms = 0;
3994
3995 // Phase 248 (C2) — no_std + alloc + rmw-cffi path. When a backend
3996 // installed the wake-cb (`has_async_wake_alloc`) and a platform
3997 // wake primitive is available (`node_wake_alloc.is_some()` — the
3998 // runtime vtable probe), block on `node_wake_alloc.wait_ms` so the
3999 // executor unblocks on transport arrival rather than relying on
4000 // drive_io's blocking recv for the full timeout. Then drive_io(0)
4001 // drains whatever the backend's poll path buffered. Platforms with
4002 // no wake primitive (bare-metal) fall through to the full timeout.
4003 #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
4004 let primary_drive_timeout_ms = {
4005 let was_woken_alloc = self
4006 .wake_flag_alloc
4007 .swap(false, portable_atomic::Ordering::SeqCst);
4008 if !was_woken_alloc
4009 && self.has_async_wake_alloc
4010 && let Some(wake) = self.node_wake_alloc.as_ref()
4011 {
4012 let _ = wake.wait_ms(timeout_ms as u32);
4013 // Drain any flag the cb set while we were waiting.
4014 let _ = self
4015 .wake_flag_alloc
4016 .swap(false, portable_atomic::Ordering::SeqCst);
4017 0
4018 } else {
4019 timeout_ms
4020 }
4021 };
4022
4023 // no_std without (alloc + rmw-cffi) keeps the legacy
4024 // full-timeout drive_io call.
4025 #[cfg(all(
4026 not(feature = "std"),
4027 not(all(feature = "alloc", feature = "rmw-cffi"))
4028 ))]
4029 let primary_drive_timeout_ms = timeout_ms;
4030
4031 let _ = self.session.drive_io(primary_drive_timeout_ms);
4032 for extra in self.extra_sessions.iter_mut() {
4033 let _ = extra.drive_io(0);
4034 }
4035
4036 #[cfg(feature = "std")]
4037 let delta_ms = {
4038 let now = std::time::Instant::now();
4039 // `last_spin_end` is seeded at construction time, so this
4040 // path always has a Some(_) on every call.
4041 let prev = self.last_spin_end.unwrap_or(spin_start);
4042 let elapsed = now.saturating_duration_since(prev);
4043 self.last_spin_end = Some(now);
4044 let total_us = self
4045 .spin_residual_us
4046 .saturating_add(elapsed.as_micros() as u64);
4047 let ms = total_us / 1000;
4048 self.spin_residual_us = total_us % 1000;
4049 ms
4050 };
4051 #[cfg(not(feature = "std"))]
4052 let delta_ms = if let Some(clock) = self.clock_us_fn {
4053 let now = clock();
4054 let prev = self
4055 .last_spin_end_us
4056 .unwrap_or_else(|| spin_start_us.unwrap_or(now));
4057 self.last_spin_end_us = Some(now);
4058 let elapsed_us = now.saturating_sub(prev);
4059 let total_us = self.spin_residual_us.saturating_add(elapsed_us);
4060 let ms = total_us / 1000;
4061 self.spin_residual_us = total_us % 1000;
4062 ms
4063 } else {
4064 timeout_ms as u64
4065 };
4066 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
4067
4068 // Phase 1: Readiness scan (Phase 110.A.b — backed by FifoReadySet).
4069 //
4070 // `bits` carries data-readiness only (used by trigger eval +
4071 // by `InvocationMode::OnNewData`). `always_mask` carries the
4072 // `InvocationMode::Always` entries that fire regardless of
4073 // data presence. The dispatcher drains
4074 // `FifoReadySet(bits | always_mask)` after the trigger
4075 // passes; `pop_next` yields registration order (lowest bit
4076 // first) so behavior is bit-identical to the pre-refactor
4077 // `for (i, meta) in entries.iter().enumerate()` loop.
4078 let mut bits: u64 = 0;
4079 let mut count: usize = 0;
4080 let mut non_timer_mask: u64 = 0;
4081 let mut always_mask: u64 = 0;
4082
4083 for (i, meta) in self.entries.iter().enumerate() {
4084 if let Some(meta) = meta {
4085 let data_ptr = unsafe { arena_ptr.add(meta.offset) as *const u8 };
4086 if unsafe { (meta.has_data)(data_ptr) } {
4087 bits |= 1u64 << i;
4088 }
4089 if !matches!(meta.kind, EntryKind::Timer | EntryKind::GuardCondition) {
4090 non_timer_mask |= 1u64 << i;
4091 }
4092 if matches!(meta.invocation, InvocationMode::Always) {
4093 always_mask |= 1u64 << i;
4094 }
4095 count += 1;
4096 }
4097 }
4098
4099 let snapshot = ReadinessSnapshot { bits, count };
4100
4101 // Phase 2: Trigger evaluation
4102 let trigger_passes = match &self.trigger {
4103 Trigger::Any => bits & non_timer_mask != 0 || non_timer_mask == 0,
4104 Trigger::All => bits & non_timer_mask == non_timer_mask,
4105 Trigger::One(id) => snapshot.is_ready(*id),
4106 Trigger::AllOf(set) => snapshot.all_ready(*set),
4107 Trigger::AnyOf(set) => snapshot.any_ready(*set),
4108 Trigger::Always => true,
4109 Trigger::Predicate(f) => f(&snapshot),
4110 Trigger::RawPredicate { callback, context } => {
4111 // Convert ReadinessSnapshot bitmask to a bool array for the C callback
4112 let mut ready_array = [false; 64];
4113 for (i, slot) in ready_array
4114 .iter_mut()
4115 .enumerate()
4116 .take(snapshot.count.min(64))
4117 {
4118 *slot = snapshot.bits & (1u64 << i) != 0;
4119 }
4120 // SAFETY: The callback and context are provided by the C API caller.
4121 // The ready_array is valid for snapshot.count elements.
4122 unsafe { callback(ready_array.as_ptr(), snapshot.count, *context) }
4123 }
4124 };
4125
4126 if !trigger_passes {
4127 // Timers still need delta accumulation even when trigger doesn't pass
4128 for meta in self.entries.iter().flatten() {
4129 if matches!(meta.kind, EntryKind::Timer) {
4130 let data_ptr = unsafe { arena_ptr.add(meta.offset) };
4131 let _ = unsafe { (meta.try_process)(data_ptr, delta_ms) };
4132 }
4133 }
4134
4135 // Parameter services live outside the arena and must be processed
4136 // regardless of trigger state, otherwise ROS 2 param queries time out.
4137 #[cfg(feature = "param-services")]
4138 if let Some(params) = &mut self.params {
4139 {
4140 let crate::parameter_services::ParamState {
4141 server, services, ..
4142 } = &mut **params;
4143 let _ = services.process_services(server);
4144 }
4145 // Phase 172.H — persist any runtime override applied this tick.
4146 crate::parameter_services::flush_param_store(params);
4147 }
4148
4149 // Same treatment for lifecycle services — `ros2 lifecycle get`
4150 // must succeed even when no callbacks fired this tick.
4151 // SAFETY: see the matching invariant on the later call site.
4152 #[cfg(feature = "lifecycle-services")]
4153 if let Some(lc) = &mut self.lifecycle {
4154 let crate::lifecycle_services::LifecycleRuntimeState {
4155 state_machine,
4156 services,
4157 } = &mut **lc;
4158 let _ = unsafe { services.process_services(state_machine) };
4159 }
4160
4161 return SpinOnceResult::new();
4162 }
4163
4164 // Phase 2.5: LET pre-sample (only when LogicalExecutionTime)
4165 //
4166 // Sample all subscription data into entry buffers BEFORE dispatching
4167 // any callbacks. This ensures all callbacks in this cycle see a
4168 // consistent snapshot of data from the same point in time.
4169 // Services are NOT pre-sampled (request-reply is sequential).
4170 if matches!(self.semantics, ExecutorSemantics::LogicalExecutionTime) {
4171 for meta in self.entries.iter().flatten() {
4172 if matches!(meta.kind, EntryKind::Subscription) {
4173 let data_ptr = unsafe { arena_ptr.add(meta.offset) };
4174 unsafe { (meta.pre_sample)(data_ptr) };
4175 }
4176 }
4177 }
4178
4179 // Phase 3: Dispatch (Phase 110.C — bucketed by SC.priority).
4180 //
4181 // Two ready-set families, each split across `Priority::COUNT`
4182 // buckets (Critical / Normal / BestEffort). Per-entry SC
4183 // `class` selects FIFO bitmap vs EDF heap; SC `priority`
4184 // selects the bucket within. Drain order:
4185 // for each bucket in priority order (Critical first):
4186 // drain EDF heap (deadline-priority), then FIFO bitmap
4187 // (registration-order)
4188 // Default workloads — every entry on the auto-default Fifo SC
4189 // (Normal priority) — populate only `fifo[Normal]`, so
4190 // dispatch order is bit-identical to 110.B.b for those.
4191 const NB: usize = super::sched_context::Priority::COUNT;
4192 let mut result = SpinOnceResult::new();
4193 let mut fifo: super::ready_set::BucketedFifoSet<NB, { crate::config::MAX_CBS }> =
4194 super::ready_set::BucketedFifoSet::new();
4195 let mut edf: super::ready_set::BucketedEdfSet<NB, { crate::config::MAX_CBS }> =
4196 super::ready_set::BucketedEdfSet::new();
4197 let active_mask = bits | always_mask;
4198
4199 // Phase 110.E — refill any Sporadic SC budgets at period
4200 // boundaries before deciding what to dispatch this cycle.
4201 // Refill is polled (not ISR-driven) — coarse but correct
4202 // upper-bound bandwidth limiter.
4203 #[cfg(feature = "std")]
4204 {
4205 // Monotonic ms relative to a process-static epoch so the
4206 // refill clock survives wall-clock jumps.
4207 use std::sync::OnceLock;
4208 static EPOCH: OnceLock<std::time::Instant> = OnceLock::new();
4209 let now_ms = std::time::Instant::now()
4210 .saturating_duration_since(*EPOCH.get_or_init(std::time::Instant::now))
4211 .as_millis() as u64;
4212 // Use the cycle's `delta_ms` as the per-SC consumption
4213 // estimate — worst-case attribution. Per-callback
4214 // measurement lands with a higher-precision clock hook.
4215 let delta_us = (delta_ms as u32).saturating_mul(1000);
4216 for slot in self.sporadic_states.iter_mut().flatten() {
4217 let _ = slot.tick(now_ms, delta_us);
4218 }
4219 }
4220
4221 for i in 0..crate::config::MAX_CBS {
4222 if active_mask & (1u64 << i) == 0 {
4223 continue;
4224 }
4225 let sc_idx = self.sched_context_bindings[i].0 as usize;
4226 let sc_class_priority_deadline = self
4227 .sched_contexts
4228 .get(sc_idx)
4229 .and_then(|s| s.as_ref())
4230 .map(|sc| {
4231 (
4232 sc.class,
4233 sc.priority.index(),
4234 sc.deadline_us.get().map(|nz| nz.get()).unwrap_or(u32::MAX),
4235 )
4236 });
4237 let (sc_class, bucket, deadline_us) = sc_class_priority_deadline.unwrap_or((
4238 super::sched_context::SchedClass::Fifo,
4239 super::sched_context::Priority::Normal.index(),
4240 u32::MAX,
4241 ));
4242 // Phase 110.E — Sporadic SC dispatch is suppressed when
4243 // its budget is exhausted. Atomic path (110.E.b PlatformTimer
4244 // refill) takes precedence when registered; polled path
4245 // (cycle-level delta_us attribution) handles the unregistered
4246 // case. Either way, exhausted budget skips dispatch.
4247 if matches!(sc_class, super::sched_context::SchedClass::Sporadic) {
4248 #[cfg(feature = "alloc")]
4249 let atomic_has_budget = self
4250 .sporadic_atomic_states
4251 .get(sc_idx)
4252 .and_then(|s| s.as_ref())
4253 .map(|(state, _)| state.has_budget());
4254 #[cfg(not(feature = "alloc"))]
4255 let atomic_has_budget: Option<bool> = None;
4256 let has_budget = match atomic_has_budget {
4257 Some(b) => b,
4258 None => self
4259 .sporadic_states
4260 .get(sc_idx)
4261 .and_then(|s| s.as_ref())
4262 .map(|s| s.budget_remaining_us > 0)
4263 .unwrap_or(true),
4264 };
4265 if !has_budget {
4266 continue;
4267 }
4268 // Phase 110.E.b follow-up — per-callback runtime
4269 // accounting (replaces this cycle-level attribution)
4270 // is applied at dispatch time below via
4271 // `consume_dispatch_runtime_us`. We only update the
4272 // polled-path `SporadicState` (no_std fallback) here
4273 // because the atomic path now records actual
4274 // wall-clock per-callback runtime. The
4275 // `delta_us` over-attribution that previously hit the
4276 // atomic state was a worst-case bandwidth limiter;
4277 // per-callback measurement is strictly tighter.
4278 #[cfg(not(feature = "alloc"))]
4279 {
4280 let _ = sc_idx; // polled-state path lives in
4281 // `sporadic_states`; this branch is a no-op when
4282 // the atomic path is enabled.
4283 }
4284 }
4285 // Phase 110.F — per-callback OS priority routing. Entries
4286 // bound to an SC with `os_pri > 0` dispatch onto a worker
4287 // thread the OS has elevated to that priority; the
4288 // cooperative path is skipped for those entries. Workers
4289 // are spawned lazily.
4290 #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
4291 {
4292 let os_pri = self
4293 .sched_contexts
4294 .get(sc_idx)
4295 .and_then(|s| s.as_ref())
4296 .map(|sc| sc.os_pri)
4297 .unwrap_or(0);
4298 if os_pri > 0
4299 && let Some(apply_policy) = self.os_priority_apply_policy
4300 {
4301 let worker = self
4302 .os_priority_workers
4303 .entry(os_pri)
4304 .or_insert_with(|| OsPriorityWorker::spawn(os_pri, apply_policy));
4305 if let Some(meta) = self.entries[i].as_ref() {
4306 let _ = worker.try_dispatch(WorkItem {
4307 arena_base: arena_ptr as usize,
4308 arena_offset: meta.offset,
4309 try_process: meta.try_process,
4310 delta_ms,
4311 });
4312 }
4313 continue;
4314 }
4315 }
4316 // Phase 110.G — TT window gate, orthogonal to class.
4317 // Skips dispatch when the SC has a TT window AND the
4318 // current monotonic time is outside it. Both gates apply
4319 // independently — a Sporadic SC with a TT window must
4320 // pass both.
4321 if self.major_frame_us > 0 {
4322 let sc_opt = self.sched_contexts.get(sc_idx).and_then(|s| s.as_ref());
4323 if let Some(sc) = sc_opt {
4324 let off = sc.tt_window_offset_us.get().map(|nz| nz.get()).unwrap_or(0);
4325 let dur = sc
4326 .tt_window_duration_us
4327 .get()
4328 .map(|nz| nz.get())
4329 .unwrap_or(0);
4330 if dur > 0 {
4331 // Compute current phase within the major
4332 // frame using the accumulated `delta_ms` clock
4333 // (std-only precise; no_std uses `delta_ms`
4334 // approximation from spin cadence).
4335 #[cfg(feature = "std")]
4336 let now_us = {
4337 use std::sync::OnceLock;
4338 static EPOCH: OnceLock<std::time::Instant> = OnceLock::new();
4339 std::time::Instant::now()
4340 .saturating_duration_since(
4341 *EPOCH.get_or_init(std::time::Instant::now),
4342 )
4343 .as_micros() as u64
4344 };
4345 #[cfg(not(feature = "std"))]
4346 let now_us = delta_ms.saturating_mul(1000);
4347 let phase = (now_us % self.major_frame_us as u64) as u32;
4348 let in_window = if off + dur <= self.major_frame_us {
4349 phase >= off && phase < off + dur
4350 } else {
4351 // Window wraps the major frame boundary.
4352 let end = (off as u64 + dur as u64) % self.major_frame_us as u64;
4353 phase >= off || (phase as u64) < end
4354 };
4355 if !in_window {
4356 continue;
4357 }
4358 }
4359 }
4360 }
4361 let is_edf = matches!(sc_class, super::sched_context::SchedClass::Edf);
4362 let job = super::types::ActiveJob {
4363 sort_key: if is_edf { deadline_us } else { i as u32 },
4364 desc_idx: i as super::types::DescIdx,
4365 };
4366 if is_edf {
4367 let _ = edf.insert_into(bucket, job);
4368 } else {
4369 let _ = fifo.insert_into(bucket, job);
4370 }
4371 }
4372
4373 // SAFETY: each `desc_idx` we pop was set above only when the
4374 // corresponding `entries[i]` slot was `Some`; no Executor
4375 // mutation happens between that scan and this dispatch.
4376 let dispatch_one = |meta: &CallbackMeta,
4377 arena_ptr: *mut u8,
4378 delta_ms: u64,
4379 result: &mut SpinOnceResult| {
4380 // Phase 141.B.2 — capture T1 at subscription dispatch
4381 // entry. Probe pairs it with the most recent T0 from
4382 // `nros_rmw_runtime_wake_cb` (std + alloc variants)
4383 // and pushes `T1 - T0` onto the ring buffer 141.C
4384 // drains. No-op when the probe feature is off or
4385 // no cycle reader is installed. Other entry kinds
4386 // (Service / Timer / GuardCondition) skip the probe
4387 // because the 141 acceptance is specifically
4388 // wake-to-subscription-dispatch latency.
4389 #[cfg(feature = "wake-latency-probe")]
4390 if matches!(meta.kind, EntryKind::Subscription) {
4391 super::wake_probe::on_dispatch();
4392 }
4393 let data_ptr = unsafe { arena_ptr.add(meta.offset) };
4394 match unsafe { (meta.try_process)(data_ptr, delta_ms) } {
4395 Ok(true) => match meta.kind {
4396 EntryKind::Subscription => result.subscriptions_processed += 1,
4397 EntryKind::Service
4398 | EntryKind::ServiceClient
4399 | EntryKind::ActionServer
4400 | EntryKind::ActionClient => result.services_handled += 1,
4401 EntryKind::Timer => result.timers_fired += 1,
4402 EntryKind::GuardCondition => {}
4403 },
4404 Ok(false) => {}
4405 Err(_) => match meta.kind {
4406 EntryKind::Subscription => result.subscription_errors += 1,
4407 EntryKind::Service
4408 | EntryKind::ServiceClient
4409 | EntryKind::ActionServer
4410 | EntryKind::ActionClient => result.service_errors += 1,
4411 EntryKind::Timer | EntryKind::GuardCondition => {}
4412 },
4413 }
4414 };
4415
4416 // Phase 110.E.b follow-up — per-callback runtime accounting
4417 // for Sporadic SCs. Wall-clock-measure each dispatch and
4418 // consume the elapsed microseconds from the bound SC's
4419 // atomic budget. This replaces the cycle-level over-
4420 // attribution that previously charged the FULL `delta_us`
4421 // against every Sporadic SC regardless of which entries
4422 // actually fired — accurate per-callback measurement is the
4423 // shape the design doc's per-callback runtime acceptance
4424 // calls out. The closure is `feature = "std"`-gated because
4425 // it needs a `core::time::Instant`-equivalent monotonic
4426 // clock; the no_std fallback continues to use the polled
4427 // `SporadicState` path (cycle delta_us) until a board-side
4428 // monotonic-microsecond accessor lands.
4429 #[cfg(feature = "std")]
4430 let consume_dispatch_runtime_us =
4431 |desc_idx: usize,
4432 elapsed_us: u32,
4433 sched_context_bindings: &[super::sched_context::SchedContextId;
4434 crate::config::MAX_CBS],
4435 sched_contexts: &[Option<super::sched_context::SchedContext>;
4436 crate::config::MAX_SC],
4437 #[cfg(feature = "alloc")] sporadic_atomic_states: &[Option<(
4438 portable_atomic_util::Arc<super::sched_context::AtomicSporadicState>,
4439 OpaqueTimerHandle,
4440 )>;
4441 crate::config::MAX_SC]| {
4442 let sc_idx = sched_context_bindings[desc_idx].0 as usize;
4443 let sc_class = sched_contexts
4444 .get(sc_idx)
4445 .and_then(|s| s.as_ref())
4446 .map(|sc| sc.class)
4447 .unwrap_or(super::sched_context::SchedClass::Fifo);
4448 if !matches!(sc_class, super::sched_context::SchedClass::Sporadic) {
4449 return;
4450 }
4451 #[cfg(feature = "alloc")]
4452 if let Some((state, _)) =
4453 sporadic_atomic_states.get(sc_idx).and_then(|s| s.as_ref())
4454 {
4455 state.consume(elapsed_us);
4456 // Phase 110.E.b — overrun detection. Cooperative
4457 // single-thread can't preempt a runaway callback,
4458 // so post-dispatch wall-clock comparison delivers
4459 // the same observable signal as the design's
4460 // oneshot-IRQ-and-cancel pattern, without needing
4461 // a separate timer per SC. `budget_capacity_us` is
4462 // the per-period budget the SC was sized against;
4463 // any callback exceeding that has run past its
4464 // bandwidth allotment.
4465 if elapsed_us > state.budget_capacity_us {
4466 state.record_overrun(elapsed_us - state.budget_capacity_us);
4467 }
4468 }
4469 #[cfg(not(feature = "alloc"))]
4470 {
4471 let _ = (sc_idx, elapsed_us);
4472 }
4473 };
4474
4475 // For each priority bucket (Critical → Normal → BestEffort),
4476 // drain EDF first then FIFO so an EDF callback in this bucket
4477 // beats a FIFO peer at the same priority, but no lower-priority
4478 // entry runs while a higher-priority bucket has work pending.
4479 // Strict static priority across buckets; non-preemptive within
4480 // an in-flight callback (see Phase 110.D).
4481 for bucket in 0..NB {
4482 while let Some(job) = edf.pop_from(bucket) {
4483 let i = job.desc_idx as usize;
4484 if let Some(meta) = self.entries[i].as_ref() {
4485 #[cfg(feature = "std")]
4486 let start = std::time::Instant::now();
4487 dispatch_one(meta, arena_ptr, delta_ms, &mut result);
4488 #[cfg(feature = "std")]
4489 {
4490 let elapsed_us = start.elapsed().as_micros().min(u32::MAX as u128) as u32;
4491 consume_dispatch_runtime_us(
4492 i,
4493 elapsed_us,
4494 &self.sched_context_bindings,
4495 &self.sched_contexts,
4496 #[cfg(feature = "alloc")]
4497 &self.sporadic_atomic_states,
4498 );
4499 }
4500 }
4501 }
4502 while let Some(job) = fifo.pop_from(bucket) {
4503 let i = job.desc_idx as usize;
4504 if let Some(meta) = self.entries[i].as_ref() {
4505 #[cfg(feature = "std")]
4506 let start = std::time::Instant::now();
4507 dispatch_one(meta, arena_ptr, delta_ms, &mut result);
4508 #[cfg(feature = "std")]
4509 {
4510 let elapsed_us = start.elapsed().as_micros().min(u32::MAX as u128) as u32;
4511 consume_dispatch_runtime_us(
4512 i,
4513 elapsed_us,
4514 &self.sched_context_bindings,
4515 &self.sched_contexts,
4516 #[cfg(feature = "alloc")]
4517 &self.sporadic_atomic_states,
4518 );
4519 }
4520 }
4521 }
4522 }
4523
4524 // Process parameter services (outside the arena)
4525 #[cfg(feature = "param-services")]
4526 if let Some(params) = &mut self.params {
4527 {
4528 let crate::parameter_services::ParamState {
4529 server, services, ..
4530 } = &mut **params;
4531 if let Ok(n) = services.process_services(server) {
4532 result.services_handled += n;
4533 }
4534 }
4535 // Phase 172.H — persist any runtime override applied this tick.
4536 crate::parameter_services::flush_param_store(params);
4537 }
4538
4539 // Process lifecycle services (outside the arena).
4540 //
4541 // SAFETY: `change_state` dispatches a user-supplied C callback through a
4542 // raw function pointer stored in `LifecyclePollingNodeCtx`. The caller
4543 // of `register_lifecycle_services` guarantees the callback/context pair
4544 // stays live for as long as the executor (see that method's docs).
4545 #[cfg(feature = "lifecycle-services")]
4546 if let Some(lc) = &mut self.lifecycle {
4547 let crate::lifecycle_services::LifecycleRuntimeState {
4548 state_machine,
4549 services,
4550 } = &mut **lc;
4551 if let Ok(n) = unsafe { services.process_services(state_machine) } {
4552 result.services_handled += n;
4553 }
4554 }
4555
4556 // Phase 258 (Track 2, 2a) — executor-owned component tick pass.
4557 // Mirrors `ExecutorNodeRuntime::run_ticks`: after the transport +
4558 // callbacks have been pumped, drive each enrolled component's `tick`
4559 // (service-client/action poll, etc.). `exec_ctx` hands the component
4560 // the whole executor as a raw `*mut Executor` so its tick can
4561 // reborrow it (the same disjoint-field raw-ptr pattern run_ticks
4562 // uses). Index-iterate over `Copy` slots so no borrow of
4563 // `self.component_slots` is held while `tick` runs (which aliases
4564 // `self` through `exec_ctx`).
4565 let exec_ctx = self as *mut Executor as *mut core::ffi::c_void;
4566 let slot_count = self.component_slots.len();
4567 for i in 0..slot_count {
4568 let slot = self.component_slots[i];
4569 // SAFETY: `slot.state` is the leaked cell the matching `tick`
4570 // expects (enrolled via `enroll_component`); `exec_ctx` is a live
4571 // `*mut Executor` for `self`. The slot was copied out, so no
4572 // borrow of `component_slots` is outstanding during the call.
4573 unsafe {
4574 (slot.tick)(slot.state, exec_ctx);
4575 }
4576 }
4577
4578 result
4579 }
4580
4581 /// Drive I/O and dispatch callbacks in an infinite loop.
4582 ///
4583 /// Each iteration calls [`spin_once(timeout_ms)`](Self::spin_once),
4584 /// which pumps the transport and dispatches all registered callbacks.
4585 ///
4586 /// This is the primary run loop for embedded applications:
4587 ///
4588 /// ```ignore
4589 /// let mut executor = Executor::open(&config)?;
4590 /// executor.register_subscription::<Int32, _>("/topic", |msg| { /* ... */ })?;
4591 /// executor.spin(10); // never returns
4592 /// ```
4593 pub fn spin(&mut self, timeout: core::time::Duration) -> ! {
4594 loop {
4595 self.spin_once(timeout);
4596 }
4597 }
4598
4599 /// Phase 104.C.3.3.c — rclcpp-`spin()`-shape no-arg variant.
4600 /// Defaults the per-iteration timeout to 50 ms, which keeps
4601 /// idle binaries from busy-spinning while staying responsive
4602 /// enough for default-QoS messaging.
4603 pub fn spin_default(&mut self) -> ! {
4604 self.spin(core::time::Duration::from_millis(50))
4605 }
4606
4607 /// Drive I/O and dispatch callbacks asynchronously.
4608 ///
4609 /// Runs forever, yielding between poll cycles so that other async tasks
4610 /// (e.g., [`Promise`](super::handles::Promise)) can make progress.
4611 ///
4612 /// Uses only `core::future` — no external async runtime dependency.
4613 ///
4614 /// # Usage patterns
4615 ///
4616 /// ```ignore
4617 /// // Pattern 1: select with a promise (embassy-futures)
4618 /// use embassy_futures::select::{select, Either};
4619 /// let promise = client.call(&req)?;
4620 /// let Either::Second(reply) = select(executor.spin_async(), promise).await
4621 /// else { unreachable!() };
4622 ///
4623 /// // Pattern 2: manual polling (no async runtime)
4624 /// let mut promise = client.call(&req)?;
4625 /// loop {
4626 /// executor.spin_once(core::time::Duration::from_millis(10));
4627 /// if let Ok(Some(r)) = promise.try_recv() { break r; }
4628 /// }
4629 /// ```
4630 pub async fn spin_async(&mut self) -> ! {
4631 loop {
4632 self.spin_once(core::time::Duration::from_millis(1));
4633 core::future::poll_fn::<(), _>(|cx| {
4634 cx.waker().wake_by_ref();
4635 core::task::Poll::Pending
4636 })
4637 .await;
4638 }
4639 }
4640
4641 // ========================================================================
4642 // spin_one_period (no_std)
4643 // ========================================================================
4644
4645 /// Process one iteration and return remaining sleep time.
4646 ///
4647 /// This is `no_std` compatible — the caller is responsible for the actual
4648 /// delay using platform-specific sleep.
4649 ///
4650 /// # Arguments
4651 /// * `period_ms` - Target period in milliseconds
4652 /// * `elapsed_ms` - Time elapsed since last call (used for timer ticking)
4653 ///
4654 /// # Example
4655 ///
4656 /// ```ignore
4657 /// loop {
4658 /// let r = executor.spin_one_period(10, elapsed_ms);
4659 /// platform_sleep_ms(r.remaining_ms);
4660 /// }
4661 /// ```
4662 pub fn spin_one_period(&mut self, period_ms: u64, elapsed_ms: u64) -> SpinPeriodPollingResult {
4663 let result = self.spin_once(core::time::Duration::from_millis(elapsed_ms));
4664 SpinPeriodPollingResult {
4665 work: result,
4666 remaining_ms: period_ms.saturating_sub(elapsed_ms),
4667 }
4668 }
4669}
4670
4671// ============================================================================
4672// Parameter services (cfg param-services)
4673// ============================================================================
4674
4675#[cfg(feature = "param-services")]
4676impl Executor {
4677 /// Register the 6 ROS 2 parameter services for this node.
4678 ///
4679 /// Creates service servers for `get_parameters`, `set_parameters`,
4680 /// `set_parameters_atomically`, `list_parameters`, `describe_parameters`,
4681 /// and `get_parameter_types`.
4682 ///
4683 /// The service names follow the ROS 2 convention: `/{namespace}/{node_name}/{suffix}`.
4684 /// For the default namespace `/`, this becomes `/{node_name}/{suffix}` (e.g.
4685 /// `/sentinel/list_parameters`).
4686 ///
4687 /// Parameter services are stored outside the arena and don't consume
4688 /// callback slots.
4689 ///
4690 /// # Example
4691 ///
4692 /// ```ignore
4693 /// let config = ExecutorConfig::from_env().node_name("talker");
4694 /// let mut executor = Executor::open(&config)?;
4695 /// executor.register_parameter_services()?;
4696 /// executor.declare_parameter("start_value", ParameterValue::Integer(0));
4697 /// ```
4698 pub fn register_parameter_services(&mut self) -> Result<(), NodeError> {
4699 use crate::parameter_services::{
4700 DescribeParameters, GetParameterTypes, GetParameters, ListParameters,
4701 PARAM_SERVICE_BUFFER_SIZE, ParameterServiceServers, SetParameters,
4702 SetParametersAtomically,
4703 };
4704 use nros_core::RosService;
4705
4706 type PSrv<Svc> = super::handles::EmbeddedServiceServer<
4707 Svc,
4708 PARAM_SERVICE_BUFFER_SIZE,
4709 PARAM_SERVICE_BUFFER_SIZE,
4710 >;
4711
4712 // Build the node FQN from namespace + node_name, following ROS 2 convention.
4713 // Default namespace "/" → "/{node_name}"; otherwise "/{namespace}/{node_name}".
4714 let mut node_fqn = heapless::String::<256>::new();
4715 let ns: &str = &self.namespace;
4716 let nn: &str = &self.node_name;
4717 if ns.is_empty() || ns == "/" {
4718 node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4719 node_fqn.push_str(nn).map_err(|_| NodeError::NameTooLong)?;
4720 } else {
4721 node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4722 node_fqn
4723 .push_str(ns.trim_matches('/'))
4724 .map_err(|_| NodeError::NameTooLong)?;
4725 node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4726 node_fqn.push_str(nn).map_err(|_| NodeError::NameTooLong)?;
4727 }
4728
4729 /// Build a service name like `{node_fqn}/{suffix}` and create the server handle.
4730 fn create_param_srv<Svc: RosService>(
4731 session: &mut session::ConcreteSession,
4732 node_fqn: &str,
4733 namespace: &str,
4734 node_name: &str,
4735 suffix: &str,
4736 ) -> Result<session::RmwServiceServer, NodeError> {
4737 let mut name = heapless::String::<256>::new();
4738 name.push_str(node_fqn)
4739 .map_err(|_| NodeError::NameTooLong)?;
4740 name.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4741 name.push_str(suffix).map_err(|_| NodeError::NameTooLong)?;
4742 let mut info = ServiceInfo::new(&name, Svc::SERVICE_NAME, Svc::SERVICE_HASH)
4743 .with_namespace(namespace);
4744 if !node_name.is_empty() {
4745 info = info.with_node_name(node_name);
4746 }
4747 session
4748 .create_service_server(&info, QosSettings::services_default())
4749 .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))
4750 }
4751
4752 let get_handle = create_param_srv::<GetParameters>(
4753 &mut self.session,
4754 &node_fqn,
4755 ns,
4756 nn,
4757 "get_parameters",
4758 )?;
4759 let set_handle = create_param_srv::<SetParameters>(
4760 &mut self.session,
4761 &node_fqn,
4762 ns,
4763 nn,
4764 "set_parameters",
4765 )?;
4766 let set_atomic_handle = create_param_srv::<SetParametersAtomically>(
4767 &mut self.session,
4768 &node_fqn,
4769 ns,
4770 nn,
4771 "set_parameters_atomically",
4772 )?;
4773 let list_handle = create_param_srv::<ListParameters>(
4774 &mut self.session,
4775 &node_fqn,
4776 ns,
4777 nn,
4778 "list_parameters",
4779 )?;
4780 let desc_handle = create_param_srv::<DescribeParameters>(
4781 &mut self.session,
4782 &node_fqn,
4783 ns,
4784 nn,
4785 "describe_parameters",
4786 )?;
4787 let types_handle = create_param_srv::<GetParameterTypes>(
4788 &mut self.session,
4789 &node_fqn,
4790 ns,
4791 nn,
4792 "get_parameter_types",
4793 )?;
4794
4795 let servers = ParameterServiceServers::new(
4796 PSrv::<GetParameters> {
4797 handle: get_handle,
4798 req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4799 reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4800 _phantom: core::marker::PhantomData,
4801 },
4802 PSrv::<SetParameters> {
4803 handle: set_handle,
4804 req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4805 reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4806 _phantom: core::marker::PhantomData,
4807 },
4808 PSrv::<SetParametersAtomically> {
4809 handle: set_atomic_handle,
4810 req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4811 reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4812 _phantom: core::marker::PhantomData,
4813 },
4814 PSrv::<ListParameters> {
4815 handle: list_handle,
4816 req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4817 reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4818 _phantom: core::marker::PhantomData,
4819 },
4820 PSrv::<DescribeParameters> {
4821 handle: desc_handle,
4822 req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4823 reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4824 _phantom: core::marker::PhantomData,
4825 },
4826 PSrv::<GetParameterTypes> {
4827 handle: types_handle,
4828 req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4829 reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4830 _phantom: core::marker::PhantomData,
4831 },
4832 );
4833
4834 self.params = Some(alloc::boxed::Box::new(
4835 crate::parameter_services::ParamState {
4836 server: nros_params::ParameterServer::new(),
4837 services: alloc::boxed::Box::new(servers),
4838 store: alloc::boxed::Box::new(nros_params::NullParamStore),
4839 },
4840 ));
4841
4842 Ok(())
4843 }
4844
4845 /// Phase 172.H — attach a parameter-override persistence backend.
4846 ///
4847 /// Call this **after** [`register_parameter_services`](Self::register_parameter_services)
4848 /// and after declaring the plan's default parameters, so persisted
4849 /// overrides win over compile-time defaults. It immediately overlays any
4850 /// values the backend already holds onto the declared parameters, then
4851 /// keeps the store to flush future runtime `set_parameters` changes (the
4852 /// executor flushes from its spin loop whenever a value changed).
4853 ///
4854 /// Returns [`NodeError::NotInitialized`] if parameter services have not
4855 /// been registered yet.
4856 pub fn enable_parameter_persistence(
4857 &mut self,
4858 store: alloc::boxed::Box<dyn nros_params::ParamStore>,
4859 ) -> Result<(), NodeError> {
4860 let state = self.params.as_mut().ok_or(NodeError::NotInitialized)?;
4861 // Overlay persisted overrides onto the declared defaults.
4862 store.load(&mut |name, value| {
4863 let _ = state.server.set(name, value);
4864 });
4865 // Loading persisted state is restoration, not a new runtime change —
4866 // don't let it trigger an immediate re-flush.
4867 state.server.take_dirty();
4868 state.store = store;
4869 Ok(())
4870 }
4871
4872 /// Phase 172.H — like [`enable_parameter_persistence`](Self::enable_parameter_persistence)
4873 /// but boxes the backend for you, so callers (and generated code) need no
4874 /// `Box` import.
4875 pub fn enable_parameter_persistence_with<S>(&mut self, store: S) -> Result<(), NodeError>
4876 where
4877 S: nros_params::ParamStore + 'static,
4878 {
4879 self.enable_parameter_persistence(alloc::boxed::Box::new(store))
4880 }
4881}
4882
4883// ============================================================================
4884// Lifecycle services (cfg lifecycle-services)
4885// ============================================================================
4886
#[cfg(feature = "lifecycle-services")]
impl Executor {
    /// Register the five REP-2002 lifecycle services on this executor.
    ///
    /// Service servers are created under `{node_fqn}/`, in this order:
    /// `change_state`, `get_state`, `get_available_states`,
    /// `get_available_transitions` and `get_transition_graph` (the last two
    /// both use the `GetAvailableTransitions` service type).
    ///
    /// After this call, `ros2 lifecycle set|get|list|nodes` can drive the
    /// stored [`LifecyclePollingNodeCtx`](crate::lifecycle::LifecyclePollingNodeCtx)
    /// through the node's lifecycle. The state machine is created fresh
    /// (starting in `Unconfigured`); callers register their transition
    /// callbacks via [`Executor::lifecycle_state_machine_mut`].
    ///
    /// # Errors
    ///
    /// - [`NodeError::NameTooLong`] if the node FQN or any full service name
    ///   does not fit in the 256-byte name buffer.
    /// - [`NodeError::Transport`] (`ServiceServerCreationFailed`) if the
    ///   session cannot create one of the servers.
    ///
    /// Creation stops at the first failure and the executor's lifecycle
    /// state is left as it was.
    ///
    /// # Callback validity
    ///
    /// Registered callbacks on the state machine are C FFI function pointers.
    /// The caller must keep the callback code and any context it captures
    /// valid for as long as the executor processes services.
    pub fn register_lifecycle_services(&mut self) -> Result<(), NodeError> {
        use crate::{
            lifecycle::LifecyclePollingNodeCtx,
            lifecycle_services::{
                ChangeState, GetAvailableStates, GetAvailableTransitions, GetState,
                LIFECYCLE_SERVICE_BUFFER_SIZE, LifecycleRuntimeState, LifecycleServiceServers,
            },
        };
        use nros_core::RosService;

        type LcSrv<Svc> = super::handles::EmbeddedServiceServer<
            Svc,
            LIFECYCLE_SERVICE_BUFFER_SIZE,
            LIFECYCLE_SERVICE_BUFFER_SIZE,
        >;

        /// Create the server for `{node_fqn}/{suffix}` and wrap its handle
        /// together with zeroed request/reply buffers.
        fn open_lc_srv<Svc: RosService>(
            session: &mut session::ConcreteSession,
            node_fqn: &str,
            namespace: &str,
            node_name: &str,
            suffix: &str,
        ) -> Result<LcSrv<Svc>, NodeError> {
            let mut service_name = heapless::String::<256>::new();
            for piece in [node_fqn, "/", suffix] {
                service_name
                    .push_str(piece)
                    .map_err(|_| NodeError::NameTooLong)?;
            }

            let base = ServiceInfo::new(&service_name, Svc::SERVICE_NAME, Svc::SERVICE_HASH)
                .with_namespace(namespace);
            // The node name is only attached when there is one.
            let info = if node_name.is_empty() {
                base
            } else {
                base.with_node_name(node_name)
            };

            let handle = session
                .create_service_server(&info, QosSettings::services_default())
                .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?;

            Ok(LcSrv::<Svc> {
                handle,
                req_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
                reply_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
                _phantom: core::marker::PhantomData,
            })
        }

        // Build the node FQN from namespace + node_name (same convention as
        // register_parameter_services): "/{node}" for an empty or root
        // namespace, otherwise "/{namespace without outer slashes}/{node}".
        let ns: &str = &self.namespace;
        let nn: &str = &self.node_name;
        let mut node_fqn = heapless::String::<256>::new();
        node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
        if !(ns.is_empty() || ns == "/") {
            node_fqn
                .push_str(ns.trim_matches('/'))
                .map_err(|_| NodeError::NameTooLong)?;
            node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
        }
        node_fqn.push_str(nn).map_err(|_| NodeError::NameTooLong)?;

        let change_state =
            open_lc_srv::<ChangeState>(&mut self.session, &node_fqn, ns, nn, "change_state")?;
        let get_state =
            open_lc_srv::<GetState>(&mut self.session, &node_fqn, ns, nn, "get_state")?;
        let available_states = open_lc_srv::<GetAvailableStates>(
            &mut self.session,
            &node_fqn,
            ns,
            nn,
            "get_available_states",
        )?;
        let available_transitions = open_lc_srv::<GetAvailableTransitions>(
            &mut self.session,
            &node_fqn,
            ns,
            nn,
            "get_available_transitions",
        )?;
        // Same service type as above, served under a different name.
        let transition_graph = open_lc_srv::<GetAvailableTransitions>(
            &mut self.session,
            &node_fqn,
            ns,
            nn,
            "get_transition_graph",
        )?;

        let servers = LifecycleServiceServers::new(
            change_state,
            get_state,
            available_states,
            available_transitions,
            transition_graph,
        );

        self.lifecycle = Some(alloc::boxed::Box::new(LifecycleRuntimeState {
            state_machine: LifecyclePollingNodeCtx::new(),
            services: alloc::boxed::Box::new(servers),
        }));

        Ok(())
    }

    /// Mutable access to the lifecycle state machine.
    ///
    /// Returns `None` until
    /// [`register_lifecycle_services`](Self::register_lifecycle_services)
    /// has succeeded. Used to register transition callbacks before spinning
    /// and to read the current state from application code.
    pub fn lifecycle_state_machine_mut(
        &mut self,
    ) -> Option<&mut crate::lifecycle::LifecyclePollingNodeCtx> {
        Some(&mut self.lifecycle.as_mut()?.state_machine)
    }

    /// Shared access to the lifecycle state machine; `None` if the lifecycle
    /// services have not been registered.
    pub fn lifecycle_state_machine(&self) -> Option<&crate::lifecycle::LifecyclePollingNodeCtx> {
        Some(&self.lifecycle.as_ref()?.state_machine)
    }
}
5038
5039// ============================================================================
5040// Parameter declaration API (cfg param-services)
5041// ============================================================================
5042
#[cfg(feature = "param-services")]
impl Executor {
    /// Declare a parameter with an initial value.
    ///
    /// Returns the parameter server's verdict, or `false` when parameter
    /// services have not been registered.
    pub fn declare_parameter(&mut self, name: &str, value: nros_params::ParameterValue) -> bool {
        self.params
            .as_mut()
            .is_some_and(|state| state.server.declare(name, value))
    }

    /// Declare a parameter with an initial value and a descriptor.
    ///
    /// Returns the parameter server's verdict, or `false` when parameter
    /// services have not been registered.
    pub fn declare_parameter_with_descriptor(
        &mut self,
        name: &str,
        value: nros_params::ParameterValue,
        descriptor: nros_params::ParameterDescriptor,
    ) -> bool {
        self.params.as_mut().is_some_and(|state| {
            state
                .server
                .declare_with_descriptor(name, value, Some(descriptor))
        })
    }

    /// Look up a parameter value by name.
    ///
    /// `None` when parameter services are not registered or the server has
    /// no value under `name`.
    pub fn get_parameter(&self, name: &str) -> Option<&nros_params::ParameterValue> {
        self.params
            .as_ref()
            .and_then(|state| state.server.get(name))
    }

    /// Look up an integer parameter value by name (convenience).
    ///
    /// `None` when parameter services are not registered or the server's
    /// `get_integer` yields nothing for `name`.
    pub fn get_parameter_integer(&self, name: &str) -> Option<i64> {
        self.params
            .as_ref()
            .and_then(|state| state.server.get_integer(name))
    }

    /// Shared reference to the parameter server, if registered.
    pub fn params(&self) -> Option<&nros_params::ParameterServer> {
        Some(&self.params.as_ref()?.server)
    }

    /// Mutable reference to the parameter server, if registered.
    pub fn params_mut(&mut self) -> Option<&mut nros_params::ParameterServer> {
        Some(&mut self.params.as_mut()?.server)
    }

    /// Create a typed parameter builder (rclrs-compatible API).
    ///
    /// Returns a [`ParameterBuilder`] for fluent parameter declaration with
    /// `.default()`, `.description()`, `.range()`, and terminal methods
    /// `.mandatory()`, `.optional()`, or `.read_only()`.
    ///
    /// # Errors
    ///
    /// Returns [`NodeError::NotInitialized`] if parameter services have
    /// not been registered yet — call [`register_parameter_services`]
    /// first.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let max_speed = executor.parameter::<f64>("max_speed")?
    ///     .default(25.0)
    ///     .description("Maximum velocity (m/s)")
    ///     .read_only()?;
    /// ```
    ///
    /// [`ParameterBuilder`]: nros_params::ParameterBuilder
    /// [`register_parameter_services`]: Self::register_parameter_services
    pub fn parameter<'a, T: nros_params::ParameterVariant>(
        &'a mut self,
        name: &'a str,
    ) -> Result<nros_params::ParameterBuilder<'a, T>, NodeError> {
        match self.params.as_mut() {
            Some(state) => Ok(nros_params::ParameterBuilder::new(&mut state.server, name)),
            None => Err(NodeError::NotInitialized),
        }
    }
}
5123
5124// ============================================================================
5125// std-gated spin and halt methods
5126// ============================================================================
5127
#[cfg(feature = "std")]
impl Executor {
    /// Blocking spin loop with configurable exit conditions.
    ///
    /// Clears the halt flag on entry, then runs until one of:
    /// - [`halt()`](Self::halt) is called (from another thread or signal handler)
    /// - Timeout expires (if set in options)
    /// - Max callbacks reached (if set in options)
    /// - `only_next` is true (single iteration)
    ///
    /// Each iteration waits in `spin_once` for at most 10 ms, and never for
    /// longer than the time left before the timeout, so a timeout shorter
    /// than the poll interval is not overshot by a full poll.
    ///
    /// # Errors
    ///
    /// The visible body never produces an error; it always returns `Ok(())`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Spin forever until halted
    /// executor.spin_blocking(SpinOptions::default())?;
    ///
    /// // Spin with 5-second timeout
    /// executor.spin_blocking(SpinOptions::new().timeout_ms(5000))?;
    ///
    /// // Single iteration
    /// executor.spin_blocking(SpinOptions::spin_once())?;
    /// ```
    pub fn spin_blocking(&mut self, opts: SpinOptions) -> Result<(), NodeError> {
        use std::{
            sync::atomic::Ordering,
            time::{Duration, Instant},
        };

        const POLL_INTERVAL: Duration = Duration::from_millis(10);

        let started = Instant::now();
        let timeout = opts.timeout_ms.map(Duration::from_millis);
        let mut dispatched = 0usize;

        // A halt requested before this call is deliberately discarded.
        self.halt_flag.store(false, Ordering::SeqCst);

        while !self.halt_flag.load(Ordering::SeqCst) {
            // Never wait past the deadline: clamp the poll to what is left.
            let wait = match timeout {
                None => POLL_INTERVAL,
                Some(limit) => {
                    let remaining = limit.saturating_sub(started.elapsed());
                    if remaining.is_zero() {
                        break;
                    }
                    remaining.min(POLL_INTERVAL)
                }
            };

            let outcome = self.spin_once(wait);
            // Saturate so a long-running loop cannot overflow the counter.
            dispatched = dispatched.saturating_add(outcome.total());

            if opts.max_callbacks.is_some_and(|max| dispatched >= max) {
                break;
            }

            if opts.only_next {
                break;
            }
        }

        Ok(())
    }

    /// Execute one period with wall-clock overrun detection.
    ///
    /// Calls [`spin_once()`](Self::spin_once) with `period` as its timeout
    /// and measures the wall-clock time it took. If that is within budget
    /// the remainder of the period is slept away; otherwise `overrun` is set
    /// and no sleep happens. `elapsed` in the result is the measured
    /// `spin_once` time, excluding the sleep.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let period = std::time::Duration::from_millis(10);
    /// let result = executor.spin_one_period_timed(period);
    /// if result.overrun {
    ///     log::warn!("Period overrun: {:?}", result.elapsed);
    /// }
    /// ```
    pub fn spin_one_period_timed(
        &mut self,
        period: std::time::Duration,
    ) -> super::types::SpinPeriodResult {
        let started = std::time::Instant::now();
        let work = self.spin_once(period);
        let elapsed = started.elapsed();

        // `checked_sub` is `None` exactly when `elapsed > period`.
        let overrun = match period.checked_sub(elapsed) {
            Some(slack) => {
                std::thread::sleep(slack);
                false
            }
            None => true,
        };

        super::types::SpinPeriodResult {
            work,
            overrun,
            elapsed,
        }
    }

    /// Spin at a fixed rate with drift compensation. Blocks until halted.
    ///
    /// Clears the halt flag on entry. Uses wall-clock time to maintain the
    /// target rate. The next invocation time is accumulated (not reset to
    /// `now + period`) to prevent cumulative drift; after an overrun the
    /// loop therefore runs without sleeping until it has caught up.
    ///
    /// # Errors
    ///
    /// The visible body never produces an error; it always returns `Ok(())`.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // 100Hz control loop — blocks until halt() is called
    /// executor.spin_period(std::time::Duration::from_millis(10))?;
    /// ```
    pub fn spin_period(&mut self, period: std::time::Duration) -> Result<(), NodeError> {
        use std::{sync::atomic::Ordering, time::Instant};

        self.halt_flag.store(false, Ordering::SeqCst);
        let mut deadline = Instant::now() + period;

        while !self.halt_flag.load(Ordering::SeqCst) {
            self.spin_once(period);

            let now = Instant::now();
            if now < deadline {
                std::thread::sleep(deadline - now);
            }
            // Accumulate to prevent drift (not = now + period)
            deadline += period;
        }
        Ok(())
    }

    /// Request the executor to stop spinning.
    ///
    /// Sets a flag that causes [`spin_blocking()`](Self::spin_blocking) or
    /// [`spin_period()`](Self::spin_period) to exit on the next iteration.
    /// Safe to call from another thread or signal handler.
    ///
    /// Also raises the Phase 104.C.6 wake flag so a `spin_once` already
    /// blocked inside a backend's `drive_io` falls through to the halt
    /// check on its next loop iteration instead of waiting out its full
    /// `timeout_ms` first.
    pub fn halt(&self) {
        use std::sync::atomic::Ordering;

        self.halt_flag.store(true, Ordering::SeqCst);
        self.wake_flag.store(true, Ordering::SeqCst);
    }

    /// Phase 110.D.b — move this Executor onto a fresh OS thread,
    /// apply a per-thread scheduling policy via the caller-supplied
    /// `apply_policy` function, and run the spin loop until
    /// [`ThreadHandle::halt`] fires.
    ///
    /// The function-pointer indirection on `apply_policy` lets the
    /// caller pass any platform's `PlatformScheduler::set_current_thread_policy`
    /// without forcing `Executor` to be generic over the platform —
    /// keeps the existing `Executor` type stable.
    ///
    /// Multi-executor preemption (the actual hard-RT win) comes from
    /// the OS scheduler — call `open_threaded` once per criticality
    /// tier, each with its own policy / priority. The kernel handles
    /// preemption across executors; within a single executor,
    /// dispatch remains non-preemptive (110.A–C bucketed sets).
    ///
    /// The result of `apply_policy` is discarded: if the policy cannot be
    /// applied the thread still spins, just at whatever scheduling class it
    /// was created with.
    ///
    /// # Safety
    ///
    /// Moves `self` across thread boundaries. `Executor` contains a
    /// raw `*mut session::ConcreteSession` when constructed via
    /// `from_session_ptr`; the caller must ensure that pointer's
    /// referent stays valid across the lifetime of the spawned thread
    /// and that no other thread mutates the session concurrently.
    /// `from_session` (Owned) is safer — `ConcreteSession` ownership
    /// transfers cleanly into the thread.
    pub unsafe fn open_threaded(
        self,
        policy: nros_platform_api::SchedPolicy,
        apply_policy: fn(
            nros_platform_api::SchedPolicy,
        ) -> Result<(), nros_platform_api::SchedError>,
        spin_period: core::time::Duration,
    ) -> ThreadHandle {
        // Keep a handle on the halt flag before the executor moves away.
        let halt = std::sync::Arc::clone(&self.halt_flag);

        // SAFETY: moving the executor relies on `unsafe impl Send for
        // Executor`; the caller's contract on `from_session_ptr` covers the
        // pointer-validity invariant.
        let join = std::thread::spawn(move || {
            let mut executor = self;
            // Apply the requested OS scheduling policy to this fresh
            // thread. A failure is intentionally ignored — a runtime that
            // fails to lift to SCHED_FIFO still spins correctly at
            // SCHED_OTHER (just without RT guarantees).
            let _ = apply_policy(policy);
            while !executor.is_halted() {
                executor.spin_once(spin_period);
            }
        });

        ThreadHandle {
            join: Some(join),
            halt,
        }
    }

    /// Check if halt has been requested.
    pub fn is_halted(&self) -> bool {
        self.halt_flag.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Get a clone of the halt flag for use in signal handlers or other threads.
    ///
    /// Note that [`spin_blocking()`](Self::spin_blocking) and
    /// [`spin_period()`](Self::spin_period) clear the flag when they start,
    /// so store `true` only once the loop is running.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let halt = executor.halt_flag();
    /// std::thread::spawn(move || {
    ///     std::thread::sleep(Duration::from_secs(5));
    ///     halt.store(true, Ordering::SeqCst);
    /// });
    /// executor.spin_blocking(SpinOptions::default())?;
    /// ```
    pub fn halt_flag(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
        std::sync::Arc::clone(&self.halt_flag)
    }

    /// Phase 104.C.6 — wake the executor from another thread / ISR /
    /// signal handler.
    ///
    /// Sets the shared `wake_flag`. The next `spin_once` swap-clears the
    /// flag, skips the blocking wait on the primary session, and polls
    /// every session non-blockingly so whatever queued the wake is
    /// observed in a single iteration. Idempotent — multiple `wake()`
    /// calls collapse into one observed wake per `spin_once`.
    pub fn wake(&self) {
        self.wake_flag
            .store(true, std::sync::atomic::Ordering::SeqCst);
    }

    /// Phase 104.C.6 — clone of the shared wake flag for cross-thread
    /// use (signal handlers, foreign threads, future per-backend vtable
    /// wake hooks).
    ///
    /// # Example
    ///
    /// ```ignore
    /// let wake = executor.wake_handle();
    /// std::thread::spawn(move || {
    ///     // ... compute something ...
    ///     // hand off to executor by setting the flag.
    ///     wake.store(true, Ordering::SeqCst);
    /// });
    /// loop { executor.spin_once(Duration::from_millis(100)); }
    /// ```
    pub fn wake_handle(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
        std::sync::Arc::clone(&self.wake_flag)
    }
}
5377
/// Phase 110.E.b — opaque per-platform timer handle.
///
/// Pairs the raw platform handle (POSIX `timer_t` boxed via
/// `PosixTimerHandle`, FreeRTOS `TimerHandle_t`, etc.) with a destroy thunk,
/// so the Executor can release the timer without being generic over the
/// platform. Dropping the value calls the thunk once on a non-null handle.
///
/// Caller of `register_sporadic_timer` builds this via
/// `OpaqueTimerHandle::new(handle, destroy_fn)` after their
/// `PlatformTimer::create_periodic` call returns.
#[cfg(feature = "alloc")]
pub struct OpaqueTimerHandle {
    /// Raw platform handle; null once it has been destroyed.
    handle: *mut core::ffi::c_void,
    /// Thunk that releases `handle`.
    destroy_fn: extern "C" fn(*mut core::ffi::c_void),
}

// NOTE(review): `Send`/`Sync` are asserted unconditionally. That is only
// sound if the platform handle and `destroy_fn` may be used from any thread
// — confirm for each platform backend.
#[cfg(feature = "alloc")]
unsafe impl Send for OpaqueTimerHandle {}
#[cfg(feature = "alloc")]
unsafe impl Sync for OpaqueTimerHandle {}

#[cfg(feature = "alloc")]
impl OpaqueTimerHandle {
    /// Wrap a platform timer handle together with its destroy thunk.
    ///
    /// # Safety
    /// `handle` must be a live platform-specific timer handle that
    /// `destroy_fn` knows how to drop. Caller surrenders ownership
    /// of the underlying handle to the Executor.
    pub unsafe fn new(
        handle: *mut core::ffi::c_void,
        destroy_fn: extern "C" fn(*mut core::ffi::c_void),
    ) -> Self {
        OpaqueTimerHandle { handle, destroy_fn }
    }
}

#[cfg(feature = "alloc")]
impl Drop for OpaqueTimerHandle {
    /// Destroy the platform timer, unless the handle is already null.
    fn drop(&mut self) {
        // Take the pointer out first so the stored handle is null afterwards.
        let raw = core::mem::replace(&mut self.handle, core::ptr::null_mut());
        if !raw.is_null() {
            (self.destroy_fn)(raw);
        }
    }
}
5420
/// Handle returned from [`Executor::open_threaded`].
///
/// Owns the spawned thread's join handle plus a clone of the executor's
/// halt flag. Dropping it runs `halt() + join()`, so the thread can't
/// outlive the handle.
#[cfg(feature = "std")]
pub struct ThreadHandle {
    /// `Some` until the thread has been joined.
    join: Option<std::thread::JoinHandle<()>>,
    /// Shared with the executor running on the spawned thread.
    halt: std::sync::Arc<std::sync::atomic::AtomicBool>,
}

#[cfg(feature = "std")]
impl ThreadHandle {
    /// Signal the spawned executor thread to stop. The thread leaves its
    /// loop once the `spin_once` call it is currently in has returned.
    pub fn halt(&self) {
        self.halt.store(true, std::sync::atomic::Ordering::SeqCst);
    }

    /// Request a halt, then wait for the spawned thread to exit and return
    /// its join result (`Err` carries the thread's panic payload).
    ///
    /// Consumes the handle; the `Drop` that follows finds no join handle
    /// left and only re-sets the halt flag.
    pub fn join(mut self) -> std::thread::Result<()> {
        self.halt();
        self.join.take().map_or(Ok(()), |worker| worker.join())
    }
}

#[cfg(feature = "std")]
impl Drop for ThreadHandle {
    /// Halt the thread and block until it has exited; a panic on the thread
    /// is swallowed here.
    fn drop(&mut self) {
        self.halt();
        if let Some(worker) = self.join.take() {
            let _ = worker.join();
        }
    }
}
5459
// SAFETY: Phase 110.D.b — this impl exists so `open_threaded` can move an
// `Executor` into its spawned thread. `Executor` contains a raw `*mut
// session::ConcreteSession` only on the `from_session_ptr` (Borrowed)
// path; the `from_session` (Owned) path is plain Send-able. The
// `unsafe fn open_threaded` entry point documents the safety
// contract for Borrowed sessions (the pointee must stay valid and must not
// be mutated concurrently); for Owned sessions the Send claim
// is unconditional.
#[cfg(feature = "std")]
unsafe impl Send for Executor {}
5468
5469// =============================================================================
5470// Phase 110.F — `OsPriorityWorker` + `WorkItem`
5471// =============================================================================
5472
/// One worker thread per distinct `SchedContext.os_pri` value used
/// across registered SCs. Phase 110.F.
///
/// At startup the thread self-elevates through the `apply_policy` fn pointer
/// it was spawned with, then drains its mpsc mailbox of `WorkItem`s until it
/// is halted. The mailbox is `std::sync::mpsc::channel`, i.e. unbounded.
#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
pub(crate) struct OsPriorityWorker {
    /// Producer side of the worker's mailbox.
    sender: std::sync::mpsc::Sender<WorkItem>,
    /// Set on drop to ask the worker loop to exit.
    halt: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// `Some` until the worker thread has been joined.
    join: Option<std::thread::JoinHandle<()>>,
}

/// One unit of work for an [`OsPriorityWorker`]: the arena entry to process
/// (base address plus byte offset), the entry's `try_process` function and
/// the `delta_ms` value to pass to it.
#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
struct WorkItem {
    arena_base: usize,
    arena_offset: usize,
    try_process: unsafe fn(*mut u8, u64) -> Result<bool, nros_rmw::TransportError>,
    delta_ms: u64,
}

// SAFETY: Phase 110.F per-DescIdx exclusive-access invariant — the
// activator scan in `spin_once` only sends a `WorkItem` for a given
// `arena_offset` to one worker per cycle, and won't re-send the same
// offset until the worker drains the previous one (`os_pri` dispatch
// is the worker's exclusive path; cooperative dispatch is skipped
// for SCs with non-zero `os_pri`). The fn pointer is Send-clean.
#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
unsafe impl Send for WorkItem {}

#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
impl OsPriorityWorker {
    /// Start the worker thread for priority level `os_pri`.
    ///
    /// The thread is named `nros-os-pri-{os_pri}`. It first calls
    /// `apply_policy` with `SchedPolicy::Fifo { os_pri }` and then polls its
    /// mailbox in 10 ms slices, so a halt request is noticed within roughly
    /// one slice.
    ///
    /// # Panics
    ///
    /// Panics if the OS refuses to create the thread.
    fn spawn(
        os_pri: u8,
        apply_policy: fn(
            nros_platform_api::SchedPolicy,
        ) -> Result<(), nros_platform_api::SchedError>,
    ) -> Self {
        use std::sync::{
            Arc,
            atomic::{AtomicBool, Ordering},
            mpsc::{self, RecvTimeoutError},
        };

        /// How long one mailbox wait lasts before the halt flag is re-checked.
        const MAILBOX_POLL: core::time::Duration = core::time::Duration::from_millis(10);

        let (sender, mailbox) = mpsc::channel::<WorkItem>();
        let halt = Arc::new(AtomicBool::new(false));
        let worker_halt = Arc::clone(&halt);

        let run = move || {
            // Self-elevate. A failure is ignored and doesn't stop the
            // worker — running at SCHED_OTHER is still correct, just
            // without the priority guarantee.
            let _ = apply_policy(nros_platform_api::SchedPolicy::Fifo { os_pri });

            loop {
                if worker_halt.load(Ordering::Acquire) {
                    break;
                }
                let item = match mailbox.recv_timeout(MAILBOX_POLL) {
                    Ok(item) => item,
                    Err(RecvTimeoutError::Timeout) => continue,
                    Err(RecvTimeoutError::Disconnected) => break,
                };

                let entry = (item.arena_base as *mut u8).wrapping_add(item.arena_offset);
                // SAFETY: arena_base + arena_offset point into the
                // executor's arena, which must outlive the worker (the
                // executor is expected to halt + join its workers before
                // the arena goes away). NOTE(review): confirm that ordering
                // against `Executor`'s Drop and field order.
                let _ = unsafe { (item.try_process)(entry, item.delta_ms) };
            }
        };

        let join = std::thread::Builder::new()
            .name(alloc::format!("nros-os-pri-{os_pri}"))
            .spawn(run)
            // SAFETY-invariant: spawn failure means the OS refused a new
            // thread (resource exhaustion). This runs once per priority
            // level at lazy worker setup — not on any hot/spin path — and
            // a runtime that cannot create its priority worker has no
            // correct way to continue, so fail fast at the setup point.
            .expect("os-priority worker spawn");

        Self {
            sender,
            halt,
            join: Some(join),
        }
    }

    /// Queue `item` for the worker. Returns `false` only when the send
    /// fails, i.e. the worker's receiving end no longer exists.
    fn try_dispatch(&self, item: WorkItem) -> bool {
        matches!(self.sender.send(item), Ok(()))
    }
}

#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
impl Drop for OsPriorityWorker {
    /// Ask the worker to stop and wait for it; items still queued in the
    /// mailbox at that point are not processed.
    fn drop(&mut self) {
        self.halt.store(true, std::sync::atomic::Ordering::Release);
        let Some(worker) = self.join.take() else {
            return;
        };
        let _ = worker.join();
    }
}
5563
5564impl Drop for Executor {
5565 fn drop(&mut self) {
5566 // Phase 258 (Track 2, 2a) — release executor-owned component cells
5567 // first (before the arena entries), so a component's `drop`
5568 // trampoline can still touch its own (cell-owned) state. Each slot
5569 // owns a leaked `Arc<ComponentCell>`; its `drop` reconstitutes +
5570 // drops that Arc exactly once.
5571 for slot in self.component_slots.iter() {
5572 // SAFETY: `slot.state` is the leaked cell enrolled via
5573 // `enroll_component`; `slot.drop` is its matching trampoline, run
5574 // exactly once here (slots are not removed before Drop).
5575 unsafe {
5576 (slot.drop)(slot.state);
5577 }
5578 }
5579 let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
5580 for meta in self.entries.iter().flatten() {
5581 // SAFETY: each entry was written by `ptr::write` in `add_*` and
5582 // has not been dropped yet. `drop_fn` matches the concrete type.
5583 unsafe {
5584 let data_ptr = arena_ptr.add(meta.offset);
5585 (meta.drop_fn)(data_ptr);
5586 }
5587 }
5588 }
5589}
5590
#[cfg(all(test, not(feature = "rmw-cffi")))]
mod dispatch_registry_tests {
    //! Phase 216 follow-up — `Executor::register_dispatch_slot` +
    //! `Executor::dispatch_callback` round-trip.
    //!
    //! Uses `MockSession` (same pattern as
    //! `lifecycle_services::tests::mock_integration`) so the test
    //! doesn't need a live RMW backend. Gated `not(feature =
    //! "rmw-cffi")` because under `rmw-cffi` the `ConcreteSession`
    //! type alias resolves to the cffi session, which `MockSession`
    //! can't impersonate.

    extern crate alloc;

    use super::Executor;
    use crate::mock::MockSession;
    use core::ffi::c_void;
    use std::sync::Mutex;

    /// One recorded trampoline call: `(state address, cb_id bytes, ctx address)`.
    type Capture = (usize, alloc::vec::Vec<u8>, usize);

    /// Every call `recording_on_callback` has received, in call order.
    static CAPTURED: Mutex<alloc::vec::Vec<Capture>> = Mutex::new(alloc::vec::Vec::new());

    /// Test trampoline matching the per-Node
    /// `__nros_node_<pkg>_on_callback` ABI shape (Phase 216.A.5).
    /// Records its arguments into `CAPTURED`.
    unsafe extern "C" fn recording_on_callback(
        state: *mut c_void,
        cb_id_ptr: *const u8,
        cb_id_len: usize,
        ctx: *mut c_void,
    ) {
        // SAFETY: caller (test body below) holds storage live;
        // `cb_id_ptr..len` points into a `&str` literal.
        let cb_id = unsafe { core::slice::from_raw_parts(cb_id_ptr, cb_id_len) }.to_vec();
        CAPTURED
            .lock()
            .expect("CAPTURED poisoned")
            .push((state as usize, cb_id, ctx as usize));
    }

    /// Erase a `u32` test blob into the opaque pointer shape the registry takes.
    fn opaque(blob: &mut u32) -> *mut c_void {
        (blob as *mut u32).cast()
    }

    #[test]
    fn register_dispatch_slot_round_trip() {
        let mut executor: Executor = Executor::from_session(MockSession::new());

        // Pre-condition: empty registry.
        assert_eq!(executor.dispatch_slot_count(), 0);

        // Two distinct "states" so we prove every slot gets called
        // with its OWN state.
        let mut state_blob_a: u32 = 0xABCD_0001;
        let mut state_blob_b: u32 = 0xABCD_0002;
        let state_a_ptr = opaque(&mut state_blob_a);
        let state_b_ptr = opaque(&mut state_blob_b);

        executor
            .register_dispatch_slot(state_a_ptr, recording_on_callback)
            .expect("register slot A");
        executor
            .register_dispatch_slot(state_b_ptr, recording_on_callback)
            .expect("register slot B");
        assert_eq!(executor.dispatch_slot_count(), 2);

        let mut ctx_blob: u32 = 0xFEED_BEEF;
        let ctx_ptr = opaque(&mut ctx_blob);
        let cb_id = "/talker/timer/publish";

        CAPTURED.lock().expect("CAPTURED poisoned").clear();
        executor.dispatch_callback(cb_id, ctx_ptr);
        let seen = CAPTURED.lock().expect("CAPTURED poisoned").clone();

        assert_eq!(
            seen.len(),
            2,
            "every registered slot must be invoked — linear scan, \
             no self-filter at the registry layer"
        );
        // heapless::Vec iterates in insertion order.
        assert_eq!(seen[0].0, state_a_ptr as usize, "slot A's state");
        assert_eq!(seen[1].0, state_b_ptr as usize, "slot B's state");
        for (idx, (_, id_bytes, ctx_seen)) in seen.iter().enumerate() {
            assert_eq!(
                id_bytes.as_slice(),
                cb_id.as_bytes(),
                "slot {idx} cb_id bytes round-trip"
            );
            assert_eq!(
                *ctx_seen, ctx_ptr as usize,
                "slot {idx} ctx pointer round-trip"
            );
        }
    }

    #[test]
    fn register_dispatch_slot_capacity_full() {
        let mut executor: Executor = Executor::from_session(MockSession::new());

        let mut state_blob: u32 = 0;
        let state_ptr = opaque(&mut state_blob);

        // `MAX_NODES` slots fit; the next one must error.
        (0..crate::config::MAX_NODES).for_each(|_| {
            executor
                .register_dispatch_slot(state_ptr, recording_on_callback)
                .expect("under-capacity push must succeed");
        });
        assert_eq!(executor.dispatch_slot_count(), crate::config::MAX_NODES);

        let overflow = executor.register_dispatch_slot(state_ptr, recording_on_callback);
        assert!(
            overflow.is_err(),
            "over-capacity push must return Err(()) — raise \
             NROS_EXECUTOR_MAX_NODES at build time to grow the registry"
        );
    }
}