Skip to main content

nros_node/
node.rs

1//! Node implementation
2
3use heapless::Vec;
4use nros_core::RosMessage;
5use nros_rmw::{QosSettings, TopicInfo};
6
7use crate::{publisher::PublisherHandle, subscriber::SubscriberHandle};
8
/// Configuration used to construct a node.
///
/// The name and namespace are borrowed for `'a`. The configuration derives
/// `PartialEq`/`Eq` so two configurations can be compared directly (for
/// example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeConfig<'a> {
    /// Node name
    pub name: &'a str,
    /// Node namespace
    pub namespace: &'a str,
    /// Domain ID (default: 0)
    pub domain_id: u32,
}

impl<'a> NodeConfig<'a> {
    /// Create a new node configuration with the given name and namespace.
    ///
    /// The domain ID starts at `0`; use [`NodeConfig::with_domain`] to change it.
    #[must_use]
    pub const fn new(name: &'a str, namespace: &'a str) -> Self {
        Self {
            name,
            namespace,
            domain_id: 0,
        }
    }

    /// Return this configuration with its domain ID replaced by `domain_id`.
    ///
    /// The method consumes `self` and hands back the updated value, so the
    /// result must be used — dropping it discards the change.
    #[must_use]
    pub const fn with_domain(mut self, domain_id: u32) -> Self {
        self.domain_id = domain_id;
        self
    }
}

impl Default for NodeConfig<'_> {
    /// Default configuration: name `"nros_node"`, namespace `"/"`, domain `0`.
    fn default() -> Self {
        Self::new("nros_node", "/")
    }
}
42
/// Errors reported by node operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeError {
    /// The node already holds its maximum number of publishers.
    MaxPublishersReached,
    /// The node already holds its maximum number of subscribers.
    MaxSubscribersReached,
    /// A publisher handle was not valid.
    InvalidPublisherHandle,
    /// A subscriber handle was not valid.
    InvalidSubscriberHandle,
    /// A message could not be serialized.
    SerializationFailed,
    /// A message could not be deserialized.
    DeserializationFailed,
    /// A buffer was too small for the requested operation.
    BufferTooSmall,
    /// The transport reported an error.
    TransportError,
    /// The node is not connected.
    NotConnected,
    /// A topic/endpoint name did not fit the bounded string capacity.
    ///
    /// Phase 192.1 — reported as an error rather than silently truncating,
    /// because a truncated name would corrupt the wire key.
    TopicNameTooLong,
    /// A node name did not fit the bounded string capacity.
    NameTooLong,
    /// A node namespace did not fit the bounded string capacity.
    NamespaceTooLong,
}
72
/// Default length, in bytes, of a `Node`'s serialization (tx) buffer.
///
/// Phase 192.5 — gives a name to the previously inline `1024`, which was
/// duplicated at the field declaration and in the `Node::new` initializer.
pub const NODE_TX_BUF_LEN: usize = 1024;
/// Default length, in bytes, of a `Node`'s reception (rx) buffer.
pub const NODE_RX_BUF_LEN: usize = 1024;
79
80/// Publisher registration info
81#[derive(Debug, Clone)]
82#[allow(dead_code)] // Fields used when transport is connected
83struct PublisherInfo {
84    /// Topic name
85    topic_name: heapless::String<64>,
86    /// Type name
87    type_name: &'static str,
88    /// Type hash
89    type_hash: &'static str,
90    /// QoS settings
91    qos: QosSettings,
92    /// Active flag
93    active: bool,
94}
95
96/// Subscriber registration info
97#[derive(Debug, Clone)]
98#[allow(dead_code)] // Fields used when transport is connected
99struct SubscriberInfo {
100    /// Topic name
101    topic_name: heapless::String<64>,
102    /// Type name
103    type_name: &'static str,
104    /// Type hash
105    type_hash: &'static str,
106    /// QoS settings
107    qos: QosSettings,
108    /// Active flag
109    active: bool,
110}
111
112/// ROS 2 Node for embedded systems
113///
114/// The node manages publishers and subscribers with static allocation.
115/// `MAX_PUBS` and `MAX_SUBS` define the maximum number of publishers
116/// and subscribers that can be created.
117///
118/// # Type Parameters
119///
120/// - `MAX_PUBS`: Maximum number of publishers
121/// - `MAX_SUBS`: Maximum number of subscribers
122pub struct Node<const MAX_PUBS: usize = 8, const MAX_SUBS: usize = 8> {
123    /// Node name
124    name: heapless::String<64>,
125    /// Node namespace
126    namespace: heapless::String<64>,
127    /// Domain ID
128    domain_id: u32,
129    /// Registered publishers
130    publishers: Vec<PublisherInfo, MAX_PUBS>,
131    /// Registered subscribers
132    subscribers: Vec<SubscriberInfo, MAX_SUBS>,
133    /// Serialization buffer for publishing
134    tx_buffer: [u8; NODE_TX_BUF_LEN],
135    /// Reception buffer for subscribing
136    #[allow(dead_code)] // Used when transport is connected
137    rx_buffer: [u8; NODE_RX_BUF_LEN],
138}
139
140/// Options for creating a publisher
141#[derive(Debug, Clone)]
142pub struct PublisherOptions<'a> {
143    /// Topic name
144    pub topic: &'a str,
145    /// QoS settings
146    pub qos: QosSettings,
147}
148
149impl<'a> PublisherOptions<'a> {
150    /// Create new publisher options with the given topic and default QoS
151    pub fn new(topic: &'a str) -> Self {
152        Self {
153            topic,
154            qos: QosSettings::default(),
155        }
156    }
157
158    /// Set the QoS settings
159    pub fn qos(mut self, qos: QosSettings) -> Self {
160        self.qos = qos;
161        self
162    }
163}
164
165/// Options for creating a subscriber
166#[derive(Debug, Clone)]
167pub struct SubscriberOptions<'a> {
168    /// Topic name
169    pub topic: &'a str,
170    /// QoS settings
171    pub qos: QosSettings,
172}
173
174impl<'a> SubscriberOptions<'a> {
175    /// Create new subscriber options with the given topic and default QoS
176    pub fn new(topic: &'a str) -> Self {
177        Self {
178            topic,
179            qos: QosSettings::default(),
180        }
181    }
182
183    /// Set the QoS settings
184    pub fn qos(mut self, qos: QosSettings) -> Self {
185        self.qos = qos;
186        self
187    }
188}
189
190impl<const MAX_PUBS: usize, const MAX_SUBS: usize> Node<MAX_PUBS, MAX_SUBS> {
191    /// Create a new node with the given configuration
192    pub fn new(config: NodeConfig) -> Result<Self, NodeError> {
193        // Phase 192.1 — validate the name/namespace fit the bounded buffers
194        // instead of silently truncating (a truncated node identity mis-routes).
195        let mut name = heapless::String::new();
196        name.push_str(config.name)
197            .map_err(|_| NodeError::NameTooLong)?;
198
199        let mut namespace = heapless::String::new();
200        namespace
201            .push_str(config.namespace)
202            .map_err(|_| NodeError::NamespaceTooLong)?;
203
204        Ok(Self {
205            name,
206            namespace,
207            domain_id: config.domain_id,
208            publishers: Vec::new(),
209            subscribers: Vec::new(),
210            tx_buffer: [0u8; NODE_TX_BUF_LEN],
211            rx_buffer: [0u8; NODE_RX_BUF_LEN],
212        })
213    }
214
215    /// Get the node name
216    pub fn name(&self) -> &str {
217        &self.name
218    }
219
220    /// Get the node namespace
221    pub fn namespace(&self) -> &str {
222        &self.namespace
223    }
224
225    /// Get the domain ID
226    pub fn domain_id(&self) -> u32 {
227        self.domain_id
228    }
229
230    /// Get the fully qualified node name.
231    ///
232    /// Phase 192.1 — fallible: namespace (≤64) + `/` + name (≤64) can reach 129
233    /// bytes, one over the `String<128>` capacity, so a maxed name+namespace
234    /// would otherwise silently truncate the FQN.
235    pub fn fully_qualified_name(&self) -> Result<heapless::String<128>, NodeError> {
236        let mut fqn = heapless::String::new();
237        fqn.push_str(&self.namespace)
238            .map_err(|_| NodeError::NamespaceTooLong)?;
239        if !self.namespace.ends_with('/') {
240            fqn.push('/').map_err(|_| NodeError::NamespaceTooLong)?;
241        }
242        fqn.push_str(&self.name)
243            .map_err(|_| NodeError::NameTooLong)?;
244        Ok(fqn)
245    }
246
247    /// Create a publisher with the given options
248    pub fn create_publisher<M: RosMessage>(
249        &mut self,
250        options: PublisherOptions,
251    ) -> Result<PublisherHandle<M>, NodeError> {
252        if self.publishers.len() >= MAX_PUBS {
253            return Err(NodeError::MaxPublishersReached);
254        }
255
256        let mut topic_name = heapless::String::new();
257        // Phase 192.1 — error on overflow instead of silently truncating the
258        // topic name (a truncated key expression mis-routes on the wire).
259        topic_name
260            .push_str(options.topic)
261            .map_err(|_| NodeError::TopicNameTooLong)?;
262
263        let info = PublisherInfo {
264            topic_name,
265            type_name: M::TYPE_NAME,
266            type_hash: M::TYPE_HASH,
267            qos: options.qos,
268            active: true,
269        };
270
271        let index = self.publishers.len();
272        self.publishers
273            .push(info)
274            .map_err(|_| NodeError::MaxPublishersReached)?;
275
276        Ok(PublisherHandle::new(index))
277    }
278
279    /// Create a subscriber with the given options
280    pub fn create_subscriber<M: RosMessage>(
281        &mut self,
282        options: SubscriberOptions,
283    ) -> Result<SubscriberHandle<M>, NodeError> {
284        if self.subscribers.len() >= MAX_SUBS {
285            return Err(NodeError::MaxSubscribersReached);
286        }
287
288        let mut topic_name = heapless::String::new();
289        // Phase 192.1 — error on overflow instead of silently truncating.
290        topic_name
291            .push_str(options.topic)
292            .map_err(|_| NodeError::TopicNameTooLong)?;
293
294        let info = SubscriberInfo {
295            topic_name,
296            type_name: M::TYPE_NAME,
297            type_hash: M::TYPE_HASH,
298            qos: options.qos,
299            active: true,
300        };
301
302        let index = self.subscribers.len();
303        self.subscribers
304            .push(info)
305            .map_err(|_| NodeError::MaxSubscribersReached)?;
306
307        Ok(SubscriberHandle::new(index))
308    }
309
310    /// Get topic info for a publisher
311    pub fn publisher_topic_info(&self, handle: PublisherHandle<()>) -> Option<TopicInfo<'_>> {
312        self.publishers.get(handle.index()).map(|info| {
313            TopicInfo::new(&info.topic_name, info.type_name, info.type_hash)
314                .with_domain(self.domain_id)
315        })
316    }
317
318    /// Get topic info for a subscriber
319    pub fn subscriber_topic_info(&self, handle: SubscriberHandle<()>) -> Option<TopicInfo<'_>> {
320        self.subscribers.get(handle.index()).map(|info| {
321            TopicInfo::new(&info.topic_name, info.type_name, info.type_hash)
322                .with_domain(self.domain_id)
323        })
324    }
325
326    /// Serialize a message for publishing
327    ///
328    /// Returns the serialized bytes. The caller is responsible for
329    /// sending the bytes via the transport layer.
330    pub fn serialize_message<M: RosMessage>(
331        &mut self,
332        _handle: &PublisherHandle<M>,
333        msg: &M,
334    ) -> Result<&[u8], NodeError> {
335        use nros_core::CdrWriter;
336
337        let mut writer = CdrWriter::new_with_header(&mut self.tx_buffer)
338            .map_err(|_| NodeError::BufferTooSmall)?;
339        msg.serialize(&mut writer)
340            .map_err(|_| NodeError::SerializationFailed)?;
341        let len = writer.position();
342
343        Ok(&self.tx_buffer[..len])
344    }
345
346    /// Deserialize a received message
347    ///
348    /// The caller provides the raw bytes received from the transport layer.
349    pub fn deserialize_message<M: RosMessage>(
350        &self,
351        _handle: &SubscriberHandle<M>,
352        data: &[u8],
353    ) -> Result<M, NodeError> {
354        use nros_core::CdrReader;
355
356        let mut reader =
357            CdrReader::new_with_header(data).map_err(|_| NodeError::DeserializationFailed)?;
358        M::deserialize(&mut reader).map_err(|_| NodeError::DeserializationFailed)
359    }
360
361    /// Get the number of active publishers
362    pub fn publisher_count(&self) -> usize {
363        self.publishers.iter().filter(|p| p.active).count()
364    }
365
366    /// Get the number of active subscribers
367    pub fn subscriber_count(&self) -> usize {
368        self.subscribers.iter().filter(|s| s.active).count()
369    }
370}
371
372impl<const MAX_PUBS: usize, const MAX_SUBS: usize> Default for Node<MAX_PUBS, MAX_SUBS> {
373    fn default() -> Self {
374        // The default config name ("nros_node") + namespace ("/") are short
375        // literals that always fit the bounded buffers, so `new` can't fail here.
376        Self::new(NodeConfig::default())
377            .expect("default NodeConfig fits the bounded name/namespace")
378    }
379}
380
#[cfg(test)]
mod tests {
    use super::*;

    // Mock message type for testing
    #[derive(Debug, Clone, Default)]
    struct TestMessage {
        data: i32,
    }

    impl RosMessage for TestMessage {
        const TYPE_NAME: &'static str = "test_msgs::msg::TestMessage";
        const TYPE_HASH: &'static str = "abc123";
    }

    impl nros_core::Serialize for TestMessage {
        fn serialize(&self, writer: &mut nros_core::CdrWriter) -> Result<(), nros_core::SerError> {
            self.data.serialize(writer)
        }
    }

    impl nros_core::Deserialize for TestMessage {
        fn deserialize(reader: &mut nros_core::CdrReader) -> Result<Self, nros_core::DeserError> {
            Ok(Self {
                data: i32::deserialize(reader)?,
            })
        }
    }

    #[test]
    fn test_node_creation() {
        let config = NodeConfig::new("test_node", "/test");
        let node = Node::<4, 4>::new(config).unwrap();

        assert_eq!(node.name(), "test_node");
        assert_eq!(node.namespace(), "/test");
        assert_eq!(node.domain_id(), 0);
    }

    #[test]
    fn test_node_creation_with_domain() {
        let config = NodeConfig::new("test_node", "/test").with_domain(42);
        let node = Node::<4, 4>::new(config).unwrap();

        assert_eq!(node.domain_id(), 42);
    }

    // Phase 192.1 — an over-long name/namespace must be rejected, not truncated.
    #[test]
    fn test_node_name_and_namespace_too_long() {
        // 65 bytes: one more than the 64-byte bounded strings can hold.
        let bytes = [b'a'; 65];
        let too_long = core::str::from_utf8(&bytes).unwrap();

        assert!(matches!(
            Node::<4, 4>::new(NodeConfig::new(too_long, "/")),
            Err(NodeError::NameTooLong)
        ));
        assert!(matches!(
            Node::<4, 4>::new(NodeConfig::new("node", too_long)),
            Err(NodeError::NamespaceTooLong)
        ));
    }

    #[test]
    fn test_fully_qualified_name() {
        let config = NodeConfig::new("my_node", "/my_ns");
        let node = Node::<4, 4>::new(config).unwrap();

        assert_eq!(
            node.fully_qualified_name().unwrap().as_str(),
            "/my_ns/my_node"
        );
    }

    // A namespace that already ends with `/` must not get a second separator.
    #[test]
    fn test_fully_qualified_name_root_namespace() {
        let node = Node::<4, 4>::default();

        assert_eq!(node.fully_qualified_name().unwrap().as_str(), "/nros_node");
    }

    // 64-byte namespace + `/` + 64-byte name is 129 bytes, one over `String<128>`.
    #[test]
    fn test_fully_qualified_name_overflow() {
        let bytes = [b'a'; 64];
        let max_len = core::str::from_utf8(&bytes).unwrap();
        let node = Node::<4, 4>::new(NodeConfig::new(max_len, max_len)).unwrap();

        assert_eq!(node.fully_qualified_name(), Err(NodeError::NameTooLong));
    }

    #[test]
    fn test_create_publisher() {
        let mut node = Node::<4, 4>::default();
        let handle = node.create_publisher::<TestMessage>(PublisherOptions::new("/test_topic"));

        assert!(handle.is_ok());
        assert_eq!(node.publisher_count(), 1);
    }

    #[test]
    fn test_create_subscriber() {
        let mut node = Node::<4, 4>::default();
        let handle = node.create_subscriber::<TestMessage>(SubscriberOptions::new("/test_topic"));

        assert!(handle.is_ok());
        assert_eq!(node.subscriber_count(), 1);
    }

    // Phase 192.1 — an over-long topic must be rejected and leave nothing registered.
    #[test]
    fn test_topic_name_too_long() {
        let bytes = [b't'; 65];
        let too_long = core::str::from_utf8(&bytes).unwrap();
        let mut node = Node::<4, 4>::default();

        let pub_result = node.create_publisher::<TestMessage>(PublisherOptions::new(too_long));
        assert!(matches!(pub_result, Err(NodeError::TopicNameTooLong)));
        assert_eq!(node.publisher_count(), 0);

        let sub_result = node.create_subscriber::<TestMessage>(SubscriberOptions::new(too_long));
        assert!(matches!(sub_result, Err(NodeError::TopicNameTooLong)));
        assert_eq!(node.subscriber_count(), 0);
    }

    #[test]
    fn test_max_publishers() {
        let mut node = Node::<2, 2>::default();

        // Unwrap the setup calls so a failure here is not mistaken for the
        // capacity error under test.
        node.create_publisher::<TestMessage>(PublisherOptions::new("/topic1"))
            .unwrap();
        node.create_publisher::<TestMessage>(PublisherOptions::new("/topic2"))
            .unwrap();
        let result = node.create_publisher::<TestMessage>(PublisherOptions::new("/topic3"));

        assert_eq!(result, Err(NodeError::MaxPublishersReached));
        assert_eq!(node.publisher_count(), 2);
    }

    #[test]
    fn test_max_subscribers() {
        let mut node = Node::<2, 2>::default();

        node.create_subscriber::<TestMessage>(SubscriberOptions::new("/topic1"))
            .unwrap();
        node.create_subscriber::<TestMessage>(SubscriberOptions::new("/topic2"))
            .unwrap();
        let result = node.create_subscriber::<TestMessage>(SubscriberOptions::new("/topic3"));

        assert!(matches!(result, Err(NodeError::MaxSubscribersReached)));
        assert_eq!(node.subscriber_count(), 2);
    }

    #[test]
    fn test_serialize_deserialize() {
        let mut node = Node::<4, 4>::default();
        let pub_handle = node
            .create_publisher::<TestMessage>(PublisherOptions::new("/test"))
            .unwrap();
        let sub_handle = node
            .create_subscriber::<TestMessage>(SubscriberOptions::new("/test"))
            .unwrap();

        let msg = TestMessage { data: 42 };

        // Copy bytes since serialize_message borrows the internal buffer
        let mut buf = [0u8; 128];
        let bytes = node.serialize_message(&pub_handle, &msg).unwrap();
        let len = bytes.len();
        buf[..len].copy_from_slice(bytes);

        let received: TestMessage = node.deserialize_message(&sub_handle, &buf[..len]).unwrap();
        assert_eq!(received.data, 42);
    }
}