I. Preface

An earlier post covered installing Kafka on Windows and creating a message producer and consumer from shell commands to send messages. The natural next step is to integrate Kafka with the popular Spring Boot framework, so that messages can be sent from within a project.
II. Integration

1. Set up the Spring Boot project; this is not covered here, see the earlier post. (The original article shows a screenshot of the project structure at this point.)
2. Add the spring-kafka dependency to pom.xml. Spring for Apache Kafka releases are tied to specific kafka-clients versions, so pick a version that matches your broker; this post uses 1.1.1.RELEASE.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>
 
3. Fill in the Kafka settings in application.properties
# Server port
server.port=8088

# Kafka settings
# ZooKeeper address (not read by the Java config classes below; listed for reference)
kafka.consumer.zookeeper.connect=172.18.229.49:2181
# Consumer bootstrap servers
kafka.consumer.servers=172.18.229.49:9092
# Commit offsets automatically
kafka.consumer.enable.auto.commit=true
# Session timeout (ms)
kafka.consumer.session.timeout=6000
# Auto-commit interval (ms)
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
# Topic and consumer group
kafka.consumer.topic=yangTest
kafka.consumer.group.id=yangTest
# Number of listener threads
kafka.consumer.concurrency=10
# Producer bootstrap servers
kafka.producer.servers=172.18.229.49:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
 
4. Create the producer configuration class KafkaProducerConfig under the kafka package
package com.yang.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    // Maps the kafka.producer.* properties onto the native producer settings
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    // The template injected into controllers to send messages
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
 
5. Create the consumer configuration class KafkaConsumerConfig under the kafka package
package com.yang.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    // Container factory used by @KafkaListener; concurrency sets the number of consumer threads
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // Maps the kafka.consumer.* properties onto the native consumer settings
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    // Registers the listener bean so its @KafkaListener method is picked up
    @Bean
    public Listener listener() {
        return new Listener();
    }
}
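
These properties enable auto-commit, so offsets are committed in the background every 100 ms. If you later set kafka.consumer.enable.auto.commit=false, the listener container must instead be told how to acknowledge records. A minimal sketch of manual acknowledgment, assuming the spring-kafka 1.1.x API used in this post (where AckMode lives on AbstractMessageListenerContainer):

// In kafkaListenerContainerFactory(): switch the container to manual acks
// (requires kafka.consumer.enable.auto.commit=false).
factory.getContainerProperties().setAckMode(
        AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

// The listener method can then take an Acknowledgment
// (org.springframework.kafka.support.Acknowledgment) and commit explicitly:
@KafkaListener(topics = {"yangTest"})
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    // ... process the record ...
    ack.acknowledge();  // commits the offset once processing succeeded
}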
 
6. Create the message listener class Listener that KafkaConsumerConfig registers as a bean
package com.yang.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Listener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"yangTest"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("kafka key: " + record.key());
        logger.info("kafka value: " + record.value().toString());
    }
}
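
The topic name is hardcoded in the annotation even though application.properties already defines kafka.consumer.topic. @KafkaListener resolves ${...} property placeholders, so the listener can read the topic from configuration instead; a small variant (not in the original code):

// Resolves to "yangTest" via kafka.consumer.topic in application.properties
@KafkaListener(topics = "${kafka.consumer.topic}")
public void listen(ConsumerRecord<?, ?> record) {
    logger.info("kafka key: " + record.key());
    logger.info("kafka value: " + record.value());
}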
 
7. Create CollectController in the controller package, so that messages can be submitted from the browser
package com.yang.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/kafka")
public class CollectController {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String sendKafka(@RequestParam("message") String message) {
        try {
            logger.info("kafka message={}", message);
            kafkaTemplate.send("yangTest", "key", message);
            logger.info("Kafka send succeeded.");
            return "Kafka send succeeded";
        } catch (Exception e) {
            logger.error("Kafka send failed", e);
            return "Kafka send failed";
        }
    }
}
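
One caveat: KafkaTemplate.send() is asynchronous and returns a ListenableFuture immediately, so the try/catch above only catches errors raised while handing the record to the producer, not delivery failures. A sketch of logging the real outcome with a callback, using the standard spring-kafka types (org.springframework.kafka.support.SendResult and org.springframework.util.concurrent.ListenableFutureCallback); the log wording is my own:

kafkaTemplate.send("yangTest", "key", message)
        .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // Runs once the broker has acknowledged the record
                logger.info("Delivered to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                // Runs on delivery failures, e.g. the metadata timeout in section III
                logger.error("Kafka delivery failed", ex);
            }
        });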
 
8. Create the startup class StartApplication under the boot package
package com.yang.boot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

// @SpringBootApplication already includes @EnableAutoConfiguration;
// @ComponentScan widens component scanning to com.yang because this
// class sits in the com.yang.boot package.
@SpringBootApplication
@ComponentScan("com.yang")
public class StartApplication {

    public static void main(String[] args) {
        SpringApplication.run(StartApplication.class, args);
    }
}
 
9. Run the startup scripts in the local Kafka directory to bring up the broker (start ZooKeeper first if it is not already running):

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
 
10. Right-click the startup class to run the service, then open localhost:8088/kafka/send?message=hello kafka in a browser.
11. Watch the console log: the controller reports the send and the Listener receives the message, confirming that the integration works.
2019-01-31 10:27:02.656  INFO 5236 --- [nio-8088-exec-4] com.yang.controller.CollectController   : kafka message=hello kafka
2019-01-31 10:27:02.657  INFO 5236 --- [nio-8088-exec-4] com.yang.controller.CollectController   : Kafka send succeeded.
2019-01-31 10:27:02.661  INFO 5236 --- [afka-consumer-1] com.yang.kafka.Listener                  : kafka key: key
2019-01-31 10:27:02.661  INFO 5236 --- [afka-consumer-1] com.yang.kafka.Listener                  : kafka value: hello kafka
 
III. Download and troubleshooting

1. If a timeout exception like the one below appears during integration, check that ZooKeeper and the Kafka broker are both running. Note that the success line is logged before the error: because the send is asynchronous, the controller returns "Kafka send succeeded" before the producer discovers the broker is unreachable, and the failure is only reported later by LoggingProducerListener.

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2019-01-31 10:32:28.833  INFO 8448 --- [nio-8088-exec-4] com.yang.controller.CollectController   : Kafka send succeeded.
2019-01-31 10:32:29.030 ERROR 8448 --- [nio-8088-exec-5] o.s.k.support.LoggingProducerListener   : Exception thrown when sending a message with key='key' and payload='hello kafka' to topic yangTest:
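
If you want such failures to surface inside the controller's own try/catch while debugging, you can block on the future returned by send(); a sketch (blocking a request thread like this is fine for a local test, not something to ship):

// Waits until the broker acknowledges or the producer gives up
// (e.g. "Failed to update metadata"); failures arrive as an
// ExecutionException that the surrounding try/catch will log.
kafkaTemplate.send("yangTest", "key", message).get();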
 
2. The topics that already exist on the broker can be listed with the following command (run from the bin\windows directory):

./kafka-topics.bat --list --zookeeper localhost:2181
 
3. Project source code download: click here to download.