kafkalogstash elasticsearchkibanawindow安装配置

xiaoxiao2021-02-27  389

主要使用 kafka logstash elasticsearch kibana进行配置日志分析收集,由于只是学习,所以在window机器上搭建

kafka_2.10-0.10.1.1 http://kafka.apache.org/downloads zookeeper-3.4.6 http://zookeeper.apache.org/logstash-5.3.2 https://www.elastic.co/cn/downloads/logstashelasticsearch-5.3.2 https://www.elastic.co/cn/downloads/elasticsearch

kibana-5.3.2-windows-x86

https://www.elastic.co/cn/downloads/kibana

由于 logstash 、kibana 、elasticsearch都是一个公司的,可各个产品首页显示的不是5.3.2 版本 ,可以点击产品页面上的View past releases. 下载对应的版本,为了避免版本冲突 建议这三个采用同一个版本

简单的原理: kafka(在应用中采集日志)—-logstash(从kafka中收集日志到ES中)—-elasticsearch(负责存储日志信息)——–kibana(负责用来展示数据)

所以启动顺序: 先把zookeeper kafka elasticsearch 1. zookeeper启动

cd zookeeper-3.4.6/bin zkServer.cmd

2.kafka服务启动

cd kafka_2.10-0.10.1.1\bin\windows kafka-server-start.bat ../../config/server.properties 为了查看效果 顺带启动一下 kafka 消费者 kafka-console-consumer.bat –zookeeper localhost:2181 –topic mytopic –from-beginning

3.启动elasticsearch

cd elasticsearch-5.3.2\bin elasticsearch.bat elasticsearch启动后 可以看http://localhost:9200/ 正常会返回一个json 对应你的elasticsearch的版本相关信息

4.配置kibana 首先修改一下kibana的配置文件 config\kibana.yml

cd kibana-5.3.2-windows-x86\config 在config增加一行配置 elasticsearch.url: “http://localhost:9200” cd kibana-5.3.2-windows-x86\bin kibana.bat

启动后可以进入kibana首页查看http://localhost:5601 一开始进去会提示没有index pattern。原因是因为我们还没有把数据弄到elasticsearch中 5.配置logstash

cd logstash-5.3.2\bin 我们在这个目录下 新建一个配置文件 名字:logstash.conf 内容如下:

input{ kafka { bootstrap_servers => "localhost:9092" topics => ["mytopic"] enable_auto_commit => "true" auto_offset_reset => "latest" } } output{ stdout{} elasticsearch { hosts => [ "localhost:9200" ] index => "logstash-cylog" } }

简单说明一下这个配置文件: input:代表数据源怎么来的。我们这里使用kafka消费者 filter :代表过滤数据,这里配置代表是 将message进行json解析。(非必须) output:输出数据,stdout代表输出到控制台中(方便直接查看效果) elasticsearch 配置 指向本地的地址 input filter output 这个只是简单配置,详细的可以参考logstash的官网 https://www.elastic.co/cn/products/logstash 里面有非常详细,针对每个配置说明

由于我们使用需要使用到kafka-input插件 所以我们要安装kafka的插件

cd logstash-5.3.2\bin logstash-plugin.bat install logstash-input-kafka

安装插件后 启动logstash

提示成功后,就是我们最重要的一步了。

把日志发送到kafka 这里有很多种方式,我们采用最简单的log4j kafka appender 新建一个maven项目 KafkaAppenderApplication

package com.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaAppenderApplication { public static Logger logger = LogManager.getLogger(KafkaAppenderApplication.class); public static void main(String[] args) { long t = System.currentTimeMillis(); logger.info("inf1o123" + t + "info"); SpringApplication.run(KafkaAppenderApplication.class, args); System.exit(0); } }

pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.test</groupId> <artifactId>kafkaAppender</artifactId> <version>1.0</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.5</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.1.1</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies> </project>

log4j2.xml

<Configuration status="warn" monitorInterval="30" strict="true" schema="Log4J-V2.2.xsd"> <Appenders> <!-- 输出到控制台 --> <Console name="Console" target="SYSTEM_OUT"> <!-- 需要记录的级别 --> <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}:%4p %t (%F:%L) - %m%n"/> </Console> <!-- 输出到Kafka topci:mytopic bootstrap.servers: localhost:9092--> <Kafka name="KAFKA" topic="mytopic"> <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] [%-5p] {%F:%L} - %m%n" /> <Property name="bootstrap.servers">localhost:9092</Property> </Kafka> </Appenders> <Loggers> <Root level="info"> <!-- 全局配置 --> <AppenderRef ref="Console"/> <AppenderRef ref="KAFKA"/> </Root> </Loggers> </Configuration>

启动项目后 kafka消费者显示日志:

2017-05-04 14:16:35.221 [main] [INFO ] {KafkaAppenderApplication.java:15} - inf1 o1231493878595218info 2017-05-04 14:16:35.865 [background-preinit] [INFO ] {Version.java:30} - HV00000 1: Hibernate Validator 5.2.4.Final

然后我们可以登录到kafka 首先建立index 可以选择logstash-* 进入 Discover后 可以看到我们刚才的日志

转载请注明原文地址: https://www.6miu.com/read-1882.html

最新回复(0)