nros_rmw/traits.rs
1//! Transport abstraction traits.
2//!
3//! Defines the backend-agnostic interface that transport implementations
4//! (zenoh-pico, XRCE-DDS) must satisfy. The core trait hierarchy is:
5//!
6//! - [`Session`] — connection lifecycle and handle creation
7//! - [`Publisher`] / [`Subscriber`] — pub/sub data transport
8//! - [`ServiceServerTrait`] / [`ServiceClientTrait`] — request/reply
9//! - [`Rmw`] — top-level factory that creates sessions
10
11use nros_core::{Deserialize, RosMessage, RosService, Serialize};
12
13/// Topic information for pub/sub
14#[derive(Debug, Clone)]
15pub struct TopicInfo<'a> {
16 /// Topic name (e.g., "/chatter")
17 pub name: &'a str,
18 /// ROS type name (e.g., "std_msgs::msg::dds_::String_")
19 pub type_name: &'a str,
20 /// Type hash for compatibility checking
21 pub type_hash: &'a str,
22 /// Domain ID (default: 0)
23 pub domain_id: u32,
24 /// Node name for liveliness token generation.
25 /// `None` means no node association — no liveliness token will be declared.
26 pub node_name: Option<&'a str>,
27 /// Node namespace for liveliness token generation (default: "/").
28 /// In ROS 2, "/" is the root namespace and the standard default.
29 pub namespace: &'a str,
30 /// Phase 231 (RFC-0038) — receive-buffer size hint, bytes. The executor sets
31 /// this from the subscription's `RX_BUF`; a backend may use it to route the
32 /// subscription to a size-class receive buffer (zenoh-pico: small vs large).
33 /// `0` = unset (backend picks its default). Ignored by backends that don't
34 /// size-class their receive storage.
35 pub rx_buffer_hint: usize,
36}
37
38impl<'a> TopicInfo<'a> {
39 /// Create new topic info
40 pub const fn new(name: &'a str, type_name: &'a str, type_hash: &'a str) -> Self {
41 Self {
42 name,
43 type_name,
44 type_hash,
45 domain_id: 0,
46 node_name: None,
47 namespace: "/",
48 rx_buffer_hint: 0,
49 }
50 }
51
52 /// Create topic info with custom domain ID
53 pub const fn with_domain(mut self, domain_id: u32) -> Self {
54 self.domain_id = domain_id;
55 self
56 }
57
58 /// Phase 231 (RFC-0038) — set the receive-buffer size hint (bytes) used by
59 /// size-classing backends (zenoh-pico) to route the subscription's receive
60 /// buffer to the small or large class.
61 pub const fn with_rx_buffer_hint(mut self, hint: usize) -> Self {
62 self.rx_buffer_hint = hint;
63 self
64 }
65
66 /// Set the node name for liveliness token generation
67 pub const fn with_node_name(mut self, node_name: &'a str) -> Self {
68 self.node_name = Some(node_name);
69 self
70 }
71
72 /// Set the node namespace for liveliness token generation
73 pub const fn with_namespace(mut self, namespace: &'a str) -> Self {
74 self.namespace = namespace;
75 self
76 }
77}
78
79/// Service information for service client/server
80#[derive(Debug, Clone)]
81pub struct ServiceInfo<'a> {
82 /// Service name (e.g., "/add_two_ints")
83 pub name: &'a str,
84 /// ROS service type name (e.g., "example_interfaces::srv::dds_::AddTwoInts_")
85 pub type_name: &'a str,
86 /// Type hash for compatibility checking
87 pub type_hash: &'a str,
88 /// Domain ID (default: 0)
89 pub domain_id: u32,
90 /// Node name for liveliness token generation.
91 /// `None` means no node association — no liveliness token will be declared.
92 pub node_name: Option<&'a str>,
93 /// Node namespace for liveliness token generation (default: "/").
94 /// In ROS 2, "/" is the root namespace and the standard default.
95 pub namespace: &'a str,
96}
97
98/// Action information for action client/server
99///
100/// Actions in ROS 2 use 5 communication channels:
101/// - `send_goal` service: `<action_name>/_action/send_goal`
102/// - `cancel_goal` service: `<action_name>/_action/cancel_goal`
103/// - `get_result` service: `<action_name>/_action/get_result`
104/// - `feedback` topic: `<action_name>/_action/feedback`
105/// - `status` topic: `<action_name>/_action/status`
106#[derive(Debug, Clone)]
107pub struct ActionInfo<'a> {
108 /// Action name (e.g., "/fibonacci")
109 pub name: &'a str,
110 /// ROS action type name (e.g., "example_interfaces::action::dds_::Fibonacci_")
111 pub type_name: &'a str,
112 /// Type hash for compatibility checking
113 pub type_hash: &'a str,
114 /// Domain ID (default: 0)
115 pub domain_id: u32,
116}
117
118impl<'a> ActionInfo<'a> {
119 /// Create new action info
120 pub const fn new(name: &'a str, type_name: &'a str, type_hash: &'a str) -> Self {
121 Self {
122 name,
123 type_name,
124 type_hash,
125 domain_id: 0,
126 }
127 }
128
129 /// Create action info with custom domain ID
130 pub const fn with_domain(mut self, domain_id: u32) -> Self {
131 self.domain_id = domain_id;
132 self
133 }
134
135 /// Generate the send_goal service name
136 /// Returns: `<action>/_action/send_goal`
137 pub fn send_goal_key<const N: usize>(&self) -> heapless::String<N> {
138 self.sub_name::<N>("send_goal")
139 }
140
141 /// Generate the cancel_goal service name
142 /// Returns: `<action>/_action/cancel_goal`
143 pub fn cancel_goal_key<const N: usize>(&self) -> heapless::String<N> {
144 self.sub_name::<N>("cancel_goal")
145 }
146
147 /// Generate the get_result service name
148 /// Returns: `<action>/_action/get_result`
149 pub fn get_result_key<const N: usize>(&self) -> heapless::String<N> {
150 self.sub_name::<N>("get_result")
151 }
152
153 /// Generate the feedback topic name
154 /// Returns: `<action>/_action/feedback`
155 pub fn feedback_key<const N: usize>(&self) -> heapless::String<N> {
156 self.sub_name::<N>("feedback")
157 }
158
159 /// Generate the status topic name
160 /// Returns: `<action>/_action/status`
161 pub fn status_key<const N: usize>(&self) -> heapless::String<N> {
162 self.sub_name::<N>("status")
163 }
164
165 /// Generate a sub-entity name for an action component
166 /// Returns: `<action>/_action/<suffix>` (e.g., `fibonacci/_action/send_goal`)
167 ///
168 /// The caller is responsible for constructing the full key expression
169 /// by wrapping this name in a `ServiceInfo` or `TopicInfo` with the
170 /// correct sub-service/sub-topic type name.
171 fn sub_name<const N: usize>(&self, suffix: &str) -> heapless::String<N> {
172 let mut name = heapless::String::new();
173 let action_stripped = self.name.trim_matches('/');
174 let _ = core::fmt::write(
175 &mut name,
176 format_args!("/{}/_action/{}", action_stripped, suffix),
177 );
178 name
179 }
180}
181
182impl<'a> ServiceInfo<'a> {
183 /// Create new service info
184 pub const fn new(name: &'a str, type_name: &'a str, type_hash: &'a str) -> Self {
185 Self {
186 name,
187 type_name,
188 type_hash,
189 domain_id: 0,
190 node_name: None,
191 namespace: "/",
192 }
193 }
194
195 /// Create service info with custom domain ID
196 pub const fn with_domain(mut self, domain_id: u32) -> Self {
197 self.domain_id = domain_id;
198 self
199 }
200
201 /// Set the node name for liveliness token generation
202 pub const fn with_node_name(mut self, node_name: &'a str) -> Self {
203 self.node_name = Some(node_name);
204 self
205 }
206
207 /// Set the node namespace for liveliness token generation
208 pub const fn with_namespace(mut self, namespace: &'a str) -> Self {
209 self.namespace = namespace;
210 self
211 }
212}
213
214/// Transport error types.
215///
216/// No longer `Copy` — the `Backend` / `BackendDynamic` variants carry a
217/// string diagnostic, which can't be `Copy`. Rust callers that used to
218/// copy a `TransportError` value repeatedly now need `.clone()` or
219/// `ref` in match arms. C/C++ callers are unaffected — both map
220/// `TransportError` to integer codes (`nros_ret_t` / `ErrorCode`)
221/// before crossing the FFI boundary.
222#[derive(Debug, Clone, PartialEq, Eq)]
223pub enum TransportError {
224 /// Failed to connect to transport
225 ConnectionFailed,
226 /// Connection was closed
227 Disconnected,
228 /// Failed to create publisher
229 PublisherCreationFailed,
230 /// Failed to create subscriber
231 SubscriberCreationFailed,
232 /// Failed to create service server
233 ServiceServerCreationFailed,
234 /// Failed to create service client
235 ServiceClientCreationFailed,
236 /// Failed to publish message
237 PublishFailed,
238 /// Failed to send service request
239 ServiceRequestFailed,
240 /// Failed to send service reply
241 ServiceReplyFailed,
242 /// Serialization error
243 SerializationError,
244 /// Deserialization error
245 DeserializationError,
246 /// Buffer too small
247 BufferTooSmall,
248 /// Incoming message exceeded the static buffer capacity
249 MessageTooLarge,
250 /// Timeout waiting for message
251 Timeout,
252 /// Invalid configuration
253 InvalidConfig,
254 /// Resource (slot, buffer, queue) momentarily unavailable. Retry.
255 /// Phase 99: returned by `try_loan` when arena slots are full and
256 /// by `try_borrow` when no message is ready (alternative to
257 /// `Ok(None)` for backends that prefer the error variant).
258 WouldBlock,
259 /// Requested allocation exceeds backend capacity. Phase 99:
260 /// `try_loan(len)` returns this when `len` > arena slot size.
261 TooLarge,
262 /// Failed to start background tasks
263 TaskStartFailed,
264 /// Failed to poll for incoming messages
265 PollFailed,
266 /// Failed to send keepalive
267 KeepaliveFailed,
268 /// Failed to send join message
269 JoinFailed,
270 /// Caller supplied a NULL pointer, an out-of-range value, or an
271 /// inconsistent argument combination. Phase 102.1.
272 InvalidArgument,
273 /// The backend does not implement this operation. Optional
274 /// callbacks return this; the runtime then falls back to a
275 /// default path. Phase 102.1.
276 Unsupported,
277 /// Memory allocation failed. Returned by backends on `std` /
278 /// `alloc`-equipped platforms when heap allocation fails.
279 /// Bare-metal backends generally do not produce this — they
280 /// preallocate at session-open time. Phase 102.1.
281 BadAlloc,
282 /// Publisher and subscriber QoS profiles do not match in a way
283 /// the backend cannot reconcile. Phase 102.1.
284 IncompatibleQos,
285 /// Topic, service, or action name failed validation. Phase 102.1.
286 TopicNameInvalid,
287 /// A request referenced a node that does not exist in this
288 /// session. Phase 102.1.
289 NodeNameNonExistent,
290 /// The backend does not support loaned messages on this entity,
291 /// or the loan slot is currently in use. Phase 102.1.
292 LoanNotSupported,
293 /// No data was available on a non-blocking receive. Distinct
294 /// from `Timeout`: fires immediately, not after a bounded wait.
295 /// Phase 102.1.
296 NoData,
297 /// Phase 115.A.2 — caller passed a versioned vtable struct
298 /// (e.g. `NrosTransportOps`) with an `abi_version` the runtime
299 /// doesn't know. Maps to `NROS_RMW_RET_INCOMPATIBLE_ABI` at
300 /// the C boundary.
301 IncompatibleAbi,
302 /// Backend-specific error with a `'static` diagnostic string.
303 ///
304 /// Useful for zenoh-pico / XRCE-DDS return codes that map to a
305 /// fixed set of known messages. `no_std`-compatible.
306 Backend(&'static str),
307 /// Backend-specific error with an owned diagnostic string.
308 ///
309 /// Available only with the `alloc` feature. Use this when the
310 /// diagnostic is formatted at runtime (e.g. from a C error code
311 /// plus a socket address).
312 #[cfg(feature = "alloc")]
313 BackendDynamic(alloc::string::String),
314}
315
316/// QoS history policy
317#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
318pub enum QosHistoryPolicy {
319 /// Keep last N messages (where N is defined in QosSettings)
320 #[default]
321 KeepLast,
322 /// Keep all messages (up to resource limits)
323 KeepAll,
324}
325
326/// QoS reliability policy
327#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
328pub enum QosReliabilityPolicy {
329 /// Reliable delivery (retransmit if needed).
330 ///
331 /// Default — matches ROS 2 `rmw_qos_profile_default` and the
332 /// `QosSettings::default()` / `QOS_PROFILE_DEFAULT` aggregates.
333 #[default]
334 Reliable,
335 /// Best-effort delivery (no retransmits)
336 BestEffort,
337}
338
339/// QoS durability policy
340#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
341pub enum QosDurabilityPolicy {
342 /// Messages are discarded when subscriber disconnects
343 #[default]
344 Volatile,
345 /// Messages are persisted for late-joining subscribers
346 TransientLocal,
347}
348
349/// QoS liveliness policy. Matches DDS `LIVELINESS` semantics.
350#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
351#[repr(u8)]
352pub enum QosLivelinessPolicy {
353 /// No liveliness assertion or tracking. Default for entities
354 /// that don't care about liveliness.
355 #[default]
356 None = 0,
357 /// Backend's keepalive task asserts liveliness automatically.
358 Automatic = 1,
359 /// Application calls `assert_liveliness()` per topic explicitly.
360 ManualByTopic = 2,
361 /// Application calls `assert_liveliness()` at the node level.
362 ManualByNode = 3,
363}
364
365/// Phase 211.H — which side of a topic a [`QosOverride`] targets.
366/// Mirrors the `<role>` segment of a ROS 2
367/// `qos_overrides.<topic>.<role>.<policy>` launch parameter.
368#[derive(Debug, Clone, Copy, PartialEq, Eq)]
369pub enum QosOverrideRole {
370 /// `qos_overrides.<topic>.publisher.*`
371 Publisher,
372 /// `qos_overrides.<topic>.subscription.*`
373 Subscription,
374}
375
376/// Phase 211.H — a single policy value a [`QosOverride`] sets. A typed enum
377/// (not a string) so the codegen that bakes these from the plan catches an
378/// unknown policy / mistyped value at generation time rather than silently
379/// no-op-ing at runtime.
380#[derive(Debug, Clone, Copy, PartialEq, Eq)]
381pub enum QosOverrideValue {
382 /// `.reliability` → Reliable / BestEffort.
383 Reliability(QosReliabilityPolicy),
384 /// `.durability` → Volatile / TransientLocal.
385 Durability(QosDurabilityPolicy),
386 /// `.history` → KeepLast / KeepAll.
387 History(QosHistoryPolicy),
388 /// `.depth` → KeepLast depth.
389 Depth(u32),
390}
391
392/// Phase 211.H — one per-topic QoS override, lowered from a ROS 2
393/// `qos_overrides.<topic>.<role>.<policy>` launch parameter by the planner and
394/// baked into a `&'static [QosOverride]` table by the entry codegen. The node
395/// folds the matching entries into the entity's [`QosSettings`] at
396/// `create_publisher` / `create_subscription` time (setup-time, single
397/// linear scan, no alloc), *before* the backend-compat `validate_against` —
398/// so an override the active RMW can't honour still errors loudly, never a
399/// silent downgrade.
400#[derive(Debug, Clone, Copy, PartialEq, Eq)]
401pub struct QosOverride {
402 /// The resolved (remapped) topic name the override targets, e.g.
403 /// `"/chatter"`. Matched exactly against the entity's topic.
404 pub topic: &'static str,
405 /// Publisher or subscription side.
406 pub role: QosOverrideRole,
407 /// The policy + value to set.
408 pub value: QosOverrideValue,
409}
410
411/// Full DDS-shaped QoS profile. Matches the field set of upstream
412/// `rmw_qos_profile_t`.
413///
414/// Backends advertise per-policy support via
415/// [`Session::supported_qos_policies`]; entities created with a
416/// profile the active backend can't honour return
417/// [`TransportError::IncompatibleQos`] synchronously at create time
418/// — no silent downgrade.
419///
420/// Zero-valued time-window fields ("off") mean infinite — the policy
421/// is effectively disabled for the entity.
422#[derive(Debug, Clone, Copy, PartialEq, Eq)]
423pub struct QosSettings {
424 /// History policy
425 pub history: QosHistoryPolicy,
426 /// Reliability policy
427 pub reliability: QosReliabilityPolicy,
428 /// Durability policy
429 pub durability: QosDurabilityPolicy,
430 /// Liveliness policy
431 pub liveliness_kind: QosLivelinessPolicy,
432 /// History depth (only used if history is KeepLast)
433 pub depth: u32,
434 /// Subscriber max-inter-arrival / publisher offered-rate, ms.
435 /// `0` = infinite (no deadline check).
436 pub deadline_ms: u32,
437 /// Sample expiry, ms. Subscribers filter samples older than this.
438 /// `0` = infinite (no expiry).
439 pub lifespan_ms: u32,
440 /// Liveliness lease, ms. `0` = infinite.
441 pub liveliness_lease_ms: u32,
442 /// If `true`, topic-name encoding skips the `/rt/` ROS prefix.
443 pub avoid_ros_namespace_conventions: bool,
444}
445
446impl Default for QosSettings {
447 fn default() -> Self {
448 Self::QOS_PROFILE_DEFAULT
449 }
450}
451
452impl QosSettings {
453 /// Phase 211.H — fold the plan's `qos_overrides` matching `topic` + `role`
454 /// into this profile, returning the overridden profile. Setup-time only
455 /// (called from `create_publisher`/`create_subscription`): a single linear
456 /// scan over the baked `&'static` table, no alloc, RT-safe. Later entries
457 /// win on a duplicate `(topic, role, policy)` (last-write), matching the
458 /// planner's sorted, de-conflicted emit. Non-matching entries are ignored,
459 /// so passing the whole node table to every entity is cheap + correct.
460 #[must_use]
461 pub fn apply_overrides(
462 mut self,
463 topic: &str,
464 role: QosOverrideRole,
465 overrides: &[QosOverride],
466 ) -> Self {
467 for ovr in overrides {
468 if ovr.topic == topic && ovr.role == role {
469 match ovr.value {
470 QosOverrideValue::Reliability(r) => self.reliability = r,
471 QosOverrideValue::Durability(d) => self.durability = d,
472 QosOverrideValue::History(h) => self.history = h,
473 QosOverrideValue::Depth(d) => self.depth = d,
474 }
475 }
476 }
477 self
478 }
479}
480
481impl QosSettings {
482 /// Internal const builder. Extended-policy fields default to
483 /// "off" (zero) and `liveliness_kind = Automatic` (the upstream
484 /// `rmw_qos_profile_default` choice).
485 const fn build(
486 reliability: QosReliabilityPolicy,
487 durability: QosDurabilityPolicy,
488 history: QosHistoryPolicy,
489 depth: u32,
490 ) -> Self {
491 Self {
492 history,
493 reliability,
494 durability,
495 liveliness_kind: QosLivelinessPolicy::Automatic,
496 depth,
497 deadline_ms: 0,
498 lifespan_ms: 0,
499 liveliness_lease_ms: 0,
500 avoid_ros_namespace_conventions: false,
501 }
502 }
503
504 /// Create new QoS settings with defaults (matches `QOS_PROFILE_DEFAULT`:
505 /// Reliable, Volatile, KeepLast(10)).
506 pub const fn new() -> Self {
507 Self::QOS_PROFILE_DEFAULT
508 }
509
510 /// Best-effort QoS (for real-time)
511 pub const BEST_EFFORT: Self = Self::build(
512 QosReliabilityPolicy::BestEffort,
513 QosDurabilityPolicy::Volatile,
514 QosHistoryPolicy::KeepLast,
515 1,
516 );
517
518 /// Reliable QoS
519 pub const RELIABLE: Self = Self::build(
520 QosReliabilityPolicy::Reliable,
521 QosDurabilityPolicy::Volatile,
522 QosHistoryPolicy::KeepLast,
523 10,
524 );
525
526 /// System default QoS profile (matches rmw_qos_profile_system_default)
527 pub const QOS_PROFILE_SYSTEM_DEFAULT: Self = Self::build(
528 QosReliabilityPolicy::Reliable,
529 QosDurabilityPolicy::Volatile,
530 QosHistoryPolicy::KeepLast,
531 1,
532 );
533
534 /// Default QoS profile (matches rmw_qos_profile_default)
535 pub const QOS_PROFILE_DEFAULT: Self = Self::build(
536 QosReliabilityPolicy::Reliable,
537 QosDurabilityPolicy::Volatile,
538 QosHistoryPolicy::KeepLast,
539 10,
540 );
541
542 /// Sensor data QoS profile (matches rmw_qos_profile_sensor_data)
543 pub const QOS_PROFILE_SENSOR_DATA: Self = Self::build(
544 QosReliabilityPolicy::BestEffort,
545 QosDurabilityPolicy::Volatile,
546 QosHistoryPolicy::KeepLast,
547 5,
548 );
549
550 /// Services default QoS profile (matches rmw_qos_profile_services_default)
551 pub const QOS_PROFILE_SERVICES_DEFAULT: Self = Self::build(
552 QosReliabilityPolicy::Reliable,
553 QosDurabilityPolicy::Volatile,
554 QosHistoryPolicy::KeepLast,
555 10,
556 );
557
558 /// Parameters QoS profile (matches rmw_qos_profile_parameters)
559 pub const QOS_PROFILE_PARAMETERS: Self = Self::build(
560 QosReliabilityPolicy::Reliable,
561 QosDurabilityPolicy::TransientLocal,
562 QosHistoryPolicy::KeepLast,
563 1000,
564 );
565
566 /// Clock QoS profile - same as sensor data but with depth 1
567 pub const QOS_PROFILE_CLOCK: Self = Self::build(
568 QosReliabilityPolicy::BestEffort,
569 QosDurabilityPolicy::Volatile,
570 QosHistoryPolicy::KeepLast,
571 1,
572 );
573
574 /// Parameter events QoS profile (matches rmw_qos_profile_parameter_events)
575 pub const QOS_PROFILE_PARAMETER_EVENTS: Self = Self::build(
576 QosReliabilityPolicy::Reliable,
577 QosDurabilityPolicy::Volatile,
578 QosHistoryPolicy::KeepAll,
579 0, // Not used with KeepAll
580 );
581
582 /// Action status default QoS profile (matches rcl_action_qos_profile_status_default)
583 pub const QOS_PROFILE_ACTION_STATUS_DEFAULT: Self = Self::build(
584 QosReliabilityPolicy::Reliable,
585 QosDurabilityPolicy::TransientLocal,
586 QosHistoryPolicy::KeepLast,
587 1,
588 );
589
590 /// PX4 companion QoS profile (Phase 233 / RFC-0039 Track B). Matches the
591 /// QoS PX4's `uxrce_dds_client` uses on `/fmu/out/*` and `/fmu/in/*` —
592 /// `BEST_EFFORT` + `VOLATILE` + `KEEP_LAST(1)`. A nano-ros node talking to
593 /// the same `MicroXRCEAgent` must use this (a reliable or
594 /// `TRANSIENT_LOCAL` reader will not match PX4's volatile best-effort
595 /// writers). Verified against real PX4 SITL (`nros-px4-sitl-test`):
596 /// `TRANSIENT_LOCAL` durability silently fails to match `/fmu/out/*`.
597 /// Adjust depth via `.keep_last(n)` for higher-rate streams.
598 pub const QOS_PROFILE_PX4: Self = Self::build(
599 QosReliabilityPolicy::BestEffort,
600 QosDurabilityPolicy::Volatile,
601 QosHistoryPolicy::KeepLast,
602 1,
603 );
604
605 // --- Static constructor methods (matching rclrs API) ---
606
607 /// Get the default QoS profile for ordinary topics
608 pub const fn topics_default() -> Self {
609 Self::QOS_PROFILE_DEFAULT
610 }
611
612 /// The PX4 companion QoS profile ([`QOS_PROFILE_PX4`](Self::QOS_PROFILE_PX4))
613 /// — use for `/fmu/out/*` subscriptions and `/fmu/in/*` publications against
614 /// a `MicroXRCEAgent`.
615 pub const fn px4() -> Self {
616 Self::QOS_PROFILE_PX4
617 }
618
619 /// Get the default QoS profile for sensor data topics
620 pub const fn sensor_data_default() -> Self {
621 Self::QOS_PROFILE_SENSOR_DATA
622 }
623
624 /// Get the default QoS profile for services
625 pub const fn services_default() -> Self {
626 Self::QOS_PROFILE_SERVICES_DEFAULT
627 }
628
629 /// Get the default QoS profile for parameter services
630 pub const fn parameters_default() -> Self {
631 Self::QOS_PROFILE_PARAMETERS
632 }
633
634 /// Get the default QoS profile for parameter events
635 pub const fn parameter_events_default() -> Self {
636 Self::QOS_PROFILE_PARAMETER_EVENTS
637 }
638
639 /// Get the system default QoS profile
640 pub const fn system_default() -> Self {
641 Self::QOS_PROFILE_SYSTEM_DEFAULT
642 }
643
644 /// Get the default QoS profile for action status topics
645 pub const fn action_status_default() -> Self {
646 Self::QOS_PROFILE_ACTION_STATUS_DEFAULT
647 }
648
649 /// Get the default QoS profile for clock topics
650 pub const fn clock_default() -> Self {
651 Self::QOS_PROFILE_CLOCK
652 }
653
654 // --- Builder methods ---
655
656 /// Set history to keep last N messages
657 pub const fn keep_last(mut self, depth: u32) -> Self {
658 self.history = QosHistoryPolicy::KeepLast;
659 self.depth = depth;
660 self
661 }
662
663 /// Set history to keep all messages
664 pub const fn keep_all(mut self) -> Self {
665 self.history = QosHistoryPolicy::KeepAll;
666 self
667 }
668
669 /// Set reliability to reliable
670 pub const fn reliable(mut self) -> Self {
671 self.reliability = QosReliabilityPolicy::Reliable;
672 self
673 }
674
675 /// Set reliability to best-effort
676 pub const fn best_effort(mut self) -> Self {
677 self.reliability = QosReliabilityPolicy::BestEffort;
678 self
679 }
680
681 /// Set durability to volatile
682 pub const fn volatile(mut self) -> Self {
683 self.durability = QosDurabilityPolicy::Volatile;
684 self
685 }
686
687 /// Set durability to transient local
688 pub const fn transient_local(mut self) -> Self {
689 self.durability = QosDurabilityPolicy::TransientLocal;
690 self
691 }
692
693 /// Set reliability policy explicitly
694 pub const fn reliability(mut self, policy: QosReliabilityPolicy) -> Self {
695 self.reliability = policy;
696 self
697 }
698
699 /// Set durability policy explicitly
700 pub const fn durability(mut self, policy: QosDurabilityPolicy) -> Self {
701 self.durability = policy;
702 self
703 }
704
705 /// Set history policy explicitly
706 pub const fn history(mut self, policy: QosHistoryPolicy) -> Self {
707 self.history = policy;
708 self
709 }
710
711 /// Set history depth explicitly
712 pub const fn depth(mut self, depth: u32) -> Self {
713 self.depth = depth;
714 self
715 }
716
717 /// Get history depth (for backwards compatibility)
718 pub const fn history_depth(&self) -> u8 {
719 if self.depth > 255 {
720 255
721 } else {
722 self.depth as u8
723 }
724 }
725}
726
727/// Transport session configuration
728#[derive(Debug, Clone)]
729pub struct TransportConfig<'a> {
730 /// Peer locator (e.g., "tcp/192.168.1.1:7447" or "serial//dev/ttyUSB0#baudrate=115200")
731 pub locator: Option<&'a str>,
732 /// Session mode: client, peer, or router
733 pub mode: SessionMode,
734 /// Additional transport properties (key-value pairs)
735 ///
736 /// These are passed through to the underlying transport backend.
737 /// For zenoh-pico, recognized keys include:
738 /// - `"multicast_scouting"` - Enable/disable multicast scouting (`"true"` or `"false"`)
739 /// - `"scouting_timeout_ms"` - Scouting timeout in milliseconds
740 /// - `"multicast_locator"` - Multicast group address
741 /// - `"listen"` - Listen endpoint (e.g., `"tcp/0.0.0.0:0"`)
742 /// - `"add_timestamp"` - Add timestamps to messages (`"true"` or `"false"`)
743 pub properties: &'a [(&'a str, &'a str)],
744}
745
746impl Default for TransportConfig<'_> {
747 fn default() -> Self {
748 Self {
749 locator: None,
750 mode: SessionMode::Client,
751 properties: &[],
752 }
753 }
754}
755
756/// Middleware-agnostic session configuration.
757///
758/// `RmwConfig` provides a uniform interface that any RMW backend can
759/// interpret. Backends map the universal fields to their own connection
760/// parameters and interpret `properties` for anything backend-specific.
761///
762/// # Examples
763///
764/// ```
765/// use nros_rmw::{RmwConfig, SessionMode};
766///
767/// let config = RmwConfig {
768/// locator: "tcp/192.168.1.1:7447",
769/// mode: SessionMode::Client,
770/// domain_id: 0,
771/// node_name: "talker",
772/// namespace: "",
773/// properties: &[],
774/// };
775/// ```
776#[derive(Debug, Clone, Copy)]
777pub struct RmwConfig<'a> {
778 /// Middleware-specific connection string.
779 ///
780 /// - zenoh: `"tcp/192.168.1.1:7447"` or `"udp/224.0.0.224:7447"`
781 /// - XRCE-DDS: `"udp/192.168.1.1:2019"`
782 pub locator: &'a str,
783 /// Session mode (zenoh: client/peer; XRCE-DDS: always client)
784 pub mode: SessionMode,
785 /// ROS 2 domain ID (maps to DDS domain or zenoh key prefix)
786 pub domain_id: u32,
787 /// Node name (e.g., `"talker"`)
788 pub node_name: &'a str,
789 /// Node namespace (e.g., `""` or `"/ns1"`)
790 pub namespace: &'a str,
791 /// Backend-specific key/value properties.
792 ///
793 /// Uniform escape hatch for backend-specific tuning that doesn't fit
794 /// the universal fields above. Each backend documents the keys it
795 /// understands; unknown keys are ignored. Passing `&[]` is always
796 /// valid.
797 ///
798 /// Examples:
799 /// - zenoh: `"tls.root_ca"`, `"scouting.multicast.enabled"`
800 /// - XRCE-DDS: `"agent_port"`, `"client_key"`
801 pub properties: &'a [(&'a str, &'a str)],
802}
803
804impl Default for RmwConfig<'_> {
805 fn default() -> Self {
806 Self {
807 locator: "tcp/127.0.0.1:7447",
808 mode: SessionMode::Client,
809 domain_id: 0,
810 node_name: "node",
811 namespace: "",
812 properties: &[],
813 }
814 }
815}
816
817/// Locator transport protocol
818#[derive(Debug, Clone, Copy, PartialEq, Eq)]
819pub enum LocatorProtocol {
820 /// TCP transport (e.g., "tcp/127.0.0.1:7447")
821 Tcp,
822 /// UDP transport (e.g., "udp/192.168.1.50:2019" — common for XRCE-DDS)
823 Udp,
824 /// Serial/UART transport (e.g., "serial//dev/ttyUSB0#baudrate=115200")
825 Serial,
826 /// Unknown protocol
827 Unknown,
828}
829
830/// Parse the protocol from a locator string
831pub fn locator_protocol(locator: &str) -> LocatorProtocol {
832 if locator.starts_with("tcp/") {
833 LocatorProtocol::Tcp
834 } else if locator.starts_with("udp/") {
835 LocatorProtocol::Udp
836 } else if locator.starts_with("serial/") {
837 LocatorProtocol::Serial
838 } else {
839 LocatorProtocol::Unknown
840 }
841}
842
843/// Validate a locator string format.
844///
845/// Returns `Ok(())` if the locator is well-formed, or an error message describing
846/// the problem. This provides early feedback before zenoh-pico or XRCE-DDS rejects
847/// a bad locator.
848///
849/// Supported formats:
850/// - TCP: `tcp/<host>:<port>` (e.g., `tcp/127.0.0.1:7447`)
851/// - UDP: `udp/<host>:<port>` (e.g., `udp/192.168.1.50:2019`)
852/// - Serial: `serial/<device>#baudrate=<rate>` (e.g., `serial//dev/ttyUSB0#baudrate=115200`)
853pub fn validate_locator(locator: &str) -> Result<(), &'static str> {
854 match locator_protocol(locator) {
855 LocatorProtocol::Tcp => {
856 let rest = &locator[4..]; // skip "tcp/"
857 if !rest.contains(':') {
858 return Err("TCP locator must contain host:port (e.g., tcp/127.0.0.1:7447)");
859 }
860 Ok(())
861 }
862 LocatorProtocol::Udp => {
863 let rest = &locator[4..]; // skip "udp/"
864 if !rest.contains(':') {
865 return Err("UDP locator must contain host:port (e.g., udp/192.168.1.50:2019)");
866 }
867 Ok(())
868 }
869 LocatorProtocol::Serial => {
870 let rest = &locator[7..]; // skip "serial/"
871 if rest.is_empty() {
872 return Err(
873 "serial locator must specify device (e.g., serial//dev/ttyUSB0#baudrate=115200)",
874 );
875 }
876 if !rest.contains("#baudrate=") {
877 return Err(
878 "serial locator must include #baudrate=RATE (e.g., serial//dev/ttyUSB0#baudrate=115200)",
879 );
880 }
881 // Validate baudrate is numeric
882 if let Some(baud_str) = rest.split("#baudrate=").nth(1) {
883 let baud_str = baud_str.split('#').next().unwrap_or(baud_str);
884 if baud_str.parse::<u32>().is_err() {
885 return Err("serial baudrate must be a number");
886 }
887 }
888 Ok(())
889 }
890 LocatorProtocol::Unknown => {
891 Err("unknown locator protocol (expected tcp/, udp/, or serial/)")
892 }
893 }
894}
895
896/// Session mode
897#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
898pub enum SessionMode {
899 /// Connect as client to a router
900 #[default]
901 Client,
902 /// Connect as peer for peer-to-peer communication
903 Peer,
904}
905
906/// Transport session trait — the per-process anchor an RMW backend
907/// gives to the executor.
908///
909/// # Threading
910///
911/// `&mut self` on every method means the executor serialises all
912/// session calls onto a single thread. A backend may rely on this
913/// — no internal locking is required for `create_*` / `close` /
914/// `drive_io`. **Publisher / subscriber / service handles created
915/// from the session, however, are typically used from worker
916/// threads** and must carry their own synchronisation (see the
917/// [`Publisher`] / [`Subscriber`] trait docs).
918///
919/// # Calling pattern
920///
921/// 1. Open the session (backend-specific factory; not on this trait).
922/// 2. `create_*` for every entity at startup. Creating entities mid-
923/// flight after `drive_io` has run is allowed but not common.
924/// 3. The executor calls `drive_io` periodically. Worker threads
925/// publish / receive in parallel.
926/// 4. `close` once at shutdown. Entities must be dropped first.
927pub trait Session {
928 /// Error type for this session
929 type Error;
930 /// Publisher handle type
931 type PublisherHandle;
932 /// Subscriber handle type
933 type SubscriberHandle;
934 /// Service server handle type
935 type ServiceServerHandle;
936 /// Service client handle type
937 type ServiceClientHandle;
938
939 /// Create a publisher bound to this session.
940 ///
941 /// May allocate transport resources (zenoh declarations, DDS
942 /// writers). Returns a handle that outlives the call but not the
943 /// session — drop the handle before `close()`.
944 fn create_publisher(
945 &mut self,
946 topic: &TopicInfo,
947 qos: QosSettings,
948 ) -> Result<Self::PublisherHandle, Self::Error>;
949
950 /// Create a subscriber bound to this session.
951 ///
952 /// Subscribers may start receiving immediately after creation if
953 /// the transport supports late-joining publishers. Late messages
954 /// are buffered up to the QoS depth.
955 fn create_subscriber(
956 &mut self,
957 topic: &TopicInfo,
958 qos: QosSettings,
959 ) -> Result<Self::SubscriberHandle, Self::Error>;
960
961 /// Create a service server bound to this session. Replies are
962 /// matched to requests by the sequence number returned from
963 /// [`ServiceServerTrait::try_recv_request`].
964 ///
965 /// `qos` is applied to both the request and reply endpoints (a
966 /// service is two DDS topics; rmw uses one profile for both). The
967 /// default is [`QosSettings::services_default`]
968 /// (RELIABLE+VOLATILE+KEEP_LAST(10)).
969 fn create_service_server(
970 &mut self,
971 service: &ServiceInfo,
972 qos: QosSettings,
973 ) -> Result<Self::ServiceServerHandle, Self::Error>;
974
975 /// Create a service client bound to this session.
976 ///
977 /// `qos` is applied to both the request and reply endpoints (a
978 /// service is two DDS topics; rmw uses one profile for both). The
979 /// default is [`QosSettings::services_default`]
980 /// (RELIABLE+VOLATILE+KEEP_LAST(10)).
981 fn create_service_client(
982 &mut self,
983 service: &ServiceInfo,
984 qos: QosSettings,
985 ) -> Result<Self::ServiceClientHandle, Self::Error>;
986
987 /// Close the session, releasing transport resources. All entity
988 /// handles created from this session must already be dropped.
989 fn close(&mut self) -> Result<(), Self::Error>;
990
991 /// Drive transport I/O (poll network, dispatch callbacks).
992 ///
993 /// Both zenoh-pico and XRCE-DDS are pull-based: they require the
994 /// application to periodically call this method to read from the
995 /// network socket and dispatch incoming messages to subscriber
996 /// buffers.
997 ///
998 /// `timeout_ms` is the maximum time to wait for data (0 = non-blocking;
999 /// negative values mean "block indefinitely" — see Phase 84.D7 for the
1000 /// planned migration to `core::time::Duration`).
1001 ///
1002 /// **Required**. There is no default body — both shipped backends
1003 /// (zenoh and XRCE) must drive I/O, and a silent no-op default was a
1004 /// trap for third-party implementers. If your backend genuinely
1005 /// receives data via OS callbacks (push-based) and has nothing to do
1006 /// here, return `Ok(())` explicitly.
1007 fn drive_io(&mut self, timeout_ms: i32) -> Result<(), Self::Error>;
1008
1009 /// Phase 109 — report which QoS policies the active backend
1010 /// honours. The runtime validates requested QoS against this mask
1011 /// at entity-create time and returns
1012 /// [`TransportError::IncompatibleQos`] if the requested profile
1013 /// includes a policy the backend can't enforce. **No silent
1014 /// downgrade.**
1015 ///
1016 /// Default returns [`QosPolicyMask::CORE`] — reliability +
1017 /// durability VOLATILE + history + depth. Backends override per
1018 /// supported policy.
1019 fn supported_qos_policies(&self) -> QosPolicyMask {
1020 QosPolicyMask::CORE
1021 }
1022
1023 /// Phase 110.0 — backend's next internal-event deadline in
1024 /// milliseconds from now (lease keepalive, heartbeat, reader
1025 /// ACK-NACK timeout, etc.).
1026 ///
1027 /// The executor caps its `drive_io` timeout against
1028 /// `min(user_timeout, timer_deadline, this)` so quiet links don't
1029 /// wake early, see no user-visible work, and round-trip back into
1030 /// `drive_io`. Returns `None` when the backend has no internal
1031 /// deadlines or chooses not to expose them.
1032 ///
1033 /// Default `None` keeps existing backends working unchanged; opt-in
1034 /// per backend.
1035 fn next_deadline_ms(&self) -> Option<u32> {
1036 None
1037 }
1038
1039 /// Phase 124.B.1 — install (or clear, when `cb.is_none()`) the
1040 /// executor wake callback. The runtime calls this once per
1041 /// session after `open` with `cb` pointing at a runtime-owned
1042 /// function and `ctx` pointing at the executor's wake state.
1043 /// The backend stores `(cb, ctx)` in its per-session state and
1044 /// calls `cb(ctx)` whenever its transport notification path
1045 /// fires (datagram arrival, condvar wake, etc.) — the runtime
1046 /// cb does flag-write + condvar-signal atomically, so a
1047 /// `spin_once` blocked on the wake condvar resumes immediately
1048 /// instead of waiting for the next poll iteration.
1049 ///
1050 /// # Safety
1051 ///
1052 /// When `cb` is `Some`, `ctx` must remain valid until the callback is
1053 /// cleared or the session is closed. The backend may invoke `cb(ctx)` from
1054 /// its transport notification path.
1055 ///
1056 /// Default body: ignore the call. Poll-only backends (XRCE,
1057 /// bare-metal) leave the default in place; the executor still
1058 /// drains them on its deadline-bound cv-wait boundary.
1059 unsafe fn set_wake_callback(
1060 &mut self,
1061 cb: Option<unsafe extern "C" fn(ctx: *mut core::ffi::c_void)>,
1062 ctx: *mut core::ffi::c_void,
1063 ) {
1064 let _ = (cb, ctx);
1065 }
1066
1067 /// Phase 130.4 — does this backend actually honour
1068 /// [`set_wake_callback`]?
1069 ///
1070 /// `true` means the backend installs the callback and will
1071 /// fire it from its async notify path (worker thread, ISR,
1072 /// signalfd, …). `false` (the default) means
1073 /// `set_wake_callback` was a no-op — the executor must drive
1074 /// I/O for the caller's full timeout because no async wake
1075 /// will pre-empt it.
1076 ///
1077 /// The executor uses this to choose between the wake-primitive
1078 /// wait (`NodeWake::wait_ms` / `std::Condvar::wait_timeout_while`)
1079 /// and a direct `drive_io(timeout_ms)`. Poll-only backends
1080 /// (XRCE-DDS-Client, bare-metal smoltcp) return `false`;
1081 /// event-driven backends (zenoh-pico with an RX task that
1082 /// invokes the callback on packet arrival) return `true`.
1083 ///
1084 /// [`set_wake_callback`]: Self::set_wake_callback
1085 fn supports_wake_callback(&self) -> bool {
1086 false
1087 }
1088
1089 /// Phase 124.F.1 — session-level connectivity probe.
1090 ///
1091 /// Sends a wire-level round-trip probe and waits up to
1092 /// `timeout_ms`. `Ok(())` on reply, `Err(TransportError::Timeout)`
1093 /// on no reply, `Err(TransportError::Unsupported)` when the
1094 /// backend can't probe (DDS without participant introspection).
1095 /// Lesson from micro-ROS's `rmw_uros_ping_agent`.
1096 ///
1097 /// Default body: `Err(Unsupported)`. Backends with a native
1098 /// ping API (zenoh: `z_send_ping`; XRCE:
1099 /// `uxr_ping_agent_session_until_timeout`) opt in by overriding.
1100 fn ping_session(&mut self, timeout_ms: i32) -> Result<(), Self::Error>
1101 where
1102 Self::Error: From<TransportError>,
1103 {
1104 let _ = timeout_ms;
1105 Err(TransportError::Unsupported.into())
1106 }
1107}
1108
1109/// Bitmask of QoS policies a backend can honour. See
1110/// [`Session::supported_qos_policies`].
1111///
1112/// `CORE` covers the policies every nano-ros backend implements:
1113/// reliability, durability=VOLATILE, history, depth. Backends opt
1114/// into additional policies by OR-ing the relevant flags.
1115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1116pub struct QosPolicyMask(pub u32);
1117
1118impl QosPolicyMask {
1119 pub const RELIABILITY: Self = Self(1 << 0);
1120 pub const DURABILITY_VOLATILE: Self = Self(1 << 1);
1121 pub const DURABILITY_TRANSIENT_LOCAL: Self = Self(1 << 2);
1122 pub const HISTORY: Self = Self(1 << 3);
1123 pub const DEPTH: Self = Self(1 << 4);
1124 pub const DEADLINE: Self = Self(1 << 5);
1125 pub const LIFESPAN: Self = Self(1 << 6);
1126 pub const LIVELINESS_AUTOMATIC: Self = Self(1 << 7);
1127 pub const LIVELINESS_MANUAL_BY_TOPIC: Self = Self(1 << 8);
1128 pub const LIVELINESS_MANUAL_BY_NODE: Self = Self(1 << 9);
1129 pub const LIVELINESS_LEASE: Self = Self(1 << 10);
1130 pub const AVOID_ROS_NAMESPACE_CONVENTIONS: Self = Self(1 << 11);
1131
1132 /// Policies every nano-ros backend implements.
1133 pub const CORE: Self =
1134 Self(Self::RELIABILITY.0 | Self::DURABILITY_VOLATILE.0 | Self::HISTORY.0 | Self::DEPTH.0);
1135
1136 /// `true` if `self` contains every policy in `other`.
1137 pub const fn contains(self, other: Self) -> bool {
1138 self.0 & other.0 == other.0
1139 }
1140
1141 /// Bitwise OR of two masks.
1142 pub const fn union(self, other: Self) -> Self {
1143 Self(self.0 | other.0)
1144 }
1145}
1146
1147impl core::ops::BitOr for QosPolicyMask {
1148 type Output = Self;
1149 fn bitor(self, rhs: Self) -> Self {
1150 self.union(rhs)
1151 }
1152}
1153
1154impl core::ops::BitOrAssign for QosPolicyMask {
1155 fn bitor_assign(&mut self, rhs: Self) {
1156 self.0 |= rhs.0;
1157 }
1158}
1159
1160impl QosSettings {
1161 /// Compute the set of QoS policies actually requested by this profile.
1162 ///
1163 /// Zero-valued time fields and `LivelinessKind::None` count as "not
1164 /// requesting" the corresponding policy — the cheap default. The
1165 /// `CORE` bits (reliability, durability=VOLATILE, history, depth)
1166 /// are always present because every nano-ros backend honours them.
1167 pub fn required_policies(&self) -> QosPolicyMask {
1168 let mut mask = QosPolicyMask::CORE;
1169 if self.durability == QosDurabilityPolicy::TransientLocal {
1170 mask = QosPolicyMask(
1171 (mask.0 & !QosPolicyMask::DURABILITY_VOLATILE.0)
1172 | QosPolicyMask::DURABILITY_TRANSIENT_LOCAL.0,
1173 );
1174 }
1175 if self.deadline_ms != 0 {
1176 mask |= QosPolicyMask::DEADLINE;
1177 }
1178 if self.lifespan_ms != 0 {
1179 mask |= QosPolicyMask::LIFESPAN;
1180 }
1181 match self.liveliness_kind {
1182 QosLivelinessPolicy::None => {}
1183 QosLivelinessPolicy::Automatic => mask |= QosPolicyMask::LIVELINESS_AUTOMATIC,
1184 QosLivelinessPolicy::ManualByTopic => mask |= QosPolicyMask::LIVELINESS_MANUAL_BY_TOPIC,
1185 QosLivelinessPolicy::ManualByNode => mask |= QosPolicyMask::LIVELINESS_MANUAL_BY_NODE,
1186 }
1187 if self.liveliness_lease_ms != 0 {
1188 mask |= QosPolicyMask::LIVELINESS_LEASE;
1189 }
1190 if self.avoid_ros_namespace_conventions {
1191 mask |= QosPolicyMask::AVOID_ROS_NAMESPACE_CONVENTIONS;
1192 }
1193 mask
1194 }
1195
1196 /// Returns `Err(TransportError::IncompatibleQos)` if any policy this
1197 /// profile requires is missing from the backend's `supported` mask.
1198 /// Used at entity-create time to enforce the **no silent
1199 /// degradation** contract.
1200 pub fn validate_against(&self, supported: QosPolicyMask) -> Result<(), TransportError> {
1201 if supported.contains(self.required_policies()) {
1202 Ok(())
1203 } else {
1204 Err(TransportError::IncompatibleQos)
1205 }
1206 }
1207}
1208
1209/// Publisher trait for sending messages.
1210///
1211/// # Threading
1212///
1213/// `&self` on `publish_raw` — implementors must allow concurrent
1214/// publishes from multiple threads. Internal locking (or lock-free
1215/// queues) is the backend's responsibility.
1216///
1217/// # Buffer ownership
1218///
1219/// `data` in `publish_raw` is borrowed for the duration of the call.
1220/// The backend must either send it inline or copy into its own
1221/// buffer before returning — the slice is invalid after the call.
1222///
1223/// # Blocking
1224///
1225/// `publish_raw` is expected to be non-blocking on best-effort QoS
1226/// and bounded-blocking on reliable QoS (waiting for outbound queue
1227/// space). Backends should *not* block waiting for ack from a
1228/// matched subscriber.
1229pub trait Publisher {
1230 /// Error type for publish operations
1231 type Error;
1232
1233 /// Publish a CDR-serialised message.
1234 ///
1235 /// Returns once the message has been handed to the transport
1236 /// (queued or fired-and-forgotten depending on QoS). Does **not**
1237 /// wait for delivery.
1238 fn publish_raw(&self, data: &[u8]) -> Result<(), Self::Error>;
1239
1240 /// Phase 128.F.4 — publish with an opaque attachment block.
1241 ///
1242 /// `attachment` rides alongside the payload at the wire layer.
1243 /// Receivers can read it back via
1244 /// [`Subscriber::try_recv_raw_with_attachment`].
1245 ///
1246 /// Primary use case: cross-RMW bridges stamp a `bridge_origin`
1247 /// tag (the source backend's RMW name) so a paired return
1248 /// bridge can deterministically drop echoed frames.
1249 ///
1250 /// Default body delegates to [`publish_raw`](Self::publish_raw)
1251 /// and discards the attachment — backends that do not natively
1252 /// carry attachments (XRCE today, DDS without a user-data hook)
1253 /// see no change. Backends with native attachment support
1254 /// (zenoh-pico's `z_publisher_put_options::attachment`,
1255 /// Cyclone DDS user-data) override to write the bytes onto the
1256 /// wire.
1257 fn publish_raw_with_attachment(
1258 &self,
1259 data: &[u8],
1260 _attachment: &[u8],
1261 ) -> Result<(), Self::Error> {
1262 self.publish_raw(data)
1263 }
1264
1265 /// Phase 124.E.1 — streamed publish.
1266 ///
1267 /// `size_cb` reports the total payload length once; `chunk_cb`
1268 /// fills the slot in chunks. Saves the per-publisher staging
1269 /// buffer when the message is large enough to dominate the
1270 /// device's `.bss`.
1271 ///
1272 /// Default body — the **staging-buffer fallback** (124.E.2).
1273 /// Asks `size_cb` for the total length, fills a stack-allocated
1274 /// `[u8; NROS_MAX_STREAM_CHUNK]` via `chunk_cb`, then forwards
1275 /// to `publish_raw`. Returns `Err(BufferTooSmall)` if the total
1276 /// exceeds the stack cap so the caller can drop back to a
1277 /// regular `publish_raw` with a heap-sized buffer.
1278 ///
1279 /// Concrete backends opt in by overriding to stream straight
1280 /// into the network buffer (zenoh: write into the zenoh-pico
1281 /// outbound buffer; XRCE: micro-CDR streaming APIs).
1282 ///
1283 /// # Safety
1284 ///
1285 /// The caller must ensure `user_ctx` is valid for every invocation of
1286 /// `size_cb` and `chunk_cb` during this call, and that both callbacks obey
1287 /// their out-pointer contracts.
1288 ///
1289 /// `size_cb` and `chunk_cb` may be called from the same thread
1290 /// that called `publish_streamed`. Backends MUST NOT defer the
1291 /// calls past the function return; the caller's `user_ctx`
1292 /// pointer is only guaranteed valid for the duration of the
1293 /// call.
1294 unsafe fn publish_streamed(
1295 &self,
1296 size_cb: unsafe extern "C" fn(out_total_len: *mut usize, user_ctx: *mut core::ffi::c_void),
1297 chunk_cb: unsafe extern "C" fn(
1298 out_buf: *mut u8,
1299 cap: usize,
1300 out_written: *mut usize,
1301 user_ctx: *mut core::ffi::c_void,
1302 ),
1303 user_ctx: *mut core::ffi::c_void,
1304 ) -> Result<(), Self::Error>
1305 where
1306 Self::Error: From<TransportError>,
1307 {
1308 /// Default staging-buffer cap. Stack-allocated, so embedded
1309 /// callers don't pay for it unless they actually invoke this
1310 /// fallback. 4 KiB matches typical RTPS frag-size +
1311 /// micro-XRCE message ceilings.
1312 const STAGE_CAP: usize = 4096;
1313
1314 let mut total = 0usize;
1315 // SAFETY: caller's contract on `size_cb` matches our trait
1316 // doc — fire once with a writable `*mut usize` slot.
1317 unsafe { size_cb(&mut total as *mut usize, user_ctx) };
1318 if total > STAGE_CAP {
1319 return Err(TransportError::BufferTooSmall.into());
1320 }
1321 let mut stage = [0u8; STAGE_CAP];
1322 let mut written_so_far = 0usize;
1323 while written_so_far < total {
1324 let mut chunk_written = 0usize;
1325 let remaining = total - written_so_far;
1326 // SAFETY: `chunk_cb` writes ≤ `cap` bytes to
1327 // `out_buf` and reports the count via `out_written`.
1328 unsafe {
1329 chunk_cb(
1330 stage.as_mut_ptr().add(written_so_far),
1331 remaining,
1332 &mut chunk_written as *mut usize,
1333 user_ctx,
1334 );
1335 }
1336 if chunk_written == 0 {
1337 // Caller signalled EOF early — treat the partial
1338 // write as a malformed sequence; reporting it as
1339 // BufferTooSmall keeps the surface tight without
1340 // adding a new variant.
1341 return Err(TransportError::BufferTooSmall.into());
1342 }
1343 written_so_far += chunk_written;
1344 }
1345 self.publish_raw(&stage[..total])
1346 }
1347
1348 /// Publish a typed message (serializes automatically)
1349 fn publish<M: RosMessage>(&self, msg: &M, buf: &mut [u8]) -> Result<(), Self::Error> {
1350 use nros_core::CdrWriter;
1351
1352 let mut writer = CdrWriter::new_with_header(buf).map_err(|_| self.buffer_error())?;
1353 msg.serialize(&mut writer)
1354 .map_err(|_| self.serialization_error())?;
1355 let len = writer.position();
1356 self.publish_raw(&buf[..len])
1357 }
1358
1359 /// Return a buffer-too-small error (implementation specific)
1360 fn buffer_error(&self) -> Self::Error;
1361
1362 /// Return a serialization error (implementation specific)
1363 fn serialization_error(&self) -> Self::Error;
1364
1365 /// Phase 108 — `true` if the backend can generate this event for
1366 /// this publisher. Default returns `false`; backends override per
1367 /// supported event kind.
1368 ///
1369 /// Only [`EventKind::LivelinessLost`](crate::event::EventKind::LivelinessLost) and
1370 /// [`EventKind::OfferedDeadlineMissed`](crate::event::EventKind::OfferedDeadlineMissed) are publisher-side events;
1371 /// other kinds always return `false` here.
1372 fn supports_event(&self, _kind: crate::event::EventKind) -> bool {
1373 false
1374 }
1375
1376 /// Phase 108 — register a callback fired when the named status
1377 /// event occurs. `deadline_ms` applies to
1378 /// [`EventKind::OfferedDeadlineMissed`](crate::event::EventKind::OfferedDeadlineMissed) only; ignored otherwise.
1379 /// Default impl returns the backend's "unsupported"-shaped error.
1380 ///
1381 /// # Safety
1382 ///
1383 /// `cb` and `user_ctx` must remain valid for the entity's
1384 /// lifetime. Caller (typically `nros-node`'s typed wrapper) is
1385 /// responsible for keeping the closure / context arena alive.
1386 unsafe fn register_event_callback(
1387 &mut self,
1388 _kind: crate::event::EventKind,
1389 _deadline_ms: u32,
1390 _cb: crate::event::EventCallback,
1391 _user_ctx: *mut core::ffi::c_void,
1392 ) -> Result<(), Self::Error> {
1393 Err(self.unsupported_event_error())
1394 }
1395
1396 /// Phase 108 — backend's error variant for "this event kind is
1397 /// not supported." Default impl reuses `serialization_error()`
1398 /// since most backends share an `Unsupported` variant; backends
1399 /// override if they have a distinct `Unsupported` mapping.
1400 fn unsupported_event_error(&self) -> Self::Error {
1401 self.serialization_error()
1402 }
1403
1404 /// Phase 109 — assert this publisher's liveliness manually.
1405 /// Required for publishers configured with
1406 /// `QosLivelinessPolicy::ManualByTopic`. No-op for other
1407 /// liveliness kinds. Default impl returns `Ok(())` (no-op);
1408 /// backends override when they implement manual liveliness.
1409 fn assert_liveliness(&self) -> Result<(), Self::Error> {
1410 Ok(())
1411 }
1412}
1413
1414/// Subscriber trait for receiving messages.
1415///
1416/// # Threading
1417///
1418/// `&mut self` on `try_recv_raw` — the executor takes exclusive
1419/// ownership of the subscriber for the duration of a receive. A
1420/// backend that wants to allow concurrent receives must split into
1421/// per-thread sub-handles internally.
1422///
1423/// # Buffer ownership
1424///
1425/// `buf` is caller-owned. The implementation copies the next ready
1426/// message into `buf` and returns the byte count. The caller may
1427/// re-use or drop `buf` immediately after the call.
1428///
1429/// # Blocking
1430///
1431/// `try_recv_raw` is **non-blocking**: returns `Ok(None)` (or
1432/// equivalent for backends that map empty into a zero-length read)
1433/// when no message is ready. Use [`Session::drive_io`] to wait for
1434/// data; never sleep inside `try_recv_raw`.
1435pub trait Subscriber {
1436 /// Error type for receive operations
1437 type Error;
1438
1439 /// Check if data is available without consuming it.
1440 ///
1441 /// Non-destructive — does not advance the receive cursor.
1442 /// Conservative default returns `true` (always assume data may
1443 /// be available); backends should override with a real check
1444 /// to avoid spurious receive attempts.
1445 fn has_data(&self) -> bool {
1446 true
1447 }
1448
1449 /// Try to receive one message into `buf`.
1450 ///
1451 /// Non-blocking. On success returns `Ok(Some(len))` where `len`
1452 /// is the byte count written into `buf[..len]`. Returns
1453 /// `Ok(None)` if no message is ready. If `buf` is too small the
1454 /// backend may either truncate (and document it) or return an
1455 /// error (preferred).
1456 fn try_recv_raw(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Self::Error>;
1457
1458 /// Phase 128.F.4 — receive with attachment bytes alongside the
1459 /// payload.
1460 ///
1461 /// On success returns `Ok(Some((payload_len, attachment_len)))`
1462 /// with the payload written into `buf[..payload_len]` and the
1463 /// attachment (if any) written into
1464 /// `att_buf[..attachment_len]`. `attachment_len == 0` means the
1465 /// incoming sample carried no attachment.
1466 ///
1467 /// Default body falls back to [`try_recv_raw`](Self::try_recv_raw)
1468 /// and reports a 0-length attachment. Backends with native
1469 /// attachment support override to populate `att_buf`. Cross-RMW
1470 /// bridges use the attachment to read the `bridge_origin` tag
1471 /// stamped by the sending side.
1472 fn try_recv_raw_with_attachment(
1473 &mut self,
1474 buf: &mut [u8],
1475 _att_buf: &mut [u8],
1476 ) -> Result<Option<(usize, usize)>, Self::Error> {
1477 match self.try_recv_raw(buf)? {
1478 Some(len) => Ok(Some((len, 0))),
1479 None => Ok(None),
1480 }
1481 }
1482
1483 /// Phase 124.D.1 — burst-take.
1484 ///
1485 /// Drain up to `max_msgs` queued samples into the contiguous
1486 /// `buf` block in one call, with the i-th sample at
1487 /// `buf[i * per_msg_cap .. i * per_msg_cap + out_lens[i]]`.
1488 /// Returns the number of messages actually delivered. Partial
1489 /// drains MUST report the count, not error out.
1490 ///
1491 /// Default body loop-drives `try_recv_raw` so callers can
1492 /// commit to the batched API regardless of backend support.
1493 /// Concrete backends opt in by overriding with a native batch
1494 /// take (zenoh queue drain, `dds_take(max_samples)`).
1495 fn try_recv_sequence(
1496 &mut self,
1497 buf: &mut [u8],
1498 per_msg_cap: usize,
1499 max_msgs: usize,
1500 out_lens: &mut [usize],
1501 ) -> Result<usize, Self::Error> {
1502 if per_msg_cap == 0 || max_msgs == 0 {
1503 return Ok(0);
1504 }
1505 let limit = max_msgs.min(out_lens.len());
1506 let mut count = 0;
1507 for i in 0..limit {
1508 let slot = &mut buf[i * per_msg_cap..(i + 1) * per_msg_cap];
1509 match self.try_recv_raw(slot)? {
1510 Some(len) => {
1511 out_lens[i] = len;
1512 count += 1;
1513 }
1514 None => break,
1515 }
1516 }
1517 Ok(count)
1518 }
1519
1520 /// Try to receive a typed message (non-blocking)
1521 fn try_recv<M: RosMessage>(&mut self, buf: &mut [u8]) -> Result<Option<M>, Self::Error> {
1522 use nros_core::CdrReader;
1523
1524 match self.try_recv_raw(buf)? {
1525 Some(len) => {
1526 let mut reader = CdrReader::new_with_header(&buf[..len])
1527 .map_err(|_| self.deserialization_error())?;
1528 let msg = M::deserialize(&mut reader).map_err(|_| self.deserialization_error())?;
1529 Ok(Some(msg))
1530 }
1531 None => Ok(None),
1532 }
1533 }
1534
1535 /// Process the received message in-place without copying.
1536 ///
1537 /// Calls `f` with a reference to the raw CDR bytes in the subscriber's
1538 /// internal receive buffer, avoiding a copy into a caller-provided buffer.
1539 /// While `f` executes the buffer is exclusively borrowed — any messages
1540 /// arriving from the transport during that time are dropped to prevent
1541 /// data races.
1542 ///
1543 /// Returns `Ok(true)` if a message was available and `f` was called,
1544 /// `Ok(false)` if no message was available.
1545 ///
1546 /// **Default body**: returns `Err(MessageTooLarge)` — the old default
1547 /// silently truncated anything larger than 1 KB into a stack buffer,
1548 /// which broke large messages with no diagnostic. Backends must
1549 /// override this with a real zero-copy path if they advertise support
1550 /// for `process_raw_in_place`; callers that hit the default should
1551 /// use `try_recv_raw` with a caller-sized buffer instead.
1552 fn process_raw_in_place(&mut self, f: impl FnOnce(&[u8])) -> Result<bool, Self::Error>
1553 where
1554 Self::Error: From<TransportError>,
1555 {
1556 let _ = f;
1557 Err(TransportError::MessageTooLarge.into())
1558 }
1559
1560 /// Whether this backend implements the in-place dispatch methods
1561 /// ([`process_raw_in_place`](Subscriber::process_raw_in_place) /
1562 /// [`process_raw_in_place_with_info`](Subscriber::process_raw_in_place_with_info))
1563 /// with a real zero-copy borrow.
1564 ///
1565 /// The executor consults this at subscription registration to choose the
1566 /// **in-place** arena dispatch (borrow + deserialize from the backend slot, no
1567 /// arena buffer) over the **buffered** dispatch (copy into an arena buffer
1568 /// first). Backends that leave the in-place methods at their unsupported
1569 /// default return `false` (the default) and keep the buffered path. (RFC-0038,
1570 /// Phase 231 Wave 0.2.)
1571 fn supports_process_in_place(&self) -> bool {
1572 false
1573 }
1574
1575 /// In-place processing variant that also surfaces publisher metadata.
1576 ///
1577 /// Same borrow contract as
1578 /// [`process_raw_in_place`](Subscriber::process_raw_in_place): `f` receives
1579 /// the raw CDR bytes plus the parsed [`MessageInfo`](nros_core::MessageInfo)
1580 /// — the co-located attachment (publisher GID / sequence / source timestamp),
1581 /// or `None` when no attachment was present — for the duration of the call;
1582 /// the slot is released after `f` returns. `Ok(true)` = a message was
1583 /// available and `f` was called; `Ok(false)` = none ready.
1584 ///
1585 /// **Default body**: returns the unsupported error (mirrors
1586 /// `process_raw_in_place`). Backends that advertise in-place support override
1587 /// this with a real zero-copy path; callers that hit the default should use
1588 /// the buffered [`try_recv_raw_with_info`](Subscriber::try_recv_raw_with_info)
1589 /// path instead. (RFC-0038, Phase 231 Wave 0.1.)
1590 fn process_raw_in_place_with_info(
1591 &mut self,
1592 f: impl FnOnce(&[u8], Option<nros_core::MessageInfo>),
1593 ) -> Result<bool, Self::Error>
1594 where
1595 Self::Error: From<TransportError>,
1596 {
1597 let _ = f;
1598 Err(TransportError::MessageTooLarge.into())
1599 }
1600
1601 /// Try to receive raw data along with publisher metadata.
1602 ///
1603 /// When available, [`MessageInfo`](nros_core::MessageInfo) contains
1604 /// the publisher's GID (Global Identifier) and source timestamp,
1605 /// extracted from a transport-level attachment on the incoming message.
1606 ///
1607 /// Returns `Ok(Some((len, info)))` if data is available, where:
1608 /// - `len` is the number of bytes written to the buffer
1609 /// - `info` is the parsed publisher metadata (if attachment was present)
1610 ///
1611 /// Default: delegates to [`try_recv_raw`](Subscriber::try_recv_raw) with no info.
1612 fn try_recv_raw_with_info(
1613 &mut self,
1614 buf: &mut [u8],
1615 ) -> Result<Option<(usize, Option<nros_core::MessageInfo>)>, Self::Error> {
1616 self.try_recv_raw(buf).map(|opt| opt.map(|len| (len, None)))
1617 }
1618
1619 /// Try to receive raw data with E2E safety validation (CRC + sequence tracking).
1620 ///
1621 /// Returns `Ok(Some((len, status)))` if data is available, where:
1622 /// - `len` is the number of bytes written to the buffer
1623 /// - `status` is the integrity validation result
1624 ///
1625 /// Default: delegates to `try_recv_raw` with no CRC info.
1626 #[cfg(feature = "safety-e2e")]
1627 fn try_recv_validated(
1628 &mut self,
1629 buf: &mut [u8],
1630 ) -> Result<Option<(usize, crate::IntegrityStatus)>, Self::Error> {
1631 self.try_recv_raw(buf).map(|opt| {
1632 opt.map(|len| {
1633 (
1634 len,
1635 crate::IntegrityStatus {
1636 gap: 0,
1637 duplicate: false,
1638 crc_valid: None,
1639 },
1640 )
1641 })
1642 })
1643 }
1644
1645 /// Register an async waker to be notified when data arrives.
1646 ///
1647 /// Called from `Future::poll()` implementations to store the waker.
1648 /// The transport backend calls `waker.wake()` from its receive callback
1649 /// when new data is available, enabling event-driven async without
1650 /// busy-polling.
1651 ///
1652 /// Default: no-op (backends that don't support waking simply ignore this).
1653 fn register_waker(&self, _waker: &core::task::Waker) {}
1654
1655 /// Return a deserialization error (implementation specific)
1656 fn deserialization_error(&self) -> Self::Error;
1657
1658 /// Phase 108 — `true` if the backend can generate this event for
1659 /// this subscriber. Default returns `false`; backends override per
1660 /// supported event kind.
1661 ///
1662 /// Subscriber-side event kinds:
1663 /// [`EventKind::LivelinessChanged`](crate::event::EventKind::LivelinessChanged),
1664 /// [`EventKind::RequestedDeadlineMissed`](crate::event::EventKind::RequestedDeadlineMissed),
1665 /// [`EventKind::MessageLost`](crate::event::EventKind::MessageLost).
1666 /// Publisher kinds always return `false` here.
1667 fn supports_event(&self, _kind: crate::event::EventKind) -> bool {
1668 false
1669 }
1670
1671 /// Phase 108 — register a callback fired when the named status
1672 /// event occurs. `deadline_ms` applies to
1673 /// [`EventKind::RequestedDeadlineMissed`](crate::event::EventKind::RequestedDeadlineMissed) only; ignored otherwise.
1674 /// Default impl returns the backend's "unsupported"-shaped error.
1675 ///
1676 /// # Safety
1677 ///
1678 /// `cb` and `user_ctx` must remain valid for the entity's
1679 /// lifetime. Caller (typically `nros-node`'s typed wrapper) is
1680 /// responsible for keeping the closure / context arena alive.
1681 unsafe fn register_event_callback(
1682 &mut self,
1683 _kind: crate::event::EventKind,
1684 _deadline_ms: u32,
1685 _cb: crate::event::EventCallback,
1686 _user_ctx: *mut core::ffi::c_void,
1687 ) -> Result<(), Self::Error> {
1688 Err(self.unsupported_event_error())
1689 }
1690
1691 /// Phase 108 — backend's error variant for "this event kind is
1692 /// not supported." Default reuses `deserialization_error()` for
1693 /// backends that don't have a distinct `Unsupported` mapping.
1694 fn unsupported_event_error(&self) -> Self::Error {
1695 self.deserialization_error()
1696 }
1697}
1698
1699/// Service request from a client
1700pub struct ServiceRequest<'a> {
1701 /// Raw request data (CDR encoded)
1702 pub data: &'a [u8],
1703 /// Sequence number for request/response matching
1704 pub sequence_number: i64,
1705}
1706
1707// ============================================================================
1708// Phase 99 — zero-copy raw API: SlotLending / SlotBorrowing
1709// ============================================================================
1710//
1711// Backends that can lend a slot directly into their outbound buffer
1712// (zenoh-pico w/ unstable-zenoh-api, XRCE-DDS via uxr_prepare_output_stream,
1713// full DDS w/ SHM transport) implement these traits. Backends that cannot
1714// (uORB, default zenoh-pico) do NOT impl them — `EmbeddedRawPublisher` then
1715// falls back to its per-publisher arena and memcpys at commit time. Both
1716// paths land at `Publisher::publish_raw` for the actual wire write; only
1717// the user-side copy is eliminated when lending is available.
1718//
1719// Selection is **compile-time** via the `lending` Cargo feature. Each
1720// backend crate forwards its own `lending` feature to `nros-rmw/lending`
1721// when it can satisfy the trait. nros-node's `rmw-lending` aggregates.
1722// User opting `nros/rmw-lending` w/ a non-lending backend (e.g. uORB)
1723// gets a clear compile error from the unsatisfied trait bound on the
1724// concrete `RmwPublisher`.
1725
1726/// Backend can lend a writable slot into its outbound buffer.
1727///
1728/// The returned slot's lifetime is tied to `&self`; user fills it in
1729/// place, then calls [`commit_slot`](Self::commit_slot) to publish.
1730/// Dropping the slot without commit is a no-op (slot returned to free
1731/// pool); concurrent loan attempts that would exceed backend capacity
1732/// return [`TransportError::WouldBlock`] — never block.
1733#[cfg(feature = "lending")]
1734pub trait SlotLending: Publisher {
1735 /// Backend-owned writable slot. Holds a `&'a mut [u8]` and any
1736 /// state needed for commit_slot.
1737 type Slot<'a>: AsMut<[u8]> + 'a
1738 where
1739 Self: 'a;
1740
1741 /// Reserve a writable slot of `len` bytes from the backend's
1742 /// outbound buffer. Returns `Ok(None)` if the backend has no slot
1743 /// available (full); never blocks.
1744 fn try_lend_slot(&self, len: usize) -> Result<Option<Self::Slot<'_>>, Self::Error>;
1745
1746 /// Commit a previously-lent slot. Consumes the slot and triggers
1747 /// the actual wire write. Returns `Err` on backend send failure;
1748 /// the slot's bytes are lost in that case (caller must re-lend +
1749 /// re-fill to retry).
1750 fn commit_slot(&self, slot: Self::Slot<'_>) -> Result<(), Self::Error>;
1751}
1752
1753/// Backend can lend a read-only view into its receive buffer.
1754///
1755/// The returned view's lifetime is tied to `&mut self` (subscriber-
1756/// exclusive); dropping the view releases any backend lock and lets
1757/// the next message advance into the buffer.
1758#[cfg(feature = "lending")]
1759pub trait SlotBorrowing: Subscriber {
1760 /// Backend-owned read-only view. Holds a `&'a [u8]` and any state
1761 /// needed to release the borrow on Drop.
1762 type View<'a>: AsRef<[u8]> + 'a
1763 where
1764 Self: 'a;
1765
1766 /// Try to borrow the next available message in place. Returns
1767 /// `Ok(None)` if no message is ready; never blocks.
1768 fn try_borrow(&mut self) -> Result<Option<Self::View<'_>>, Self::Error>;
1769}
1770
1771/// Service server trait for handling requests.
1772///
1773/// # Threading
1774///
1775/// `&mut self` on `try_recv_request` and `send_reply` — the executor
1776/// owns the server while a request is being handled. Handler bodies
1777/// run synchronously on the executor thread; long handlers should
1778/// dispatch work to a worker queue and reply later via the recorded
1779/// `sequence_number`.
1780///
1781/// # Calling pattern
1782///
1783/// 1. Executor calls `try_recv_request(buf)`.
1784/// 2. If `Some(req)` returned, decode, run handler, encode reply.
1785/// 3. `send_reply(req.sequence_number, &reply_buf)`.
1786///
1787/// `sequence_number` is the canonical request → reply correlation
1788/// token; backends derive it from the wire-level metadata (zenoh
1789/// query id, DDS sample identity).
1790pub trait ServiceServerTrait {
1791 /// Error type for service operations
1792 type Error;
1793
1794 /// Check if a request is available without consuming it.
1795 ///
1796 /// Non-destructive. Default returns `true` (always assume one
1797 /// may be available); backends should override with a real
1798 /// check.
1799 fn has_request(&self) -> bool {
1800 true
1801 }
1802
1803 /// Phase 122.3.c.6.e — register a `Waker` for event-driven
1804 /// service servers. Mirrors the matching method on
1805 /// `SubscriberTrait` / `ServiceClientTrait`. Backends that
1806 /// surface incoming-request notifications wake `waker` when
1807 /// `has_request()` flips true. Default: no-op (backends without
1808 /// wake support ignore — caller falls back to polling).
1809 fn register_waker(&self, _waker: &core::task::Waker) {}
1810
1811 /// Try to receive a service request into `buf` (non-blocking).
1812 ///
1813 /// On success returns a `ServiceRequest` that borrows from
1814 /// `buf`. The borrow is released when the returned struct is
1815 /// dropped — typically before `send_reply` is called, since
1816 /// `send_reply` takes `&mut self`.
1817 fn try_recv_request<'a>(
1818 &mut self,
1819 buf: &'a mut [u8],
1820 ) -> Result<Option<ServiceRequest<'a>>, Self::Error>;
1821
1822 /// Send a reply for the given sequence number. Non-blocking
1823 /// from the application's perspective; the backend may queue
1824 /// the reply for transport-level transmission.
1825 fn send_reply(&mut self, sequence_number: i64, data: &[u8]) -> Result<(), Self::Error>;
1826
1827 /// Handle a service request with typed messages
1828 fn handle_request<S: RosService>(
1829 &mut self,
1830 req_buf: &mut [u8],
1831 reply_buf: &mut [u8],
1832 handler: impl FnOnce(&S::Request) -> S::Reply,
1833 ) -> Result<bool, Self::Error>
1834 where
1835 Self::Error: From<TransportError>,
1836 {
1837 use nros_core::{CdrReader, CdrWriter};
1838
1839 // First, try to receive a request and extract necessary data.
1840 // Capture the data slice's offset within `req_buf` so we can
1841 // re-borrow it after the `ServiceRequest` (which holds a
1842 // borrow into `req_buf`) is dropped. Some backends prepend a
1843 // header (DDS: 8-byte sequence number) and place the CDR
1844 // payload at a non-zero offset in the buffer; others (zenoh)
1845 // put it at offset 0. Reading from offset 0 unconditionally
1846 // would feed the prefix bytes to the CDR deserializer and
1847 // silently corrupt the request.
1848 let buf_start = req_buf.as_ptr() as usize;
1849 let (data_offset, data_len, sequence_number) = match self.try_recv_request(req_buf)? {
1850 Some(request) => {
1851 let offset = (request.data.as_ptr() as usize).saturating_sub(buf_start);
1852 (offset, request.data.len(), request.sequence_number)
1853 }
1854 None => return Ok(false),
1855 };
1856
1857 // Deserialize request from the captured offset.
1858 let mut reader = CdrReader::new_with_header(&req_buf[data_offset..data_offset + data_len])
1859 .map_err(|_| TransportError::DeserializationError)?;
1860 let req = S::Request::deserialize(&mut reader)
1861 .map_err(|_| TransportError::DeserializationError)?;
1862
1863 // Call handler
1864 let reply = handler(&req);
1865
1866 // Serialize reply
1867 let mut writer =
1868 CdrWriter::new_with_header(reply_buf).map_err(|_| TransportError::BufferTooSmall)?;
1869 reply
1870 .serialize(&mut writer)
1871 .map_err(|_| TransportError::SerializationError)?;
1872 let len = writer.position();
1873
1874 // Send reply (now we can borrow self mutably again)
1875 self.send_reply(sequence_number, &reply_buf[..len])?;
1876 Ok(true)
1877 }
1878
1879 /// Handle a service request where the handler returns `Box<S::Reply>`
1880 ///
1881 /// Identical to `handle_request` but the handler returns a heap-allocated reply.
1882 /// This is needed for services with large response types (e.g., parameter services
1883 /// where `Vec<ParameterValue, 64>` is ~1MB+) that would overflow the stack.
1884 #[cfg(feature = "alloc")]
1885 fn handle_request_boxed<S: RosService>(
1886 &mut self,
1887 req_buf: &mut [u8],
1888 reply_buf: &mut [u8],
1889 handler: impl FnOnce(&S::Request) -> alloc::boxed::Box<S::Reply>,
1890 ) -> Result<bool, Self::Error>
1891 where
1892 Self::Error: From<TransportError>,
1893 {
1894 use nros_core::{CdrReader, CdrWriter};
1895
1896 let buf_start = req_buf.as_ptr() as usize;
1897 let (data_offset, data_len, sequence_number) = match self.try_recv_request(req_buf)? {
1898 Some(request) => {
1899 let offset = (request.data.as_ptr() as usize).saturating_sub(buf_start);
1900 (offset, request.data.len(), request.sequence_number)
1901 }
1902 None => return Ok(false),
1903 };
1904
1905 let mut reader = CdrReader::new_with_header(&req_buf[data_offset..data_offset + data_len])
1906 .map_err(|_| TransportError::DeserializationError)?;
1907 let req = S::Request::deserialize(&mut reader)
1908 .map_err(|_| TransportError::DeserializationError)?;
1909
1910 let reply = handler(&req);
1911
1912 let mut writer =
1913 CdrWriter::new_with_header(reply_buf).map_err(|_| TransportError::BufferTooSmall)?;
1914 reply
1915 .serialize(&mut writer)
1916 .map_err(|_| TransportError::SerializationError)?;
1917 let len = writer.position();
1918
1919 self.send_reply(sequence_number, &reply_buf[..len])?;
1920 Ok(true)
1921 }
1922}
1923
1924/// Service client trait for sending requests.
1925///
1926/// # Threading
1927///
1928/// `&mut self` on every method — the client is single-owner. For
1929/// fan-out request patterns, create one client per worker thread.
1930///
1931/// # Calling pattern
1932///
1933/// All in-tree backends route blocking waits through the executor:
1934///
1935/// 1. `send_request_raw(buf)` — non-blocking; returns once the
1936/// request is queued for transmission.
1937/// 2. The executor's `drive_io` runs.
1938/// 3. `try_recv_reply_raw(buf)` — non-blocking; returns
1939/// `Ok(Some(len))` when the reply is back.
1940///
1941/// The deprecated [`call_raw`](Self::call_raw) blocking path is
1942/// kept for backwards compatibility but should not be called.
1943pub trait ServiceClientTrait {
1944 /// Error type for service operations
1945 type Error;
1946
1947 /// Send a service request and wait for reply (blocking).
1948 ///
1949 /// **Deprecated — do not call.** The default body returns `Timeout`
1950 /// immediately without polling. Use `Client::call` →
1951 /// `Promise::wait(executor, timeout_ms)` which lets the executor
1952 /// drive I/O while waiting instead of busy-looping on
1953 /// `try_recv_reply_raw` with no sleep (which starves the transport
1954 /// on FreeRTOS / Zephyr single-threaded schedulers).
1955 ///
1956 /// Backends that still need an internal blocking path should
1957 /// override this with a real sleep-between-polls implementation,
1958 /// but all in-tree backends (zenoh, XRCE) route blocking waits
1959 /// through the executor.
1960 #[deprecated(note = "use Client::call → Promise::wait with an executor instead")]
1961 fn call_raw(&mut self, request: &[u8], _reply_buf: &mut [u8]) -> Result<usize, Self::Error>
1962 where
1963 Self::Error: From<TransportError>,
1964 {
1965 let _ = request;
1966 Err(TransportError::Timeout.into())
1967 }
1968
1969 /// Send a service request without waiting for a reply (non-blocking).
1970 ///
1971 /// The caller must subsequently poll [`try_recv_reply_raw`](Self::try_recv_reply_raw)
1972 /// to retrieve the reply.
1973 fn send_request_raw(&mut self, request: &[u8]) -> Result<(), Self::Error>;
1974
1975 /// Poll for a reply to the most recently sent request (non-blocking).
1976 ///
1977 /// Returns `Ok(Some(len))` when a reply has arrived, `Ok(None)` if not yet
1978 /// available, or `Err` on failure.
1979 fn try_recv_reply_raw(&mut self, reply_buf: &mut [u8]) -> Result<Option<usize>, Self::Error>;
1980
1981 /// Send a typed service request without waiting for a reply (non-blocking).
1982 ///
1983 /// Serializes the request into `req_buf` and calls [`send_request_raw`](Self::send_request_raw).
1984 fn send_request<S: RosService>(
1985 &mut self,
1986 request: &S::Request,
1987 req_buf: &mut [u8],
1988 ) -> Result<(), Self::Error>
1989 where
1990 Self::Error: From<TransportError>,
1991 {
1992 use nros_core::CdrWriter;
1993
1994 let mut writer =
1995 CdrWriter::new_with_header(req_buf).map_err(|_| TransportError::BufferTooSmall)?;
1996 request
1997 .serialize(&mut writer)
1998 .map_err(|_| TransportError::SerializationError)?;
1999 let req_len = writer.position();
2000
2001 self.send_request_raw(&req_buf[..req_len])
2002 }
2003
2004 /// Poll for a typed reply to the most recently sent request (non-blocking).
2005 ///
2006 /// Calls [`try_recv_reply_raw`](Self::try_recv_reply_raw) and deserializes if available.
2007 fn try_recv_reply<S: RosService>(
2008 &mut self,
2009 reply_buf: &mut [u8],
2010 ) -> Result<Option<S::Reply>, Self::Error>
2011 where
2012 Self::Error: From<TransportError>,
2013 {
2014 use nros_core::CdrReader;
2015
2016 match self.try_recv_reply_raw(reply_buf)? {
2017 Some(len) => {
2018 let mut reader = CdrReader::new_with_header(&reply_buf[..len])
2019 .map_err(|_| TransportError::DeserializationError)?;
2020 let reply = S::Reply::deserialize(&mut reader)
2021 .map_err(|_| TransportError::DeserializationError)?;
2022 Ok(Some(reply))
2023 }
2024 None => Ok(None),
2025 }
2026 }
2027
2028 /// Register an async waker to be notified when a reply arrives.
2029 ///
2030 /// Called from `Future::poll()` implementations to store the waker.
2031 /// The transport backend calls `waker.wake()` from its reply callback
2032 /// when a response is available, enabling event-driven async without
2033 /// busy-polling.
2034 ///
2035 /// Default: no-op (backends that don't support waking simply ignore this).
2036 fn register_waker(&self, _waker: &core::task::Waker) {}
2037
2038 /// Begin a server-discovery query on this client (non-blocking).
2039 ///
2040 /// Models `rclcpp::ClientBase::wait_for_service` machinery: the backend
2041 /// fires off a discovery probe (typically a Zenoh liveliness query
2042 /// against the matching server's wildcarded liveliness keyexpr) and
2043 /// the caller polls [`poll_server_discovery`](Self::poll_server_discovery)
2044 /// to collect the result.
2045 ///
2046 /// Default impl: no-op success. Backends without a discovery channel
2047 /// (or those that always assume the server is reachable) can leave
2048 /// this default and have `poll_server_discovery` return
2049 /// `Ok(Some(true))` immediately.
2050 fn start_server_discovery(&mut self, _timeout_ms: u32) -> Result<(), Self::Error> {
2051 Ok(())
2052 }
2053
2054 /// Poll an in-flight server-discovery query.
2055 ///
2056 /// - `Ok(Some(true))` — at least one matching server has reported
2057 /// back; safe to send the first request.
2058 /// - `Ok(Some(false))` — discovery query finished without finding
2059 /// any matching server (timeout / no-replies).
2060 /// - `Ok(None)` — query still in flight.
2061 /// - `Err(_)` — transport-level failure unrelated to server presence.
2062 ///
2063 /// Default impl: returns `Ok(Some(true))` (i.e., "server is always
2064 /// assumed reachable"). The Zenoh backend overrides this with a
2065 /// liveliness-token check.
2066 fn poll_server_discovery(&mut self) -> Result<Option<bool>, Self::Error> {
2067 Ok(Some(true))
2068 }
2069
2070 /// Synchronous, non-blocking check of whether a matching server is
2071 /// currently visible.
2072 ///
2073 /// Mirrors `rclcpp::ClientBase::service_is_ready`. Backends that lack
2074 /// discovery should keep the default `true` so existing call sites
2075 /// don't regress.
2076 ///
2077 /// Default impl: always `true`.
2078 fn is_server_ready(&self) -> bool {
2079 true
2080 }
2081
2082 /// Phase 124.C.1 — graph-aware server-availability probe.
2083 ///
2084 /// Returns `Ok(true)` if at least one matching server has been
2085 /// discovered, `Ok(false)` if none yet, or `Err(_)` if the
2086 /// backend cannot answer (e.g. XRCE — micro-XRCE-DDS-Client has
2087 /// no participant enumeration). Distinct from
2088 /// [`is_server_ready`](Self::is_server_ready), which collapses
2089 /// "don't know" and "no server" into the same `false` answer.
2090 ///
2091 /// User-facing surface: `Client<S>::server_available()` in Rust,
2092 /// `nros_client_server_available()` in C/C++. Clients use this
2093 /// to gate the first `call_raw` so a startup-ordering race
2094 /// (client opens before server's discovery announcement lands)
2095 /// doesn't surface as a request-side timeout.
2096 ///
2097 /// Default impl: `Err(TransportError::Unsupported)` — backends
2098 /// that support graph introspection (zenoh queryable interest,
2099 /// DDS built-in topic readers) opt in by overriding.
2100 fn server_available(&self) -> Result<bool, Self::Error>
2101 where
2102 Self::Error: From<TransportError>,
2103 {
2104 Err(TransportError::Unsupported.into())
2105 }
2106
2107 /// Call a service with typed messages (blocking).
2108 ///
2109 /// **Deprecated — do not call.** The default body returns `Timeout`
2110 /// immediately without polling. Use `Client::call` on the executor
2111 /// instead, which drives I/O while waiting. See
2112 /// [`call_raw`](Self::call_raw) for the same reasoning.
2113 #[deprecated(note = "use Client::call → Promise::wait with an executor instead")]
2114 fn call<S: RosService>(
2115 &mut self,
2116 request: &S::Request,
2117 req_buf: &mut [u8],
2118 _reply_buf: &mut [u8],
2119 ) -> Result<S::Reply, Self::Error>
2120 where
2121 Self::Error: From<TransportError>,
2122 {
2123 use nros_core::CdrWriter;
2124
2125 // Serialize request so the error surface matches the old impl
2126 // for the "bad request" path; but skip the receive busy-loop.
2127 let mut writer =
2128 CdrWriter::new_with_header(req_buf).map_err(|_| TransportError::BufferTooSmall)?;
2129 request
2130 .serialize(&mut writer)
2131 .map_err(|_| TransportError::SerializationError)?;
2132 Err(TransportError::Timeout.into())
2133 }
2134}
2135
2136/// Transport backend trait (legacy).
2137///
2138/// Use [`Rmw`] for new code. This trait is retained for backward compatibility
2139/// with existing code that uses [`TransportConfig`] directly.
2140pub trait Transport {
2141 /// Error type for this transport
2142 type Error;
2143 /// Session type for this transport
2144 type Session: Session;
2145
2146 /// Open a new session with the given configuration
2147 fn open(config: &TransportConfig) -> Result<Self::Session, Self::Error>;
2148}
2149
2150/// Factory trait for compile-time middleware selection.
2151///
2152/// Embedded crates select a backend via feature flag:
2153/// ```rust,ignore
2154/// #[cfg(feature = "rmw-cffi")]
2155/// type DefaultRmw = nros_rmw_cffi::CffiRmw;
2156/// ```
2157///
2158/// Each backend provides its own `Rmw` implementation that bridges
2159/// from the middleware-agnostic [`RmwConfig`] to backend-specific
2160/// initialization.
2161///
2162/// Phase 84.E2: `open` consumes `self`. Backends carry their own
2163/// configuration (agent addresses, serial ports, TLS CA slots)
2164/// inside the factory value and hand that over to the session at
2165/// `open` time. All in-repo backends also implement
2166/// [`Default`]; most callers spell this as
2167/// `BackendRmw::default().open(&config)`.
2168pub trait Rmw {
2169 /// Session type returned by [`open`](Rmw::open)
2170 type Session: Session;
2171 /// Error type for session creation
2172 type Error: core::fmt::Debug;
2173
2174 /// Open a new middleware session with the given configuration.
2175 ///
2176 /// The backend maps [`RmwConfig`] fields to its own connection
2177 /// parameters (e.g., zenoh locator and session mode, XRCE-DDS
2178 /// agent address). Any backend-specific pre-open state stored
2179 /// on `self` (e.g. configured agent IP / port) is moved into the
2180 /// returned `Session`.
2181 fn open(self, config: &RmwConfig) -> Result<Self::Session, Self::Error>;
2182}
2183
2184#[cfg(test)]
2185mod tests {
2186 use super::*;
2187
2188 #[test]
2189 fn test_topic_info() {
2190 let topic = TopicInfo::new("/chatter", "std_msgs::msg::dds_::String_", "abc123");
2191 assert_eq!(topic.name, "/chatter");
2192 assert_eq!(topic.domain_id, 0);
2193 }
2194
2195 #[test]
2196 fn qos_apply_overrides_matches_topic_and_role() {
2197 // Default is Reliable / Volatile / KeepLast(10).
2198 static OVERRIDES: &[QosOverride] = &[
2199 QosOverride {
2200 topic: "/chatter",
2201 role: QosOverrideRole::Publisher,
2202 value: QosOverrideValue::Reliability(QosReliabilityPolicy::BestEffort),
2203 },
2204 QosOverride {
2205 topic: "/chatter",
2206 role: QosOverrideRole::Publisher,
2207 value: QosOverrideValue::Depth(5),
2208 },
2209 QosOverride {
2210 topic: "/scan",
2211 role: QosOverrideRole::Subscription,
2212 value: QosOverrideValue::Durability(QosDurabilityPolicy::TransientLocal),
2213 },
2214 ];
2215
2216 // Matching topic + publisher role → reliability + depth applied.
2217 let pub_qos = QosSettings::default().apply_overrides(
2218 "/chatter",
2219 QosOverrideRole::Publisher,
2220 OVERRIDES,
2221 );
2222 assert_eq!(pub_qos.reliability, QosReliabilityPolicy::BestEffort);
2223 assert_eq!(pub_qos.depth, 5);
2224 assert_eq!(pub_qos.durability, QosDurabilityPolicy::Volatile); // untouched
2225
2226 // Same topic but subscription role → publisher overrides DON'T apply;
2227 // the /scan override is for a different topic → also no change.
2228 let sub_qos = QosSettings::default().apply_overrides(
2229 "/chatter",
2230 QosOverrideRole::Subscription,
2231 OVERRIDES,
2232 );
2233 assert_eq!(sub_qos, QosSettings::default());
2234
2235 // The /scan subscription override applies only to /scan+subscription.
2236 let scan_qos = QosSettings::default().apply_overrides(
2237 "/scan",
2238 QosOverrideRole::Subscription,
2239 OVERRIDES,
2240 );
2241 assert_eq!(scan_qos.durability, QosDurabilityPolicy::TransientLocal);
2242
2243 // Empty table → identity (the zero-override fast path).
2244 assert_eq!(
2245 QosSettings::default().apply_overrides("/x", QosOverrideRole::Publisher, &[]),
2246 QosSettings::default()
2247 );
2248 }
2249
2250 #[test]
2251 fn test_qos_defaults() {
2252 let qos = QosSettings::default();
2253 assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2254 }
2255
2256 #[test]
2257 fn test_action_info() {
2258 let action = ActionInfo::new(
2259 "/fibonacci",
2260 "example_interfaces::action::dds_::Fibonacci_",
2261 "abc123",
2262 );
2263 assert_eq!(action.name, "/fibonacci");
2264 assert_eq!(action.domain_id, 0);
2265 }
2266
2267 #[test]
2268 fn test_action_info_with_domain() {
2269 let action = ActionInfo::new(
2270 "/fibonacci",
2271 "example_interfaces::action::dds_::Fibonacci_",
2272 "abc123",
2273 )
2274 .with_domain(42);
2275 assert_eq!(action.domain_id, 42);
2276 }
2277
2278 #[test]
2279 fn test_action_send_goal_key() {
2280 let action = ActionInfo::new(
2281 "/fibonacci",
2282 "example_interfaces::action::dds_::Fibonacci_",
2283 "abc123",
2284 )
2285 .with_domain(0);
2286
2287 let key: heapless::String<256> = action.send_goal_key();
2288 // ActionInfo returns the sub-entity name with leading slash for ROS 2 compatibility
2289 assert_eq!(key.as_str(), "/fibonacci/_action/send_goal");
2290 }
2291
2292 #[test]
2293 fn test_action_feedback_key() {
2294 let action = ActionInfo::new(
2295 "/fibonacci",
2296 "example_interfaces::action::dds_::Fibonacci_",
2297 "abc123",
2298 )
2299 .with_domain(0);
2300
2301 let key: heapless::String<256> = action.feedback_key();
2302 assert_eq!(key.as_str(), "/fibonacci/_action/feedback");
2303 }
2304
2305 #[test]
2306 fn test_action_all_sub_names() {
2307 let action = ActionInfo::new(
2308 "/fibonacci",
2309 "example_interfaces::action::dds_::Fibonacci_",
2310 "abc123",
2311 )
2312 .with_domain(0);
2313
2314 let cancel: heapless::String<256> = action.cancel_goal_key();
2315 assert_eq!(cancel.as_str(), "/fibonacci/_action/cancel_goal");
2316
2317 let result: heapless::String<256> = action.get_result_key();
2318 assert_eq!(result.as_str(), "/fibonacci/_action/get_result");
2319
2320 let status: heapless::String<256> = action.status_key();
2321 assert_eq!(status.as_str(), "/fibonacci/_action/status");
2322 }
2323
2324 // --- QoS Profile Tests ---
2325
2326 #[test]
2327 fn test_qos_profile_sensor_data() {
2328 let qos = QosSettings::QOS_PROFILE_SENSOR_DATA;
2329 assert_eq!(qos.reliability, QosReliabilityPolicy::BestEffort);
2330 assert_eq!(qos.durability, QosDurabilityPolicy::Volatile);
2331 assert_eq!(qos.history, QosHistoryPolicy::KeepLast);
2332 assert_eq!(qos.depth, 5);
2333 }
2334
2335 #[test]
2336 fn test_qos_profile_default() {
2337 let qos = QosSettings::QOS_PROFILE_DEFAULT;
2338 assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2339 assert_eq!(qos.durability, QosDurabilityPolicy::Volatile);
2340 assert_eq!(qos.depth, 10);
2341 }
2342
2343 #[test]
2344 fn test_qos_profile_services_default() {
2345 let qos = QosSettings::QOS_PROFILE_SERVICES_DEFAULT;
2346 assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2347 assert_eq!(qos.durability, QosDurabilityPolicy::Volatile);
2348 }
2349
2350 #[test]
2351 fn services_default_validates_and_rejects_missing_policy() {
2352 // Phase 193.5 — the service-create path (node `create_service*_sized` +
2353 // the typed-arena `register_service*_on`) runs `validate_against` on the
2354 // caller's profile, exactly like pub/sub. A backend advertising the
2355 // profile's required policies admits it; dropping any required bit (here
2356 // RELIABILITY) rejects it with `IncompatibleQos` — no silent downgrade.
2357 let qos = QosSettings::services_default();
2358 let required = qos.required_policies();
2359 assert!(qos.validate_against(required).is_ok());
2360 assert!(qos.validate_against(QosPolicyMask(u32::MAX)).is_ok());
2361 let missing = QosPolicyMask(required.0 & !QosPolicyMask::RELIABILITY.0);
2362 assert_eq!(
2363 qos.validate_against(missing),
2364 Err(TransportError::IncompatibleQos)
2365 );
2366 }
2367
2368 #[test]
2369 fn test_qos_profile_parameters() {
2370 let qos = QosSettings::QOS_PROFILE_PARAMETERS;
2371 assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2372 assert_eq!(qos.durability, QosDurabilityPolicy::TransientLocal);
2373 assert_eq!(qos.depth, 1000);
2374 }
2375
2376 #[test]
2377 fn test_qos_profile_clock() {
2378 let qos = QosSettings::QOS_PROFILE_CLOCK;
2379 assert_eq!(qos.reliability, QosReliabilityPolicy::BestEffort);
2380 assert_eq!(qos.depth, 1);
2381 }
2382
2383 #[test]
2384 fn test_qos_profile_parameter_events() {
2385 let qos = QosSettings::QOS_PROFILE_PARAMETER_EVENTS;
2386 assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2387 assert_eq!(qos.history, QosHistoryPolicy::KeepAll);
2388 }
2389
2390 #[test]
2391 fn test_qos_profile_action_status() {
2392 let qos = QosSettings::QOS_PROFILE_ACTION_STATUS_DEFAULT;
2393 assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2394 assert_eq!(qos.durability, QosDurabilityPolicy::TransientLocal);
2395 assert_eq!(qos.depth, 1);
2396 }
2397
2398 #[test]
2399 fn test_qos_static_constructors() {
2400 assert_eq!(
2401 QosSettings::topics_default(),
2402 QosSettings::QOS_PROFILE_DEFAULT
2403 );
2404 assert_eq!(
2405 QosSettings::sensor_data_default(),
2406 QosSettings::QOS_PROFILE_SENSOR_DATA
2407 );
2408 assert_eq!(
2409 QosSettings::services_default(),
2410 QosSettings::QOS_PROFILE_SERVICES_DEFAULT
2411 );
2412 assert_eq!(
2413 QosSettings::parameters_default(),
2414 QosSettings::QOS_PROFILE_PARAMETERS
2415 );
2416 assert_eq!(
2417 QosSettings::action_status_default(),
2418 QosSettings::QOS_PROFILE_ACTION_STATUS_DEFAULT
2419 );
2420 }
2421
2422 #[test]
2423 fn test_qos_builder_explicit_setters() {
2424 let qos = QosSettings::new()
2425 .reliability(QosReliabilityPolicy::Reliable)
2426 .durability(QosDurabilityPolicy::TransientLocal)
2427 .history(QosHistoryPolicy::KeepAll)
2428 .depth(100);
2429
2430 assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2431 assert_eq!(qos.durability, QosDurabilityPolicy::TransientLocal);
2432 assert_eq!(qos.history, QosHistoryPolicy::KeepAll);
2433 assert_eq!(qos.depth, 100);
2434 }
2435
2436 #[test]
2437 fn test_qos_builder_chaining() {
2438 // Test that builder methods can be chained in any order
2439 let qos = QosSettings::sensor_data_default()
2440 .reliable()
2441 .transient_local()
2442 .keep_last(20);
2443
2444 assert_eq!(qos.reliability, QosReliabilityPolicy::Reliable);
2445 assert_eq!(qos.durability, QosDurabilityPolicy::TransientLocal);
2446 assert_eq!(qos.history, QosHistoryPolicy::KeepLast);
2447 assert_eq!(qos.depth, 20);
2448 }
2449
2450 #[test]
2451 fn test_qos_eq_impl() {
2452 // Verify that PartialEq works correctly via derive on QosSettings
2453 let qos1 = QosSettings::QOS_PROFILE_DEFAULT;
2454 let qos2 = QosSettings::topics_default();
2455 // Both should have same values - verify field by field
2456 assert_eq!(qos1.reliability, qos2.reliability);
2457 assert_eq!(qos1.durability, qos2.durability);
2458 assert_eq!(qos1.history, qos2.history);
2459 assert_eq!(qos1.depth, qos2.depth);
2460 }
2461
2462 // --- Locator validation tests ---
2463
2464 #[test]
2465 fn test_locator_protocol_tcp() {
2466 assert_eq!(locator_protocol("tcp/127.0.0.1:7447"), LocatorProtocol::Tcp);
2467 }
2468
2469 #[test]
2470 fn test_locator_protocol_serial() {
2471 assert_eq!(
2472 locator_protocol("serial//dev/ttyUSB0#baudrate=115200"),
2473 LocatorProtocol::Serial
2474 );
2475 }
2476
2477 #[test]
2478 fn test_locator_protocol_unknown() {
2479 assert_eq!(locator_protocol(""), LocatorProtocol::Unknown);
2480 assert_eq!(locator_protocol("http://foo"), LocatorProtocol::Unknown);
2481 assert_eq!(locator_protocol("tls/host:port"), LocatorProtocol::Unknown);
2482 }
2483
2484 #[test]
2485 fn test_locator_protocol_udp() {
2486 assert_eq!(locator_protocol("udp/127.0.0.1:7447"), LocatorProtocol::Udp);
2487 assert_eq!(
2488 locator_protocol("udp/192.168.1.50:2019"),
2489 LocatorProtocol::Udp
2490 );
2491 }
2492
2493 #[test]
2494 fn test_validate_tcp_locator_ok() {
2495 assert!(validate_locator("tcp/127.0.0.1:7447").is_ok());
2496 assert!(validate_locator("tcp/192.168.1.1:7447").is_ok());
2497 }
2498
2499 #[test]
2500 fn test_validate_tcp_locator_missing_port() {
2501 assert!(validate_locator("tcp/127.0.0.1").is_err());
2502 }
2503
2504 #[test]
2505 fn test_validate_serial_locator_ok() {
2506 assert!(validate_locator("serial//dev/ttyUSB0#baudrate=115200").is_ok());
2507 assert!(validate_locator("serial//dev/ttyACM0#baudrate=9600").is_ok());
2508 assert!(validate_locator("serial/uart1#baudrate=921600").is_ok());
2509 }
2510
2511 #[test]
2512 fn test_validate_serial_locator_empty_device() {
2513 assert!(validate_locator("serial/").is_err());
2514 }
2515
2516 #[test]
2517 fn test_validate_serial_locator_missing_baudrate() {
2518 assert!(validate_locator("serial//dev/ttyUSB0").is_err());
2519 }
2520
2521 #[test]
2522 fn test_validate_serial_locator_invalid_baudrate() {
2523 assert!(validate_locator("serial//dev/ttyUSB0#baudrate=abc").is_err());
2524 }
2525
2526 #[test]
2527 fn test_validate_unknown_protocol() {
2528 assert!(validate_locator("http://foo").is_err());
2529 assert!(validate_locator("tls/host:port").is_err());
2530 }
2531
2532 #[test]
2533 fn test_validate_udp_locator_ok() {
2534 assert!(validate_locator("udp/127.0.0.1:7447").is_ok());
2535 assert!(validate_locator("udp/192.168.1.50:2019").is_ok());
2536 }
2537
2538 #[test]
2539 fn test_validate_udp_locator_missing_port() {
2540 assert!(validate_locator("udp/127.0.0.1").is_err());
2541 }
2542
2543 // Phase 233.2 — the PX4 companion QoS profile must be BEST_EFFORT +
2544 // TRANSIENT_LOCAL + KEEP_LAST so it matches PX4's uxrce_dds_client endpoints.
2545 #[test]
2546 fn px4_qos_profile_matches_uxrce_dds_client() {
2547 let q = QosSettings::px4();
2548 assert_eq!(q.reliability, QosReliabilityPolicy::BestEffort);
2549 // VOLATILE — PX4's /fmu/out writers are volatile; a TRANSIENT_LOCAL
2550 // reader silently fails to match (verified against real PX4 SITL).
2551 assert_eq!(q.durability, QosDurabilityPolicy::Volatile);
2552 assert_eq!(q.history, QosHistoryPolicy::KeepLast);
2553 assert_eq!(q, QosSettings::QOS_PROFILE_PX4);
2554 // Depth is tunable via the builder without losing the PX4 policies.
2555 let deep = QosSettings::px4().keep_last(5);
2556 assert_eq!(deep.depth, 5);
2557 assert_eq!(deep.reliability, QosReliabilityPolicy::BestEffort);
2558 assert_eq!(deep.durability, QosDurabilityPolicy::Volatile);
2559 }
2560
2561 // --- RmwConfig Tests ---
2562
2563 #[test]
2564 fn test_rmw_config_default() {
2565 let config = RmwConfig::default();
2566 assert_eq!(config.locator, "tcp/127.0.0.1:7447");
2567 assert_eq!(config.mode, SessionMode::Client);
2568 assert_eq!(config.domain_id, 0);
2569 assert_eq!(config.node_name, "node");
2570 assert_eq!(config.namespace, "");
2571 }
2572
2573 #[test]
2574 fn test_rmw_config_custom() {
2575 let config = RmwConfig {
2576 locator: "tcp/192.168.1.1:7447",
2577 mode: SessionMode::Peer,
2578 domain_id: 42,
2579 node_name: "talker",
2580 namespace: "/ns1",
2581 properties: &[("agent_port", "2019")],
2582 };
2583 assert_eq!(config.locator, "tcp/192.168.1.1:7447");
2584 assert_eq!(config.mode, SessionMode::Peer);
2585 assert_eq!(config.domain_id, 42);
2586 assert_eq!(config.node_name, "talker");
2587 assert_eq!(config.namespace, "/ns1");
2588 assert_eq!(config.properties.len(), 1);
2589 assert_eq!(config.properties[0].0, "agent_port");
2590 }
2591
2592 #[test]
2593 fn test_rmw_config_is_copy() {
2594 let config = RmwConfig::default();
2595 let config2 = config; // Copy
2596 assert_eq!(config.locator, config2.locator);
2597 assert_eq!(config.domain_id, config2.domain_id);
2598 }
2599
2600 #[test]
2601 fn test_rmw_config_clone() {
2602 let config = RmwConfig::default();
2603 let cloned = RmwConfig { ..config };
2604 assert_eq!(cloned.locator, config.locator);
2605 assert_eq!(cloned.node_name, config.node_name);
2606 }
2607}