Skip to main content

nros_node/executor/
action_core.rs

1//! Type-agnostic action protocol core types.
2//!
3//! [`ActionServerCore`] and [`ActionClientCore`] handle the raw-bytes
4//! action protocol (GoalId framing, status publishing, result slab)
5//! without requiring `RosAction` type parameters. The typed
6//! [`ActionServer`](super::handles::ActionServer) and
7//! [`ActionClient`](super::handles::ActionClient) wrap these cores
8//! and add serialization/deserialization at the boundary.
9
10use nros_core::{CdrReader, CdrWriter, GoalId, GoalInfo, GoalStatus, GoalStatusStamped, Serialize};
11use nros_rmw::{Publisher, ServiceClientTrait, ServiceServerTrait, Subscriber, TransportError};
12
13use super::types::NodeError;
14use crate::session;
15
16/// Scratch buffer for a decoded CancelGoal request. Cancel payloads are
17/// tiny (a `GoalId` + a `builtin_interfaces/Time` stamp), so a fixed
18/// 256-byte buffer covers them without a const-generic parameter like the
19/// goal/result/feedback slabs.
20pub(crate) const CANCEL_BUF: usize = 256;
21
22/// DDS type name of an action's `send_goal` / `get_result` service.
23///
24/// ROS 2 names the action's two services by their per-channel service types —
25/// `<Action>_SendGoal` / `<Action>_GetResult` — **not** the bare action type
26/// (`<Action>`). A real `rcl_action` peer matches on those, so advertising the
27/// bare action type leaves our send_goal/get_result services undiscovered and
28/// every goal times out.
29///
30/// The codegen emits each channel's *request* envelope type name in DDS form,
31/// e.g. `example_interfaces::action::dds_::Fibonacci_SendGoal_Request_`. The
32/// service layer (`xrce_dds_request_type` / the Zenoh shim) extends a base type
33/// ending in `_` with `Request_` / `Response_`, so the base we must pass is the
34/// request type name with its trailing `Request_` stripped:
35/// `…Fibonacci_SendGoal_Request_` → `…Fibonacci_SendGoal_`. Falls back to the
36/// bare action type if the request name has an unexpected shape.
37pub(crate) fn action_service_base_type<'a>(
38    request_type_name: &'a str,
39    fallback_action_type: &'a str,
40) -> &'a str {
41    request_type_name
42        .strip_suffix("Request_")
43        .unwrap_or(fallback_action_type)
44}
45
46/// Scratch buffer for serializing a `GoalStatusArray` before publishing it
47/// on the status topic. 512 bytes holds the CDR header plus a status entry
48/// (`GoalInfo` + status enum) for every concurrently-tracked goal.
49const STATUS_ARRAY_BUF: usize = 512;
50
51// ============================================================================
52// Supporting types
53// ============================================================================
54
55/// Goal tracked by the core — only GoalId + status, no typed data.
56#[derive(Clone, Copy)]
57pub struct RawActiveGoal {
58    /// Goal ID.
59    pub goal_id: GoalId,
60    /// Current status.
61    pub status: GoalStatus,
62}
63
64/// A `get_result` request held until its goal terminates (Phase 237).
65///
66/// `sequence_number` is the service-backend reply-correlation token; the backend
67/// must be able to `send_reply(sequence_number, …)` after the handler returned
68/// (Cyclone native; XRCE/Zenoh via the Phase 237 seq-keyed reply tables).
69#[derive(Clone, Copy)]
70pub struct PendingGetResult {
71    /// Goal whose terminal result the requester is waiting for.
72    pub goal_id: GoalId,
73    /// Backend reply-correlation token for the deferred `send_reply`.
74    pub sequence_number: i64,
75}
76
77/// Completed goal result metadata — indexes into the result slab.
78#[derive(Clone, Copy)]
79pub struct CompletedResultEntry {
80    /// Unique identifier for the completed goal.
81    pub goal_id: GoalId,
82    /// Terminal status of the goal.
83    pub status: GoalStatus,
84    /// Byte offset into the result slab.
85    pub offset: usize,
86    /// Length of the serialised result in bytes.
87    pub len: usize,
88}
89
90/// Phase 122.3.c.6.d — information about a peeked cancel-goal
91/// request. Returned by
92/// [`ActionServerCore::try_recv_cancel_request`].
93pub struct PendingCancelRequest {
94    /// The goal_id named in the cancel request.
95    pub goal_id: GoalId,
96    /// Service sequence number — pass back to
97    /// [`ActionServerCore::send_cancel_reply`].
98    pub sequence_number: i64,
99    /// Snapshot of the goal's current status at peek time
100    /// (`GoalStatus::Unknown` if no matching active goal).
101    pub current_status: GoalStatus,
102}
103
104/// Information about a received goal request.
105pub struct RawGoalRequest {
106    /// The parsed goal ID.
107    pub goal_id: GoalId,
108    /// Sequence number for the service reply.
109    pub sequence_number: i64,
110    /// Offset into the goal buffer where the CDR payload begins.
111    /// Backends may prepend a sequence-number header (DDS) or place
112    /// the payload at offset 0 (zenoh).
113    pub data_offset: usize,
114    /// Length of valid CDR data starting at `data_offset`.
115    pub data_len: usize,
116}
117
118// ============================================================================
119// GoalId CDR helpers
120// ============================================================================
121
122/// Read a GoalId from a CDR reader as a fixed `uint8[16]` array.
123///
124/// ROS 2 actions carry the goal id as `unique_identifier_msgs/UUID`, whose
125/// single field is a **fixed-size** `uint8[16]` array — CDR fixed arrays have
126/// **no** length prefix. We must read exactly 16 bytes with no leading count,
127/// matching `unique_identifier_msgs::msg::UUID::deserialize`. (The pre-233.6
128/// framing wrote a `u32(16)` sequence prefix, which self-matched nano-ros peers
129/// but added 4 bytes a real `rcl_action` peer rejects.)
130fn read_goal_id(reader: &mut CdrReader<'_>) -> Result<GoalId, NodeError> {
131    let mut goal_id = GoalId::default();
132    for byte in &mut goal_id.uuid {
133        *byte = reader
134            .read_u8()
135            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
136    }
137    Ok(goal_id)
138}
139
140/// Write a GoalId into a CDR writer as a fixed `uint8[16]` array (no length
141/// prefix) — see [`read_goal_id`] for why the prefix must be absent.
142fn write_goal_id(writer: &mut CdrWriter<'_>, goal_id: &GoalId) -> Result<(), NodeError> {
143    for b in &goal_id.uuid {
144        writer.write_u8(*b).map_err(|_| NodeError::Serialization)?;
145    }
146    Ok(())
147}
148
149// ============================================================================
150// ActionServerCore
151// ============================================================================
152
153/// Type-agnostic action server core handling the raw-bytes protocol.
154///
155/// Manages active goal tracking (GoalId + status), completed result storage
156/// in a fixed-size slab, and all CDR framing for the action protocol.
157///
158/// The typed [`ActionServer`](super::handles::ActionServer) wraps this
159/// and adds `A::Goal` / `A::Feedback` / `A::Result` (de)serialization.
160pub struct ActionServerCore<
161    const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
162    const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
163    const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
164    const MAX_GOALS: usize = 4,
165> {
166    pub(crate) send_goal_server: session::RmwServiceServer,
167    pub(crate) cancel_goal_server: session::RmwServiceServer,
168    pub(crate) get_result_server: session::RmwServiceServer,
169    pub(crate) feedback_publisher: session::RmwPublisher,
170    pub(crate) status_publisher: session::RmwPublisher,
171    pub(crate) active_goals: heapless::Vec<RawActiveGoal, MAX_GOALS>,
172    pub(crate) completed_results: heapless::Vec<CompletedResultEntry, MAX_GOALS>,
173    /// `get_result` requests that arrived while their goal was still active
174    /// (Phase 237). `rclcpp_action` sends `get_result` immediately after
175    /// acceptance and expects the reply only once the goal terminates, so we
176    /// hold the request's correlation token (`sequence_number`) here and flush
177    /// it in [`Self::complete_goal_raw`]. Deferral relies on the service
178    /// backend honoring `send_reply(seq)` after the handler returns — the
179    /// seq-keyed reply contract (Cyclone native; XRCE/Zenoh per Phase 237).
180    pub(crate) pending_get_results: heapless::Vec<PendingGetResult, MAX_GOALS>,
181    /// Slab storage for completed result CDR bytes.
182    pub(crate) result_slab: [u8; RESULT_BUF],
183    pub(crate) result_slab_used: usize,
184    pub(crate) goal_buffer: [u8; GOAL_BUF],
185    pub(crate) feedback_buffer: [u8; FEEDBACK_BUF],
186    pub(crate) cancel_buffer: [u8; CANCEL_BUF],
187}
188
189impl<
190    const GOAL_BUF: usize,
191    const RESULT_BUF: usize,
192    const FEEDBACK_BUF: usize,
193    const MAX_GOALS: usize,
194> ActionServerCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF, MAX_GOALS>
195{
196    /// Phase 122.3.c.6.b — construct an `ActionServerCore` from the
197    /// 5 already-built transport channels. Caller (typically the C
198    /// API's `nros_action_server_init_polling`) owns wiring the
199    /// channels via the session's `create_*` methods.
200    pub fn from_channels(
201        send_goal_server: session::RmwServiceServer,
202        cancel_goal_server: session::RmwServiceServer,
203        get_result_server: session::RmwServiceServer,
204        feedback_publisher: session::RmwPublisher,
205        status_publisher: session::RmwPublisher,
206    ) -> Self {
207        Self {
208            send_goal_server,
209            cancel_goal_server,
210            get_result_server,
211            feedback_publisher,
212            status_publisher,
213            active_goals: heapless::Vec::new(),
214            completed_results: heapless::Vec::new(),
215            pending_get_results: heapless::Vec::new(),
216            result_slab: [0u8; RESULT_BUF],
217            result_slab_used: 0,
218            goal_buffer: [0u8; GOAL_BUF],
219            feedback_buffer: [0u8; FEEDBACK_BUF],
220            cancel_buffer: [0u8; CANCEL_BUF],
221        }
222    }
223
224    /// Try to receive a goal request from the send_goal service.
225    ///
226    /// Returns the parsed GoalId, sequence number, and data length.
227    /// The full CDR data (including GoalId) remains in `goal_buffer`.
228    pub fn try_recv_goal_request(&mut self) -> Result<Option<RawGoalRequest>, NodeError> {
229        // Capture buf base ptr before borrowing through `try_recv_request`
230        // so we can recover the data offset after the borrow ends.
231        // DDS-style backends place a sequence-number prefix before the
232        // CDR payload; reading the buffer from offset 0 unconditionally
233        // would feed the prefix bytes to the deserializer.
234        let buf_start = self.goal_buffer.as_ptr() as usize;
235        // Phase 120: NoData (no pending request) is the steady-state
236        // expected condition — collapse it to `Ok(None)` instead of
237        // surfacing as ServiceRequestFailed. Any other transport
238        // error remains ServiceRequestFailed.
239        let request = match self
240            .send_goal_server
241            .try_recv_request(&mut self.goal_buffer)
242        {
243            Ok(opt) => opt,
244            Err(TransportError::NoData) => return Ok(None),
245            Err(_) => return Err(NodeError::Transport(TransportError::ServiceRequestFailed)),
246        };
247
248        let request = match request {
249            Some(r) => r,
250            None => return Ok(None),
251        };
252
253        let data_offset = (request.data.as_ptr() as usize).saturating_sub(buf_start);
254        let data_len = request.data.len();
255        let sequence_number = request.sequence_number;
256        #[allow(clippy::drop_non_drop)]
257        drop(request);
258
259        let mut reader =
260            CdrReader::new_with_header(&self.goal_buffer[data_offset..data_offset + data_len])
261                .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
262
263        let goal_id = read_goal_id(&mut reader)?;
264
265        Ok(Some(RawGoalRequest {
266            goal_id,
267            sequence_number,
268            data_offset,
269            data_len,
270        }))
271    }
272
273    /// Get a reference to the goal buffer (valid after `try_recv_goal_request`).
274    pub fn goal_buffer(&self) -> &[u8] {
275        &self.goal_buffer
276    }
277
278    /// Accept a goal: sends the acceptance reply, adds to active goals,
279    /// publishes status.
280    pub fn accept_goal(&mut self, goal_id: GoalId, seq: i64) -> Result<(), NodeError> {
281        // Serialize response: accepted=true + stamp (Time: sec=0, nanosec=0)
282        let mut writer = CdrWriter::new_with_header(&mut self.cancel_buffer)
283            .map_err(|_| NodeError::BufferTooSmall)?;
284        writer.write_u8(1).map_err(|_| NodeError::Serialization)?;
285        writer.write_i32(0).map_err(|_| NodeError::Serialization)?;
286        writer.write_u32(0).map_err(|_| NodeError::Serialization)?;
287        let reply_len = writer.position();
288
289        self.send_goal_server
290            .send_reply(seq, &self.cancel_buffer[..reply_len])
291            .map_err(|_| NodeError::ServiceReplyFailed)?;
292
293        let _ = self.active_goals.push(RawActiveGoal {
294            goal_id,
295            status: GoalStatus::Accepted,
296        });
297        let _ = self.publish_status_array();
298        Ok(())
299    }
300
301    /// Reject a goal: sends the rejection reply.
302    pub fn reject_goal(&mut self, seq: i64) -> Result<(), NodeError> {
303        // Serialize response: accepted=false + stamp
304        let mut writer = CdrWriter::new_with_header(&mut self.cancel_buffer)
305            .map_err(|_| NodeError::BufferTooSmall)?;
306        writer.write_u8(0).map_err(|_| NodeError::Serialization)?;
307        writer.write_i32(0).map_err(|_| NodeError::Serialization)?;
308        writer.write_u32(0).map_err(|_| NodeError::Serialization)?;
309        let reply_len = writer.position();
310
311        self.send_goal_server
312            .send_reply(seq, &self.cancel_buffer[..reply_len])
313            .map_err(|_| NodeError::ServiceReplyFailed)
314    }
315
316    /// Publish feedback with raw CDR bytes.
317    ///
318    /// Writes GoalId framing + raw feedback bytes into the feedback buffer
319    /// and publishes.
320    pub fn publish_feedback_raw(
321        &mut self,
322        goal_id: &GoalId,
323        feedback_cdr: &[u8],
324    ) -> Result<(), NodeError> {
325        // GoalId framing (4 + 16 = 20 bytes) + feedback_cdr must fit in FEEDBACK_BUF
326        let needed = 4 + 20 + feedback_cdr.len(); // CDR header + GoalId + feedback
327        if needed > FEEDBACK_BUF {
328            return Err(NodeError::BufferTooSmall);
329        }
330
331        let mut writer = CdrWriter::new_with_header(&mut self.feedback_buffer)
332            .map_err(|_| NodeError::BufferTooSmall)?;
333
334        write_goal_id(&mut writer, goal_id)?;
335
336        // Copy raw feedback bytes directly after GoalId
337        let pos = writer.position();
338        if pos + feedback_cdr.len() > FEEDBACK_BUF {
339            return Err(NodeError::BufferTooSmall);
340        }
341        self.feedback_buffer[pos..pos + feedback_cdr.len()].copy_from_slice(feedback_cdr);
342        let len = pos + feedback_cdr.len();
343
344        self.feedback_publisher
345            .publish_raw(&self.feedback_buffer[..len])
346            .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
347    }
348
349    /// Set a goal's status and publish the updated GoalStatusArray.
350    pub fn set_goal_status(&mut self, goal_id: &GoalId, status: GoalStatus) {
351        for goal in &mut self.active_goals {
352            if goal.goal_id.uuid == goal_id.uuid {
353                goal.status = status;
354                break;
355            }
356        }
357        let _ = self.publish_status_array();
358    }
359
360    /// Complete a goal: remove from active, store raw result CDR in slab,
361    /// publish status.
362    pub fn complete_goal_raw(&mut self, goal_id: &GoalId, status: GoalStatus, result_cdr: &[u8]) {
363        // Remove from active goals
364        if let Some(pos) = self
365            .active_goals
366            .iter()
367            .position(|g| g.goal_id.uuid == goal_id.uuid)
368        {
369            self.active_goals.swap_remove(pos);
370        }
371
372        // Store result CDR in the slab
373        let offset = self.result_slab_used;
374        let end = offset + result_cdr.len();
375        let stored = if end <= RESULT_BUF {
376            self.result_slab[offset..end].copy_from_slice(result_cdr);
377            self.result_slab_used = end;
378            let _ = self.completed_results.push(CompletedResultEntry {
379                goal_id: *goal_id,
380                status,
381                offset,
382                len: result_cdr.len(),
383            });
384            true
385        } else {
386            false
387        };
388
389        // Phase 237 — flush any get_result requests that arrived while this goal
390        // was still active. The result is now in the slab; reply to each held
391        // requester via its retained `sequence_number`.
392        if stored {
393            let len = result_cdr.len();
394            let mut i = 0;
395            while i < self.pending_get_results.len() {
396                if self.pending_get_results[i].goal_id.uuid == goal_id.uuid {
397                    let seq = self.pending_get_results[i].sequence_number;
398                    // swap_remove moves the last entry into slot `i`; re-check `i`.
399                    let _ = self.pending_get_results.swap_remove(i);
400                    let _ = self.reply_get_result_from_slab(seq, status, offset, len);
401                } else {
402                    i += 1;
403                }
404            }
405        }
406
407        let _ = self.publish_status_array();
408    }
409
410    /// Phase 122.3.c.6.e — register a `Waker` that fires when a new
411    /// send_goal request arrives. Event-driven action servers
412    /// register here in place of polling `try_recv_goal_request` on
413    /// a timer.
414    pub fn register_goal_waker(&self, waker: &core::task::Waker) {
415        use nros_rmw::ServiceServerTrait;
416        self.send_goal_server.register_waker(waker);
417    }
418
419    /// Phase 122.3.c.6.e — register a `Waker` that fires when a
420    /// cancel-goal request arrives.
421    pub fn register_cancel_waker(&self, waker: &core::task::Waker) {
422        use nros_rmw::ServiceServerTrait;
423        self.cancel_goal_server.register_waker(waker);
424    }
425
426    /// Phase 122.3.c.6.e — register a `Waker` that fires when a
427    /// get_result query arrives.
428    pub fn register_get_result_waker(&self, waker: &core::task::Waker) {
429        use nros_rmw::ServiceServerTrait;
430        self.get_result_server.register_waker(waker);
431    }
432
433    /// Phase 122.3.c.6.d — peek a pending cancel-goal request without
434    /// generating a reply. Returns the goal_id named in the request,
435    /// the matching service sequence number (use it with
436    /// [`send_cancel_reply`](Self::send_cancel_reply)), and the
437    /// goal's current status (`GoalStatus::Unknown` if no such
438    /// active goal). Returns `Ok(None)` when no cancel request is
439    /// pending.
440    ///
441    /// Used by L1 polling-mode action servers (nros-c / nros-cpp C
442    /// FFI) that want to drive cancel-decision policy without
443    /// passing a Rust closure across the C ABI. See the matching
444    /// [`send_cancel_reply`](Self::send_cancel_reply) for the reply
445    /// side. The high-level closure-based
446    /// [`try_handle_cancel`](Self::try_handle_cancel) keeps working
447    /// and now delegates to this pair.
448    pub fn try_recv_cancel_request(&mut self) -> Result<Option<PendingCancelRequest>, NodeError> {
449        let buf_start = self.cancel_buffer.as_ptr() as usize;
450        let request = match self
451            .cancel_goal_server
452            .try_recv_request(&mut self.cancel_buffer)
453        {
454            Ok(Some(r)) => r,
455            Ok(None) | Err(TransportError::NoData) => return Ok(None),
456            Err(_) => return Err(NodeError::Transport(TransportError::ServiceRequestFailed)),
457        };
458
459        let data_offset = (request.data.as_ptr() as usize).saturating_sub(buf_start);
460        let data_len = request.data.len();
461        let sequence_number = request.sequence_number;
462        #[allow(clippy::drop_non_drop)]
463        drop(request);
464
465        let mut reader =
466            CdrReader::new_with_header(&self.cancel_buffer[data_offset..data_offset + data_len])
467                .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
468
469        let goal_id = read_goal_id(&mut reader)?;
470        let current_status = self.find_goal_status(&goal_id);
471
472        Ok(Some(PendingCancelRequest {
473            goal_id,
474            sequence_number,
475            current_status,
476        }))
477    }
478
479    /// Phase 122.3.c.6.d — send the reply to a previously-peeked
480    /// cancel-goal request. `sequence_number` must match the value
481    /// returned by [`try_recv_cancel_request`](Self::try_recv_cancel_request).
482    ///
483    /// `return_code` is the overall RPC status (`CancelResponse::Ok`
484    /// = at least one cancel honoured; other variants = whole-request
485    /// failure). `accepted` lists the goals that transition to
486    /// `Canceling`; this function flips their stored status before
487    /// publishing the status array.
488    pub fn send_cancel_reply(
489        &mut self,
490        sequence_number: i64,
491        return_code: nros_core::CancelResponse,
492        accepted: &[GoalId],
493    ) -> Result<(), NodeError> {
494        for id in accepted {
495            self.set_goal_status(id, GoalStatus::Canceling);
496        }
497
498        let mut writer = CdrWriter::new_with_header(&mut self.goal_buffer)
499            .map_err(|_| NodeError::BufferTooSmall)?;
500        writer
501            .write_i8(return_code as i8)
502            .map_err(|_| NodeError::Serialization)?;
503        let count = u32::try_from(accepted.len()).unwrap_or(u32::MAX);
504        writer
505            .write_u32(count)
506            .map_err(|_| NodeError::Serialization)?;
507        for id in accepted {
508            write_goal_id(&mut writer, id)?;
509            // GoalInfo.stamp — zero timestamp.
510            writer.write_i32(0).map_err(|_| NodeError::Serialization)?;
511            writer.write_u32(0).map_err(|_| NodeError::Serialization)?;
512        }
513        let reply_len = writer.position();
514
515        self.cancel_goal_server
516            .send_reply(sequence_number, &self.goal_buffer[..reply_len])
517            .map_err(|_| NodeError::ServiceReplyFailed)?;
518
519        if !accepted.is_empty() {
520            let _ = self.publish_status_array();
521        }
522
523        Ok(())
524    }
525
526    /// Try to handle a cancel_goal request (type-agnostic).
527    pub fn try_handle_cancel(
528        &mut self,
529        cancel_handler: impl FnOnce(&GoalId, GoalStatus) -> nros_core::CancelResponse,
530    ) -> Result<Option<(GoalId, nros_core::CancelResponse)>, NodeError> {
531        let buf_start = self.cancel_buffer.as_ptr() as usize;
532        // Phase 120: NoData == steady-state idle; map to Ok(None).
533        let request = match self
534            .cancel_goal_server
535            .try_recv_request(&mut self.cancel_buffer)
536        {
537            Ok(Some(r)) => r,
538            Ok(None) | Err(TransportError::NoData) => return Ok(None),
539            Err(_) => return Err(NodeError::Transport(TransportError::ServiceRequestFailed)),
540        };
541
542        let data_offset = (request.data.as_ptr() as usize).saturating_sub(buf_start);
543        let data_len = request.data.len();
544        let sequence_number = request.sequence_number;
545        #[allow(clippy::drop_non_drop)]
546        drop(request);
547
548        let mut reader =
549            CdrReader::new_with_header(&self.cancel_buffer[data_offset..data_offset + data_len])
550                .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
551
552        let goal_id = read_goal_id(&mut reader)?;
553
554        let current_status = self.find_goal_status(&goal_id);
555        let response = cancel_handler(&goal_id, current_status);
556
557        if response == nros_core::CancelResponse::Ok {
558            self.set_goal_status(&goal_id, GoalStatus::Canceling);
559        }
560
561        // Serialize response: return_code (i8) + goals_canceling (sequence of GoalInfo)
562        let mut writer = CdrWriter::new_with_header(&mut self.goal_buffer)
563            .map_err(|_| NodeError::BufferTooSmall)?;
564        writer
565            .write_i8(response as i8)
566            .map_err(|_| NodeError::Serialization)?;
567
568        let num_canceling = if response == nros_core::CancelResponse::Ok {
569            1u32
570        } else {
571            0u32
572        };
573        writer
574            .write_u32(num_canceling)
575            .map_err(|_| NodeError::Serialization)?;
576        if response == nros_core::CancelResponse::Ok {
577            write_goal_id(&mut writer, &goal_id)?;
578            writer.write_i32(0).map_err(|_| NodeError::Serialization)?;
579            writer.write_u32(0).map_err(|_| NodeError::Serialization)?;
580        }
581        let reply_len = writer.position();
582
583        self.cancel_goal_server
584            .send_reply(sequence_number, &self.goal_buffer[..reply_len])
585            .map_err(|_| NodeError::ServiceReplyFailed)?;
586
587        Ok(Some((goal_id, response)))
588    }
589
590    /// Try to handle a get_result request using raw bytes.
591    ///
592    /// For completed goals, sends the stored raw result CDR from the slab.
593    /// For active/unknown goals, sends the provided `default_result_cdr` bytes.
594    ///
595    /// `default_result_cdr` should contain serialized result data (without CDR
596    /// header or status byte) — typically `A::Result::default()` serialized.
597    pub fn try_handle_get_result_raw(
598        &mut self,
599        default_result_cdr: &[u8],
600    ) -> Result<Option<GoalId>, NodeError> {
601        let buf_start = self.goal_buffer.as_ptr() as usize;
602        // Phase 120: NoData == steady-state idle; map to Ok(None).
603        let request = match self
604            .get_result_server
605            .try_recv_request(&mut self.goal_buffer)
606        {
607            Ok(Some(r)) => r,
608            Ok(None) | Err(TransportError::NoData) => return Ok(None),
609            Err(_) => return Err(NodeError::Transport(TransportError::ServiceRequestFailed)),
610        };
611
612        let data_offset = (request.data.as_ptr() as usize).saturating_sub(buf_start);
613        let data_len = request.data.len();
614        let sequence_number = request.sequence_number;
615        #[allow(clippy::drop_non_drop)]
616        drop(request);
617
618        let mut reader =
619            CdrReader::new_with_header(&self.goal_buffer[data_offset..data_offset + data_len])
620                .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
621
622        let goal_id = read_goal_id(&mut reader)?;
623
624        // Look up in completed results
625        let completed = self
626            .completed_results
627            .iter()
628            .find(|c| c.goal_id.uuid == goal_id.uuid);
629
630        if let Some(entry) = completed {
631            // Completed: send status + stored result CDR from the slab.
632            let (off, len, status) = (entry.offset, entry.len, entry.status);
633            self.reply_get_result_from_slab(sequence_number, status, off, len)?;
634        } else if self
635            .active_goals
636            .iter()
637            .any(|g| g.goal_id.uuid == goal_id.uuid)
638        {
639            // Active goal → DEFER (Phase 237). `rclcpp_action` sends get_result
640            // right after acceptance and expects the reply only once the goal
641            // terminates; replying now with a non-terminal status makes the
642            // client treat an unfinished goal as done. Hold the request's
643            // correlation token; `complete_goal_raw` flushes it. The backend
644            // retains the reply token keyed by `sequence_number`.
645            if self
646                .pending_get_results
647                .push(PendingGetResult {
648                    goal_id,
649                    sequence_number,
650                })
651                .is_err()
652            {
653                // Table full — fail loud rather than silently strand the
654                // requester (caller surfaces it; the request is not re-queued).
655                return Err(NodeError::BufferTooSmall);
656            }
657        } else {
658            // Unknown goal → reply immediately with UNKNOWN + default result.
659            let mut writer = CdrWriter::new_with_header(&mut self.goal_buffer)
660                .map_err(|_| NodeError::BufferTooSmall)?;
661            writer
662                .write_i8(GoalStatus::Unknown as i8)
663                .map_err(|_| NodeError::Serialization)?;
664            writer.align(4).map_err(|_| NodeError::Serialization)?;
665            let pos = writer.position();
666            if pos + default_result_cdr.len() > GOAL_BUF {
667                return Err(NodeError::BufferTooSmall);
668            }
669            self.goal_buffer[pos..pos + default_result_cdr.len()]
670                .copy_from_slice(default_result_cdr);
671            let reply_len = pos + default_result_cdr.len();
672
673            self.get_result_server
674                .send_reply(sequence_number, &self.goal_buffer[..reply_len])
675                .map_err(|_| NodeError::ServiceReplyFailed)?;
676        }
677
678        Ok(Some(goal_id))
679    }
680
681    /// Build + send a `get_result` reply — `[status i8][align(4)][result CDR]`
682    /// — copying the result bytes from the slab. Shared by the immediate
683    /// completed-goal path and the deferred flush in `complete_goal_raw`
684    /// (Phase 237). `goal_buffer` and `result_slab` are disjoint fields, so the
685    /// header build (into `goal_buffer`) and the slab copy don't alias.
686    fn reply_get_result_from_slab(
687        &mut self,
688        sequence_number: i64,
689        status: GoalStatus,
690        slab_offset: usize,
691        slab_len: usize,
692    ) -> Result<(), NodeError> {
693        let mut writer = CdrWriter::new_with_header(&mut self.goal_buffer)
694            .map_err(|_| NodeError::BufferTooSmall)?;
695        writer
696            .write_i8(status as i8)
697            .map_err(|_| NodeError::Serialization)?;
698        // Align to 4 after the status byte: the result CDR starts with a u32
699        // sequence length the reader will `align(4)` to.
700        writer.align(4).map_err(|_| NodeError::Serialization)?;
701        let pos = writer.position();
702        if pos + slab_len > GOAL_BUF {
703            return Err(NodeError::BufferTooSmall);
704        }
705        self.goal_buffer[pos..pos + slab_len]
706            .copy_from_slice(&self.result_slab[slab_offset..slab_offset + slab_len]);
707        let reply_len = pos + slab_len;
708        self.get_result_server
709            .send_reply(sequence_number, &self.goal_buffer[..reply_len])
710            .map_err(|_| NodeError::ServiceReplyFailed)
711    }
712
713    /// Get the number of active goals.
714    pub fn active_goal_count(&self) -> usize {
715        self.active_goals.len()
716    }
717
718    /// Get a reference to all active goals.
719    pub fn active_goals(&self) -> &[RawActiveGoal] {
720        &self.active_goals
721    }
722
723    /// Find the status of a goal (active or unknown).
724    pub fn find_goal_status(&self, goal_id: &GoalId) -> GoalStatus {
725        self.active_goals
726            .iter()
727            .find(|g| g.goal_id.uuid == goal_id.uuid)
728            .map(|g| g.status)
729            .unwrap_or(GoalStatus::Unknown)
730    }
731
732    /// Publish the current GoalStatusArray on the status topic.
733    pub fn publish_status_array(&self) -> Result<(), NodeError> {
734        let mut buf = [0u8; STATUS_ARRAY_BUF];
735        let mut writer =
736            CdrWriter::new_with_header(&mut buf).map_err(|_| NodeError::BufferTooSmall)?;
737
738        writer
739            .write_u32(self.active_goals.len() as u32)
740            .map_err(|_| NodeError::Serialization)?;
741
742        for goal in &self.active_goals {
743            let stamped = GoalStatusStamped::new(GoalInfo::with_id(goal.goal_id), goal.status);
744            stamped
745                .serialize(&mut writer)
746                .map_err(|_| NodeError::Serialization)?;
747        }
748
749        let len = writer.position();
750        self.status_publisher
751            .publish_raw(&buf[..len])
752            .map_err(|_| NodeError::Transport(TransportError::PublishFailed))
753    }
754}
755
756// ============================================================================
757// ActionClientCore
758// ============================================================================
759
760/// Type-agnostic action client core handling the raw-bytes protocol.
761///
762/// The typed [`ActionClient`](super::handles::ActionClient) wraps this
763/// and adds serialization/deserialization at the boundary.
764pub struct ActionClientCore<
765    const GOAL_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
766    const RESULT_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
767    const FEEDBACK_BUF: usize = { crate::config::DEFAULT_RX_BUF_SIZE },
768> {
769    pub(crate) send_goal_client: session::RmwServiceClient,
770    pub(crate) cancel_goal_client: session::RmwServiceClient,
771    pub(crate) get_result_client: session::RmwServiceClient,
772    pub(crate) feedback_subscriber: session::RmwSubscriber,
773    pub(crate) goal_buffer: [u8; GOAL_BUF],
774    pub(crate) result_buffer: [u8; RESULT_BUF],
775    pub(crate) feedback_buffer: [u8; FEEDBACK_BUF],
776    pub(crate) goal_counter: u64,
777    /// Phase 84.D3: per-sub-client in-flight flags. Each of the three
778    /// sub-clients (send_goal / cancel / get_result) is an independent
779    /// request/reply channel and tracks its own "unconsumed reply"
780    /// state. Cleared by `Promise::try_recv` on success.
781    pub(crate) in_flight_send_goal: bool,
782    pub(crate) in_flight_cancel: bool,
783    pub(crate) in_flight_get_result: bool,
784}
785
786impl<const GOAL_BUF: usize, const RESULT_BUF: usize, const FEEDBACK_BUF: usize>
787    ActionClientCore<GOAL_BUF, RESULT_BUF, FEEDBACK_BUF>
788{
789    /// Begin a server-discovery probe on the underlying `send_goal`
790    /// service client. Used by the C action-client wrapper
791    /// (`nros_action_client_wait_for_action_server`) to keep
792    /// `send_goal_client` private while still exposing the discovery
793    /// surface from `ServiceClientTrait`.
794    pub fn start_server_discovery(
795        &mut self,
796        timeout_ms: u32,
797    ) -> Result<(), nros_rmw::TransportError> {
798        use nros_rmw::ServiceClientTrait;
799        self.send_goal_client.start_server_discovery(timeout_ms)
800    }
801
802    /// Poll the in-flight server-discovery probe started by
803    /// [`start_server_discovery`](Self::start_server_discovery).
804    pub fn poll_server_discovery(&mut self) -> Result<Option<bool>, nros_rmw::TransportError> {
805        use nros_rmw::ServiceClientTrait;
806        self.send_goal_client.poll_server_discovery()
807    }
808
809    /// Latched "is server visible" snapshot. See
810    /// `ActionClient::action_server_is_ready` for the semantic.
811    pub fn is_server_ready(&self) -> bool {
812        use nros_rmw::ServiceClientTrait;
813        self.send_goal_client.is_server_ready()
814    }
815
816    /// Create a new action client core from the raw transport handles.
817    pub fn new(
818        send_goal_client: session::RmwServiceClient,
819        cancel_goal_client: session::RmwServiceClient,
820        get_result_client: session::RmwServiceClient,
821        feedback_subscriber: session::RmwSubscriber,
822    ) -> Self {
823        Self {
824            send_goal_client,
825            cancel_goal_client,
826            get_result_client,
827            feedback_subscriber,
828            goal_buffer: [0u8; GOAL_BUF],
829            result_buffer: [0u8; RESULT_BUF],
830            feedback_buffer: [0u8; FEEDBACK_BUF],
831            goal_counter: 0,
832            in_flight_send_goal: false,
833            in_flight_cancel: false,
834            in_flight_get_result: false,
835        }
836    }
837
838    /// Send a goal with raw CDR bytes. Returns the generated GoalId.
839    ///
840    /// The `goal_cdr` bytes are the serialized goal data (without GoalId framing).
841    /// This writes GoalId + goal_cdr into the goal buffer and sends the request.
842    ///
843    /// After calling, use `send_goal_client` and `result_buffer` to construct
844    /// a Promise for the acceptance reply.
845    pub fn send_goal_raw(&mut self, goal_cdr: &[u8]) -> Result<GoalId, NodeError> {
846        self.goal_counter += 1;
847        let mut goal_id = GoalId::default();
848        let counter_bytes = self.goal_counter.to_le_bytes();
849        goal_id.uuid[..8].copy_from_slice(&counter_bytes);
850
851        let mut writer = CdrWriter::new_with_header(&mut self.goal_buffer)
852            .map_err(|_| NodeError::BufferTooSmall)?;
853
854        write_goal_id(&mut writer, &goal_id)?;
855
856        // Copy raw goal CDR bytes after GoalId
857        let pos = writer.position();
858        if pos + goal_cdr.len() > GOAL_BUF {
859            return Err(NodeError::BufferTooSmall);
860        }
861        self.goal_buffer[pos..pos + goal_cdr.len()].copy_from_slice(goal_cdr);
862        let req_len = pos + goal_cdr.len();
863
864        self.send_goal_client
865            .send_request_raw(&self.goal_buffer[..req_len])
866            .map_err(|_| NodeError::ServiceRequestFailed)?;
867
868        Ok(goal_id)
869    }
870
871    /// Send a goal (blocking). Returns the GoalId and whether it was accepted.
872    ///
873    /// Uses the blocking `call_raw` path (like service calls) so the send_goal
874    /// z_get is fully consumed before returning. This avoids leaving a pending
875    /// get slot that could interfere with subsequent blocking calls.
876    pub fn send_goal_blocking(&mut self, goal_cdr: &[u8]) -> Result<(GoalId, bool), NodeError> {
877        self.goal_counter += 1;
878        let mut goal_id = GoalId::default();
879        let counter_bytes = self.goal_counter.to_le_bytes();
880        goal_id.uuid[..8].copy_from_slice(&counter_bytes);
881
882        let mut writer = CdrWriter::new_with_header(&mut self.goal_buffer)
883            .map_err(|_| NodeError::BufferTooSmall)?;
884
885        write_goal_id(&mut writer, &goal_id)?;
886
887        let pos = writer.position();
888        if pos + goal_cdr.len() > GOAL_BUF {
889            return Err(NodeError::BufferTooSmall);
890        }
891        self.goal_buffer[pos..pos + goal_cdr.len()].copy_from_slice(goal_cdr);
892        let req_len = pos + goal_cdr.len();
893
894        #[allow(deprecated)]
895        let len = self
896            .send_goal_client
897            .call_raw(&self.goal_buffer[..req_len], &mut self.result_buffer)
898            .map_err(|_| NodeError::ServiceRequestFailed)?;
899
900        // Reply CDR: header(4) + accepted(u8) + stamp(sec i32 + nanosec u32)
901        let accepted = len >= 5 && self.result_buffer[4] != 0;
902
903        Ok((goal_id, accepted))
904    }
905
906    /// Try to receive feedback (non-blocking, raw bytes).
907    ///
908    /// Returns the GoalId and total data length. The full CDR data
909    /// (including GoalId) is in `feedback_buffer`.
910    pub fn try_recv_feedback_raw(&mut self) -> Result<Option<(GoalId, usize)>, NodeError> {
911        let data = self
912            .feedback_subscriber
913            .try_recv_raw(&mut self.feedback_buffer)
914            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
915
916        let len = match data {
917            Some(len) => len,
918            None => return Ok(None),
919        };
920
921        let mut reader = CdrReader::new_with_header(&self.feedback_buffer[..len])
922            .map_err(|_| NodeError::Transport(TransportError::DeserializationError))?;
923
924        let goal_id = read_goal_id(&mut reader)?;
925
926        Ok(Some((goal_id, len)))
927    }
928
929    /// Cancel a goal (non-blocking). Sends the cancel request.
930    ///
931    /// After calling, use `cancel_goal_client` and `result_buffer` to construct
932    /// a Promise for the cancel response.
933    pub fn send_cancel_request(&mut self, goal_id: &GoalId) -> Result<(), NodeError> {
934        let mut writer = CdrWriter::new_with_header(&mut self.goal_buffer)
935            .map_err(|_| NodeError::BufferTooSmall)?;
936
937        write_goal_id(&mut writer, goal_id)?;
938        writer.write_i32(0).map_err(|_| NodeError::Serialization)?;
939        writer.write_u32(0).map_err(|_| NodeError::Serialization)?;
940
941        let req_len = writer.position();
942
943        self.cancel_goal_client
944            .send_request_raw(&self.goal_buffer[..req_len])
945            .map_err(|_| NodeError::ServiceRequestFailed)
946    }
947
948    /// Send a get_result request.
949    ///
950    /// After calling, use `get_result_client` and `result_buffer` to construct
951    /// a Promise for the result response.
952    pub fn send_get_result_request(&mut self, goal_id: &GoalId) -> Result<(), NodeError> {
953        let mut writer = CdrWriter::new_with_header(&mut self.goal_buffer)
954            .map_err(|_| NodeError::BufferTooSmall)?;
955
956        write_goal_id(&mut writer, goal_id)?;
957
958        let req_len = writer.position();
959
960        self.get_result_client
961            .send_request_raw(&self.goal_buffer[..req_len])
962            .map_err(|_| NodeError::ServiceRequestFailed)
963    }
964
965    /// Phase 122.3.c.6.e — register a `Waker` that fires when the
966    /// send_goal RPC reply lands.
967    pub fn register_goal_response_waker(&self, waker: &core::task::Waker) {
968        use nros_rmw::ServiceClientTrait;
969        self.send_goal_client.register_waker(waker);
970    }
971
972    /// Phase 122.3.c.6.e — register a `Waker` for cancel-RPC replies.
973    pub fn register_cancel_response_waker(&self, waker: &core::task::Waker) {
974        use nros_rmw::ServiceClientTrait;
975        self.cancel_goal_client.register_waker(waker);
976    }
977
978    /// Phase 122.3.c.6.e — register a `Waker` for get_result replies.
979    pub fn register_result_waker(&self, waker: &core::task::Waker) {
980        use nros_rmw::ServiceClientTrait;
981        self.get_result_client.register_waker(waker);
982    }
983
984    /// Phase 122.3.c.6.e — register a `Waker` for feedback messages.
985    pub fn register_feedback_waker(&self, waker: &core::task::Waker) {
986        use nros_rmw::Subscriber;
987        self.feedback_subscriber.register_waker(waker);
988    }
989
990    /// Phase 122.3.c.6.c — poll for a cancel reply (non-blocking,
991    /// raw bytes). Returns `Ok(Some(len))` when a reply landed; the
992    /// CDR payload is in `result_buffer_ref()[..len]`. Reply layout
993    /// is action_msgs/srv/CancelGoal_Response wire CDR.
994    pub fn try_recv_cancel_reply(&mut self) -> Result<Option<usize>, NodeError> {
995        match self
996            .cancel_goal_client
997            .try_recv_reply_raw(&mut self.result_buffer)
998        {
999            Ok(opt) => Ok(opt),
1000            Err(TransportError::NoData) => Ok(None),
1001            Err(_) => Err(NodeError::Transport(TransportError::DeserializationError)),
1002        }
1003    }
1004
1005    /// Poll for a get_result reply (non-blocking, raw bytes).
1006    ///
1007    /// Returns `Ok(Some(total_len))` if a reply arrived (data in result buffer),
1008    /// `Ok(None)` if no reply yet.
1009    ///
1010    /// After receiving, use [`result_buffer_ref()`](Self::result_buffer_ref)
1011    /// to access the raw CDR data. The layout is: CDR header (4) + status
1012    /// byte (1) + result data.
1013    pub fn try_recv_get_result_reply(&mut self) -> Result<Option<usize>, NodeError> {
1014        // Phase 120: NoData == steady-state polling; map to Ok(None).
1015        match self
1016            .get_result_client
1017            .try_recv_reply_raw(&mut self.result_buffer)
1018        {
1019            Ok(opt) => Ok(opt),
1020            Err(TransportError::NoData) => Ok(None),
1021            Err(_) => Err(NodeError::Transport(TransportError::DeserializationError)),
1022        }
1023    }
1024
1025    /// Poll for the send_goal acceptance reply (non-blocking, raw bytes).
1026    ///
1027    /// Returns `Ok(Some(total_len))` if a reply arrived (data in result buffer),
1028    /// `Ok(None)` if no reply yet.
1029    ///
1030    /// The reply CDR contains: header (4) + accepted (u8) + stamp (i32 + u32).
1031    pub fn try_recv_send_goal_reply(&mut self) -> Result<Option<usize>, NodeError> {
1032        // Phase 120: NoData == steady-state polling; map to Ok(None).
1033        match self
1034            .send_goal_client
1035            .try_recv_reply_raw(&mut self.result_buffer)
1036        {
1037            Ok(opt) => Ok(opt),
1038            Err(TransportError::NoData) => Ok(None),
1039            Err(_) => Err(NodeError::Transport(TransportError::DeserializationError)),
1040        }
1041    }
1042
1043    /// Blocking get_result request — sends the request and blocks until
1044    /// a reply is received (or timeout).
1045    ///
1046    /// Returns the total reply length. Data is stored in `result_buffer`.
1047    /// This uses the blocking `call_raw` path which is more reliable on
1048    /// platforms with separate I/O tasks (e.g. FreeRTOS).
1049    pub fn get_result_blocking(&mut self, goal_id: &GoalId) -> Result<usize, NodeError> {
1050        let mut writer = CdrWriter::new_with_header(&mut self.goal_buffer)
1051            .map_err(|_| NodeError::BufferTooSmall)?;
1052
1053        write_goal_id(&mut writer, goal_id)?;
1054
1055        let req_len = writer.position();
1056
1057        #[allow(deprecated)]
1058        let len = self
1059            .get_result_client
1060            .call_raw(&self.goal_buffer[..req_len], &mut self.result_buffer)
1061            .map_err(|_| NodeError::ServiceRequestFailed)?;
1062
1063        Ok(len)
1064    }
1065
1066    /// Read-only access to the result buffer (after polling a reply).
1067    pub fn result_buffer_ref(&self) -> &[u8] {
1068        &self.result_buffer
1069    }
1070
1071    /// Read-only access to the feedback buffer (after receiving feedback).
1072    pub fn feedback_buffer_ref(&self) -> &[u8] {
1073        &self.feedback_buffer
1074    }
1075
1076    /// Get the current goal counter (used to reconstruct the last goal ID).
1077    pub fn goal_counter(&self) -> u64 {
1078        self.goal_counter
1079    }
1080}