博客 RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

RabbitMQ 工作模式使用案例之(发布订阅模式、路由模式、通配符模式)

   数栈君   发表于 2025-01-08 17:37  188  0

一、Publish/Subscribe(发布/订阅)

在发布/订阅模型中,多了一个Exchange角色.
Exchange 常见有三种类型, 分别代表不同的路由规则
a) Fanout:广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe模式)
b) Direct:定向,把消息交给符合指定routing key的队列(Routing模式)
c) Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)
也就分别对应不同的工作模式


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/1a1f15b56b1f18a43a918efee199066a..png

1、引入依赖

<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
</dependencies>


2、编写配置类

package rabbitmq.constant;

/**
 * Broker connection settings and AMQP entity names used by the publish/subscribe (fanout) example.
 *
 * <p>The class only holds constants, so it is final and cannot be instantiated.
 */
public final class Constants {
    // NOTE(review): host and credentials are hard-coded demo values; load them from
    // configuration before using this outside a tutorial.

    /** Host name or IP address of the RabbitMQ broker. */
    public static final String HOST = "123.57.16.61";
    /** Port passed to the connection factory. */
    public static final Integer PORT = 5672;
    /** User name used to authenticate against the broker. */
    public static final String USERNAME = "study";
    /** Password used to authenticate against the broker. */
    public static final String PASSWORD = "study";
    /** Virtual host the examples connect to. */
    public static final String VIRTUAL_HOST = "bite";

    // Publish/Subscribe (fanout) mode
    /** Name of the fanout exchange. */
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    /** Name of the first queue bound to the fanout exchange. */
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    /** Name of the second queue bound to the fanout exchange. */
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    private Constants() {
        // Constant holder; not meant to be instantiated.
    }
}


3、编写生产者代码

package rabbitmq.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publish/Subscribe demo producer: declares a durable fanout exchange and two durable queues,
 * binds both queues to the exchange and publishes one message to the exchange.
 */
public class Producer {
    /**
     * Connects to the broker, sets up the exchange, queues and bindings, and publishes one message.
     *
     * @param args unused
     * @throws IOException if connecting, declaring, binding, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);

        // try-with-resources closes the channel first and then the connection,
        // including when one of the calls below throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange: name, type, durable=true.
            channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

            // Declare the queues: durable=true, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
            channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

            // Bind both queues to the exchange with an empty binding key.
            channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
            channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");

            // Publish the message with an empty routing key; encode explicitly as UTF-8
            // instead of relying on the platform default charset.
            String msg = "hello fanout...";
            channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("消息发送成功!!!");
        }
    }
}

点击运行:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/99030effbdaae8c95bf32e759a2bebff..png


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/7abd7fb031480fd139454fa96b72a5c5..png



http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/2441e8451bc8ed2963115da01d1aecca..png


4、编写消费者代码

消费者1:

package rabbitmq.fanout;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publish/Subscribe demo consumer that prints every message delivered to
 * {@code Constants.FANOUT_QUEUE1}.
 */
public class Consumer1 {
    /**
     * Connects to the broker, declares the queue and subscribes to it with automatic
     * acknowledgement. The channel and connection are not closed here, so the subscription
     * stays active.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue or subscribing fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue (may be omitted when it already exists):
        // durable=true, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);

        // Callback invoked for every delivered message.
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("接收到消息:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument: autoAck=true.
        channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
    }
}


消费者2:

package rabbitmq.fanout;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publish/Subscribe demo consumer that prints every message delivered to
 * {@code Constants.FANOUT_QUEUE2}.
 */
public class Consumer2 {
    /**
     * Connects to the broker, declares the queue and subscribes to it with automatic
     * acknowledgement. The channel and connection are not closed here, so the subscription
     * stays active.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue or subscribing fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue (may be omitted when it already exists):
        // durable=true, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

        // Callback invoked for every delivered message.
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("接收到消息:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument: autoAck=true.
        channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);
    }
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/eb1b920d90471f5b6da9a5e131790fb2..png


二、Routing (路由模式)

队列和交换机的绑定, 不能是任意的绑定了, 而是要指定⼀个BindingKey(RoutingKey的⼀种)
消息的发送方在向 Exchange 发送消息时, 也需要指定消息的 RoutingKey
Exchange也不再把消息交给每一个绑定的队列, 而是根据消息的RoutingKey进行判断, 只有队列绑定时的BindingKey和发送消息的RoutingKey完全一致, 该队列才会接收到消息

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/774b93ad769b34008302c358f04fe8dd..png


1、引入依赖

<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
</dependencies>


2、编写配置类

package rabbitmq.constant;

/**
 * Broker connection settings and AMQP entity names used by the routing (direct) example.
 *
 * <p>The class only holds constants, so it is final and cannot be instantiated.
 */
public final class Constants {
    // NOTE(review): host and credentials are hard-coded demo values; load them from
    // configuration before using this outside a tutorial.

    /** Host name or IP address of the RabbitMQ broker. */
    public static final String HOST = "123.57.16.61";
    /** Port passed to the connection factory. */
    public static final Integer PORT = 5672;
    /** User name used to authenticate against the broker. */
    public static final String USERNAME = "study";
    /** Password used to authenticate against the broker. */
    public static final String PASSWORD = "study";
    /** Virtual host the examples connect to. */
    public static final String VIRTUAL_HOST = "bite";

    // Routing (direct) mode
    /** Name of the direct exchange. */
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    /** Name of the first queue bound to the direct exchange. */
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    /** Name of the second queue bound to the direct exchange. */
    public static final String DIRECT_QUEUE2 = "direct.queue2";

    private Constants() {
        // Constant holder; not meant to be instantiated.
    }
}


3、编写生产者代码

package rabbitmq.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing demo producer: declares a durable direct exchange and two durable queues, binds
 * queue 1 with key "a" and queue 2 with keys "a", "b" and "c", then publishes one message
 * per routing key.
 */
public class Producer {
    /**
     * Connects to the broker, sets up the exchange, queues and bindings, and publishes three
     * messages with routing keys "a", "b" and "c".
     *
     * @param args unused
     * @throws IOException if connecting, declaring, binding, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);

        // try-with-resources closes the channel first and then the connection,
        // including when one of the calls below throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange: name, type, durable=true.
            channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);

            // Declare the queues: durable=true, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
            channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);

            // Bind the queues to the exchange: queue 1 <- "a"; queue 2 <- "a", "b", "c".
            channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
            channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
            channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
            channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");

            // Publish each message with the routing key its text announces. The previous code
            // published all three with an empty routing key, which matches none of the bindings
            // above.
            String[] routingKeys = {"a", "b", "c"};
            for (String routingKey : routingKeys) {
                String msg = "hello direct, my routingkey is " + routingKey + "...";
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                channel.basicPublish(Constants.DIRECT_EXCHANGE, routingKey, null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }

            System.out.println("消息发送成功!!!");
        }
    }
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/8ae6d92ba8ae49d785bea3a4a40af654..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a374d28ae3daf1569cc89b7435283c8e..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/30c2459ba235b66893b31d84034f945c..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c27366cf7a90401ec545721c5cdda8d1..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/970de70766b518045d6b6f727bc2e178..png

4、编写消费者代码

消费者1:
package rabbitmq.direct;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing demo consumer that prints every message delivered to
 * {@code Constants.DIRECT_QUEUE1}.
 */
public class Consumer1 {
    /**
     * Connects to the broker, declares the queue and subscribes to it with automatic
     * acknowledgement. The channel and connection are not closed here, so the subscription
     * stays active.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue or subscribing fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue (may be omitted when it already exists):
        // durable=true, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);

        // Callback invoked for every delivered message.
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("接收到消息:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument: autoAck=true.
        channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
    }
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/49860b91565a7a5123f693abbf039dcb..png


消费者2:

package rabbitmq.direct;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Routing demo consumer that prints every message delivered to
 * {@code Constants.DIRECT_QUEUE2}.
 */
public class Consumer2 {
    /**
     * Connects to the broker, declares the queue and subscribes to it with automatic
     * acknowledgement. The channel and connection are not closed here, so the subscription
     * stays active.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue or subscribing fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue (may be omitted when it already exists):
        // durable=true, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);

        // Callback invoked for every delivered message.
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("接收到消息:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument: autoAck=true.
        channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);
    }
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f1a1260893b542b7b66ba61895074b19..png



三、Topics (通配符模式)

Topics 和Routing模式的区别是:

1.topics 模式使用的交换机类型为topic(Routing模式使用的交换机类型为direct)
2.topic 类型的交换机在匹配规则上进行了扩展, Binding Key⽀持通配符匹配(direct类型的交换机路由规则是BindingKey和RoutingKey完全匹配)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/5491a5255dd0a300e5676a78a90f8614..png


topic类型的交换机在匹配规则上有以下要求:

1.RoutingKey 是⼀系列由点( . )分隔的单词, ⽐如 " stock.usd.nyse ", " nyse.vmw ",
" quick.orange.rabbit "
2.BindingKey 和RoutingKey⼀样, 也是点( . )分割的字符串.
3.Binding Key中可以存在两种特殊字符串, 用于模糊匹配

* 表示一个单词; # 表示多个单词(0-N个)

比如:

• Routing Key 为"d.a.b" 的消息会同时路由到Q1 和Q2
• Routing Key 为"d.a.f" 的消息会路由到Q1
• Routing Key 为"c.e.f" 的消息会路由到Q2
• Routing Key 为"d.b.f" 的消息会被丢弃, 或者返回给生产者(需要设置mandatory参数)

1、引入依赖

<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
</dependencies>


2、编写配置类

package rabbitmq.constant;

/**
 * Broker connection settings and AMQP entity names used by the topics (wildcard) example.
 *
 * <p>The class only holds constants, so it is final and cannot be instantiated.
 */
public final class Constants {
    // NOTE(review): host and credentials are hard-coded demo values; load them from
    // configuration before using this outside a tutorial.

    /** Host name or IP address of the RabbitMQ broker. */
    public static final String HOST = "123.57.16.61";
    /** Port passed to the connection factory. */
    public static final Integer PORT = 5672;
    /** User name used to authenticate against the broker. */
    public static final String USERNAME = "study";
    /** Password used to authenticate against the broker. */
    public static final String PASSWORD = "study";
    /** Virtual host the examples connect to. */
    public static final String VIRTUAL_HOST = "bite";

    // Topics (wildcard) mode
    /** Name of the topic exchange. */
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    /** Name of the first queue bound to the topic exchange. */
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    /** Name of the second queue bound to the topic exchange. */
    public static final String TOPIC_QUEUE2 = "topic.queue2";

    private Constants() {
        // Constant holder; not meant to be instantiated.
    }
}

3、编写生产者代码

package rabbitmq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Topics demo producer: declares a durable topic exchange and two durable queues, binds
 * queue 1 with pattern "*.a.*" and queue 2 with patterns "*.*.b" and "c.#", then publishes
 * three messages with routing keys "ae.a.f", "ef.a.b" and "c.ef.b".
 */
public class Producer {
    /**
     * Connects to the broker, sets up the exchange, queues and bindings, and publishes the
     * three demo messages.
     *
     * @param args unused
     * @throws IOException if connecting, declaring, binding, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);

        // try-with-resources closes the channel first and then the connection,
        // including when one of the calls below throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange: name, type, durable=true.
            channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);

            // Declare the queues: durable=true, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
            channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

            // Bind the queues to the exchange with wildcard binding keys.
            channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
            channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
            channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");

            // Publish one message per routing key; the message text names the key it was sent with.
            String[] routingKeys = {"ae.a.f", "ef.a.b", "c.ef.b"};
            for (String routingKey : routingKeys) {
                String msg = "hello topic, my routingkey is " + routingKey + "...";
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                channel.basicPublish(Constants.TOPIC_EXCHANGE, routingKey, null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }

            System.out.println("消息发送成功!!!");
        }
    }
}


4、编写消费者代码

消费者1:

package rabbitmq.topic;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Topics demo consumer that prints every message delivered to
 * {@code Constants.TOPIC_QUEUE1}.
 */
public class Consumer1 {
    /**
     * Connects to the broker, declares the queue and subscribes to it with automatic
     * acknowledgement. The channel and connection are not closed here, so the subscription
     * stays active.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue or subscribing fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue (may be omitted when it already exists):
        // durable=true, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);

        // Callback invoked for every delivered message.
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("接收到消息:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument: autoAck=true.
        channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
    }
}

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/491e2fd024001923ecb8584e43f6fc8f..png

消费者2:

package rabbitmq.topic;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Topics demo consumer that prints every message delivered to
 * {@code Constants.TOPIC_QUEUE2}.
 */
public class Consumer2 {
    /**
     * Connects to the broker, declares the queue and subscribes to it with automatic
     * acknowledgement. The channel and connection are not closed here, so the subscription
     * stays active.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue or subscribing fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue (may be omitted when it already exists):
        // durable=true, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

        // Callback invoked for every delivered message.
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("接收到消息:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument: autoAck=true.
        channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);
    }
}


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/cde0b45d4c79e01cfd9962bbe9e49897..png


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack


0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群