"""Python binding for the Enpose 6-DoF tracking API.

A thin, Pythonic wrapper over the ``enpose_api`` C ABI using `cffi
<https://cffi.readthedocs.io>`_ in ABI (``dlopen``) mode: it loads the prebuilt
shared library at run time, so no C compiler or build step is needed — only the
library and ``cffi`` (``pip install cffi``).

It mirrors the other bindings and covers the full client workflow:

* :func:`discover` finds Enpose devices on the local network.
* :class:`PoseStream` connects to a device and yields live :class:`MarkerPose`
  updates.

Locating the shared library: by default the loader searches a ``lib/`` directory
alongside the unpacked SDK and then the system loader path (``LD_LIBRARY_PATH``
etc.). Set ``ENPOSE_API_LIB`` to an explicit path to have that location tried
first.
"""
from __future__ import annotations
import os
import pathlib
import sys
from dataclasses import dataclass
from cffi import FFI
# Public names of this module. Everything else defined here (the loader
# helpers, the cffi handles and the status table) is underscore-private.
__all__ = ["Error", "DeviceInfo", "MarkerPose", "PoseStream", "discover"]
# C ABI declarations — kept in sync with c/include/enpose_api.h.
# ``_load_library`` passes this text to ``FFI.cdef`` before opening the shared
# library, so cffi knows the status enum, the two result structs, the opaque
# stream handle and the six entry points used by this module. The negative
# ``EnposeStatus`` values are the keys of ``_STATUS_TEXT``.
_CDEF = """
typedef enum {
ENPOSE_OK = 0,
ENPOSE_ERR_INVALID_ARG = -1,
ENPOSE_ERR_IO = -2,
ENPOSE_ERR_PANIC = -3
} EnposeStatus;
typedef struct EnposeDeviceInfo {
char ip[46];
uint32_t serial;
bool compatible;
} EnposeDeviceInfo;
typedef struct EnposeMarkerPose {
uint64_t timestamp;
uint16_t marker_id;
double x;
double y;
double z;
double rotation[9];
double position_rmse;
double rotation_rmse;
uint8_t sensors;
} EnposeMarkerPose;
typedef struct EnposePoseStream EnposePoseStream;
EnposeStatus enpose_discover(EnposeDeviceInfo **out_devices, size_t *out_count);
void enpose_device_info_array_free(EnposeDeviceInfo *devices, size_t count);
EnposePoseStream *enpose_pose_stream_connect(const char *ip, bool create_thread);
EnposeStatus enpose_pose_stream_receive(EnposePoseStream *stream,
bool block,
EnposeMarkerPose **out_poses,
size_t *out_count);
void enpose_marker_pose_array_free(EnposeMarkerPose *poses, size_t count);
void enpose_pose_stream_free(EnposePoseStream *stream);
"""
# Message fragments for the failing ``EnposeStatus`` values. ``ENPOSE_OK`` (0)
# has no entry because success never produces an error message; codes missing
# from this table are reported numerically by ``_raise_status``.
_STATUS_TEXT = {
    -1: "invalid argument",  # ENPOSE_ERR_INVALID_ARG
    -2: "I/O error",  # ENPOSE_ERR_IO
    -3: "internal error",  # ENPOSE_ERR_PANIC
}
class Error(RuntimeError):
    """Signals a failure in the Enpose binding.

    Raised when the shared library cannot be loaded, when a library call
    reports a non-OK status or returns no handle, and when a closed stream is
    used.
    """
def _library_filename() -> str:
    """Return the platform-specific file name of the Enpose shared library.

    macOS is detected through ``sys.platform`` and Windows through
    ``os.name``; every other platform gets the ``.so`` name.
    """
    if sys.platform == "darwin":
        name = "libenpose_api.dylib"
    elif os.name == "nt":
        name = "enpose_api.dll"
    else:
        name = "libenpose_api.so"
    return name
def _candidate_paths(filename: str):
    """Generate the places to look for ``filename``, most specific first.

    The order is: the ``ENPOSE_API_LIB`` environment variable (when set and
    non-empty), then ``<ancestor>/lib/<filename>`` for every ancestor directory
    of this module, nearest first, and finally the bare file name so that the
    system loader path gets a chance.
    """
    explicit = os.environ.get("ENPOSE_API_LIB")
    if explicit:
        yield explicit

    # In the SDK layout the library lives in a `lib/` directory somewhere above
    # this package, so each ancestor directory is probed in turn.
    module_file = pathlib.Path(__file__).resolve()
    yield from (str(ancestor / "lib" / filename) for ancestor in module_file.parents)

    # Last resort: the plain name, resolved by the system loader.
    yield filename
def _load_library():
    """Build the cffi interface and open the Enpose shared library.

    The declarations in ``_CDEF`` are registered on a fresh ``FFI`` object, then
    each location from ``_candidate_paths`` is tried in order; the first one
    that ``dlopen`` accepts wins. Returns the ``(ffi, lib)`` pair. Raises
    :class:`Error` listing every attempted path and its loader message when
    none of them can be opened.
    """
    ffi = FFI()
    ffi.cdef(_CDEF)
    filename = _library_filename()

    failures = []
    for candidate in _candidate_paths(filename):
        try:
            library = ffi.dlopen(candidate)
        except OSError as exc:
            # Keep the loader's complaint so the final error shows every attempt.
            failures.append(f"{candidate}: {exc}")
        else:
            return ffi, library

    tried = "\n ".join(failures)
    raise Error(
        f"could not load the enpose_api shared library ({filename}); tried:\n "
        f"{tried}\nSet ENPOSE_API_LIB to its path, or add the SDK's lib/ "
        "directory to your loader path (e.g. LD_LIBRARY_PATH)."
    )
# Load the library once, at import time. Importing this module therefore
# raises ``Error`` when no candidate location can be opened.
_ffi, _lib = _load_library()
@dataclass(frozen=True)
class DeviceInfo:
    """An Enpose device reported by :func:`discover`.

    Instances are immutable and can be passed directly to :class:`PoseStream`.
    """

    ip: str
    """IPv4 address of the device."""

    serial: int
    """Serial number assigned at the factory."""

    compatible: bool
    """Whether the device's protocol version matches this binding."""
@dataclass(frozen=True)
class MarkerPose:
    """World-coordinate pose of a single tracked marker.

    Units: ``x``/``y``/``z`` and ``position_rmse`` are in meters;
    ``rotation_rmse`` is in radians. Instances are immutable.
    """

    timestamp: int
    """Time of the pose, in microseconds since the device started."""

    marker_id: int
    """Identifier of the marker this pose belongs to."""

    x: float
    """X position in world space, meters (of the first model emitter)."""

    y: float
    """Y position in world space, meters."""

    z: float
    """Z position in world space, meters."""

    rotation: tuple  # 9 floats, row-major 3x3, model frame -> world
    """3x3 rotation from model frame to world, as 9 row-major values."""

    position_rmse: float
    """RMS error of the position, in meters (same units as x/y/z)."""

    rotation_rmse: float
    """RMS error of the rotation (radians, axis-angle magnitude)."""

    sensors: int
    """How many sensors contributed to this pose."""
def _raise_status(func: str, status: int) -> None:
    """Turn a non-zero library status into an :class:`Error`.

    ``func`` is the C function name used in the message. A status of 0 returns
    silently; any other value raises with the text from ``_STATUS_TEXT``, or
    with the numeric code when the value is not in that table.
    """
    if status == 0:
        return
    code = int(status)
    reason = _STATUS_TEXT.get(code, f"status {code}")
    raise Error(f"{func} failed: {reason}")
def discover() -> list:
    """Find Enpose devices on the local network.

    Returns a list of :class:`DeviceInfo`, one per entry reported by the
    library (empty if none are found). Raises :class:`Error` when the library
    reports a non-OK status, such as an I/O failure.
    """
    devices_out = _ffi.new("EnposeDeviceInfo **")
    count_out = _ffi.new("size_t *")
    status = _lib.enpose_discover(devices_out, count_out)
    _raise_status("enpose_discover", status)

    array, length = devices_out[0], count_out[0]
    found = []
    try:
        for index in range(length):
            entry = array[index]
            found.append(
                DeviceInfo(
                    ip=_ffi.string(entry.ip).decode("utf-8"),
                    serial=int(entry.serial),
                    compatible=bool(entry.compatible),
                )
            )
    finally:
        # The array belongs to the library; hand it back even if copying failed.
        _lib.enpose_device_info_array_free(array, length)
    return found
def _marker_pose(c) -> MarkerPose:
    """Copy one C ``EnposeMarkerPose`` into an immutable :class:`MarkerPose`.

    Every field is converted to a plain Python ``int``/``float``, and the nine
    rotation entries are gathered into a tuple, so the result holds no
    reference to the C struct it was read from.
    """
    matrix = tuple([float(c.rotation[k]) for k in range(9)])
    fields = {
        "timestamp": int(c.timestamp),
        "marker_id": int(c.marker_id),
        "x": float(c.x),
        "y": float(c.y),
        "z": float(c.z),
        "rotation": matrix,
        "position_rmse": float(c.position_rmse),
        "rotation_rmse": float(c.rotation_rmse),
        "sensors": int(c.sensors),
    }
    return MarkerPose(**fields)
class PoseStream:
    """Live pose feed from a single Enpose device.

    The native stream is released by :meth:`close`, which also runs when a
    ``with`` block exits and when the object is garbage-collected.
    """

    def __init__(self, target, create_thread: bool = True):
        """Open a stream to ``target`` — a :class:`DeviceInfo` or an IPv4 string.

        A :class:`DeviceInfo` contributes its ``ip``; any other value is
        converted with ``str``. The Enpose API is IPv4-only. When
        ``create_thread`` is true (the default and preferred mode), a
        background thread receives and buffers poses. Raises :class:`Error` if
        the library returns no stream handle.
        """
        if isinstance(target, DeviceInfo):
            ip = target.ip
        else:
            ip = str(target)
        encoded_ip = ip.encode("utf-8")
        handle = _lib.enpose_pose_stream_connect(encoded_ip, create_thread)
        if handle == _ffi.NULL:
            raise Error(f"failed to connect pose stream to {ip}")
        self._handle = handle

    def receive(self, block: bool = False) -> list:
        """Fetch poses from the stream as a list of :class:`MarkerPose`.

        With ``block`` false (the default) the call does not wait: it returns
        the poses that arrived since the previous call, or an empty list. With
        ``block`` true it waits for at least one pose update, up to a 3-second
        timeout, so the list can still be empty if nothing arrives in that
        window. Raises :class:`Error` if the stream is closed or the library
        reports an unrecoverable failure.
        """
        handle = self._handle
        if handle is None:
            raise Error("pose stream is closed")

        poses_out = _ffi.new("EnposeMarkerPose **")
        count_out = _ffi.new("size_t *")
        status = _lib.enpose_pose_stream_receive(
            handle, block, poses_out, count_out
        )
        _raise_status("enpose_pose_stream_receive", status)

        array, length = poses_out[0], count_out[0]
        received = []
        try:
            for index in range(length):
                received.append(_marker_pose(array[index]))
        finally:
            # The array belongs to the library; always hand it back.
            _lib.enpose_marker_pose_array_free(array, length)
        return received

    def close(self) -> None:
        """Disconnect and free the native stream. Safe to call repeatedly."""
        # getattr covers objects whose __init__ failed before _handle was set,
        # which matters because __del__ still runs for them.
        handle = getattr(self, "_handle", None)
        self._handle = None
        if handle is None:
            return
        _lib.enpose_pose_stream_free(handle)

    def __enter__(self) -> "PoseStream":
        return self

    def __exit__(self, *_exc) -> None:
        self.close()

    def __del__(self) -> None:
        self.close()