博客 Logstash输入Kafka输出Es配置

Logstash输入Kafka输出Es配置

   数栈君   发表于 2024-05-06 11:00  1074  0

Logstash介绍
Logstash是一个开源的数据收集引擎,具有实时管道功能。它可以从各种数据源中动态地统一和标准化数据,并将其发送到你选择的目的地。Logstash的早期目标主要是用于收集日志,但现在的功能已经远远超出这个范围。任何事件类型都可以通过Logstash进行分析,通过输入、过滤器和输出插件进行转换。

Logstash的工作原理是使用管道方式进行日志的搜集处理和输出。这个管道包括三个阶段:输入、处理和输出。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。

Logstash的输入支持各种选择,可以同时从众多常用来源捕捉事件,如日志、指标、Web应用、数据存储以及各种AWS服务等。在数据从源传输到存储库的过程中,Logstash的过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

Logstash的输出也可以根据需要选择不同的存储方式,除了Elasticsearch作为首选输出方向外,还有其他的输出选择。

Logstash是一个强大的开源工具,可以用于实时处理和转换来自各种数据源的数据,为数据分析和商业决策提供支持。

Kafka介绍
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。它是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,Kafka是一个可行的解决方案。

Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Es介绍
ES指的是Elasticsearch,它是一个基于RESTful web接口并且构建在Apache Lucene之上的开源分布式搜索引擎。它还是一个分布式文档数据库,其中每个字段均可被索引,而且每个字段的数据均可被搜索。它能够横向扩展至数以百计的服务器存储以及处理PB级的数据,可以在极短的时间内存储、搜索和分析大量的数据。通常作为具有复杂搜索场景情况下的核心发动机。

Logstash输入输出配置
Logstash的输入输出配置主要是针对其输入和输出插件进行设置。以下是一些常见的输入和输出插件的配置示例:

输入配置:

file:从文件读取日志信息,例如:
input {
file {
path => "/var/log/error.log"
type => "error"
start_position => "beginning"
}
}
1
2
3
4
5
6
7
stdin:从标准输入读取日志信息,例如:
input {
stdin {}
}
1
2
3
syslog:从系统日志读取日志信息,例如:
input {
syslog {
type => "syslog"
}
}
1
2
3
4
5
输出配置:

stdout:将日志信息输出到标准输出,例如:
output {
stdout {}
}
1
2
3
elasticsearch:将日志信息输出到Elasticsearch集群,例如:
output {
elasticsearch {
hosts => "localhost:9200"
index => "myindex"
}
}
1
2
3
4
5
6
以上是一些常见的输入输出插件配置示例,Logstash还支持其他多种输入输出插件,可以根据具体需求进行选择和配置。

Logstash输入Kafka输出Es配置
Logstash的输入配置可以通过Kafka插件从Kafka中读取数据,输出配置可以通过Elasticsearch插件将数据写入Elasticsearch集群。以下是一个示例配置:

input {
kafka {
bootstrap_servers => "your_kafka_server:9092"
client_id => "your_client_id"
group_id => "your_group_id"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["your_topic"]
}
}

output {
if [@metadata][kafka][topic] == "your_topic" {
elasticsearch {
hosts => "your_elasticsearch_server:9200"
index => "your_index"
timeout => 300
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
在这个配置中,Logstash通过Kafka插件从指定的Kafka服务器和主题中读取数据,然后通过Elasticsearch插件将数据写入指定的Elasticsearch索引。你可以根据实际情况修改配置中的参数,例如Kafka服务器的地址、客户端ID、组ID、主题等。

上面的配置参数的含义如下所示:
bootstrap_servers: 这是Kafka服务器的地址和端口。你需要提供Kafka集群中至少一个服务器的地址。
client_id: 这是客户端的唯一标识符,用于标识连接到Kafka集群的客户端。
group_id: 这是消费者组的ID。如果你有多个Logstash实例读取同一个Kafka主题,并且你想将它们作为一个消费者组来处理,那么你需要使用这个参数。
auto_offset_reset: 这个参数决定了当Logstash无法找到其之前读取的偏移量时应该怎么做。设置为"latest"意味着从最新的记录开始读取。
consumer_threads: 这是用于消费Kafka消息的线程数。增加线程数可以加快数据读取速度,但也会增加CPU和内存的使用。
decorate_events: 如果设置为true,Logstash会为每个事件添加额外的元数据,例如Kafka主题和分区信息。
topics: 这是Logstash要读取的Kafka主题列表。
if [@metadata][kafka][topic] == “your_topic”: 这是一个条件语句,用于确定是否将事件发送到Elasticsearch。只有当事件的主题与指定的"your_topic"匹配时,事件才会被发送到Elasticsearch。
hosts: 这是Elasticsearch集群的地址和端口。
index: 这是Logstash将数据写入Elasticsearch的索引名称。
timeout: 这是Logstash与Elasticsearch集群通信的超时时间(以秒为单位)。
这些参数可以根据你的具体需求进行调整,以满足你的数据收集和处理需求。

java发送消息到Kafka示例
Apache Kafka是一种分布式流处理平台,你可以使用它来处理各种数据。以下是使用Java向Kafka发送消息的示例代码:

首先,你需要添加Apache Kafka的依赖到你的项目中。如果你正在使用Maven,那么你可以在pom.xml文件中添加如下依赖:

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
1
2
3
4
5
6
7
以下是使用Java发送消息的示例代码:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class ProducerDemo {
public static void main(String[] args) {
// 1. 配置生产者客户端参数
Properties props = new Properties();
// Kafka集群地址
props.put("bootstrap.servers", "your_kafka_server:9092");
// 消息ack模式: all表示消息被leader和follower都写入后才返回ack, -1表示只被leader写入就返回ack
props.put("acks", "all");
// 重试次数
props.put("retries", 0);
// 批量发送大小
props.put("batch.size", 16384);
// 发送延时,用于控制producer发送请求的延迟时间,可以提高吞吐量
props.put("linger.ms", 1);
// 缓冲区大小
props.put("buffer.memory", 33554432);
// key序列化类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 2. 创建生产者对象,传入配置参数
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
// 3. 创建消息对象,指定topic、消息key和消息体value
ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "key" + i, "value" + i);
// 4. 发送消息到Kafka集群,并获取返回结果
RecordMetadata metadata = producer.send(record).get();
// 打印结果,发送是否成功,以及发送到的分区和offset等信息
System.out.printf("offset = %d, partition = %d%n", metadata.offset(), metadata.partition());
}
// 5. 关闭生产者对象,释放资源
producer.close();
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
在这个示例中,我们创建了一个名为ProducerDemo的类,这个类使用Kafka的生产者API发送消息到名为"my-topic"的主题。请注意你需要替换"bootstrap.servers"属性的值为你的Kafka集群的实际地址。如果你的集群在本地运行,并且使用的是默认的端口,那么你可以使用"localhost:9092"。

Logstash常用输入插件
Logstash的常用输入插件包括以下几种:

file:该插件可以从文件中读取事件。它使用了FileWatch库来监听文件变化,并跟踪被监听的日志文件的当前读取位置,从而确保不会漏过任何数据。
stdin:该插件是标准的输入插件,能够从命令行中读取事件。
TCP:从TCP连接中读取数据。
UDP:从UDP套接字中读取数据。
Redis:从Redis中读取数据。
JDBC:从关系型数据库中读取数据。
HTTP:从HTTP服务器中读取数据。
Logstash常用输出插件
Logstash常用的输出插件包括以下几种:

Elasticsearch:将日志数据输出到Elasticsearch,用于后续的搜索和分析。
Kafka:将日志数据发送到Kafka集群,供其他消费者使用。
File:将日志数据输出到文件中,便于后续查看和审计。
Gelf:将日志数据输出到Gelf兼容的服务器,用于远程监控和报警。
Fluentd:将日志数据输出到Fluentd,用于统一日志收集和转发。


《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群