博客 Kafka针对用户做ACL权限控制

Kafka针对用户做ACL权限控制

   数栈君   发表于 2023-08-04 10:00  337  0

在 Kafka 3.3.1 中,可以使用 ACL(Access Control List)控制用户对 topic 的访问权限。以下是一些基本示例:

    1.创建一个名为 my-topic 的 topic

在命令行中执行以下命令创建一个名为 my-topic 的 topic:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

    1.设置 ACL

a. 在文件system-acls.properties或配置文件KafkaServer或者其他支持的外部ACL模块中设置ACLs规则,例如,我们可以通过修改/config/server.properties 配置文件来添加全局的访问控制规则:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

#定义默认ACL规则,允许所有人都可以对topic进行读写操作,这很不安全,请不要在生产环境中使用
super.users=User:admin
其中 SimpleAclAuthorizerr 是一种内置的 ACL 校验器,User:admin 是一组超级管理员。如果没有在配置文件中指定 super.users,则只有在 Zookeeper 上配置了访问控制时才会应用 ACL 规则。

b. 为特定用户或组添加 ACL 规则

假设我们要给组开发者(developers)授予对 my-topic 的读写权限,我们可以通过以下命令添加 ACL 规则:

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:developers --operation Read --operation Write --topic my-topic
现在,开发人员组可以对该 topic 进行读写操作。如果你需要撤销授权(删除 ACL),则可以使用以下命令:

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove \
--allow-principal User:developers --operation Read --operation Write --topic my-topic

    1.验证 ACL 规则

验证ACL规则的最简单方法是尝试读取或写入某个受保护的 topic。例如,在 java 中可以使用 KafkaProducer 和 KafkaConsumer API 来测试:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 创建 Kafka 生产者对象并发送消息
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    Future<RecordMetadata> result = producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    System.out.println(result.get().toString());
}
producer.close();

// 创建 Kafka 消费者对象并订阅 Topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToBeginning(Collections.singletonList(tp));

// 读取消息
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群