在 Kafka 3.3.1 中,可以使用 ACL(Access Control List)控制用户对 topic 的访问权限。以下是一些基本示例:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
#定义默认ACL规则,允许所有人都可以对topic进行读写操作,这很不安全,请不要在生产环境中使用
super.users=User:admin其中 SimpleAclAuthorizerr 是一种内置的 ACL 校验器,User:admin 是一组超级管理员。如果没有在配置文件中指定 super.users,则只有在 Zookeeper 上配置了访问控制时才会应用 ACL 规则。
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:developers --operation Read --operation Write --topic my-topic现在,开发人员组可以对该 topic 进行读写操作。如果你需要撤销授权(删除 ACL),则可以使用以下命令:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove \
--allow-principal User:developers --operation Read --operation Write --topic my-topic
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建 Kafka 生产者对象并发送消息
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
Future<RecordMetadata> result = producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
System.out.println(result.get().toString());
}
producer.close();
// 创建 Kafka 消费者对象并订阅 Topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToBeginning(Collections.singletonList(tp));
// 读取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack