Skip to main content

nros_node/executor/
node_record.rs

1//! Phase 104.C.2 — multi-Node-per-Executor storage.
2//!
3//! Mirrors the `rclcpp` pattern where a single `Executor` holds N
4//! `Node`s via `add_node(...)`. Each Node carries its own
5//! name/namespace + a reference to the Session that backs it +
6//! a default `SchedContext` (Phase 110) handles inherit unless
7//! overridden.
8//!
9//! For Phase 104.C.2 we land the *storage scaffold* + the builder
10//! API. Multi-Session-per-Executor dispatch is a follow-up
11//! (Phase 104.C.3) — today every Node in this list resolves to the
12//! Executor's primary session, which means `node_builder.rmw(name)`
13//! only accepts the same backend the Executor was opened against.
14//! Bridge use cases (two RMW backends concurrent in one Executor)
15//! light up when 104.C.3 adds the per-Node session ref.
16
17use super::{sched_context::SchedContextId, types::NodeError};
18
19/// Opaque handle returned by `Executor::node_builder(...).build()`.
20/// Used in 104.C.3+ to disambiguate handle ownership when multiple
21/// Nodes coexist in one Executor.
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
23pub struct NodeId(pub(crate) u8);
24
25impl NodeId {
26    /// Reserved id for the implicit "primary" Node that mirrors the
27    /// pre-Phase 104.C.2 single-Node Executor identity.
28    pub const PRIMARY: NodeId = NodeId(0);
29
30    /// Numeric index into the Executor's node table.
31    pub fn index(self) -> usize {
32        self.0 as usize
33    }
34
35    /// Phase 104.C.8.b / C.9.b — build a `NodeId` from a raw `u8` for
36    /// FFI consumers that store the index in their own struct
37    /// (`nros_node_t.node_id`, `nros_cpp_node_t.node_id`). The value
38    /// is not validated against the executor's `nodes` table — the
39    /// caller is responsible for only constructing ids that
40    /// `node_builder(...).build()` previously returned. Out-of-range
41    /// ids fail loudly at the next `with_node` / `_on(...)` call.
42    pub const fn from_raw(raw: u8) -> NodeId {
43        NodeId(raw)
44    }
45
46    /// Raw u8 form for FFI persistence.
47    pub const fn raw(self) -> u8 {
48        self.0
49    }
50}
51
52/// Per-Node metadata stored inside the Executor.
53///
54/// Phase 104.C.2 keeps the shape minimal — name, namespace,
55/// default SchedContext, optional rmw-name for diagnostics. Future
56/// items: per-Node session reference (104.C.3), per-Node liveliness
57/// state, per-Node parameter overrides.
58pub struct NodeRecord {
59    pub name: heapless::String<64>,
60    pub namespace: heapless::String<64>,
61    /// RMW backend the Node was created against. `None` for the
62    /// implicit primary Node populated from `Executor::open`.
63    pub rmw_name: Option<heapless::String<32>>,
64    /// Per-Node locator override. `None` = use the Executor's
65    /// session-level locator.
66    pub locator: Option<heapless::String<128>>,
67    /// Default `SchedContext` for handles created via this Node.
68    /// Handles may override per-call. `SchedContextId::default()` =
69    /// the executor's auto-created Fifo slot (slot 0).
70    pub default_sched: SchedContextId,
71    /// Phase 104.C.3 — session-slot index. `0` resolves to the
72    /// Executor's primary `session` field; `N >= 1` resolves to
73    /// `extra_sessions[N-1]`. Each Node may bind to a different
74    /// session, enabling multi-RMW bridges in one Executor.
75    pub session_idx: u8,
76}
77
78impl NodeRecord {
79    /// Construct the implicit "primary" NodeRecord that mirrors the
80    /// Executor's pre-104.C.2 single-Node identity. Currently unused
81    /// (the primary Node is implicit until 104.C.3 wires per-Node
82    /// dispatch); kept here for the upcoming migration where every
83    /// Executor will have an explicit entry at slot 0.
84    #[allow(dead_code)]
85    pub(crate) fn new_primary(name: heapless::String<64>, namespace: heapless::String<64>) -> Self {
86        Self {
87            name,
88            namespace,
89            rmw_name: None,
90            locator: None,
91            default_sched: SchedContextId(0),
92            session_idx: 0,
93        }
94    }
95}
96
97/// Builder returned by `Executor::node_builder(name)`. Chainable
98/// configuration; `.build()` registers the Node with the Executor
99/// and returns a [`NodeId`].
100///
101/// rclcpp-aligned API. Mirrors:
102///
103/// ```ignore
104/// rclcpp::Node::make_shared("my_node",
105///     rclcpp::NodeOptions().use_intra_process_comms(true))
106/// ```
107///
108/// Where rclcpp uses a single `NodeOptions` struct, we expose the
109/// individual setters directly on the builder — fewer cycles when
110/// the user only needs one option.
111pub struct NodeBuilder<'a, 'cfg> {
112    pub(crate) executor: &'a mut super::spin::Executor,
113    pub(crate) name: &'cfg str,
114    pub(crate) namespace: Option<&'cfg str>,
115    pub(crate) rmw_name: Option<&'cfg str>,
116    pub(crate) locator: Option<&'cfg str>,
117    pub(crate) domain_id: Option<u32>,
118    pub(crate) sched: Option<SchedContextId>,
119    /// Phase 172.K.5 — explicit session slot (index into the sessions opened
120    /// by `open_multi`: 0 = primary, N = `extra_sessions[N-1]`). When set,
121    /// `build()` binds the Node directly to this session and **bypasses** the
122    /// rmw-based `resolve_session_slot` — the planner/generator already knows
123    /// which `SESSION_SPECS` slot each node belongs to (e.g. its domain group),
124    /// so no rmw/domain inference is needed. `None` ⇒ the legacy rmw-resolved
125    /// slot.
126    pub(crate) session_idx: Option<u8>,
127}
128
129impl<'a, 'cfg> NodeBuilder<'a, 'cfg> {
130    /// Select an RMW backend by name. `name` must match a backend
131    /// registered via `nros_rmw_cffi_register_named` (Phase 104.B.2).
132    ///
133    /// In Phase 104.C.2 (current), the name must match the backend
134    /// the Executor was opened against — bridge mode lands in
135    /// 104.C.3 when per-Node sessions are wired. Passing a name
136    /// that doesn't match the Executor's session returns
137    /// `Err(NodeError::BackendMismatch)` from `.build()`.
138    pub fn rmw(mut self, name: &'cfg str) -> Self {
139        self.rmw_name = Some(name);
140        self
141    }
142
143    /// Override the locator for this Node's session. Empty / unset =
144    /// use the Executor's locator.
145    pub fn locator(mut self, locator: &'cfg str) -> Self {
146        self.locator = Some(locator);
147        self
148    }
149
150    /// Override the domain id for this Node's session.
151    pub fn domain_id(mut self, domain_id: u32) -> Self {
152        self.domain_id = Some(domain_id);
153        self
154    }
155
156    /// Phase 172.K.5 — bind this Node to an explicit session slot (index into
157    /// the sessions opened by [`Executor::open_multi`]: `0` = primary,
158    /// `N` = `extra_sessions[N-1]`). Bypasses the rmw-based session resolution
159    /// — the caller (generated multi-domain wiring) already knows the slot.
160    pub fn session_idx(mut self, idx: u8) -> Self {
161        self.session_idx = Some(idx);
162        self
163    }
164
165    /// Namespace for handles created via this Node. Empty = "/".
166    pub fn namespace(mut self, namespace: &'cfg str) -> Self {
167        self.namespace = Some(namespace);
168        self
169    }
170
171    /// Default [`SchedContext`](super::sched_context::SchedContext) for
172    /// handles registered via this Node. Phase 110 integration —
173    /// handles inherit this unless they pass their own SchedContext
174    /// at registration time.
175    pub fn sched(mut self, sched: SchedContextId) -> Self {
176        self.sched = Some(sched);
177        self
178    }
179
180    /// Phase 104.C.3 — pick a session slot for the Node being
181    /// built. Returns `0` for the primary session (no rmw override
182    /// or rmw matches existing) and `N >= 1` for an extra session
183    /// just opened via `CffiRmw::open_with_rmw`.
184    #[cfg(feature = "rmw-cffi")]
185    fn resolve_session_slot(&mut self) -> Result<u8, NodeError> {
186        let Some(rmw) = self.rmw_name else {
187            return Ok(0);
188        };
189
190        // Phase 156 — check primary FIRST. Executor::open* records
191        // `primary_rmw_name` + `primary_locator` so we can detect
192        // when a `.rmw(name)` matches the primary session and
193        // return slot 0 instead of opening a SECOND backend
194        // session against the same singleton (which zenoh-pico's
195        // global g_session forbids). Locator-None means "inherit
196        // primary"; locator-Some must match primary's exactly.
197        // Empty `primary_rmw_name` → constructed via
198        // `from_session(_ptr)` without `open*` recording — fall
199        // through to extras cache + new-session path.
200        if !self.executor.primary_rmw_name.is_empty()
201            && self.executor.primary_rmw_name.as_str() == rmw
202        {
203            let locator_matches = match self.locator {
204                None => true,
205                Some(loc) => self.executor.primary_locator.as_str() == loc,
206            };
207            if locator_matches {
208                return Ok(0);
209            }
210        }
211
212        // Reuse an extra session if one already opened against the
213        // same rmw + locator. Slot 0 (primary) handled by the
214        // primary-identity check above.
215        for (i, sess) in self.executor.extra_sessions.iter().enumerate() {
216            let _ = sess;
217            // Phase 104.C.3 doesn't yet store rmw-name per session;
218            // dedupe by NodeRecord's stored rmw_name + locator.
219            if let Some(prev) = self.executor.nodes.iter().find(|n| {
220                n.session_idx as usize == i + 1
221                    && n.rmw_name.as_deref() == Some(rmw)
222                    && n.locator.as_deref() == self.locator
223            }) {
224                let _ = prev;
225                return Ok((i + 1) as u8);
226            }
227        }
228
229        // First Node naming this rmw → open a new session.
230        let mode = nros_rmw::SessionMode::Client;
231        let locator = self.locator.unwrap_or("");
232        let domain_id = self.domain_id.unwrap_or(0);
233        let cfg = nros_rmw::RmwConfig {
234            locator,
235            mode,
236            domain_id,
237            node_name: self.name,
238            namespace: self.namespace.unwrap_or(""),
239            properties: &[],
240        };
241        let session = nros_rmw_cffi::CffiRmw::open_with_rmw(rmw, &cfg)
242            .map_err(crate::executor::types::NodeError::Transport)?;
243        self.executor
244            .extra_sessions
245            .push(session)
246            .map_err(|_| NodeError::NodeTableFull)?;
247        let idx = self.executor.extra_sessions.len();
248        if idx > u8::MAX as usize {
249            return Err(NodeError::NodeTableFull);
250        }
251        // Phase 104.C.6.b — install the shared wake flag on the
252        // freshly opened extra session so its backend notifications
253        // can short-circuit `spin_once`.
254        #[cfg(feature = "std")]
255        self.executor.install_wake_signal_on_extra(idx - 1);
256        Ok(idx as u8)
257    }
258
259    #[cfg(not(feature = "rmw-cffi"))]
260    fn resolve_session_slot(&mut self) -> Result<u8, NodeError> {
261        // Without `rmw-cffi`, only the primary session exists. An
262        // rmw-name override is meaningless; treat as the primary.
263        Ok(0)
264    }
265
266    /// Register the Node with the Executor and return its
267    /// [`NodeId`]. Bumps `Executor.nodes.len()`; fails if the table
268    /// is full (`NROS_EXECUTOR_MAX_NODES` reached) or the name is
269    /// too long.
270    pub fn build(mut self) -> Result<NodeId, NodeError> {
271        if self.name.len() > 64 {
272            return Err(NodeError::NameTooLong);
273        }
274
275        // Phase 104.C.2 — single-session check. rmw mismatch is an
276        // error today; 104.C.3 will accept and open a new session
277        // via the session cache.
278        if let Some(_requested) = self.rmw_name {
279            // No accessor for the current session's rmw name yet —
280            // the registry first-registered slot drives the
281            // singleton. We accept any rmw name in C.2 (no
282            // validation) so consumer code is forward-compatible.
283            // C.3 adds the mismatch check + session-cache lookup.
284        }
285
286        let mut name_buf = heapless::String::<64>::new();
287        name_buf
288            .push_str(self.name)
289            .map_err(|_| NodeError::NameTooLong)?;
290
291        let mut ns_buf = heapless::String::<64>::new();
292        if let Some(ns) = self.namespace {
293            ns_buf.push_str(ns).map_err(|_| NodeError::NameTooLong)?;
294        } else {
295            ns_buf
296                .push_str(self.executor.namespace.as_str())
297                .map_err(|_| NodeError::NameTooLong)?;
298        }
299
300        let mut rmw_buf = None;
301        if let Some(rmw) = self.rmw_name {
302            let mut s = heapless::String::<32>::new();
303            s.push_str(rmw).map_err(|_| NodeError::NameTooLong)?;
304            rmw_buf = Some(s);
305        }
306
307        let mut loc_buf = None;
308        if let Some(loc) = self.locator {
309            let mut s = heapless::String::<128>::new();
310            s.push_str(loc).map_err(|_| NodeError::NameTooLong)?;
311            loc_buf = Some(s);
312        }
313
314        // Phase 172.K.5 — an explicit `.session_idx(n)` binds the Node to a
315        // pre-opened `open_multi` session directly (validated against the
316        // opened set), bypassing rmw resolution. Otherwise (Phase 104.C.3)
317        // resolve by rmw: slot 0 for no/primary-matching rmw, else open/reuse
318        // an extra session.
319        let session_idx = match self.session_idx {
320            Some(idx) => {
321                if idx as usize > self.executor.extra_sessions.len() {
322                    return Err(NodeError::NodeTableFull);
323                }
324                idx
325            }
326            None => self.resolve_session_slot()?,
327        };
328
329        let record = NodeRecord {
330            name: name_buf,
331            namespace: ns_buf,
332            rmw_name: rmw_buf,
333            locator: loc_buf,
334            default_sched: self.sched.unwrap_or(SchedContextId(0)),
335            session_idx,
336        };
337
338        self.executor
339            .nodes
340            .push(record)
341            .map_err(|_| NodeError::NodeTableFull)?;
342        let idx = self.executor.nodes.len() - 1;
343        if idx > u8::MAX as usize {
344            // heapless cap is far below u8::MAX; defensive only.
345            return Err(NodeError::NodeTableFull);
346        }
347        Ok(NodeId(idx as u8))
348    }
349}