●镜像集群
●Disk node:将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启 RabbitMQ 的时候,丢失系统的配置信息。
RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行其他操作(增删改查),知道节点恢复。为了确保集群信息的可靠性,或者在不确定使用磁盘节点还是内存节点的时候,建议直接使用磁盘节点。
docker run -d --hostname rabbit01 --name mq01 -p 5671:5672 -p 15671:15672 -e RABBITMQ_ERLANG_COOKIE="rabbitmq_cookie" rabbitmq
docker run -d --hostname rabbit02 --name mq02 -p 5672:5672 -p 15672:15672 --link mq01:mylink01 -e RABBITMQ_ERLANG_COOKIE="rabbitmq_cookie" rabbitmq
docker run -d --hostname rabbit03 --name mq03 -p 5673:5672 -p 15673:15672 --link mq01:mylink02 --link mq02:mylink03 -e RABBITMQ_ERLANG_COOKIE="rabbitmq_cookie" rabbitmq
docker exec -it mq02 /bin/bash
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbit01
rabbitmqctl start_app
docker exec -it mq03 /bin/bash
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbit01
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmq-plugins enable rabbitmq_management
docker exec -it mq01 /bin/bash # 进入容器
cd /etc/rabbitmq/conf.d/ # 进入容器内该目录下
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
exit # 退出当前容器
docker restart mq01 # 重启容器
spring:
rabbitmq:
addresses: IP地址:5671,IP地址:5672,IP地址:5673
username: guest
password: guest
@Configuration
public class RabbitConfig {
public static final String CLUSTER_EXCHANGE_NAME = "cluster_exchange_name";
public static final String CLUSTER_QUEUE_NAME = "cluster_queue_name";
public static final String CLUSTER_ROUTING_KEY = "cluster_routing_key";
@Bean
public Queue msgQueue(){
return new Queue(CLUSTER_QUEUE_NAME, true, false, false, null);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(CLUSTER_EXCHANGE_NAME, true, false);
}
@Bean
public Binding queueBindingExchange(@Qualifier("msgQueue") Queue msgQueue,
@Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(msgQueue).to(directExchange).with(CLUSTER_ROUTING_KEY);
}
}
@SpringBootTest
class RabbitmqClusterApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend(RabbitConfig.CLUSTER_EXCHANGE_NAME, RabbitConfig.CLUSTER_ROUTING_KEY, "你好");
}
}
@Component
public class RabbitConsumer {
@RabbitListener(queues = RabbitConfig.CLUSTER_QUEUE_NAME)
public void receiveMsg(Message msg){
System.out.println("消息:" + new String(msg.getBody()));
}
}
docker exec -it mq01 /bin/bash
rabbitmqctl stop_app
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - home node 'rabbit@rabbit01' of durable queue 'cluster_queue_name' in vhost '/' is down or inaccessible, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.14.2.jar:5.14.2]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.14.2.jar:5.14.2]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.14.2.jar:5.14.2]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293) ~[amqp-client-5.14.2.jar:5.14.2]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.14.2.jar:5.14.2]
... 20 common frames omitted
●Pattern:queue 的匹配模式(正则表达式)
●Definition:镜像定义,主要由三个参数:ha-mode,ha-params,ha-sync-mode。
- ha-mode:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 表示在集群中所有的节点上进行镜像(默认即此);exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 表示在指定的节点上进行镜像,节点名称通过 ha-params 指定。
- ha-params:ha-mode 模式需要用到的参数
- ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic 和 manual。
●priority 为可选参数,表示 policy 的优先级
配置完成后,点击下面的 add/update policy 按钮,完成策略的添加,如下:
添加完成后,进行一个简单测试:
首先确认三个 RabbitMQ 都启动了,然后用上面的 provider 向消息队列发送一条消息。发送完成之后关闭 mq01 实例。接下来启动 consumer,此时发现 consumer 可以完成消息的消费(注意和前面的反向测试区分),这就说明镜像队列已经搭建成功了。
rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
rabbitmqctl set_policy -p / --apply-to queues my_queue_mirror "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack