文章目录 1. 引言1.1 什么是Logstash?1.2 Logstash的主要特点 2. 下载与配置2.1 下载2.2 文件结构2.3 环境配置 3. Logstash三大核心组件3.1 Input3.2 Filter3
Elasticsearch是在数据处理生态系统中担任一个开源的分布式搜索和分析引擎的角色,专为存储、检索和分析大量的数据而打造。与此相伴的是Kibana,一个开源数据可视化平台,用于以优雅的方式展示elasticsearch中的数据,并赋予用户创建仪表盘、图表和报告的能力。然而,实现完整的数据流并不仅止于此。Logstash担任数据处理的角色,使得数据处理的整个过程更加完整。这三大组件就构成了我们平时所说的ELK。下面就开始对Logstash进行详细的介绍。
Logstash作为一个具备实时流水线功能的开源数据收集引擎,拥有强大的能力。它能够从不同来源收集数据,并将其动态地汇聚,进而根据我们定义的规范进行转换或者输出到我们定义的目标地址。
Logstash通过清洗和使数据多样化,Logstash使数据变得适用于各种高级下游分析和可视化用例。此外,Logstash提供广泛的输入、过滤器和输出插件,而且许多本地编解码器进一步简化了数据摄取的过程。无论是数据整理还是提供给下游应用,Logstash都是一个强大且灵活的解决方案。
引用自Logstash官网的图片,更能说明他的角色和功能:

本文使用的是7.17最新的版本7.17.12,7.x版本也是最后支持jdk8的版本,后续8.0默认jdk11起.
登陆到elastic下载地址,选择产品 Logstash 和版本号 7.17.12

关于系统选择,如果是行x86架构的linux的选择下载LINUX X86_64,如果是windows的就选择WINDOWS,这两个系统没有什么区别,启动和配置都差不多,建议学习测试可以使用windows版本的。
解压后的文件目录如下:

bin: 这个目录包含了用于启动Logstash的可执行脚本。例如启动脚本,通过运行这脚本,可以启动Logstash。
config: 存放Logstash的配置文件。
data: 数据目录用于存储Logstash的持久化数据,如内部状态信息、临时文件等。
jdk: 包含Logstash所需的Java Development Kit(JDK)版本。它是Logstash自带的特定JDK版本,以确保Logstash在运行时有所需的Java环境。
lib: 该目录通常包含Logstash的依赖库和插件。
logstash-core 和 logstash-core-plugin-api: 这些目录包含Logstash核心功能和插件api的代码。Logstash核心实现了数据处理流水线的核心逻辑,而插件API允许开发者创建自定义插件来扩展Logstash的功能。
modules: 模块目录包含一些预定义的Logstash模块,用于处理特定类型的数据,如日志、网络流量等。这些模块可以简化配置并提供一些默认设置。
vendor: 该目录包含Logstash所需的依赖库和插件,以及一些其他工具。
x-pack: 是Elasticsearch、Kibana、Logstash和Beats的一个扩展功能套件,它提供了一系列的安全性、监控、警报、机器学习和其他高级功能,旨在增强Elastic Stack的功能。
Logstash 推荐在环境变量配置 LS_JAVA_HOME 变量指向jdk目录来使用jdk。配置jdk目录时,我们可以直接使用Logstash 中的jdk目录,省去了另外下载和有可能引起版本不能用的问题。(7.17.12支持的jdk版本只有 jdk8,jdk11和jdk15)
在7.17.12以及之前的版本中还兼容使用我们配置的JAVA_HOME环境变量,后续会取消对该变量的支持。
Logstash 管道有两个必需元素input(收集源数据)和output(输出数据),以及一个可选元素filter(格式化数据) 。三个插件在源数据和Elasticsearch中的关系如下:

input是用于从不同数据源收集数据的插件或配置部分。input插件允许您定义数据输入的来源,并将数据发送到Logstash进行后续处理。Logstash支持多种类型的输入插件,每种插件适用于不同的数据源类型。常用的有:
详细的插件可以查看Logstash官网
filter是用于对输入的数据进行处理、转换和过滤的部分。filter插件允许在数据进入 Logstash 后(经过input处理),再次对其进行各种操作,以满足特定需求,例如数据清洗、解析、标准化等。Logstash 支持多种类型的 “filter” 插件,每种插件适用于不同的数据处理需求。以下是一些常见的 Logstash “filter” 插件:
output是用于将处理过的数据发送到不同目标的插件或配置部分。output插件允许定义数据输出的目标,并将经过 Logstash 处理后的数据传输到这些目标。Logstash 支持多种类型的输出插件,每种插件适用于不同的数据存储、传输或处理需求。以下是一些常见的 Logstash “output” 插件:
启动logstash比较简单,只需要执行bin目录下的 logstash(linxu)或者logstash.bat(windows)。
# linux启动命令./bin/logstash# windows启动命令.\bin\logstash.bat 在编写示例前,需要先了解一下重要的配置文件
一些常见的配置项包括:
pipeline.batch.size:指定每个批次处理的事件数量。
pipeline.batch.delay:指定每个批次之间的延迟时间。
path.data:指定 Logstash 数据的存储路径。
http.host:指定 HTTP 监听的主机名。
http.port:指定 HTTP 监听的端口号。
pipeline.workers:指定并行处理事件的工作线程数量。
queue.type: 指定队列的存储类型,可选memory(内存)和persisted(持久)
一些常见的配置项包括:
-Xmx:指定 Java 堆内存的最大值。
-Xms:指定 Java 堆内存的初始值。
-XX:+UseConcMarkSweepGC:指定使用 CMS(Concurrent Mark-Sweep)垃圾回收器。
-Djava.io.tmpdir:指定临时文件的存储路径。
在logstash根目录下执行
# windows执行.\bin\logstash.bat -e "input { stdin { } } output { stdout {} }"#linux执行bin/logstash -e 'input { stdin { } } output { stdout {} }' 启动 Logstash 后,请等待,直到看到][main] Pipeline started {"pipeline.id"=>"main"},然后在命令提示符下输入:hello world

上面的例子我们直接采用参数 -e后跟随管道配置方式启动,我们还可以使用-f参数指定我们的配置文件方式启动。在跟目录下创建hello.conf文件,文件内容:
input { stdin { } } output { stdout { } } 然后执行启动命令
# windows执行.\bin\logstash.bat -f hello.conf#linux执行bin/logstash -f hello.conf 打开config目录下的pipelines.yml文件,输入
- pipeline.id: hello pipeline.workers: 1 pipeline.batch.size: 1 config.string: "input { stdin { } } output { stdout {} }" 除了直接配置管道处理规则,我们还可以指向刚刚编写的hello.conf文件
- pipeline.id: hello pipeline.workers: 1 pipeline.batch.size: 1 path.config: "/usr/local/logstash/hello.conf" 保存文件后执行:
# windows执行.\bin\logstash.bat#linux执行bin/logstash 需要提前准备好mysql的表结构和测试数据:
# 建表语句CREATE TABLE `test` ( `id` int(11) NOT NULL AUTO_INCREMENT, `content` varchar(255) DEFAULT NULL, `status` int(11) DEFAULT NULL, `update_time` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;# 插入数据insert into test (content,status,update_time) VALUES ("aaa",1,UNIX_TIMESTAMP());insert into test (content,status,update_time) VALUES ("bbb",1,UNIX_TIMESTAMP());insert into test (content,status,update_time) VALUES ("ccc",2,UNIX_TIMESTAMP());insert into test (content,status,update_time) VALUES ("DDD",1,UNIX_TIMESTAMP()); 需要启动好elasticsearch和kibana,并且elasticsearch需要开启允许自动创建索引,如果没有开启需要事先创建好索引
在logstash根目录下创建一个mylib的目录,用于存放java连接mysql的jar文件,例如:mysql-connector-java-8.0.27.jar
需求:每分钟执行根据test表的id从小到大并且status等于1保存elasticsearch中,每次执行的数量是2条
在logstash跟目录下创建 mysql-by-id-to-es.conf 文件,文件内容:
input { jdbc { jdbc_driver_library => "./mylib/mysql-connector-java-8.0.27.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/test" jdbc_user => "root" jdbc_password => "123456" parameters => { "myStatus" => 1 } schedule => "* * * * *" statement => "SELECT id,content,status,update_time FROM test WHERE status = :myStatus AND id > :sql_last_value ORDER BY id ASC LIMIT 2" last_run_metadata_path => "mysql-by-id-to-es.index" tracking_column => "id" use_column_value => true tracking_column_type => "numeric" }}filter {mutate { add_field => { "from" => "logstash" } }}output { elasticsearch { index => "test-by-id-%{+YYYY.MM}" } stdout { }} 保存好文件内容,在logstash根目录下执行-f启动命令
# windows执行.\bin\logstash.bat -f mysql-by-id-to-es.conf# linux执行./bin/logstash.bat -f mysql-by-id-to-es.conf 控制台打印信息:

到kibana执行查看数据:
1.先执行GET _cat/indices?v 查看是否存储以test-by-id开头的索引

如果索引存在,执行查看数据的语句GET test-by-id-2023.08/_search{ "query": {"match_all": {}}},可以发现status等于2的记录不会被录入
{ "_index" : "test-by-id-2023.08", "_type" : "_doc", "_id" : "FsWU74kBjZ5FwUtCTy7a", "_score" : 1.0, "_source" : { "update_time" : 1691939803, "@version" : "1", "content" : "aaa", "@timestamp" : "2023-08-13T15:47:01.008Z", "status" : 1, "id" : 1, "from" : "logstash" } }, { "_index" : "test-by-id-2023.08", "_type" : "_doc", "_id" : "FcWU74kBjZ5FwUtCTy7a", "_score" : 1.0, "_source" : { "update_time" : 1691939803, "@version" : "1", "content" : "bbb", "@timestamp" : "2023-08-13T15:47:01.020Z", "status" : 1, "id" : 2, "from" : "logstash" } }, { "_index" : "test-by-id-2023.08", "_type" : "_doc", "_id" : "F8WV74kBjZ5FwUtCNS6W", "_score" : 1.0, "_source" : { "update_time" : 1691939803, "@version" : "1", "content" : "ddd", "@timestamp" : "2023-08-13T15:48:00.413Z", "status" : 1, "id" : 4, "from" : "logstash" } } 如果希望通过数据的更新时间录入,新建**mysql-by-uptime-to-es.conf文件,文件内容:
input { jdbc { jdbc_driver_library => ",/mylib/mysql-connector-java-8.0.27.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/test" jdbc_user => "root" jdbc_password => "123456" parameters => { "myStatus" => 1 } schedule => "* * * * *" statement => "SELECT id,content,status,update_time FROM test WHERE status = :myStatus AND update_time > :sql_last_value" last_run_metadata_path => "mysql-by-uptime-to-es.index" }}filter {mutate { add_field => { "from" => "logstash" } }}output { elasticsearch { index => "test-by-uptime-%{+YYYY.MM}" } stdout { }} 保存文件,启动和查看数据请参考前面的5.2.1
例子:
- 5 * 1-3 * 将在 5 月至 <> 月的每一天凌晨 <> 点执行一次。
0 * * * * 将在每天每小时的第 0 分钟执行。
0 6 * * * America/ChicaGo 将在每天上午 6:00 (UTC/GMT -5) 执行。
tracking_column 指定增量抽取时用来跟踪的列名,而 use_column_value 指示是否使用列的值作为跟踪标记。numeric 和timestamp,默认是timestamp。elasticsearch { # 配置es地址,有多个使用逗号隔开,不填默认就是 localhost:9200 hosts => ["localhost:9200"] # 配置索引 index => "test-by-uptime-%{+YYYY.MM}" # 配置账号和密码,默认不填 user => "elastic" password => "123456"} 本文通过详细介绍Logstash的核心组件和功能,涵盖如何从下载安装到编写第一个Hello World示例,再到定时同步MySQL数据到Elasticsearch的完整过程。希望此教程能成为您学习和掌握Logstash的重要参考。
--结束END--
本文标题: 【elasticsearch专题】:Logstash从入门到同步MySQL数据
本文链接: https://www.lsjlt.com/news/551735.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0