Skip to main content

nros_node/executor/
spin.rs

1//! Executor struct and core spin methods.
2
3use core::{marker::PhantomData, mem::MaybeUninit};
4
5use nros_core::{BorrowedMessage, RosMessage, RosService};
6use nros_rmw::{QosSettings, ServiceInfo, Session, TopicInfo, TransportError};
7
8use crate::{session, timer::TimerDuration};
9
10#[cfg(feature = "safety-e2e")]
11use super::arena::{
12    SubSafetyEntry, sub_safety_has_data, sub_safety_pre_sample, sub_safety_try_process,
13};
14#[cfg(feature = "rmw-cffi")]
15use super::types::ExecutorConfig;
16#[cfg(feature = "std")]
17use super::types::SpinOptions;
18use super::{
19    arena::{
20        BufferStrategy, CallbackMeta, EntryKind, GuardConditionEntry, ServiceClientCallbackEntry,
21        ServiceClientRawArenaEntry, ServiceClientSendHeader, SrvEntry, SrvRawEntry,
22        SubBufferedBorrowedEntry, SubBufferedEntry, SubBufferedRawCEntry, SubBufferedRawEntry,
23        SubBufferedRawInfoCEntry, SubBufferedRawInfoEntry, SubInfoEntry, SubInplaceEntry,
24        TimerEntry, TimerHeader, always_ready, buffered_region_size, drop_entry, guard_has_data,
25        guard_try_process, no_pre_sample, service_client_callback_try_process,
26        service_client_raw_try_process, srv_has_data, srv_raw_has_data, srv_raw_try_process,
27        srv_try_process, sub_buffered_borrowed_has_data, sub_buffered_borrowed_try_process,
28        sub_buffered_has_data, sub_buffered_raw_c_has_data, sub_buffered_raw_c_try_process,
29        sub_buffered_raw_has_data, sub_buffered_raw_info_c_has_data,
30        sub_buffered_raw_info_c_try_process, sub_buffered_raw_info_has_data,
31        sub_buffered_raw_info_try_process, sub_buffered_raw_try_process, sub_buffered_try_process,
32        sub_info_has_data, sub_info_pre_sample, sub_info_try_process, sub_inplace_has_data,
33        sub_inplace_try_process, timer_try_process,
34    },
35    node::NodeHandle,
36    spsc_ring::SpscRing,
37    triple_buffer::TripleBuffer,
38    types::{
39        ExecutorSemantics, GuardConditionHandle, HandleId, InvocationMode, NodeError,
40        RawResponseCallback, RawServiceCallback, RawSubscriptionCallback,
41        RawSubscriptionInfoCallback, ReadinessSnapshot, SpinOnceResult, SpinPeriodPollingResult,
42        Trigger,
43    },
44};
45
46// ============================================================================
47// Executor::open() factory method
48// ============================================================================
49
50#[cfg(feature = "rmw-cffi")]
51impl Executor {
52    /// Open a new executor session using the active RMW backend.
53    ///
54    /// Phase 115.M.4 — auto-registers the cffi vtable for whichever
55    /// backend the build was configured for, mirroring the C++ side's
56    /// `#ifdef NROS_RMW_<NAME>` fan-out in `<nros/node.hpp>`. The
57    /// runtime's atomic vtable slot is idempotent: a re-call of any
58    /// backend's `register()` is a no-op, so the fan-out below is safe
59    /// to invoke on every `Executor::open` (cheaper than a `Once` and
60    /// doesn't pull in `std::sync` for no_std targets).
61    ///
62    /// Connects to the middleware at the locator specified in `config`.
63    ///
64    /// # Example
65    ///
66    /// ```ignore
67    /// let config = ExecutorConfig::from_env().node_name("my_node");
68    /// let mut executor = Executor::open(&config)?;
69    /// ```
70    pub fn open(config: &ExecutorConfig<'_>) -> Result<Self, NodeError> {
71        use nros_rmw::Rmw;
72
73        // Phase 128.A.3 / 249 P4b.1 — manifest-driven backend selection.
74        //
75        // Every linked backend self-registered via its `.init_array`
76        // ctor before `main` (RFC-0042 §D3.3), so the registry is
77        // already populated — no runtime section walk.
78        //
79        // 1. Consult `$NROS_RMW` (when std/env is available) for
80        //    explicit override, mirroring ROS 2's `RMW_IMPLEMENTATION`.
81        // 2. With no selector, pick the unique registered backend.
82        //    Zero registered → `NoBackend`; more than one →
83        //    `Ambiguous` (user must set `$NROS_RMW` or use
84        //    `Executor::open_multi`).
85        let selector = read_rmw_selector_env();
86        // `as_deref()` on `Option<Vec<u8>>` yields `Option<&[u8]>`;
87        // on the no_std `Option<&'static [u8]>` variant it's a
88        // no-op the lint catches but the std signature still
89        // requires the call. Allowed locally.
90        #[allow(clippy::needless_option_as_deref)]
91        let sel_ref = selector.as_deref();
92        match nros_rmw_cffi::resolve_backend(sel_ref) {
93            nros_rmw_cffi::BackendResolution::Single(_) => {}
94            // Map every non-`Single` outcome to a transport
95            // ConnectionFailed for now; the more granular ret codes
96            // (NO_BACKEND / AMBIGUOUS / UNKNOWN) are exposed to C
97            // callers via `nros_init`'s return value (Phase 128.C.2).
98            _ => return Err(NodeError::Transport(TransportError::ConnectionFailed)),
99        }
100
101        let rmw_config = nros_rmw::RmwConfig {
102            locator: config.locator,
103            mode: config.mode,
104            domain_id: config.domain_id,
105            node_name: config.node_name,
106            namespace: config.namespace,
107            properties: &[],
108        };
109        let session = if let Some(name) = sel_ref {
110            // Selector path: route to the specific named backend so
111            // the env-var-disambiguated outcome matches what the
112            // resolver above identified.
113            nros_rmw_cffi::CffiRmw::open_with_rmw(
114                core::str::from_utf8(name).unwrap_or(""),
115                &rmw_config,
116            )
117        } else {
118            nros_rmw_cffi::CffiRmw.open(&rmw_config)
119        }
120        .map_err(|_| NodeError::Transport(TransportError::ConnectionFailed))?;
121        let mut executor = Self::from_session(session);
122        #[cfg(not(feature = "std"))]
123        {
124            executor.clock_us_fn = config.clock_us;
125            executor.last_spin_end_us = config.clock_us.map(|clock| clock());
126        }
127        executor.set_node_identity(config.node_name, config.namespace);
128        #[cfg(all(feature = "std", feature = "rmw-cffi"))]
129        executor.install_wake_signal_on_primary();
130        #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
131        executor.install_wake_signal_on_primary_alloc();
132        Ok(executor)
133    }
134
135    /// Phase 128.F.1 — explicit per-backend session declaration for
136    /// bridge mode. `specs[0]` becomes the primary session; `specs[1..]`
137    /// open as extras keyed by RMW name. After construction, every
138    /// `create_node_on(name, rmw)` call dispatches to whichever
139    /// session was opened under that RMW name (or, when the rmw name
140    /// matches the primary, the primary session itself).
141    ///
142    /// Single-backend callers should keep using
143    /// [`open`](Self::open) — this entry costs an extra
144    /// `open_with_rmw` per spec and adds no value when only one
145    /// backend is linked.
146    ///
147    /// `$NROS_RMW` env is ignored: bridge mode wants explicit names.
148    #[cfg(feature = "rmw-cffi")]
149    pub fn open_multi(specs: &[SessionSpec<'_>]) -> Result<Self, NodeError> {
150        // Phase 249 P4b.1 — backends self-registered via their
151        // `.init_array` ctor before `main`; no runtime section walk.
152        let primary = specs
153            .first()
154            .ok_or(NodeError::Transport(TransportError::ConnectionFailed))?;
155        let primary_session =
156            nros_rmw_cffi::CffiRmw::open_with_rmw(primary.rmw, &primary.to_rmw_config())
157                .map_err(NodeError::Transport)?;
158        let mut executor = Self::from_session(primary_session);
159        executor.set_node_identity("", "/");
160        // Phase 156 — see `Executor::open` for primary-identity
161        // recording rationale.
162        let _ = executor.primary_rmw_name.push_str(primary.rmw);
163        let _ = executor.primary_locator.push_str(primary.locator);
164        #[cfg(all(feature = "std", feature = "rmw-cffi"))]
165        executor.install_wake_signal_on_primary();
166        #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
167        executor.install_wake_signal_on_primary_alloc();
168
169        for spec in specs.iter().skip(1) {
170            let session = nros_rmw_cffi::CffiRmw::open_with_rmw(spec.rmw, &spec.to_rmw_config())
171                .map_err(NodeError::Transport)?;
172            executor
173                .extra_sessions
174                .push(session)
175                .map_err(|_| NodeError::NodeTableFull)?;
176            #[cfg(feature = "std")]
177            {
178                let idx = executor.extra_sessions.len() - 1;
179                executor.install_wake_signal_on_extra(idx);
180            }
181            #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
182            {
183                let idx = executor.extra_sessions.len() - 1;
184                executor.install_wake_signal_on_extra_alloc(idx);
185            }
186        }
187
188        Ok(executor)
189    }
190
191    /// Phase 104.C.1 — open the Executor against a specific RMW
192    /// backend by name. Selects from the named registry (Phase
193    /// 104.B.2). `rmw_name` must match one of the names a backend
194    /// registered under (`"zenoh"`, `"cyclonedds"`, `"xrce"`, …).
195    ///
196    /// Equivalent to [`Executor::open`] when the registry has exactly
197    /// one backend (the default-backend fast path). Use this entry
198    /// point in multi-backend builds where `Executor::open` would
199    /// pick the first-registered slot.
200    ///
201    /// Single-Executor multi-Node multi-RMW (the long-term Design X
202    /// from `docs/roadmap/phase-104-multi-backend-bridges.md`) is
203    /// follow-up work — Phase 104.C.2 + C.3.
204    #[cfg(feature = "rmw-cffi")]
205    pub fn open_with_rmw(rmw_name: &str, config: &ExecutorConfig<'_>) -> Result<Self, NodeError> {
206        if !nros_rmw_cffi::backend_registered() {
207            return Err(NodeError::Transport(TransportError::ConnectionFailed));
208        }
209
210        let rmw_config = nros_rmw::RmwConfig {
211            locator: config.locator,
212            mode: config.mode,
213            domain_id: config.domain_id,
214            node_name: config.node_name,
215            namespace: config.namespace,
216            properties: &[],
217        };
218        let session = nros_rmw_cffi::CffiRmw::open_with_rmw(rmw_name, &rmw_config)
219            .map_err(|_| NodeError::Transport(TransportError::ConnectionFailed))?;
220        let mut executor = Self::from_session(session);
221        #[cfg(not(feature = "std"))]
222        {
223            executor.clock_us_fn = config.clock_us;
224            executor.last_spin_end_us = config.clock_us.map(|clock| clock());
225        }
226        executor.set_node_identity(config.node_name, config.namespace);
227        // Phase 156 — record primary identity for the session-
228        // cache hit path. See `Executor::open` for the rationale.
229        let _ = executor.primary_rmw_name.push_str(rmw_name);
230        let _ = executor.primary_locator.push_str(config.locator);
231        #[cfg(all(feature = "std", feature = "rmw-cffi"))]
232        executor.install_wake_signal_on_primary();
233        #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
234        executor.install_wake_signal_on_primary_alloc();
235        Ok(executor)
236    }
237}
238
/// Phase 128.F.1 — per-backend session declaration for
/// [`Executor::open_multi`].
///
/// Each spec names an RMW backend (must match one a backend
/// registered under via `nros_rmw_cffi_register_named` / the
/// `RMW_INIT_ENTRIES` linker section) plus the locator and domain id
/// to open against it. The type is a plain, copyable bag of borrowed
/// strings; build it with [`SessionSpec::new`] and the chained
/// setters.
#[cfg(feature = "rmw-cffi")]
#[derive(Debug, Clone, Copy)]
pub struct SessionSpec<'cfg> {
    /// Name of the RMW backend to open the session on.
    pub rmw: &'cfg str,
    /// Locator handed to the backend when opening the session.
    pub locator: &'cfg str,
    /// Domain id forwarded in the RMW config.
    pub domain_id: u32,
    /// Node name forwarded in the RMW config.
    pub node_name: &'cfg str,
    /// Namespace forwarded in the RMW config.
    pub namespace: &'cfg str,
}
253
#[cfg(feature = "rmw-cffi")]
impl<'cfg> SessionSpec<'cfg> {
    /// Minimal spec: RMW name plus locator. The domain id starts at 0
    /// and both node name and namespace start empty.
    pub const fn new(rmw: &'cfg str, locator: &'cfg str) -> Self {
        Self {
            rmw,
            locator,
            domain_id: 0,
            node_name: "",
            namespace: "",
        }
    }

    /// Returns the spec with its domain id replaced.
    pub const fn domain_id(self, domain_id: u32) -> Self {
        Self { domain_id, ..self }
    }

    /// Returns the spec with its node name replaced.
    pub const fn node_name(self, name: &'cfg str) -> Self {
        Self {
            node_name: name,
            ..self
        }
    }

    /// Returns the spec with its namespace replaced.
    pub const fn namespace(self, ns: &'cfg str) -> Self {
        Self {
            namespace: ns,
            ..self
        }
    }

    /// Builds the RMW config used to open this spec's session.
    ///
    /// The session mode is always `Client` and no extra properties are
    /// passed; the `rmw` name is not part of the config (callers pass
    /// it to the backend lookup separately).
    fn to_rmw_config(self) -> nros_rmw::RmwConfig<'cfg> {
        let Self {
            rmw: _,
            locator,
            domain_id,
            node_name,
            namespace,
        } = self;
        nros_rmw::RmwConfig {
            locator,
            mode: nros_rmw::SessionMode::Client,
            domain_id,
            node_name,
            namespace,
            properties: &[],
        }
    }
}
294
// Phase 128.A.3 — selector for the single-backend resolution path.
//
// Hosted (`std`) builds read `$NROS_RMW`, mirroring ROS 2's
// `RMW_IMPLEMENTATION`. The value is returned as raw bytes so the
// caller can hand it to `nros_rmw_cffi::resolve_backend` and, when
// present, to `CffiRmw::open_with_rmw`. An unset or empty variable
// yields `None`.
//
// `no_std` / bare-metal builds have no environment, so the selector
// is always `None` and resolution falls through to the single-backend
// or ambiguous path. Embedded users with multiple backends use the
// bridge surface `Executor::open_multi` instead.
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
fn read_rmw_selector_env() -> Option<alloc::vec::Vec<u8>> {
    std::env::var_os("NROS_RMW")
        .map(|raw| raw.as_encoded_bytes().to_vec())
        .filter(|bytes| !bytes.is_empty())
}

// no_std twin of the function above: there is no environment to read,
// so no selector is ever reported.
#[cfg(all(not(feature = "std"), feature = "rmw-cffi"))]
fn read_rmw_selector_env() -> Option<&'static [u8]> {
    None
}
320
321// ============================================================================
322// SessionStore — owned or borrowed session
323// ============================================================================
324
325/// Session storage: owned or borrowed via raw pointer.
326///
327/// The C API creates a session in `nros_support_init()` before the
328/// executor. `Borrowed` lets the executor use that session without owning it.
329#[allow(clippy::large_enum_variant)]
330pub(crate) enum SessionStore {
331    Owned(session::ConcreteSession),
332    Borrowed(*mut session::ConcreteSession),
333}
334
335impl core::ops::Deref for SessionStore {
336    type Target = session::ConcreteSession;
337    fn deref(&self) -> &session::ConcreteSession {
338        match self {
339            SessionStore::Owned(s) => s,
340            SessionStore::Borrowed(ptr) => unsafe { &**ptr },
341        }
342    }
343}
344
345impl core::ops::DerefMut for SessionStore {
346    fn deref_mut(&mut self) -> &mut session::ConcreteSession {
347        match self {
348            SessionStore::Owned(s) => s,
349            SessionStore::Borrowed(ptr) => unsafe { &mut **ptr },
350        }
351    }
352}
353
/// Phase 228.E — an opaque, `Send` handle to an [`Executor`]'s RMW
/// session.
///
/// In the per-tier model the boot executor opens the one session and
/// gives each spawned tier task a handle (not a borrow), so the task
/// can open its own [`Executor`] over that *same* session across the
/// RTOS task boundary. The handle wraps the `pub(crate)` session
/// pointer, which lets board crates (`nros-board-posix`,
/// `nros-board-freertos`, …) name and move it without naming the
/// session type.
///
/// Obtain one via [`Executor::session_handle`]; consume it via
/// [`Executor::open_with_session_handle`].
#[cfg(any(has_rmw, test))]
pub struct SessionHandle(*mut session::ConcreteSession);

// SAFETY: sharing one session across RTOS tasks is the deliberate design
// of the per-tier model. Concurrent access is serialized by the RMW
// backend's internal locks (the RTOS targets build zenoh-pico with
// `Z_FEATURE_MULTI_THREAD=1` — RFC-0032 §5.0), and the boot executor,
// which owns the session, outlives every tier task.
#[cfg(any(has_rmw, test))]
unsafe impl Send for SessionHandle {}
372
373/// Phase 228.C — pure callback-group filter decision. `None` = wildcard (accept
374/// every group); `Some` = accept only listed groups. Backs
375/// [`Executor::group_active`]; split out so the logic is unit-testable without a
376/// live session.
377pub(crate) fn group_filter_accepts<const N: usize, const M: usize>(
378    active: &Option<heapless::Vec<heapless::String<N>, M>>,
379    group: &str,
380) -> bool {
381    match active {
382        None => true,
383        Some(v) => v.iter().any(|g| g.as_str() == group),
384    }
385}
386
/// Unit tests for [`group_filter_accepts`]: wildcard, listed/unlisted
/// groups, and the empty-list edge case.
#[cfg(test)]
mod group_filter_tests {
    use super::group_filter_accepts;

    type Groups = heapless::Vec<heapless::String<32>, { crate::config::MAX_NODES }>;

    /// Builds a filter list holding exactly one group name.
    fn single_group(name: &str) -> Groups {
        let mut entry = heapless::String::new();
        entry.push_str(name).unwrap();
        let mut groups = Groups::new();
        groups.push(entry).unwrap();
        groups
    }

    #[test]
    fn wildcard_accepts_all() {
        let none: Option<Groups> = None;
        assert!(group_filter_accepts(&none, "anything"));
    }

    #[test]
    fn set_accepts_only_listed_groups() {
        let active = Some(single_group("ctrl"));
        assert!(group_filter_accepts(&active, "ctrl"));
        assert!(!group_filter_accepts(&active, "telem"));
    }

    /// `Some(empty)` is not the wildcard: with nothing listed, nothing
    /// is accepted.
    #[test]
    fn empty_set_rejects_all() {
        let active = Some(Groups::new());
        assert!(!group_filter_accepts(&active, "ctrl"));
        assert!(!group_filter_accepts(&active, ""));
    }
}
410
411// ============================================================================
412// Executor
413// ============================================================================
414
// NOTE(review): the paragraphs below describe `Executor`, not
// `WakeCtx`. They were written as `///` lines directly above this
// struct, so rustdoc attached them to `WakeCtx` (where the
// `Self::register_service` / `Self::spin_once` links presumably cannot
// resolve). Kept here as a plain comment; relocate onto the `Executor`
// definition.
//
// Backend-agnostic executor that owns a session.
//
// Provides `create_node()` for entity creation and `drive_io()` for polling.
//
// # Callback Mode
//
// The executor supports arena-based callback registration via the
// `node_mut(id).subscription(t)` builder and
// [`register_service()`](Self::register_service), with dispatch via
// [`spin_once()`](Self::spin_once). No heap allocation is needed.
//
// The sizes are set via `NROS_EXECUTOR_MAX_CBS` (default 4) and
// `NROS_EXECUTOR_ARENA_SIZE` (default 4096) environment variables at build time.

/// Phase 124.B.2 — opaque context handed to the runtime wake
/// callback.
///
/// Backends store the raw pointer and invoke the callback; the
/// callback decodes the pointer back to `&WakeCtx`.
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
pub(crate) struct WakeCtx {
    /// Set to `true` by the wake callback before it notifies.
    pub(crate) flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Condition variable the wake callback notifies.
    pub(crate) cv: std::sync::Arc<std::sync::Condvar>,
    /// Mutex paired with `cv`; the wake callback itself never locks it.
    #[allow(dead_code)] // Held by spin_once's wait predicate (124.B.4).
    pub(crate) mu: std::sync::Arc<std::sync::Mutex<()>>,
    /// Phase 130.3 — Zephyr+std uses the k_sem wake primitive; the
    /// runtime cb signals both this and the std cv so a future
    /// migration to a single primitive flips one branch instead
    /// of two.
    pub(crate) node_wake: Option<std::sync::Arc<super::node_wake::NodeWake>>,
}
444
/// Phase 124.B.2 — runtime wake callback.
///
/// This is the symbol backends invoke from their async wake path
/// (datagram arrival, worker-thread enqueue, etc.). It sets the wake
/// flag, notifies the condition variable, and then signals the
/// optional `NodeWake` — in that order, without taking `mu`. A null
/// `ctx` is ignored.
///
/// RT-context contract:
///
/// * **Thread-safe**: callable from any thread; no mutex is held
///   during `notify_all`. The waiter is expected to re-check
///   `wake_flag` under `wake_mu` via its `wait_timeout_while`
///   predicate.
/// * **NOT async-signal-safe on POSIX**: `pthread_cond_signal`
///   isn't on the POSIX async-signal-safe function list. For POSIX
///   signal-handler wake, use a `signalfd` + select pattern in a
///   thread that owns the wake duty.
/// * **RTOS ISR**: the per-RTOS platform layer wraps the cv with an
///   ISR-safe primitive (`xSemaphoreGiveFromISR`,
///   `tx_event_flags_set` from ISR, `k_sem_give` from ISR on
///   Zephyr). A backend's ISR caller routes through the platform's
///   `signal_from_isr` API instead of this cb directly.
/// * **Bounded execution time**: O(1) — atomic store + cv notify.
///   No allocation, no contended lock.
///
/// # Safety
///
/// `ctx` must be null or point at a `WakeCtx` that stays alive for
/// the whole call.
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
pub(crate) unsafe extern "C" fn nros_rmw_runtime_wake_cb(ctx: *mut core::ffi::c_void) {
    use std::sync::atomic::Ordering;

    if ctx.is_null() {
        return;
    }
    // Phase 141.B.2 — capture T0 at cb entry. No-op when the probe
    // feature is off or no cycle reader is installed.
    #[cfg(feature = "wake-latency-probe")]
    super::wake_probe::on_wake();

    // SAFETY: `ctx` points at a `WakeCtx` owned by an Executor that is
    // still alive. `Executor::drop` must clear the callback via
    // `set_wake_callback(None, _)` on all sessions before dropping
    // `wake_ctx`; that happens in the `install_wake_*` teardown path.
    let wake: &WakeCtx = unsafe { &*ctx.cast::<WakeCtx>() };

    // Publish the flag first, then notify without holding `mu`.
    // NOTE(review): because neither the store nor the notify happens
    // under `mu`, a wake that lands between the waiter's predicate
    // check and its park is not delivered by `notify_all`; the waiter
    // would then only see the flag when its `wait_timeout_while`
    // deadline expires. Confirm that bounded delay is acceptable.
    wake.flag.store(true, Ordering::SeqCst);
    wake.cv.notify_all();

    // Phase 130.3 — Zephyr+std waits on `NodeWake` (k_sem) instead of
    // the std cv. Signal both so the cb works whichever primitive
    // spin_once is waiting on.
    if let Some(node_wake) = &wake.node_wake {
        node_wake.signal();
    }
}
497
/// Phase 124.B.7.c — eventfd-backed wake worker for POSIX signal
/// handlers.
///
/// Owns a Linux `eventfd` plus a worker thread that `read()`s the fd
/// and forwards each wake to `wake_ctx` (flag store + `cv.notify_all()`).
/// Writing to an eventfd is async-signal-safe per the kernel contract
/// (`write(2)` to an eventfd is permitted from signal handlers), which
/// closes the gap `pthread_cond_signal` leaves open on POSIX.
///
/// Lifecycle:
///   * Constructed lazily in `Executor::signal_fd()` on the first
///     caller request.
///   * `Drop` sets the shutdown flag, writes to the fd to unblock the
///     worker, and joins it.
///
/// Caller flow (signal handler):
///   1. Fetch the fd via `Executor::signal_fd()` before installing the
///      handler.
///   2. The handler does `eventfd_write(fd, 1)` (equivalently,
///      `write(fd, &1u64, 8)`).
///   3. The worker reads the fd and signals wake_cv; a `spin_once`
///      blocked in `cv.wait_timeout_while` sees flag=true and exits.
#[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
pub struct WakeSignalFd {
    /// The eventfd; closed in `Drop` after the worker has been joined.
    fd: core::ffi::c_int,
    /// Tells the worker to exit the next time its `read` returns.
    shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Worker thread handle; taken and joined in `Drop`.
    worker: Option<std::thread::JoinHandle<()>>,
}

#[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
impl WakeSignalFd {
    /// Creates the eventfd and spawns the forwarding worker.
    ///
    /// `wake_ctx_ptr` is the `*const WakeCtx` produced by
    /// `Executor::wake_ctx_ptr` — the same value the runtime wake cb
    /// decodes. It must stay valid until this value is dropped.
    ///
    /// # Errors
    ///
    /// Returns the OS error when `eventfd` fails, or an `Other` error
    /// when the worker thread cannot be spawned (the fd is closed
    /// before returning in that case).
    fn new(wake_ctx_ptr: *const WakeCtx) -> Result<Self, std::io::Error> {
        use std::sync::{
            Arc,
            atomic::{AtomicBool, Ordering},
        };

        // SAFETY: plain FFI call with constant arguments; the result is
        // checked below.
        let fd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC) };
        if fd < 0 {
            return Err(std::io::Error::last_os_error());
        }

        let shutdown = Arc::new(AtomicBool::new(false));
        let stop = Arc::clone(&shutdown);

        // Raw pointers are not `Send`, so the address crosses into the
        // worker as a `usize` and is turned back into a pointer there.
        let ctx_addr = wake_ctx_ptr as usize;

        let spawned = std::thread::Builder::new()
            .name("nros-wakefd".into())
            .spawn(move || {
                let ctx = ctx_addr as *const WakeCtx;
                // Receives the 8-byte eventfd counter; the value itself
                // is never inspected.
                let mut counter = [0u8; 8];
                loop {
                    // SAFETY: `counter` is a live, writable 8-byte
                    // buffer and `fd` stays open until `Drop` has
                    // joined this thread.
                    let n = unsafe {
                        libc::read(
                            fd,
                            counter.as_mut_ptr().cast::<core::ffi::c_void>(),
                            counter.len(),
                        )
                    };
                    // Shutdown wins regardless of how `read` returned.
                    if stop.load(Ordering::Acquire) {
                        return;
                    }
                    if n <= 0 {
                        // EINTR / EOF — nothing to forward, try again.
                        continue;
                    }
                    // Forward the wake: flag store, then cv notify.
                    // SAFETY: the `WakeCtx` is owned by the Executor's
                    // `wake_ctx: Arc<WakeCtx>` field, which outlives
                    // this worker because `Drop` joins it; the shutdown
                    // check above keeps us from touching it once
                    // teardown has begun.
                    unsafe {
                        let wake = &*ctx;
                        wake.flag.store(true, Ordering::SeqCst);
                        wake.cv.notify_all();
                    }
                }
            });

        match spawned {
            Ok(worker) => Ok(Self {
                fd,
                shutdown,
                worker: Some(worker),
            }),
            Err(e) => {
                // SAFETY: `fd` is the eventfd created above and nothing
                // else holds it, since the worker never started.
                unsafe { libc::close(fd) };
                Err(std::io::Error::other(alloc::format!(
                    "spawn nros-wakefd worker: {e}"
                )))
            }
        }
    }

    /// Returns the writable eventfd.
    ///
    /// The caller (typically a POSIX signal handler) writes any
    /// non-zero 8-byte value to trigger a wake. `write(2)` on an
    /// eventfd is async-signal-safe per the `eventfd(2)` man page.
    pub fn fd(&self) -> core::ffi::c_int {
        self.fd
    }
}
594
/// Teardown: request shutdown, unblock the worker, join it, then close
/// the eventfd. The order matters — the fd must outlive the worker's
/// `read`.
#[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
impl Drop for WakeSignalFd {
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        self.shutdown.store(true, Ordering::Release);

        // Bump the eventfd counter so the worker's blocking `read`
        // returns and it re-checks the shutdown flag.
        let token: u64 = 1;
        // SAFETY: `token` is a live 8-byte value and `self.fd` is still
        // open at this point. The result is intentionally ignored.
        unsafe {
            libc::write(
                self.fd,
                (&token as *const u64).cast::<core::ffi::c_void>(),
                core::mem::size_of::<u64>(),
            );
        }

        if let Some(handle) = self.worker.take() {
            let _ = handle.join();
        }

        // SAFETY: the worker has been joined (or never existed), so no
        // one else is using the fd any more.
        unsafe { libc::close(self.fd) };
    }
}
611
/// Phase 124.B.7.b — ISR / interrupt-context wake callback.
///
/// Intended to have the semantics of [`nros_rmw_runtime_wake_cb`]
/// while being restricted to async-signal-safe / ISR-safe
/// primitives. Today it simply forwards to the regular callback.
///
/// Per-platform routing:
///
/// * **POSIX (std)**: `pthread_cond_signal` is NOT on the POSIX
///   async-signal-safe function list, so calling this from a SIGUSR1
///   handler is technically UB. The real fix (Phase 124.B.7.c)
///   routes via `signalfd`/`eventfd` + a runtime worker thread; until
///   that lands, signal-handler callers MUST use
///   `nros_guard_condition_trigger` from a **separate thread** (not
///   from the handler itself), OR set the wake_flag and rely on the
///   next poll deadline. As an alias of the regular `wake_cb`, this
///   cb is safe only from non-signal-handler ISR-like contexts (e.g.
///   timer thread, kernel callback).
///   NOTE(review): `WakeSignalFd` in this file is tagged Phase
///   124.B.7.c, so the "until that lands" wording may be stale —
///   confirm.
///
/// * **RTOS no_std (Zephyr/FreeRTOS/ThreadX)**: routes through the
///   platform-cffi `condvar_signal_from_isr` slot. Each backend uses
///   its ISR-safe variant — `xSemaphoreGiveFromISR`,
///   `tx_semaphore_put`, `k_condvar_signal`.
///
/// # Safety
///
/// Same requirements on `ctx` as [`nros_rmw_runtime_wake_cb`].
#[cfg(all(feature = "std", feature = "rmw-cffi"))]
#[allow(dead_code)] // Public exposure pending B.7.c signalfd worker.
pub(crate) unsafe extern "C" fn nros_rmw_runtime_wake_cb_from_isr(ctx: *mut core::ffi::c_void) {
    // Plain alias of the regular wake cb for now; POSIX signal-handler
    // safety is not provided here (see the contract above).
    // SAFETY: the caller upholds the `ctx` contract of
    // `nros_rmw_runtime_wake_cb`, which is all the callee requires.
    unsafe { nros_rmw_runtime_wake_cb(ctx) }
}
644
/// Phase 216 follow-up — per-Node dispatch trampoline registered with
/// [`Executor::register_dispatch_slot`].
///
/// The board-side dispatch task (RTIC `__nros_run` / Embassy
/// `__nros_run_task`) dequeues a `nros_platform::SignaledCallback`
/// envelope and forwards `(cb_id, ctx_ptr)` into
/// [`Executor::dispatch_callback`]. That method linear-scans the slot
/// table and invokes every registered `on_callback` with the owning
/// Node's per-Node `state` blob. Each Node's
/// `__nros_node_<pkg>_on_callback` self-filters on its own
/// `CallbackId` tag set, so a slot whose Node doesn't own the callback
/// is a cheap no-op string compare.
///
/// The shape mirrors the per-pkg `__nros_node_<pkg>_on_callback`
/// extern "C" trampoline emitted by the `nros::node!()` macro
/// (see `packages/core/nros-macros/src/lib.rs` Phase 216.A.5).
///
/// # Why not `linkme`
///
/// `linkme::distributed_slice` hangs on bare-metal Cortex-M / RISC-V
/// because `cortex_m_rt`'s link script doesn't provide the
/// `__start_/__stop_` section anchors in a shape that lets the
/// iterator terminate (see
/// `packages/core/nros-rmw-cffi/src/section.rs` Phase 142). Since
/// stm32f4 RTIC / Embassy boards are the whole point of Phase 216,
/// the registry uses the explicit `register()` pattern from Phase
/// 104.A.
#[derive(Clone, Copy)]
pub struct DispatchSlot {
    /// Owning Node's `State` blob — produced by the macro-emitted
    /// `i()` and round-tripped through
    /// `nros::__private_node_state_into_raw`. The executor treats it
    /// as opaque and only passes it back to `on_callback`.
    pub state: *mut core::ffi::c_void,
    /// Per-Node `extern "C"` trampoline. The signature matches the
    /// `__nros_node_<pkg>_on_callback` symbol the `nros::node!()`
    /// macro emits: the node state, the callback id as a
    /// pointer + length pair, and an opaque per-call context.
    pub on_callback: unsafe extern "C" fn(
        state: *mut core::ffi::c_void,
        cb_id_ptr: *const u8,
        cb_id_len: usize,
        ctx: *mut core::ffi::c_void,
    ),
}

// SAFETY: a `DispatchSlot` holds a raw `state` pointer and an
// `extern "C"` fn pointer. The fn pointer is `Send`/`Sync` by
// definition; the `state` pointer follows the threading story of the
// owning `Executor` (which is `unsafe impl Send`). Marking the slot
// `Send` keeps that existing `Executor` Send impl intact — see
// `unsafe impl Send for Executor {}` later in this file.
unsafe impl Send for DispatchSlot {}
// SAFETY: NOTE(review): the justification above only argues `Send`.
// `Sync` additionally assumes that sharing `&DispatchSlot` (and so the
// `state` pointer value) between threads is fine — confirm against how
// the node state is actually accessed.
unsafe impl Sync for DispatchSlot {}
699
/// Phase 258 (Track 2, 2a) — one entry in the executor-owned
/// component tick registry.
///
/// This is the layering-clean half of the W0-B `install` seam's tick
/// fix (phase-257 D2). A `nros`-layer `install` /
/// `register_node_borrowed` builds an `Arc<ComponentCell>` (the
/// typed, poll-driven component state) and hands it to the executor
/// through [`Executor::enroll_component`]. The executor then calls
/// `tick` on every enrolled slot at the tail of each
/// [`spin_once`](Executor::spin_once), so `install`'d nodes (C, C++,
/// **and Rust owned-spin**) get ticked — closing the
/// service-client/action poll gap left by the callback-`Arc`-only
/// lifetime.
///
/// As with [`DispatchSlot`], the executor sees nothing but a raw
/// pointer and `extern "C"` fn pointers; `nros-node` is the lower
/// layer and has no dependency on `nros`:
///
/// * `state` — a *leaked* `Arc<ComponentCell>` (from `Arc::into_raw`)
///   that the `nros`-side `tick`/`drop` fns re-borrow. A
///   pub/sub/timer component is kept alive by the executor's
///   per-entity callback `Arc` clones, but a poll-only component has
///   no callbacks, so the slot itself must hold a clone of the cell —
///   which is why a `drop` fn is paired with it.
/// * `tick` — `nros`-side `extern "C"` fn that turns `state` back
///   into `&ComponentCell` and `exec_ctx` back into `*mut Executor`,
///   then runs that single cell's tick (mirrors
///   `ExecutorNodeRuntime::run_ticks`).
/// * `drop` — `nros`-side `extern "C"` fn invoked on `Executor::drop`
///   to reconstitute and drop the leaked `Arc`, giving the executor
///   ownership of the cell's lifetime.
///
/// The registry is deliberately SEPARATE from [`DispatchSlot`]:
/// framework dispatch (RTIC / Embassy) is interrupt-driven and
/// name-keyed, with no tick or ownership concern, and mixing the two
/// would put that path at risk.
///
/// NOTE(review): the type is `Copy` while also described as owning
/// the leaked `Arc`; a copied slot duplicates the pointer, so `drop`
/// must be called for exactly one copy — confirm callers never run it
/// on duplicates.
#[derive(Copy, Clone)]
pub struct ComponentSlot {
    /// Leaked `Arc<ComponentCell>`, opaque to the executor. The slot
    /// owns it; it is released through `drop` on `Executor::drop`.
    pub state: *mut core::ffi::c_void,
    /// `nros`-side tick trampoline, called as `(state, exec_ctx)`
    /// with `exec_ctx` being `*mut Executor`. Drives one component's
    /// `tick`.
    pub tick: unsafe extern "C" fn(state: *mut core::ffi::c_void, exec_ctx: *mut core::ffi::c_void),
    /// `nros`-side drop trampoline: reconstitutes and drops the
    /// leaked `Arc<ComponentCell>` stored in `state`. Run once, on
    /// `Executor::drop`.
    pub drop: unsafe extern "C" fn(state: *mut core::ffi::c_void),
}

// SAFETY: same reasoning as `DispatchSlot`. The slot holds one raw
// pointer (`state`) and two `extern "C"` fn pointers. Fn pointers are
// `Send`/`Sync` by definition; the `state` pointer's thread-safety
// follows the owning `Executor` (`unsafe impl Send for Executor`).
unsafe impl Send for ComponentSlot {}
unsafe impl Sync for ComponentSlot {}
749
750pub struct Executor {
751    pub(crate) session: SessionStore,
752    pub(crate) arena: [MaybeUninit<u8>; crate::config::ARENA_SIZE],
753    pub(crate) arena_used: usize,
754    pub(crate) entries: [Option<CallbackMeta>; crate::config::MAX_CBS],
755    /// Phase 110.B — registered scheduling contexts. Slot 0 is
756    /// auto-populated with a `Fifo` SC at construction; every entry
757    /// without an explicit binding maps to it via
758    /// `sched_context_bindings`.
759    pub(crate) sched_contexts: [Option<super::sched_context::SchedContext>; crate::config::MAX_SC],
760    /// Per-entry SC binding parallel to `entries`. Defaults to
761    /// `SchedContextId(0)` (the auto-created Fifo SC).
762    pub(crate) sched_context_bindings:
763        [super::sched_context::SchedContextId; crate::config::MAX_CBS],
764    /// Phase 110.E — user-space sporadic-server budget state per
765    /// Sporadic-class SC. Slot indices match `sched_contexts`; non-
766    /// Sporadic slots stay `None`.
767    pub(crate) sporadic_states:
768        [Option<super::sched_context::SporadicState>; crate::config::MAX_SC],
769    /// Phase 110.E.b — atomic sporadic state + opaque platform-timer
770    /// handle for ISR-driven refill. Populated by
771    /// `register_sporadic_timer`; dropped on Executor `Drop` via the
772    /// stored `destroy_fn`.
773    #[cfg(feature = "alloc")]
774    pub(crate) sporadic_atomic_states: [Option<(
775        portable_atomic_util::Arc<super::sched_context::AtomicSporadicState>,
776        OpaqueTimerHandle,
777    )>; crate::config::MAX_SC],
778    /// Phase 110.G — major-frame length for time-triggered dispatch.
779    /// `0` (default) disables the TT gate entirely; non-zero enables
780    /// gating per
781    /// `SchedContext.tt_window_offset_us / tt_window_duration_us`.
782    pub(crate) major_frame_us: u32,
783    /// Phase 110.F — per-OS-priority worker pool. Lazily populated
784    /// on first dispatch routing to a non-zero `os_pri`. Lives
785    /// behind `feature = "scheduler-os-priority"` + `feature =
786    /// "std"` because workers need `std::thread` + `mpsc`.
787    #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
788    pub(crate) os_priority_workers: std::collections::HashMap<u8, OsPriorityWorker>,
789    /// Phase 110.F — caller-supplied `apply_policy` function pointer
790    /// each worker invokes at startup to elevate its OS priority.
791    /// `None` = the worker pool is disabled; entries bound to non-
792    /// zero `os_pri` SCs fall back to the cooperative path.
793    /// Mirrors `Executor::open_threaded`'s `apply_policy: fn(...)`
794    /// shape — keeps Executor non-generic over Platform.
795    #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
796    pub(crate) os_priority_apply_policy:
797        Option<fn(nros_platform_api::SchedPolicy) -> Result<(), nros_platform_api::SchedError>>,
798    pub(crate) trigger: Trigger,
799    pub(crate) semantics: ExecutorSemantics,
800    /// Node name for entities created via `register_subscription`/`register_service`.
801    /// Empty means unset — no liveliness tokens will be declared.
802    pub(crate) node_name: heapless::String<64>,
803    /// Phase 228.C — per-tier callback-group filter. `None` = wildcard (register
804    /// every callback — the single-tier degenerate case + today's behaviour).
805    /// `Some(groups)` = this tier's executor accepts only callbacks whose
806    /// `.callback_group()` is in the set; others are skipped at registration.
807    pub(crate) active_groups:
808        Option<heapless::Vec<heapless::String<32>, { crate::config::MAX_NODES }>>,
809    /// Node namespace (default: "/").
810    pub(crate) namespace: heapless::String<64>,
811    /// Phase 104.C.2 — rclcpp-style `add_node` table. Holds the
812    /// per-Node metadata (name, namespace, rmw, locator, default
813    /// SchedContext) for every Node attached to this Executor. The
814    /// implicit "primary" Node (NodeId(0)) mirrors `node_name` +
815    /// `namespace` above and is auto-populated on first use.
816    pub(crate) nodes: heapless::Vec<super::node_record::NodeRecord, { crate::config::MAX_NODES }>,
817    /// Phase 216 follow-up — per-Node dispatch trampoline registry.
818    ///
819    /// Populated by [`Executor::register_dispatch_slot`]; walked by
820    /// [`Executor::dispatch_callback`] each time the board-side
821    /// dispatch task hands off a `SignaledCallback` envelope.
822    /// Sized by `MAX_NODES` because the upper-bound is one slot per
823    /// Node pkg deployed on this executor (the same upper bound used
824    /// by `nodes` and `extra_sessions`). `MAX_NODES` is driven by the
825    /// `NROS_EXECUTOR_MAX_NODES` build-script env var (default 4);
826    /// boards that deploy more Node pkgs raise it at build time.
827    ///
828    /// Default is `heapless::Vec::new()` (empty) — Nodes register
829    /// themselves explicitly via the `register_dispatch_slot` API.
830    /// The fallback shape avoids the `linkme` hazard on bare-metal
831    /// Cortex-M / RISC-V (see `DispatchSlot` doc).
832    pub(crate) dispatch_slots: heapless::Vec<DispatchSlot, { crate::config::MAX_NODES }>,
833    /// Phase 258 (Track 2, 2a) — executor-owned component tick registry.
834    /// Enrolled by [`Executor::enroll_component`] (from `nros`'s
835    /// `install`/`register_node_borrowed`); each slot's `tick` runs at the
836    /// tail of [`spin_once`](Self::spin_once); each slot's `drop` runs on
837    /// `Executor::drop`. Bounded `MAX_NODES` (matches `dispatch_slots` /
838    /// `nodes`). See [`ComponentSlot`] for why it's separate from
839    /// `dispatch_slots`.
840    pub(crate) component_slots: heapless::Vec<ComponentSlot, { crate::config::MAX_NODES }>,
841    /// Phase 104.C.3 — extra sessions opened by `node_builder.rmw()`
842    /// calls that named a backend different from the Executor's
843    /// primary session. Indexed by `NodeRecord.session_idx`
844    /// (1..=N maps to `extra_sessions[N-1]`; idx 0 is the primary
845    /// `self.session`). Sized by `NROS_EXECUTOR_MAX_NODES` since one
846    /// extra session per Node is the worst case.
847    pub(crate) extra_sessions:
848        heapless::Vec<session::ConcreteSession, { crate::config::MAX_NODES }>,
849    /// Phase 156 — primary session's rmw name + locator, captured
850    /// at `open*` time so `NodeBuilder::resolve_session_slot`'s
851    /// cache lookup can detect when a `.rmw(name).locator(loc)`
852    /// matches the primary (slot 0) instead of falling through to
853    /// `CffiRmw::open_with_rmw` and trying to open a SECOND
854    /// session against the same backend. zenoh-pico's global state
855    /// is a process singleton; opening twice fails. Empty when
856    /// constructed via `from_session(_ptr)` without `open*`
857    /// recording the metadata; in that case the cache check
858    /// degrades to "always miss" (today's behaviour).
859    pub(crate) primary_rmw_name: heapless::String<32>,
860    pub(crate) primary_locator: heapless::String<128>,
861    #[cfg(feature = "std")]
862    pub(crate) halt_flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
863    /// Phase 104.C.6 — shared executor wake flag. Any source of work
864    /// (foreign thread handing off a callback, signal handler, future
865    /// per-session vtable wake hook) sets this; `spin_once` swaps it to
866    /// `false` on entry and, if it was `true`, polls every session with
867    /// a 0-ms timeout instead of blocking. Lets one notification wake
868    /// the executor regardless of which session the user is currently
869    /// blocked on (the multi-RMW bridge case).
870    #[cfg(feature = "std")]
871    pub(crate) wake_flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
872    /// Phase 124.B.2 — wake condvar paired with `wake_flag`. The
873    /// runtime-supplied wake callback (`nros_rmw_runtime_wake_cb` in
874    /// nros-rmw-cffi) writes `wake_flag = true` AND signals
875    /// `wake_cv` atomically under `wake_mu`. `spin_once` blocks on
876    /// the cv with a deadline instead of calling `drive_io` with the
877    /// user's timeout — sub-poll-period wake latency.
878    ///
879    /// Poll-only backends (NULL `set_wake_callback` slot) leave the
880    /// cb uninstalled; the cv wait still fires on its deadline,
881    /// then drive_io(0) drains whatever the backend's internal
882    /// poll has buffered.
883    #[cfg(feature = "std")]
884    #[allow(dead_code)] // Wired by spin_once after 124.B.4.
885    pub(crate) wake_cv: std::sync::Arc<std::sync::Condvar>,
886    #[cfg(feature = "std")]
887    #[allow(dead_code)]
888    pub(crate) wake_mu: std::sync::Arc<std::sync::Mutex<()>>,
889    /// Phase 130.3 — Zephyr+std uses `nros_platform_wake_*` (k_sem)
890    /// instead of `std::sync::Condvar` because Zephyr's libc
891    /// `pthread_cond_timedwait` hangs past its deadline. `None`
892    /// when the platform provider didn't link a wake primitive
893    /// (e.g. test builds with `rmw-cffi` but no `platform-*`
894    /// feature); spin_once falls back to driving the transport
895    /// for the full timeout in that case.
896    #[cfg(all(feature = "std", feature = "rmw-cffi"))]
897    pub(crate) node_wake: Option<std::sync::Arc<super::node_wake::NodeWake>>,
898    /// Phase 130.4 — true when at least one session's backend
899    /// installed the wake callback. Drives whether `spin_once`
900    /// uses the wake-primitive wait (`NodeWake` / `Condvar`) or
901    /// just `drive_io(timeout_ms)`. Poll-only backends
902    /// (XRCE-DDS-Client, current Cyclone/dust-DDS shims) leave
903    /// this `false`; the wait then becomes a no-op sleep that
904    /// starves reliable retransmission (Phase 127.C.4 root
905    /// cause: server's `send_reply` flushes 100 ms once, then a
906    /// blind `wait_ms(100)` sleeps with zero session activity, so
907    /// the agent's ACK arrives into a stalled session and reliable
908    /// redelivery never fires).
909    #[cfg(all(feature = "std", feature = "rmw-cffi"))]
910    pub(crate) has_async_wake: bool,
911    /// Phase 124.B.2 — opaque context Arc handed to backends via
912    /// `set_wake_callback`. Lazy-allocated on first install; stays
913    /// alive for the Executor's lifetime so the raw pointer stored
914    /// in backends remains valid.
915    #[cfg(all(feature = "std", feature = "rmw-cffi"))]
916    pub(crate) wake_ctx: Option<std::sync::Arc<WakeCtx>>,
917    // Phase 141.A.3 — alloc-mode (no_std RTOS) mirror of the wake
918    // state above. Same semantics: `wake_flag_alloc` is set SeqCst
919    // by the runtime cb + cleared by spin_once on entry;
920    // `node_wake_alloc` is the kernel-native binary semaphore
921    // (lifted to alloc cfg in e36ee8cf) the cb signals;
922    // `wake_ctx_alloc` is the Arc handed to backends via
923    // `set_wake_callback(Some(cb), Arc::as_ptr(ctx) as *mut _)`.
924    // `has_async_wake_alloc` is `true` after the first session
925    // accepts the wake-cb install (`supports_wake_callback`).
926    // Drives the no_std spin_once wait branch to block on
927    // `node_wake_alloc.wait_ms(deadline)` instead of relying on
928    // `drive_io`'s transport-blocking recv for the full timeout.
929    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
930    pub(crate) wake_flag_alloc: portable_atomic_util::Arc<portable_atomic::AtomicBool>,
931    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
932    pub(crate) node_wake_alloc: Option<portable_atomic_util::Arc<super::node_wake::NodeWake>>,
933    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
934    pub(crate) wake_ctx_alloc: Option<portable_atomic_util::Arc<super::wake_alloc::WakeCtxAlloc>>,
935    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
936    pub(crate) has_async_wake_alloc: bool,
937    /// Phase 124.B.7.c — lazily-allocated POSIX signalfd worker.
938    /// Owned by the Executor; spawned on first `signal_fd()` call.
939    /// Drop joins the worker thread and closes the fd.
940    #[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
941    pub(crate) signal_fd: Option<WakeSignalFd>,
942    #[cfg(feature = "param-services")]
943    pub(crate) params: Option<alloc::boxed::Box<crate::parameter_services::ParamState>>,
944    #[cfg(feature = "lifecycle-services")]
945    pub(crate) lifecycle:
946        Option<alloc::boxed::Box<crate::lifecycle_services::LifecycleRuntimeState>>,
947    /// Sub-millisecond wall-clock residual carried across `spin_once` calls
948    /// so timers tick at true wall-clock rate even when `drive_io` returns
949    /// in well under 1 ms (e.g. zenoh-pico condvar wakeups under load).
950    #[cfg(feature = "std")]
951    pub(crate) spin_residual_us: u64,
952    /// Sub-millisecond residual for no_std wall-clock timer accounting.
953    #[cfg(not(feature = "std"))]
954    pub(crate) spin_residual_us: u64,
955    /// Wall-clock instant at which the previous `spin_once` exited. The
956    /// timer delta on the next call is measured from this point so any
957    /// time the caller spent between `spin_once` invocations (e.g. an
958    /// explicit `thread::sleep`) counts toward timer accumulation just
959    /// like time spent inside `drive_io`.
960    #[cfg(feature = "std")]
961    pub(crate) last_spin_end: Option<std::time::Instant>,
962    /// Monotonic clock endpoint for no_std timer accounting.
963    #[cfg(not(feature = "std"))]
964    pub(crate) last_spin_end_us: Option<u64>,
965    /// Optional platform clock hook supplied by `ExecutorConfig`.
966    #[cfg(not(feature = "std"))]
967    pub(crate) clock_us_fn: Option<fn() -> u64>,
968}
969
970impl Executor {
971    /// Create an executor from an already-opened session.
972    // The `arena` field intentionally lives inline so embedded callers can
973    // place an `Executor` in `static` storage without an allocator. The
974    // construction expression is large but is RVO'd into its destination
975    // by the optimiser; the clippy lint flags it because it can't prove
976    // the move-elision.
977    #[allow(clippy::large_stack_arrays)]
978    pub fn from_session(session: session::ConcreteSession) -> Self {
979        // SAFETY: MaybeUninit::uninit() is always safe; these bytes are only
980        // accessed through properly-typed ptr::write / ptr::read via the
981        // dispatch function pointers stored in `entries`.
982        Self {
983            session: SessionStore::Owned(session),
984            arena: [MaybeUninit::uninit(); crate::config::ARENA_SIZE],
985            arena_used: 0,
986            entries: [None; crate::config::MAX_CBS],
987            sched_contexts: {
988                let mut s = [None; crate::config::MAX_SC];
989                s[0] = Some(super::sched_context::SchedContext::default());
990                s
991            },
992            sched_context_bindings: [super::sched_context::SchedContextId(0);
993                crate::config::MAX_CBS],
994            sporadic_states: [None; crate::config::MAX_SC],
995            #[cfg(feature = "alloc")]
996            sporadic_atomic_states: [const { None }; crate::config::MAX_SC],
997            major_frame_us: 0,
998            #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
999            os_priority_workers: std::collections::HashMap::new(),
1000            #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
1001            os_priority_apply_policy: None,
1002            trigger: Trigger::Any,
1003            semantics: ExecutorSemantics::RclcppExecutor,
1004            node_name: heapless::String::new(),
1005            active_groups: None,
1006            nodes: heapless::Vec::new(),
1007            dispatch_slots: heapless::Vec::new(),
1008            component_slots: heapless::Vec::new(),
1009            extra_sessions: heapless::Vec::new(),
1010            primary_rmw_name: heapless::String::new(),
1011            primary_locator: heapless::String::new(),
1012            namespace: {
1013                let mut ns = heapless::String::new();
1014                let _ = ns.push_str("/");
1015                ns
1016            },
1017            #[cfg(feature = "std")]
1018            halt_flag: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1019            #[cfg(feature = "std")]
1020            wake_flag: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1021            #[cfg(feature = "std")]
1022            wake_cv: std::sync::Arc::new(std::sync::Condvar::new()),
1023            #[cfg(feature = "std")]
1024            wake_mu: std::sync::Arc::new(std::sync::Mutex::new(())),
1025            #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1026            node_wake: super::node_wake::NodeWake::new().map(std::sync::Arc::new),
1027            #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1028            wake_ctx: None,
1029            #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1030            has_async_wake: false,
1031            // Phase 141.A.3 — alloc-mode wake state init. Constructed
1032            // eagerly (NodeWake allocation) so the runtime cb can be
1033            // installed lazily on first session without a fallible
1034            // alloc inside spin_once. `None` when the platform
1035            // provider reports the primitive unavailable (matches
1036            // the std-RTOS path's `node_wake: Option<...>`).
1037            #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1038            wake_flag_alloc: portable_atomic_util::Arc::new(portable_atomic::AtomicBool::new(
1039                false,
1040            )),
1041            #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1042            node_wake_alloc: super::node_wake::NodeWake::new().map(portable_atomic_util::Arc::new),
1043            #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1044            wake_ctx_alloc: None,
1045            #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1046            has_async_wake_alloc: false,
1047            #[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
1048            signal_fd: None,
1049            #[cfg(feature = "param-services")]
1050            params: None,
1051            #[cfg(feature = "lifecycle-services")]
1052            lifecycle: None,
1053            #[cfg(feature = "std")]
1054            spin_residual_us: 0,
1055            #[cfg(not(feature = "std"))]
1056            spin_residual_us: 0,
1057            // Initialise the spin endpoint to construction time so the
1058            // very first `spin_once` credits time the caller spent
1059            // *before* it (e.g. setup, an explicit pre-spin sleep) just
1060            // like time spent between later calls.
1061            #[cfg(feature = "std")]
1062            last_spin_end: Some(std::time::Instant::now()),
1063            #[cfg(not(feature = "std"))]
1064            last_spin_end_us: None,
1065            #[cfg(not(feature = "std"))]
1066            clock_us_fn: None,
1067        }
1068    }
1069
1070    /// Create an executor from a borrowed session pointer.
1071    ///
1072    /// # Safety
1073    /// - `session_ptr` must point to a valid, initialized session that lives at
1074    ///   least as long as this executor.
1075    /// - The caller must not move or drop the session while the executor exists.
1076    // See `from_session` for the lint rationale.
1077    #[allow(clippy::large_stack_arrays)]
1078    pub unsafe fn from_session_ptr(session_ptr: *mut session::ConcreteSession) -> Self {
1079        Self {
1080            session: SessionStore::Borrowed(session_ptr),
1081            arena: [MaybeUninit::uninit(); crate::config::ARENA_SIZE],
1082            arena_used: 0,
1083            entries: [None; crate::config::MAX_CBS],
1084            sched_contexts: {
1085                let mut s = [None; crate::config::MAX_SC];
1086                s[0] = Some(super::sched_context::SchedContext::default());
1087                s
1088            },
1089            sched_context_bindings: [super::sched_context::SchedContextId(0);
1090                crate::config::MAX_CBS],
1091            sporadic_states: [None; crate::config::MAX_SC],
1092            #[cfg(feature = "alloc")]
1093            sporadic_atomic_states: [const { None }; crate::config::MAX_SC],
1094            major_frame_us: 0,
1095            #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
1096            os_priority_workers: std::collections::HashMap::new(),
1097            #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
1098            os_priority_apply_policy: None,
1099            trigger: Trigger::Any,
1100            semantics: ExecutorSemantics::RclcppExecutor,
1101            node_name: heapless::String::new(),
1102            active_groups: None,
1103            nodes: heapless::Vec::new(),
1104            dispatch_slots: heapless::Vec::new(),
1105            component_slots: heapless::Vec::new(),
1106            extra_sessions: heapless::Vec::new(),
1107            primary_rmw_name: heapless::String::new(),
1108            primary_locator: heapless::String::new(),
1109            namespace: {
1110                let mut ns = heapless::String::new();
1111                let _ = ns.push_str("/");
1112                ns
1113            },
1114            #[cfg(feature = "std")]
1115            halt_flag: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1116            #[cfg(feature = "std")]
1117            wake_flag: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
1118            #[cfg(feature = "std")]
1119            wake_cv: std::sync::Arc::new(std::sync::Condvar::new()),
1120            #[cfg(feature = "std")]
1121            wake_mu: std::sync::Arc::new(std::sync::Mutex::new(())),
1122            #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1123            node_wake: super::node_wake::NodeWake::new().map(std::sync::Arc::new),
1124            #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1125            wake_ctx: None,
1126            #[cfg(all(feature = "std", feature = "rmw-cffi"))]
1127            has_async_wake: false,
1128            // Phase 141.A.3 — alloc-mode wake state init. Constructed
1129            // eagerly (NodeWake allocation) so the runtime cb can be
1130            // installed lazily on first session without a fallible
1131            // alloc inside spin_once. `None` when the platform
1132            // provider reports the primitive unavailable (matches
1133            // the std-RTOS path's `node_wake: Option<...>`).
1134            #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1135            wake_flag_alloc: portable_atomic_util::Arc::new(portable_atomic::AtomicBool::new(
1136                false,
1137            )),
1138            #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1139            node_wake_alloc: super::node_wake::NodeWake::new().map(portable_atomic_util::Arc::new),
1140            #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1141            wake_ctx_alloc: None,
1142            #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
1143            has_async_wake_alloc: false,
1144            #[cfg(all(feature = "signal-fd-wake", target_os = "linux"))]
1145            signal_fd: None,
1146            #[cfg(feature = "param-services")]
1147            params: None,
1148            #[cfg(feature = "lifecycle-services")]
1149            lifecycle: None,
1150            #[cfg(feature = "std")]
1151            spin_residual_us: 0,
1152            #[cfg(not(feature = "std"))]
1153            spin_residual_us: 0,
1154            // Initialise the spin endpoint to construction time so the
1155            // very first `spin_once` credits time the caller spent
1156            // *before* it (e.g. setup, an explicit pre-spin sleep) just
1157            // like time spent between later calls.
1158            #[cfg(feature = "std")]
1159            last_spin_end: Some(std::time::Instant::now()),
1160            #[cfg(not(feature = "std"))]
1161            last_spin_end_us: None,
1162            #[cfg(not(feature = "std"))]
1163            clock_us_fn: None,
1164        }
1165    }
1166
1167    /// Phase 228.B (RFC-0015) — construct a tier task's executor that **shares**
1168    /// a session opened once by the orchestration `main()`.
1169    ///
1170    /// In the per-tier execution model `main()` opens one RMW session, then
1171    /// spawns one RTOS task per priority tier; each task calls this to get an
1172    /// [`Executor`] over the *same* session (the `Borrowed` session store — this
1173    /// executor neither owns nor closes it), registers its tier's callback
1174    /// groups, and spins. Thin alias over [`Executor::from_session_ptr`].
1175    ///
1176    /// # Safety
1177    /// `session` must outlive every executor/task built from it (the
1178    /// orchestration `main()` holds it and never returns / WFIs), and must not
1179    /// be mutated except through these executors' spin calls.
1180    pub unsafe fn open_with_session(session: *mut session::ConcreteSession) -> Self {
1181        unsafe { Self::from_session_ptr(session) }
1182    }
1183
1184    /// Raw pointer to this executor's RMW session, for the per-tier model:
1185    /// the boot task opens the one session via [`Executor::open`] (the RMW
1186    /// session is a process-wide singleton — opening twice fails), then hands
1187    /// this pointer to each spawned tier task's
1188    /// [`Executor::open_with_session`]. The boot task's executor owns the
1189    /// session and outlives every borrower, so the pointer stays valid for the
1190    /// program's life. Works for both `Owned` and `Borrowed` stores.
1191    ///
1192    /// # Safety
1193    /// The returned pointer aliases `self.session`. Callers must keep `self`
1194    /// alive (not moved/dropped) for as long as any tier executor uses the
1195    /// pointer, and must only touch the session through executor spin calls
1196    /// (the RMW backend serializes concurrent access through its own locks).
1197    pub fn session_ptr(&mut self) -> *mut session::ConcreteSession {
1198        &mut *self.session as *mut session::ConcreteSession
1199    }
1200
1201    /// Opaque, `Send` form of [`session_ptr`](Self::session_ptr) — the per-tier
1202    /// model hands this to each spawned tier task (it can cross the RTOS task /
1203    /// thread boundary, which a bare `*mut` cannot). See [`SessionHandle`].
1204    ///
1205    /// # Safety
1206    /// Same contract as [`session_ptr`](Self::session_ptr): `self` (the session
1207    /// owner) must outlive every executor built from the handle.
1208    pub fn session_handle(&mut self) -> SessionHandle {
1209        SessionHandle(self.session_ptr())
1210    }
1211
1212    /// Open an [`Executor`] over the session a [`SessionHandle`] refers to (the
1213    /// `Borrowed` store — neither owns nor closes it). The tier-task counterpart
1214    /// to [`session_handle`](Self::session_handle).
1215    ///
1216    /// # Safety
1217    /// The handle's session must still be alive (its owning executor not moved
1218    /// or dropped); access only through executor spin calls.
1219    pub unsafe fn open_with_session_handle(handle: SessionHandle) -> Self {
1220        unsafe { Self::open_with_session(handle.0) }
1221    }
1222
1223    /// Phase 228.C — set this tier executor's active callback-group filter. The
1224    /// generated per-tier task calls this before registering nodes; afterwards
1225    /// only callbacks whose `.callback_group()` is in `groups` register here.
1226    /// An empty slice (or never calling it) leaves the wildcard — register all
1227    /// callbacks (the single-tier degenerate case + today's behaviour).
1228    pub fn set_active_groups(&mut self, groups: &[&str]) {
1229        if groups.is_empty() {
1230            self.active_groups = None;
1231            return;
1232        }
1233        let mut v = heapless::Vec::new();
1234        for g in groups {
1235            let mut s = heapless::String::new();
1236            if s.push_str(g).is_ok() {
1237                let _ = v.push(s);
1238            }
1239        }
1240        self.active_groups = Some(v);
1241    }
1242
1243    /// Phase 228.C — whether a callback in `group` should register in this
1244    /// executor under the current filter. Wildcard (`None`) accepts everything.
1245    pub fn group_active(&self, group: &str) -> bool {
1246        group_filter_accepts(&self.active_groups, group)
1247    }
1248
    /// Phase 156 — record the primary session's backend identity
    /// (rmw name + locator) so `NodeBuilder::resolve_session_slot`
    /// can detect when a `.rmw(name)` matches the primary instead
    /// of opening a SECOND backend session against the same
    /// singleton (zenoh-pico's `g_session` is process-wide;
    /// opening twice fails). `Executor::open*` calls this
    /// automatically; the C surface (`nros_executor_init`) calls
    /// it manually because it constructs via `from_session_ptr`
    /// which doesn't know the open metadata. Empty strings = "no
    /// primary identity tracked"; the cache check degrades to
    /// always-miss.
    pub fn set_primary_identity(&mut self, rmw_name: &str, locator: &str) {
        // Both fields are overwritten unconditionally. The `push_str`
        // results are deliberately discarded (best-effort).
        self.primary_rmw_name.clear();
        let _ = self.primary_rmw_name.push_str(rmw_name);
        self.primary_locator.clear();
        let _ = self.primary_locator.push_str(locator);
    }

    /// Set the node name and namespace used for liveliness tokens.
    ///
    /// Called by `open()` to propagate config values. When `register_subscription`
    /// or `register_service` creates entities, these values are attached to the
    /// `TopicInfo`/`ServiceInfo` so the zenoh backend can declare liveliness.
    ///
    /// The node name is always replaced. An empty `namespace` leaves the
    /// currently stored namespace untouched.
    pub fn set_node_identity(&mut self, node_name: &str, namespace: &str) {
        self.node_name.clear();
        let _ = self.node_name.push_str(node_name);
        // Only a non-empty namespace overrides the stored one.
        if !namespace.is_empty() {
            self.namespace.clear();
            let _ = self.namespace.push_str(namespace);
        }
    }
1280
1281    // =========================================================================
1282    // Phase 110.B — SchedContext API
1283    // =========================================================================
1284
    /// Identifier of the auto-created default `Fifo`-class scheduling
    /// context. Every callback registered without an explicit
    /// [`bind_handle_to_sched_context`] binds to this SC.
    ///
    /// Always returns the id wrapping slot `0`; it does not read any
    /// executor state.
    pub fn default_sched_context_id(&self) -> super::sched_context::SchedContextId {
        super::sched_context::SchedContextId(0)
    }
1291
1292    /// Register a new scheduling context. Returns a [`SchedContextId`]
1293    /// callers pass to [`bind_handle_to_sched_context`] to attach
1294    /// callbacks. Phase 110.B.
1295    pub fn create_sched_context(
1296        &mut self,
1297        sc: super::sched_context::SchedContext,
1298    ) -> Result<super::sched_context::SchedContextId, NodeError> {
1299        // Slot 0 is reserved for the default Fifo SC; search 1..MAX_SC.
1300        for (i, slot) in self.sched_contexts.iter_mut().enumerate().skip(1) {
1301            if slot.is_none() {
1302                *slot = Some(sc);
1303                // Phase 110.E — Sporadic-class SCs get a sibling
1304                // `SporadicState` entry that the spin_once dispatch
1305                // path consults each cycle to refill the budget at
1306                // period boundaries and skip dispatch when budget
1307                // is exhausted.
1308                if matches!(sc.class, super::sched_context::SchedClass::Sporadic) {
1309                    let budget = sc.budget_us.get().map(|nz| nz.get()).unwrap_or(u32::MAX);
1310                    let period = sc.period_us.get().map(|nz| nz.get()).unwrap_or(u32::MAX);
1311                    self.sporadic_states[i] =
1312                        Some(super::sched_context::SporadicState::new(budget, period));
1313                }
1314                return Ok(super::sched_context::SchedContextId(i as u8));
1315            }
1316        }
1317        Err(NodeError::NoSchedContextSlot)
1318    }
1319
1320    /// Bind a registered callback to a scheduling context. The next
1321    /// `spin_once` cycle dispatches the callback through that SC's
1322    /// queue (FIFO bitmap or EDF heap). Phase 110.B.
1323    pub fn bind_handle_to_sched_context(
1324        &mut self,
1325        handle: HandleId,
1326        sc_id: super::sched_context::SchedContextId,
1327    ) -> Result<(), NodeError> {
1328        let i = handle.0;
1329        if i >= crate::config::MAX_CBS {
1330            return Err(NodeError::InvalidSchedContextBinding);
1331        }
1332        if self.entries[i].is_none() {
1333            return Err(NodeError::InvalidSchedContextBinding);
1334        }
1335        let sc_idx = sc_id.0 as usize;
1336        if sc_idx >= crate::config::MAX_SC || self.sched_contexts[sc_idx].is_none() {
1337            return Err(NodeError::InvalidSchedContextBinding);
1338        }
1339        self.sched_context_bindings[i] = sc_id;
1340        Ok(())
1341    }
1342
    /// Phase 110.F — opt in to per-callback OS-priority dispatch.
    /// Once registered, every `spin_once` cycle routes ready entries
    /// whose bound SC has `os_pri > 0` onto a worker thread the OS
    /// scheduler has elevated to that numeric priority. Workers are
    /// spawned lazily on first use and self-halt when the Executor
    /// drops.
    ///
    /// `apply_policy` is the same `fn(SchedPolicy) -> Result<(),
    /// SchedError>` shape `open_threaded` takes — keeps the
    /// Executor non-generic over Platform.
    ///
    /// Calling this with `apply_policy = noop` is fine for testing
    /// (workers spawn but don't actually elevate priority); real
    /// hard-RT use needs `CAP_SYS_NICE` on Linux or the equivalent
    /// kernel config on RTOSes.
    ///
    /// Calling it again replaces the previously stored hook.
    #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
    pub fn register_os_priority_dispatcher(
        &mut self,
        apply_policy: fn(
            nros_platform_api::SchedPolicy,
        ) -> Result<(), nros_platform_api::SchedError>,
    ) {
        // This call only stores the hook; it does not invoke it.
        self.os_priority_apply_policy = Some(apply_policy);
    }
1367
    /// Phase 110.G — enable time-triggered dispatch by setting the
    /// executor's major-frame length. Once set, every `spin_once`
    /// cycle gates dispatch through each entry's bound SC's
    /// `tt_window_offset_us` / `tt_window_duration_us` fields:
    /// dispatch only fires when the current monotonic time falls
    /// inside the window `[off, off + duration) mod major_frame`.
    ///
    /// `major_frame_us = 0` disables the TT gate (default state).
    /// Setting a non-zero major frame after callbacks are already
    /// registered is allowed — TT gates take effect on the next
    /// `spin_once` cycle.
    ///
    /// The value is stored as given; this method performs no validation.
    pub fn register_time_triggered_dispatcher(&mut self, major_frame_us: u32) {
        self.major_frame_us = major_frame_us;
    }
1382
1383    /// Phase 110.G — apply a declarative cyclic schedule.
1384    ///
1385    /// One-shot helper that wraps the underlying primitives:
1386    /// validates the schedule (`major_frame > 0`, no overlapping
1387    /// windows, every window fits inside the major frame), sets the
1388    /// executor's major-frame length, then materialises one
1389    /// `SchedContext` per window with `class = TimeTriggered` +
1390    /// the window's offset / duration. Returns the per-window
1391    /// [`SchedContextId`] array so callers can immediately
1392    /// `bind_handle_to_sched_context(handle, sc_id)` for their
1393    /// subscription / timer handles.
1394    ///
1395    /// `N` is the schedule's *declared* maximum window count;
1396    /// `schedule.window_count` gates how many SCs are actually
1397    /// created. Unused trailing slots return
1398    /// `SchedContextId::default()` (sentinel — callers must respect
1399    /// `window_count`).
1400    pub fn apply_time_triggered_schedule<const N: usize>(
1401        &mut self,
1402        schedule: &super::sched_context::TimeTriggeredSchedule<N>,
1403    ) -> Result<
1404        [super::sched_context::SchedContextId; N],
1405        super::sched_context::TimeTriggeredScheduleError,
1406    > {
1407        schedule.validate()?;
1408        self.major_frame_us = schedule.major_frame_us;
1409        // SC slot 0 is the auto-created default; reusing it as a
1410        // sentinel for unused trailing slots is safe because the
1411        // caller respects `schedule.window_count`.
1412        let mut ids: [super::sched_context::SchedContextId; N] =
1413            [super::sched_context::SchedContextId(0); N];
1414        for (i, window) in schedule.windows[..schedule.window_count].iter().enumerate() {
1415            // Deprecation note on `SchedClass::TimeTriggered`: TT
1416            // is implemented as a per-SC *window gate* on top of
1417            // the existing class-based dispatch (Fifo here keeps
1418            // the EDF / Sporadic budgets out of the picture for
1419            // pure cyclic schedules). The window-gate fields set
1420            // below are what `spin_once`'s 110.G runtime gate
1421            // actually reads.
1422            let sc = super::sched_context::SchedContext {
1423                tt_window_offset_us: super::sched_context::OptUs::from_us(window.offset_us),
1424                tt_window_duration_us: super::sched_context::OptUs::from_us(window.duration_us),
1425                ..super::sched_context::SchedContext::new_fifo()
1426            };
1427            ids[i] = self.create_sched_context(sc).map_err(|_| {
1428                super::sched_context::TimeTriggeredScheduleError::WindowCountOverflow
1429            })?;
1430        }
1431        Ok(ids)
1432    }
1433
1434    /// Phase 110.E.b — register an ISR-driven refill timer for an
1435    /// already-created Sporadic SC. The caller invokes their
1436    /// platform's `PlatformTimer::create_periodic` with the returned
1437    /// `Arc<AtomicSporadicState>` as `user_data` and the
1438    /// `atomic_sporadic_refill_thunk` as the callback, then hands
1439    /// the resulting platform handle to this method via
1440    /// `OpaqueTimerHandle::new(handle, destroy_fn)`.
1441    ///
1442    /// The Executor stores both the Arc and the handle so Drop can
1443    /// clean them up. Calling this on a non-Sporadic SC returns
1444    /// `Err(InvalidSchedContextBinding)`.
1445    #[cfg(feature = "alloc")]
1446    pub fn register_sporadic_timer(
1447        &mut self,
1448        sc_id: super::sched_context::SchedContextId,
1449        timer: OpaqueTimerHandle,
1450    ) -> Result<portable_atomic_util::Arc<super::sched_context::AtomicSporadicState>, NodeError>
1451    {
1452        let i = sc_id.0 as usize;
1453        if i >= crate::config::MAX_SC {
1454            return Err(NodeError::InvalidSchedContextBinding);
1455        }
1456        let sc = self.sched_contexts[i]
1457            .as_ref()
1458            .ok_or(NodeError::InvalidSchedContextBinding)?;
1459        if !matches!(sc.class, super::sched_context::SchedClass::Sporadic) {
1460            return Err(NodeError::InvalidSchedContextBinding);
1461        }
1462        let budget = sc.budget_us.get().map(|nz| nz.get()).unwrap_or(u32::MAX);
1463        let period = sc.period_us.get().map(|nz| nz.get()).unwrap_or(u32::MAX);
1464        let state = portable_atomic_util::Arc::new(super::sched_context::AtomicSporadicState::new(
1465            budget, period,
1466        ));
1467        self.sporadic_atomic_states[i] = Some((portable_atomic_util::Arc::clone(&state), timer));
1468        Ok(state)
1469    }
1470
1471    /// Inspect a registered scheduling context. Phase 110.B.
1472    pub fn sched_context(
1473        &self,
1474        sc_id: super::sched_context::SchedContextId,
1475    ) -> Option<&super::sched_context::SchedContext> {
1476        self.sched_contexts.get(sc_id.0 as usize)?.as_ref()
1477    }
1478
    /// Phase 104.C.2 — start a rclcpp-style Node builder for this
    /// Executor. The returned [`NodeBuilder`](super::node_record::NodeBuilder)
    /// is chainable:
    ///
    /// ```ignore
    /// let id = exec.node_builder("ingress")
    ///     .rmw("zenoh")
    ///     .locator("tcp/127.0.0.1:7447")
    ///     .sched(my_sc_id)
    ///     .build()?;
    /// ```
    ///
    /// In Phase 104.C.2 the Node table is storage-only — all
    /// registered Nodes share the Executor's primary session. Per-
    /// Node session binding (the bridge feature) lands in Phase
    /// 104.C.3 when the session cache is wired.
    ///
    /// The builder mutably borrows the executor for `'a` and borrows
    /// `name` for `'cfg`. Nothing is registered by this call itself.
    pub fn node_builder<'a, 'cfg>(
        &'a mut self,
        name: &'cfg str,
    ) -> super::node_record::NodeBuilder<'a, 'cfg> {
        // Every optional setting starts unset.
        super::node_record::NodeBuilder {
            executor: self,
            name,
            namespace: None,
            rmw_name: None,
            locator: None,
            domain_id: None,
            sched: None,
            session_idx: None,
        }
    }
1510
    /// Return the Node table — Phase 104.C.2 read accessor.
    ///
    /// The slice borrows the executor's `nodes` storage.
    pub fn nodes(&self) -> &[super::node_record::NodeRecord] {
        &self.nodes
    }
1515
    /// Borrow a Node's metadata by id, returning `None` if the id
    /// is out of range.
    ///
    /// The lookup position is `id.index()` within the Node table.
    pub fn node(&self, id: super::node_record::NodeId) -> Option<&super::node_record::NodeRecord> {
        self.nodes.get(id.index())
    }
1521
    /// Phase 189.M1 — an executor-borrowing node handle for the entity builders
    /// (`exec.node_mut(id).subscription(t)...` / `.create_subscription(...)`).
    /// A short-lived `&mut Executor` borrow — use one at a time; entity handles
    /// are owned and outlive it (see `NodeCtx`).
    ///
    /// `id` is not validated here; it is passed straight to `NodeCtx::new`.
    pub fn node_mut(&mut self, id: super::node_record::NodeId) -> super::node::NodeCtx<'_> {
        super::node::NodeCtx::new(self, id)
    }
1529
1530    /// Phase 104.C.3 — resolve a session-slot index to a mutable
1531    /// session reference. Slot 0 = the Executor's primary session;
1532    /// slots 1..=N = the `extra_sessions` vec opened by
1533    /// `node_builder.rmw(name)` calls that named a backend
1534    /// different from the primary.
1535    pub(crate) fn session_at_mut(&mut self, idx: u8) -> Option<&mut session::ConcreteSession> {
1536        if idx == 0 {
1537            Some(&mut *self.session)
1538        } else {
1539            self.extra_sessions.get_mut((idx - 1) as usize)
1540        }
1541    }
1542
1543    /// Phase 104.C.9.b — resolve the per-Node session for direct
1544    /// entity creation paths (C++ FFI publisher / subscription /
1545    /// service that bypass the `register_*_on` arena dispatch).
1546    /// Returns `None` when `node_id` is out of range or the Node's
1547    /// `session_idx` lands outside the executor's session table.
1548    pub fn node_session_mut(
1549        &mut self,
1550        node_id: super::node_record::NodeId,
1551    ) -> Option<&mut session::ConcreteSession> {
1552        let session_idx = self.nodes.get(node_id.index())?.session_idx;
1553        self.session_at_mut(session_idx)
1554    }
1555
    /// Phase 189.M1 — create a typed publisher bound to a node's session.
    /// Backs `node.publisher(t).typed::<M>().build()` on the
    /// executor-borrowing [`NodeCtx`](super::node::NodeCtx); the returned
    /// handle is owned and outlives the `NodeCtx`.
    ///
    /// The topic's type name and type hash come from `M`'s `RosMessage`
    /// constants.
    ///
    /// # Errors
    ///
    /// Propagates any error from `register_type::<M>()` and from
    /// `create_raw_publisher_handle_on`.
    pub fn create_publisher_on<M: crate::cyclonedds_register::MessageForRmw>(
        &mut self,
        node_id: super::node_record::NodeId,
        topic_name: &str,
        qos: QosSettings,
    ) -> Result<crate::executor::handles::EmbeddedPublisher<M>, NodeError> {
        // Phase 212.K.7.6.b — register `M`'s cyclonedds descriptor before
        // creating the underlying publisher handle. No-op for other RMWs.
        crate::cyclonedds_register::register_type::<M>()?;
        let handle = self.create_raw_publisher_handle_on(
            node_id,
            topic_name,
            <M as RosMessage>::TYPE_NAME,
            <M as RosMessage>::TYPE_HASH,
            qos,
        )?;
        // The publisher starts with the empty event-registration set.
        Ok(crate::executor::handles::EmbeddedPublisher {
            handle,
            event_regs: crate::executor::handles::empty_event_regs(),
            _phantom: PhantomData,
        })
    }
1582
    /// Phase 189.M1 — create a generic (type-erased) publisher bound to a
    /// node's session. Backs `node.publisher(t).generic(ty, hash).build()`;
    /// the bridge re-publishes through this handle on the dest session.
    ///
    /// Unlike the typed variant, `type_name` and `type_hash` are supplied
    /// by the caller rather than taken from a message type.
    ///
    /// # Errors
    ///
    /// Propagates any error from `create_raw_publisher_handle_on`.
    pub fn create_publisher_raw_on(
        &mut self,
        node_id: super::node_record::NodeId,
        topic_name: &str,
        type_name: &str,
        type_hash: &str,
        qos: QosSettings,
    ) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
        let handle =
            self.create_raw_publisher_handle_on(node_id, topic_name, type_name, type_hash, qos)?;
        // The raw publisher gets a fresh `TxArena` and the empty
        // event-registration set.
        Ok(crate::executor::handles::EmbeddedRawPublisher {
            handle,
            arena: crate::executor::handles::TxArena::new(),
            event_regs: crate::executor::handles::empty_event_regs(),
        })
    }
1602
1603    /// Shared prelude for the publisher-on-node paths: resolve the node's
1604    /// identity + session slot, build the [`TopicInfo`], validate QoS, and
1605    /// create the backend publisher handle. Mirrors
1606    /// `register_subscription_buffered_raw_on`'s session resolution so a
1607    /// bridge's source sub + dest pub agree on topic construction.
1608    fn create_raw_publisher_handle_on(
1609        &mut self,
1610        node_id: super::node_record::NodeId,
1611        topic_name: &str,
1612        type_name: &str,
1613        type_hash: &str,
1614        qos: QosSettings,
1615    ) -> Result<session::RmwPublisher, NodeError> {
1616        let (node_name, ns, session_idx) = {
1617            let r = self
1618                .nodes
1619                .get(node_id.index())
1620                .ok_or(NodeError::InvalidSchedContextBinding)?;
1621            (r.name.clone(), r.namespace.clone(), r.session_idx)
1622        };
1623        let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
1624        if !node_name.is_empty() {
1625            topic = topic.with_node_name(&node_name);
1626        }
1627        let session = self
1628            .session_at_mut(session_idx)
1629            .ok_or(NodeError::BackendMismatch)?;
1630        qos.validate_against(Session::supported_qos_policies(session))
1631            .map_err(NodeError::Transport)?;
1632        session
1633            .create_publisher(&topic, qos)
1634            .map_err(|_| NodeError::Transport(TransportError::PublisherCreationFailed))
1635    }
1636
    /// Phase 124.B.1 — install the executor's wake callback onto the
    /// primary session. Best-effort: backends that don't override
    /// `Session::set_wake_callback` (poll-only XRCE, bare-metal)
    /// ignore the call and continue to be drained on the executor's
    /// deadline-bound cv-wait boundary.
    ///
    /// `has_async_wake` is set only when the session reports
    /// `supports_wake_callback()`; it is never cleared here.
    #[cfg(all(feature = "std", feature = "rmw-cffi"))]
    fn install_wake_signal_on_primary(&mut self) {
        use nros_rmw::Session as _;
        // Obtains the context pointer, creating the wake context on first use.
        let ctx = self.wake_ctx_ptr();
        // SAFETY: `ctx` points at executor-owned wake state that outlives
        // the session callback installation and is cleared on executor drop.
        unsafe {
            self.session
                .set_wake_callback(Some(nros_rmw_runtime_wake_cb), ctx);
        }
        if self.session.supports_wake_callback() {
            self.has_async_wake = true;
        }
    }
1656
    /// Phase 124.B.1 — install the wake callback onto an extra
    /// session opened by `node_builder.rmw(...)`. Called from
    /// `NodeBuilder::build()` right after `extra_sessions.push(...)`.
    ///
    /// `idx` is the zero-based position within `extra_sessions`. An
    /// out-of-range `idx` installs nothing, although the wake context is
    /// still created on first use.
    #[cfg(all(feature = "std", feature = "rmw-cffi"))]
    pub(crate) fn install_wake_signal_on_extra(&mut self, idx: usize) {
        use nros_rmw::Session as _;
        let ctx = self.wake_ctx_ptr();
        if let Some(s) = self.extra_sessions.get_mut(idx) {
            // SAFETY: same executor-owned wake state as the primary session;
            // the extra session is owned by this executor.
            unsafe {
                s.set_wake_callback(Some(nros_rmw_runtime_wake_cb), ctx);
            }
            // Record async-wake capability only when the backend reports it.
            if s.supports_wake_callback() {
                self.has_async_wake = true;
            }
        }
    }
1675
    /// Phase 124.B.7.c — POSIX signal-handler-safe wake fd.
    ///
    /// Returns a Linux `eventfd` that callers (typically POSIX
    /// signal handlers) can `write(fd, &1u64, 8)` to from any
    /// context, including signal handlers. A runtime-owned worker
    /// thread reads the fd and signals `wake_cv`, unblocking
    /// `spin_once`.
    ///
    /// The worker thread is spawned lazily on first call and
    /// joined on Executor drop. Linux-only and gated behind
    /// `feature = "signal-fd-wake"`; binaries that don't install
    /// signal handlers shouldn't enable it.
    ///
    /// Returns the raw fd. The Executor retains ownership; do not
    /// `close()` it from the caller.
    ///
    /// # Errors
    ///
    /// Propagates the `std::io::Error` from `WakeSignalFd::new` when the
    /// fd wrapper has to be created and that fails.
    #[cfg(all(feature = "signal-fd-wake", feature = "rmw-cffi", target_os = "linux"))]
    pub fn signal_fd(&mut self) -> std::io::Result<core::ffi::c_int> {
        let ctx_ptr = self.wake_ctx_ptr() as *const WakeCtx;
        // Created on the first call only; later calls reuse the stored wrapper.
        if self.signal_fd.is_none() {
            self.signal_fd = Some(WakeSignalFd::new(ctx_ptr)?);
        }
        Ok(self.signal_fd.as_ref().expect("just set").fd())
    }

    /// Phase 124.B.2 — opaque context pointer the runtime wake
    /// callback receives. Encodes `(flag, mu, cv)` as a borrowed
    /// `&WakeCtx` reference; the callback decodes via
    /// `*const WakeCtx`.
    ///
    /// Lifetime: tied to the Executor instance. WakeCtx storage
    /// lives inside Executor (lazy-allocated on first install), so
    /// the pointer stays valid as long as the Executor is.
    #[cfg(all(feature = "std", feature = "rmw-cffi"))]
    fn wake_ctx_ptr(&mut self) -> *mut core::ffi::c_void {
        // First call: build the context from clones of the executor's wake
        // flag / condvar / mutex Arcs, plus the optional node wake.
        if self.wake_ctx.is_none() {
            self.wake_ctx = Some(std::sync::Arc::new(WakeCtx {
                flag: std::sync::Arc::clone(&self.wake_flag),
                cv: std::sync::Arc::clone(&self.wake_cv),
                mu: std::sync::Arc::clone(&self.wake_mu),
                node_wake: self.node_wake.as_ref().map(std::sync::Arc::clone),
            }));
        }
        let arc = self.wake_ctx.as_ref().expect("just set");
        // `as_ptr` does not change the reference count; the executor's own
        // `wake_ctx` Arc is what keeps the pointee alive.
        std::sync::Arc::as_ptr(arc) as *mut core::ffi::c_void
    }
1721
    // Phase 141.A.3 — alloc-mode (no_std RTOS) mirror of
    // `install_wake_signal_on_primary` /
    // `install_wake_signal_on_extra` / `wake_ctx_ptr`. Same
    // best-effort install contract: backends that don't override
    // `Session::set_wake_callback` ignore the call and continue
    // to be drained on the executor's deadline-bound wait.
    //
    // `wake_ctx_alloc_ptr` returns the opaque context pointer for the
    // alloc-mode wake callback, or `None` when no `node_wake_alloc` is set.
    // The context is created on first use and then reused.
    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
    fn wake_ctx_alloc_ptr(&mut self) -> Option<*mut core::ffi::c_void> {
        // Without a NodeWake there's no kernel primitive to signal;
        // skip the install + let spin_once fall back to drive_io
        // for the full timeout. Mirrors the std-RTOS path's
        // `if let Some(wake) = self.node_wake.as_ref()` predicate.
        let node_wake = self.node_wake_alloc.as_ref()?;
        if self.wake_ctx_alloc.is_none() {
            self.wake_ctx_alloc = Some(portable_atomic_util::Arc::new(
                super::wake_alloc::WakeCtxAlloc {
                    flag: portable_atomic_util::Arc::clone(&self.wake_flag_alloc),
                    node_wake: portable_atomic_util::Arc::clone(node_wake),
                },
            ));
        }
        let arc = self.wake_ctx_alloc.as_ref().expect("just set");
        // The executor's own `wake_ctx_alloc` Arc keeps the pointee alive;
        // `as_ptr` only exposes its address.
        Some(portable_atomic_util::Arc::as_ptr(arc) as *mut core::ffi::c_void)
    }
1746
    /// Alloc-mode (`alloc` without `std`) counterpart of
    /// `install_wake_signal_on_primary`: installs the `wake_alloc` runtime
    /// wake callback onto the primary session.
    ///
    /// Does nothing when no wake context is available (that is, when
    /// `wake_ctx_alloc_ptr` returns `None`). `has_async_wake_alloc` is set
    /// only when the session reports `supports_wake_callback()`.
    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
    fn install_wake_signal_on_primary_alloc(&mut self) {
        use nros_rmw::Session as _;
        let Some(ctx) = self.wake_ctx_alloc_ptr() else {
            return;
        };
        // SAFETY: `ctx` is the raw pointer of an Arc<WakeCtxAlloc>
        // owned by the Executor (`self.wake_ctx_alloc`); the Arc
        // lives as long as the Executor and is cleared on drop.
        unsafe {
            self.session
                .set_wake_callback(Some(super::wake_alloc::nros_rmw_runtime_wake_cb), ctx);
        }
        if self.session.supports_wake_callback() {
            self.has_async_wake_alloc = true;
        }
    }
1764
    /// Alloc-mode (`alloc` without `std`) counterpart of
    /// `install_wake_signal_on_extra`: installs the `wake_alloc` runtime
    /// wake callback onto the extra session at position `idx` of
    /// `extra_sessions`.
    ///
    /// Does nothing when no wake context is available or when `idx` is out
    /// of range. `has_async_wake_alloc` is set only when the session
    /// reports `supports_wake_callback()`.
    #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
    pub(crate) fn install_wake_signal_on_extra_alloc(&mut self, idx: usize) {
        use nros_rmw::Session as _;
        let Some(ctx) = self.wake_ctx_alloc_ptr() else {
            return;
        };
        if let Some(s) = self.extra_sessions.get_mut(idx) {
            // SAFETY: same wake state as the primary session; the
            // extra session is owned by this Executor.
            unsafe {
                s.set_wake_callback(Some(super::wake_alloc::nros_rmw_runtime_wake_cb), ctx);
            }
            if s.supports_wake_callback() {
                self.has_async_wake_alloc = true;
            }
        }
    }
1782
1783    /// Phase 104.C.4 — apply a Node's default SchedContext to a
1784    /// freshly-registered handle. Called from every `_inner`
1785    /// register variant after the entry slot is committed. No-op
1786    /// when `node_id` is None (legacy path), when the Node is
1787    /// out of range, or when the Node's `default_sched` is the
1788    /// auto-created Fifo slot (0) which matches the executor's
1789    /// default binding already.
1790    ///
1791    /// Handles can still override per-call via
1792    /// `bind_handle_to_sched_context(handle, sc_id)` post-register.
1793    pub(crate) fn apply_node_default_sched(
1794        &mut self,
1795        slot: usize,
1796        node_id: Option<super::node_record::NodeId>,
1797    ) {
1798        let Some(id) = node_id else { return };
1799        let Some(rec) = self.nodes.get(id.index()) else {
1800            return;
1801        };
1802        let sc = rec.default_sched;
1803        if sc.0 == 0 {
1804            return;
1805        }
1806        if slot >= crate::config::MAX_CBS {
1807            return;
1808        }
1809        let sc_idx = sc.0 as usize;
1810        if sc_idx >= crate::config::MAX_SC || self.sched_contexts[sc_idx].is_none() {
1811            return;
1812        }
1813        self.sched_context_bindings[slot] = sc;
1814    }
1815
    /// Phase 104.C.3.3.d — flat-Result variant of
    /// [`with_node`](Self::with_node). When the closure already
    /// returns `Result<R, NodeError>`, this avoids the double-`?`:
    ///
    /// ```ignore
    /// // Without `with_node_try`:
    /// let pub_ = exec.with_node(id, |n| n.create_publisher(...))??;
    ///
    /// // With `with_node_try`:
    /// let pub_ = exec.with_node_try(id, |n| n.create_publisher(...))?;
    /// ```
    pub fn with_node_try<R>(
        &mut self,
        id: super::node_record::NodeId,
        f: impl FnOnce(&mut NodeHandle<'_>) -> Result<R, NodeError>,
    ) -> Result<R, NodeError> {
        // `?` unwraps the outer result from `with_node`; the closure's own
        // `Result` becomes this function's return value.
        self.with_node(id, f)?
    }

    /// Phase 104.C.3.2 — scoped Node-handle access. The closure
    /// receives a [`Node`] bound to the requested [`NodeId`]'s
    /// session + identity. Use the standard `Node::create_publisher`,
    /// `create_subscription`, etc. APIs inside.
    ///
    /// rclcpp-aligned bridge pattern:
    ///
    /// ```ignore
    /// let node_in = exec.node_builder("ingress").rmw("zenoh").build()?;
    /// let node_out = exec.node_builder("egress").rmw("xrce").build()?;
    ///
    /// let pub_out = exec.with_node(node_out, |n| {
    ///     n.create_publisher::<Int32>("/fwd")
    /// })??;
    ///
    /// exec.with_node(node_in, |n| {
    ///     n.create_subscription_buffered::<Int32, _, 1024>(
    ///         "/src", qos(), move |m| { let _ = pub_out.publish(m); }
    ///     )
    /// })??;
    /// ```
    ///
    /// The closure can return any type; double-`?` unwraps the
    /// outer `Result<R, NodeError>` from `with_node` and the inner
    /// result returned by the closure.
    ///
    /// # Errors
    ///
    /// [`NodeError::InvalidSchedContextBinding`] when `id` is out of range;
    /// [`NodeError::BackendMismatch`] when the Node's session slot does not
    /// resolve to a session.
    pub fn with_node<R>(
        &mut self,
        id: super::node_record::NodeId,
        f: impl FnOnce(&mut NodeHandle<'_>) -> R,
    ) -> Result<R, NodeError> {
        // Clone the identity out of the record so the shared borrow of the
        // Node table ends before the session is borrowed mutably.
        let (name, ns, session_idx) = {
            let r = self
                .nodes
                .get(id.index())
                .ok_or(NodeError::InvalidSchedContextBinding)?;
            (r.name.clone(), r.namespace.clone(), r.session_idx)
        };
        let session = self
            .session_at_mut(session_idx)
            .ok_or(NodeError::BackendMismatch)?;
        // Short-lived scoped handle: `NodeHandle::new` takes the `&mut`
        // session borrow, and the handle is dropped when this function
        // returns, so it cannot outlive the call.
        let mut node = NodeHandle::new(name, ns, session, 0);
        Ok(f(&mut node))
    }
1881
1882    /// Find a registered executor node by final name and namespace.
1883    pub fn node_id_by_name(
1884        &self,
1885        name: &str,
1886        namespace: &str,
1887    ) -> Option<super::node_record::NodeId> {
1888        self.nodes
1889            .iter()
1890            .enumerate()
1891            .find(|(_, node)| node.name.as_str() == name && node.namespace.as_str() == namespace)
1892            .map(|(index, _)| super::node_record::NodeId::from_raw(index as u8))
1893    }
1894
1895    /// Create a node on this executor.
1896    pub fn create_node(&mut self, name: &str) -> Result<NodeHandle<'_>, NodeError> {
1897        if name.len() > 64 {
1898            return Err(NodeError::NameTooLong);
1899        }
1900
1901        let mut node_name = heapless::String::<64>::new();
1902        node_name
1903            .push_str(name)
1904            .map_err(|_| NodeError::NameTooLong)?;
1905
1906        Ok(NodeHandle::new(
1907            node_name,
1908            self.namespace.clone(),
1909            &mut self.session,
1910            0,
1911        ))
1912    }
1913
1914    /// Phase 128.F.2 — bridge-mode node factory. Registers a Node
1915    /// bound to the named RMW backend by opening (or reusing) an
1916    /// extra session via `node_builder().rmw(rmw).build()`, then
1917    /// returns a [`Node`] borrowing that session. Use when the
1918    /// binary intentionally links more than one backend and a Node
1919    /// must speak a specific one.
1920    ///
1921    /// The single-backend common case should keep using
1922    /// [`create_node`](Self::create_node) — this entry costs an
1923    /// extra session lookup and serves no purpose when only one
1924    /// backend is registered.
1925    #[cfg(feature = "rmw-cffi")]
1926    pub fn create_node_on(&mut self, name: &str, rmw: &str) -> Result<NodeHandle<'_>, NodeError> {
1927        if name.len() > 64 {
1928            return Err(NodeError::NameTooLong);
1929        }
1930        // Register the Node (opens an extra session under `rmw` if
1931        // none exists yet for that backend).
1932        let id = self.node_builder(name).rmw(rmw).build()?;
1933        let session_idx = self.node(id).ok_or(NodeError::NodeTableFull)?.session_idx;
1934
1935        let mut node_name = heapless::String::<64>::new();
1936        node_name
1937            .push_str(name)
1938            .map_err(|_| NodeError::NameTooLong)?;
1939        let namespace = self.namespace.clone();
1940        let session = self
1941            .session_at_mut(session_idx)
1942            .ok_or(NodeError::NodeTableFull)?;
1943        Ok(NodeHandle::new(node_name, namespace, session, 0))
1944    }
1945
1946    /// Drive transport I/O (poll network, dispatch callbacks).
1947    #[allow(dead_code)]
1948    pub(crate) fn drive_io(&mut self, timeout_ms: i32) -> Result<(), NodeError> {
1949        self.session
1950            .drive_io(timeout_ms)
1951            .map_err(|_| NodeError::Transport(TransportError::PollFailed))
1952    }
1953
1954    /// Close the underlying session.
1955    pub fn close(&mut self) -> Result<(), NodeError> {
1956        self.session
1957            .close()
1958            .map_err(|_| NodeError::Transport(TransportError::ConnectionFailed))
1959    }
1960
1961    /// Phase 216 follow-up — register a per-Node dispatch trampoline.
1962    ///
1963    /// The board-side Entry pkg (or the macro-emitted
1964    /// `register_dispatch(executor)` wrapper, once wired) calls this
1965    /// once per deployed Node pkg, handing in the
1966    /// `__nros_node_<pkg>_on_callback` symbol + the Node's per-pkg
1967    /// `state` blob. [`Executor::dispatch_callback`] then linear-scans
1968    /// the registered slots when the dispatch task hands off a
1969    /// `SignaledCallback`.
1970    ///
1971    /// Returns `Err(())` when the registry is full (`MAX_NODES`
1972    /// entries — raise via `NROS_EXECUTOR_MAX_NODES` at build time).
1973    ///
1974    /// # Safety
1975    ///
1976    /// `state` must outlive the executor (the typical shape is a
1977    /// `*mut State` produced by
1978    /// `nros::__private_node_state_into_raw` from the
1979    /// macro-emitted `i()`; that pointer's lifetime IS the
1980    /// `Executor`'s by construction). `on_callback` must be safe to
1981    /// invoke with `(state, cb_id_ptr, cb_id_len, ctx)` matching the
1982    /// per-Node `__nros_node_<pkg>_on_callback` ABI emitted by the
1983    /// `nros::node!()` macro (Phase 216.A.5).
1984    #[allow(clippy::result_unit_err)]
1985    pub fn register_dispatch_slot(
1986        &mut self,
1987        state: *mut core::ffi::c_void,
1988        on_callback: unsafe extern "C" fn(
1989            *mut core::ffi::c_void,
1990            *const u8,
1991            usize,
1992            *mut core::ffi::c_void,
1993        ),
1994    ) -> Result<(), ()> {
1995        self.dispatch_slots
1996            .push(DispatchSlot { state, on_callback })
1997            .map_err(|_| ())
1998    }
1999
2000    /// Phase 216 follow-up — current registered dispatch-slot count.
2001    /// Diagnostic / test surface.
2002    pub fn dispatch_slot_count(&self) -> usize {
2003        self.dispatch_slots.len()
2004    }
2005
2006    /// Phase 258 (Track 2, 2a) — enroll a component into the executor-owned
2007    /// tick registry. Called by `nros`'s `install`/`register_node_borrowed`
2008    /// after it builds the `Arc<ComponentCell>`: `state` is the leaked
2009    /// `Arc<ComponentCell>` (the slot takes ownership), `tick`/`drop` are the
2010    /// `nros`-side trampolines (see [`ComponentSlot`]). The slot's `tick`
2011    /// runs at the tail of every [`spin_once`](Self::spin_once); its `drop`
2012    /// runs once on `Executor::drop`.
2013    ///
2014    /// Returns `Err(())` when the registry is full (`MAX_NODES` — raise via
2015    /// `NROS_EXECUTOR_MAX_NODES` at build time). On error the caller still
2016    /// owns `state` (the slot was not stored) and must drop it.
2017    ///
2018    /// # Safety
2019    /// `state` must be a `*mut` produced by leaking the component cell the
2020    /// `tick`/`drop` trampolines expect (an `Arc<ComponentCell>` via
2021    /// `Arc::into_raw` in the canonical `nros` caller), and must remain valid
2022    /// until the matching `drop` runs. `tick` must be safe to invoke with
2023    /// `(state, exec_ctx = *mut Executor)` each spin; `drop` must be safe to
2024    /// invoke exactly once with `state`.
2025    #[allow(clippy::result_unit_err)]
2026    pub unsafe fn enroll_component(
2027        &mut self,
2028        state: *mut core::ffi::c_void,
2029        tick: unsafe extern "C" fn(*mut core::ffi::c_void, *mut core::ffi::c_void),
2030        drop: unsafe extern "C" fn(*mut core::ffi::c_void),
2031    ) -> Result<(), ()> {
2032        self.component_slots
2033            .push(ComponentSlot { state, tick, drop })
2034            .map_err(|_| ())
2035    }
2036
2037    /// Phase 258 (Track 2, 2a) — current enrolled component-slot count.
2038    /// Diagnostic / test surface.
2039    pub fn component_slot_count(&self) -> usize {
2040        self.component_slots.len()
2041    }
2042
2043    /// Phase 216 final dispatch hook — stable entry point the
2044    /// framework's dispatch task (RTIC `__nros_run` /
2045    /// Embassy `__nros_run_task`) calls for each `SignaledCallback`
2046    /// envelope it dequeues from the board-side SPSC / Embassy
2047    /// channel.
2048    ///
2049    /// ## Signature shape
2050    ///
2051    /// `nros-node` sits below `nros` in the dep graph, so the typed
2052    /// `nros::CallbackId<'_>` / `nros::CallbackCtx<'_>` types
2053    /// referenced in the Phase 216 design notes cannot appear in the
2054    /// signature here. The macro emit translates the dequeued
2055    /// envelope to the layer-clean `(cb_id: &str, ctx: *mut c_void)`
2056    /// pair before calling this method; the per-Node `on_callback`
2057    /// trampoline ABI (Phase 216.A.5,
2058    /// `__nros_node_<pkg>_on_callback(state, cb_id_ptr, cb_id_len,
2059    /// ctx)`) uses the same untyped shape on the other side of the
2060    /// fence, so the round-trip stays type-consistent.
2061    ///
2062    /// ## Body — linear scan of the dispatch registry
2063    ///
2064    /// Each registered [`DispatchSlot`] holds an
2065    /// `__nros_node_<pkg>_on_callback` fn pointer + the owning Node's
2066    /// `state` blob. The macro-emitted trampoline body
2067    /// `match`es on `CallbackId` tags the Node declared and is a
2068    /// no-op for non-matching `cb_id`s — at most one Node per
2069    /// `cb_id` actually acts, the rest are cheap string-compare
2070    /// no-ops. This mirrors the strategy
2071    /// `ExecutorNodeRuntime::dispatch_callback` uses in
2072    /// `packages/core/nros/src/node_runtime.rs:470`.
2073    ///
2074    /// ## What's NOT auto-wired today
2075    ///
2076    /// The `nros::node!()` macro doesn't yet emit a
2077    /// `register_dispatch(executor)` wrapper that pushes the per-pkg
2078    /// `(state, on_callback)` into this registry. Until that wiring
2079    /// lands (Phase 216 follow-up — see commit msg), downstream
2080    /// consumers (board's `init_hardware`, or the codegen-emitted
2081    /// `run_plan`) must call
2082    /// [`Executor::register_dispatch_slot`] explicitly with the
2083    /// `__nros_node_<pkg>_on_callback` symbol + a `state` blob from
2084    /// the macro-emitted `i()`.
2085    //
2086    // `ctx` is an opaque FFI cookie forwarded verbatim to each slot's
2087    // `on_callback`; this fn never dereferences it (the registered callback
2088    // does, under the `register_dispatch_slot` safety contract), so it is sound
2089    // to call from safe code.
2090    #[allow(clippy::not_unsafe_ptr_arg_deref)]
2091    pub fn dispatch_callback(&mut self, cb_id: &str, ctx: *mut core::ffi::c_void) {
2092        let cb_id_ptr = cb_id.as_ptr();
2093        let cb_id_len = cb_id.len();
2094        // Snapshot pointer + length to avoid an outstanding borrow
2095        // across the unsafe fn calls below; each `DispatchSlot` is
2096        // `Copy`, so iterating by value sidesteps any aliasing
2097        // worry the borrow checker would flag if a slot's
2098        // `on_callback` re-entered the executor.
2099        for slot in self.dispatch_slots.iter().copied() {
2100            // SAFETY: caller of `register_dispatch_slot` guaranteed
2101            // `state` outlives the executor + `on_callback` matches
2102            // the per-Node `__nros_node_<pkg>_on_callback` ABI;
2103            // `cb_id_ptr`/`cb_id_len` describe the live `&str` the
2104            // caller passed in.
2105            unsafe {
2106                (slot.on_callback)(slot.state, cb_id_ptr, cb_id_len, ctx);
2107            }
2108        }
2109    }
2110
2111    /// Get a reference to the underlying session.
2112    pub fn session(&self) -> &session::ConcreteSession {
2113        &self.session
2114    }
2115
2116    /// Get a mutable reference to the underlying session.
2117    pub fn session_mut(&mut self) -> &mut session::ConcreteSession {
2118        &mut self.session
2119    }
2120
2121    /// Phase 124.F.3 — session-level connectivity probe. Wire-level
2122    /// round-trip "is the peer / agent / router still reachable?"
2123    /// — cheaper than the service-availability probe (no discovery
2124    /// state required).
2125    ///
2126    /// Returns `Ok(())` on reply within `timeout_ms`,
2127    /// `Err(NodeError::Transport(Timeout))` on no reply,
2128    /// `Err(NodeError::Transport(Unsupported))` when the active
2129    /// backend can't probe.
2130    ///
2131    /// Mirrors micro-ROS's `rmw_uros_ping_agent`. Useful for
2132    /// reconnect-on-link-loss patterns: bare-metal code can call
2133    /// `ping(100)` periodically and tear down / re-open the session
2134    /// on timeout.
2135    pub fn ping(&mut self, timeout_ms: i32) -> Result<(), NodeError> {
2136        use nros_rmw::Session;
2137        self.session
2138            .ping_session(timeout_ms)
2139            .map_err(NodeError::Transport)
2140    }
2141
2142    /// Get a mutable reference to an action client core in the arena by entry index.
2143    ///
2144    /// # Safety
2145    /// The caller must ensure that `entry_index` refers to an `ActionClientRawArenaEntry`.
2146    pub unsafe fn action_client_core_mut(
2147        &mut self,
2148        entry_index: usize,
2149    ) -> Option<&mut super::action_core::ActionClientCore> {
2150        let meta = self.entries.get(entry_index)?.as_ref()?;
2151        if !matches!(meta.kind, EntryKind::ActionClient) {
2152            return None;
2153        }
2154        let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2155        unsafe {
2156            let entry_ptr = arena_ptr.add(meta.offset)
2157                as *mut super::arena::ActionClientRawArenaEntry<
2158                    { crate::config::DEFAULT_RX_BUF_SIZE },
2159                    { crate::config::DEFAULT_RX_BUF_SIZE },
2160                    { crate::config::DEFAULT_RX_BUF_SIZE },
2161                >;
2162            Some(&mut (*entry_ptr).core)
2163        }
2164    }
2165
2166    /// Get a mutable reference to a service-client arena entry (Phase 82).
2167    ///
2168    /// Returns `None` if `entry_index` doesn't refer to a service client
2169    /// entry. The default reply buffer size is assumed because the C API
2170    /// always uses the default — the entry was registered via
2171    /// `register_service_client_raw_sized::<DEFAULT_RX_BUF_SIZE>`.
2172    ///
2173    /// # Safety
2174    /// `entry_index` must refer to a `ServiceClientRawArenaEntry`.
2175    pub unsafe fn service_client_entry_mut(
2176        &mut self,
2177        entry_index: usize,
2178    ) -> Option<&mut super::arena::ServiceClientRawArenaEntry<{ crate::config::DEFAULT_RX_BUF_SIZE }>>
2179    {
2180        let meta = self.entries.get(entry_index)?.as_ref()?;
2181        if !matches!(meta.kind, EntryKind::ServiceClient) {
2182            return None;
2183        }
2184        let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2185        unsafe {
2186            let entry_ptr = arena_ptr.add(meta.offset)
2187                as *mut super::arena::ServiceClientRawArenaEntry<
2188                    { crate::config::DEFAULT_RX_BUF_SIZE },
2189                >;
2190            Some(&mut *entry_ptr)
2191        }
2192    }
2193
2194    /// Set the executor-level trigger condition.
2195    ///
2196    /// Controls which handles must be ready before `spin_once` dispatches
2197    /// callbacks. Defaults to [`Trigger::AnyReady`](crate::Trigger).
2198    pub fn set_trigger(&mut self, trigger: Trigger) {
2199        self.trigger = trigger;
2200    }
2201
2202    /// Set the executor data communication semantics.
2203    ///
2204    /// Choose between `Direct` (process in place) and `LET`
2205    /// (snapshot-then-process) semantics. See [`ExecutorSemantics`].
2206    pub fn set_semantics(&mut self, semantics: ExecutorSemantics) {
2207        self.semantics = semantics;
2208    }
2209
2210    /// Set the invocation mode for a specific handle.
2211    ///
2212    /// Controls whether the callback fires on every spin
2213    /// ([`Always`](InvocationMode::Always)) or only when new data
2214    /// arrives ([`OnNewData`](InvocationMode::OnNewData), the default).
2215    pub fn set_invocation(&mut self, id: HandleId, mode: InvocationMode) {
2216        if let Some(Some(meta)) = self.entries.get_mut(id.0) {
2217            meta.invocation = mode;
2218        }
2219    }
2220
2221    // ========================================================================
2222    // Arena-based callback registration
2223    // ========================================================================
2224
2225    /// Bump-allocate space for `T` in the arena. Returns the byte offset.
2226    pub(crate) fn arena_alloc<T>(&mut self) -> Result<usize, NodeError> {
2227        let align = core::mem::align_of::<T>();
2228        let size = core::mem::size_of::<T>();
2229        let aligned_offset = (self.arena_used + align - 1) & !(align - 1);
2230        let new_used = aligned_offset + size;
2231        if new_used > crate::config::ARENA_SIZE {
2232            return Err(NodeError::BufferTooSmall);
2233        }
2234        self.arena_used = new_used;
2235        Ok(aligned_offset)
2236    }
2237
2238    /// Bump-allocate space for `T` plus `trailing_bytes` extra bytes.
2239    ///
2240    /// Returns `(entry_offset, trailing_offset)`. The trailing region starts
2241    /// immediately after `T` (aligned to 8 bytes).
2242    pub(crate) fn arena_alloc_with_trailing<T>(
2243        &mut self,
2244        trailing_bytes: usize,
2245    ) -> Result<(usize, usize), NodeError> {
2246        let align = core::mem::align_of::<T>();
2247        let entry_size = core::mem::size_of::<T>();
2248        let entry_offset = self.arena_used.next_multiple_of(align);
2249        // Trailing region starts on an 8-byte (u64) boundary after the entry.
2250        let trailing_offset =
2251            (entry_offset + entry_size).next_multiple_of(core::mem::align_of::<u64>());
2252        let new_used = trailing_offset + trailing_bytes;
2253        if new_used > crate::config::ARENA_SIZE {
2254            return Err(NodeError::BufferTooSmall);
2255        }
2256        self.arena_used = new_used;
2257        Ok((entry_offset, trailing_offset))
2258    }
2259
2260    /// Find the next free entry slot index.
2261    pub(crate) fn next_entry_slot(&self) -> Result<usize, NodeError> {
2262        self.entries
2263            .iter()
2264            .position(|e| e.is_none())
2265            .ok_or(NodeError::BufferTooSmall)
2266    }
2267
2268    /// Typed buffered subscription core (the `node_mut(id).subscription(t)
2269    /// .typed::<M>()` builder lowers here). Routes the typed subscription
2270    /// through the [`NodeId`]'s session + identity (rclcpp `add_node` pattern).
2271    pub(crate) fn register_subscription_buffered_on<M, F, const RX_BUF: usize>(
2272        &mut self,
2273        node_id: super::node_record::NodeId,
2274        topic_name: &str,
2275        qos: QosSettings,
2276        callback: F,
2277    ) -> Result<HandleId, NodeError>
2278    where
2279        M: crate::cyclonedds_register::MessageForRmw + 'static,
2280        F: FnMut(&M) + 'static,
2281    {
2282        type Entry<M, F> = SubBufferedEntry<M, F>;
2283
2284        // Phase 212.K.7.6.b — see `create_publisher_on`.
2285        crate::cyclonedds_register::register_type::<M>()?;
2286
2287        let slot = self.next_entry_slot()?;
2288        let (node_name, ns, session_idx) = {
2289            let r = self
2290                .nodes
2291                .get(node_id.index())
2292                .ok_or(NodeError::InvalidSchedContextBinding)?;
2293            (r.name.clone(), r.namespace.clone(), r.session_idx)
2294        };
2295        let mut topic = TopicInfo::new(
2296            topic_name,
2297            <M as RosMessage>::TYPE_NAME,
2298            <M as RosMessage>::TYPE_HASH,
2299        )
2300        .with_namespace(&ns)
2301        // Phase 231 (RFC-0038) — hand the backend the receive-buffer size so it
2302        // can size-class its receive storage (zenoh-pico: small vs large).
2303        .with_rx_buffer_hint(RX_BUF);
2304        if !node_name.is_empty() {
2305            topic = topic.with_node_name(&node_name);
2306        }
2307        let handle = {
2308            let session = self
2309                .session_at_mut(session_idx)
2310                .ok_or(NodeError::BackendMismatch)?;
2311            session
2312                .create_subscriber(&topic, qos)
2313                .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2314        };
2315
2316        // Phase 231 Wave 0.2 (RFC-0038) — in-place dispatch when the backend
2317        // advertises it: deserialize straight from the borrowed receive slot,
2318        // no arena buffer (copy #1 removed). Else the buffered path below.
2319        {
2320            use nros_rmw::Subscriber as _;
2321            if handle.supports_process_in_place() {
2322                let entry_offset = self.arena_alloc::<SubInplaceEntry<M, F>>()?;
2323                unsafe {
2324                    let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2325                    let entry_ptr = arena_ptr.add(entry_offset) as *mut SubInplaceEntry<M, F>;
2326                    core::ptr::write(
2327                        entry_ptr,
2328                        SubInplaceEntry {
2329                            handle,
2330                            callback,
2331                            _phantom: PhantomData,
2332                        },
2333                    );
2334                }
2335                self.entries[slot] = Some(CallbackMeta {
2336                    offset: entry_offset,
2337                    kind: EntryKind::Subscription,
2338                    try_process: sub_inplace_try_process::<M, F>,
2339                    has_data: sub_inplace_has_data::<M, F>,
2340                    pre_sample: no_pre_sample,
2341                    invocation: InvocationMode::OnNewData,
2342                    drop_fn: drop_entry::<SubInplaceEntry<M, F>>,
2343                });
2344                self.apply_node_default_sched(slot, Some(node_id));
2345                return Ok(HandleId(slot));
2346            }
2347        }
2348
2349        let (_slot_count, trailing_bytes) = buffered_region_size(qos.depth, RX_BUF);
2350
2351        let (entry_offset, trailing_offset) =
2352            self.arena_alloc_with_trailing::<Entry<M, F>>(trailing_bytes)?;
2353
2354        let buf_ptr = unsafe { (self.arena.as_mut_ptr() as *mut u8).add(trailing_offset) };
2355
2356        let buffer = if qos.depth <= 1 {
2357            BufferStrategy::Triple(unsafe { TripleBuffer::init(buf_ptr, RX_BUF) })
2358        } else {
2359            BufferStrategy::Ring(unsafe { SpscRing::init(buf_ptr, RX_BUF, qos.depth as usize) })
2360        };
2361
2362        unsafe {
2363            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2364            let entry_ptr = arena_ptr.add(entry_offset) as *mut Entry<M, F>;
2365            core::ptr::write(
2366                entry_ptr,
2367                Entry {
2368                    handle,
2369                    buffer,
2370                    callback,
2371                    _phantom: PhantomData,
2372                },
2373            );
2374        }
2375
2376        self.entries[slot] = Some(CallbackMeta {
2377            offset: entry_offset,
2378            kind: EntryKind::Subscription,
2379            try_process: sub_buffered_try_process::<M, F>,
2380            has_data: sub_buffered_has_data::<M, F>,
2381            pre_sample: no_pre_sample,
2382            invocation: InvocationMode::OnNewData,
2383            drop_fn: drop_entry::<Entry<M, F>>,
2384        });
2385        // Phase 104.C.4 — apply Node's default SchedContext.
2386        self.apply_node_default_sched(slot, Some(node_id));
2387        Ok(HandleId(slot))
2388    }
2389
2390    /// Generic (type-erased) buffered subscription core (the
2391    /// `node_mut(id).subscription(t).generic(ty, hash)` builder lowers here).
2392    /// Routes the subscriber creation through the [`NodeId`]'s
2393    /// session + identity (rclcpp `add_node` pattern).
2394    ///
2395    /// Use this in bridge code where two Nodes bind to different RMW
2396    /// backends:
2397    ///
2398    /// ```ignore
2399    /// let node_in = exec.node_builder("ingress").rmw("zenoh").build()?;
2400    /// let pub_out = exec.with_node(node_out, |n| {
2401    ///     n.create_publisher_raw("/fwd", TYPE, HASH)
2402    /// })??;
2403    /// exec.register_subscription_buffered_raw_on::<_, 1024>(
2404    ///     node_in, "/src", TYPE, HASH, qos(),
2405    ///     move |bytes: &[u8]| { let _ = pub_out.publish_raw(bytes); },
2406    /// )?;
2407    /// ```
2408    pub(crate) fn register_subscription_buffered_raw_on<F, const RX_BUF: usize>(
2409        &mut self,
2410        node_id: super::node_record::NodeId,
2411        topic_name: &str,
2412        type_name: &str,
2413        type_hash: &str,
2414        qos: QosSettings,
2415        callback: F,
2416    ) -> Result<HandleId, NodeError>
2417    where
2418        F: FnMut(&[u8]) + 'static,
2419    {
2420        // Pull the Node's identity + session slot out first so the
2421        // mutable session borrow doesn't conflict with the arena
2422        // alloc inside `add_arena_subscription_callback`.
2423        let (node_name, ns, session_idx) = {
2424            let r = self
2425                .nodes
2426                .get(node_id.index())
2427                .ok_or(NodeError::InvalidSchedContextBinding)?;
2428            (r.name.clone(), r.namespace.clone(), r.session_idx)
2429        };
2430        let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
2431        if !node_name.is_empty() {
2432            topic = topic.with_node_name(&node_name);
2433        }
2434        let handle = {
2435            let session = self
2436                .session_at_mut(session_idx)
2437                .ok_or(NodeError::BackendMismatch)?;
2438            session
2439                .create_subscriber(&topic, qos)
2440                .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2441        };
2442        let handle_id = self.add_arena_subscription_callback::<F, RX_BUF>(handle, qos, callback)?;
2443        // Phase 104.C.4 — apply Node's default SchedContext.
2444        self.apply_node_default_sched(handle_id.0, Some(node_id));
2445        Ok(handle_id)
2446    }
2447
2448    /// Register a borrowed (zero-copy) buffered subscription (Phase 229.6,
2449    /// issue 0007 / RFC-0033 `borrowed` mode).
2450    ///
2451    /// `B` is the code-generated borrowed-message marker (e.g. `ImageBorrow`)
2452    /// implementing [`BorrowedMessage`](nros_core::BorrowedMessage); the
2453    /// callback receives `&B::View<'a>` — a lifetime-carrying message whose
2454    /// unbounded sequence/string fields borrow directly from the receive buffer
2455    /// (no `heapless::Vec` copy). The view is valid only for the callback's
2456    /// duration.
2457    ///
2458    /// **Triple-buffer only.** A borrowed view must reference exactly one
2459    /// well-defined buffer slot for the callback's duration; an SPSC ring
2460    /// (`qos.depth > 1`) keeps several samples in flight with no single such
2461    /// slot. `qos.depth > 1` is therefore rejected with
2462    /// [`TransportError::Unsupported`].
2463    pub(crate) fn register_subscription_buffered_borrowed_on<B, F, const RX_BUF: usize>(
2464        &mut self,
2465        node_id: super::node_record::NodeId,
2466        topic_name: &str,
2467        qos: QosSettings,
2468        callback: F,
2469    ) -> Result<HandleId, NodeError>
2470    where
2471        B: nros_core::BorrowedMessage + 'static,
2472        F: for<'a> FnMut(&B::View<'a>) + 'static,
2473    {
2474        type Entry<B, F> = SubBufferedBorrowedEntry<B, F>;
2475
2476        // Borrowed views require a single well-defined slot (triple buffer).
2477        if qos.depth > 1 {
2478            return Err(NodeError::Transport(TransportError::Unsupported));
2479        }
2480
2481        let slot = self.next_entry_slot()?;
2482        let (node_name, ns, session_idx) = {
2483            let r = self
2484                .nodes
2485                .get(node_id.index())
2486                .ok_or(NodeError::InvalidSchedContextBinding)?;
2487            (r.name.clone(), r.namespace.clone(), r.session_idx)
2488        };
2489        let mut topic = TopicInfo::new(
2490            topic_name,
2491            <B as BorrowedMessage>::TYPE_NAME,
2492            <B as BorrowedMessage>::TYPE_HASH,
2493        )
2494        .with_namespace(&ns);
2495        if !node_name.is_empty() {
2496            topic = topic.with_node_name(&node_name);
2497        }
2498        let handle = {
2499            let session = self
2500                .session_at_mut(session_idx)
2501                .ok_or(NodeError::BackendMismatch)?;
2502            session
2503                .create_subscriber(&topic, qos)
2504                .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2505        };
2506
2507        let (_slot_count, trailing_bytes) = buffered_region_size(qos.depth, RX_BUF);
2508        let (entry_offset, trailing_offset) =
2509            self.arena_alloc_with_trailing::<Entry<B, F>>(trailing_bytes)?;
2510        let buf_ptr = unsafe { (self.arena.as_mut_ptr() as *mut u8).add(trailing_offset) };
2511
2512        // depth <= 1 guaranteed above → always triple buffer.
2513        let buffer = BufferStrategy::Triple(unsafe { TripleBuffer::init(buf_ptr, RX_BUF) });
2514
2515        unsafe {
2516            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2517            let entry_ptr = arena_ptr.add(entry_offset) as *mut Entry<B, F>;
2518            core::ptr::write(
2519                entry_ptr,
2520                Entry {
2521                    handle,
2522                    buffer,
2523                    callback,
2524                    _phantom: PhantomData,
2525                },
2526            );
2527        }
2528
2529        self.entries[slot] = Some(CallbackMeta {
2530            offset: entry_offset,
2531            kind: EntryKind::Subscription,
2532            try_process: sub_buffered_borrowed_try_process::<B, F>,
2533            has_data: sub_buffered_borrowed_has_data::<B, F>,
2534            pre_sample: no_pre_sample,
2535            invocation: InvocationMode::OnNewData,
2536            drop_fn: drop_entry::<Entry<B, F>>,
2537        });
2538        self.apply_node_default_sched(slot, Some(node_id));
2539        Ok(HandleId(slot))
2540    }
2541
2542    /// Register a raw (type-erased) buffered subscription whose callback
2543    /// also receives a [`RawMessageInfo`](nros_core::RawMessageInfo)
2544    /// carrying the sample's wire **attachment** (Phase 189.M1).
2545    ///
2546    /// Backs the `node.subscription(t).generic(..).message_info().build(cb)`
2547    /// builder — the cross-RMW bridge reads the `bridge_origin` tag from
2548    /// `info.attachment()` for echo suppression. One sample per
2549    /// `spin_once`; the attachment is staged in a flat per-entry buffer
2550    /// (cap [`RAW_INFO_ATT_CAP`](super::arena::RAW_INFO_ATT_CAP)).
2551    pub fn register_subscription_buffered_raw_info_on<F, const RX_BUF: usize>(
2552        &mut self,
2553        node_id: super::node_record::NodeId,
2554        topic_name: &str,
2555        type_name: &str,
2556        type_hash: &str,
2557        qos: QosSettings,
2558        callback: F,
2559    ) -> Result<HandleId, NodeError>
2560    where
2561        F: FnMut(&[u8], &nros_core::RawMessageInfo) + 'static,
2562    {
2563        type Entry<F, const N: usize> = SubBufferedRawInfoEntry<F, N>;
2564
2565        let slot = self.next_entry_slot()?;
2566        let (node_name, ns, session_idx) = {
2567            let r = self
2568                .nodes
2569                .get(node_id.index())
2570                .ok_or(NodeError::InvalidSchedContextBinding)?;
2571            (r.name.clone(), r.namespace.clone(), r.session_idx)
2572        };
2573        let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
2574        if !node_name.is_empty() {
2575            topic = topic.with_node_name(&node_name);
2576        }
2577        let handle = {
2578            let session = self
2579                .session_at_mut(session_idx)
2580                .ok_or(NodeError::BackendMismatch)?;
2581            session
2582                .create_subscriber(&topic, qos)
2583                .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2584        };
2585
2586        let offset = self.arena_alloc::<Entry<F, RX_BUF>>()?;
2587        unsafe {
2588            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2589            let entry_ptr = arena_ptr.add(offset) as *mut Entry<F, RX_BUF>;
2590            core::ptr::write(
2591                entry_ptr,
2592                Entry {
2593                    handle,
2594                    buffer: [0u8; RX_BUF],
2595                    att: [0u8; super::arena::RAW_INFO_ATT_CAP],
2596                    callback,
2597                },
2598            );
2599        }
2600
2601        self.entries[slot] = Some(CallbackMeta {
2602            offset,
2603            kind: EntryKind::Subscription,
2604            try_process: sub_buffered_raw_info_try_process::<F, RX_BUF>,
2605            has_data: sub_buffered_raw_info_has_data::<F, RX_BUF>,
2606            pre_sample: no_pre_sample,
2607            invocation: InvocationMode::OnNewData,
2608            drop_fn: drop_entry::<Entry<F, RX_BUF>>,
2609        });
2610        self.apply_node_default_sched(slot, Some(node_id));
2611        Ok(HandleId(slot))
2612    }
2613
2614    /// Phase 250 (Wave 2) — register a generic (type-erased) raw subscription
2615    /// that surfaces E2E [`IntegrityStatus`](nros_rmw::IntegrityStatus) (CRC +
2616    /// sequence gap/dup) alongside the raw CDR bytes
2617    /// (`FnMut(&[u8], &IntegrityStatus)`). The type-erased analog of
2618    /// [`register_subscription_with_safety_sized_inner`]: the validator lives in
2619    /// the `RmwSubscriber` (`try_recv_validated`), so the subscriber is created
2620    /// plainly and no `register_type::<M>()` is needed (the declarative `Node`
2621    /// path is generic). Used by the declarative runtime's `.safety()` opt-in.
2622    #[cfg(feature = "safety-e2e")]
2623    pub fn register_subscription_buffered_raw_safety_on<F, const RX_BUF: usize>(
2624        &mut self,
2625        node_id: super::node_record::NodeId,
2626        topic_name: &str,
2627        type_name: &str,
2628        type_hash: &str,
2629        qos: QosSettings,
2630        callback: F,
2631    ) -> Result<HandleId, NodeError>
2632    where
2633        F: FnMut(&[u8], &nros_rmw::IntegrityStatus) + 'static,
2634    {
2635        use super::arena::{
2636            SubBufferedRawSafetyEntry, sub_buffered_raw_safety_has_data,
2637            sub_buffered_raw_safety_try_process,
2638        };
2639        type Entry<F, const N: usize> = SubBufferedRawSafetyEntry<F, N>;
2640
2641        let slot = self.next_entry_slot()?;
2642        let (node_name, ns, session_idx) = {
2643            let r = self
2644                .nodes
2645                .get(node_id.index())
2646                .ok_or(NodeError::InvalidSchedContextBinding)?;
2647            (r.name.clone(), r.namespace.clone(), r.session_idx)
2648        };
2649        let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
2650        if !node_name.is_empty() {
2651            topic = topic.with_node_name(&node_name);
2652        }
2653        let handle = {
2654            let session = self
2655                .session_at_mut(session_idx)
2656                .ok_or(NodeError::BackendMismatch)?;
2657            session
2658                .create_subscriber(&topic, qos)
2659                .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2660        };
2661
2662        let offset = self.arena_alloc::<Entry<F, RX_BUF>>()?;
2663        unsafe {
2664            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2665            let entry_ptr = arena_ptr.add(offset) as *mut Entry<F, RX_BUF>;
2666            core::ptr::write(
2667                entry_ptr,
2668                Entry {
2669                    handle,
2670                    buffer: [0u8; RX_BUF],
2671                    callback,
2672                },
2673            );
2674        }
2675
2676        self.entries[slot] = Some(CallbackMeta {
2677            offset,
2678            kind: EntryKind::Subscription,
2679            try_process: sub_buffered_raw_safety_try_process::<F, RX_BUF>,
2680            has_data: sub_buffered_raw_safety_has_data::<F, RX_BUF>,
2681            pre_sample: no_pre_sample,
2682            invocation: InvocationMode::OnNewData,
2683            drop_fn: drop_entry::<Entry<F, RX_BUF>>,
2684        });
2685        self.apply_node_default_sched(slot, Some(node_id));
2686        Ok(HandleId(slot))
2687    }
2688
2689    /// Register a raw byte-shaped callback against a pre-built
2690    /// `RmwSubscriber` handle.
2691    ///
2692    /// Backend-agnostic primitive — the caller is responsible for
2693    /// obtaining the handle by whatever route the active backend
2694    /// supports:
2695    ///
2696    /// - **Generic ROS-typed flow**: call `Session::create_subscriber`
2697    ///   on `self.session_mut()` with a [`TopicInfo`]. The
2698    ///   `node_mut(id).subscription(t).generic(ty, hash)` builder is the
2699    ///   convenience wrapper for this path.
2700    /// - **Backend-specific flow** (e.g. uORB needs `&'static orb_metadata`):
2701    ///   reach into the concrete session via [`Self::session_mut`] and
2702    ///   call its backend-specific create method, then hand the handle
2703    ///   here. `nros-px4::uorb::create_subscription_with_callback` is
2704    ///   the example.
2705    ///
2706    /// The arena-store + vtable wiring is identical to
2707    /// `register_subscription_buffered_raw`; the only thing that varies is
2708    /// where the handle came from. Callback fires on every message
2709    /// delivery during [`spin_once`](Self::spin_once); bytes are
2710    /// passed as `&[u8]`.
2711    pub fn add_arena_subscription_callback<F, const RX_BUF: usize>(
2712        &mut self,
2713        handle: session::RmwSubscriber,
2714        qos: QosSettings,
2715        callback: F,
2716    ) -> Result<HandleId, NodeError>
2717    where
2718        F: FnMut(&[u8]) + 'static,
2719    {
2720        type Entry<F> = SubBufferedRawEntry<F>;
2721
2722        let slot = self.next_entry_slot()?;
2723        let (_slot_count, trailing_bytes) = buffered_region_size(qos.depth, RX_BUF);
2724
2725        let (entry_offset, trailing_offset) =
2726            self.arena_alloc_with_trailing::<Entry<F>>(trailing_bytes)?;
2727
2728        let buf_ptr = unsafe { (self.arena.as_mut_ptr() as *mut u8).add(trailing_offset) };
2729
2730        let buffer = if qos.depth <= 1 {
2731            BufferStrategy::Triple(unsafe { TripleBuffer::init(buf_ptr, RX_BUF) })
2732        } else {
2733            BufferStrategy::Ring(unsafe { SpscRing::init(buf_ptr, RX_BUF, qos.depth as usize) })
2734        };
2735
2736        unsafe {
2737            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2738            let entry_ptr = arena_ptr.add(entry_offset) as *mut Entry<F>;
2739            core::ptr::write(
2740                entry_ptr,
2741                Entry {
2742                    handle,
2743                    buffer,
2744                    callback,
2745                },
2746            );
2747        }
2748
2749        self.entries[slot] = Some(CallbackMeta {
2750            offset: entry_offset,
2751            kind: EntryKind::Subscription,
2752            try_process: sub_buffered_raw_try_process::<F>,
2753            has_data: sub_buffered_raw_has_data::<F>,
2754            pre_sample: no_pre_sample,
2755            invocation: InvocationMode::OnNewData,
2756            drop_fn: drop_entry::<Entry<F>>,
2757        });
2758        Ok(HandleId(slot))
2759    }
2760
2761    pub(crate) fn register_subscription_with_info_sized_inner<M, F, const RX_BUF: usize>(
2762        &mut self,
2763        node_id: Option<super::node_record::NodeId>,
2764        topic_name: &str,
2765        qos: QosSettings,
2766        callback: F,
2767    ) -> Result<HandleId, NodeError>
2768    where
2769        M: crate::cyclonedds_register::MessageForRmw + 'static,
2770        F: FnMut(&M, Option<&nros_core::MessageInfo>) + 'static,
2771    {
2772        type Entry<M, F, const N: usize> = SubInfoEntry<M, F, N>;
2773
2774        // Phase 212.K.7.6.b — see `create_publisher_on`.
2775        crate::cyclonedds_register::register_type::<M>()?;
2776
2777        let slot = self.next_entry_slot()?;
2778        let (node_name, ns, session_idx) = match node_id {
2779            Some(id) => {
2780                let r = self
2781                    .nodes
2782                    .get(id.index())
2783                    .ok_or(NodeError::InvalidSchedContextBinding)?;
2784                (r.name.clone(), r.namespace.clone(), r.session_idx)
2785            }
2786            None => (self.node_name.clone(), self.namespace.clone(), 0u8),
2787        };
2788        let mut topic = TopicInfo::new(
2789            topic_name,
2790            <M as RosMessage>::TYPE_NAME,
2791            <M as RosMessage>::TYPE_HASH,
2792        )
2793        .with_namespace(&ns);
2794        if !node_name.is_empty() {
2795            topic = topic.with_node_name(&node_name);
2796        }
2797        let handle = {
2798            let session = self
2799                .session_at_mut(session_idx)
2800                .ok_or(NodeError::BackendMismatch)?;
2801            session
2802                .create_subscriber(&topic, qos)
2803                .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2804        };
2805
2806        let offset = self.arena_alloc::<Entry<M, F, RX_BUF>>()?;
2807
2808        unsafe {
2809            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2810            let entry_ptr = arena_ptr.add(offset) as *mut Entry<M, F, RX_BUF>;
2811            core::ptr::write(
2812                entry_ptr,
2813                Entry {
2814                    handle,
2815                    buffer: [0u8; RX_BUF],
2816                    sampled_len: 0,
2817                    callback,
2818                    _phantom: PhantomData,
2819                },
2820            );
2821        }
2822
2823        self.entries[slot] = Some(CallbackMeta {
2824            offset,
2825            kind: EntryKind::Subscription,
2826            try_process: sub_info_try_process::<M, F, RX_BUF>,
2827            has_data: sub_info_has_data::<M, F, RX_BUF>,
2828            pre_sample: sub_info_pre_sample::<M, F, RX_BUF>,
2829            invocation: InvocationMode::OnNewData,
2830            drop_fn: drop_entry::<Entry<M, F, RX_BUF>>,
2831        });
2832        self.apply_node_default_sched(slot, node_id);
2833        Ok(HandleId(slot))
2834    }
2835
2836    #[cfg(feature = "safety-e2e")]
2837    pub(crate) fn register_subscription_with_safety_sized_inner<M, F, const RX_BUF: usize>(
2838        &mut self,
2839        node_id: Option<super::node_record::NodeId>,
2840        topic_name: &str,
2841        qos: QosSettings,
2842        callback: F,
2843    ) -> Result<HandleId, NodeError>
2844    where
2845        M: crate::cyclonedds_register::MessageForRmw + 'static,
2846        F: FnMut(&M, &nros_rmw::IntegrityStatus) + 'static,
2847    {
2848        type Entry<M, F, const N: usize> = SubSafetyEntry<M, F, N>;
2849
2850        // Phase 212.K.7.6.b — see `create_publisher_on`.
2851        crate::cyclonedds_register::register_type::<M>()?;
2852
2853        let slot = self.next_entry_slot()?;
2854        let (node_name, ns, session_idx) = match node_id {
2855            Some(id) => {
2856                let r = self
2857                    .nodes
2858                    .get(id.index())
2859                    .ok_or(NodeError::InvalidSchedContextBinding)?;
2860                (r.name.clone(), r.namespace.clone(), r.session_idx)
2861            }
2862            None => (self.node_name.clone(), self.namespace.clone(), 0u8),
2863        };
2864        let mut topic = TopicInfo::new(
2865            topic_name,
2866            <M as RosMessage>::TYPE_NAME,
2867            <M as RosMessage>::TYPE_HASH,
2868        )
2869        .with_namespace(&ns);
2870        if !node_name.is_empty() {
2871            topic = topic.with_node_name(&node_name);
2872        }
2873        let handle = {
2874            let session = self
2875                .session_at_mut(session_idx)
2876                .ok_or(NodeError::BackendMismatch)?;
2877            session
2878                .create_subscriber(&topic, qos)
2879                .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
2880        };
2881
2882        let offset = self.arena_alloc::<Entry<M, F, RX_BUF>>()?;
2883
2884        unsafe {
2885            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2886            let entry_ptr = arena_ptr.add(offset) as *mut Entry<M, F, RX_BUF>;
2887            core::ptr::write(
2888                entry_ptr,
2889                Entry {
2890                    handle,
2891                    buffer: [0u8; RX_BUF],
2892                    sampled_len: 0,
2893                    callback,
2894                    _phantom: PhantomData,
2895                },
2896            );
2897        }
2898
2899        self.entries[slot] = Some(CallbackMeta {
2900            offset,
2901            kind: EntryKind::Subscription,
2902            try_process: sub_safety_try_process::<M, F, RX_BUF>,
2903            has_data: sub_safety_has_data::<M, F, RX_BUF>,
2904            pre_sample: sub_safety_pre_sample::<M, F, RX_BUF>,
2905            invocation: InvocationMode::OnNewData,
2906            drop_fn: drop_entry::<Entry<M, F, RX_BUF>>,
2907        });
2908        self.apply_node_default_sched(slot, node_id);
2909        Ok(HandleId(slot))
2910    }
2911
2912    /// Register a service callback with the default buffer size.
2913    ///
2914    /// The callback is stored in the arena and invoked during [`spin_once()`](Self::spin_once).
2915    pub fn register_service<Svc, F>(
2916        &mut self,
2917        service_name: &str,
2918        callback: F,
2919    ) -> Result<HandleId, NodeError>
2920    where
2921        Svc: RosService + 'static,
2922        Svc::Request: crate::cyclonedds_register::MessageForRmw,
2923        Svc::Reply: crate::cyclonedds_register::MessageForRmw,
2924        F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
2925    {
2926        self.register_service_sized::<Svc, F, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, callback)
2927    }
2928
2929    /// Register a service callback with custom request/reply buffer sizes.
2930    pub fn register_service_sized<Svc, F, const REQ_BUF: usize, const REPLY_BUF: usize>(
2931        &mut self,
2932        service_name: &str,
2933        callback: F,
2934    ) -> Result<HandleId, NodeError>
2935    where
2936        Svc: RosService + 'static,
2937        Svc::Request: crate::cyclonedds_register::MessageForRmw,
2938        Svc::Reply: crate::cyclonedds_register::MessageForRmw,
2939        F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
2940    {
2941        type Entry<Svc, F, const RQ: usize, const RP: usize> = SrvEntry<Svc, F, RQ, RP>;
2942
2943        // Phase 212.K.7.7.b — register both halves of the service round-trip
2944        // under cyclonedds. No-op for other RMWs. Mirrors the K.7.6.b hook
2945        // on `Node::create_service_sized`.
2946        crate::cyclonedds_register::register_type::<Svc::Request>()?;
2947        crate::cyclonedds_register::register_type::<Svc::Reply>()?;
2948
2949        let slot = self.next_entry_slot()?;
2950        let node_name: heapless::String<64> = self.node_name.clone();
2951        let ns: heapless::String<64> = self.namespace.clone();
2952        let mut info = ServiceInfo::new(service_name, Svc::SERVICE_NAME, Svc::SERVICE_HASH)
2953            .with_namespace(&ns);
2954        if !node_name.is_empty() {
2955            info = info.with_node_name(&node_name);
2956        }
2957        let handle = self
2958            .session
2959            .create_service_server(&info, QosSettings::services_default())
2960            .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?;
2961
2962        let offset = self.arena_alloc::<Entry<Svc, F, REQ_BUF, REPLY_BUF>>()?;
2963
2964        // SAFETY: same guarantees as register_subscription_sized.
2965        unsafe {
2966            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
2967            let entry_ptr = arena_ptr.add(offset) as *mut Entry<Svc, F, REQ_BUF, REPLY_BUF>;
2968            core::ptr::write(
2969                entry_ptr,
2970                Entry {
2971                    handle,
2972                    req_buffer: [0u8; REQ_BUF],
2973                    reply_buffer: [0u8; REPLY_BUF],
2974                    callback,
2975                    _phantom: PhantomData,
2976                },
2977            );
2978        }
2979
2980        self.entries[slot] = Some(CallbackMeta {
2981            offset,
2982            kind: EntryKind::Service,
2983            try_process: srv_try_process::<Svc, F, REQ_BUF, REPLY_BUF>,
2984            has_data: srv_has_data::<Svc, F, REQ_BUF, REPLY_BUF>,
2985            pre_sample: no_pre_sample,
2986            invocation: InvocationMode::OnNewData,
2987            drop_fn: drop_entry::<Entry<Svc, F, REQ_BUF, REPLY_BUF>>,
2988        });
2989        Ok(HandleId(slot))
2990    }
2991
2992    /// Phase 104.C.3.3.a — Node-aware variant of
2993    /// [`register_service_sized`](Self::register_service_sized).
2994    pub fn register_service_sized_on<Svc, F, const REQ_BUF: usize, const REPLY_BUF: usize>(
2995        &mut self,
2996        node_id: super::node_record::NodeId,
2997        service_name: &str,
2998        qos: QosSettings,
2999        callback: F,
3000    ) -> Result<HandleId, NodeError>
3001    where
3002        Svc: RosService + 'static,
3003        Svc::Request: crate::cyclonedds_register::MessageForRmw,
3004        Svc::Reply: crate::cyclonedds_register::MessageForRmw,
3005        F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
3006    {
3007        type Entry<Svc, F, const RQ: usize, const RP: usize> = SrvEntry<Svc, F, RQ, RP>;
3008
3009        // Phase 212.K.7.7.b — see `register_service_sized`.
3010        crate::cyclonedds_register::register_type::<Svc::Request>()?;
3011        crate::cyclonedds_register::register_type::<Svc::Reply>()?;
3012
3013        let slot = self.next_entry_slot()?;
3014        let (node_name, ns, session_idx) = {
3015            let r = self
3016                .nodes
3017                .get(node_id.index())
3018                .ok_or(NodeError::InvalidSchedContextBinding)?;
3019            (r.name.clone(), r.namespace.clone(), r.session_idx)
3020        };
3021        let mut info = ServiceInfo::new(service_name, Svc::SERVICE_NAME, Svc::SERVICE_HASH)
3022            .with_namespace(&ns);
3023        if !node_name.is_empty() {
3024            info = info.with_node_name(&node_name);
3025        }
3026        let handle = {
3027            let session = self
3028                .session_at_mut(session_idx)
3029                .ok_or(NodeError::BackendMismatch)?;
3030            // Phase 193.5 — validate against the backend's supported policies
3031            // (no silent downgrade); request/reply effectively requires RELIABLE.
3032            qos.validate_against(session.supported_qos_policies())
3033                .map_err(NodeError::Transport)?;
3034            session
3035                .create_service_server(&info, qos)
3036                .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?
3037        };
3038
3039        let offset = self.arena_alloc::<Entry<Svc, F, REQ_BUF, REPLY_BUF>>()?;
3040        unsafe {
3041            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3042            let entry_ptr = arena_ptr.add(offset) as *mut Entry<Svc, F, REQ_BUF, REPLY_BUF>;
3043            core::ptr::write(
3044                entry_ptr,
3045                Entry {
3046                    handle,
3047                    req_buffer: [0u8; REQ_BUF],
3048                    reply_buffer: [0u8; REPLY_BUF],
3049                    callback,
3050                    _phantom: PhantomData,
3051                },
3052            );
3053        }
3054
3055        self.entries[slot] = Some(CallbackMeta {
3056            offset,
3057            kind: EntryKind::Service,
3058            try_process: srv_try_process::<Svc, F, REQ_BUF, REPLY_BUF>,
3059            has_data: srv_has_data::<Svc, F, REQ_BUF, REPLY_BUF>,
3060            pre_sample: no_pre_sample,
3061            invocation: InvocationMode::OnNewData,
3062            drop_fn: drop_entry::<Entry<Svc, F, REQ_BUF, REPLY_BUF>>,
3063        });
3064        self.apply_node_default_sched(slot, Some(node_id));
3065        Ok(HandleId(slot))
3066    }
3067
3068    /// Phase 104.C.3.3.a — Node-aware variant of
3069    /// [`register_service`](Self::register_service).
3070    pub fn register_service_on<Svc, F>(
3071        &mut self,
3072        node_id: super::node_record::NodeId,
3073        service_name: &str,
3074        callback: F,
3075    ) -> Result<HandleId, NodeError>
3076    where
3077        Svc: RosService + 'static,
3078        Svc::Request: crate::cyclonedds_register::MessageForRmw,
3079        Svc::Reply: crate::cyclonedds_register::MessageForRmw,
3080        F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
3081    {
3082        self.register_service_sized_on::<
3083            Svc,
3084            F,
3085            { crate::config::DEFAULT_RX_BUF_SIZE },
3086            { crate::config::DEFAULT_RX_BUF_SIZE },
3087        >(node_id, service_name, QosSettings::services_default(), callback)
3088    }
3089
3090    // ========================================================================
3091    // Timer registration
3092    // ========================================================================
3093
3094    /// Register a repeating timer callback.
3095    ///
3096    /// The callback fires every `period` milliseconds during [`spin_once()`](Self::spin_once).
3097    /// The timer delta is approximated by the `timeout_ms` argument to `spin_once`.
3098    pub fn register_timer<F>(
3099        &mut self,
3100        period: TimerDuration,
3101        callback: F,
3102    ) -> Result<HandleId, NodeError>
3103    where
3104        F: FnMut() + 'static,
3105    {
3106        let slot = self.next_entry_slot()?;
3107        let offset = self.arena_alloc::<TimerEntry<F>>()?;
3108
3109        unsafe {
3110            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3111            let entry_ptr = arena_ptr.add(offset) as *mut TimerEntry<F>;
3112            core::ptr::write(
3113                entry_ptr,
3114                TimerEntry {
3115                    period_ms: period.as_millis(),
3116                    elapsed_ms: 0,
3117                    oneshot: false,
3118                    fired: false,
3119                    cancelled: false,
3120                    callback,
3121                },
3122            );
3123        }
3124
3125        self.entries[slot] = Some(CallbackMeta {
3126            offset,
3127            kind: EntryKind::Timer,
3128            try_process: timer_try_process::<F>,
3129            has_data: always_ready,
3130            pre_sample: no_pre_sample,
3131            invocation: InvocationMode::Always,
3132            drop_fn: drop_entry::<TimerEntry<F>>,
3133        });
3134        Ok(HandleId(slot))
3135    }
3136
3137    /// Register a one-shot timer callback.
3138    ///
3139    /// The callback fires once after `delay` milliseconds, then becomes inert.
3140    pub fn register_timer_oneshot<F>(
3141        &mut self,
3142        delay: TimerDuration,
3143        callback: F,
3144    ) -> Result<HandleId, NodeError>
3145    where
3146        F: FnMut() + 'static,
3147    {
3148        let slot = self.next_entry_slot()?;
3149        let offset = self.arena_alloc::<TimerEntry<F>>()?;
3150
3151        unsafe {
3152            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3153            let entry_ptr = arena_ptr.add(offset) as *mut TimerEntry<F>;
3154            core::ptr::write(
3155                entry_ptr,
3156                TimerEntry {
3157                    period_ms: delay.as_millis(),
3158                    elapsed_ms: 0,
3159                    oneshot: true,
3160                    fired: false,
3161                    cancelled: false,
3162                    callback,
3163                },
3164            );
3165        }
3166
3167        self.entries[slot] = Some(CallbackMeta {
3168            offset,
3169            kind: EntryKind::Timer,
3170            try_process: timer_try_process::<F>,
3171            has_data: always_ready,
3172            pre_sample: no_pre_sample,
3173            invocation: InvocationMode::Always,
3174            drop_fn: drop_entry::<TimerEntry<F>>,
3175        });
3176        Ok(HandleId(slot))
3177    }
3178
3179    // ========================================================================
3180    // Raw callback registration (for C API)
3181    // ========================================================================
3182
3183    /// The kept C-FFI subscription core (Phase 189.M2.b): registers a
3184    /// raw `RawSubscriptionCallback` fn-ptr + `context` against an
3185    /// optional node's session. The Rust ergonomic surface is the
3186    /// `node.subscription(t)` builder (closures); this is the single
3187    /// primitive the `nros-c` thin wrapper lowers to. `node_id == None`
3188    /// is the legacy single-node path.
3189    #[allow(clippy::too_many_arguments)]
3190    pub fn add_arena_subscription_c_callback<const RX_BUF: usize>(
3191        &mut self,
3192        node_id: Option<super::node_record::NodeId>,
3193        topic_name: &str,
3194        type_name: &str,
3195        type_hash: &str,
3196        qos: QosSettings,
3197        callback: RawSubscriptionCallback,
3198        context: *mut core::ffi::c_void,
3199    ) -> Result<HandleId, NodeError> {
3200        let slot = self.next_entry_slot()?;
3201        let (node_name, ns, session_idx) = match node_id {
3202            Some(id) => {
3203                let r = self
3204                    .nodes
3205                    .get(id.index())
3206                    .ok_or(NodeError::InvalidSchedContextBinding)?;
3207                (r.name.clone(), r.namespace.clone(), r.session_idx)
3208            }
3209            None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3210        };
3211        let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
3212        if !node_name.is_empty() {
3213            topic = topic.with_node_name(&node_name);
3214        }
3215        let handle = {
3216            let session = self
3217                .session_at_mut(session_idx)
3218                .ok_or(NodeError::BackendMismatch)?;
3219            session
3220                .create_subscriber(&topic, qos)
3221                .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
3222        };
3223
3224        let (_slot_count, trailing_bytes) = buffered_region_size(qos.depth, RX_BUF);
3225
3226        let (entry_offset, trailing_offset) =
3227            self.arena_alloc_with_trailing::<SubBufferedRawCEntry>(trailing_bytes)?;
3228
3229        let buf_ptr = unsafe { (self.arena.as_mut_ptr() as *mut u8).add(trailing_offset) };
3230
3231        let buffer = if qos.depth <= 1 {
3232            BufferStrategy::Triple(unsafe { TripleBuffer::init(buf_ptr, RX_BUF) })
3233        } else {
3234            BufferStrategy::Ring(unsafe { SpscRing::init(buf_ptr, RX_BUF, qos.depth as usize) })
3235        };
3236
3237        unsafe {
3238            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3239            let entry_ptr = arena_ptr.add(entry_offset) as *mut SubBufferedRawCEntry;
3240            core::ptr::write(
3241                entry_ptr,
3242                SubBufferedRawCEntry {
3243                    handle,
3244                    buffer,
3245                    callback,
3246                    context,
3247                },
3248            );
3249        }
3250
3251        self.entries[slot] = Some(CallbackMeta {
3252            offset: entry_offset,
3253            kind: EntryKind::Subscription,
3254            try_process: sub_buffered_raw_c_try_process,
3255            has_data: sub_buffered_raw_c_has_data,
3256            pre_sample: no_pre_sample,
3257            invocation: InvocationMode::OnNewData,
3258            drop_fn: drop_entry::<SubBufferedRawCEntry>,
3259        });
3260        self.apply_node_default_sched(slot, node_id);
3261        Ok(HandleId(slot))
3262    }
3263
3264    /// Phase 189.M3.4 — register a raw C-fn-ptr subscription whose callback
3265    /// also receives the sample's wire **attachment**
3266    /// ([`RawSubscriptionInfoCallback`]: `(data, len, attachment, att_len,
3267    /// context)`) — the C analog of the Rust
3268    /// `node.subscription(t).generic(..).message_info()` builder. Backs the C
3269    /// FFI `nros_executor_register_subscription_raw_with_info`. Flat per-entry
3270    /// payload + attachment buffers (cap [`RAW_INFO_ATT_CAP`](super::arena::RAW_INFO_ATT_CAP));
3271    /// one sample per `spin_once`.
3272    #[allow(clippy::too_many_arguments)]
3273    pub fn add_arena_subscription_c_info_callback<const RX_BUF: usize>(
3274        &mut self,
3275        node_id: Option<super::node_record::NodeId>,
3276        topic_name: &str,
3277        type_name: &str,
3278        type_hash: &str,
3279        qos: QosSettings,
3280        callback: RawSubscriptionInfoCallback,
3281        context: *mut core::ffi::c_void,
3282    ) -> Result<HandleId, NodeError> {
3283        type Entry<const N: usize> = SubBufferedRawInfoCEntry<N>;
3284
3285        let slot = self.next_entry_slot()?;
3286        let (node_name, ns, session_idx) = match node_id {
3287            Some(id) => {
3288                let r = self
3289                    .nodes
3290                    .get(id.index())
3291                    .ok_or(NodeError::InvalidSchedContextBinding)?;
3292                (r.name.clone(), r.namespace.clone(), r.session_idx)
3293            }
3294            None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3295        };
3296        let mut topic = TopicInfo::new(topic_name, type_name, type_hash).with_namespace(&ns);
3297        if !node_name.is_empty() {
3298            topic = topic.with_node_name(&node_name);
3299        }
3300        let handle = {
3301            let session = self
3302                .session_at_mut(session_idx)
3303                .ok_or(NodeError::BackendMismatch)?;
3304            session
3305                .create_subscriber(&topic, qos)
3306                .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?
3307        };
3308
3309        let offset = self.arena_alloc::<Entry<RX_BUF>>()?;
3310        unsafe {
3311            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3312            let entry_ptr = arena_ptr.add(offset) as *mut Entry<RX_BUF>;
3313            core::ptr::write(
3314                entry_ptr,
3315                Entry {
3316                    handle,
3317                    buffer: [0u8; RX_BUF],
3318                    att: [0u8; super::arena::RAW_INFO_ATT_CAP],
3319                    callback,
3320                    context,
3321                },
3322            );
3323        }
3324
3325        self.entries[slot] = Some(CallbackMeta {
3326            offset,
3327            kind: EntryKind::Subscription,
3328            try_process: sub_buffered_raw_info_c_try_process::<RX_BUF>,
3329            has_data: sub_buffered_raw_info_c_has_data::<RX_BUF>,
3330            pre_sample: no_pre_sample,
3331            invocation: InvocationMode::OnNewData,
3332            drop_fn: drop_entry::<Entry<RX_BUF>>,
3333        });
3334        self.apply_node_default_sched(slot, node_id);
3335        Ok(HandleId(slot))
3336    }
3337
3338    /// Register a raw (untyped) service callback.
3339    ///
3340    /// Register a raw (untyped) service callback with the default buffer size.
3341    ///
3342    /// The callback receives and produces CDR bytes without typed
3343    /// deserialization/serialization. Used by the C API wrapper.
3344    pub fn register_service_raw(
3345        &mut self,
3346        service_name: &str,
3347        service_type: &str,
3348        service_hash: &str,
3349        callback: RawServiceCallback,
3350        context: *mut core::ffi::c_void,
3351    ) -> Result<HandleId, NodeError> {
3352        self.register_service_raw_sized::<{ crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(
3353            service_name,
3354            service_type,
3355            service_hash,
3356            QosSettings::services_default(),
3357            callback,
3358            context,
3359        )
3360    }
3361
3362    /// Register a raw (untyped) service callback with custom buffer sizes + QoS.
3363    ///
3364    /// `REQ_BUF` and `REPLY_BUF` set the stack-allocated CDR buffers
3365    /// for the request and reply respectively. Increase for services
3366    /// with large payloads (e.g., parameter services). `qos` applies to both
3367    /// the request + reply endpoints (Phase 193.2c).
3368    #[allow(clippy::too_many_arguments)]
3369    pub fn register_service_raw_sized<const REQ_BUF: usize, const REPLY_BUF: usize>(
3370        &mut self,
3371        service_name: &str,
3372        service_type: &str,
3373        service_hash: &str,
3374        qos: QosSettings,
3375        callback: RawServiceCallback,
3376        context: *mut core::ffi::c_void,
3377    ) -> Result<HandleId, NodeError> {
3378        self.register_service_raw_sized_inner::<REQ_BUF, REPLY_BUF>(
3379            None,
3380            service_name,
3381            service_type,
3382            service_hash,
3383            qos,
3384            callback,
3385            context,
3386        )
3387    }
3388
3389    /// Phase 104.C.3.3.a — Node-aware variant of
3390    /// [`register_service_raw_sized`]. C-FFI path.
3391    #[allow(clippy::too_many_arguments)]
3392    pub fn register_service_raw_sized_on<const REQ_BUF: usize, const REPLY_BUF: usize>(
3393        &mut self,
3394        node_id: super::node_record::NodeId,
3395        service_name: &str,
3396        service_type: &str,
3397        service_hash: &str,
3398        qos: QosSettings,
3399        callback: RawServiceCallback,
3400        context: *mut core::ffi::c_void,
3401    ) -> Result<HandleId, NodeError> {
3402        self.register_service_raw_sized_inner::<REQ_BUF, REPLY_BUF>(
3403            Some(node_id),
3404            service_name,
3405            service_type,
3406            service_hash,
3407            qos,
3408            callback,
3409            context,
3410        )
3411    }
3412
3413    #[allow(clippy::too_many_arguments)]
3414    fn register_service_raw_sized_inner<const REQ_BUF: usize, const REPLY_BUF: usize>(
3415        &mut self,
3416        node_id: Option<super::node_record::NodeId>,
3417        service_name: &str,
3418        service_type: &str,
3419        service_hash: &str,
3420        qos: QosSettings,
3421        callback: RawServiceCallback,
3422        context: *mut core::ffi::c_void,
3423    ) -> Result<HandleId, NodeError> {
3424        let slot = self.next_entry_slot()?;
3425        let (node_name, ns, session_idx) = match node_id {
3426            Some(id) => {
3427                let r = self
3428                    .nodes
3429                    .get(id.index())
3430                    .ok_or(NodeError::InvalidSchedContextBinding)?;
3431                (r.name.clone(), r.namespace.clone(), r.session_idx)
3432            }
3433            None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3434        };
3435        let mut info =
3436            ServiceInfo::new(service_name, service_type, service_hash).with_namespace(&ns);
3437        if !node_name.is_empty() {
3438            info = info.with_node_name(&node_name);
3439        }
3440        let handle = {
3441            let session = self
3442                .session_at_mut(session_idx)
3443                .ok_or(NodeError::BackendMismatch)?;
3444            // Phase 193.5 — validate against the backend's supported policies
3445            // (no silent downgrade); request/reply effectively requires RELIABLE.
3446            qos.validate_against(session.supported_qos_policies())
3447                .map_err(NodeError::Transport)?;
3448            session
3449                .create_service_server(&info, qos)
3450                .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?
3451        };
3452
3453        let offset = self.arena_alloc::<SrvRawEntry<REQ_BUF, REPLY_BUF>>()?;
3454
3455        unsafe {
3456            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3457            let entry_ptr = arena_ptr.add(offset) as *mut SrvRawEntry<REQ_BUF, REPLY_BUF>;
3458            core::ptr::write(
3459                entry_ptr,
3460                SrvRawEntry {
3461                    handle,
3462                    req_buffer: [0u8; REQ_BUF],
3463                    reply_buffer: [0u8; REPLY_BUF],
3464                    callback,
3465                    context,
3466                },
3467            );
3468        }
3469
3470        self.entries[slot] = Some(CallbackMeta {
3471            offset,
3472            kind: EntryKind::Service,
3473            try_process: srv_raw_try_process::<REQ_BUF, REPLY_BUF>,
3474            has_data: srv_raw_has_data::<REQ_BUF, REPLY_BUF>,
3475            pre_sample: no_pre_sample,
3476            invocation: InvocationMode::OnNewData,
3477            drop_fn: drop_entry::<SrvRawEntry<REQ_BUF, REPLY_BUF>>,
3478        });
3479        self.apply_node_default_sched(slot, node_id);
3480        Ok(HandleId(slot))
3481    }
3482
3483    // ========================================================================
3484    // Raw service client registration (Phase 82)
3485    // ========================================================================
3486
3487    /// Register a raw (untyped) service client with the default reply
3488    /// buffer size.
3489    ///
3490    /// The client is owned by the executor's arena. Each `spin_once`
3491    /// dispatch polls the in-flight reply slot via `try_recv_reply_raw`
3492    /// and fires the registered callback when the response arrives.
3493    /// Used by the C API thin wrapper — see Phase 82.
3494    pub fn register_service_client_raw(
3495        &mut self,
3496        service_name: &str,
3497        service_type: &str,
3498        service_hash: &str,
3499        callback: Option<RawResponseCallback>,
3500        context: *mut core::ffi::c_void,
3501    ) -> Result<HandleId, NodeError> {
3502        self.register_service_client_raw_sized::<{ crate::config::DEFAULT_RX_BUF_SIZE }>(
3503            service_name,
3504            service_type,
3505            service_hash,
3506            QosSettings::services_default(),
3507            callback,
3508            context,
3509        )
3510    }
3511
3512    /// Register a raw service client with a custom reply buffer size + QoS.
3513    ///
3514    /// `qos` applies to the client's request + reply endpoints (Phase 193.3b);
3515    /// defaults to [`QosSettings::services_default`] via the convenience
3516    /// wrapper.
3517    #[allow(clippy::too_many_arguments)]
3518    pub fn register_service_client_raw_sized<const REPLY_BUF: usize>(
3519        &mut self,
3520        service_name: &str,
3521        service_type: &str,
3522        service_hash: &str,
3523        qos: QosSettings,
3524        callback: Option<RawResponseCallback>,
3525        context: *mut core::ffi::c_void,
3526    ) -> Result<HandleId, NodeError> {
3527        self.register_service_client_raw_sized_inner::<REPLY_BUF>(
3528            None,
3529            service_name,
3530            service_type,
3531            service_hash,
3532            qos,
3533            callback,
3534            context,
3535        )
3536    }
3537
3538    /// Phase 104.C.3.3.a — Node-aware variant of
3539    /// [`register_service_client_raw_sized`]. Routes the client
3540    /// creation through the named Node's session.
3541    #[allow(clippy::too_many_arguments)]
3542    pub fn register_service_client_raw_sized_on<const REPLY_BUF: usize>(
3543        &mut self,
3544        node_id: super::node_record::NodeId,
3545        service_name: &str,
3546        service_type: &str,
3547        service_hash: &str,
3548        qos: QosSettings,
3549        callback: Option<RawResponseCallback>,
3550        context: *mut core::ffi::c_void,
3551    ) -> Result<HandleId, NodeError> {
3552        self.register_service_client_raw_sized_inner::<REPLY_BUF>(
3553            Some(node_id),
3554            service_name,
3555            service_type,
3556            service_hash,
3557            qos,
3558            callback,
3559            context,
3560        )
3561    }
3562
3563    #[allow(clippy::too_many_arguments)]
3564    fn register_service_client_raw_sized_inner<const REPLY_BUF: usize>(
3565        &mut self,
3566        node_id: Option<super::node_record::NodeId>,
3567        service_name: &str,
3568        service_type: &str,
3569        service_hash: &str,
3570        qos: QosSettings,
3571        callback: Option<RawResponseCallback>,
3572        context: *mut core::ffi::c_void,
3573    ) -> Result<HandleId, NodeError> {
3574        let slot = self.next_entry_slot()?;
3575        let (node_name, ns, session_idx) = match node_id {
3576            Some(id) => {
3577                let r = self
3578                    .nodes
3579                    .get(id.index())
3580                    .ok_or(NodeError::InvalidSchedContextBinding)?;
3581                (r.name.clone(), r.namespace.clone(), r.session_idx)
3582            }
3583            None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3584        };
3585        let mut info =
3586            ServiceInfo::new(service_name, service_type, service_hash).with_namespace(&ns);
3587        if !node_name.is_empty() {
3588            info = info.with_node_name(&node_name);
3589        }
3590        let handle = {
3591            let session = self
3592                .session_at_mut(session_idx)
3593                .ok_or(NodeError::BackendMismatch)?;
3594            // Phase 193.5 — validate against the backend's supported policies
3595            // (no silent downgrade); request/reply effectively requires RELIABLE.
3596            qos.validate_against(session.supported_qos_policies())
3597                .map_err(NodeError::Transport)?;
3598            session
3599                .create_service_client(&info, qos)
3600                .map_err(|_| NodeError::Transport(TransportError::ServiceClientCreationFailed))?
3601        };
3602
3603        let offset = self.arena_alloc::<ServiceClientRawArenaEntry<REPLY_BUF>>()?;
3604        unsafe {
3605            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3606            let entry_ptr = arena_ptr.add(offset) as *mut ServiceClientRawArenaEntry<REPLY_BUF>;
3607            core::ptr::write(
3608                entry_ptr,
3609                ServiceClientRawArenaEntry {
3610                    handle,
3611                    reply_buffer: [0u8; REPLY_BUF],
3612                    pending: false,
3613                    reply_ready: core::sync::atomic::AtomicBool::new(false),
3614                    callback,
3615                    context,
3616                },
3617            );
3618        }
3619
3620        self.entries[slot] = Some(CallbackMeta {
3621            offset,
3622            kind: EntryKind::ServiceClient,
3623            try_process: service_client_raw_try_process::<REPLY_BUF>,
3624            has_data: always_ready,
3625            pre_sample: no_pre_sample,
3626            invocation: InvocationMode::Always,
3627            drop_fn: drop_entry::<ServiceClientRawArenaEntry<REPLY_BUF>>,
3628        });
3629        self.apply_node_default_sched(slot, node_id);
3630        Ok(HandleId(slot))
3631    }
3632
3633    /// RFC-0041 / Phase 239.1 — register a **typed callback** service client.
3634    /// The reply is eager-drained at `spin_once` and dispatched to `callback` as
3635    /// a deserialized `Svc::Reply`. Returns the scheduling [`HandleId`] and a
3636    /// `*mut` to the arena entry's send header (used to build the typed
3637    /// [`ServiceClientCallback`](super::handles::ServiceClientCallback)).
3638    #[allow(clippy::too_many_arguments)]
3639    pub(crate) fn register_service_client_callback<Svc, F, const REPLY_BUF: usize>(
3640        &mut self,
3641        node_id: Option<super::node_record::NodeId>,
3642        service_name: &str,
3643        service_type: &str,
3644        service_hash: &str,
3645        qos: QosSettings,
3646        callback: F,
3647    ) -> Result<(HandleId, *mut ServiceClientSendHeader<REPLY_BUF>), NodeError>
3648    where
3649        Svc: nros_core::RosService + 'static,
3650        F: FnMut(&Svc::Reply) + 'static,
3651    {
3652        let slot = self.next_entry_slot()?;
3653        let (node_name, ns, session_idx) = match node_id {
3654            Some(id) => {
3655                let r = self
3656                    .nodes
3657                    .get(id.index())
3658                    .ok_or(NodeError::InvalidSchedContextBinding)?;
3659                (r.name.clone(), r.namespace.clone(), r.session_idx)
3660            }
3661            None => (self.node_name.clone(), self.namespace.clone(), 0u8),
3662        };
3663        let mut info =
3664            ServiceInfo::new(service_name, service_type, service_hash).with_namespace(&ns);
3665        if !node_name.is_empty() {
3666            info = info.with_node_name(&node_name);
3667        }
3668        let handle = {
3669            let session = self
3670                .session_at_mut(session_idx)
3671                .ok_or(NodeError::BackendMismatch)?;
3672            qos.validate_against(session.supported_qos_policies())
3673                .map_err(NodeError::Transport)?;
3674            session
3675                .create_service_client(&info, qos)
3676                .map_err(|_| NodeError::Transport(TransportError::ServiceClientCreationFailed))?
3677        };
3678
3679        let offset = self.arena_alloc::<ServiceClientCallbackEntry<Svc, F, REPLY_BUF>>()?;
3680        let hdr_ptr = unsafe {
3681            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3682            let entry_ptr =
3683                arena_ptr.add(offset) as *mut ServiceClientCallbackEntry<Svc, F, REPLY_BUF>;
3684            core::ptr::write(
3685                entry_ptr,
3686                ServiceClientCallbackEntry {
3687                    hdr: ServiceClientSendHeader {
3688                        handle,
3689                        reply_buffer: [0u8; REPLY_BUF],
3690                        pending: false,
3691                        reply_ready: core::sync::atomic::AtomicBool::new(false),
3692                    },
3693                    callback,
3694                    _phantom: core::marker::PhantomData,
3695                },
3696            );
3697            &mut (*entry_ptr).hdr as *mut ServiceClientSendHeader<REPLY_BUF>
3698        };
3699
3700        self.entries[slot] = Some(CallbackMeta {
3701            offset,
3702            kind: EntryKind::ServiceClient,
3703            try_process: service_client_callback_try_process::<Svc, F, REPLY_BUF>,
3704            has_data: always_ready,
3705            pre_sample: no_pre_sample,
3706            invocation: InvocationMode::Always,
3707            drop_fn: drop_entry::<ServiceClientCallbackEntry<Svc, F, REPLY_BUF>>,
3708        });
3709        self.apply_node_default_sched(slot, node_id);
3710        Ok((HandleId(slot), hdr_ptr))
3711    }
3712
3713    // ========================================================================
3714    // Guard condition registration
3715    // ========================================================================
3716
3717    /// Register a guard condition with a callback.
3718    ///
3719    /// Returns both the [`HandleId`] for trigger configuration and a
3720    /// [`GuardConditionHandle`] for triggering from other threads.
3721    pub fn register_guard_condition<F>(
3722        &mut self,
3723        callback: F,
3724    ) -> Result<(HandleId, GuardConditionHandle), NodeError>
3725    where
3726        F: FnMut() + 'static,
3727    {
3728        let slot = self.next_entry_slot()?;
3729        let offset = self.arena_alloc::<GuardConditionEntry<F>>()?;
3730
3731        unsafe {
3732            let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3733            let entry_ptr = arena_ptr.add(offset) as *mut GuardConditionEntry<F>;
3734            core::ptr::write(
3735                entry_ptr,
3736                GuardConditionEntry {
3737                    flag: portable_atomic::AtomicBool::new(false),
3738                    callback,
3739                },
3740            );
3741
3742            // Create a handle pointing to the flag in the arena
3743            let flag_ptr = &(*entry_ptr).flag as *const portable_atomic::AtomicBool;
3744            #[allow(unused_mut)]
3745            let mut guard_handle = GuardConditionHandle::new(flag_ptr);
3746            // Phase 124.B.5 — wire the wake callback so trigger()
3747            // also signals the executor's wake_cv.
3748            #[cfg(all(feature = "std", feature = "rmw-cffi"))]
3749            {
3750                let ctx = self.wake_ctx_ptr();
3751                guard_handle.set_wake_cb(nros_rmw_runtime_wake_cb, ctx);
3752            }
3753
3754            self.entries[slot] = Some(CallbackMeta {
3755                offset,
3756                kind: EntryKind::GuardCondition,
3757                try_process: guard_try_process::<F>,
3758                has_data: guard_has_data::<F>,
3759                pre_sample: no_pre_sample,
3760                invocation: InvocationMode::OnNewData,
3761                drop_fn: drop_entry::<GuardConditionEntry<F>>,
3762            });
3763
3764            Ok((HandleId(slot), guard_handle))
3765        }
3766    }
3767
3768    // ========================================================================
3769    // Timer control methods
3770    // ========================================================================
3771
3772    /// Cancel a timer. A cancelled timer will not fire but still accumulates
3773    /// elapsed time. The timer can be restarted with [`reset_timer()`](Self::reset_timer).
3774    pub fn cancel_timer(&mut self, id: HandleId) -> Result<(), NodeError> {
3775        let meta = self
3776            .entries
3777            .get(id.0)
3778            .and_then(|e| e.as_ref())
3779            .ok_or(NodeError::BufferTooSmall)?;
3780        if !matches!(meta.kind, EntryKind::Timer) {
3781            return Err(NodeError::BufferTooSmall);
3782        }
3783        let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3784        // SAFETY: meta.offset points to a valid TimerEntry<F> which shares
3785        // layout with TimerHeader for its initial fields (both #[repr(C)]).
3786        let header = unsafe { &mut *(arena_ptr.add(meta.offset) as *mut TimerHeader) };
3787        header.cancelled = true;
3788        Ok(())
3789    }
3790
3791    /// Reset a timer. Clears the cancelled state and resets the elapsed time
3792    /// to zero, so the timer starts a fresh period.
3793    pub fn reset_timer(&mut self, id: HandleId) -> Result<(), NodeError> {
3794        let meta = self
3795            .entries
3796            .get(id.0)
3797            .and_then(|e| e.as_ref())
3798            .ok_or(NodeError::BufferTooSmall)?;
3799        if !matches!(meta.kind, EntryKind::Timer) {
3800            return Err(NodeError::BufferTooSmall);
3801        }
3802        let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
3803        let header = unsafe { &mut *(arena_ptr.add(meta.offset) as *mut TimerHeader) };
3804        header.cancelled = false;
3805        header.elapsed_ms = 0;
3806        Ok(())
3807    }
3808
3809    /// Check if a timer is cancelled.
3810    pub fn timer_is_cancelled(&self, id: HandleId) -> bool {
3811        let meta = match self.entries.get(id.0).and_then(|e| e.as_ref()) {
3812            Some(m) if matches!(m.kind, EntryKind::Timer) => m,
3813            _ => return false,
3814        };
3815        let arena_ptr = self.arena.as_ptr() as *const u8;
3816        let header = unsafe { &*(arena_ptr.add(meta.offset) as *const TimerHeader) };
3817        header.cancelled
3818    }
3819
3820    /// Get the period of a timer in milliseconds, or `None` if the handle
3821    /// is not a valid timer.
3822    pub fn timer_period_ms(&self, id: HandleId) -> Option<u64> {
3823        let meta = self
3824            .entries
3825            .get(id.0)
3826            .and_then(|e| e.as_ref())
3827            .filter(|m| matches!(m.kind, EntryKind::Timer))?;
3828        let arena_ptr = self.arena.as_ptr() as *const u8;
3829        let header = unsafe { &*(arena_ptr.add(meta.offset) as *const TimerHeader) };
3830        Some(header.period_ms)
3831    }
3832
3833    // ========================================================================
3834    // spin_once (three-phase: readiness -> trigger -> dispatch)
3835    // ========================================================================
3836
3837    /// Drive I/O and dispatch registered callbacks once.
3838    ///
3839    /// Three-phase execution:
3840    /// 1. **Readiness scan** — query each handle's `has_data()`.
3841    /// 2. **Trigger evaluation** — check if the executor-level trigger passes.
3842    /// 3. **Dispatch** — invoke callbacks according to their `InvocationMode`.
3843    ///
3844    /// Returns a [`SpinOnceResult`] with counts of processed items and errors.
3845    ///
3846    /// # Arguments
3847    /// * `timeout` — upper bound on the I/O wait. Saturated at
3848    ///   `i32::MAX` ms (~24 days) for the underlying transport call.
3849    ///
3850    /// Phase 84.D7: unified on `core::time::Duration`. The previous
3851    /// `timeout_ms: i32` signature had a latent footgun where
3852    /// `spin_once(-1)` silently froze timers while still polling I/O;
3853    /// `Duration` has no negative sentinel.
3854    pub fn spin_once(&mut self, timeout: core::time::Duration) -> SpinOnceResult {
3855        let timeout_ms = timeout.as_millis().min(i32::MAX as u128) as i32;
3856
3857        // Phase 110.0 — cap against the backend's next internal-event
3858        // deadline (lease keepalive, heartbeat, ACK-NACK timeout, ...).
3859        // Default backend impl returns `None`, so this is a no-op
3860        // unless the active backend opts in.
3861        #[allow(unused_variables)]
3862        let timeout_ms = match self.session.next_deadline_ms() {
3863            Some(next) => timeout_ms.min(next.min(i32::MAX as u32) as i32),
3864            None => timeout_ms,
3865        };
3866
3867        // Wall-clock-accurate timer accumulation. Measure real time
3868        // since the previous `spin_once` exited (or, on the first call,
3869        // since `drive_io` started). Two failure modes the requested
3870        // `timeout_ms` doesn't capture:
3871        //  1. `drive_io` returns early — e.g. zenoh-pico's condvar wakes
3872        //     on data arrival, well under 1 ms.
3873        //  2. The caller spends time outside `spin_once` (explicit sleep,
3874        //     ROS-2 cooperative scheduling, etc.) and that time should
3875        //     still count toward timers.
3876        // Crediting the requested timeout to timers in either case ticks
3877        // them faster than wall-clock — observed as a 30 Hz control loop
3878        // overshooting to >200 Hz under sustained traffic. Carry the
3879        // sub-ms remainder across calls so precision is preserved.
3880        #[cfg(feature = "std")]
3881        let spin_start = std::time::Instant::now();
3882        #[cfg(not(feature = "std"))]
3883        let spin_start_us = self.clock_us_fn.map(|clock| clock());
3884
3885        // Phase 104.C.6 — shared executor wake. Swap-and-clear the
3886        // wake flag; if it was set before this `spin_once` entered,
3887        // skip the blocking wait on the primary session and poll
3888        // every session non-blockingly. Lets a wake signal from any
3889        // thread (or, post-104.C.6.b, any backend's vtable hook)
3890        // pre-empt whichever session the executor would otherwise
3891        // sleep on. Cost on the no-wake path is one atomic swap.
3892        #[cfg(feature = "std")]
3893        #[allow(unused_variables)]
3894        let was_woken = self
3895            .wake_flag
3896            .swap(false, std::sync::atomic::Ordering::SeqCst);
3897
3898        // Phase 124.B.4 — condvar-blocked wait.
3899        //
3900        // RT contract:
3901        //  * cv.wait_timeout_while: bounded by `timeout_ms`.
3902        //    Predicate is O(1) — one atomic swap + Instant::now.
3903        //    No allocation. PI-mutex consideration: wake_mu held
3904        //    only during predicate check (microseconds);
3905        //    contended worst-case = notify_all execution time
3906        //    (~10s of µs).
3907        //  * Backend's `set_wake_callback`-installed cb is called
3908        //    on async data arrival from its transport-notify path
3909        //    (worker thread, ISR-safe variant via 124.B.7). The
3910        //    runtime cb writes wake_flag + signals wake_cv,
3911        //    unblocking this loop sub-poll-period.
3912        //  * Poll-only backends (XRCE, bare-metal) leave the slot
3913        //    NULL; the cv wait still fires on its deadline, then
3914        //    drive_io(0) drains whatever the backend's internal
3915        //    poll has buffered. Equivalent to their pre-124
3916        //    behaviour minus the blocking wait inside drive_io.
3917        //
3918        // Lost-wakeup safe: SeqCst flag write happens-before
3919        // notify, and the waiter checks the flag under wake_mu in
3920        // the predicate. If wake fires between drain and cv.wait
3921        // entry, the predicate sees flag=true on first eval and
3922        // exits immediately.
3923        // Phase 130.4 — only sleep in the wake-primitive wait when a
3924        // backend actually installed `set_wake_callback`. Poll-only
3925        // backends (XRCE, current Cyclone / dust-DDS) leave the
3926        // vtable slot NULL → `has_async_wake == false` → drive_io
3927        // for the caller's full timeout instead of sleeping in a
3928        // never-signaled wait that starves reliable retransmission
3929        // (Phase 127.C.4 root cause: server's send_reply flushes
3930        // 100 ms once, then NodeWake.wait_ms(100) sleeps 100 ms
3931        // with zero session activity, so the agent's ACK arrives
3932        // into a stalled session and reliable redelivery never
3933        // fires). RTOS std builds with an event-driven backend
3934        // installed still use `NodeWake` (kernel-native binary
3935        // semaphore — honors its deadline, dodges Zephyr's libc
3936        // `pthread_cond_timedwait` hang); POSIX/macOS std keep
3937        // the existing `std::Condvar` path.
3938        // Phase 248 (C2) — platform-agnostic wake wait. The choice of
3939        // wait primitive is made at runtime from the platform vtable's
3940        // wake probe, NOT a compile-time per-RTOS `cfg`:
3941        //
3942        //   * `node_wake.is_some()` → a kernel-native binary semaphore
3943        //     (`nros_platform_wake_*`) is linked; block on it. It honors
3944        //     its deadline (dodging e.g. Zephyr's libc
3945        //     `pthread_cond_timedwait` hang) and `nros_rmw_runtime_wake_cb`
3946        //     signals it on transport arrival.
3947        //   * `node_wake.is_none()` → no platform wake primitive; fall
3948        //     back to the std `Condvar` wake pair.
3949        //
3950        // Either way, only sleep in the wake-primitive/cv wait when a
3951        // backend actually installed `set_wake_callback`
3952        // (`has_async_wake`); poll-only backends (XRCE, current Cyclone)
3953        // leave the slot NULL and get a full-timeout `drive_io` so
3954        // reliable retransmission isn't starved (Phase 127.C.4).
3955        #[cfg(all(feature = "std", feature = "rmw-cffi"))]
3956        let primary_drive_timeout_ms = if let Some(wake) = self.node_wake.as_ref() {
3957            if !was_woken && self.has_async_wake {
3958                let _ = wake.wait_ms(timeout_ms as u32);
3959                // Clear any pending flag the cb set while we were
3960                // waiting; mirrors the std cv predicate's flag drain.
3961                let _ = self
3962                    .wake_flag
3963                    .swap(false, std::sync::atomic::Ordering::SeqCst);
3964                0
3965            } else {
3966                timeout_ms
3967            }
3968        } else {
3969            if !was_woken && self.has_async_wake {
3970                let dur = core::time::Duration::from_millis(timeout_ms as u64);
3971                // SAFETY-invariant: `wake_mu` guards `()` — it is purely the
3972                // companion mutex for `wake_cv`, protecting no shared state. A
3973                // poison (another thread panicked while holding it) cannot have
3974                // corrupted anything, so recover the guard rather than aborting
3975                // this hot spin loop.
3976                let g = self.wake_mu.lock().unwrap_or_else(|e| e.into_inner());
3977                let _ = self.wake_cv.wait_timeout_while(g, dur, |_| {
3978                    !self
3979                        .wake_flag
3980                        .swap(false, std::sync::atomic::Ordering::SeqCst)
3981                });
3982            }
3983            // drive_io is non-blocking when the cv-wait above ran;
3984            // full-timeout otherwise so the transport's blocking recv
3985            // yields the thread instead of busy-spinning.
3986            if self.has_async_wake { 0 } else { timeout_ms }
3987        };
3988
3989        // std builds without rmw-cffi (mock-session tests, future
3990        // alternative backends) keep the original "drive_io is
3991        // non-blocking" assumption.
3992        #[cfg(all(feature = "std", not(feature = "rmw-cffi")))]
3993        let primary_drive_timeout_ms = 0;
3994
3995        // Phase 248 (C2) — no_std + alloc + rmw-cffi path. When a backend
3996        // installed the wake-cb (`has_async_wake_alloc`) and a platform
3997        // wake primitive is available (`node_wake_alloc.is_some()` — the
3998        // runtime vtable probe), block on `node_wake_alloc.wait_ms` so the
3999        // executor unblocks on transport arrival rather than relying on
4000        // drive_io's blocking recv for the full timeout. Then drive_io(0)
4001        // drains whatever the backend's poll path buffered. Platforms with
4002        // no wake primitive (bare-metal) fall through to the full timeout.
4003        #[cfg(all(feature = "alloc", not(feature = "std"), feature = "rmw-cffi"))]
4004        let primary_drive_timeout_ms = {
4005            let was_woken_alloc = self
4006                .wake_flag_alloc
4007                .swap(false, portable_atomic::Ordering::SeqCst);
4008            if !was_woken_alloc
4009                && self.has_async_wake_alloc
4010                && let Some(wake) = self.node_wake_alloc.as_ref()
4011            {
4012                let _ = wake.wait_ms(timeout_ms as u32);
4013                // Drain any flag the cb set while we were waiting.
4014                let _ = self
4015                    .wake_flag_alloc
4016                    .swap(false, portable_atomic::Ordering::SeqCst);
4017                0
4018            } else {
4019                timeout_ms
4020            }
4021        };
4022
4023        // no_std without (alloc + rmw-cffi) keeps the legacy
4024        // full-timeout drive_io call.
4025        #[cfg(all(
4026            not(feature = "std"),
4027            not(all(feature = "alloc", feature = "rmw-cffi"))
4028        ))]
4029        let primary_drive_timeout_ms = timeout_ms;
4030
4031        let _ = self.session.drive_io(primary_drive_timeout_ms);
4032        for extra in self.extra_sessions.iter_mut() {
4033            let _ = extra.drive_io(0);
4034        }
4035
4036        #[cfg(feature = "std")]
4037        let delta_ms = {
4038            let now = std::time::Instant::now();
4039            // `last_spin_end` is seeded at construction time, so this
4040            // path always has a Some(_) on every call.
4041            let prev = self.last_spin_end.unwrap_or(spin_start);
4042            let elapsed = now.saturating_duration_since(prev);
4043            self.last_spin_end = Some(now);
4044            let total_us = self
4045                .spin_residual_us
4046                .saturating_add(elapsed.as_micros() as u64);
4047            let ms = total_us / 1000;
4048            self.spin_residual_us = total_us % 1000;
4049            ms
4050        };
4051        #[cfg(not(feature = "std"))]
4052        let delta_ms = if let Some(clock) = self.clock_us_fn {
4053            let now = clock();
4054            let prev = self
4055                .last_spin_end_us
4056                .unwrap_or_else(|| spin_start_us.unwrap_or(now));
4057            self.last_spin_end_us = Some(now);
4058            let elapsed_us = now.saturating_sub(prev);
4059            let total_us = self.spin_residual_us.saturating_add(elapsed_us);
4060            let ms = total_us / 1000;
4061            self.spin_residual_us = total_us % 1000;
4062            ms
4063        } else {
4064            timeout_ms as u64
4065        };
4066        let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
4067
4068        // Phase 1: Readiness scan (Phase 110.A.b — backed by FifoReadySet).
4069        //
4070        // `bits` carries data-readiness only (used by trigger eval +
4071        // by `InvocationMode::OnNewData`). `always_mask` carries the
4072        // `InvocationMode::Always` entries that fire regardless of
4073        // data presence. The dispatcher drains
4074        // `FifoReadySet(bits | always_mask)` after the trigger
4075        // passes; `pop_next` yields registration order (lowest bit
4076        // first) so behavior is bit-identical to the pre-refactor
4077        // `for (i, meta) in entries.iter().enumerate()` loop.
4078        let mut bits: u64 = 0;
4079        let mut count: usize = 0;
4080        let mut non_timer_mask: u64 = 0;
4081        let mut always_mask: u64 = 0;
4082
4083        for (i, meta) in self.entries.iter().enumerate() {
4084            if let Some(meta) = meta {
4085                let data_ptr = unsafe { arena_ptr.add(meta.offset) as *const u8 };
4086                if unsafe { (meta.has_data)(data_ptr) } {
4087                    bits |= 1u64 << i;
4088                }
4089                if !matches!(meta.kind, EntryKind::Timer | EntryKind::GuardCondition) {
4090                    non_timer_mask |= 1u64 << i;
4091                }
4092                if matches!(meta.invocation, InvocationMode::Always) {
4093                    always_mask |= 1u64 << i;
4094                }
4095                count += 1;
4096            }
4097        }
4098
4099        let snapshot = ReadinessSnapshot { bits, count };
4100
4101        // Phase 2: Trigger evaluation
4102        let trigger_passes = match &self.trigger {
4103            Trigger::Any => bits & non_timer_mask != 0 || non_timer_mask == 0,
4104            Trigger::All => bits & non_timer_mask == non_timer_mask,
4105            Trigger::One(id) => snapshot.is_ready(*id),
4106            Trigger::AllOf(set) => snapshot.all_ready(*set),
4107            Trigger::AnyOf(set) => snapshot.any_ready(*set),
4108            Trigger::Always => true,
4109            Trigger::Predicate(f) => f(&snapshot),
4110            Trigger::RawPredicate { callback, context } => {
4111                // Convert ReadinessSnapshot bitmask to a bool array for the C callback
4112                let mut ready_array = [false; 64];
4113                for (i, slot) in ready_array
4114                    .iter_mut()
4115                    .enumerate()
4116                    .take(snapshot.count.min(64))
4117                {
4118                    *slot = snapshot.bits & (1u64 << i) != 0;
4119                }
4120                // SAFETY: The callback and context are provided by the C API caller.
4121                // The ready_array is valid for snapshot.count elements.
4122                unsafe { callback(ready_array.as_ptr(), snapshot.count, *context) }
4123            }
4124        };
4125
4126        if !trigger_passes {
4127            // Timers still need delta accumulation even when trigger doesn't pass
4128            for meta in self.entries.iter().flatten() {
4129                if matches!(meta.kind, EntryKind::Timer) {
4130                    let data_ptr = unsafe { arena_ptr.add(meta.offset) };
4131                    let _ = unsafe { (meta.try_process)(data_ptr, delta_ms) };
4132                }
4133            }
4134
4135            // Parameter services live outside the arena and must be processed
4136            // regardless of trigger state, otherwise ROS 2 param queries time out.
4137            #[cfg(feature = "param-services")]
4138            if let Some(params) = &mut self.params {
4139                {
4140                    let crate::parameter_services::ParamState {
4141                        server, services, ..
4142                    } = &mut **params;
4143                    let _ = services.process_services(server);
4144                }
4145                // Phase 172.H — persist any runtime override applied this tick.
4146                crate::parameter_services::flush_param_store(params);
4147            }
4148
4149            // Same treatment for lifecycle services — `ros2 lifecycle get`
4150            // must succeed even when no callbacks fired this tick.
4151            // SAFETY: see the matching invariant on the later call site.
4152            #[cfg(feature = "lifecycle-services")]
4153            if let Some(lc) = &mut self.lifecycle {
4154                let crate::lifecycle_services::LifecycleRuntimeState {
4155                    state_machine,
4156                    services,
4157                } = &mut **lc;
4158                let _ = unsafe { services.process_services(state_machine) };
4159            }
4160
4161            return SpinOnceResult::new();
4162        }
4163
4164        // Phase 2.5: LET pre-sample (only when LogicalExecutionTime)
4165        //
4166        // Sample all subscription data into entry buffers BEFORE dispatching
4167        // any callbacks. This ensures all callbacks in this cycle see a
4168        // consistent snapshot of data from the same point in time.
4169        // Services are NOT pre-sampled (request-reply is sequential).
4170        if matches!(self.semantics, ExecutorSemantics::LogicalExecutionTime) {
4171            for meta in self.entries.iter().flatten() {
4172                if matches!(meta.kind, EntryKind::Subscription) {
4173                    let data_ptr = unsafe { arena_ptr.add(meta.offset) };
4174                    unsafe { (meta.pre_sample)(data_ptr) };
4175                }
4176            }
4177        }
4178
4179        // Phase 3: Dispatch (Phase 110.C — bucketed by SC.priority).
4180        //
4181        // Two ready-set families, each split across `Priority::COUNT`
4182        // buckets (Critical / Normal / BestEffort). Per-entry SC
4183        // `class` selects FIFO bitmap vs EDF heap; SC `priority`
4184        // selects the bucket within. Drain order:
4185        //   for each bucket in priority order (Critical first):
4186        //     drain EDF heap (deadline-priority), then FIFO bitmap
4187        //     (registration-order)
4188        // Default workloads — every entry on the auto-default Fifo SC
4189        // (Normal priority) — populate only `fifo[Normal]`, so
4190        // dispatch order is bit-identical to 110.B.b for those.
4191        const NB: usize = super::sched_context::Priority::COUNT;
4192        let mut result = SpinOnceResult::new();
4193        let mut fifo: super::ready_set::BucketedFifoSet<NB, { crate::config::MAX_CBS }> =
4194            super::ready_set::BucketedFifoSet::new();
4195        let mut edf: super::ready_set::BucketedEdfSet<NB, { crate::config::MAX_CBS }> =
4196            super::ready_set::BucketedEdfSet::new();
4197        let active_mask = bits | always_mask;
4198
4199        // Phase 110.E — refill any Sporadic SC budgets at period
4200        // boundaries before deciding what to dispatch this cycle.
4201        // Refill is polled (not ISR-driven) — coarse but correct
4202        // upper-bound bandwidth limiter.
4203        #[cfg(feature = "std")]
4204        {
4205            // Monotonic ms relative to a process-static epoch so the
4206            // refill clock survives wall-clock jumps.
4207            use std::sync::OnceLock;
4208            static EPOCH: OnceLock<std::time::Instant> = OnceLock::new();
4209            let now_ms = std::time::Instant::now()
4210                .saturating_duration_since(*EPOCH.get_or_init(std::time::Instant::now))
4211                .as_millis() as u64;
4212            // Use the cycle's `delta_ms` as the per-SC consumption
4213            // estimate — worst-case attribution. Per-callback
4214            // measurement lands with a higher-precision clock hook.
4215            let delta_us = (delta_ms as u32).saturating_mul(1000);
4216            for slot in self.sporadic_states.iter_mut().flatten() {
4217                let _ = slot.tick(now_ms, delta_us);
4218            }
4219        }
4220
4221        for i in 0..crate::config::MAX_CBS {
4222            if active_mask & (1u64 << i) == 0 {
4223                continue;
4224            }
4225            let sc_idx = self.sched_context_bindings[i].0 as usize;
4226            let sc_class_priority_deadline = self
4227                .sched_contexts
4228                .get(sc_idx)
4229                .and_then(|s| s.as_ref())
4230                .map(|sc| {
4231                    (
4232                        sc.class,
4233                        sc.priority.index(),
4234                        sc.deadline_us.get().map(|nz| nz.get()).unwrap_or(u32::MAX),
4235                    )
4236                });
4237            let (sc_class, bucket, deadline_us) = sc_class_priority_deadline.unwrap_or((
4238                super::sched_context::SchedClass::Fifo,
4239                super::sched_context::Priority::Normal.index(),
4240                u32::MAX,
4241            ));
4242            // Phase 110.E — Sporadic SC dispatch is suppressed when
4243            // its budget is exhausted. Atomic path (110.E.b PlatformTimer
4244            // refill) takes precedence when registered; polled path
4245            // (cycle-level delta_us attribution) handles the unregistered
4246            // case. Either way, exhausted budget skips dispatch.
4247            if matches!(sc_class, super::sched_context::SchedClass::Sporadic) {
4248                #[cfg(feature = "alloc")]
4249                let atomic_has_budget = self
4250                    .sporadic_atomic_states
4251                    .get(sc_idx)
4252                    .and_then(|s| s.as_ref())
4253                    .map(|(state, _)| state.has_budget());
4254                #[cfg(not(feature = "alloc"))]
4255                let atomic_has_budget: Option<bool> = None;
4256                let has_budget = match atomic_has_budget {
4257                    Some(b) => b,
4258                    None => self
4259                        .sporadic_states
4260                        .get(sc_idx)
4261                        .and_then(|s| s.as_ref())
4262                        .map(|s| s.budget_remaining_us > 0)
4263                        .unwrap_or(true),
4264                };
4265                if !has_budget {
4266                    continue;
4267                }
4268                // Phase 110.E.b follow-up — per-callback runtime
4269                // accounting (replaces this cycle-level attribution)
4270                // is applied at dispatch time below via
4271                // `consume_dispatch_runtime_us`. We only update the
4272                // polled-path `SporadicState` (no_std fallback) here
4273                // because the atomic path now records actual
4274                // wall-clock per-callback runtime. The
4275                // `delta_us` over-attribution that previously hit the
4276                // atomic state was a worst-case bandwidth limiter;
4277                // per-callback measurement is strictly tighter.
4278                #[cfg(not(feature = "alloc"))]
4279                {
4280                    let _ = sc_idx; // polled-state path lives in
4281                    // `sporadic_states`; this branch is a no-op when
4282                    // the atomic path is enabled.
4283                }
4284            }
4285            // Phase 110.F — per-callback OS priority routing. Entries
4286            // bound to an SC with `os_pri > 0` dispatch onto a worker
4287            // thread the OS has elevated to that priority; the
4288            // cooperative path is skipped for those entries. Workers
4289            // are spawned lazily.
4290            #[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
4291            {
4292                let os_pri = self
4293                    .sched_contexts
4294                    .get(sc_idx)
4295                    .and_then(|s| s.as_ref())
4296                    .map(|sc| sc.os_pri)
4297                    .unwrap_or(0);
4298                if os_pri > 0
4299                    && let Some(apply_policy) = self.os_priority_apply_policy
4300                {
4301                    let worker = self
4302                        .os_priority_workers
4303                        .entry(os_pri)
4304                        .or_insert_with(|| OsPriorityWorker::spawn(os_pri, apply_policy));
4305                    if let Some(meta) = self.entries[i].as_ref() {
4306                        let _ = worker.try_dispatch(WorkItem {
4307                            arena_base: arena_ptr as usize,
4308                            arena_offset: meta.offset,
4309                            try_process: meta.try_process,
4310                            delta_ms,
4311                        });
4312                    }
4313                    continue;
4314                }
4315            }
4316            // Phase 110.G — TT window gate, orthogonal to class.
4317            // Skips dispatch when the SC has a TT window AND the
4318            // current monotonic time is outside it. Both gates apply
4319            // independently — a Sporadic SC with a TT window must
4320            // pass both.
4321            if self.major_frame_us > 0 {
4322                let sc_opt = self.sched_contexts.get(sc_idx).and_then(|s| s.as_ref());
4323                if let Some(sc) = sc_opt {
4324                    let off = sc.tt_window_offset_us.get().map(|nz| nz.get()).unwrap_or(0);
4325                    let dur = sc
4326                        .tt_window_duration_us
4327                        .get()
4328                        .map(|nz| nz.get())
4329                        .unwrap_or(0);
4330                    if dur > 0 {
4331                        // Compute current phase within the major
4332                        // frame using the accumulated `delta_ms` clock
4333                        // (std-only precise; no_std uses `delta_ms`
4334                        // approximation from spin cadence).
4335                        #[cfg(feature = "std")]
4336                        let now_us = {
4337                            use std::sync::OnceLock;
4338                            static EPOCH: OnceLock<std::time::Instant> = OnceLock::new();
4339                            std::time::Instant::now()
4340                                .saturating_duration_since(
4341                                    *EPOCH.get_or_init(std::time::Instant::now),
4342                                )
4343                                .as_micros() as u64
4344                        };
4345                        #[cfg(not(feature = "std"))]
4346                        let now_us = delta_ms.saturating_mul(1000);
4347                        let phase = (now_us % self.major_frame_us as u64) as u32;
4348                        let in_window = if off + dur <= self.major_frame_us {
4349                            phase >= off && phase < off + dur
4350                        } else {
4351                            // Window wraps the major frame boundary.
4352                            let end = (off as u64 + dur as u64) % self.major_frame_us as u64;
4353                            phase >= off || (phase as u64) < end
4354                        };
4355                        if !in_window {
4356                            continue;
4357                        }
4358                    }
4359                }
4360            }
4361            let is_edf = matches!(sc_class, super::sched_context::SchedClass::Edf);
4362            let job = super::types::ActiveJob {
4363                sort_key: if is_edf { deadline_us } else { i as u32 },
4364                desc_idx: i as super::types::DescIdx,
4365            };
4366            if is_edf {
4367                let _ = edf.insert_into(bucket, job);
4368            } else {
4369                let _ = fifo.insert_into(bucket, job);
4370            }
4371        }
4372
4373        // SAFETY: each `desc_idx` we pop was set above only when the
4374        // corresponding `entries[i]` slot was `Some`; no Executor
4375        // mutation happens between that scan and this dispatch.
4376        let dispatch_one = |meta: &CallbackMeta,
4377                            arena_ptr: *mut u8,
4378                            delta_ms: u64,
4379                            result: &mut SpinOnceResult| {
4380            // Phase 141.B.2 — capture T1 at subscription dispatch
4381            // entry. Probe pairs it with the most recent T0 from
4382            // `nros_rmw_runtime_wake_cb` (std + alloc variants)
4383            // and pushes `T1 - T0` onto the ring buffer 141.C
4384            // drains. No-op when the probe feature is off or
4385            // no cycle reader is installed. Other entry kinds
4386            // (Service / Timer / GuardCondition) skip the probe
4387            // because the 141 acceptance is specifically
4388            // wake-to-subscription-dispatch latency.
4389            #[cfg(feature = "wake-latency-probe")]
4390            if matches!(meta.kind, EntryKind::Subscription) {
4391                super::wake_probe::on_dispatch();
4392            }
4393            let data_ptr = unsafe { arena_ptr.add(meta.offset) };
4394            match unsafe { (meta.try_process)(data_ptr, delta_ms) } {
4395                Ok(true) => match meta.kind {
4396                    EntryKind::Subscription => result.subscriptions_processed += 1,
4397                    EntryKind::Service
4398                    | EntryKind::ServiceClient
4399                    | EntryKind::ActionServer
4400                    | EntryKind::ActionClient => result.services_handled += 1,
4401                    EntryKind::Timer => result.timers_fired += 1,
4402                    EntryKind::GuardCondition => {}
4403                },
4404                Ok(false) => {}
4405                Err(_) => match meta.kind {
4406                    EntryKind::Subscription => result.subscription_errors += 1,
4407                    EntryKind::Service
4408                    | EntryKind::ServiceClient
4409                    | EntryKind::ActionServer
4410                    | EntryKind::ActionClient => result.service_errors += 1,
4411                    EntryKind::Timer | EntryKind::GuardCondition => {}
4412                },
4413            }
4414        };
4415
4416        // Phase 110.E.b follow-up — per-callback runtime accounting
4417        // for Sporadic SCs. Wall-clock-measure each dispatch and
4418        // consume the elapsed microseconds from the bound SC's
4419        // atomic budget. This replaces the cycle-level over-
4420        // attribution that previously charged the FULL `delta_us`
4421        // against every Sporadic SC regardless of which entries
4422        // actually fired — accurate per-callback measurement is the
4423        // shape the design doc's per-callback runtime acceptance
4424        // calls out. The closure is `feature = "std"`-gated because
4425        // it needs a `core::time::Instant`-equivalent monotonic
4426        // clock; the no_std fallback continues to use the polled
4427        // `SporadicState` path (cycle delta_us) until a board-side
4428        // monotonic-microsecond accessor lands.
4429        #[cfg(feature = "std")]
4430        let consume_dispatch_runtime_us =
4431            |desc_idx: usize,
4432             elapsed_us: u32,
4433             sched_context_bindings: &[super::sched_context::SchedContextId;
4434                  crate::config::MAX_CBS],
4435             sched_contexts: &[Option<super::sched_context::SchedContext>;
4436                  crate::config::MAX_SC],
4437             #[cfg(feature = "alloc")] sporadic_atomic_states: &[Option<(
4438                portable_atomic_util::Arc<super::sched_context::AtomicSporadicState>,
4439                OpaqueTimerHandle,
4440            )>;
4441                  crate::config::MAX_SC]| {
4442                let sc_idx = sched_context_bindings[desc_idx].0 as usize;
4443                let sc_class = sched_contexts
4444                    .get(sc_idx)
4445                    .and_then(|s| s.as_ref())
4446                    .map(|sc| sc.class)
4447                    .unwrap_or(super::sched_context::SchedClass::Fifo);
4448                if !matches!(sc_class, super::sched_context::SchedClass::Sporadic) {
4449                    return;
4450                }
4451                #[cfg(feature = "alloc")]
4452                if let Some((state, _)) =
4453                    sporadic_atomic_states.get(sc_idx).and_then(|s| s.as_ref())
4454                {
4455                    state.consume(elapsed_us);
4456                    // Phase 110.E.b — overrun detection. Cooperative
4457                    // single-thread can't preempt a runaway callback,
4458                    // so post-dispatch wall-clock comparison delivers
4459                    // the same observable signal as the design's
4460                    // oneshot-IRQ-and-cancel pattern, without needing
4461                    // a separate timer per SC. `budget_capacity_us` is
4462                    // the per-period budget the SC was sized against;
4463                    // any callback exceeding that has run past its
4464                    // bandwidth allotment.
4465                    if elapsed_us > state.budget_capacity_us {
4466                        state.record_overrun(elapsed_us - state.budget_capacity_us);
4467                    }
4468                }
4469                #[cfg(not(feature = "alloc"))]
4470                {
4471                    let _ = (sc_idx, elapsed_us);
4472                }
4473            };
4474
4475        // For each priority bucket (Critical → Normal → BestEffort),
4476        // drain EDF first then FIFO so an EDF callback in this bucket
4477        // beats a FIFO peer at the same priority, but no lower-priority
4478        // entry runs while a higher-priority bucket has work pending.
4479        // Strict static priority across buckets; non-preemptive within
4480        // an in-flight callback (see Phase 110.D).
4481        for bucket in 0..NB {
4482            while let Some(job) = edf.pop_from(bucket) {
4483                let i = job.desc_idx as usize;
4484                if let Some(meta) = self.entries[i].as_ref() {
4485                    #[cfg(feature = "std")]
4486                    let start = std::time::Instant::now();
4487                    dispatch_one(meta, arena_ptr, delta_ms, &mut result);
4488                    #[cfg(feature = "std")]
4489                    {
4490                        let elapsed_us = start.elapsed().as_micros().min(u32::MAX as u128) as u32;
4491                        consume_dispatch_runtime_us(
4492                            i,
4493                            elapsed_us,
4494                            &self.sched_context_bindings,
4495                            &self.sched_contexts,
4496                            #[cfg(feature = "alloc")]
4497                            &self.sporadic_atomic_states,
4498                        );
4499                    }
4500                }
4501            }
4502            while let Some(job) = fifo.pop_from(bucket) {
4503                let i = job.desc_idx as usize;
4504                if let Some(meta) = self.entries[i].as_ref() {
4505                    #[cfg(feature = "std")]
4506                    let start = std::time::Instant::now();
4507                    dispatch_one(meta, arena_ptr, delta_ms, &mut result);
4508                    #[cfg(feature = "std")]
4509                    {
4510                        let elapsed_us = start.elapsed().as_micros().min(u32::MAX as u128) as u32;
4511                        consume_dispatch_runtime_us(
4512                            i,
4513                            elapsed_us,
4514                            &self.sched_context_bindings,
4515                            &self.sched_contexts,
4516                            #[cfg(feature = "alloc")]
4517                            &self.sporadic_atomic_states,
4518                        );
4519                    }
4520                }
4521            }
4522        }
4523
4524        // Process parameter services (outside the arena)
4525        #[cfg(feature = "param-services")]
4526        if let Some(params) = &mut self.params {
4527            {
4528                let crate::parameter_services::ParamState {
4529                    server, services, ..
4530                } = &mut **params;
4531                if let Ok(n) = services.process_services(server) {
4532                    result.services_handled += n;
4533                }
4534            }
4535            // Phase 172.H — persist any runtime override applied this tick.
4536            crate::parameter_services::flush_param_store(params);
4537        }
4538
4539        // Process lifecycle services (outside the arena).
4540        //
4541        // SAFETY: `change_state` dispatches a user-supplied C callback through a
4542        // raw function pointer stored in `LifecyclePollingNodeCtx`. The caller
4543        // of `register_lifecycle_services` guarantees the callback/context pair
4544        // stays live for as long as the executor (see that method's docs).
4545        #[cfg(feature = "lifecycle-services")]
4546        if let Some(lc) = &mut self.lifecycle {
4547            let crate::lifecycle_services::LifecycleRuntimeState {
4548                state_machine,
4549                services,
4550            } = &mut **lc;
4551            if let Ok(n) = unsafe { services.process_services(state_machine) } {
4552                result.services_handled += n;
4553            }
4554        }
4555
4556        // Phase 258 (Track 2, 2a) — executor-owned component tick pass.
4557        // Mirrors `ExecutorNodeRuntime::run_ticks`: after the transport +
4558        // callbacks have been pumped, drive each enrolled component's `tick`
4559        // (service-client/action poll, etc.). `exec_ctx` hands the component
4560        // the whole executor as a raw `*mut Executor` so its tick can
4561        // reborrow it (the same disjoint-field raw-ptr pattern run_ticks
4562        // uses). Index-iterate over `Copy` slots so no borrow of
4563        // `self.component_slots` is held while `tick` runs (which aliases
4564        // `self` through `exec_ctx`).
4565        let exec_ctx = self as *mut Executor as *mut core::ffi::c_void;
4566        let slot_count = self.component_slots.len();
4567        for i in 0..slot_count {
4568            let slot = self.component_slots[i];
4569            // SAFETY: `slot.state` is the leaked cell the matching `tick`
4570            // expects (enrolled via `enroll_component`); `exec_ctx` is a live
4571            // `*mut Executor` for `self`. The slot was copied out, so no
4572            // borrow of `component_slots` is outstanding during the call.
4573            unsafe {
4574                (slot.tick)(slot.state, exec_ctx);
4575            }
4576        }
4577
4578        result
4579    }
4580
4581    /// Drive I/O and dispatch callbacks in an infinite loop.
4582    ///
4583    /// Each iteration calls [`spin_once(timeout_ms)`](Self::spin_once),
4584    /// which pumps the transport and dispatches all registered callbacks.
4585    ///
4586    /// This is the primary run loop for embedded applications:
4587    ///
4588    /// ```ignore
4589    /// let mut executor = Executor::open(&config)?;
4590    /// executor.register_subscription::<Int32, _>("/topic", |msg| { /* ... */ })?;
4591    /// executor.spin(10); // never returns
4592    /// ```
4593    pub fn spin(&mut self, timeout: core::time::Duration) -> ! {
4594        loop {
4595            self.spin_once(timeout);
4596        }
4597    }
4598
4599    /// Phase 104.C.3.3.c — rclcpp-`spin()`-shape no-arg variant.
4600    /// Defaults the per-iteration timeout to 50 ms, which keeps
4601    /// idle binaries from busy-spinning while staying responsive
4602    /// enough for default-QoS messaging.
4603    pub fn spin_default(&mut self) -> ! {
4604        self.spin(core::time::Duration::from_millis(50))
4605    }
4606
4607    /// Drive I/O and dispatch callbacks asynchronously.
4608    ///
4609    /// Runs forever, yielding between poll cycles so that other async tasks
4610    /// (e.g., [`Promise`](super::handles::Promise)) can make progress.
4611    ///
4612    /// Uses only `core::future` — no external async runtime dependency.
4613    ///
4614    /// # Usage patterns
4615    ///
4616    /// ```ignore
4617    /// // Pattern 1: select with a promise (embassy-futures)
4618    /// use embassy_futures::select::{select, Either};
4619    /// let promise = client.call(&req)?;
4620    /// let Either::Second(reply) = select(executor.spin_async(), promise).await
4621    ///     else { unreachable!() };
4622    ///
4623    /// // Pattern 2: manual polling (no async runtime)
4624    /// let mut promise = client.call(&req)?;
4625    /// loop {
4626    ///     executor.spin_once(core::time::Duration::from_millis(10));
4627    ///     if let Ok(Some(r)) = promise.try_recv() { break r; }
4628    /// }
4629    /// ```
4630    pub async fn spin_async(&mut self) -> ! {
4631        loop {
4632            self.spin_once(core::time::Duration::from_millis(1));
4633            core::future::poll_fn::<(), _>(|cx| {
4634                cx.waker().wake_by_ref();
4635                core::task::Poll::Pending
4636            })
4637            .await;
4638        }
4639    }
4640
4641    // ========================================================================
4642    // spin_one_period (no_std)
4643    // ========================================================================
4644
4645    /// Process one iteration and return remaining sleep time.
4646    ///
4647    /// This is `no_std` compatible — the caller is responsible for the actual
4648    /// delay using platform-specific sleep.
4649    ///
4650    /// # Arguments
4651    /// * `period_ms` - Target period in milliseconds
4652    /// * `elapsed_ms` - Time elapsed since last call (used for timer ticking)
4653    ///
4654    /// # Example
4655    ///
4656    /// ```ignore
4657    /// loop {
4658    ///     let r = executor.spin_one_period(10, elapsed_ms);
4659    ///     platform_sleep_ms(r.remaining_ms);
4660    /// }
4661    /// ```
4662    pub fn spin_one_period(&mut self, period_ms: u64, elapsed_ms: u64) -> SpinPeriodPollingResult {
4663        let result = self.spin_once(core::time::Duration::from_millis(elapsed_ms));
4664        SpinPeriodPollingResult {
4665            work: result,
4666            remaining_ms: period_ms.saturating_sub(elapsed_ms),
4667        }
4668    }
4669}
4670
4671// ============================================================================
4672// Parameter services (cfg param-services)
4673// ============================================================================
4674
4675#[cfg(feature = "param-services")]
4676impl Executor {
4677    /// Register the 6 ROS 2 parameter services for this node.
4678    ///
4679    /// Creates service servers for `get_parameters`, `set_parameters`,
4680    /// `set_parameters_atomically`, `list_parameters`, `describe_parameters`,
4681    /// and `get_parameter_types`.
4682    ///
4683    /// The service names follow the ROS 2 convention: `/{namespace}/{node_name}/{suffix}`.
4684    /// For the default namespace `/`, this becomes `/{node_name}/{suffix}` (e.g.
4685    /// `/sentinel/list_parameters`).
4686    ///
4687    /// Parameter services are stored outside the arena and don't consume
4688    /// callback slots.
4689    ///
4690    /// # Example
4691    ///
4692    /// ```ignore
4693    /// let config = ExecutorConfig::from_env().node_name("talker");
4694    /// let mut executor = Executor::open(&config)?;
4695    /// executor.register_parameter_services()?;
4696    /// executor.declare_parameter("start_value", ParameterValue::Integer(0));
4697    /// ```
4698    pub fn register_parameter_services(&mut self) -> Result<(), NodeError> {
4699        use crate::parameter_services::{
4700            DescribeParameters, GetParameterTypes, GetParameters, ListParameters,
4701            PARAM_SERVICE_BUFFER_SIZE, ParameterServiceServers, SetParameters,
4702            SetParametersAtomically,
4703        };
4704        use nros_core::RosService;
4705
4706        type PSrv<Svc> = super::handles::EmbeddedServiceServer<
4707            Svc,
4708            PARAM_SERVICE_BUFFER_SIZE,
4709            PARAM_SERVICE_BUFFER_SIZE,
4710        >;
4711
4712        // Build the node FQN from namespace + node_name, following ROS 2 convention.
4713        // Default namespace "/" → "/{node_name}"; otherwise "/{namespace}/{node_name}".
4714        let mut node_fqn = heapless::String::<256>::new();
4715        let ns: &str = &self.namespace;
4716        let nn: &str = &self.node_name;
4717        if ns.is_empty() || ns == "/" {
4718            node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4719            node_fqn.push_str(nn).map_err(|_| NodeError::NameTooLong)?;
4720        } else {
4721            node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4722            node_fqn
4723                .push_str(ns.trim_matches('/'))
4724                .map_err(|_| NodeError::NameTooLong)?;
4725            node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4726            node_fqn.push_str(nn).map_err(|_| NodeError::NameTooLong)?;
4727        }
4728
4729        /// Build a service name like `{node_fqn}/{suffix}` and create the server handle.
4730        fn create_param_srv<Svc: RosService>(
4731            session: &mut session::ConcreteSession,
4732            node_fqn: &str,
4733            namespace: &str,
4734            node_name: &str,
4735            suffix: &str,
4736        ) -> Result<session::RmwServiceServer, NodeError> {
4737            let mut name = heapless::String::<256>::new();
4738            name.push_str(node_fqn)
4739                .map_err(|_| NodeError::NameTooLong)?;
4740            name.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4741            name.push_str(suffix).map_err(|_| NodeError::NameTooLong)?;
4742            let mut info = ServiceInfo::new(&name, Svc::SERVICE_NAME, Svc::SERVICE_HASH)
4743                .with_namespace(namespace);
4744            if !node_name.is_empty() {
4745                info = info.with_node_name(node_name);
4746            }
4747            session
4748                .create_service_server(&info, QosSettings::services_default())
4749                .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))
4750        }
4751
4752        let get_handle = create_param_srv::<GetParameters>(
4753            &mut self.session,
4754            &node_fqn,
4755            ns,
4756            nn,
4757            "get_parameters",
4758        )?;
4759        let set_handle = create_param_srv::<SetParameters>(
4760            &mut self.session,
4761            &node_fqn,
4762            ns,
4763            nn,
4764            "set_parameters",
4765        )?;
4766        let set_atomic_handle = create_param_srv::<SetParametersAtomically>(
4767            &mut self.session,
4768            &node_fqn,
4769            ns,
4770            nn,
4771            "set_parameters_atomically",
4772        )?;
4773        let list_handle = create_param_srv::<ListParameters>(
4774            &mut self.session,
4775            &node_fqn,
4776            ns,
4777            nn,
4778            "list_parameters",
4779        )?;
4780        let desc_handle = create_param_srv::<DescribeParameters>(
4781            &mut self.session,
4782            &node_fqn,
4783            ns,
4784            nn,
4785            "describe_parameters",
4786        )?;
4787        let types_handle = create_param_srv::<GetParameterTypes>(
4788            &mut self.session,
4789            &node_fqn,
4790            ns,
4791            nn,
4792            "get_parameter_types",
4793        )?;
4794
4795        let servers = ParameterServiceServers::new(
4796            PSrv::<GetParameters> {
4797                handle: get_handle,
4798                req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4799                reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4800                _phantom: core::marker::PhantomData,
4801            },
4802            PSrv::<SetParameters> {
4803                handle: set_handle,
4804                req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4805                reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4806                _phantom: core::marker::PhantomData,
4807            },
4808            PSrv::<SetParametersAtomically> {
4809                handle: set_atomic_handle,
4810                req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4811                reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4812                _phantom: core::marker::PhantomData,
4813            },
4814            PSrv::<ListParameters> {
4815                handle: list_handle,
4816                req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4817                reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4818                _phantom: core::marker::PhantomData,
4819            },
4820            PSrv::<DescribeParameters> {
4821                handle: desc_handle,
4822                req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4823                reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4824                _phantom: core::marker::PhantomData,
4825            },
4826            PSrv::<GetParameterTypes> {
4827                handle: types_handle,
4828                req_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4829                reply_buffer: [0u8; PARAM_SERVICE_BUFFER_SIZE],
4830                _phantom: core::marker::PhantomData,
4831            },
4832        );
4833
4834        self.params = Some(alloc::boxed::Box::new(
4835            crate::parameter_services::ParamState {
4836                server: nros_params::ParameterServer::new(),
4837                services: alloc::boxed::Box::new(servers),
4838                store: alloc::boxed::Box::new(nros_params::NullParamStore),
4839            },
4840        ));
4841
4842        Ok(())
4843    }
4844
4845    /// Phase 172.H — attach a parameter-override persistence backend.
4846    ///
4847    /// Call this **after** [`register_parameter_services`](Self::register_parameter_services)
4848    /// and after declaring the plan's default parameters, so persisted
4849    /// overrides win over compile-time defaults. It immediately overlays any
4850    /// values the backend already holds onto the declared parameters, then
4851    /// keeps the store to flush future runtime `set_parameters` changes (the
4852    /// executor flushes from its spin loop whenever a value changed).
4853    ///
4854    /// Returns [`NodeError::NotInitialized`] if parameter services have not
4855    /// been registered yet.
4856    pub fn enable_parameter_persistence(
4857        &mut self,
4858        store: alloc::boxed::Box<dyn nros_params::ParamStore>,
4859    ) -> Result<(), NodeError> {
4860        let state = self.params.as_mut().ok_or(NodeError::NotInitialized)?;
4861        // Overlay persisted overrides onto the declared defaults.
4862        store.load(&mut |name, value| {
4863            let _ = state.server.set(name, value);
4864        });
4865        // Loading persisted state is restoration, not a new runtime change —
4866        // don't let it trigger an immediate re-flush.
4867        state.server.take_dirty();
4868        state.store = store;
4869        Ok(())
4870    }
4871
4872    /// Phase 172.H — like [`enable_parameter_persistence`](Self::enable_parameter_persistence)
4873    /// but boxes the backend for you, so callers (and generated code) need no
4874    /// `Box` import.
4875    pub fn enable_parameter_persistence_with<S>(&mut self, store: S) -> Result<(), NodeError>
4876    where
4877        S: nros_params::ParamStore + 'static,
4878    {
4879        self.enable_parameter_persistence(alloc::boxed::Box::new(store))
4880    }
4881}
4882
4883// ============================================================================
4884// Lifecycle services (cfg lifecycle-services)
4885// ============================================================================
4886
4887#[cfg(feature = "lifecycle-services")]
4888impl Executor {
4889    /// Register the five REP-2002 lifecycle services on this executor.
4890    ///
4891    /// After this call, `ros2 lifecycle set|get|list|nodes` can drive the
4892    /// stored [`LifecyclePollingNodeCtx`](crate::lifecycle::LifecyclePollingNodeCtx)
4893    /// through the node's lifecycle. The state machine is created fresh
4894    /// (starting in `Unconfigured`); callers register their transition
4895    /// callbacks via [`Executor::lifecycle_state_machine_mut`].
4896    ///
4897    /// # Safety
4898    /// Registered callbacks on the state machine are C FFI function pointers.
4899    /// The caller must keep the callback code and any context it captures
4900    /// valid for as long as the executor processes services.
4901    pub fn register_lifecycle_services(&mut self) -> Result<(), NodeError> {
4902        use crate::{
4903            lifecycle::LifecyclePollingNodeCtx,
4904            lifecycle_services::{
4905                ChangeState, GetAvailableStates, GetAvailableTransitions, GetState,
4906                LIFECYCLE_SERVICE_BUFFER_SIZE, LifecycleRuntimeState, LifecycleServiceServers,
4907            },
4908        };
4909        use nros_core::RosService;
4910
4911        type LcSrv<Svc> = super::handles::EmbeddedServiceServer<
4912            Svc,
4913            LIFECYCLE_SERVICE_BUFFER_SIZE,
4914            LIFECYCLE_SERVICE_BUFFER_SIZE,
4915        >;
4916
4917        // Build the node FQN from namespace + node_name (same convention as
4918        // register_parameter_services).
4919        let mut node_fqn = heapless::String::<256>::new();
4920        let ns: &str = &self.namespace;
4921        let nn: &str = &self.node_name;
4922        if ns.is_empty() || ns == "/" {
4923            node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4924            node_fqn.push_str(nn).map_err(|_| NodeError::NameTooLong)?;
4925        } else {
4926            node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4927            node_fqn
4928                .push_str(ns.trim_matches('/'))
4929                .map_err(|_| NodeError::NameTooLong)?;
4930            node_fqn.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4931            node_fqn.push_str(nn).map_err(|_| NodeError::NameTooLong)?;
4932        }
4933
4934        fn create_lc_srv<Svc: RosService>(
4935            session: &mut session::ConcreteSession,
4936            node_fqn: &str,
4937            namespace: &str,
4938            node_name: &str,
4939            suffix: &str,
4940        ) -> Result<session::RmwServiceServer, NodeError> {
4941            let mut name = heapless::String::<256>::new();
4942            name.push_str(node_fqn)
4943                .map_err(|_| NodeError::NameTooLong)?;
4944            name.push_str("/").map_err(|_| NodeError::NameTooLong)?;
4945            name.push_str(suffix).map_err(|_| NodeError::NameTooLong)?;
4946            let mut info = ServiceInfo::new(&name, Svc::SERVICE_NAME, Svc::SERVICE_HASH)
4947                .with_namespace(namespace);
4948            if !node_name.is_empty() {
4949                info = info.with_node_name(node_name);
4950            }
4951            session
4952                .create_service_server(&info, QosSettings::services_default())
4953                .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))
4954        }
4955
4956        let cs_handle =
4957            create_lc_srv::<ChangeState>(&mut self.session, &node_fqn, ns, nn, "change_state")?;
4958        let gs_handle =
4959            create_lc_srv::<GetState>(&mut self.session, &node_fqn, ns, nn, "get_state")?;
4960        let gas_handle = create_lc_srv::<GetAvailableStates>(
4961            &mut self.session,
4962            &node_fqn,
4963            ns,
4964            nn,
4965            "get_available_states",
4966        )?;
4967        let gat_handle = create_lc_srv::<GetAvailableTransitions>(
4968            &mut self.session,
4969            &node_fqn,
4970            ns,
4971            nn,
4972            "get_available_transitions",
4973        )?;
4974        let gtg_handle = create_lc_srv::<GetAvailableTransitions>(
4975            &mut self.session,
4976            &node_fqn,
4977            ns,
4978            nn,
4979            "get_transition_graph",
4980        )?;
4981
4982        let servers = LifecycleServiceServers::new(
4983            LcSrv::<ChangeState> {
4984                handle: cs_handle,
4985                req_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
4986                reply_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
4987                _phantom: core::marker::PhantomData,
4988            },
4989            LcSrv::<GetState> {
4990                handle: gs_handle,
4991                req_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
4992                reply_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
4993                _phantom: core::marker::PhantomData,
4994            },
4995            LcSrv::<GetAvailableStates> {
4996                handle: gas_handle,
4997                req_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
4998                reply_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
4999                _phantom: core::marker::PhantomData,
5000            },
5001            LcSrv::<GetAvailableTransitions> {
5002                handle: gat_handle,
5003                req_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
5004                reply_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
5005                _phantom: core::marker::PhantomData,
5006            },
5007            LcSrv::<GetAvailableTransitions> {
5008                handle: gtg_handle,
5009                req_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
5010                reply_buffer: [0u8; LIFECYCLE_SERVICE_BUFFER_SIZE],
5011                _phantom: core::marker::PhantomData,
5012            },
5013        );
5014
5015        self.lifecycle = Some(alloc::boxed::Box::new(LifecycleRuntimeState {
5016            state_machine: LifecyclePollingNodeCtx::new(),
5017            services: alloc::boxed::Box::new(servers),
5018        }));
5019
5020        Ok(())
5021    }
5022
5023    /// Mutable access to the lifecycle state machine, if registered.
5024    ///
5025    /// Used to register transition callbacks before spinning and to read the
5026    /// current state from application code.
5027    pub fn lifecycle_state_machine_mut(
5028        &mut self,
5029    ) -> Option<&mut crate::lifecycle::LifecyclePollingNodeCtx> {
5030        self.lifecycle.as_mut().map(|lc| &mut lc.state_machine)
5031    }
5032
5033    /// Immutable access to the lifecycle state machine, if registered.
5034    pub fn lifecycle_state_machine(&self) -> Option<&crate::lifecycle::LifecyclePollingNodeCtx> {
5035        self.lifecycle.as_ref().map(|lc| &lc.state_machine)
5036    }
5037}
5038
5039// ============================================================================
5040// Parameter declaration API (cfg param-services)
5041// ============================================================================
5042
5043#[cfg(feature = "param-services")]
5044impl Executor {
5045    /// Declare a parameter with a value. Returns `true` if successful.
5046    pub fn declare_parameter(&mut self, name: &str, value: nros_params::ParameterValue) -> bool {
5047        if let Some(params) = &mut self.params {
5048            params.server.declare(name, value)
5049        } else {
5050            false
5051        }
5052    }
5053
5054    /// Declare a parameter with a value and descriptor. Returns `true` if successful.
5055    pub fn declare_parameter_with_descriptor(
5056        &mut self,
5057        name: &str,
5058        value: nros_params::ParameterValue,
5059        descriptor: nros_params::ParameterDescriptor,
5060    ) -> bool {
5061        if let Some(params) = &mut self.params {
5062            params
5063                .server
5064                .declare_with_descriptor(name, value, Some(descriptor))
5065        } else {
5066            false
5067        }
5068    }
5069
5070    /// Get a parameter value by name.
5071    pub fn get_parameter(&self, name: &str) -> Option<&nros_params::ParameterValue> {
5072        self.params.as_ref()?.server.get(name)
5073    }
5074
5075    /// Get an integer parameter value by name (convenience).
5076    pub fn get_parameter_integer(&self, name: &str) -> Option<i64> {
5077        self.params.as_ref()?.server.get_integer(name)
5078    }
5079
5080    /// Get a reference to the parameter server (if registered).
5081    pub fn params(&self) -> Option<&nros_params::ParameterServer> {
5082        self.params.as_ref().map(|p| &p.server)
5083    }
5084
5085    /// Get a mutable reference to the parameter server (if registered).
5086    pub fn params_mut(&mut self) -> Option<&mut nros_params::ParameterServer> {
5087        self.params.as_mut().map(|p| &mut p.server)
5088    }
5089
5090    /// Create a typed parameter builder (rclrs-compatible API).
5091    ///
5092    /// Returns a [`ParameterBuilder`] for fluent parameter declaration with
5093    /// `.default()`, `.description()`, `.range()`, and terminal methods
5094    /// `.mandatory()`, `.optional()`, or `.read_only()`.
5095    ///
5096    /// Returns [`NodeError::NotInitialized`] if parameter services have
5097    /// not been registered yet — call [`register_parameter_services`]
5098    /// first.
5099    ///
5100    /// # Example
5101    ///
5102    /// ```ignore
5103    /// let max_speed = executor.parameter::<f64>("max_speed")?
5104    ///     .default(25.0)
5105    ///     .description("Maximum velocity (m/s)")
5106    ///     .read_only()?;
5107    /// ```
5108    ///
5109    /// [`ParameterBuilder`]: nros_params::ParameterBuilder
5110    /// [`register_parameter_services`]: Self::register_parameter_services
5111    pub fn parameter<'a, T: nros_params::ParameterVariant>(
5112        &'a mut self,
5113        name: &'a str,
5114    ) -> Result<nros_params::ParameterBuilder<'a, T>, NodeError> {
5115        let server = self
5116            .params
5117            .as_mut()
5118            .map(|p| &mut p.server)
5119            .ok_or(NodeError::NotInitialized)?;
5120        Ok(nros_params::ParameterBuilder::new(server, name))
5121    }
5122}
5123
5124// ============================================================================
5125// std-gated spin and halt methods
5126// ============================================================================
5127
5128#[cfg(feature = "std")]
5129impl Executor {
5130    /// Blocking spin loop with configurable exit conditions.
5131    ///
5132    /// Runs until one of:
5133    /// - [`halt()`](Self::halt) is called (from another thread or signal handler)
5134    /// - Timeout expires (if set in options)
5135    /// - Max callbacks reached (if set in options)
5136    /// - `only_next` is true (single iteration)
5137    ///
5138    /// # Example
5139    ///
5140    /// ```ignore
5141    /// // Spin forever until halted
5142    /// executor.spin_blocking(SpinOptions::default())?;
5143    ///
5144    /// // Spin with 5-second timeout
5145    /// executor.spin_blocking(SpinOptions::new().timeout_ms(5000))?;
5146    ///
5147    /// // Single iteration
5148    /// executor.spin_blocking(SpinOptions::spin_once())?;
5149    /// ```
5150    pub fn spin_blocking(&mut self, opts: SpinOptions) -> Result<(), NodeError> {
5151        use std::time::{Duration, Instant};
5152
5153        const POLL_INTERVAL: core::time::Duration = core::time::Duration::from_millis(10);
5154
5155        let start = Instant::now();
5156        let timeout = opts.timeout_ms.map(Duration::from_millis);
5157        let mut total_callbacks = 0usize;
5158
5159        self.halt_flag
5160            .store(false, std::sync::atomic::Ordering::SeqCst);
5161
5162        loop {
5163            if self.halt_flag.load(std::sync::atomic::Ordering::SeqCst) {
5164                break;
5165            }
5166
5167            if timeout.is_some_and(|t| start.elapsed() >= t) {
5168                break;
5169            }
5170
5171            let result = self.spin_once(POLL_INTERVAL);
5172            total_callbacks += result.total();
5173
5174            if opts.max_callbacks.is_some_and(|max| total_callbacks >= max) {
5175                break;
5176            }
5177
5178            if opts.only_next {
5179                break;
5180            }
5181        }
5182
5183        Ok(())
5184    }
5185
5186    /// Execute one period with wall-clock overrun detection.
5187    ///
5188    /// Calls [`spin_once()`](Self::spin_once), measures wall-clock time, sleeps
5189    /// for the remainder if under budget.
5190    ///
5191    /// # Example
5192    ///
5193    /// ```ignore
5194    /// let period = std::time::Duration::from_millis(10);
5195    /// let result = executor.spin_one_period_timed(period);
5196    /// if result.overrun {
5197    ///     log::warn!("Period overrun: {:?}", result.elapsed);
5198    /// }
5199    /// ```
5200    pub fn spin_one_period_timed(
5201        &mut self,
5202        period: std::time::Duration,
5203    ) -> super::types::SpinPeriodResult {
5204        let start = std::time::Instant::now();
5205        let result = self.spin_once(period);
5206        let elapsed = start.elapsed();
5207        let overrun = elapsed > period;
5208        if !overrun {
5209            std::thread::sleep(period - elapsed);
5210        }
5211        super::types::SpinPeriodResult {
5212            work: result,
5213            overrun,
5214            elapsed,
5215        }
5216    }
5217
5218    /// Spin at a fixed rate with drift compensation. Blocks until halted.
5219    ///
5220    /// Uses wall-clock time to maintain the target rate. The next invocation
5221    /// time is accumulated (not reset to `now + period`) to prevent cumulative
5222    /// drift.
5223    ///
5224    /// # Example
5225    ///
5226    /// ```ignore
5227    /// // 100Hz control loop — blocks until halt() is called
5228    /// executor.spin_period(std::time::Duration::from_millis(10))?;
5229    /// ```
5230    pub fn spin_period(&mut self, period: std::time::Duration) -> Result<(), NodeError> {
5231        self.halt_flag
5232            .store(false, std::sync::atomic::Ordering::SeqCst);
5233        let mut next_invocation = std::time::Instant::now() + period;
5234
5235        loop {
5236            if self.halt_flag.load(std::sync::atomic::Ordering::SeqCst) {
5237                break;
5238            }
5239
5240            self.spin_once(period);
5241
5242            let now = std::time::Instant::now();
5243            if now < next_invocation {
5244                std::thread::sleep(next_invocation - now);
5245            }
5246            // Accumulate to prevent drift (not = now + period)
5247            next_invocation += period;
5248        }
5249        Ok(())
5250    }
5251
5252    /// Request the executor to stop spinning.
5253    ///
5254    /// Sets a flag that causes [`spin_blocking()`](Self::spin_blocking) or
5255    /// [`spin_period()`](Self::spin_period) to exit on the next iteration.
5256    /// Safe to call from another thread or signal handler.
5257    ///
5258    /// Also raises the Phase 104.C.6 wake flag so a `spin_once` already
5259    /// blocked inside a backend's `drive_io` falls through to the halt
5260    /// check on its next loop iteration instead of waiting out its full
5261    /// `timeout_ms` first.
5262    pub fn halt(&self) {
5263        self.halt_flag
5264            .store(true, std::sync::atomic::Ordering::SeqCst);
5265        self.wake_flag
5266            .store(true, std::sync::atomic::Ordering::SeqCst);
5267    }
5268
5269    /// Phase 110.D.b — move this Executor onto a fresh OS thread,
5270    /// apply a per-thread scheduling policy via the caller-supplied
5271    /// `apply_policy` function, and run the spin loop until
5272    /// [`ThreadHandle::halt`] fires.
5273    ///
5274    /// The function-pointer indirection on `apply_policy` lets the
5275    /// caller pass any platform's `PlatformScheduler::set_current_thread_policy`
5276    /// without forcing `Executor` to be generic over the platform —
5277    /// keeps the existing `Executor` type stable.
5278    ///
5279    /// Multi-executor preemption (the actual hard-RT win) comes from
5280    /// the OS scheduler — call `open_threaded` once per criticality
5281    /// tier, each with its own policy / priority. The kernel handles
5282    /// preemption across executors; within a single executor,
5283    /// dispatch remains non-preemptive (110.A–C bucketed sets).
5284    ///
5285    /// # Safety
5286    ///
5287    /// Moves `self` across thread boundaries. `Executor` contains a
5288    /// raw `*mut session::ConcreteSession` when constructed via
5289    /// `from_session_ptr`; the caller must ensure that pointer's
5290    /// referent stays valid across the lifetime of the spawned thread
5291    /// and that no other thread mutates the session concurrently.
5292    /// `from_session` (Owned) is safer — `ConcreteSession` ownership
5293    /// transfers cleanly into the thread.
5294    #[cfg(feature = "std")]
5295    pub unsafe fn open_threaded(
5296        self,
5297        policy: nros_platform_api::SchedPolicy,
5298        apply_policy: fn(
5299            nros_platform_api::SchedPolicy,
5300        ) -> Result<(), nros_platform_api::SchedError>,
5301        spin_period: core::time::Duration,
5302    ) -> ThreadHandle {
5303        let halt = std::sync::Arc::clone(&self.halt_flag);
5304        // SAFETY: Send is asserted via `unsafe impl Send for Executor`
5305        // below; the caller's safety contract on `from_session_ptr`
5306        // covers the pointer-validity invariant.
5307        let mut executor = self;
5308        let join = std::thread::spawn(move || {
5309            // Apply the requested OS scheduling policy to this fresh
5310            // thread. Failure is reported but not propagated — a
5311            // runtime that fails to lift to SCHED_FIFO still spins
5312            // correctly at SCHED_OTHER (just without RT guarantees).
5313            let _ = apply_policy(policy);
5314            while !executor.is_halted() {
5315                executor.spin_once(spin_period);
5316            }
5317        });
5318        ThreadHandle {
5319            join: Some(join),
5320            halt,
5321        }
5322    }
5323
5324    /// Check if halt has been requested.
5325    pub fn is_halted(&self) -> bool {
5326        self.halt_flag.load(std::sync::atomic::Ordering::SeqCst)
5327    }
5328
5329    /// Get a clone of the halt flag for use in signal handlers or other threads.
5330    ///
5331    /// # Example
5332    ///
5333    /// ```ignore
5334    /// let halt = executor.halt_flag();
5335    /// std::thread::spawn(move || {
5336    ///     std::thread::sleep(Duration::from_secs(5));
5337    ///     halt.store(true, Ordering::SeqCst);
5338    /// });
5339    /// executor.spin_blocking(SpinOptions::default())?;
5340    /// ```
5341    pub fn halt_flag(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
5342        self.halt_flag.clone()
5343    }
5344
5345    /// Phase 104.C.6 — wake the executor from another thread / ISR /
5346    /// signal handler.
5347    ///
5348    /// Sets the shared `wake_flag`. The next `spin_once` swap-clears the
5349    /// flag, skips the blocking wait on the primary session, and polls
5350    /// every session non-blockingly so whatever queued the wake is
5351    /// observed in a single iteration. Idempotent — multiple `wake()`
5352    /// calls collapse into one observed wake per `spin_once`.
5353    pub fn wake(&self) {
5354        self.wake_flag
5355            .store(true, std::sync::atomic::Ordering::SeqCst);
5356    }
5357
5358    /// Phase 104.C.6 — clone of the shared wake flag for cross-thread
5359    /// use (signal handlers, foreign threads, future per-backend vtable
5360    /// wake hooks).
5361    ///
5362    /// # Example
5363    ///
5364    /// ```ignore
5365    /// let wake = executor.wake_handle();
5366    /// std::thread::spawn(move || {
5367    ///     // ... compute something ...
5368    ///     // hand off to executor by setting the flag.
5369    ///     wake.store(true, Ordering::SeqCst);
5370    /// });
5371    /// loop { executor.spin_once(Duration::from_millis(100)); }
5372    /// ```
5373    pub fn wake_handle(&self) -> std::sync::Arc<std::sync::atomic::AtomicBool> {
5374        self.wake_flag.clone()
5375    }
5376}
5377
/// Phase 110.E.b — type-erased owner of a platform timer.
///
/// Bundles the raw platform handle (POSIX `timer_t` boxed via
/// `PosixTimerHandle`, FreeRTOS `TimerHandle_t`, etc.) with the function that
/// knows how to destroy it, so the Executor can clean up without being
/// generic over the platform.
///
/// Callers of `register_sporadic_timer` build one via
/// `OpaqueTimerHandle::new(handle, destroy_fn)` once their
/// `PlatformTimer::create_periodic` call has returned.
#[cfg(feature = "alloc")]
pub struct OpaqueTimerHandle {
    /// Raw platform handle; null once it has been destroyed.
    handle: *mut core::ffi::c_void,
    /// Destructor invoked with `handle` when this value is dropped.
    destroy_fn: extern "C" fn(*mut core::ffi::c_void),
}

// NOTE(review): these impls assert that the platform handle may be moved to,
// and shared with, other threads, and that `destroy_fn` may run on whichever
// thread performs the drop. Nothing here enforces that — confirm per platform.
#[cfg(feature = "alloc")]
unsafe impl Send for OpaqueTimerHandle {}
#[cfg(feature = "alloc")]
unsafe impl Sync for OpaqueTimerHandle {}

#[cfg(feature = "alloc")]
impl OpaqueTimerHandle {
    /// Wrap a raw platform timer handle together with its destructor.
    ///
    /// # Safety
    /// `handle` must be a live platform-specific timer handle that
    /// `destroy_fn` knows how to drop. The caller surrenders ownership of the
    /// underlying handle to the Executor; `destroy_fn` is called with it when
    /// the wrapper is dropped (unless `handle` is null).
    pub unsafe fn new(
        handle: *mut core::ffi::c_void,
        destroy_fn: extern "C" fn(*mut core::ffi::c_void),
    ) -> Self {
        OpaqueTimerHandle { handle, destroy_fn }
    }
}

#[cfg(feature = "alloc")]
impl Drop for OpaqueTimerHandle {
    /// Destroy the platform timer, unless the handle is null.
    fn drop(&mut self) {
        if self.handle.is_null() {
            return;
        }
        let destroy = self.destroy_fn;
        destroy(self.handle);
        // Leave a null behind so the handle can never be destroyed twice.
        self.handle = core::ptr::null_mut();
    }
}
5420
/// Owner of the thread started by [`Executor::open_threaded`].
///
/// Holds the spawned thread's join handle and a clone of the executor's halt
/// flag. Dropping the handle raises the halt flag and joins the thread, so
/// the thread can't outlive the handle.
#[cfg(feature = "std")]
pub struct ThreadHandle {
    /// Join handle of the spawned thread; `None` once it has been joined.
    join: Option<std::thread::JoinHandle<()>>,
    /// Halt flag shared with the executor running on the spawned thread.
    halt: std::sync::Arc<std::sync::atomic::AtomicBool>,
}

#[cfg(feature = "std")]
impl ThreadHandle {
    /// Signal the spawned executor thread to stop.
    ///
    /// Only the halt flag is raised, so the thread exits the next time its
    /// loop checks the flag, i.e. after the `spin_once` it is currently in
    /// returns.
    pub fn halt(&self) {
        use std::sync::atomic::Ordering::SeqCst;

        self.halt.store(true, SeqCst);
    }

    /// Raise the halt flag, then wait for the spawned thread to exit.
    ///
    /// Consumes the handle and returns the thread's join result (`Err` if the
    /// thread panicked). The `Ok(())` fallback for an already-taken join
    /// handle is defensive; the subsequent `Drop` finds nothing left to join.
    pub fn join(mut self) -> std::thread::Result<()> {
        self.halt();
        self.join.take().map_or(Ok(()), |worker| worker.join())
    }
}

#[cfg(feature = "std")]
impl Drop for ThreadHandle {
    /// Raise the halt flag and join the thread if that has not happened yet.
    /// A panic from the joined thread is swallowed here.
    fn drop(&mut self) {
        self.halt();
        if let Some(worker) = self.join.take() {
            let _ = worker.join();
        }
    }
}
5459
// SAFETY: Phase 110.D.b — `Executor` contains a raw `*mut
// session::ConcreteSession` only on the `from_session_ptr` (Borrowed)
// path; the `from_session` (Owned) path is plain Send-able. The
// `unsafe fn open_threaded` entry point documents the safety
// contract for Borrowed sessions; for Owned sessions the Send claim
// is unconditional.
//
// NOTE(review): the impl itself is unconditional at the type level, so in
// `std` builds any `Executor` — including one on the Borrowed path — can be
// moved to another thread by safe code, not only through `open_threaded`.
// Soundness for Borrowed sessions therefore rests on the `from_session_ptr`
// caller keeping the pointee valid and unshared — confirm that contract is
// stated there.
#[cfg(feature = "std")]
unsafe impl Send for Executor {}
5468
5469// =============================================================================
5470// Phase 110.F — `OsPriorityWorker` + `WorkItem`
5471// =============================================================================
5472
/// One worker thread per distinct `SchedContext.os_pri` value used
/// across registered SCs. Self-elevates via the executor's stored
/// `apply_policy` fn pointer at startup; drains an mpsc mailbox of
/// `WorkItem`s. Phase 110.F.
///
/// NOTE(review): the mailbox handle is a `std::sync::mpsc::Sender`,
/// i.e. the unbounded channel flavour (a bounded mailbox would use
/// `SyncSender`). Earlier docs described the mailbox as "bounded" —
/// confirm which is intended.
#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
pub(crate) struct OsPriorityWorker {
    /// Producer side of the worker's mailbox.
    sender: std::sync::mpsc::Sender<WorkItem>,
    /// Stop flag shared with the worker thread; set to `true` on drop.
    halt: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// Join handle of the worker thread; taken (left `None`) when the
    /// worker is joined on drop.
    join: Option<std::thread::JoinHandle<()>>,
}
5483
/// One unit of work sent to an `OsPriorityWorker` thread: which arena
/// entry to process and the function that processes it.
#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
struct WorkItem {
    /// Base address of the executor's arena, carried as an integer; the
    /// worker casts it back to `*mut u8`.
    arena_base: usize,
    /// Byte offset of the entry inside the arena; added to `arena_base`
    /// to form the entry pointer.
    arena_offset: usize,
    /// Processing function for the entry, called by the worker with the
    /// entry pointer and `delta_ms`.
    try_process: unsafe fn(*mut u8, u64) -> Result<bool, nros_rmw::TransportError>,
    /// Second argument forwarded to `try_process`; presumably the elapsed
    /// time in milliseconds for this cycle — confirm against the sender.
    delta_ms: u64,
}
5491
// SAFETY: Phase 110.F per-DescIdx exclusive-access invariant — the
// activator scan in `spin_once` only sends a `WorkItem` for a given
// `arena_offset` to one worker per cycle, and won't re-send the same
// offset until the worker drains the previous one (`os_pri` dispatch
// is the worker's exclusive path; cooperative dispatch is skipped
// for SCs with non-zero `os_pri`). The fn pointer is Send-clean.
//
// NOTE(review): every field of `WorkItem` (`usize`, `usize`, an
// `unsafe fn` pointer, `u64`) is `Send` on its own, so the type would
// be auto-`Send` even without this impl. The invariant above is what
// the worker relies on when it rebuilds a pointer from `arena_base`
// and dereferences it — that is where the real obligation lies.
#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
unsafe impl Send for WorkItem {}
5500
#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
impl OsPriorityWorker {
    /// Start the worker thread that serves priority level `os_pri`.
    ///
    /// The thread is named `nros-os-pri-<os_pri>`. It first calls
    /// `apply_policy` with `SchedPolicy::Fifo { os_pri }`, then processes
    /// `WorkItem`s from its mailbox until the halt flag is set or the
    /// mailbox is disconnected.
    ///
    /// # Panics
    ///
    /// Panics if the OS refuses to create the thread.
    fn spawn(
        os_pri: u8,
        apply_policy: fn(
            nros_platform_api::SchedPolicy,
        ) -> Result<(), nros_platform_api::SchedError>,
    ) -> Self {
        use std::sync::{
            Arc,
            atomic::{AtomicBool, Ordering},
            mpsc::{self, RecvTimeoutError},
        };

        // Longest a single mailbox wait may block before the halt flag
        // is checked again.
        let poll_interval = core::time::Duration::from_millis(10);

        let halt = Arc::new(AtomicBool::new(false));
        let worker_halt = Arc::clone(&halt);
        let (sender, mailbox) = mpsc::channel::<WorkItem>();

        let worker_body = move || {
            // Self-elevate. A failure is ignored (the result is
            // discarded, nothing is reported) and the worker keeps
            // going — running at SCHED_OTHER is still correct, just
            // without the priority guarantee.
            let _ = apply_policy(nros_platform_api::SchedPolicy::Fifo { os_pri });

            loop {
                if worker_halt.load(Ordering::Acquire) {
                    break;
                }
                let item = match mailbox.recv_timeout(poll_interval) {
                    Ok(item) => item,
                    // Nothing queued: loop round to re-check the halt flag.
                    Err(RecvTimeoutError::Timeout) => continue,
                    // Every sender is gone: no more work can arrive.
                    Err(RecvTimeoutError::Disconnected) => break,
                };
                // SAFETY: arena_base + arena_offset point into the
                // executor's arena. This relies on the invariant that
                // the arena outlives the worker, i.e. that the executor
                // halts + joins its workers before the arena is freed.
                let entry = (item.arena_base as *mut u8).wrapping_add(item.arena_offset);
                // The processing result (including any error) is dropped.
                let _ = unsafe { (item.try_process)(entry, item.delta_ms) };
            }
        };

        // SAFETY-invariant: spawn failure means the OS refused a new
        // thread (resource exhaustion). This runs once per priority
        // level at lazy worker setup — not on any hot/spin path — and
        // a runtime that cannot create its priority worker has no
        // correct way to continue, so fail fast at the setup point.
        let join = std::thread::Builder::new()
            .name(alloc::format!("nros-os-pri-{os_pri}"))
            .spawn(worker_body)
            .expect("os-priority worker spawn");

        Self {
            sender,
            halt,
            join: Some(join),
        }
    }

    /// Queue `item` in the worker's mailbox.
    ///
    /// Returns `true` when the item was queued. Returns `false` only if
    /// the send fails, which for an mpsc `Sender` means the receiving
    /// side (the worker thread) is gone.
    fn try_dispatch(&self, item: WorkItem) -> bool {
        self.sender.send(item).is_ok()
    }
}
5553
#[cfg(all(feature = "std", feature = "scheduler-os-priority"))]
impl Drop for OsPriorityWorker {
    /// Raise the halt flag and wait for the worker thread to exit.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        let Self { halt, join, .. } = self;
        // `Release` pairs with the `Acquire` load in the worker loop.
        halt.store(true, Ordering::Release);
        // The sender is still alive at this point, so the worker leaves
        // its loop through the halt flag rather than a disconnect. A
        // panic in the worker (an `Err` from `join`) is discarded.
        let _ = join.take().map(|worker| worker.join());
    }
}
5563
5564impl Drop for Executor {
5565    fn drop(&mut self) {
5566        // Phase 258 (Track 2, 2a) — release executor-owned component cells
5567        // first (before the arena entries), so a component's `drop`
5568        // trampoline can still touch its own (cell-owned) state. Each slot
5569        // owns a leaked `Arc<ComponentCell>`; its `drop` reconstitutes +
5570        // drops that Arc exactly once.
5571        for slot in self.component_slots.iter() {
5572            // SAFETY: `slot.state` is the leaked cell enrolled via
5573            // `enroll_component`; `slot.drop` is its matching trampoline, run
5574            // exactly once here (slots are not removed before Drop).
5575            unsafe {
5576                (slot.drop)(slot.state);
5577            }
5578        }
5579        let arena_ptr = self.arena.as_mut_ptr() as *mut u8;
5580        for meta in self.entries.iter().flatten() {
5581            // SAFETY: each entry was written by `ptr::write` in `add_*` and
5582            // has not been dropped yet. `drop_fn` matches the concrete type.
5583            unsafe {
5584                let data_ptr = arena_ptr.add(meta.offset);
5585                (meta.drop_fn)(data_ptr);
5586            }
5587        }
5588    }
5589}
5590
#[cfg(all(test, not(feature = "rmw-cffi")))]
mod dispatch_registry_tests {
    //! Phase 216 follow-up — round-trip coverage for
    //! `Executor::register_dispatch_slot` and
    //! `Executor::dispatch_callback`.
    //!
    //! Runs against `MockSession` (same pattern as
    //! `lifecycle_services::tests::mock_integration`), so no live RMW
    //! backend is needed. The module is gated `not(feature =
    //! "rmw-cffi")` because under `rmw-cffi` the `ConcreteSession`
    //! type alias resolves to the cffi session, which `MockSession`
    //! can't impersonate.

    extern crate alloc;

    use super::Executor;
    use crate::mock::MockSession;
    use std::sync::Mutex;

    /// One recorded trampoline call: `(state address, cb_id bytes, ctx address)`.
    type Capture = (usize, alloc::vec::Vec<u8>, usize);

    /// Every call seen by [`recording_on_callback`], in call order.
    static CAPTURED: Mutex<alloc::vec::Vec<Capture>> = Mutex::new(alloc::vec::Vec::new());

    /// Erase a `u32` local into the opaque pointer shape the registry takes.
    fn opaque(value: &mut u32) -> *mut core::ffi::c_void {
        value as *mut u32 as *mut core::ffi::c_void
    }

    /// Test trampoline with the per-Node
    /// `__nros_node_<pkg>_on_callback` ABI shape (Phase 216.A.5). It
    /// appends its arguments to [`CAPTURED`].
    unsafe extern "C" fn recording_on_callback(
        state: *mut core::ffi::c_void,
        cb_id_ptr: *const u8,
        cb_id_len: usize,
        ctx: *mut core::ffi::c_void,
    ) {
        // SAFETY: the calling test keeps the storage alive;
        // `cb_id_ptr..len` points into a `&str` literal.
        let id_bytes = unsafe { core::slice::from_raw_parts(cb_id_ptr, cb_id_len) }.to_vec();
        CAPTURED
            .lock()
            .expect("CAPTURED poisoned")
            .push((state as usize, id_bytes, ctx as usize));
    }

    #[test]
    fn register_dispatch_slot_round_trip() {
        let mut executor: Executor = Executor::from_session(MockSession::new());

        // A fresh executor starts with an empty registry.
        assert_eq!(executor.dispatch_slot_count(), 0);

        // Two distinct "states", to prove each slot is called with its
        // OWN state.
        let mut blob_a: u32 = 0xABCD_0001;
        let mut blob_b: u32 = 0xABCD_0002;
        let state_a = opaque(&mut blob_a);
        let state_b = opaque(&mut blob_b);

        executor
            .register_dispatch_slot(state_a, recording_on_callback)
            .expect("register slot A");
        executor
            .register_dispatch_slot(state_b, recording_on_callback)
            .expect("register slot B");
        assert_eq!(executor.dispatch_slot_count(), 2);

        let mut ctx_blob: u32 = 0xFEED_BEEF;
        let ctx_ptr = opaque(&mut ctx_blob);
        let cb_id = "/talker/timer/publish";

        CAPTURED.lock().expect("CAPTURED poisoned").clear();
        executor.dispatch_callback(cb_id, ctx_ptr);
        let captured = CAPTURED.lock().expect("CAPTURED poisoned").clone();

        assert_eq!(
            captured.len(),
            2,
            "every registered slot must be invoked — linear scan, \
             no self-filter at the registry layer"
        );
        // heapless::Vec iterates in insertion order, so captures follow
        // registration order.
        assert_eq!(captured[0].0, state_a as usize, "slot A's state");
        assert_eq!(captured[1].0, state_b as usize, "slot B's state");
        for (idx, (_, id_bytes, seen_ctx)) in captured.iter().enumerate() {
            assert_eq!(
                id_bytes.as_slice(),
                cb_id.as_bytes(),
                "slot {idx} cb_id bytes round-trip"
            );
            assert_eq!(
                *seen_ctx, ctx_ptr as usize,
                "slot {idx} ctx pointer round-trip"
            );
        }
    }

    #[test]
    fn register_dispatch_slot_capacity_full() {
        let mut executor: Executor = Executor::from_session(MockSession::new());

        let mut state_blob: u32 = 0;
        let state_ptr = opaque(&mut state_blob);
        let capacity = crate::config::MAX_NODES;

        // Exactly `MAX_NODES` slots fit; one more must be rejected.
        for _ in 0..capacity {
            executor
                .register_dispatch_slot(state_ptr, recording_on_callback)
                .expect("under-capacity push must succeed");
        }
        assert_eq!(executor.dispatch_slot_count(), capacity);

        let overflow = executor.register_dispatch_slot(state_ptr, recording_on_callback);
        assert!(
            overflow.is_err(),
            "over-capacity push must return Err(()) — raise \
             NROS_EXECUTOR_MAX_NODES at build time to grow the registry"
        );
    }
}