How to set the number of partitions in Spring Kafka

Author: 袖梨 2026-06-15

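In Spring Kafka, the partition count is a property of the topic and is fixed when the topic is created (it can later be increased, but not reduced); it is not a consumer or KafkaListenerContainerFactory setting. An application can declare it with a NewTopic bean, which KafkaAdmin applies at startup, while the container factory controls only how many consumers read the topic concurrently. The following example shows a Java configuration class: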

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    // KafkaAdmin creates any NewTopic beans on the broker at startup.
    // (Spring Boot auto-configures one; it is declared here for plain Spring.)
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = new HashMap<>();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(props);
    }

    // The partition count is set here, on the topic.
    // "my-topic" and the replica count of 1 are placeholder values.
    @Bean
    public NewTopic myTopic() {
        return TopicBuilder.name("my-topic")
                .partitions(5) // set the number of partitions
                .replicas(1)
                .build();
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // number of concurrent consumers (at most the partition count)
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

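Note that the partition count cannot be set through the consumer properties: ConsumerConfig defines no PARTITION_COUNT constant, and consumers simply receive whatever partitions the topic already has. The setting that should be sized to the topic's partition count is the listener concurrency (setConcurrency); if there are more consumers than partitions, the extra consumers stay idle. If you do not know the current partition count of a topic, you can check it with the Kafka command-line tools (for example kafka-topics.sh --describe) or an administration UI.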
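The relationship between partition count and concurrency can be illustrated with a small plain-Java sketch. It is not Spring Kafka code: the class PartitionSpread and the method assign are hypothetical names, and the split shown is a simplified range-style assignment for a single topic. The real distribution depends on the partition assignor configured for the consumer group.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: how the partitions of one topic might be
// spread across the consumers of a group under a range-style assignment.
public class PartitionSpread {

    // Returns, for each consumer index, the list of partition numbers it owns.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / consumers;   // every consumer gets at least this many
        int extra = partitions % consumers;  // the first 'extra' consumers get one more
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 5 partitions, concurrency 3: every consumer has work
        System.out.println(assign(5, 3)); // [[0, 1], [2, 3], [4]]
        // 3 partitions, concurrency 5: two consumers stay idle
        System.out.println(assign(3, 5)); // [[0], [1], [2], [], []]
    }
}
```

With 5 partitions and a concurrency of 3, all three consumers receive partitions. Raising the concurrency above the partition count only adds idle consumers, which is why the concurrency is normally kept at or below the number of partitions.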
