消息队列(Message Queue, MQ)是一种在分布式系统中用于异步通信的技术。Java中常用的消息队列包括ActiveMQ、RabbitMQ、RocketMQ和Kafka。本文将详细介绍它们的基本概念、工作原理、应用场景以及使用代码示例,帮助您在实际开发中正确选择和使用消息队列。
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ActiveMQProducer {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TestQueue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage("Hello ActiveMQ!");
producer.send(message);
session.close();
connection.close();
}
}
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ActiveMQConsumer {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TestQueue");
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
session.close();
connection.close();
}
}
RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)协议的消息代理系统,擅长处理复杂的路由、负载均衡等任务,特别适用于实时性要求较高的场景。
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class RabbitMQProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
import com.rabbitmq.client.*;
public class RabbitMQConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
·高吞吐量:适合处理大规模数据传输。
·支持事务消息:可以实现分布式事务。import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class RocketMQProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("TestTopic", "Hello RocketMQ!".getBytes());
producer.send(message);
producer.shutdown();
}
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
Kafka 是 Apache 基金会开源的分布式消息队列系统,适用于大规模的数据流处理,尤其是在流计算、日志采集等场景广泛应用。
·分布式:具备分布式特性,支持集群部署。
·高吞吐量:可以处理每秒百万级的消息。import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("TestTopic", "Hello Kafka!"));
producer.close();
}
}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(Consumer
Config.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("TestTopic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}
}
本文详细介绍了四种主流消息队列(ActiveMQ、RabbitMQ、RocketMQ、Kafka)及其特点和使用场景,并附上了丰富的代码示例,帮助您更好地理解和应用这些消息队列。希望本文能够为您的项目选择合适的消息队列提供参考。
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《行业指标体系白皮书》下载地址:
《数据治理行业实践白皮书》下载地址:
《数栈V6.0产品白皮书》下载地址:
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址: