博客 Java中消息队列——ActiveMQ、RabbitMQ、RocketMQ、Kafka

Java中消息队列——ActiveMQ、RabbitMQ、RocketMQ、Kafka

   数栈君   发表于 2025-01-20 14:12  206  0

消息队列(Message Queue, MQ)是一种在分布式系统中用于异步通信的技术。Java中常用的消息队列包括ActiveMQ、RabbitMQ、RocketMQ和Kafka。本文将详细介绍它们的基本概念、工作原理、应用场景以及使用代码示例,帮助您在实际开发中正确选择和使用消息队列。

1. ActiveMQ

ActiveMQ 是 Apache 软件基金会提供的一个开源消息队列实现,支持 JMS(Java Message Service)规范。ActiveMQ 支持多种协议,适用于 Java 应用程序之间的异步通信和数据传输。

1.1 ActiveMQ 的特点

·JMS 支持:遵循 JMS 规范,支持点对点(Queue)和发布/订阅(Topic)模型。
·协议支持广泛:支持 AMQP、STOMP、MQTT 等多种协议。
·高可用:支持主备切换、持久化、分布式集群。

1.2 ActiveMQ 示例代码

使用 Java 连接 ActiveMQ 进行基本的消息生产和消费。

1.2.1 ActiveMQ 生产者示例

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQProducer {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TestQueue");

MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

TextMessage message = session.createTextMessage("Hello ActiveMQ!");
producer.send(message);

session.close();
connection.close();
}
}

1.2.2 ActiveMQ 消费者示例

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQConsumer {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TestQueue");

MessageConsumer consumer = session.createConsumer(destination);

Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}

session.close();
connection.close();
}
}


2. RabbitMQ

RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)协议的消息代理系统,擅长处理复杂的路由、负载均衡等任务,特别适用于实时性要求较高的场景。

2.1 RabbitMQ 的特点

·灵活的路由机制:支持多种路由模式(Direct、Fanout、Topic、Headers)。
·消息确认机制:支持消息持久化,确保消息不丢失。
·支持多语言客户端:不仅限于 Java,也支持 Python、Ruby 等。

2.2 RabbitMQ 示例代码

使用 Java 连接 RabbitMQ 进行基本的消息生产和消费。

2.2.1 RabbitMQ 生产者示例

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class RabbitMQProducer {
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}

2.2.2 RabbitMQ 消费者示例

import com.rabbitmq.client.*;

public class RabbitMQConsumer {
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}


3. RocketMQ

RocketMQ 是阿里巴巴开源的分布式消息队列系统,主要用于高并发、低延迟的场景,适合处理金融级的消息数据流。

3.1 RocketMQ 的特点

·高吞吐量:适合处理大规模数据传输。

·支持事务消息:可以实现分布式事务。
·水平扩展:支持多种分布式部署模式。

3.2 RocketMQ 示例代码

使用 Java 连接 RocketMQ 进行基本的消息生产和消费。

3.2.1 RocketMQ 生产者示例

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class RocketMQProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message message = new Message("TestTopic", "Hello RocketMQ!".getBytes());
producer.send(message);

producer.shutdown();
}
}

3.2.2 RocketMQ 消费者示例

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketMQConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TestTopic", "*");

consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

consumer.start();
}
}

4. Kafka

Kafka 是 Apache 基金会开源的分布式消息队列系统,适用于大规模的数据流处理,尤其是在流计算、日志采集等场景广泛应用。

4.1 Kafka 的特点

·分布式:具备分布式特性,支持集群部署。

·高吞吐量:可以处理每秒百万级的消息。
·持久化存储:消息持久化到磁盘,确保数据的可靠性。

4.2 Kafka 示例代码

使用 Java 连接 Kafka 进行基本的消息生产和消费。

4.2.1 Kafka 生产者示例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("TestTopic", "Hello Kafka!"));
producer.close();
}
}


4.2.2 Kafka 消费者示例

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(Consumer

Config.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("TestTopic"));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}
}

5. 总结

本文详细介绍了四种主流消息队列(ActiveMQ、RabbitMQ、RocketMQ、Kafka)及其特点和使用场景,并附上了丰富的代码示例,帮助您更好地理解和应用这些消息队列。希望本文能够为您的项目选择合适的消息队列提供参考。


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:

《行业指标体系白皮书》下载地址:

《数据治理行业实践白皮书》下载地址:

《数栈V6.0产品白皮书》下载地址:

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:


0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群