1#![no_std]
19
20#[cfg(feature = "alloc")]
21extern crate alloc;
22
23#[cfg(feature = "std")]
24extern crate std;
25
26use core::{cell::UnsafeCell, ffi::c_void, sync::atomic::Ordering};
27
28use nros_rmw::{
29 MessageInfo, Publisher, QosDurabilityPolicy, QosHistoryPolicy, QosReliabilityPolicy,
30 QosSettings, ServiceClientTrait, ServiceInfo, ServiceRequest, ServiceServerTrait, Session,
31 TopicInfo, TransportError,
32};
33
34#[cfg(feature = "alloc")]
38pub mod rust_adapter;
39
40#[cfg(feature = "alloc")]
41pub use rust_adapter::{RustBackend, RustBackendAdapter};
42
43pub mod section;
46
/// Status code exchanged with backends across the C ABI.
///
/// `0` means success; every failure code defined in this file is negative.
pub type NrosRmwRet = i32;

/// Success.
pub const NROS_RMW_RET_OK: NrosRmwRet = 0;
/// Generic failure; also the fallback code for errors without a dedicated value.
pub const NROS_RMW_RET_ERROR: NrosRmwRet = -1;
/// Counterpart of `TransportError::Timeout`.
pub const NROS_RMW_RET_TIMEOUT: NrosRmwRet = -2;
/// Counterpart of `TransportError::BadAlloc`.
pub const NROS_RMW_RET_BAD_ALLOC: NrosRmwRet = -3;
/// Counterpart of `TransportError::InvalidArgument` (and `InvalidConfig`).
pub const NROS_RMW_RET_INVALID_ARGUMENT: NrosRmwRet = -4;
/// Counterpart of `TransportError::Unsupported`.
pub const NROS_RMW_RET_UNSUPPORTED: NrosRmwRet = -5;
/// Counterpart of `TransportError::IncompatibleQos`.
pub const NROS_RMW_RET_INCOMPATIBLE_QOS: NrosRmwRet = -6;
/// Counterpart of `TransportError::TopicNameInvalid`.
pub const NROS_RMW_RET_TOPIC_NAME_INVALID: NrosRmwRet = -7;
/// Counterpart of `TransportError::NodeNameNonExistent`.
pub const NROS_RMW_RET_NODE_NAME_NON_EXISTENT: NrosRmwRet = -8;
/// Counterpart of `TransportError::LoanNotSupported`.
pub const NROS_RMW_RET_LOAN_NOT_SUPPORTED: NrosRmwRet = -9;
/// Counterpart of `TransportError::NoData`.
pub const NROS_RMW_RET_NO_DATA: NrosRmwRet = -10;
/// Counterpart of `TransportError::WouldBlock`.
pub const NROS_RMW_RET_WOULD_BLOCK: NrosRmwRet = -11;
/// Counterpart of `TransportError::BufferTooSmall`.
pub const NROS_RMW_RET_BUFFER_TOO_SMALL: NrosRmwRet = -12;
/// Counterpart of `TransportError::MessageTooLarge` (and `TooLarge`).
pub const NROS_RMW_RET_MESSAGE_TOO_LARGE: NrosRmwRet = -13;
88
// Symbols provided by the C stub transport; only declared when the
// `c-stub-test` feature is enabled.
#[cfg(feature = "c-stub-test")]
unsafe extern "C" {
    fn nros_c_stub_make_ops(out: *mut c_void);
    fn nros_c_stub_reset_counters();
    fn nros_c_stub_get_open_calls() -> u32;
    fn nros_c_stub_get_close_calls() -> u32;
    fn nros_c_stub_get_write_calls() -> u32;
    fn nros_c_stub_get_read_calls() -> u32;
}

/// Returns the addresses of all six C stub symbols.
///
/// Taking each address creates a reference to the symbol from Rust code;
/// presumably this keeps the linker from discarding the stub object — confirm
/// against the build setup.
#[cfg(feature = "c-stub-test")]
#[doc(hidden)]
pub fn _c_stub_transport_vtable_anchor() -> [*const c_void; 6] {
    let make_ops = nros_c_stub_make_ops as *const c_void;
    let reset_counters = nros_c_stub_reset_counters as *const c_void;
    let open_calls = nros_c_stub_get_open_calls as *const c_void;
    let close_calls = nros_c_stub_get_close_calls as *const c_void;
    let write_calls = nros_c_stub_get_write_calls as *const c_void;
    let read_calls = nros_c_stub_get_read_calls as *const c_void;
    [
        make_ops,
        reset_counters,
        open_calls,
        close_calls,
        write_calls,
        read_calls,
    ]
}
114pub const NROS_RMW_RET_INCOMPATIBLE_ABI: NrosRmwRet = -14;
120
121pub const NROS_RMW_RET_NO_BACKEND: NrosRmwRet = -15;
125
126pub const NROS_RMW_RET_AMBIGUOUS_BACKEND: NrosRmwRet = -16;
130
131pub const NROS_RMW_RET_UNKNOWN_BACKEND: NrosRmwRet = -17;
134
135pub const NROS_RMW_RET_CONNECTION_FAILED: NrosRmwRet = -18;
141
142pub fn ret_from_error(err: &TransportError) -> NrosRmwRet {
149 match err {
150 TransportError::Timeout => NROS_RMW_RET_TIMEOUT,
151 TransportError::WouldBlock => NROS_RMW_RET_WOULD_BLOCK,
152 TransportError::TooLarge => NROS_RMW_RET_MESSAGE_TOO_LARGE,
153 TransportError::BufferTooSmall => NROS_RMW_RET_BUFFER_TOO_SMALL,
154 TransportError::MessageTooLarge => NROS_RMW_RET_MESSAGE_TOO_LARGE,
155 TransportError::InvalidArgument => NROS_RMW_RET_INVALID_ARGUMENT,
156 TransportError::InvalidConfig => NROS_RMW_RET_INVALID_ARGUMENT,
157 TransportError::Unsupported => NROS_RMW_RET_UNSUPPORTED,
158 TransportError::BadAlloc => NROS_RMW_RET_BAD_ALLOC,
159 TransportError::IncompatibleQos => NROS_RMW_RET_INCOMPATIBLE_QOS,
160 TransportError::TopicNameInvalid => NROS_RMW_RET_TOPIC_NAME_INVALID,
161 TransportError::NodeNameNonExistent => NROS_RMW_RET_NODE_NAME_NON_EXISTENT,
162 TransportError::LoanNotSupported => NROS_RMW_RET_LOAN_NOT_SUPPORTED,
163 TransportError::NoData => NROS_RMW_RET_NO_DATA,
164 TransportError::IncompatibleAbi => NROS_RMW_RET_INCOMPATIBLE_ABI,
165 TransportError::ConnectionFailed | TransportError::Disconnected => {
175 NROS_RMW_RET_CONNECTION_FAILED
176 }
177 _ => NROS_RMW_RET_ERROR,
181 }
182}
183
184pub fn error_from_ret(ret: NrosRmwRet) -> TransportError {
195 match ret {
196 NROS_RMW_RET_OK => {
197 TransportError::Backend("ok (logic error: positive ret_t at error site)")
198 }
199 NROS_RMW_RET_ERROR => TransportError::Backend("rmw_ret error"),
200 NROS_RMW_RET_TIMEOUT => TransportError::Timeout,
201 NROS_RMW_RET_BAD_ALLOC => TransportError::BadAlloc,
202 NROS_RMW_RET_INVALID_ARGUMENT => TransportError::InvalidArgument,
203 NROS_RMW_RET_UNSUPPORTED => TransportError::Unsupported,
204 NROS_RMW_RET_INCOMPATIBLE_QOS => TransportError::IncompatibleQos,
205 NROS_RMW_RET_TOPIC_NAME_INVALID => TransportError::TopicNameInvalid,
206 NROS_RMW_RET_NODE_NAME_NON_EXISTENT => TransportError::NodeNameNonExistent,
207 NROS_RMW_RET_LOAN_NOT_SUPPORTED => TransportError::LoanNotSupported,
208 NROS_RMW_RET_NO_DATA => TransportError::NoData,
209 NROS_RMW_RET_WOULD_BLOCK => TransportError::WouldBlock,
210 NROS_RMW_RET_BUFFER_TOO_SMALL => TransportError::BufferTooSmall,
211 NROS_RMW_RET_MESSAGE_TOO_LARGE => TransportError::MessageTooLarge,
212 NROS_RMW_RET_INCOMPATIBLE_ABI => TransportError::IncompatibleAbi,
213 NROS_RMW_RET_CONNECTION_FAILED => TransportError::ConnectionFailed,
220 _ => TransportError::Backend("unknown rmw_ret_t"),
221 }
222}
223
/// Liveliness policy, encoded as a single byte for the C ABI.
///
/// The discriminant is what gets stored in [`NrosRmwQos::liveliness_kind`].
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NrosRmwLivelinessKind {
    /// Wire value `0`.
    None = 0,
    /// Wire value `1`; used by the built-in QoS profiles.
    Automatic = 1,
    /// Wire value `2`.
    ManualByTopic = 2,
    /// Wire value `3`.
    ManualByNode = 3,
}
242
/// QoS settings in C layout, passed by pointer to backend `create_*` entries.
///
/// The reserved fields make the padding explicit; all code in this file
/// writes them as zero.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NrosRmwQos {
    /// `0` = best effort, `1` = reliable.
    pub reliability: u8,
    /// `0` = volatile, `1` = transient local.
    pub durability: u8,
    /// `0` = keep last, `1` = keep all.
    pub history: u8,
    /// A [`NrosRmwLivelinessKind`] discriminant.
    pub liveliness_kind: u8,
    /// History depth (the Rust-side value is saturated to `u16::MAX`).
    pub depth: u16,
    /// Reserved; written as `0`.
    pub _reserved0: u16,

    /// Deadline period; presumably milliseconds given the `_ms` suffix.
    pub deadline_ms: u32,
    /// Lifespan; presumably milliseconds given the `_ms` suffix.
    pub lifespan_ms: u32,
    /// Liveliness lease duration; presumably milliseconds.
    pub liveliness_lease_ms: u32,
    /// Boolean flag stored as a byte (`0` / `1`).
    pub avoid_ros_namespace_conventions: u8,
    /// Reserved; written as zeros.
    pub _reserved1: [u8; 3],
    /// Receive-buffer size hint; `0` in every profile and conversion here.
    pub rx_buffer_hint: u32,
}
281
282pub const NROS_RMW_QOS_PROFILE_DEFAULT: NrosRmwQos = NrosRmwQos {
284 reliability: 1, durability: 0, history: 0, liveliness_kind: NrosRmwLivelinessKind::Automatic as u8,
288 depth: 10,
289 _reserved0: 0,
290 deadline_ms: 0,
291 lifespan_ms: 0,
292 liveliness_lease_ms: 0,
293 avoid_ros_namespace_conventions: 0,
294 _reserved1: [0; 3],
295 rx_buffer_hint: 0,
296};
297
298pub const NROS_RMW_QOS_PROFILE_SENSOR_DATA: NrosRmwQos = NrosRmwQos {
300 reliability: 0, durability: 0, history: 0, liveliness_kind: NrosRmwLivelinessKind::Automatic as u8,
304 depth: 5,
305 _reserved0: 0,
306 deadline_ms: 0,
307 lifespan_ms: 0,
308 liveliness_lease_ms: 0,
309 avoid_ros_namespace_conventions: 0,
310 _reserved1: [0; 3],
311 rx_buffer_hint: 0,
312};
313
314pub const NROS_RMW_QOS_PROFILE_SERVICES_DEFAULT: NrosRmwQos = NROS_RMW_QOS_PROFILE_DEFAULT;
316
317pub const NROS_RMW_QOS_PROFILE_PARAMETERS: NrosRmwQos = NrosRmwQos {
319 depth: 1000,
320 ..NROS_RMW_QOS_PROFILE_DEFAULT
321};
322
323pub const NROS_RMW_QOS_PROFILE_SYSTEM_DEFAULT: NrosRmwQos = NROS_RMW_QOS_PROFILE_DEFAULT;
325
/// C view of a session, handed to backend entry points.
#[repr(C)]
pub struct NrosRmwSession {
    /// NUL-terminated node name.
    pub node_name: *const u8,
    /// NUL-terminated namespace.
    pub namespace_: *const u8,
    /// Reserved; written as zeros.
    pub _reserved: [u8; 8],
    /// Opaque backend state; the backend fills it in during `open`.
    pub backend_data: *mut c_void,
}
339
340#[repr(C)]
346pub struct NrosRmwPublisher {
347 pub topic_name: *const u8,
349 pub type_name: *const u8,
351 pub qos: NrosRmwQos,
352 pub can_loan_messages: bool,
354 pub _reserved: [u8; 7],
356 pub backend_data: *mut c_void,
358}
359
360#[repr(C)]
362pub struct NrosRmwSubscriber {
363 pub topic_name: *const u8,
365 pub type_name: *const u8,
367 pub qos: NrosRmwQos,
368 pub can_loan_messages: bool,
370 pub _reserved: [u8; 7],
372 pub backend_data: *mut c_void,
374}
375
/// C view of a service server, handed to backend entry points.
#[repr(C)]
pub struct NrosRmwServiceServer {
    /// NUL-terminated service name.
    pub service_name: *const u8,
    /// NUL-terminated service type name.
    pub type_name: *const u8,
    /// Reserved bytes.
    pub _reserved: [u8; 8],
    /// Opaque backend state.
    pub backend_data: *mut c_void,
}
388
/// C view of a service client, handed to backend entry points.
#[repr(C)]
pub struct NrosRmwServiceClient {
    /// NUL-terminated service name.
    pub service_name: *const u8,
    /// NUL-terminated service type name.
    pub type_name: *const u8,
    /// Reserved bytes.
    pub _reserved: [u8; 8],
    /// Opaque backend state.
    pub backend_data: *mut c_void,
}
401
402impl From<QosSettings> for NrosRmwQos {
403 fn from(qos: QosSettings) -> Self {
404 Self {
405 reliability: match qos.reliability {
406 QosReliabilityPolicy::BestEffort => 0,
407 QosReliabilityPolicy::Reliable => 1,
408 },
409 durability: match qos.durability {
410 QosDurabilityPolicy::Volatile => 0,
411 QosDurabilityPolicy::TransientLocal => 1,
412 },
413 history: match qos.history {
414 QosHistoryPolicy::KeepLast => 0,
415 QosHistoryPolicy::KeepAll => 1,
416 },
417 liveliness_kind: qos.liveliness_kind as u8,
418 depth: qos.depth.min(u16::MAX as u32) as u16,
422 _reserved0: 0,
423 deadline_ms: qos.deadline_ms,
424 lifespan_ms: qos.lifespan_ms,
425 liveliness_lease_ms: qos.liveliness_lease_ms,
426 avoid_ros_namespace_conventions: qos.avoid_ros_namespace_conventions as u8,
427 _reserved1: [0; 3],
428 rx_buffer_hint: 0,
429 }
430 }
431}
432
433#[repr(C)]
445pub struct NrosRmwVtable {
446 pub open: unsafe extern "C" fn(
448 locator: *const u8,
449 mode: u8,
450 domain_id: u32,
451 node_name: *const u8,
452 out: *mut NrosRmwSession,
453 ) -> NrosRmwRet,
454 pub close: unsafe extern "C" fn(session: *mut NrosRmwSession) -> NrosRmwRet,
455 pub drive_io: unsafe extern "C" fn(session: *mut NrosRmwSession, timeout_ms: i32) -> NrosRmwRet,
456
457 pub create_publisher: unsafe extern "C" fn(
459 session: *mut NrosRmwSession,
460 topic_name: *const u8,
461 type_name: *const u8,
462 type_hash: *const u8,
463 domain_id: u32,
464 qos: *const NrosRmwQos,
465 out: *mut NrosRmwPublisher,
466 ) -> NrosRmwRet,
467 pub destroy_publisher: unsafe extern "C" fn(publisher: *mut NrosRmwPublisher),
468 pub publish_raw: unsafe extern "C" fn(
469 publisher: *mut NrosRmwPublisher,
470 data: *const u8,
471 len: usize,
472 ) -> NrosRmwRet,
473
474 pub create_subscriber: unsafe extern "C" fn(
476 session: *mut NrosRmwSession,
477 topic_name: *const u8,
478 type_name: *const u8,
479 type_hash: *const u8,
480 domain_id: u32,
481 qos: *const NrosRmwQos,
482 out: *mut NrosRmwSubscriber,
483 ) -> NrosRmwRet,
484 pub destroy_subscriber: unsafe extern "C" fn(subscriber: *mut NrosRmwSubscriber),
485 pub try_recv_raw: unsafe extern "C" fn(
486 subscriber: *mut NrosRmwSubscriber,
487 buf: *mut u8,
488 buf_len: usize,
489 ) -> i32,
490 pub has_data: unsafe extern "C" fn(subscriber: *mut NrosRmwSubscriber) -> i32,
491
492 pub create_service_server: unsafe extern "C" fn(
495 session: *mut NrosRmwSession,
496 service_name: *const u8,
497 type_name: *const u8,
498 type_hash: *const u8,
499 domain_id: u32,
500 qos: *const NrosRmwQos,
501 out: *mut NrosRmwServiceServer,
502 ) -> NrosRmwRet,
503 pub destroy_service_server: unsafe extern "C" fn(server: *mut NrosRmwServiceServer),
504 pub try_recv_request: unsafe extern "C" fn(
505 server: *mut NrosRmwServiceServer,
506 buf: *mut u8,
507 buf_len: usize,
508 seq_out: *mut i64,
509 ) -> i32,
510 pub has_request: unsafe extern "C" fn(server: *mut NrosRmwServiceServer) -> i32,
511 pub send_reply: unsafe extern "C" fn(
512 server: *mut NrosRmwServiceServer,
513 seq: i64,
514 data: *const u8,
515 len: usize,
516 ) -> NrosRmwRet,
517
518 pub create_service_client: unsafe extern "C" fn(
520 session: *mut NrosRmwSession,
521 service_name: *const u8,
522 type_name: *const u8,
523 type_hash: *const u8,
524 domain_id: u32,
525 qos: *const NrosRmwQos,
526 out: *mut NrosRmwServiceClient,
527 ) -> NrosRmwRet,
528 pub destroy_service_client: unsafe extern "C" fn(client: *mut NrosRmwServiceClient),
529 pub call_raw: unsafe extern "C" fn(
530 client: *mut NrosRmwServiceClient,
531 request: *const u8,
532 req_len: usize,
533 reply_buf: *mut u8,
534 reply_buf_len: usize,
535 ) -> i32,
536
537 pub send_request_raw: Option<
539 unsafe extern "C" fn(
540 client: *mut NrosRmwServiceClient,
541 request: *const u8,
542 req_len: usize,
543 ) -> NrosRmwRet,
544 >,
545 pub try_recv_reply_raw: Option<
546 unsafe extern "C" fn(
547 client: *mut NrosRmwServiceClient,
548 reply_buf: *mut u8,
549 reply_buf_len: usize,
550 ) -> i32,
551 >,
552
553 pub register_subscriber_event: unsafe extern "C" fn(
555 subscriber: *mut NrosRmwSubscriber,
556 kind: NrosRmwEventKind,
557 deadline_ms: u32,
558 cb: NrosRmwEventCallback,
559 user_context: *mut c_void,
560 ) -> NrosRmwRet,
561
562 pub register_publisher_event: unsafe extern "C" fn(
563 publisher: *mut NrosRmwPublisher,
564 kind: NrosRmwEventKind,
565 deadline_ms: u32,
566 cb: NrosRmwEventCallback,
567 user_context: *mut c_void,
568 ) -> NrosRmwRet,
569
570 pub assert_publisher_liveliness:
572 unsafe extern "C" fn(publisher: *mut NrosRmwPublisher) -> NrosRmwRet,
573
574 pub next_deadline_ms: Option<unsafe extern "C" fn(session: *const NrosRmwSession) -> i32>,
578
579 pub set_wake_callback: Option<
589 unsafe extern "C" fn(
590 session: *mut NrosRmwSession,
591 cb: Option<unsafe extern "C" fn(ctx: *mut core::ffi::c_void)>,
592 ctx: *mut core::ffi::c_void,
593 ) -> NrosRmwRet,
594 >,
595
596 pub pub_loan: Option<
601 unsafe extern "C" fn(
602 publisher: *mut NrosRmwPublisher,
603 requested_len: usize,
604 out_buf: *mut *mut u8,
605 out_cap: *mut usize,
606 out_token: *mut *mut core::ffi::c_void,
607 ) -> NrosRmwRet,
608 >,
609 pub pub_commit: Option<
612 unsafe extern "C" fn(
613 publisher: *mut NrosRmwPublisher,
614 token: *mut core::ffi::c_void,
615 actual_len: usize,
616 ) -> NrosRmwRet,
617 >,
618 pub pub_discard: Option<
621 unsafe extern "C" fn(publisher: *mut NrosRmwPublisher, token: *mut core::ffi::c_void),
622 >,
623
624 pub sub_borrow: Option<
629 unsafe extern "C" fn(
630 subscriber: *mut NrosRmwSubscriber,
631 out_buf: *mut *const u8,
632 out_len: *mut usize,
633 out_token: *mut *mut core::ffi::c_void,
634 ) -> i32,
635 >,
636 pub sub_release: Option<
639 unsafe extern "C" fn(subscriber: *mut NrosRmwSubscriber, token: *mut core::ffi::c_void),
640 >,
641
642 pub service_server_available:
652 Option<unsafe extern "C" fn(client: *mut NrosRmwServiceClient) -> i32>,
653
654 pub try_recv_sequence: Option<
667 unsafe extern "C" fn(
668 subscriber: *mut NrosRmwSubscriber,
669 buf: *mut u8,
670 per_msg_cap: usize,
671 max_msgs: usize,
672 out_lens: *mut usize,
673 ) -> i32,
674 >,
675
676 pub publish_streamed: Option<
688 unsafe extern "C" fn(
689 publisher: *mut NrosRmwPublisher,
690 size_cb: unsafe extern "C" fn(
691 out_total_len: *mut usize,
692 user_ctx: *mut core::ffi::c_void,
693 ),
694 chunk_cb: unsafe extern "C" fn(
695 out_buf: *mut u8,
696 cap: usize,
697 out_written: *mut usize,
698 user_ctx: *mut core::ffi::c_void,
699 ),
700 user_ctx: *mut core::ffi::c_void,
701 ) -> NrosRmwRet,
702 >,
703
704 pub ping_session:
714 Option<unsafe extern "C" fn(session: *mut NrosRmwSession, timeout_ms: i32) -> NrosRmwRet>,
715
716 pub subscriber_supports_in_place:
722 Option<unsafe extern "C" fn(subscriber: *mut NrosRmwSubscriber) -> i32>,
723
724 pub process_raw_in_place: Option<
731 unsafe extern "C" fn(
732 subscriber: *mut NrosRmwSubscriber,
733 ctx: *mut core::ffi::c_void,
734 cb: unsafe extern "C" fn(ctx: *mut core::ffi::c_void, ptr: *const u8, len: usize),
735 ) -> i32,
736 >,
737}
738
/// Event selector passed to the event-registration entries and to
/// [`NrosRmwEventCallback`], encoded as one byte.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NrosRmwEventKind {
    /// Wire value `0`.
    LivelinessChanged = 0,
    /// Wire value `1`.
    RequestedDeadlineMissed = 1,
    /// Wire value `2`.
    MessageLost = 2,
    /// Wire value `3`.
    LivelinessLost = 3,
    /// Wire value `4`.
    OfferedDeadlineMissed = 4,
}
754
755impl From<nros_rmw::EventKind> for NrosRmwEventKind {
756 fn from(k: nros_rmw::EventKind) -> Self {
757 use nros_rmw::EventKind as K;
758 match k {
759 K::LivelinessChanged => NrosRmwEventKind::LivelinessChanged,
760 K::RequestedDeadlineMissed => NrosRmwEventKind::RequestedDeadlineMissed,
761 K::MessageLost => NrosRmwEventKind::MessageLost,
762 K::LivelinessLost => NrosRmwEventKind::LivelinessLost,
763 K::OfferedDeadlineMissed => NrosRmwEventKind::OfferedDeadlineMissed,
764 _ => NrosRmwEventKind::MessageLost, }
766 }
767}
768
/// Payload variant carrying liveliness counters and their changes.
#[repr(C)]
#[derive(Debug, Clone, Copy, Default)]
pub struct NrosRmwLivelinessChangedStatus {
    /// Current alive count.
    pub alive_count: u16,
    /// Current not-alive count.
    pub not_alive_count: u16,
    /// Signed change in the alive count.
    pub alive_count_change: i16,
    /// Signed change in the not-alive count.
    pub not_alive_count_change: i16,
}

/// Payload variant carrying a running total and its change.
#[repr(C)]
#[derive(Debug, Clone, Copy, Default)]
pub struct NrosRmwCountStatus {
    /// Running total.
    pub total_count: u32,
    /// Change in the total.
    pub total_count_change: u32,
}

/// Untagged payload handed to [`NrosRmwEventCallback`].
///
/// Which field is valid is not encoded in the union itself; presumably the
/// callback's `kind` argument selects it — confirm against the C header.
#[repr(C)]
pub union NrosRmwEventPayload {
    /// Liveliness view of the payload.
    pub liveliness_changed: NrosRmwLivelinessChangedStatus,
    /// Counter view of the payload.
    pub count: NrosRmwCountStatus,
}
794
795pub type NrosRmwEventCallback = unsafe extern "C" fn(
797 kind: NrosRmwEventKind,
798 payload: *const NrosRmwEventPayload,
799 user_context: *mut c_void,
800);
801
802pub const MAX_BACKENDS: usize = parse_max_backends(env!("NROS_RMW_MAX_BACKENDS"));
826
/// Parses a non-empty string of ASCII decimal digits into a `usize`.
///
/// Usable in `const` context so the registry capacity can come from an
/// environment variable at compile time.
///
/// # Panics
///
/// Panics (a compile error when evaluated in a `const`) if `s` is empty or
/// contains anything other than ASCII digits. An empty string used to be
/// accepted silently as `0`, which contradicted the "must be a decimal
/// integer" contract and would have produced a zero-capacity registry.
const fn parse_max_backends(s: &str) -> usize {
    let bytes = s.as_bytes();
    assert!(
        !bytes.is_empty(),
        "NROS_RMW_MAX_BACKENDS must be a decimal integer"
    );
    let mut value: usize = 0;
    // Index loop: iterators are not available in `const fn`.
    let mut pos = 0usize;
    while pos < bytes.len() {
        let digit = bytes[pos];
        assert!(
            digit.is_ascii_digit(),
            "NROS_RMW_MAX_BACKENDS must be a decimal integer"
        );
        value = value * 10 + (digit - b'0') as usize;
        pos += 1;
    }
    value
}
842
843const BACKEND_NAME_MAX: usize = 32;
846
847#[repr(C)]
848struct BackendSlot {
849 name: [u8; BACKEND_NAME_MAX],
852 vtable: *const NrosRmwVtable,
853}
854
855impl BackendSlot {
856 const fn empty() -> Self {
857 Self {
858 name: [0u8; BACKEND_NAME_MAX],
859 vtable: core::ptr::null(),
860 }
861 }
862
863 #[inline]
864 fn is_empty(&self) -> bool {
865 self.name[0] == 0
866 }
867
868 #[inline]
869 fn name_matches(&self, candidate: &[u8]) -> bool {
870 if self.is_empty() {
871 return false;
872 }
873 let mut i = 0usize;
875 while i < self.name.len() && i < candidate.len() {
876 if self.name[i] == 0 {
877 return false; }
879 if self.name[i] != candidate[i] {
880 return false;
881 }
882 i += 1;
883 }
884 i == candidate.len() && (i == self.name.len() || self.name[i] == 0)
886 }
887}
888
889unsafe impl Sync for BackendSlot {}
895
896#[doc(hidden)]
910pub struct Registry {
911 slots: core::cell::UnsafeCell<[BackendSlot; MAX_BACKENDS]>,
912 len: portable_atomic::AtomicUsize,
913}
914
915impl Registry {
916 #[doc(hidden)]
917 pub const fn new() -> Self {
918 let slots = {
919 #[allow(clippy::declare_interior_mutable_const)]
920 const E: BackendSlot = BackendSlot::empty();
921 [E; MAX_BACKENDS]
922 };
923 Self {
924 slots: core::cell::UnsafeCell::new(slots),
925 len: portable_atomic::AtomicUsize::new(0),
926 }
927 }
928
929 #[inline]
932 unsafe fn slot(&self, i: usize) -> &BackendSlot {
933 unsafe { &(*self.slots.get())[i] }
936 }
937
938 #[inline]
942 #[allow(clippy::mut_from_ref)]
943 unsafe fn slot_mut(&self, i: usize) -> &mut BackendSlot {
944 unsafe { &mut (*self.slots.get())[i] }
946 }
947}
948
949unsafe impl Sync for Registry {}
951
952#[unsafe(no_mangle)]
960static REGISTRY: Registry = Registry::new();
961
962#[inline]
964fn registry() -> &'static Registry {
965 ®ISTRY
966}
967
968const MESSAGE_INFO_SLOTS: usize = 64;
980
981struct MessageInfoSlot {
982 key: portable_atomic::AtomicUsize,
983 valid: portable_atomic::AtomicBool,
984 info: UnsafeCell<MessageInfo>,
985 #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
986 validate_requested: portable_atomic::AtomicBool,
987 #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
988 integrity_valid: portable_atomic::AtomicBool,
989 #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
990 integrity: UnsafeCell<nros_rmw::IntegrityStatus>,
991}
992
993impl MessageInfoSlot {
994 const fn empty() -> Self {
995 Self {
996 key: portable_atomic::AtomicUsize::new(0),
997 valid: portable_atomic::AtomicBool::new(false),
998 info: UnsafeCell::new(MessageInfo::new()),
999 #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
1000 validate_requested: portable_atomic::AtomicBool::new(false),
1001 #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
1002 integrity_valid: portable_atomic::AtomicBool::new(false),
1003 #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
1004 integrity: UnsafeCell::new(nros_rmw::IntegrityStatus {
1005 gap: 0,
1006 duplicate: false,
1007 crc_valid: None,
1008 }),
1009 }
1010 }
1011}
1012
1013unsafe impl Sync for MessageInfoSlot {}
1017
1018static MESSAGE_INFO_TABLE: [MessageInfoSlot; MESSAGE_INFO_SLOTS] = {
1019 #[allow(clippy::declare_interior_mutable_const)]
1020 const E: MessageInfoSlot = MessageInfoSlot::empty();
1021 [E; MESSAGE_INFO_SLOTS]
1022};
1023
1024fn lookup_message_info_slot(key: usize) -> Option<&'static MessageInfoSlot> {
1025 if key == 0 {
1026 return None;
1027 }
1028 MESSAGE_INFO_TABLE
1029 .iter()
1030 .find(|slot| slot.key.load(Ordering::Acquire) == key)
1031}
1032
/// Returns the slot owned by `key`, claiming a free one if the key has no
/// slot yet.
///
/// Returns `None` for key `0` or when the table is full.
///
/// The whole table is searched for an existing entry *before* any free slot
/// is claimed. The previous single-pass scan claimed the first free slot it
/// met, so once an earlier slot had been cleared, a key living in a later
/// slot got a second entry; the older copy then went stale and was never
/// fully released.
#[cfg(feature = "alloc")]
fn get_or_insert_message_info_slot(key: usize) -> Option<&'static MessageInfoSlot> {
    if key == 0 {
        return None;
    }

    // Pass 1: reuse the slot that already belongs to this key.
    if let Some(existing) = lookup_message_info_slot(key) {
        return Some(existing);
    }

    // Pass 2: claim the first free slot.
    for slot in &MESSAGE_INFO_TABLE {
        match slot
            .key
            .compare_exchange(0, key, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(_) => return Some(slot),
            // Another caller installed the same key in the meantime.
            Err(current) if current == key => return Some(slot),
            Err(_) => {}
        }
    }
    None
}
1054
/// Records (or invalidates) the message info associated with `key`.
///
/// `Some(info)` writes the value and marks the slot valid; `None` only
/// clears the valid flag. Does nothing when no slot can be obtained for
/// `key`.
#[cfg(feature = "alloc")]
pub(crate) fn store_cffi_message_info(key: usize, info: Option<MessageInfo>) {
    if let Some(slot) = get_or_insert_message_info_slot(key) {
        let has_info = info.is_some();
        if let Some(value) = info {
            // Write the payload first; the Release store below publishes it.
            unsafe {
                *slot.info.get() = value;
            }
        }
        slot.valid.store(has_info, Ordering::Release);
    }
}
1073
1074fn take_cffi_message_info(key: usize) -> Option<MessageInfo> {
1075 let slot = lookup_message_info_slot(key)?;
1076 if !slot.valid.swap(false, Ordering::AcqRel) {
1077 return None;
1078 }
1079 Some(unsafe { *slot.info.get() })
1082}
1083
/// Flags `key` as wanting an integrity status and drops any stale one.
///
/// Does nothing when no slot can be obtained for `key`.
#[cfg(all(feature = "alloc", feature = "safety-e2e"))]
fn request_cffi_integrity_status(key: usize) {
    if let Some(slot) = get_or_insert_message_info_slot(key) {
        // Invalidate the old status before raising the request flag.
        slot.integrity_valid.store(false, Ordering::Release);
        slot.validate_requested.store(true, Ordering::Release);
    }
}
1092
/// Consumes a pending integrity request for `key`.
///
/// Returns `true` exactly once per request; `false` when no request is
/// pending or the key has no slot.
#[cfg(all(feature = "alloc", feature = "safety-e2e"))]
pub(crate) fn take_cffi_integrity_request(key: usize) -> bool {
    match lookup_message_info_slot(key) {
        Some(slot) => slot.validate_requested.swap(false, Ordering::AcqRel),
        None => false,
    }
}
1099
/// Records the integrity status for `key` and marks it available.
///
/// Does nothing when no slot can be obtained for `key`.
#[cfg(all(feature = "alloc", feature = "safety-e2e"))]
pub(crate) fn store_cffi_integrity_status(key: usize, status: nros_rmw::IntegrityStatus) {
    if let Some(slot) = get_or_insert_message_info_slot(key) {
        // Write the payload first; the Release store below publishes it.
        unsafe {
            *slot.integrity.get() = status;
        }
        slot.integrity_valid.store(true, Ordering::Release);
    }
}
1112
/// Consumes the integrity status stored for `key`, if any.
///
/// The flag is cleared atomically, so a stored status is handed out at most
/// once.
#[cfg(all(feature = "alloc", feature = "safety-e2e"))]
fn take_cffi_integrity_status(key: usize) -> Option<nros_rmw::IntegrityStatus> {
    let slot = lookup_message_info_slot(key)?;
    let was_valid = slot.integrity_valid.swap(false, Ordering::AcqRel);
    was_valid.then(|| unsafe { *slot.integrity.get() })
}
1121
1122fn clear_cffi_message_info(key: usize) {
1123 let Some(slot) = lookup_message_info_slot(key) else {
1124 return;
1125 };
1126 slot.valid.store(false, Ordering::Release);
1127 #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
1128 {
1129 slot.validate_requested.store(false, Ordering::Release);
1130 slot.integrity_valid.store(false, Ordering::Release);
1131 }
1132 slot.key.store(0, Ordering::Release);
1133}
1134
1135#[deprecated(
1154 since = "0.2.0",
1155 note = "use nros_rmw_cffi_register_named with the backend's canonical name; the unnamed shim will be removed"
1156)]
1157#[unsafe(no_mangle)]
1158pub unsafe extern "C" fn nros_rmw_cffi_register(vtable: *const NrosRmwVtable) -> NrosRmwRet {
1159 unsafe { nros_rmw_cffi_register_named(c"default".as_ptr(), vtable) }
1160}
1161
1162#[unsafe(no_mangle)]
1187pub unsafe extern "C" fn nros_rmw_cffi_register_named(
1188 name: *const core::ffi::c_char,
1189 vtable: *const NrosRmwVtable,
1190) -> NrosRmwRet {
1191 if name.is_null() || vtable.is_null() {
1192 return NROS_RMW_RET_INVALID_ARGUMENT;
1193 }
1194 let name_u8 = name.cast::<u8>();
1195
1196 let mut len = 0usize;
1199 while len < BACKEND_NAME_MAX {
1200 let b = unsafe { *name_u8.add(len) };
1201 if b == 0 {
1202 break;
1203 }
1204 len += 1;
1205 }
1206 if len == 0 {
1207 return NROS_RMW_RET_INVALID_ARGUMENT;
1208 }
1209 if unsafe { *name_u8.add(len) } != 0 {
1211 return NROS_RMW_RET_INVALID_ARGUMENT;
1212 }
1213
1214 let name_bytes = unsafe { core::slice::from_raw_parts(name_u8, len) };
1215
1216 let current_len = registry().len.load(Ordering::Acquire);
1218 for i in 0..current_len {
1219 let slot = unsafe { registry().slot(i) };
1221 if slot.name_matches(name_bytes) {
1222 unsafe {
1226 let slot_mut = registry().slot_mut(i);
1227 slot_mut.vtable = vtable;
1228 }
1229 core::sync::atomic::fence(Ordering::Release);
1230 return NROS_RMW_RET_OK;
1231 }
1232 }
1233
1234 let idx = registry().len.fetch_add(1, Ordering::AcqRel);
1236 if idx >= MAX_BACKENDS {
1237 registry().len.store(MAX_BACKENDS, Ordering::Release);
1241 return NROS_RMW_RET_ERROR;
1242 }
1243
1244 unsafe {
1246 let slot = registry().slot_mut(idx);
1247 slot.name[..len].copy_from_slice(name_bytes);
1248 slot.name[len] = 0;
1249 slot.vtable = vtable;
1250 }
1251 core::sync::atomic::fence(Ordering::Release);
1254 NROS_RMW_RET_OK
1255}
1256
1257#[unsafe(no_mangle)]
1264pub unsafe extern "C" fn nros_rmw_cffi_lookup(
1265 name: *const core::ffi::c_char,
1266) -> *const NrosRmwVtable {
1267 if name.is_null() {
1268 return core::ptr::null();
1269 }
1270 let name_u8 = name.cast::<u8>();
1271 let mut len = 0usize;
1272 while len < BACKEND_NAME_MAX {
1273 if unsafe { *name_u8.add(len) } == 0 {
1274 break;
1275 }
1276 len += 1;
1277 }
1278 if len == 0 || len == BACKEND_NAME_MAX {
1279 return core::ptr::null();
1280 }
1281 let name_bytes = unsafe { core::slice::from_raw_parts(name_u8, len) };
1282
1283 let current_len = registry().len.load(Ordering::Acquire);
1284 for i in 0..current_len {
1285 let slot = unsafe { registry().slot(i) };
1288 if slot.name_matches(name_bytes) {
1289 return slot.vtable;
1290 }
1291 }
1292 core::ptr::null()
1293}
1294
1295#[unsafe(no_mangle)]
1304pub unsafe extern "C" fn nros_rmw_cffi_registered_names(
1305 buf: *mut *const core::ffi::c_char,
1306 cap: usize,
1307) -> usize {
1308 let n = registry().len.load(Ordering::Acquire);
1309 if !buf.is_null() && cap > 0 {
1310 let limit = n.min(cap);
1311 for i in 0..limit {
1312 let slot = unsafe { registry().slot(i) };
1314 unsafe {
1315 buf.add(i)
1316 .write(slot.name.as_ptr() as *const core::ffi::c_char)
1317 };
1318 }
1319 }
1320 n
1321}
1322
1323#[inline]
1328pub fn backend_registered() -> bool {
1329 registry().len.load(Ordering::Acquire) > 0
1330}
1331
1332fn default_vtable() -> Option<&'static NrosRmwVtable> {
1337 let n = registry().len.load(Ordering::Acquire);
1338 if n == 0 {
1339 return None;
1340 }
1341 let slot = unsafe { registry().slot(0) };
1344 if slot.vtable.is_null() {
1345 return None;
1346 }
1347 Some(unsafe { &*slot.vtable })
1348}
1349
1350pub enum BackendResolution {
1352 Single(&'static NrosRmwVtable),
1354 NoBackend,
1357 Ambiguous,
1360 Unknown,
1363}
1364
1365pub fn resolve_backend(selector: Option<&[u8]>) -> BackendResolution {
1383 let n = registry().len.load(Ordering::Acquire);
1384 if let Some(name) = selector {
1385 let mut i = 0usize;
1386 while i < n {
1387 let slot = unsafe { registry().slot(i) };
1389 if slot.name_matches(name) {
1390 if slot.vtable.is_null() {
1391 return BackendResolution::Unknown;
1392 }
1393 return BackendResolution::Single(unsafe { &*slot.vtable });
1394 }
1395 i += 1;
1396 }
1397 return BackendResolution::Unknown;
1398 }
1399 match n {
1400 0 => BackendResolution::NoBackend,
1401 1 => default_vtable()
1402 .map(BackendResolution::Single)
1403 .unwrap_or(BackendResolution::NoBackend),
1404 _ => BackendResolution::Ambiguous,
1405 }
1406}
1407
1408pub fn backend_resolution_to_ret(res: &BackendResolution) -> NrosRmwRet {
1413 match res {
1414 BackendResolution::Single(_) => NROS_RMW_RET_OK,
1415 BackendResolution::NoBackend => NROS_RMW_RET_NO_BACKEND,
1416 BackendResolution::Ambiguous => NROS_RMW_RET_AMBIGUOUS_BACKEND,
1417 BackendResolution::Unknown => NROS_RMW_RET_UNKNOWN_BACKEND,
1418 }
1419}
1420
1421#[unsafe(no_mangle)]
1439pub unsafe extern "C" fn nros_rmw_cffi_set_custom_transport(
1440 ops: *const nros_rmw::NrosTransportOps,
1441) -> NrosRmwRet {
1442 if ops.is_null() {
1443 let _ = unsafe { nros_rmw::set_custom_transport(None) };
1445 return NROS_RMW_RET_OK;
1446 }
1447 let copy = unsafe { *ops };
1448 match unsafe { nros_rmw::set_custom_transport(Some(copy)) } {
1449 Ok(()) => NROS_RMW_RET_OK,
1450 Err(e) => ret_from_error(&e),
1451 }
1452}
1453
1454fn get_vtable() -> Result<&'static NrosRmwVtable, TransportError> {
1455 default_vtable().ok_or(TransportError::InvalidArgument)
1460}
1461
/// Copies `s` into `buf` as a NUL-terminated byte string and returns a
/// pointer to the start of `buf`.
///
/// At most `N - 1` bytes are copied; longer input is truncated silently.
/// Bytes after the terminator are left untouched. `N` must be at least 1.
fn to_c_str<const N: usize>(s: &str, buf: &mut [u8; N]) -> *const u8 {
    let source = s.as_bytes();
    // Always keep one byte free for the terminator.
    let copied = core::cmp::min(source.len(), N - 1);
    let (text, rest) = buf.split_at_mut(copied);
    text.copy_from_slice(&source[..copied]);
    rest[0] = 0;
    buf.as_ptr()
}
1474
/// Reads the NUL-terminated text at the start of `buf`.
///
/// The text ends at the first NUL, or spans the whole buffer if there is
/// none. Returns `""` when those bytes are not valid UTF-8.
fn cstr_buf_to_str<const N: usize>(buf: &[u8; N]) -> &str {
    // `split` always yields at least one (possibly empty) piece.
    let text = buf.split(|&byte| byte == 0).next().unwrap_or(&[]);
    match core::str::from_utf8(text) {
        Ok(valid) => valid,
        Err(_) => "",
    }
}
1487
1488const NAME_BUF_LEN: usize = 256;
1510const HASH_BUF_LEN: usize = 128;
1511
1512pub struct CffiSession {
1514 vtable: &'static NrosRmwVtable,
1515 node_name_buf: [u8; NAME_BUF_LEN],
1517 namespace_buf: [u8; NAME_BUF_LEN],
1521 backend_data: *mut c_void,
1523}
1524
1525impl CffiSession {
1526 fn make_view(&mut self) -> NrosRmwSession {
1527 NrosRmwSession {
1528 node_name: self.node_name_buf.as_ptr(),
1529 namespace_: self.namespace_buf.as_ptr(),
1530 _reserved: [0u8; 8],
1531 backend_data: self.backend_data,
1532 }
1533 }
1534
1535 pub fn node_name(&self) -> &str {
1537 cstr_buf_to_str(&self.node_name_buf)
1538 }
1539
1540 pub fn open(
1547 locator: &str,
1548 mode: u8,
1549 domain_id: u32,
1550 node_name: &str,
1551 ) -> Result<Self, TransportError> {
1552 let vtable = get_vtable()?;
1553 Self::open_with_vtable(vtable, locator, mode, domain_id, node_name)
1554 }
1555
1556 pub fn open_named(
1561 rmw_name: &str,
1562 locator: &str,
1563 mode: u8,
1564 domain_id: u32,
1565 node_name: &str,
1566 ) -> Result<Self, TransportError> {
1567 let mut name_buf = [0u8; BACKEND_NAME_MAX];
1570 if rmw_name.len() >= BACKEND_NAME_MAX {
1571 return Err(TransportError::InvalidArgument);
1572 }
1573 name_buf[..rmw_name.len()].copy_from_slice(rmw_name.as_bytes());
1574 let raw = unsafe { nros_rmw_cffi_lookup(name_buf.as_ptr() as *const _) };
1576 if raw.is_null() {
1577 return Err(TransportError::InvalidArgument);
1578 }
1579 let vtable = unsafe { &*raw };
1581 Self::open_with_vtable(vtable, locator, mode, domain_id, node_name)
1582 }
1583
1584 fn open_with_vtable(
1585 vtable: &'static NrosRmwVtable,
1586 locator: &str,
1587 mode: u8,
1588 domain_id: u32,
1589 node_name: &str,
1590 ) -> Result<Self, TransportError> {
1591 let mut loc_buf = [0u8; NAME_BUF_LEN];
1592 let loc_ptr = to_c_str(locator, &mut loc_buf);
1593
1594 let mut session = Self {
1595 vtable,
1596 node_name_buf: [0u8; NAME_BUF_LEN],
1597 namespace_buf: [0u8; NAME_BUF_LEN],
1598 backend_data: core::ptr::null_mut(),
1599 };
1600 let _ = to_c_str(node_name, &mut session.node_name_buf);
1601
1602 let mut view = NrosRmwSession {
1603 node_name: session.node_name_buf.as_ptr(),
1604 namespace_: session.namespace_buf.as_ptr(),
1605 _reserved: [0u8; 8],
1606 backend_data: core::ptr::null_mut(),
1607 };
1608 let ret = unsafe {
1609 (vtable.open)(
1610 loc_ptr,
1611 mode,
1612 domain_id,
1613 session.node_name_buf.as_ptr(),
1614 &mut view,
1615 )
1616 };
1617 #[cfg(feature = "std")]
1623 if std::env::var_os("NROS_RMW_TRACE_OPEN").is_some() {
1624 std::eprintln!(
1625 "[nros-rmw-cffi] open: locator={locator:?} mode={mode} ret={ret} backend_data={:p}",
1626 view.backend_data,
1627 );
1628 }
1629 if ret != NROS_RMW_RET_OK {
1630 return Err(error_from_ret(ret));
1631 }
1632 if view.backend_data.is_null() {
1633 return Err(TransportError::ConnectionFailed);
1634 }
1635 session.backend_data = view.backend_data;
1636 Ok(session)
1637 }
1638}
1639
1640impl Session for CffiSession {
1641 type Error = TransportError;
1642 type PublisherHandle = CffiPublisher;
1643 type SubscriberHandle = CffiSubscriber;
1644 type ServiceServerHandle = CffiServiceServer;
1645 type ServiceClientHandle = CffiServiceClient;
1646
1647 fn create_publisher(
1648 &mut self,
1649 topic: &TopicInfo,
1650 qos: QosSettings,
1651 ) -> Result<CffiPublisher, TransportError> {
1652 let mut hash_buf = [0u8; HASH_BUF_LEN];
1653 let hash_ptr = to_c_str(topic.type_hash, &mut hash_buf);
1654 let qos_struct = NrosRmwQos::from(qos);
1655
1656 let mut pub_state = CffiPublisher {
1657 vtable: self.vtable,
1658 topic_name_buf: [0u8; NAME_BUF_LEN],
1659 type_name_buf: [0u8; NAME_BUF_LEN],
1660 qos: qos_struct,
1661 can_loan_messages: false,
1662 backend_data: core::ptr::null_mut(),
1663 };
1664 let topic_ptr = to_c_str(topic.name, &mut pub_state.topic_name_buf);
1665 let type_ptr = to_c_str(topic.type_name, &mut pub_state.type_name_buf);
1666
1667 let mut view = NrosRmwPublisher {
1668 topic_name: topic_ptr,
1669 type_name: type_ptr,
1670 qos: qos_struct,
1671 can_loan_messages: false,
1672 _reserved: [0u8; 7],
1673 backend_data: core::ptr::null_mut(),
1674 };
1675 let mut session_view = self.make_view();
1676 let ret = unsafe {
1677 (self.vtable.create_publisher)(
1678 &mut session_view,
1679 topic_ptr,
1680 type_ptr,
1681 hash_ptr,
1682 topic.domain_id,
1683 &qos_struct,
1684 &mut view,
1685 )
1686 };
1687 if ret != NROS_RMW_RET_OK {
1688 return Err(error_from_ret(ret));
1689 }
1690 if view.backend_data.is_null() {
1691 return Err(TransportError::PublisherCreationFailed);
1692 }
1693 pub_state.backend_data = view.backend_data;
1694 pub_state.can_loan_messages = view.can_loan_messages;
1695 Ok(pub_state)
1696 }
1697
1698 fn create_subscriber(
1699 &mut self,
1700 topic: &TopicInfo,
1701 qos: QosSettings,
1702 ) -> Result<CffiSubscriber, TransportError> {
1703 let mut hash_buf = [0u8; HASH_BUF_LEN];
1704 let hash_ptr = to_c_str(topic.type_hash, &mut hash_buf);
1705 let mut qos_struct = NrosRmwQos::from(qos);
1706 qos_struct.rx_buffer_hint = topic.rx_buffer_hint.min(u32::MAX as usize) as u32;
1709
1710 let mut sub_state = CffiSubscriber {
1711 vtable: self.vtable,
1712 topic_name_buf: [0u8; NAME_BUF_LEN],
1713 type_name_buf: [0u8; NAME_BUF_LEN],
1714 qos: qos_struct,
1715 can_loan_messages: false,
1716 backend_data: core::ptr::null_mut(),
1717 supports_in_place: false,
1718 };
1719 let topic_ptr = to_c_str(topic.name, &mut sub_state.topic_name_buf);
1720 let type_ptr = to_c_str(topic.type_name, &mut sub_state.type_name_buf);
1721
1722 let mut view = NrosRmwSubscriber {
1723 topic_name: topic_ptr,
1724 type_name: type_ptr,
1725 qos: qos_struct,
1726 can_loan_messages: false,
1727 _reserved: [0u8; 7],
1728 backend_data: core::ptr::null_mut(),
1729 };
1730 let mut session_view = self.make_view();
1731 let ret = unsafe {
1732 (self.vtable.create_subscriber)(
1733 &mut session_view,
1734 topic_ptr,
1735 type_ptr,
1736 hash_ptr,
1737 topic.domain_id,
1738 &qos_struct,
1739 &mut view,
1740 )
1741 };
1742 if ret != NROS_RMW_RET_OK {
1743 return Err(error_from_ret(ret));
1744 }
1745 if view.backend_data.is_null() {
1746 return Err(TransportError::SubscriberCreationFailed);
1747 }
1748 sub_state.backend_data = view.backend_data;
1749 sub_state.can_loan_messages = view.can_loan_messages;
1750 sub_state.supports_in_place = match sub_state.vtable.subscriber_supports_in_place {
1752 Some(f) => {
1753 let mut v = sub_state.make_view();
1754 unsafe { f(&mut v) == 1 }
1755 }
1756 None => false,
1757 };
1758 Ok(sub_state)
1759 }
1760
1761 fn create_service_server(
1762 &mut self,
1763 service: &ServiceInfo,
1764 qos: QosSettings,
1765 ) -> Result<CffiServiceServer, TransportError> {
1766 let qos_struct = NrosRmwQos::from(qos);
1767 let mut hash_buf = [0u8; HASH_BUF_LEN];
1768 let hash_ptr = to_c_str(service.type_hash, &mut hash_buf);
1769
1770 let mut srv_state = CffiServiceServer {
1771 vtable: self.vtable,
1772 service_name_buf: [0u8; NAME_BUF_LEN],
1773 type_name_buf: [0u8; NAME_BUF_LEN],
1774 backend_data: core::ptr::null_mut(),
1775 };
1776 let svc_ptr = to_c_str(service.name, &mut srv_state.service_name_buf);
1777 let type_ptr = to_c_str(service.type_name, &mut srv_state.type_name_buf);
1778
1779 let mut view = NrosRmwServiceServer {
1780 service_name: svc_ptr,
1781 type_name: type_ptr,
1782 _reserved: [0u8; 8],
1783 backend_data: core::ptr::null_mut(),
1784 };
1785 let mut session_view = self.make_view();
1786 let ret = unsafe {
1787 (self.vtable.create_service_server)(
1788 &mut session_view,
1789 svc_ptr,
1790 type_ptr,
1791 hash_ptr,
1792 service.domain_id,
1793 &qos_struct,
1794 &mut view,
1795 )
1796 };
1797 if ret != NROS_RMW_RET_OK {
1798 return Err(error_from_ret(ret));
1799 }
1800 if view.backend_data.is_null() {
1801 return Err(TransportError::ServiceServerCreationFailed);
1802 }
1803 srv_state.backend_data = view.backend_data;
1804 Ok(srv_state)
1805 }
1806
1807 fn create_service_client(
1808 &mut self,
1809 service: &ServiceInfo,
1810 qos: QosSettings,
1811 ) -> Result<CffiServiceClient, TransportError> {
1812 let qos_struct = NrosRmwQos::from(qos);
1813 let mut hash_buf = [0u8; HASH_BUF_LEN];
1814 let hash_ptr = to_c_str(service.type_hash, &mut hash_buf);
1815
1816 let mut cli_state = CffiServiceClient {
1817 vtable: self.vtable,
1818 service_name_buf: [0u8; NAME_BUF_LEN],
1819 type_name_buf: [0u8; NAME_BUF_LEN],
1820 backend_data: core::ptr::null_mut(),
1821 pending_len: 0,
1822 };
1823 let svc_ptr = to_c_str(service.name, &mut cli_state.service_name_buf);
1824 let type_ptr = to_c_str(service.type_name, &mut cli_state.type_name_buf);
1825
1826 let mut view = NrosRmwServiceClient {
1827 service_name: svc_ptr,
1828 type_name: type_ptr,
1829 _reserved: [0u8; 8],
1830 backend_data: core::ptr::null_mut(),
1831 };
1832 let mut session_view = self.make_view();
1833 let ret = unsafe {
1834 (self.vtable.create_service_client)(
1835 &mut session_view,
1836 svc_ptr,
1837 type_ptr,
1838 hash_ptr,
1839 service.domain_id,
1840 &qos_struct,
1841 &mut view,
1842 )
1843 };
1844 if ret != NROS_RMW_RET_OK {
1845 return Err(error_from_ret(ret));
1846 }
1847 if view.backend_data.is_null() {
1848 return Err(TransportError::ServiceClientCreationFailed);
1849 }
1850 cli_state.backend_data = view.backend_data;
1851 Ok(cli_state)
1852 }
1853
1854 fn close(&mut self) -> Result<(), TransportError> {
1855 if self.backend_data.is_null() {
1856 return Ok(());
1857 }
1858 let mut view = self.make_view();
1859 let ret = unsafe { (self.vtable.close)(&mut view) };
1860 if ret != NROS_RMW_RET_OK {
1861 return Err(error_from_ret(ret));
1862 }
1863 self.backend_data = core::ptr::null_mut();
1864 Ok(())
1865 }
1866
1867 fn drive_io(&mut self, timeout_ms: i32) -> Result<(), TransportError> {
1868 let mut view = self.make_view();
1869 let ret = unsafe { (self.vtable.drive_io)(&mut view, timeout_ms) };
1870 if ret != NROS_RMW_RET_OK {
1871 return Err(error_from_ret(ret));
1872 }
1873 Ok(())
1874 }
1875
1876 fn next_deadline_ms(&self) -> Option<u32> {
1877 let f = self.vtable.next_deadline_ms?;
1878 let view = NrosRmwSession {
1882 node_name: self.node_name_buf.as_ptr(),
1883 namespace_: self.namespace_buf.as_ptr(),
1884 _reserved: [0u8; 8],
1885 backend_data: self.backend_data,
1886 };
1887 let ret = unsafe { f(&view as *const _) };
1888 if ret < 0 { None } else { Some(ret as u32) }
1889 }
1890
1891 unsafe fn set_wake_callback(
1892 &mut self,
1893 cb: Option<unsafe extern "C" fn(ctx: *mut core::ffi::c_void)>,
1894 ctx: *mut core::ffi::c_void,
1895 ) {
1896 let Some(f) = self.vtable.set_wake_callback else {
1897 return;
1898 };
1899 let mut view = NrosRmwSession {
1900 node_name: self.node_name_buf.as_ptr(),
1901 namespace_: self.namespace_buf.as_ptr(),
1902 _reserved: [0u8; 8],
1903 backend_data: self.backend_data,
1904 };
1905 let _ = unsafe { f(&mut view as *mut _, cb, ctx) };
1908 }
1909
1910 fn supports_wake_callback(&self) -> bool {
1911 self.vtable.set_wake_callback.is_some()
1916 }
1917
1918 fn ping_session(&mut self, timeout_ms: i32) -> Result<(), TransportError> {
1919 let Some(f) = self.vtable.ping_session else {
1924 return Err(TransportError::Unsupported);
1925 };
1926 let mut view = self.make_view();
1927 let rc = unsafe { f(&mut view, timeout_ms) };
1928 if rc == NROS_RMW_RET_OK {
1929 Ok(())
1930 } else {
1931 Err(error_from_ret(rc))
1932 }
1933 }
1934
1935 fn supported_qos_policies(&self) -> nros_rmw::QosPolicyMask {
1950 use nros_rmw::QosPolicyMask;
1951 QosPolicyMask::CORE
1952 | QosPolicyMask::DURABILITY_TRANSIENT_LOCAL
1953 | QosPolicyMask::AVOID_ROS_NAMESPACE_CONVENTIONS
1954 | QosPolicyMask::DEADLINE
1955 | QosPolicyMask::LIFESPAN
1956 | QosPolicyMask::LIVELINESS_AUTOMATIC
1957 | QosPolicyMask::LIVELINESS_MANUAL_BY_TOPIC
1958 | QosPolicyMask::LIVELINESS_MANUAL_BY_NODE
1959 | QosPolicyMask::LIVELINESS_LEASE
1960 }
1961}
1962
1963impl Drop for CffiSession {
1964 fn drop(&mut self) {
1965 if !self.backend_data.is_null() {
1966 let mut view = self.make_view();
1967 unsafe { (self.vtable.close)(&mut view) };
1968 }
1969 }
1970}
1971
/// Publisher handle backed by a C-ABI backend vtable.
///
/// Instances are produced by `CffiSession::create_publisher`; dropping one
/// calls the backend's `destroy_publisher` when `backend_data` is non-null.
pub struct CffiPublisher {
    /// Backend function table every call is dispatched through.
    vtable: &'static NrosRmwVtable,
    /// Topic name as written by `to_c_str`; exposed to the backend by pointer.
    topic_name_buf: [u8; NAME_BUF_LEN],
    /// Type name as written by `to_c_str`; exposed to the backend by pointer.
    type_name_buf: [u8; NAME_BUF_LEN],
    /// QoS profile the publisher was created with.
    qos: NrosRmwQos,
    /// Loan capability flag copied back from the backend at creation.
    can_loan_messages: bool,
    /// Opaque backend state; null means there is nothing to destroy.
    backend_data: *mut c_void,
}
1985
1986impl CffiPublisher {
1987 fn make_view(&mut self) -> NrosRmwPublisher {
1988 NrosRmwPublisher {
1989 topic_name: self.topic_name_buf.as_ptr(),
1990 type_name: self.type_name_buf.as_ptr(),
1991 qos: self.qos,
1992 can_loan_messages: self.can_loan_messages,
1993 _reserved: [0u8; 7],
1994 backend_data: self.backend_data,
1995 }
1996 }
1997
1998 pub fn topic_name(&self) -> &str {
2001 cstr_buf_to_str(&self.topic_name_buf)
2002 }
2003
2004 pub fn type_name(&self) -> &str {
2006 cstr_buf_to_str(&self.type_name_buf)
2007 }
2008
2009 pub fn qos(&self) -> NrosRmwQos {
2011 self.qos
2012 }
2013
2014 pub fn can_loan_messages(&self) -> bool {
2017 self.can_loan_messages
2018 }
2019}
2020
/// Writable slot handed out by `CffiPublisher::try_lend_slot`.
///
/// The slot is backed either by a backend loan or, when the backend has no
/// `pub_loan` entry and `alloc` is enabled, by a heap staging buffer.
/// Dropping an uncommitted slot releases whichever backing it has.
#[cfg(feature = "lending")]
pub struct CffiSlot<'a> {
    /// Start of the writable region.
    buf: *mut u8,
    /// Size in bytes of the writable region.
    cap: usize,
    /// Number of bytes that will be published on commit.
    cursor: usize,
    /// Backend loan token, or the boxed `ArenaStaging` pointer when
    /// `fallback` is set.
    token: *mut c_void,
    /// Owning publisher; `None` once the slot has been committed, which turns
    /// `Drop` into a no-op.
    publisher: Option<&'a CffiPublisher>,
    /// `true` when the slot is backed by the heap staging buffer rather than
    /// a backend loan.
    fallback: bool,
}
2041
#[cfg(feature = "lending")]
impl<'a> CffiSlot<'a> {
    /// Sets how many bytes of the slot will be published on commit.
    ///
    /// Values above the slot capacity trip a debug assertion and are clamped
    /// to the capacity in release builds.
    pub fn set_len(&mut self, len: usize) {
        let cap = self.cap;
        debug_assert!(len <= cap);
        self.cursor = core::cmp::min(len, cap);
    }
}
2052
/// Heap staging buffer behind a fallback `CffiSlot`.
///
/// It is boxed and its raw pointer stored in the slot's `token`; commit and
/// drop both reclaim it with `Box::from_raw`.
#[cfg(all(feature = "lending", feature = "alloc"))]
struct ArenaStaging {
    /// Payload bytes the slot's `buf` pointer refers to.
    buf: alloc::vec::Vec<u8>,
}
2063
#[cfg(feature = "lending")]
impl<'a> AsMut<[u8]> for CffiSlot<'a> {
    /// Exposes the whole slot (`cap` bytes, not just `cursor`) for writing.
    fn as_mut(&mut self) -> &mut [u8] {
        let (start, capacity) = (self.buf, self.cap);
        // SAFETY: `try_lend_slot` only builds a slot from a non-null buffer
        // of at least `cap` bytes, and `&mut self` gives exclusive access.
        unsafe { core::slice::from_raw_parts_mut(start, capacity) }
    }
}
2074
#[cfg(feature = "lending")]
impl<'a> Drop for CffiSlot<'a> {
    /// Releases an uncommitted slot.
    ///
    /// * A committed slot (`publisher` is `None`) needs no cleanup.
    /// * A fallback slot frees its boxed `ArenaStaging`.
    /// * A loaned slot is returned through the backend's optional
    ///   `pub_discard` entry.
    fn drop(&mut self) {
        let Some(owner) = self.publisher else {
            return;
        };
        if self.fallback {
            #[cfg(feature = "alloc")]
            // SAFETY: for fallback slots `token` is the pointer produced by
            // `Box::into_raw` in `try_lend_slot`, and it has not been
            // reclaimed because the slot was never committed.
            unsafe {
                core::mem::drop(alloc::boxed::Box::from_raw(self.token as *mut ArenaStaging));
            }
            return;
        }
        if let Some(discard) = owner.vtable.pub_discard {
            // Built inline because only a shared reference to the publisher
            // is held.
            let mut view = NrosRmwPublisher {
                topic_name: owner.topic_name_buf.as_ptr(),
                type_name: owner.type_name_buf.as_ptr(),
                qos: owner.qos,
                can_loan_messages: owner.can_loan_messages,
                _reserved: [0u8; 7],
                backend_data: owner.backend_data,
            };
            unsafe { discard(&mut view, self.token) };
        }
    }
}
2111
2112#[cfg(feature = "lending")]
2113impl nros_rmw::SlotLending for CffiPublisher {
2114 type Slot<'a> = CffiSlot<'a>;
2115
2116 fn try_lend_slot(&self, len: usize) -> Result<Option<CffiSlot<'_>>, TransportError> {
2117 let Some(loan) = self.vtable.pub_loan else {
2118 #[cfg(feature = "alloc")]
2125 {
2126 let mut staging = alloc::boxed::Box::new(ArenaStaging {
2127 buf: alloc::vec![0u8; len],
2128 });
2129 let buf_ptr = staging.buf.as_mut_ptr();
2130 let token = alloc::boxed::Box::into_raw(staging) as *mut c_void;
2131 return Ok(Some(CffiSlot {
2132 buf: buf_ptr,
2133 cap: len,
2134 cursor: len,
2135 token,
2136 publisher: Some(self),
2137 fallback: true,
2138 }));
2139 }
2140 #[cfg(not(feature = "alloc"))]
2141 {
2142 let _ = len;
2143 return Ok(None);
2144 }
2145 };
2146 let mut view = NrosRmwPublisher {
2147 topic_name: self.topic_name_buf.as_ptr(),
2148 type_name: self.type_name_buf.as_ptr(),
2149 qos: self.qos,
2150 can_loan_messages: self.can_loan_messages,
2151 _reserved: [0u8; 7],
2152 backend_data: self.backend_data,
2153 };
2154 let mut out_buf: *mut u8 = core::ptr::null_mut();
2155 let mut out_cap: usize = 0;
2156 let mut out_token: *mut c_void = core::ptr::null_mut();
2157 let ret = unsafe { loan(&mut view, len, &mut out_buf, &mut out_cap, &mut out_token) };
2160 if ret == NROS_RMW_RET_WOULD_BLOCK || ret == NROS_RMW_RET_NO_DATA {
2161 return Ok(None);
2162 }
2163 if ret != NROS_RMW_RET_OK {
2164 return Err(error_from_ret(ret));
2165 }
2166 if out_buf.is_null() || out_cap < len {
2167 if let Some(discard) = self.vtable.pub_discard {
2170 unsafe { discard(&mut view, out_token) };
2171 }
2172 return Ok(None);
2173 }
2174 Ok(Some(CffiSlot {
2175 buf: out_buf,
2176 cap: out_cap,
2177 cursor: len,
2178 token: out_token,
2179 publisher: Some(self),
2180 fallback: false,
2181 }))
2182 }
2183
2184 fn commit_slot(&self, mut slot: CffiSlot<'_>) -> Result<(), TransportError> {
2185 let publisher = slot
2187 .publisher
2188 .take()
2189 .ok_or(TransportError::InvalidArgument)?;
2190 debug_assert!(core::ptr::eq(publisher, self));
2191 if slot.fallback {
2192 #[cfg(feature = "alloc")]
2196 {
2197 let staging =
2200 unsafe { alloc::boxed::Box::from_raw(slot.token as *mut ArenaStaging) };
2201 let bytes = &staging.buf[..slot.cursor.min(staging.buf.len())];
2202 return Publisher::publish_raw(self, bytes);
2203 }
2204 #[cfg(not(feature = "alloc"))]
2205 {
2206 return Err(TransportError::Unsupported);
2207 }
2208 }
2209 let commit = self.vtable.pub_commit.ok_or(TransportError::Unsupported)?;
2210 let mut view = NrosRmwPublisher {
2211 topic_name: self.topic_name_buf.as_ptr(),
2212 type_name: self.type_name_buf.as_ptr(),
2213 qos: self.qos,
2214 can_loan_messages: self.can_loan_messages,
2215 _reserved: [0u8; 7],
2216 backend_data: self.backend_data,
2217 };
2218 let len = slot.cursor;
2219 let token = slot.token;
2220 let ret = unsafe { commit(&mut view, token, len) };
2223 if ret != NROS_RMW_RET_OK {
2224 return Err(error_from_ret(ret));
2225 }
2226 Ok(())
2227 }
2228}
2229
2230impl Publisher for CffiPublisher {
2231 type Error = TransportError;
2232
2233 fn publish_raw(&self, data: &[u8]) -> Result<(), TransportError> {
2234 let mut view = NrosRmwPublisher {
2235 topic_name: self.topic_name_buf.as_ptr(),
2236 type_name: self.type_name_buf.as_ptr(),
2237 qos: self.qos,
2238 can_loan_messages: self.can_loan_messages,
2239 _reserved: [0u8; 7],
2240 backend_data: self.backend_data,
2241 };
2242 let ret = unsafe { (self.vtable.publish_raw)(&mut view, data.as_ptr(), data.len()) };
2243 if ret != NROS_RMW_RET_OK {
2244 return Err(error_from_ret(ret));
2245 }
2246 Ok(())
2247 }
2248
2249 unsafe fn publish_streamed(
2250 &self,
2251 size_cb: unsafe extern "C" fn(out_total_len: *mut usize, user_ctx: *mut core::ffi::c_void),
2252 chunk_cb: unsafe extern "C" fn(
2253 out_buf: *mut u8,
2254 cap: usize,
2255 out_written: *mut usize,
2256 user_ctx: *mut core::ffi::c_void,
2257 ),
2258 user_ctx: *mut core::ffi::c_void,
2259 ) -> Result<(), TransportError> {
2260 if let Some(f) = self.vtable.publish_streamed {
2267 let mut view = NrosRmwPublisher {
2268 topic_name: self.topic_name_buf.as_ptr(),
2269 type_name: self.type_name_buf.as_ptr(),
2270 qos: self.qos,
2271 can_loan_messages: self.can_loan_messages,
2272 _reserved: [0u8; 7],
2273 backend_data: self.backend_data,
2274 };
2275 let ret = unsafe { f(&mut view, size_cb, chunk_cb, user_ctx) };
2276 if ret != NROS_RMW_RET_OK {
2277 return Err(error_from_ret(ret));
2278 }
2279 return Ok(());
2280 }
2281 const STAGE_CAP: usize = 4096;
2286 let mut total = 0usize;
2287 unsafe { size_cb(&mut total as *mut usize, user_ctx) };
2288 if total > STAGE_CAP {
2289 return Err(TransportError::BufferTooSmall);
2290 }
2291 let mut stage = [0u8; STAGE_CAP];
2292 let mut written_so_far = 0usize;
2293 while written_so_far < total {
2294 let mut chunk_written = 0usize;
2295 let remaining = total - written_so_far;
2296 unsafe {
2297 chunk_cb(
2298 stage.as_mut_ptr().add(written_so_far),
2299 remaining,
2300 &mut chunk_written as *mut usize,
2301 user_ctx,
2302 );
2303 }
2304 if chunk_written == 0 {
2305 return Err(TransportError::BufferTooSmall);
2306 }
2307 written_so_far += chunk_written;
2308 }
2309 self.publish_raw(&stage[..total])
2310 }
2311
    /// Error value used to report that a buffer was too small.
    fn buffer_error(&self) -> TransportError {
        TransportError::BufferTooSmall
    }

    /// Error value used to report a serialization failure.
    fn serialization_error(&self) -> TransportError {
        TransportError::SerializationError
    }

    /// Error value used to report an unsupported event kind.
    fn unsupported_event_error(&self) -> TransportError {
        TransportError::Unsupported
    }
2323
2324 unsafe fn register_event_callback(
2325 &mut self,
2326 kind: nros_rmw::EventKind,
2327 deadline_ms: u32,
2328 cb: nros_rmw::EventCallback,
2329 user_ctx: *mut core::ffi::c_void,
2330 ) -> Result<(), TransportError> {
2331 let mut view = NrosRmwPublisher {
2332 topic_name: self.topic_name_buf.as_ptr(),
2333 type_name: self.type_name_buf.as_ptr(),
2334 qos: self.qos,
2335 can_loan_messages: self.can_loan_messages,
2336 _reserved: [0u8; 7],
2337 backend_data: self.backend_data,
2338 };
2339 let cb: NrosRmwEventCallback =
2343 unsafe { core::mem::transmute::<nros_rmw::EventCallback, NrosRmwEventCallback>(cb) };
2344 let ret = unsafe {
2345 (self.vtable.register_publisher_event)(
2346 &mut view,
2347 kind.into(),
2348 deadline_ms,
2349 cb,
2350 user_ctx,
2351 )
2352 };
2353 if ret != NROS_RMW_RET_OK {
2354 return Err(error_from_ret(ret));
2355 }
2356 Ok(())
2357 }
2358
2359 fn assert_liveliness(&self) -> Result<(), TransportError> {
2360 let view_ptr = self as *const _ as *mut Self;
2365 let mut view = unsafe { (*view_ptr).make_view() };
2366 let ret = unsafe { (self.vtable.assert_publisher_liveliness)(&mut view) };
2367 if ret != NROS_RMW_RET_OK {
2368 return Err(error_from_ret(ret));
2369 }
2370 Ok(())
2371 }
2372}
2373
2374impl Drop for CffiPublisher {
2375 fn drop(&mut self) {
2376 if !self.backend_data.is_null() {
2377 let mut view = self.make_view();
2378 unsafe { (self.vtable.destroy_publisher)(&mut view) };
2379 }
2380 }
2381}
2382
/// Subscriber handle backed by a C-ABI backend vtable.
///
/// Instances are produced by `CffiSession::create_subscriber`; dropping one
/// calls the backend's `destroy_subscriber` when `backend_data` is non-null.
pub struct CffiSubscriber {
    /// Backend function table every call is dispatched through.
    vtable: &'static NrosRmwVtable,
    /// Topic name as written by `to_c_str`; exposed to the backend by pointer.
    topic_name_buf: [u8; NAME_BUF_LEN],
    /// Type name as written by `to_c_str`; exposed to the backend by pointer.
    type_name_buf: [u8; NAME_BUF_LEN],
    /// QoS profile the subscriber was created with.
    qos: NrosRmwQos,
    /// Loan capability flag copied back from the backend at creation.
    can_loan_messages: bool,
    /// Opaque backend state; null means there is nothing to destroy.
    backend_data: *mut c_void,
    /// Cached answer of the backend's `subscriber_supports_in_place` probe.
    supports_in_place: bool,
}
2399
2400impl CffiSubscriber {
2401 fn make_view(&mut self) -> NrosRmwSubscriber {
2402 NrosRmwSubscriber {
2403 topic_name: self.topic_name_buf.as_ptr(),
2404 type_name: self.type_name_buf.as_ptr(),
2405 qos: self.qos,
2406 can_loan_messages: self.can_loan_messages,
2407 _reserved: [0u8; 7],
2408 backend_data: self.backend_data,
2409 }
2410 }
2411
2412 fn run_process_in_place<G: FnOnce(&[u8])>(&mut self, f: G) -> Result<bool, TransportError> {
2418 let Some(slot) = self.vtable.process_raw_in_place else {
2419 return Err(TransportError::MessageTooLarge);
2420 };
2421 unsafe extern "C" fn cb_tramp<G: FnOnce(&[u8])>(
2422 ctx: *mut c_void,
2423 ptr: *const u8,
2424 len: usize,
2425 ) {
2426 let cell = unsafe { &mut *(ctx as *mut Option<G>) };
2427 if let Some(g) = cell.take() {
2428 g(unsafe { core::slice::from_raw_parts(ptr, len) });
2429 }
2430 }
2431 let mut cell: Option<G> = Some(f);
2432 let mut view = self.make_view();
2433 let rc = unsafe {
2434 slot(
2435 &mut view,
2436 &mut cell as *mut Option<G> as *mut c_void,
2437 cb_tramp::<G>,
2438 )
2439 };
2440 if rc == NROS_RMW_RET_NO_DATA {
2441 Ok(false)
2442 } else if rc < 0 {
2443 Err(error_from_ret(rc))
2444 } else {
2445 Ok(rc > 0)
2446 }
2447 }
2448
2449 pub fn topic_name(&self) -> &str {
2450 cstr_buf_to_str(&self.topic_name_buf)
2451 }
2452
2453 pub fn type_name(&self) -> &str {
2454 cstr_buf_to_str(&self.type_name_buf)
2455 }
2456
2457 pub fn qos(&self) -> NrosRmwQos {
2458 self.qos
2459 }
2460
2461 pub fn can_loan_messages(&self) -> bool {
2464 self.can_loan_messages
2465 }
2466}
2467
/// Read-only view of a message borrowed from the backend by
/// `CffiSubscriber::try_borrow`.
///
/// Dropping the view hands the token back through the backend's optional
/// `sub_release` entry.
#[cfg(feature = "lending")]
pub struct CffiView<'a> {
    /// Start of the borrowed payload.
    buf: *const u8,
    /// Payload length in bytes.
    len: usize,
    /// Backend token identifying the borrow; passed to `sub_release`.
    token: *mut c_void,
    /// Subscriber the borrow came from; taken (set to `None`) on drop.
    subscriber: Option<&'a mut CffiSubscriber>,
}
2478
#[cfg(feature = "lending")]
impl<'a> AsRef<[u8]> for CffiView<'a> {
    /// Returns the borrowed payload as a byte slice.
    fn as_ref(&self) -> &[u8] {
        let (start, byte_len) = (self.buf, self.len);
        // SAFETY: `try_borrow` only builds a view from a non-null buffer;
        // the backend is trusted to keep it valid until `sub_release`.
        unsafe { core::slice::from_raw_parts(start, byte_len) }
    }
}
2489
#[cfg(feature = "lending")]
impl<'a> Drop for CffiView<'a> {
    /// Returns the borrowed message to the backend.
    ///
    /// Nothing is called when the backend has no `sub_release` entry.
    fn drop(&mut self) {
        let Some(sub) = self.subscriber.take() else {
            return;
        };
        if let Some(release) = sub.vtable.sub_release {
            let mut view = sub.make_view();
            unsafe { release(&mut view, self.token) };
        }
    }
}
2503
#[cfg(feature = "lending")]
impl nros_rmw::SlotBorrowing for CffiSubscriber {
    type View<'a> = CffiView<'a>;

    /// Tries to borrow the next message directly from the backend.
    ///
    /// Returns `Ok(None)` when the backend has no `sub_borrow` entry, when it
    /// returns `0`, or when it returns a positive code with a null buffer.
    ///
    /// # Errors
    ///
    /// The mapped backend error for a negative return code.
    fn try_borrow(&mut self) -> Result<Option<CffiView<'_>>, TransportError> {
        let Some(borrow) = self.vtable.sub_borrow else {
            return Ok(None);
        };
        let mut view = self.make_view();
        let mut data: *const u8 = core::ptr::null();
        let mut data_len: usize = 0;
        let mut token: *mut c_void = core::ptr::null_mut();
        let rc = unsafe { borrow(&mut view, &mut data, &mut data_len, &mut token) };

        if rc < 0 {
            return Err(error_from_ret(rc));
        }
        if rc == 0 || data.is_null() {
            return Ok(None);
        }

        // NOTE(review): `data_len.max(rc)` is never smaller than `rc`, so
        // this always evaluates to `rc` and the backend-reported `data_len`
        // never shortens the view. Confirm whether a clamp to `data_len` was
        // intended.
        let reported = rc as usize;
        let len = reported.min(data_len.max(reported));
        Ok(Some(CffiView {
            buf: data,
            len,
            token,
            subscriber: Some(self),
        }))
    }
}
2541
2542impl nros_rmw::Subscriber for CffiSubscriber {
2543 type Error = TransportError;
2544
    /// Returns the in-place capability flag cached on this subscriber.
    fn supports_process_in_place(&self) -> bool {
        self.supports_in_place
    }

    /// Delegates to `run_process_in_place`, which dispatches through the
    /// backend's `process_raw_in_place` entry.
    fn process_raw_in_place(&mut self, f: impl FnOnce(&[u8])) -> Result<bool, Self::Error> {
        self.run_process_in_place(f)
    }
2552
2553 fn has_data(&self) -> bool {
2554 let view_ptr = self as *const _ as *mut Self;
2558 let mut view = unsafe { (*view_ptr).make_view() };
2559 let rc = unsafe { (self.vtable.has_data)(&mut view) };
2560 rc > 0
2561 }
2562
2563 fn try_recv_raw(&mut self, buf: &mut [u8]) -> Result<Option<usize>, TransportError> {
2564 let mut view = self.make_view();
2565 let rc = unsafe { (self.vtable.try_recv_raw)(&mut view, buf.as_mut_ptr(), buf.len()) };
2566 if rc == NROS_RMW_RET_NO_DATA {
2567 return Ok(None);
2568 }
2569 if rc < 0 {
2570 return Err(error_from_ret(rc));
2571 }
2572 if rc == 0 {
2573 return Ok(None);
2574 }
2575 Ok(Some(rc as usize))
2576 }
2577
2578 fn try_recv_raw_with_info(
2579 &mut self,
2580 buf: &mut [u8],
2581 ) -> Result<Option<(usize, Option<MessageInfo>)>, TransportError> {
2582 let key = self.backend_data as usize;
2583 self.try_recv_raw(buf)
2584 .map(|opt| opt.map(|len| (len, take_cffi_message_info(key))))
2585 }
2586
2587 #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
2588 fn try_recv_validated(
2589 &mut self,
2590 buf: &mut [u8],
2591 ) -> Result<Option<(usize, nros_rmw::IntegrityStatus)>, Self::Error> {
2592 let key = self.backend_data as usize;
2593 request_cffi_integrity_status(key);
2594 self.try_recv_raw(buf).map(|opt| {
2595 opt.map(|len| {
2596 (
2597 len,
2598 take_cffi_integrity_status(key).unwrap_or(nros_rmw::IntegrityStatus {
2599 gap: 0,
2600 duplicate: false,
2601 crc_valid: None,
2602 }),
2603 )
2604 })
2605 })
2606 }
2607
2608 fn try_recv_sequence(
2609 &mut self,
2610 buf: &mut [u8],
2611 per_msg_cap: usize,
2612 max_msgs: usize,
2613 out_lens: &mut [usize],
2614 ) -> Result<usize, TransportError> {
2615 if let Some(f) = self.vtable.try_recv_sequence {
2622 if per_msg_cap == 0 || max_msgs == 0 {
2623 return Ok(0);
2624 }
2625 let limit = max_msgs.min(out_lens.len());
2626 if buf.len() < limit.saturating_mul(per_msg_cap) {
2627 return Err(TransportError::BufferTooSmall);
2628 }
2629 let mut view = self.make_view();
2630 let rc = unsafe {
2631 f(
2632 &mut view,
2633 buf.as_mut_ptr(),
2634 per_msg_cap,
2635 limit,
2636 out_lens.as_mut_ptr(),
2637 )
2638 };
2639 if rc < 0 {
2640 return Err(error_from_ret(rc));
2641 }
2642 return Ok(rc as usize);
2643 }
2644 if per_msg_cap == 0 || max_msgs == 0 {
2651 return Ok(0);
2652 }
2653 let limit = max_msgs.min(out_lens.len());
2654 if buf.len() < limit.saturating_mul(per_msg_cap) {
2655 return Err(TransportError::BufferTooSmall);
2656 }
2657 let mut count = 0;
2658 for i in 0..limit {
2659 let slot = &mut buf[i * per_msg_cap..(i + 1) * per_msg_cap];
2660 match self.try_recv_raw(slot)? {
2661 Some(len) => {
2662 out_lens[i] = len;
2663 count += 1;
2664 }
2665 None => break,
2666 }
2667 }
2668 Ok(count)
2669 }
2670
    /// Error value used to report a deserialization failure.
    fn deserialization_error(&self) -> TransportError {
        TransportError::DeserializationError
    }

    /// Error value used to report an unsupported event kind.
    fn unsupported_event_error(&self) -> TransportError {
        TransportError::Unsupported
    }
2678
2679 unsafe fn register_event_callback(
2680 &mut self,
2681 kind: nros_rmw::EventKind,
2682 deadline_ms: u32,
2683 cb: nros_rmw::EventCallback,
2684 user_ctx: *mut core::ffi::c_void,
2685 ) -> Result<(), TransportError> {
2686 let mut view = self.make_view();
2687 let cb: NrosRmwEventCallback =
2688 unsafe { core::mem::transmute::<nros_rmw::EventCallback, NrosRmwEventCallback>(cb) };
2689 let ret = unsafe {
2690 (self.vtable.register_subscriber_event)(
2691 &mut view,
2692 kind.into(),
2693 deadline_ms,
2694 cb,
2695 user_ctx,
2696 )
2697 };
2698 if ret != NROS_RMW_RET_OK {
2699 return Err(error_from_ret(ret));
2700 }
2701 Ok(())
2702 }
2703}
2704
2705impl Drop for CffiSubscriber {
2706 fn drop(&mut self) {
2707 if !self.backend_data.is_null() {
2708 clear_cffi_message_info(self.backend_data as usize);
2709 let mut view = self.make_view();
2710 unsafe { (self.vtable.destroy_subscriber)(&mut view) };
2711 }
2712 }
2713}
2714
/// Service-server handle backed by a C-ABI backend vtable.
///
/// Instances are produced by `CffiSession::create_service_server`; dropping
/// one calls the backend's `destroy_service_server` when `backend_data` is
/// non-null.
pub struct CffiServiceServer {
    /// Backend function table every call is dispatched through.
    vtable: &'static NrosRmwVtable,
    /// Service name as written by `to_c_str`; exposed to the backend by pointer.
    service_name_buf: [u8; NAME_BUF_LEN],
    /// Type name as written by `to_c_str`; exposed to the backend by pointer.
    type_name_buf: [u8; NAME_BUF_LEN],
    /// Opaque backend state; null means there is nothing to destroy.
    backend_data: *mut c_void,
}
2726
2727impl CffiServiceServer {
2728 fn make_view(&mut self) -> NrosRmwServiceServer {
2729 NrosRmwServiceServer {
2730 service_name: self.service_name_buf.as_ptr(),
2731 type_name: self.type_name_buf.as_ptr(),
2732 _reserved: [0u8; 8],
2733 backend_data: self.backend_data,
2734 }
2735 }
2736
2737 pub fn service_name(&self) -> &str {
2738 cstr_buf_to_str(&self.service_name_buf)
2739 }
2740
2741 pub fn type_name(&self) -> &str {
2742 cstr_buf_to_str(&self.type_name_buf)
2743 }
2744}
2745
2746impl ServiceServerTrait for CffiServiceServer {
2747 type Error = TransportError;
2748
2749 fn has_request(&self) -> bool {
2750 let view_ptr = self as *const _ as *mut Self;
2751 let mut view = unsafe { (*view_ptr).make_view() };
2752 let rc = unsafe { (self.vtable.has_request)(&mut view) };
2753 rc > 0
2754 }
2755
2756 fn try_recv_request<'a>(
2757 &mut self,
2758 buf: &'a mut [u8],
2759 ) -> Result<Option<ServiceRequest<'a>>, TransportError> {
2760 let mut seq: i64 = 0;
2761 let mut view = self.make_view();
2762 let rc = unsafe {
2763 (self.vtable.try_recv_request)(&mut view, buf.as_mut_ptr(), buf.len(), &mut seq)
2764 };
2765 if rc == NROS_RMW_RET_NO_DATA {
2766 return Ok(None);
2767 }
2768 if rc < 0 {
2769 return Err(error_from_ret(rc));
2770 }
2771 if rc == 0 {
2772 return Ok(None);
2773 }
2774 let len = rc as usize;
2775 Ok(Some(ServiceRequest {
2776 data: &buf[..len],
2777 sequence_number: seq,
2778 }))
2779 }
2780
2781 fn send_reply(&mut self, sequence_number: i64, data: &[u8]) -> Result<(), TransportError> {
2782 let mut view = self.make_view();
2783 let ret = unsafe {
2784 (self.vtable.send_reply)(&mut view, sequence_number, data.as_ptr(), data.len())
2785 };
2786 if ret != NROS_RMW_RET_OK {
2787 return Err(error_from_ret(ret));
2788 }
2789 Ok(())
2790 }
2791}
2792
2793impl Drop for CffiServiceServer {
2794 fn drop(&mut self) {
2795 if !self.backend_data.is_null() {
2796 let mut view = self.make_view();
2797 unsafe { (self.vtable.destroy_service_server)(&mut view) };
2798 }
2799 }
2800}
2801
/// Service-client handle backed by a C-ABI backend vtable.
///
/// Instances are produced by `CffiSession::create_service_client`; dropping
/// one calls the backend's `destroy_service_client` when `backend_data` is
/// non-null.
pub struct CffiServiceClient {
    /// Backend function table every call is dispatched through.
    vtable: &'static NrosRmwVtable,
    /// Service name as written by `to_c_str`; exposed to the backend by pointer.
    service_name_buf: [u8; NAME_BUF_LEN],
    /// Type name as written by `to_c_str`; exposed to the backend by pointer.
    type_name_buf: [u8; NAME_BUF_LEN],
    /// Opaque backend state; null means there is nothing to destroy.
    backend_data: *mut c_void,
    /// Non-zero after a successful `send_request_raw` (the request length,
    /// at least 1); reset to 0 once `try_recv_reply_raw` gets a reply or an
    /// error.
    pending_len: usize,
}
2820
2821impl CffiServiceClient {
2822 fn make_view(&mut self) -> NrosRmwServiceClient {
2823 NrosRmwServiceClient {
2824 service_name: self.service_name_buf.as_ptr(),
2825 type_name: self.type_name_buf.as_ptr(),
2826 _reserved: [0u8; 8],
2827 backend_data: self.backend_data,
2828 }
2829 }
2830
2831 pub fn service_name(&self) -> &str {
2832 cstr_buf_to_str(&self.service_name_buf)
2833 }
2834
2835 pub fn type_name(&self) -> &str {
2836 cstr_buf_to_str(&self.type_name_buf)
2837 }
2838}
2839
2840impl ServiceClientTrait for CffiServiceClient {
2841 type Error = TransportError;
2842
2843 #[allow(deprecated)]
2844 fn call_raw(&mut self, request: &[u8], reply_buf: &mut [u8]) -> Result<usize, TransportError> {
2845 let mut view = self.make_view();
2846 let rc = unsafe {
2847 (self.vtable.call_raw)(
2848 &mut view,
2849 request.as_ptr(),
2850 request.len(),
2851 reply_buf.as_mut_ptr(),
2852 reply_buf.len(),
2853 )
2854 };
2855 if rc < 0 {
2856 return Err(error_from_ret(rc));
2857 }
2858 Ok(rc as usize)
2859 }
2860
2861 fn send_request_raw(&mut self, request: &[u8]) -> Result<(), TransportError> {
2862 let Some(f) = self.vtable.send_request_raw else {
2873 return Err(TransportError::Unsupported);
2874 };
2875 let mut view = self.make_view();
2876 let rc = unsafe { f(&mut view, request.as_ptr(), request.len()) };
2877 if rc != NROS_RMW_RET_OK {
2878 return Err(error_from_ret(rc));
2879 }
2880 self.pending_len = request.len().max(1);
2881 Ok(())
2882 }
2883
2884 fn try_recv_reply_raw(
2885 &mut self,
2886 reply_buf: &mut [u8],
2887 ) -> Result<Option<usize>, TransportError> {
2888 let Some(f) = self.vtable.try_recv_reply_raw else {
2892 return Err(TransportError::Unsupported);
2893 };
2894 let mut view = self.make_view();
2895 let rc = unsafe { f(&mut view, reply_buf.as_mut_ptr(), reply_buf.len()) };
2896 if rc == NROS_RMW_RET_NO_DATA {
2897 return Ok(None);
2898 }
2899 if rc < 0 {
2900 self.pending_len = 0;
2901 return Err(error_from_ret(rc));
2902 }
2903 self.pending_len = 0;
2904 Ok(Some(rc as usize))
2905 }
2906
2907 fn server_available(&self) -> Result<bool, TransportError> {
2908 let Some(f) = self.vtable.service_server_available else {
2909 return Err(TransportError::Unsupported);
2910 };
2911 let mut view = NrosRmwServiceClient {
2918 service_name: self.service_name_buf.as_ptr(),
2919 type_name: self.type_name_buf.as_ptr(),
2920 _reserved: [0u8; 8],
2921 backend_data: self.backend_data,
2922 };
2923 let rc = unsafe { f(&mut view) };
2924 match rc {
2925 0 => Ok(false),
2926 1 => Ok(true),
2927 n if n < 0 => Err(error_from_ret(n)),
2928 _ => Ok(true),
2932 }
2933 }
2934}
2935
2936impl Drop for CffiServiceClient {
2937 fn drop(&mut self) {
2938 if !self.backend_data.is_null() {
2939 let mut view = self.make_view();
2940 unsafe { (self.vtable.destroy_service_client)(&mut view) };
2941 }
2942 }
2943}
2944
/// Zero-sized `nros_rmw::Rmw` implementation that opens `CffiSession`s.
#[derive(Default)]
pub struct CffiRmw;
2952
2953impl nros_rmw::Rmw for CffiRmw {
2954 type Session = CffiSession;
2955 type Error = TransportError;
2956
2957 fn open(self, config: &nros_rmw::RmwConfig) -> Result<CffiSession, TransportError> {
2958 let mode = match config.mode {
2959 nros_rmw::SessionMode::Client => 0u8,
2960 nros_rmw::SessionMode::Peer => 1u8,
2961 };
2962 CffiSession::open(config.locator, mode, config.domain_id, config.node_name)
2963 }
2964}
2965
2966impl CffiRmw {
2967 pub fn open_with_rmw(
2971 rmw_name: &str,
2972 config: &nros_rmw::RmwConfig,
2973 ) -> Result<CffiSession, TransportError> {
2974 let mode = match config.mode {
2975 nros_rmw::SessionMode::Client => 0u8,
2976 nros_rmw::SessionMode::Peer => 1u8,
2977 };
2978 CffiSession::open_named(
2979 rmw_name,
2980 config.locator,
2981 mode,
2982 config.domain_id,
2983 config.node_name,
2984 )
2985 }
2986}
2987
2988#[cfg(test)]
3002#[allow(static_mut_refs)]
3003mod tests {
3004 use super::*;
3005 use nros_rmw::{Rmw, RmwConfig, Session, SessionMode, TopicInfo};
3006
    // Capture state written by the stub callbacks below and read back by the
    // tests. These are plain `static mut`s with no synchronization.

    /// Set by `stub_open`.
    static mut STUB_OPEN_CALLED: bool = false;
    /// Set by `stub_create_publisher`.
    static mut STUB_CREATE_PUB_CALLED: bool = false;
    /// Set by `stub_publish_raw`.
    static mut STUB_PUBLISH_CALLED: bool = false;
    /// Topic name copied out of the publisher view by `stub_create_publisher`.
    static mut STUB_LAST_TOPIC_NAME: [u8; 64] = [0u8; 64];
    /// Type name copied out of the publisher view by `stub_create_publisher`.
    static mut STUB_LAST_TYPE_NAME: [u8; 64] = [0u8; 64];
    /// QoS struct received by `stub_create_publisher`; starts all-zero.
    static mut STUB_LAST_QOS: NrosRmwQos = NrosRmwQos {
        reliability: 0,
        durability: 0,
        history: 0,
        liveliness_kind: 0,
        depth: 0,
        _reserved0: 0,
        deadline_ms: 0,
        lifespan_ms: 0,
        liveliness_lease_ms: 0,
        avoid_ros_namespace_conventions: 0,
        _reserved1: [0; 3],
        rx_buffer_hint: 0,
    };
3028
3029 unsafe fn copy_cstr(src: *const u8, dst: &mut [u8]) {
3033 let mut i = 0;
3034 while i < dst.len() {
3035 let b = unsafe { *src.add(i) };
3036 dst[i] = b;
3037 if b == 0 {
3038 break;
3039 }
3040 i += 1;
3041 }
3042 }
3043
3044 unsafe extern "C" fn stub_open(
3045 _locator: *const u8,
3046 _mode: u8,
3047 _domain_id: u32,
3048 _node_name: *const u8,
3049 out: *mut NrosRmwSession,
3050 ) -> NrosRmwRet {
3051 unsafe {
3052 STUB_OPEN_CALLED = true;
3053 (*out).backend_data = 0xDEAD_BEEFusize as *mut c_void;
3054 }
3055 NROS_RMW_RET_OK
3056 }
3057
    /// Test stub for the vtable `close` slot; ignores the session and always
    /// reports success.
    unsafe extern "C" fn stub_close(_session: *mut NrosRmwSession) -> NrosRmwRet {
        NROS_RMW_RET_OK
    }
3061
    /// Test stub for the vtable `drive_io` slot; ignores both arguments and
    /// always reports success.
    unsafe extern "C" fn stub_drive_io(
        _session: *mut NrosRmwSession,
        _timeout_ms: i32,
    ) -> NrosRmwRet {
        NROS_RMW_RET_OK
    }
3068
3069 unsafe extern "C" fn stub_create_publisher(
3070 _session: *mut NrosRmwSession,
3071 _topic_name: *const u8,
3072 _type_name: *const u8,
3073 _type_hash: *const u8,
3074 _domain_id: u32,
3075 qos: *const NrosRmwQos,
3076 out: *mut NrosRmwPublisher,
3077 ) -> NrosRmwRet {
3078 unsafe {
3080 STUB_CREATE_PUB_CALLED = true;
3081 copy_cstr((*out).topic_name, &mut STUB_LAST_TOPIC_NAME);
3082 copy_cstr((*out).type_name, &mut STUB_LAST_TYPE_NAME);
3083 STUB_LAST_QOS = *qos;
3084 (*out).backend_data = 0xCAFEusize as *mut c_void;
3085 (*out).can_loan_messages = true;
3086 }
3087 NROS_RMW_RET_OK
3088 }
3089
    /// Test stub for the vtable `destroy_publisher` slot; does nothing.
    unsafe extern "C" fn stub_destroy_publisher(_publisher: *mut NrosRmwPublisher) {}
3091
3092 unsafe extern "C" fn stub_publish_raw(
3093 publisher: *mut NrosRmwPublisher,
3094 _data: *const u8,
3095 _len: usize,
3096 ) -> NrosRmwRet {
3097 unsafe {
3100 STUB_PUBLISH_CALLED = true;
3101 assert_eq!((*publisher).backend_data as usize, 0xCAFE);
3102 let mut buf = [0u8; 64];
3103 copy_cstr((*publisher).topic_name, &mut buf);
3104 assert_eq!(&buf[..], &STUB_LAST_TOPIC_NAME);
3105 }
3106 NROS_RMW_RET_OK
3107 }
3108
3109 unsafe extern "C" fn stub_create_subscriber(
3110 _: *mut NrosRmwSession,
3111 _: *const u8,
3112 _: *const u8,
3113 _: *const u8,
3114 _: u32,
3115 _: *const NrosRmwQos,
3116 out: *mut NrosRmwSubscriber,
3117 ) -> NrosRmwRet {
3118 unsafe {
3119 (*out).backend_data = core::ptr::dangling_mut::<c_void>();
3120 }
3121 NROS_RMW_RET_OK
3122 }
3123 unsafe extern "C" fn stub_destroy_subscriber(_: *mut NrosRmwSubscriber) {}
3124 unsafe extern "C" fn stub_try_recv_raw(_: *mut NrosRmwSubscriber, _: *mut u8, _: usize) -> i32 {
3125 0
3126 }
3127 unsafe extern "C" fn stub_has_data(_: *mut NrosRmwSubscriber) -> i32 {
3128 0
3129 }
3130
3131 unsafe extern "C" fn stub_create_service_server(
3132 _: *mut NrosRmwSession,
3133 _: *const u8,
3134 _: *const u8,
3135 _: *const u8,
3136 _: u32,
3137 _: *const NrosRmwQos,
3138 out: *mut NrosRmwServiceServer,
3139 ) -> NrosRmwRet {
3140 unsafe {
3141 (*out).backend_data = core::ptr::dangling_mut::<c_void>();
3142 }
3143 NROS_RMW_RET_OK
3144 }
3145 unsafe extern "C" fn stub_destroy_service_server(_: *mut NrosRmwServiceServer) {}
3146 unsafe extern "C" fn stub_try_recv_request(
3147 _: *mut NrosRmwServiceServer,
3148 _: *mut u8,
3149 _: usize,
3150 _: *mut i64,
3151 ) -> i32 {
3152 NROS_RMW_RET_NO_DATA
3153 }
3154 unsafe extern "C" fn stub_has_request(_: *mut NrosRmwServiceServer) -> i32 {
3155 0
3156 }
3157 unsafe extern "C" fn stub_send_reply(
3158 _: *mut NrosRmwServiceServer,
3159 _: i64,
3160 _: *const u8,
3161 _: usize,
3162 ) -> NrosRmwRet {
3163 NROS_RMW_RET_OK
3164 }
3165
3166 unsafe extern "C" fn stub_create_service_client(
3167 _: *mut NrosRmwSession,
3168 _: *const u8,
3169 _: *const u8,
3170 _: *const u8,
3171 _: u32,
3172 _: *const NrosRmwQos,
3173 out: *mut NrosRmwServiceClient,
3174 ) -> NrosRmwRet {
3175 unsafe {
3176 (*out).backend_data = core::ptr::dangling_mut::<c_void>();
3177 }
3178 NROS_RMW_RET_OK
3179 }
3180 unsafe extern "C" fn stub_destroy_service_client(_: *mut NrosRmwServiceClient) {}
3181 unsafe extern "C" fn stub_call_raw(
3182 _: *mut NrosRmwServiceClient,
3183 _: *const u8,
3184 _: usize,
3185 _: *mut u8,
3186 _: usize,
3187 ) -> i32 {
3188 0
3189 }
3190
3191 unsafe extern "C" fn stub_register_subscriber_event(
3192 _: *mut NrosRmwSubscriber,
3193 _: NrosRmwEventKind,
3194 _: u32,
3195 _: NrosRmwEventCallback,
3196 _: *mut c_void,
3197 ) -> NrosRmwRet {
3198 NROS_RMW_RET_UNSUPPORTED
3199 }
3200 unsafe extern "C" fn stub_register_publisher_event(
3201 _: *mut NrosRmwPublisher,
3202 _: NrosRmwEventKind,
3203 _: u32,
3204 _: NrosRmwEventCallback,
3205 _: *mut c_void,
3206 ) -> NrosRmwRet {
3207 NROS_RMW_RET_UNSUPPORTED
3208 }
3209 unsafe extern "C" fn stub_assert_publisher_liveliness(_: *mut NrosRmwPublisher) -> NrosRmwRet {
3210 NROS_RMW_RET_UNSUPPORTED
3211 }
3212
    /// Vtable used by the tests in this module: every function-pointer slot
    /// is wired to one of the `stub_*` functions above and every `Option`
    /// slot is left as `None`.
    static STUB_VTABLE: NrosRmwVtable = NrosRmwVtable {
        // Session lifecycle.
        open: stub_open,
        close: stub_close,
        drive_io: stub_drive_io,
        // Publisher.
        create_publisher: stub_create_publisher,
        destroy_publisher: stub_destroy_publisher,
        publish_raw: stub_publish_raw,
        // Subscriber.
        create_subscriber: stub_create_subscriber,
        destroy_subscriber: stub_destroy_subscriber,
        try_recv_raw: stub_try_recv_raw,
        has_data: stub_has_data,
        // Service server.
        create_service_server: stub_create_service_server,
        destroy_service_server: stub_destroy_service_server,
        try_recv_request: stub_try_recv_request,
        has_request: stub_has_request,
        send_reply: stub_send_reply,
        // Service client.
        create_service_client: stub_create_service_client,
        destroy_service_client: stub_destroy_service_client,
        call_raw: stub_call_raw,
        send_request_raw: None,
        try_recv_reply_raw: None,
        // Events and liveliness: the stubs report "unsupported".
        register_subscriber_event: stub_register_subscriber_event,
        register_publisher_event: stub_register_publisher_event,
        assert_publisher_liveliness: stub_assert_publisher_liveliness,
        // Optional entry points this stub backend does not provide.
        next_deadline_ms: None,
        set_wake_callback: None,
        pub_loan: None,
        pub_commit: None,
        pub_discard: None,
        sub_borrow: None,
        sub_release: None,
        service_server_available: None,
        try_recv_sequence: None,
        publish_streamed: None,
        ping_session: None,
        subscriber_supports_in_place: None,
        process_raw_in_place: None,
    };
3251
3252 #[test]
3253 fn service_server_no_data_maps_to_none() {
3254 use nros_rmw::ServiceServerTrait as _;
3255
3256 let mut server = CffiServiceServer {
3257 vtable: &STUB_VTABLE,
3258 service_name_buf: [0u8; NAME_BUF_LEN],
3259 type_name_buf: [0u8; NAME_BUF_LEN],
3260 backend_data: core::ptr::dangling_mut::<c_void>(),
3261 };
3262 let mut buf = [0u8; 16];
3263
3264 assert!(server.try_recv_request(&mut buf).unwrap().is_none());
3265 }
3266
3267 #[test]
3268 fn typed_struct_roundtrip() {
3269 let ret = unsafe { nros_rmw_cffi_register_named(c"default".as_ptr(), &STUB_VTABLE) };
3271 assert_eq!(ret, NROS_RMW_RET_OK);
3272
3273 let cfg = RmwConfig {
3275 mode: SessionMode::Client,
3276 locator: "tcp/127.0.0.1:7447",
3277 domain_id: 0,
3278 node_name: "test_node",
3279 namespace: "",
3280 properties: &[],
3281 };
3282 let mut session = Rmw::open(CffiRmw, &cfg).expect("session open");
3283 assert!(unsafe { STUB_OPEN_CALLED });
3284 assert_eq!(session.node_name(), "test_node");
3285
3286 let topic = TopicInfo::new("/chatter", "std_msgs/msg/Int32", "RIHS01_abc");
3289 let qos = nros_rmw::QosSettings::default();
3290 let publisher = session
3291 .create_publisher(&topic, qos)
3292 .expect("publisher create");
3293 assert!(unsafe { STUB_CREATE_PUB_CALLED });
3294 let topic_buf = unsafe { &STUB_LAST_TOPIC_NAME };
3295 assert_eq!(
3296 core::str::from_utf8(topic_buf)
3297 .unwrap_or("")
3298 .trim_end_matches('\0'),
3299 "/chatter"
3300 );
3301 let type_buf = unsafe { &STUB_LAST_TYPE_NAME };
3302 assert_eq!(
3303 core::str::from_utf8(type_buf)
3304 .unwrap_or("")
3305 .trim_end_matches('\0'),
3306 "std_msgs/msg/Int32"
3307 );
3308
3309 assert_eq!(publisher.topic_name(), "/chatter");
3311 assert_eq!(publisher.type_name(), "std_msgs/msg/Int32");
3312 assert!(publisher.can_loan_messages());
3313
3314 use nros_rmw::Publisher as _;
3317 publisher.publish_raw(&[1u8, 2, 3]).expect("publish");
3318 assert!(unsafe { STUB_PUBLISH_CALLED });
3319 }
3320}