Skip to main content

nros_node/executor/
node.rs

1//! Node — borrows the session to create typed entities.
2
3use core::marker::PhantomData;
4
5use nros_core::{RosAction, RosMessage, RosService};
6use nros_rmw::{ActionInfo, QosSettings, ServiceInfo, Session as _, TopicInfo, TransportError};
7
8use crate::{
9    cyclonedds_register::{MessageForRmw, register_type},
10    session,
11};
12
13use super::{
14    handles::{
15        ActionClient, ActionClientCallback, ActionServer, EmbeddedPublisher, EmbeddedServiceClient,
16        EmbeddedServiceServer, ServiceClientCallback, Subscription,
17    },
18    types::NodeError,
19};
20
21// ============================================================================
22// Node
23// ============================================================================
24
25/// Backend-agnostic node — borrows the session to create typed entities.
26pub struct NodeHandle<'a> {
27    name: heapless::String<64>,
28    namespace: heapless::String<64>,
29    session: &'a mut session::ConcreteSession,
30    domain_id: u32,
31    /// Phase 211.H — per-node QoS overrides lowered from the launch
32    /// `qos_overrides.<topic>.<role>.<policy>` params and baked into a
33    /// `&'static` table by the entry codegen. Folded into each entity's
34    /// `QosSettings` at `create_publisher`/`create_subscription` time
35    /// (setup-time, no alloc). Empty (`&[]`) by default → zero cost for
36    /// systems without overrides.
37    qos_overrides: &'static [nros_rmw::QosOverride],
38}
39
40impl<'a> NodeHandle<'a> {
41    /// Create a new node (called by Executor::create_node).
42    pub(crate) fn new(
43        name: heapless::String<64>,
44        namespace: heapless::String<64>,
45        session: &'a mut session::ConcreteSession,
46        domain_id: u32,
47    ) -> Self {
48        Self {
49            name,
50            namespace,
51            session,
52            domain_id,
53            qos_overrides: &[],
54        }
55    }
56
57    /// Phase 211.H — install the plan's QoS-override table on this node. Called
58    /// by the generated entry BEFORE the component constructs its entities, so
59    /// `create_publisher`/`create_subscription` fold the matching overrides in.
60    /// The table is `&'static` (codegen bakes it as a `static`), so there is no
61    /// lifetime to thread and no runtime allocation. Plan = authority: an
62    /// override for a topic the entity creates is applied transparently (the
63    /// user's `create_publisher(topic)` call is unchanged, matching rclcpp).
64    pub fn set_qos_overrides(&mut self, overrides: &'static [nros_rmw::QosOverride]) {
65        self.qos_overrides = overrides;
66    }
67
68    /// The installed QoS-override table (empty unless the entry set one).
69    #[must_use]
70    pub fn qos_overrides(&self) -> &'static [nros_rmw::QosOverride] {
71        self.qos_overrides
72    }
73
74    /// Get the node name.
75    pub fn name(&self) -> &str {
76        &self.name
77    }
78
79    /// Phase 88.12 — return the [`nros_log::Logger`] keyed on the
80    /// node name.
81    ///
82    /// Loggers are interned in nros-log's bounded global table
83    /// ([`nros_log::MAX_LOGGERS`] slots). If the caller has
84    /// pre-registered a `'static Logger` whose name matches this
85    /// node's name (via [`nros_log::register_logger`]), this method
86    /// returns that exact reference — so subsequent `nros_*!` calls
87    /// share per-logger runtime threshold state with any other call
88    /// site that resolves the same name. Otherwise the call returns
89    /// [`nros_log::DEFAULT_LOGGER`], keeping the API total.
90    ///
91    /// ```ignore
92    /// // Pre-register if you want a dedicated threshold:
93    /// static MY_NODE_LOGGER: nros_log::Logger =
94    ///     nros_log::Logger::new("my_node");
95    /// nros_log::register_logger(&MY_NODE_LOGGER);
96    ///
97    /// // Inside any node-creating code:
98    /// let logger = node.logger();
99    /// nros_log::nros_info!(logger, "started; domain = {}", node.domain_id());
100    /// ```
101    #[must_use]
102    pub fn logger(&self) -> &'static nros_log::Logger {
103        nros_log::get_logger(self.name())
104    }
105
106    /// Get the domain ID.
107    pub fn domain_id(&self) -> u32 {
108        self.domain_id
109    }
110
111    /// Set the domain ID.
112    pub fn set_domain_id(&mut self, domain_id: u32) {
113        self.domain_id = domain_id;
114    }
115
116    /// Get a mutable reference to the underlying session.
117    pub fn session_mut(&mut self) -> &mut session::ConcreteSession {
118        self.session
119    }
120
121    // ------------------------------------------------------------------
122    // Routing-info builders (Phase 91.F)
123    //
124    // Every `create_*` below threads the same node identity (domain_id +
125    // name + namespace) into a TopicInfo / ServiceInfo / ActionInfo. The
126    // shape repeats verbatim ~12 times across this file. Centralised
127    // here so a future change to the routing-info shape (e.g. adding a
128    // `with_security_context`) updates one site instead of twelve, and
129    // so the per-`create_*` function bodies focus on the parts that
130    // actually differ between them.
131    // ------------------------------------------------------------------
132
133    // Associated fns (NOT `&self` methods) so the returned `*Info`
134    // value's borrow tracks only the explicit `&str` arguments, not
135    // the whole `Node`. A `&self` form would block the immediately-
136    // following `self.session.create_*(&info, …)` mut borrow on the
137    // `name` / `namespace` reborrow held inside the returned `*Info`,
138    // because going through a method call hides the field-disjoint
139    // path that lets `&self.name` + `&mut self.session` coexist.
140    fn topic_info<'b>(
141        domain_id: u32,
142        node_name: &'b str,
143        namespace: &'b str,
144        topic_name: &'b str,
145        type_name: &'b str,
146        type_hash: &'b str,
147    ) -> TopicInfo<'b> {
148        TopicInfo::new(topic_name, type_name, type_hash)
149            .with_domain(domain_id)
150            .with_node_name(node_name)
151            .with_namespace(namespace)
152    }
153
154    fn service_info<'b>(
155        domain_id: u32,
156        node_name: &'b str,
157        namespace: &'b str,
158        service_name: &'b str,
159        type_name: &'b str,
160        type_hash: &'b str,
161    ) -> ServiceInfo<'b> {
162        ServiceInfo::new(service_name, type_name, type_hash)
163            .with_domain(domain_id)
164            .with_node_name(node_name)
165            .with_namespace(namespace)
166    }
167
168    fn action_info<'b>(
169        domain_id: u32,
170        action_name: &'b str,
171        type_name: &'b str,
172        type_hash: &'b str,
173    ) -> ActionInfo<'b> {
174        // Action root only needs the domain — per-channel ServiceInfo /
175        // TopicInfo derived from action_info.{send_goal,cancel_goal,...}_key()
176        // carry the full node identity via service_info() / topic_info().
177        ActionInfo::new(action_name, type_name, type_hash).with_domain(domain_id)
178    }
179
180    // -- Publishers --
181
182    /// Create a publisher for the given topic.
183    pub fn create_publisher<M: MessageForRmw>(
184        &mut self,
185        topic_name: &str,
186    ) -> Result<EmbeddedPublisher<M>, NodeError> {
187        self.create_publisher_with_qos::<M>(topic_name, QosSettings::default())
188    }
189
190    /// Create a publisher with custom QoS settings.
191    pub fn create_publisher_with_qos<M: MessageForRmw>(
192        &mut self,
193        topic_name: &str,
194        qos: QosSettings,
195    ) -> Result<EmbeddedPublisher<M>, NodeError> {
196        // Phase 212.K.7.6.b — under `rmw-cyclonedds`, ensure the runtime
197        // type-descriptor exists before the cffi vtable creates the
198        // entity. No-op for other RMWs.
199        register_type::<M>()?;
200        // Phase 211.H — fold any plan qos_overrides for this topic+publisher
201        // into the profile (setup-time, no alloc) BEFORE validation, so an
202        // override the backend can't honour still errors loudly below.
203        let qos = qos.apply_overrides(
204            topic_name,
205            nros_rmw::QosOverrideRole::Publisher,
206            self.qos_overrides,
207        );
208        // Phase 108.B — synchronous QoS validation against backend's
209        // `supported_qos_policies()` mask. No silent downgrade.
210        qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
211            .map_err(NodeError::Transport)?;
212        let topic = Self::topic_info(
213            self.domain_id,
214            &self.name,
215            &self.namespace,
216            topic_name,
217            <M as RosMessage>::TYPE_NAME,
218            <M as RosMessage>::TYPE_HASH,
219        );
220        let handle = self
221            .session
222            .create_publisher(&topic, qos)
223            .map_err(|_| NodeError::Transport(TransportError::PublisherCreationFailed))?;
224        Ok(EmbeddedPublisher {
225            handle,
226            event_regs: crate::executor::handles::empty_event_regs(),
227            _phantom: PhantomData,
228        })
229    }
230
231    /// Create a typeless publisher for non-ROS wire formats (e.g. PX4 uORB
232    /// raw POD bytes, custom binary protocols). The caller supplies the
233    /// `type_name` and `type_hash` strings used by backends that need them
234    /// for liveliness/discovery; backends that don't (uORB) can pass any
235    /// stable string.
236    pub fn create_publisher_raw(
237        &mut self,
238        topic_name: &str,
239        type_name: &str,
240        type_hash: &str,
241    ) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
242        self.create_publisher_raw_with_qos(topic_name, type_name, type_hash, QosSettings::default())
243    }
244
245    /// Typeless publisher with custom QoS.
246    pub fn create_publisher_raw_with_qos(
247        &mut self,
248        topic_name: &str,
249        type_name: &str,
250        type_hash: &str,
251        qos: QosSettings,
252    ) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
253        // Phase 211.H — apply plan qos_overrides (publisher side) before validate.
254        let qos = qos.apply_overrides(
255            topic_name,
256            nros_rmw::QosOverrideRole::Publisher,
257            self.qos_overrides,
258        );
259        qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
260            .map_err(NodeError::Transport)?;
261        let topic = Self::topic_info(
262            self.domain_id,
263            &self.name,
264            &self.namespace,
265            topic_name,
266            type_name,
267            type_hash,
268        );
269        let handle = self
270            .session
271            .create_publisher(&topic, qos)
272            .map_err(|_| NodeError::Transport(TransportError::PublisherCreationFailed))?;
273        Ok(crate::executor::handles::EmbeddedRawPublisher {
274            handle,
275            arena: crate::executor::handles::TxArena::new(),
276            event_regs: crate::executor::handles::empty_event_regs(),
277        })
278    }
279
280    /// Phase 189.M1 — the customizable publisher **builder** (the `clone` tier;
281    /// see `docs/design/0022-entity-api-tiers.md`). Pick a mode with `.typed::<M>()`
282    /// or `.generic(type, hash)`, set knobs (`.qos`), then `.build()`. The
283    /// convenient `create_publisher` / `create_publisher_raw` are the `fork`
284    /// tier — sugar over this with defaults.
285    pub fn publisher<'t>(&mut self, topic: &'t str) -> PublisherBuilder<'_, 'a, 't> {
286        PublisherBuilder {
287            node: self,
288            topic,
289            qos: QosSettings::default(),
290        }
291    }
292
293    // -- Subscriptions --
294
295    /// Create a subscription for the given topic.
296    pub fn create_subscription<M: MessageForRmw>(
297        &mut self,
298        topic_name: &str,
299    ) -> Result<Subscription<M>, NodeError> {
300        self.create_subscription_sized::<M, { crate::config::DEFAULT_RX_BUF_SIZE }>(topic_name)
301    }
302
303    /// Create a subscription with custom buffer size.
304    pub fn create_subscription_sized<M: MessageForRmw, const RX_BUF: usize>(
305        &mut self,
306        topic_name: &str,
307    ) -> Result<Subscription<M, RX_BUF>, NodeError> {
308        self.create_subscription_with_qos::<M, RX_BUF>(topic_name, QosSettings::default())
309    }
310
311    /// Create a subscription with custom QoS and buffer size.
312    pub fn create_subscription_with_qos<M: MessageForRmw, const RX_BUF: usize>(
313        &mut self,
314        topic_name: &str,
315        qos: QosSettings,
316    ) -> Result<Subscription<M, RX_BUF>, NodeError> {
317        // Phase 212.K.7.6.b — see `create_publisher_with_qos`.
318        register_type::<M>()?;
319        // Phase 211.H — apply plan qos_overrides (subscription side) before validate.
320        let qos = qos.apply_overrides(
321            topic_name,
322            nros_rmw::QosOverrideRole::Subscription,
323            self.qos_overrides,
324        );
325        qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
326            .map_err(NodeError::Transport)?;
327        let topic = Self::topic_info(
328            self.domain_id,
329            &self.name,
330            &self.namespace,
331            topic_name,
332            <M as RosMessage>::TYPE_NAME,
333            <M as RosMessage>::TYPE_HASH,
334        );
335        let handle = self
336            .session
337            .create_subscriber(&topic, qos)
338            .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?;
339        Ok(Subscription {
340            handle,
341            buffer: [0u8; RX_BUF],
342            event_regs: crate::executor::handles::empty_event_regs(),
343            _phantom: PhantomData,
344        })
345    }
346
347    /// Create a typeless subscription. Caller decodes raw bytes themselves.
348    pub fn create_subscription_raw(
349        &mut self,
350        topic_name: &str,
351        type_name: &str,
352        type_hash: &str,
353    ) -> Result<crate::executor::handles::RawSubscription, NodeError> {
354        self.create_subscription_raw_sized::<{ crate::config::DEFAULT_RX_BUF_SIZE }>(
355            topic_name, type_name, type_hash,
356        )
357    }
358
359    /// Typeless subscription with custom buffer size.
360    pub fn create_subscription_raw_sized<const RX_BUF: usize>(
361        &mut self,
362        topic_name: &str,
363        type_name: &str,
364        type_hash: &str,
365    ) -> Result<crate::executor::handles::RawSubscription<RX_BUF>, NodeError> {
366        // Phase 211.H — apply plan qos_overrides (subscription side) before
367        // validate, mirroring `create_publisher_raw_with_qos`. The raw entity
368        // paths honour node overrides exactly like the typed ones — an
369        // override the active RMW can't meet errors loudly, never silently.
370        let qos = QosSettings::default().apply_overrides(
371            topic_name,
372            nros_rmw::QosOverrideRole::Subscription,
373            self.qos_overrides,
374        );
375        qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
376            .map_err(NodeError::Transport)?;
377        let topic = Self::topic_info(
378            self.domain_id,
379            &self.name,
380            &self.namespace,
381            topic_name,
382            type_name,
383            type_hash,
384        );
385        let handle = self
386            .session
387            .create_subscriber(&topic, qos)
388            .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?;
389        Ok(crate::executor::handles::RawSubscription {
390            handle,
391            buffer: [0u8; RX_BUF],
392            event_regs: crate::executor::handles::empty_event_regs(),
393        })
394    }
395
396    // -- Services --
397
398    /// Create a service server.
399    pub fn create_service<Svc: RosService>(
400        &mut self,
401        service_name: &str,
402    ) -> Result<EmbeddedServiceServer<Svc>, NodeError>
403    where
404        Svc::Request: MessageForRmw,
405        Svc::Reply: MessageForRmw,
406    {
407        self.create_service_sized::<Svc, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, QosSettings::services_default())
408    }
409
410    /// Phase 193.2b — service server with an explicit QoS profile (applied to
411    /// both the request + reply endpoints; rclcpp's `create_service(name, qos)`).
412    pub fn create_service_with_qos<Svc: RosService>(
413        &mut self,
414        service_name: &str,
415        qos: QosSettings,
416    ) -> Result<EmbeddedServiceServer<Svc>, NodeError>
417    where
418        Svc::Request: MessageForRmw,
419        Svc::Reply: MessageForRmw,
420    {
421        self.create_service_sized::<Svc, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, qos)
422    }
423
424    /// Create a service server with custom buffer sizes + QoS.
425    pub fn create_service_sized<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>(
426        &mut self,
427        service_name: &str,
428        qos: QosSettings,
429    ) -> Result<EmbeddedServiceServer<Svc, REQ_BUF, REPLY_BUF>, NodeError>
430    where
431        Svc::Request: MessageForRmw,
432        Svc::Reply: MessageForRmw,
433    {
434        // Phase 212.K.7.6.b — register both halves of the service round-trip
435        // under cyclonedds. No-op for other RMWs.
436        register_type::<Svc::Request>()?;
437        register_type::<Svc::Reply>()?;
438        // Phase 193.5 — validate the service profile against the backend's
439        // supported policies (mirrors pub/sub); no silent downgrade. RELIABLE is
440        // effectively required for request/reply, so a backend that only honours
441        // a fixed profile rejects an incompatible request here.
442        qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
443            .map_err(NodeError::Transport)?;
444        let info = Self::service_info(
445            self.domain_id,
446            &self.name,
447            &self.namespace,
448            service_name,
449            Svc::SERVICE_NAME,
450            Svc::SERVICE_HASH,
451        );
452        let handle = self
453            .session
454            .create_service_server(&info, qos)
455            .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?;
456        Ok(EmbeddedServiceServer {
457            handle,
458            req_buffer: [0u8; REQ_BUF],
459            reply_buffer: [0u8; REPLY_BUF],
460            _phantom: PhantomData,
461        })
462    }
463
464    /// Create a service client.
465    pub fn create_client<Svc: RosService>(
466        &mut self,
467        service_name: &str,
468    ) -> Result<EmbeddedServiceClient<Svc>, NodeError>
469    where
470        Svc::Request: MessageForRmw,
471        Svc::Reply: MessageForRmw,
472    {
473        self.create_client_sized::<Svc, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, QosSettings::services_default())
474    }
475
476    /// Phase 193.2b — service client with an explicit QoS profile.
477    pub fn create_client_with_qos<Svc: RosService>(
478        &mut self,
479        service_name: &str,
480        qos: QosSettings,
481    ) -> Result<EmbeddedServiceClient<Svc>, NodeError>
482    where
483        Svc::Request: MessageForRmw,
484        Svc::Reply: MessageForRmw,
485    {
486        self.create_client_sized::<Svc, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, qos)
487    }
488
489    /// Create a service client with custom buffer sizes + QoS.
490    pub fn create_client_sized<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>(
491        &mut self,
492        service_name: &str,
493        qos: QosSettings,
494    ) -> Result<EmbeddedServiceClient<Svc, REQ_BUF, REPLY_BUF>, NodeError>
495    where
496        Svc::Request: MessageForRmw,
497        Svc::Reply: MessageForRmw,
498    {
499        // Phase 212.K.7.6.b — see `create_service_sized`.
500        register_type::<Svc::Request>()?;
501        register_type::<Svc::Reply>()?;
502        // Phase 193.5 — validate against the backend's supported policies (no
503        // silent downgrade); request/reply effectively requires RELIABLE.
504        qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
505            .map_err(NodeError::Transport)?;
506        let info = Self::service_info(
507            self.domain_id,
508            &self.name,
509            &self.namespace,
510            service_name,
511            Svc::SERVICE_NAME,
512            Svc::SERVICE_HASH,
513        );
514        let handle = self
515            .session
516            .create_service_client(&info, qos)
517            .map_err(|_| NodeError::Transport(TransportError::ServiceClientCreationFailed))?;
518        Ok(EmbeddedServiceClient {
519            handle,
520            req_buffer: [0u8; REQ_BUF],
521            reply_buffer: [0u8; REPLY_BUF],
522            in_flight: false,
523            _phantom: PhantomData,
524        })
525    }
526
527    /// Typeless service server. L1 counterpart of [`create_service`]
528    /// for the C / C++ FFI shims and callers that own their own
529    /// scheduler. Returns a [`crate::executor::handles::RawServiceServer`]
530    /// which polls request bytes directly.
531    pub fn create_service_raw(
532        &mut self,
533        service_name: &str,
534        type_name: &str,
535        type_hash: &str,
536    ) -> Result<crate::executor::handles::RawServiceServer, NodeError> {
537        self.create_service_raw_sized::<
538            { crate::config::DEFAULT_RX_BUF_SIZE },
539            { crate::config::DEFAULT_RX_BUF_SIZE },
540        >(service_name, type_name, type_hash)
541    }
542
543    /// Typeless service server with custom buffer sizes.
544    pub fn create_service_raw_sized<const REQ_BUF: usize, const RESP_BUF: usize>(
545        &mut self,
546        service_name: &str,
547        type_name: &str,
548        type_hash: &str,
549    ) -> Result<crate::executor::handles::RawServiceServer<REQ_BUF, RESP_BUF>, NodeError> {
550        let info = Self::service_info(
551            self.domain_id,
552            &self.name,
553            &self.namespace,
554            service_name,
555            type_name,
556            type_hash,
557        );
558        let handle = self
559            .session
560            .create_service_server(&info, QosSettings::services_default())
561            .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?;
562        Ok(crate::executor::handles::RawServiceServer::new(handle))
563    }
564
565    /// Typeless service client. L1 counterpart of [`create_client`].
566    pub fn create_client_raw(
567        &mut self,
568        service_name: &str,
569        type_name: &str,
570        type_hash: &str,
571    ) -> Result<crate::executor::handles::RawServiceClient, NodeError> {
572        self.create_client_raw_sized::<
573            { crate::config::DEFAULT_RX_BUF_SIZE },
574            { crate::config::DEFAULT_RX_BUF_SIZE },
575        >(service_name, type_name, type_hash)
576    }
577
578    /// Typeless service client with custom buffer sizes.
579    pub fn create_client_raw_sized<const REQ_BUF: usize, const REPLY_BUF: usize>(
580        &mut self,
581        service_name: &str,
582        type_name: &str,
583        type_hash: &str,
584    ) -> Result<crate::executor::handles::RawServiceClient<REQ_BUF, REPLY_BUF>, NodeError> {
585        let info = Self::service_info(
586            self.domain_id,
587            &self.name,
588            &self.namespace,
589            service_name,
590            type_name,
591            type_hash,
592        );
593        let handle = self
594            .session
595            .create_service_client(&info, QosSettings::services_default())
596            .map_err(|_| NodeError::Transport(TransportError::ServiceClientCreationFailed))?;
597        Ok(crate::executor::handles::RawServiceClient::new(handle))
598    }
599
600    // -- Actions --
601
602    /// Phase 122.3.c.6 — typeless action server. Builds the 5
603    /// transport channels (`send_goal` / `cancel_goal` / `get_result`
604    /// services + `feedback` / `status` publishers) and returns the
605    /// raw `ActionServerCore` directly. Caller owns scheduling —
606    /// drives `try_recv_goal_request` / `publish_feedback_raw` /
607    /// `complete_goal_raw` / `try_handle_cancel` /
608    /// `try_handle_get_result_raw` on the returned core.
609    pub fn create_action_server_raw(
610        &mut self,
611        action_name: &str,
612        type_name: &str,
613        type_hash: &str,
614    ) -> Result<
615        super::action_core::ActionServerCore<
616            { crate::config::DEFAULT_RX_BUF_SIZE },
617            { crate::config::DEFAULT_RX_BUF_SIZE },
618            { crate::config::DEFAULT_RX_BUF_SIZE },
619            4,
620        >,
621        NodeError,
622    > {
623        self.create_action_server_raw_sized::<
624            { crate::config::DEFAULT_RX_BUF_SIZE },
625            { crate::config::DEFAULT_RX_BUF_SIZE },
626            { crate::config::DEFAULT_RX_BUF_SIZE },
627            4,
628        >(action_name, type_name, type_hash)
629    }
630
631    /// Typeless action server with custom buffer + goal-slot sizes.
632    pub fn create_action_server_raw_sized<
633        const GOAL_BUF: usize,
634        const RESULT_BUF: usize,
635        const FEEDBACK_BUF: usize,
636        const MAX_GOALS: usize,
637    >(
638        &mut self,
639        action_name: &str,
640        type_name: &str,
641        type_hash: &str,
642    ) -> Result<
643        super::action_core::ActionServerCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF, MAX_GOALS>,
644        NodeError,
645    > {
646        let action_info = Self::action_info(self.domain_id, action_name, type_name, type_hash);
647
648        let send_goal_keyexpr: heapless::String<256> = action_info.send_goal_key();
649        let send_goal_info = Self::service_info(
650            self.domain_id,
651            &self.name,
652            &self.namespace,
653            &send_goal_keyexpr,
654            type_name,
655            type_hash,
656        );
657        let send_goal_server = self
658            .session
659            .create_service_server(&send_goal_info, QosSettings::services_default())
660            .map_err(|_| NodeError::ActionCreationFailed)?;
661
662        let cancel_goal_keyexpr: heapless::String<256> = action_info.cancel_goal_key();
663        let cancel_goal_info = Self::service_info(
664            self.domain_id,
665            &self.name,
666            &self.namespace,
667            &cancel_goal_keyexpr,
668            "action_msgs::srv::dds_::CancelGoal_",
669            type_hash,
670        );
671        let cancel_goal_server = self
672            .session
673            .create_service_server(&cancel_goal_info, QosSettings::services_default())
674            .map_err(|_| NodeError::ActionCreationFailed)?;
675
676        let get_result_keyexpr: heapless::String<256> = action_info.get_result_key();
677        let get_result_info = Self::service_info(
678            self.domain_id,
679            &self.name,
680            &self.namespace,
681            &get_result_keyexpr,
682            type_name,
683            type_hash,
684        );
685        let get_result_server = self
686            .session
687            .create_service_server(&get_result_info, QosSettings::services_default())
688            .map_err(|_| NodeError::ActionCreationFailed)?;
689
690        let feedback_keyexpr: heapless::String<256> = action_info.feedback_key();
691        let feedback_topic = Self::topic_info(
692            self.domain_id,
693            &self.name,
694            &self.namespace,
695            &feedback_keyexpr,
696            type_name,
697            type_hash,
698        );
699        let feedback_publisher = self
700            .session
701            .create_publisher(&feedback_topic, QosSettings::QOS_PROFILE_DEFAULT)
702            .map_err(|_| NodeError::ActionCreationFailed)?;
703
704        let status_keyexpr: heapless::String<256> = action_info.status_key();
705        let status_topic = Self::topic_info(
706            self.domain_id,
707            &self.name,
708            &self.namespace,
709            &status_keyexpr,
710            "action_msgs::msg::dds_::GoalStatusArray_",
711            type_hash,
712        );
713        let status_publisher = self
714            .session
715            .create_publisher(
716                &status_topic,
717                QosSettings::QOS_PROFILE_ACTION_STATUS_DEFAULT,
718            )
719            .map_err(|_| NodeError::ActionCreationFailed)?;
720
721        Ok(super::action_core::ActionServerCore {
722            send_goal_server,
723            cancel_goal_server,
724            get_result_server,
725            feedback_publisher,
726            status_publisher,
727            active_goals: heapless::Vec::new(),
728            completed_results: heapless::Vec::new(),
729            pending_get_results: heapless::Vec::new(),
730            result_slab: [0u8; RESULT_BUF],
731            result_slab_used: 0,
732            goal_buffer: [0u8; GOAL_BUF],
733            feedback_buffer: [0u8; FEEDBACK_BUF],
734            cancel_buffer: [0u8; 256],
735        })
736    }
737
738    /// Phase 122.3.c.6 — typeless action client. Same shape as
739    /// `create_action_server_raw` but builds the 3 service clients
740    /// + 1 feedback subscriber, returns the raw `ActionClientCore`.
741    pub fn create_action_client_raw(
742        &mut self,
743        action_name: &str,
744        type_name: &str,
745        type_hash: &str,
746    ) -> Result<
747        super::action_core::ActionClientCore<
748            { crate::config::DEFAULT_RX_BUF_SIZE },
749            { crate::config::DEFAULT_RX_BUF_SIZE },
750            { crate::config::DEFAULT_RX_BUF_SIZE },
751        >,
752        NodeError,
753    > {
754        self.create_action_client_raw_sized::<
755            { crate::config::DEFAULT_RX_BUF_SIZE },
756            { crate::config::DEFAULT_RX_BUF_SIZE },
757            { crate::config::DEFAULT_RX_BUF_SIZE },
758        >(action_name, type_name, type_hash)
759    }
760
761    /// Typeless action client with custom buffer sizes.
762    pub fn create_action_client_raw_sized<
763        const GOAL_BUF: usize,
764        const RESULT_BUF: usize,
765        const FEEDBACK_BUF: usize,
766    >(
767        &mut self,
768        action_name: &str,
769        type_name: &str,
770        type_hash: &str,
771    ) -> Result<super::action_core::ActionClientCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>, NodeError>
772    {
773        let action_info = Self::action_info(self.domain_id, action_name, type_name, type_hash);
774
775        let send_goal_keyexpr: heapless::String<256> = action_info.send_goal_key();
776        let send_goal_info = Self::service_info(
777            self.domain_id,
778            &self.name,
779            &self.namespace,
780            &send_goal_keyexpr,
781            type_name,
782            type_hash,
783        );
784        let send_goal_client = self
785            .session
786            .create_service_client(&send_goal_info, QosSettings::services_default())
787            .map_err(|_| NodeError::ActionCreationFailed)?;
788
789        let cancel_goal_keyexpr: heapless::String<256> = action_info.cancel_goal_key();
790        let cancel_goal_info = Self::service_info(
791            self.domain_id,
792            &self.name,
793            &self.namespace,
794            &cancel_goal_keyexpr,
795            "action_msgs::srv::dds_::CancelGoal_",
796            type_hash,
797        );
798        let cancel_goal_client = self
799            .session
800            .create_service_client(&cancel_goal_info, QosSettings::services_default())
801            .map_err(|_| NodeError::ActionCreationFailed)?;
802
803        let get_result_keyexpr: heapless::String<256> = action_info.get_result_key();
804        let get_result_info = Self::service_info(
805            self.domain_id,
806            &self.name,
807            &self.namespace,
808            &get_result_keyexpr,
809            type_name,
810            type_hash,
811        );
812        let get_result_client = self
813            .session
814            .create_service_client(&get_result_info, QosSettings::services_default())
815            .map_err(|_| NodeError::ActionCreationFailed)?;
816
817        let feedback_keyexpr: heapless::String<256> = action_info.feedback_key();
818        let feedback_topic = Self::topic_info(
819            self.domain_id,
820            &self.name,
821            &self.namespace,
822            &feedback_keyexpr,
823            type_name,
824            type_hash,
825        );
826        let feedback_subscriber = self
827            .session
828            .create_subscriber(&feedback_topic, QosSettings::BEST_EFFORT)
829            .map_err(|_| NodeError::ActionCreationFailed)?;
830
831        Ok(super::action_core::ActionClientCore::new(
832            send_goal_client,
833            cancel_goal_client,
834            get_result_client,
835            feedback_subscriber,
836        ))
837    }
838
839    /// Create an action server.
840    pub fn create_action_server<A: RosAction>(
841        &mut self,
842        action_name: &str,
843    ) -> Result<ActionServer<A>, NodeError>
844    where
845        A::Goal: MessageForRmw,
846        A::Result: MessageForRmw,
847        A::Feedback: MessageForRmw,
848        A::SendGoalRequest: MessageForRmw,
849        A::SendGoalResponse: MessageForRmw,
850        A::GetResultRequest: MessageForRmw,
851        A::GetResultResponse: MessageForRmw,
852        A::FeedbackMessage: MessageForRmw,
853    {
854        self.create_action_server_sized::<A, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }, 4>(action_name)
855    }
856
857    /// Create an action server with custom buffer sizes.
858    pub fn create_action_server_sized<
859        A: RosAction,
860        const GOAL_BUF: usize,
861        const RESULT_BUF: usize,
862        const FEEDBACK_BUF: usize,
863        const MAX_GOALS: usize,
864    >(
865        &mut self,
866        action_name: &str,
867    ) -> Result<ActionServer<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF, MAX_GOALS>, NodeError>
868    where
869        A::Goal: MessageForRmw,
870        A::Result: MessageForRmw,
871        A::Feedback: MessageForRmw,
872        A::SendGoalRequest: MessageForRmw,
873        A::SendGoalResponse: MessageForRmw,
874        A::GetResultRequest: MessageForRmw,
875        A::GetResultResponse: MessageForRmw,
876        A::FeedbackMessage: MessageForRmw,
877    {
878        // Phase 212.K.7.6.b + K.7.7.c — register the three user-facing
879        // message types AND the five action-protocol envelope types under
880        // cyclonedds. No-op for other RMWs. The envelopes are needed
881        // because the action service shapes (`*_SendGoal_Request`,
882        // `*_GetResult_Response`, …) are the actual on-wire CDR types,
883        // and the C++ Cyclone bridge auto-prepends a cdds_request_header_t
884        // for any TYPE_NAME ending `_Request`/`_Response`/`_Reply`.
885        register_type::<A::Goal>()?;
886        register_type::<A::Result>()?;
887        register_type::<A::Feedback>()?;
888        register_type::<A::SendGoalRequest>()?;
889        register_type::<A::SendGoalResponse>()?;
890        register_type::<A::GetResultRequest>()?;
891        register_type::<A::GetResultResponse>()?;
892        register_type::<A::FeedbackMessage>()?;
893        let action_info =
894            Self::action_info(self.domain_id, action_name, A::ACTION_NAME, A::ACTION_HASH);
895
896        // Each underlying ServiceInfo / TopicInfo also carries the
897        // node identity so the Zenoh shim declares a liveliness token
898        // for it. Without `with_node_name` the shim's
899        // `declare_entity_liveliness` short-circuits (`node_name.and_then`
900        // → None) and `wait_for_action_server` has nothing to find.
901        // Advertise the per-channel service / topic types ROS 2 matches on
902        // (`<Action>_SendGoal` / `<Action>_GetResult` / `<Action>_FeedbackMessage`),
903        // not the bare action type — see `action_core::action_service_base_type`.
904        let send_goal_type = super::action_core::action_service_base_type(
905            <A::SendGoalRequest as RosMessage>::TYPE_NAME,
906            A::ACTION_NAME,
907        );
908        let get_result_type = super::action_core::action_service_base_type(
909            <A::GetResultRequest as RosMessage>::TYPE_NAME,
910            A::ACTION_NAME,
911        );
912        let feedback_type = <A::FeedbackMessage as RosMessage>::TYPE_NAME;
913
914        let send_goal_keyexpr: heapless::String<256> = action_info.send_goal_key();
915        let send_goal_info = Self::service_info(
916            self.domain_id,
917            &self.name,
918            &self.namespace,
919            &send_goal_keyexpr,
920            send_goal_type,
921            A::ACTION_HASH,
922        );
923        let send_goal_server = self
924            .session
925            .create_service_server(&send_goal_info, QosSettings::services_default())
926            .map_err(|_| NodeError::ActionCreationFailed)?;
927
928        let cancel_goal_keyexpr: heapless::String<256> = action_info.cancel_goal_key();
929        let cancel_goal_info = Self::service_info(
930            self.domain_id,
931            &self.name,
932            &self.namespace,
933            &cancel_goal_keyexpr,
934            "action_msgs::srv::dds_::CancelGoal_",
935            A::ACTION_HASH,
936        );
937        let cancel_goal_server = self
938            .session
939            .create_service_server(&cancel_goal_info, QosSettings::services_default())
940            .map_err(|_| NodeError::ActionCreationFailed)?;
941
942        let get_result_keyexpr: heapless::String<256> = action_info.get_result_key();
943        let get_result_info = Self::service_info(
944            self.domain_id,
945            &self.name,
946            &self.namespace,
947            &get_result_keyexpr,
948            get_result_type,
949            A::ACTION_HASH,
950        );
951        let get_result_server = self
952            .session
953            .create_service_server(&get_result_info, QosSettings::services_default())
954            .map_err(|_| NodeError::ActionCreationFailed)?;
955
956        let feedback_keyexpr: heapless::String<256> = action_info.feedback_key();
957        let feedback_topic = Self::topic_info(
958            self.domain_id,
959            &self.name,
960            &self.namespace,
961            &feedback_keyexpr,
962            feedback_type,
963            A::ACTION_HASH,
964        );
965        let feedback_publisher = self
966            .session
967            .create_publisher(&feedback_topic, QosSettings::QOS_PROFILE_DEFAULT)
968            .map_err(|_| NodeError::ActionCreationFailed)?;
969
970        let status_keyexpr: heapless::String<256> = action_info.status_key();
971        let status_topic = Self::topic_info(
972            self.domain_id,
973            &self.name,
974            &self.namespace,
975            &status_keyexpr,
976            "action_msgs::msg::dds_::GoalStatusArray_",
977            A::ACTION_HASH,
978        );
979        let status_publisher = self
980            .session
981            .create_publisher(
982                &status_topic,
983                QosSettings::QOS_PROFILE_ACTION_STATUS_DEFAULT,
984            )
985            .map_err(|_| NodeError::ActionCreationFailed)?;
986
987        Ok(ActionServer {
988            core: super::action_core::ActionServerCore {
989                send_goal_server,
990                cancel_goal_server,
991                get_result_server,
992                feedback_publisher,
993                status_publisher,
994                active_goals: heapless::Vec::new(),
995                completed_results: heapless::Vec::new(),
996                pending_get_results: heapless::Vec::new(),
997                result_slab: [0u8; RESULT_BUF],
998                result_slab_used: 0,
999                goal_buffer: [0u8; GOAL_BUF],
1000                feedback_buffer: [0u8; FEEDBACK_BUF],
1001                cancel_buffer: [0u8; 256],
1002            },
1003            typed_goals: heapless::Vec::new(),
1004            completed_goals: heapless::Vec::new(),
1005        })
1006    }
1007
1008    /// Create an action client.
1009    pub fn create_action_client<A: RosAction>(
1010        &mut self,
1011        action_name: &str,
1012    ) -> Result<ActionClient<A>, NodeError>
1013    where
1014        A::Goal: MessageForRmw,
1015        A::Result: MessageForRmw,
1016        A::Feedback: MessageForRmw,
1017        A::SendGoalRequest: MessageForRmw,
1018        A::SendGoalResponse: MessageForRmw,
1019        A::GetResultRequest: MessageForRmw,
1020        A::GetResultResponse: MessageForRmw,
1021        A::FeedbackMessage: MessageForRmw,
1022    {
1023        self.create_action_client_sized::<A, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(action_name)
1024    }
1025
1026    /// Create an action client with custom buffer sizes.
1027    pub fn create_action_client_sized<
1028        A: RosAction,
1029        const GOAL_BUF: usize,
1030        const RESULT_BUF: usize,
1031        const FEEDBACK_BUF: usize,
1032    >(
1033        &mut self,
1034        action_name: &str,
1035    ) -> Result<ActionClient<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>, NodeError>
1036    where
1037        A::Goal: MessageForRmw,
1038        A::Result: MessageForRmw,
1039        A::Feedback: MessageForRmw,
1040        A::SendGoalRequest: MessageForRmw,
1041        A::SendGoalResponse: MessageForRmw,
1042        A::GetResultRequest: MessageForRmw,
1043        A::GetResultResponse: MessageForRmw,
1044        A::FeedbackMessage: MessageForRmw,
1045    {
1046        // Phase 212.K.7.6.b + K.7.7.c — see `create_action_server_sized`.
1047        register_type::<A::Goal>()?;
1048        register_type::<A::Result>()?;
1049        register_type::<A::Feedback>()?;
1050        register_type::<A::SendGoalRequest>()?;
1051        register_type::<A::SendGoalResponse>()?;
1052        register_type::<A::GetResultRequest>()?;
1053        register_type::<A::GetResultResponse>()?;
1054        register_type::<A::FeedbackMessage>()?;
1055        let action_info =
1056            Self::action_info(self.domain_id, action_name, A::ACTION_NAME, A::ACTION_HASH);
1057
1058        // Mirror `create_action_server_sized`: thread node identity through
1059        // each underlying ServiceInfo / TopicInfo so the Zenoh shim
1060        // declares the matching client-side liveliness tokens (and so the
1061        // discovery wildcard built from `send_goal_info` ends up in the
1062        // same domain as the server's tokens).
1063        // Same per-channel typing as the server side so the client's requesters
1064        // and feedback reader match a real ROS 2 action server over DDS.
1065        let send_goal_type = super::action_core::action_service_base_type(
1066            <A::SendGoalRequest as RosMessage>::TYPE_NAME,
1067            A::ACTION_NAME,
1068        );
1069        let get_result_type = super::action_core::action_service_base_type(
1070            <A::GetResultRequest as RosMessage>::TYPE_NAME,
1071            A::ACTION_NAME,
1072        );
1073        let feedback_type = <A::FeedbackMessage as RosMessage>::TYPE_NAME;
1074
1075        let send_goal_keyexpr: heapless::String<256> = action_info.send_goal_key();
1076        let send_goal_info = Self::service_info(
1077            self.domain_id,
1078            &self.name,
1079            &self.namespace,
1080            &send_goal_keyexpr,
1081            send_goal_type,
1082            A::ACTION_HASH,
1083        );
1084        let send_goal_client = self
1085            .session
1086            .create_service_client(&send_goal_info, QosSettings::services_default())
1087            .map_err(|_| NodeError::ActionCreationFailed)?;
1088
1089        let cancel_goal_keyexpr: heapless::String<256> = action_info.cancel_goal_key();
1090        let cancel_goal_info = Self::service_info(
1091            self.domain_id,
1092            &self.name,
1093            &self.namespace,
1094            &cancel_goal_keyexpr,
1095            "action_msgs::srv::dds_::CancelGoal_",
1096            A::ACTION_HASH,
1097        );
1098        let cancel_goal_client = self
1099            .session
1100            .create_service_client(&cancel_goal_info, QosSettings::services_default())
1101            .map_err(|_| NodeError::ActionCreationFailed)?;
1102
1103        let get_result_keyexpr: heapless::String<256> = action_info.get_result_key();
1104        let get_result_info = Self::service_info(
1105            self.domain_id,
1106            &self.name,
1107            &self.namespace,
1108            &get_result_keyexpr,
1109            get_result_type,
1110            A::ACTION_HASH,
1111        );
1112        let get_result_client = self
1113            .session
1114            .create_service_client(&get_result_info, QosSettings::services_default())
1115            .map_err(|_| NodeError::ActionCreationFailed)?;
1116
1117        let feedback_keyexpr: heapless::String<256> = action_info.feedback_key();
1118        let feedback_topic = Self::topic_info(
1119            self.domain_id,
1120            &self.name,
1121            &self.namespace,
1122            &feedback_keyexpr,
1123            feedback_type,
1124            A::ACTION_HASH,
1125        );
1126        let feedback_subscriber = self
1127            .session
1128            .create_subscriber(&feedback_topic, QosSettings::BEST_EFFORT)
1129            .map_err(|_| NodeError::ActionCreationFailed)?;
1130
1131        Ok(ActionClient {
1132            core: super::action_core::ActionClientCore {
1133                send_goal_client,
1134                cancel_goal_client,
1135                get_result_client,
1136                feedback_subscriber,
1137                goal_buffer: [0u8; GOAL_BUF],
1138                result_buffer: [0u8; RESULT_BUF],
1139                feedback_buffer: [0u8; FEEDBACK_BUF],
1140                goal_counter: 0,
1141                in_flight_send_goal: false,
1142                in_flight_cancel: false,
1143                in_flight_get_result: false,
1144            },
1145            _phantom: PhantomData,
1146        })
1147    }
1148}
1149
1150// ===================================================================
1151// Phase 189.M1 — entity builders (the `clone` tier)
1152// ===================================================================
1153
1154/// Publisher builder — `node.publisher(topic)`. Choose `.typed::<M>()` or
1155/// `.generic(type, hash)`, optionally `.qos(..)`, then `.build()`.
1156pub struct PublisherBuilder<'n, 'a, 't> {
1157    node: &'n mut NodeHandle<'a>,
1158    topic: &'t str,
1159    qos: QosSettings,
1160}
1161
1162impl<'n, 'a, 't> PublisherBuilder<'n, 'a, 't> {
1163    /// Set the QoS (also settable on the typed/generic builder).
1164    pub fn qos(mut self, qos: QosSettings) -> Self {
1165        self.qos = qos;
1166        self
1167    }
1168
1169    /// Typed publisher for a ROS message `M` (mirrors rclcpp/rclrs).
1170    pub fn typed<M: MessageForRmw>(self) -> TypedPublisherBuilder<'n, 'a, 't, M> {
1171        TypedPublisherBuilder {
1172            node: self.node,
1173            topic: self.topic,
1174            qos: self.qos,
1175            _phantom: PhantomData,
1176        }
1177    }
1178
1179    /// Generic (type-erased) publisher — the rclcpp `create_generic_publisher`
1180    /// form; raw CDR bytes via `publish_raw`.
1181    pub fn generic(
1182        self,
1183        type_name: &'t str,
1184        type_hash: &'t str,
1185    ) -> GenericPublisherBuilder<'n, 'a, 't> {
1186        GenericPublisherBuilder {
1187            node: self.node,
1188            topic: self.topic,
1189            type_name,
1190            type_hash,
1191            qos: self.qos,
1192        }
1193    }
1194}
1195
1196/// Typed publisher builder (`.typed::<M>()`).
1197pub struct TypedPublisherBuilder<'n, 'a, 't, M> {
1198    node: &'n mut NodeHandle<'a>,
1199    topic: &'t str,
1200    qos: QosSettings,
1201    _phantom: PhantomData<M>,
1202}
1203
1204impl<'n, 'a, 't, M: MessageForRmw> TypedPublisherBuilder<'n, 'a, 't, M> {
1205    pub fn qos(mut self, qos: QosSettings) -> Self {
1206        self.qos = qos;
1207        self
1208    }
1209
1210    pub fn build(self) -> Result<EmbeddedPublisher<M>, NodeError> {
1211        self.node
1212            .create_publisher_with_qos::<M>(self.topic, self.qos)
1213    }
1214}
1215
1216/// Generic (type-erased) publisher builder (`.generic(type, hash)`).
1217pub struct GenericPublisherBuilder<'n, 'a, 't> {
1218    node: &'n mut NodeHandle<'a>,
1219    topic: &'t str,
1220    type_name: &'t str,
1221    type_hash: &'t str,
1222    qos: QosSettings,
1223}
1224
1225impl<'n, 'a, 't> GenericPublisherBuilder<'n, 'a, 't> {
1226    pub fn qos(mut self, qos: QosSettings) -> Self {
1227        self.qos = qos;
1228        self
1229    }
1230
1231    pub fn build(self) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
1232        self.node.create_publisher_raw_with_qos(
1233            self.topic,
1234            self.type_name,
1235            self.type_hash,
1236            self.qos,
1237        )
1238    }
1239}
1240
1241/// An executor-borrowing node handle — `exec.node(id)`. Hosts the
1242/// callback-registering entity builders (subscriptions register into the
1243/// executor's dispatch arena). It is a **short-lived `&mut Executor` borrow**:
1244/// create entities, then drop it before acquiring the next node handle; entity
1245/// handles (`HandleId`, publishers) are owned and outlive it (no `Arc` — see
1246/// `docs/design/0022-entity-api-tiers.md` §Borrow model).
1247pub struct NodeCtx<'e> {
1248    executor: &'e mut super::spin::Executor,
1249    node_id: super::node_record::NodeId,
1250}
1251
1252impl<'e> NodeCtx<'e> {
1253    pub(crate) fn new(
1254        executor: &'e mut super::spin::Executor,
1255        node_id: super::node_record::NodeId,
1256    ) -> Self {
1257        Self { executor, node_id }
1258    }
1259
1260    /// Subscription builder (the `clone` tier). Pick a mode with `.typed::<M>()`
1261    /// or `.generic(type, hash)`, set knobs (`.qos`), then `.build(callback)`.
1262    pub fn subscription<'t>(&mut self, topic: &'t str) -> SubscriptionBuilder<'_, 'e, 't> {
1263        SubscriptionBuilder {
1264            ctx: self,
1265            topic,
1266            qos: QosSettings::default(),
1267        }
1268    }
1269
1270    /// Publisher builder (the `clone` tier), symmetric with
1271    /// [`subscription`](Self::subscription). Pick `.typed::<M>()` or
1272    /// `.generic(type, hash)`, set `.qos()`, then `.build()`. The returned
1273    /// publisher handle is owned and outlives this `NodeCtx` — the bridge
1274    /// builds the dest publisher on one ctx, drops it, then registers the
1275    /// source subscription on another (see `0022-entity-api-tiers.md`).
1276    pub fn publisher<'t>(&mut self, topic: &'t str) -> CtxPublisherBuilder<'_, 'e, 't> {
1277        CtxPublisherBuilder {
1278            ctx: self,
1279            topic,
1280            qos: QosSettings::default(),
1281        }
1282    }
1283
1284    /// Convenient typed publisher (the `fork` tier — rclcpp/rclrs shape).
1285    pub fn create_publisher<M: MessageForRmw>(
1286        &mut self,
1287        topic: &str,
1288    ) -> Result<EmbeddedPublisher<M>, NodeError> {
1289        self.executor
1290            .create_publisher_on::<M>(self.node_id, topic, QosSettings::default())
1291    }
1292
1293    /// Convenient generic (type-erased) publisher — rclcpp `create_generic_*`.
1294    pub fn create_generic_publisher(
1295        &mut self,
1296        topic: &str,
1297        type_name: &str,
1298        type_hash: &str,
1299    ) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
1300        self.executor.create_publisher_raw_on(
1301            self.node_id,
1302            topic,
1303            type_name,
1304            type_hash,
1305            QosSettings::default(),
1306        )
1307    }
1308
1309    /// Convenient typed subscription (the `fork` tier — rclcpp/rclrs shape).
1310    /// Sugar over the builder with default QoS + buffer.
1311    pub fn create_subscription<M, F>(
1312        &mut self,
1313        topic: &str,
1314        callback: F,
1315    ) -> Result<super::types::HandleId, NodeError>
1316    where
1317        M: MessageForRmw + 'static,
1318        F: FnMut(&M) + 'static,
1319    {
1320        self.executor
1321            .register_subscription_buffered_on::<M, F, { crate::config::DEFAULT_RX_BUF_SIZE }>(
1322                self.node_id,
1323                topic,
1324                QosSettings::default(),
1325                callback,
1326            )
1327    }
1328
1329    /// RFC-0041 / Phase 239.1 — callback-based service client (rclcpp
1330    /// `async_send_request(req, cb)` analogue). The reply is delivered to
1331    /// `callback` at `spin_once` (no `Promise` poll). Returns a
1332    /// [`ServiceClientCallback`] send handle; dual-mode — the `Promise`-based
1333    /// [`create_client`](Self::create_client) is unchanged.
1334    pub fn create_client_with_callback<Svc, F>(
1335        &mut self,
1336        service_name: &str,
1337        callback: F,
1338    ) -> Result<ServiceClientCallback<Svc>, NodeError>
1339    where
1340        Svc: RosService + 'static,
1341        Svc::Request: MessageForRmw,
1342        Svc::Reply: MessageForRmw,
1343        F: FnMut(&Svc::Reply) + 'static,
1344    {
1345        self.create_client_with_callback_sized::<
1346            Svc,
1347            F,
1348            { crate::config::DEFAULT_RX_BUF_SIZE },
1349            { crate::config::DEFAULT_RX_BUF_SIZE },
1350        >(service_name, callback)
1351    }
1352
1353    /// Callback-based service client with custom buffer sizes (Phase 239.1).
1354    pub fn create_client_with_callback_sized<Svc, F, const REQ_BUF: usize, const REPLY_BUF: usize>(
1355        &mut self,
1356        service_name: &str,
1357        callback: F,
1358    ) -> Result<ServiceClientCallback<Svc, REQ_BUF, REPLY_BUF>, NodeError>
1359    where
1360        Svc: RosService + 'static,
1361        Svc::Request: MessageForRmw,
1362        Svc::Reply: MessageForRmw,
1363        F: FnMut(&Svc::Reply) + 'static,
1364    {
1365        register_type::<Svc::Request>()?;
1366        register_type::<Svc::Reply>()?;
1367        let (_id, hdr) = self
1368            .executor
1369            .register_service_client_callback::<Svc, F, REPLY_BUF>(
1370                Some(self.node_id),
1371                service_name,
1372                Svc::SERVICE_NAME,
1373                Svc::SERVICE_HASH,
1374                QosSettings::services_default(),
1375                callback,
1376            )?;
1377        Ok(ServiceClientCallback::new(hdr))
1378    }
1379
1380    /// RFC-0041 / Phase 239.2 — callback-based action client (rclcpp
1381    /// `SendGoalOptions{goal_response_callback, feedback_callback,
1382    /// result_callback}` analogue). Goal-response / feedback / result are
1383    /// delivered to the closures at `spin_once`. Returns an
1384    /// [`ActionClientCallback`] send handle (`send_goal` / `get_result`);
1385    /// dual-mode — the `Promise`-based [`create_action_client`](Self::create_action_client)
1386    /// is unchanged.
1387    #[allow(clippy::type_complexity)]
1388    pub fn create_action_client_with_callbacks<A, GRespF, FbF, ResF>(
1389        &mut self,
1390        action_name: &str,
1391        on_goal_response: GRespF,
1392        on_feedback: FbF,
1393        on_result: ResF,
1394    ) -> Result<ActionClientCallback<A>, NodeError>
1395    where
1396        A: RosAction + 'static,
1397        A::Goal: MessageForRmw,
1398        A::Result: MessageForRmw,
1399        A::Feedback: MessageForRmw,
1400        GRespF: FnMut(&nros_core::GoalId, bool) + 'static,
1401        FbF: FnMut(&nros_core::GoalId, &A::Feedback) + 'static,
1402        ResF: FnMut(&nros_core::GoalId, nros_core::GoalStatus, &A::Result) + 'static,
1403    {
1404        self.create_action_client_with_callbacks_sized::<
1405            A,
1406            GRespF,
1407            FbF,
1408            ResF,
1409            { crate::config::DEFAULT_RX_BUF_SIZE },
1410            { crate::config::DEFAULT_RX_BUF_SIZE },
1411            { crate::config::DEFAULT_RX_BUF_SIZE },
1412        >(action_name, on_goal_response, on_feedback, on_result)
1413    }
1414
1415    /// Callback-based action client with custom buffer sizes (Phase 239.2).
1416    #[allow(clippy::type_complexity)]
1417    pub fn create_action_client_with_callbacks_sized<
1418        A,
1419        GRespF,
1420        FbF,
1421        ResF,
1422        const GOAL_BUF: usize,
1423        const RESULT_BUF: usize,
1424        const FEEDBACK_BUF: usize,
1425    >(
1426        &mut self,
1427        action_name: &str,
1428        on_goal_response: GRespF,
1429        on_feedback: FbF,
1430        on_result: ResF,
1431    ) -> Result<ActionClientCallback<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>, NodeError>
1432    where
1433        A: RosAction + 'static,
1434        A::Goal: MessageForRmw,
1435        A::Result: MessageForRmw,
1436        A::Feedback: MessageForRmw,
1437        GRespF: FnMut(&nros_core::GoalId, bool) + 'static,
1438        FbF: FnMut(&nros_core::GoalId, &A::Feedback) + 'static,
1439        ResF: FnMut(&nros_core::GoalId, nros_core::GoalStatus, &A::Result) + 'static,
1440    {
1441        register_type::<A::Goal>()?;
1442        register_type::<A::Result>()?;
1443        register_type::<A::Feedback>()?;
1444        let (_id, core) = self
1445            .executor
1446            .register_action_client_callback::<A, GRespF, FbF, ResF, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>(
1447                Some(self.node_id),
1448                action_name,
1449                A::ACTION_NAME,
1450                A::ACTION_HASH,
1451                // Feedback is a stream → buffer a short QoS-depth history (Phase
1452                // 239.5). Goal-response / result are single-outstanding (gated).
1453                8u16,
1454                on_goal_response,
1455                on_feedback,
1456                on_result,
1457            )?;
1458        Ok(ActionClientCallback::new(core))
1459    }
1460
1461    /// Convenient generic (type-erased) subscription — rclcpp `create_generic_*`.
1462    pub fn create_generic_subscription<F>(
1463        &mut self,
1464        topic: &str,
1465        type_name: &str,
1466        type_hash: &str,
1467        callback: F,
1468    ) -> Result<super::types::HandleId, NodeError>
1469    where
1470        F: FnMut(&[u8]) + 'static,
1471    {
1472        self.executor
1473            .register_subscription_buffered_raw_on::<F, { crate::config::DEFAULT_RX_BUF_SIZE }>(
1474                self.node_id,
1475                topic,
1476                type_name,
1477                type_hash,
1478                QosSettings::default(),
1479                callback,
1480            )
1481    }
1482
1483    /// Phase 250 (Wave 2) — generic (type-erased) subscription that surfaces E2E
1484    /// [`IntegrityStatus`](nros_rmw::IntegrityStatus) (CRC + sequence gap/dup) to
1485    /// the callback (`FnMut(&[u8], &IntegrityStatus)`). The declarative-`Node`
1486    /// analog of the typed `.typed::<M>().safety()` builder: the validator lives
1487    /// in the `RmwSubscriber`, so the raw bytes + status arrive together without
1488    /// a typed `M`. Wired by the declarative runtime's `.safety()` opt-in.
1489    #[cfg(feature = "safety-e2e")]
1490    pub fn create_generic_subscription_with_integrity<F>(
1491        &mut self,
1492        topic: &str,
1493        type_name: &str,
1494        type_hash: &str,
1495        callback: F,
1496    ) -> Result<super::types::HandleId, NodeError>
1497    where
1498        F: FnMut(&[u8], &nros_rmw::IntegrityStatus) + 'static,
1499    {
1500        self.executor
1501            .register_subscription_buffered_raw_safety_on::<F, { crate::config::DEFAULT_RX_BUF_SIZE }>(
1502                self.node_id,
1503                topic,
1504                type_name,
1505                type_hash,
1506                QosSettings::default(),
1507                callback,
1508            )
1509    }
1510
1511    /// Convenient borrowed (zero-copy) subscription (Phase 229.6, issue 0007 /
1512    /// RFC-0033 `borrowed` mode).
1513    ///
1514    /// `B` is the code-generated borrowed-message marker (e.g. `ImageBorrow`,
1515    /// emitted alongside the owned `Image` for a `.msg` with a `borrowed`-mode
1516    /// field). The callback receives `&B::View<'a>` — a lifetime-carrying
1517    /// message whose unbounded sequence/string fields borrow directly from the
1518    /// receive buffer (no `heapless::Vec` copy); the view is valid only for the
1519    /// callback's duration.
1520    ///
1521    /// Uses `KEEP_LAST(1)` QoS → triple buffer, as borrowed subscriptions
1522    /// require (a single well-defined slot for the callback's borrow). For an
1523    /// explicit deeper queue use the owned
1524    /// [`create_subscription`](Self::create_subscription); a borrowed
1525    /// subscription registered with `KEEP_LAST(N>1)` is rejected.
1526    pub fn create_subscription_borrowed<B, F>(
1527        &mut self,
1528        topic: &str,
1529        callback: F,
1530    ) -> Result<super::types::HandleId, NodeError>
1531    where
1532        B: nros_core::BorrowedMessage + 'static,
1533        F: for<'a> FnMut(&B::View<'a>) + 'static,
1534    {
1535        self.executor
1536            .register_subscription_buffered_borrowed_on::<B, F, { crate::config::DEFAULT_RX_BUF_SIZE }>(
1537                self.node_id,
1538                topic,
1539                QosSettings::default().keep_last(1),
1540                callback,
1541            )
1542    }
1543
1544    /// Service-server builder (the `clone` tier) — `node.service(name)`.
1545    /// Set `.qos()` (defaults to the services profile = RELIABLE+VOLATILE+
1546    /// KEEP_LAST(10)), then `.build::<Svc, _>(callback)` (Phase 193.2).
1547    pub fn service<'t>(&mut self, name: &'t str) -> CtxServiceBuilder<'_, 'e, 't> {
1548        CtxServiceBuilder {
1549            ctx: self,
1550            name,
1551            qos: QosSettings::services_default(),
1552        }
1553    }
1554
1555    /// Convenient service server (the `fork` tier — rclrs/rclcpp shape), default
1556    /// services QoS. Mirror of `create_subscription`.
1557    pub fn create_service<Svc, F>(
1558        &mut self,
1559        name: &str,
1560        callback: F,
1561    ) -> Result<super::types::HandleId, NodeError>
1562    where
1563        Svc: RosService + 'static,
1564        Svc::Request: crate::cyclonedds_register::MessageForRmw,
1565        Svc::Reply: crate::cyclonedds_register::MessageForRmw,
1566        F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
1567    {
1568        self.executor.register_service_sized_on::<
1569            Svc,
1570            F,
1571            { crate::config::DEFAULT_RX_BUF_SIZE },
1572            { crate::config::DEFAULT_RX_BUF_SIZE },
1573        >(self.node_id, name, QosSettings::services_default(), callback)
1574    }
1575}
1576
1577/// Service-server builder on a [`NodeCtx`] — `node.service(name)`.
1578pub struct CtxServiceBuilder<'c, 'e, 't> {
1579    ctx: &'c mut NodeCtx<'e>,
1580    name: &'t str,
1581    qos: QosSettings,
1582}
1583
1584impl<'c, 'e, 't> CtxServiceBuilder<'c, 'e, 't> {
1585    /// Service QoS (applies to both the request + reply endpoints). Defaults to
1586    /// `QosSettings::services_default()`.
1587    pub fn qos(mut self, qos: QosSettings) -> Self {
1588        self.qos = qos;
1589        self
1590    }
1591
1592    pub fn build<Svc, F>(self, callback: F) -> Result<super::types::HandleId, NodeError>
1593    where
1594        Svc: RosService + 'static,
1595        Svc::Request: crate::cyclonedds_register::MessageForRmw,
1596        Svc::Reply: crate::cyclonedds_register::MessageForRmw,
1597        F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
1598    {
1599        self.ctx.executor.register_service_sized_on::<
1600            Svc,
1601            F,
1602            { crate::config::DEFAULT_RX_BUF_SIZE },
1603            { crate::config::DEFAULT_RX_BUF_SIZE },
1604        >(self.ctx.node_id, self.name, self.qos, callback)
1605    }
1606}
1607
1608/// Publisher builder on a [`NodeCtx`] — `node.publisher(topic)`.
1609pub struct CtxPublisherBuilder<'c, 'e, 't> {
1610    ctx: &'c mut NodeCtx<'e>,
1611    topic: &'t str,
1612    qos: QosSettings,
1613}
1614
1615impl<'c, 'e, 't> CtxPublisherBuilder<'c, 'e, 't> {
1616    pub fn qos(mut self, qos: QosSettings) -> Self {
1617        self.qos = qos;
1618        self
1619    }
1620
1621    /// Typed publisher for a ROS message `M`.
1622    pub fn typed<M: MessageForRmw>(self) -> CtxTypedPublisherBuilder<'c, 'e, 't, M> {
1623        CtxTypedPublisherBuilder {
1624            ctx: self.ctx,
1625            topic: self.topic,
1626            qos: self.qos,
1627            _phantom: PhantomData,
1628        }
1629    }
1630
1631    /// Generic (type-erased) publisher.
1632    pub fn generic(
1633        self,
1634        type_name: &'t str,
1635        type_hash: &'t str,
1636    ) -> CtxGenericPublisherBuilder<'c, 'e, 't> {
1637        CtxGenericPublisherBuilder {
1638            ctx: self.ctx,
1639            topic: self.topic,
1640            type_name,
1641            type_hash,
1642            qos: self.qos,
1643        }
1644    }
1645}
1646
1647/// Typed publisher builder on a `NodeCtx` (`.typed::<M>()`).
1648pub struct CtxTypedPublisherBuilder<'c, 'e, 't, M> {
1649    ctx: &'c mut NodeCtx<'e>,
1650    topic: &'t str,
1651    qos: QosSettings,
1652    _phantom: PhantomData<M>,
1653}
1654
1655impl<'c, 'e, 't, M: MessageForRmw> CtxTypedPublisherBuilder<'c, 'e, 't, M> {
1656    pub fn qos(mut self, qos: QosSettings) -> Self {
1657        self.qos = qos;
1658        self
1659    }
1660
1661    pub fn build(self) -> Result<EmbeddedPublisher<M>, NodeError> {
1662        self.ctx
1663            .executor
1664            .create_publisher_on::<M>(self.ctx.node_id, self.topic, self.qos)
1665    }
1666}
1667
1668/// Generic publisher builder on a `NodeCtx` (`.generic(type, hash)`).
1669pub struct CtxGenericPublisherBuilder<'c, 'e, 't> {
1670    ctx: &'c mut NodeCtx<'e>,
1671    topic: &'t str,
1672    type_name: &'t str,
1673    type_hash: &'t str,
1674    qos: QosSettings,
1675}
1676
1677impl<'c, 'e, 't> CtxGenericPublisherBuilder<'c, 'e, 't> {
1678    pub fn qos(mut self, qos: QosSettings) -> Self {
1679        self.qos = qos;
1680        self
1681    }
1682
1683    pub fn build(self) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
1684        self.ctx.executor.create_publisher_raw_on(
1685            self.ctx.node_id,
1686            self.topic,
1687            self.type_name,
1688            self.type_hash,
1689            self.qos,
1690        )
1691    }
1692}
1693
1694/// Subscription builder — `node.subscription(topic)`.
1695pub struct SubscriptionBuilder<'c, 'e, 't> {
1696    ctx: &'c mut NodeCtx<'e>,
1697    topic: &'t str,
1698    qos: QosSettings,
1699}
1700
1701impl<'c, 'e, 't> SubscriptionBuilder<'c, 'e, 't> {
1702    pub fn qos(mut self, qos: QosSettings) -> Self {
1703        self.qos = qos;
1704        self
1705    }
1706
1707    /// Typed subscription for a ROS message `M`.
1708    pub fn typed<M: MessageForRmw + 'static>(self) -> TypedSubscriptionBuilder<'c, 'e, 't, M> {
1709        TypedSubscriptionBuilder {
1710            ctx: self.ctx,
1711            topic: self.topic,
1712            qos: self.qos,
1713            sched: None,
1714            _phantom: PhantomData,
1715        }
1716    }
1717
1718    /// Generic (type-erased) subscription — raw CDR bytes to the callback.
1719    pub fn generic(
1720        self,
1721        type_name: &'t str,
1722        type_hash: &'t str,
1723    ) -> GenericSubscriptionBuilder<'c, 'e, 't> {
1724        GenericSubscriptionBuilder {
1725            ctx: self.ctx,
1726            topic: self.topic,
1727            type_name,
1728            type_hash,
1729            qos: self.qos,
1730            sched: None,
1731        }
1732    }
1733}
1734
1735/// Typed subscription builder (`.typed::<M>()`). `RX` is the staging-buffer
1736/// size, set via `.rx_buffer::<N>()` (defaults to `DEFAULT_RX_BUF_SIZE`).
1737pub struct TypedSubscriptionBuilder<
1738    'c,
1739    'e,
1740    't,
1741    M,
1742    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1743> {
1744    ctx: &'c mut NodeCtx<'e>,
1745    topic: &'t str,
1746    qos: QosSettings,
1747    sched: Option<super::sched_context::SchedContextId>,
1748    _phantom: PhantomData<M>,
1749}
1750
1751impl<'c, 'e, 't, M: MessageForRmw + 'static, const RX: usize>
1752    TypedSubscriptionBuilder<'c, 'e, 't, M, RX>
1753{
1754    pub fn qos(mut self, qos: QosSettings) -> Self {
1755        self.qos = qos;
1756        self
1757    }
1758
1759    /// Bind the subscription's callback to a scheduling context.
1760    pub fn sched_context(mut self, sc: super::sched_context::SchedContextId) -> Self {
1761        self.sched = Some(sc);
1762        self
1763    }
1764
1765    /// Set the staging-buffer size (const-generic).
1766    pub fn rx_buffer<const N: usize>(self) -> TypedSubscriptionBuilder<'c, 'e, 't, M, N> {
1767        TypedSubscriptionBuilder {
1768            ctx: self.ctx,
1769            topic: self.topic,
1770            qos: self.qos,
1771            sched: self.sched,
1772            _phantom: PhantomData,
1773        }
1774    }
1775
1776    /// Surface per-message [`MessageInfo`](nros_core::MessageInfo) (seq,
1777    /// publisher GID, timestamps) to the callback — `FnMut(&M, Option<&MessageInfo>)`,
1778    /// the rclrs shape. Distinct from the generic builder's `.message_info()`
1779    /// (which yields a `RawMessageInfo` with the wire attachment).
1780    pub fn message_info(self) -> TypedSubInfoBuilder<'c, 'e, 't, M, RX> {
1781        TypedSubInfoBuilder {
1782            ctx: self.ctx,
1783            topic: self.topic,
1784            qos: self.qos,
1785            sched: self.sched,
1786            _phantom: PhantomData,
1787        }
1788    }
1789
1790    /// Surface E2E-safety validation (CRC + sequence gap/duplicate) to the
1791    /// callback — `FnMut(&M, &IntegrityStatus)`.
1792    #[cfg(feature = "safety-e2e")]
1793    pub fn safety(self) -> TypedSubSafetyBuilder<'c, 'e, 't, M, RX> {
1794        TypedSubSafetyBuilder {
1795            ctx: self.ctx,
1796            topic: self.topic,
1797            qos: self.qos,
1798            sched: self.sched,
1799            _phantom: PhantomData,
1800        }
1801    }
1802
1803    pub fn build<F: FnMut(&M) + 'static>(
1804        self,
1805        callback: F,
1806    ) -> Result<super::types::HandleId, NodeError> {
1807        let handle = self
1808            .ctx
1809            .executor
1810            .register_subscription_buffered_on::<M, F, RX>(
1811                self.ctx.node_id,
1812                self.topic,
1813                self.qos,
1814                callback,
1815            )?;
1816        if let Some(sc) = self.sched {
1817            self.ctx.executor.bind_handle_to_sched_context(handle, sc)?;
1818        }
1819        Ok(handle)
1820    }
1821}
1822
1823/// Typed subscription builder with `MessageInfo` (`.typed::<M>().message_info()`).
1824/// Callback is `FnMut(&M, Option<&MessageInfo>)`.
1825pub struct TypedSubInfoBuilder<
1826    'c,
1827    'e,
1828    't,
1829    M,
1830    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1831> {
1832    ctx: &'c mut NodeCtx<'e>,
1833    topic: &'t str,
1834    qos: QosSettings,
1835    sched: Option<super::sched_context::SchedContextId>,
1836    _phantom: PhantomData<M>,
1837}
1838
1839impl<'c, 'e, 't, M: MessageForRmw + 'static, const RX: usize>
1840    TypedSubInfoBuilder<'c, 'e, 't, M, RX>
1841{
1842    pub fn qos(mut self, qos: QosSettings) -> Self {
1843        self.qos = qos;
1844        self
1845    }
1846
1847    pub fn sched_context(mut self, sc: super::sched_context::SchedContextId) -> Self {
1848        self.sched = Some(sc);
1849        self
1850    }
1851
1852    pub fn rx_buffer<const N: usize>(self) -> TypedSubInfoBuilder<'c, 'e, 't, M, N> {
1853        TypedSubInfoBuilder {
1854            ctx: self.ctx,
1855            topic: self.topic,
1856            qos: self.qos,
1857            sched: self.sched,
1858            _phantom: PhantomData,
1859        }
1860    }
1861
1862    pub fn build<F: FnMut(&M, Option<&nros_core::MessageInfo>) + 'static>(
1863        self,
1864        callback: F,
1865    ) -> Result<super::types::HandleId, NodeError> {
1866        let handle = self
1867            .ctx
1868            .executor
1869            .register_subscription_with_info_sized_inner::<M, F, RX>(
1870                Some(self.ctx.node_id),
1871                self.topic,
1872                self.qos,
1873                callback,
1874            )?;
1875        if let Some(sc) = self.sched {
1876            self.ctx.executor.bind_handle_to_sched_context(handle, sc)?;
1877        }
1878        Ok(handle)
1879    }
1880}
1881
1882/// Typed subscription builder with E2E-safety validation
1883/// (`.typed::<M>().safety()`). Callback is `FnMut(&M, &IntegrityStatus)`.
1884#[cfg(feature = "safety-e2e")]
1885pub struct TypedSubSafetyBuilder<
1886    'c,
1887    'e,
1888    't,
1889    M,
1890    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1891> {
1892    ctx: &'c mut NodeCtx<'e>,
1893    topic: &'t str,
1894    qos: QosSettings,
1895    sched: Option<super::sched_context::SchedContextId>,
1896    _phantom: PhantomData<M>,
1897}
1898
1899#[cfg(feature = "safety-e2e")]
1900impl<'c, 'e, 't, M: MessageForRmw + 'static, const RX: usize>
1901    TypedSubSafetyBuilder<'c, 'e, 't, M, RX>
1902{
1903    pub fn qos(mut self, qos: QosSettings) -> Self {
1904        self.qos = qos;
1905        self
1906    }
1907
1908    pub fn sched_context(mut self, sc: super::sched_context::SchedContextId) -> Self {
1909        self.sched = Some(sc);
1910        self
1911    }
1912
1913    pub fn rx_buffer<const N: usize>(self) -> TypedSubSafetyBuilder<'c, 'e, 't, M, N> {
1914        TypedSubSafetyBuilder {
1915            ctx: self.ctx,
1916            topic: self.topic,
1917            qos: self.qos,
1918            sched: self.sched,
1919            _phantom: PhantomData,
1920        }
1921    }
1922
1923    pub fn build<F: FnMut(&M, &nros_rmw::IntegrityStatus) + 'static>(
1924        self,
1925        callback: F,
1926    ) -> Result<super::types::HandleId, NodeError> {
1927        let handle = self
1928            .ctx
1929            .executor
1930            .register_subscription_with_safety_sized_inner::<M, F, RX>(
1931                Some(self.ctx.node_id),
1932                self.topic,
1933                self.qos,
1934                callback,
1935            )?;
1936        if let Some(sc) = self.sched {
1937            self.ctx.executor.bind_handle_to_sched_context(handle, sc)?;
1938        }
1939        Ok(handle)
1940    }
1941}
1942
1943/// Generic (type-erased) subscription builder (`.generic(type, hash)`).
1944pub struct GenericSubscriptionBuilder<
1945    'c,
1946    'e,
1947    't,
1948    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
1949> {
1950    ctx: &'c mut NodeCtx<'e>,
1951    topic: &'t str,
1952    type_name: &'t str,
1953    type_hash: &'t str,
1954    qos: QosSettings,
1955    sched: Option<super::sched_context::SchedContextId>,
1956}
1957
1958impl<'c, 'e, 't, const RX: usize> GenericSubscriptionBuilder<'c, 'e, 't, RX> {
1959    pub fn qos(mut self, qos: QosSettings) -> Self {
1960        self.qos = qos;
1961        self
1962    }
1963
1964    pub fn sched_context(mut self, sc: super::sched_context::SchedContextId) -> Self {
1965        self.sched = Some(sc);
1966        self
1967    }
1968
1969    pub fn rx_buffer<const N: usize>(self) -> GenericSubscriptionBuilder<'c, 'e, 't, N> {
1970        GenericSubscriptionBuilder {
1971            ctx: self.ctx,
1972            topic: self.topic,
1973            type_name: self.type_name,
1974            type_hash: self.type_hash,
1975            qos: self.qos,
1976            sched: self.sched,
1977        }
1978    }
1979
1980    /// Surface the sample's wire attachment + metadata to the callback
1981    /// (`FnMut(&[u8], &RawMessageInfo)`). The cross-RMW bridge reads the
1982    /// `bridge_origin` tag from `info.attachment()` for echo suppression.
1983    pub fn message_info(self) -> GenericSubInfoBuilder<'c, 'e, 't, RX> {
1984        GenericSubInfoBuilder {
1985            ctx: self.ctx,
1986            topic: self.topic,
1987            type_name: self.type_name,
1988            type_hash: self.type_hash,
1989            qos: self.qos,
1990            sched: self.sched,
1991        }
1992    }
1993
1994    pub fn build<F: FnMut(&[u8]) + 'static>(
1995        self,
1996        callback: F,
1997    ) -> Result<super::types::HandleId, NodeError> {
1998        let handle = self
1999            .ctx
2000            .executor
2001            .register_subscription_buffered_raw_on::<F, RX>(
2002                self.ctx.node_id,
2003                self.topic,
2004                self.type_name,
2005                self.type_hash,
2006                self.qos,
2007                callback,
2008            )?;
2009        if let Some(sc) = self.sched {
2010            self.ctx.executor.bind_handle_to_sched_context(handle, sc)?;
2011        }
2012        Ok(handle)
2013    }
2014}
2015
2016/// Generic subscription builder with `MessageInfo` surfaced
2017/// (`.message_info()`). Callback is `FnMut(&[u8], &RawMessageInfo)`.
2018pub struct GenericSubInfoBuilder<
2019    'c,
2020    'e,
2021    't,
2022    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
2023> {
2024    ctx: &'c mut NodeCtx<'e>,
2025    topic: &'t str,
2026    type_name: &'t str,
2027    type_hash: &'t str,
2028    qos: QosSettings,
2029    sched: Option<super::sched_context::SchedContextId>,
2030}
2031
2032impl<'c, 'e, 't, const RX: usize> GenericSubInfoBuilder<'c, 'e, 't, RX> {
2033    pub fn qos(mut self, qos: QosSettings) -> Self {
2034        self.qos = qos;
2035        self
2036    }
2037
2038    pub fn sched_context(mut self, sc: super::sched_context::SchedContextId) -> Self {
2039        self.sched = Some(sc);
2040        self
2041    }
2042
2043    pub fn rx_buffer<const N: usize>(self) -> GenericSubInfoBuilder<'c, 'e, 't, N> {
2044        GenericSubInfoBuilder {
2045            ctx: self.ctx,
2046            topic: self.topic,
2047            type_name: self.type_name,
2048            type_hash: self.type_hash,
2049            qos: self.qos,
2050            sched: self.sched,
2051        }
2052    }
2053
2054    pub fn build<F: FnMut(&[u8], &nros_core::RawMessageInfo) + 'static>(
2055        self,
2056        callback: F,
2057    ) -> Result<super::types::HandleId, NodeError> {
2058        let handle = self
2059            .ctx
2060            .executor
2061            .register_subscription_buffered_raw_info_on::<F, RX>(
2062                self.ctx.node_id,
2063                self.topic,
2064                self.type_name,
2065                self.type_hash,
2066                self.qos,
2067                callback,
2068            )?;
2069        if let Some(sc) = self.sched {
2070            self.ctx.executor.bind_handle_to_sched_context(handle, sc)?;
2071        }
2072        Ok(handle)
2073    }
2074}
2075
2076// `not(feature = "rmw-cffi")` — these tests use the `mock` backend
2077// (`crate::mock`, itself `cfg(all(test, not(rmw-cffi)))`); a workspace test
2078// build that unifies `rmw-cffi` on swaps `ConcreteSession` to the cffi session
2079// and drops `mock`, so the module must drop with it (matches the
2080// `mock_integration` gate in lifecycle_services.rs).
2081#[cfg(all(test, not(feature = "rmw-cffi")))]
2082mod builder_tests {
2083    use super::*;
2084    use crate::{executor::Executor, mock::MockSession};
2085    use nros_core::{CdrReader, CdrWriter, DeserError, Deserialize, SerError, Serialize};
2086
2087    struct TestMsg;
2088    impl RosMessage for TestMsg {
2089        const TYPE_NAME: &'static str = "test/msg/TestMsg";
2090        const TYPE_HASH: &'static str = "test_hash";
2091    }
2092    impl Serialize for TestMsg {
2093        fn serialize(&self, _w: &mut CdrWriter) -> Result<(), SerError> {
2094            Ok(())
2095        }
2096    }
2097    impl Deserialize for TestMsg {
2098        fn deserialize(_r: &mut CdrReader) -> Result<Self, DeserError> {
2099            Ok(Self)
2100        }
2101    }
2102    // Phase 212.K.7.6.b — minimal single-field `Message` impl so
2103    // `TypedPublisherBuilder::build` resolves under the cyclonedds-tightened
2104    // bound AND the runtime register call succeeds. `DescriptorBuilder`
2105    // rejects empty `FIELDS` with `BuildError::EmptySchema`; pretend
2106    // there's one byte so the bridge stub returns a non-NULL pointer.
2107    #[cfg(rmw_cyclonedds_present)]
2108    impl nros_serdes::schema::Message for TestMsg {
2109        const TYPE_NAME: &'static str = "test/msg/TestMsg";
2110        const FIELDS: &'static [nros_serdes::schema::Field] = &[nros_serdes::schema::Field {
2111            name: "data",
2112            ty: nros_serdes::schema::FieldType::Uint8,
2113            offset: 0,
2114        }];
2115    }
2116
2117    fn s(v: &str) -> heapless::String<64> {
2118        heapless::String::try_from(v).unwrap()
2119    }
2120
2121    #[test]
2122    fn publisher_builder_typed_and_generic() {
2123        let mut session = MockSession::new();
2124        let mut node = NodeHandle::new(s("n"), s("/"), &mut session, 0);
2125
2126        // typed: node.publisher(t).typed::<M>().qos(..).build()
2127        let _typed = node
2128            .publisher("/chatter")
2129            .typed::<TestMsg>()
2130            .qos(QosSettings::default().keep_last(5))
2131            .build()
2132            .expect("typed publisher builds");
2133
2134        // generic: node.publisher(t).qos(..).generic(type, hash).build()
2135        let _generic = node
2136            .publisher("/chatter")
2137            .qos(QosSettings::default())
2138            .generic("std_msgs/msg/Int32", "hash")
2139            .build()
2140            .expect("generic publisher builds");
2141    }
2142
2143    #[test]
2144    fn subscription_builder_and_convenient() {
2145        let mut exec: Executor = Executor::from_session(MockSession::new());
2146        let id = exec.node_builder("n").build().expect("node");
2147
2148        // builder: typed
2149        let _h = exec
2150            .node_mut(id)
2151            .subscription("/chatter")
2152            .typed::<TestMsg>()
2153            .qos(QosSettings::default().keep_last(5))
2154            .build(|_m: &TestMsg| {})
2155            .expect("typed subscription builds");
2156
2157        // builder: generic (raw bytes)
2158        let _g = exec
2159            .node_mut(id)
2160            .subscription("/raw")
2161            .generic("std_msgs/msg/Int32", "hash")
2162            .build(|_b: &[u8]| {})
2163            .expect("generic subscription builds");
2164
2165        // builder: sized + sched-context (slice 3 knobs)
2166        let sc = exec.default_sched_context_id();
2167        let _s = exec
2168            .node_mut(id)
2169            .subscription("/sized")
2170            .typed::<TestMsg>()
2171            .rx_buffer::<64>()
2172            .sched_context(sc)
2173            .build(|_m: &TestMsg| {})
2174            .expect("sized + sched subscription builds");
2175
2176        // convenient (fork tier) — one node-ctx at a time, re-acquired
2177        let _c = exec
2178            .node_mut(id)
2179            .create_subscription::<TestMsg, _>("/conv", |_m: &TestMsg| {})
2180            .expect("convenient typed subscription builds");
2181    }
2182
2183    #[test]
2184    fn generic_message_info_builder() {
2185        // slice 3b — the bridge echo path: generic sub whose callback
2186        // receives the wire attachment via RawMessageInfo.
2187        let mut exec: Executor = Executor::from_session(MockSession::new());
2188        let id = exec.node_builder("n").build().expect("node");
2189
2190        let _i = exec
2191            .node_mut(id)
2192            .subscription("/info")
2193            .generic("std_msgs/msg/Int32", "hash")
2194            .message_info()
2195            .rx_buffer::<256>()
2196            .build(|_payload: &[u8], info: &nros_core::RawMessageInfo| {
2197                let _ = info.attachment();
2198            })
2199            .expect("generic + message_info subscription builds");
2200    }
2201
2202    #[test]
2203    fn typed_message_info_builder() {
2204        // M2.a — typed .message_info() (rclrs shape FnMut(&M, Option<&MessageInfo>)),
2205        // replacing register_subscription_with_info.
2206        let mut exec: Executor = Executor::from_session(MockSession::new());
2207        let id = exec.node_builder("n").build().expect("node");
2208        let _h = exec
2209            .node_mut(id)
2210            .subscription("/chatter")
2211            .typed::<TestMsg>()
2212            .qos(QosSettings::default().keep_last(5))
2213            .message_info()
2214            .build(|_m: &TestMsg, _info: Option<&nros_core::MessageInfo>| {})
2215            .expect("typed + message_info subscription builds");
2216    }
2217
2218    #[cfg(feature = "safety-e2e")]
2219    #[test]
2220    fn typed_safety_builder() {
2221        // M2.a — typed .safety(), replacing register_subscription_with_safety.
2222        let mut exec: Executor = Executor::from_session(MockSession::new());
2223        let id = exec.node_builder("n").build().expect("node");
2224        let _h = exec
2225            .node_mut(id)
2226            .subscription("/chatter")
2227            .typed::<TestMsg>()
2228            .safety()
2229            .build(|_m: &TestMsg, _status: &nros_rmw::IntegrityStatus| {})
2230            .expect("typed + safety subscription builds");
2231    }
2232
2233    #[test]
2234    fn generator_emitted_chain_compiles() {
2235        // Locks the exact builder chain the orchestration generator emits
2236        // for a subscriber (replaces register_subscription_raw_with_qos_sized_on).
2237        let mut exec: Executor = Executor::from_session(MockSession::new());
2238        let id = exec.node_builder("n").build().expect("node");
2239        let _h = exec
2240            .node_mut(id)
2241            .subscription("/topic")
2242            .generic("std_msgs/msg/Int32", "hash")
2243            .qos(QosSettings::default().keep_last(1))
2244            .rx_buffer::<1024>()
2245            .build(|_data: &[u8]| {})
2246            .expect("generator-shape subscription builds");
2247    }
2248
2249    #[test]
2250    fn nodectx_publisher_and_bridge_shape() {
2251        // NodeCtx publisher symmetry + the bridge two-ctx borrow pattern:
2252        // build the dest publisher on one NodeCtx (dropped), then register
2253        // the source subscription on another — the owned publisher outlives.
2254        let mut exec: Executor = Executor::from_session(MockSession::new());
2255        let id = exec.node_builder("n").build().expect("node");
2256
2257        // convenient + builder publisher on NodeCtx
2258        let _p = exec
2259            .node_mut(id)
2260            .create_publisher::<TestMsg>("/p")
2261            .expect("ctx convenient publisher");
2262        let dest_pub = exec
2263            .node_mut(id)
2264            .publisher("/fwd")
2265            .generic("std_msgs/msg/Int32", "hash")
2266            .build()
2267            .expect("ctx generic publisher builds"); // NodeCtx dropped here
2268
2269        // re-borrow exec for the source sub; closure owns dest_pub
2270        let _s = exec
2271            .node_mut(id)
2272            .subscription("/src")
2273            .generic("std_msgs/msg/Int32", "hash")
2274            .message_info()
2275            .build(move |payload: &[u8], _info: &nros_core::RawMessageInfo| {
2276                let _ = dest_pub.publish_raw(payload);
2277            })
2278            .expect("bridge-shape source subscription builds");
2279    }
2280}