nros C++ API
Lightweight ROS 2 client for embedded real-time systems (C++ headers)
Loading...
Searching...
No Matches
stream.hpp
Go to the documentation of this file.
1// nros-cpp: Stream<T> -- multi-shot message receiver
2// Freestanding C++ -- no exceptions, no STL required
3
10#ifndef NROS_CPP_STREAM_HPP
11#define NROS_CPP_STREAM_HPP
12
13#include <cstdint>
14#include <cstddef>
15
16#include "nros/result.hpp"
17
18// FFI declarations
19extern "C" {
20typedef int nros_cpp_ret_t;
21nros_cpp_ret_t nros_cpp_spin_once(void* handle, int32_t timeout_ms);
25uint64_t nros_cpp_time_ns(void);
26}
27
28namespace nros {
29
42template <typename T> class Stream {
43 public:
53 if (!try_recv_fn_) return Result(ErrorCode::NotInitialized);
54 uint8_t buf[T::SERIALIZED_SIZE_MAX];
55 size_t len = 0;
56 nros_cpp_ret_t ret = try_recv_fn_(storage_, buf, sizeof(buf), &len);
57 if (ret != 0) return Result(ret);
58 if (len == 0) return Result(ErrorCode::TryAgain);
59 if (T::ffi_deserialize(buf, len, &out) != 0) return Result(ErrorCode::Error);
60 return Result::success();
61 }
62
73 Result wait_next(void* executor_handle, uint32_t timeout_ms, T& out, uint32_t poll_ms = 10) {
74 if (!try_recv_fn_) return Result(ErrorCode::NotInitialized);
75 if (poll_ms == 0) poll_ms = 1;
76 // Phase 118.C: budget by wall-clock. Accumulating `step` per
77 // iteration breaks when `zpico_spin_once` returns early on a
78 // signaled condvar (keep-alives, discovery gossip) — the
79 // iteration loop collapses into milliseconds and returns
80 // Timeout before the message has a chance to land. Mirrors the
81 // Phase 89.2 fix on Future::wait().
83 const uint64_t budget_ns = static_cast<uint64_t>(timeout_ms) * 1000000ULL;
84 while (true) {
85 nros_cpp_ret_t ret = nros_cpp_spin_once(executor_handle, static_cast<int32_t>(poll_ms));
86 // Transient conditions: keep polling. Anything else propagates.
87 if (ret != 0 && ret != static_cast<nros_cpp_ret_t>(ErrorCode::Timeout) &&
88 ret != static_cast<nros_cpp_ret_t>(ErrorCode::TryAgain)) {
89 return Result(ret);
90 }
92 if (rn.ok()) return Result::success();
93 // TryAgain / NotInitialized / Error from try_next: keep polling
94 // unless we've hit a hard error that's not "no data yet".
95 if (rn.code() != ErrorCode::TryAgain && rn.code() != ErrorCode::NotInitialized) {
96 return rn;
97 }
99 if (now_ns - start_ns >= budget_ns) break;
100 }
102 }
103
105 bool is_valid() const { return try_recv_fn_ != nullptr; }
106
107 // Move semantics (non-copyable)
108 Stream(Stream&& other) noexcept : storage_(other.storage_), try_recv_fn_(other.try_recv_fn_) {
109 other.storage_ = nullptr;
110 other.try_recv_fn_ = nullptr;
111 }
112
114 if (this != &other) {
115 storage_ = other.storage_;
116 try_recv_fn_ = other.try_recv_fn_;
117 other.storage_ = nullptr;
118 other.try_recv_fn_ = nullptr;
119 }
120 return *this;
121 }
122
124 Stream() : storage_(nullptr), try_recv_fn_(nullptr) {}
125
126 private:
127 Stream(const Stream&) = delete;
128 Stream& operator=(const Stream&) = delete;
129
130 template <typename M> friend class Subscription;
131 template <typename A> friend class ActionClient;
132
133 using TryRecvFn = nros_cpp_ret_t (*)(void*, uint8_t*, size_t, size_t*);
134
135 Stream(void* storage, TryRecvFn fn) : storage_(storage), try_recv_fn_(fn) {}
136
137 void bind(void* storage, TryRecvFn fn) {
138 storage_ = storage;
139 try_recv_fn_ = fn;
140 }
141
142 void* storage_;
143 TryRecvFn try_recv_fn_;
144};
145
146} // namespace nros
147#endif // NROS_CPP_STREAM_HPP
Definition action_client.hpp:67
Definition future.hpp:40
Definition result.hpp:52
static constexpr Result success()
Named constructors.
Definition result.hpp:74
Definition stream.hpp:42
Result wait_next(void *executor_handle, uint32_t timeout_ms, T &out, uint32_t poll_ms=10)
Definition stream.hpp:73
bool is_valid() const
Check if the stream is connected to a valid source.
Definition stream.hpp:105
Stream(Stream &&other) noexcept
Definition stream.hpp:108
Stream & operator=(Stream &&other) noexcept
Definition stream.hpp:113
Stream()
Default constructor – creates an unbound stream.
Definition stream.hpp:124
Result try_next(T &out)
Definition stream.hpp:52
Definition subscription.hpp:76
int nros_cpp_ret_t
Definition future.hpp:20
nros_cpp_ret_t nros_cpp_spin_once(void *handle, int32_t timeout_ms)
uint64_t nros_cpp_time_ns(void)
Definition nros.hpp:42
@ Error
Generic failure not covered by a more specific code.
@ Timeout
Operation deadline elapsed before completion.
@ TryAgain
Transient — no data ready yet (non-blocking take). Retry later.
nros::Result, nros::ErrorCode, and the NROS_TRY macro.
int nros_cpp_ret_t
Definition stream.hpp:20
nros_cpp_ret_t nros_cpp_spin_once(void *handle, int32_t timeout_ms)
uint64_t nros_cpp_time_ns(void)