Messaging
ZMQ (ZeroMQ) is a messaging library for building asynchronous applications.
Unlike MQTT, RabbitMQ or Kafka, which are built around a central broker, ZMQ offers a variety of socket types (messaging patterns) and transports:
- PUB-SUB
- REQ-REP
- PUSH-PULL
- ROUTER
- PAIR
- Websocket
- ... and some more
It can be extremely fast and reliable, and it does not require a broker (although it is easy to build a brokered system with ZMQ if required).
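As a minimal sketch of one of the patterns listed above, the following pairs a REQ socket with a REP socket inside a single process, with no broker involved. The inproc endpoint name and the reply_worker helper are illustrative assumptions, not part of the original text.

```python
import threading
import zmq

context = zmq.Context()

def reply_worker(endpoint, ready):
    # REP side: bind, answer exactly one request, then exit
    rep = context.socket(zmq.REP)
    rep.bind(endpoint)
    ready.set()  # tell the main thread the endpoint is bound
    request = rep.recv()
    rep.send(b"echo: " + request)
    rep.close()

endpoint = "inproc://demo-req-rep"  # hypothetical endpoint name
ready = threading.Event()
worker = threading.Thread(target=reply_worker, args=(endpoint, ready))
worker.start()
ready.wait()

# REQ side: send one request and wait for the reply
req = context.socket(zmq.REQ)
req.connect(endpoint)
req.send(b"hello")
reply = req.recv()
print(reply)  # b'echo: hello'

worker.join()
req.close()
context.term()
```

The two sockets talk to each other directly; swapping the inproc endpoint for a tcp one would let the same code run across processes or machines.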
Multithreaded PAIR¶
Here is an example that uses PAIR (bi-directional communication) in a threaded application:
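import threading
import zmq

# A context can be shared between threads; sockets must not be
context = zmq.Context()

def zeromq_thread():
    # Worker end of the PAIR: echo every message back to the main thread
    socket2 = context.socket(zmq.PAIR)
    socket2.connect("tcp://localhost:5555")
    while True:
        message = socket2.recv()
        print("Received message: ", message)
        socket2.send(message)

# Main-thread end of the PAIR
socket1 = context.socket(zmq.PAIR)
socket1.bind("tcp://*:5555")

thread = threading.Thread(target=zeromq_thread, daemon=True)
thread.start()

# Send one message to the worker and read the echo
socket1.send(b"ping")
print("Echoed back: ", socket1.recv())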
Deadlocks¶
ZeroMQ does not prevent deadlocks directly, but its asynchronous messaging model can help to avoid them. Messages are queued and transferred in the background, so if one part of the system is blocked, the other parts can still continue to communicate. This makes it easier to avoid the situation where two parts of the system each wait for the other to respond. Deadlocks remain possible, however: a blocking recv() with no timeout, for example, waits forever if the expected message never arrives.
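One common way to keep a component from waiting forever is to put a timeout on receive. The sketch below is an illustration under assumptions rather than a recommended design: the endpoint name and the "silent peer" (a REP socket that is bound but never answers) are hypothetical stand-ins for a blocked part of the system.

```python
import zmq

context = zmq.Context()

# A peer that is bound but never answers, standing in for a blocked component
silent_peer = context.socket(zmq.REP)
silent_peer.bind("inproc://silent-peer")  # hypothetical endpoint name

req = context.socket(zmq.REQ)
req.setsockopt(zmq.RCVTIMEO, 200)  # recv() gives up after 200 ms
req.setsockopt(zmq.LINGER, 0)      # discard unsent messages on close
req.connect("inproc://silent-peer")

req.send(b"are you there?")
try:
    reply = req.recv()
    timed_out = False
except zmq.Again:
    # No reply arrived within the timeout; the caller regains control
    reply = None
    timed_out = True

print("timed out:", timed_out)  # timed out: True

req.close()
silent_peer.close()
context.term()
```

After a timed-out receive, a plain REQ socket is still waiting for its reply and cannot send again, so in practice it is usually closed and recreated before retrying.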
Polling¶
In the context of ZeroMQ, polling is the process of checking one or more sockets for pending events, most commonly incoming messages.
In the C API this is done by calling zmq_poll(), which takes a list of sockets and a timeout value as parameters; pyzmq exposes the same mechanism through zmq.Poller.
The call waits until at least one registered socket has an event or the timeout expires, whichever comes first.
It then reports which sockets are ready: in pyzmq, poll() returns a list of (socket, event) pairs, which is empty if the timeout expired.
Polling is a useful way to check for incoming messages without blocking the main thread of execution indefinitely.
The recv() method on a ZeroMQ socket is blocking by default, which means that the execution of the program is paused until a message is received on that socket.
However, in some cases, you may want to monitor multiple sockets for incoming messages without blocking on any single one.
In such scenarios, you can use poll() to check multiple sockets at the same time and block only until at least one of them has a message waiting. This way, you can handle messages from multiple sources in one loop.
The poll() method also accepts a timeout (in milliseconds), so if none of the sockets has a message waiting after that time, the call returns and the program can continue.
In other words, poll() is used to efficiently monitor multiple sockets for incoming messages and to block only when necessary.
The following example creates a context, two sockets (one for sending and one for receiving), and a poller. The poller monitors the receiver socket for incoming messages. The while loop runs indefinitely: on each iteration it polls the receiver socket and prints the message if one is waiting, then sends a message on the sender socket and sleeps for 2 seconds.
import time
import zmq

# Create a context and two sockets, one for sending and one for receiving
context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5555")
receiver = context.socket(zmq.PULL)
# Connect the receiver to the sender so the example receives its own messages
receiver.connect("tcp://localhost:5555")

# Create a poller to monitor the receiver socket
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)

while True:
    # Wait up to 100 ms for incoming messages instead of blocking forever
    socks = dict(poller.poll(timeout=100))
    # If there is a message on the receiver socket, print it
    if receiver in socks:
        message = receiver.recv()
        print(f"Received message: {message}")
    # Send a message on the sender socket every 2 seconds
    sender.send(b"Hello World")
    time.sleep(2)
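The polling description above talks about watching several sockets at once with a timeout. As a small, self-terminating sketch of that idea, the following registers two PULL sockets with one poller and reads from whichever is ready. The inproc endpoint names and the 500 ms and 100 ms timeouts are arbitrary choices for illustration.

```python
import zmq

context = zmq.Context()

# Two independent PULL sockets, each fed by its own PUSH socket
pull_a = context.socket(zmq.PULL)
pull_a.bind("inproc://source-a")  # hypothetical endpoint names
pull_b = context.socket(zmq.PULL)
pull_b.bind("inproc://source-b")

push_a = context.socket(zmq.PUSH)
push_a.connect("inproc://source-a")
push_b = context.socket(zmq.PUSH)
push_b.connect("inproc://source-b")

# One poller watches both receiving sockets
poller = zmq.Poller()
poller.register(pull_a, zmq.POLLIN)
poller.register(pull_b, zmq.POLLIN)

push_a.send(b"from A")
push_b.send(b"from B")

received = []
while len(received) < 2:
    # Block for at most 500 ms; returns as soon as any socket is readable
    events = dict(poller.poll(timeout=500))
    if not events:
        break  # timeout: nothing arrived on either socket
    for sock in (pull_a, pull_b):
        if events.get(sock, 0) & zmq.POLLIN:
            received.append(sock.recv())

# With nothing left to read, poll() returns an empty list after the timeout
idle = poller.poll(timeout=100)

print(sorted(received), idle)  # [b'from A', b'from B'] []

for sock in (push_a, push_b, pull_a, pull_b):
    sock.close()
context.term()
```

Because recv() is only called on sockets that poll() reported as readable, the loop never blocks on a socket that has nothing to deliver.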