玖叶教程网

前端编程开发入门

SpringBoot整合Kafka实现消息的生产消费?

在介绍整合之前,首先先来介绍如何安装和搭建Kafka的开发环境。

安装Kafka

在安装Kafka环境之前,首先需要安装JDK的环境,如下所示,我们以OpenJDK17为例进行安装。

wget https://github.com/openjdk/jdk/archive/refs/tags/jdk-17-ga.tar.gz

解压源码包

tar -xvf jdk-17-ga.tar.gz
cd jdk-17-ga

接下来需要安装一些必要的系统依赖库和系统工具库,如下所示。

sudo apt-get update
sudo apt-get install build-essential autoconf libx11-dev libxext-dev libxrender-dev libxtst-dev libxt-dev libxrandr-dev libcups2-dev libfontconfig1-dev libasound2-dev libfreetype6-dev libcairo2-dev libgtk2.0-dev libgtk-3-dev libpng-dev libjpeg-dev libgif-dev libgconf2-dev libnss3-dev

接下来我们可以尝试构建JDK,通过configure命令和make命令来进行构建。

bash configure
make images

这个构建过程可能会消耗一部分的时间,完成之后JDK会在build目录中,接下来我们需要根据路径来去配置相应的环境变量情况,如下所示。

export JAVA_HOME=$(pwd)/build/linux-x86_64-server-release/images/jdk
export PATH=$JAVA_HOME/bin:$PATH

配置好之后,我们就可以通过java命令来检查JDK环境安装情况。

安装完成JDK之后,接下来我们要安装Kafka了,我们可以从Kafka官网上下载Kafka的安装文件,如下所示。

wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz

接下来我们需要解压并且配置Kafka需要使用的Zookeeper集群环境来管理Kafka。

tar -xvf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1

由于用到了Zookeeper,所以我们需要去Zookeeper官网上下载Zookeeper对应的源码包进行安装。

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1.tar.gz

安装好之后,接下来就是启动Zookeeper。

# 启动Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

在另外的窗口中启动Kafka

# 启动Kafka服务器
bin/kafka-server-start.sh config/server.properties

简单验证Kafka的安装

首先先来创建一个主题,如下所示。

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

查看我们创建好的主题,如下所示。

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

启动消息生产者,进行消息的发送操作。

bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

启动消息消费者,进行消息的接收操作。

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

这个时候,我们可以在生产者的窗口中输入一些消息,我们就可以在消费者的窗口中看到这些消息。

SpringBoot整合Kafka

安装好Kafka之后,接下来我们就来看看在SpringBoot应用中如何使用Kafka进行消息的生产和消费。

添加Maven依赖

在pom.xml文件中添加上相关的依赖配置,如下所示。

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

配置Kafka连接

在application.properties文件中,添加Kafka的连接配置操作,如下所示。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

创建Kafka生产者

package com.example.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String TOPIC = "test";

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

创建Kafka消费者

package com.example.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "test", groupId = "group_id")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

创建一个REST控制器来发送消息到Kafka

package com.example.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaProducer.sendMessage(message);
        return "Message sent to Kafka topic";
    }
}

总结

上述步骤介绍了如何安装Kafka环境,并且在SpringBoot项目中如何使用Kafka来实现消息发布和消息消费的逻辑,并且通过REST接口来进行了测试,在实际操作过程中,可能会遇到各种复杂情况,需要结合具体情况来对上述实现逻辑进行调整。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言