●提高系统可靠性:避免因未处理的死信而导致程序异常,提高系统的可靠性。
●实现延迟消息:可以通过设置TTL时间,将超时未消费的消息转移到死信队列中,实现延迟消息的功能。
●防止滥用:当某些生产者恶意发送低质量的消息或进行滥用时,可以通过丢弃或重定向死信消息来防止滥用和恶意攻击。
●消息格式错误:当消息格式错误时,可能会导致消费者无法正确地解析或处理该消息,这个问题通常与生产者的代码有关。为了避免消息失效,并提高系统可靠性,我们可以使用死信队列。
●消费者故障:另一个常见的场景是消息处理者无法正确地处理或响应到推入到队列中的消息,例如消费者创建了一个协程并在逻辑执行完成后未正确地关闭该协程。由于该协程始终处于打开状态,它将一直阻止该消费者对其他消息进行正确消费。为了避免这种消息挂起并影响其他消息的正常处理,可以将其加入死信中心。
RabbitMQ实现
创建交换机和队列
import pika
def main():
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
# 创建死信交换机
dlx_exchnage_name = 'my-dlx-exchange'
with pika.BlockingConnection(parameters) as connection:
channel = connection.channel()
channel.exchange_declare(exchange=dlx_exchnage_name, exchange_type='fanout', durable=True)
# 创建死信队列和交换机
dead_letter_queue_name = 'dead-letter-queue'
with pika.BlockingConnection(parameters) as connection:
channel = connection.channel()
channel.queue_declare(queue=dead_letter_queue_name, durable=True)
channel.queue_bind(queue=dead_letter_queue_name, exchange=dlx_exchnage_name)
# 创建消息队列,并将其绑定到死信队列上
queue_name = "job-queue"
arguments = {"x-dead-letter-exchange": dlx_exchnage_name}
with pika.BlockingConnection(parameters) as connection:
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True,
arguments=arguments)
channel.queue_bind(exchange='', queue=queue_name, routing_key=queue_name)
print("Queue is created")
if __name__ == '__main__':
main()
import pika
def send_message():
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 发送一个消息5秒后过期,并且未被消费端确认
queue_name = "job-queue"
properties = pika.BasicProperties(delivery_mode=2,
expiration="5000")
channel.basic_publish(exchange='', routing_key=queue_name, body='Hello World!', properties=properties)
channel.close()
connection.close()
def receive_message():
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
dlx_exchnage_name = 'my-dlx-exchange'
def callback(ch, method, properties, body):
print("Receivedmessage: %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 消费来自job-queue的消息
queue_name = "job-queue"
channel.basic_consume(queue_name, callback)
channel.start_consuming()
if __name__ == '__main__':
send_message()
receive_message()
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
@Configuration
public class RabbitConfig {
// 死信交换机
public static final String DLX_EXCHANGE_NAME = "my-dlx-exchange";
// 死信队列
public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue";
// 业务处理队列
public static final String PROCESS_QUEUE_NAME = "process-queue";
@Bean
public TopicExchange dlxExchange() {
return new TopicExchange(DLX_EXCHANGE_NAME);
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
.build();
}
@Bean
public Queue processQueue() {
return QueueBuilder.durable(PROCESS_QUEUE_NAME)
.withArgument("x-message-ttl", 5000)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
.build();
}
@Bean
public Binding dlxBinding(Queue deadLetterQueue, TopicExchange dlxExchange) {
return BindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("#");
}
}
以上代码创建了my-dlx-exchange死信交换机和dead-letter-queue死信队列。同时创建一个process-queue业务处理队列,该队列设置了消息的生存时间为5s,并在该时间内未被消费者消费,则将该消息转移到死信队列中。
发送和接收消息
@Component
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
rabbitTemplate.convertAndSend("", RabbitConfig.PROCESS_QUEUE_NAME, message);
}
}
@Component
public class MessageReceiver {
@RabbitListener(queues = "process-queue")
public void receive(String message) throws Exception {
System.out.println("Received message: " + message);
Thread.sleep(10000);
}
}
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack