// nros_rmw_cffi/lib.rs

1//! C function table adapter for nros RMW backends.
2//!
3//! This crate provides a vtable-based bridge so that backends written in C,
4//! C++, Zig, Ada, or any language with a C-compatible ABI can implement the
5//! nros `Session` / `Publisher` / `Subscriber` / service traits without
6//! writing Rust code.
7//!
8//! # Usage (C backend implementor)
9//!
10//! 1. Include `<nros/rmw_vtable.h>`
11//! 2. Implement all function pointers in `nros_rmw_vtable_t`
12//! 3. Call `nros_rmw_cffi_register(&my_vtable)` before creating sessions
13//!
14//! # Usage (Rust consumer)
15//!
16//! Enable the `rmw-cffi` feature on `nros` and use `Executor<CffiSession>`.
17
18#![no_std]
19
20#[cfg(feature = "alloc")]
21extern crate alloc;
22
23#[cfg(feature = "std")]
24extern crate std;
25
26use core::{cell::UnsafeCell, ffi::c_void, sync::atomic::Ordering};
27
28use nros_rmw::{
29    MessageInfo, Publisher, QosDurabilityPolicy, QosHistoryPolicy, QosReliabilityPolicy,
30    QosSettings, ServiceClientTrait, ServiceInfo, ServiceRequest, ServiceServerTrait, Session,
31    TopicInfo, TransportError,
32};
33
34// Phase 115.L.0 — generic Rust→C-vtable adapter. Lives behind the
35// `alloc` feature because each entity handle is boxed for stable
36// address mgmt; every nros backend already requires alloc.
37#[cfg(feature = "alloc")]
38pub mod rust_adapter;
39
40#[cfg(feature = "alloc")]
41pub use rust_adapter::{RustBackend, RustBackendAdapter};
42
43// Phase 249 P4b.1 — `.init_array` ctor self-registration
44// (`nros_rmw_register_backend!` macro lives here).
45pub mod section;
46
// ============================================================================
// Phase 102.1 — `nros_rmw_ret_t` named return codes
// ============================================================================
//
// Mirrors the macro constants in `<nros/rmw_ret.h>`. The C side uses
// `#define` so future additions don't widen the type; the Rust side
// uses `pub const` so the same names are usable by Rust code that
// crosses the C-vtable boundary.

/// Signed 32-bit status code mirroring the C `nros_rmw_ret_t` typedef.
/// Zero on success; negative on error.
pub type NrosRmwRet = i32;

/// Operation completed successfully.
pub const NROS_RMW_RET_OK: NrosRmwRet = 0;
/// Generic failure not covered by a more specific code.
pub const NROS_RMW_RET_ERROR: NrosRmwRet = -1;
/// Operation deadline elapsed before completion.
pub const NROS_RMW_RET_TIMEOUT: NrosRmwRet = -2;
/// Memory allocation failed.
pub const NROS_RMW_RET_BAD_ALLOC: NrosRmwRet = -3;
/// Caller supplied a NULL pointer or an out-of-range value.
pub const NROS_RMW_RET_INVALID_ARGUMENT: NrosRmwRet = -4;
/// The backend does not implement this operation.
pub const NROS_RMW_RET_UNSUPPORTED: NrosRmwRet = -5;
/// QoS profiles incompatible in a way the backend cannot reconcile.
pub const NROS_RMW_RET_INCOMPATIBLE_QOS: NrosRmwRet = -6;
/// Topic, service, or action name failed validation.
pub const NROS_RMW_RET_TOPIC_NAME_INVALID: NrosRmwRet = -7;
/// Request referenced a node that does not exist in this session.
pub const NROS_RMW_RET_NODE_NAME_NON_EXISTENT: NrosRmwRet = -8;
/// Backend does not support loaned messages on this entity, or slot in use.
pub const NROS_RMW_RET_LOAN_NOT_SUPPORTED: NrosRmwRet = -9;
/// No data on a non-blocking receive (distinct from `TIMEOUT`).
pub const NROS_RMW_RET_NO_DATA: NrosRmwRet = -10;
/// Resource momentarily unavailable; caller should retry.
pub const NROS_RMW_RET_WOULD_BLOCK: NrosRmwRet = -11;
/// Caller buffer smaller than the data the backend wants to deliver.
pub const NROS_RMW_RET_BUFFER_TOO_SMALL: NrosRmwRet = -12;
/// Incoming message exceeded the backend's static capacity.
pub const NROS_RMW_RET_MESSAGE_TOO_LARGE: NrosRmwRet = -13;

// Anchor every C-stub-transport symbol so they survive
// `--gc-sections` when integration tests link against
// `libnros_rmw_cffi`. Only compiled when the c-stub-test feature
// is on; otherwise no C anchor + no toolchain dep.
#[cfg(feature = "c-stub-test")]
unsafe extern "C" {
    fn nros_c_stub_make_ops(out: *mut core::ffi::c_void);
    fn nros_c_stub_reset_counters();
    fn nros_c_stub_get_open_calls() -> u32;
    fn nros_c_stub_get_close_calls() -> u32;
    fn nros_c_stub_get_write_calls() -> u32;
    fn nros_c_stub_get_read_calls() -> u32;
}

/// Returns the address of each C stub-transport symbol declared above.
///
/// The function never calls the stubs; it only takes their addresses so
/// that the symbols are referenced from Rust code and are therefore kept
/// by the linker. The returned array lists the symbols in declaration
/// order.
#[cfg(feature = "c-stub-test")]
#[doc(hidden)]
pub fn _c_stub_transport_vtable_anchor() -> [*const core::ffi::c_void; 6] {
    [
        nros_c_stub_make_ops as *const _,
        nros_c_stub_reset_counters as *const _,
        nros_c_stub_get_open_calls as *const _,
        nros_c_stub_get_close_calls as *const _,
        nros_c_stub_get_write_calls as *const _,
        nros_c_stub_get_read_calls as *const _,
    ]
}
114/// Phase 115.A.2 — caller's vtable struct has an `abi_version` the
115/// runtime doesn't know. Returned by entry points that take a
116/// versioned vtable struct (`nros_set_custom_transport`,
117/// `nros_cpp_set_custom_transport`, …) when
118/// `vtable.abi_version != NROS_RMW_*_ABI_VERSION_VN`.
119pub const NROS_RMW_RET_INCOMPATIBLE_ABI: NrosRmwRet = -14;
120
121/// Phase 128.A.3 — `Executor::open` / `nros::init` could not pick a
122/// unique backend because no `nros-rmw-*` crate (or static lib) is
123/// linked into this binary.
124pub const NROS_RMW_RET_NO_BACKEND: NrosRmwRet = -15;
125
126/// Phase 128.A.3 — more than one backend is linked and no
127/// `NROS_RMW=<name>` selector was supplied. Caller must either set
128/// the env var or use `Executor::open_multi`.
129pub const NROS_RMW_RET_AMBIGUOUS_BACKEND: NrosRmwRet = -16;
130
131/// Phase 128.A.3 — selector pointed at a backend name that is not
132/// in the registry (mis-spelling or missing `nros-rmw-<name>` dep).
133pub const NROS_RMW_RET_UNKNOWN_BACKEND: NrosRmwRet = -17;
134
135/// Phase 155.B.3 — backend reached the wire but couldn't establish a
136/// session. Maps to / from `TransportError::ConnectionFailed` /
137/// `Disconnected`. Distinct from `NROS_RMW_RET_ERROR` so callers can
138/// tell "can't reach the router" from "internal backend invariant
139/// tripped".
140pub const NROS_RMW_RET_CONNECTION_FAILED: NrosRmwRet = -18;
141
142/// Map a `TransportError` to the corresponding `nros_rmw_ret_t` code.
143///
144/// By-reference because `TransportError` carries a `String` on its
145/// dynamic-diagnostic variant and is not `Copy`. The string itself is
146/// dropped at the boundary — embedded RMW callers cannot afford a
147/// thread-local error buffer.
148pub fn ret_from_error(err: &TransportError) -> NrosRmwRet {
149    match err {
150        TransportError::Timeout => NROS_RMW_RET_TIMEOUT,
151        TransportError::WouldBlock => NROS_RMW_RET_WOULD_BLOCK,
152        TransportError::TooLarge => NROS_RMW_RET_MESSAGE_TOO_LARGE,
153        TransportError::BufferTooSmall => NROS_RMW_RET_BUFFER_TOO_SMALL,
154        TransportError::MessageTooLarge => NROS_RMW_RET_MESSAGE_TOO_LARGE,
155        TransportError::InvalidArgument => NROS_RMW_RET_INVALID_ARGUMENT,
156        TransportError::InvalidConfig => NROS_RMW_RET_INVALID_ARGUMENT,
157        TransportError::Unsupported => NROS_RMW_RET_UNSUPPORTED,
158        TransportError::BadAlloc => NROS_RMW_RET_BAD_ALLOC,
159        TransportError::IncompatibleQos => NROS_RMW_RET_INCOMPATIBLE_QOS,
160        TransportError::TopicNameInvalid => NROS_RMW_RET_TOPIC_NAME_INVALID,
161        TransportError::NodeNameNonExistent => NROS_RMW_RET_NODE_NAME_NON_EXISTENT,
162        TransportError::LoanNotSupported => NROS_RMW_RET_LOAN_NOT_SUPPORTED,
163        TransportError::NoData => NROS_RMW_RET_NO_DATA,
164        TransportError::IncompatibleAbi => NROS_RMW_RET_INCOMPATIBLE_ABI,
165        // Phase 155.B.3 — distinguish wire-level connection failure
166        // from generic backend error so the FreeRTOS / RV64 C+C++
167        // `init -> -X` logs identify the actual class. zenoh-pico's
168        // `ZpicoError::Session` (zpico_open returned -3) and
169        // `ZpicoError::Generic` (zpico_init returned -1) both flow
170        // through `ZpicoError → ConnectionFailed`; the cmake-built
171        // FreeRTOS C/C++ tests will now surface NOT_FOUND (the
172        // user-side mapping in `nros_support_init`) instead of the
173        // generic NROS_RET_ERROR catch-all.
174        TransportError::ConnectionFailed | TransportError::Disconnected => {
175            NROS_RMW_RET_CONNECTION_FAILED
176        }
177        // Everything else collapses to NROS_RMW_RET_ERROR. Backends
178        // that want fine-grained reporting should adopt the named
179        // variants above (Phase 102.2 sweep).
180        _ => NROS_RMW_RET_ERROR,
181    }
182}
183
184/// Map a `nros_rmw_ret_t` returned by a C-side vtable function back to
185/// a `TransportError` for the Rust caller. Inverse of `ret_from_error`
186/// — used when `nros-rmw-cffi`'s `CffiSession` etc. receive a code
187/// from the registered C backend.
188///
189/// `NROS_RMW_RET_OK` is mapped to `TransportError::Backend("ok")` as a
190/// programming-error sentinel; callers should branch on the success
191/// path before calling this. Unknown negative codes collapse to the
192/// generic `TransportError::Backend("unknown rmw_ret_t")` so a future
193/// constant added to the C header degrades gracefully on the Rust side.
194pub fn error_from_ret(ret: NrosRmwRet) -> TransportError {
195    match ret {
196        NROS_RMW_RET_OK => {
197            TransportError::Backend("ok (logic error: positive ret_t at error site)")
198        }
199        NROS_RMW_RET_ERROR => TransportError::Backend("rmw_ret error"),
200        NROS_RMW_RET_TIMEOUT => TransportError::Timeout,
201        NROS_RMW_RET_BAD_ALLOC => TransportError::BadAlloc,
202        NROS_RMW_RET_INVALID_ARGUMENT => TransportError::InvalidArgument,
203        NROS_RMW_RET_UNSUPPORTED => TransportError::Unsupported,
204        NROS_RMW_RET_INCOMPATIBLE_QOS => TransportError::IncompatibleQos,
205        NROS_RMW_RET_TOPIC_NAME_INVALID => TransportError::TopicNameInvalid,
206        NROS_RMW_RET_NODE_NAME_NON_EXISTENT => TransportError::NodeNameNonExistent,
207        NROS_RMW_RET_LOAN_NOT_SUPPORTED => TransportError::LoanNotSupported,
208        NROS_RMW_RET_NO_DATA => TransportError::NoData,
209        NROS_RMW_RET_WOULD_BLOCK => TransportError::WouldBlock,
210        NROS_RMW_RET_BUFFER_TOO_SMALL => TransportError::BufferTooSmall,
211        NROS_RMW_RET_MESSAGE_TOO_LARGE => TransportError::MessageTooLarge,
212        NROS_RMW_RET_INCOMPATIBLE_ABI => TransportError::IncompatibleAbi,
213        // Phase 155.B.3 — inverse of `ret_from_error`'s
214        // `ConnectionFailed | Disconnected → CONNECTION_FAILED`
215        // mapping. Decodes the new vtable-level code back to the
216        // `TransportError::ConnectionFailed` variant; downstream
217        // `transport_error_to_ret` in nros-c surfaces it as
218        // `NROS_RET_NOT_FOUND` (-4) to the user.
219        NROS_RMW_RET_CONNECTION_FAILED => TransportError::ConnectionFailed,
220        _ => TransportError::Backend("unknown rmw_ret_t"),
221    }
222}
223
// ============================================================================
// Phase 102.3 — typed entity structs (mirrors `<nros/rmw_entity.h>`)
// ============================================================================
//
// These structs are layout-compatible with the typed entity structs
// in the C header. Same shape as upstream `rmw.h`'s `rmw_publisher_t`
// / `rmw_subscription_t` family: visible metadata + a `void * data`
// tail (named `backend_data` here).

/// Liveliness kind values for `NrosRmwQos::liveliness_kind`.
///
/// `#[repr(u8)]` with explicit discriminants, so each variant can be
/// stored in the `u8` QoS field with an `as u8` cast.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NrosRmwLivelinessKind {
    /// Wire value `0`.
    None = 0,
    /// Wire value `1`.
    Automatic = 1,
    /// Wire value `2`.
    ManualByTopic = 2,
    /// Wire value `3`.
    ManualByNode = 3,
}

/// Full DDS-shaped QoS profile. Mirrors `nros_rmw_qos_t` from
/// `<nros/rmw_entity.h>`.
///
/// `#[repr(C)]`: field order and the explicit `_reserved*` padding are
/// part of the ABI. With the fields below the struct is 28 bytes with
/// 4-byte alignment and contains no implicit padding.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NrosRmwQos {
    /// Reliability policy: `0` = best-effort, `1` = reliable.
    pub reliability: u8,
    /// Durability policy: `0` = volatile, `1` = transient-local.
    pub durability: u8,
    /// History policy: `0` = keep-last, `1` = keep-all.
    pub history: u8,
    /// Liveliness kind. See [`NrosRmwLivelinessKind`].
    pub liveliness_kind: u8,
    /// History depth (0–65 535).
    pub depth: u16,
    /// Reserved; must be zero.
    pub _reserved0: u16,

    /// Subscriber max-inter-arrival / publisher offered-rate, ms.
    /// `0` = infinite (no deadline).
    pub deadline_ms: u32,
    /// Sample expiry, ms. `0` = infinite.
    pub lifespan_ms: u32,
    /// Liveliness lease, ms. `0` = infinite.
    pub liveliness_lease_ms: u32,
    /// If non-zero, topic name encoding skips the ROS `/rt/` prefix.
    /// `u8` instead of `bool` for ABI parity with C — `sizeof(_Bool)`
    /// is impl-defined per C99.
    pub avoid_ros_namespace_conventions: u8,
    /// Reserved; must be zero.
    pub _reserved1: [u8; 3],
    /// Phase 231 (RFC-0038) — subscription receive-buffer size hint, bytes.
    /// Carries `TopicInfo::rx_buffer_hint` across the C ABI to `create_subscriber`
    /// so a size-classing backend (zenoh-pico) can pick a small/large receive
    /// buffer. `0` = unset. Appended at the struct tail (ABI-append); ignored by
    /// every slot except `create_subscriber`.
    pub rx_buffer_hint: u32,
}

282/// Standard `rmw_qos_profile_default`-equivalent.
283pub const NROS_RMW_QOS_PROFILE_DEFAULT: NrosRmwQos = NrosRmwQos {
284    reliability: 1, // RELIABLE
285    durability: 0,  // VOLATILE
286    history: 0,     // KEEP_LAST
287    liveliness_kind: NrosRmwLivelinessKind::Automatic as u8,
288    depth: 10,
289    _reserved0: 0,
290    deadline_ms: 0,
291    lifespan_ms: 0,
292    liveliness_lease_ms: 0,
293    avoid_ros_namespace_conventions: 0,
294    _reserved1: [0; 3],
295    rx_buffer_hint: 0,
296};
297
298/// Standard `rmw_qos_profile_sensor_data`-equivalent.
299pub const NROS_RMW_QOS_PROFILE_SENSOR_DATA: NrosRmwQos = NrosRmwQos {
300    reliability: 0, // BEST_EFFORT
301    durability: 0,  // VOLATILE
302    history: 0,     // KEEP_LAST
303    liveliness_kind: NrosRmwLivelinessKind::Automatic as u8,
304    depth: 5,
305    _reserved0: 0,
306    deadline_ms: 0,
307    lifespan_ms: 0,
308    liveliness_lease_ms: 0,
309    avoid_ros_namespace_conventions: 0,
310    _reserved1: [0; 3],
311    rx_buffer_hint: 0,
312};
313
314/// Standard `rmw_qos_profile_services_default`-equivalent.
315pub const NROS_RMW_QOS_PROFILE_SERVICES_DEFAULT: NrosRmwQos = NROS_RMW_QOS_PROFILE_DEFAULT;
316
317/// Standard `rmw_qos_profile_parameters`-equivalent.
318pub const NROS_RMW_QOS_PROFILE_PARAMETERS: NrosRmwQos = NrosRmwQos {
319    depth: 1000,
320    ..NROS_RMW_QOS_PROFILE_DEFAULT
321};
322
323/// Standard `rmw_qos_profile_system_default`-equivalent.
324pub const NROS_RMW_QOS_PROFILE_SYSTEM_DEFAULT: NrosRmwQos = NROS_RMW_QOS_PROFILE_DEFAULT;
325
/// Per-process RMW session. Mirrors `nros_rmw_session_t`.
///
/// `#[repr(C)]`: two borrowed name pointers, eight reserved bytes, then
/// the backend's opaque state pointer.
#[repr(C)]
pub struct NrosRmwSession {
    /// Borrowed; outlives the session.
    pub node_name: *const u8,
    /// Borrowed; outlives the session.
    pub namespace_: *const u8,
    /// Reserved for future fields (Phase 104 vtable pointer slot);
    /// must be zero.
    pub _reserved: [u8; 8],
    /// Opaque backend state. NULL when uninitialised.
    pub backend_data: *mut c_void,
}

340/// Publisher entity. Mirrors `nros_rmw_publisher_t`.
341///
342/// `can_loan_messages` matches upstream `rmw_publisher_t`'s field of
343/// the same name: `true` means the backend exposes the
344/// `loan_publish` / `commit_publish` primitive (Phase 99).
345#[repr(C)]
346pub struct NrosRmwPublisher {
347    /// Borrowed; outlives the publisher.
348    pub topic_name: *const u8,
349    /// Borrowed; outlives the publisher.
350    pub type_name: *const u8,
351    pub qos: NrosRmwQos,
352    /// Backend exposes loan_publish / commit_publish (Phase 99).
353    pub can_loan_messages: bool,
354    /// Reserved for future fields; must be zero.
355    pub _reserved: [u8; 7],
356    /// Opaque backend state. NULL when creation failed.
357    pub backend_data: *mut c_void,
358}
359
360/// Subscriber entity. Mirrors `nros_rmw_subscriber_t`.
361#[repr(C)]
362pub struct NrosRmwSubscriber {
363    /// Borrowed; outlives the subscriber.
364    pub topic_name: *const u8,
365    /// Borrowed; outlives the subscriber.
366    pub type_name: *const u8,
367    pub qos: NrosRmwQos,
368    /// Backend exposes loan_recv / release_recv (Phase 99).
369    pub can_loan_messages: bool,
370    /// Reserved for future fields; must be zero.
371    pub _reserved: [u8; 7],
372    /// Opaque backend state. NULL when creation failed.
373    pub backend_data: *mut c_void,
374}
375
/// Service-server entity. Mirrors `nros_rmw_service_server_t`.
///
/// `#[repr(C)]`: two borrowed name pointers, eight reserved bytes, then
/// the backend's opaque state pointer.
#[repr(C)]
pub struct NrosRmwServiceServer {
    /// Borrowed; outlives the server.
    pub service_name: *const u8,
    /// Borrowed; outlives the server.
    pub type_name: *const u8,
    /// Reserved for future fields; must be zero.
    pub _reserved: [u8; 8],
    /// Opaque backend state. NULL when creation failed.
    pub backend_data: *mut c_void,
}

/// Service-client entity. Mirrors `nros_rmw_service_client_t`.
///
/// `#[repr(C)]`: two borrowed name pointers, eight reserved bytes, then
/// the backend's opaque state pointer.
#[repr(C)]
pub struct NrosRmwServiceClient {
    /// Borrowed; outlives the client.
    pub service_name: *const u8,
    /// Borrowed; outlives the client.
    pub type_name: *const u8,
    /// Reserved for future fields; must be zero.
    pub _reserved: [u8; 8],
    /// Opaque backend state. NULL when creation failed.
    pub backend_data: *mut c_void,
}

402impl From<QosSettings> for NrosRmwQos {
403    fn from(qos: QosSettings) -> Self {
404        Self {
405            reliability: match qos.reliability {
406                QosReliabilityPolicy::BestEffort => 0,
407                QosReliabilityPolicy::Reliable => 1,
408            },
409            durability: match qos.durability {
410                QosDurabilityPolicy::Volatile => 0,
411                QosDurabilityPolicy::TransientLocal => 1,
412            },
413            history: match qos.history {
414                QosHistoryPolicy::KeepLast => 0,
415                QosHistoryPolicy::KeepAll => 1,
416            },
417            liveliness_kind: qos.liveliness_kind as u8,
418            // QosSettings::depth is u32; clamp to u16 max. Embedded
419            // ROS queue depths are typically 1–100; oversize values
420            // are saturated at 65 535 rather than wrapped.
421            depth: qos.depth.min(u16::MAX as u32) as u16,
422            _reserved0: 0,
423            deadline_ms: qos.deadline_ms,
424            lifespan_ms: qos.lifespan_ms,
425            liveliness_lease_ms: qos.liveliness_lease_ms,
426            avoid_ros_namespace_conventions: qos.avoid_ros_namespace_conventions as u8,
427            _reserved1: [0; 3],
428            rx_buffer_hint: 0,
429        }
430    }
431}
432
433// ============================================================================
434// Vtable type (mirrors C header)
435// ============================================================================
436
437/// C function table for an RMW backend.
438///
439/// Mirrors `nros_rmw_vtable_t` from `<nros/rmw_vtable.h>`. Phase 102.4
440/// signatures: every entity entry point takes a typed-struct pointer
441/// instead of `void *`; every status-only return is `nros_rmw_ret_t`
442/// (typedef of `i32`); byte-count returns stay `i32` (positive bytes,
443/// negative `nros_rmw_ret_t`).
444#[repr(C)]
445pub struct NrosRmwVtable {
446    // ---- Session lifecycle ----
447    pub open: unsafe extern "C" fn(
448        locator: *const u8,
449        mode: u8,
450        domain_id: u32,
451        node_name: *const u8,
452        out: *mut NrosRmwSession,
453    ) -> NrosRmwRet,
454    pub close: unsafe extern "C" fn(session: *mut NrosRmwSession) -> NrosRmwRet,
455    pub drive_io: unsafe extern "C" fn(session: *mut NrosRmwSession, timeout_ms: i32) -> NrosRmwRet,
456
457    // ---- Publisher ----
458    pub create_publisher: unsafe extern "C" fn(
459        session: *mut NrosRmwSession,
460        topic_name: *const u8,
461        type_name: *const u8,
462        type_hash: *const u8,
463        domain_id: u32,
464        qos: *const NrosRmwQos,
465        out: *mut NrosRmwPublisher,
466    ) -> NrosRmwRet,
467    pub destroy_publisher: unsafe extern "C" fn(publisher: *mut NrosRmwPublisher),
468    pub publish_raw: unsafe extern "C" fn(
469        publisher: *mut NrosRmwPublisher,
470        data: *const u8,
471        len: usize,
472    ) -> NrosRmwRet,
473
474    // ---- Subscriber ----
475    pub create_subscriber: unsafe extern "C" fn(
476        session: *mut NrosRmwSession,
477        topic_name: *const u8,
478        type_name: *const u8,
479        type_hash: *const u8,
480        domain_id: u32,
481        qos: *const NrosRmwQos,
482        out: *mut NrosRmwSubscriber,
483    ) -> NrosRmwRet,
484    pub destroy_subscriber: unsafe extern "C" fn(subscriber: *mut NrosRmwSubscriber),
485    pub try_recv_raw: unsafe extern "C" fn(
486        subscriber: *mut NrosRmwSubscriber,
487        buf: *mut u8,
488        buf_len: usize,
489    ) -> i32,
490    pub has_data: unsafe extern "C" fn(subscriber: *mut NrosRmwSubscriber) -> i32,
491
492    // ---- Service Server ----
493    // Phase 193.1b — `qos` applies to both request + reply endpoints.
494    pub create_service_server: unsafe extern "C" fn(
495        session: *mut NrosRmwSession,
496        service_name: *const u8,
497        type_name: *const u8,
498        type_hash: *const u8,
499        domain_id: u32,
500        qos: *const NrosRmwQos,
501        out: *mut NrosRmwServiceServer,
502    ) -> NrosRmwRet,
503    pub destroy_service_server: unsafe extern "C" fn(server: *mut NrosRmwServiceServer),
504    pub try_recv_request: unsafe extern "C" fn(
505        server: *mut NrosRmwServiceServer,
506        buf: *mut u8,
507        buf_len: usize,
508        seq_out: *mut i64,
509    ) -> i32,
510    pub has_request: unsafe extern "C" fn(server: *mut NrosRmwServiceServer) -> i32,
511    pub send_reply: unsafe extern "C" fn(
512        server: *mut NrosRmwServiceServer,
513        seq: i64,
514        data: *const u8,
515        len: usize,
516    ) -> NrosRmwRet,
517
518    // ---- Service Client ----
519    pub create_service_client: unsafe extern "C" fn(
520        session: *mut NrosRmwSession,
521        service_name: *const u8,
522        type_name: *const u8,
523        type_hash: *const u8,
524        domain_id: u32,
525        qos: *const NrosRmwQos,
526        out: *mut NrosRmwServiceClient,
527    ) -> NrosRmwRet,
528    pub destroy_service_client: unsafe extern "C" fn(client: *mut NrosRmwServiceClient),
529    pub call_raw: unsafe extern "C" fn(
530        client: *mut NrosRmwServiceClient,
531        request: *const u8,
532        req_len: usize,
533        reply_buf: *mut u8,
534        reply_buf_len: usize,
535    ) -> i32,
536
537    // ---- Phase 130.4 — non-blocking send/recv split (optional) ----
538    pub send_request_raw: Option<
539        unsafe extern "C" fn(
540            client: *mut NrosRmwServiceClient,
541            request: *const u8,
542            req_len: usize,
543        ) -> NrosRmwRet,
544    >,
545    pub try_recv_reply_raw: Option<
546        unsafe extern "C" fn(
547            client: *mut NrosRmwServiceClient,
548            reply_buf: *mut u8,
549            reply_buf_len: usize,
550        ) -> i32,
551    >,
552
553    // ---- Phase 108 — status events (optional) ----
554    pub register_subscriber_event: unsafe extern "C" fn(
555        subscriber: *mut NrosRmwSubscriber,
556        kind: NrosRmwEventKind,
557        deadline_ms: u32,
558        cb: NrosRmwEventCallback,
559        user_context: *mut c_void,
560    ) -> NrosRmwRet,
561
562    pub register_publisher_event: unsafe extern "C" fn(
563        publisher: *mut NrosRmwPublisher,
564        kind: NrosRmwEventKind,
565        deadline_ms: u32,
566        cb: NrosRmwEventCallback,
567        user_context: *mut c_void,
568    ) -> NrosRmwRet,
569
570    // ---- Phase 108.B — manual liveliness assertion (optional) ----
571    pub assert_publisher_liveliness:
572        unsafe extern "C" fn(publisher: *mut NrosRmwPublisher) -> NrosRmwRet,
573
574    // ---- Phase 110.0 — backend's next internal-event deadline ----
575    /// Returns next deadline in ms (≥ 0) or a negative value for
576    /// "no deadline". NULL function pointer = treat as no deadline.
577    pub next_deadline_ms: Option<unsafe extern "C" fn(session: *const NrosRmwSession) -> i32>,
578
579    /// Phase 124.B.1 — executor wake callback. Backend stores
580    /// `(cb, ctx)` and invokes `cb(ctx)` on async wake. The
581    /// runtime-supplied `cb` does flag-write + condvar-signal
582    /// atomically, giving sub-poll-period wake latency for spin
583    /// loops blocked on the executor's wake condvar.
584    ///
585    /// NULL fn pointer = backend has no async wake path (poll-only:
586    /// XRCE, bare-metal). The runtime still drains the session on
587    /// its deadline-bound cv-wait boundary.
588    pub set_wake_callback: Option<
589        unsafe extern "C" fn(
590            session: *mut NrosRmwSession,
591            cb: Option<unsafe extern "C" fn(ctx: *mut core::ffi::c_void)>,
592            ctx: *mut core::ffi::c_void,
593        ) -> NrosRmwRet,
594    >,
595
596    // ---- Phase 124.A — zero-copy publisher loan ----
597    /// Reserve a writable slot of at least `requested_len` bytes in
598    /// the backend's outbound buffer. NULL = arena fallback. See the
599    /// C header for the full semantics + lifetime contract.
600    pub pub_loan: Option<
601        unsafe extern "C" fn(
602            publisher: *mut NrosRmwPublisher,
603            requested_len: usize,
604            out_buf: *mut *mut u8,
605            out_cap: *mut usize,
606            out_token: *mut *mut core::ffi::c_void,
607        ) -> NrosRmwRet,
608    >,
609    /// Commit a previously loaned slot. NULL = paired with NULL
610    /// `pub_loan`.
611    pub pub_commit: Option<
612        unsafe extern "C" fn(
613            publisher: *mut NrosRmwPublisher,
614            token: *mut core::ffi::c_void,
615            actual_len: usize,
616        ) -> NrosRmwRet,
617    >,
618    /// Abandon a previously loaned slot. NULL = paired with NULL
619    /// `pub_loan`.
620    pub pub_discard: Option<
621        unsafe extern "C" fn(publisher: *mut NrosRmwPublisher, token: *mut core::ffi::c_void),
622    >,
623
624    // ---- Phase 124.A — zero-copy subscriber borrow ----
625    /// Borrow the next message in place. Returns length (≥ 0) or a
626    /// negative error code. NULL = staging-buffer fallback via
627    /// `try_recv_raw`.
628    pub sub_borrow: Option<
629        unsafe extern "C" fn(
630            subscriber: *mut NrosRmwSubscriber,
631            out_buf: *mut *const u8,
632            out_len: *mut usize,
633            out_token: *mut *mut core::ffi::c_void,
634        ) -> i32,
635    >,
636    /// Release a previously borrowed view. NULL = paired with NULL
637    /// `sub_borrow`.
638    pub sub_release: Option<
639        unsafe extern "C" fn(subscriber: *mut NrosRmwSubscriber, token: *mut core::ffi::c_void),
640    >,
641
642    // ---- Phase 124.C.1 — service-server availability probe ----
643    /// Returns `1` if ≥ 1 matching server has been discovered on the
644    /// RMW graph, `0` if none yet, or a negative `NrosRmwRet`
645    /// constant on backend error. Clients use this to gate the first
646    /// `call_raw` so a startup-ordering race doesn't surface as a
647    /// request-side timeout.
648    ///
649    /// NULL fn pointer = backend cannot answer; the runtime maps the
650    /// missing slot to `NROS_RMW_RET_UNSUPPORTED`.
651    pub service_server_available:
652        Option<unsafe extern "C" fn(client: *mut NrosRmwServiceClient) -> i32>,
653
654    // ---- Phase 124.D.1 — burst-take ----
655    /// Drains up to `max_msgs` queued messages into a contiguous
656    /// caller buffer in a single backend call. The i-th delivered
657    /// message lives at `buf + i * per_msg_cap` and has length
658    /// `out_lens[i]`. Returns the message count (≥ 0) or a negative
659    /// `NrosRmwRet` error code; partial drains MUST report the
660    /// count, never error out.
661    ///
662    /// NULL fn pointer = backend doesn't batch; the runtime falls
663    /// back to a `try_recv_raw` loop in
664    /// `CffiSubscriber::try_recv_sequence` so user code can commit
665    /// to the batched API regardless of backend support.
666    pub try_recv_sequence: Option<
667        unsafe extern "C" fn(
668            subscriber: *mut NrosRmwSubscriber,
669            buf: *mut u8,
670            per_msg_cap: usize,
671            max_msgs: usize,
672            out_lens: *mut usize,
673        ) -> i32,
674    >,
675
676    // ---- Phase 124.E.1 — streamed publish ----
677    /// Caller hands the backend two callbacks. The backend invokes
678    /// `size_cb` once to learn the total payload length, then
679    /// `chunk_cb` repeatedly to fill the slot in chunks. Lets big
680    /// messages skip a per-publisher staging buffer on RAM-
681    /// constrained nodes.
682    ///
683    /// NULL fn pointer = backend doesn't stream; the runtime falls
684    /// back to a stack staging buffer (capped at the configured
685    /// `NROS_MAX_STREAM_CHUNK`) + `publish_raw` so user code can
686    /// commit to the streamed API regardless of backend support.
687    pub publish_streamed: Option<
688        unsafe extern "C" fn(
689            publisher: *mut NrosRmwPublisher,
690            size_cb: unsafe extern "C" fn(
691                out_total_len: *mut usize,
692                user_ctx: *mut core::ffi::c_void,
693            ),
694            chunk_cb: unsafe extern "C" fn(
695                out_buf: *mut u8,
696                cap: usize,
697                out_written: *mut usize,
698                user_ctx: *mut core::ffi::c_void,
699            ),
700            user_ctx: *mut core::ffi::c_void,
701        ) -> NrosRmwRet,
702    >,
703
704    // ---- Phase 124.F.1 — session-level connectivity probe ----
705    /// Wire-level round-trip "is the peer / agent / router still
706    /// reachable?" probe. Cheaper than the service-availability
707    /// probe — no discovery state required.
708    ///
709    /// Returns `NROS_RMW_RET_OK` on reply within `timeout_ms`,
710    /// `NROS_RMW_RET_TIMEOUT` on no reply, or
711    /// `NROS_RMW_RET_UNSUPPORTED` when the backend can't probe.
712    /// NULL slot = runtime surfaces `Unsupported` to the caller.
713    pub ping_session:
714        Option<unsafe extern "C" fn(session: *mut NrosRmwSession, timeout_ms: i32) -> NrosRmwRet>,
715
716    // ---- Phase 231 (RFC-0038) — zero-copy in-place subscription take ----
717    /// Capability query: does this subscriber support
718    /// [`process_raw_in_place`](Self::process_raw_in_place)? Returns `1` if yes,
719    /// `0` if no. The executor consults this at registration to pick the in-place
720    /// arena dispatch over the buffered one. NULL slot = treated as unsupported.
721    pub subscriber_supports_in_place:
722        Option<unsafe extern "C" fn(subscriber: *mut NrosRmwSubscriber) -> i32>,
723
724    /// Borrow one ready message in place and hand its raw CDR bytes to `cb`
725    /// (along with the opaque `ctx`) for the duration of the call, then release
726    /// the slot — no copy into a caller buffer. Returns `1` if a message was
727    /// processed (`cb` invoked), `NROS_RMW_RET_NO_DATA` if none was ready, or a
728    /// negative error. `cb` MUST NOT re-enter this subscriber's receive. NULL
729    /// slot = unsupported (the runtime uses the buffered path).
730    pub process_raw_in_place: Option<
731        unsafe extern "C" fn(
732            subscriber: *mut NrosRmwSubscriber,
733            ctx: *mut core::ffi::c_void,
734            cb: unsafe extern "C" fn(ctx: *mut core::ffi::c_void, ptr: *const u8, len: usize),
735        ) -> i32,
736    >,
737}
738
739// ============================================================================
740// Phase 108 — status-event types (mirror `<nros/rmw_event.h>`)
741// ============================================================================
742
/// Tier-1 event kinds. Stable u8 values matching
/// `nros_rmw_event_kind_t` in the C header.
///
/// The enum is `#[repr(u8)]` and every discriminant is spelled out, so the
/// numeric values are part of the C ABI: never renumber an existing variant.
/// Each variant is produced from the same-named `nros_rmw::EventKind` variant
/// by the `From<nros_rmw::EventKind>` conversion.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NrosRmwEventKind {
    /// Mirrors `nros_rmw::EventKind::LivelinessChanged`.
    LivelinessChanged = 0,
    /// Mirrors `nros_rmw::EventKind::RequestedDeadlineMissed`.
    RequestedDeadlineMissed = 1,
    /// Mirrors `nros_rmw::EventKind::MessageLost`. Also the fallback the
    /// `From<nros_rmw::EventKind>` conversion uses for kinds it does not know.
    MessageLost = 2,
    /// Mirrors `nros_rmw::EventKind::LivelinessLost`.
    LivelinessLost = 3,
    /// Mirrors `nros_rmw::EventKind::OfferedDeadlineMissed`.
    OfferedDeadlineMissed = 4,
}
754
755impl From<nros_rmw::EventKind> for NrosRmwEventKind {
756    fn from(k: nros_rmw::EventKind) -> Self {
757        use nros_rmw::EventKind as K;
758        match k {
759            K::LivelinessChanged => NrosRmwEventKind::LivelinessChanged,
760            K::RequestedDeadlineMissed => NrosRmwEventKind::RequestedDeadlineMissed,
761            K::MessageLost => NrosRmwEventKind::MessageLost,
762            K::LivelinessLost => NrosRmwEventKind::LivelinessLost,
763            K::OfferedDeadlineMissed => NrosRmwEventKind::OfferedDeadlineMissed,
764            _ => NrosRmwEventKind::MessageLost, // unreachable for now (#[non_exhaustive])
765        }
766    }
767}
768
/// Liveliness payload mirror.
///
/// `#[repr(C)]`: two `u16` counters followed by two `i16` deltas, in exactly
/// this order. Carried in the `liveliness_changed` member of
/// [`NrosRmwEventPayload`].
///
/// NOTE(review): the per-field meanings below are inferred from the field
/// names only — confirm against `<nros/rmw_event.h>`.
#[repr(C)]
#[derive(Debug, Clone, Copy, Default)]
pub struct NrosRmwLivelinessChangedStatus {
    /// Presumably the number of matched endpoints currently alive.
    pub alive_count: u16,
    /// Presumably the number of matched endpoints currently not alive.
    pub not_alive_count: u16,
    /// Presumably the signed change in `alive_count` since the last report.
    pub alive_count_change: i16,
    /// Presumably the signed change in `not_alive_count` since the last report.
    pub not_alive_count_change: i16,
}
778
/// Count payload mirror.
///
/// `#[repr(C)]` pair of `u32` counters. Carried in the `count` member of
/// [`NrosRmwEventPayload`].
///
/// NOTE(review): field meanings are inferred from the names — confirm against
/// `<nros/rmw_event.h>`.
#[repr(C)]
#[derive(Debug, Clone, Copy, Default)]
pub struct NrosRmwCountStatus {
    /// Presumably the cumulative number of occurrences of the event.
    pub total_count: u32,
    /// Presumably the number of occurrences since the last report.
    pub total_count_change: u32,
}
786
/// Borrow-shaped payload union mirror. C-side ABI — runtime-checked
/// kind tag selects which member is valid.
///
/// Being a `union`, reading either member is `unsafe`; the
/// [`NrosRmwEventKind`] passed alongside the payload (see
/// [`NrosRmwEventCallback`]) is the tag that says which member was written.
///
/// NOTE(review): the kind → member mapping is not visible in this file;
/// presumably `LivelinessChanged` uses `liveliness_changed` and the remaining
/// kinds use `count` — confirm against the C header.
#[repr(C)]
pub union NrosRmwEventPayload {
    /// Payload for liveliness-change events.
    pub liveliness_changed: NrosRmwLivelinessChangedStatus,
    /// Payload for count-style events.
    pub count: NrosRmwCountStatus,
}
794
/// C callback signature. Matches `nros_rmw_event_callback_t`.
///
/// * `kind` — which event fired; also the tag for `payload`.
/// * `payload` — pointer to the event's [`NrosRmwEventPayload`].
///   NOTE(review): presumably only valid for the duration of the call —
///   confirm against the C header.
/// * `user_context` — opaque pointer handed back to the callback unchanged.
///
/// The function is `unsafe extern "C"`: callers must uphold the C contract for
/// all three arguments.
pub type NrosRmwEventCallback = unsafe extern "C" fn(
    kind: NrosRmwEventKind,
    payload: *const NrosRmwEventPayload,
    user_context: *mut c_void,
);
801
802// ============================================================================
803// Registration
804// ============================================================================
805//
806// Phase 104.B.2 — named registry replaces the singleton vtable.
807// Backends register under a stable identifier (`"zenoh"`, `"dds"`,
808// `"xrce"`, future `"uorb"`, `"cyclonedds"`); consumers look up
809// vtables by name via `nros_rmw_cffi_lookup`. Multiple backends can
810// coexist in the same process (bridge nodes).
811//
812// Capacity comes from the `NROS_RMW_MAX_BACKENDS` build-time env
813// var (default 8). See `build.rs`.
814//
815// Implementation: a fixed-size `[BackendSlot; MAX_BACKENDS]`
816// guarded by an atomic length counter. No alloc; `no_std`
817// compatible. Slot scan is O(N) for lookup but N is tiny (8 by
818// default). Each slot owns its name buffer; `name_ptr` returned
819// to consumers points into the slot and stays valid for the
820// program's lifetime.
821
/// Compile-time max number of concurrently registered backends.
/// Set via `NROS_RMW_MAX_BACKENDS` env var at build time
/// (`build.rs`). Default 8.
///
/// `env!` reads the variable while this crate is being compiled and the text
/// is parsed by the `const fn` `parse_max_backends`, so a missing variable or
/// a non-decimal value fails the build rather than surfacing at runtime. The
/// value is the length of the registry's slot array.
pub const MAX_BACKENDS: usize = parse_max_backends(env!("NROS_RMW_MAX_BACKENDS"));
826
/// Parse the decimal text of `NROS_RMW_MAX_BACKENDS` into a `usize`.
///
/// `const fn` so it can initialise [`MAX_BACKENDS`] at compile time; every
/// failed check below therefore becomes a build error in that use.
///
/// # Panics
///
/// * if `s` is empty (an empty variable would otherwise silently yield a
///   zero-capacity registry),
/// * if `s` contains anything other than ASCII digits (no sign, no
///   whitespace, no `_` separators),
/// * if the value does not fit in `usize`.
const fn parse_max_backends(s: &str) -> usize {
    let bytes = s.as_bytes();
    assert!(
        !bytes.is_empty(),
        "NROS_RMW_MAX_BACKENDS must be a decimal integer"
    );
    let mut i = 0usize;
    let mut acc: usize = 0;
    // `while` + manual index: iterators are not usable in `const fn`.
    while i < bytes.len() {
        let d = bytes[i];
        assert!(
            d.is_ascii_digit(),
            "NROS_RMW_MAX_BACKENDS must be a decimal integer"
        );
        // Checked arithmetic so an oversized value is rejected explicitly
        // instead of wrapping when overflow checks are disabled.
        acc = match acc.checked_mul(10) {
            Some(shifted) => match shifted.checked_add((d - b'0') as usize) {
                Some(next) => next,
                None => panic!("NROS_RMW_MAX_BACKENDS is too large"),
            },
            None => panic!("NROS_RMW_MAX_BACKENDS is too large"),
        };
        i += 1;
    }
    acc
}
842
/// Size in bytes of a slot's name buffer, *including* the terminating NUL —
/// the longest backend name that can be stored is therefore
/// `BACKEND_NAME_MAX - 1` (31) bytes. Names are short ASCII
/// identifiers (`"zenoh"`, `"cyclonedds"`); 32 bytes is generous.
const BACKEND_NAME_MAX: usize = 32;
846
/// One registry entry: an inline, NUL-terminated backend name plus the vtable
/// registered under that name.
#[repr(C)]
struct BackendSlot {
    /// Null-terminated UTF-8 backend name. Zero-initialized when
    /// unused (`name[0] == 0`).
    name: [u8; BACKEND_NAME_MAX],
    /// Vtable registered under `name`; null while the slot is unused. The
    /// registration contract requires the pointee to stay valid for the
    /// program's lifetime.
    vtable: *const NrosRmwVtable,
}
854
855impl BackendSlot {
856    const fn empty() -> Self {
857        Self {
858            name: [0u8; BACKEND_NAME_MAX],
859            vtable: core::ptr::null(),
860        }
861    }
862
863    #[inline]
864    fn is_empty(&self) -> bool {
865        self.name[0] == 0
866    }
867
868    #[inline]
869    fn name_matches(&self, candidate: &[u8]) -> bool {
870        if self.is_empty() {
871            return false;
872        }
873        // Compare up to the first NUL or candidate length.
874        let mut i = 0usize;
875        while i < self.name.len() && i < candidate.len() {
876            if self.name[i] == 0 {
877                return false; // slot name shorter than candidate
878            }
879            if self.name[i] != candidate[i] {
880                return false;
881            }
882            i += 1;
883        }
884        // candidate fully consumed; slot must be NUL at i (same length)
885        i == candidate.len() && (i == self.name.len() || self.name[i] == 0)
886    }
887}
888
// SAFETY: `BackendSlot::vtable` is a `*const` pointer used in a
// `'static` context; once written it's never freed and the registry
// is guarded by an atomic length counter for publication. Marker
// trait implementations are required so the static array is
// `Sync` across threads.
//
// The raw-pointer field makes the struct `!Sync` by default; this impl opts
// back in on the strength of the argument above.
unsafe impl Sync for BackendSlot {}
895
/// Fixed-size registry. `slots[0..len]` are live; `slots[len..]`
/// are zero-initialized. `len` is the publication fence.
///
/// `slots` lives in an `UnsafeCell` because we mutate through
/// `&'static REGISTRY`. Safety invariants:
/// * Slot writes happen only inside `nros_rmw_cffi_register_named`,
///   which is documented "call before `Executor::open`" — backend
///   ctors fire pre-main, manual calls precede session creation.
/// * Slot reads via `nros_rmw_cffi_lookup` and `get_vtable` happen
///   after `Executor::open`, well after registration completes.
/// * The atomic `len` provides the release-acquire fence so any
///   reader that sees `len = N` also sees the populated slot
///   contents for indices `< N`.
///
/// NOTE(review): `nros_rmw_cffi_register_named` increments `len` *before* it
/// fills the reserved slot, so the last bullet only holds when the first two
/// do (registration has finished before any reader runs). A reader racing
/// with a registration could observe a reserved but not yet populated slot.
#[doc(hidden)]
pub struct Registry {
    /// Backing storage for all slots; interior mutability because writers
    /// only ever hold `&Registry`.
    slots: core::cell::UnsafeCell<[BackendSlot; MAX_BACKENDS]>,
    /// Number of slots handed out so far.
    len: portable_atomic::AtomicUsize,
}
914
915impl Registry {
916    #[doc(hidden)]
917    pub const fn new() -> Self {
918        let slots = {
919            #[allow(clippy::declare_interior_mutable_const)]
920            const E: BackendSlot = BackendSlot::empty();
921            [E; MAX_BACKENDS]
922        };
923        Self {
924            slots: core::cell::UnsafeCell::new(slots),
925            len: portable_atomic::AtomicUsize::new(0),
926        }
927    }
928
929    /// Borrow slot `i` immutably. Caller must guarantee
930    /// `i < self.len.load(Acquire)`.
931    #[inline]
932    unsafe fn slot(&self, i: usize) -> &BackendSlot {
933        // SAFETY: registry protocol guarantees slot stability once
934        // published via the atomic len fence.
935        unsafe { &(*self.slots.get())[i] }
936    }
937
938    /// Borrow slot `i` mutably. Caller must guarantee exclusive
939    /// access — either pre-publication (idx > current `len`) or
940    /// during an idempotent overwrite of an already-registered name.
941    #[inline]
942    #[allow(clippy::mut_from_ref)]
943    unsafe fn slot_mut(&self, i: usize) -> &mut BackendSlot {
944        // SAFETY: see Registry doc — writer-side discipline.
945        unsafe { &mut (*self.slots.get())[i] }
946    }
947}
948
// SAFETY: see `Registry` doc-comment on the mutation protocol.
// The `UnsafeCell` field makes `Registry` `!Sync` by default; a `static` of
// this type is only allowed because of this impl, and it is sound only while
// every writer follows that protocol.
unsafe impl Sync for Registry {}
951
// Phase 241.D3-rev — `REGISTRY` is DEFINED once in this rlib (plain
// `#[no_mangle]`). The single-runtime model puts exactly one Rust staticlib in any
// link (the umbrella `nros-c` / `nros-cpp` bundles the backend as an rlib), so the
// cffi rlib appears once and one strong definition is correct everywhere: pure-Rust
// firmware, the NuttX build-std ELF, and the umbrella C/C++ staticlib alike. This
// supersedes the slice-4 `external-registry`/provider split, which existed only
// because the C/C++ link used to carry multiple Rust staticlibs.
//
// `#[unsafe(no_mangle)]` exports the static under the unmangled symbol name
// `REGISTRY`. Const-initialised via `Registry::new()`, so it needs no runtime
// setup before the first registration.
#[unsafe(no_mangle)]
static REGISTRY: Registry = Registry::new();
961
/// The single process-wide backend registry.
///
/// Thin accessor around the `REGISTRY` static so the rest of the file never
/// names the static directly.
#[inline]
fn registry() -> &'static Registry {
    &REGISTRY
}
967
968// ============================================================================
969// Rust-adapter MessageInfo side channel
970// ============================================================================
971//
972// The stable C subscriber ABI returns only a `(payload, len)` pair from
973// `try_recv_raw`. Rust backends can produce `MessageInfo`, so the generic
974// Rust->C adapter stores that metadata keyed by the backend handle pointer
975// immediately before returning the payload length. The Rust CFFI subscriber
976// consumes it after the vtable call. Pure C/C++ backends never write this table
977// and keep the documented `None` metadata behavior.
978
/// Number of entries in `MESSAGE_INFO_TABLE`, i.e. how many distinct keys
/// (subscriber backend handles) can have side-channel metadata stored at the
/// same time. When every entry is claimed, further stores are silently
/// dropped.
const MESSAGE_INFO_SLOTS: usize = 64;
980
/// One entry of the `MessageInfo` side channel.
///
/// A slot is claimed by storing a non-zero `key` and released by resetting
/// the key to `0`. The payload cells are plain `UnsafeCell`s; their `*_valid`
/// flags are what hand a stored value over from the writer to the reader.
struct MessageInfoSlot {
    /// Owner of the slot, `0` when free. The key is the backend handle
    /// pointer of the subscriber, as an integer.
    key: portable_atomic::AtomicUsize,
    /// `true` while `info` holds a value that has not been taken yet.
    valid: portable_atomic::AtomicBool,
    /// Most recently stored metadata; only meaningful while `valid` is set.
    info: UnsafeCell<MessageInfo>,
    /// Set when an integrity status has been requested for this key;
    /// consumed by `take_cffi_integrity_request`.
    #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
    validate_requested: portable_atomic::AtomicBool,
    /// `true` while `integrity` holds a value that has not been taken yet.
    #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
    integrity_valid: portable_atomic::AtomicBool,
    /// Most recently stored integrity status; only meaningful while
    /// `integrity_valid` is set.
    #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
    integrity: UnsafeCell<nros_rmw::IntegrityStatus>,
}
992
993impl MessageInfoSlot {
994    const fn empty() -> Self {
995        Self {
996            key: portable_atomic::AtomicUsize::new(0),
997            valid: portable_atomic::AtomicBool::new(false),
998            info: UnsafeCell::new(MessageInfo::new()),
999            #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
1000            validate_requested: portable_atomic::AtomicBool::new(false),
1001            #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
1002            integrity_valid: portable_atomic::AtomicBool::new(false),
1003            #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
1004            integrity: UnsafeCell::new(nros_rmw::IntegrityStatus {
1005                gap: 0,
1006                duplicate: false,
1007                crc_valid: None,
1008            }),
1009        }
1010    }
1011}
1012
// SAFETY: each slot is published by `key` and `valid` atomics. Writers store
// `info` before setting `valid = true` with Release ordering; readers take
// `valid` with AcqRel before copying the `MessageInfo`.
//
// The `UnsafeCell` fields make the struct `!Sync` by default; this impl is
// what allows the slots to live in a `static` table.
unsafe impl Sync for MessageInfoSlot {}
1017
/// Process-wide side-channel table: `MESSAGE_INFO_SLOTS` slots, all free at
/// start-up. Slots are claimed and released through their atomic `key`.
static MESSAGE_INFO_TABLE: [MessageInfoSlot; MESSAGE_INFO_SLOTS] = {
    // Array-repeat syntax needs a `const` item because `MessageInfoSlot` is
    // not `Copy`; the lint fires because the type has interior mutability.
    #[allow(clippy::declare_interior_mutable_const)]
    const E: MessageInfoSlot = MessageInfoSlot::empty();
    [E; MESSAGE_INFO_SLOTS]
};
1023
1024fn lookup_message_info_slot(key: usize) -> Option<&'static MessageInfoSlot> {
1025    if key == 0 {
1026        return None;
1027    }
1028    MESSAGE_INFO_TABLE
1029        .iter()
1030        .find(|slot| slot.key.load(Ordering::Acquire) == key)
1031}
1032
/// Return the slot claimed by `key`, claiming a free one if `key` has none.
///
/// Returns `None` for the reserved key `0` and when the table is full.
///
/// The search is done in two passes on purpose. A single pass that claims the
/// first free slot it meets would, after an earlier slot has been released,
/// create a second entry for a key that already lives further down the table;
/// the older entry would then linger and could later hand stale metadata to
/// whoever reuses the key.
#[cfg(feature = "alloc")]
fn get_or_insert_message_info_slot(key: usize) -> Option<&'static MessageInfoSlot> {
    if key == 0 {
        return None;
    }

    // Pass 1: the key may already own a slot anywhere in the table.
    if let Some(existing) = lookup_message_info_slot(key) {
        return Some(existing);
    }

    // Pass 2: claim the first free slot. A failed CAS means the slot is
    // taken; if it was taken for this very key in the meantime, share it.
    for slot in &MESSAGE_INFO_TABLE {
        match slot
            .key
            .compare_exchange(0, key, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(_) => return Some(slot),
            Err(owner) if owner == key => return Some(slot),
            Err(_) => {}
        }
    }
    None
}
1054
/// Record (or clear) the metadata for the message just received on `key`.
///
/// `Some(info)` stores the value and marks it pending; `None` only clears the
/// pending flag. If no slot can be obtained for `key` (key `0` or table full)
/// the call does nothing.
#[cfg(feature = "alloc")]
pub(crate) fn store_cffi_message_info(key: usize, info: Option<MessageInfo>) {
    let Some(slot) = get_or_insert_message_info_slot(key) else {
        return;
    };
    let has_info = info.is_some();
    if let Some(fresh) = info {
        // SAFETY: this slot is keyed to one subscriber backend handle. The
        // executor owns each subscriber mutably while receiving, so writes
        // for the same key are serialized.
        unsafe {
            *slot.info.get() = fresh;
        }
    }
    // Publish after the payload write so a reader that observes `valid`
    // also observes the stored value.
    slot.valid.store(has_info, Ordering::Release);
}
1073
1074fn take_cffi_message_info(key: usize) -> Option<MessageInfo> {
1075    let slot = lookup_message_info_slot(key)?;
1076    if !slot.valid.swap(false, Ordering::AcqRel) {
1077        return None;
1078    }
1079    // SAFETY: `valid.swap(false)` gives this reader exclusive consumption of the
1080    // last stored `MessageInfo` for this key.
1081    Some(unsafe { *slot.info.get() })
1082}
1083
/// Ask for an integrity status to be produced for `key`.
///
/// Discards any integrity status that is still pending, then raises the
/// request flag. Does nothing when no slot can be obtained for `key`.
#[cfg(all(feature = "alloc", feature = "safety-e2e"))]
fn request_cffi_integrity_status(key: usize) {
    if let Some(slot) = get_or_insert_message_info_slot(key) {
        slot.integrity_valid.store(false, Ordering::Release);
        slot.validate_requested.store(true, Ordering::Release);
    }
}
1092
/// Consume the integrity request flag for `key`.
///
/// Returns `true` exactly when a request was pending (the flag is cleared as
/// part of the call); `false` when nothing was requested or `key` has no
/// slot.
#[cfg(all(feature = "alloc", feature = "safety-e2e"))]
pub(crate) fn take_cffi_integrity_request(key: usize) -> bool {
    match lookup_message_info_slot(key) {
        Some(slot) => slot.validate_requested.swap(false, Ordering::AcqRel),
        None => false,
    }
}
1099
/// Record the integrity status computed for the message just received on
/// `key` and mark it pending. Does nothing when no slot can be obtained for
/// `key`.
#[cfg(all(feature = "alloc", feature = "safety-e2e"))]
pub(crate) fn store_cffi_integrity_status(key: usize, status: nros_rmw::IntegrityStatus) {
    if let Some(slot) = get_or_insert_message_info_slot(key) {
        // SAFETY: integrity status follows the same per-subscriber handoff as
        // `info`; the CFFI subscriber owns receive calls mutably for this key.
        unsafe {
            *slot.integrity.get() = status;
        }
        // Publish only after the payload has been written.
        slot.integrity_valid.store(true, Ordering::Release);
    }
}
1112
/// Consume the pending integrity status for `key`, if any.
///
/// Returns `None` when `key` has no slot or no status is pending. A stored
/// status is handed out at most once.
#[cfg(all(feature = "alloc", feature = "safety-e2e"))]
fn take_cffi_integrity_status(key: usize) -> Option<nros_rmw::IntegrityStatus> {
    let slot = lookup_message_info_slot(key)?;
    let was_pending = slot.integrity_valid.swap(false, Ordering::AcqRel);
    // SAFETY: same handoff as `info` — clearing `integrity_valid` gives this
    // reader exclusive consumption of the last stored status for this key.
    was_pending.then(|| unsafe { *slot.integrity.get() })
}
1121
1122fn clear_cffi_message_info(key: usize) {
1123    let Some(slot) = lookup_message_info_slot(key) else {
1124        return;
1125    };
1126    slot.valid.store(false, Ordering::Release);
1127    #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
1128    {
1129        slot.validate_requested.store(false, Ordering::Release);
1130        slot.integrity_valid.store(false, Ordering::Release);
1131    }
1132    slot.key.store(0, Ordering::Release);
1133}
1134
1135/// Register a custom RMW backend vtable (legacy single-arg form).
1136///
1137/// Phase 104.B.2 — internally forwards to
1138/// [`nros_rmw_cffi_register_named`] with the literal name `"default"`.
1139/// Preserved as a one-release source-compat shim so backend ctors
1140/// authored before the named-registry switchover keep working.
1141///
1142/// **Deprecated (Phase 128.B.5).** All in-tree callers now use
1143/// [`nros_rmw_cffi_register_named`] directly so the registry slot is
1144/// keyed by the backend's canonical name (`"zenoh"`, `"dds"`,
1145/// `"xrce"`, `"cyclonedds"`, …). New backends MUST follow the same
1146/// pattern; the unnamed shim will be removed in a follow-up phase
1147/// once external callers have migrated.
1148///
1149/// # Safety
1150///
1151/// The vtable pointer must remain valid for the lifetime of the program.
1152/// All function pointers in the vtable must be valid.
1153#[deprecated(
1154    since = "0.2.0",
1155    note = "use nros_rmw_cffi_register_named with the backend's canonical name; the unnamed shim will be removed"
1156)]
1157#[unsafe(no_mangle)]
1158pub unsafe extern "C" fn nros_rmw_cffi_register(vtable: *const NrosRmwVtable) -> NrosRmwRet {
1159    unsafe { nros_rmw_cffi_register_named(c"default".as_ptr(), vtable) }
1160}
1161
1162/// Register a backend under a stable name. Multiple backends can
1163/// coexist; consumers select via [`nros_rmw_cffi_lookup`] or the
1164/// higher-level `Executor::node_builder(...).rmw(...)` path.
1165///
1166/// Names must be UTF-8, NUL-terminated, ≤ 31 bytes (excluding NUL).
1167/// Reserved names today: `"zenoh"`, `"dds"`, `"xrce"`,
1168/// `"cyclonedds"`, future `"uorb"`. The string `"default"` is the
1169/// implicit name used by the legacy single-arg
1170/// [`nros_rmw_cffi_register`] shim.
1171///
1172/// Returns:
1173/// * `NROS_RMW_RET_OK` on success.
1174/// * `NROS_RMW_RET_INVALID_ARGUMENT` if `name` / `vtable` is
1175///   NULL, the name is empty, or exceeds 31 bytes.
1176/// * `NROS_RMW_RET_ERROR` if the registry is full
1177///   (`MAX_BACKENDS` reached without a matching entry).
1178///
1179/// Duplicate registration of the same name overwrites the
1180/// previous vtable (idempotent for ctor-fires-twice cases).
1181///
1182/// # Safety
1183///
1184/// * `name` must be a valid NUL-terminated UTF-8 string.
1185/// * `vtable` must remain valid for the program's lifetime.
1186#[unsafe(no_mangle)]
1187pub unsafe extern "C" fn nros_rmw_cffi_register_named(
1188    name: *const core::ffi::c_char,
1189    vtable: *const NrosRmwVtable,
1190) -> NrosRmwRet {
1191    if name.is_null() || vtable.is_null() {
1192        return NROS_RMW_RET_INVALID_ARGUMENT;
1193    }
1194    let name_u8 = name.cast::<u8>();
1195
1196    // Length-check the input. We scan up to BACKEND_NAME_MAX + 1
1197    // bytes; anything longer is rejected.
1198    let mut len = 0usize;
1199    while len < BACKEND_NAME_MAX {
1200        let b = unsafe { *name_u8.add(len) };
1201        if b == 0 {
1202            break;
1203        }
1204        len += 1;
1205    }
1206    if len == 0 {
1207        return NROS_RMW_RET_INVALID_ARGUMENT;
1208    }
1209    // Must have found a NUL within BACKEND_NAME_MAX.
1210    if unsafe { *name_u8.add(len) } != 0 {
1211        return NROS_RMW_RET_INVALID_ARGUMENT;
1212    }
1213
1214    let name_bytes = unsafe { core::slice::from_raw_parts(name_u8, len) };
1215
1216    // First pass: look for existing entry with same name → overwrite.
1217    let current_len = registry().len.load(Ordering::Acquire);
1218    for i in 0..current_len {
1219        // SAFETY: i < current_len, indices in bounds.
1220        let slot = unsafe { registry().slot(i) };
1221        if slot.name_matches(name_bytes) {
1222            // SAFETY: writer-side idempotent overwrite. The slot is
1223            // already published; concurrent readers will see either
1224            // the old or new vtable consistently, both valid.
1225            unsafe {
1226                let slot_mut = registry().slot_mut(i);
1227                slot_mut.vtable = vtable;
1228            }
1229            core::sync::atomic::fence(Ordering::Release);
1230            return NROS_RMW_RET_OK;
1231        }
1232    }
1233
1234    // No existing entry; append. Reserve a slot via atomic increment.
1235    let idx = registry().len.fetch_add(1, Ordering::AcqRel);
1236    if idx >= MAX_BACKENDS {
1237        // Roll back the increment so subsequent registers don't see a
1238        // stale `len > MAX_BACKENDS`. (Race window negligible — once
1239        // we hit capacity, no further append succeeds.)
1240        registry().len.store(MAX_BACKENDS, Ordering::Release);
1241        return NROS_RMW_RET_ERROR;
1242    }
1243
1244    // SAFETY: idx < MAX_BACKENDS, mutating an as-yet-unpublished slot.
1245    unsafe {
1246        let slot = registry().slot_mut(idx);
1247        slot.name[..len].copy_from_slice(name_bytes);
1248        slot.name[len] = 0;
1249        slot.vtable = vtable;
1250    }
1251    // Release-fence so concurrent lookups see both the name and the
1252    // vtable consistently with the updated `len`.
1253    core::sync::atomic::fence(Ordering::Release);
1254    NROS_RMW_RET_OK
1255}
1256
1257/// Look up a backend's vtable by name. Returns NULL if no backend
1258/// is registered under `name`.
1259///
1260/// # Safety
1261///
1262/// * `name` must be a valid NUL-terminated UTF-8 string.
1263#[unsafe(no_mangle)]
1264pub unsafe extern "C" fn nros_rmw_cffi_lookup(
1265    name: *const core::ffi::c_char,
1266) -> *const NrosRmwVtable {
1267    if name.is_null() {
1268        return core::ptr::null();
1269    }
1270    let name_u8 = name.cast::<u8>();
1271    let mut len = 0usize;
1272    while len < BACKEND_NAME_MAX {
1273        if unsafe { *name_u8.add(len) } == 0 {
1274            break;
1275        }
1276        len += 1;
1277    }
1278    if len == 0 || len == BACKEND_NAME_MAX {
1279        return core::ptr::null();
1280    }
1281    let name_bytes = unsafe { core::slice::from_raw_parts(name_u8, len) };
1282
1283    let current_len = registry().len.load(Ordering::Acquire);
1284    for i in 0..current_len {
1285        // SAFETY: i < current_len, indices in bounds; publication
1286        // fence via the atomic-len Acquire load.
1287        let slot = unsafe { registry().slot(i) };
1288        if slot.name_matches(name_bytes) {
1289            return slot.vtable;
1290        }
1291    }
1292    core::ptr::null()
1293}
1294
1295/// Diagnostic helper — fills `buf` with pointers to up to `cap`
1296/// registered backend names. Returns the number of names available
1297/// (may exceed `cap`). Pointer-valid for the program's lifetime.
1298///
1299/// # Safety
1300///
1301/// * `buf` must either be NULL (when `cap == 0`) or point at writable
1302///   memory of at least `cap * sizeof(*const c_char)` bytes.
1303#[unsafe(no_mangle)]
1304pub unsafe extern "C" fn nros_rmw_cffi_registered_names(
1305    buf: *mut *const core::ffi::c_char,
1306    cap: usize,
1307) -> usize {
1308    let n = registry().len.load(Ordering::Acquire);
1309    if !buf.is_null() && cap > 0 {
1310        let limit = n.min(cap);
1311        for i in 0..limit {
1312            // SAFETY: i < limit <= cap, buf capacity guaranteed by caller.
1313            let slot = unsafe { registry().slot(i) };
1314            unsafe {
1315                buf.add(i)
1316                    .write(slot.name.as_ptr() as *const core::ffi::c_char)
1317            };
1318        }
1319    }
1320    n
1321}
1322
1323/// Phase 104.A — registry-presence probe. Returns `true` iff at
1324/// least one backend is registered. Used by `Executor::open` to
1325/// detect "user forgot to register a backend before opening the
1326/// session" and fail with a meaningful error.
1327#[inline]
1328pub fn backend_registered() -> bool {
1329    registry().len.load(Ordering::Acquire) > 0
1330}
1331
1332/// Phase 104.B — internal access to the registry for the Rust-side
1333/// adapter. `nros-node`'s `register_active_backend` removal already
1334/// switched to `backend_registered()` for the presence check; this
1335/// returns the vtable for any single-backend fast-path callers.
1336fn default_vtable() -> Option<&'static NrosRmwVtable> {
1337    let n = registry().len.load(Ordering::Acquire);
1338    if n == 0 {
1339        return None;
1340    }
1341    // SAFETY: index 0 < n, registry's len-Acquire fence orders the
1342    // slot read.
1343    let slot = unsafe { registry().slot(0) };
1344    if slot.vtable.is_null() {
1345        return None;
1346    }
1347    Some(unsafe { &*slot.vtable })
1348}
1349
/// Phase 128.A.3 — outcome of `resolve_backend`.
///
/// Only `Single` is a success; the other variants each correspond to one
/// `NROS_RMW_RET_*` error code (see `backend_resolution_to_ret`).
pub enum BackendResolution {
    /// Exactly one matching backend; use its vtable.
    Single(&'static NrosRmwVtable),
    /// No backend linked into the binary. Maps to
    /// [`NROS_RMW_RET_NO_BACKEND`].
    NoBackend,
    /// More than one backend linked and no selector given. Maps to
    /// [`NROS_RMW_RET_AMBIGUOUS_BACKEND`].
    Ambiguous,
    /// Selector did not match any registered backend. Maps to
    /// [`NROS_RMW_RET_UNKNOWN_BACKEND`].
    Unknown,
}
1364
1365/// Phase 128.A.3 — selection policy for the single-backend
1366/// `Executor::open` / `nros::init` path.
1367///
1368/// Algorithm:
1369///
1370/// 1. If `selector` is `Some(name)` (typically from `$NROS_RMW`),
1371///    look it up in the registry. Hit → [`BackendResolution::Single`];
1372///    miss → [`BackendResolution::Unknown`].
1373/// 2. Otherwise, if exactly one backend is registered, return it.
1374/// 3. Otherwise, if zero, [`BackendResolution::NoBackend`]; if more
1375///    than one, [`BackendResolution::Ambiguous`].
1376///
1377/// Callers convert the resolution to a [`NrosRmwRet`] via
1378/// [`backend_resolution_to_ret`].
1379///
1380/// Bridge consumers (`Executor::open_multi`) bypass this function and
1381/// call `nros_rmw_cffi_lookup` per spec instead.
1382pub fn resolve_backend(selector: Option<&[u8]>) -> BackendResolution {
1383    let n = registry().len.load(Ordering::Acquire);
1384    if let Some(name) = selector {
1385        let mut i = 0usize;
1386        while i < n {
1387            // SAFETY: i < n, registry len-Acquire fence orders the read.
1388            let slot = unsafe { registry().slot(i) };
1389            if slot.name_matches(name) {
1390                if slot.vtable.is_null() {
1391                    return BackendResolution::Unknown;
1392                }
1393                return BackendResolution::Single(unsafe { &*slot.vtable });
1394            }
1395            i += 1;
1396        }
1397        return BackendResolution::Unknown;
1398    }
1399    match n {
1400        0 => BackendResolution::NoBackend,
1401        1 => default_vtable()
1402            .map(BackendResolution::Single)
1403            .unwrap_or(BackendResolution::NoBackend),
1404        _ => BackendResolution::Ambiguous,
1405    }
1406}
1407
1408/// Phase 128.A.3 — map a [`BackendResolution`] to its canonical
1409/// [`NrosRmwRet`]. [`BackendResolution::Single`] is *not* an error and
1410/// returns [`NROS_RMW_RET_OK`]; callers needing the vtable should
1411/// pattern-match on the resolution itself.
1412pub fn backend_resolution_to_ret(res: &BackendResolution) -> NrosRmwRet {
1413    match res {
1414        BackendResolution::Single(_) => NROS_RMW_RET_OK,
1415        BackendResolution::NoBackend => NROS_RMW_RET_NO_BACKEND,
1416        BackendResolution::Ambiguous => NROS_RMW_RET_AMBIGUOUS_BACKEND,
1417        BackendResolution::Unknown => NROS_RMW_RET_UNKNOWN_BACKEND,
1418    }
1419}
1420
1421/// Phase 115.A.2 — C entry point for installing a custom transport.
1422///
1423/// Mirrors the Rust-side `nros_rmw::set_custom_transport(Some(...))`
1424/// (or `None` when `ops == NULL`) but returns the canonical
1425/// `nros_rmw_ret_t` codes so non-Rust consumers don't have to
1426/// reach into nros-c's higher-level error enum.
1427///
1428/// The struct's contents are copied internally; the caller may
1429/// stack-allocate. Pass `NULL` to clear the slot.
1430///
1431/// # Safety
1432///
1433/// `ops` must either be `NULL` or point at a valid
1434/// `nros_transport_ops_t` whose four fn pointers stay live for the
1435/// lifetime of the registration (i.e. until a subsequent
1436/// `nros_rmw_cffi_set_custom_transport(NULL)` or a replacement
1437/// install).
1438#[unsafe(no_mangle)]
1439pub unsafe extern "C" fn nros_rmw_cffi_set_custom_transport(
1440    ops: *const nros_rmw::NrosTransportOps,
1441) -> NrosRmwRet {
1442    if ops.is_null() {
1443        // Clear: ignore any error (None is always accepted).
1444        let _ = unsafe { nros_rmw::set_custom_transport(None) };
1445        return NROS_RMW_RET_OK;
1446    }
1447    let copy = unsafe { *ops };
1448    match unsafe { nros_rmw::set_custom_transport(Some(copy)) } {
1449        Ok(()) => NROS_RMW_RET_OK,
1450        Err(e) => ret_from_error(&e),
1451    }
1452}
1453
1454fn get_vtable() -> Result<&'static NrosRmwVtable, TransportError> {
1455    // Phase 104.B.2 — fast path: registry has exactly one backend.
1456    // Mirror the single-backend hot path the singleton-VTABLE
1457    // implementation had. Bridge / multi-backend users should call
1458    // a forthcoming `get_vtable_named` API (104.C work) instead.
1459    default_vtable().ok_or(TransportError::InvalidArgument)
1460}
1461
1462// ============================================================================
1463// Helper: null-terminated string on the stack
1464// ============================================================================
1465
/// Write a Rust `&str` as a null-terminated byte sequence into a fixed buffer.
/// Returns a pointer to the buffer start.
///
/// At most `N - 1` bytes of `s` are copied so the terminator always fits.
/// When `s` is too long it is truncated at a character boundary: a multi-byte
/// UTF-8 character that would straddle the limit is dropped entirely, so the
/// buffer always holds valid UTF-8 that [`cstr_buf_to_str`] can read back.
/// Bytes after the terminator are left untouched.
///
/// # Panics
///
/// Panics if `N == 0` (there is no room for the terminator).
fn to_c_str<const N: usize>(s: &str, buf: &mut [u8; N]) -> *const u8 {
    let mut len = s.len().min(N - 1);
    // Back off to the previous character boundary. Terminates because index 0
    // is always a boundary.
    while !s.is_char_boundary(len) {
        len -= 1;
    }
    buf[..len].copy_from_slice(&s.as_bytes()[..len]);
    buf[len] = 0;
    buf.as_ptr()
}
1474
/// Inverse of [`to_c_str`] — read a null-terminated byte buffer back
/// as a `&str`, stopping at the first NUL byte (or at the end of the buffer
/// when it holds no NUL). Used by the
/// `topic_name()` / `type_name()` / `node_name()` accessors on the
/// `Cffi*` types so callers can introspect without round-tripping
/// through the vtable. Phase 102.5.
///
/// If the bytes before the NUL are not entirely valid UTF-8 — for example a
/// name that was cut in the middle of a multi-byte character — the longest
/// valid prefix is returned rather than discarding the whole name.
fn cstr_buf_to_str<const N: usize>(buf: &[u8; N]) -> &str {
    let len = buf.iter().position(|&b| b == 0).unwrap_or(N);
    let bytes = &buf[..len];
    match core::str::from_utf8(bytes) {
        Ok(text) => text,
        // `valid_up_to` is the length of the longest valid prefix, so the
        // second decode cannot fail; `unwrap_or` just avoids a panic path.
        Err(err) => core::str::from_utf8(&bytes[..err.valid_up_to()]).unwrap_or(""),
    }
}
1487
1488// ============================================================================
1489// CffiSession
1490// ============================================================================
1491//
1492// Storage discipline:
1493// * Each Cffi* struct owns null-terminated name buffers as inline
1494//   arrays. The C-side typed entity struct is rebuilt fresh on every
1495//   FFI call via `make_*_view`, so move-invalidation of pointers
1496//   into the buffer is impossible — the pointer always points to the
1497//   *current* address of the buffer, computed at call time.
1498// * The backend writes `backend_data` (and `can_loan_messages` for
1499//   pub/sub entities)
1500//   into the FFI view; we copy the writes back into the Cffi*
1501//   struct's fields after the call.
1502// * Strings ARE immutable for the entity's lifetime, so backends that
1503//   stash the topic_name pointer for diagnostics see stable storage
1504//   *as long as the Cffi* struct is not moved.* The Phase 102.4
1505//   contract is "do not move a Cffi* struct after construction" —
1506//   nano-ros embeds them inside the executor arena, which doesn't
1507//   relocate.
1508
/// Size in bytes of the inline, NUL-terminated name buffers (node name,
/// namespace). The terminator takes one byte, so at most 255 bytes of a name
/// are kept.
const NAME_BUF_LEN: usize = 256;
/// Size in bytes of an inline hash buffer.
/// NOTE(review): its users are outside this part of the file — presumably the
/// type-hash strings of pub/sub entities; confirm at the use sites.
const HASH_BUF_LEN: usize = 128;
1511
/// Session backed by a C vtable.
///
/// The name buffers are stored inline; the C-side session view handed
/// to the backend is rebuilt from them on every FFI call.
pub struct CffiSession {
    /// Function table of the backend this session was opened against.
    vtable: &'static NrosRmwVtable,
    /// Inline NUL-terminated storage for `node_name`. The FFI view
    /// borrows a pointer into this buffer, so the storage lives exactly
    /// as long as the session itself.
    node_name_buf: [u8; NAME_BUF_LEN],
    /// Inline NUL-terminated storage for `namespace_`. Empty for now —
    /// `RmwConfig` does not yet carry a namespace through the cffi
    /// path; reserved for future use.
    namespace_buf: [u8; NAME_BUF_LEN],
    /// Backend-private state, written by `vtable.open`. Null before a
    /// successful open and again after a successful `close`.
    backend_data: *mut c_void,
}
1524
1525impl CffiSession {
1526    fn make_view(&mut self) -> NrosRmwSession {
1527        NrosRmwSession {
1528            node_name: self.node_name_buf.as_ptr(),
1529            namespace_: self.namespace_buf.as_ptr(),
1530            _reserved: [0u8; 8],
1531            backend_data: self.backend_data,
1532        }
1533    }
1534
1535    /// Node name passed at session-open time.
1536    pub fn node_name(&self) -> &str {
1537        cstr_buf_to_str(&self.node_name_buf)
1538    }
1539
1540    /// Open a new session via the **default** registered vtable
1541    /// (first entry in the registry — the RMW_IMPLEMENTATION-style
1542    /// fast path for single-backend builds).
1543    ///
1544    /// For explicit backend selection in multi-backend (bridge)
1545    /// binaries, use [`open_named`](Self::open_named).
1546    pub fn open(
1547        locator: &str,
1548        mode: u8,
1549        domain_id: u32,
1550        node_name: &str,
1551    ) -> Result<Self, TransportError> {
1552        let vtable = get_vtable()?;
1553        Self::open_with_vtable(vtable, locator, mode, domain_id, node_name)
1554    }
1555
1556    /// Phase 104.C.1 — open a new session against a named backend.
1557    /// Resolves `rmw_name` against the registry (Phase 104.B.2),
1558    /// returns `Err(TransportError::InvalidArgument)` if no backend
1559    /// is registered under that name.
1560    pub fn open_named(
1561        rmw_name: &str,
1562        locator: &str,
1563        mode: u8,
1564        domain_id: u32,
1565        node_name: &str,
1566    ) -> Result<Self, TransportError> {
1567        // C-string-marshal `rmw_name` on the stack — registry lookup
1568        // expects NUL-terminated UTF-8.
1569        let mut name_buf = [0u8; BACKEND_NAME_MAX];
1570        if rmw_name.len() >= BACKEND_NAME_MAX {
1571            return Err(TransportError::InvalidArgument);
1572        }
1573        name_buf[..rmw_name.len()].copy_from_slice(rmw_name.as_bytes());
1574        // name_buf[rmw_name.len()] is already 0.
1575        let raw = unsafe { nros_rmw_cffi_lookup(name_buf.as_ptr() as *const _) };
1576        if raw.is_null() {
1577            return Err(TransportError::InvalidArgument);
1578        }
1579        // SAFETY: registry-issued pointer; valid for the program's lifetime.
1580        let vtable = unsafe { &*raw };
1581        Self::open_with_vtable(vtable, locator, mode, domain_id, node_name)
1582    }
1583
1584    fn open_with_vtable(
1585        vtable: &'static NrosRmwVtable,
1586        locator: &str,
1587        mode: u8,
1588        domain_id: u32,
1589        node_name: &str,
1590    ) -> Result<Self, TransportError> {
1591        let mut loc_buf = [0u8; NAME_BUF_LEN];
1592        let loc_ptr = to_c_str(locator, &mut loc_buf);
1593
1594        let mut session = Self {
1595            vtable,
1596            node_name_buf: [0u8; NAME_BUF_LEN],
1597            namespace_buf: [0u8; NAME_BUF_LEN],
1598            backend_data: core::ptr::null_mut(),
1599        };
1600        let _ = to_c_str(node_name, &mut session.node_name_buf);
1601
1602        let mut view = NrosRmwSession {
1603            node_name: session.node_name_buf.as_ptr(),
1604            namespace_: session.namespace_buf.as_ptr(),
1605            _reserved: [0u8; 8],
1606            backend_data: core::ptr::null_mut(),
1607        };
1608        let ret = unsafe {
1609            (vtable.open)(
1610                loc_ptr,
1611                mode,
1612                domain_id,
1613                session.node_name_buf.as_ptr(),
1614                &mut view,
1615            )
1616        };
1617        // Phase 156.4 — diagnostic for bridge runtime
1618        // ConnectionFailed investigation. Logs the raw ret +
1619        // post-open backend_data state so callers see which of
1620        // the two failure paths fired. Gated on env var so
1621        // production traffic stays quiet.
1622        #[cfg(feature = "std")]
1623        if std::env::var_os("NROS_RMW_TRACE_OPEN").is_some() {
1624            std::eprintln!(
1625                "[nros-rmw-cffi] open: locator={locator:?} mode={mode} ret={ret} backend_data={:p}",
1626                view.backend_data,
1627            );
1628        }
1629        if ret != NROS_RMW_RET_OK {
1630            return Err(error_from_ret(ret));
1631        }
1632        if view.backend_data.is_null() {
1633            return Err(TransportError::ConnectionFailed);
1634        }
1635        session.backend_data = view.backend_data;
1636        Ok(session)
1637    }
1638}
1639
1640impl Session for CffiSession {
1641    type Error = TransportError;
1642    type PublisherHandle = CffiPublisher;
1643    type SubscriberHandle = CffiSubscriber;
1644    type ServiceServerHandle = CffiServiceServer;
1645    type ServiceClientHandle = CffiServiceClient;
1646
1647    fn create_publisher(
1648        &mut self,
1649        topic: &TopicInfo,
1650        qos: QosSettings,
1651    ) -> Result<CffiPublisher, TransportError> {
1652        let mut hash_buf = [0u8; HASH_BUF_LEN];
1653        let hash_ptr = to_c_str(topic.type_hash, &mut hash_buf);
1654        let qos_struct = NrosRmwQos::from(qos);
1655
1656        let mut pub_state = CffiPublisher {
1657            vtable: self.vtable,
1658            topic_name_buf: [0u8; NAME_BUF_LEN],
1659            type_name_buf: [0u8; NAME_BUF_LEN],
1660            qos: qos_struct,
1661            can_loan_messages: false,
1662            backend_data: core::ptr::null_mut(),
1663        };
1664        let topic_ptr = to_c_str(topic.name, &mut pub_state.topic_name_buf);
1665        let type_ptr = to_c_str(topic.type_name, &mut pub_state.type_name_buf);
1666
1667        let mut view = NrosRmwPublisher {
1668            topic_name: topic_ptr,
1669            type_name: type_ptr,
1670            qos: qos_struct,
1671            can_loan_messages: false,
1672            _reserved: [0u8; 7],
1673            backend_data: core::ptr::null_mut(),
1674        };
1675        let mut session_view = self.make_view();
1676        let ret = unsafe {
1677            (self.vtable.create_publisher)(
1678                &mut session_view,
1679                topic_ptr,
1680                type_ptr,
1681                hash_ptr,
1682                topic.domain_id,
1683                &qos_struct,
1684                &mut view,
1685            )
1686        };
1687        if ret != NROS_RMW_RET_OK {
1688            return Err(error_from_ret(ret));
1689        }
1690        if view.backend_data.is_null() {
1691            return Err(TransportError::PublisherCreationFailed);
1692        }
1693        pub_state.backend_data = view.backend_data;
1694        pub_state.can_loan_messages = view.can_loan_messages;
1695        Ok(pub_state)
1696    }
1697
1698    fn create_subscriber(
1699        &mut self,
1700        topic: &TopicInfo,
1701        qos: QosSettings,
1702    ) -> Result<CffiSubscriber, TransportError> {
1703        let mut hash_buf = [0u8; HASH_BUF_LEN];
1704        let hash_ptr = to_c_str(topic.type_hash, &mut hash_buf);
1705        let mut qos_struct = NrosRmwQos::from(qos);
1706        // Phase 231 (RFC-0038) — carry the receive-buffer size hint across the
1707        // C ABI so a size-classing backend can route its receive storage.
1708        qos_struct.rx_buffer_hint = topic.rx_buffer_hint.min(u32::MAX as usize) as u32;
1709
1710        let mut sub_state = CffiSubscriber {
1711            vtable: self.vtable,
1712            topic_name_buf: [0u8; NAME_BUF_LEN],
1713            type_name_buf: [0u8; NAME_BUF_LEN],
1714            qos: qos_struct,
1715            can_loan_messages: false,
1716            backend_data: core::ptr::null_mut(),
1717            supports_in_place: false,
1718        };
1719        let topic_ptr = to_c_str(topic.name, &mut sub_state.topic_name_buf);
1720        let type_ptr = to_c_str(topic.type_name, &mut sub_state.type_name_buf);
1721
1722        let mut view = NrosRmwSubscriber {
1723            topic_name: topic_ptr,
1724            type_name: type_ptr,
1725            qos: qos_struct,
1726            can_loan_messages: false,
1727            _reserved: [0u8; 7],
1728            backend_data: core::ptr::null_mut(),
1729        };
1730        let mut session_view = self.make_view();
1731        let ret = unsafe {
1732            (self.vtable.create_subscriber)(
1733                &mut session_view,
1734                topic_ptr,
1735                type_ptr,
1736                hash_ptr,
1737                topic.domain_id,
1738                &qos_struct,
1739                &mut view,
1740            )
1741        };
1742        if ret != NROS_RMW_RET_OK {
1743            return Err(error_from_ret(ret));
1744        }
1745        if view.backend_data.is_null() {
1746            return Err(TransportError::SubscriberCreationFailed);
1747        }
1748        sub_state.backend_data = view.backend_data;
1749        sub_state.can_loan_messages = view.can_loan_messages;
1750        // Phase 231 (RFC-0038) — cache the in-place capability once.
1751        sub_state.supports_in_place = match sub_state.vtable.subscriber_supports_in_place {
1752            Some(f) => {
1753                let mut v = sub_state.make_view();
1754                unsafe { f(&mut v) == 1 }
1755            }
1756            None => false,
1757        };
1758        Ok(sub_state)
1759    }
1760
1761    fn create_service_server(
1762        &mut self,
1763        service: &ServiceInfo,
1764        qos: QosSettings,
1765    ) -> Result<CffiServiceServer, TransportError> {
1766        let qos_struct = NrosRmwQos::from(qos);
1767        let mut hash_buf = [0u8; HASH_BUF_LEN];
1768        let hash_ptr = to_c_str(service.type_hash, &mut hash_buf);
1769
1770        let mut srv_state = CffiServiceServer {
1771            vtable: self.vtable,
1772            service_name_buf: [0u8; NAME_BUF_LEN],
1773            type_name_buf: [0u8; NAME_BUF_LEN],
1774            backend_data: core::ptr::null_mut(),
1775        };
1776        let svc_ptr = to_c_str(service.name, &mut srv_state.service_name_buf);
1777        let type_ptr = to_c_str(service.type_name, &mut srv_state.type_name_buf);
1778
1779        let mut view = NrosRmwServiceServer {
1780            service_name: svc_ptr,
1781            type_name: type_ptr,
1782            _reserved: [0u8; 8],
1783            backend_data: core::ptr::null_mut(),
1784        };
1785        let mut session_view = self.make_view();
1786        let ret = unsafe {
1787            (self.vtable.create_service_server)(
1788                &mut session_view,
1789                svc_ptr,
1790                type_ptr,
1791                hash_ptr,
1792                service.domain_id,
1793                &qos_struct,
1794                &mut view,
1795            )
1796        };
1797        if ret != NROS_RMW_RET_OK {
1798            return Err(error_from_ret(ret));
1799        }
1800        if view.backend_data.is_null() {
1801            return Err(TransportError::ServiceServerCreationFailed);
1802        }
1803        srv_state.backend_data = view.backend_data;
1804        Ok(srv_state)
1805    }
1806
1807    fn create_service_client(
1808        &mut self,
1809        service: &ServiceInfo,
1810        qos: QosSettings,
1811    ) -> Result<CffiServiceClient, TransportError> {
1812        let qos_struct = NrosRmwQos::from(qos);
1813        let mut hash_buf = [0u8; HASH_BUF_LEN];
1814        let hash_ptr = to_c_str(service.type_hash, &mut hash_buf);
1815
1816        let mut cli_state = CffiServiceClient {
1817            vtable: self.vtable,
1818            service_name_buf: [0u8; NAME_BUF_LEN],
1819            type_name_buf: [0u8; NAME_BUF_LEN],
1820            backend_data: core::ptr::null_mut(),
1821            pending_len: 0,
1822        };
1823        let svc_ptr = to_c_str(service.name, &mut cli_state.service_name_buf);
1824        let type_ptr = to_c_str(service.type_name, &mut cli_state.type_name_buf);
1825
1826        let mut view = NrosRmwServiceClient {
1827            service_name: svc_ptr,
1828            type_name: type_ptr,
1829            _reserved: [0u8; 8],
1830            backend_data: core::ptr::null_mut(),
1831        };
1832        let mut session_view = self.make_view();
1833        let ret = unsafe {
1834            (self.vtable.create_service_client)(
1835                &mut session_view,
1836                svc_ptr,
1837                type_ptr,
1838                hash_ptr,
1839                service.domain_id,
1840                &qos_struct,
1841                &mut view,
1842            )
1843        };
1844        if ret != NROS_RMW_RET_OK {
1845            return Err(error_from_ret(ret));
1846        }
1847        if view.backend_data.is_null() {
1848            return Err(TransportError::ServiceClientCreationFailed);
1849        }
1850        cli_state.backend_data = view.backend_data;
1851        Ok(cli_state)
1852    }
1853
1854    fn close(&mut self) -> Result<(), TransportError> {
1855        if self.backend_data.is_null() {
1856            return Ok(());
1857        }
1858        let mut view = self.make_view();
1859        let ret = unsafe { (self.vtable.close)(&mut view) };
1860        if ret != NROS_RMW_RET_OK {
1861            return Err(error_from_ret(ret));
1862        }
1863        self.backend_data = core::ptr::null_mut();
1864        Ok(())
1865    }
1866
1867    fn drive_io(&mut self, timeout_ms: i32) -> Result<(), TransportError> {
1868        let mut view = self.make_view();
1869        let ret = unsafe { (self.vtable.drive_io)(&mut view, timeout_ms) };
1870        if ret != NROS_RMW_RET_OK {
1871            return Err(error_from_ret(ret));
1872        }
1873        Ok(())
1874    }
1875
1876    fn next_deadline_ms(&self) -> Option<u32> {
1877        let f = self.vtable.next_deadline_ms?;
1878        // SAFETY: build a transient `&self`-only view of the session
1879        // fields the C side may inspect; matches the layout `make_view`
1880        // produces but doesn't require `&mut self`.
1881        let view = NrosRmwSession {
1882            node_name: self.node_name_buf.as_ptr(),
1883            namespace_: self.namespace_buf.as_ptr(),
1884            _reserved: [0u8; 8],
1885            backend_data: self.backend_data,
1886        };
1887        let ret = unsafe { f(&view as *const _) };
1888        if ret < 0 { None } else { Some(ret as u32) }
1889    }
1890
1891    unsafe fn set_wake_callback(
1892        &mut self,
1893        cb: Option<unsafe extern "C" fn(ctx: *mut core::ffi::c_void)>,
1894        ctx: *mut core::ffi::c_void,
1895    ) {
1896        let Some(f) = self.vtable.set_wake_callback else {
1897            return;
1898        };
1899        let mut view = NrosRmwSession {
1900            node_name: self.node_name_buf.as_ptr(),
1901            namespace_: self.namespace_buf.as_ptr(),
1902            _reserved: [0u8; 8],
1903            backend_data: self.backend_data,
1904        };
1905        // SAFETY: vtable trampoline owns the install/clear; result is
1906        // ignored — best-effort.
1907        let _ = unsafe { f(&mut view as *mut _, cb, ctx) };
1908    }
1909
1910    fn supports_wake_callback(&self) -> bool {
1911        // Phase 130.4 — the vtable slot's presence is the truthful
1912        // signal. Poll-only backends (XRCE-DDS-Client, current
1913        // Cyclone wrapper, current dust-DDS shim) leave the slot
1914        // NULL; only backends with an async wake source fill it.
1915        self.vtable.set_wake_callback.is_some()
1916    }
1917
1918    fn ping_session(&mut self, timeout_ms: i32) -> Result<(), TransportError> {
1919        // Phase 124.F.1 — forward to the backend's vtable slot when
1920        // available; NULL surfaces `Unsupported` to the caller (no
1921        // implicit emulation — backends without a wire-level
1922        // round-trip can't probe honestly).
1923        let Some(f) = self.vtable.ping_session else {
1924            return Err(TransportError::Unsupported);
1925        };
1926        let mut view = self.make_view();
1927        let rc = unsafe { f(&mut view, timeout_ms) };
1928        if rc == NROS_RMW_RET_OK {
1929            Ok(())
1930        } else {
1931            Err(error_from_ret(rc))
1932        }
1933    }
1934
1935    /// Phase 115.K.2.5.1.2 — declare a permissive QoS-policy mask
1936    /// here so backends behind the cffi vtable don't get rejected by
1937    /// the runtime's pre-validate step before they ever see the
1938    /// `create_publisher` / `create_subscriber` call. The vtable
1939    /// doesn't expose a per-backend policy mask yet; until it does,
1940    /// the cffi route has to assume the registered backend supports
1941    /// the union of every policy any nros-supported RMW honours.
1942    /// Backends that don't support a policy MUST surface
1943    /// `NROS_RMW_RET_INCOMPATIBLE_QOS` from `create_publisher` etc.
1944    /// to keep the no-silent-degradation contract.
1945    ///
1946    /// TODO 115.K.2.x: extend `nros_rmw_vtable_t` with a
1947    /// `supported_qos_policies()` callback so the runtime queries
1948    /// the backend instead of guessing.
1949    fn supported_qos_policies(&self) -> nros_rmw::QosPolicyMask {
1950        use nros_rmw::QosPolicyMask;
1951        QosPolicyMask::CORE
1952            | QosPolicyMask::DURABILITY_TRANSIENT_LOCAL
1953            | QosPolicyMask::AVOID_ROS_NAMESPACE_CONVENTIONS
1954            | QosPolicyMask::DEADLINE
1955            | QosPolicyMask::LIFESPAN
1956            | QosPolicyMask::LIVELINESS_AUTOMATIC
1957            | QosPolicyMask::LIVELINESS_MANUAL_BY_TOPIC
1958            | QosPolicyMask::LIVELINESS_MANUAL_BY_NODE
1959            | QosPolicyMask::LIVELINESS_LEASE
1960    }
1961}
1962
1963impl Drop for CffiSession {
1964    fn drop(&mut self) {
1965        if !self.backend_data.is_null() {
1966            let mut view = self.make_view();
1967            unsafe { (self.vtable.close)(&mut view) };
1968        }
1969    }
1970}
1971
1972// ============================================================================
1973// CffiPublisher
1974// ============================================================================
1975
/// Publisher backed by a C vtable.
///
/// The name buffers are stored inline; the C-side publisher view is
/// rebuilt from these fields for every FFI call.
pub struct CffiPublisher {
    /// Function table of the backend that created this publisher.
    vtable: &'static NrosRmwVtable,
    /// Inline NUL-terminated copy of the topic name.
    topic_name_buf: [u8; NAME_BUF_LEN],
    /// Inline NUL-terminated copy of the type name.
    type_name_buf: [u8; NAME_BUF_LEN],
    /// QoS the publisher was created with, in its C-ABI form.
    qos: NrosRmwQos,
    /// Loan capability flag, copied back from the FFI view after the
    /// backend's `create_publisher` returned.
    can_loan_messages: bool,
    /// Backend-private state, copied back from the FFI view after the
    /// backend's `create_publisher` returned.
    backend_data: *mut c_void,
}
1985
1986impl CffiPublisher {
1987    fn make_view(&mut self) -> NrosRmwPublisher {
1988        NrosRmwPublisher {
1989            topic_name: self.topic_name_buf.as_ptr(),
1990            type_name: self.type_name_buf.as_ptr(),
1991            qos: self.qos,
1992            can_loan_messages: self.can_loan_messages,
1993            _reserved: [0u8; 7],
1994            backend_data: self.backend_data,
1995        }
1996    }
1997
1998    /// Topic name. Result is the null-terminated string written at
1999    /// publisher creation; never re-resolved from the backend.
2000    pub fn topic_name(&self) -> &str {
2001        cstr_buf_to_str(&self.topic_name_buf)
2002    }
2003
2004    /// Fully-qualified type name (`"std_msgs/msg/Int32"`).
2005    pub fn type_name(&self) -> &str {
2006        cstr_buf_to_str(&self.type_name_buf)
2007    }
2008
2009    /// QoS used to create this publisher.
2010    pub fn qos(&self) -> NrosRmwQos {
2011        self.qos
2012    }
2013
2014    /// `true` iff the backend exposes the publish loan primitive
2015    /// (Phase 99). Mirrors upstream `rmw_publisher_t::can_loan_messages`.
2016    pub fn can_loan_messages(&self) -> bool {
2017        self.can_loan_messages
2018    }
2019}
2020
/// Phase 124.A — writable slot returned by
/// [`CffiPublisher::try_lend_slot`]. Holds the raw buffer and the
/// opaque token until `commit_slot` consumes it or `Drop` releases it
/// (`pub_discard` for a native loan, freeing the staging allocation
/// for a fallback slot).
#[cfg(feature = "lending")]
pub struct CffiSlot<'a> {
    /// Start of the writable buffer: the pointer reported by the
    /// backend's `pub_loan`, or the staging vector's data pointer for
    /// a fallback slot.
    buf: *mut u8,
    /// Number of writable bytes behind `buf`.
    cap: usize,
    /// Number of bytes that will be published on commit. Starts at the
    /// length requested from `try_lend_slot`; changed via `set_len`.
    cursor: usize,
    /// Opaque handle identifying the loan: the token reported by
    /// `pub_loan`, or a `Box<ArenaStaging>` raw pointer for a fallback
    /// slot.
    token: *mut c_void,
    /// `None` after `commit_slot` consumes the slot — Drop skips the
    /// discard call in that case.
    publisher: Option<&'a CffiPublisher>,
    /// Phase 124.A.3 — `true` when this slot came from the runtime's
    /// arena fallback (backend had NULL `pub_loan`). Commit performs
    /// a `publish_raw` of the staged bytes; discard / Drop reclaims
    /// the staging buffer. `false` for native backend loans —
    /// commit / discard go through the vtable slots.
    fallback: bool,
}
2041
#[cfg(feature = "lending")]
impl<'a> CffiSlot<'a> {
    /// Record how many bytes of the slot were actually written, i.e.
    /// how many bytes a later commit publishes.
    ///
    /// A fresh slot starts at the length that was requested from
    /// `try_lend_slot`; callers that write a shorter prefix MUST call
    /// `set_len` before committing. Values above the slot capacity
    /// trip a debug assertion and are clamped to the capacity.
    pub fn set_len(&mut self, len: usize) {
        debug_assert!(len <= self.cap);
        self.cursor = core::cmp::min(len, self.cap);
    }
}
2052
/// Phase 124.A.3 — staging buffer for the arena-fallback loan path.
/// Allocated on each `try_lend_slot` when the backend's `pub_loan`
/// slot is NULL; commit publishes its contents through `publish_raw`;
/// Drop / discard reclaims the allocation. `Box::into_raw` of this
/// struct becomes the slot's opaque `token` so commit / discard can
/// find it back.
#[cfg(all(feature = "lending", feature = "alloc"))]
struct ArenaStaging {
    /// Zero-initialised bytes, sized to the length requested from
    /// `try_lend_slot`; the slot's `buf` pointer points at this data.
    buf: alloc::vec::Vec<u8>,
}
2063
#[cfg(feature = "lending")]
impl<'a> AsMut<[u8]> for CffiSlot<'a> {
    /// Expose the slot's full capacity as a writable byte slice.
    fn as_mut(&mut self) -> &mut [u8] {
        let (start, capacity) = (self.buf, self.cap);
        // SAFETY: `buf` and `cap` were recorded together when the slot
        // was lent — either reported by `pub_loan` or taken from the
        // fallback staging buffer. The loan contract guarantees the
        // slot stays valid until commit or discard, and the lifetime
        // `'a` borrows the publisher so the returned slice can't
        // outlive the loan.
        unsafe { core::slice::from_raw_parts_mut(start, capacity) }
    }
}
2074
#[cfg(feature = "lending")]
impl<'a> Drop for CffiSlot<'a> {
    /// Release a slot that was never committed: free the staging
    /// allocation of a fallback slot, or hand a native loan back to
    /// the backend through `pub_discard` when that slot is provided.
    fn drop(&mut self) {
        let Some(owner) = self.publisher else {
            // commit_slot consumed the loan — nothing to release.
            return;
        };
        if self.fallback {
            // Phase 124.A.3 — reclaim the staging allocation.
            #[cfg(feature = "alloc")]
            unsafe {
                core::mem::drop(alloc::boxed::Box::from_raw(
                    self.token as *mut ArenaStaging,
                ));
            }
            return;
        }
        let Some(discard) = owner.vtable.pub_discard else {
            return;
        };
        // Re-materialise the publisher view so the backend sees the
        // same `NrosRmwPublisher` shape it created the loan against.
        let mut view = NrosRmwPublisher {
            topic_name: owner.topic_name_buf.as_ptr(),
            type_name: owner.type_name_buf.as_ptr(),
            qos: owner.qos,
            can_loan_messages: owner.can_loan_messages,
            _reserved: [0u8; 7],
            backend_data: owner.backend_data,
        };
        // SAFETY: `token` came from a paired `pub_loan` on this
        // publisher and the publisher is still alive (lifetime `'a`
        // borrows it).
        unsafe { discard(&mut view, self.token) };
    }
}
2111
2112#[cfg(feature = "lending")]
2113impl nros_rmw::SlotLending for CffiPublisher {
2114    type Slot<'a> = CffiSlot<'a>;
2115
2116    fn try_lend_slot(&self, len: usize) -> Result<Option<CffiSlot<'_>>, TransportError> {
2117        let Some(loan) = self.vtable.pub_loan else {
2118            // Phase 124.A.3 — backend doesn't natively lend; allocate
2119            // a staging buffer and stash it in `token` so commit can
2120            // memcpy → publish_raw and discard / Drop can reclaim.
2121            // Requires `alloc` for the dynamic staging; no_std-no_alloc
2122            // builds return None and let the caller fall back to a
2123            // non-loan path.
2124            #[cfg(feature = "alloc")]
2125            {
2126                let mut staging = alloc::boxed::Box::new(ArenaStaging {
2127                    buf: alloc::vec![0u8; len],
2128                });
2129                let buf_ptr = staging.buf.as_mut_ptr();
2130                let token = alloc::boxed::Box::into_raw(staging) as *mut c_void;
2131                return Ok(Some(CffiSlot {
2132                    buf: buf_ptr,
2133                    cap: len,
2134                    cursor: len,
2135                    token,
2136                    publisher: Some(self),
2137                    fallback: true,
2138                }));
2139            }
2140            #[cfg(not(feature = "alloc"))]
2141            {
2142                let _ = len;
2143                return Ok(None);
2144            }
2145        };
2146        let mut view = NrosRmwPublisher {
2147            topic_name: self.topic_name_buf.as_ptr(),
2148            type_name: self.type_name_buf.as_ptr(),
2149            qos: self.qos,
2150            can_loan_messages: self.can_loan_messages,
2151            _reserved: [0u8; 7],
2152            backend_data: self.backend_data,
2153        };
2154        let mut out_buf: *mut u8 = core::ptr::null_mut();
2155        let mut out_cap: usize = 0;
2156        let mut out_token: *mut c_void = core::ptr::null_mut();
2157        // SAFETY: vtable contract — slot pointers stay valid until
2158        // commit / discard.
2159        let ret = unsafe { loan(&mut view, len, &mut out_buf, &mut out_cap, &mut out_token) };
2160        if ret == NROS_RMW_RET_WOULD_BLOCK || ret == NROS_RMW_RET_NO_DATA {
2161            return Ok(None);
2162        }
2163        if ret != NROS_RMW_RET_OK {
2164            return Err(error_from_ret(ret));
2165        }
2166        if out_buf.is_null() || out_cap < len {
2167            // Defensive: a buggy backend returned OK with a too-small
2168            // slot. Treat as transient.
2169            if let Some(discard) = self.vtable.pub_discard {
2170                unsafe { discard(&mut view, out_token) };
2171            }
2172            return Ok(None);
2173        }
2174        Ok(Some(CffiSlot {
2175            buf: out_buf,
2176            cap: out_cap,
2177            cursor: len,
2178            token: out_token,
2179            publisher: Some(self),
2180            fallback: false,
2181        }))
2182    }
2183
2184    fn commit_slot(&self, mut slot: CffiSlot<'_>) -> Result<(), TransportError> {
2185        // Cancel Drop's discard — we're committing, not abandoning.
2186        let publisher = slot
2187            .publisher
2188            .take()
2189            .ok_or(TransportError::InvalidArgument)?;
2190        debug_assert!(core::ptr::eq(publisher, self));
2191        if slot.fallback {
2192            // Phase 124.A.3 — fallback path: reclaim the staging
2193            // box, run a single publish_raw of the cursor-truncated
2194            // contents.
2195            #[cfg(feature = "alloc")]
2196            {
2197                // SAFETY: `slot.token` came from
2198                // `Box::into_raw(Box<ArenaStaging>)` in try_lend_slot.
2199                let staging =
2200                    unsafe { alloc::boxed::Box::from_raw(slot.token as *mut ArenaStaging) };
2201                let bytes = &staging.buf[..slot.cursor.min(staging.buf.len())];
2202                return Publisher::publish_raw(self, bytes);
2203            }
2204            #[cfg(not(feature = "alloc"))]
2205            {
2206                return Err(TransportError::Unsupported);
2207            }
2208        }
2209        let commit = self.vtable.pub_commit.ok_or(TransportError::Unsupported)?;
2210        let mut view = NrosRmwPublisher {
2211            topic_name: self.topic_name_buf.as_ptr(),
2212            type_name: self.type_name_buf.as_ptr(),
2213            qos: self.qos,
2214            can_loan_messages: self.can_loan_messages,
2215            _reserved: [0u8; 7],
2216            backend_data: self.backend_data,
2217        };
2218        let len = slot.cursor;
2219        let token = slot.token;
2220        // `slot` drops here without firing `pub_discard` because
2221        // `publisher` is `None`.
2222        let ret = unsafe { commit(&mut view, token, len) };
2223        if ret != NROS_RMW_RET_OK {
2224            return Err(error_from_ret(ret));
2225        }
2226        Ok(())
2227    }
2228}
2229
2230impl Publisher for CffiPublisher {
2231    type Error = TransportError;
2232
2233    fn publish_raw(&self, data: &[u8]) -> Result<(), TransportError> {
2234        let mut view = NrosRmwPublisher {
2235            topic_name: self.topic_name_buf.as_ptr(),
2236            type_name: self.type_name_buf.as_ptr(),
2237            qos: self.qos,
2238            can_loan_messages: self.can_loan_messages,
2239            _reserved: [0u8; 7],
2240            backend_data: self.backend_data,
2241        };
2242        let ret = unsafe { (self.vtable.publish_raw)(&mut view, data.as_ptr(), data.len()) };
2243        if ret != NROS_RMW_RET_OK {
2244            return Err(error_from_ret(ret));
2245        }
2246        Ok(())
2247    }
2248
2249    unsafe fn publish_streamed(
2250        &self,
2251        size_cb: unsafe extern "C" fn(out_total_len: *mut usize, user_ctx: *mut core::ffi::c_void),
2252        chunk_cb: unsafe extern "C" fn(
2253            out_buf: *mut u8,
2254            cap: usize,
2255            out_written: *mut usize,
2256            user_ctx: *mut core::ffi::c_void,
2257        ),
2258        user_ctx: *mut core::ffi::c_void,
2259    ) -> Result<(), TransportError> {
2260        // Phase 124.E.1+2 — vtable forwarder. If the backend exposes
2261        // `publish_streamed` natively, dispatch in one hop so the
2262        // callbacks land directly inside the backend's outbound
2263        // buffer (no staging copy). Otherwise fall back to the
2264        // `Publisher::publish_streamed` default body, which runs a
2265        // stack staging buffer + `publish_raw`.
2266        if let Some(f) = self.vtable.publish_streamed {
2267            let mut view = NrosRmwPublisher {
2268                topic_name: self.topic_name_buf.as_ptr(),
2269                type_name: self.type_name_buf.as_ptr(),
2270                qos: self.qos,
2271                can_loan_messages: self.can_loan_messages,
2272                _reserved: [0u8; 7],
2273                backend_data: self.backend_data,
2274            };
2275            let ret = unsafe { f(&mut view, size_cb, chunk_cb, user_ctx) };
2276            if ret != NROS_RMW_RET_OK {
2277                return Err(error_from_ret(ret));
2278            }
2279            return Ok(());
2280        }
2281        // Inlined staging-buffer fallback. Mirrors the trait default
2282        // body so the override doesn't recurse through dynamic
2283        // dispatch — the default body would resolve back to this
2284        // function and deadlock.
2285        const STAGE_CAP: usize = 4096;
2286        let mut total = 0usize;
2287        unsafe { size_cb(&mut total as *mut usize, user_ctx) };
2288        if total > STAGE_CAP {
2289            return Err(TransportError::BufferTooSmall);
2290        }
2291        let mut stage = [0u8; STAGE_CAP];
2292        let mut written_so_far = 0usize;
2293        while written_so_far < total {
2294            let mut chunk_written = 0usize;
2295            let remaining = total - written_so_far;
2296            unsafe {
2297                chunk_cb(
2298                    stage.as_mut_ptr().add(written_so_far),
2299                    remaining,
2300                    &mut chunk_written as *mut usize,
2301                    user_ctx,
2302                );
2303            }
2304            if chunk_written == 0 {
2305                return Err(TransportError::BufferTooSmall);
2306            }
2307            written_so_far += chunk_written;
2308        }
2309        self.publish_raw(&stage[..total])
2310    }
2311
2312    fn buffer_error(&self) -> TransportError {
2313        TransportError::BufferTooSmall
2314    }
2315
2316    fn serialization_error(&self) -> TransportError {
2317        TransportError::SerializationError
2318    }
2319
2320    fn unsupported_event_error(&self) -> TransportError {
2321        TransportError::Unsupported
2322    }
2323
2324    unsafe fn register_event_callback(
2325        &mut self,
2326        kind: nros_rmw::EventKind,
2327        deadline_ms: u32,
2328        cb: nros_rmw::EventCallback,
2329        user_ctx: *mut core::ffi::c_void,
2330    ) -> Result<(), TransportError> {
2331        let mut view = NrosRmwPublisher {
2332            topic_name: self.topic_name_buf.as_ptr(),
2333            type_name: self.type_name_buf.as_ptr(),
2334            qos: self.qos,
2335            can_loan_messages: self.can_loan_messages,
2336            _reserved: [0u8; 7],
2337            backend_data: self.backend_data,
2338        };
2339        // Cffi NrosRmwEventCallback ABI matches nros_rmw::EventCallback —
2340        // both are `unsafe extern "C" fn(EventKind, *const c_void, *mut c_void)`.
2341        // The C-side enum is bitwise-equivalent to the Rust enum (same #[repr(u8)]).
2342        let cb: NrosRmwEventCallback =
2343            unsafe { core::mem::transmute::<nros_rmw::EventCallback, NrosRmwEventCallback>(cb) };
2344        let ret = unsafe {
2345            (self.vtable.register_publisher_event)(
2346                &mut view,
2347                kind.into(),
2348                deadline_ms,
2349                cb,
2350                user_ctx,
2351            )
2352        };
2353        if ret != NROS_RMW_RET_OK {
2354            return Err(error_from_ret(ret));
2355        }
2356        Ok(())
2357    }
2358
2359    fn assert_liveliness(&self) -> Result<(), TransportError> {
2360        // Phase 108.B — manual liveliness assertion. NULL function
2361        // pointer = backend doesn't support manual liveliness; the
2362        // runtime caller (Node) gates the call by liveliness_kind so
2363        // we just delegate.
2364        let view_ptr = self as *const _ as *mut Self;
2365        let mut view = unsafe { (*view_ptr).make_view() };
2366        let ret = unsafe { (self.vtable.assert_publisher_liveliness)(&mut view) };
2367        if ret != NROS_RMW_RET_OK {
2368            return Err(error_from_ret(ret));
2369        }
2370        Ok(())
2371    }
2372}
2373
2374impl Drop for CffiPublisher {
2375    fn drop(&mut self) {
2376        if !self.backend_data.is_null() {
2377            let mut view = self.make_view();
2378            unsafe { (self.vtable.destroy_publisher)(&mut view) };
2379        }
2380    }
2381}
2382
2383// ============================================================================
2384// CffiSubscriber
2385// ============================================================================
2386
2387/// Subscriber backed by a C vtable.
2388pub struct CffiSubscriber {
2389    vtable: &'static NrosRmwVtable,
2390    topic_name_buf: [u8; NAME_BUF_LEN],
2391    type_name_buf: [u8; NAME_BUF_LEN],
2392    qos: NrosRmwQos,
2393    can_loan_messages: bool,
2394    backend_data: *mut c_void,
2395    /// Phase 231 (RFC-0038) — cached `subscriber_supports_in_place` capability,
2396    /// queried once at creation so `supports_process_in_place(&self)` is cheap.
2397    supports_in_place: bool,
2398}
2399
2400impl CffiSubscriber {
2401    fn make_view(&mut self) -> NrosRmwSubscriber {
2402        NrosRmwSubscriber {
2403            topic_name: self.topic_name_buf.as_ptr(),
2404            type_name: self.type_name_buf.as_ptr(),
2405            qos: self.qos,
2406            can_loan_messages: self.can_loan_messages,
2407            _reserved: [0u8; 7],
2408            backend_data: self.backend_data,
2409        }
2410    }
2411
2412    /// Phase 231 (RFC-0038) — drive the `process_raw_in_place` vtable slot,
2413    /// marshalling the Rust `FnOnce` through the C `ctx`/`cb`. A monomorphized
2414    /// trampoline takes the closure out of a stack `Option` cell and calls it
2415    /// with the borrowed slice. The named generic `G` is why the public trait
2416    /// method (which uses APIT) delegates here.
2417    fn run_process_in_place<G: FnOnce(&[u8])>(&mut self, f: G) -> Result<bool, TransportError> {
2418        let Some(slot) = self.vtable.process_raw_in_place else {
2419            return Err(TransportError::MessageTooLarge);
2420        };
2421        unsafe extern "C" fn cb_tramp<G: FnOnce(&[u8])>(
2422            ctx: *mut c_void,
2423            ptr: *const u8,
2424            len: usize,
2425        ) {
2426            let cell = unsafe { &mut *(ctx as *mut Option<G>) };
2427            if let Some(g) = cell.take() {
2428                g(unsafe { core::slice::from_raw_parts(ptr, len) });
2429            }
2430        }
2431        let mut cell: Option<G> = Some(f);
2432        let mut view = self.make_view();
2433        let rc = unsafe {
2434            slot(
2435                &mut view,
2436                &mut cell as *mut Option<G> as *mut c_void,
2437                cb_tramp::<G>,
2438            )
2439        };
2440        if rc == NROS_RMW_RET_NO_DATA {
2441            Ok(false)
2442        } else if rc < 0 {
2443            Err(error_from_ret(rc))
2444        } else {
2445            Ok(rc > 0)
2446        }
2447    }
2448
2449    pub fn topic_name(&self) -> &str {
2450        cstr_buf_to_str(&self.topic_name_buf)
2451    }
2452
2453    pub fn type_name(&self) -> &str {
2454        cstr_buf_to_str(&self.type_name_buf)
2455    }
2456
2457    pub fn qos(&self) -> NrosRmwQos {
2458        self.qos
2459    }
2460
2461    /// `true` iff the backend exposes the receive loan primitive
2462    /// (Phase 99).
2463    pub fn can_loan_messages(&self) -> bool {
2464        self.can_loan_messages
2465    }
2466}
2467
/// Phase 124.A — read-only view handed out by
/// [`CffiSubscriber::try_borrow`].
///
/// Carries the backend's raw buffer and opaque token; dropping the view
/// calls the backend's `sub_release` slot when one is provided.
#[cfg(feature = "lending")]
pub struct CffiView<'a> {
    /// Start of the borrowed message bytes.
    buf: *const u8,
    /// Number of bytes exposed through `as_ref`.
    len: usize,
    /// Opaque backend token passed back to `sub_release`.
    token: *mut c_void,
    /// Subscriber that produced the view; taken (set to `None`) on drop.
    subscriber: Option<&'a mut CffiSubscriber>,
}
2478
#[cfg(feature = "lending")]
impl<'a> AsRef<[u8]> for CffiView<'a> {
    /// Exposes the borrowed message as a slice of `len` bytes starting
    /// at `buf`.
    fn as_ref(&self) -> &[u8] {
        let (start, count) = (self.buf, self.len);
        // SAFETY: `start` came from `sub_borrow` together with `count`.
        // The borrow contract keeps the buffer valid until `sub_release`
        // fires in `Drop`, and lifetime `'a` borrows the subscriber so
        // the slice cannot outlive the borrow.
        unsafe { core::slice::from_raw_parts(start, count) }
    }
}
2489
#[cfg(feature = "lending")]
impl<'a> Drop for CffiView<'a> {
    /// Returns the borrowed buffer to the backend through `sub_release`.
    ///
    /// The subscriber reference is always taken out of the view; the
    /// release call is skipped when the backend has no `sub_release`
    /// slot.
    fn drop(&mut self) {
        if let Some(owner) = self.subscriber.take() {
            if let Some(release) = owner.vtable.sub_release {
                let mut handle = owner.make_view();
                // SAFETY: `token` is paired with a prior `sub_borrow` on
                // this subscriber, and the subscriber is still alive.
                unsafe { release(&mut handle, self.token) };
            }
        }
    }
}
2503
#[cfg(feature = "lending")]
impl nros_rmw::SlotBorrowing for CffiSubscriber {
    type View<'a> = CffiView<'a>;

    /// Attempts to borrow the next message directly from the backend.
    ///
    /// # Returns
    ///
    /// * `Ok(None)` when the backend has no `sub_borrow` slot (Phase
    ///   124.A — the runtime then falls back to `try_recv_raw` into a
    ///   staging buffer, 124.A.3), when the slot returns `0`, or when it
    ///   leaves the buffer pointer null.
    /// * `Ok(Some(view))` otherwise, with the view length taken from the
    ///   positive return code.
    ///
    /// # Errors
    ///
    /// A negative return code is translated with `error_from_ret`.
    fn try_borrow(&mut self) -> Result<Option<CffiView<'_>>, TransportError> {
        let borrow = match self.vtable.sub_borrow {
            Some(slot) => slot,
            None => return Ok(None),
        };

        let mut data: *const u8 = core::ptr::null();
        let mut reported_len: usize = 0;
        let mut token: *mut c_void = core::ptr::null_mut();
        let mut handle = self.make_view();
        // SAFETY: vtable contract — borrowed pointers stay valid until
        // `sub_release` runs.
        let rc = unsafe { borrow(&mut handle, &mut data, &mut reported_len, &mut token) };

        if rc < 0 {
            return Err(error_from_ret(rc));
        }
        // `0` means no message is ready; a null buffer is treated the
        // same way.
        if rc == 0 || data.is_null() {
            return Ok(None);
        }

        // The view length is the positive return code; `reported_len` is
        // not consulted.
        // NOTE(review): confirm against the vtable contract whether the
        // length should instead be clamped to the slot's `out_len`.
        Ok(Some(CffiView {
            buf: data,
            len: rc as usize,
            token,
            subscriber: Some(self),
        }))
    }
}
2541
2542impl nros_rmw::Subscriber for CffiSubscriber {
2543    type Error = TransportError;
2544
2545    fn supports_process_in_place(&self) -> bool {
2546        self.supports_in_place
2547    }
2548
2549    fn process_raw_in_place(&mut self, f: impl FnOnce(&[u8])) -> Result<bool, Self::Error> {
2550        self.run_process_in_place(f)
2551    }
2552
2553    fn has_data(&self) -> bool {
2554        // has_data takes &mut to match the C signature; cast away const
2555        // because the predicate is logically read-only — backends must
2556        // not mutate state from has_data.
2557        let view_ptr = self as *const _ as *mut Self;
2558        let mut view = unsafe { (*view_ptr).make_view() };
2559        let rc = unsafe { (self.vtable.has_data)(&mut view) };
2560        rc > 0
2561    }
2562
2563    fn try_recv_raw(&mut self, buf: &mut [u8]) -> Result<Option<usize>, TransportError> {
2564        let mut view = self.make_view();
2565        let rc = unsafe { (self.vtable.try_recv_raw)(&mut view, buf.as_mut_ptr(), buf.len()) };
2566        if rc == NROS_RMW_RET_NO_DATA {
2567            return Ok(None);
2568        }
2569        if rc < 0 {
2570            return Err(error_from_ret(rc));
2571        }
2572        if rc == 0 {
2573            return Ok(None);
2574        }
2575        Ok(Some(rc as usize))
2576    }
2577
2578    fn try_recv_raw_with_info(
2579        &mut self,
2580        buf: &mut [u8],
2581    ) -> Result<Option<(usize, Option<MessageInfo>)>, TransportError> {
2582        let key = self.backend_data as usize;
2583        self.try_recv_raw(buf)
2584            .map(|opt| opt.map(|len| (len, take_cffi_message_info(key))))
2585    }
2586
2587    #[cfg(all(feature = "alloc", feature = "safety-e2e"))]
2588    fn try_recv_validated(
2589        &mut self,
2590        buf: &mut [u8],
2591    ) -> Result<Option<(usize, nros_rmw::IntegrityStatus)>, Self::Error> {
2592        let key = self.backend_data as usize;
2593        request_cffi_integrity_status(key);
2594        self.try_recv_raw(buf).map(|opt| {
2595            opt.map(|len| {
2596                (
2597                    len,
2598                    take_cffi_integrity_status(key).unwrap_or(nros_rmw::IntegrityStatus {
2599                        gap: 0,
2600                        duplicate: false,
2601                        crc_valid: None,
2602                    }),
2603                )
2604            })
2605        })
2606    }
2607
2608    fn try_recv_sequence(
2609        &mut self,
2610        buf: &mut [u8],
2611        per_msg_cap: usize,
2612        max_msgs: usize,
2613        out_lens: &mut [usize],
2614    ) -> Result<usize, TransportError> {
2615        // Phase 124.D.2 — runtime fallback. If the backend exposes
2616        // `try_recv_sequence` natively, call it in one hop; otherwise
2617        // delegate to the trait's default body which loop-drives
2618        // `try_recv_raw`. Either way the caller sees the same shape:
2619        // contiguous slot block + per-slot length array + count
2620        // return.
2621        if let Some(f) = self.vtable.try_recv_sequence {
2622            if per_msg_cap == 0 || max_msgs == 0 {
2623                return Ok(0);
2624            }
2625            let limit = max_msgs.min(out_lens.len());
2626            if buf.len() < limit.saturating_mul(per_msg_cap) {
2627                return Err(TransportError::BufferTooSmall);
2628            }
2629            let mut view = self.make_view();
2630            let rc = unsafe {
2631                f(
2632                    &mut view,
2633                    buf.as_mut_ptr(),
2634                    per_msg_cap,
2635                    limit,
2636                    out_lens.as_mut_ptr(),
2637                )
2638            };
2639            if rc < 0 {
2640                return Err(error_from_ret(rc));
2641            }
2642            return Ok(rc as usize);
2643        }
2644        // Phase 124.D.2 — `try_recv_raw` loop fallback. Inlined
2645        // here (rather than dispatching back through the trait
2646        // default body) so the recursion is structurally
2647        // impossible — `Subscriber::try_recv_sequence` on
2648        // `CffiSubscriber` is THIS function, and forwarding to
2649        // the default body would deadlock the override.
2650        if per_msg_cap == 0 || max_msgs == 0 {
2651            return Ok(0);
2652        }
2653        let limit = max_msgs.min(out_lens.len());
2654        if buf.len() < limit.saturating_mul(per_msg_cap) {
2655            return Err(TransportError::BufferTooSmall);
2656        }
2657        let mut count = 0;
2658        for i in 0..limit {
2659            let slot = &mut buf[i * per_msg_cap..(i + 1) * per_msg_cap];
2660            match self.try_recv_raw(slot)? {
2661                Some(len) => {
2662                    out_lens[i] = len;
2663                    count += 1;
2664                }
2665                None => break,
2666            }
2667        }
2668        Ok(count)
2669    }
2670
2671    fn deserialization_error(&self) -> TransportError {
2672        TransportError::DeserializationError
2673    }
2674
2675    fn unsupported_event_error(&self) -> TransportError {
2676        TransportError::Unsupported
2677    }
2678
2679    unsafe fn register_event_callback(
2680        &mut self,
2681        kind: nros_rmw::EventKind,
2682        deadline_ms: u32,
2683        cb: nros_rmw::EventCallback,
2684        user_ctx: *mut core::ffi::c_void,
2685    ) -> Result<(), TransportError> {
2686        let mut view = self.make_view();
2687        let cb: NrosRmwEventCallback =
2688            unsafe { core::mem::transmute::<nros_rmw::EventCallback, NrosRmwEventCallback>(cb) };
2689        let ret = unsafe {
2690            (self.vtable.register_subscriber_event)(
2691                &mut view,
2692                kind.into(),
2693                deadline_ms,
2694                cb,
2695                user_ctx,
2696            )
2697        };
2698        if ret != NROS_RMW_RET_OK {
2699            return Err(error_from_ret(ret));
2700        }
2701        Ok(())
2702    }
2703}
2704
2705impl Drop for CffiSubscriber {
2706    fn drop(&mut self) {
2707        if !self.backend_data.is_null() {
2708            clear_cffi_message_info(self.backend_data as usize);
2709            let mut view = self.make_view();
2710            unsafe { (self.vtable.destroy_subscriber)(&mut view) };
2711        }
2712    }
2713}
2714
2715// ============================================================================
2716// CffiServiceServer
2717// ============================================================================
2718
2719/// Service server backed by a C vtable.
2720pub struct CffiServiceServer {
2721    vtable: &'static NrosRmwVtable,
2722    service_name_buf: [u8; NAME_BUF_LEN],
2723    type_name_buf: [u8; NAME_BUF_LEN],
2724    backend_data: *mut c_void,
2725}
2726
2727impl CffiServiceServer {
2728    fn make_view(&mut self) -> NrosRmwServiceServer {
2729        NrosRmwServiceServer {
2730            service_name: self.service_name_buf.as_ptr(),
2731            type_name: self.type_name_buf.as_ptr(),
2732            _reserved: [0u8; 8],
2733            backend_data: self.backend_data,
2734        }
2735    }
2736
2737    pub fn service_name(&self) -> &str {
2738        cstr_buf_to_str(&self.service_name_buf)
2739    }
2740
2741    pub fn type_name(&self) -> &str {
2742        cstr_buf_to_str(&self.type_name_buf)
2743    }
2744}
2745
2746impl ServiceServerTrait for CffiServiceServer {
2747    type Error = TransportError;
2748
2749    fn has_request(&self) -> bool {
2750        let view_ptr = self as *const _ as *mut Self;
2751        let mut view = unsafe { (*view_ptr).make_view() };
2752        let rc = unsafe { (self.vtable.has_request)(&mut view) };
2753        rc > 0
2754    }
2755
2756    fn try_recv_request<'a>(
2757        &mut self,
2758        buf: &'a mut [u8],
2759    ) -> Result<Option<ServiceRequest<'a>>, TransportError> {
2760        let mut seq: i64 = 0;
2761        let mut view = self.make_view();
2762        let rc = unsafe {
2763            (self.vtable.try_recv_request)(&mut view, buf.as_mut_ptr(), buf.len(), &mut seq)
2764        };
2765        if rc == NROS_RMW_RET_NO_DATA {
2766            return Ok(None);
2767        }
2768        if rc < 0 {
2769            return Err(error_from_ret(rc));
2770        }
2771        if rc == 0 {
2772            return Ok(None);
2773        }
2774        let len = rc as usize;
2775        Ok(Some(ServiceRequest {
2776            data: &buf[..len],
2777            sequence_number: seq,
2778        }))
2779    }
2780
2781    fn send_reply(&mut self, sequence_number: i64, data: &[u8]) -> Result<(), TransportError> {
2782        let mut view = self.make_view();
2783        let ret = unsafe {
2784            (self.vtable.send_reply)(&mut view, sequence_number, data.as_ptr(), data.len())
2785        };
2786        if ret != NROS_RMW_RET_OK {
2787            return Err(error_from_ret(ret));
2788        }
2789        Ok(())
2790    }
2791}
2792
2793impl Drop for CffiServiceServer {
2794    fn drop(&mut self) {
2795        if !self.backend_data.is_null() {
2796            let mut view = self.make_view();
2797            unsafe { (self.vtable.destroy_service_server)(&mut view) };
2798        }
2799    }
2800}
2801
2802// ============================================================================
2803// CffiServiceClient
2804// ============================================================================
2805
2806/// Service client backed by a C vtable.
2807pub struct CffiServiceClient {
2808    vtable: &'static NrosRmwVtable,
2809    service_name_buf: [u8; NAME_BUF_LEN],
2810    type_name_buf: [u8; NAME_BUF_LEN],
2811    backend_data: *mut c_void,
2812    /// Phase 130.8 — flag (request length, or 0) tracking whether a
2813    /// request is in flight via the non-blocking `send_request_raw`
2814    /// / `try_recv_reply_raw` vtable slots. The legacy
2815    /// blocking-call_raw fallback that previously needed a local
2816    /// 4 KiB pending-request buffer has been removed; backends own
2817    /// the request bytes from `send_request_raw` onward.
2818    pending_len: usize,
2819}
2820
2821impl CffiServiceClient {
2822    fn make_view(&mut self) -> NrosRmwServiceClient {
2823        NrosRmwServiceClient {
2824            service_name: self.service_name_buf.as_ptr(),
2825            type_name: self.type_name_buf.as_ptr(),
2826            _reserved: [0u8; 8],
2827            backend_data: self.backend_data,
2828        }
2829    }
2830
2831    pub fn service_name(&self) -> &str {
2832        cstr_buf_to_str(&self.service_name_buf)
2833    }
2834
2835    pub fn type_name(&self) -> &str {
2836        cstr_buf_to_str(&self.type_name_buf)
2837    }
2838}
2839
2840impl ServiceClientTrait for CffiServiceClient {
2841    type Error = TransportError;
2842
2843    #[allow(deprecated)]
2844    fn call_raw(&mut self, request: &[u8], reply_buf: &mut [u8]) -> Result<usize, TransportError> {
2845        let mut view = self.make_view();
2846        let rc = unsafe {
2847            (self.vtable.call_raw)(
2848                &mut view,
2849                request.as_ptr(),
2850                request.len(),
2851                reply_buf.as_mut_ptr(),
2852                reply_buf.len(),
2853            )
2854        };
2855        if rc < 0 {
2856            return Err(error_from_ret(rc));
2857        }
2858        Ok(rc as usize)
2859    }
2860
2861    fn send_request_raw(&mut self, request: &[u8]) -> Result<(), TransportError> {
2862        // Phase 130.8 — every shipping backend now provides the
2863        // non-blocking `send_request_raw` + `try_recv_reply_raw`
2864        // vtable slots: XRCE-DDS-Client (native C),
2865        // Cyclone DDS C++ wrapper (native C++), Rust adapters
2866        // (dust-DDS + zenoh-pico via `rust_adapter`). The legacy
2867        // blocking-call_raw fallback that starved the executor's
2868        // spin loop (Phase 127.C.4 root cause) has been removed.
2869        // Backends that omit the slot get `Unsupported`; the
2870        // executor surfaces the error to the caller instead of
2871        // silently degrading to a multi-second blocking burst.
2872        let Some(f) = self.vtable.send_request_raw else {
2873            return Err(TransportError::Unsupported);
2874        };
2875        let mut view = self.make_view();
2876        let rc = unsafe { f(&mut view, request.as_ptr(), request.len()) };
2877        if rc != NROS_RMW_RET_OK {
2878            return Err(error_from_ret(rc));
2879        }
2880        self.pending_len = request.len().max(1);
2881        Ok(())
2882    }
2883
2884    fn try_recv_reply_raw(
2885        &mut self,
2886        reply_buf: &mut [u8],
2887    ) -> Result<Option<usize>, TransportError> {
2888        // Phase 130.8 — non-blocking poll only. NULL slot = backend
2889        // doesn't implement the service-client path; surface
2890        // Unsupported rather than the deprecated blocking fallback.
2891        let Some(f) = self.vtable.try_recv_reply_raw else {
2892            return Err(TransportError::Unsupported);
2893        };
2894        let mut view = self.make_view();
2895        let rc = unsafe { f(&mut view, reply_buf.as_mut_ptr(), reply_buf.len()) };
2896        if rc == NROS_RMW_RET_NO_DATA {
2897            return Ok(None);
2898        }
2899        if rc < 0 {
2900            self.pending_len = 0;
2901            return Err(error_from_ret(rc));
2902        }
2903        self.pending_len = 0;
2904        Ok(Some(rc as usize))
2905    }
2906
2907    fn server_available(&self) -> Result<bool, TransportError> {
2908        let Some(f) = self.vtable.service_server_available else {
2909            return Err(TransportError::Unsupported);
2910        };
2911        // SAFETY: `f` accepts a `*mut NrosRmwServiceClient`. We
2912        // construct a transient view from this client's fields the
2913        // same way `make_view` does, but on `&self` (no mutation
2914        // required for a graph probe). The borrowed pointers all
2915        // alias into `&self`, so the lifetime is bounded by the
2916        // call.
2917        let mut view = NrosRmwServiceClient {
2918            service_name: self.service_name_buf.as_ptr(),
2919            type_name: self.type_name_buf.as_ptr(),
2920            _reserved: [0u8; 8],
2921            backend_data: self.backend_data,
2922        };
2923        let rc = unsafe { f(&mut view) };
2924        match rc {
2925            0 => Ok(false),
2926            1 => Ok(true),
2927            n if n < 0 => Err(error_from_ret(n)),
2928            // Any positive value other than 1 is non-spec; treat as
2929            // "server available" — backends signalling availability
2930            // counts ≥ 1 still mean "ready".
2931            _ => Ok(true),
2932        }
2933    }
2934}
2935
2936impl Drop for CffiServiceClient {
2937    fn drop(&mut self) {
2938        if !self.backend_data.is_null() {
2939            let mut view = self.make_view();
2940            unsafe { (self.vtable.destroy_service_client)(&mut view) };
2941        }
2942    }
2943}
2944
2945// ============================================================================
2946// Factory
2947// ============================================================================
2948
/// RMW factory for the C function table backend.
///
/// A field-less marker type: sessions are opened through its
/// `nros_rmw::Rmw` implementation or through `CffiRmw::open_with_rmw`.
#[derive(Default)]
pub struct CffiRmw;
2952
2953impl nros_rmw::Rmw for CffiRmw {
2954    type Session = CffiSession;
2955    type Error = TransportError;
2956
2957    fn open(self, config: &nros_rmw::RmwConfig) -> Result<CffiSession, TransportError> {
2958        let mode = match config.mode {
2959            nros_rmw::SessionMode::Client => 0u8,
2960            nros_rmw::SessionMode::Peer => 1u8,
2961        };
2962        CffiSession::open(config.locator, mode, config.domain_id, config.node_name)
2963    }
2964}
2965
2966impl CffiRmw {
2967    /// Phase 104.C.1 — open a session against a named backend.
2968    /// `rmw_name` selects an entry from the registry populated by
2969    /// `nros_rmw_cffi_register_named` (Phase 104.B.2).
2970    pub fn open_with_rmw(
2971        rmw_name: &str,
2972        config: &nros_rmw::RmwConfig,
2973    ) -> Result<CffiSession, TransportError> {
2974        let mode = match config.mode {
2975            nros_rmw::SessionMode::Client => 0u8,
2976            nros_rmw::SessionMode::Peer => 1u8,
2977        };
2978        CffiSession::open_named(
2979            rmw_name,
2980            config.locator,
2981            mode,
2982            config.domain_id,
2983            config.node_name,
2984        )
2985    }
2986}
2987
2988// ============================================================================
2989// Phase 102.5 — typed-struct roundtrip test
2990// ============================================================================
2991//
2992// Verifies the visible-struct contract end-to-end:
2993// 1. Runtime fills `topic_name` / `type_name` / `qos` before
2994//    `create_publisher`.
2995// 2. Backend's `create_publisher` writes `backend_data` and
2996//    `can_loan_messages` into the same struct.
2997// 3. Rust accessors (`CffiPublisher::topic_name()`, `qos()`,
2998//    `can_loan_messages()`) read back the values without any
2999//    vtable callback.
3000
3001#[cfg(test)]
3002#[allow(static_mut_refs)]
3003mod tests {
3004    use super::*;
3005    use nros_rmw::{Rmw, RmwConfig, Session, SessionMode, TopicInfo};
3006
3007    // Stub backend state. Statically allocated; the vtable's
3008    // `backend_data` round-trips a `&'static mut StubBackend`.
3009    static mut STUB_OPEN_CALLED: bool = false;
3010    static mut STUB_CREATE_PUB_CALLED: bool = false;
3011    static mut STUB_PUBLISH_CALLED: bool = false;
3012    static mut STUB_LAST_TOPIC_NAME: [u8; 64] = [0u8; 64];
3013    static mut STUB_LAST_TYPE_NAME: [u8; 64] = [0u8; 64];
3014    static mut STUB_LAST_QOS: NrosRmwQos = NrosRmwQos {
3015        reliability: 0,
3016        durability: 0,
3017        history: 0,
3018        liveliness_kind: 0,
3019        depth: 0,
3020        _reserved0: 0,
3021        deadline_ms: 0,
3022        lifespan_ms: 0,
3023        liveliness_lease_ms: 0,
3024        avoid_ros_namespace_conventions: 0,
3025        _reserved1: [0; 3],
3026        rx_buffer_hint: 0,
3027    };
3028
3029    /// Read a null-terminated `*const u8` into the supplied byte
3030    /// buffer. Used by the stub backend to capture the topic / type
3031    /// names that the runtime hands it.
3032    unsafe fn copy_cstr(src: *const u8, dst: &mut [u8]) {
3033        let mut i = 0;
3034        while i < dst.len() {
3035            let b = unsafe { *src.add(i) };
3036            dst[i] = b;
3037            if b == 0 {
3038                break;
3039            }
3040            i += 1;
3041        }
3042    }
3043
3044    unsafe extern "C" fn stub_open(
3045        _locator: *const u8,
3046        _mode: u8,
3047        _domain_id: u32,
3048        _node_name: *const u8,
3049        out: *mut NrosRmwSession,
3050    ) -> NrosRmwRet {
3051        unsafe {
3052            STUB_OPEN_CALLED = true;
3053            (*out).backend_data = 0xDEAD_BEEFusize as *mut c_void;
3054        }
3055        NROS_RMW_RET_OK
3056    }
3057
3058    unsafe extern "C" fn stub_close(_session: *mut NrosRmwSession) -> NrosRmwRet {
3059        NROS_RMW_RET_OK
3060    }
3061
3062    unsafe extern "C" fn stub_drive_io(
3063        _session: *mut NrosRmwSession,
3064        _timeout_ms: i32,
3065    ) -> NrosRmwRet {
3066        NROS_RMW_RET_OK
3067    }
3068
3069    unsafe extern "C" fn stub_create_publisher(
3070        _session: *mut NrosRmwSession,
3071        _topic_name: *const u8,
3072        _type_name: *const u8,
3073        _type_hash: *const u8,
3074        _domain_id: u32,
3075        qos: *const NrosRmwQos,
3076        out: *mut NrosRmwPublisher,
3077    ) -> NrosRmwRet {
3078        // Capture the typed-struct fields the runtime supplied.
3079        unsafe {
3080            STUB_CREATE_PUB_CALLED = true;
3081            copy_cstr((*out).topic_name, &mut STUB_LAST_TOPIC_NAME);
3082            copy_cstr((*out).type_name, &mut STUB_LAST_TYPE_NAME);
3083            STUB_LAST_QOS = *qos;
3084            (*out).backend_data = 0xCAFEusize as *mut c_void;
3085            (*out).can_loan_messages = true;
3086        }
3087        NROS_RMW_RET_OK
3088    }
3089
3090    unsafe extern "C" fn stub_destroy_publisher(_publisher: *mut NrosRmwPublisher) {}
3091
3092    unsafe extern "C" fn stub_publish_raw(
3093        publisher: *mut NrosRmwPublisher,
3094        _data: *const u8,
3095        _len: usize,
3096    ) -> NrosRmwRet {
3097        // Verify the runtime is still passing the same backend_data
3098        // and topic_name on every call.
3099        unsafe {
3100            STUB_PUBLISH_CALLED = true;
3101            assert_eq!((*publisher).backend_data as usize, 0xCAFE);
3102            let mut buf = [0u8; 64];
3103            copy_cstr((*publisher).topic_name, &mut buf);
3104            assert_eq!(&buf[..], &STUB_LAST_TOPIC_NAME);
3105        }
3106        NROS_RMW_RET_OK
3107    }
3108
3109    unsafe extern "C" fn stub_create_subscriber(
3110        _: *mut NrosRmwSession,
3111        _: *const u8,
3112        _: *const u8,
3113        _: *const u8,
3114        _: u32,
3115        _: *const NrosRmwQos,
3116        out: *mut NrosRmwSubscriber,
3117    ) -> NrosRmwRet {
3118        unsafe {
3119            (*out).backend_data = core::ptr::dangling_mut::<c_void>();
3120        }
3121        NROS_RMW_RET_OK
3122    }
3123    unsafe extern "C" fn stub_destroy_subscriber(_: *mut NrosRmwSubscriber) {}
3124    unsafe extern "C" fn stub_try_recv_raw(_: *mut NrosRmwSubscriber, _: *mut u8, _: usize) -> i32 {
3125        0
3126    }
3127    unsafe extern "C" fn stub_has_data(_: *mut NrosRmwSubscriber) -> i32 {
3128        0
3129    }
3130
3131    unsafe extern "C" fn stub_create_service_server(
3132        _: *mut NrosRmwSession,
3133        _: *const u8,
3134        _: *const u8,
3135        _: *const u8,
3136        _: u32,
3137        _: *const NrosRmwQos,
3138        out: *mut NrosRmwServiceServer,
3139    ) -> NrosRmwRet {
3140        unsafe {
3141            (*out).backend_data = core::ptr::dangling_mut::<c_void>();
3142        }
3143        NROS_RMW_RET_OK
3144    }
3145    unsafe extern "C" fn stub_destroy_service_server(_: *mut NrosRmwServiceServer) {}
3146    unsafe extern "C" fn stub_try_recv_request(
3147        _: *mut NrosRmwServiceServer,
3148        _: *mut u8,
3149        _: usize,
3150        _: *mut i64,
3151    ) -> i32 {
3152        NROS_RMW_RET_NO_DATA
3153    }
3154    unsafe extern "C" fn stub_has_request(_: *mut NrosRmwServiceServer) -> i32 {
3155        0
3156    }
3157    unsafe extern "C" fn stub_send_reply(
3158        _: *mut NrosRmwServiceServer,
3159        _: i64,
3160        _: *const u8,
3161        _: usize,
3162    ) -> NrosRmwRet {
3163        NROS_RMW_RET_OK
3164    }
3165
3166    unsafe extern "C" fn stub_create_service_client(
3167        _: *mut NrosRmwSession,
3168        _: *const u8,
3169        _: *const u8,
3170        _: *const u8,
3171        _: u32,
3172        _: *const NrosRmwQos,
3173        out: *mut NrosRmwServiceClient,
3174    ) -> NrosRmwRet {
3175        unsafe {
3176            (*out).backend_data = core::ptr::dangling_mut::<c_void>();
3177        }
3178        NROS_RMW_RET_OK
3179    }
3180    unsafe extern "C" fn stub_destroy_service_client(_: *mut NrosRmwServiceClient) {}
3181    unsafe extern "C" fn stub_call_raw(
3182        _: *mut NrosRmwServiceClient,
3183        _: *const u8,
3184        _: usize,
3185        _: *mut u8,
3186        _: usize,
3187    ) -> i32 {
3188        0
3189    }
3190
3191    unsafe extern "C" fn stub_register_subscriber_event(
3192        _: *mut NrosRmwSubscriber,
3193        _: NrosRmwEventKind,
3194        _: u32,
3195        _: NrosRmwEventCallback,
3196        _: *mut c_void,
3197    ) -> NrosRmwRet {
3198        NROS_RMW_RET_UNSUPPORTED
3199    }
3200    unsafe extern "C" fn stub_register_publisher_event(
3201        _: *mut NrosRmwPublisher,
3202        _: NrosRmwEventKind,
3203        _: u32,
3204        _: NrosRmwEventCallback,
3205        _: *mut c_void,
3206    ) -> NrosRmwRet {
3207        NROS_RMW_RET_UNSUPPORTED
3208    }
3209    unsafe extern "C" fn stub_assert_publisher_liveliness(_: *mut NrosRmwPublisher) -> NrosRmwRet {
3210        NROS_RMW_RET_UNSUPPORTED
3211    }
3212
3213    static STUB_VTABLE: NrosRmwVtable = NrosRmwVtable {
3214        open: stub_open,
3215        close: stub_close,
3216        drive_io: stub_drive_io,
3217        create_publisher: stub_create_publisher,
3218        destroy_publisher: stub_destroy_publisher,
3219        publish_raw: stub_publish_raw,
3220        create_subscriber: stub_create_subscriber,
3221        destroy_subscriber: stub_destroy_subscriber,
3222        try_recv_raw: stub_try_recv_raw,
3223        has_data: stub_has_data,
3224        create_service_server: stub_create_service_server,
3225        destroy_service_server: stub_destroy_service_server,
3226        try_recv_request: stub_try_recv_request,
3227        has_request: stub_has_request,
3228        send_reply: stub_send_reply,
3229        create_service_client: stub_create_service_client,
3230        destroy_service_client: stub_destroy_service_client,
3231        call_raw: stub_call_raw,
3232        send_request_raw: None,
3233        try_recv_reply_raw: None,
3234        register_subscriber_event: stub_register_subscriber_event,
3235        register_publisher_event: stub_register_publisher_event,
3236        assert_publisher_liveliness: stub_assert_publisher_liveliness,
3237        next_deadline_ms: None,
3238        set_wake_callback: None,
3239        pub_loan: None,
3240        pub_commit: None,
3241        pub_discard: None,
3242        sub_borrow: None,
3243        sub_release: None,
3244        service_server_available: None,
3245        try_recv_sequence: None,
3246        publish_streamed: None,
3247        ping_session: None,
3248        subscriber_supports_in_place: None,
3249        process_raw_in_place: None,
3250    };
3251
/// A backend `try_recv_request` that answers `NROS_RMW_RET_NO_DATA` must
/// surface through `CffiServiceServer` as `Ok(None)`, not as an error.
#[test]
fn service_server_no_data_maps_to_none() {
    use nros_rmw::ServiceServerTrait as _;

    let mut scratch = [0u8; 16];
    // Built by hand around the stub vtable; no session is opened.
    let mut server = CffiServiceServer {
        vtable: &STUB_VTABLE,
        service_name_buf: [0u8; NAME_BUF_LEN],
        type_name_buf: [0u8; NAME_BUF_LEN],
        backend_data: core::ptr::dangling_mut::<c_void>(),
    };

    let request = server.try_recv_request(&mut scratch).unwrap();
    assert!(request.is_none());
}
3266
/// Drives the stub backend through register → open → create publisher →
/// publish, checking at each step that the stub was invoked and that the
/// topic/type names recorded by the stub match what the Rust side passed in.
#[test]
fn typed_struct_roundtrip() {
    use nros_rmw::Publisher as _;

    /// Decodes a NUL-padded name buffer; invalid UTF-8 decodes as `""`.
    fn padded_str(bytes: &[u8]) -> &str {
        core::str::from_utf8(bytes)
            .unwrap_or("")
            .trim_end_matches('\0')
    }

    // The stub vtable is registered under the name "default".
    let registered = unsafe { nros_rmw_cffi_register_named(c"default".as_ptr(), &STUB_VTABLE) };
    assert_eq!(registered, NROS_RMW_RET_OK);

    // Opening a session must call the stub's `open` hook.
    let config = RmwConfig {
        mode: SessionMode::Client,
        locator: "tcp/127.0.0.1:7447",
        domain_id: 0,
        node_name: "test_node",
        namespace: "",
        properties: &[],
    };
    let mut session = Rmw::open(CffiRmw, &config).expect("session open");
    assert!(unsafe { STUB_OPEN_CALLED });
    assert_eq!(session.node_name(), "test_node");

    // Creating a publisher must call the stub's `create_publisher` hook, and
    // the names the stub recorded must match the ones supplied here.
    let topic = TopicInfo::new("/chatter", "std_msgs/msg/Int32", "RIHS01_abc");
    let publisher = session
        .create_publisher(&topic, nros_rmw::QosSettings::default())
        .expect("publisher create");
    assert!(unsafe { STUB_CREATE_PUB_CALLED });
    assert_eq!(padded_str(unsafe { &STUB_LAST_TOPIC_NAME }), "/chatter");
    assert_eq!(
        padded_str(unsafe { &STUB_LAST_TYPE_NAME }),
        "std_msgs/msg/Int32"
    );

    // The Rust-side accessors report the same names.
    assert_eq!(publisher.topic_name(), "/chatter");
    assert_eq!(publisher.type_name(), "std_msgs/msg/Int32");
    assert!(publisher.can_loan_messages());

    // Publishing calls the stub's `publish_raw`, which itself asserts on the
    // backend pointer and topic name it receives.
    publisher.publish_raw(&[1u8, 2, 3]).expect("publish");
    assert!(unsafe { STUB_PUBLISH_CALLED });
}
3320}