1use core::marker::PhantomData;
4
5use nros_core::{RosAction, RosMessage, RosService};
6use nros_rmw::{ActionInfo, QosSettings, ServiceInfo, Session as _, TopicInfo, TransportError};
7
8use crate::{
9 cyclonedds_register::{MessageForRmw, register_type},
10 session,
11};
12
13use super::{
14 handles::{
15 ActionClient, ActionClientCallback, ActionServer, EmbeddedPublisher, EmbeddedServiceClient,
16 EmbeddedServiceServer, ServiceClientCallback, Subscription,
17 },
18 types::NodeError,
19};
20
21pub struct NodeHandle<'a> {
27 name: heapless::String<64>,
28 namespace: heapless::String<64>,
29 session: &'a mut session::ConcreteSession,
30 domain_id: u32,
31 qos_overrides: &'static [nros_rmw::QosOverride],
38}
39
40impl<'a> NodeHandle<'a> {
41 pub(crate) fn new(
43 name: heapless::String<64>,
44 namespace: heapless::String<64>,
45 session: &'a mut session::ConcreteSession,
46 domain_id: u32,
47 ) -> Self {
48 Self {
49 name,
50 namespace,
51 session,
52 domain_id,
53 qos_overrides: &[],
54 }
55 }
56
57 pub fn set_qos_overrides(&mut self, overrides: &'static [nros_rmw::QosOverride]) {
65 self.qos_overrides = overrides;
66 }
67
68 #[must_use]
70 pub fn qos_overrides(&self) -> &'static [nros_rmw::QosOverride] {
71 self.qos_overrides
72 }
73
74 pub fn name(&self) -> &str {
76 &self.name
77 }
78
79 #[must_use]
102 pub fn logger(&self) -> &'static nros_log::Logger {
103 nros_log::get_logger(self.name())
104 }
105
106 pub fn domain_id(&self) -> u32 {
108 self.domain_id
109 }
110
111 pub fn set_domain_id(&mut self, domain_id: u32) {
113 self.domain_id = domain_id;
114 }
115
116 pub fn session_mut(&mut self) -> &mut session::ConcreteSession {
118 self.session
119 }
120
121 fn topic_info<'b>(
141 domain_id: u32,
142 node_name: &'b str,
143 namespace: &'b str,
144 topic_name: &'b str,
145 type_name: &'b str,
146 type_hash: &'b str,
147 ) -> TopicInfo<'b> {
148 TopicInfo::new(topic_name, type_name, type_hash)
149 .with_domain(domain_id)
150 .with_node_name(node_name)
151 .with_namespace(namespace)
152 }
153
154 fn service_info<'b>(
155 domain_id: u32,
156 node_name: &'b str,
157 namespace: &'b str,
158 service_name: &'b str,
159 type_name: &'b str,
160 type_hash: &'b str,
161 ) -> ServiceInfo<'b> {
162 ServiceInfo::new(service_name, type_name, type_hash)
163 .with_domain(domain_id)
164 .with_node_name(node_name)
165 .with_namespace(namespace)
166 }
167
168 fn action_info<'b>(
169 domain_id: u32,
170 action_name: &'b str,
171 type_name: &'b str,
172 type_hash: &'b str,
173 ) -> ActionInfo<'b> {
174 ActionInfo::new(action_name, type_name, type_hash).with_domain(domain_id)
178 }
179
180 pub fn create_publisher<M: MessageForRmw>(
184 &mut self,
185 topic_name: &str,
186 ) -> Result<EmbeddedPublisher<M>, NodeError> {
187 self.create_publisher_with_qos::<M>(topic_name, QosSettings::default())
188 }
189
190 pub fn create_publisher_with_qos<M: MessageForRmw>(
192 &mut self,
193 topic_name: &str,
194 qos: QosSettings,
195 ) -> Result<EmbeddedPublisher<M>, NodeError> {
196 register_type::<M>()?;
200 let qos = qos.apply_overrides(
204 topic_name,
205 nros_rmw::QosOverrideRole::Publisher,
206 self.qos_overrides,
207 );
208 qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
211 .map_err(NodeError::Transport)?;
212 let topic = Self::topic_info(
213 self.domain_id,
214 &self.name,
215 &self.namespace,
216 topic_name,
217 <M as RosMessage>::TYPE_NAME,
218 <M as RosMessage>::TYPE_HASH,
219 );
220 let handle = self
221 .session
222 .create_publisher(&topic, qos)
223 .map_err(|_| NodeError::Transport(TransportError::PublisherCreationFailed))?;
224 Ok(EmbeddedPublisher {
225 handle,
226 event_regs: crate::executor::handles::empty_event_regs(),
227 _phantom: PhantomData,
228 })
229 }
230
231 pub fn create_publisher_raw(
237 &mut self,
238 topic_name: &str,
239 type_name: &str,
240 type_hash: &str,
241 ) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
242 self.create_publisher_raw_with_qos(topic_name, type_name, type_hash, QosSettings::default())
243 }
244
245 pub fn create_publisher_raw_with_qos(
247 &mut self,
248 topic_name: &str,
249 type_name: &str,
250 type_hash: &str,
251 qos: QosSettings,
252 ) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
253 let qos = qos.apply_overrides(
255 topic_name,
256 nros_rmw::QosOverrideRole::Publisher,
257 self.qos_overrides,
258 );
259 qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
260 .map_err(NodeError::Transport)?;
261 let topic = Self::topic_info(
262 self.domain_id,
263 &self.name,
264 &self.namespace,
265 topic_name,
266 type_name,
267 type_hash,
268 );
269 let handle = self
270 .session
271 .create_publisher(&topic, qos)
272 .map_err(|_| NodeError::Transport(TransportError::PublisherCreationFailed))?;
273 Ok(crate::executor::handles::EmbeddedRawPublisher {
274 handle,
275 arena: crate::executor::handles::TxArena::new(),
276 event_regs: crate::executor::handles::empty_event_regs(),
277 })
278 }
279
280 pub fn publisher<'t>(&mut self, topic: &'t str) -> PublisherBuilder<'_, 'a, 't> {
286 PublisherBuilder {
287 node: self,
288 topic,
289 qos: QosSettings::default(),
290 }
291 }
292
293 pub fn create_subscription<M: MessageForRmw>(
297 &mut self,
298 topic_name: &str,
299 ) -> Result<Subscription<M>, NodeError> {
300 self.create_subscription_sized::<M, { crate::config::DEFAULT_RX_BUF_SIZE }>(topic_name)
301 }
302
303 pub fn create_subscription_sized<M: MessageForRmw, const RX_BUF: usize>(
305 &mut self,
306 topic_name: &str,
307 ) -> Result<Subscription<M, RX_BUF>, NodeError> {
308 self.create_subscription_with_qos::<M, RX_BUF>(topic_name, QosSettings::default())
309 }
310
311 pub fn create_subscription_with_qos<M: MessageForRmw, const RX_BUF: usize>(
313 &mut self,
314 topic_name: &str,
315 qos: QosSettings,
316 ) -> Result<Subscription<M, RX_BUF>, NodeError> {
317 register_type::<M>()?;
319 let qos = qos.apply_overrides(
321 topic_name,
322 nros_rmw::QosOverrideRole::Subscription,
323 self.qos_overrides,
324 );
325 qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
326 .map_err(NodeError::Transport)?;
327 let topic = Self::topic_info(
328 self.domain_id,
329 &self.name,
330 &self.namespace,
331 topic_name,
332 <M as RosMessage>::TYPE_NAME,
333 <M as RosMessage>::TYPE_HASH,
334 );
335 let handle = self
336 .session
337 .create_subscriber(&topic, qos)
338 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?;
339 Ok(Subscription {
340 handle,
341 buffer: [0u8; RX_BUF],
342 event_regs: crate::executor::handles::empty_event_regs(),
343 _phantom: PhantomData,
344 })
345 }
346
347 pub fn create_subscription_raw(
349 &mut self,
350 topic_name: &str,
351 type_name: &str,
352 type_hash: &str,
353 ) -> Result<crate::executor::handles::RawSubscription, NodeError> {
354 self.create_subscription_raw_sized::<{ crate::config::DEFAULT_RX_BUF_SIZE }>(
355 topic_name, type_name, type_hash,
356 )
357 }
358
359 pub fn create_subscription_raw_sized<const RX_BUF: usize>(
361 &mut self,
362 topic_name: &str,
363 type_name: &str,
364 type_hash: &str,
365 ) -> Result<crate::executor::handles::RawSubscription<RX_BUF>, NodeError> {
366 let qos = QosSettings::default().apply_overrides(
371 topic_name,
372 nros_rmw::QosOverrideRole::Subscription,
373 self.qos_overrides,
374 );
375 qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
376 .map_err(NodeError::Transport)?;
377 let topic = Self::topic_info(
378 self.domain_id,
379 &self.name,
380 &self.namespace,
381 topic_name,
382 type_name,
383 type_hash,
384 );
385 let handle = self
386 .session
387 .create_subscriber(&topic, qos)
388 .map_err(|_| NodeError::Transport(TransportError::SubscriberCreationFailed))?;
389 Ok(crate::executor::handles::RawSubscription {
390 handle,
391 buffer: [0u8; RX_BUF],
392 event_regs: crate::executor::handles::empty_event_regs(),
393 })
394 }
395
396 pub fn create_service<Svc: RosService>(
400 &mut self,
401 service_name: &str,
402 ) -> Result<EmbeddedServiceServer<Svc>, NodeError>
403 where
404 Svc::Request: MessageForRmw,
405 Svc::Reply: MessageForRmw,
406 {
407 self.create_service_sized::<Svc, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, QosSettings::services_default())
408 }
409
410 pub fn create_service_with_qos<Svc: RosService>(
413 &mut self,
414 service_name: &str,
415 qos: QosSettings,
416 ) -> Result<EmbeddedServiceServer<Svc>, NodeError>
417 where
418 Svc::Request: MessageForRmw,
419 Svc::Reply: MessageForRmw,
420 {
421 self.create_service_sized::<Svc, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, qos)
422 }
423
424 pub fn create_service_sized<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>(
426 &mut self,
427 service_name: &str,
428 qos: QosSettings,
429 ) -> Result<EmbeddedServiceServer<Svc, REQ_BUF, REPLY_BUF>, NodeError>
430 where
431 Svc::Request: MessageForRmw,
432 Svc::Reply: MessageForRmw,
433 {
434 register_type::<Svc::Request>()?;
437 register_type::<Svc::Reply>()?;
438 qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
443 .map_err(NodeError::Transport)?;
444 let info = Self::service_info(
445 self.domain_id,
446 &self.name,
447 &self.namespace,
448 service_name,
449 Svc::SERVICE_NAME,
450 Svc::SERVICE_HASH,
451 );
452 let handle = self
453 .session
454 .create_service_server(&info, qos)
455 .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?;
456 Ok(EmbeddedServiceServer {
457 handle,
458 req_buffer: [0u8; REQ_BUF],
459 reply_buffer: [0u8; REPLY_BUF],
460 _phantom: PhantomData,
461 })
462 }
463
464 pub fn create_client<Svc: RosService>(
466 &mut self,
467 service_name: &str,
468 ) -> Result<EmbeddedServiceClient<Svc>, NodeError>
469 where
470 Svc::Request: MessageForRmw,
471 Svc::Reply: MessageForRmw,
472 {
473 self.create_client_sized::<Svc, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, QosSettings::services_default())
474 }
475
476 pub fn create_client_with_qos<Svc: RosService>(
478 &mut self,
479 service_name: &str,
480 qos: QosSettings,
481 ) -> Result<EmbeddedServiceClient<Svc>, NodeError>
482 where
483 Svc::Request: MessageForRmw,
484 Svc::Reply: MessageForRmw,
485 {
486 self.create_client_sized::<Svc, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(service_name, qos)
487 }
488
489 pub fn create_client_sized<Svc: RosService, const REQ_BUF: usize, const REPLY_BUF: usize>(
491 &mut self,
492 service_name: &str,
493 qos: QosSettings,
494 ) -> Result<EmbeddedServiceClient<Svc, REQ_BUF, REPLY_BUF>, NodeError>
495 where
496 Svc::Request: MessageForRmw,
497 Svc::Reply: MessageForRmw,
498 {
499 register_type::<Svc::Request>()?;
501 register_type::<Svc::Reply>()?;
502 qos.validate_against(nros_rmw::Session::supported_qos_policies(self.session))
505 .map_err(NodeError::Transport)?;
506 let info = Self::service_info(
507 self.domain_id,
508 &self.name,
509 &self.namespace,
510 service_name,
511 Svc::SERVICE_NAME,
512 Svc::SERVICE_HASH,
513 );
514 let handle = self
515 .session
516 .create_service_client(&info, qos)
517 .map_err(|_| NodeError::Transport(TransportError::ServiceClientCreationFailed))?;
518 Ok(EmbeddedServiceClient {
519 handle,
520 req_buffer: [0u8; REQ_BUF],
521 reply_buffer: [0u8; REPLY_BUF],
522 in_flight: false,
523 _phantom: PhantomData,
524 })
525 }
526
527 pub fn create_service_raw(
532 &mut self,
533 service_name: &str,
534 type_name: &str,
535 type_hash: &str,
536 ) -> Result<crate::executor::handles::RawServiceServer, NodeError> {
537 self.create_service_raw_sized::<
538 { crate::config::DEFAULT_RX_BUF_SIZE },
539 { crate::config::DEFAULT_RX_BUF_SIZE },
540 >(service_name, type_name, type_hash)
541 }
542
543 pub fn create_service_raw_sized<const REQ_BUF: usize, const RESP_BUF: usize>(
545 &mut self,
546 service_name: &str,
547 type_name: &str,
548 type_hash: &str,
549 ) -> Result<crate::executor::handles::RawServiceServer<REQ_BUF, RESP_BUF>, NodeError> {
550 let info = Self::service_info(
551 self.domain_id,
552 &self.name,
553 &self.namespace,
554 service_name,
555 type_name,
556 type_hash,
557 );
558 let handle = self
559 .session
560 .create_service_server(&info, QosSettings::services_default())
561 .map_err(|_| NodeError::Transport(TransportError::ServiceServerCreationFailed))?;
562 Ok(crate::executor::handles::RawServiceServer::new(handle))
563 }
564
565 pub fn create_client_raw(
567 &mut self,
568 service_name: &str,
569 type_name: &str,
570 type_hash: &str,
571 ) -> Result<crate::executor::handles::RawServiceClient, NodeError> {
572 self.create_client_raw_sized::<
573 { crate::config::DEFAULT_RX_BUF_SIZE },
574 { crate::config::DEFAULT_RX_BUF_SIZE },
575 >(service_name, type_name, type_hash)
576 }
577
578 pub fn create_client_raw_sized<const REQ_BUF: usize, const REPLY_BUF: usize>(
580 &mut self,
581 service_name: &str,
582 type_name: &str,
583 type_hash: &str,
584 ) -> Result<crate::executor::handles::RawServiceClient<REQ_BUF, REPLY_BUF>, NodeError> {
585 let info = Self::service_info(
586 self.domain_id,
587 &self.name,
588 &self.namespace,
589 service_name,
590 type_name,
591 type_hash,
592 );
593 let handle = self
594 .session
595 .create_service_client(&info, QosSettings::services_default())
596 .map_err(|_| NodeError::Transport(TransportError::ServiceClientCreationFailed))?;
597 Ok(crate::executor::handles::RawServiceClient::new(handle))
598 }
599
600 pub fn create_action_server_raw(
610 &mut self,
611 action_name: &str,
612 type_name: &str,
613 type_hash: &str,
614 ) -> Result<
615 super::action_core::ActionServerCore<
616 { crate::config::DEFAULT_RX_BUF_SIZE },
617 { crate::config::DEFAULT_RX_BUF_SIZE },
618 { crate::config::DEFAULT_RX_BUF_SIZE },
619 4,
620 >,
621 NodeError,
622 > {
623 self.create_action_server_raw_sized::<
624 { crate::config::DEFAULT_RX_BUF_SIZE },
625 { crate::config::DEFAULT_RX_BUF_SIZE },
626 { crate::config::DEFAULT_RX_BUF_SIZE },
627 4,
628 >(action_name, type_name, type_hash)
629 }
630
631 pub fn create_action_server_raw_sized<
633 const GOAL_BUF: usize,
634 const RESULT_BUF: usize,
635 const FEEDBACK_BUF: usize,
636 const MAX_GOALS: usize,
637 >(
638 &mut self,
639 action_name: &str,
640 type_name: &str,
641 type_hash: &str,
642 ) -> Result<
643 super::action_core::ActionServerCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF, MAX_GOALS>,
644 NodeError,
645 > {
646 let action_info = Self::action_info(self.domain_id, action_name, type_name, type_hash);
647
648 let send_goal_keyexpr: heapless::String<256> = action_info.send_goal_key();
649 let send_goal_info = Self::service_info(
650 self.domain_id,
651 &self.name,
652 &self.namespace,
653 &send_goal_keyexpr,
654 type_name,
655 type_hash,
656 );
657 let send_goal_server = self
658 .session
659 .create_service_server(&send_goal_info, QosSettings::services_default())
660 .map_err(|_| NodeError::ActionCreationFailed)?;
661
662 let cancel_goal_keyexpr: heapless::String<256> = action_info.cancel_goal_key();
663 let cancel_goal_info = Self::service_info(
664 self.domain_id,
665 &self.name,
666 &self.namespace,
667 &cancel_goal_keyexpr,
668 "action_msgs::srv::dds_::CancelGoal_",
669 type_hash,
670 );
671 let cancel_goal_server = self
672 .session
673 .create_service_server(&cancel_goal_info, QosSettings::services_default())
674 .map_err(|_| NodeError::ActionCreationFailed)?;
675
676 let get_result_keyexpr: heapless::String<256> = action_info.get_result_key();
677 let get_result_info = Self::service_info(
678 self.domain_id,
679 &self.name,
680 &self.namespace,
681 &get_result_keyexpr,
682 type_name,
683 type_hash,
684 );
685 let get_result_server = self
686 .session
687 .create_service_server(&get_result_info, QosSettings::services_default())
688 .map_err(|_| NodeError::ActionCreationFailed)?;
689
690 let feedback_keyexpr: heapless::String<256> = action_info.feedback_key();
691 let feedback_topic = Self::topic_info(
692 self.domain_id,
693 &self.name,
694 &self.namespace,
695 &feedback_keyexpr,
696 type_name,
697 type_hash,
698 );
699 let feedback_publisher = self
700 .session
701 .create_publisher(&feedback_topic, QosSettings::QOS_PROFILE_DEFAULT)
702 .map_err(|_| NodeError::ActionCreationFailed)?;
703
704 let status_keyexpr: heapless::String<256> = action_info.status_key();
705 let status_topic = Self::topic_info(
706 self.domain_id,
707 &self.name,
708 &self.namespace,
709 &status_keyexpr,
710 "action_msgs::msg::dds_::GoalStatusArray_",
711 type_hash,
712 );
713 let status_publisher = self
714 .session
715 .create_publisher(
716 &status_topic,
717 QosSettings::QOS_PROFILE_ACTION_STATUS_DEFAULT,
718 )
719 .map_err(|_| NodeError::ActionCreationFailed)?;
720
721 Ok(super::action_core::ActionServerCore {
722 send_goal_server,
723 cancel_goal_server,
724 get_result_server,
725 feedback_publisher,
726 status_publisher,
727 active_goals: heapless::Vec::new(),
728 completed_results: heapless::Vec::new(),
729 pending_get_results: heapless::Vec::new(),
730 result_slab: [0u8; RESULT_BUF],
731 result_slab_used: 0,
732 goal_buffer: [0u8; GOAL_BUF],
733 feedback_buffer: [0u8; FEEDBACK_BUF],
734 cancel_buffer: [0u8; 256],
735 })
736 }
737
738 pub fn create_action_client_raw(
742 &mut self,
743 action_name: &str,
744 type_name: &str,
745 type_hash: &str,
746 ) -> Result<
747 super::action_core::ActionClientCore<
748 { crate::config::DEFAULT_RX_BUF_SIZE },
749 { crate::config::DEFAULT_RX_BUF_SIZE },
750 { crate::config::DEFAULT_RX_BUF_SIZE },
751 >,
752 NodeError,
753 > {
754 self.create_action_client_raw_sized::<
755 { crate::config::DEFAULT_RX_BUF_SIZE },
756 { crate::config::DEFAULT_RX_BUF_SIZE },
757 { crate::config::DEFAULT_RX_BUF_SIZE },
758 >(action_name, type_name, type_hash)
759 }
760
761 pub fn create_action_client_raw_sized<
763 const GOAL_BUF: usize,
764 const RESULT_BUF: usize,
765 const FEEDBACK_BUF: usize,
766 >(
767 &mut self,
768 action_name: &str,
769 type_name: &str,
770 type_hash: &str,
771 ) -> Result<super::action_core::ActionClientCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>, NodeError>
772 {
773 let action_info = Self::action_info(self.domain_id, action_name, type_name, type_hash);
774
775 let send_goal_keyexpr: heapless::String<256> = action_info.send_goal_key();
776 let send_goal_info = Self::service_info(
777 self.domain_id,
778 &self.name,
779 &self.namespace,
780 &send_goal_keyexpr,
781 type_name,
782 type_hash,
783 );
784 let send_goal_client = self
785 .session
786 .create_service_client(&send_goal_info, QosSettings::services_default())
787 .map_err(|_| NodeError::ActionCreationFailed)?;
788
789 let cancel_goal_keyexpr: heapless::String<256> = action_info.cancel_goal_key();
790 let cancel_goal_info = Self::service_info(
791 self.domain_id,
792 &self.name,
793 &self.namespace,
794 &cancel_goal_keyexpr,
795 "action_msgs::srv::dds_::CancelGoal_",
796 type_hash,
797 );
798 let cancel_goal_client = self
799 .session
800 .create_service_client(&cancel_goal_info, QosSettings::services_default())
801 .map_err(|_| NodeError::ActionCreationFailed)?;
802
803 let get_result_keyexpr: heapless::String<256> = action_info.get_result_key();
804 let get_result_info = Self::service_info(
805 self.domain_id,
806 &self.name,
807 &self.namespace,
808 &get_result_keyexpr,
809 type_name,
810 type_hash,
811 );
812 let get_result_client = self
813 .session
814 .create_service_client(&get_result_info, QosSettings::services_default())
815 .map_err(|_| NodeError::ActionCreationFailed)?;
816
817 let feedback_keyexpr: heapless::String<256> = action_info.feedback_key();
818 let feedback_topic = Self::topic_info(
819 self.domain_id,
820 &self.name,
821 &self.namespace,
822 &feedback_keyexpr,
823 type_name,
824 type_hash,
825 );
826 let feedback_subscriber = self
827 .session
828 .create_subscriber(&feedback_topic, QosSettings::BEST_EFFORT)
829 .map_err(|_| NodeError::ActionCreationFailed)?;
830
831 Ok(super::action_core::ActionClientCore::new(
832 send_goal_client,
833 cancel_goal_client,
834 get_result_client,
835 feedback_subscriber,
836 ))
837 }
838
839 pub fn create_action_server<A: RosAction>(
841 &mut self,
842 action_name: &str,
843 ) -> Result<ActionServer<A>, NodeError>
844 where
845 A::Goal: MessageForRmw,
846 A::Result: MessageForRmw,
847 A::Feedback: MessageForRmw,
848 A::SendGoalRequest: MessageForRmw,
849 A::SendGoalResponse: MessageForRmw,
850 A::GetResultRequest: MessageForRmw,
851 A::GetResultResponse: MessageForRmw,
852 A::FeedbackMessage: MessageForRmw,
853 {
854 self.create_action_server_sized::<A, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }, 4>(action_name)
855 }
856
857 pub fn create_action_server_sized<
859 A: RosAction,
860 const GOAL_BUF: usize,
861 const RESULT_BUF: usize,
862 const FEEDBACK_BUF: usize,
863 const MAX_GOALS: usize,
864 >(
865 &mut self,
866 action_name: &str,
867 ) -> Result<ActionServer<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF, MAX_GOALS>, NodeError>
868 where
869 A::Goal: MessageForRmw,
870 A::Result: MessageForRmw,
871 A::Feedback: MessageForRmw,
872 A::SendGoalRequest: MessageForRmw,
873 A::SendGoalResponse: MessageForRmw,
874 A::GetResultRequest: MessageForRmw,
875 A::GetResultResponse: MessageForRmw,
876 A::FeedbackMessage: MessageForRmw,
877 {
878 register_type::<A::Goal>()?;
886 register_type::<A::Result>()?;
887 register_type::<A::Feedback>()?;
888 register_type::<A::SendGoalRequest>()?;
889 register_type::<A::SendGoalResponse>()?;
890 register_type::<A::GetResultRequest>()?;
891 register_type::<A::GetResultResponse>()?;
892 register_type::<A::FeedbackMessage>()?;
893 let action_info =
894 Self::action_info(self.domain_id, action_name, A::ACTION_NAME, A::ACTION_HASH);
895
896 let send_goal_type = super::action_core::action_service_base_type(
905 <A::SendGoalRequest as RosMessage>::TYPE_NAME,
906 A::ACTION_NAME,
907 );
908 let get_result_type = super::action_core::action_service_base_type(
909 <A::GetResultRequest as RosMessage>::TYPE_NAME,
910 A::ACTION_NAME,
911 );
912 let feedback_type = <A::FeedbackMessage as RosMessage>::TYPE_NAME;
913
914 let send_goal_keyexpr: heapless::String<256> = action_info.send_goal_key();
915 let send_goal_info = Self::service_info(
916 self.domain_id,
917 &self.name,
918 &self.namespace,
919 &send_goal_keyexpr,
920 send_goal_type,
921 A::ACTION_HASH,
922 );
923 let send_goal_server = self
924 .session
925 .create_service_server(&send_goal_info, QosSettings::services_default())
926 .map_err(|_| NodeError::ActionCreationFailed)?;
927
928 let cancel_goal_keyexpr: heapless::String<256> = action_info.cancel_goal_key();
929 let cancel_goal_info = Self::service_info(
930 self.domain_id,
931 &self.name,
932 &self.namespace,
933 &cancel_goal_keyexpr,
934 "action_msgs::srv::dds_::CancelGoal_",
935 A::ACTION_HASH,
936 );
937 let cancel_goal_server = self
938 .session
939 .create_service_server(&cancel_goal_info, QosSettings::services_default())
940 .map_err(|_| NodeError::ActionCreationFailed)?;
941
942 let get_result_keyexpr: heapless::String<256> = action_info.get_result_key();
943 let get_result_info = Self::service_info(
944 self.domain_id,
945 &self.name,
946 &self.namespace,
947 &get_result_keyexpr,
948 get_result_type,
949 A::ACTION_HASH,
950 );
951 let get_result_server = self
952 .session
953 .create_service_server(&get_result_info, QosSettings::services_default())
954 .map_err(|_| NodeError::ActionCreationFailed)?;
955
956 let feedback_keyexpr: heapless::String<256> = action_info.feedback_key();
957 let feedback_topic = Self::topic_info(
958 self.domain_id,
959 &self.name,
960 &self.namespace,
961 &feedback_keyexpr,
962 feedback_type,
963 A::ACTION_HASH,
964 );
965 let feedback_publisher = self
966 .session
967 .create_publisher(&feedback_topic, QosSettings::QOS_PROFILE_DEFAULT)
968 .map_err(|_| NodeError::ActionCreationFailed)?;
969
970 let status_keyexpr: heapless::String<256> = action_info.status_key();
971 let status_topic = Self::topic_info(
972 self.domain_id,
973 &self.name,
974 &self.namespace,
975 &status_keyexpr,
976 "action_msgs::msg::dds_::GoalStatusArray_",
977 A::ACTION_HASH,
978 );
979 let status_publisher = self
980 .session
981 .create_publisher(
982 &status_topic,
983 QosSettings::QOS_PROFILE_ACTION_STATUS_DEFAULT,
984 )
985 .map_err(|_| NodeError::ActionCreationFailed)?;
986
987 Ok(ActionServer {
988 core: super::action_core::ActionServerCore {
989 send_goal_server,
990 cancel_goal_server,
991 get_result_server,
992 feedback_publisher,
993 status_publisher,
994 active_goals: heapless::Vec::new(),
995 completed_results: heapless::Vec::new(),
996 pending_get_results: heapless::Vec::new(),
997 result_slab: [0u8; RESULT_BUF],
998 result_slab_used: 0,
999 goal_buffer: [0u8; GOAL_BUF],
1000 feedback_buffer: [0u8; FEEDBACK_BUF],
1001 cancel_buffer: [0u8; 256],
1002 },
1003 typed_goals: heapless::Vec::new(),
1004 completed_goals: heapless::Vec::new(),
1005 })
1006 }
1007
1008 pub fn create_action_client<A: RosAction>(
1010 &mut self,
1011 action_name: &str,
1012 ) -> Result<ActionClient<A>, NodeError>
1013 where
1014 A::Goal: MessageForRmw,
1015 A::Result: MessageForRmw,
1016 A::Feedback: MessageForRmw,
1017 A::SendGoalRequest: MessageForRmw,
1018 A::SendGoalResponse: MessageForRmw,
1019 A::GetResultRequest: MessageForRmw,
1020 A::GetResultResponse: MessageForRmw,
1021 A::FeedbackMessage: MessageForRmw,
1022 {
1023 self.create_action_client_sized::<A, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }, { crate::config::DEFAULT_RX_BUF_SIZE }>(action_name)
1024 }
1025
1026 pub fn create_action_client_sized<
1028 A: RosAction,
1029 const GOAL_BUF: usize,
1030 const RESULT_BUF: usize,
1031 const FEEDBACK_BUF: usize,
1032 >(
1033 &mut self,
1034 action_name: &str,
1035 ) -> Result<ActionClient<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>, NodeError>
1036 where
1037 A::Goal: MessageForRmw,
1038 A::Result: MessageForRmw,
1039 A::Feedback: MessageForRmw,
1040 A::SendGoalRequest: MessageForRmw,
1041 A::SendGoalResponse: MessageForRmw,
1042 A::GetResultRequest: MessageForRmw,
1043 A::GetResultResponse: MessageForRmw,
1044 A::FeedbackMessage: MessageForRmw,
1045 {
1046 register_type::<A::Goal>()?;
1048 register_type::<A::Result>()?;
1049 register_type::<A::Feedback>()?;
1050 register_type::<A::SendGoalRequest>()?;
1051 register_type::<A::SendGoalResponse>()?;
1052 register_type::<A::GetResultRequest>()?;
1053 register_type::<A::GetResultResponse>()?;
1054 register_type::<A::FeedbackMessage>()?;
1055 let action_info =
1056 Self::action_info(self.domain_id, action_name, A::ACTION_NAME, A::ACTION_HASH);
1057
1058 let send_goal_type = super::action_core::action_service_base_type(
1066 <A::SendGoalRequest as RosMessage>::TYPE_NAME,
1067 A::ACTION_NAME,
1068 );
1069 let get_result_type = super::action_core::action_service_base_type(
1070 <A::GetResultRequest as RosMessage>::TYPE_NAME,
1071 A::ACTION_NAME,
1072 );
1073 let feedback_type = <A::FeedbackMessage as RosMessage>::TYPE_NAME;
1074
1075 let send_goal_keyexpr: heapless::String<256> = action_info.send_goal_key();
1076 let send_goal_info = Self::service_info(
1077 self.domain_id,
1078 &self.name,
1079 &self.namespace,
1080 &send_goal_keyexpr,
1081 send_goal_type,
1082 A::ACTION_HASH,
1083 );
1084 let send_goal_client = self
1085 .session
1086 .create_service_client(&send_goal_info, QosSettings::services_default())
1087 .map_err(|_| NodeError::ActionCreationFailed)?;
1088
1089 let cancel_goal_keyexpr: heapless::String<256> = action_info.cancel_goal_key();
1090 let cancel_goal_info = Self::service_info(
1091 self.domain_id,
1092 &self.name,
1093 &self.namespace,
1094 &cancel_goal_keyexpr,
1095 "action_msgs::srv::dds_::CancelGoal_",
1096 A::ACTION_HASH,
1097 );
1098 let cancel_goal_client = self
1099 .session
1100 .create_service_client(&cancel_goal_info, QosSettings::services_default())
1101 .map_err(|_| NodeError::ActionCreationFailed)?;
1102
1103 let get_result_keyexpr: heapless::String<256> = action_info.get_result_key();
1104 let get_result_info = Self::service_info(
1105 self.domain_id,
1106 &self.name,
1107 &self.namespace,
1108 &get_result_keyexpr,
1109 get_result_type,
1110 A::ACTION_HASH,
1111 );
1112 let get_result_client = self
1113 .session
1114 .create_service_client(&get_result_info, QosSettings::services_default())
1115 .map_err(|_| NodeError::ActionCreationFailed)?;
1116
1117 let feedback_keyexpr: heapless::String<256> = action_info.feedback_key();
1118 let feedback_topic = Self::topic_info(
1119 self.domain_id,
1120 &self.name,
1121 &self.namespace,
1122 &feedback_keyexpr,
1123 feedback_type,
1124 A::ACTION_HASH,
1125 );
1126 let feedback_subscriber = self
1127 .session
1128 .create_subscriber(&feedback_topic, QosSettings::BEST_EFFORT)
1129 .map_err(|_| NodeError::ActionCreationFailed)?;
1130
1131 Ok(ActionClient {
1132 core: super::action_core::ActionClientCore {
1133 send_goal_client,
1134 cancel_goal_client,
1135 get_result_client,
1136 feedback_subscriber,
1137 goal_buffer: [0u8; GOAL_BUF],
1138 result_buffer: [0u8; RESULT_BUF],
1139 feedback_buffer: [0u8; FEEDBACK_BUF],
1140 goal_counter: 0,
1141 in_flight_send_goal: false,
1142 in_flight_cancel: false,
1143 in_flight_get_result: false,
1144 },
1145 _phantom: PhantomData,
1146 })
1147 }
1148}
1149
1150pub struct PublisherBuilder<'n, 'a, 't> {
1157 node: &'n mut NodeHandle<'a>,
1158 topic: &'t str,
1159 qos: QosSettings,
1160}
1161
1162impl<'n, 'a, 't> PublisherBuilder<'n, 'a, 't> {
1163 pub fn qos(mut self, qos: QosSettings) -> Self {
1165 self.qos = qos;
1166 self
1167 }
1168
1169 pub fn typed<M: MessageForRmw>(self) -> TypedPublisherBuilder<'n, 'a, 't, M> {
1171 TypedPublisherBuilder {
1172 node: self.node,
1173 topic: self.topic,
1174 qos: self.qos,
1175 _phantom: PhantomData,
1176 }
1177 }
1178
1179 pub fn generic(
1182 self,
1183 type_name: &'t str,
1184 type_hash: &'t str,
1185 ) -> GenericPublisherBuilder<'n, 'a, 't> {
1186 GenericPublisherBuilder {
1187 node: self.node,
1188 topic: self.topic,
1189 type_name,
1190 type_hash,
1191 qos: self.qos,
1192 }
1193 }
1194}
1195
1196pub struct TypedPublisherBuilder<'n, 'a, 't, M> {
1198 node: &'n mut NodeHandle<'a>,
1199 topic: &'t str,
1200 qos: QosSettings,
1201 _phantom: PhantomData<M>,
1202}
1203
1204impl<'n, 'a, 't, M: MessageForRmw> TypedPublisherBuilder<'n, 'a, 't, M> {
1205 pub fn qos(mut self, qos: QosSettings) -> Self {
1206 self.qos = qos;
1207 self
1208 }
1209
1210 pub fn build(self) -> Result<EmbeddedPublisher<M>, NodeError> {
1211 self.node
1212 .create_publisher_with_qos::<M>(self.topic, self.qos)
1213 }
1214}
1215
1216pub struct GenericPublisherBuilder<'n, 'a, 't> {
1218 node: &'n mut NodeHandle<'a>,
1219 topic: &'t str,
1220 type_name: &'t str,
1221 type_hash: &'t str,
1222 qos: QosSettings,
1223}
1224
1225impl<'n, 'a, 't> GenericPublisherBuilder<'n, 'a, 't> {
1226 pub fn qos(mut self, qos: QosSettings) -> Self {
1227 self.qos = qos;
1228 self
1229 }
1230
1231 pub fn build(self) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
1232 self.node.create_publisher_raw_with_qos(
1233 self.topic,
1234 self.type_name,
1235 self.type_hash,
1236 self.qos,
1237 )
1238 }
1239}
1240
1241pub struct NodeCtx<'e> {
1248 executor: &'e mut super::spin::Executor,
1249 node_id: super::node_record::NodeId,
1250}
1251
1252impl<'e> NodeCtx<'e> {
1253 pub(crate) fn new(
1254 executor: &'e mut super::spin::Executor,
1255 node_id: super::node_record::NodeId,
1256 ) -> Self {
1257 Self { executor, node_id }
1258 }
1259
1260 pub fn subscription<'t>(&mut self, topic: &'t str) -> SubscriptionBuilder<'_, 'e, 't> {
1263 SubscriptionBuilder {
1264 ctx: self,
1265 topic,
1266 qos: QosSettings::default(),
1267 }
1268 }
1269
1270 pub fn publisher<'t>(&mut self, topic: &'t str) -> CtxPublisherBuilder<'_, 'e, 't> {
1277 CtxPublisherBuilder {
1278 ctx: self,
1279 topic,
1280 qos: QosSettings::default(),
1281 }
1282 }
1283
1284 pub fn create_publisher<M: MessageForRmw>(
1286 &mut self,
1287 topic: &str,
1288 ) -> Result<EmbeddedPublisher<M>, NodeError> {
1289 self.executor
1290 .create_publisher_on::<M>(self.node_id, topic, QosSettings::default())
1291 }
1292
1293 pub fn create_generic_publisher(
1295 &mut self,
1296 topic: &str,
1297 type_name: &str,
1298 type_hash: &str,
1299 ) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
1300 self.executor.create_publisher_raw_on(
1301 self.node_id,
1302 topic,
1303 type_name,
1304 type_hash,
1305 QosSettings::default(),
1306 )
1307 }
1308
1309 pub fn create_subscription<M, F>(
1312 &mut self,
1313 topic: &str,
1314 callback: F,
1315 ) -> Result<super::types::HandleId, NodeError>
1316 where
1317 M: MessageForRmw + 'static,
1318 F: FnMut(&M) + 'static,
1319 {
1320 self.executor
1321 .register_subscription_buffered_on::<M, F, { crate::config::DEFAULT_RX_BUF_SIZE }>(
1322 self.node_id,
1323 topic,
1324 QosSettings::default(),
1325 callback,
1326 )
1327 }
1328
1329 pub fn create_client_with_callback<Svc, F>(
1335 &mut self,
1336 service_name: &str,
1337 callback: F,
1338 ) -> Result<ServiceClientCallback<Svc>, NodeError>
1339 where
1340 Svc: RosService + 'static,
1341 Svc::Request: MessageForRmw,
1342 Svc::Reply: MessageForRmw,
1343 F: FnMut(&Svc::Reply) + 'static,
1344 {
1345 self.create_client_with_callback_sized::<
1346 Svc,
1347 F,
1348 { crate::config::DEFAULT_RX_BUF_SIZE },
1349 { crate::config::DEFAULT_RX_BUF_SIZE },
1350 >(service_name, callback)
1351 }
1352
1353 pub fn create_client_with_callback_sized<Svc, F, const REQ_BUF: usize, const REPLY_BUF: usize>(
1355 &mut self,
1356 service_name: &str,
1357 callback: F,
1358 ) -> Result<ServiceClientCallback<Svc, REQ_BUF, REPLY_BUF>, NodeError>
1359 where
1360 Svc: RosService + 'static,
1361 Svc::Request: MessageForRmw,
1362 Svc::Reply: MessageForRmw,
1363 F: FnMut(&Svc::Reply) + 'static,
1364 {
1365 register_type::<Svc::Request>()?;
1366 register_type::<Svc::Reply>()?;
1367 let (_id, hdr) = self
1368 .executor
1369 .register_service_client_callback::<Svc, F, REPLY_BUF>(
1370 Some(self.node_id),
1371 service_name,
1372 Svc::SERVICE_NAME,
1373 Svc::SERVICE_HASH,
1374 QosSettings::services_default(),
1375 callback,
1376 )?;
1377 Ok(ServiceClientCallback::new(hdr))
1378 }
1379
1380 #[allow(clippy::type_complexity)]
1388 pub fn create_action_client_with_callbacks<A, GRespF, FbF, ResF>(
1389 &mut self,
1390 action_name: &str,
1391 on_goal_response: GRespF,
1392 on_feedback: FbF,
1393 on_result: ResF,
1394 ) -> Result<ActionClientCallback<A>, NodeError>
1395 where
1396 A: RosAction + 'static,
1397 A::Goal: MessageForRmw,
1398 A::Result: MessageForRmw,
1399 A::Feedback: MessageForRmw,
1400 GRespF: FnMut(&nros_core::GoalId, bool) + 'static,
1401 FbF: FnMut(&nros_core::GoalId, &A::Feedback) + 'static,
1402 ResF: FnMut(&nros_core::GoalId, nros_core::GoalStatus, &A::Result) + 'static,
1403 {
1404 self.create_action_client_with_callbacks_sized::<
1405 A,
1406 GRespF,
1407 FbF,
1408 ResF,
1409 { crate::config::DEFAULT_RX_BUF_SIZE },
1410 { crate::config::DEFAULT_RX_BUF_SIZE },
1411 { crate::config::DEFAULT_RX_BUF_SIZE },
1412 >(action_name, on_goal_response, on_feedback, on_result)
1413 }
1414
1415 #[allow(clippy::type_complexity)]
1417 pub fn create_action_client_with_callbacks_sized<
1418 A,
1419 GRespF,
1420 FbF,
1421 ResF,
1422 const GOAL_BUF: usize,
1423 const RESULT_BUF: usize,
1424 const FEEDBACK_BUF: usize,
1425 >(
1426 &mut self,
1427 action_name: &str,
1428 on_goal_response: GRespF,
1429 on_feedback: FbF,
1430 on_result: ResF,
1431 ) -> Result<ActionClientCallback<A, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>, NodeError>
1432 where
1433 A: RosAction + 'static,
1434 A::Goal: MessageForRmw,
1435 A::Result: MessageForRmw,
1436 A::Feedback: MessageForRmw,
1437 GRespF: FnMut(&nros_core::GoalId, bool) + 'static,
1438 FbF: FnMut(&nros_core::GoalId, &A::Feedback) + 'static,
1439 ResF: FnMut(&nros_core::GoalId, nros_core::GoalStatus, &A::Result) + 'static,
1440 {
1441 register_type::<A::Goal>()?;
1442 register_type::<A::Result>()?;
1443 register_type::<A::Feedback>()?;
1444 let (_id, core) = self
1445 .executor
1446 .register_action_client_callback::<A, GRespF, FbF, ResF, GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>(
1447 Some(self.node_id),
1448 action_name,
1449 A::ACTION_NAME,
1450 A::ACTION_HASH,
1451 8u16,
1454 on_goal_response,
1455 on_feedback,
1456 on_result,
1457 )?;
1458 Ok(ActionClientCallback::new(core))
1459 }
1460
1461 pub fn create_generic_subscription<F>(
1463 &mut self,
1464 topic: &str,
1465 type_name: &str,
1466 type_hash: &str,
1467 callback: F,
1468 ) -> Result<super::types::HandleId, NodeError>
1469 where
1470 F: FnMut(&[u8]) + 'static,
1471 {
1472 self.executor
1473 .register_subscription_buffered_raw_on::<F, { crate::config::DEFAULT_RX_BUF_SIZE }>(
1474 self.node_id,
1475 topic,
1476 type_name,
1477 type_hash,
1478 QosSettings::default(),
1479 callback,
1480 )
1481 }
1482
1483 #[cfg(feature = "safety-e2e")]
1490 pub fn create_generic_subscription_with_integrity<F>(
1491 &mut self,
1492 topic: &str,
1493 type_name: &str,
1494 type_hash: &str,
1495 callback: F,
1496 ) -> Result<super::types::HandleId, NodeError>
1497 where
1498 F: FnMut(&[u8], &nros_rmw::IntegrityStatus) + 'static,
1499 {
1500 self.executor
1501 .register_subscription_buffered_raw_safety_on::<F, { crate::config::DEFAULT_RX_BUF_SIZE }>(
1502 self.node_id,
1503 topic,
1504 type_name,
1505 type_hash,
1506 QosSettings::default(),
1507 callback,
1508 )
1509 }
1510
1511 pub fn create_subscription_borrowed<B, F>(
1527 &mut self,
1528 topic: &str,
1529 callback: F,
1530 ) -> Result<super::types::HandleId, NodeError>
1531 where
1532 B: nros_core::BorrowedMessage + 'static,
1533 F: for<'a> FnMut(&B::View<'a>) + 'static,
1534 {
1535 self.executor
1536 .register_subscription_buffered_borrowed_on::<B, F, { crate::config::DEFAULT_RX_BUF_SIZE }>(
1537 self.node_id,
1538 topic,
1539 QosSettings::default().keep_last(1),
1540 callback,
1541 )
1542 }
1543
1544 pub fn service<'t>(&mut self, name: &'t str) -> CtxServiceBuilder<'_, 'e, 't> {
1548 CtxServiceBuilder {
1549 ctx: self,
1550 name,
1551 qos: QosSettings::services_default(),
1552 }
1553 }
1554
1555 pub fn create_service<Svc, F>(
1558 &mut self,
1559 name: &str,
1560 callback: F,
1561 ) -> Result<super::types::HandleId, NodeError>
1562 where
1563 Svc: RosService + 'static,
1564 Svc::Request: crate::cyclonedds_register::MessageForRmw,
1565 Svc::Reply: crate::cyclonedds_register::MessageForRmw,
1566 F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
1567 {
1568 self.executor.register_service_sized_on::<
1569 Svc,
1570 F,
1571 { crate::config::DEFAULT_RX_BUF_SIZE },
1572 { crate::config::DEFAULT_RX_BUF_SIZE },
1573 >(self.node_id, name, QosSettings::services_default(), callback)
1574 }
1575}
1576
/// Builder for a service server registered through a `NodeCtx`.
pub struct CtxServiceBuilder<'c, 'e, 't> {
    /// Context supplying the executor and node id used by `build`.
    ctx: &'c mut NodeCtx<'e>,
    /// Service name forwarded unchanged to the executor.
    name: &'t str,
    /// QoS forwarded to the executor; replaceable through `qos`.
    qos: QosSettings,
}
1583
1584impl<'c, 'e, 't> CtxServiceBuilder<'c, 'e, 't> {
1585 pub fn qos(mut self, qos: QosSettings) -> Self {
1588 self.qos = qos;
1589 self
1590 }
1591
1592 pub fn build<Svc, F>(self, callback: F) -> Result<super::types::HandleId, NodeError>
1593 where
1594 Svc: RosService + 'static,
1595 Svc::Request: crate::cyclonedds_register::MessageForRmw,
1596 Svc::Reply: crate::cyclonedds_register::MessageForRmw,
1597 F: FnMut(&Svc::Request) -> Svc::Reply + 'static,
1598 {
1599 self.ctx.executor.register_service_sized_on::<
1600 Svc,
1601 F,
1602 { crate::config::DEFAULT_RX_BUF_SIZE },
1603 { crate::config::DEFAULT_RX_BUF_SIZE },
1604 >(self.ctx.node_id, self.name, self.qos, callback)
1605 }
1606}
1607
/// First stage of the publisher builder reached through a `NodeCtx`.
///
/// The message type is not chosen yet; `typed` or `generic` moves on to the
/// stage that can actually build.
pub struct CtxPublisherBuilder<'c, 'e, 't> {
    /// Context supplying the executor and node id used by the later stages.
    ctx: &'c mut NodeCtx<'e>,
    /// Topic name carried over to the next stage.
    topic: &'t str,
    /// QoS carried over to the next stage; replaceable through `qos`.
    qos: QosSettings,
}
1614
1615impl<'c, 'e, 't> CtxPublisherBuilder<'c, 'e, 't> {
1616 pub fn qos(mut self, qos: QosSettings) -> Self {
1617 self.qos = qos;
1618 self
1619 }
1620
1621 pub fn typed<M: MessageForRmw>(self) -> CtxTypedPublisherBuilder<'c, 'e, 't, M> {
1623 CtxTypedPublisherBuilder {
1624 ctx: self.ctx,
1625 topic: self.topic,
1626 qos: self.qos,
1627 _phantom: PhantomData,
1628 }
1629 }
1630
1631 pub fn generic(
1633 self,
1634 type_name: &'t str,
1635 type_hash: &'t str,
1636 ) -> CtxGenericPublisherBuilder<'c, 'e, 't> {
1637 CtxGenericPublisherBuilder {
1638 ctx: self.ctx,
1639 topic: self.topic,
1640 type_name,
1641 type_hash,
1642 qos: self.qos,
1643 }
1644 }
1645}
1646
/// Publisher builder reached through a `NodeCtx` with the message type `M`
/// fixed at compile time.
pub struct CtxTypedPublisherBuilder<'c, 'e, 't, M> {
    /// Context supplying the executor and node id used by `build`.
    ctx: &'c mut NodeCtx<'e>,
    /// Topic name forwarded unchanged to the executor.
    topic: &'t str,
    /// QoS forwarded to the executor; replaceable through `qos`.
    qos: QosSettings,
    /// Records the message type `M` without storing a value of it.
    _phantom: PhantomData<M>,
}
1654
1655impl<'c, 'e, 't, M: MessageForRmw> CtxTypedPublisherBuilder<'c, 'e, 't, M> {
1656 pub fn qos(mut self, qos: QosSettings) -> Self {
1657 self.qos = qos;
1658 self
1659 }
1660
1661 pub fn build(self) -> Result<EmbeddedPublisher<M>, NodeError> {
1662 self.ctx
1663 .executor
1664 .create_publisher_on::<M>(self.ctx.node_id, self.topic, self.qos)
1665 }
1666}
1667
/// Publisher builder reached through a `NodeCtx` for a message type described
/// at run time by a type-name / type-hash string pair.
pub struct CtxGenericPublisherBuilder<'c, 'e, 't> {
    /// Context supplying the executor and node id used by `build`.
    ctx: &'c mut NodeCtx<'e>,
    /// Topic name forwarded unchanged to the executor.
    topic: &'t str,
    /// Message type name forwarded unchanged to the executor.
    type_name: &'t str,
    /// Message type hash forwarded unchanged to the executor.
    type_hash: &'t str,
    /// QoS forwarded to the executor; replaceable through `qos`.
    qos: QosSettings,
}
1676
1677impl<'c, 'e, 't> CtxGenericPublisherBuilder<'c, 'e, 't> {
1678 pub fn qos(mut self, qos: QosSettings) -> Self {
1679 self.qos = qos;
1680 self
1681 }
1682
1683 pub fn build(self) -> Result<crate::executor::handles::EmbeddedRawPublisher, NodeError> {
1684 self.ctx.executor.create_publisher_raw_on(
1685 self.ctx.node_id,
1686 self.topic,
1687 self.type_name,
1688 self.type_hash,
1689 self.qos,
1690 )
1691 }
1692}
1693
/// First stage of the subscription builder reached through a `NodeCtx`.
///
/// The message type is not chosen yet; `typed` or `generic` moves on to the
/// stage that can actually build.
pub struct SubscriptionBuilder<'c, 'e, 't> {
    /// Context supplying the executor and node id used by the later stages.
    ctx: &'c mut NodeCtx<'e>,
    /// Topic name carried over to the next stage.
    topic: &'t str,
    /// QoS carried over to the next stage; replaceable through `qos`.
    qos: QosSettings,
}
1700
1701impl<'c, 'e, 't> SubscriptionBuilder<'c, 'e, 't> {
1702 pub fn qos(mut self, qos: QosSettings) -> Self {
1703 self.qos = qos;
1704 self
1705 }
1706
1707 pub fn typed<M: MessageForRmw + 'static>(self) -> TypedSubscriptionBuilder<'c, 'e, 't, M> {
1709 TypedSubscriptionBuilder {
1710 ctx: self.ctx,
1711 topic: self.topic,
1712 qos: self.qos,
1713 sched: None,
1714 _phantom: PhantomData,
1715 }
1716 }
1717
1718 pub fn generic(
1720 self,
1721 type_name: &'t str,
1722 type_hash: &'t str,
1723 ) -> GenericSubscriptionBuilder<'c, 'e, 't> {
1724 GenericSubscriptionBuilder {
1725 ctx: self.ctx,
1726 topic: self.topic,
1727 type_name,
1728 type_hash,
1729 qos: self.qos,
1730 sched: None,
1731 }
1732 }
1733}
1734
/// Subscription builder for a compile-time message type `M`.
///
/// `RX` is forwarded as a const generic argument to the executor's
/// registration call and defaults to `crate::config::DEFAULT_RX_BUF_SIZE`
/// (presumably the receive-buffer size — confirm against the executor).
pub struct TypedSubscriptionBuilder<
    'c,
    'e,
    't,
    M,
    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Context supplying the executor and node id used by `build`.
    ctx: &'c mut NodeCtx<'e>,
    /// Topic name forwarded unchanged to the executor.
    topic: &'t str,
    /// QoS forwarded to the executor; replaceable through `qos`.
    qos: QosSettings,
    /// Sched context to bind the new handle to after registration, if any.
    sched: Option<super::sched_context::SchedContextId>,
    /// Records the message type `M` without storing a value of it.
    _phantom: PhantomData<M>,
}
1750
1751impl<'c, 'e, 't, M: MessageForRmw + 'static, const RX: usize>
1752 TypedSubscriptionBuilder<'c, 'e, 't, M, RX>
1753{
1754 pub fn qos(mut self, qos: QosSettings) -> Self {
1755 self.qos = qos;
1756 self
1757 }
1758
1759 pub fn sched_context(mut self, sc: super::sched_context::SchedContextId) -> Self {
1761 self.sched = Some(sc);
1762 self
1763 }
1764
1765 pub fn rx_buffer<const N: usize>(self) -> TypedSubscriptionBuilder<'c, 'e, 't, M, N> {
1767 TypedSubscriptionBuilder {
1768 ctx: self.ctx,
1769 topic: self.topic,
1770 qos: self.qos,
1771 sched: self.sched,
1772 _phantom: PhantomData,
1773 }
1774 }
1775
1776 pub fn message_info(self) -> TypedSubInfoBuilder<'c, 'e, 't, M, RX> {
1781 TypedSubInfoBuilder {
1782 ctx: self.ctx,
1783 topic: self.topic,
1784 qos: self.qos,
1785 sched: self.sched,
1786 _phantom: PhantomData,
1787 }
1788 }
1789
1790 #[cfg(feature = "safety-e2e")]
1793 pub fn safety(self) -> TypedSubSafetyBuilder<'c, 'e, 't, M, RX> {
1794 TypedSubSafetyBuilder {
1795 ctx: self.ctx,
1796 topic: self.topic,
1797 qos: self.qos,
1798 sched: self.sched,
1799 _phantom: PhantomData,
1800 }
1801 }
1802
1803 pub fn build<F: FnMut(&M) + 'static>(
1804 self,
1805 callback: F,
1806 ) -> Result<super::types::HandleId, NodeError> {
1807 let handle = self
1808 .ctx
1809 .executor
1810 .register_subscription_buffered_on::<M, F, RX>(
1811 self.ctx.node_id,
1812 self.topic,
1813 self.qos,
1814 callback,
1815 )?;
1816 if let Some(sc) = self.sched {
1817 self.ctx.executor.bind_handle_to_sched_context(handle, sc)?;
1818 }
1819 Ok(handle)
1820 }
1821}
1822
/// Subscription builder for a compile-time message type `M` whose callback
/// also receives `Option<&nros_core::MessageInfo>`.
///
/// `RX` is forwarded as a const generic argument to the executor's
/// registration call and defaults to `crate::config::DEFAULT_RX_BUF_SIZE`.
pub struct TypedSubInfoBuilder<
    'c,
    'e,
    't,
    M,
    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Context supplying the executor and node id used by `build`.
    ctx: &'c mut NodeCtx<'e>,
    /// Topic name forwarded unchanged to the executor.
    topic: &'t str,
    /// QoS forwarded to the executor; replaceable through `qos`.
    qos: QosSettings,
    /// Sched context to bind the new handle to after registration, if any.
    sched: Option<super::sched_context::SchedContextId>,
    /// Records the message type `M` without storing a value of it.
    _phantom: PhantomData<M>,
}
1838
1839impl<'c, 'e, 't, M: MessageForRmw + 'static, const RX: usize>
1840 TypedSubInfoBuilder<'c, 'e, 't, M, RX>
1841{
1842 pub fn qos(mut self, qos: QosSettings) -> Self {
1843 self.qos = qos;
1844 self
1845 }
1846
1847 pub fn sched_context(mut self, sc: super::sched_context::SchedContextId) -> Self {
1848 self.sched = Some(sc);
1849 self
1850 }
1851
1852 pub fn rx_buffer<const N: usize>(self) -> TypedSubInfoBuilder<'c, 'e, 't, M, N> {
1853 TypedSubInfoBuilder {
1854 ctx: self.ctx,
1855 topic: self.topic,
1856 qos: self.qos,
1857 sched: self.sched,
1858 _phantom: PhantomData,
1859 }
1860 }
1861
1862 pub fn build<F: FnMut(&M, Option<&nros_core::MessageInfo>) + 'static>(
1863 self,
1864 callback: F,
1865 ) -> Result<super::types::HandleId, NodeError> {
1866 let handle = self
1867 .ctx
1868 .executor
1869 .register_subscription_with_info_sized_inner::<M, F, RX>(
1870 Some(self.ctx.node_id),
1871 self.topic,
1872 self.qos,
1873 callback,
1874 )?;
1875 if let Some(sc) = self.sched {
1876 self.ctx.executor.bind_handle_to_sched_context(handle, sc)?;
1877 }
1878 Ok(handle)
1879 }
1880}
1881
/// Subscription builder for a compile-time message type `M` whose callback
/// also receives `&nros_rmw::IntegrityStatus`.
///
/// Only compiled with the `safety-e2e` feature. `RX` is forwarded as a const
/// generic argument to the executor's registration call and defaults to
/// `crate::config::DEFAULT_RX_BUF_SIZE`.
#[cfg(feature = "safety-e2e")]
pub struct TypedSubSafetyBuilder<
    'c,
    'e,
    't,
    M,
    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Context supplying the executor and node id used by `build`.
    ctx: &'c mut NodeCtx<'e>,
    /// Topic name forwarded unchanged to the executor.
    topic: &'t str,
    /// QoS forwarded to the executor; replaceable through `qos`.
    qos: QosSettings,
    /// Sched context to bind the new handle to after registration, if any.
    sched: Option<super::sched_context::SchedContextId>,
    /// Records the message type `M` without storing a value of it.
    _phantom: PhantomData<M>,
}
1898
#[cfg(feature = "safety-e2e")]
impl<'c, 'e, 't, M: MessageForRmw + 'static, const RX: usize>
    TypedSubSafetyBuilder<'c, 'e, 't, M, RX>
{
    /// Replaces the stored QoS with `qos` and hands the builder back so the
    /// call can be chained.
    pub fn qos(self, qos: QosSettings) -> Self {
        Self { qos, ..self }
    }

    /// Selects the sched context the subscription handle is bound to once it
    /// has been registered.
    pub fn sched_context(self, sc: super::sched_context::SchedContextId) -> Self {
        Self {
            sched: Some(sc),
            ..self
        }
    }

    /// Re-types the builder with `N` in place of `RX`; every stored setting is
    /// carried over unchanged.
    pub fn rx_buffer<const N: usize>(self) -> TypedSubSafetyBuilder<'c, 'e, 't, M, N> {
        let Self {
            ctx,
            topic,
            qos,
            sched,
            ..
        } = self;
        TypedSubSafetyBuilder {
            ctx,
            topic,
            qos,
            sched,
            _phantom: PhantomData,
        }
    }

    /// Consumes the builder, registers `callback` with the executor and, when
    /// a sched context was selected, binds the new handle to it.
    ///
    /// # Errors
    ///
    /// Propagates a failure of
    /// `Executor::register_subscription_with_safety_sized_inner` or of
    /// `Executor::bind_handle_to_sched_context`.
    pub fn build<F: FnMut(&M, &nros_rmw::IntegrityStatus) + 'static>(
        self,
        callback: F,
    ) -> Result<super::types::HandleId, NodeError> {
        let Self {
            ctx,
            topic,
            qos,
            sched,
            ..
        } = self;
        let owner = Some(ctx.node_id);
        let handle = ctx
            .executor
            .register_subscription_with_safety_sized_inner::<M, F, RX>(
                owner, topic, qos, callback,
            )?;

        // Without a sched context there is nothing left to do.
        let Some(sc) = sched else {
            return Ok(handle);
        };
        ctx.executor.bind_handle_to_sched_context(handle, sc)?;
        Ok(handle)
    }
}
1942
/// Subscription builder for a message type described at run time by a
/// type-name / type-hash string pair; the callback receives the raw bytes.
///
/// `RX` is forwarded as a const generic argument to the executor's
/// registration call and defaults to `crate::config::DEFAULT_RX_BUF_SIZE`.
pub struct GenericSubscriptionBuilder<
    'c,
    'e,
    't,
    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Context supplying the executor and node id used by `build`.
    ctx: &'c mut NodeCtx<'e>,
    /// Topic name forwarded unchanged to the executor.
    topic: &'t str,
    /// Message type name forwarded unchanged to the executor.
    type_name: &'t str,
    /// Message type hash forwarded unchanged to the executor.
    type_hash: &'t str,
    /// QoS forwarded to the executor; replaceable through `qos`.
    qos: QosSettings,
    /// Sched context to bind the new handle to after registration, if any.
    sched: Option<super::sched_context::SchedContextId>,
}
1957
1958impl<'c, 'e, 't, const RX: usize> GenericSubscriptionBuilder<'c, 'e, 't, RX> {
1959 pub fn qos(mut self, qos: QosSettings) -> Self {
1960 self.qos = qos;
1961 self
1962 }
1963
1964 pub fn sched_context(mut self, sc: super::sched_context::SchedContextId) -> Self {
1965 self.sched = Some(sc);
1966 self
1967 }
1968
1969 pub fn rx_buffer<const N: usize>(self) -> GenericSubscriptionBuilder<'c, 'e, 't, N> {
1970 GenericSubscriptionBuilder {
1971 ctx: self.ctx,
1972 topic: self.topic,
1973 type_name: self.type_name,
1974 type_hash: self.type_hash,
1975 qos: self.qos,
1976 sched: self.sched,
1977 }
1978 }
1979
1980 pub fn message_info(self) -> GenericSubInfoBuilder<'c, 'e, 't, RX> {
1984 GenericSubInfoBuilder {
1985 ctx: self.ctx,
1986 topic: self.topic,
1987 type_name: self.type_name,
1988 type_hash: self.type_hash,
1989 qos: self.qos,
1990 sched: self.sched,
1991 }
1992 }
1993
1994 pub fn build<F: FnMut(&[u8]) + 'static>(
1995 self,
1996 callback: F,
1997 ) -> Result<super::types::HandleId, NodeError> {
1998 let handle = self
1999 .ctx
2000 .executor
2001 .register_subscription_buffered_raw_on::<F, RX>(
2002 self.ctx.node_id,
2003 self.topic,
2004 self.type_name,
2005 self.type_hash,
2006 self.qos,
2007 callback,
2008 )?;
2009 if let Some(sc) = self.sched {
2010 self.ctx.executor.bind_handle_to_sched_context(handle, sc)?;
2011 }
2012 Ok(handle)
2013 }
2014}
2015
/// Subscription builder for a run-time described message type whose callback
/// receives the raw bytes together with `&nros_core::RawMessageInfo`.
///
/// `RX` is forwarded as a const generic argument to the executor's
/// registration call and defaults to `crate::config::DEFAULT_RX_BUF_SIZE`.
pub struct GenericSubInfoBuilder<
    'c,
    'e,
    't,
    const RX: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
> {
    /// Context supplying the executor and node id used by `build`.
    ctx: &'c mut NodeCtx<'e>,
    /// Topic name forwarded unchanged to the executor.
    topic: &'t str,
    /// Message type name forwarded unchanged to the executor.
    type_name: &'t str,
    /// Message type hash forwarded unchanged to the executor.
    type_hash: &'t str,
    /// QoS forwarded to the executor; replaceable through `qos`.
    qos: QosSettings,
    /// Sched context to bind the new handle to after registration, if any.
    sched: Option<super::sched_context::SchedContextId>,
}
2031
2032impl<'c, 'e, 't, const RX: usize> GenericSubInfoBuilder<'c, 'e, 't, RX> {
2033 pub fn qos(mut self, qos: QosSettings) -> Self {
2034 self.qos = qos;
2035 self
2036 }
2037
2038 pub fn sched_context(mut self, sc: super::sched_context::SchedContextId) -> Self {
2039 self.sched = Some(sc);
2040 self
2041 }
2042
2043 pub fn rx_buffer<const N: usize>(self) -> GenericSubInfoBuilder<'c, 'e, 't, N> {
2044 GenericSubInfoBuilder {
2045 ctx: self.ctx,
2046 topic: self.topic,
2047 type_name: self.type_name,
2048 type_hash: self.type_hash,
2049 qos: self.qos,
2050 sched: self.sched,
2051 }
2052 }
2053
2054 pub fn build<F: FnMut(&[u8], &nros_core::RawMessageInfo) + 'static>(
2055 self,
2056 callback: F,
2057 ) -> Result<super::types::HandleId, NodeError> {
2058 let handle = self
2059 .ctx
2060 .executor
2061 .register_subscription_buffered_raw_info_on::<F, RX>(
2062 self.ctx.node_id,
2063 self.topic,
2064 self.type_name,
2065 self.type_hash,
2066 self.qos,
2067 callback,
2068 )?;
2069 if let Some(sc) = self.sched {
2070 self.ctx.executor.bind_handle_to_sched_context(handle, sc)?;
2071 }
2072 Ok(handle)
2073 }
2074}
2075
// Smoke tests for the builder chains defined in this file. Compiled only for
// test builds without the `rmw-cffi` feature. Every test drives a chain
// against `MockSession` and asserts nothing beyond the final `build` /
// `create_*` call returning `Ok` (via `expect`).
#[cfg(all(test, not(feature = "rmw-cffi")))]
mod builder_tests {
    use super::*;
    use crate::{executor::Executor, mock::MockSession};
    use nros_core::{CdrReader, CdrWriter, DeserError, Deserialize, SerError, Serialize};

    /// Field-less message used as the typed payload in these tests.
    struct TestMsg;
    impl RosMessage for TestMsg {
        const TYPE_NAME: &'static str = "test/msg/TestMsg";
        const TYPE_HASH: &'static str = "test_hash";
    }
    impl Serialize for TestMsg {
        /// Writes nothing and always succeeds.
        fn serialize(&self, _w: &mut CdrWriter) -> Result<(), SerError> {
            Ok(())
        }
    }
    impl Deserialize for TestMsg {
        /// Reads nothing and always succeeds.
        fn deserialize(_r: &mut CdrReader) -> Result<Self, DeserError> {
            Ok(Self)
        }
    }
    // Schema description, compiled only when `rmw_cyclonedds_present` is set.
    // NOTE(review): presumably required for `TestMsg` to satisfy
    // `MessageForRmw` with that backend — confirm.
    #[cfg(rmw_cyclonedds_present)]
    impl nros_serdes::schema::Message for TestMsg {
        const TYPE_NAME: &'static str = "test/msg/TestMsg";
        const FIELDS: &'static [nros_serdes::schema::Field] = &[nros_serdes::schema::Field {
            name: "data",
            ty: nros_serdes::schema::FieldType::Uint8,
            offset: 0,
        }];
    }

    /// Converts `v` into a `heapless::String<64>`; panics if the conversion fails.
    fn s(v: &str) -> heapless::String<64> {
        heapless::String::try_from(v).unwrap()
    }

    /// `NodeHandle::publisher` chain: typed (with a QoS override) and generic.
    #[test]
    fn publisher_builder_typed_and_generic() {
        let mut session = MockSession::new();
        let mut node = NodeHandle::new(s("n"), s("/"), &mut session, 0);

        // Typed stage: QoS set after the message type is chosen.
        let _typed = node
            .publisher("/chatter")
            .typed::<TestMsg>()
            .qos(QosSettings::default().keep_last(5))
            .build()
            .expect("typed publisher builds");

        // Generic stage: QoS set before the type name / hash are supplied.
        let _generic = node
            .publisher("/chatter")
            .qos(QosSettings::default())
            .generic("std_msgs/msg/Int32", "hash")
            .build()
            .expect("generic publisher builds");
    }

    /// `NodeCtx::subscription` chains (typed, generic, sized + sched context)
    /// plus the `create_subscription` convenience method.
    #[test]
    fn subscription_builder_and_convenient() {
        let mut exec: Executor = Executor::from_session(MockSession::new());
        let id = exec.node_builder("n").build().expect("node");

        // Typed subscription with a QoS override.
        let _h = exec
            .node_mut(id)
            .subscription("/chatter")
            .typed::<TestMsg>()
            .qos(QosSettings::default().keep_last(5))
            .build(|_m: &TestMsg| {})
            .expect("typed subscription builds");

        // Generic subscription receiving raw bytes.
        let _g = exec
            .node_mut(id)
            .subscription("/raw")
            .generic("std_msgs/msg/Int32", "hash")
            .build(|_b: &[u8]| {})
            .expect("generic subscription builds");

        // Typed subscription with an explicit RX size and a sched context.
        let sc = exec.default_sched_context_id();
        let _s = exec
            .node_mut(id)
            .subscription("/sized")
            .typed::<TestMsg>()
            .rx_buffer::<64>()
            .sched_context(sc)
            .build(|_m: &TestMsg| {})
            .expect("sized + sched subscription builds");

        // One-call convenience form.
        let _c = exec
            .node_mut(id)
            .create_subscription::<TestMsg, _>("/conv", |_m: &TestMsg| {})
            .expect("convenient typed subscription builds");
    }

    /// Generic chain with `message_info()` followed by `rx_buffer`; the
    /// callback receives `&RawMessageInfo`.
    #[test]
    fn generic_message_info_builder() {
        let mut exec: Executor = Executor::from_session(MockSession::new());
        let id = exec.node_builder("n").build().expect("node");

        let _i = exec
            .node_mut(id)
            .subscription("/info")
            .generic("std_msgs/msg/Int32", "hash")
            .message_info()
            .rx_buffer::<256>()
            .build(|_payload: &[u8], info: &nros_core::RawMessageInfo| {
                let _ = info.attachment();
            })
            .expect("generic + message_info subscription builds");
    }

    /// Typed chain with `message_info()`; the callback receives
    /// `Option<&MessageInfo>`.
    #[test]
    fn typed_message_info_builder() {
        let mut exec: Executor = Executor::from_session(MockSession::new());
        let id = exec.node_builder("n").build().expect("node");
        let _h = exec
            .node_mut(id)
            .subscription("/chatter")
            .typed::<TestMsg>()
            .qos(QosSettings::default().keep_last(5))
            .message_info()
            .build(|_m: &TestMsg, _info: Option<&nros_core::MessageInfo>| {})
            .expect("typed + message_info subscription builds");
    }

    /// Typed chain with `safety()`; only compiled with the `safety-e2e` feature.
    #[cfg(feature = "safety-e2e")]
    #[test]
    fn typed_safety_builder() {
        let mut exec: Executor = Executor::from_session(MockSession::new());
        let id = exec.node_builder("n").build().expect("node");
        let _h = exec
            .node_mut(id)
            .subscription("/chatter")
            .typed::<TestMsg>()
            .safety()
            .build(|_m: &TestMsg, _status: &nros_rmw::IntegrityStatus| {})
            .expect("typed + safety subscription builds");
    }

    /// Pins the exact chain order generic → qos → rx_buffer → build.
    /// NOTE(review): the test name suggests this mirrors code emitted by a
    /// generator — confirm which one.
    #[test]
    fn generator_emitted_chain_compiles() {
        let mut exec: Executor = Executor::from_session(MockSession::new());
        let id = exec.node_builder("n").build().expect("node");
        let _h = exec
            .node_mut(id)
            .subscription("/topic")
            .generic("std_msgs/msg/Int32", "hash")
            .qos(QosSettings::default().keep_last(1))
            .rx_buffer::<1024>()
            .build(|_data: &[u8]| {})
            .expect("generator-shape subscription builds");
    }

    /// `NodeCtx` publisher paths, plus a bridge-shaped setup in which a raw
    /// publisher is moved into a subscription callback that republishes each
    /// received payload.
    #[test]
    fn nodectx_publisher_and_bridge_shape() {
        let mut exec: Executor = Executor::from_session(MockSession::new());
        let id = exec.node_builder("n").build().expect("node");

        // Convenience typed publisher.
        let _p = exec
            .node_mut(id)
            .create_publisher::<TestMsg>("/p")
            .expect("ctx convenient publisher");
        // Destination publisher for the bridge.
        let dest_pub = exec
            .node_mut(id)
            .publisher("/fwd")
            .generic("std_msgs/msg/Int32", "hash")
            .build()
            .expect("ctx generic publisher builds");
        // Source subscription; the callback owns `dest_pub` and ignores
        // publish errors.
        let _s = exec
            .node_mut(id)
            .subscription("/src")
            .generic("std_msgs/msg/Int32", "hash")
            .message_info()
            .build(move |payload: &[u8], _info: &nros_core::RawMessageInfo| {
                let _ = dest_pub.publish_raw(payload);
            })
            .expect("bridge-shape source subscription builds");
    }
}