Synchronous Messaging vs. Asynchronous Messaging
Synchronous messaging and asynchronous messaging are two communication paradigms used in distributed systems. In synchronous messaging, the sender does not continue executing until it receives an acknowledgement from the receiver. This paradigm is commonly used where an operation cannot continue without immediate feedback, such as payment or transaction processing.
Its main advantages are ease of implementation and immediate feedback. The drawbacks are equally clear: a slow or unavailable receiver stalls the sender, which creates performance bottlenecks and limits scalability.
---
title: Synchronous Messaging
---
sequenceDiagram
    participant Sender
    participant MessageBus
    participant MessageListener
    Sender->>MessageBus: MessageBusUtil.sendSynchronousMessage()
    MessageBus->>MessageListener: destination.send()
    MessageListener-->>MessageBus: return response
    note over Sender: (block and wait)
    MessageBus-->>Sender: return response
In asynchronous messaging, the sender does not block after sending the message, so it can continue processing without waiting for the receiver's response. It is commonly used where the system can tolerate a delayed response, such as sending a notification after a successful payment. Compared to synchronous messaging, asynchronous messaging is more complex to implement. Additionally, the sender has no guarantee of immediate feedback: handling the response requires extra mechanisms such as callback functions or message listeners.
sequenceDiagram
    participant Sender
    participant MessageBus as Message Bus
    participant MessageListener1 as Message Listener 1
    Sender->>MessageBus: MessageBusUtil.sendMessage()
    MessageBus->>MessageListener1: destination.send()
    MessageListener1-->>MessageBus: return response
    Sender->>Sender: (continue with processing)
    MessageBus-->>Sender: return response
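The difference between the two paradigms can be sketched in plain Java without any broker. This is only an illustration: the `handle` receiver and the message names are hypothetical, and a `CompletableFuture` stands in for the callback mechanism that a real message bus would provide.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class MessagingStyles {

    // Hypothetical receiver: echoes the message back as an acknowledgement.
    static String handle(String message) {
        return "ack:" + message;
    }

    // Synchronous style: the caller blocks until the reply is available.
    static String sendSync(String message) {
        return handle(message);
    }

    // Asynchronous style: the call returns a future immediately;
    // the reply is produced on another thread.
    static CompletableFuture<String> sendAsync(String message) {
        return CompletableFuture.supplyAsync(() -> handle(message));
    }

    public static void main(String[] args) throws Exception {
        String reply = sendSync("pay-42");
        System.out.println("sync reply: " + reply);

        CompletableFuture<String> pending = sendAsync("notify-42");
        System.out.println("sender continues while the reply is pending");

        // The callback plays the role of a message listener for the response.
        CompletableFuture<Void> done =
                pending.thenAccept(r -> System.out.println("async reply: " + r));
        done.get(1, TimeUnit.SECONDS);
    }
}
```

In the synchronous call the result is in hand when the method returns; in the asynchronous call the sender has to register a callback (or poll the future) to learn the outcome.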
RabbitMQ
Introduction to RabbitMQ
RabbitMQ is a message broker that facilitates asynchronous communication between distributed systems. Its working mechanism involves four key components:
- Producer: Sends messages.
- Exchange: Handles the routing of messages to the appropriate queues.
- Queue: Stores messages until they are consumed.
- Consumer: Processes the messages from the queue.
---
title: Introduction to RabbitMQ
---
graph LR
    P((P)) -->|send| X{{Exchange}} -->|binding| Q[[Queue]] -->|receive| C((C))
Example: Hello RabbitMQ
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class Producer {
    // Name of the queue where the message will be sent
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // Create a connection factory to configure and connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Configure RabbitMQ server address (default is localhost)
        factory.setHost("localhost");
        // Configure the port for RabbitMQ (default is 5672)
        factory.setPort(5672);
        // Use try-with-resources to automatically close the connection and channel after use
        try (Connection connection = factory.newConnection(); // Establish a connection to RabbitMQ
             Channel channel = connection.createChannel()) {  // Create a communication channel
            // Declare a queue with the specified name
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello RabbitMQ!";
            // Publish the message to the queue
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [P] Sent '" + message + "'");
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // Create a connection factory to configure and connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Configure RabbitMQ server address (default is localhost)
        factory.setHost("localhost");
        // Configure the port for RabbitMQ (default is 5672)
        factory.setPort(5672);
        // Establish a connection to RabbitMQ
        Connection connection = factory.newConnection();
        // Create a communication channel
        Channel channel = connection.createChannel();
        // Declare a queue with the specified name
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Define the callback function to handle incoming messages
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Retrieve the message body and convert it to a string using UTF-8 encoding
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [C] Received '" + message + "'");
        };
        // Start consuming messages from the queue
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            // No action required for cancellation in this example
        });
    }
}
Message Acknowledgment
In RabbitMQ, message acknowledgement is a mechanism that ensures messages have been received and processed successfully. There are two acknowledgement modes: automatic (auto-ack) and manual.
In auto-ack mode, RabbitMQ considers the message acknowledged as soon as it is delivered to the consumer. The consumer does not need to explicitly send an acknowledgement. However, this can be risky. If the consumer's connection or channel fails before it has finished processing, the message is lost: the broker has already discarded it even though it was never processed successfully.
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, consumer);
In manual ack mode, the consumer must send an acknowledgement back to RabbitMQ once the message has been processed successfully. If the consumer's channel or connection closes before the acknowledgement arrives, RabbitMQ requeues the message and redelivers it, either to another consumer or to the same one. RabbitMQ also enforces an acknowledgement timeout (30 minutes by default): if a delivery stays unacknowledged for longer than that, the channel is closed and the message is requeued.
// Acknowledging a message manually
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        // Process the message here
        channel.basicAck(envelope.getDeliveryTag(), false); // Acknowledge the message
    }
});
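The practical difference between the two modes shows up when a consumer dies mid-processing. The following is a toy in-memory model, not the RabbitMQ client API: the class `AckModel` and its methods are hypothetical names, and requeued messages are simply put back at the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

public class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next ready message and returns its delivery tag (-1 if none).
    // With autoAck the broker forgets the message immediately;
    // otherwise it is tracked until ack() is called.
    long deliver(boolean autoAck) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        if (!autoAck) {
            unacked.put(tag, message);
        }
        return tag;
    }

    void ack(long tag) {
        unacked.remove(tag);
    }

    // Models the consumer's channel closing: unacked deliveries go back to the queue.
    void consumerLost() {
        Deque<String> requeued = new ArrayDeque<>(unacked.values());
        requeued.addAll(ready);
        ready.clear();
        ready.addAll(requeued);
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckModel auto = new AckModel();
        auto.publish("m1");
        auto.deliver(true);
        auto.consumerLost();
        System.out.println("auto-ack, messages left: " + auto.readyCount());

        AckModel manual = new AckModel();
        manual.publish("m1");
        manual.deliver(false);
        manual.consumerLost();
        System.out.println("manual ack, messages left: " + manual.readyCount());
    }
}
```

With auto-ack the message is gone once it has been handed over; with manual ack it remains available for redelivery until the consumer confirms it.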
Message Durability
In RabbitMQ, message durability refers to the ability to ensure that a message will not be lost in case of a broker failure. It involves saving both the queue and message to disk so they can be recovered after a system restart. To guarantee durability, two key components need to be considered: durable queues and persistent messages.
When a queue is declared as durable, its definition is saved to disk, so the queue still exists after the broker restarts.
boolean durable = true;
channel.queueDeclare("myQueue", durable, false, false, null);
To ensure a message survives a broker restart, it must be marked as persistent by setting its `deliveryMode` to 2 (persistent). This tells RabbitMQ to save the message to disk. However, marking a message as persistent does not fully guarantee that it won't be lost. There is still a short window in which RabbitMQ has accepted the message but has not yet saved it to disk. Additionally, RabbitMQ does not call `fsync(2)` for every message, meaning the message may sit in a cache rather than being written to disk immediately. Therefore, the durability guarantee is not strong.
// Publish the message to the queue
channel.basicPublish("", TASK_QUEUE_NAME,
        MessageProperties.PERSISTENT_TEXT_PLAIN, // deliveryMode = 2
        message.getBytes("UTF-8"));
Prefetch
When a consumer subscribes to a message queue, RabbitMQ begins delivering messages. By default, RabbitMQ uses a Round Robin Dispatching strategy, which evenly distributes messages across multiple consumers. However, if one of the consumers processes tasks slowly, it may lead to message buildup and memory pressure.
In RabbitMQ, the `prefetch` mechanism controls the maximum number of unacknowledged messages a consumer can hold at a time. When the number of messages a consumer holds reaches the prefetch limit, RabbitMQ stops sending additional messages to that consumer until it acknowledges some of them. This mechanism helps prevent message accumulation and reduces memory pressure.
---
title: Worker Queue - prefetch
---
graph LR
    P((P)) -->|send| Q[[Worker]] -->|prefetch = 1| C1((C1))
    Q -->|prefetch = 1| C2((C2))
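To see why a prefetch limit matters, the two dispatch strategies can be compared with a small simulation. This is a simplified model under stated assumptions, not broker code: all tasks are already queued, task durations are made-up numbers, and `DispatchSketch` and its method names are hypothetical.

```java
import java.util.Arrays;

public class DispatchSketch {

    // Round-robin with no prefetch limit: message i goes to consumer i % n,
    // regardless of how much work that consumer already holds.
    static long roundRobinMakespan(long[] taskMillis, int consumers) {
        long[] busyUntil = new long[consumers];
        for (int i = 0; i < taskMillis.length; i++) {
            busyUntil[i % consumers] += taskMillis[i];
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    // prefetch = 1 with manual acks: a consumer receives its next message only
    // after acknowledging the previous one, so each message goes to whichever
    // consumer becomes free first.
    static long prefetchOneMakespan(long[] taskMillis, int consumers) {
        long[] busyUntil = new long[consumers];
        for (long task : taskMillis) {
            int earliest = 0;
            for (int c = 1; c < consumers; c++) {
                if (busyUntil[c] < busyUntil[earliest]) {
                    earliest = c;
                }
            }
            busyUntil[earliest] += task;
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    public static void main(String[] args) {
        // Alternating heavy and light tasks: round-robin sends every heavy task to C1.
        long[] tasks = {1000, 100, 1000, 100, 1000, 100};
        System.out.println("round-robin:  " + roundRobinMakespan(tasks, 2) + " ms");
        System.out.println("prefetch = 1: " + prefetchOneMakespan(tasks, 2) + " ms");
    }
}
```

For this workload the round-robin run finishes after 3000 ms because one consumer receives all the heavy tasks, while the prefetch-limited run finishes after 2100 ms.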
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Producer {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        // Create a connection factory to configure and connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Configure RabbitMQ server address (default is localhost)
        factory.setHost("localhost");
        // Configure the port for RabbitMQ (default is 5672)
        factory.setPort(5672);
        // Use try-with-resources to automatically close the connection and channel after use
        try (Connection connection = factory.newConnection(); // Establish a connection to RabbitMQ
             Channel channel = connection.createChannel()) {  // Create a communication channel
            // Declare a durable queue with the specified name
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            String message = "Hello RabbitMQ!";
            // Publish the message to the queue as a persistent message
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [P] Sent '" + message + "'");
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {
    // Name of the queue from which tasks are consumed
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        // Create a connection factory to configure and connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Configure RabbitMQ server address (default is localhost)
        factory.setHost("localhost");
        // Establish a connection to RabbitMQ
        final Connection connection = factory.newConnection();
        // Create a communication channel
        final Channel channel = connection.createChannel();
        // Declare a durable queue with the specified name
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Specify the prefetch count: at most one unacknowledged message per consumer
        channel.basicQos(1);
        // Define a callback to process incoming messages
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [C] Received '" + message + "'");
            try {
                try { // Simulate task processing
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            } finally {
                // Acknowledge the message after work is completed
                System.out.println(" [C] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // Start consuming messages with manual acknowledgment (autoAck = false)
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
Publish/Subscribe
Publish/Subscribe is a communication pattern where a message is sent by a producer to multiple consumers. In RabbitMQ, this pattern is implemented using Exchanges and Queues.
In the Publish/Subscribe pattern in RabbitMQ, the producer sends a message to an Exchange, which then routes the message to multiple queues that are bound to it. There are four types of exchanges in RabbitMQ:
- `fanout`: Routes messages to all queues bound to it, without considering any routing key.
- `direct`: Routes messages to queues based on an exact match with the routing key.
- `topic`: Routes messages to queues based on pattern matching of the routing key.
- `headers`: Routes messages based on the attributes of message headers.
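The routing rules of the two simplest exchange types can be written down in a few lines. The sketch below is a hypothetical in-memory model (`RoutingSketch` is not part of any RabbitMQ library) and assumes, for brevity, that each queue has exactly one binding key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoutingSketch {

    // fanout: every bound queue receives the message; the routing key is ignored.
    static List<String> routeFanout(Map<String, String> bindings) {
        return new ArrayList<>(bindings.keySet());
    }

    // direct: only queues whose binding key equals the routing key receive the message.
    static List<String> routeDirect(Map<String, String> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (binding.getValue().equals(routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Queue name -> binding key (illustrative values).
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("Q1", "error");
        bindings.put("Q2", "info");

        System.out.println("fanout:          " + routeFanout(bindings));
        System.out.println("direct 'error':  " + routeDirect(bindings, "error"));
        System.out.println("direct 'debug':  " + routeDirect(bindings, "debug"));
    }
}
```

A message whose routing key matches no binding on a direct exchange is routed to no queue at all.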
Fanout Exchange
---
title: Publish/Subscribe - fanout
---
graph LR
    P((P)) -->|send| Ex{{fanout}} -->|binding| Q1[[Q1]]
    Ex -->|binding| Q2[[Q2]]
    Q1 --> C1((C1))
    Q2 --> C2((C2))
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        // Create a new connection factory to connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Set the host (default is 'localhost')
        factory.setHost("localhost");
        // Use try-with-resources to ensure the connection and channel are closed
        try (Connection connection = factory.newConnection(); // Establish a connection
             Channel channel = connection.createChannel()) {  // Create a channel
            // Declare an exchange of type 'fanout', which sends messages to all bound queues
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);
            // Publish the message to the exchange with an empty routing key (ignored by fanout)
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
import com.rabbitmq.client.*;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        // Create a new connection factory to connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Set the host (default is 'localhost')
        factory.setHost("localhost");
        // Establish a connection and create a new channel for communication
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange with the same name as in the emitter (logs) and of type 'fanout'
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // Declare a server-named, exclusive, auto-delete queue
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with an empty routing key (ignored by fanout)
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Define the callback function to process incoming messages
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // Consume messages from the queue
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
Direct Exchange
---
title: Publish/Subscribe - direct
---
graph LR
    P((P)) -->|send| Ex{{direct}} -->|error| Q1[[Q1]]
    Ex -->|info| Q2[[Q2]]
    Ex -->|warn| Q2[[Q2]]
    Ex -->|error| Q2[[Q2]]
    Q1 --> C1((C1))
    Q2 --> C2((C2))
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        // Create a new connection factory to connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Set the host (default is 'localhost')
        factory.setHost("localhost");
        // Use try-with-resources for automatic resource management
        try (Connection connection = factory.newConnection(); // Establish a new connection
             Channel channel = connection.createChannel()) {  // Create a new channel
            // Declare an exchange of type 'direct'
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // Get the severity level (routing key) from the command-line arguments
            String severity = getSeverity(argv);
            // Get the message content from the command-line arguments
            String message = getMessage(argv);
            // Publish the message to the exchange with the given severity as the routing key
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }

    private static String getSeverity(String[] strings) {
        if (strings.length < 1) {
            return "info";
        }
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2) {
            return "Hello World!";
        }
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) {
            return "";
        }
        if (length <= startIndex) {
            return "";
        }
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
import com.rabbitmq.client.*;

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        // Create a new connection factory to connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Set the host (default is 'localhost')
        factory.setHost("localhost");
        // Establish a new connection
        Connection connection = factory.newConnection();
        // Create a new channel
        Channel channel = connection.createChannel();
        // Declare a direct exchange, where messages will be routed based on the routing key
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();
        // Exit the program if no arguments are passed (no routing keys provided)
        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }
        for (String severity : argv) {
            // Bind the queue to the exchange with a specific routing key
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Define the callback function for processing the received messages
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        // Consume messages from the queue
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
Topic Exchange
In RabbitMQ, the topic exchange routes messages to queues based on pattern matching of the routing key. The routing key in a topic exchange is typically a string of words separated by dots (`.`). The binding key that a queue is bound with has the same form and can also contain the wildcards `*` and `#`. The `*` wildcard matches exactly one word, while the `#` wildcard matches zero or more words. Compared to the `direct` exchange, the topic exchange offers greater flexibility, since a single binding can cover a whole family of routing keys.
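The wildcard rules can be made concrete with a small matcher. This is a minimal sketch of the matching semantics described above, not RabbitMQ's own implementation; `TopicMatcher` is a hypothetical name, and edge cases such as an empty routing key are not handled.

```java
public class TopicMatcher {

    // Returns true if the routing key matches the binding key,
    // where '*' stands for exactly one word and '#' for zero or more words.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: succeed only if every word was consumed.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // Pattern still expects a word, but none are left.
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit")); // true
        System.out.println(matches("*.orange.*", "quick.orange"));        // false
        System.out.println(matches("lazy.#", "lazy"));                    // true
        System.out.println(matches("lazy.#", "lazy.pink.rabbit"));        // true
    }
}
```

Note that `lazy.#` matches the single word `lazy` because `#` can match zero words, whereas `*.orange.*` requires exactly three words.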
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        // Create a connection factory to connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Set the host (default is 'localhost')
        factory.setHost("localhost");
        // Use try-with-resources for automatic resource management
        try (Connection connection = factory.newConnection(); // Establish a new connection
             Channel channel = connection.createChannel()) {  // Create a new channel
            // Declare a topic exchange with the specified name
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String routingKey = getRouting(argv);
            String message = getMessage(argv);
            // Publish the message to the exchange
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }

    private static String getRouting(String[] strings) {
        if (strings.length < 1)
            return "anonymous.info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        // Use <= so that strings[startIndex] below can never be out of bounds
        if (length <= startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
import com.rabbitmq.client.*;

public class ReceiveLogsTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        // Create a new connection factory to connect to RabbitMQ
        ConnectionFactory factory = new ConnectionFactory();
        // Set the host (default is 'localhost')
        factory.setHost("localhost");
        // Establish a new connection
        Connection connection = factory.newConnection();
        // Create a new channel
        Channel channel = connection.createChannel();
        // Declare a topic exchange with the specified name
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();
        // Exit the program if no binding keys are provided
        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }
        for (String bindingKey : argv) {
            // Bind the queue to the exchange with each binding key (may contain * and #)
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Define the callback function for processing received messages
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        // Consume messages from the queue
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
Reliability: Publisher
Publisher Retry
Publisher retry refers to the mechanism where the publisher retries connecting to RabbitMQ after a network issue. It improves the likelihood of a successful connection when the network is unstable. However, the retry is executed in a blocking manner, so the publishing thread waits during reconnection, which may lead to performance issues. Below are the relevant Spring Boot configurations for `application.properties`.
# Whether publishing retries are enabled.
spring.rabbitmq.template.retry.enabled=false
# Interval between the first and second attempt to publish a message (milliseconds).
spring.rabbitmq.template.retry.initial-interval=1000
# Maximum number of attempts to publish a message.
spring.rabbitmq.template.retry.max-attempts=3
# Maximum interval between attempts (milliseconds).
spring.rabbitmq.template.retry.max-interval=10000
# Multiplier applied to the previous retry interval.
spring.rabbitmq.template.retry.multiplier=1.0
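How the initial interval, multiplier, maximum interval, and maximum attempts interact can be worked out with a short calculation. The sketch below assumes the usual exponential back-off scheme (each wait is the previous one times the multiplier, capped at the maximum interval); `BackoffSchedule` is a hypothetical helper, not a Spring class.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

    // Returns the waits (in milliseconds) between consecutive attempts.
    // With maxAttempts attempts there are maxAttempts - 1 waits.
    static List<Long> waits(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double next = initialMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) next, maxMs));
            next *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the configuration above: constant 1000 ms waits, 3 attempts.
        System.out.println(waits(1000, 1.0, 10000, 3)); // [1000, 1000]
        // A multiplier of 2.0 doubles each wait until the 10000 ms cap is reached.
        System.out.println(waits(1000, 2.0, 10000, 6)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```

Because the retries block the publishing thread, the sum of these waits is a rough upper bound on how long a single publish call can stall.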
Publisher Confirm & Publisher Return
RabbitMQ provides two types of message confirmation mechanisms between the publisher and the broker: Publisher Confirm and Publisher Return. The Publisher Confirm mechanism allows the publisher to receive an `ack` from the broker when the message is successfully delivered to the broker. The Publisher Return mechanism enables the broker to notify the publisher when a message cannot be routed to any queue.
When a message is sent to the broker, if the message is transient and is successfully routed to the specified queue, the broker returns an `ack` to the publisher. If the message is persistent, the broker returns an `ack` once the message has been delivered to the queue and saved to disk. If the broker cannot take responsibility for the message, for example because of an internal error, it returns a `nack` (negative acknowledgement) to the publisher.
# Type of publisher confirms to use: none, simple, or correlated.
spring.rabbitmq.publisher-confirm-type=none
# Whether to enable publisher returns.
spring.rabbitmq.publisher-returns=false
The Publisher Confirm mechanism provides two types of confirmation: synchronous mode (`simple`) and asynchronous mode (`correlated`). Synchronous mode offers a strict acknowledgement process but may lead to performance issues in high-throughput environments. It is typically used when reliability is the top priority. On the other hand, asynchronous mode improves throughput but requires that the acknowledgement process be handled in the background.
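In asynchronous mode the publisher has to remember which messages are still awaiting confirmation and match incoming acknowledgements against them. One common bookkeeping approach is sketched below using only the standard library. `ConfirmTracker` and its methods are hypothetical names, and the sketch assumes that each message is identified by an increasing sequence number and that an acknowledgement may confirm several messages at once.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConfirmTracker {
    // Sequence number -> message body, for messages not yet confirmed.
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Call right before (or after) publishing a message.
    void published(long sequenceNumber, String body) {
        outstanding.put(sequenceNumber, body);
    }

    // Call from the background confirm callback.
    // multiple = true confirms every message up to and including sequenceNumber.
    void handleAck(long sequenceNumber, boolean multiple) {
        if (multiple) {
            outstanding.headMap(sequenceNumber, true).clear();
        } else {
            outstanding.remove(sequenceNumber);
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        tracker.published(1, "a");
        tracker.published(2, "b");
        tracker.published(3, "c");

        tracker.handleAck(2, true);  // confirms messages 1 and 2 together
        System.out.println("outstanding: " + tracker.outstandingCount());

        tracker.handleAck(3, false); // confirms message 3 only
        System.out.println("outstanding: " + tracker.outstandingCount());
    }
}
```

Whatever remains in the map after a timeout is a candidate for republishing or logging.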
It is important to note that using the Publisher Confirm and Publisher Return mechanisms incurs additional network and system resource costs. Therefore, these mechanisms should be used with caution in production systems. If they must be used, it's recommended to rely on Publisher Confirm only, as routing issues are often caused by business logic problems, and Publisher Return is less commonly needed. Additionally, for messages that are negatively acknowledged (`nack`), retry them within a limited time window to avoid an infinite retry loop. If retries continue to fail, record the exception logs instead of attempting retries indefinitely.