博客 【项目实战】Kafka 生产者写入分区的策略

【项目实战】Kafka 生产者写入分区的策略

   数栈君   发表于 2023-07-31 11:32  213  0

1、生产者写入分区的策略有哪些?

生产者写入分区的策略主要有以下几种:

    1.轮询分区策略:生产者可以使用轮询策略将消息依次写入每个分区,实现负载均衡。在每次发送消息时,生产者会按照轮询的方式选择下一个可用的分区,并将消息写入该分区。这样可以确保消息均匀地分布在各个分区中。

    2.随机分区策略:Kafka生产者随机的将消息写入分区,有可能会造成消息的分布不均,所以这个策略基本上也很少用。

    3.按 key 分区策略:Kafka生产者基于消息的键(key)进行哈希计算,然后将消息写入对应的分区。这种策略可以保证具有相同键的消息被写入到相同的分区,从而保证消息的顺序性。

    4.自定义分区策略:Kafka生产者可以使用自定义分区策略来决定将消息写入哪个分区。

2、轮询分区策略

轮询分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class RoundRobinPartitioner implements Partitioner {

    private int currentPartition;

    @Override
    public void configure(Map<String, ?> configs) {
        // 初始化当前分区索引
        currentPartition = 0;
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
            // 轮询选择下一个分区
        int selectedPartition = currentPartition;
        currentPartition = (currentPartition + 1) % numPartitions;
            return selectedPartition;
    }

    @Override
    public void close() {
        // 可选:清理资源
    }
}

partition 方法会使用一个变量 currentPartition 来记录当前选择的分区索引。每次调用 partition 方法时,会将 currentPartition 增加 1,并通过取模运算来确保选择的分区索引始终在分区数范围内。

要使用轮询分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RoundRobinPartitioner");

3、随机分区策略

随机分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
  import java.util.List;
import java.util.Map;
import java.util.Random;

public class RandomPartitioner implements Partitioner {

    private final Random random = new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return random.nextInt(numPartitions);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

partition 方法会随机选择一个分区返回。 random.nextInt(numPartitions) 方法会生成一个小于分区数的随机数,作为分区的索引。

要使用随机分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RandomPartitioner");

4、按 key 分区策略

按 key 分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
  import java.util.List;
import java.util.Map;

public class KeyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
            if (keyBytes == null) {
                // 如果 key 为 null,则使用轮询分区策略
                return Math.abs(key.hashCode()) % numPartitions;
        } else {
                // 使用 key 的哈希码来确定分区
                return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {
        // 可选:清理资源
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 可选:配置方法
    }
}

partition 方法会检查 key 是否为 null。如果 key 为 null,就会使用轮询分区策略,通过计算 key 的哈希码并对分区数取模来确定分区。如果 key 不为 null,则使用 key 的字节数组的哈希码来确定分区。

要使用基于 key 的分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.KeyPartitioner");

5、自定义分区策略

自定义分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
            // 自定义分区逻辑
            // 根据消息的 key 或 value 来选择分区
            // 这里以 key 的哈希值作为分区选择依据
        int partition = Math.abs(key.hashCode()) % numPartitions;
            return partition;
    }

    @Override
    public void close() {
        // 可选:清理资源
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 可选:配置分区器
    }
}

partition 方法根据消息的 key 或 value 来选择分区。这里使用 key 的哈希值进行取模运算,以确保选择的分区索引在分区数范围内。

要使用自定义分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.CustomPartitioner");

写在最后

通过y以上这些实现,生产者将根据自定义的分区策略来选择分区来发送消息。您可以根据自己的需求,实现不同的分区逻辑。

免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群