Skip to main content

Subscriber

Trait Subscriber 

Source
pub trait Subscriber {
    type Error;

Show 14 methods // Required methods fn try_recv_raw( &mut self, buf: &mut [u8], ) -> Result<Option<usize>, Self::Error>; fn deserialization_error(&self) -> Self::Error; // Provided methods fn has_data(&self) -> bool { ... } fn try_recv_raw_with_attachment( &mut self, buf: &mut [u8], _att_buf: &mut [u8], ) -> Result<Option<(usize, usize)>, Self::Error> { ... } fn try_recv_sequence( &mut self, buf: &mut [u8], per_msg_cap: usize, max_msgs: usize, out_lens: &mut [usize], ) -> Result<usize, Self::Error> { ... } fn try_recv<M>(&mut self, buf: &mut [u8]) -> Result<Option<M>, Self::Error> where M: RosMessage { ... } fn process_raw_in_place( &mut self, f: impl FnOnce(&[u8]), ) -> Result<bool, Self::Error> where Self::Error: From<TransportError> { ... } fn supports_process_in_place(&self) -> bool { ... } fn process_raw_in_place_with_info( &mut self, f: impl FnOnce(&[u8], Option<MessageInfo>), ) -> Result<bool, Self::Error> where Self::Error: From<TransportError> { ... } fn try_recv_raw_with_info( &mut self, buf: &mut [u8], ) -> Result<Option<(usize, Option<MessageInfo>)>, Self::Error> { ... } fn register_waker(&self, _waker: &Waker) { ... } fn supports_event(&self, _kind: EventKind) -> bool { ... } unsafe fn register_event_callback( &mut self, _kind: EventKind, _deadline_ms: u32, _cb: unsafe extern "C" fn(EventKind, *const c_void, *mut c_void), _user_ctx: *mut c_void, ) -> Result<(), Self::Error> { ... } fn unsupported_event_error(&self) -> Self::Error { ... }
}
Expand description

Subscriber trait for receiving messages.

§Threading

&mut self on try_recv_raw — the executor takes exclusive ownership of the subscriber for the duration of a receive. A backend that wants to allow concurrent receives must split into per-thread sub-handles internally.

§Buffer ownership

buf is caller-owned. The implementation copies the next ready message into buf and returns the byte count. The caller may re-use or drop buf immediately after the call.

§Blocking

try_recv_raw is non-blocking: returns Ok(None) (or equivalent for backends that map empty into a zero-length read) when no message is ready. Use Session::drive_io to wait for data; never sleep inside try_recv_raw.

Required Associated Types§

Source

type Error

Error type for receive operations

Required Methods§

Source

fn try_recv_raw(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Self::Error>

Try to receive one message into buf.

Non-blocking. On success returns Ok(Some(len)) where len is the byte count written into buf[..len]. Returns Ok(None) if no message is ready. If buf is too small the backend may either truncate (and document it) or return an error (preferred).

Source

fn deserialization_error(&self) -> Self::Error

Return a deserialization error (implementation specific)

Provided Methods§

Source

fn has_data(&self) -> bool

Check if data is available without consuming it.

Non-destructive — does not advance the receive cursor. Conservative default returns true (always assume data may be available); backends should override with a real check to avoid spurious receive attempts.

Source

fn try_recv_raw_with_attachment( &mut self, buf: &mut [u8], _att_buf: &mut [u8], ) -> Result<Option<(usize, usize)>, Self::Error>

Phase 128.F.4 — receive with attachment bytes alongside the payload.

On success returns Ok(Some((payload_len, attachment_len))) with the payload written into buf[..payload_len] and the attachment (if any) written into att_buf[..attachment_len]. attachment_len == 0 means the incoming sample carried no attachment.

Default body falls back to try_recv_raw and reports a 0-length attachment. Backends with native attachment support override to populate att_buf. Cross-RMW bridges use the attachment to read the bridge_origin tag stamped by the sending side.

Source

fn try_recv_sequence( &mut self, buf: &mut [u8], per_msg_cap: usize, max_msgs: usize, out_lens: &mut [usize], ) -> Result<usize, Self::Error>

Phase 124.D.1 — burst-take.

Drain up to max_msgs queued samples into the contiguous buf block in one call, with the i-th sample at buf[i * per_msg_cap .. i * per_msg_cap + out_lens[i]]. Returns the number of messages actually delivered. Partial drains MUST report the count, not error out.

Default body loop-drives try_recv_raw so callers can commit to the batched API regardless of backend support. Concrete backends opt in by overriding with a native batch take (zenoh queue drain, dds_take(max_samples)).

Source

fn try_recv<M>(&mut self, buf: &mut [u8]) -> Result<Option<M>, Self::Error>
where M: RosMessage,

Try to receive a typed message (non-blocking)

Source

fn process_raw_in_place( &mut self, f: impl FnOnce(&[u8]), ) -> Result<bool, Self::Error>
where Self::Error: From<TransportError>,

Process the received message in-place without copying.

Calls f with a reference to the raw CDR bytes in the subscriber’s internal receive buffer, avoiding a copy into a caller-provided buffer. While f executes the buffer is exclusively borrowed — any messages arriving from the transport during that time are dropped to prevent data races.

Returns Ok(true) if a message was available and f was called, Ok(false) if no message was available.

Default body: returns Err(MessageTooLarge) — the old default silently truncated anything larger than 1 KB into a stack buffer, which broke large messages with no diagnostic. Backends must override this with a real zero-copy path if they advertise support for process_raw_in_place; callers that hit the default should use try_recv_raw with a caller-sized buffer instead.

Source

fn supports_process_in_place(&self) -> bool

Whether this backend implements the in-place dispatch methods (process_raw_in_place / process_raw_in_place_with_info) with a real zero-copy borrow.

The executor consults this at subscription registration to choose the in-place arena dispatch (borrow + deserialize from the backend slot, no arena buffer) over the buffered dispatch (copy into an arena buffer first). Backends that leave the in-place methods at their unsupported default return false (the default) and keep the buffered path. (RFC-0038, Phase 231 Wave 0.2.)

Source

fn process_raw_in_place_with_info( &mut self, f: impl FnOnce(&[u8], Option<MessageInfo>), ) -> Result<bool, Self::Error>
where Self::Error: From<TransportError>,

In-place processing variant that also surfaces publisher metadata.

Same borrow contract as process_raw_in_place: f receives the raw CDR bytes plus the parsed MessageInfo — the co-located attachment (publisher GID / sequence / source timestamp), or None when no attachment was present — for the duration of the call; the slot is released after f returns. Ok(true) = a message was available and f was called; Ok(false) = none ready.

Default body: returns the unsupported error (mirrors process_raw_in_place). Backends that advertise in-place support override this with a real zero-copy path; callers that hit the default should use the buffered try_recv_raw_with_info path instead. (RFC-0038, Phase 231 Wave 0.1.)

Source

fn try_recv_raw_with_info( &mut self, buf: &mut [u8], ) -> Result<Option<(usize, Option<MessageInfo>)>, Self::Error>

Try to receive raw data along with publisher metadata.

When available, MessageInfo contains the publisher’s GID (Global Identifier) and source timestamp, extracted from a transport-level attachment on the incoming message.

Returns Ok(Some((len, info))) if data is available, where:

  • len is the number of bytes written to the buffer
  • info is the parsed publisher metadata (if attachment was present)

Default: delegates to try_recv_raw with no info.

Source

fn register_waker(&self, _waker: &Waker)

Register an async waker to be notified when data arrives.

Called from Future::poll() implementations to store the waker. The transport backend calls waker.wake() from its receive callback when new data is available, enabling event-driven async without busy-polling.

Default: no-op (backends that don’t support waking simply ignore this).

Source

fn supports_event(&self, _kind: EventKind) -> bool

Phase 108 — true if the backend can generate this event for this subscriber. Default returns false; backends override per supported event kind.

Subscriber-side event kinds: EventKind::LivelinessChanged, EventKind::RequestedDeadlineMissed, EventKind::MessageLost. Publisher kinds always return false here.

Source

unsafe fn register_event_callback( &mut self, _kind: EventKind, _deadline_ms: u32, _cb: unsafe extern "C" fn(EventKind, *const c_void, *mut c_void), _user_ctx: *mut c_void, ) -> Result<(), Self::Error>

Phase 108 — register a callback fired when the named status event occurs. deadline_ms applies to EventKind::RequestedDeadlineMissed only; ignored otherwise. Default impl returns the backend’s “unsupported”-shaped error.

§Safety

cb and user_ctx must remain valid for the entity’s lifetime. Caller (typically nros-node’s typed wrapper) is responsible for keeping the closure / context arena alive.

Source

fn unsupported_event_error(&self) -> Self::Error

Phase 108 — backend’s error variant for “this event kind is not supported.” Default reuses deserialization_error() for backends that don’t have a distinct Unsupported mapping.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementations on Foreign Types§

Source§

impl Subscriber for CffiSubscriber

Source§

type Error = TransportError

Source§

fn supports_process_in_place(&self) -> bool

Source§

fn process_raw_in_place( &mut self, f: impl FnOnce(&[u8]), ) -> Result<bool, <CffiSubscriber as Subscriber>::Error>

Source§

fn has_data(&self) -> bool

Source§

fn try_recv_raw( &mut self, buf: &mut [u8], ) -> Result<Option<usize>, TransportError>

Source§

fn try_recv_raw_with_info( &mut self, buf: &mut [u8], ) -> Result<Option<(usize, Option<MessageInfo>)>, TransportError>

Source§

fn try_recv_sequence( &mut self, buf: &mut [u8], per_msg_cap: usize, max_msgs: usize, out_lens: &mut [usize], ) -> Result<usize, TransportError>

Source§

fn deserialization_error(&self) -> TransportError

Source§

fn unsupported_event_error(&self) -> TransportError

Source§

unsafe fn register_event_callback( &mut self, kind: EventKind, deadline_ms: u32, cb: unsafe extern "C" fn(EventKind, *const c_void, *mut c_void), user_ctx: *mut c_void, ) -> Result<(), TransportError>

Implementors§