在很多业务场景下,我们都需要通过日志的采集来分析系统运行情况以及用户使用情况一般情况下我们可以通过Kafka来进行日志的采集进行日志分类汇总,那么下面我们就来看看如何在SpringBoot应用程序中整合Kafka来进行日志采集。
添加Kafka依赖
要使用Kafka必不可少的要添加Kafka相关的POM依赖如下所示。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置 Kafka
接下来需要利用SpringBoot提供的自动配置在配置文件中添加Kafka的连接信息,方便在后续过程中使用。如下所示。
spring.kafka.bootstrap-servers=localhost:9092
编写日志文件监听器
配置完成之后,我们可以创建一个监听器,用来监听日志文件的变化情况,然后将新的日志文件发送到Kafka的某个主题中完成日志信息的采集,我们可以通过@Component注解来标识该类使得这个类能够被SpringBoot自动配置机制扫描到如下所示。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Scheduled;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@Component
public class LogFileListener {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Scheduled(fixedRate = 1000) // 定时扫描日志文件,每秒执行一次
public void readLogFile() {
try {
File file = new File("/path/to/your/log/file.log");
BufferedReader reader = new BufferedReader(new FileReader(file));
String line;
while ((line = reader.readLine()) != null) {
// 将日志消息发送到 Kafka 主题中
kafkaTemplate.send("logs-topic", line);
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
配置 Kafka 生产者
采集日志监听配置完成之后,接下来需要在Spring Boot项目中配置Kafka的消息生产者,用这个生产者将消息发送到日志的消费者主题上,如下所示。
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
创建 Kafka 主题
在代码中配置完成之后,需要再Kafka的服务中设置对应的Topic来接收发送过来的日志消息,如下所示
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logs-topic
读取日志文件路径
在上面的例子中,我们是将日志通过硬编码的方式存储在代码中,无法进行动态的调整。在实际的开发过程中,我们的这个日志采集的应用可能会部署到任意的服务器上进行日志采集,这个时候我们就需要将日志路径的配置调整为动态的,我们可以通过如下的方式来实现。
如果我们想动态的指定日志文件路径,可以通过Spring Boot提供的配置属性来 实现,可以在配置文件中添加日志的采集路径属性通过这个属性来动态的获取日志采集路径如下所示。
log.file.path=/path/to/your/log/file.log
然后,在 Spring Bean 中读取该属性的值
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class LogFileListener {
@Value("${log.file.path}")
private String logFilePath;
// 其他代码...
}
这样就可以动态地指定日志文件路径,并且可以在不修改代码的情况下更改日志文件路径。
总结
这样我们就实现了一个用于日志采集的SpringBoot的应用程序,我们可以在任意的服务器上去部署该项目用来完成日志变化的监听操作,当然我们也可以实现复杂的逻辑去监听某个日志目录下的所有日志文件并且对日志文件进行采集。