Skip to main content

Session

Trait Session 

Source
pub trait Session {
    type Error;
    type PublisherHandle;
    type SubscriberHandle;
    type ServiceServerHandle;
    type ServiceClientHandle;

    // Required methods
    fn create_publisher(
        &mut self,
        topic: &TopicInfo<'_>,
        qos: QosSettings,
    ) -> Result<Self::PublisherHandle, Self::Error>;
    fn create_subscriber(
        &mut self,
        topic: &TopicInfo<'_>,
        qos: QosSettings,
    ) -> Result<Self::SubscriberHandle, Self::Error>;
    fn create_service_server(
        &mut self,
        service: &ServiceInfo<'_>,
        qos: QosSettings,
    ) -> Result<Self::ServiceServerHandle, Self::Error>;
    fn create_service_client(
        &mut self,
        service: &ServiceInfo<'_>,
        qos: QosSettings,
    ) -> Result<Self::ServiceClientHandle, Self::Error>;
    fn close(&mut self) -> Result<(), Self::Error>;
    fn drive_io(&mut self, timeout_ms: i32) -> Result<(), Self::Error>;

    // Provided methods
    fn supported_qos_policies(&self) -> QosPolicyMask { ... }
    fn next_deadline_ms(&self) -> Option<u32> { ... }
    unsafe fn set_wake_callback(
        &mut self,
        cb: Option<unsafe extern "C" fn(*mut c_void)>,
        ctx: *mut c_void,
    ) { ... }
    fn supports_wake_callback(&self) -> bool { ... }
    fn ping_session(&mut self, timeout_ms: i32) -> Result<(), Self::Error>
       where Self::Error: From<TransportError> { ... }
}
Expand description

Transport session trait — the per-process anchor an RMW backend gives to the executor.

§Threading

&mut self on every method means the executor serialises all session calls onto a single thread. A backend may rely on this — no internal locking is required for create_* / close / drive_io. Publisher / subscriber / service handles created from the session, however, are typically used from worker threads and must carry their own synchronisation (see the Publisher / Subscriber trait docs).

§Calling pattern

  1. Open the session (backend-specific factory; not on this trait).
  2. create_* for every entity at startup. Creating entities mid- flight after drive_io has run is allowed but not common.
  3. The executor calls drive_io periodically. Worker threads publish / receive in parallel.
  4. close once at shutdown. Entities must be dropped first.

Required Associated Types§

Source

type Error

Error type for this session

Source

type PublisherHandle

Publisher handle type

Source

type SubscriberHandle

Subscriber handle type

Source

type ServiceServerHandle

Service server handle type

Source

type ServiceClientHandle

Service client handle type

Required Methods§

Source

fn create_publisher( &mut self, topic: &TopicInfo<'_>, qos: QosSettings, ) -> Result<Self::PublisherHandle, Self::Error>

Create a publisher bound to this session.

May allocate transport resources (zenoh declarations, DDS writers). Returns a handle that outlives the call but not the session — drop the handle before close().

Source

fn create_subscriber( &mut self, topic: &TopicInfo<'_>, qos: QosSettings, ) -> Result<Self::SubscriberHandle, Self::Error>

Create a subscriber bound to this session.

Subscribers may start receiving immediately after creation if the transport supports late-joining publishers. Late messages are buffered up to the QoS depth.

Source

fn create_service_server( &mut self, service: &ServiceInfo<'_>, qos: QosSettings, ) -> Result<Self::ServiceServerHandle, Self::Error>

Create a service server bound to this session. Replies are matched to requests by the sequence number returned from ServiceServerTrait::try_recv_request.

qos is applied to both the request and reply endpoints (a service is two DDS topics; rmw uses one profile for both). The default is QosSettings::services_default (RELIABLE+VOLATILE+KEEP_LAST(10)).

Source

fn create_service_client( &mut self, service: &ServiceInfo<'_>, qos: QosSettings, ) -> Result<Self::ServiceClientHandle, Self::Error>

Create a service client bound to this session.

qos is applied to both the request and reply endpoints (a service is two DDS topics; rmw uses one profile for both). The default is QosSettings::services_default (RELIABLE+VOLATILE+KEEP_LAST(10)).

Source

fn close(&mut self) -> Result<(), Self::Error>

Close the session, releasing transport resources. All entity handles created from this session must already be dropped.

Source

fn drive_io(&mut self, timeout_ms: i32) -> Result<(), Self::Error>

Drive transport I/O (poll network, dispatch callbacks).

Both zenoh-pico and XRCE-DDS are pull-based: they require the application to periodically call this method to read from the network socket and dispatch incoming messages to subscriber buffers.

timeout_ms is the maximum time to wait for data (0 = non-blocking; negative values mean “block indefinitely” — see Phase 84.D7 for the planned migration to core::time::Duration).

Required. There is no default body — both shipped backends (zenoh and XRCE) must drive I/O, and a silent no-op default was a trap for third-party implementers. If your backend genuinely receives data via OS callbacks (push-based) and has nothing to do here, return Ok(()) explicitly.

Provided Methods§

Source

fn supported_qos_policies(&self) -> QosPolicyMask

Phase 109 — report which QoS policies the active backend honours. The runtime validates requested QoS against this mask at entity-create time and returns TransportError::IncompatibleQos if the requested profile includes a policy the backend can’t enforce. No silent downgrade.

Default returns QosPolicyMask::CORE — reliability + durability VOLATILE + history + depth. Backends override per supported policy.

Source

fn next_deadline_ms(&self) -> Option<u32>

Phase 110.0 — backend’s next internal-event deadline in milliseconds from now (lease keepalive, heartbeat, reader ACK-NACK timeout, etc.).

The executor caps its drive_io timeout against min(user_timeout, timer_deadline, this) so quiet links don’t wake early, see no user-visible work, and round-trip back into drive_io. Returns None when the backend has no internal deadlines or chooses not to expose them.

Default None keeps existing backends working unchanged; opt-in per backend.

Source

unsafe fn set_wake_callback( &mut self, cb: Option<unsafe extern "C" fn(*mut c_void)>, ctx: *mut c_void, )

Phase 124.B.1 — install (or clear, when cb.is_none()) the executor wake callback. The runtime calls this once per session after open with cb pointing at a runtime-owned function and ctx pointing at the executor’s wake state. The backend stores (cb, ctx) in its per-session state and calls cb(ctx) whenever its transport notification path fires (datagram arrival, condvar wake, etc.) — the runtime cb does flag-write + condvar-signal atomically, so a spin_once blocked on the wake condvar resumes immediately instead of waiting for the next poll iteration.

§Safety

When cb is Some, ctx must remain valid until the callback is cleared or the session is closed. The backend may invoke cb(ctx) from its transport notification path.

Default body: ignore the call. Poll-only backends (XRCE, bare-metal) leave the default in place; the executor still drains them on its deadline-bound cv-wait boundary.

Source

fn supports_wake_callback(&self) -> bool

Phase 130.4 — does this backend actually honour set_wake_callback?

true means the backend installs the callback and will fire it from its async notify path (worker thread, ISR, signalfd, …). false (the default) means set_wake_callback was a no-op — the executor must drive I/O for the caller’s full timeout because no async wake will pre-empt it.

The executor uses this to choose between the wake-primitive wait (NodeWake::wait_ms / std::Condvar::wait_timeout_while) and a direct drive_io(timeout_ms). Poll-only backends (XRCE-DDS-Client, bare-metal smoltcp) return false; event-driven backends (zenoh-pico with an RX task that invokes the callback on packet arrival) return true.

Source

fn ping_session(&mut self, timeout_ms: i32) -> Result<(), Self::Error>
where Self::Error: From<TransportError>,

Phase 124.F.1 — session-level connectivity probe.

Sends a wire-level round-trip probe and waits up to timeout_ms. Ok(()) on reply, Err(TransportError::Timeout) on no reply, Err(TransportError::Unsupported) when the backend can’t probe (DDS without participant introspection). Lesson from micro-ROS’s rmw_uros_ping_agent.

Default body: Err(Unsupported). Backends with a native ping API (zenoh: z_send_ping; XRCE: uxr_ping_agent_session_until_timeout) opt in by overriding.

Implementations on Foreign Types§

Source§

impl Session for CffiSession

Source§

fn supported_qos_policies(&self) -> QosPolicyMask

Phase 115.K.2.5.1.2 — declare a permissive QoS-policy mask here so backends behind the cffi vtable don’t get rejected by the runtime’s pre-validate step before they ever see the create_publisher / create_subscriber call. The vtable doesn’t expose a per-backend policy mask yet; until it does, the cffi route has to assume the registered backend supports the union of every policy any nros-supported RMW honours. Backends that don’t support a policy MUST surface NROS_RMW_RET_INCOMPATIBLE_QOS from create_publisher etc. to keep the no-silent-degradation contract.

TODO 115.K.2.x: extend nros_rmw_vtable_t with a supported_qos_policies() callback so the runtime queries the backend instead of guessing.

Source§

type Error = TransportError

Source§

type PublisherHandle = CffiPublisher

Source§

type SubscriberHandle = CffiSubscriber

Source§

type ServiceServerHandle = CffiServiceServer

Source§

type ServiceClientHandle = CffiServiceClient

Source§

fn create_publisher( &mut self, topic: &TopicInfo<'_>, qos: QosSettings, ) -> Result<CffiPublisher, TransportError>

Source§

fn create_subscriber( &mut self, topic: &TopicInfo<'_>, qos: QosSettings, ) -> Result<CffiSubscriber, TransportError>

Source§

fn create_service_server( &mut self, service: &ServiceInfo<'_>, qos: QosSettings, ) -> Result<CffiServiceServer, TransportError>

Source§

fn create_service_client( &mut self, service: &ServiceInfo<'_>, qos: QosSettings, ) -> Result<CffiServiceClient, TransportError>

Source§

fn close(&mut self) -> Result<(), TransportError>

Source§

fn drive_io(&mut self, timeout_ms: i32) -> Result<(), TransportError>

Source§

fn next_deadline_ms(&self) -> Option<u32>

Source§

unsafe fn set_wake_callback( &mut self, cb: Option<unsafe extern "C" fn(*mut c_void)>, ctx: *mut c_void, )

Source§

fn supports_wake_callback(&self) -> bool

Source§

fn ping_session(&mut self, timeout_ms: i32) -> Result<(), TransportError>

Implementors§