ZeroMQ (ZMQ) Messaging
ZeroMQ is a high-performance asynchronous messaging library for building distributed applications. Unlike broker-based systems such as RabbitMQ, Kafka, or an MQTT broker, ZMQ is embedded in the application as a library and needs no central server.
Key characteristics:
- Brokerless - Can work peer-to-peer without any central server
- Multiple patterns - PUB-SUB, REQ-REP, PUSH-PULL, ROUTER, PAIR, etc.
- Multiple transports - inproc, IPC, TCP, UDP, multicast
- Extremely fast - Designed for high-throughput scenarios
- Asynchronous I/O - Messages are queued and transferred by background I/O threads, so send usually returns before delivery
Socket Patterns
ZMQ provides different socket types for different communication patterns. Each socket type has specific messaging semantics.
| Pattern | Socket Types | Use Case |
|---|---|---|
| Request-Reply | REQ, REP | Synchronous RPC-like communication |
| Publish-Subscribe | PUB, SUB | One-to-many broadcasting |
| Pipeline | PUSH, PULL | Task distribution (fan-out/fan-in) |
| Exclusive Pair | PAIR | Thread-to-thread, process-to-process |
| Router | ROUTER, DEALER | Async request-reply with routing |
PUB-SUB: Publish-Subscribe
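One publisher sends messages to many subscribers. Each subscriber connects to the publisher and subscribes to the topic prefixes it is interested in; messages that match no subscription are filtered out.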
Publisher
import zmq
import time
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5555")
while True:
    publisher.send_string(f"topic1 Hello World {time.time()}")
    publisher.send_string(f"topic2 Special message {time.time()}")
    time.sleep(1)
Subscriber
import zmq
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5555")
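# Subscribe by topic prefix; only messages starting with b"topic1" are delivered
subscriber.setsockopt(zmq.SUBSCRIBE, b"topic1")
# An empty prefix matches every message; uncomment to receive everything
# subscriber.setsockopt(zmq.SUBSCRIBE, b"")
while True:
    message = subscriber.recv_string()
    print(f"Received: {message}")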
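Subscriptions are prefix filters on the leading bytes of a message, which is easy to miss in the loop above. The following is a minimal sketch of that behavior, assuming an in-process endpoint (inproc://pubsub-demo is a made-up name) and a short sleep so the subscription is in place before anything is published. It is illustrative only, not a recommended production layout.

```python
import time
import zmq

context = zmq.Context()

publisher = context.socket(zmq.PUB)
publisher.bind("inproc://pubsub-demo")  # hypothetical endpoint name

subscriber = context.socket(zmq.SUB)
subscriber.connect("inproc://pubsub-demo")
subscriber.setsockopt(zmq.SUBSCRIBE, b"topic1")  # prefix filter
subscriber.setsockopt(zmq.RCVTIMEO, 500)         # stop waiting after 500 ms

time.sleep(0.2)  # give the subscription time to reach the publisher

for text in ("topic1 first", "topic10 second", "topic2 third"):
    publisher.send_string(text)

received = []
try:
    while True:
        received.append(subscriber.recv_string())
except zmq.Again:
    pass  # receive timeout: nothing more is queued

print(received)  # "topic10 second" also matches the b"topic1" prefix

context.destroy(linger=0)  # close all sockets and terminate the context
```

Because matching is by prefix, a delimiter (for example a trailing space in the subscription, or a separate topic frame) is one way to keep topic1 from also matching topic10.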
REQ-REP: Request-Reply
Synchronous request-response pattern. The client sends a request and waits for a reply.
Server (REP)
import zmq
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")
while True:
    request = server.recv_string()
    print(f"Received: {request}")
    # Process request and send reply
    response = f"Reply to: {request}"
    server.send_string(response)
Client (REQ)
import zmq
context = zmq.Context()
client = context.socket(zmq.REQ)
client.connect("tcp://localhost:5555")
for i in range(5):
    client.send_string(f"Request {i}")
    reply = client.recv_string()
    print(f"Reply: {reply}")
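The send/receive lockstep is enforced by the REQ socket itself. As a rough sketch, assuming both ends live in one process on a made-up inproc endpoint, a second send before the reply arrives fails with the EFSM error code, and the socket becomes usable again once the round trip completes.

```python
import zmq

context = zmq.Context()

server = context.socket(zmq.REP)
server.bind("inproc://reqrep-demo")  # hypothetical endpoint name

client = context.socket(zmq.REQ)
client.connect("inproc://reqrep-demo")

client.send_string("Request 0")

# A second send before the reply arrives violates the REQ state machine
try:
    client.send_string("Request 1")
    second_send_error = None
except zmq.ZMQError as exc:
    second_send_error = exc.errno

# Completing the round trip puts the REQ socket back in the "may send" state
request = server.recv_string()
server.send_string(f"Reply to: {request}")
reply = client.recv_string()

client.send_string("Request 1")  # allowed again
print(second_send_error == zmq.EFSM, reply)

context.destroy(linger=0)
```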
PUSH-PULL: Pipeline
One-way pattern in which PUSH sockets distribute messages across the connected PULL sockets, so each message is handled by exactly one puller. Great for task distribution and worker pools.
Pusher (Ventilator)
import zmq
context = zmq.Context()
pusher = context.socket(zmq.PUSH)
pusher.bind("tcp://*:5555")
for i in range(100):
    pusher.send_string(f"Task {i}")
    print(f"Sent task {i}")
# Signal end; send_string takes str, not bytes.
# Each message reaches only one worker, so send one "END" per worker.
pusher.send_string("END")
Puller (Worker)
import zmq
context = zmq.Context()
puller = context.socket(zmq.PULL)
puller.connect("tcp://localhost:5555")
while True:
    message = puller.recv_string()
    if message == "END":
        break
    print(f"Processing: {message}")
    # Do actual work here...
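To see how tasks are shared out, here is a small sketch with one PUSH socket and two PULL sockets in a single process. The endpoint name is hypothetical, and both workers are connected before the first task is sent. Every task is delivered to exactly one worker; the exact split between workers is printed rather than assumed.

```python
import zmq

context = zmq.Context()

pusher = context.socket(zmq.PUSH)
pusher.bind("inproc://pipeline-demo")  # hypothetical endpoint name

# Two workers, both connected before any task is sent
workers = [context.socket(zmq.PULL) for _ in range(2)]
for worker in workers:
    worker.connect("inproc://pipeline-demo")

tasks = [f"Task {i}" for i in range(6)]
for task in tasks:
    pusher.send_string(task)

# Drain each worker; poll() returns 0 once nothing arrives within 100 ms
handled = []
for worker in workers:
    batch = []
    while worker.poll(100):
        batch.append(worker.recv_string())
    handled.append(batch)

print(handled)  # one list of tasks per worker

context.destroy(linger=0)
```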
ROUTER-DEALER: Async Router
Asynchronous request-reply with identity tracking. Useful for building service-oriented architectures.
# Router - handles multiple clients with identity
import zmq
context = zmq.Context()
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5555")
while True:
    # A REQ client's message arrives as [identity, empty delimiter, payload].
    # A DEALER client sends the empty delimiter only if it adds one itself.
    identity, _, payload = router.recv_multipart()
    print(f"From {identity}: {payload}")
    # Reply to that client; all frames must go out as one multipart message
    router.send_multipart([identity, b"", b"Reply to " + payload])
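The client side of this pattern is not shown above. A minimal sketch of a DEALER talking to a ROUTER follows, assuming a made-up inproc endpoint and a hand-picked identity (client-1). It shows the frames each side actually sees: the ROUTER receives the identity as an extra first frame and removes it again when replying.

```python
import zmq

context = zmq.Context()

router = context.socket(zmq.ROUTER)
router.bind("inproc://router-demo")  # hypothetical endpoint name

dealer = context.socket(zmq.DEALER)
dealer.setsockopt(zmq.IDENTITY, b"client-1")  # must be set before connect
dealer.connect("inproc://router-demo")

# DEALER adds no envelope, so the empty delimiter is sent explicitly
dealer.send_multipart([b"", b"ping"])

identity, empty, payload = router.recv_multipart()
router.send_multipart([identity, b"", b"Reply to " + payload])

# The ROUTER strips the identity frame before the reply reaches the DEALER
reply = dealer.recv_multipart()
print(identity, reply)

context.destroy(linger=0)
```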
PAIR: Exclusive Pair
Bi-directional communication between exactly two peers. PAIR is intended mainly for signaling between threads over inproc; unlike the other socket types, it does no routing or fan-out and does not reconnect automatically.
# Peer A
import zmq
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:5555")
# Peer B, in another process or thread, connects instead of binding:
# socket.connect("tcp://localhost:5555")
socket.send(b"Hello from A")
response = socket.recv()
print(response)
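Since the snippet above only shows one side, here is a sketch of both peers in one program: peer A binds in the main thread and peer B connects from a second thread. The inproc endpoint name and the function name peer_b are assumptions made for the illustration.

```python
import threading
import zmq

context = zmq.Context()

def peer_b(ctx):
    # Second peer: connects to the endpoint bound by peer A
    sock = ctx.socket(zmq.PAIR)
    sock.connect("inproc://pair-demo")  # hypothetical endpoint name
    greeting = sock.recv()
    sock.send(b"Hello from B, got: " + greeting)
    sock.close()

peer_a = context.socket(zmq.PAIR)
peer_a.bind("inproc://pair-demo")

thread = threading.Thread(target=peer_b, args=(context,))
thread.start()

peer_a.send(b"Hello from A")
response = peer_a.recv()
thread.join()
print(response)

peer_a.close()
context.term()
```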
Multithreading with ZMQ
ZMQ sockets are not thread-safe and should not be shared between threads; each thread should create its own sockets. Contexts are thread-safe, and threads that communicate over inproc must share the same context.
import threading
import zmq
def worker_thread(context):
    # Each thread creates its own socket from the shared context
    receiver = context.socket(zmq.PULL)
    receiver.connect("inproc://worker-channel")
    while True:
        msg = receiver.recv_string()
        print(f"Worker received: {msg}")
# Main thread - push work to workers
context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("inproc://worker-channel")
# Start worker threads, passing in the shared context
worker = threading.Thread(target=worker_thread, args=(context,))
worker.start()
# Send work (send_string takes str, not bytes)
sender.send_string("Task 1")
sender.send_string("Task 2")
Use the inproc:// transport for fast intra-process communication between threads. An inproc endpoint is only visible to sockets created from the same context.
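An inproc endpoint name is scoped to the context that bound it. The sketch below, using made-up names, connects two PULL sockets to the same endpoint string: one from the sender's context and one from a separate context. Only the first should receive anything.

```python
import zmq

shared = zmq.Context()
other = zmq.Context()

sender = shared.socket(zmq.PUSH)
sender.bind("inproc://scope-demo")  # hypothetical endpoint name

same_ctx = shared.socket(zmq.PULL)
same_ctx.connect("inproc://scope-demo")

# Same endpoint name, different context: this never reaches the sender
other_ctx = other.socket(zmq.PULL)
other_ctx.connect("inproc://scope-demo")

sender.send_string("Task 1")

# poll() returns 0 if nothing arrives within the timeout (milliseconds)
same_got = same_ctx.recv_string() if same_ctx.poll(500) else None
other_got = other_ctx.recv_string() if other_ctx.poll(200) else None
print(same_got, other_got)

shared.destroy(linger=0)
other.destroy(linger=0)
```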
Avoiding Deadlocks
ZMQ doesn't prevent deadlocks - it's up to you to avoid them. Common issues:
The RACE
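When a PUB socket starts sending before a subscriber has finished connecting and subscribing, the early messages are dropped (the slow-joiner problem). A REQ socket, by contrast, queues its request until the connection is established.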
# Solution: Wait a bit or use explicit synchronization
import time
time.sleep(0.5) # Let connection establish
Lost Messages
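A PUB socket never waits for subscribers: anything sent before a subscriber has connected, or after a slow subscriber's high-water mark is reached, is dropped. A pub-sub proxy does not change this; if delivery matters, synchronize startup explicitly (for example with a REQ-REP handshake) or use a pattern that applies back-pressure, such as PUSH-PULL.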
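The loss is silent, which makes it easy to overlook. A small sketch, assuming a made-up inproc endpoint and the same sleep-based synchronization used earlier, publishes one message before any subscriber exists and one after:

```python
import time
import zmq

context = zmq.Context()

publisher = context.socket(zmq.PUB)
publisher.bind("inproc://slow-joiner-demo")  # hypothetical endpoint name

publisher.send_string("early")  # no subscriber yet: silently dropped

subscriber = context.socket(zmq.SUB)
subscriber.connect("inproc://slow-joiner-demo")
subscriber.setsockopt(zmq.SUBSCRIBE, b"")

time.sleep(0.2)  # crude synchronization: let the subscription settle
publisher.send_string("late")

received = []
while subscriber.poll(300):  # stop once nothing arrives for 300 ms
    received.append(subscriber.recv_string())
print(received)

context.destroy(linger=0)
```

Neither send raises an error, so the publisher has no way of knowing the first message went nowhere.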
Polling with zmq.Poller
Monitor multiple sockets simultaneously without blocking on any single one.
import zmq
import time
context = zmq.Context()
# Create multiple sockets
socket1 = context.socket(zmq.PULL)
socket1.connect("tcp://localhost:5555")
socket2 = context.socket(zmq.PULL)
socket2.connect("tcp://localhost:5556")
socket3 = context.socket(zmq.SUB)
socket3.connect("tcp://localhost:5557")
socket3.setsockopt(zmq.SUBSCRIBE, b"")
# Create poller
poller = zmq.Poller()
poller.register(socket1, zmq.POLLIN)
poller.register(socket2, zmq.POLLIN)
poller.register(socket3, zmq.POLLIN)
while True:
    # Poll with 1000ms timeout
    events = dict(poller.poll(1000))
    if socket1 in events:
        print(f"Socket1: {socket1.recv()}")
    if socket2 in events:
        print(f"Socket2: {socket2.recv()}")
    if socket3 in events:
        print(f"Socket3: {socket3.recv()}")
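The example above needs three external senders to do anything. For experimenting, a self-contained variant might look like the following sketch, in which the endpoint names are invented and both ends of each channel live in the same process. Only the socket with a pending message shows up in the poll result, and a poll on idle sockets returns an empty result after the timeout.

```python
import zmq

context = zmq.Context()

# Two receiving sockets, each with its own sender (hypothetical endpoints)
pull_a = context.socket(zmq.PULL)
pull_a.bind("inproc://poll-a")
pull_b = context.socket(zmq.PULL)
pull_b.bind("inproc://poll-b")

push_a = context.socket(zmq.PUSH)
push_a.connect("inproc://poll-a")
push_b = context.socket(zmq.PUSH)
push_b.connect("inproc://poll-b")

poller = zmq.Poller()
poller.register(pull_a, zmq.POLLIN)
poller.register(pull_b, zmq.POLLIN)

push_b.send(b"only b has data")

events = dict(poller.poll(1000))  # maps ready sockets to event flags
ready_names = [name for name, sock in (("a", pull_a), ("b", pull_b))
               if sock in events]
message = pull_b.recv()

idle = dict(poller.poll(100))  # nothing left: empty after the timeout
print(ready_names, message, idle)

context.destroy(linger=0)
```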
Transports
ZMQ supports different transport mechanisms for different scenarios:
| Transport | Prefix | Use Case |
|---|---|---|
| inproc | inproc://name | Intra-process (fastest) |
| IPC | ipc://path | Inter-process on same machine |
| TCP | tcp://host:port | Network communication |
| PGM/EPGM | pgm:// | Reliable multicast |
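Switching transports is mostly a matter of changing the endpoint string. As a sketch, the helper below (round_trip is a hypothetical name) runs the same REQ-REP exchange over inproc and over TCP on the loopback interface. It assumes loopback TCP is available and lets the OS-side port be chosen at random rather than hard-coding one.

```python
import zmq

def round_trip(context, endpoint):
    """Send one request over `endpoint` and return the reply."""
    server = context.socket(zmq.REP)
    if endpoint.startswith("tcp://"):
        # Pick a free port instead of hard-coding one
        port = server.bind_to_random_port(endpoint)
        endpoint = f"{endpoint}:{port}"
    else:
        server.bind(endpoint)

    client = context.socket(zmq.REQ)
    client.connect(endpoint)
    client.send_string("ping")

    server.send_string(server.recv_string() + " -> pong")
    reply = client.recv_string()

    client.close(linger=0)
    server.close(linger=0)
    return reply

context = zmq.Context()
replies = {
    "inproc": round_trip(context, "inproc://transport-demo"),
    "tcp": round_trip(context, "tcp://127.0.0.1"),
}
print(replies)
context.term()
```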
PyZMQ: Python Bindings
PyZMQ provides Python bindings for ZeroMQ with support for asyncio.
Installation
pip install pyzmq
Async IO Support
import asyncio
import zmq.asyncio
async def receiver():
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    while True:
        msg = await socket.recv()
        print(f"Received: {msg}")
asyncio.run(receiver())
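The receiver above runs forever and needs an external publisher. For trying out zmq.asyncio in isolation, a terminating sketch could pair a producer and a consumer coroutine in one event loop. The endpoint name and the coroutine names are assumptions for the example.

```python
import asyncio
import zmq
import zmq.asyncio

async def main():
    context = zmq.asyncio.Context()
    pull = context.socket(zmq.PULL)
    pull.bind("inproc://asyncio-demo")  # hypothetical endpoint name
    push = context.socket(zmq.PUSH)
    push.connect("inproc://asyncio-demo")

    async def producer():
        for i in range(3):
            await push.send(f"msg {i}".encode())

    async def consumer():
        # Each recv() yields to the event loop until a message is ready
        return [await pull.recv() for _ in range(3)]

    _, messages = await asyncio.gather(producer(), consumer())

    push.close(linger=0)
    pull.close(linger=0)
    context.term()
    return messages

messages = asyncio.run(main())
print(messages)
```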
Context Management
import zmq
# Option 1: Manual termination
context = zmq.Context()
# ... use sockets ...
context.term() # Blocks until all sockets are closed
# Option 2: Use context managers for both the context and the socket
with zmq.Context() as context:
    with context.socket(zmq.REQ) as socket:
        # ... use socket ...
        pass
# Socket closed and context terminated automatically
Common Options
socket.setsockopt(zmq.RCVTIMEO, 1000) # Receive timeout (ms)
socket.setsockopt(zmq.SNDTIMEO, 1000) # Send timeout (ms)
socket.setsockopt(zmq.LINGER, 0) # Don't wait on close
socket.setsockopt(zmq.SNDHWM, 1000) # Send high water mark (messages)
socket.setsockopt(zmq.RCVHWM, 1000) # Receive high water mark (messages)
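With a receive timeout set, a blocking recv no longer waits indefinitely; in pyzmq it raises zmq.Again when the timeout expires. A minimal sketch, assuming a made-up endpoint that nobody ever sends to:

```python
import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.setsockopt(zmq.RCVTIMEO, 100)  # give up after 100 ms
socket.setsockopt(zmq.LINGER, 0)      # do not block on close
socket.bind("inproc://timeout-demo")  # hypothetical endpoint; nobody sends

try:
    socket.recv()
    timed_out = False
except zmq.Again:
    # Raised when the timeout expires with no message available
    timed_out = True

print(timed_out)
socket.close()
context.term()
```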
Monitoring Sockets
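# Monitor for events on a socket
from zmq.utils.monitor import recv_monitor_message
monitor = socket.get_monitor_socket()
while monitor.poll():
    event = recv_monitor_message(monitor)  # dict with event, value, endpoint
    print(event)
    # Sent after socket.disable_monitor() is called or the socket is closed
    if event["event"] == zmq.EVENT_MONITOR_STOPPED:
        break
monitor.close()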
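As a concrete case, the sketch below attaches a monitor to a REP socket and then binds it to a TCP port chosen by the OS. It assumes loopback TCP is available and that the bind succeeds, in which case the first event reported should be the listening event.

```python
import zmq
from zmq.utils.monitor import recv_monitor_message

context = zmq.Context()
server = context.socket(zmq.REP)

# Attach the monitor before bind() so the LISTENING event is captured
monitor = server.get_monitor_socket()
server.bind("tcp://127.0.0.1:*")  # wildcard port: the OS picks a free one

first_event = None
if monitor.poll(1000):
    evt = recv_monitor_message(monitor)  # dict with event, value, endpoint
    first_event = evt["event"]
    print(first_event == zmq.EVENT_LISTENING, evt["endpoint"])

server.disable_monitor()
monitor.close()
server.close(linger=0)
context.term()
```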