Skip to main content

nros/
node_runtime.rs

1//! Phase 212.M.5.a.2 — Executor-backed `NodeRuntime` /
2//! `DeclaredNodeRuntime` for nano-ros.
3//!
4//! [`MetadataRecorder`](crate::node_metadata::MetadataRecorder)
5//! (the planner sink) binds the
6//! [`Node`](crate::node::Node) /
7//! [`ExecutableNode`](crate::node::ExecutableNode)
8//! traits to a pure metadata target. This module is the missing twin:
9//! it binds the same traits to a live [`Executor`](crate::Executor) so
10//! a Node pkg can actually run — nodes, publishers,
11//! subscriptions, timers materialise as real executor handles, and
12//! every fired callback dispatches into
13//! [`ExecutableNode::on_callback`] with the right
14//! [`CallbackId`].
15//!
16//! Shape:
17//!
18//! ```ignore
19//! use nros::{Executor, ExecutorConfig};
20//! use nros::node_runtime::ExecutorNodeRuntime;
21//!
22//! let cfg = ExecutorConfig::from_env().node_name("talker_main");
23//! let executor = Executor::open(&cfg).unwrap();
24//! let mut runtime = ExecutorNodeRuntime::from_executor(executor);
25//! let _handle = runtime.register_node::<Talker>().unwrap();
26//! runtime.spin().unwrap();
27//! ```
28//!
29//! Owned-spin / board consumer: the macro-emitted `<pkg>::register(runtime)`
30//! wrapper (Phase 258, Track 2) installs each Node onto the runtime's executor
31//! through the uniform `install_node_typed` seam — the same seam the C/C++
32//! typed entries use. The codegen `run_plan(runtime)` body + `nros::main!`
33//! owned-spin loop drive one `register(runtime)?` per launch-XML `<node>`,
34//! then `runtime.spin()`. (The retired Phase 212.M.5.a four-fn-ptr BSP-baker
35//! ABI — `register_dispatch_slot` / `nros_run_components` — is gone.)
36//!
37//! ## Coverage today (Phase 212.M.5.a.2)
38//!
39//! Publishers, subscriptions, and repeating timers wire end-to-end:
40//! the live executor delivers callbacks; the bound
41//! [`ExecutableNode::on_callback`] body runs with a
42//! [`CallbackCtx`] backed by the per-component publisher resolver.
43//! Service servers / clients and action servers / clients wire
44//! end-to-end too (Phase 212.M-F.23): `create_entity` registers them on
45//! the executor with C-ABI trampolines that route inbound requests /
46//! goals into the component's `on_callback`, and the tick-time client /
47//! action surface ([`TickCtx`]) is backed by `RuntimeClientDispatch` /
48//! `RuntimeActions` over the live executor. Parameters are still a
49//! follow-up (registration succeeds; param callbacks don't fire yet).
50
51#![cfg(feature = "rmw-cffi")]
52
53extern crate alloc;
54
55use alloc::{boxed::Box, string::String, vec::Vec};
56use core::{cell::RefCell, marker::PhantomData, time::Duration};
57
58use portable_atomic::{AtomicUsize, Ordering};
59use portable_atomic_util::Arc;
60
61use crate::{
62    EmbeddedRawPublisher, Executor, GoalId, GoalStatus,
63    node::{
64        ActionExecutor, Callback, CallbackCtx, ClientDispatch, ExecutableNode, NodeContext,
65        NodeDeclError, NodeOptions, NodeResult, NodeRuntime, PublisherResolver, TickCtx,
66    },
67    node_metadata::{
68        CallbackEffectKind, CallbackId, EntityId, EntityKind, EntityMetadata, NodeId as MetaNodeId,
69    },
70};
71
72// Phase 212.N.7 closing sweep — `component_register_symbol` retired
73// (no live callers after the BSP baker + macro extern emit were
74// removed). The former re-export here is gone.
75
76// =============================================================================
77// Public types
78// =============================================================================
79
80/// Opaque handle returned by
81/// [`ExecutorNodeRuntime::register_node`].
82///
83/// `C` distinguishes handles at the type level so a caller who keeps
84/// the handle can later (post-M.5.a.3) recover a typed mut-state
85/// borrow. For today the handle is purely a witness that registration
86/// succeeded.
87pub struct RegisteredNode<C: ExecutableNode> {
88    component_idx: usize,
89    _phantom: PhantomData<fn() -> C>,
90}
91
92impl<C: ExecutableNode> RegisteredNode<C> {
93    /// Slot index of this component inside the runtime.
94    pub fn slot(&self) -> usize {
95        self.component_idx
96    }
97}
98
99/// Errors returned by the runtime entry points.
100#[derive(Debug, Clone, PartialEq, Eq)]
101pub enum ExecutorError {
102    /// One of the components' register / lifecycle calls failed.
103    Node(NodeDeclError),
104    /// The executor's spin loop returned an unexpected error.
105    SpinFailed,
106}
107
108impl From<NodeDeclError> for ExecutorError {
109    fn from(e: NodeDeclError) -> Self {
110        Self::Node(e)
111    }
112}
113
114// =============================================================================
115// Internal slot — type-erases the component's `State` so the runtime
116// can hold a heterogeneous vec.
117// =============================================================================
118
119trait ComponentSlot {
120    fn dispatch(&mut self, cb_id: &str, ctx: &mut CallbackCtx<'_>);
121    fn tick(&mut self, ctx: &mut TickCtx<'_>);
122}
123
124struct TypedSlot<C: ExecutableNode> {
125    state: C::State,
126    _phantom: PhantomData<fn() -> C>,
127}
128
129impl<C: ExecutableNode> ComponentSlot for TypedSlot<C> {
130    fn dispatch(&mut self, cb_id: &str, ctx: &mut CallbackCtx<'_>) {
131        C::on_callback(
132            &mut self.state,
133            Callback::__from_id(CallbackId::new(cb_id)),
134            ctx,
135        );
136    }
137    fn tick(&mut self, ctx: &mut TickCtx<'_>) {
138        C::tick(&mut self.state, ctx);
139    }
140}
141
142// Phase 258 (Track 2, w5) — `BspDispatchSlot` (the type-erased
143// four-fn-ptr BSP dispatch slot) is gone with the retired
144// `register_dispatch_slot` / `nros_run_components` BSP-baker path. The
145// only remaining `ComponentSlot` impl is `TypedSlot<C>` above (used by
146// `register_node` / `register_node_borrowed` / `install_node_typed`).
147
148/// Shared per-component cell. Subscription / timer closures registered
149/// against the executor hold an `Arc` clone so they can dispatch +
150/// publish back through the resolver.
151struct ComponentCell {
152    slot: RefCell<Box<dyn ComponentSlot>>,
153    publishers: RefCell<Vec<(String, EmbeddedRawPublisher)>>,
154    // Phase 212.M-F.23 — declarative service/action CLIENT + action-SERVER
155    // handles, keyed by stable entity id, resolved during tick dispatch.
156    // Mirror of the orchestration `GenClientDispatch`/`GenActionExec` arrays,
157    // but built at registration time on the single-node runtime. Service- and
158    // action-SERVER request/goal dispatch is owned by the executor (the
159    // trampolines registered in `create_entity`); only the action-server
160    // handle is kept here so the tick can complete goals / publish feedback.
161    service_clients: RefCell<Vec<(String, crate::HandleId)>>,
162    action_clients: RefCell<Vec<(String, usize)>>,
163    action_servers: RefCell<Vec<(String, crate::ActionServerRawHandle)>>,
164    callback_dispatches: AtomicUsize,
165    message_dispatches: AtomicUsize,
166}
167
168impl ComponentCell {
169    fn lookup_publisher<R>(
170        &self,
171        entity_id: &str,
172        f: impl FnOnce(&EmbeddedRawPublisher) -> R,
173    ) -> Option<R> {
174        let pubs = self.publishers.borrow();
175        pubs.iter()
176            .find(|(id, _)| id == entity_id)
177            .map(|(_, p)| f(p))
178    }
179}
180
181/// `PublisherResolver` implementation backed by a `ComponentCell`.
182struct CellResolver<'a> {
183    cell: &'a ComponentCell,
184}
185
186impl PublisherResolver for CellResolver<'_> {
187    fn publish_raw(&self, entity_id: &str, data: &[u8]) -> NodeResult<()> {
188        self.cell
189            .lookup_publisher(entity_id, |p| {
190                p.publish_raw(data).map_err(|_| NodeDeclError::Runtime)
191            })
192            .unwrap_or(Err(NodeDeclError::Runtime))
193    }
194}
195
196// Phase 212.M-F.23 — the `UnsupportedActions` / `UnsupportedClients` tick-side
197// stubs are retired. Real service/action client + action-server dispatch on the
198// single-node runtime lives in `RuntimeClientDispatch` / `RuntimeActions`
199// (below), wired into `run_ticks`.
200
201// =============================================================================
202// ExecutorNodeRuntime
203// =============================================================================
204
205/// Executor-backed component runtime.
206///
207/// Owns the [`Executor`] and one slot per registered component. The
208/// register / spin lifecycle:
209///
210/// 1. [`from_executor`](Self::from_executor) wraps an open
211///    [`Executor`].
212/// 2. [`register_node`](Self::register_node) builds the
213///    component's `State`, runs [`Node::register`] over an
214///    internal [`NodeRuntime`] adapter that materialises nodes /
215///    pubs / subs / timers on the real executor, and wires each
216///    subscription + timer callback to dispatch into
217///    [`ExecutableNode::on_callback`] with the right
218///    [`CallbackId`].
219/// 3. [`spin`](Self::spin) / [`spin_once`](Self::spin_once) drive the
220///    executor; between iterations every registered component's
221///    [`ExecutableNode::tick`] runs.
222pub struct ExecutorNodeRuntime {
223    executor: Executor,
224    components: Vec<Arc<ComponentCell>>,
225}
226
227impl ExecutorNodeRuntime {
228    /// Wrap an already-built [`Executor`].
229    pub fn from_executor(executor: Executor) -> Self {
230        Self {
231            executor,
232            components: Vec::new(),
233        }
234    }
235
236    /// Borrow the underlying executor.
237    pub fn executor(&self) -> &Executor {
238        &self.executor
239    }
240
241    /// Mutably borrow the underlying executor — for advanced wiring
242    /// (parameter services, custom guard conditions). Don't use during
243    /// [`spin`](Self::spin) from another thread; the runtime is
244    /// single-threaded.
245    pub fn executor_mut(&mut self) -> &mut Executor {
246        &mut self.executor
247    }
248
249    /// Number of registered components.
250    pub fn component_count(&self) -> usize {
251        self.components.len()
252    }
253
254    /// Register a [`Node`] (which must also be
255    /// [`ExecutableNode`]) into this runtime. Builds the
256    /// component's `State` (via [`ExecutableNode::init`]) and
257    /// walks [`Node::register`] over the live executor — every
258    /// declared node / pub / sub / timer materialises as a real
259    /// executor handle, and subscription + timer callbacks are wired
260    /// to dispatch into [`ExecutableNode::on_callback`].
261    pub fn register_node<C: ExecutableNode + 'static>(&mut self) -> NodeResult<RegisteredNode<C>>
262    where
263        C::State: 'static,
264    {
265        let cell = Arc::new(ComponentCell {
266            slot: RefCell::new(Box::new(TypedSlot::<C> {
267                state: C::init(),
268                _phantom: PhantomData,
269            })),
270            publishers: RefCell::new(Vec::new()),
271            service_clients: RefCell::new(Vec::new()),
272            action_clients: RefCell::new(Vec::new()),
273            action_servers: RefCell::new(Vec::new()),
274            callback_dispatches: AtomicUsize::new(0),
275            message_dispatches: AtomicUsize::new(0),
276        });
277        let component_idx = self.components.len();
278        self.components.push(cell.clone());
279
280        let mut sink = ExecutorSink {
281            executor: &mut self.executor,
282            cell: cell.clone(),
283            nodes: Vec::new(),
284        };
285        let sink_dyn: &mut dyn NodeRuntime = &mut sink;
286        let mut context = NodeContext::new(C::NAME, sink_dyn);
287        let result = C::register(&mut context);
288        if result.is_err() {
289            // Roll back the slot push so `component_count` stays
290            // consistent with what users observe.
291            self.components.pop();
292        }
293        result?;
294
295        Ok(RegisteredNode {
296            component_idx,
297            _phantom: PhantomData,
298        })
299    }
300
301    // Phase 258 (Track 2, w5) — `register_dispatch_slot` (the four-fn-ptr
302    // BSP registration) is gone with the retired `register_dispatch_slot_dyn`
303    // bridge + `nros_run_components`. Owned-spin / BSP entries now register
304    // through `install_node_typed` (the uniform install seam).
305
306    /// Drive one executor iteration + a `tick` per registered
307    /// component.
308    pub fn spin_once(&mut self, timeout: Duration) -> Result<(), ExecutorError> {
309        let _result = self.executor.spin_once(timeout);
310        self.run_ticks();
311        Ok(())
312    }
313
314    /// Phase 216.B.3 / C.3 follow-up — route a signaled callback to
315    /// every registered component slot.
316    ///
317    /// The RTIC (`nros-board-rtic-stm32f4`) and Embassy
318    /// (`nros-board-embassy-stm32f4`) dispatch tasks dequeue a
319    /// [`nros_platform::SignaledCallback`] envelope from their SPSC
320    /// queue / Embassy channel and need a routing entry point that
321    /// hands the callback off to the right Node's `on_callback`
322    /// trampoline. This method is that entry point.
323    ///
324    /// # Strategy — linear scan
325    ///
326    /// Each registered slot's `dispatch_fn` is the codegen-emitted
327    /// `d()` trampoline from `nros::node!()` (see
328    /// `packages/core/nros-macros/src/lib.rs`). That trampoline calls
329    /// `<NodeTy as ExecutableNode>::on_callback`, whose body
330    /// `match`es on the callback's own tag set
331    /// (`Subscription` / `Timer` / `Service` / `Action` ids) and is a
332    /// no-op for non-matching `cb_id`s. So a linear scan across every
333    /// slot is correct — each slot self-filters and at most one
334    /// component actually acts on a given `cb_id`. A focused
335    /// `cb_id → slot` index is a separate follow-up; the trampoline's
336    /// tag dispatch already gates the real work cheaply (string
337    /// compare on statically known literals), so the linear scan is
338    /// the minimum-viable wiring that closes the conceptual gap left
339    /// by the B.3 / C.3 skeleton emits.
340    ///
341    /// # Borrow semantics
342    ///
343    /// Each `ComponentCell`'s slot lives behind a [`RefCell`]; the
344    /// per-slot dispatch takes `try_borrow_mut` and is a no-op on
345    /// re-entrancy. The runtime is single-threaded by construction
346    /// (the dispatch task owns it via `&mut self`), so the borrow
347    /// always succeeds in normal flow.
348    pub fn dispatch_callback(&mut self, cb_id: &str, ctx: &mut CallbackCtx<'_>) {
349        for cell in &self.components {
350            if let Ok(mut slot) = cell.slot.try_borrow_mut() {
351                slot.dispatch(cb_id, ctx);
352            }
353        }
354    }
355
356    /// Spin until the executor's halt flag is raised. Hosted-only; on
357    /// bare-metal the BSP wraps `spin_once` in its own loop.
358    #[cfg(feature = "std")]
359    pub fn spin(&mut self) -> Result<(), ExecutorError> {
360        // 10 ms tick cadence — matches the existing executor spin
361        // budgeting (see `Executor::spin_default`); short enough that
362        // component `tick` hooks observe latency under one cycle.
363        let tick = Duration::from_millis(10);
364        while !self.executor.is_halted() {
365            let _ = self.executor.spin_once(tick);
366            self.run_ticks();
367        }
368        Ok(())
369    }
370
371    /// Halt a running [`spin`](Self::spin). Idempotent.
372    #[cfg(feature = "std")]
373    pub fn halt(&self) {
374        self.executor.halt();
375    }
376
377    fn run_ticks(&mut self) {
378        // Per-component tick — each component's resolver is its own cell.
379        // Phase 212.M-F.23: the tick reaches the executor (service-client
380        // call_raw poll, action-server complete/feedback) through a raw
381        // pointer so `&self.components` and `&mut self.executor` (disjoint
382        // fields) can be live at once.
383        let exec_ptr: *mut Executor = &mut self.executor;
384        for cell in &self.components {
385            tick_one_cell(cell.as_ref(), exec_ptr);
386        }
387    }
388}
389
390/// Phase 258 (Track 2, 2a) — drive one component cell's `tick` against the
391/// executor. The single source of truth for the per-component tick body,
392/// shared by [`ExecutorNodeRuntime::run_ticks`] (the owned-runtime path) and
393/// the executor-enrolled [`component_tick_trampoline`] (the `install` path).
394///
395/// `exec_ptr` is reached through a raw `*mut Executor` so the caller can hold
396/// the component (`&ComponentCell`) and the executor live at once — they are
397/// disjoint, and `RuntimeActions` / `RuntimeClientDispatch` reborrow `&mut`
398/// per call (see their docs).
399fn tick_one_cell(cell: &ComponentCell, exec_ptr: *mut Executor) {
400    let resolver = CellResolver { cell };
401    let service_clients = cell.service_clients.borrow();
402    let action_clients = cell.action_clients.borrow();
403    let action_servers = cell.action_servers.borrow();
404    let mut actions = RuntimeActions {
405        executor: exec_ptr,
406        handles: &action_servers,
407    };
408    let mut clients = RuntimeClientDispatch {
409        executor: exec_ptr,
410        services: &service_clients,
411        actions: &action_clients,
412    };
413    let mut ctx = TickCtx::new(&resolver, &mut actions, &mut clients);
414    if let Ok(mut slot) = cell.slot.try_borrow_mut() {
415        slot.tick(&mut ctx);
416    }
417}
418
419/// Phase 258 (Track 2, 2a) — executor `ComponentSlot.tick` trampoline. Casts
420/// the enrolled state back to the component cell + `exec_ctx` back to the
421/// executor and drives one tick. The layering-clean `extern "C"` shim the
422/// `nros-node` [`Executor`] calls each `spin_once` (it can't name `nros`'s
423/// [`ComponentCell`] — see [`register_node_borrowed`]'s enroll).
424///
425/// # Safety
426/// `state` must be the leaked `Arc<ComponentCell>` enrolled via
427/// [`Executor::enroll_component`] from [`register_node_borrowed`] (borrowed
428/// here, **not** consumed); `exec_ctx` must be the live `*mut Executor` the
429/// executor passes itself.
430unsafe extern "C" fn component_tick_trampoline(
431    state: *mut core::ffi::c_void,
432    exec_ctx: *mut core::ffi::c_void,
433) {
434    // SAFETY: `state` is a live, leaked `Arc<ComponentCell>` ptr (kept alive
435    // until `component_drop_trampoline`); borrow it without taking ownership.
436    let cell = unsafe { &*(state as *const ComponentCell) };
437    tick_one_cell(cell, exec_ctx as *mut Executor);
438}
439
440/// Phase 258 (Track 2, 2a) — executor `ComponentSlot.drop` trampoline.
441/// Reconstitutes + drops the leaked `Arc<ComponentCell>` enrolled by
442/// [`register_node_borrowed`]. Run exactly once on `Executor::drop`.
443///
444/// # Safety
445/// `state` must be the leaked `Arc<ComponentCell>` enrolled via
446/// [`Executor::enroll_component`], not yet reclaimed.
447unsafe extern "C" fn component_drop_trampoline(state: *mut core::ffi::c_void) {
448    // SAFETY: reclaim the one leaked Arc clone the enroll handed the executor.
449    drop(unsafe { Arc::from_raw(state as *const ComponentCell) });
450}
451
452// =============================================================================
453// Phase 212.N.7 step-3.3 — bridge to platform-side `NodeDispatchRuntime`.
454// =============================================================================
455//
456// `nros_platform::NodeDispatchRuntime` is the board-side sink: object-safe +
457// `no_std`. `BoardEntry::run` installs this `ExecutorNodeRuntime` impl on the
458// per-boot `RuntimeCtx::runtime` slot. The owned-spin entry reaches the live
459// executor through `executor_handle()` (a raw pointer crosses the layering wall
460// cleanly) and installs via `nros::install_node_typed`. Phase 258 (w5) retired
461// the old `register_dispatch_slot_dyn` four-fn-ptr bridge.
462
463impl ::nros_platform::NodeDispatchRuntime for ExecutorNodeRuntime {
464    fn spin_once(&mut self, timeout_ms: u32) -> Result<(), ()> {
465        Self::spin_once(self, Duration::from_millis(timeout_ms.into())).map_err(|_| ())
466    }
467
468    fn executor_handle(&mut self) -> *mut core::ffi::c_void {
469        // Phase 258 (Track 2, 2a) — hand the owned-spin entry a raw pointer to
470        // the executor this runtime owns, so a Node pkg's `register(runtime)`
471        // can install through `nros::install_node_typed` (same seam as the
472        // C/C++ typed entries). The pointer is valid for the runtime's life
473        // (the executor is an inline field); the install call uses it only
474        // during registration, before any concurrent spin.
475        &mut self.executor as *mut Executor as *mut core::ffi::c_void
476    }
477
478    fn observed_callback_counts(&self) -> (usize, usize) {
479        self.components
480            .iter()
481            .fold((0, 0), |(callbacks, messages), cell| {
482                (
483                    callbacks + cell.callback_dispatches.load(Ordering::Relaxed),
484                    messages + cell.message_dispatches.load(Ordering::Relaxed),
485                )
486            })
487    }
488}
489
490// =============================================================================
491// Internal sink — bridges `NodeRuntime` declarations onto the
492// live executor.
493// =============================================================================
494
495struct ExecutorSink<'a> {
496    executor: &'a mut Executor,
497    cell: Arc<ComponentCell>,
498    /// Per-registration node mapping: stable id → executor `NodeId`.
499    nodes: Vec<(String, nros_node::executor::NodeId)>,
500}
501
502impl ExecutorSink<'_> {
503    fn lookup_node(&self, stable_id: &str) -> Option<nros_node::executor::NodeId> {
504        self.nodes
505            .iter()
506            .find(|(id, _)| id == stable_id)
507            .map(|(_, n)| *n)
508    }
509}
510
511impl NodeRuntime for ExecutorSink<'_> {
512    fn create_node(&mut self, id: MetaNodeId<'_>, options: NodeOptions<'_>) -> NodeResult<()> {
513        if self.nodes.iter().any(|(s, _)| s.as_str() == id.as_str()) {
514            return Err(NodeDeclError::Runtime);
515        }
516        let node_id = self
517            .executor
518            .node_builder(options.name)
519            .namespace(options.namespace)
520            .domain_id(options.domain_id)
521            .build()
522            .map_err(|_| NodeDeclError::Runtime)?;
523        self.nodes.push((String::from(id.as_str()), node_id));
524        Ok(())
525    }
526
527    fn create_entity(&mut self, metadata: EntityMetadata) -> NodeResult<()> {
528        // Phase 228.C tier gate: when this executor runs a specific tier
529        // (`active_groups` set by codegen), an entity whose callback group
530        // is not active on this tier is a no-op — no RMW handle, no slot.
531        // An unlabeled entity (`callback_group == None`) is wildcard-eligible
532        // and always registers; the degenerate single-tier executor leaves
533        // `active_groups == None`, so every entity registers (byte-identical
534        // to pre-228 output).
535        if let Some(group) = metadata.callback_group.as_ref()
536            && !self.executor.group_active(group.as_str())
537        {
538            return Ok(());
539        }
540        let node = self
541            .lookup_node(metadata.node_id.as_str())
542            .ok_or(NodeDeclError::Runtime)?;
543        match metadata.kind {
544            EntityKind::Publisher => {
545                let handle = self
546                    .executor
547                    .node_mut(node)
548                    .create_generic_publisher(
549                        metadata.source_name.as_str(),
550                        metadata.type_name,
551                        metadata.type_hash,
552                    )
553                    .map_err(|_| NodeDeclError::Runtime)?;
554                let id_owned = String::from(metadata.id.as_str());
555                self.cell.publishers.borrow_mut().push((id_owned, handle));
556                Ok(())
557            }
558            EntityKind::Subscription => {
559                let cb_id = metadata
560                    .callback_id
561                    .as_ref()
562                    .ok_or(NodeDeclError::Runtime)?;
563                let cb_id_owned = String::from(cb_id.as_str());
564                let cell = self.cell.clone();
565                // Phase 250 (Wave 2b) — a `.safety()` subscription registers via
566                // the integrity-aware generic path so `CallbackCtx::integrity()`
567                // surfaces CRC + sequence gap/dup. Gated: when `safety-e2e` is off
568                // the flag is ignored and the basic path below runs.
569                #[cfg(feature = "safety-e2e")]
570                if metadata.safety {
571                    let cell_s = self.cell.clone();
572                    let cb_s = cb_id_owned.clone();
573                    self.executor
574                        .node_mut(node)
575                        .create_generic_subscription_with_integrity(
576                            metadata.source_name.as_str(),
577                            metadata.type_name,
578                            metadata.type_hash,
579                            move |payload: &[u8], status: &nros_node::IntegrityStatus| {
580                                dispatch_into_cell_with_integrity(&cell_s, &cb_s, payload, status);
581                            },
582                        )
583                        .map_err(|_| NodeDeclError::Runtime)?;
584                    return Ok(());
585                }
586                self.executor
587                    .node_mut(node)
588                    .create_generic_subscription(
589                        metadata.source_name.as_str(),
590                        metadata.type_name,
591                        metadata.type_hash,
592                        move |payload: &[u8]| {
593                            dispatch_into_cell(&cell, &cb_id_owned, payload);
594                        },
595                    )
596                    .map_err(|_| NodeDeclError::Runtime)?;
597                Ok(())
598            }
599            EntityKind::Timer => {
600                let cb_id = metadata
601                    .callback_id
602                    .as_ref()
603                    .ok_or(NodeDeclError::Runtime)?;
604                let cb_id_owned = String::from(cb_id.as_str());
605                let period_ms = metadata.period_ms.ok_or(NodeDeclError::Runtime)?;
606                let cell = self.cell.clone();
607                self.executor
608                    .register_timer(
609                        nros_node::TimerDuration::from_millis(period_ms),
610                        move || {
611                            dispatch_into_cell(&cell, &cb_id_owned, &[]);
612                        },
613                    )
614                    .map_err(|_| NodeDeclError::Runtime)?;
615                Ok(())
616            }
617            // Phase 212.M-F.23 — service / action client + server dispatch on
618            // the single-node runtime. The executor-level `register_*_on`
619            // calls add an arena dispatch entry, so inbound requests / goals
620            // are serviced inside `spin_once`; the leaked `*Ctx` trampoline
621            // contexts bridge back into the component's `on_callback`. Client
622            // handles are stashed in the cell for the tick-side dispatch
623            // (`RuntimeClientDispatch` / `RuntimeActions` in `run_ticks`).
624            EntityKind::ServiceServer => {
625                let cb_id = metadata
626                    .callback_id
627                    .as_ref()
628                    .ok_or(NodeDeclError::Runtime)?;
629                let ctx = Box::into_raw(Box::new(ServiceServerCtx {
630                    cell: self.cell.clone(),
631                    callback_id: String::from(cb_id.as_str()),
632                })) as *mut core::ffi::c_void;
633                self.executor
634                    .register_service_raw_sized_on::<1024, 1024>(
635                        node,
636                        metadata.source_name.as_str(),
637                        metadata.type_name,
638                        metadata.type_hash,
639                        crate::QosSettings::services_default(),
640                        service_server_trampoline,
641                        ctx,
642                    )
643                    .map_err(|_| NodeDeclError::Runtime)?;
644                Ok(())
645            }
646            EntityKind::ServiceClient => {
647                let hid = self
648                    .executor
649                    .register_service_client_raw_sized_on::<1024>(
650                        node,
651                        metadata.source_name.as_str(),
652                        metadata.type_name,
653                        metadata.type_hash,
654                        crate::QosSettings::services_default(),
655                        None,
656                        core::ptr::null_mut(),
657                    )
658                    .map_err(|_| NodeDeclError::Runtime)?;
659                self.cell
660                    .service_clients
661                    .borrow_mut()
662                    .push((String::from(metadata.id.as_str()), hid));
663                Ok(())
664            }
665            EntityKind::ActionServer => {
666                let goal_cb = metadata
667                    .callback_id
668                    .as_ref()
669                    .ok_or(NodeDeclError::Runtime)?;
670                let cancel_cb = metadata
671                    .action_cancel_callback_id
672                    .as_ref()
673                    .ok_or(NodeDeclError::Runtime)?;
674                let accepted_cb = metadata
675                    .action_accepted_callback_id
676                    .as_ref()
677                    .map(|c| String::from(c.as_str()));
678                let ctx = Box::into_raw(Box::new(ActionServerCtx {
679                    cell: self.cell.clone(),
680                    goal_callback_id: String::from(goal_cb.as_str()),
681                    cancel_callback_id: String::from(cancel_cb.as_str()),
682                    accepted_callback_id: accepted_cb,
683                })) as *mut core::ffi::c_void;
684                let handle = self
685                    .executor
686                    .register_action_server_raw_sized::<1024, 1024, 1024, 4>(
687                        crate::RawActionServerSpec {
688                            node_id: Some(node),
689                            action_name: metadata.source_name.as_str(),
690                            type_name: metadata.type_name,
691                            type_hash: metadata.type_hash,
692                            qos: crate::QosSettings::services_default(),
693                            goal_callback: action_goal_trampoline,
694                            cancel_callback: action_cancel_trampoline,
695                            accepted_callback: Some(action_accepted_trampoline),
696                            context: ctx,
697                        },
698                    )
699                    .map_err(|_| NodeDeclError::Runtime)?;
700                self.cell
701                    .action_servers
702                    .borrow_mut()
703                    .push((String::from(metadata.id.as_str()), handle));
704                Ok(())
705            }
706            EntityKind::ActionClient => {
707                // A bound `callback_id` (set by
708                // `create_action_client_with_callbacks_for_name`) delivers the
709                // terminal goal result to the component via `on_callback`; the
710                // optional `action_accepted_callback_id` slot carries the
711                // feedback callback (reused — unused on a client). The executor
712                // auto-drives accept → feedback → result during spin and invokes
713                // these trampolines. No callbacks → send-goal only.
714                let (result_callback, feedback_callback, ctx) = match metadata.callback_id.as_ref()
715                {
716                    Some(result_cb) => {
717                        let feedback_cb = metadata
718                            .action_accepted_callback_id
719                            .as_ref()
720                            .map(|c| String::from(c.as_str()));
721                        let ctx = Box::into_raw(Box::new(ActionClientCtx {
722                            cell: self.cell.clone(),
723                            result_callback_id: String::from(result_cb.as_str()),
724                            feedback_callback_id: feedback_cb.clone(),
725                        })) as *mut core::ffi::c_void;
726                        let fb = feedback_cb.map(|_| action_feedback_trampoline as _);
727                        (Some(action_result_trampoline as _), fb, ctx)
728                    }
729                    None => (None, None, core::ptr::null_mut()),
730                };
731                let handle = self
732                    .executor
733                    .register_action_client_raw_sized::<1024, 1024, 1024>(
734                        crate::RawActionClientSpec {
735                            node_id: Some(node),
736                            action_name: metadata.source_name.as_str(),
737                            type_name: metadata.type_name,
738                            type_hash: metadata.type_hash,
739                            goal_response_callback: None,
740                            feedback_callback,
741                            result_callback,
742                            context: ctx,
743                        },
744                    )
745                    .map_err(|_| NodeDeclError::Runtime)?;
746                self.cell
747                    .action_clients
748                    .borrow_mut()
749                    .push((String::from(metadata.id.as_str()), handle.entry_index()));
750                Ok(())
751            }
752            EntityKind::Parameter => {
753                // Phase 212.M-F.23 Wave 2 — declarative parameter dispatch on
754                // the single-node runtime. The first declared parameter lazily
755                // stands up the 6 ROS 2 parameter services for this executor's
756                // node; `spin_once` drives those service servers thereafter
757                // (`#[cfg(param-services)]` block at spin.rs). The declared
758                // source default seeds the value. With `param-services` off the
759                // arm is a no-op (entity declared, no RMW handle) — byte-
760                // identical to the pre-Wave-2 behavior.
761                #[cfg(feature = "param-services")]
762                {
763                    if self.executor.params().is_none() {
764                        self.executor
765                            .register_parameter_services()
766                            .map_err(|_| NodeDeclError::Runtime)?;
767                    }
768                    let value = param_default_to_value(metadata.parameter_default.as_ref());
769                    self.executor
770                        .declare_parameter(metadata.source_name.as_str(), value);
771                }
772                Ok(())
773            }
774        }
775    }
776
777    fn record_callback_effect(
778        &mut self,
779        _callback_id: CallbackId<'_>,
780        _kind: CallbackEffectKind,
781        _entity_id: EntityId<'_>,
782    ) -> NodeResult<()> {
783        // Planner concern only — the live runtime doesn't need the
784        // effect graph at spin time.
785        Ok(())
786    }
787}
788
789/// Lower a source-recorded [`ParameterDefault`] into the executor-facing
790/// [`nros_params::ParameterValue`] used to seed a declared parameter. Scalar
791/// defaults carry their value directly; the array variants record only the
792/// declared type (no element data at the source layer) so they seed as
793/// `NotSet` — the parameter is still declared, just without a concrete array
794/// default. A `Double` default is stored as a string at the metadata layer and
795/// parsed here (unparseable → `0.0`).
796#[cfg(feature = "param-services")]
797fn param_default_to_value(
798    default: Option<&crate::node_metadata::ParameterDefault>,
799) -> nros_params::ParameterValue {
800    use crate::node_metadata::ParameterDefault;
801    use nros_params::ParameterValue;
802    match default {
803        None => ParameterValue::NotSet,
804        Some(ParameterDefault::Bool(b)) => ParameterValue::Bool(*b),
805        Some(ParameterDefault::Integer(i)) => ParameterValue::Integer(*i),
806        Some(ParameterDefault::Double(s)) => {
807            ParameterValue::Double(s.as_str().parse::<f64>().unwrap_or(0.0))
808        }
809        Some(ParameterDefault::String(s)) => {
810            ParameterValue::from_string(s.as_str()).unwrap_or(ParameterValue::NotSet)
811        }
812        Some(
813            ParameterDefault::BoolArray
814            | ParameterDefault::IntegerArray
815            | ParameterDefault::DoubleArray
816            | ParameterDefault::StringArray,
817        ) => ParameterValue::NotSet,
818    }
819}
820
821fn dispatch_into_cell(cell: &Arc<ComponentCell>, cb_id: &str, payload: &[u8]) {
822    cell.callback_dispatches.fetch_add(1, Ordering::Relaxed);
823    if !payload.is_empty() {
824        cell.message_dispatches.fetch_add(1, Ordering::Relaxed);
825    }
826    let resolver = CellResolver {
827        cell: cell.as_ref(),
828    };
829    let mut ctx = CallbackCtx::new(payload, &resolver);
830    // If the slot is already borrowed (a re-entrant publish from a
831    // tick hook on the same cell, etc.) we drop this dispatch. In
832    // practice `try_borrow_mut` succeeds because subscription / timer
833    // callbacks run sequentially under the single-threaded executor.
834    if let Ok(mut slot) = cell.slot.try_borrow_mut() {
835        slot.dispatch(cb_id, &mut ctx);
836    }
837}
838
839/// Phase 250 (Wave 2b) — dispatch a `.safety()` subscription message into the
840/// component's `on_callback` with its E2E [`IntegrityStatus`] attached, read via
841/// `CallbackCtx::integrity()`. The integrity-aware twin of [`dispatch_into_cell`].
842#[cfg(feature = "safety-e2e")]
843fn dispatch_into_cell_with_integrity(
844    cell: &Arc<ComponentCell>,
845    cb_id: &str,
846    payload: &[u8],
847    status: &nros_node::IntegrityStatus,
848) {
849    cell.callback_dispatches.fetch_add(1, Ordering::Relaxed);
850    if !payload.is_empty() {
851        cell.message_dispatches.fetch_add(1, Ordering::Relaxed);
852    }
853    let resolver = CellResolver {
854        cell: cell.as_ref(),
855    };
856    let mut ctx = CallbackCtx::new_with_integrity(payload, &resolver, status);
857    if let Ok(mut slot) = cell.slot.try_borrow_mut() {
858        slot.dispatch(cb_id, &mut ctx);
859    }
860}
861
862// =============================================================================
863// Phase 212.M-F.23 — service / action SERVER trampolines + tick-side client /
864// action dispatch.
865//
866// The executor's raw service/action-server registration takes C-ABI fn
867// pointers, so the runtime leaks a `*Ctx` (lives for the runtime's lifetime,
868// like the executor) holding the owning `ComponentCell` + the declared
869// callback ids. Each trampoline rebuilds a `CallbackCtx` and routes into the
870// component's `on_callback`, exactly as the orchestration codegen's
871// `svc_tramp_*` / `goal_tramp_*` do for the Entry path.
872// =============================================================================
873
874/// Leaked context for a service-server arena callback.
875struct ServiceServerCtx {
876    cell: Arc<ComponentCell>,
877    callback_id: String,
878}
879
880/// Leaked context for an action-server arena callback (goal + cancel + the
881/// optional accepted hook all share one).
882struct ActionServerCtx {
883    cell: Arc<ComponentCell>,
884    goal_callback_id: String,
885    cancel_callback_id: String,
886    accepted_callback_id: Option<String>,
887}
888
889/// Leaked context for an action-CLIENT result + feedback callbacks.
890struct ActionClientCtx {
891    cell: Arc<ComponentCell>,
892    result_callback_id: String,
893    feedback_callback_id: Option<String>,
894}
895
896/// Action-client result callback: the executor's spin auto-drives the goal to
897/// completion and hands the terminal result CDR here; route it into the
898/// component's `on_callback` (read with `CallbackCtx::message`).
899unsafe extern "C" fn action_result_trampoline(
900    _goal_id: *const GoalId,
901    _status: GoalStatus,
902    result_data: *const u8,
903    result_len: usize,
904    ctx: *mut core::ffi::c_void,
905) {
906    let actx = unsafe { &*(ctx as *const ActionClientCtx) };
907    let result_slice = unsafe { core::slice::from_raw_parts(result_data, result_len) };
908    dispatch_into_cell(&actx.cell, &actx.result_callback_id, result_slice);
909}
910
911/// Action-client feedback callback: route each feedback CDR into the
912/// component's `on_callback` under the bound feedback callback id.
913unsafe extern "C" fn action_feedback_trampoline(
914    _goal_id: *const GoalId,
915    feedback_data: *const u8,
916    feedback_len: usize,
917    ctx: *mut core::ffi::c_void,
918) {
919    let actx = unsafe { &*(ctx as *const ActionClientCtx) };
920    let Some(cb_id) = actx.feedback_callback_id.as_ref() else {
921        return;
922    };
923    let feedback_slice = unsafe { core::slice::from_raw_parts(feedback_data, feedback_len) };
924    dispatch_into_cell(&actx.cell, cb_id, feedback_slice);
925}
926
927/// Service-server request callback: deserialize-side runs in the component's
928/// `on_callback` via `CallbackCtx::with_reply`; the executor sends the reply
929/// from the bytes written into `resp`.
930unsafe extern "C" fn service_server_trampoline(
931    req: *const u8,
932    req_len: usize,
933    resp: *mut u8,
934    resp_cap: usize,
935    resp_len: *mut usize,
936    ctx: *mut core::ffi::c_void,
937) -> bool {
938    let sctx = unsafe { &*(ctx as *const ServiceServerCtx) };
939    let req_slice = unsafe { core::slice::from_raw_parts(req, req_len) };
940    let resp_slice = unsafe { core::slice::from_raw_parts_mut(resp, resp_cap) };
941    let mut written = 0usize;
942    let resolver = CellResolver {
943        cell: sctx.cell.as_ref(),
944    };
945    let mut cb = CallbackCtx::with_reply(req_slice, &resolver, resp_slice, &mut written);
946    if let Ok(mut slot) = sctx.cell.slot.try_borrow_mut() {
947        slot.dispatch(&sctx.callback_id, &mut cb);
948    }
949    unsafe { *resp_len = written };
950    true
951}
952
953/// Action-server goal callback → component `on_callback` with a goal decision.
954unsafe extern "C" fn action_goal_trampoline(
955    _goal_id: *const GoalId,
956    goal_data: *const u8,
957    goal_len: usize,
958    ctx: *mut core::ffi::c_void,
959) -> crate::GoalResponse {
960    let actx = unsafe { &*(ctx as *const ActionServerCtx) };
961    let goal_slice = unsafe { core::slice::from_raw_parts(goal_data, goal_len) };
962    let mut resp = crate::GoalResponse::Reject;
963    let resolver = CellResolver {
964        cell: actx.cell.as_ref(),
965    };
966    let mut cb = CallbackCtx::with_goal_decision(goal_slice, &resolver, &mut resp);
967    if let Ok(mut slot) = actx.cell.slot.try_borrow_mut() {
968        slot.dispatch(&actx.goal_callback_id, &mut cb);
969    }
970    resp
971}
972
973/// Action-server cancel callback → component `on_callback` with a cancel
974/// decision. The cancel callback has no goal payload.
975unsafe extern "C" fn action_cancel_trampoline(
976    _goal_id: *const GoalId,
977    _status: GoalStatus,
978    ctx: *mut core::ffi::c_void,
979) -> crate::CancelResponse {
980    let actx = unsafe { &*(ctx as *const ActionServerCtx) };
981    let mut resp = crate::CancelResponse::Rejected;
982    let resolver = CellResolver {
983        cell: actx.cell.as_ref(),
984    };
985    let mut cb = CallbackCtx::with_cancel_decision(&[], &resolver, &mut resp);
986    if let Ok(mut slot) = actx.cell.slot.try_borrow_mut() {
987        slot.dispatch(&actx.cancel_callback_id, &mut cb);
988    }
989    resp
990}
991
992/// Action-server accepted hook → component `on_callback` (no decision, no
993/// payload). No-op when the component didn't declare an accepted callback.
994unsafe extern "C" fn action_accepted_trampoline(
995    _goal_id: *const GoalId,
996    ctx: *mut core::ffi::c_void,
997) {
998    let actx = unsafe { &*(ctx as *const ActionServerCtx) };
999    let Some(cb_id) = actx.accepted_callback_id.as_ref() else {
1000        return;
1001    };
1002    dispatch_into_cell(&actx.cell, cb_id, &[]);
1003}
1004
1005/// Tick-side service/action CLIENT dispatch — the single-node runtime's mirror
1006/// of the orchestration `GenClientDispatch`. Resolves the per-component client
1007/// handle arrays + a `*mut Executor` (the tick borrows `&components` while
1008/// needing `&mut executor`, so the executor is reached through a raw pointer,
1009/// reborrowed `&mut` per call; no aliasing — `executor` and `components` are
1010/// disjoint fields).
1011struct RuntimeClientDispatch<'a> {
1012    executor: *mut Executor,
1013    services: &'a [(String, crate::HandleId)],
1014    actions: &'a [(String, usize)],
1015}
1016
1017impl RuntimeClientDispatch<'_> {
1018    fn service(&self, entity: &str) -> NodeResult<crate::HandleId> {
1019        self.services
1020            .iter()
1021            .find(|(e, _)| e == entity)
1022            .map(|(_, h)| *h)
1023            .ok_or(NodeDeclError::Runtime)
1024    }
1025
1026    fn action_entry(&self, entity: &str) -> NodeResult<usize> {
1027        self.actions
1028            .iter()
1029            .find(|(e, _)| e == entity)
1030            .map(|(_, i)| *i)
1031            .ok_or(NodeDeclError::Runtime)
1032    }
1033}
1034
1035impl ClientDispatch for RuntimeClientDispatch<'_> {
1036    fn call_raw(
1037        &mut self,
1038        service_entity: &str,
1039        request_cdr: &[u8],
1040        response_buf: &mut [u8],
1041    ) -> NodeResult<usize> {
1042        use crate::ServiceClientTrait;
1043        let hid = self.service(service_entity)?;
1044        {
1045            let executor = unsafe { &mut *self.executor };
1046            let entry = unsafe { executor.service_client_entry_mut(hid.0) }
1047                .ok_or(NodeDeclError::Runtime)?;
1048            entry
1049                .handle
1050                .send_request_raw(request_cdr)
1051                .map_err(|_| NodeDeclError::Runtime)?;
1052        }
1053        // Bounded wait — caps total time so the tick loop stays responsive.
1054        for _ in 0..200 {
1055            let executor = unsafe { &mut *self.executor };
1056            executor.spin_once(core::time::Duration::from_millis(10));
1057            let entry = unsafe { executor.service_client_entry_mut(hid.0) }
1058                .ok_or(NodeDeclError::Runtime)?;
1059            match entry.handle.try_recv_reply_raw(response_buf) {
1060                Ok(Some(len)) => return Ok(len),
1061                Ok(None) => continue,
1062                Err(_) => return Err(NodeDeclError::Runtime),
1063            }
1064        }
1065        Err(NodeDeclError::Runtime)
1066    }
1067
1068    fn send_goal_raw(&mut self, action_entity: &str, goal_cdr: &[u8]) -> NodeResult<GoalId> {
1069        let entry_index = self.action_entry(action_entity)?;
1070        let executor = unsafe { &mut *self.executor };
1071        let core = unsafe { executor.action_client_core_mut(entry_index) }
1072            .ok_or(NodeDeclError::Runtime)?;
1073        let goal_id = core
1074            .send_goal_raw(goal_cdr)
1075            .map_err(|_| NodeDeclError::Runtime)?;
1076        // rclcpp-style: request the result immediately. The server queues the
1077        // get_result request until the goal terminates, then replies — the
1078        // executor's spin auto-delivers it to the bound result callback (the
1079        // executor never auto-sends this request, so the declarative client
1080        // must). Best-effort: a transport hiccup just means no result callback.
1081        let _ = core.send_get_result_request(&goal_id);
1082        Ok(goal_id)
1083    }
1084}
1085
1086/// Tick-side action-SERVER execution — mirror of `GenActionExec`. Lets a
1087/// component complete goals / publish feedback / enumerate active goals from
1088/// its `tick` via `TickCtx`.
1089struct RuntimeActions<'a> {
1090    executor: *mut Executor,
1091    handles: &'a [(String, crate::ActionServerRawHandle)],
1092}
1093
1094impl RuntimeActions<'_> {
1095    fn handle(&self, entity: &str) -> NodeResult<crate::ActionServerRawHandle> {
1096        self.handles
1097            .iter()
1098            .find(|(e, _)| e == entity)
1099            .map(|(_, h)| *h)
1100            .ok_or(NodeDeclError::Runtime)
1101    }
1102}
1103
1104impl ActionExecutor for RuntimeActions<'_> {
1105    fn complete_goal_raw(
1106        &mut self,
1107        action_entity: &str,
1108        goal_id: &GoalId,
1109        status: GoalStatus,
1110        result: &[u8],
1111    ) -> NodeResult<()> {
1112        let handle = self.handle(action_entity)?;
1113        let executor = unsafe { &mut *self.executor };
1114        handle.complete_goal_raw(executor, goal_id, status, result);
1115        Ok(())
1116    }
1117
1118    fn publish_feedback_raw(
1119        &mut self,
1120        action_entity: &str,
1121        goal_id: &GoalId,
1122        feedback: &[u8],
1123    ) -> NodeResult<()> {
1124        let handle = self.handle(action_entity)?;
1125        let executor = unsafe { &mut *self.executor };
1126        handle
1127            .publish_feedback_raw(executor, goal_id, feedback)
1128            .map_err(|_| NodeDeclError::Runtime)
1129    }
1130
1131    fn for_each_active_goal(
1132        &self,
1133        action_entity: &str,
1134        visit: &mut dyn FnMut(&GoalId, GoalStatus),
1135    ) {
1136        if let Ok(handle) = self.handle(action_entity) {
1137            let executor = unsafe { &*self.executor };
1138            handle.for_each_active_goal(executor, |g| visit(&g.goal_id, g.status));
1139        }
1140    }
1141}
1142
1143// Phase 258 (Track 2, w5) — the typed BSP fn-ptr aliases (`NodeRegisterFn` /
1144// `NodeInitFn` / `NodeDispatchFn` / `NodeTickFn`) are gone with the retired
1145// `register_dispatch_slot` / `nros_run_components` BSP-baker path. The
1146// macro-emitted `register(runtime)` wrapper now installs via the
1147// `install_node_typed` seam (Track 2 w4).
1148
1149/// Phase 257 (W0-B) — register an [`ExecutableNode`] `C` against a **borrowed**
1150/// executor (the shared cffi `Executor` a foreign-language typed entry hands in via
1151/// its `nros::global_handle()` / `Node::executor_handle()`), returning the live
1152/// [`ComponentCell`].
1153///
1154/// Unlike [`ExecutorNodeRuntime::register_node`] this owns neither the executor nor a
1155/// components list: the executor's per-entity callbacks hold `Arc<ComponentCell>`
1156/// clones (see [`ExecutorSink`]), so the cell stays alive for the executor's lifetime
1157/// via dispatch alone — the caller may drop the returned cell (pub/sub/timer nodes, the
1158/// W0-B target) or stash it to drive `tick` (service-client/action nodes; phase-257 D2).
1159/// The node self-creates its node (its `Node::NAME`) on the shared executor (phase-257
1160/// D7 Option C — Rust nodes in a foreign entry self-name, no entry-side qos-override).
1161fn register_node_borrowed<C: ExecutableNode + 'static>(
1162    executor: &mut Executor,
1163) -> NodeResult<Arc<ComponentCell>>
1164where
1165    C::State: 'static,
1166{
1167    let cell = Arc::new(ComponentCell {
1168        slot: RefCell::new(Box::new(TypedSlot::<C> {
1169            state: C::init(),
1170            _phantom: PhantomData,
1171        })),
1172        publishers: RefCell::new(Vec::new()),
1173        service_clients: RefCell::new(Vec::new()),
1174        action_clients: RefCell::new(Vec::new()),
1175        action_servers: RefCell::new(Vec::new()),
1176        callback_dispatches: AtomicUsize::new(0),
1177        message_dispatches: AtomicUsize::new(0),
1178    });
1179    let mut sink = ExecutorSink {
1180        // Reborrow so `executor` stays usable for `enroll_component` after the
1181        // sink (which holds `&mut Executor`) is dropped below.
1182        executor: &mut *executor,
1183        cell: cell.clone(),
1184        nodes: Vec::new(),
1185    };
1186    let sink_dyn: &mut dyn NodeRuntime = &mut sink;
1187    let mut context = NodeContext::new(C::NAME, sink_dyn);
1188    C::register(&mut context)?;
1189
1190    // Phase 258 (Track 2, 2a) — enroll the cell in the executor's component
1191    // tick registry so `install`'d nodes tick (closes phase-257 D2: poll-only
1192    // service-client/action nodes have no callbacks keeping the cell alive AND
1193    // never ran `tick`). Leak one `Arc` clone for the executor to own; it runs
1194    // `tick` each `spin_once` and drops the clone on `Executor::drop`. Harmless
1195    // for pub/sub/timer-only nodes — their tick body is a no-op. Registry-full
1196    // (`MAX_NODES`) reclaims the clone and proceeds without a tick slot.
1197    let raw = Arc::into_raw(cell.clone()) as *mut core::ffi::c_void;
1198    // SAFETY: `raw` is a freshly-leaked `Arc<ComponentCell>`; the trampolines
1199    // match its provenance (borrow on tick, reclaim on drop).
1200    if unsafe {
1201        executor.enroll_component(raw, component_tick_trampoline, component_drop_trampoline)
1202    }
1203    .is_err()
1204    {
1205        // SAFETY: enroll rejected the slot, so the executor never took `raw`;
1206        // reclaim the leaked clone here to avoid a permanent leak.
1207        drop(unsafe { Arc::from_raw(raw as *const ComponentCell) });
1208    }
1209
1210    Ok(cell)
1211}
1212
1213/// Phase 257 (W0-B) — C-ABI typed component install. Recovers the shared `Executor`
1214/// from the foreign typed entry's handle (`global_handle()` / `Node::executor_handle()`
1215/// = the `_opaque` `*mut Executor`; cf. nros-c `get_executor_from_ptr`) and registers
1216/// `C` on it via [`register_node_borrowed`]. The component's `ComponentCell` is kept
1217/// alive by the executor's own callback `Arc` clones (phase-257 D1), so this drops the
1218/// returned cell. Returns `0` on success, `-1` on a null handle or a registration error.
1219///
1220/// This backs the `__nros_component_<pkg>_install(node, executor, self)` symbol
1221/// `nros::node!()` emits — the uniform cross-language install seam (phase-257 D6).
1222///
1223/// # Safety
1224/// `executor` must be the live `*mut Executor` handle a typed entry passes (its
1225/// `nros::global_handle()` / a node's `executor_handle()`), valid for the call.
1226pub unsafe fn install_node_typed<C: ExecutableNode + 'static>(
1227    executor: *mut core::ffi::c_void,
1228) -> i32
1229where
1230    C::State: 'static,
1231{
1232    if executor.is_null() {
1233        return -1;
1234    }
1235    // SAFETY: per the fn contract, `executor` is the live `*mut Executor` handle.
1236    let exec: &mut Executor = unsafe { &mut *(executor as *mut Executor) };
1237    match register_node_borrowed::<C>(exec) {
1238        Ok(_cell) => 0,
1239        Err(_) => -1,
1240    }
1241}
1242
1243// Phase 258 (Track 2, w5) — `nros_run_components` (the BSP shim that registered
1244// every component via the four-fn-ptr `register_dispatch_slot` then spun) is
1245// gone. It had no callers; owned-spin / BSP entries register through the
1246// `install_node_typed` seam + drive `ExecutorNodeRuntime::spin`.
1247
1248// =============================================================================
1249// Tests
1250// =============================================================================
1251//
1252// Concrete `Executor` construction needs a real RMW backend session
1253// (with `rmw-cffi` on, `Executor::from_session` takes the cffi
1254// session). MockSession only exists when `rmw-cffi` is off — so the
1255// unit tests that exercise live timer firing live in
1256// `packages/testing/nros-tests/tests/phase212_m5a2_component_runtime.rs`
1257// gated behind the `component-runtime-test` feature (pulls
1258// `nros-rmw-zenoh`). The compile-only smoke here verifies the public
1259// types are reachable through the umbrella surface.
1260
1261#[cfg(test)]
1262mod tests {
1263    use super::*;
1264    use crate::node::Node;
1265
1266    #[test]
1267    fn handle_slot_is_observable() {
1268        // Trivial smoke — the handle type carries the slot index.
1269        let h = RegisteredNode::<DummyComp> {
1270            component_idx: 7,
1271            _phantom: PhantomData,
1272        };
1273        assert_eq!(h.slot(), 7);
1274    }
1275
1276    struct DummyComp;
1277    impl Node for DummyComp {
1278        const NAME: &'static str = "dummy";
1279        fn register(_ctx: &mut NodeContext<'_>) -> NodeResult<()> {
1280            Ok(())
1281        }
1282    }
1283    impl ExecutableNode for DummyComp {
1284        type State = ();
1285        fn init() -> Self::State {}
1286        fn on_callback(_s: &mut (), _cb: Callback<'_>, _ctx: &mut CallbackCtx<'_>) {}
1287    }
1288}