robotmq is a lightweight, high-performance message queue for robotics Python programs. An independent C++ thread runs all message passing in the background, so your Python main thread never blocks.
Install:

pip install robotmq

Key features:
- ~2 GB/s local throughput via shared memory; ~20 MB/s over the network via ZeroMQ
- No message schema: any bytes-serializable data works (numpy arrays, dicts, etc.)
- Automatic message expiration prevents unbounded memory growth
- No external dependencies: self-contained in a single pip package
Common use cases:
- Peripheral readout (camera, spacemouse, etc.): run a dedicated reader process with an RMQServer, then peek/pop data from your main program via RMQClient.
- Detached policy inference: run your neural network on a GPU server with an RMQServer, and send observations and receive actions from the robot controller via RMQClient.
This package was developed as part of the following research project. If you find it useful, please cite the corresponding paper:
@misc{gao2026gatedmemorypolicy,
title = {Gated Memory Policy},
author = {Yihuai Gao and Jinyun Liu and Shuang Li and Shuran Song},
year = {2026},
eprint = {2604.18933},
archivePrefix = {arXiv},
primaryClass = {cs.RO},
url = {https://arxiv.org/abs/2604.18933},
}

- Overview
- Motivation: Why robotmq?
- Architecture & Design Patterns
- API Reference
- Usage Patterns
- Advantages Over Existing Methods
- Performance Characteristics
- Installation
- Troubleshooting
robotmq is a lightweight, high-performance message queue designed specifically for robotics applications in Python. It provides a C++ core (exposed via pybind11) that handles all message passing in background threads, allowing Python programs to exchange data asynchronously without blocking the main thread.
The library bridges two common robotics needs:
- Streaming sensor data (cameras, force sensors, spacemouse) from producer processes to consumer processes.
- Offloading computation (neural network inference, motion planning) to separate processes or remote machines.
Robotics Python programs frequently need to share data between processes — a camera reader feeding frames to a control loop, or a robot controller sending observations to a GPU server for policy inference. The existing solutions each have significant drawbacks:
| Method | Problem |
|---|---|
| Python multiprocessing.Queue | GIL contention, pickle overhead, no network support, no shared memory for large arrays |
| ROS / ROS2 | Heavyweight dependency, rigid message types (.msg files), complex setup, overkill for simple data passing |
| Raw ZeroMQ in Python | Blocks the Python thread during send/recv, no built-in topic management or message expiration |
| Redis / RabbitMQ | External service dependency, serialization overhead, not optimized for large binary data (images, point clouds) |
| Python shared_memory | No built-in synchronization, no network fallback, manual lifecycle management |
robotmq solves these problems with a focused design:
- C++ background threads handle all network I/O — Python never blocks on send/recv.
- Shared memory transport for local communication achieves ~2 GB/s throughput for large numpy arrays.
- ZeroMQ transport for network communication provides ~20 MB/s across machines with zero configuration.
- No message schema required: any data that can be serialized to bytes works. The included serialize()/deserialize() utilities handle nested structures of numpy arrays without version conflicts.
- Automatic message expiration: old messages are discarded based on a configurable time window, preventing unbounded memory growth.
- Minimal API surface: two classes (RMQServer, RMQClient) and a handful of methods cover all use cases.
┌─────────────────────┐ ┌─────────────────────┐
│ RMQServer │ │ RMQClient │
│ │ │ │
│ ┌───────────────┐ │ ZeroMQ │ │
│ │ Background │◄─┼──────────┼─── peek_data() │
│ │ Thread │──┼──────────┼──► returns data │
│ │ (REP socket) │ │ TCP/IPC │ │
│ └───────┬───────┘ │ │ │
│ │ │ │ │
│ ┌───────▼───────┐ │ │ │
│ │ DataTopic │ │ │ │
│ │ "sensor_A" │ │ │ │
│ ├───────────────┤ │ │ │
│ │ DataTopic │ │ Shared │ │
│ │ "camera" │◄─┼─ Memory──┼─── (direct read) │
│ ├───────────────┤ │ │ │
│ │ DataTopic │ │ │ │
│ │ "inference" │ │ │ │
│ └───────────────┘ │ │ │
└─────────────────────┘ └─────────────────────┘
The server owns all topics and data. It runs a background thread that listens for client requests on a ZeroMQ REP socket. The client sends requests on a REQ socket. This REQ-REP pattern ensures reliable, ordered communication.
Key design decisions:
- The server's background thread is a C++ std::thread, completely independent of Python's GIL. The GIL is only acquired briefly to check for Python signals (e.g., KeyboardInterrupt).
- Each topic is a std::deque of timestamped message pointers, providing O(1) push/pop at both ends.
- Thread safety is guaranteed by std::mutex locks on the topic map, request queue, and reply channel.
robotmq supports two transport mechanisms per topic, chosen at topic creation time:
Regular topics (ZeroMQ transport):

server.add_topic("sensor_data", message_remaining_time_s=10.0)

- Data is stored in the server process's heap memory.
- Clients receive data through the ZeroMQ REQ-REP channel.
- Suitable for small-to-medium messages or cross-network communication.

Shared memory topics:

server.add_shared_memory_topic("camera_frames", message_remaining_time_s=5.0, shared_memory_size_gb=1.0)

- Data is stored in a ring buffer in /dev/shm (POSIX shared memory).
- The ZeroMQ channel only transfers metadata (offset, size); the client reads the actual data directly from shared memory.
- A pthread_mutex in shared memory provides cross-process synchronization.
- The ring buffer automatically wraps around, overwriting the oldest data when full.
- SHM path format: rmq_{username}_{pid}_{server_name}_{topic_name}
This dual approach lets you use the optimal transport per topic: shared memory for large, high-frequency local data (camera images, point clouds), and ZeroMQ for smaller data or cross-machine communication.
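To make the wrap-around behaviour concrete, here is a minimal single-process sketch of a byte ring buffer that hands out (offset, size) metadata, which is what the ZeroMQ channel carries for shared memory topics. It is an illustration under stated assumptions, not robotmq's C++ implementation: the class RingBufferSketch is hypothetical, it uses a plain bytearray instead of /dev/shm, and it omits the cross-process pthread_mutex.

```python
class RingBufferSketch:
    """Hypothetical single-process model of a wrap-around byte ring buffer."""

    def __init__(self, capacity: int) -> None:
        self.buf = bytearray(capacity)
        self.capacity = capacity
        self.write_pos = 0  # offset where the next payload will start

    def write(self, payload: bytes) -> tuple[int, int]:
        """Store payload contiguously and return its (offset, size) metadata."""
        size = len(payload)
        if size > self.capacity:
            raise ValueError("payload is larger than the whole buffer")
        # If the payload does not fit before the end, wrap to offset 0,
        # overwriting the oldest bytes at the start of the buffer.
        if self.write_pos + size > self.capacity:
            self.write_pos = 0
        offset = self.write_pos
        self.buf[offset:offset + size] = payload
        self.write_pos = offset + size
        return offset, size

    def read(self, offset: int, size: int) -> bytes:
        """Read a payload back using the (offset, size) a reader received."""
        return bytes(self.buf[offset:offset + size])
```

With a 10-byte buffer, writing b"abcd", b"efgh" and then b"ijk" yields offsets 0, 4 and (after wrapping) 0 again. Reading the first message's stale metadata (0, 4) now returns b"ijkd", which is why a real implementation has to track which entries the writer has already overwritten.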
Each topic is an independent message queue with:
- Time-based expiration: messages older than message_remaining_time_s are automatically pruned on every insertion.
- Flexible retrieval: clients can peek (non-destructive read) or pop (destructive read) messages with flexible indexing:
  - n > 0: first n messages (oldest first)
  - n < 0: last |n| messages (most recent, preserving chronological order)
  - n = 0: all messages
This is purpose-built for robotics: a control loop typically wants the latest sensor reading (n=-1), while a logger might want all buffered readings (n=0).
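The expiration and indexing rules above can be modelled in a few lines of Python. This is a hypothetical sketch: the TopicSketch class and its explicit now argument (used instead of a real clock so the behaviour is easy to check) are assumptions, while robotmq implements the same ideas in C++ on a std::deque of timestamped messages.

```python
from collections import deque


class TopicSketch:
    """Hypothetical model of one topic: time-based expiry plus n-indexing."""

    def __init__(self, message_remaining_time_s: float) -> None:
        self.ttl = message_remaining_time_s
        self.queue = deque()  # (timestamp, data) pairs, oldest first

    def put(self, data: bytes, now: float) -> None:
        self.queue.append((now, data))
        # Prune expired messages from the old end on every insertion.
        while self.queue and now - self.queue[0][0] > self.ttl:
            self.queue.popleft()

    def _select(self, n: int) -> list:
        items = list(self.queue)
        if n > 0:
            return items[:n]   # first n messages (oldest first)
        if n < 0:
            return items[n:]   # last |n| messages, still chronological
        return items           # n == 0: all messages

    def peek(self, n: int) -> tuple[list[bytes], list[float]]:
        chosen = self._select(n)
        return [d for _, d in chosen], [t for t, _ in chosen]

    def pop(self, n: int) -> tuple[list[bytes], list[float]]:
        chosen = self._select(n)
        for _ in chosen:
            # Remove from the end the selection was taken from.
            if n < 0:
                self.queue.pop()
            else:
                self.queue.popleft()
        return [d for _, d in chosen], [t for t, _ in chosen]
```

A control loop would call peek(-1) to get only the newest reading, while a logger would call pop(0) to drain everything still within the time window.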
A single server supports both patterns simultaneously:
- Publish-Subscribe (asynchronous): the server calls put_data() to publish; clients call peek_data() or pop_data() to consume. No coordination is needed: the server writes at its own rate, and clients read at theirs.
- Request-Reply (synchronous): the client calls request_with_data() to send a request and blocks until a reply arrives. The server calls wait_for_request() + reply_request() to handle it. Built-in deduplication prevents double-processing if the client retries on timeout.
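The deduplication idea can be sketched as a per-topic cache of the last handled request's timestamp and reply. This is a hypothetical illustration: DedupReplyCache, handle and the handler callable are assumptions, and the only behaviour taken from robotmq's description is that a retried request is recognized by its timestamp and answered from a cached reply instead of being processed again.

```python
from typing import Callable


class DedupReplyCache:
    """Hypothetical per-topic cache of (request timestamp, reply)."""

    def __init__(self) -> None:
        self.last: dict[str, tuple[float, bytes]] = {}
        self.handler_calls = 0  # counts how often real work was done

    def handle(self, topic: str, request_ts: float, data: bytes,
               handler: Callable[[bytes], bytes]) -> bytes:
        cached = self.last.get(topic)
        if cached is not None and cached[0] == request_ts:
            # Same request retried after a client timeout: reuse the reply.
            return cached[1]
        reply = handler(data)
        self.handler_calls += 1
        self.last[topic] = (request_ts, reply)
        return reply
```

Keying on the client-assigned timestamp rather than on the payload means two genuinely new requests with identical observations are still both processed.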
RMQServer(server_name: str, server_endpoint: str, log_level: RMQLogLevel = RMQLogLevel.INFO)

| Parameter | Description | Example |
|---|---|---|
| server_name | Unique name for this server instance (used in logging and SHM paths) | "robot_server" |
| server_endpoint | ZeroMQ endpoint to bind to | "tcp://*:5555" or "ipc:///tmp/feeds/0" |
| log_level | Logging verbosity | RMQLogLevel.INFO |
Endpoint formats:

- tcp://*:PORT listens on all interfaces (use for network communication).
- tcp://0.0.0.0:PORT is the same as above, with an explicit bind-all address.
- ipc:///path/to/socket is a Unix domain socket (local only, lower latency than TCP).
server.add_topic(topic: str, message_remaining_time_s: float) -> None

Creates a regular (ZeroMQ-only) topic. Messages older than message_remaining_time_s seconds are automatically discarded.

server.add_shared_memory_topic(topic: str, message_remaining_time_s: float, shared_memory_size_gb: float) -> None

Creates a shared memory topic with a ring buffer of shared_memory_size_gb gigabytes. Large data is stored directly in /dev/shm for zero-copy local access.

server.put_data(topic: str, data: bytes) -> None

Publishes data to a topic. The data is stored in the topic's queue and timestamped automatically. Expired messages are pruned on each insertion.

server.peek_data(topic: str, n: int) -> tuple[list[bytes], list[float]]

Reads n messages from the topic without removing them. Returns a tuple of (data_list, timestamp_list).

server.pop_data(topic: str, n: int) -> tuple[list[bytes], list[float]]

Reads n messages from the topic and removes them. Same return format as peek_data.
Indexing for n:

| Value | Behavior |
|---|---|
| n > 0 | First (oldest) n messages |
| n < 0 | Last (newest) \|n\| messages, in chronological order |
| n = 0 | All messages currently in the topic |
server.wait_for_request(timeout_s: float) -> tuple[bytes, str]

Blocks until a client sends a request_with_data() call, or until timeout_s elapses. Returns (request_data, topic_name). If the timeout elapses, returns (b"", "").

server.reply_request(topic: str, data: bytes) -> None

Sends a reply back to the client that issued the request on the given topic. Must be called after wait_for_request() returns a valid request.

server.get_all_topic_status() -> dict[str, int]

Returns a dictionary mapping topic names to their current message count.

server.get_timestamp() -> float

Returns seconds elapsed since the server's internal clock started (the first message event). Useful for consistent timestamping across topics.

server.reset_start_time(system_time_us: int) -> None

Resets the internal clock to align with a system timestamp. Pass robotmq.system_clock_us() to synchronize with wall-clock time.
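One way to reason about get_timestamp() and reset_start_time() is a relative clock: a timestamp is the number of seconds since a start time expressed in wall-clock microseconds. The sketch below is a hypothetical mental model, not robotmq's implementation; RelativeClockSketch and its timestamp_at/to_system_us methods are assumptions, and the exact clock robotmq uses internally is not stated here.

```python
import time
from typing import Optional


class RelativeClockSketch:
    """Hypothetical model: seconds elapsed since a wall-clock start time (in microseconds)."""

    def __init__(self, start_us: Optional[int] = None) -> None:
        # Default to "now" in wall-clock microseconds.
        self.start_us = start_us if start_us is not None else time.time_ns() // 1000

    def reset_start_time(self, system_time_us: int) -> None:
        self.start_us = system_time_us

    def timestamp_at(self, system_time_us: int) -> float:
        """Relative timestamp (seconds) for a given wall-clock time."""
        return (system_time_us - self.start_us) / 1e6

    def to_system_us(self, timestamp_s: float) -> int:
        """Inverse mapping: relative seconds back to wall-clock microseconds."""
        return self.start_us + round(timestamp_s * 1e6)
```

Under this model, two processes whose clocks were reset to the same system_time_us produce directly comparable timestamps, provided the hosts' wall clocks are themselves synchronized (for example via NTP).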
RMQClient(client_name: str, server_endpoint: str, log_level: RMQLogLevel = RMQLogLevel.INFO)

| Parameter | Description | Example |
|---|---|---|
| client_name | Unique name for this client instance (used in logging) | "control_loop" |
| server_endpoint | ZeroMQ endpoint to connect to (must match server) | "tcp://192.168.1.10:5555" |
| log_level | Logging verbosity | RMQLogLevel.INFO |
Note: For TCP, the client uses the server's IP address (not *). For IPC, the path must match exactly.
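The relationship between the server's bind endpoint and the client's connect endpoint can be captured in a small helper. client_endpoint is hypothetical and not part of robotmq: it replaces a wildcard TCP host with the address the client should dial and passes IPC endpoints through unchanged.

```python
def client_endpoint(server_endpoint: str, server_host: str) -> str:
    """Hypothetical helper: derive a client connect endpoint from a server bind endpoint."""
    if server_endpoint.startswith("ipc://"):
        return server_endpoint  # IPC paths must match exactly
    if not server_endpoint.startswith("tcp://"):
        raise ValueError(f"unsupported endpoint: {server_endpoint}")
    host, sep, port = server_endpoint[len("tcp://"):].rpartition(":")
    if not sep or not port.isdigit():
        raise ValueError(f"missing port in: {server_endpoint}")
    if host in ("*", "0.0.0.0"):
        host = server_host  # wildcard binds are not dialable; use the server's address
    return f"tcp://{host}:{port}"
```

For example, client_endpoint("tcp://*:5555", "192.168.1.10") gives "tcp://192.168.1.10:5555", matching the server and client examples in this document.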
client.peek_data(topic: str, n: int, timeout_s: float = 1.0, automatic_resend: bool = True) -> tuple[list[bytes], list[float]]

Reads n messages from the remote topic without removing them. The n parameter follows the same convention as on the server (positive = oldest, negative = newest, zero = all).

client.pop_data(topic: str, n: int, timeout_s: float = 1.0, automatic_resend: bool = True) -> tuple[list[bytes], list[float]]

Reads n messages and removes them from the server's topic.

client.put_data(topic: str, data: bytes, timeout_s: float = 1.0, automatic_resend: bool = True) -> None

Sends data to a topic on the server. This allows clients to publish data to server-managed topics (useful for bidirectional communication).
| Parameter | Description |
|---|---|
| timeout_s | Seconds to wait for a server response before retrying. Default: 1.0 |
| automatic_resend | If True, automatically retry on timeout (up to 800 retries). If False, raise an exception immediately on the first timeout. Default: True |
client.request_with_data(topic: str, data: bytes, timeout_s: float = 1.0, automatic_resend: bool = True) -> bytes

Sends data as a request to the server's topic and blocks until the server replies. The server must call wait_for_request() + reply_request() to handle it. Returns the reply data as bytes.
Built-in deduplication: if the client retries (due to timeout), the server recognizes the duplicate request by its timestamp and returns the cached reply without re-processing.
client.get_topic_status(topic: str, timeout_s: float) -> int

Queries the server for the status of a topic. Unlike other client methods, timeout_s has no default value and must be provided explicitly. If timeout_s is negative, the call blocks indefinitely until the server responds.
This is the only client method that detects server disconnection via a return code (-2) rather than raising an exception. (Other client methods with automatic_resend=False raise an exception on timeout instead.)
| Return Value | Meaning |
|---|---|
| -2 | Server unreachable (no reply within timeout_s seconds; never returned when timeout_s < 0) |
| -1 | Server connected, but topic does not exist |
| 0 | Topic exists, but contains no messages |
| > 0 | Number of messages currently in the topic |
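A startup check built on these return codes should pause between attempts rather than poll in a tight loop, since each unreachable-server result recreates the client socket. The sketch takes the status query as a callable, so it can wrap client.get_topic_status or a test stub; wait_for_topic and its parameters are hypothetical and not part of robotmq.

```python
import time
from typing import Callable


def wait_for_topic(get_status: Callable[[str, float], int], topic: str,
                   timeout_s: float = 1.0, poll_interval_s: float = 0.5,
                   max_attempts: int = 20) -> int:
    """Hypothetical helper: poll until the topic exists, then return its message count.

    Codes follow get_topic_status(): -2 unreachable, -1 no such topic,
    0 empty topic, >0 number of messages.
    """
    for _ in range(max_attempts):
        status = get_status(topic, timeout_s)
        if status >= 0:
            return status  # topic exists (possibly empty)
        # -2 (server unreachable) or -1 (topic not created yet):
        # back off before asking again.
        time.sleep(poll_interval_s)
    raise TimeoutError(f"topic {topic!r} not available after {max_attempts} attempts")
```

Typical use would be wait_for_topic(client.get_topic_status, "camera") once before entering the control loop.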
client.get_last_retrieved_data() -> tuple[list[bytes], list[float]]

Returns the data from the most recent peek_data, pop_data, put_data, or request_with_data call. Useful for re-accessing the last result without another network round-trip.

client.get_timestamp() -> float

Returns seconds elapsed since the client's internal clock started.

client.reset_start_time(system_time_us: int) -> None

Synchronizes the client's internal clock with a system timestamp.
from robotmq import serialize, deserialize  # top-level exports
from robotmq.utils import clear_shared_memory  # only available from robotmq.utils

serialize() recursively serializes arbitrarily nested structures (dicts, lists, tuples) containing numpy arrays and other picklable types. Numpy arrays are converted to (raw_bytes, dtype_str, shape_tuple) before pickling, which avoids numpy version incompatibilities between sender and receiver.
import numpy as np
from robotmq import serialize, deserialize

data = {
    "image": np.random.rand(480, 640, 3),
    "joints": np.array([0.1, 0.2, 0.3]),
    "metadata": {"timestamp": 1234.5, "frame_id": 42},
}
payload = serialize(data)  # safe to send across numpy versions

deserialize() reverses serialize(). It automatically detects and reconstructs numpy arrays from the (bytes, dtype_str, shape) representation, and returns None with a warning if given empty bytes.

result = deserialize(payload)
# result["image"].shape == (480, 640, 3)

clear_shared_memory() removes all shared memory files in /dev/shm created by robotmq for the current user (files matching rmq_{username}_*). Call it to clean up leftover shared memory from crashed processes.
RMQLogLevel is an enum controlling the verbosity of the C++ spdlog logger:

| Level | Description |
|---|---|
| RMQLogLevel.TRACE | Most verbose: all internal operations |
| RMQLogLevel.DEBUG | Debug-level details |
| RMQLogLevel.INFO | Normal operation messages (default) |
| RMQLogLevel.WARNING | Warnings and errors |
| RMQLogLevel.ERROR | Errors and critical errors |
| RMQLogLevel.CRITICAL | Critical errors only |
| RMQLogLevel.OFF | No logging |
robotmq.steady_clock_us() -> int  # Monotonic clock in microseconds (for durations)
robotmq.system_clock_us() -> int  # Wall-clock time in microseconds (for synchronization)

Streaming sensor data (publish-subscribe): a producer process reads from a sensor and publishes data; a consumer process reads it at its own pace.
Producer (Server):
import robotmq
from robotmq.utils import serialize
import numpy as np
server = robotmq.RMQServer("sensor_server", "tcp://*:5555")
server.add_shared_memory_topic("camera", message_remaining_time_s=5.0, shared_memory_size_gb=2.0)
while True:
    frame = capture_camera_frame()  # placeholder for your camera driver; returns np.ndarray
    server.put_data("camera", serialize(frame))

Consumer (Client):
import robotmq
from robotmq.utils import deserialize
client = robotmq.RMQClient("controller", "tcp://192.168.1.10:5555")
while True:
    # Get only the latest frame (n=-1), non-destructively
    data_list, timestamps = client.peek_data("camera", n=-1)
    if data_list:
        frame = deserialize(data_list[0])
        process(frame)

Key points:

- peek_data with n=-1 always gets the most recent message, which is ideal for control loops that only care about the latest state.
- pop_data with n=0 drains the queue, which is useful for loggers that need every message.
- The server and client can run at different rates. Old messages expire automatically.
A robot controller sends observations to a GPU server and receives actions back.
GPU Server:
import robotmq
from robotmq.utils import serialize, deserialize
server = robotmq.RMQServer("policy_server", "tcp://*:5555")
server.add_shared_memory_topic("inference", message_remaining_time_s=10.0, shared_memory_size_gb=1.0)
model = load_policy_model()
while True:
    request_data, topic = server.wait_for_request(timeout_s=1.0)
    if not topic:  # timeout
        continue
    observation = deserialize(request_data)
    action = model.predict(observation)
    server.reply_request(topic, serialize(action))

Robot Controller (Client):
import robotmq
from robotmq.utils import serialize, deserialize
client = robotmq.RMQClient("robot", "tcp://gpu-server:5555")
while True:
    obs = get_robot_observation()
    action_bytes = client.request_with_data("inference", serialize(obs), timeout_s=5.0)
    action = deserialize(action_bytes)
    execute_action(action)

Key points:

- request_with_data blocks until the server replies: simple synchronous RPC semantics.
- Built-in retry and deduplication: if the client times out and retries, the server won't re-run inference; it returns the cached result.
- Works over the network — run the policy on a GPU server and the robot controller on an edge device.
A single server can handle both asynchronous data streams and synchronous requests simultaneously. This is useful when a robot controller needs to both stream sensor data and handle inference requests.
import robotmq
from robotmq import serialize, deserialize

server = robotmq.RMQServer("robot_server", "tcp://*:5555")
server.add_topic("joint_states", message_remaining_time_s=10.0)
server.add_shared_memory_topic("inference", message_remaining_time_s=5.0, shared_memory_size_gb=1.0)

while True:
    # 1. Publish sensor data asynchronously
    joint_positions = read_joint_positions()  # placeholder for your robot driver
    server.put_data("joint_states", serialize(joint_positions))
    # 2. Handle inference requests synchronously (short timeout keeps the loop responsive)
    request_data, topic = server.wait_for_request(timeout_s=0.01)
    if topic:
        result = process_request(deserialize(request_data))  # placeholder handler
        server.reply_request(topic, serialize(result))

| Aspect | ROS2 | robotmq |
|---|---|---|
| Setup | Requires ROS installation, workspace, package structure, CMake/colcon build | pip install robotmq |
| Message types | Must define .msg files, generate code, rebuild | Any picklable Python object works |
| Cross-version | Strict version matching between nodes | Server and client can use different Python versions and different numpy versions |
| Shared memory | Available in ROS2 (Humble+), but requires DDS configuration | Built-in, one-line add_shared_memory_topic() |
| Learning curve | Steep: publishers, subscribers, executors, QoS profiles, launch files | Two classes and a handful of core methods |
| Dependency weight | Gigabytes of packages | Single pip package, a few MB |
When to use ROS2 instead: If you need a full robotics middleware (TF transforms, parameter server, lifecycle management, multi-language support, standardized message types for interoperability).
| Aspect | multiprocessing | robotmq |
|---|---|---|
| GIL | Queue operations acquire the GIL | C++ background thread, GIL-free |
| Network | Local only | TCP support for cross-machine communication |
| Message expiration | Manual management | Automatic time-based expiration |
| Peek semantics | Not supported (get is destructive) | Both peek (non-destructive) and pop (destructive) |
| Large arrays | Pickle serialization overhead | Shared memory ring buffer, ~2 GB/s |
| Aspect | pyzmq | robotmq |
|---|---|---|
| Blocking | recv() blocks the Python thread | Background C++ thread, non-blocking Python |
| Topic management | Manual implementation | Built-in topics with expiration and indexing |
| Shared memory | Not included | Integrated SHM ring buffer |
| Request deduplication | Manual implementation | Built-in for request-reply pattern |
| Serialization | Bring your own | Included numpy-safe serialize()/deserialize() |
| Aspect | Redis/RabbitMQ | robotmq |
|---|---|---|
| Deployment | External service to install and maintain | Embedded in your Python process |
| Large binary data | Not optimized (every payload passes through the broker, which enforces message size limits) | Native bytes, shared memory for large payloads |
| Latency | Extra network hop to broker | Direct peer-to-peer (or shared memory) |
| Dependencies | Server process + client library | Single pip package |
- Zero-GIL data path: All I/O happens in C++ threads. Python never blocks on network operations.
- Adaptive transport: Choose shared memory (~2 GB/s) or TCP (~20 MB/s) per topic based on data size and network topology.
- Robotics-native indexing: n=-1 for "latest only" and n=0 for "drain all" match how control loops and loggers actually consume data.
- Automatic message expiration: no unbounded memory growth. Set a time window and forget about it.
- Cross-environment compatibility: Server and client can run in different Python environments, different Python versions, and different numpy versions without serialization errors.
- Minimal API: Two classes, ~10 methods. No schema definitions, no build systems, no broker processes.
- Request-reply with deduplication: Synchronous RPC semantics with built-in retry safety — critical for robot control where double-executing an action is dangerous.
| Scenario | Throughput | Notes |
|---|---|---|
| Shared memory, large arrays (76 MB) | ~2 GB/s | Ring buffer in /dev/shm, mutex-protected |
| TCP, local loopback | ~500 MB/s | Depends on message size |
| TCP, across network | ~20 MB/s | Limited by network bandwidth |
| Message serialization (numpy) | ~1 GB/s | tobytes() is near-memcpy speed |
| serialize()/deserialize() | Slightly slower | Adds pickle overhead for structure metadata |
Memory usage:

- Regular topics: messages are stored in the server process heap, bounded by message_remaining_time_s × publish rate × message size.
- Shared memory topics: fixed allocation of shared_memory_size_gb in /dev/shm. The ring buffer reclaims space automatically.
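For a regular topic, the bound above is simple arithmetic. The sketch assumes uint8 RGB 640×480 frames published at 30 Hz into a topic with message_remaining_time_s=5.0; these numbers are illustrative, not measurements, and the function ignores per-message overhead such as timestamps and serialize() metadata.

```python
def regular_topic_bytes(message_remaining_time_s: float, publish_rate_hz: float,
                        message_size_bytes: int) -> float:
    """Approximate heap held by one regular topic: retention window x rate x size."""
    return message_remaining_time_s * publish_rate_hz * message_size_bytes


frame_bytes = 480 * 640 * 3  # one uint8 RGB frame = 921,600 bytes
bound = regular_topic_bytes(5.0, 30.0, frame_bytes)
print(f"{bound / 1e6:.1f} MB")  # prints 138.2 MB
```

Halving message_remaining_time_s or the publish rate halves this bound, which is the main knob for keeping regular topics small.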
Prebuilt wheels for Linux x86_64 and aarch64:
pip install robotmq

To build from source:

# Using conda (recommended for C++ dependencies)
conda install spdlog cppzmq zeromq boost pybind11 cmake make gxx_linux-64 -y
# Install in development mode
pip install -e .
# Or install as a regular package
pip install .

Build dependencies: C++17 compiler, CMake, ZeroMQ, cppzmq, spdlog, Boost, pybind11, fmt.
If you see errors related to numpy._core when exchanging data between processes with different numpy versions, use serialize() / deserialize() instead of pickle.dumps() / pickle.loads(). These can be imported directly from robotmq (from robotmq import serialize, deserialize). They convert numpy arrays to raw bytes with dtype metadata, avoiding version-specific pickle representations.
If a server process crashes, shared memory files may remain in /dev/shm. Clean them up with:
from robotmq.utils import clear_shared_memory
clear_shared_memory()

Or manually: rm /dev/shm/rmq_${USER}_*
get_topic_status() closes and recreates the ZeroMQ socket each time the server is unreachable. Due to file descriptors not being released immediately, calling it repeatedly without a server response can eventually hit the OS file descriptor limit (~1024). Avoid polling get_topic_status() in a tight loop — add a sleep or use it only for initial connection checks.
If you're using mixed mode (both async put/peek and sync request-reply), be aware that the server's background thread handles requests sequentially. If wait_for_request() is blocking for a long timeout, other clients may experience delays. Use short timeouts in mixed-mode servers.
If the server's port is firewalled, forward it through SSH:
ssh -L 5555:server_ip:5555 user@server_ip

Then connect the client to tcp://localhost:5555.