一、前言 之前整理了kafka在windows下的安装过程,也通过shell命令进行了消息产生者和消息消费者的创建及消息发送,所以想到把kafka与最流行的SpringBoot的框架进行整合,与项目结合,进行消息的发送。
二、整合开始 1.SpringBoot工程搭建,此处不多讲,可以看之前的帖子,项目结构如下
2.在pom文件中加入kafka的依赖包
1 2 3 4 5 6 <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka</artifactId > <version > 1.1.1.RELEASE</version > </dependency >
3.在application中填写kafka相关配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #服务器端口 server.port=8088 #配置kafka #zk地址 kafka.consumer.zookeeper.connect=172.18.229.49:2181 #消费者服务提供配置 kafka.consumer.servers=172.18.229.49:9092 #是否自动提交 kafka.consumer.enable.auto.commit=true #超时时间 kafka.consumer.session.timeout=6000 kafka.consumer.auto.commit.interval=100 kafka.consumer.auto.offset.reset=latest #配置topics kafka.consumer.topic=yangTest kafka.consumer.group.id=yangTest kafka.consumer.concurrency=10 #消息提供者地址 kafka.producer.servers=172.18.229.49:9092 kafka.producer.retries=0 kafka.producer.batch.size=4096 kafka.producer.linger=1 kafka.producer.buffer.memory=40960
4.在kafka包下创建消息提供者配置类KafkaProducerConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.yang.kafka;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;@Configuration @EnableKafka public class KafkaProducerConfig { @Value ("${kafka.producer.servers}" ) private String servers; @Value ("${kafka.producer.retries}" ) private int retries; @Value ("${kafka.producer.batch.size}" ) private int batchSize; @Value ("${kafka.producer.linger}" ) private int linger; @Value ("${kafka.producer.buffer.memory}" ) private int bufferMemory; public Map<String, Object> producerConfigs () { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, String> producerFactory () { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate () { return new KafkaTemplate<String, String>(producerFactory()); } }
5.在kafka包下创建消费者配置类KafkaConsumerConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 package com.yang.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;import java.util.Map;@Configuration @EnableKafka public class KafkaConsumerConfig { @Value ("${kafka.consumer.servers}" ) private String servers; @Value ("${kafka.consumer.enable.auto.commit}" ) private boolean enableAutoCommit; @Value ("${kafka.consumer.session.timeout}" ) private String sessionTimeout; @Value ("${kafka.consumer.auto.commit.interval}" ) private String autoCommitInterval; @Value ("${kafka.consumer.group.id}" ) private String groupId; @Value ("${kafka.consumer.auto.offset.reset}" ) private String autoOffsetReset; @Value ("${kafka.consumer.concurrency}" ) private int concurrency; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500 ); return factory; } public ConsumerFactory<String, String> consumerFactory () { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs () { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; } @Bean public Listener listener () { return new Listener(); } }
6.配置KafkaConsumerConfig中指定消息监听器Listener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package com.yang.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;public class Listener { protected final Logger logger = LoggerFactory.getLogger(this .getClass()); @KafkaListener (topics = {"yangTest" }) public void listen (ConsumerRecord<?, ?> record) { logger.info("kafka的key: " + record.key()); logger.info("kafka的value: " + record.value().toString()); } }
7.在controller包创建一个controller,用来从浏览器输入消息源CollectController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.yang.controller;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.*;@RestController @RequestMapping ("/kafka" )public class CollectController { protected final Logger logger = LoggerFactory.getLogger(this .getClass()); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping (value = "/send" , method = RequestMethod.GET) public String sendKafka (@RequestParam("message" ) String message) { try { logger.info("kafka的消息={}" ,message); kafkaTemplate.send("yangTest" , "key" , message); logger.info("发送kafka成功." ); return "发送kafka成功" ; } catch (Exception e) { logger.error("发送kafka失败" , e); return "发送kafka失败" ; } } }
8.在boot包下创建启动类StartApplication
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.yang.boot;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.EnableAutoConfiguration;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.ComponentScan;@SpringBootApplication @EnableAutoConfiguration @ComponentScan ("com.yang" ) public class StartApplication { public static void main (String[] args) { SpringApplication.run(StartApplication.class, args); } }
9.在本地的kafka包下执行shell脚本,启动kafka服务
1 .\bin\windows\kafka-server-start.bat .\config\server.properties
10.在启动类上右键启动服务,浏览器访问localhost:8088/kafka/send?message=hello kafka
11.观察控制台日志,查看日志输入,消息发送成功,Listener监听消息成功,此时kafka整合成功
1 2 3 4 2019 -01 -31 10 :27 :02 .656 INFO 5236 --- [nio-8088 -exec-4 ] com.yang .controller .CollectController : kafka的消息=hello kafka2019 -01 -31 10 :27 :02 .657 INFO 5236 --- [nio-8088 -exec-4 ] com.yang .controller .CollectController : 发送kafka成功.2019 -01 -31 10 :27 :02 .661 INFO 5236 --- [afka-consumer-1 ] com.yang .kafka .Listener : kafka的key: key2019 -01 -31 10 :27 :02 .661 INFO 5236 --- [afka-consumer-1 ] com.yang .kafka .Listener : kafka的value: hello kafka
三、下载地址及问题描述 1.在整合的过程中,如果出现超时异常,请检查zk客户端是否启动,kafka客户端是否已经启动 1 2 3 4 org.apache .kafka .common .errors .TimeoutException : Failed to update metadata after 60000 ms. 2019 -01 -31 10 :32 :28 .833 INFO 8448 --- [nio-8088 -exec-4 ] com.yang .controller .CollectController : 发送kafka成功.2019 -01 -31 10 :32 :29 .030 ERROR 8448 --- [nio-8088 -exec-5 ] o.s .k .support .LoggingProducerListener : Exception thrown when sending a message with key='key' and payload='he llo kafka' to topic yangTest:
2.可通过命令来查看当前客户端下已经存在的topics 1 ./kafka-topics.bat --list --zookeeper localhost:2181
3.项目源码下载地址 点我下载