enpose_api/
posestream.rs

1//! Client-side pose streaming from a single Enpose tracker device.
2//!
3//! A [`PoseStream`] delivers a live stream of [`MarkerPose`] updates from
4//! one device. Construct it from a [`DeviceInfo`] returned by
5//! [`crate::DeviceDiscovery`] or directly from an [`IpAddr`], then poll it
6//! with [`PoseStream::receive_pose_updates`] to get the poses that have
7//! arrived since your last call.
8//!
9//! The `create_thread` constructor argument selects between two modes:
10//!
11//! * **Threaded** (`true`) — a background thread continuously receives and
12//!   buffers incoming poses, so none are missed regardless of how often you
13//!   poll. [`PoseStream::receive_pose_updates`] returns the buffered poses.
14//!   Use this when you cannot guarantee a regular polling cadence.
15//!
16//! * **Single-threaded** (`false`) — no thread is spawned; poses are
17//!   collected when you call [`PoseStream::receive_pose_updates`]. Use this
18//!   when you poll on your own regular cadence and want the stream to use
19//!   only your thread. Poll often enough that updates are not missed between
20//!   calls.
21//!
22//! In either mode, [`PoseStream::receive_pose_updates`] takes a `block` flag:
23//! pass `false` to return whatever has arrived (possibly nothing) without
24//! waiting, or `true` to wait for at least one pose update. A blocking call
25//! waits at most 3 seconds, then returns an empty result if none arrived.
26//!
27//! The connection is closed automatically when the [`PoseStream`] is
28//! dropped.
29
30use std::io;
31use std::net::{IpAddr, SocketAddr, UdpSocket};
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::{Arc, Condvar, Mutex};
34use std::thread::{self, JoinHandle};
35use std::time::{Duration, Instant};
36
37use crate::devicediscovery::DeviceInfo;
38use crate::protocol::{
39    PACKET_SIZE, PKT_TYPE_POSE_DATA, POSE_KEEPALIVE_INTERVAL_SECS, POSE_PORT,
40    encode_pose_subscribe, encode_pose_unsubscribe, parse_packet,
41};
42use crate::marker_pose::MarkerPose;
43
/// How long the background receive thread may sit in a single `recv` call.
/// Keeping this short lets the thread come back around to send keep-alives
/// and to observe a stop request without a long delay.
const THREAD_RECV_TIMEOUT: Duration = Duration::from_millis(200);

/// Upper bound on how long [`PoseStream::receive_pose_updates`] waits when it
/// is called with `block = true`. If no pose update shows up within this
/// window the call gives up and returns an empty result.
const BLOCK_TIMEOUT: Duration = Duration::from_secs(3);

/// Size in bytes (64 KiB) of the datagram receive buffer. That is enough for
/// one complete pose datagram: the fixed header followed by the
/// MessagePack-encoded poses of a single frame.
const RECV_BUF_SIZE: usize = 64 * 1024;
57
58/// A live pose stream from one Enpose tracker device.
59///
60/// Dropping a `PoseStream` automatically disconnects it (see
61/// [`PoseStream::disconnect`]).
62pub struct PoseStream {
63    /// Socket connected to the device's [`POSE_PORT`]. Shared with the
64    /// background thread in threaded mode.
65    socket: Arc<UdpSocket>,
66    /// `false` once [`Self::disconnect`] has run, so `Drop` does not send a
67    /// second disconnect packet or join an already-joined thread.
68    connected: bool,
69    /// Background receiver state, present only in threaded mode.
70    thread: Option<ThreadState>,
71    /// When the last keep-alive was sent, used only in single-threaded mode
72    /// to decide when the next one is due.
73    last_keepalive: Instant,
74}
75
76/// State owned by the background receive thread in threaded mode.
77struct ThreadState {
78    /// Poses received since the last [`PoseStream::receive_pose_updates`]
79    /// call, in arrival order, paired with a condvar the receiver signals
80    /// whenever it appends — so a blocking receive can wait for data.
81    buffered: Arc<(Mutex<Vec<MarkerPose>>, Condvar)>,
82    /// Set to request the thread to exit.
83    stop: Arc<AtomicBool>,
84    /// Handle to join the thread on disconnect.
85    handle: Option<JoinHandle<()>>,
86}
87
88impl PoseStream {
89    /// Connect a pose stream to the device at `ip`.
90    ///
91    /// `ip` must be IPv4 — the Enpose API is IPv4-only. An IPv6 address is
92    /// rejected with [`io::ErrorKind::Unsupported`].
93    ///
94    /// When `create_thread` is `true`, spawns the background receiver thread
95    /// described in the [module docs](crate::posestream).
96    ///
97    /// # Errors
98    ///
99    /// Returns an [`io::Error`] if `ip` is not IPv4, or if the connection to
100    /// the device cannot be established.
101    pub fn from_ip(ip: IpAddr, create_thread: bool) -> io::Result<Self> {
102        if ip.is_ipv6() {
103            return Err(io::Error::new(
104                io::ErrorKind::Unsupported,
105                "the Enpose API supports IPv4 only",
106            ));
107        }
108        Self::connect_to(SocketAddr::new(ip, POSE_PORT), create_thread)
109    }
110
111    /// Connect a pose stream to a device discovered via
112    /// [`crate::DeviceDiscovery`].
113    ///
114    /// Convenience wrapper around [`Self::from_ip`] using [`DeviceInfo::ip`].
115    pub fn from_device(device: &DeviceInfo, create_thread: bool) -> io::Result<Self> {
116        Self::from_ip(device.ip, create_thread)
117    }
118
119    /// Test constructor that targets a full socket address instead of the
120    /// fixed [`POSE_PORT`], so unit tests can run a fake device on an
121    /// ephemeral loopback port.
122    #[cfg(test)]
123    pub(crate) fn with_target(addr: SocketAddr, create_thread: bool) -> io::Result<Self> {
124        Self::connect_to(addr, create_thread)
125    }
126
127    /// Bind an ephemeral local socket, connect it to `addr`, send the
128    /// initial subscribe packet, and optionally spawn the receiver thread.
129    fn connect_to(addr: SocketAddr, create_thread: bool) -> io::Result<Self> {
130        let socket = UdpSocket::bind((IpAddr::from([0, 0, 0, 0]), 0))?;
131        socket.connect(addr)?;
132        let socket = Arc::new(socket);
133
134        // Subscribe immediately so poses start flowing without waiting for
135        // the first keep-alive interval.
136        socket.send(&encode_pose_subscribe())?;
137
138        let thread = if create_thread {
139            Some(Self::spawn_receiver(socket.clone()))
140        } else {
141            None
142        };
143
144        Ok(Self {
145            socket,
146            connected: true,
147            thread,
148            last_keepalive: Instant::now(),
149        })
150    }
151
152    /// Return the marker poses received from the stream.
153    ///
154    /// When `block` is `false`, returns all poses that have arrived since the
155    /// previous call, or an empty vector if none have — it never waits. When
156    /// `block` is `true`, it waits for a pose update and returns the poses
157    /// gathered so far; the wait is bounded by a 3-second timeout, so a
158    /// blocking call still returns an empty vector if no update arrives within
159    /// that window.
160    ///
161    /// Call it repeatedly to keep receiving updates. In threaded mode the
162    /// poses are those gathered by the background thread.
163    ///
164    /// # Errors
165    ///
166    /// Returns an [`io::Error`] only for an unrecoverable communication
167    /// failure. In threaded mode this never returns an error.
168    pub fn receive_pose_updates(&mut self, block: bool) -> io::Result<Vec<MarkerPose>> {
169        if let Some(thread) = &self.thread {
170            let (buffer, available) = &*thread.buffered;
171            let mut buffer = buffer.lock().unwrap();
172            if block {
173                // Wait until the background thread buffers a pose, or until the
174                // timeout elapses (handles spurious wakeups internally).
175                (buffer, _) = available
176                    .wait_timeout_while(buffer, BLOCK_TIMEOUT, |b| b.is_empty())
177                    .unwrap();
178            }
179            return Ok(std::mem::take(&mut *buffer));
180        }
181
182        // Single-threaded: send keep-alives and drain the socket ourselves.
183        self.send_keepalive_if_due()?;
184
185        let mut poses = Vec::new();
186        self.drain_available(&mut poses)?;
187        if block && poses.is_empty() {
188            self.block_for_pose(&mut poses)?;
189            self.drain_available(&mut poses)?;
190        }
191        Ok(poses)
192    }
193
194    /// Send a keep-alive subscribe packet if the keep-alive interval has
195    /// elapsed since the previous one. Single-threaded mode only.
196    fn send_keepalive_if_due(&mut self) -> io::Result<()> {
197        if self.last_keepalive.elapsed() >= Duration::from_secs(POSE_KEEPALIVE_INTERVAL_SECS) {
198            self.socket.send(&encode_pose_subscribe())?;
199            self.last_keepalive = Instant::now();
200        }
201        Ok(())
202    }
203
204    /// Drain every pose datagram already waiting on the socket into `poses`,
205    /// without blocking.
206    fn drain_available(&self, poses: &mut Vec<MarkerPose>) -> io::Result<()> {
207        self.socket.set_nonblocking(true)?;
208        let mut buf = [0u8; RECV_BUF_SIZE];
209        loop {
210            match self.socket.recv(&mut buf) {
211                Ok(n) => {
212                    if let Some(batch) = parse_pose_packet(&buf[..n]) {
213                        poses.extend(batch);
214                    }
215                }
216                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
217                Err(e) => return Err(e),
218            }
219        }
220        Ok(())
221    }
222
223    /// Block until at least one pose datagram arrives or [`BLOCK_TIMEOUT`]
224    /// elapses, appending any poses to `poses` while keeping the connection
225    /// alive. Returns with `poses` still empty if the timeout is reached.
226    fn block_for_pose(&mut self, poses: &mut Vec<MarkerPose>) -> io::Result<()> {
227        self.socket.set_nonblocking(false)?;
228        let deadline = Instant::now() + BLOCK_TIMEOUT;
229        let mut buf = [0u8; RECV_BUF_SIZE];
230        while poses.is_empty() {
231            // Stop once the deadline passes. Otherwise wake at least once per
232            // keep-alive tick, but never wait past the deadline.
233            let remaining = match deadline.checked_duration_since(Instant::now()) {
234                Some(d) if !d.is_zero() => d,
235                _ => break,
236            };
237            self.socket.set_read_timeout(Some(remaining.min(THREAD_RECV_TIMEOUT)))?;
238            match self.socket.recv(&mut buf) {
239                Ok(n) => {
240                    if let Some(batch) = parse_pose_packet(&buf[..n]) {
241                        poses.extend(batch);
242                    }
243                }
244                // The read timeout fired (reported as WouldBlock or TimedOut
245                // depending on the platform): no data yet, so refresh the
246                // keep-alive and keep waiting until the deadline.
247                Err(e)
248                    if e.kind() == io::ErrorKind::WouldBlock
249                        || e.kind() == io::ErrorKind::TimedOut =>
250                {
251                    self.send_keepalive_if_due()?;
252                }
253                Err(e) => return Err(e),
254            }
255        }
256        Ok(())
257    }
258
259    /// Disconnect the stream, closing the connection to the device.
260    ///
261    /// Idempotent, and called automatically when the [`PoseStream`] is
262    /// dropped, so you only need to call it explicitly to disconnect early.
263    pub fn disconnect(&mut self) {
264        if !self.connected {
265            return;
266        }
267        self.connected = false;
268
269        if let Some(thread) = &mut self.thread {
270            thread.stop.store(true, Ordering::Relaxed);
271            if let Some(handle) = thread.handle.take() {
272                let _ = handle.join();
273            }
274        }
275
276        // Best-effort: the device times the connection out regardless.
277        let _ = self.socket.send(&encode_pose_unsubscribe());
278    }
279
280    /// Spawn the background receiver thread and return its shared state.
281    fn spawn_receiver(socket: Arc<UdpSocket>) -> ThreadState {
282        let buffered = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
283        let stop = Arc::new(AtomicBool::new(false));
284        let thread_buffered = buffered.clone();
285        let thread_stop = stop.clone();
286        let handle =
287            thread::spawn(move || Self::receiver_thread(socket, thread_buffered, thread_stop));
288        ThreadState {
289            buffered,
290            stop,
291            handle: Some(handle),
292        }
293    }
294
295    /// Background thread body: periodically sends keep-alives and buffers
296    /// incoming pose datagrams until asked to stop, signalling the condvar so
297    /// a blocking receive wakes when new poses land.
298    fn receiver_thread(
299        socket: Arc<UdpSocket>,
300        buffered: Arc<(Mutex<Vec<MarkerPose>>, Condvar)>,
301        stop: Arc<AtomicBool>,
302    ) {
303        let _ = socket.set_read_timeout(Some(THREAD_RECV_TIMEOUT));
304        let mut last_keepalive = Instant::now();
305        let mut buf = [0u8; RECV_BUF_SIZE];
306        let (buffer, available) = &*buffered;
307
308        while !stop.load(Ordering::Relaxed) {
309            if last_keepalive.elapsed() >= Duration::from_secs(POSE_KEEPALIVE_INTERVAL_SECS) {
310                // Best-effort: a transient send error should not kill the
311                // stream; the next tick retries.
312                let _ = socket.send(&encode_pose_subscribe());
313                last_keepalive = Instant::now();
314            }
315
316            // A recv error is either the expected read timeout (which bounds
317            // the loop) or a transient failure retried next iteration; only a
318            // successful read carries poses. Append them and wake any blocking
319            // receive waiting on the condvar.
320            if let Ok(n) = socket.recv(&mut buf)
321                && let Some(batch) = parse_pose_packet(&buf[..n])
322                && !batch.is_empty()
323            {
324                buffer.lock().unwrap().extend(batch);
325                available.notify_one();
326            }
327        }
328    }
329}
330
331impl Drop for PoseStream {
332    fn drop(&mut self) {
333        self.disconnect();
334    }
335}
336
337/// Parse a [`PKT_TYPE_POSE_DATA`] datagram into its poses, or return
338/// `None` if the buffer is not a valid pose-data packet (wrong magic,
339/// wrong type, or undecodable payload).
340fn parse_pose_packet(data: &[u8]) -> Option<Vec<MarkerPose>> {
341    let parsed = parse_packet(data)?;
342    if parsed.pkt_type != PKT_TYPE_POSE_DATA {
343        return None;
344    }
345    rmp_serde::from_slice::<Vec<MarkerPose>>(&data[PACKET_SIZE..]).ok()
346}
347
348#[cfg(test)]
349#[path = "posestream_tests.rs"]
350mod tests;