侧边栏壁纸
博主头像
银河小徐博主等级

A Good Boy ⛵️⛵️⛵️

  • 累计撰写 42 篇文章
  • 累计创建 39 个标签
  • 累计收到 10 条评论

目 录CONTENT

文章目录

Java 操作 Kafka 动态扩展主题分区

银河小徐
2021-08-27 / 0 评论 / 10 点赞 / 60 阅读 / 6,400 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-06-04,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

引入Maven依赖

一定要注意 spring-kafka 与 kafka-clients 版本的兼容性。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

Kafka主题枚举类

package com.baidu.citool.common.kafka.bean;

import java.util.ArrayList;
import java.util.List;

/**
 * @author xiaoxuxuy
 * @date 2021/6/4 10:57 上午
 */
public enum KafkaTopicEnum {
    pipeline_scan("pipeline_scan"),
    pipeline_build("pipeline_build");

    private String name;

    KafkaTopicEnum(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public static List<String> toList() {
        List<String> list = new ArrayList<>();
        for (KafkaTopicEnum item : KafkaTopicEnum.values()) {
            list.add(item.name);
        }
        return list;
    }
}

Kafka配置属性类

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @author xiaoxuxuy
 * @date 2021/6/4 10:39 上午
 */
@Data
@Component
public class KafkaConfigProperties {

    /**
     * kafka集群servers
     */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * kafka分区数,分区个数不要超过集群连接个数
     */
    @Value("${kafka.topic.numPartitions}")
    private int numPartitions;

    /**
     * kafka副本数
     */
    @Value("${kafka.topic.replicationFactor}")
    private int replicationFactor;

    /**
     * kafka重试次数
     */
    @Value("${kafka.producer.retries}")
    private int retries;

    /**
     * kafka ack副本机制
     */
    @Value("${kafka.producer.acks}")
    private String acks;
}

创建单例对象管理生产者

import com.baidu.citool.common.kafka.bean.KafkaTopicEnum;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @author xiaoxuxuy
 * @date 2021/6/4 10:55 上午
 */
@Slf4j
@Component
public class KafkaManager {

    @Autowired
    private KafkaConfigProperties kafkaConfigProperties;

    private static KafkaProducer<String, String> producer;

    public static AdminClient adminClient;

    @PostConstruct
    private void initKafkaManger() {
        try {
            // 创建kafka生产者的配置信息
            Properties properties = new Properties();
            // 指定连接的kafka集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers());
            // key和value序列化类
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 指定ack副本机制
            properties.put(ProducerConfig.ACKS_CONFIG, kafkaConfigProperties.getAcks());
            // 如果接受ack超时,重试的次数
            properties.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries());
            // 创建或者更新topics
            refreshTopics(properties);
            // 创建生产者对象
            producer = new KafkaProducer<>(properties);
        } catch (Exception e) {
            log.error("创建KafkaProducer失败", e);
        }
    }

    /**
     * 创建或者更新topics
     *
     * @param properties
     */
    private void refreshTopics(Properties properties) {
        // 创建操作客户端
        adminClient = KafkaAdminClient.create(properties);
        // 检查心主题和分区
        adminClient.listTopics().names()
                .whenComplete((topics, throwable) -> {
                    List<String> newTopics = KafkaTopicEnum.toList();
                    log.info("当前已有主题:{}", topics);
                    newTopics.removeIf(str -> !topics.isEmpty() && topics.contains(str));
                    if (!newTopics.isEmpty()) {
                        log.info("创建新的主题:{}", newTopics);
                        List<NewTopic> newTopicList = new ArrayList<>();
                        for (String ntp : newTopics) {
                            NewTopic newTopic = new NewTopic(ntp, kafkaConfigProperties.getNumPartitions(), (short) kafkaConfigProperties.getReplicationFactor());
                            newTopicList.add(newTopic);
                        }
                        adminClient.createTopics(newTopicList);
                    }
                    // 已存在的topic检查分区
                    adminClient.describeTopics(topics).all().whenComplete((descriptionMap, throwable1) -> {
                        for (Map.Entry<String, TopicDescription> entry : descriptionMap.entrySet()) {
                            List<TopicPartitionInfo> tps = entry.getValue().partitions();
                            log.info("主题:{},描述:{}", entry.getKey(), tps);
                            log.info("当前主题分区数:{},即将修改的分区数:{}", tps.size(), kafkaConfigProperties.getNumPartitions());
                            List<String> allTopics = KafkaTopicEnum.toList();
                            log.info("所有主题:{}", allTopics);
                            if (tps.size() < kafkaConfigProperties.getNumPartitions() && allTopics.contains(entry.getKey())) {
                                log.info("主题扩展分区:{}", entry.getKey());
                                // 注意创建方式
                                NewPartitions newPartitions = NewPartitions.increaseTo(kafkaConfigProperties.getNumPartitions());
                                Map<String, NewPartitions> partitionsMap = new HashMap<>();
                                partitionsMap.put(entry.getKey(), newPartitions);
                                adminClient.createPartitions(partitionsMap);
                            }
                        }
                    });
                });
    }

    /**
     * 传入kafka约定的topicName,json格式字符串,发送给kafka集群
     *
     * @param topicName
     * @param jsonMessage
     */
    public static void sendMessage(String topicName, String jsonMessage) {
        producer.send(new ProducerRecord<>(topicName, jsonMessage));
    }

    /**
     * 传入kafka约定的topicName,json格式字符串数组,发送给kafka集群
     * 用于批量发送消息,性能较高。
     *
     * @param topicName
     * @param jsonMessages
     * @throws InterruptedException
     */
    public static void sendMessage(String topicName, String... jsonMessages) throws InterruptedException {
        for (String jsonMessage : jsonMessages) {
            producer.send(new ProducerRecord<>(topicName, jsonMessage));
        }
    }

    /**
     * 传入kafka约定的topicName,Map集合,内部转为json发送给kafka集群
     * 用于批量发送消息,性能较高。
     *
     * @param topicName
     * @param mapMessageToJSONForArray
     */
    public static void sendMessage(String topicName, List<Map<Object, Object>> mapMessageToJSONForArray) {
        for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) {
            String array = JSONObject.fromObject(mapMessageToJSON).toString();
            producer.send(new ProducerRecord<>(topicName, array));
        }
    }

    /**
     * 传入kafka约定的topicName,Map,内部转为json发送给kafka集群
     *
     * @param topicName
     * @param mapMessageToJSON
     */
    public static void sendMessage(String topicName, Map<Object, Object> mapMessageToJSON) {
        String array = JSONObject.fromObject(mapMessageToJSON).toString();
        producer.send(new ProducerRecord<>(topicName, array));
    }
}
10

评论区