nros_node/executor/node_record.rs
//! Phase 104.C.2 — multi-Node-per-Executor storage.
//!
//! Mirrors the `rclcpp` pattern where a single `Executor` holds N
//! `Node`s via `add_node(...)`. Each Node carries its own
//! name/namespace, a reference to the Session that backs it, and
//! a default `SchedContext` (Phase 110) that its handles inherit
//! unless overridden.
//!
//! Phase 104.C.2 landed the *storage scaffold* + the builder API.
//! Phase 104.C.3 added the per-Node session slot
//! (`NodeRecord::session_idx`): with the `rmw-cffi` feature,
//! `node_builder.rmw(name)` binds to the Executor's primary session
//! when the backend matches it, and otherwise reuses or opens an
//! extra session. That is what enables bridge use cases (two RMW
//! backends concurrent in one Executor). Without `rmw-cffi`, every
//! Node in this list resolves to the Executor's primary session.
16
17use super::{sched_context::SchedContextId, types::NodeError};
18
/// Opaque handle identifying one Node registered in an Executor.
///
/// Returned by `Executor::node_builder(...).build()`. It wraps the
/// position of the Node's record in the Executor's node table and is
/// used to disambiguate handle ownership when multiple Nodes coexist
/// in one Executor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct NodeId(pub(crate) u8);

impl NodeId {
    /// Reserved id for the implicit "primary" Node that mirrors the
    /// pre-Phase 104.C.2 single-Node Executor identity (slot 0).
    pub const PRIMARY: NodeId = NodeId(0);

    /// Numeric index into the Executor's node table.
    ///
    /// `const`, like [`NodeId::from_raw`] and [`NodeId::raw`], so an id
    /// can be turned into a table index in constant context too.
    pub const fn index(self) -> usize {
        // Lossless widening: a `u8` always fits in a `usize`.
        self.0 as usize
    }

    /// Phase 104.C.8.b / C.9.b — build a `NodeId` from a raw `u8` for
    /// FFI consumers that store the index in their own struct
    /// (`nros_node_t.node_id`, `nros_cpp_node_t.node_id`).
    ///
    /// The value is **not** validated against the executor's node
    /// table: the caller is responsible for only constructing ids that
    /// `node_builder(...).build()` previously returned. Out-of-range
    /// ids are expected to be rejected by the next `with_node` /
    /// `_on(...)` call (those checks live outside this file).
    pub const fn from_raw(raw: u8) -> NodeId {
        NodeId(raw)
    }

    /// Raw `u8` form for FFI persistence; the inverse of
    /// [`NodeId::from_raw`].
    pub const fn raw(self) -> u8 {
        self.0
    }
}
51
52/// Per-Node metadata stored inside the Executor.
53///
54/// Phase 104.C.2 keeps the shape minimal — name, namespace,
55/// default SchedContext, optional rmw-name for diagnostics. Future
56/// items: per-Node session reference (104.C.3), per-Node liveliness
57/// state, per-Node parameter overrides.
58pub struct NodeRecord {
59 pub name: heapless::String<64>,
60 pub namespace: heapless::String<64>,
61 /// RMW backend the Node was created against. `None` for the
62 /// implicit primary Node populated from `Executor::open`.
63 pub rmw_name: Option<heapless::String<32>>,
64 /// Per-Node locator override. `None` = use the Executor's
65 /// session-level locator.
66 pub locator: Option<heapless::String<128>>,
67 /// Default `SchedContext` for handles created via this Node.
68 /// Handles may override per-call. `SchedContextId::default()` =
69 /// the executor's auto-created Fifo slot (slot 0).
70 pub default_sched: SchedContextId,
71 /// Phase 104.C.3 — session-slot index. `0` resolves to the
72 /// Executor's primary `session` field; `N >= 1` resolves to
73 /// `extra_sessions[N-1]`. Each Node may bind to a different
74 /// session, enabling multi-RMW bridges in one Executor.
75 pub session_idx: u8,
76}
77
78impl NodeRecord {
79 /// Construct the implicit "primary" NodeRecord that mirrors the
80 /// Executor's pre-104.C.2 single-Node identity. Currently unused
81 /// (the primary Node is implicit until 104.C.3 wires per-Node
82 /// dispatch); kept here for the upcoming migration where every
83 /// Executor will have an explicit entry at slot 0.
84 #[allow(dead_code)]
85 pub(crate) fn new_primary(name: heapless::String<64>, namespace: heapless::String<64>) -> Self {
86 Self {
87 name,
88 namespace,
89 rmw_name: None,
90 locator: None,
91 default_sched: SchedContextId(0),
92 session_idx: 0,
93 }
94 }
95}
96
97/// Builder returned by `Executor::node_builder(name)`. Chainable
98/// configuration; `.build()` registers the Node with the Executor
99/// and returns a [`NodeId`].
100///
101/// rclcpp-aligned API. Mirrors:
102///
103/// ```ignore
104/// rclcpp::Node::make_shared("my_node",
105/// rclcpp::NodeOptions().use_intra_process_comms(true))
106/// ```
107///
108/// Where rclcpp uses a single `NodeOptions` struct, we expose the
109/// individual setters directly on the builder — fewer cycles when
110/// the user only needs one option.
111pub struct NodeBuilder<'a, 'cfg> {
112 pub(crate) executor: &'a mut super::spin::Executor,
113 pub(crate) name: &'cfg str,
114 pub(crate) namespace: Option<&'cfg str>,
115 pub(crate) rmw_name: Option<&'cfg str>,
116 pub(crate) locator: Option<&'cfg str>,
117 pub(crate) domain_id: Option<u32>,
118 pub(crate) sched: Option<SchedContextId>,
119 /// Phase 172.K.5 — explicit session slot (index into the sessions opened
120 /// by `open_multi`: 0 = primary, N = `extra_sessions[N-1]`). When set,
121 /// `build()` binds the Node directly to this session and **bypasses** the
122 /// rmw-based `resolve_session_slot` — the planner/generator already knows
123 /// which `SESSION_SPECS` slot each node belongs to (e.g. its domain group),
124 /// so no rmw/domain inference is needed. `None` ⇒ the legacy rmw-resolved
125 /// slot.
126 pub(crate) session_idx: Option<u8>,
127}
128
129impl<'a, 'cfg> NodeBuilder<'a, 'cfg> {
130 /// Select an RMW backend by name. `name` must match a backend
131 /// registered via `nros_rmw_cffi_register_named` (Phase 104.B.2).
132 ///
133 /// In Phase 104.C.2 (current), the name must match the backend
134 /// the Executor was opened against — bridge mode lands in
135 /// 104.C.3 when per-Node sessions are wired. Passing a name
136 /// that doesn't match the Executor's session returns
137 /// `Err(NodeError::BackendMismatch)` from `.build()`.
138 pub fn rmw(mut self, name: &'cfg str) -> Self {
139 self.rmw_name = Some(name);
140 self
141 }
142
143 /// Override the locator for this Node's session. Empty / unset =
144 /// use the Executor's locator.
145 pub fn locator(mut self, locator: &'cfg str) -> Self {
146 self.locator = Some(locator);
147 self
148 }
149
150 /// Override the domain id for this Node's session.
151 pub fn domain_id(mut self, domain_id: u32) -> Self {
152 self.domain_id = Some(domain_id);
153 self
154 }
155
156 /// Phase 172.K.5 — bind this Node to an explicit session slot (index into
157 /// the sessions opened by [`Executor::open_multi`]: `0` = primary,
158 /// `N` = `extra_sessions[N-1]`). Bypasses the rmw-based session resolution
159 /// — the caller (generated multi-domain wiring) already knows the slot.
160 pub fn session_idx(mut self, idx: u8) -> Self {
161 self.session_idx = Some(idx);
162 self
163 }
164
165 /// Namespace for handles created via this Node. Empty = "/".
166 pub fn namespace(mut self, namespace: &'cfg str) -> Self {
167 self.namespace = Some(namespace);
168 self
169 }
170
171 /// Default [`SchedContext`](super::sched_context::SchedContext) for
172 /// handles registered via this Node. Phase 110 integration —
173 /// handles inherit this unless they pass their own SchedContext
174 /// at registration time.
175 pub fn sched(mut self, sched: SchedContextId) -> Self {
176 self.sched = Some(sched);
177 self
178 }
179
/// Phase 104.C.3 — pick the session slot for the Node being built.
///
/// Resolution order:
///
/// 1. no `.rmw(..)` override → slot `0` (primary session);
/// 2. the rmw name equals the Executor's recorded primary backend and
///    the locator is unset or equal to the primary locator → slot `0`;
/// 3. an already registered Node sits on an extra session with the
///    same rmw name + locator → that session's slot (`N >= 1`);
/// 4. otherwise a new session is opened via
///    `CffiRmw::open_with_rmw`, appended to `extra_sessions`, and its
///    slot is returned.
///
/// # Errors
///
/// - `NodeError::NodeTableFull` when the new slot would not fit in a
///   `u8` or `extra_sessions` cannot take another session;
/// - `NodeError::Transport` when the backend fails to open.
#[cfg(feature = "rmw-cffi")]
fn resolve_session_slot(&mut self) -> Result<u8, NodeError> {
    let Some(rmw) = self.rmw_name else {
        return Ok(0);
    };
    let wanted_locator = self.locator;

    // Phase 156 — check primary FIRST. Executor::open* records
    // `primary_rmw_name` + `primary_locator` so a `.rmw(name)` that
    // matches the primary session resolves to slot 0 instead of
    // opening a SECOND backend session against the same singleton
    // (which zenoh-pico's global g_session forbids). Locator-None
    // means "inherit primary"; locator-Some must match primary's
    // exactly. An empty `primary_rmw_name` means the Executor was
    // constructed via `from_session(_ptr)` without `open*` recording,
    // so fall through to the extras lookup + new-session path.
    let primary_rmw = self.executor.primary_rmw_name.as_str();
    if !primary_rmw.is_empty() && primary_rmw == rmw {
        let locator_matches = match wanted_locator {
            None => true,
            Some(loc) => self.executor.primary_locator.as_str() == loc,
        };
        if locator_matches {
            return Ok(0);
        }
    }

    // Reuse an extra session already opened against the same rmw +
    // locator. Sessions do not store their own rmw name, so the lookup
    // goes through the NodeRecords bound to them; the lowest matching
    // slot wins. Slot 0 (primary) was handled above.
    // NOTE(review): `domain_id` is not part of the reuse key (it is
    // not stored on NodeRecord), so two Nodes that differ only in
    // domain id share one session — confirm that is intended.
    let opened = self.executor.extra_sessions.len();
    let reusable = self
        .executor
        .nodes
        .iter()
        .filter(|n| {
            let slot = n.session_idx as usize;
            (1..=opened).contains(&slot)
                && n.rmw_name.as_deref() == Some(rmw)
                && n.locator.as_deref() == wanted_locator
        })
        .map(|n| n.session_idx)
        .min();
    if let Some(slot) = reusable {
        return Ok(slot);
    }

    // First Node naming this rmw → a new session is needed. Make sure
    // its slot is addressable as a `u8` BEFORE opening anything, so a
    // failure here cannot leave an unreachable session behind.
    let slot = opened + 1;
    if slot > u8::MAX as usize {
        return Err(NodeError::NodeTableFull);
    }

    let cfg = nros_rmw::RmwConfig {
        locator: wanted_locator.unwrap_or(""),
        mode: nros_rmw::SessionMode::Client,
        domain_id: self.domain_id.unwrap_or(0),
        node_name: self.name,
        namespace: self.namespace.unwrap_or(""),
        properties: &[],
    };
    let session =
        nros_rmw_cffi::CffiRmw::open_with_rmw(rmw, &cfg).map_err(NodeError::Transport)?;
    self.executor
        .extra_sessions
        .push(session)
        .map_err(|_| NodeError::NodeTableFull)?;

    // Phase 104.C.6.b — install the shared wake flag on the freshly
    // opened extra session so its backend notifications can
    // short-circuit `spin_once`. `slot - 1` is its `extra_sessions`
    // index.
    #[cfg(feature = "std")]
    self.executor.install_wake_signal_on_extra(slot - 1);

    Ok(slot as u8)
}
258
259 #[cfg(not(feature = "rmw-cffi"))]
260 fn resolve_session_slot(&mut self) -> Result<u8, NodeError> {
261 // Without `rmw-cffi`, only the primary session exists. An
262 // rmw-name override is meaningless; treat as the primary.
263 Ok(0)
264 }
265
266 /// Register the Node with the Executor and return its
267 /// [`NodeId`]. Bumps `Executor.nodes.len()`; fails if the table
268 /// is full (`NROS_EXECUTOR_MAX_NODES` reached) or the name is
269 /// too long.
270 pub fn build(mut self) -> Result<NodeId, NodeError> {
271 if self.name.len() > 64 {
272 return Err(NodeError::NameTooLong);
273 }
274
275 // Phase 104.C.2 — single-session check. rmw mismatch is an
276 // error today; 104.C.3 will accept and open a new session
277 // via the session cache.
278 if let Some(_requested) = self.rmw_name {
279 // No accessor for the current session's rmw name yet —
280 // the registry first-registered slot drives the
281 // singleton. We accept any rmw name in C.2 (no
282 // validation) so consumer code is forward-compatible.
283 // C.3 adds the mismatch check + session-cache lookup.
284 }
285
286 let mut name_buf = heapless::String::<64>::new();
287 name_buf
288 .push_str(self.name)
289 .map_err(|_| NodeError::NameTooLong)?;
290
291 let mut ns_buf = heapless::String::<64>::new();
292 if let Some(ns) = self.namespace {
293 ns_buf.push_str(ns).map_err(|_| NodeError::NameTooLong)?;
294 } else {
295 ns_buf
296 .push_str(self.executor.namespace.as_str())
297 .map_err(|_| NodeError::NameTooLong)?;
298 }
299
300 let mut rmw_buf = None;
301 if let Some(rmw) = self.rmw_name {
302 let mut s = heapless::String::<32>::new();
303 s.push_str(rmw).map_err(|_| NodeError::NameTooLong)?;
304 rmw_buf = Some(s);
305 }
306
307 let mut loc_buf = None;
308 if let Some(loc) = self.locator {
309 let mut s = heapless::String::<128>::new();
310 s.push_str(loc).map_err(|_| NodeError::NameTooLong)?;
311 loc_buf = Some(s);
312 }
313
314 // Phase 172.K.5 — an explicit `.session_idx(n)` binds the Node to a
315 // pre-opened `open_multi` session directly (validated against the
316 // opened set), bypassing rmw resolution. Otherwise (Phase 104.C.3)
317 // resolve by rmw: slot 0 for no/primary-matching rmw, else open/reuse
318 // an extra session.
319 let session_idx = match self.session_idx {
320 Some(idx) => {
321 if idx as usize > self.executor.extra_sessions.len() {
322 return Err(NodeError::NodeTableFull);
323 }
324 idx
325 }
326 None => self.resolve_session_slot()?,
327 };
328
329 let record = NodeRecord {
330 name: name_buf,
331 namespace: ns_buf,
332 rmw_name: rmw_buf,
333 locator: loc_buf,
334 default_sched: self.sched.unwrap_or(SchedContextId(0)),
335 session_idx,
336 };
337
338 self.executor
339 .nodes
340 .push(record)
341 .map_err(|_| NodeError::NodeTableFull)?;
342 let idx = self.executor.nodes.len() - 1;
343 if idx > u8::MAX as usize {
344 // heapless cap is far below u8::MAX; defensive only.
345 return Err(NodeError::NodeTableFull);
346 }
347 Ok(NodeId(idx as u8))
348 }
349}