Skip to main content

nros_rmw/
traits.rs

1//! Transport abstraction traits.
2//!
3//! Defines the backend-agnostic interface that transport implementations
4//! (zenoh-pico, XRCE-DDS) must satisfy. The core trait hierarchy is:
5//!
6//! - [`Session`] — connection lifecycle and handle creation
7//! - [`Publisher`] / [`Subscriber`] — pub/sub data transport
8//! - [`ServiceServerTrait`] / [`ServiceClientTrait`] — request/reply
9//! - [`Rmw`] — top-level factory that creates sessions
10
11use nros_core::{Deserialize, RosMessage, RosService, Serialize};
12
/// Identity of a pub/sub topic, plus the node metadata attached to it.
#[derive(Debug, Clone)]
pub struct TopicInfo<'a> {
    /// Name of the topic, for example `"/chatter"`.
    pub name: &'a str,
    /// ROS type name of the message, for example
    /// `"std_msgs::msg::dds_::String_"`.
    pub type_name: &'a str,
    /// Hash of the type, used for compatibility checking.
    pub type_hash: &'a str,
    /// Domain ID. Defaults to `0`.
    pub domain_id: u32,
    /// Name of the owning node, used to generate the liveliness token.
    /// With `None` the topic has no node association and no liveliness
    /// token is declared.
    pub node_name: Option<&'a str>,
    /// Namespace of the owning node, used to generate the liveliness token.
    /// Defaults to `"/"`, which is the root namespace (and the standard
    /// default) in ROS 2.
    pub namespace: &'a str,
    /// Phase 231 (RFC-0038) — hint for the receive-buffer size, in bytes.
    /// The executor fills it from the subscription's `RX_BUF`, and a backend
    /// may use it to route the subscription to a size-class receive buffer
    /// (zenoh-pico: small vs large). `0` means unset, in which case the
    /// backend picks its default. Backends that do not size-class their
    /// receive storage ignore it.
    pub rx_buffer_hint: usize,
}
37
38impl<'a> TopicInfo<'a> {
39    /// Create new topic info
40    pub const fn new(name: &'a str, type_name: &'a str, type_hash: &'a str) -> Self {
41        Self {
42            name,
43            type_name,
44            type_hash,
45            domain_id: 0,
46            node_name: None,
47            namespace: "/",
48            rx_buffer_hint: 0,
49        }
50    }
51
52    /// Create topic info with custom domain ID
53    pub const fn with_domain(mut self, domain_id: u32) -> Self {
54        self.domain_id = domain_id;
55        self
56    }
57
58    /// Phase 231 (RFC-0038) — set the receive-buffer size hint (bytes) used by
59    /// size-classing backends (zenoh-pico) to route the subscription's receive
60    /// buffer to the small or large class.
61    pub const fn with_rx_buffer_hint(mut self, hint: usize) -> Self {
62        self.rx_buffer_hint = hint;
63        self
64    }
65
66    /// Set the node name for liveliness token generation
67    pub const fn with_node_name(mut self, node_name: &'a str) -> Self {
68        self.node_name = Some(node_name);
69        self
70    }
71
72    /// Set the node namespace for liveliness token generation
73    pub const fn with_namespace(mut self, namespace: &'a str) -> Self {
74        self.namespace = namespace;
75        self
76    }
77}
78
/// Identity of a service, shared by the service client and server, plus the
/// node metadata attached to it.
#[derive(Debug, Clone)]
pub struct ServiceInfo<'a> {
    /// Name of the service, for example `"/add_two_ints"`.
    pub name: &'a str,
    /// ROS service type name, for example
    /// `"example_interfaces::srv::dds_::AddTwoInts_"`.
    pub type_name: &'a str,
    /// Hash of the type, used for compatibility checking.
    pub type_hash: &'a str,
    /// Domain ID. Defaults to `0`.
    pub domain_id: u32,
    /// Name of the owning node, used to generate the liveliness token.
    /// With `None` the service has no node association and no liveliness
    /// token is declared.
    pub node_name: Option<&'a str>,
    /// Namespace of the owning node, used to generate the liveliness token.
    /// Defaults to `"/"`, which is the root namespace (and the standard
    /// default) in ROS 2.
    pub namespace: &'a str,
}
97
/// Identity of an action, shared by the action client and server.
///
/// A ROS 2 action is carried over 5 communication channels:
/// - `send_goal` service: `<action_name>/_action/send_goal`
/// - `cancel_goal` service: `<action_name>/_action/cancel_goal`
/// - `get_result` service: `<action_name>/_action/get_result`
/// - `feedback` topic: `<action_name>/_action/feedback`
/// - `status` topic: `<action_name>/_action/status`
#[derive(Debug, Clone)]
pub struct ActionInfo<'a> {
    /// Name of the action, for example `"/fibonacci"`.
    pub name: &'a str,
    /// ROS action type name, for example
    /// `"example_interfaces::action::dds_::Fibonacci_"`.
    pub type_name: &'a str,
    /// Hash of the type, used for compatibility checking.
    pub type_hash: &'a str,
    /// Domain ID. Defaults to `0`.
    pub domain_id: u32,
}
117
118impl<'a> ActionInfo<'a> {
119    /// Create new action info
120    pub const fn new(name: &'a str, type_name: &'a str, type_hash: &'a str) -> Self {
121        Self {
122            name,
123            type_name,
124            type_hash,
125            domain_id: 0,
126        }
127    }
128
129    /// Create action info with custom domain ID
130    pub const fn with_domain(mut self, domain_id: u32) -> Self {
131        self.domain_id = domain_id;
132        self
133    }
134
135    /// Generate the send_goal service name
136    /// Returns: `<action>/_action/send_goal`
137    pub fn send_goal_key<const N: usize>(&self) -> heapless::String<N> {
138        self.sub_name::<N>("send_goal")
139    }
140
141    /// Generate the cancel_goal service name
142    /// Returns: `<action>/_action/cancel_goal`
143    pub fn cancel_goal_key<const N: usize>(&self) -> heapless::String<N> {
144        self.sub_name::<N>("cancel_goal")
145    }
146
147    /// Generate the get_result service name
148    /// Returns: `<action>/_action/get_result`
149    pub fn get_result_key<const N: usize>(&self) -> heapless::String<N> {
150        self.sub_name::<N>("get_result")
151    }
152
153    /// Generate the feedback topic name
154    /// Returns: `<action>/_action/feedback`
155    pub fn feedback_key<const N: usize>(&self) -> heapless::String<N> {
156        self.sub_name::<N>("feedback")
157    }
158
159    /// Generate the status topic name
160    /// Returns: `<action>/_action/status`
161    pub fn status_key<const N: usize>(&self) -> heapless::String<N> {
162        self.sub_name::<N>("status")
163    }
164
165    /// Generate a sub-entity name for an action component
166    /// Returns: `<action>/_action/<suffix>` (e.g., `fibonacci/_action/send_goal`)
167    ///
168    /// The caller is responsible for constructing the full key expression
169    /// by wrapping this name in a `ServiceInfo` or `TopicInfo` with the
170    /// correct sub-service/sub-topic type name.
171    fn sub_name<const N: usize>(&self, suffix: &str) -> heapless::String<N> {
172        let mut name = heapless::String::new();
173        let action_stripped = self.name.trim_matches('/');
174        let _ = core::fmt::write(
175            &mut name,
176            format_args!("/{}/_action/{}", action_stripped, suffix),
177        );
178        name
179    }
180}
181
182impl<'a> ServiceInfo<'a> {
183    /// Create new service info
184    pub const fn new(name: &'a str, type_name: &'a str, type_hash: &'a str) -> Self {
185        Self {
186            name,
187            type_name,
188            type_hash,
189            domain_id: 0,
190            node_name: None,
191            namespace: "/",
192        }
193    }
194
195    /// Create service info with custom domain ID
196    pub const fn with_domain(mut self, domain_id: u32) -> Self {
197        self.domain_id = domain_id;
198        self
199    }
200
201    /// Set the node name for liveliness token generation
202    pub const fn with_node_name(mut self, node_name: &'a str) -> Self {
203        self.node_name = Some(node_name);
204        self
205    }
206
207    /// Set the node namespace for liveliness token generation
208    pub const fn with_namespace(mut self, namespace: &'a str) -> Self {
209        self.namespace = namespace;
210        self
211    }
212}
213
/// Transport error types.
///
/// Not `Copy`: the `alloc`-only `BackendDynamic` variant owns a `String`,
/// and the derive list is kept the same with or without that feature.
/// (`Backend` holds a `&'static str`, which on its own would still be
/// `Copy`.) Rust callers that used to copy a `TransportError` value
/// repeatedly now need `.clone()` or `ref` in match arms. C/C++ callers are
/// unaffected — both map `TransportError` to integer codes (`nros_ret_t` /
/// `ErrorCode`) before crossing the FFI boundary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransportError {
    /// Failed to connect to transport
    ConnectionFailed,
    /// Connection was closed
    Disconnected,
    /// Failed to create publisher
    PublisherCreationFailed,
    /// Failed to create subscriber
    SubscriberCreationFailed,
    /// Failed to create service server
    ServiceServerCreationFailed,
    /// Failed to create service client
    ServiceClientCreationFailed,
    /// Failed to publish message
    PublishFailed,
    /// Failed to send service request
    ServiceRequestFailed,
    /// Failed to send service reply
    ServiceReplyFailed,
    /// Serialization error
    SerializationError,
    /// Deserialization error
    DeserializationError,
    /// Buffer too small
    BufferTooSmall,
    /// Incoming message exceeded the static buffer capacity
    MessageTooLarge,
    /// Timeout waiting for message
    Timeout,
    /// Invalid configuration
    InvalidConfig,
    /// Resource (slot, buffer, queue) momentarily unavailable. Retry.
    /// Phase 99: returned by `try_loan` when arena slots are full and
    /// by `try_borrow` when no message is ready (alternative to
    /// `Ok(None)` for backends that prefer the error variant).
    WouldBlock,
    /// Requested allocation exceeds backend capacity. Phase 99:
    /// `try_loan(len)` returns this when `len` > arena slot size.
    TooLarge,
    /// Failed to start background tasks
    TaskStartFailed,
    /// Failed to poll for incoming messages
    PollFailed,
    /// Failed to send keepalive
    KeepaliveFailed,
    /// Failed to send join message
    JoinFailed,
    /// Caller supplied a NULL pointer, an out-of-range value, or an
    /// inconsistent argument combination. Phase 102.1.
    InvalidArgument,
    /// The backend does not implement this operation. Optional
    /// callbacks return this; the runtime then falls back to a
    /// default path. Phase 102.1.
    Unsupported,
    /// Memory allocation failed. Returned by backends on `std` /
    /// `alloc`-equipped platforms when heap allocation fails.
    /// Bare-metal backends generally do not produce this — they
    /// preallocate at session-open time. Phase 102.1.
    BadAlloc,
    /// Publisher and subscriber QoS profiles do not match in a way
    /// the backend cannot reconcile. Phase 102.1.
    IncompatibleQos,
    /// Topic, service, or action name failed validation. Phase 102.1.
    TopicNameInvalid,
    /// A request referenced a node that does not exist in this
    /// session. Phase 102.1.
    NodeNameNonExistent,
    /// The backend does not support loaned messages on this entity,
    /// or the loan slot is currently in use. Phase 102.1.
    LoanNotSupported,
    /// No data was available on a non-blocking receive. Distinct
    /// from `Timeout`: fires immediately, not after a bounded wait.
    /// Phase 102.1.
    NoData,
    /// Phase 115.A.2 — caller passed a versioned vtable struct
    /// (e.g. `NrosTransportOps`) with an `abi_version` the runtime
    /// doesn't know. Maps to `NROS_RMW_RET_INCOMPATIBLE_ABI` at
    /// the C boundary.
    IncompatibleAbi,
    /// Backend-specific error with a `'static` diagnostic string.
    ///
    /// Useful for zenoh-pico / XRCE-DDS return codes that map to a
    /// fixed set of known messages. `no_std`-compatible.
    Backend(&'static str),
    /// Backend-specific error with an owned diagnostic string.
    ///
    /// Available only with the `alloc` feature. Use this when the
    /// diagnostic is formatted at runtime (e.g. from a C error code
    /// plus a socket address).
    #[cfg(feature = "alloc")]
    BackendDynamic(alloc::string::String),
}
315
/// QoS history policy: how many messages are retained.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum QosHistoryPolicy {
    /// Retain only the last N messages; N is defined in `QosSettings`.
    /// This is the default.
    #[default]
    KeepLast,
    /// Retain every message, up to resource limits.
    KeepAll,
}
325
/// QoS reliability policy: whether delivery is retried.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum QosReliabilityPolicy {
    /// Reliable delivery; messages are retransmitted if needed.
    ///
    /// This is the default, in line with ROS 2 `rmw_qos_profile_default`
    /// and with the `QosSettings::default()` / `QOS_PROFILE_DEFAULT`
    /// aggregates.
    #[default]
    Reliable,
    /// Best-effort delivery; nothing is retransmitted.
    BestEffort,
}
338
/// QoS durability policy: whether messages outlive a subscriber's absence.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum QosDurabilityPolicy {
    /// Messages are discarded when the subscriber disconnects.
    /// This is the default.
    #[default]
    Volatile,
    /// Messages are persisted so late-joining subscribers can receive them.
    TransientLocal,
}
348
/// QoS liveliness policy, following DDS `LIVELINESS` semantics.
///
/// Represented as a `u8` with the explicit discriminants listed below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[repr(u8)]
pub enum QosLivelinessPolicy {
    /// Liveliness is neither asserted nor tracked. This is the default,
    /// meant for entities that don't care about liveliness.
    #[default]
    None = 0,
    /// Liveliness is asserted automatically by the backend's keepalive task.
    Automatic = 1,
    /// The application asserts liveliness explicitly, per topic, by calling
    /// `assert_liveliness()`.
    ManualByTopic = 2,
    /// The application asserts liveliness at the node level by calling
    /// `assert_liveliness()`.
    ManualByNode = 3,
}
364
/// Phase 211.H — the side of a topic that a [`QosOverride`] applies to.
/// Corresponds to the `<role>` segment of a ROS 2
/// `qos_overrides.<topic>.<role>.<policy>` launch parameter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QosOverrideRole {
    /// The publishing side: `qos_overrides.<topic>.publisher.*`
    Publisher,
    /// The subscribing side: `qos_overrides.<topic>.subscription.*`
    Subscription,
}
375
376/// Phase 211.H — a single policy value a [`QosOverride`] sets. A typed enum
377/// (not a string) so the codegen that bakes these from the plan catches an
378/// unknown policy / mistyped value at generation time rather than silently
379/// no-op-ing at runtime.
380#[derive(Debug, Clone, Copy, PartialEq, Eq)]
381pub enum QosOverrideValue {
382    /// `.reliability` → Reliable / BestEffort.
383    Reliability(QosReliabilityPolicy),
384    /// `.durability` → Volatile / TransientLocal.
385    Durability(QosDurabilityPolicy),
386    /// `.history` → KeepLast / KeepAll.
387    History(QosHistoryPolicy),
388    /// `.depth` → KeepLast depth.
389    Depth(u32),
390}
391
392/// Phase 211.H — one per-topic QoS override, lowered from a ROS 2
393/// `qos_overrides.<topic>.<role>.<policy>` launch parameter by the planner and
394/// baked into a `&'static [QosOverride]` table by the entry codegen. The node
395/// folds the matching entries into the entity's [`QosSettings`] at
396/// `create_publisher` / `create_subscription` time (setup-time, single
397/// linear scan, no alloc), *before* the backend-compat `validate_against` —
398/// so an override the active RMW can't honour still errors loudly, never a
399/// silent downgrade.
400#[derive(Debug, Clone, Copy, PartialEq, Eq)]
401pub struct QosOverride {
402    /// The resolved (remapped) topic name the override targets, e.g.
403    /// `"/chatter"`. Matched exactly against the entity's topic.
404    pub topic: &'static str,
405    /// Publisher or subscription side.
406    pub role: QosOverrideRole,
407    /// The policy + value to set.
408    pub value: QosOverrideValue,
409}
410
411/// Full DDS-shaped QoS profile. Matches the field set of upstream
412/// `rmw_qos_profile_t`.
413///
414/// Backends advertise per-policy support via
415/// [`Session::supported_qos_policies`]; entities created with a
416/// profile the active backend can't honour return
417/// [`TransportError::IncompatibleQos`] synchronously at create time
418/// — no silent downgrade.
419///
420/// Zero-valued time-window fields ("off") mean infinite — the policy
421/// is effectively disabled for the entity.
422#[derive(Debug, Clone, Copy, PartialEq, Eq)]
423pub struct QosSettings {
424    /// History policy
425    pub history: QosHistoryPolicy,
426    /// Reliability policy
427    pub reliability: QosReliabilityPolicy,
428    /// Durability policy
429    pub durability: QosDurabilityPolicy,
430    /// Liveliness policy
431    pub liveliness_kind: QosLivelinessPolicy,
432    /// History depth (only used if history is KeepLast)
433    pub depth: u32,
434    /// Subscriber max-inter-arrival / publisher offered-rate, ms.
435    /// `0` = infinite (no deadline check).
436    pub deadline_ms: u32,
437    /// Sample expiry, ms. Subscribers filter samples older than this.
438    /// `0` = infinite (no expiry).
439    pub lifespan_ms: u32,
440    /// Liveliness lease, ms. `0` = infinite.
441    pub liveliness_lease_ms: u32,
442    /// If `true`, topic-name encoding skips the `/rt/` ROS prefix.
443    pub avoid_ros_namespace_conventions: bool,
444}
445
446impl Default for QosSettings {
447    fn default() -> Self {
448        Self::QOS_PROFILE_DEFAULT
449    }
450}
451
452impl QosSettings {
453    /// Phase 211.H — fold the plan's `qos_overrides` matching `topic` + `role`
454    /// into this profile, returning the overridden profile. Setup-time only
455    /// (called from `create_publisher`/`create_subscription`): a single linear
456    /// scan over the baked `&'static` table, no alloc, RT-safe. Later entries
457    /// win on a duplicate `(topic, role, policy)` (last-write), matching the
458    /// planner's sorted, de-conflicted emit. Non-matching entries are ignored,
459    /// so passing the whole node table to every entity is cheap + correct.
460    #[must_use]
461    pub fn apply_overrides(
462        mut self,
463        topic: &str,
464        role: QosOverrideRole,
465        overrides: &[QosOverride],
466    ) -> Self {
467        for ovr in overrides {
468            if ovr.topic == topic && ovr.role == role {
469                match ovr.value {
470                    QosOverrideValue::Reliability(r) => self.reliability = r,
471                    QosOverrideValue::Durability(d) => self.durability = d,
472                    QosOverrideValue::History(h) => self.history = h,
473                    QosOverrideValue::Depth(d) => self.depth = d,
474                }
475            }
476        }
477        self
478    }
479}
480
481impl QosSettings {
482    /// Internal const builder. Extended-policy fields default to
483    /// "off" (zero) and `liveliness_kind = Automatic` (the upstream
484    /// `rmw_qos_profile_default` choice).
485    const fn build(
486        reliability: QosReliabilityPolicy,
487        durability: QosDurabilityPolicy,
488        history: QosHistoryPolicy,
489        depth: u32,
490    ) -> Self {
491        Self {
492            history,
493            reliability,
494            durability,
495            liveliness_kind: QosLivelinessPolicy::Automatic,
496            depth,
497            deadline_ms: 0,
498            lifespan_ms: 0,
499            liveliness_lease_ms: 0,
500            avoid_ros_namespace_conventions: false,
501        }
502    }
503
504    /// Create new QoS settings with defaults (matches `QOS_PROFILE_DEFAULT`:
505    /// Reliable, Volatile, KeepLast(10)).
506    pub const fn new() -> Self {
507        Self::QOS_PROFILE_DEFAULT
508    }
509
510    /// Best-effort QoS (for real-time)
511    pub const BEST_EFFORT: Self = Self::build(
512        QosReliabilityPolicy::BestEffort,
513        QosDurabilityPolicy::Volatile,
514        QosHistoryPolicy::KeepLast,
515        1,
516    );
517
518    /// Reliable QoS
519    pub const RELIABLE: Self = Self::build(
520        QosReliabilityPolicy::Reliable,
521        QosDurabilityPolicy::Volatile,
522        QosHistoryPolicy::KeepLast,
523        10,
524    );
525
526    /// System default QoS profile (matches rmw_qos_profile_system_default)
527    pub const QOS_PROFILE_SYSTEM_DEFAULT: Self = Self::build(
528        QosReliabilityPolicy::Reliable,
529        QosDurabilityPolicy::Volatile,
530        QosHistoryPolicy::KeepLast,
531        1,
532    );
533
534    /// Default QoS profile (matches rmw_qos_profile_default)
535    pub const QOS_PROFILE_DEFAULT: Self = Self::build(
536        QosReliabilityPolicy::Reliable,
537        QosDurabilityPolicy::Volatile,
538        QosHistoryPolicy::KeepLast,
539        10,
540    );
541
542    /// Sensor data QoS profile (matches rmw_qos_profile_sensor_data)
543    pub const QOS_PROFILE_SENSOR_DATA: Self = Self::build(
544        QosReliabilityPolicy::BestEffort,
545        QosDurabilityPolicy::Volatile,
546        QosHistoryPolicy::KeepLast,
547        5,
548    );
549
550    /// Services default QoS profile (matches rmw_qos_profile_services_default)
551    pub const QOS_PROFILE_SERVICES_DEFAULT: Self = Self::build(
552        QosReliabilityPolicy::Reliable,
553        QosDurabilityPolicy::Volatile,
554        QosHistoryPolicy::KeepLast,
555        10,
556    );
557
558    /// Parameters QoS profile (matches rmw_qos_profile_parameters)
559    pub const QOS_PROFILE_PARAMETERS: Self = Self::build(
560        QosReliabilityPolicy::Reliable,
561        QosDurabilityPolicy::TransientLocal,
562        QosHistoryPolicy::KeepLast,
563        1000,
564    );
565
566    /// Clock QoS profile - same as sensor data but with depth 1
567    pub const QOS_PROFILE_CLOCK: Self = Self::build(
568        QosReliabilityPolicy::BestEffort,
569        QosDurabilityPolicy::Volatile,
570        QosHistoryPolicy::KeepLast,
571        1,
572    );
573
574    /// Parameter events QoS profile (matches rmw_qos_profile_parameter_events)
575    pub const QOS_PROFILE_PARAMETER_EVENTS: Self = Self::build(
576        QosReliabilityPolicy::Reliable,
577        QosDurabilityPolicy::Volatile,
578        QosHistoryPolicy::KeepAll,
579        0, // Not used with KeepAll
580    );
581
582    /// Action status default QoS profile (matches rcl_action_qos_profile_status_default)
583    pub const QOS_PROFILE_ACTION_STATUS_DEFAULT: Self = Self::build(
584        QosReliabilityPolicy::Reliable,
585        QosDurabilityPolicy::TransientLocal,
586        QosHistoryPolicy::KeepLast,
587        1,
588    );
589
590    /// PX4 companion QoS profile (Phase 233 / RFC-0039 Track B). Matches the
591    /// QoS PX4's `uxrce_dds_client` uses on `/fmu/out/*` and `/fmu/in/*` —
592    /// `BEST_EFFORT` + `VOLATILE` + `KEEP_LAST(1)`. A nano-ros node talking to
593    /// the same `MicroXRCEAgent` must use this (a reliable or
594    /// `TRANSIENT_LOCAL` reader will not match PX4's volatile best-effort
595    /// writers). Verified against real PX4 SITL (`nros-px4-sitl-test`):
596    /// `TRANSIENT_LOCAL` durability silently fails to match `/fmu/out/*`.
597    /// Adjust depth via `.keep_last(n)` for higher-rate streams.
598    pub const QOS_PROFILE_PX4: Self = Self::build(
599        QosReliabilityPolicy::BestEffort,
600        QosDurabilityPolicy::Volatile,
601        QosHistoryPolicy::KeepLast,
602        1,
603    );
604
605    // --- Static constructor methods (matching rclrs API) ---
606
607    /// Get the default QoS profile for ordinary topics
608    pub const fn topics_default() -> Self {
609        Self::QOS_PROFILE_DEFAULT
610    }
611
612    /// The PX4 companion QoS profile ([`QOS_PROFILE_PX4`](Self::QOS_PROFILE_PX4))
613    /// — use for `/fmu/out/*` subscriptions and `/fmu/in/*` publications against
614    /// a `MicroXRCEAgent`.
615    pub const fn px4() -> Self {
616        Self::QOS_PROFILE_PX4
617    }
618
619    /// Get the default QoS profile for sensor data topics
620    pub const fn sensor_data_default() -> Self {
621        Self::QOS_PROFILE_SENSOR_DATA
622    }
623
624    /// Get the default QoS profile for services
625    pub const fn services_default() -> Self {
626        Self::QOS_PROFILE_SERVICES_DEFAULT
627    }
628
629    /// Get the default QoS profile for parameter services
630    pub const fn parameters_default() -> Self {
631        Self::QOS_PROFILE_PARAMETERS
632    }
633
634    /// Get the default QoS profile for parameter events
635    pub const fn parameter_events_default() -> Self {
636        Self::QOS_PROFILE_PARAMETER_EVENTS
637    }
638
639    /// Get the system default QoS profile
640    pub const fn system_default() -> Self {
641        Self::QOS_PROFILE_SYSTEM_DEFAULT
642    }
643
644    /// Get the default QoS profile for action status topics
645    pub const fn action_status_default() -> Self {
646        Self::QOS_PROFILE_ACTION_STATUS_DEFAULT
647    }
648
649    /// Get the default QoS profile for clock topics
650    pub const fn clock_default() -> Self {
651        Self::QOS_PROFILE_CLOCK
652    }
653
654    // --- Builder methods ---
655
656    /// Set history to keep last N messages
657    pub const fn keep_last(mut self, depth: u32) -> Self {
658        self.history = QosHistoryPolicy::KeepLast;
659        self.depth = depth;
660        self
661    }
662
663    /// Set history to keep all messages
664    pub const fn keep_all(mut self) -> Self {
665        self.history = QosHistoryPolicy::KeepAll;
666        self
667    }
668
669    /// Set reliability to reliable
670    pub const fn reliable(mut self) -> Self {
671        self.reliability = QosReliabilityPolicy::Reliable;
672        self
673    }
674
675    /// Set reliability to best-effort
676    pub const fn best_effort(mut self) -> Self {
677        self.reliability = QosReliabilityPolicy::BestEffort;
678        self
679    }
680
681    /// Set durability to volatile
682    pub const fn volatile(mut self) -> Self {
683        self.durability = QosDurabilityPolicy::Volatile;
684        self
685    }
686
687    /// Set durability to transient local
688    pub const fn transient_local(mut self) -> Self {
689        self.durability = QosDurabilityPolicy::TransientLocal;
690        self
691    }
692
693    /// Set reliability policy explicitly
694    pub const fn reliability(mut self, policy: QosReliabilityPolicy) -> Self {
695        self.reliability = policy;
696        self
697    }
698
699    /// Set durability policy explicitly
700    pub const fn durability(mut self, policy: QosDurabilityPolicy) -> Self {
701        self.durability = policy;
702        self
703    }
704
705    /// Set history policy explicitly
706    pub const fn history(mut self, policy: QosHistoryPolicy) -> Self {
707        self.history = policy;
708        self
709    }
710
711    /// Set history depth explicitly
712    pub const fn depth(mut self, depth: u32) -> Self {
713        self.depth = depth;
714        self
715    }
716
717    /// Get history depth (for backwards compatibility)
718    pub const fn history_depth(&self) -> u8 {
719        if self.depth > 255 {
720            255
721        } else {
722            self.depth as u8
723        }
724    }
725}
726
727/// Transport session configuration
728#[derive(Debug, Clone)]
729pub struct TransportConfig<'a> {
730    /// Peer locator (e.g., "tcp/192.168.1.1:7447" or "serial//dev/ttyUSB0#baudrate=115200")
731    pub locator: Option<&'a str>,
732    /// Session mode: client, peer, or router
733    pub mode: SessionMode,
734    /// Additional transport properties (key-value pairs)
735    ///
736    /// These are passed through to the underlying transport backend.
737    /// For zenoh-pico, recognized keys include:
738    /// - `"multicast_scouting"` - Enable/disable multicast scouting (`"true"` or `"false"`)
739    /// - `"scouting_timeout_ms"` - Scouting timeout in milliseconds
740    /// - `"multicast_locator"` - Multicast group address
741    /// - `"listen"` - Listen endpoint (e.g., `"tcp/0.0.0.0:0"`)
742    /// - `"add_timestamp"` - Add timestamps to messages (`"true"` or `"false"`)
743    pub properties: &'a [(&'a str, &'a str)],
744}
745
746impl Default for TransportConfig<'_> {
747    fn default() -> Self {
748        Self {
749            locator: None,
750            mode: SessionMode::Client,
751            properties: &[],
752        }
753    }
754}
755
756/// Middleware-agnostic session configuration.
757///
758/// `RmwConfig` provides a uniform interface that any RMW backend can
759/// interpret. Backends map the universal fields to their own connection
760/// parameters and interpret `properties` for anything backend-specific.
761///
762/// # Examples
763///
764/// ```
765/// use nros_rmw::{RmwConfig, SessionMode};
766///
767/// let config = RmwConfig {
768///     locator: "tcp/192.168.1.1:7447",
769///     mode: SessionMode::Client,
770///     domain_id: 0,
771///     node_name: "talker",
772///     namespace: "",
773///     properties: &[],
774/// };
775/// ```
776#[derive(Debug, Clone, Copy)]
777pub struct RmwConfig<'a> {
778    /// Middleware-specific connection string.
779    ///
780    /// - zenoh: `"tcp/192.168.1.1:7447"` or `"udp/224.0.0.224:7447"`
781    /// - XRCE-DDS: `"udp/192.168.1.1:2019"`
782    pub locator: &'a str,
783    /// Session mode (zenoh: client/peer; XRCE-DDS: always client)
784    pub mode: SessionMode,
785    /// ROS 2 domain ID (maps to DDS domain or zenoh key prefix)
786    pub domain_id: u32,
787    /// Node name (e.g., `"talker"`)
788    pub node_name: &'a str,
789    /// Node namespace (e.g., `""` or `"/ns1"`)
790    pub namespace: &'a str,
791    /// Backend-specific key/value properties.
792    ///
793    /// Uniform escape hatch for backend-specific tuning that doesn't fit
794    /// the universal fields above. Each backend documents the keys it
795    /// understands; unknown keys are ignored. Passing `&[]` is always
796    /// valid.
797    ///
798    /// Examples:
799    /// - zenoh: `"tls.root_ca"`, `"scouting.multicast.enabled"`
800    /// - XRCE-DDS: `"agent_port"`, `"client_key"`
801    pub properties: &'a [(&'a str, &'a str)],
802}
803
804impl Default for RmwConfig<'_> {
805    fn default() -> Self {
806        Self {
807            locator: "tcp/127.0.0.1:7447",
808            mode: SessionMode::Client,
809            domain_id: 0,
810            node_name: "node",
811            namespace: "",
812            properties: &[],
813        }
814    }
815}
816
/// Transport protocol named by a locator string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocatorProtocol {
    /// TCP, as in `"tcp/127.0.0.1:7447"`.
    Tcp,
    /// UDP, as in `"udp/192.168.1.50:2019"` — common for XRCE-DDS.
    Udp,
    /// Serial/UART, as in `"serial//dev/ttyUSB0#baudrate=115200"`.
    Serial,
    /// A protocol that is not recognized.
    Unknown,
}
829
830/// Parse the protocol from a locator string
831pub fn locator_protocol(locator: &str) -> LocatorProtocol {
832    if locator.starts_with("tcp/") {
833        LocatorProtocol::Tcp
834    } else if locator.starts_with("udp/") {
835        LocatorProtocol::Udp
836    } else if locator.starts_with("serial/") {
837        LocatorProtocol::Serial
838    } else {
839        LocatorProtocol::Unknown
840    }
841}
842
843/// Validate a locator string format.
844///
845/// Returns `Ok(())` if the locator is well-formed, or an error message describing
846/// the problem. This provides early feedback before zenoh-pico or XRCE-DDS rejects
847/// a bad locator.
848///
849/// Supported formats:
850/// - TCP: `tcp/<host>:<port>` (e.g., `tcp/127.0.0.1:7447`)
851/// - UDP: `udp/<host>:<port>` (e.g., `udp/192.168.1.50:2019`)
852/// - Serial: `serial/<device>#baudrate=<rate>` (e.g., `serial//dev/ttyUSB0#baudrate=115200`)
853pub fn validate_locator(locator: &str) -> Result<(), &'static str> {
854    match locator_protocol(locator) {
855        LocatorProtocol::Tcp => {
856            let rest = &locator[4..]; // skip "tcp/"
857            if !rest.contains(':') {
858                return Err("TCP locator must contain host:port (e.g., tcp/127.0.0.1:7447)");
859            }
860            Ok(())
861        }
862        LocatorProtocol::Udp => {
863            let rest = &locator[4..]; // skip "udp/"
864            if !rest.contains(':') {
865                return Err("UDP locator must contain host:port (e.g., udp/192.168.1.50:2019)");
866            }
867            Ok(())
868        }
869        LocatorProtocol::Serial => {
870            let rest = &locator[7..]; // skip "serial/"
871            if rest.is_empty() {
872                return Err(
873                    "serial locator must specify device (e.g., serial//dev/ttyUSB0#baudrate=115200)",
874                );
875            }
876            if !rest.contains("#baudrate=") {
877                return Err(
878                    "serial locator must include #baudrate=RATE (e.g., serial//dev/ttyUSB0#baudrate=115200)",
879                );
880            }
881            // Validate baudrate is numeric
882            if let Some(baud_str) = rest.split("#baudrate=").nth(1) {
883                let baud_str = baud_str.split('#').next().unwrap_or(baud_str);
884                if baud_str.parse::<u32>().is_err() {
885                    return Err("serial baudrate must be a number");
886                }
887            }
888            Ok(())
889        }
890        LocatorProtocol::Unknown => {
891            Err("unknown locator protocol (expected tcp/, udp/, or serial/)")
892        }
893    }
894}
895
/// Session mode: how the session attaches to the network.
///
/// [`SessionMode::Client`] is the value produced by `Default` (it carries
/// the `#[default]` attribute).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SessionMode {
    /// Connect as client to a router
    #[default]
    Client,
    /// Connect as peer for peer-to-peer communication
    Peer,
}
905
/// Transport session trait — the per-process anchor an RMW backend
/// gives to the executor.
///
/// # Threading
///
/// `&mut self` on every mutating method means the executor serialises
/// all session calls onto a single thread. A backend may rely on this
/// — no internal locking is required for `create_*` / `close` /
/// `drive_io`. **Publisher / subscriber / service handles created
/// from the session, however, are typically used from worker
/// threads** and must carry their own synchronisation (see the
/// [`Publisher`] / [`Subscriber`] trait docs).
///
/// # Calling pattern
///
/// 1. Open the session (backend-specific factory; not on this trait).
/// 2. `create_*` for every entity at startup. Creating entities mid-
///    flight after `drive_io` has run is allowed but not common.
/// 3. The executor calls `drive_io` periodically. Worker threads
///    publish / receive in parallel.
/// 4. `close` once at shutdown. Entities must be dropped first.
pub trait Session {
    /// Error type for this session
    type Error;
    /// Publisher handle type
    type PublisherHandle;
    /// Subscriber handle type
    type SubscriberHandle;
    /// Service server handle type
    type ServiceServerHandle;
    /// Service client handle type
    type ServiceClientHandle;

    /// Create a publisher bound to this session.
    ///
    /// May allocate transport resources (zenoh declarations, DDS
    /// writers). Returns a handle that outlives the call but not the
    /// session — drop the handle before `close()`.
    fn create_publisher(
        &mut self,
        topic: &TopicInfo,
        qos: QosSettings,
    ) -> Result<Self::PublisherHandle, Self::Error>;

    /// Create a subscriber bound to this session.
    ///
    /// Subscribers may start receiving immediately after creation if
    /// the transport supports late-joining publishers. Late messages
    /// are buffered up to the QoS depth.
    fn create_subscriber(
        &mut self,
        topic: &TopicInfo,
        qos: QosSettings,
    ) -> Result<Self::SubscriberHandle, Self::Error>;

    /// Create a service server bound to this session. Replies are
    /// matched to requests by the sequence number returned from
    /// [`ServiceServerTrait::try_recv_request`].
    ///
    /// `qos` is applied to both the request and reply endpoints (a
    /// service is two DDS topics; rmw uses one profile for both). The
    /// default is [`QosSettings::services_default`]
    /// (RELIABLE+VOLATILE+KEEP_LAST(10)).
    fn create_service_server(
        &mut self,
        service: &ServiceInfo,
        qos: QosSettings,
    ) -> Result<Self::ServiceServerHandle, Self::Error>;

    /// Create a service client bound to this session.
    ///
    /// `qos` is applied to both the request and reply endpoints (a
    /// service is two DDS topics; rmw uses one profile for both). The
    /// default is [`QosSettings::services_default`]
    /// (RELIABLE+VOLATILE+KEEP_LAST(10)).
    fn create_service_client(
        &mut self,
        service: &ServiceInfo,
        qos: QosSettings,
    ) -> Result<Self::ServiceClientHandle, Self::Error>;

    /// Close the session, releasing transport resources. All entity
    /// handles created from this session must already be dropped.
    fn close(&mut self) -> Result<(), Self::Error>;

    /// Drive transport I/O (poll network, dispatch callbacks).
    ///
    /// Both zenoh-pico and XRCE-DDS are pull-based: they require the
    /// application to periodically call this method to read from the
    /// network socket and dispatch incoming messages to subscriber
    /// buffers.
    ///
    /// `timeout_ms` is the maximum time to wait for data (0 = non-blocking;
    /// negative values mean "block indefinitely" — see Phase 84.D7 for the
    /// planned migration to `core::time::Duration`).
    ///
    /// **Required**. There is no default body — both shipped backends
    /// (zenoh and XRCE) must drive I/O, and a silent no-op default was a
    /// trap for third-party implementers. If your backend genuinely
    /// receives data via OS callbacks (push-based) and has nothing to do
    /// here, return `Ok(())` explicitly.
    fn drive_io(&mut self, timeout_ms: i32) -> Result<(), Self::Error>;

    /// Phase 109 — report which QoS policies the active backend
    /// honours. The runtime validates requested QoS against this mask
    /// at entity-create time and returns
    /// [`TransportError::IncompatibleQos`] if the requested profile
    /// includes a policy the backend can't enforce. **No silent
    /// downgrade.**
    ///
    /// Default returns [`QosPolicyMask::CORE`] — reliability +
    /// durability VOLATILE + history + depth. Backends override per
    /// supported policy.
    fn supported_qos_policies(&self) -> QosPolicyMask {
        QosPolicyMask::CORE
    }

    /// Phase 110.0 — backend's next internal-event deadline in
    /// milliseconds from now (lease keepalive, heartbeat, reader
    /// ACK-NACK timeout, etc.).
    ///
    /// The executor caps its `drive_io` timeout against
    /// `min(user_timeout, timer_deadline, this)` so quiet links don't
    /// wake early, see no user-visible work, and round-trip back into
    /// `drive_io`. Returns `None` when the backend has no internal
    /// deadlines or chooses not to expose them.
    ///
    /// Default `None` keeps existing backends working unchanged; opt-in
    /// per backend.
    fn next_deadline_ms(&self) -> Option<u32> {
        None
    }

    /// Phase 124.B.1 — install (or clear, when `cb.is_none()`) the
    /// executor wake callback. The runtime calls this once per
    /// session after `open` with `cb` pointing at a runtime-owned
    /// function and `ctx` pointing at the executor's wake state.
    /// The backend stores `(cb, ctx)` in its per-session state and
    /// calls `cb(ctx)` whenever its transport notification path
    /// fires (datagram arrival, condvar wake, etc.) — the runtime
    /// cb does flag-write + condvar-signal atomically, so a
    /// `spin_once` blocked on the wake condvar resumes immediately
    /// instead of waiting for the next poll iteration.
    ///
    /// Default body: ignore the call (both arguments are discarded).
    /// Poll-only backends (XRCE, bare-metal) leave the default in
    /// place; the executor still drains them on its deadline-bound
    /// cv-wait boundary. Backends that override this should also
    /// override [`supports_wake_callback`](Self::supports_wake_callback).
    ///
    /// # Safety
    ///
    /// When `cb` is `Some`, `ctx` must remain valid until the callback is
    /// cleared or the session is closed. The backend may invoke `cb(ctx)` from
    /// its transport notification path.
    unsafe fn set_wake_callback(
        &mut self,
        cb: Option<unsafe extern "C" fn(ctx: *mut core::ffi::c_void)>,
        ctx: *mut core::ffi::c_void,
    ) {
        // Explicitly discard the arguments: the default is a no-op.
        let _ = (cb, ctx);
    }

    /// Phase 130.4 — does this backend actually honour
    /// [`set_wake_callback`]?
    ///
    /// `true` means the backend installs the callback and will
    /// fire it from its async notify path (worker thread, ISR,
    /// signalfd, …). `false` (the default) means
    /// `set_wake_callback` was a no-op — the executor must drive
    /// I/O for the caller's full timeout because no async wake
    /// will pre-empt it.
    ///
    /// The executor uses this to choose between the wake-primitive
    /// wait (`NodeWake::wait_ms` / `std::Condvar::wait_timeout_while`)
    /// and a direct `drive_io(timeout_ms)`. Poll-only backends
    /// (XRCE-DDS-Client, bare-metal smoltcp) return `false`;
    /// event-driven backends (zenoh-pico with an RX task that
    /// invokes the callback on packet arrival) return `true`.
    ///
    /// [`set_wake_callback`]: Self::set_wake_callback
    fn supports_wake_callback(&self) -> bool {
        false
    }

    /// Phase 124.F.1 — session-level connectivity probe.
    ///
    /// Sends a wire-level round-trip probe and waits up to
    /// `timeout_ms`. Lesson from micro-ROS's `rmw_uros_ping_agent`.
    ///
    /// Default body: ignores `timeout_ms` and returns
    /// `Err(TransportError::Unsupported)`. Backends with a native
    /// ping API (zenoh: `z_send_ping`; XRCE:
    /// `uxr_ping_agent_session_until_timeout`) opt in by overriding.
    ///
    /// # Errors
    ///
    /// - `TransportError::Timeout` — no reply within `timeout_ms`.
    /// - `TransportError::Unsupported` — the backend can't probe (DDS
    ///   without participant introspection); this is what the default
    ///   body returns.
    fn ping_session(&mut self, timeout_ms: i32) -> Result<(), Self::Error>
    where
        Self::Error: From<TransportError>,
    {
        let _ = timeout_ms;
        Err(TransportError::Unsupported.into())
    }
}
1108
/// Set of QoS policies, stored as one bit per policy. Backends report
/// the set they can honour through [`Session::supported_qos_policies`].
///
/// [`QosPolicyMask::CORE`] is the baseline every nano-ros backend
/// implements (reliability, durability=VOLATILE, history, depth);
/// backends advertise anything beyond that by OR-ing in further flags.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QosPolicyMask(pub u32);

impl QosPolicyMask {
    /// Reliability policy (bit 0).
    pub const RELIABILITY: Self = Self(1 << 0);
    /// Durability = VOLATILE (bit 1).
    pub const DURABILITY_VOLATILE: Self = Self(1 << 1);
    /// Durability = TRANSIENT_LOCAL (bit 2).
    pub const DURABILITY_TRANSIENT_LOCAL: Self = Self(1 << 2);
    /// History policy (bit 3).
    pub const HISTORY: Self = Self(1 << 3);
    /// History depth (bit 4).
    pub const DEPTH: Self = Self(1 << 4);
    /// Deadline policy (bit 5).
    pub const DEADLINE: Self = Self(1 << 5);
    /// Lifespan policy (bit 6).
    pub const LIFESPAN: Self = Self(1 << 6);
    /// Liveliness = AUTOMATIC (bit 7).
    pub const LIVELINESS_AUTOMATIC: Self = Self(1 << 7);
    /// Liveliness = MANUAL_BY_TOPIC (bit 8).
    pub const LIVELINESS_MANUAL_BY_TOPIC: Self = Self(1 << 8);
    /// Liveliness = MANUAL_BY_NODE (bit 9).
    pub const LIVELINESS_MANUAL_BY_NODE: Self = Self(1 << 9);
    /// Liveliness lease duration (bit 10).
    pub const LIVELINESS_LEASE: Self = Self(1 << 10);
    /// "Avoid ROS namespace conventions" flag (bit 11).
    pub const AVOID_ROS_NAMESPACE_CONVENTIONS: Self = Self(1 << 11);

    /// Baseline set implemented by every nano-ros backend:
    /// reliability, durability=VOLATILE, history and depth.
    pub const CORE: Self = Self::RELIABILITY
        .union(Self::DURABILITY_VOLATILE)
        .union(Self::HISTORY)
        .union(Self::DEPTH);

    /// Superset test: `true` when no bit of `other` is missing from `self`.
    pub const fn contains(self, other: Self) -> bool {
        let missing = other.0 & !self.0;
        missing == 0
    }

    /// Combine two masks into one holding the bits of both.
    pub const fn union(self, other: Self) -> Self {
        let bits = self.0 | other.0;
        Self(bits)
    }
}

/// `a | b` is the union of both masks.
impl core::ops::BitOr for QosPolicyMask {
    type Output = Self;

    fn bitor(self, rhs: Self) -> Self {
        Self(self.0 | rhs.0)
    }
}

/// `a |= b` adds every bit of `b` to `a`.
impl core::ops::BitOrAssign for QosPolicyMask {
    fn bitor_assign(&mut self, rhs: Self) {
        *self = self.union(rhs);
    }
}
1159
1160impl QosSettings {
1161    /// Compute the set of QoS policies actually requested by this profile.
1162    ///
1163    /// Zero-valued time fields and `LivelinessKind::None` count as "not
1164    /// requesting" the corresponding policy — the cheap default. The
1165    /// `CORE` bits (reliability, durability=VOLATILE, history, depth)
1166    /// are always present because every nano-ros backend honours them.
1167    pub fn required_policies(&self) -> QosPolicyMask {
1168        let mut mask = QosPolicyMask::CORE;
1169        if self.durability == QosDurabilityPolicy::TransientLocal {
1170            mask = QosPolicyMask(
1171                (mask.0 & !QosPolicyMask::DURABILITY_VOLATILE.0)
1172                    | QosPolicyMask::DURABILITY_TRANSIENT_LOCAL.0,
1173            );
1174        }
1175        if self.deadline_ms != 0 {
1176            mask |= QosPolicyMask::DEADLINE;
1177        }
1178        if self.lifespan_ms != 0 {
1179            mask |= QosPolicyMask::LIFESPAN;
1180        }
1181        match self.liveliness_kind {
1182            QosLivelinessPolicy::None => {}
1183            QosLivelinessPolicy::Automatic => mask |= QosPolicyMask::LIVELINESS_AUTOMATIC,
1184            QosLivelinessPolicy::ManualByTopic => mask |= QosPolicyMask::LIVELINESS_MANUAL_BY_TOPIC,
1185            QosLivelinessPolicy::ManualByNode => mask |= QosPolicyMask::LIVELINESS_MANUAL_BY_NODE,
1186        }
1187        if self.liveliness_lease_ms != 0 {
1188            mask |= QosPolicyMask::LIVELINESS_LEASE;
1189        }
1190        if self.avoid_ros_namespace_conventions {
1191            mask |= QosPolicyMask::AVOID_ROS_NAMESPACE_CONVENTIONS;
1192        }
1193        mask
1194    }
1195
1196    /// Returns `Err(TransportError::IncompatibleQos)` if any policy this
1197    /// profile requires is missing from the backend's `supported` mask.
1198    /// Used at entity-create time to enforce the **no silent
1199    /// degradation** contract.
1200    pub fn validate_against(&self, supported: QosPolicyMask) -> Result<(), TransportError> {
1201        if supported.contains(self.required_policies()) {
1202            Ok(())
1203        } else {
1204            Err(TransportError::IncompatibleQos)
1205        }
1206    }
1207}
1208
1209/// Publisher trait for sending messages.
1210///
1211/// # Threading
1212///
1213/// `&self` on `publish_raw` — implementors must allow concurrent
1214/// publishes from multiple threads. Internal locking (or lock-free
1215/// queues) is the backend's responsibility.
1216///
1217/// # Buffer ownership
1218///
1219/// `data` in `publish_raw` is borrowed for the duration of the call.
1220/// The backend must either send it inline or copy into its own
1221/// buffer before returning — the slice is invalid after the call.
1222///
1223/// # Blocking
1224///
1225/// `publish_raw` is expected to be non-blocking on best-effort QoS
1226/// and bounded-blocking on reliable QoS (waiting for outbound queue
1227/// space). Backends should *not* block waiting for ack from a
1228/// matched subscriber.
1229pub trait Publisher {
1230    /// Error type for publish operations
1231    type Error;
1232
1233    /// Publish a CDR-serialised message.
1234    ///
1235    /// Returns once the message has been handed to the transport
1236    /// (queued or fired-and-forgotten depending on QoS). Does **not**
1237    /// wait for delivery.
1238    fn publish_raw(&self, data: &[u8]) -> Result<(), Self::Error>;
1239
1240    /// Phase 128.F.4 — publish with an opaque attachment block.
1241    ///
1242    /// `attachment` rides alongside the payload at the wire layer.
1243    /// Receivers can read it back via
1244    /// [`Subscriber::try_recv_raw_with_attachment`].
1245    ///
1246    /// Primary use case: cross-RMW bridges stamp a `bridge_origin`
1247    /// tag (the source backend's RMW name) so a paired return
1248    /// bridge can deterministically drop echoed frames.
1249    ///
1250    /// Default body delegates to [`publish_raw`](Self::publish_raw)
1251    /// and discards the attachment — backends that do not natively
1252    /// carry attachments (XRCE today, DDS without a user-data hook)
1253    /// see no change. Backends with native attachment support
1254    /// (zenoh-pico's `z_publisher_put_options::attachment`,
1255    /// Cyclone DDS user-data) override to write the bytes onto the
1256    /// wire.
1257    fn publish_raw_with_attachment(
1258        &self,
1259        data: &[u8],
1260        _attachment: &[u8],
1261    ) -> Result<(), Self::Error> {
1262        self.publish_raw(data)
1263    }
1264
1265    /// Phase 124.E.1 — streamed publish.
1266    ///
1267    /// `size_cb` reports the total payload length once; `chunk_cb`
1268    /// fills the slot in chunks. Saves the per-publisher staging
1269    /// buffer when the message is large enough to dominate the
1270    /// device's `.bss`.
1271    ///
1272    /// Default body — the **staging-buffer fallback** (124.E.2).
1273    /// Asks `size_cb` for the total length, fills a stack-allocated
1274    /// `[u8; NROS_MAX_STREAM_CHUNK]` via `chunk_cb`, then forwards
1275    /// to `publish_raw`. Returns `Err(BufferTooSmall)` if the total
1276    /// exceeds the stack cap so the caller can drop back to a
1277    /// regular `publish_raw` with a heap-sized buffer.
1278    ///
1279    /// Concrete backends opt in by overriding to stream straight
1280    /// into the network buffer (zenoh: write into the zenoh-pico
1281    /// outbound buffer; XRCE: micro-CDR streaming APIs).
1282    ///
1283    /// # Safety
1284    ///
1285    /// The caller must ensure `user_ctx` is valid for every invocation of
1286    /// `size_cb` and `chunk_cb` during this call, and that both callbacks obey
1287    /// their out-pointer contracts.
1288    ///
1289    /// `size_cb` and `chunk_cb` may be called from the same thread
1290    /// that called `publish_streamed`. Backends MUST NOT defer the
1291    /// calls past the function return; the caller's `user_ctx`
1292    /// pointer is only guaranteed valid for the duration of the
1293    /// call.
1294    unsafe fn publish_streamed(
1295        &self,
1296        size_cb: unsafe extern "C" fn(out_total_len: *mut usize, user_ctx: *mut core::ffi::c_void),
1297        chunk_cb: unsafe extern "C" fn(
1298            out_buf: *mut u8,
1299            cap: usize,
1300            out_written: *mut usize,
1301            user_ctx: *mut core::ffi::c_void,
1302        ),
1303        user_ctx: *mut core::ffi::c_void,
1304    ) -> Result<(), Self::Error>
1305    where
1306        Self::Error: From<TransportError>,
1307    {
1308        /// Default staging-buffer cap. Stack-allocated, so embedded
1309        /// callers don't pay for it unless they actually invoke this
1310        /// fallback. 4 KiB matches typical RTPS frag-size +
1311        /// micro-XRCE message ceilings.
1312        const STAGE_CAP: usize = 4096;
1313
1314        let mut total = 0usize;
1315        // SAFETY: caller's contract on `size_cb` matches our trait
1316        // doc — fire once with a writable `*mut usize` slot.
1317        unsafe { size_cb(&mut total as *mut usize, user_ctx) };
1318        if total > STAGE_CAP {
1319            return Err(TransportError::BufferTooSmall.into());
1320        }
1321        let mut stage = [0u8; STAGE_CAP];
1322        let mut written_so_far = 0usize;
1323        while written_so_far < total {
1324            let mut chunk_written = 0usize;
1325            let remaining = total - written_so_far;
1326            // SAFETY: `chunk_cb` writes ≤ `cap` bytes to
1327            // `out_buf` and reports the count via `out_written`.
1328            unsafe {
1329                chunk_cb(
1330                    stage.as_mut_ptr().add(written_so_far),
1331                    remaining,
1332                    &mut chunk_written as *mut usize,
1333                    user_ctx,
1334                );
1335            }
1336            if chunk_written == 0 {
1337                // Caller signalled EOF early — treat the partial
1338                // write as a malformed sequence; reporting it as
1339                // BufferTooSmall keeps the surface tight without
1340                // adding a new variant.
1341                return Err(TransportError::BufferTooSmall.into());
1342            }
1343            written_so_far += chunk_written;
1344        }
1345        self.publish_raw(&stage[..total])
1346    }
1347
1348    /// Publish a typed message (serializes automatically)
1349    fn publish<M: RosMessage>(&self, msg: &M, buf: &mut [u8]) -> Result<(), Self::Error> {
1350        use nros_core::CdrWriter;
1351
1352        let mut writer = CdrWriter::new_with_header(buf).map_err(|_| self.buffer_error())?;
1353        msg.serialize(&mut writer)
1354            .map_err(|_| self.serialization_error())?;
1355        let len = writer.position();
1356        self.publish_raw(&buf[..len])
1357    }
1358
1359    /// Return a buffer-too-small error (implementation specific)
1360    fn buffer_error(&self) -> Self::Error;
1361
1362    /// Return a serialization error (implementation specific)
1363    fn serialization_error(&self) -> Self::Error;
1364
1365    /// Phase 108 — `true` if the backend can generate this event for
1366    /// this publisher. Default returns `false`; backends override per
1367    /// supported event kind.
1368    ///
1369    /// Only [`EventKind::LivelinessLost`](crate::event::EventKind::LivelinessLost) and
1370    /// [`EventKind::OfferedDeadlineMissed`](crate::event::EventKind::OfferedDeadlineMissed) are publisher-side events;
1371    /// other kinds always return `false` here.
1372    fn supports_event(&self, _kind: crate::event::EventKind) -> bool {
1373        false
1374    }
1375
1376    /// Phase 108 — register a callback fired when the named status
1377    /// event occurs. `deadline_ms` applies to
1378    /// [`EventKind::OfferedDeadlineMissed`](crate::event::EventKind::OfferedDeadlineMissed) only; ignored otherwise.
1379    /// Default impl returns the backend's "unsupported"-shaped error.
1380    ///
1381    /// # Safety
1382    ///
1383    /// `cb` and `user_ctx` must remain valid for the entity's
1384    /// lifetime. Caller (typically `nros-node`'s typed wrapper) is
1385    /// responsible for keeping the closure / context arena alive.
1386    unsafe fn register_event_callback(
1387        &mut self,
1388        _kind: crate::event::EventKind,
1389        _deadline_ms: u32,
1390        _cb: crate::event::EventCallback,
1391        _user_ctx: *mut core::ffi::c_void,
1392    ) -> Result<(), Self::Error> {
1393        Err(self.unsupported_event_error())
1394    }
1395
1396    /// Phase 108 — backend's error variant for "this event kind is
1397    /// not supported." Default impl reuses `serialization_error()`
1398    /// since most backends share an `Unsupported` variant; backends
1399    /// override if they have a distinct `Unsupported` mapping.
1400    fn unsupported_event_error(&self) -> Self::Error {
1401        self.serialization_error()
1402    }
1403
1404    /// Phase 109 — assert this publisher's liveliness manually.
1405    /// Required for publishers configured with
1406    /// `QosLivelinessPolicy::ManualByTopic`. No-op for other
1407    /// liveliness kinds. Default impl returns `Ok(())` (no-op);
1408    /// backends override when they implement manual liveliness.
1409    fn assert_liveliness(&self) -> Result<(), Self::Error> {
1410        Ok(())
1411    }
1412}
1413
1414/// Subscriber trait for receiving messages.
1415///
1416/// # Threading
1417///
1418/// `&mut self` on `try_recv_raw` — the executor takes exclusive
1419/// ownership of the subscriber for the duration of a receive. A
1420/// backend that wants to allow concurrent receives must split into
1421/// per-thread sub-handles internally.
1422///
1423/// # Buffer ownership
1424///
1425/// `buf` is caller-owned. The implementation copies the next ready
1426/// message into `buf` and returns the byte count. The caller may
1427/// re-use or drop `buf` immediately after the call.
1428///
1429/// # Blocking
1430///
1431/// `try_recv_raw` is **non-blocking**: returns `Ok(None)` (or
1432/// equivalent for backends that map empty into a zero-length read)
1433/// when no message is ready. Use [`Session::drive_io`] to wait for
1434/// data; never sleep inside `try_recv_raw`.
1435pub trait Subscriber {
1436    /// Error type for receive operations
1437    type Error;
1438
    /// Check if data is available without consuming it.
    ///
    /// Non-destructive — does not advance the receive cursor.
    ///
    /// The default is conservative and always returns `true` (assume
    /// data may be available). Backends should override with a real
    /// check to avoid spurious receive attempts.
    fn has_data(&self) -> bool {
        true
    }
1448
    /// Try to receive one message into `buf`.
    ///
    /// Non-blocking. Required method — there is no default body.
    ///
    /// On success returns `Ok(Some(len))` where `len` is the byte
    /// count written into `buf[..len]`. Returns `Ok(None)` if no
    /// message is ready.
    ///
    /// # Errors
    ///
    /// If `buf` is too small the backend may either truncate (and
    /// document it) or return an error (preferred).
    fn try_recv_raw(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Self::Error>;
1457
1458    /// Phase 128.F.4 — receive with attachment bytes alongside the
1459    /// payload.
1460    ///
1461    /// On success returns `Ok(Some((payload_len, attachment_len)))`
1462    /// with the payload written into `buf[..payload_len]` and the
1463    /// attachment (if any) written into
1464    /// `att_buf[..attachment_len]`. `attachment_len == 0` means the
1465    /// incoming sample carried no attachment.
1466    ///
1467    /// Default body falls back to [`try_recv_raw`](Self::try_recv_raw)
1468    /// and reports a 0-length attachment. Backends with native
1469    /// attachment support override to populate `att_buf`. Cross-RMW
1470    /// bridges use the attachment to read the `bridge_origin` tag
1471    /// stamped by the sending side.
1472    fn try_recv_raw_with_attachment(
1473        &mut self,
1474        buf: &mut [u8],
1475        _att_buf: &mut [u8],
1476    ) -> Result<Option<(usize, usize)>, Self::Error> {
1477        match self.try_recv_raw(buf)? {
1478            Some(len) => Ok(Some((len, 0))),
1479            None => Ok(None),
1480        }
1481    }
1482
1483    /// Phase 124.D.1 — burst-take.
1484    ///
1485    /// Drain up to `max_msgs` queued samples into the contiguous
1486    /// `buf` block in one call, with the i-th sample at
1487    /// `buf[i * per_msg_cap .. i * per_msg_cap + out_lens[i]]`.
1488    /// Returns the number of messages actually delivered. Partial
1489    /// drains MUST report the count, not error out.
1490    ///
1491    /// Default body loop-drives `try_recv_raw` so callers can
1492    /// commit to the batched API regardless of backend support.
1493    /// Concrete backends opt in by overriding with a native batch
1494    /// take (zenoh queue drain, `dds_take(max_samples)`).
1495    fn try_recv_sequence(
1496        &mut self,
1497        buf: &mut [u8],
1498        per_msg_cap: usize,
1499        max_msgs: usize,
1500        out_lens: &mut [usize],
1501    ) -> Result<usize, Self::Error> {
1502        if per_msg_cap == 0 || max_msgs == 0 {
1503            return Ok(0);
1504        }
1505        let limit = max_msgs.min(out_lens.len());
1506        let mut count = 0;
1507        for i in 0..limit {
1508            let slot = &mut buf[i * per_msg_cap..(i + 1) * per_msg_cap];
1509            match self.try_recv_raw(slot)? {
1510                Some(len) => {
1511                    out_lens[i] = len;
1512                    count += 1;
1513                }
1514                None => break,
1515            }
1516        }
1517        Ok(count)
1518    }
1519
1520    /// Try to receive a typed message (non-blocking)
1521    fn try_recv<M: RosMessage>(&mut self, buf: &mut [u8]) -> Result<Option<M>, Self::Error> {
1522        use nros_core::CdrReader;
1523
1524        match self.try_recv_raw(buf)? {
1525            Some(len) => {
1526                let mut reader = CdrReader::new_with_header(&buf[..len])
1527                    .map_err(|_| self.deserialization_error())?;
1528                let msg = M::deserialize(&mut reader).map_err(|_| self.deserialization_error())?;
1529                Ok(Some(msg))
1530            }
1531            None => Ok(None),
1532        }
1533    }
1534
    /// Process the received message in-place without copying.
    ///
    /// Calls `f` with a reference to the raw CDR bytes in the subscriber's
    /// internal receive buffer, avoiding a copy into a caller-provided buffer.
    /// While `f` executes the buffer is exclusively borrowed — any messages
    /// arriving from the transport during that time are dropped to prevent
    /// data races.
    ///
    /// Returns `Ok(true)` if a message was available and `f` was called,
    /// `Ok(false)` if no message was available.
    ///
    /// **Default body**: never calls `f` and returns
    /// `Err(TransportError::MessageTooLarge)` — the old default
    /// silently truncated anything larger than 1 KB into a stack buffer,
    /// which broke large messages with no diagnostic. Backends must
    /// override this with a real zero-copy path if they advertise support
    /// for `process_raw_in_place`; callers that hit the default should
    /// use `try_recv_raw` with a caller-sized buffer instead.
    fn process_raw_in_place(&mut self, f: impl FnOnce(&[u8])) -> Result<bool, Self::Error>
    where
        Self::Error: From<TransportError>,
    {
        // The closure is intentionally unused in the default body.
        let _ = f;
        Err(TransportError::MessageTooLarge.into())
    }
1559
    /// Whether this backend implements the in-place dispatch methods
    /// ([`process_raw_in_place`](Subscriber::process_raw_in_place) /
    /// [`process_raw_in_place_with_info`](Subscriber::process_raw_in_place_with_info))
    /// with a real zero-copy borrow.
    ///
    /// The executor consults this at subscription registration to choose the
    /// **in-place** arena dispatch (borrow + deserialize from the backend slot, no
    /// arena buffer) over the **buffered** dispatch (copy into an arena buffer
    /// first). Backends that leave the in-place methods at their unsupported
    /// default return `false` (the default) and keep the buffered path. (RFC-0038,
    /// Phase 231 Wave 0.2.)
    ///
    /// A backend that overrides the in-place methods must also override
    /// this to return `true`; the default here is always `false`.
    fn supports_process_in_place(&self) -> bool {
        false
    }
1574
    /// In-place processing variant that also surfaces publisher metadata.
    ///
    /// Same borrow contract as
    /// [`process_raw_in_place`](Subscriber::process_raw_in_place): `f` receives
    /// the raw CDR bytes plus the parsed [`MessageInfo`](nros_core::MessageInfo)
    /// — the co-located attachment (publisher GID / sequence / source timestamp),
    /// or `None` when no attachment was present — for the duration of the call;
    /// the slot is released after `f` returns. `Ok(true)` = a message was
    /// available and `f` was called; `Ok(false)` = none ready.
    ///
    /// **Default body**: never calls `f` and returns
    /// `Err(TransportError::MessageTooLarge)`, the same error the default
    /// `process_raw_in_place` uses to signal "not supported". Backends that
    /// advertise in-place support override this with a real zero-copy path;
    /// callers that hit the default should use the buffered
    /// [`try_recv_raw_with_info`](Subscriber::try_recv_raw_with_info)
    /// path instead. (RFC-0038, Phase 231 Wave 0.1.)
    fn process_raw_in_place_with_info(
        &mut self,
        f: impl FnOnce(&[u8], Option<nros_core::MessageInfo>),
    ) -> Result<bool, Self::Error>
    where
        Self::Error: From<TransportError>,
    {
        // The closure is intentionally unused in the default body.
        let _ = f;
        Err(TransportError::MessageTooLarge.into())
    }
1600
1601    /// Try to receive raw data along with publisher metadata.
1602    ///
1603    /// When available, [`MessageInfo`](nros_core::MessageInfo) contains
1604    /// the publisher's GID (Global Identifier) and source timestamp,
1605    /// extracted from a transport-level attachment on the incoming message.
1606    ///
1607    /// Returns `Ok(Some((len, info)))` if data is available, where:
1608    /// - `len` is the number of bytes written to the buffer
1609    /// - `info` is the parsed publisher metadata (if attachment was present)
1610    ///
1611    /// Default: delegates to [`try_recv_raw`](Subscriber::try_recv_raw) with no info.
1612    fn try_recv_raw_with_info(
1613        &mut self,
1614        buf: &mut [u8],
1615    ) -> Result<Option<(usize, Option<nros_core::MessageInfo>)>, Self::Error> {
1616        self.try_recv_raw(buf).map(|opt| opt.map(|len| (len, None)))
1617    }
1618
1619    /// Try to receive raw data with E2E safety validation (CRC + sequence tracking).
1620    ///
1621    /// Returns `Ok(Some((len, status)))` if data is available, where:
1622    /// - `len` is the number of bytes written to the buffer
1623    /// - `status` is the integrity validation result
1624    ///
1625    /// Default: delegates to `try_recv_raw` with no CRC info.
1626    #[cfg(feature = "safety-e2e")]
1627    fn try_recv_validated(
1628        &mut self,
1629        buf: &mut [u8],
1630    ) -> Result<Option<(usize, crate::IntegrityStatus)>, Self::Error> {
1631        self.try_recv_raw(buf).map(|opt| {
1632            opt.map(|len| {
1633                (
1634                    len,
1635                    crate::IntegrityStatus {
1636                        gap: 0,
1637                        duplicate: false,
1638                        crc_valid: None,
1639                    },
1640                )
1641            })
1642        })
1643    }
1644
    /// Register an async waker to be notified when data arrives.
    ///
    /// Called from `Future::poll()` implementations to store the waker.
    /// The transport backend calls `waker.wake()` from its receive callback
    /// when new data is available, enabling event-driven async without
    /// busy-polling.
    ///
    /// The method takes `&self`, so a backend that stores the waker needs some
    /// form of interior mutability.
    ///
    /// Default: no-op (backends that don't support waking simply ignore this).
    fn register_waker(&self, _waker: &core::task::Waker) {}
1654
    /// Return the backend-specific error value that represents a
    /// deserialization failure.
    ///
    /// Required method with no default. The default
    /// [`unsupported_event_error`](Subscriber::unsupported_event_error) also
    /// reuses this value.
    fn deserialization_error(&self) -> Self::Error;
1657
    /// Phase 108 — `true` if the backend can generate this event for
    /// this subscriber. Default returns `false`; backends override per
    /// supported event kind.
    ///
    /// Subscriber-side event kinds:
    /// [`EventKind::LivelinessChanged`](crate::event::EventKind::LivelinessChanged),
    /// [`EventKind::RequestedDeadlineMissed`](crate::event::EventKind::RequestedDeadlineMissed),
    /// [`EventKind::MessageLost`](crate::event::EventKind::MessageLost).
    /// Publisher kinds always return `false` here.
    fn supports_event(&self, _kind: crate::event::EventKind) -> bool {
        // The default supports no events, so `_kind` is not inspected.
        false
    }
1670
1671    /// Phase 108 — register a callback fired when the named status
1672    /// event occurs. `deadline_ms` applies to
1673    /// [`EventKind::RequestedDeadlineMissed`](crate::event::EventKind::RequestedDeadlineMissed) only; ignored otherwise.
1674    /// Default impl returns the backend's "unsupported"-shaped error.
1675    ///
1676    /// # Safety
1677    ///
1678    /// `cb` and `user_ctx` must remain valid for the entity's
1679    /// lifetime. Caller (typically `nros-node`'s typed wrapper) is
1680    /// responsible for keeping the closure / context arena alive.
1681    unsafe fn register_event_callback(
1682        &mut self,
1683        _kind: crate::event::EventKind,
1684        _deadline_ms: u32,
1685        _cb: crate::event::EventCallback,
1686        _user_ctx: *mut core::ffi::c_void,
1687    ) -> Result<(), Self::Error> {
1688        Err(self.unsupported_event_error())
1689    }
1690
1691    /// Phase 108 — backend's error variant for "this event kind is
1692    /// not supported." Default reuses `deserialization_error()` for
1693    /// backends that don't have a distinct `Unsupported` mapping.
1694    fn unsupported_event_error(&self) -> Self::Error {
1695        self.deserialization_error()
1696    }
1697}
1698
/// Service request from a client.
///
/// Borrowed view returned by [`ServiceServerTrait::try_recv_request`]: `data`
/// borrows for the lifetime `'a` of the receive buffer passed to that call,
/// so the request cannot outlive the buffer.
pub struct ServiceRequest<'a> {
    /// Raw request data (CDR encoded)
    pub data: &'a [u8],
    /// Sequence number for request/response matching; pass it back to
    /// [`ServiceServerTrait::send_reply`] to answer this request.
    pub sequence_number: i64,
}
1706
// ============================================================================
// Phase 99 — zero-copy raw API: SlotLending / SlotBorrowing
// ============================================================================
//
// Backends that can lend a slot directly into their outbound buffer
// (zenoh-pico with unstable-zenoh-api, XRCE-DDS via uxr_prepare_output_stream,
// full DDS with SHM transport) implement these traits. Backends that cannot
// (uORB, default zenoh-pico) do NOT implement them — `EmbeddedRawPublisher`
// then falls back to its per-publisher arena and copies (memcpy) at commit
// time. Both paths land at `Publisher::publish_raw` for the actual wire write;
// only the user-side copy is eliminated when lending is available.
//
// Selection is **compile-time** via the `lending` Cargo feature. Each
// backend crate forwards its own `lending` feature to `nros-rmw/lending`
// when it can satisfy the trait. nros-node's `rmw-lending` feature aggregates
// them. A user who enables `nros/rmw-lending` with a non-lending backend
// (e.g. uORB) gets a clear compile error from the unsatisfied trait bound on
// the concrete `RmwPublisher`.
1725
/// Backend can lend a writable slot into its outbound buffer.
///
/// The returned slot's lifetime is tied to `&self`; user fills it in
/// place, then calls [`commit_slot`](Self::commit_slot) to publish.
/// Dropping the slot without commit is a no-op (slot returned to free
/// pool); concurrent loan attempts that would exceed backend capacity
/// return [`TransportError::WouldBlock`] — never block.
///
/// NOTE(review): the paragraph above says exhaustion returns `WouldBlock`,
/// while [`try_lend_slot`](Self::try_lend_slot) documents `Ok(None)` for a
/// full backend — confirm which one implementations actually use.
///
/// Only compiled when the `lending` Cargo feature is enabled.
#[cfg(feature = "lending")]
pub trait SlotLending: Publisher {
    /// Backend-owned writable slot. Holds a `&'a mut [u8]` and any
    /// state needed for commit_slot.
    ///
    /// The `AsMut<[u8]>` bound is how callers reach the bytes to fill.
    type Slot<'a>: AsMut<[u8]> + 'a
    where
        Self: 'a;

    /// Reserve a writable slot of `len` bytes from the backend's
    /// outbound buffer. Returns `Ok(None)` if the backend has no slot
    /// available (full); never blocks.
    fn try_lend_slot(&self, len: usize) -> Result<Option<Self::Slot<'_>>, Self::Error>;

    /// Commit a previously-lent slot. Consumes the slot and triggers
    /// the actual wire write. Returns `Err` on backend send failure;
    /// the slot's bytes are lost in that case (caller must re-lend +
    /// re-fill to retry).
    fn commit_slot(&self, slot: Self::Slot<'_>) -> Result<(), Self::Error>;
}
1752
/// Backend can lend a read-only view into its receive buffer.
///
/// The returned view's lifetime is tied to `&mut self` (subscriber-
/// exclusive); dropping the view releases any backend lock and lets
/// the next message advance into the buffer.
///
/// Only compiled when the `lending` Cargo feature is enabled.
#[cfg(feature = "lending")]
pub trait SlotBorrowing: Subscriber {
    /// Backend-owned read-only view. Holds a `&'a [u8]` and any state
    /// needed to release the borrow on Drop.
    ///
    /// The `AsRef<[u8]>` bound is how callers read the message bytes.
    type View<'a>: AsRef<[u8]> + 'a
    where
        Self: 'a;

    /// Try to borrow the next available message in place. Returns
    /// `Ok(None)` if no message is ready; never blocks.
    ///
    /// Because the view borrows `self` mutably, only one view can be alive
    /// at a time per subscriber.
    fn try_borrow(&mut self) -> Result<Option<Self::View<'_>>, Self::Error>;
}
1770
1771/// Service server trait for handling requests.
1772///
1773/// # Threading
1774///
1775/// `&mut self` on `try_recv_request` and `send_reply` — the executor
1776/// owns the server while a request is being handled. Handler bodies
1777/// run synchronously on the executor thread; long handlers should
1778/// dispatch work to a worker queue and reply later via the recorded
1779/// `sequence_number`.
1780///
1781/// # Calling pattern
1782///
1783/// 1. Executor calls `try_recv_request(buf)`.
1784/// 2. If `Some(req)` returned, decode, run handler, encode reply.
1785/// 3. `send_reply(req.sequence_number, &reply_buf)`.
1786///
1787/// `sequence_number` is the canonical request → reply correlation
1788/// token; backends derive it from the wire-level metadata (zenoh
1789/// query id, DDS sample identity).
1790pub trait ServiceServerTrait {
1791    /// Error type for service operations
1792    type Error;
1793
1794    /// Check if a request is available without consuming it.
1795    ///
1796    /// Non-destructive. Default returns `true` (always assume one
1797    /// may be available); backends should override with a real
1798    /// check.
1799    fn has_request(&self) -> bool {
1800        true
1801    }
1802
1803    /// Phase 122.3.c.6.e — register a `Waker` for event-driven
1804    /// service servers. Mirrors the matching method on
1805    /// `SubscriberTrait` / `ServiceClientTrait`. Backends that
1806    /// surface incoming-request notifications wake `waker` when
1807    /// `has_request()` flips true. Default: no-op (backends without
1808    /// wake support ignore — caller falls back to polling).
1809    fn register_waker(&self, _waker: &core::task::Waker) {}
1810
1811    /// Try to receive a service request into `buf` (non-blocking).
1812    ///
1813    /// On success returns a `ServiceRequest` that borrows from
1814    /// `buf`. The borrow is released when the returned struct is
1815    /// dropped — typically before `send_reply` is called, since
1816    /// `send_reply` takes `&mut self`.
1817    fn try_recv_request<'a>(
1818        &mut self,
1819        buf: &'a mut [u8],
1820    ) -> Result<Option<ServiceRequest<'a>>, Self::Error>;
1821
1822    /// Send a reply for the given sequence number. Non-blocking
1823    /// from the application's perspective; the backend may queue
1824    /// the reply for transport-level transmission.
1825    fn send_reply(&mut self, sequence_number: i64, data: &[u8]) -> Result<(), Self::Error>;
1826
1827    /// Handle a service request with typed messages
1828    fn handle_request<S: RosService>(
1829        &mut self,
1830        req_buf: &mut [u8],
1831        reply_buf: &mut [u8],
1832        handler: impl FnOnce(&S::Request) -> S::Reply,
1833    ) -> Result<bool, Self::Error>
1834    where
1835        Self::Error: From<TransportError>,
1836    {
1837        use nros_core::{CdrReader, CdrWriter};
1838
1839        // First, try to receive a request and extract necessary data.
1840        // Capture the data slice's offset within `req_buf` so we can
1841        // re-borrow it after the `ServiceRequest` (which holds a
1842        // borrow into `req_buf`) is dropped. Some backends prepend a
1843        // header (DDS: 8-byte sequence number) and place the CDR
1844        // payload at a non-zero offset in the buffer; others (zenoh)
1845        // put it at offset 0. Reading from offset 0 unconditionally
1846        // would feed the prefix bytes to the CDR deserializer and
1847        // silently corrupt the request.
1848        let buf_start = req_buf.as_ptr() as usize;
1849        let (data_offset, data_len, sequence_number) = match self.try_recv_request(req_buf)? {
1850            Some(request) => {
1851                let offset = (request.data.as_ptr() as usize).saturating_sub(buf_start);
1852                (offset, request.data.len(), request.sequence_number)
1853            }
1854            None => return Ok(false),
1855        };
1856
1857        // Deserialize request from the captured offset.
1858        let mut reader = CdrReader::new_with_header(&req_buf[data_offset..data_offset + data_len])
1859            .map_err(|_| TransportError::DeserializationError)?;
1860        let req = S::Request::deserialize(&mut reader)
1861            .map_err(|_| TransportError::DeserializationError)?;
1862
1863        // Call handler
1864        let reply = handler(&req);
1865
1866        // Serialize reply
1867        let mut writer =
1868            CdrWriter::new_with_header(reply_buf).map_err(|_| TransportError::BufferTooSmall)?;
1869        reply
1870            .serialize(&mut writer)
1871            .map_err(|_| TransportError::SerializationError)?;
1872        let len = writer.position();
1873
1874        // Send reply (now we can borrow self mutably again)
1875        self.send_reply(sequence_number, &reply_buf[..len])?;
1876        Ok(true)
1877    }
1878
1879    /// Handle a service request where the handler returns `Box<S::Reply>`
1880    ///
1881    /// Identical to `handle_request` but the handler returns a heap-allocated reply.
1882    /// This is needed for services with large response types (e.g., parameter services
1883    /// where `Vec<ParameterValue, 64>` is ~1MB+) that would overflow the stack.
1884    #[cfg(feature = "alloc")]
1885    fn handle_request_boxed<S: RosService>(
1886        &mut self,
1887        req_buf: &mut [u8],
1888        reply_buf: &mut [u8],
1889        handler: impl FnOnce(&S::Request) -> alloc::boxed::Box<S::Reply>,
1890    ) -> Result<bool, Self::Error>
1891    where
1892        Self::Error: From<TransportError>,
1893    {
1894        use nros_core::{CdrReader, CdrWriter};
1895
1896        let buf_start = req_buf.as_ptr() as usize;
1897        let (data_offset, data_len, sequence_number) = match self.try_recv_request(req_buf)? {
1898            Some(request) => {
1899                let offset = (request.data.as_ptr() as usize).saturating_sub(buf_start);
1900                (offset, request.data.len(), request.sequence_number)
1901            }
1902            None => return Ok(false),
1903        };
1904
1905        let mut reader = CdrReader::new_with_header(&req_buf[data_offset..data_offset + data_len])
1906            .map_err(|_| TransportError::DeserializationError)?;
1907        let req = S::Request::deserialize(&mut reader)
1908            .map_err(|_| TransportError::DeserializationError)?;
1909
1910        let reply = handler(&req);
1911
1912        let mut writer =
1913            CdrWriter::new_with_header(reply_buf).map_err(|_| TransportError::BufferTooSmall)?;
1914        reply
1915            .serialize(&mut writer)
1916            .map_err(|_| TransportError::SerializationError)?;
1917        let len = writer.position();
1918
1919        self.send_reply(sequence_number, &reply_buf[..len])?;
1920        Ok(true)
1921    }
1922}
1923
1924/// Service client trait for sending requests.
1925///
1926/// # Threading
1927///
1928/// `&mut self` on every method — the client is single-owner. For
1929/// fan-out request patterns, create one client per worker thread.
1930///
1931/// # Calling pattern
1932///
1933/// All in-tree backends route blocking waits through the executor:
1934///
1935/// 1. `send_request_raw(buf)` — non-blocking; returns once the
1936///    request is queued for transmission.
1937/// 2. The executor's `drive_io` runs.
1938/// 3. `try_recv_reply_raw(buf)` — non-blocking; returns
1939///    `Ok(Some(len))` when the reply is back.
1940///
1941/// The deprecated [`call_raw`](Self::call_raw) blocking path is
1942/// kept for backwards compatibility but should not be called.
1943pub trait ServiceClientTrait {
1944    /// Error type for service operations
1945    type Error;
1946
1947    /// Send a service request and wait for reply (blocking).
1948    ///
1949    /// **Deprecated — do not call.** The default body returns `Timeout`
1950    /// immediately without polling. Use `Client::call` →
1951    /// `Promise::wait(executor, timeout_ms)` which lets the executor
1952    /// drive I/O while waiting instead of busy-looping on
1953    /// `try_recv_reply_raw` with no sleep (which starves the transport
1954    /// on FreeRTOS / Zephyr single-threaded schedulers).
1955    ///
1956    /// Backends that still need an internal blocking path should
1957    /// override this with a real sleep-between-polls implementation,
1958    /// but all in-tree backends (zenoh, XRCE) route blocking waits
1959    /// through the executor.
1960    #[deprecated(note = "use Client::call → Promise::wait with an executor instead")]
1961    fn call_raw(&mut self, request: &[u8], _reply_buf: &mut [u8]) -> Result<usize, Self::Error>
1962    where
1963        Self::Error: From<TransportError>,
1964    {
1965        let _ = request;
1966        Err(TransportError::Timeout.into())
1967    }
1968
1969    /// Send a service request without waiting for a reply (non-blocking).
1970    ///
1971    /// The caller must subsequently poll [`try_recv_reply_raw`](Self::try_recv_reply_raw)
1972    /// to retrieve the reply.
1973    fn send_request_raw(&mut self, request: &[u8]) -> Result<(), Self::Error>;
1974
1975    /// Poll for a reply to the most recently sent request (non-blocking).
1976    ///
1977    /// Returns `Ok(Some(len))` when a reply has arrived, `Ok(None)` if not yet
1978    /// available, or `Err` on failure.
1979    fn try_recv_reply_raw(&mut self, reply_buf: &mut [u8]) -> Result<Option<usize>, Self::Error>;
1980
1981    /// Send a typed service request without waiting for a reply (non-blocking).
1982    ///
1983    /// Serializes the request into `req_buf` and calls [`send_request_raw`](Self::send_request_raw).
1984    fn send_request<S: RosService>(
1985        &mut self,
1986        request: &S::Request,
1987        req_buf: &mut [u8],
1988    ) -> Result<(), Self::Error>
1989    where
1990        Self::Error: From<TransportError>,
1991    {
1992        use nros_core::CdrWriter;
1993
1994        let mut writer =
1995            CdrWriter::new_with_header(req_buf).map_err(|_| TransportError::BufferTooSmall)?;
1996        request
1997            .serialize(&mut writer)
1998            .map_err(|_| TransportError::SerializationError)?;
1999        let req_len = writer.position();
2000
2001        self.send_request_raw(&req_buf[..req_len])
2002    }
2003
2004    /// Poll for a typed reply to the most recently sent request (non-blocking).
2005    ///
2006    /// Calls [`try_recv_reply_raw`](Self::try_recv_reply_raw) and deserializes if available.
2007    fn try_recv_reply<S: RosService>(
2008        &mut self,
2009        reply_buf: &mut [u8],
2010    ) -> Result<Option<S::Reply>, Self::Error>
2011    where
2012        Self::Error: From<TransportError>,
2013    {
2014        use nros_core::CdrReader;
2015
2016        match self.try_recv_reply_raw(reply_buf)? {
2017            Some(len) => {
2018                let mut reader = CdrReader::new_with_header(&reply_buf[..len])
2019                    .map_err(|_| TransportError::DeserializationError)?;
2020                let reply = S::Reply::deserialize(&mut reader)
2021                    .map_err(|_| TransportError::DeserializationError)?;
2022                Ok(Some(reply))
2023            }
2024            None => Ok(None),
2025        }
2026    }
2027
2028    /// Register an async waker to be notified when a reply arrives.
2029    ///
2030    /// Called from `Future::poll()` implementations to store the waker.
2031    /// The transport backend calls `waker.wake()` from its reply callback
2032    /// when a response is available, enabling event-driven async without
2033    /// busy-polling.
2034    ///
2035    /// Default: no-op (backends that don't support waking simply ignore this).
2036    fn register_waker(&self, _waker: &core::task::Waker) {}
2037
2038    /// Begin a server-discovery query on this client (non-blocking).
2039    ///
2040    /// Models `rclcpp::ClientBase::wait_for_service` machinery: the backend
2041    /// fires off a discovery probe (typically a Zenoh liveliness query
2042    /// against the matching server's wildcarded liveliness keyexpr) and
2043    /// the caller polls [`poll_server_discovery`](Self::poll_server_discovery)
2044    /// to collect the result.
2045    ///
2046    /// Default impl: no-op success. Backends without a discovery channel
2047    /// (or those that always assume the server is reachable) can leave
2048    /// this default and have `poll_server_discovery` return
2049    /// `Ok(Some(true))` immediately.
2050    fn start_server_discovery(&mut self, _timeout_ms: u32) -> Result<(), Self::Error> {
2051        Ok(())
2052    }
2053
2054    /// Poll an in-flight server-discovery query.
2055    ///
2056    /// - `Ok(Some(true))` — at least one matching server has reported
2057    ///   back; safe to send the first request.
2058    /// - `Ok(Some(false))` — discovery query finished without finding
2059    ///   any matching server (timeout / no-replies).
2060    /// - `Ok(None)` — query still in flight.
2061    /// - `Err(_)` — transport-level failure unrelated to server presence.
2062    ///
2063    /// Default impl: returns `Ok(Some(true))` (i.e., "server is always
2064    /// assumed reachable"). The Zenoh backend overrides this with a
2065    /// liveliness-token check.
2066    fn poll_server_discovery(&mut self) -> Result<Option<bool>, Self::Error> {
2067        Ok(Some(true))
2068    }
2069
2070    /// Synchronous, non-blocking check of whether a matching server is
2071    /// currently visible.
2072    ///
2073    /// Mirrors `rclcpp::ClientBase::service_is_ready`. Backends that lack
2074    /// discovery should keep the default `true` so existing call sites
2075    /// don't regress.
2076    ///
2077    /// Default impl: always `true`.
2078    fn is_server_ready(&self) -> bool {
2079        true
2080    }
2081
2082    /// Phase 124.C.1 — graph-aware server-availability probe.
2083    ///
2084    /// Returns `Ok(true)` if at least one matching server has been
2085    /// discovered, `Ok(false)` if none yet, or `Err(_)` if the
2086    /// backend cannot answer (e.g. XRCE — micro-XRCE-DDS-Client has
2087    /// no participant enumeration). Distinct from
2088    /// [`is_server_ready`](Self::is_server_ready), which collapses
2089    /// "don't know" and "no server" into the same `false` answer.
2090    ///
2091    /// User-facing surface: `Client<S>::server_available()` in Rust,
2092    /// `nros_client_server_available()` in C/C++. Clients use this
2093    /// to gate the first `call_raw` so a startup-ordering race
2094    /// (client opens before server's discovery announcement lands)
2095    /// doesn't surface as a request-side timeout.
2096    ///
2097    /// Default impl: `Err(TransportError::Unsupported)` — backends
2098    /// that support graph introspection (zenoh queryable interest,
2099    /// DDS built-in topic readers) opt in by overriding.
2100    fn server_available(&self) -> Result<bool, Self::Error>
2101    where
2102        Self::Error: From<TransportError>,
2103    {
2104        Err(TransportError::Unsupported.into())
2105    }
2106
2107    /// Call a service with typed messages (blocking).
2108    ///
2109    /// **Deprecated — do not call.** The default body returns `Timeout`
2110    /// immediately without polling. Use `Client::call` on the executor
2111    /// instead, which drives I/O while waiting. See
2112    /// [`call_raw`](Self::call_raw) for the same reasoning.
2113    #[deprecated(note = "use Client::call → Promise::wait with an executor instead")]
2114    fn call<S: RosService>(
2115        &mut self,
2116        request: &S::Request,
2117        req_buf: &mut [u8],
2118        _reply_buf: &mut [u8],
2119    ) -> Result<S::Reply, Self::Error>
2120    where
2121        Self::Error: From<TransportError>,
2122    {
2123        use nros_core::CdrWriter;
2124
2125        // Serialize request so the error surface matches the old impl
2126        // for the "bad request" path; but skip the receive busy-loop.
2127        let mut writer =
2128            CdrWriter::new_with_header(req_buf).map_err(|_| TransportError::BufferTooSmall)?;
2129        request
2130            .serialize(&mut writer)
2131            .map_err(|_| TransportError::SerializationError)?;
2132        Err(TransportError::Timeout.into())
2133    }
2134}
2135
/// Transport backend trait (legacy).
///
/// Use [`Rmw`] for new code. This trait is retained for backward compatibility
/// with existing code that uses [`TransportConfig`] directly.
///
/// Unlike [`Rmw::open`], `open` here is an associated function with no `self`
/// receiver, so all configuration comes from `config`.
pub trait Transport {
    /// Error type for this transport
    type Error;
    /// Session type for this transport
    type Session: Session;

    /// Open a new session with the given configuration
    fn open(config: &TransportConfig) -> Result<Self::Session, Self::Error>;
}
2149
/// Factory trait for compile-time middleware selection.
///
/// Embedded crates select a backend via feature flag:
/// ```rust,ignore
/// #[cfg(feature = "rmw-cffi")]
/// type DefaultRmw = nros_rmw_cffi::CffiRmw;
/// ```
///
/// Each backend provides its own `Rmw` implementation that bridges
/// from the middleware-agnostic [`RmwConfig`] to backend-specific
/// initialization.
///
/// Phase 84.E2: `open` consumes `self`. Backends carry their own
/// configuration (agent addresses, serial ports, TLS CA slots)
/// inside the factory value and hand that over to the session at
/// `open` time. All in-repo backends also implement
/// [`Default`]; most callers spell this as
/// `BackendRmw::default().open(&config)`.
pub trait Rmw {
    /// Session type returned by [`open`](Rmw::open)
    type Session: Session;
    /// Error type for session creation; the `Debug` bound lets callers
    /// format or log an open failure.
    type Error: core::fmt::Debug;

    /// Open a new middleware session with the given configuration.
    ///
    /// The backend maps [`RmwConfig`] fields to its own connection
    /// parameters (e.g., zenoh locator and session mode, XRCE-DDS
    /// agent address). Any backend-specific pre-open state stored
    /// on `self` (e.g. configured agent IP / port) is moved into the
    /// returned `Session`.
    ///
    /// Takes `self` by value, so the factory cannot be reused after the call.
    fn open(self, config: &RmwConfig) -> Result<Self::Session, Self::Error>;
}
2183
2184#[cfg(test)]
2185mod tests {
2186    use super::*;
2187
2188    #[test]
2189    fn test_topic_info() {
2190        let topic = TopicInfo::new("/chatter", "std_msgs::msg::dds_::String_", "abc123");
2191        assert_eq!(topic.name, "/chatter");
2192        assert_eq!(topic.domain_id, 0);
2193    }
2194
2195    #[test]
2196    fn qos_apply_overrides_matches_topic_and_role() {
2197        // Default is Reliable / Volatile / KeepLast(10).
2198        static OVERRIDES: &[QosOverride] = &[
2199            QosOverride {
2200                topic: "/chatter",
2201                role: QosOverrideRole::Publisher,
2202                value: QosOverrideValue::Reliability(QosReliabilityPolicy::BestEffort),
2203            },
2204            QosOverride {
2205                topic: "/chatter",
2206                role: QosOverrideRole::Publisher,
2207                value: QosOverrideValue::Depth(5),
2208            },
2209            QosOverride {
2210                topic: "/scan",
2211                role: QosOverrideRole::Subscription,
2212                value: QosOverrideValue::Durability(QosDurabilityPolicy::TransientLocal),
2213            },
2214        ];
2215
2216        // Matching topic + publisher role → reliability + depth applied.
2217        let pub_qos = QosSettings::default().apply_overrides(
2218            "/chatter",
2219            QosOverrideRole::Publisher,
2220            OVERRIDES,
2221        );
2222        assert_eq!(pub_qos.reliability, QosReliabilityPolicy::BestEffort);
2223        assert_eq!(pub_qos.depth, 5);
2224        assert_eq!(pub_qos.durability, QosDurabilityPolicy::Volatile); // untouched
2225
2226        // Same topic but subscription role → publisher overrides DON'T apply;
2227        // the /scan override is for a different topic → also no change.
2228        let sub_qos = QosSettings::default().apply_overrides(
2229            "/chatter",
2230            QosOverrideRole::Subscription,
2231            OVERRIDES,
2232        );
2233        assert_eq!(sub_qos, QosSettings::default());
2234
2235        // The /scan subscription override applies only to /scan+subscription.
2236        let scan_qos = QosSettings::default().apply_overrides(
2237            "/scan",
2238            QosOverrideRole::Subscription,
2239            OVERRIDES,
2240        );
2241        assert_eq!(scan_qos.durability, QosDurabilityPolicy::TransientLocal);
2242
2243        // Empty table → identity (the zero-override fast path).
2244        assert_eq!(
2245            QosSettings::default().apply_overrides("/x", QosOverrideRole::Publisher, &[]),
2246            QosSettings::default()
2247        );
2248    }
2249
2250    #[test]
2251    fn test_qos_defaults() {
2252        let qos = QosSettings::default();
2253        assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2254    }
2255
2256    #[test]
2257    fn test_action_info() {
2258        let action = ActionInfo::new(
2259            "/fibonacci",
2260            "example_interfaces::action::dds_::Fibonacci_",
2261            "abc123",
2262        );
2263        assert_eq!(action.name, "/fibonacci");
2264        assert_eq!(action.domain_id, 0);
2265    }
2266
2267    #[test]
2268    fn test_action_info_with_domain() {
2269        let action = ActionInfo::new(
2270            "/fibonacci",
2271            "example_interfaces::action::dds_::Fibonacci_",
2272            "abc123",
2273        )
2274        .with_domain(42);
2275        assert_eq!(action.domain_id, 42);
2276    }
2277
2278    #[test]
2279    fn test_action_send_goal_key() {
2280        let action = ActionInfo::new(
2281            "/fibonacci",
2282            "example_interfaces::action::dds_::Fibonacci_",
2283            "abc123",
2284        )
2285        .with_domain(0);
2286
2287        let key: heapless::String<256> = action.send_goal_key();
2288        // ActionInfo returns the sub-entity name with leading slash for ROS 2 compatibility
2289        assert_eq!(key.as_str(), "/fibonacci/_action/send_goal");
2290    }
2291
2292    #[test]
2293    fn test_action_feedback_key() {
2294        let action = ActionInfo::new(
2295            "/fibonacci",
2296            "example_interfaces::action::dds_::Fibonacci_",
2297            "abc123",
2298        )
2299        .with_domain(0);
2300
2301        let key: heapless::String<256> = action.feedback_key();
2302        assert_eq!(key.as_str(), "/fibonacci/_action/feedback");
2303    }
2304
2305    #[test]
2306    fn test_action_all_sub_names() {
2307        let action = ActionInfo::new(
2308            "/fibonacci",
2309            "example_interfaces::action::dds_::Fibonacci_",
2310            "abc123",
2311        )
2312        .with_domain(0);
2313
2314        let cancel: heapless::String<256> = action.cancel_goal_key();
2315        assert_eq!(cancel.as_str(), "/fibonacci/_action/cancel_goal");
2316
2317        let result: heapless::String<256> = action.get_result_key();
2318        assert_eq!(result.as_str(), "/fibonacci/_action/get_result");
2319
2320        let status: heapless::String<256> = action.status_key();
2321        assert_eq!(status.as_str(), "/fibonacci/_action/status");
2322    }
2323
2324    // --- QoS Profile Tests ---
2325
2326    #[test]
2327    fn test_qos_profile_sensor_data() {
2328        let qos = QosSettings::QOS_PROFILE_SENSOR_DATA;
2329        assert_eq!(qos.reliability, QosReliabilityPolicy::BestEffort);
2330        assert_eq!(qos.durability, QosDurabilityPolicy::Volatile);
2331        assert_eq!(qos.history, QosHistoryPolicy::KeepLast);
2332        assert_eq!(qos.depth, 5);
2333    }
2334
2335    #[test]
2336    fn test_qos_profile_default() {
2337        let qos = QosSettings::QOS_PROFILE_DEFAULT;
2338        assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2339        assert_eq!(qos.durability, QosDurabilityPolicy::Volatile);
2340        assert_eq!(qos.depth, 10);
2341    }
2342
2343    #[test]
2344    fn test_qos_profile_services_default() {
2345        let qos = QosSettings::QOS_PROFILE_SERVICES_DEFAULT;
2346        assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2347        assert_eq!(qos.durability, QosDurabilityPolicy::Volatile);
2348    }
2349
2350    #[test]
2351    fn services_default_validates_and_rejects_missing_policy() {
2352        // Phase 193.5 — the service-create path (node `create_service*_sized` +
2353        // the typed-arena `register_service*_on`) runs `validate_against` on the
2354        // caller's profile, exactly like pub/sub. A backend advertising the
2355        // profile's required policies admits it; dropping any required bit (here
2356        // RELIABILITY) rejects it with `IncompatibleQos` — no silent downgrade.
2357        let qos = QosSettings::services_default();
2358        let required = qos.required_policies();
2359        assert!(qos.validate_against(required).is_ok());
2360        assert!(qos.validate_against(QosPolicyMask(u32::MAX)).is_ok());
2361        let missing = QosPolicyMask(required.0 & !QosPolicyMask::RELIABILITY.0);
2362        assert_eq!(
2363            qos.validate_against(missing),
2364            Err(TransportError::IncompatibleQos)
2365        );
2366    }
2367
2368    #[test]
2369    fn test_qos_profile_parameters() {
2370        let qos = QosSettings::QOS_PROFILE_PARAMETERS;
2371        assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2372        assert_eq!(qos.durability, QosDurabilityPolicy::TransientLocal);
2373        assert_eq!(qos.depth, 1000);
2374    }
2375
2376    #[test]
2377    fn test_qos_profile_clock() {
2378        let qos = QosSettings::QOS_PROFILE_CLOCK;
2379        assert_eq!(qos.reliability, QosReliabilityPolicy::BestEffort);
2380        assert_eq!(qos.depth, 1);
2381    }
2382
2383    #[test]
2384    fn test_qos_profile_parameter_events() {
2385        let qos = QosSettings::QOS_PROFILE_PARAMETER_EVENTS;
2386        assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2387        assert_eq!(qos.history, QosHistoryPolicy::KeepAll);
2388    }
2389
2390    #[test]
2391    fn test_qos_profile_action_status() {
2392        let qos = QosSettings::QOS_PROFILE_ACTION_STATUS_DEFAULT;
2393        assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2394        assert_eq!(qos.durability, QosDurabilityPolicy::TransientLocal);
2395        assert_eq!(qos.depth, 1);
2396    }
2397
2398    #[test]
2399    fn test_qos_static_constructors() {
2400        assert_eq!(
2401            QosSettings::topics_default(),
2402            QosSettings::QOS_PROFILE_DEFAULT
2403        );
2404        assert_eq!(
2405            QosSettings::sensor_data_default(),
2406            QosSettings::QOS_PROFILE_SENSOR_DATA
2407        );
2408        assert_eq!(
2409            QosSettings::services_default(),
2410            QosSettings::QOS_PROFILE_SERVICES_DEFAULT
2411        );
2412        assert_eq!(
2413            QosSettings::parameters_default(),
2414            QosSettings::QOS_PROFILE_PARAMETERS
2415        );
2416        assert_eq!(
2417            QosSettings::action_status_default(),
2418            QosSettings::QOS_PROFILE_ACTION_STATUS_DEFAULT
2419        );
2420    }
2421
2422    #[test]
2423    fn test_qos_builder_explicit_setters() {
2424        let qos = QosSettings::new()
2425            .reliability(QosReliabilityPolicy::Reliable)
2426            .durability(QosDurabilityPolicy::TransientLocal)
2427            .history(QosHistoryPolicy::KeepAll)
2428            .depth(100);
2429
2430        assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2431        assert_eq!(qos.durability, QosDurabilityPolicy::TransientLocal);
2432        assert_eq!(qos.history, QosHistoryPolicy::KeepAll);
2433        assert_eq!(qos.depth, 100);
2434    }
2435
2436    #[test]
2437    fn test_qos_builder_chaining() {
2438        // Test that builder methods can be chained in any order
2439        let qos = QosSettings::sensor_data_default()
2440            .reliable()
2441            .transient_local()
2442            .keep_last(20);
2443
2444        assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2445        assert_eq!(qos.durability, QosDurabilityPolicy::TransientLocal);
2446        assert_eq!(qos.history, QosHistoryPolicy::KeepLast);
2447        assert_eq!(qos.depth, 20);
2448    }
2449
2450    #[test]
2451    fn test_qos_eq_impl() {
2452        // Verify that PartialEq works correctly via derive on QosSettings
2453        let qos1 = QosSettings::QOS_PROFILE_DEFAULT;
2454        let qos2 = QosSettings::topics_default();
2455        // Both should have same values - verify field by field
2456        assert_eq!(qos1.reliability, qos2.reliability);
2457        assert_eq!(qos1.durability, qos2.durability);
2458        assert_eq!(qos1.history, qos2.history);
2459        assert_eq!(qos1.depth, qos2.depth);
2460    }
2461
2462    // --- Locator validation tests ---
2463
2464    #[test]
2465    fn test_locator_protocol_tcp() {
2466        assert_eq!(locator_protocol("tcp/127.0.0.1:7447"), LocatorProtocol::Tcp);
2467    }
2468
2469    #[test]
2470    fn test_locator_protocol_serial() {
2471        assert_eq!(
2472            locator_protocol("serial//dev/ttyUSB0#baudrate=115200"),
2473            LocatorProtocol::Serial
2474        );
2475    }
2476
2477    #[test]
2478    fn test_locator_protocol_unknown() {
2479        assert_eq!(locator_protocol(""), LocatorProtocol::Unknown);
2480        assert_eq!(locator_protocol("http://foo"), LocatorProtocol::Unknown);
2481        assert_eq!(locator_protocol("tls/host:port"), LocatorProtocol::Unknown);
2482    }
2483
2484    #[test]
2485    fn test_locator_protocol_udp() {
2486        assert_eq!(locator_protocol("udp/127.0.0.1:7447"), LocatorProtocol::Udp);
2487        assert_eq!(
2488            locator_protocol("udp/192.168.1.50:2019"),
2489            LocatorProtocol::Udp
2490        );
2491    }
2492
2493    #[test]
2494    fn test_validate_tcp_locator_ok() {
2495        assert!(validate_locator("tcp/127.0.0.1:7447").is_ok());
2496        assert!(validate_locator("tcp/192.168.1.1:7447").is_ok());
2497    }
2498
2499    #[test]
2500    fn test_validate_tcp_locator_missing_port() {
2501        assert!(validate_locator("tcp/127.0.0.1").is_err());
2502    }
2503
2504    #[test]
2505    fn test_validate_serial_locator_ok() {
2506        assert!(validate_locator("serial//dev/ttyUSB0#baudrate=115200").is_ok());
2507        assert!(validate_locator("serial//dev/ttyACM0#baudrate=9600").is_ok());
2508        assert!(validate_locator("serial/uart1#baudrate=921600").is_ok());
2509    }
2510
2511    #[test]
2512    fn test_validate_serial_locator_empty_device() {
2513        assert!(validate_locator("serial/").is_err());
2514    }
2515
2516    #[test]
2517    fn test_validate_serial_locator_missing_baudrate() {
2518        assert!(validate_locator("serial//dev/ttyUSB0").is_err());
2519    }
2520
2521    #[test]
2522    fn test_validate_serial_locator_invalid_baudrate() {
2523        assert!(validate_locator("serial//dev/ttyUSB0#baudrate=abc").is_err());
2524    }
2525
2526    #[test]
2527    fn test_validate_unknown_protocol() {
2528        assert!(validate_locator("http://foo").is_err());
2529        assert!(validate_locator("tls/host:port").is_err());
2530    }
2531
2532    #[test]
2533    fn test_validate_udp_locator_ok() {
2534        assert!(validate_locator("udp/127.0.0.1:7447").is_ok());
2535        assert!(validate_locator("udp/192.168.1.50:2019").is_ok());
2536    }
2537
2538    #[test]
2539    fn test_validate_udp_locator_missing_port() {
2540        assert!(validate_locator("udp/127.0.0.1").is_err());
2541    }
2542
2543    // Phase 233.2 — the PX4 companion QoS profile must be BEST_EFFORT +
2544    // TRANSIENT_LOCAL + KEEP_LAST so it matches PX4's uxrce_dds_client endpoints.
2545    #[test]
2546    fn px4_qos_profile_matches_uxrce_dds_client() {
2547        let q = QosSettings::px4();
2548        assert_eq!(q.reliability, QosReliabilityPolicy::BestEffort);
2549        // VOLATILE — PX4's /fmu/out writers are volatile; a TRANSIENT_LOCAL
2550        // reader silently fails to match (verified against real PX4 SITL).
2551        assert_eq!(q.durability, QosDurabilityPolicy::Volatile);
2552        assert_eq!(q.history, QosHistoryPolicy::KeepLast);
2553        assert_eq!(q, QosSettings::QOS_PROFILE_PX4);
2554        // Depth is tunable via the builder without losing the PX4 policies.
2555        let deep = QosSettings::px4().keep_last(5);
2556        assert_eq!(deep.depth, 5);
2557        assert_eq!(deep.reliability, QosReliabilityPolicy::BestEffort);
2558        assert_eq!(deep.durability, QosDurabilityPolicy::Volatile);
2559    }
2560
2561    // --- RmwConfig Tests ---
2562
2563    #[test]
2564    fn test_rmw_config_default() {
2565        let config = RmwConfig::default();
2566        assert_eq!(config.locator, "tcp/127.0.0.1:7447");
2567        assert_eq!(config.mode, SessionMode::Client);
2568        assert_eq!(config.domain_id, 0);
2569        assert_eq!(config.node_name, "node");
2570        assert_eq!(config.namespace, "");
2571    }
2572
2573    #[test]
2574    fn test_rmw_config_custom() {
2575        let config = RmwConfig {
2576            locator: "tcp/192.168.1.1:7447",
2577            mode: SessionMode::Peer,
2578            domain_id: 42,
2579            node_name: "talker",
2580            namespace: "/ns1",
2581            properties: &[("agent_port", "2019")],
2582        };
2583        assert_eq!(config.locator, "tcp/192.168.1.1:7447");
2584        assert_eq!(config.mode, SessionMode::Peer);
2585        assert_eq!(config.domain_id, 42);
2586        assert_eq!(config.node_name, "talker");
2587        assert_eq!(config.namespace, "/ns1");
2588        assert_eq!(config.properties.len(), 1);
2589        assert_eq!(config.properties[0].0, "agent_port");
2590    }
2591
2592    #[test]
2593    fn test_rmw_config_is_copy() {
2594        let config = RmwConfig::default();
2595        let config2 = config; // Copy
2596        assert_eq!(config.locator, config2.locator);
2597        assert_eq!(config.domain_id, config2.domain_id);
2598    }
2599
2600    #[test]
2601    fn test_rmw_config_clone() {
2602        let config = RmwConfig::default();
2603        let cloned = RmwConfig { ..config };
2604        assert_eq!(cloned.locator, config.locator);
2605        assert_eq!(cloned.node_name, config.node_name);
2606    }
2607}