在介绍整合之前,首先先来介绍如何安装和搭建Kafka的开发环境。
安装Kafka
在安装Kafka环境之前,首先需要安装JDK的环境,如下所示,我们以OpenJDK17为例进行安装。
wget https://github.com/openjdk/jdk/archive/refs/tags/jdk-17-ga.tar.gz
解压源码包
tar -xvf jdk-17-ga.tar.gz
cd jdk-17-ga
接下来需要安装一些必要的系统依赖库和系统工具库,如下所示。
sudo apt-get update
sudo apt-get install build-essential autoconf libx11-dev libxext-dev libxrender-dev libxtst-dev libxt-dev libxrandr-dev libcups2-dev libfontconfig1-dev libasound2-dev libfreetype6-dev libcairo2-dev libgtk2.0-dev libgtk-3-dev libpng-dev libjpeg-dev libgif-dev libgconf2-dev libnss3-dev
接下来我们可以尝试构建JDK,通过configure命令和make命令来进行构建。
bash configure
make images
这个构建过程可能会消耗一部分的时间,完成之后JDK会在build目录中,接下来我们需要根据路径来去配置相应的环境变量情况,如下所示。
export JAVA_HOME=$(pwd)/build/linux-x86_64-server-release/images/jdk
export PATH=$JAVA_HOME/bin:$PATH
配置好之后,我们就可以通过java命令来检查JDK环境安装情况。
安装完成JDK之后,接下来我们要安装Kafka了,我们可以从Kafka官网上下载Kafka的安装文件,如下所示。
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
接下来我们需要解压并且配置Kafka需要使用的Zookeeper集群环境来管理Kafka。
tar -xvf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1
由于用到了Zookeeper,所以我们需要去Zookeeper官网上下载Zookeeper对应的源码包进行安装。
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1.tar.gz
安装好之后,接下来就是启动Zookeeper。
# 启动Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
在另外的窗口中启动Kafka
# 启动Kafka服务器
bin/kafka-server-start.sh config/server.properties
简单验证Kafka的安装
首先先来创建一个主题,如下所示。
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
查看我们创建好的主题,如下所示。
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
启动消息生产者,进行消息的发送操作。
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
启动消息消费者,进行消息的接收操作。
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
这个时候,我们可以在生产者的窗口中输入一些消息,我们就可以在消费者的窗口中看到这些消息。
SpringBoot整合Kafka
安装好Kafka之后,接下来我们就来看看在SpringBoot应用中如何使用Kafka进行消息的生产和消费。
添加Maven依赖
在pom.xml文件中添加上相关的依赖配置,如下所示。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
配置Kafka连接
在application.properties文件中,添加Kafka的连接配置操作,如下所示。
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
创建Kafka生产者
package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "test";
public void sendMessage(String message) {
kafkaTemplate.send(TOPIC, message);
}
}
创建Kafka消费者
package com.example.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test", groupId = "group_id")
public void consume(String message) {
System.out.println("Consumed message: " + message);
}
}
创建一个REST控制器来发送消息到Kafka
package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@GetMapping("/send")
public String sendMessage(@RequestParam("message") String message) {
kafkaProducer.sendMessage(message);
return "Message sent to Kafka topic";
}
}
总结
上述步骤介绍了如何安装Kafka环境,并且在SpringBoot项目中如何使用Kafka来实现消息发布和消息消费的逻辑,并且通过REST接口来进行了测试,在实际操作过程中,可能会遇到各种复杂情况,需要结合具体情况来对上述实现逻辑进行调整。