引入Maven依赖
一定要注意 spring-kafka 与 kafka-clients 版本的兼容性。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
Kafka主题枚举类
package com.baidu.citool.common.kafka.bean;
import java.util.ArrayList;
import java.util.List;
/**
* @author xiaoxuxuy
* @date 2021/6/4 10:57 上午
*/
public enum KafkaTopicEnum {
pipeline_scan("pipeline_scan"),
pipeline_build("pipeline_build");
private String name;
KafkaTopicEnum(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public static List<String> toList() {
List<String> list = new ArrayList<>();
for (KafkaTopicEnum item : KafkaTopicEnum.values()) {
list.add(item.name);
}
return list;
}
}
Kafka配置属性类
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @author xiaoxuxuy
* @date 2021/6/4 10:39 上午
*/
@Data
@Component
public class KafkaConfigProperties {
/**
* kafka集群servers
*/
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* kafka分区数,分区个数不要超过集群连接个数
*/
@Value("${kafka.topic.numPartitions}")
private int numPartitions;
/**
* kafka副本数
*/
@Value("${kafka.topic.replicationFactor}")
private int replicationFactor;
/**
* kafka重试次数
*/
@Value("${kafka.producer.retries}")
private int retries;
/**
* kafka ack副本机制
*/
@Value("${kafka.producer.acks}")
private String acks;
}
创建单例对象管理生产者
import com.baidu.citool.common.kafka.bean.KafkaTopicEnum;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* @author xiaoxuxuy
* @date 2021/6/4 10:55 上午
*/
@Slf4j
@Component
public class KafkaManager {
@Autowired
private KafkaConfigProperties kafkaConfigProperties;
private static KafkaProducer<String, String> producer;
public static AdminClient adminClient;
@PostConstruct
private void initKafkaManger() {
try {
// 创建kafka生产者的配置信息
Properties properties = new Properties();
// 指定连接的kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers());
// key和value序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 指定ack副本机制
properties.put(ProducerConfig.ACKS_CONFIG, kafkaConfigProperties.getAcks());
// 如果接受ack超时,重试的次数
properties.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries());
// 创建或者更新topics
refreshTopics(properties);
// 创建生产者对象
producer = new KafkaProducer<>(properties);
} catch (Exception e) {
log.error("创建KafkaProducer失败", e);
}
}
/**
* 创建或者更新topics
*
* @param properties
*/
private void refreshTopics(Properties properties) {
// 创建操作客户端
adminClient = KafkaAdminClient.create(properties);
// 检查心主题和分区
adminClient.listTopics().names()
.whenComplete((topics, throwable) -> {
List<String> newTopics = KafkaTopicEnum.toList();
log.info("当前已有主题:{}", topics);
newTopics.removeIf(str -> !topics.isEmpty() && topics.contains(str));
if (!newTopics.isEmpty()) {
log.info("创建新的主题:{}", newTopics);
List<NewTopic> newTopicList = new ArrayList<>();
for (String ntp : newTopics) {
NewTopic newTopic = new NewTopic(ntp, kafkaConfigProperties.getNumPartitions(), (short) kafkaConfigProperties.getReplicationFactor());
newTopicList.add(newTopic);
}
adminClient.createTopics(newTopicList);
}
// 已存在的topic检查分区
adminClient.describeTopics(topics).all().whenComplete((descriptionMap, throwable1) -> {
for (Map.Entry<String, TopicDescription> entry : descriptionMap.entrySet()) {
List<TopicPartitionInfo> tps = entry.getValue().partitions();
log.info("主题:{},描述:{}", entry.getKey(), tps);
log.info("当前主题分区数:{},即将修改的分区数:{}", tps.size(), kafkaConfigProperties.getNumPartitions());
List<String> allTopics = KafkaTopicEnum.toList();
log.info("所有主题:{}", allTopics);
if (tps.size() < kafkaConfigProperties.getNumPartitions() && allTopics.contains(entry.getKey())) {
log.info("主题扩展分区:{}", entry.getKey());
// 注意创建方式
NewPartitions newPartitions = NewPartitions.increaseTo(kafkaConfigProperties.getNumPartitions());
Map<String, NewPartitions> partitionsMap = new HashMap<>();
partitionsMap.put(entry.getKey(), newPartitions);
adminClient.createPartitions(partitionsMap);
}
}
});
});
}
/**
* 传入kafka约定的topicName,json格式字符串,发送给kafka集群
*
* @param topicName
* @param jsonMessage
*/
public static void sendMessage(String topicName, String jsonMessage) {
producer.send(new ProducerRecord<>(topicName, jsonMessage));
}
/**
* 传入kafka约定的topicName,json格式字符串数组,发送给kafka集群
* 用于批量发送消息,性能较高。
*
* @param topicName
* @param jsonMessages
* @throws InterruptedException
*/
public static void sendMessage(String topicName, String... jsonMessages) throws InterruptedException {
for (String jsonMessage : jsonMessages) {
producer.send(new ProducerRecord<>(topicName, jsonMessage));
}
}
/**
* 传入kafka约定的topicName,Map集合,内部转为json发送给kafka集群
* 用于批量发送消息,性能较高。
*
* @param topicName
* @param mapMessageToJSONForArray
*/
public static void sendMessage(String topicName, List<Map<Object, Object>> mapMessageToJSONForArray) {
for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) {
String array = JSONObject.fromObject(mapMessageToJSON).toString();
producer.send(new ProducerRecord<>(topicName, array));
}
}
/**
* 传入kafka约定的topicName,Map,内部转为json发送给kafka集群
*
* @param topicName
* @param mapMessageToJSON
*/
public static void sendMessage(String topicName, Map<Object, Object> mapMessageToJSON) {
String array = JSONObject.fromObject(mapMessageToJSON).toString();
producer.send(new ProducerRecord<>(topicName, array));
}
}
评论区