10#ifndef NROS_CPP_SUBSCRIPTION_HPP
11#define NROS_CPP_SUBSCRIPTION_HPP
21#include "nros_cpp_ffi.h"
33 const char* type_name,
const char* type_hash,
36 void* context, uint8_t sched_context,
37 size_t* out_handle_id);
42 const uint8_t* attachment,
43 size_t attachment_len,
void* ctx);
46 const nros_cpp_node_t* node,
const char* topic,
const char* type_name,
const char* type_hash,
48 uint8_t sched_context,
size_t* out_handle_id);
194 View(
View&&
o) : sub_(
o.sub_), buf_(
o.buf_), len_(
o.len_), token_(
o.token_) {
215 size_t size()
const {
return len_; }
216 bool empty()
const {
return token_ ==
nullptr; }
220 : sub_(
sub), buf_(
buf), len_(len), token_(
token) {}
224 if (token_ && sub_) {
243 void*
token =
nullptr;
285 if (initialized_ && !stream_.
is_valid()) {
303 if (initialized_ && !callback_mode_) {
306 initialized_ =
false;
317 user_fn_ =
other.user_fn_;
318 user_fn_ctx_ =
other.user_fn_ctx_;
319 user_ctx_ =
other.user_ctx_;
320 callback_mode_ =
other.callback_mode_;
321 sched_handle_id_ =
other.sched_handle_id_;
322 if (
other.initialized_ && !
other.callback_mode_) {
324 ::memcpy(topic_name_,
other.topic_name_,
sizeof(topic_name_));
327 other.initialized_ =
false;
332 if (
this != &
other) {
333 if (initialized_ && !callback_mode_) {
337 initialized_ =
other.initialized_;
338 user_fn_ =
other.user_fn_;
339 user_fn_ctx_ =
other.user_fn_ctx_;
340 user_ctx_ =
other.user_ctx_;
341 callback_mode_ =
other.callback_mode_;
342 sched_handle_id_ =
other.sched_handle_id_;
343 if (
other.initialized_ && !
other.callback_mode_) {
345 ::memcpy(topic_name_,
other.topic_name_,
sizeof(topic_name_));
348 other.initialized_ =
false;
408 static void message_trampoline(
const uint8_t* data,
size_t len,
void*
ctx) {
410 if (
self ==
nullptr)
return;
412 if (M::ffi_deserialize(data, len, &
msg) != 0)
return;
413 if (
self->user_fn_ !=
nullptr) {
415 }
else if (
self->user_fn_ctx_ !=
nullptr) {
423 static void message_info_trampoline(
const uint8_t* data,
size_t len,
const uint8_t* attachment,
424 size_t attachment_len,
void* ctx) {
426 if (self ==
nullptr)
return;
428 if (M::ffi_deserialize(data, len, &msg) != 0)
return;
429 if (self->user_fn_info_ !=
nullptr) {
430 self->user_fn_info_(msg, attachment, attachment_len);
434 alignas(8) uint8_t storage_[NROS_SUBSCRIBER_SIZE];
442 size_t sched_handle_id_ =
static_cast<size_t>(-1);
448 void* user_ctx_ =
nullptr;
449 bool callback_mode_ =
false;
470 ffi_qos.liveliness_lease_ms =
qos.liveliness_lease_ms();
471 ffi_qos.avoid_ros_namespace_conventions =
qos.avoid_ros_namespace_conventions() ? 1 : 0;
482 out.initialized_ =
true;
503 if (!
r.ok())
return r;
509 executor_handle_,
out.sched_handle_id(),
static_cast<uint8_t>(
options.sched_context));
515 out.initialized_ =
false;
526template <
typename M,
typename F,
typename>
538 ffi_qos.liveliness_lease_ms =
qos.liveliness_lease_ms();
539 ffi_qos.avoid_ros_namespace_conventions =
qos.avoid_ros_namespace_conventions() ? 1 : 0;
544 out.user_fn_ctx_ =
nullptr;
545 out.user_ctx_ =
nullptr;
550 size_t handle =
static_cast<size_t>(-1);
555 out.sched_handle_id_ = handle;
556 out.callback_mode_ =
true;
557 out.initialized_ =
true;
566template <
typename M,
typename F,
typename>
578 ffi_qos.liveliness_lease_ms =
qos.liveliness_lease_ms();
579 ffi_qos.avoid_ros_namespace_conventions =
qos.avoid_ros_namespace_conventions() ? 1 : 0;
582 out.user_fn_ =
nullptr;
583 out.user_fn_ctx_ =
nullptr;
584 out.user_ctx_ =
nullptr;
589 size_t handle =
static_cast<size_t>(-1);
594 out.sched_handle_id_ = handle;
595 out.callback_mode_ =
true;
596 out.initialized_ =
true;
Definition result.hpp:160
ErrorCode error() const
Definition result.hpp:183
bool ok() const
Definition result.hpp:176
Result create_subscription(Subscription< M > &out, const char *topic, const QoS &qos=QoS::default_profile())
Definition subscription.hpp:460
Result create_subscription_with_info(Subscription< M > &out, const char *topic, F callback, const QoS &qos=QoS::default_profile(), const SubscriptionOptions &options={})
Definition subscription.hpp:567
static constexpr QoS default_profile()
Default profile: RELIABLE + VOLATILE + KEEP_LAST(10).
Definition qos.hpp:160
static constexpr Result success()
Named constructors.
Definition result.hpp:74
bool is_valid() const
Check if the stream is connected to a valid source.
Definition stream.hpp:105
Definition subscription.hpp:191
View & operator=(View &&o)
Definition subscription.hpp:198
View(const View &)=delete
View(View &&o)
Definition subscription.hpp:194
View()
Definition subscription.hpp:193
const uint8_t * data() const
Definition subscription.hpp:214
bool empty() const
Definition subscription.hpp:216
size_t size() const
Definition subscription.hpp:215
View(void *sub, const uint8_t *buf, size_t len, void *token)
Internal constructor — callers use Subscription::try_borrow().
Definition subscription.hpp:219
View & operator=(const View &)=delete
~View()
Definition subscription.hpp:212
Definition subscription.hpp:76
void(*)(const M &msg, const uint8_t *attachment, size_t attachment_len) TypedSubscriptionInfoFn
Definition subscription.hpp:85
Result on_liveliness_changed(nros_cpp_liveliness_changed_cb_t cb, void *user_context=nullptr)
Definition subscription.hpp:379
const char * get_topic_name() const
Get the topic name.
Definition subscription.hpp:275
Result on_requested_deadline_missed(uint32_t deadline_ms, nros_cpp_subscriber_count_cb_t cb, void *user_context=nullptr)
Register a callback for requested-deadline-missed events.
Definition subscription.hpp:386
Result try_recv_sequence(uint8_t *buf, size_t per_msg_cap, size_t max_msgs, size_t *out_lens, size_t &out_count)
Definition subscription.hpp:262
Subscription(Subscription &&other)
Definition subscription.hpp:316
Subscription()
Definition subscription.hpp:356
~Subscription()
Definition subscription.hpp:302
Expected< View > try_borrow()
Definition subscription.hpp:239
const Stream< M > & stream() const
Definition subscription.hpp:291
Stream< M > & stream()
Definition subscription.hpp:284
size_t sched_handle_id() const
Definition subscription.hpp:369
bool has_sched_handle() const
Definition subscription.hpp:368
Result try_recv(M &msg)
Definition subscription.hpp:97
Result on_message_lost(nros_cpp_subscriber_count_cb_t cb, void *user_context=nullptr)
Register a callback for message-lost events.
Definition subscription.hpp:394
Result try_recv_raw(uint8_t *buf, size_t capacity, size_t &out_len)
Definition subscription.hpp:145
void(*)(const M &msg, void *ctx) TypedSubscriptionFnWithCtx
Definition subscription.hpp:82
bool is_valid() const
Check if the subscription is initialized and valid.
Definition subscription.hpp:294
Subscription & operator=(Subscription &&other)
Definition subscription.hpp:331
Result try_recv_raw_with_attachment(uint8_t *buf, size_t capacity, size_t &out_len, uint8_t *att, size_t att_capacity, size_t &out_att_len)
Definition subscription.hpp:171
void(*)(const M &msg) TypedSubscriptionFn
Definition subscription.hpp:81
Result try_recv_validated(M &msg, nros_cpp_integrity_status_t &status)
Definition subscription.hpp:123
Inline storage-size macros for opaque entity buffers.
int nros_cpp_ret_t
Definition future.hpp:20
bool ok()
Check if the nros session is initialized.
Definition node.hpp:717
static constexpr size_t SUBSCRIPTION_TOPIC_NAME_MAX
Definition subscription.hpp:56
@ Error
Generic failure not covered by a more specific code.
@ TryAgain
Transient — no data ready yet (non-blocking take). Retry later.
nros::Node and global session helpers.
nros_cpp_qos_history_t
Definition qos.hpp:31
nros_cpp_qos_liveliness_t
Definition qos.hpp:35
nros_cpp_qos_durability_t
Definition qos.hpp:27
nros_cpp_qos_reliability_t
Definition qos.hpp:23
nros::Result, nros::ErrorCode, and the NROS_TRY macro.
nros::Stream<T> — multi-shot message receiver.
nros_cpp_ret_t nros_cpp_subscription_register(const nros_cpp_node_t *node, const char *topic, const char *type_name, const char *type_hash, nros_cpp_qos_t qos, nros_cpp_subscription_message_callback_t callback, void *context, uint8_t sched_context, size_t *out_handle_id)
void(* nros_cpp_subscription_message_info_callback_t)(const uint8_t *data, size_t len, const uint8_t *attachment, size_t attachment_len, void *ctx)
Definition subscription.hpp:41
void(* nros_cpp_subscription_message_callback_t)(const uint8_t *data, size_t len, void *ctx)
Definition subscription.hpp:29
nros_cpp_ret_t nros_cpp_subscription_register_with_info(const nros_cpp_node_t *node, const char *topic, const char *type_name, const char *type_hash, nros_cpp_qos_t qos, nros_cpp_subscription_message_info_callback_t callback, void *context, uint8_t sched_context, size_t *out_handle_id)