博客 默契之舞 之 生产者消费者模式(RabbitMQ)

默契之舞 之 生产者消费者模式(RabbitMQ)

   数栈君   发表于 2025-01-08 14:00  103  0

一、RabbitMQ 快速入门

1、创建一个空的项目

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f1f7e317e700234b8c3a288a1acd19f5..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/fda4baf26944bcdf0c59181a1c7b5bce..png


2、引入依赖

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/44c79a273b9ace71ae1d71dc43145a3a..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6aa84a90f11523f5cd22ef6f1c5f731c..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a1809cb086c55d5391c1de78b61405a4..png


com.rabbitmq
amqp-client
5.20.0


二、编写生产者代码

建立连接需要信息

1.ip
2.端口号
3.账号
4.密码
5.虚拟主机
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/67578675187494d47598405d1aab9749..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d4affeba95d6c7716dd13d52ad041396..png


生产者和消费者创建的 channel 并不是同⼀个
//2.开启信道
Channel channel = connection.createChannel();

//3.生产者需要声明一个交换机
//rabbitmq 在创建的时候就有默认的交换机
//使用内置的交换机

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/94c2530d2aa158e6f8b69749725703ec..png


·声明队列
·例如: 如果有⼀个名为 “hello” 的队列, 生产者可以直接发送消息到 “hello” 队列, 而消费者可以从
“hello” 队列中接收消息, 而不需要关心交换机的存在. 这种模式非常适合简单的应用场景,其中生产者和消费者之间的通信是⼀对⼀的.
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0c94c83641a70843c636a6df14df134f..png



//4.声明队列
/**
* AMQP.Queue.DeclareOk queueDeclare
* (String var1, boolean var2, boolean var3, boolean var4, Map var5)
* throws IOException;
*
* 参数说明:
* var1 :队列名称(queue)
* var2 :可持久化 (durable)
* var3:是否独占(exclusive)
* var4:是否自动删除(autoDelete)
* var5: 参数
*/
channel.queueDeclare("hello",true,false,false,null);,>


  • 生产者完整代码
package rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 生产者
*/
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置ip 此时 rabbitmq 在云服务器上,我们通过本地去运行的,需要公网IP,需要提前开放端口号
connectionFactory.setHost("123.57.16.61");
//设置端口号 默认的端口号 5672
connectionFactory.setPort(5672);
//设置账号
connectionFactory.setUsername("study");
//设置密码
connectionFactory.setPassword("study");
//设置虚拟机
connectionFactory.setVirtualHost("bite");
//在这个工厂里面拿到一个连接
Connection connection = connectionFactory.newConnection();

//2.开启信道
Channel channel = connection.createChannel();

//3.生产者需要声明一个交换机
//rabbitmq 在创建的时候就有默认的交换机
//使用内置的交换机


//4.声明队列
/**
* AMQP.Queue.DeclareOk queueDeclare
* (String var1, boolean var2, boolean var3, boolean var4, Map var5)
* throws IOException;
*
* 参数说明:
* var1 :队列名称(queue)
* var2 :可持久化 (durable)
* var3:是否独占(exclusive)
* var4:是否自动删除(autoDelete)
* var5: 参数
*/
channel.queueDeclare("hello",true,false,false,null);

//交换机与队列之间的绑定关系
//内置交换机都有自己的绑定关系

//5.发送消息
//依然用 channel 进行发送
/**
* void basicPublish
* (String var1, String var2, AMQP.BasicProperties var3, byte[] var4)
* throws IOException;
* 参数说明:
* var1:交换机的名称 (exchane)
* var2:内置交换机(routingKey)和队列名称保持一致,
* var3:属性配置
* var4:消息
*/
String mag = "hello rabbitmq~";
channel.basicPublish("","hello",null,mag.getBytes());

//6.资源的释放
channel.close();
connection.close();
}
},>

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b754c8a881ed9407e204af388ef106b9..png

  • 观察界面(发送信息成功)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d062066ecf3a6e199a50ffff7ecf9253..png

如果在代码中注掉资源释放的代码, 在Connections和Channels也可以看到相关信息

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9c46f10a4942776a92596f3eaf3a8e1a..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/7797b0d0e23c030c8126bbb4d41e90dd..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c9ee80a8226d612d5fc8d3b1a861f194..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f9ad618687d2609b61dc140d5ad4cdbc..png


三、编写消费者代码

消费者代码和⽣产者前3步都是⼀样的, 第4步改为消费当前队列

  1. 创建连接
  2. 创建Channel
  3. 声明⼀个队列Queue
  4. 消费消息
  5. 释放资源

消费当前队列消费当前队列

basicConsume:

/*
2 basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue: 队列名称
2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认
3. callback: 回调对象
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

Consumer:

Consumer ⽤于定义消息消费者的⾏为. 当我们需要从RabbitMQ接收消息时, 需要提供⼀个实现了**Consumer** 接⼝的对象.

DefaultConsumer 是 **RabbitMQ**提供的⼀个默认消费者, 实现了Consumer 接⼝.

核心方法:


handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) : 从队列接收到消息时, 会自动动调用该方法.
在这个方法中, 我们可以定义如何处理接收到的消息, 例如打印消息内容, 处理业务逻辑或者将消息存储到数据库等.
参数说明如下:
▪ consumerTag : 消费者标签, 通常是消费者在订阅队列时指定的.
▪ envelope : 包含消息的封包信息,如队列名称, 交换机等.
▪ properties : ⼀些配置信息
▪ body : 消息的具体内容

//6. 接收消息, 并消费
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue: 队列名称
2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认
3. callback: 回调对象
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*
回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法
1. consumerTag: 标识
2. envelope: 获取⼀些信息, 交换机, 路由key
3. properties:配置信息
4. body:数据

*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume("hello", true, consumer);

运行代码, 观察结果

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/fe911df860c7785e567d52265fb425aa..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/742fa73df96126d01278a5ba34e5d5f1..png

  • 完整代码
package rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建一个工厂连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("123.57.16.61");//设置公网ip
connectionFactory.setPort(5672);
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
connectionFactory.setVirtualHost("bite");//设置虚拟机
Connection connection = connectionFactory.newConnection();

//创建一个通信
Channel channel = connection.createChannel();

//声明一个队列(可以省略)
channel.queueDeclare("hello",true,false,false,null);

//4. 消费消息
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数说明:
* queue: 队列名称
* autoAck: 是否自动确认
* callback: 接收到消息后, 执行的逻辑
*/
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};

channel.basicConsume("hello", true, consumer);
//等待程序执行完成
// Thread.sleep(2000);
//5. 释放资源
// channel.close();
// connection.close();
}
}


四、常见报错异常类型

1、关闭顺序调换

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/87abd3486de51e97ac95e8610cff5624..png


2、队列不存在

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/87e7379c71069ab67037ee2b2cacc0bc..png


3、端口号错误

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/37b087606effcfdc3eaff70348213a5a..png


4、检查账号密码是否错误

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9b3230069087f43ae1a658850fa76fa4..png


5、检查虚拟机

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/3c3d6aea2f3f6904623dc58555a19651..png


五、小结

生产者-消费者模型是一种常见的消息队列应用场景,尤其在异步处理、解耦和提高系统可扩展性方面非常有效。使用 RabbitMQ 实现生产者-消费者模式可以让我们在不同的服务或系统之间解耦,提高系统的可靠性和性能。


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群