nros C++ API
Lightweight ROS 2 client for embedded real-time systems (C++ headers)
Loading...
Searching...
No Matches
subscription.hpp
Go to the documentation of this file.
1// nros-cpp: Subscription class
2// Freestanding C++ — no exceptions, no STL required
3
10#ifndef NROS_CPP_SUBSCRIPTION_HPP
11#define NROS_CPP_SUBSCRIPTION_HPP
12
13#include <cstdint>
14#include <cstddef>
15#include <string.h> // memcpy — `<cstring>` isn't in Zephyr's minimal libcpp
16
17#include "nros/config.hpp"
18#include "nros/result.hpp"
19#include "nros/stream.hpp"
20
21#include "nros_cpp_ffi.h"
22
23// Phase 189.M3.x — `nros_cpp_subscription_register` is excluded from cbindgen
24// (its Rust signature uses `RawSubscriptionCallback`, an external-crate type
25// alias cbindgen names without defining). Declare it locally with a plain
26// function-pointer typedef matching the ABI (`void(data, len, ctx)`), mirroring
27// the service.hpp callback-register treatment.
28extern "C" {
// Raw message callback accepted by `nros_cpp_subscription_register`.
//   data / len : the serialized message bytes handed to the callback.
//   ctx        : the opaque `context` pointer that was passed at registration.
29typedef void (*nros_cpp_subscription_message_callback_t)(const uint8_t* data, size_t len,
30 void* ctx);
31
32nros_cpp_ret_t nros_cpp_subscription_register(const nros_cpp_node_t* node, const char* topic,
33 const char* type_name, const char* type_hash,
36 void* context, uint8_t sched_context,
37 size_t* out_handle_id);
38
39// Phase 189.M3.4 — callback-style register that also delivers the sample's wire
40// attachment (5-arg trampoline). Same cbindgen-exclusion reason as above.
// Raw message callback that additionally receives the sample's attachment.
//   data / len                  : the serialized message bytes.
//   attachment / attachment_len : the attachment bytes delivered with the sample.
//   ctx                         : the opaque `context` pointer passed at registration.
41typedef void (*nros_cpp_subscription_message_info_callback_t)(const uint8_t* data, size_t len,
42 const uint8_t* attachment,
43 size_t attachment_len, void* ctx);
44
46 const nros_cpp_node_t* node, const char* topic, const char* type_name, const char* type_hash,
48 uint8_t sched_context, size_t* out_handle_id);
49} // extern "C"
50
51namespace nros {
52
// Size in bytes of the `topic_name_` buffer held inside each Subscription.
// The copy loop in Node::create_subscription reserves one byte for the
// terminating NUL, so longer topic names are truncated to fit.
56static constexpr size_t SUBSCRIPTION_TOPIC_NAME_MAX = 256;
57
76template <typename M> class Subscription {
77 public:
81 using TypedSubscriptionFn = void (*)(const M& msg);
82 using TypedSubscriptionFnWithCtx = void (*)(const M& msg, void* ctx);
83 // Phase 189.M3.4 — callback-with-attachment handler (`bridge_origin` etc.).
84 using TypedSubscriptionInfoFn = void (*)(const M& msg, const uint8_t* attachment,
85 size_t attachment_len);
86
98 if (!initialized_) return Result(ErrorCode::NotInitialized);
99 uint8_t buf[M::SERIALIZED_SIZE_MAX];
100 size_t len = 0;
101 nros_cpp_ret_t ret = nros_cpp_subscription_try_recv_raw(storage_, buf, sizeof(buf), &len);
102 if (ret != 0) return Result(ret);
103 if (len == 0) return Result(ErrorCode::TryAgain);
104 if (M::ffi_deserialize(buf, len, &msg) != 0) return Result(ErrorCode::Error);
105 return Result::success();
106 }
107
124 if (!initialized_) return Result(ErrorCode::NotInitialized);
125 uint8_t buf[M::SERIALIZED_SIZE_MAX];
126 size_t len = 0;
128 nros_cpp_subscription_try_recv_validated(storage_, buf, sizeof(buf), &len, &status);
129 if (ret != 0) return Result(ret);
130 if (len == 0) return Result(ErrorCode::TryAgain);
131 if (M::ffi_deserialize(buf, len, &msg) != 0) return Result(ErrorCode::Error);
132 return Result::success();
133 }
134
145 Result try_recv_raw(uint8_t* buf, size_t capacity, size_t& out_len) {
146 if (!initialized_) {
147 out_len = 0;
149 }
151 if (ret != 0) return Result(ret);
152 if (out_len == 0) return Result(ErrorCode::TryAgain);
153 return Result::success();
154 }
155
172 uint8_t* att, size_t att_capacity, size_t& out_att_len) {
173 if (!initialized_) {
174 out_len = 0;
175 out_att_len = 0;
177 }
179 storage_, buf, capacity, &out_len, att, att_capacity, &out_att_len);
180 if (ret != 0) return Result(ret);
181 if (out_len == 0) return Result(ErrorCode::TryAgain);
182 return Result::success();
183 }
184
185 // ====================================================================
186 // Phase 124.A.7 — zero-copy receive (borrow / release)
187 // ====================================================================
188
191 class View {
192 public:
193 View() : sub_(nullptr), buf_(nullptr), len_(0), token_(nullptr) {}
194 View(View&& o) : sub_(o.sub_), buf_(o.buf_), len_(o.len_), token_(o.token_) {
195 o.sub_ = nullptr;
196 o.token_ = nullptr;
197 }
199 if (this != &o) {
200 release();
201 sub_ = o.sub_;
202 buf_ = o.buf_;
203 len_ = o.len_;
204 token_ = o.token_;
205 o.sub_ = nullptr;
206 o.token_ = nullptr;
207 }
208 return *this;
209 }
210 View(const View&) = delete;
211 View& operator=(const View&) = delete;
212 ~View() { release(); }
213
214 const uint8_t* data() const { return buf_; }
215 size_t size() const { return len_; }
216 bool empty() const { return token_ == nullptr; }
217
219 View(void* sub, const uint8_t* buf, size_t len, void* token)
220 : sub_(sub), buf_(buf), len_(len), token_(token) {}
221
222 private:
223 void release() {
224 if (token_ && sub_) {
225 nros_cpp_subscription_release(sub_, token_);
226 token_ = nullptr;
227 }
228 }
229
230 void* sub_;
231 const uint8_t* buf_;
232 size_t len_;
233 void* token_;
234 };
235
240 if (!initialized_) return Expected<View>::error(Result(ErrorCode::NotInitialized));
241 const uint8_t* buf = nullptr;
242 size_t len = 0;
243 void* token = nullptr;
244 int32_t rc = nros_cpp_subscription_borrow(storage_, &buf, &len, &token);
245 if (rc < 0) return Expected<View>::error(Result(rc));
246 if (rc == 0) return Expected<View>::ok(View{});
247 return Expected<View>::ok(View{storage_, buf, len, token});
248 }
249
263 size_t& out_count) {
264 if (!initialized_) {
265 out_count = 0;
267 }
269 storage_, buf, per_msg_cap, max_msgs, out_lens, &out_count);
270 if (ret != 0) return Result(ret);
271 return Result::success();
272 }
273
// Get the topic name. Returns the internal `topic_name_` buffer while the
// subscription is initialized, and the empty string literal otherwise.
275 const char* get_topic_name() const { return initialized_ ? topic_name_ : ""; }
276
285 if (initialized_ && !stream_.is_valid()) {
286 stream_.bind(storage_, &nros_cpp_subscription_try_recv_raw);
287 }
288 return stream_;
289 }
290
// Read-only access to the `stream_` member. This overload returns it as-is
// and performs no binding.
291 const Stream<M>& stream() const { return stream_; }
292
// Check if the subscription is initialized and valid (reports `initialized_`).
294 bool is_valid() const { return initialized_; }
295
303 if (initialized_ && !callback_mode_) {
305 }
306 initialized_ = false;
307 }
308
309 // Move semantics (non-copyable). Relocation goes through the
310 // `nros_cpp_subscription_relocate` runtime call (Phase 84.C1);
311 // the `stream_` is rebound to the new storage afterwards.
312 // A callback-style subscription must NOT be moved after register — the
313 // executor arena holds `this` as the trampoline context (Phase 189.M3.x);
314 // the move only transfers bookkeeping and leaves that pointer stale. The
315 // poll-style relocation path is unchanged.
316 Subscription(Subscription&& other) : initialized_(other.initialized_) {
317 user_fn_ = other.user_fn_;
318 user_fn_ctx_ = other.user_fn_ctx_;
319 user_ctx_ = other.user_ctx_;
320 callback_mode_ = other.callback_mode_;
321 sched_handle_id_ = other.sched_handle_id_;
322 if (other.initialized_ && !other.callback_mode_) {
323 nros_cpp_subscription_relocate(other.storage_, storage_);
324 ::memcpy(topic_name_, other.topic_name_, sizeof(topic_name_));
325 stream_.bind(storage_, &nros_cpp_subscription_try_recv_raw);
326 }
327 other.initialized_ = false;
328 other.stream_ = Stream<M>();
329 }
330
332 if (this != &other) {
333 if (initialized_ && !callback_mode_) {
335 stream_ = Stream<M>();
336 }
337 initialized_ = other.initialized_;
338 user_fn_ = other.user_fn_;
339 user_fn_ctx_ = other.user_fn_ctx_;
340 user_ctx_ = other.user_ctx_;
341 callback_mode_ = other.callback_mode_;
342 sched_handle_id_ = other.sched_handle_id_;
343 if (other.initialized_ && !other.callback_mode_) {
344 nros_cpp_subscription_relocate(other.storage_, storage_);
345 ::memcpy(topic_name_, other.topic_name_, sizeof(topic_name_));
346 stream_.bind(storage_, &nros_cpp_subscription_try_recv_raw);
347 }
348 other.initialized_ = false;
349 other.stream_ = Stream<M>();
350 }
351 return *this;
352 }
353
// Default constructor: value-initializes the storage and topic-name buffers,
// marks the object as not initialized, and default-constructs the stream.
// The remaining members take their in-class default initializers.
356 Subscription() : storage_(), topic_name_{}, initialized_(false), stream_() {}
357
// True when a scheduler handle id has been stored, i.e. `sched_handle_id_`
// differs from the `static_cast<size_t>(-1)` "no handle" default.
368 bool has_sched_handle() const { return sched_handle_id_ != static_cast<size_t>(-1); }
// Stored scheduler handle id; equals `static_cast<size_t>(-1)` when none is set.
369 size_t sched_handle_id() const { return sched_handle_id_; }
370
371 // ====================================================================
372 // Phase 108 — status events
373 // ====================================================================
374
384
387 void* user_context = nullptr) {
388 if (!initialized_) return Result(ErrorCode::NotInitialized);
390 user_context));
391 }
392
398
399 private:
400 Subscription(const Subscription&) = delete;
401 Subscription& operator=(const Subscription&) = delete;
402
403 friend class Node;
404
408 static void message_trampoline(const uint8_t* data, size_t len, void* ctx) {
409 auto* self = static_cast<Subscription*>(ctx);
410 if (self == nullptr) return;
411 M msg;
412 if (M::ffi_deserialize(data, len, &msg) != 0) return;
413 if (self->user_fn_ != nullptr) {
414 self->user_fn_(msg);
415 } else if (self->user_fn_ctx_ != nullptr) {
416 self->user_fn_ctx_(msg, self->user_ctx_);
417 }
418 }
419
423 static void message_info_trampoline(const uint8_t* data, size_t len, const uint8_t* attachment,
424 size_t attachment_len, void* ctx) {
425 auto* self = static_cast<Subscription*>(ctx);
426 if (self == nullptr) return;
427 M msg;
428 if (M::ffi_deserialize(data, len, &msg) != 0) return;
429 if (self->user_fn_info_ != nullptr) {
430 self->user_fn_info_(msg, attachment, attachment_len);
431 }
432 }
433
434 alignas(8) uint8_t storage_[NROS_SUBSCRIBER_SIZE];
435 char topic_name_[SUBSCRIPTION_TOPIC_NAME_MAX];
436 bool initialized_;
437 Stream<M> stream_;
438 // Phase 189.M3.1 — executor HandleId for sched-context binding, or
439 // SIZE_MAX (the default) when no bindable handle exists. The poll-style
440 // thin-wrapper create path leaves this unset (see has_sched_handle());
441 // the callback-style create (Phase 189.M3.x) stores the real arena handle.
442 size_t sched_handle_id_ = static_cast<size_t>(-1);
443 // Callback-style state (Phase 189.M3.x); unused in poll mode. The executor
444 // arena owns the subscriber + dispatches `message_trampoline` during spin.
445 TypedSubscriptionFn user_fn_ = nullptr;
446 TypedSubscriptionFnWithCtx user_fn_ctx_ = nullptr;
447 TypedSubscriptionInfoFn user_fn_info_ = nullptr;
448 void* user_ctx_ = nullptr;
449 bool callback_mode_ = false;
450};
451
452} // namespace nros
453
454// Phase 84.G8: out-of-line definition of Node::create_subscription<M>().
455#include "nros/node.hpp"
456
457namespace nros {
458
459template <typename M>
461 if (!initialized_) return Result(ErrorCode::NotInitialized);
463 ffi_qos.reliability = static_cast<nros_cpp_qos_reliability_t>(qos.reliability_raw());
464 ffi_qos.durability = static_cast<nros_cpp_qos_durability_t>(qos.durability_raw());
465 ffi_qos.history = static_cast<nros_cpp_qos_history_t>(qos.history_raw());
466 ffi_qos.liveliness_kind = static_cast<nros_cpp_qos_liveliness_t>(qos.liveliness_raw());
467 ffi_qos.depth = qos.depth();
468 ffi_qos.deadline_ms = qos.deadline_ms();
469 ffi_qos.lifespan_ms = qos.lifespan_ms();
470 ffi_qos.liveliness_lease_ms = qos.liveliness_lease_ms();
471 ffi_qos.avoid_ros_namespace_conventions = qos.avoid_ros_namespace_conventions() ? 1 : 0;
472 nros_cpp_ret_t ret = nros_cpp_subscription_create(&handle_, topic, M::TYPE_NAME, M::TYPE_HASH,
473 ffi_qos, out.storage_);
474 if (ret == 0) {
475 // Phase 87.6: topic name lives C++-side now.
476 size_t topic_len = 0;
477 while (topic[topic_len] != '\0' && topic_len + 1 < sizeof(out.topic_name_)) {
478 out.topic_name_[topic_len] = topic[topic_len];
479 ++topic_len;
480 }
481 out.topic_name_[topic_len] = '\0';
482 out.initialized_ = true;
483 }
484 return Result(ret);
485}
486
499template <typename M>
503 if (!r.ok()) return r;
504
505 // TODO(M3.4): honour options.message_info via the with-info arena path.
506
507 if (options.sched_context != SCHED_CONTEXT_UNSET && out.has_sched_handle()) {
509 executor_handle_, out.sched_handle_id(), static_cast<uint8_t>(options.sched_context));
510 if (bind != 0) {
511 // Roll back so the caller doesn't observe a half-configured
512 // entity. Destructor-on-out would also fire, but explicit
513 // teardown keeps the returned error authoritative.
515 out.initialized_ = false;
516 return Result(bind);
517 }
518 }
519 return Result::success();
520}
521
522// Phase 189.M3.x — callback-style (arena-registered) subscription. The arena
523// owns the subscriber + dispatches `out`'s message handler during spin_once, so
524// the handle is real and `options.sched_context` is functional. Mirrors the
525// callback-style `create_service` one entity over.
526template <typename M, typename F, typename>
528 const QoS& qos, const SubscriptionOptions& options) {
529 if (!initialized_) return Result(ErrorCode::NotInitialized);
531 ffi_qos.reliability = static_cast<nros_cpp_qos_reliability_t>(qos.reliability_raw());
532 ffi_qos.durability = static_cast<nros_cpp_qos_durability_t>(qos.durability_raw());
533 ffi_qos.history = static_cast<nros_cpp_qos_history_t>(qos.history_raw());
534 ffi_qos.liveliness_kind = static_cast<nros_cpp_qos_liveliness_t>(qos.liveliness_raw());
535 ffi_qos.depth = qos.depth();
536 ffi_qos.deadline_ms = qos.deadline_ms();
537 ffi_qos.lifespan_ms = qos.lifespan_ms();
538 ffi_qos.liveliness_lease_ms = qos.liveliness_lease_ms();
539 ffi_qos.avoid_ros_namespace_conventions = qos.avoid_ros_namespace_conventions() ? 1 : 0;
540
541 // Store the user handler (compile error if F isn't convertible to the
542 // plain-fn-ptr handler type).
544 out.user_fn_ctx_ = nullptr;
545 out.user_ctx_ = nullptr;
546
547 uint8_t sched = (options.sched_context == SCHED_CONTEXT_UNSET)
548 ? 0u
549 : static_cast<uint8_t>(options.sched_context);
550 size_t handle = static_cast<size_t>(-1);
552 nros_cpp_subscription_register(&handle_, topic, M::TYPE_NAME, M::TYPE_HASH, ffi_qos,
553 &Subscription<M>::message_trampoline, &out, sched, &handle);
554 if (ret == 0) {
555 out.sched_handle_id_ = handle;
556 out.callback_mode_ = true;
557 out.initialized_ = true;
558 }
559 return Result(ret);
560}
561
562// Phase 189.M3.4 — callback-style subscription that delivers the wire attachment.
563// Mirrors the callback `create_subscription` one step over, but stores the
564// `(const M&, attachment, att_len)` handler + registers via the with-info arena
565// path so the trampoline receives the attachment.
566template <typename M, typename F, typename>
568 const QoS& qos, const SubscriptionOptions& options) {
569 if (!initialized_) return Result(ErrorCode::NotInitialized);
571 ffi_qos.reliability = static_cast<nros_cpp_qos_reliability_t>(qos.reliability_raw());
572 ffi_qos.durability = static_cast<nros_cpp_qos_durability_t>(qos.durability_raw());
573 ffi_qos.history = static_cast<nros_cpp_qos_history_t>(qos.history_raw());
574 ffi_qos.liveliness_kind = static_cast<nros_cpp_qos_liveliness_t>(qos.liveliness_raw());
575 ffi_qos.depth = qos.depth();
576 ffi_qos.deadline_ms = qos.deadline_ms();
577 ffi_qos.lifespan_ms = qos.lifespan_ms();
578 ffi_qos.liveliness_lease_ms = qos.liveliness_lease_ms();
579 ffi_qos.avoid_ros_namespace_conventions = qos.avoid_ros_namespace_conventions() ? 1 : 0;
580
581 out.user_fn_info_ = typename Subscription<M>::TypedSubscriptionInfoFn(callback);
582 out.user_fn_ = nullptr;
583 out.user_fn_ctx_ = nullptr;
584 out.user_ctx_ = nullptr;
585
586 uint8_t sched = (options.sched_context == SCHED_CONTEXT_UNSET)
587 ? 0u
588 : static_cast<uint8_t>(options.sched_context);
589 size_t handle = static_cast<size_t>(-1);
591 &handle_, topic, M::TYPE_NAME, M::TYPE_HASH, ffi_qos,
593 if (ret == 0) {
594 out.sched_handle_id_ = handle;
595 out.callback_mode_ = true;
596 out.initialized_ = true;
597 }
598 return Result(ret);
599}
600
604template <typename M>
605inline Expected<Subscription<M>> make_subscription(Node& node, const char* topic,
606 const QoS& qos = QoS::default_profile()) {
608 Result r = node.create_subscription<M>(s, topic, qos);
609 if (!r.ok()) return Expected<Subscription<M>>::error(r);
610 return Expected<Subscription<M>>::ok(std::move(s));
611}
612
613} // namespace nros
614
615#endif // NROS_CPP_SUBSCRIPTION_HPP
Definition result.hpp:160
ErrorCode error() const
Definition result.hpp:183
bool ok() const
Definition result.hpp:176
Definition future.hpp:40
Definition node.hpp:158
Result create_subscription(Subscription< M > &out, const char *topic, const QoS &qos=QoS::default_profile())
Definition subscription.hpp:460
Result create_subscription_with_info(Subscription< M > &out, const char *topic, F callback, const QoS &qos=QoS::default_profile(), const SubscriptionOptions &options={})
Definition subscription.hpp:567
Definition qos.hpp:67
static constexpr QoS default_profile()
Default profile: RELIABLE + VOLATILE + KEEP_LAST(10).
Definition qos.hpp:160
Definition result.hpp:52
static constexpr Result success()
Named constructors.
Definition result.hpp:74
bool is_valid() const
Check if the stream is connected to a valid source.
Definition stream.hpp:105
Definition subscription.hpp:191
View & operator=(View &&o)
Definition subscription.hpp:198
View(const View &)=delete
View(View &&o)
Definition subscription.hpp:194
View()
Definition subscription.hpp:193
const uint8_t * data() const
Definition subscription.hpp:214
bool empty() const
Definition subscription.hpp:216
size_t size() const
Definition subscription.hpp:215
View(void *sub, const uint8_t *buf, size_t len, void *token)
Internal constructor — callers use Subscription::try_borrow().
Definition subscription.hpp:219
View & operator=(const View &)=delete
~View()
Definition subscription.hpp:212
Definition subscription.hpp:76
void(*)(const M &msg, const uint8_t *attachment, size_t attachment_len) TypedSubscriptionInfoFn
Definition subscription.hpp:85
Result on_liveliness_changed(nros_cpp_liveliness_changed_cb_t cb, void *user_context=nullptr)
Definition subscription.hpp:379
const char * get_topic_name() const
Get the topic name.
Definition subscription.hpp:275
Result on_requested_deadline_missed(uint32_t deadline_ms, nros_cpp_subscriber_count_cb_t cb, void *user_context=nullptr)
Register a callback for requested-deadline-missed events.
Definition subscription.hpp:386
Result try_recv_sequence(uint8_t *buf, size_t per_msg_cap, size_t max_msgs, size_t *out_lens, size_t &out_count)
Definition subscription.hpp:262
Subscription(Subscription &&other)
Definition subscription.hpp:316
Subscription()
Definition subscription.hpp:356
~Subscription()
Definition subscription.hpp:302
Expected< View > try_borrow()
Definition subscription.hpp:239
const Stream< M > & stream() const
Definition subscription.hpp:291
Stream< M > & stream()
Definition subscription.hpp:284
size_t sched_handle_id() const
Definition subscription.hpp:369
bool has_sched_handle() const
Definition subscription.hpp:368
Result try_recv(M &msg)
Definition subscription.hpp:97
Result on_message_lost(nros_cpp_subscriber_count_cb_t cb, void *user_context=nullptr)
Register a callback for message-lost events.
Definition subscription.hpp:394
Result try_recv_raw(uint8_t *buf, size_t capacity, size_t &out_len)
Definition subscription.hpp:145
void(*)(const M &msg, void *ctx) TypedSubscriptionFnWithCtx
Definition subscription.hpp:82
bool is_valid() const
Check if the subscription is initialized and valid.
Definition subscription.hpp:294
Subscription & operator=(Subscription &&other)
Definition subscription.hpp:331
Result try_recv_raw_with_attachment(uint8_t *buf, size_t capacity, size_t &out_len, uint8_t *att, size_t att_capacity, size_t &out_att_len)
Definition subscription.hpp:171
void(*)(const M &msg) TypedSubscriptionFn
Definition subscription.hpp:81
Result try_recv_validated(M &msg, nros_cpp_integrity_status_t &status)
Definition subscription.hpp:123
Inline storage-size macros for opaque entity buffers.
int nros_cpp_ret_t
Definition future.hpp:20
Definition nros.hpp:42
bool ok()
Check if the nros session is initialized.
Definition node.hpp:717
static constexpr size_t SUBSCRIPTION_TOPIC_NAME_MAX
Definition subscription.hpp:56
@ Error
Generic failure not covered by a more specific code.
@ TryAgain
Transient — no data ready yet (non-blocking take). Retry later.
nros::Node and global session helpers.
nros_cpp_qos_history_t
Definition qos.hpp:31
nros_cpp_qos_liveliness_t
Definition qos.hpp:35
nros_cpp_qos_durability_t
Definition qos.hpp:27
nros_cpp_qos_reliability_t
Definition qos.hpp:23
nros::Result, nros::ErrorCode, and the NROS_TRY macro.
nros::Stream<T> — multi-shot message receiver.
Definition qos.hpp:41
nros_cpp_ret_t nros_cpp_subscription_register(const nros_cpp_node_t *node, const char *topic, const char *type_name, const char *type_hash, nros_cpp_qos_t qos, nros_cpp_subscription_message_callback_t callback, void *context, uint8_t sched_context, size_t *out_handle_id)
void(* nros_cpp_subscription_message_info_callback_t)(const uint8_t *data, size_t len, const uint8_t *attachment, size_t attachment_len, void *ctx)
Definition subscription.hpp:41
void(* nros_cpp_subscription_message_callback_t)(const uint8_t *data, size_t len, void *ctx)
Definition subscription.hpp:29
nros_cpp_ret_t nros_cpp_subscription_register_with_info(const nros_cpp_node_t *node, const char *topic, const char *type_name, const char *type_hash, nros_cpp_qos_t qos, nros_cpp_subscription_message_info_callback_t callback, void *context, uint8_t sched_context, size_t *out_handle_id)