ZeroMQ (ZMQ) Messaging

ZeroMQ is a high-performance asynchronous messaging library that enables the creation of distributed applications. Unlike traditional message brokers (MQTT, RabbitMQ, Kafka), ZMQ works as an embedded library with no central server required.

Key characteristics:

Socket Patterns

ZMQ provides different socket types for different communication patterns. Each socket type has specific messaging semantics.

Pattern Socket Types Use Case
Request-Reply REQ, REP Synchronous RPC-like communication
Publish-Subscribe PUB, SUB One-to-many broadcasting
Pipeline PUSH, PULL Task distribution (fan-out/fan-in)
Exclusive Pair PAIR Thread-to-thread, process-to-process
Router ROUTER, DEALER Async request-reply with routing

PUB-SUB: Publish-Subscribe

One publisher sends messages to many subscribers. Subscribers connect to topics they're interested in.

Publisher

import zmq
import time

context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5555")

while True:
    publisher.send_string(f"topic1 Hello World {time.time()}")
    publisher.send_string(f"topic2 Special message {time.time()}")
    time.sleep(1)

Subscriber

import zmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5555")

# Subscribe to topics
subscriber.setsockopt(zmq.SUBSCRIBE, b"topic1")
subscriber.setsockopt(zmq.SUBSCRIBE, b"")  # Subscribe to everything

while True:
    message = subscriber.recv_string()
    print(f"Received: {message}")
Note: SUB sockets have a slow-joiner problem. Add a small sleep after connecting to allow the subscription to propagate.

REQ-REP: Request-Reply

Synchronous request-response pattern. The client sends a request and waits for a reply.

Server (REP)

import zmq

context = zmq.Context()
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")

while True:
    request = server.recv_string()
    print(f"Received: {request}")
    
    # Process request and send reply
    response = f"Reply to: {request}"
    server.send_string(response)

Client (REQ)

import zmq

context = zmq.Context()
client = context.socket(zmq.REQ)
client.connect("tcp://localhost:5555")

for i in range(5):
    client.send_string(f"Request {i}")
    reply = client.recv_string()
    print(f"Reply: {reply}")

PUSH-PULL: Pipeline

Asymmetric pattern where pushers send to pullers. Great for task distribution and worker pools.

Pusher (Ventilator)

import zmq
import time

context = zmq.Context()
pusher = context.socket(zmq.PUSH)
pusher.bind("tcp://*:5555")

for i in range(100):
    pusher.send_string(f"Task {i}")
    print(f"Sent task {i}")

pusher.send_string(b"END")  # Signal end

Puller (Worker)

import zmq

context = zmq.Context()
puller = context.socket(zmq.PULL)
puller.connect("tcp://localhost:5555")

while True:
    message = puller.recv_string()
    if message == "END":
        break
    print(f"Processing: {message}")
    # Do actual work here...

ROUTER-DEALER: Async Router

Asymmetric request-reply with identity tracking. Useful for building service-oriented architectures.

# Router - handles multiple clients with identity
context = zmq.Context()
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5555")

while True:
    # Messages are [identity, empty, payload]
    identity = router.recv()
    router.recv()  # Empty delimiter
    payload = router.recv()
    
    print(f"From {identity}: {payload}")
    
    # Reply back to specific client
    router.send(identity, zmq.SNDMORE)
    router.send(b"")  # Empty delimiter
    router.send(b"Reply to" + payload)

PAIR: Exclusive Pair

Bi-directional communication between exactly two peers. Unlike other patterns, PAIR maintains a persistent connection.

# Peer A
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:5555")

# Connect from another process/thread
# socket.connect("tcp://localhost:5555")

socket.send(b"Hello from A")
response = socket.recv()
print(response)

Multithreading with ZMQ

ZMQ objects (contexts, sockets) should not be shared between threads. Each thread should create its own sockets.

import threading
import zmq

def worker_thread():
    # Each thread gets its own context and sockets
    context = zmq.Context()
    receiver = context.socket(zmq.PULL)
    receiver.connect("inproc://worker-channel")
    
    while True:
        msg = receiver.recv_string()
        print(f"Worker received: {msg}")

# Main thread - push work to workers
context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("inproc://worker-channel")

# Start worker threads
worker = threading.Thread(target=worker_thread)
worker.start()

# Send work
sender.send_string(b"Task 1")
sender.send_string(b"Task 2")
Tip: Use inproc:// transport for fast intra-process communication between threads.

Avoiding Deadlocks

ZMQ doesn't prevent deadlocks - it's up to you to avoid them. Common issues:

The RACE

When a REQ socket sends before the connection is established, the message might be lost.

# Solution: Wait a bit or use explicit synchronization
import time
time.sleep(0.5)  # Let connection establish

Lost Messages

PUB socket won't wait for subscribers. Use pub-sub proxies for reliable delivery.

Polling with zmq.Poller

Monitor multiple sockets simultaneously without blocking on any single one.

import zmq
import time

context = zmq.Context()

# Create multiple sockets
socket1 = context.socket(zmq.PULL)
socket1.connect("tcp://localhost:5555")
socket2 = context.socket(zmq.PULL)
socket2.connect("tcp://localhost:5556")
socket3 = context.socket(zmq.SUB)
socket3.connect("tcp://localhost:5557")
socket3.setsockopt(zmq.SUBSCRIBE, b"")

# Create poller
poller = zmq.Poller()
poller.register(socket1, zmq.POLLIN)
poller.register(socket2, zmq.POLLIN)
poller.register(socket3, zmq.POLLIN)

while True:
    # Poll with 1000ms timeout
    events = dict(poller.poll(1000))
    
    if socket1 in events:
        print(f"Socket1: {socket1.recv()}")
    if socket2 in events:
        print(f"Socket2: {socket2.recv()}")
    if socket3 in events:
        print(f"Socket3: {socket3.recv()}")

Transports

ZMQ supports different transport mechanisms for different scenarios:

Transport Prefix Use Case
inproc inproc://name Intra-process (fastest)
IPC ipc://path Inter-process on same machine
TCP tcp://host:port Network communication
PGM/EPGM pgm:// Reliable multicast

PyZMQ: Python Bindings

PyZMQ provides Python bindings for ZeroMQ with support for asyncio.

Installation

pip install pyzmq

Async IO Support

import asyncio
import zmq.asyncio

async def receiver():
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    
    while True:
        msg = await socket.recv()
        print(f"Received: {msg}")

asyncio.run(receiver())

Context Management

import zmq

# Option 1: Manual termination
context = zmq.Context()
# ... use sockets ...
context.term()  # Wait for all sockets to close

# Option 2: Use context manager (Python 3.10+)
with zmq.Context() as context:
    socket = context.socket(zmq.REQ)
    # ... use socket ...
# Context automatically terminated

Common Options

socket.setsockopt(zmq.RCVTIMEO, 1000)  # Receive timeout (ms)
socket.setsockopt(zmq.SNDTIMEO, 1000)  # Send timeout (ms)
socket.setsockopt(zmq.LINGER, 0)    # Don't wait on close
socket.setsockopt(zmq.HWM, 1000)       # High water mark

Monitoring Sockets

# Monitor for events on a socket
monitor = socket.get_monitor_socket()
while True:
    event = monitor.recv()
    print(zmq.events[event[0]])
socket.close_monitor()