博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka多线程
阅读量:3520 次
发布时间:2019-05-20

本文共 5476 字,大约阅读时间需要 18 分钟。

单线程问题主要出现在导入包上,多线程主要注意处理方法

package com.zkdj.kafka.common.config;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.producer.ProducerConfig;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.listener.ContainerProperties;import com.zkdj.kafka.common.config.Constants.Pks;/** * @Description: kafka配置类 * @Author:SXJ */@Configurationpublic class KafKaConfig {	protected static final ConfigManager config = ConfigManager.getInstance();             /**     * @Description: 生产者的配置     * @Author:SXJ     * @return     */    public Map
producerConfigs() { Map
props = new HashMap
(); // 集群的服务器地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get(Pks.KAFKA_SERVERS)); // 消息缓存 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); // 生产者空间不足时,send()被阻塞的时间,默认60s props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000); // 生产者重试次数 props.put(ProducerConfig.RETRIES_CONFIG, 0); // 指定ProducerBatch(消息累加器中BufferPool中的)可复用大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); // 生产者会在ProducerBatch被填满或者等待超过LINGER_MS_CONFIG时发送 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // key 和 value 的序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 客户端id props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.topinfo"); return props; } /** * @Description: 生产者工厂 * @Author:SXJ * @return */ @Bean public ProducerFactory
producerFactory() { return new DefaultKafkaProducerFactory
(producerConfigs()); } /** * @Description: KafkaTemplate * @Author:SXJ * @return */ @Bean public KafkaTemplate
kafkaTemplate() { return new KafkaTemplate
(producerFactory()); } // ------------------------------------------------------------------------------------------------------------ /** * @Description: 消费者配置 * @Author:SXJ * @return */ public Map
consumerConfigs() { Map
props = new HashMap
(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get(Pks.KAFKA_SERVERS)); // 消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, config.get(Pks.KAFKA_CONSUMER_GROUP_ID)); // 自动位移提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 自动位移提交间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); // 消费组失效超时时间 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); // 位移丢失和位移越界后的恢复起始位置 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // key 和 value 的反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10); return props; } /** * @Description: 消费者工厂 * @Author:SXJ * @return */ @Bean public ConsumerFactory
consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } /** * @Description: kafka 监听容器工厂 * @Author:SXJ * @return */ @Bean public KafkaListenerContainerFactory
>kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory
factory = new ConcurrentKafkaListenerContainerFactory<>(); // 设置消费者工厂 factory.setConsumerFactory(consumerFactory()); // 要创建的消费者数量(10 个线程并发处理) factory.setConcurrency(4); factory.setBatchListener(true); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 关闭ack自动提交偏移 return factory; }}
package com.zkdj.kafka.controller.kafka;import java.util.List;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.stereotype.Component;/** * @Description: kafka消费者 * @Author:sxj */@Componentpublic class KafkaConsumer {    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);        /**     * @Description: 可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开     * @param record     */    @KafkaListener(topics = { "topinfo" })    public void receive(List
> records, Acknowledgment ack) { logger.info("此线程消费"+records.size()+"条消息----线程名:"+Thread.currentThread().getName()); records.forEach(record -> System.out.println("topic名称:"+record.topic()+"\n"+"分区位置:"+record.partition()+"\n"+"key:"+record.key()+"\n"+"偏移量:"+record.offset()+"\n"+"消息内容:"+record.value())); ack.acknowledge(); }}

其余部分参考我的另一篇博文

此处只写多线程修改地方!

转载地址:http://pyrqj.baihongyu.com/

你可能感兴趣的文章
Eclipse如何设置自动提示?
查看>>
2021-06-09数据库添加多条数据
查看>>
简单的JAVA小作品
查看>>
一些方便的遍历方法
查看>>
CMake下载
查看>>
未调用fflush产生的图片文件无法打开问题
查看>>
SQL 约束(二)
查看>>
SQL ALTER用法(三)
查看>>
SQL where子句及查询条件语句(六)
查看>>
SQL 连接JOIN(九)
查看>>
linux VM虚拟机可以ping通主机,但主机无法ping通虚拟机
查看>>
linux 错误码
查看>>
C++ 中Struct与typedef struct总结
查看>>
WNetAddConnection2调用失败,错误码1200/1312
查看>>
POI读写Excel的基本使用
查看>>
淘宝网站的架构演进
查看>>
设置zookeeper开机自启动流程
查看>>
CentOS安装mysql5.7的教详细流程
查看>>
项目整合微信扫码登录功能
查看>>
分布式文件系统FastDfs的搭建
查看>>