第七章 ES和MYSQL数据同步
第1节 logstash简介
Logstash是一款开源的数据收集引擎,具备实时管道处理能力。简单来说,logstash作为数据源与数据存储分析工具之间的桥梁,结合ElasticSearch以及Kibana,能够极大方便数据的处理与分析。通过200多个插件,logstash可以接受几乎各种各样的数据。包括日志、网络请求、关系型数据库、传感器或物联网等等
第2节 logstash下载(6.3.2)
官网地址
1
https://www.elastic.co/cn/downloads/logstash
华为镜像站
1
https://mirrors.huaweicloud.com/logstash/
第3节 logstash配置(简单配置-单表/多表)
1
2
3
4
logstash工作时,主要设置3个部分的工作属性。
input:设置数据来源
filter:可以对数据进行一定的加工处理过滤,但是不建议做复杂的处理逻辑。这个步骤不是必须的
output:设置输出目标
安装/配置步骤(使用单个表生成索引)
1
2
3
1. 将下载的logstash-6.3.2.zip文件解压到指定位置
2. 创建一个配置文件名字自定义,我命名为 mysql.conf,将自定义配置文件放到指定位置(我放到了D:\soft\logstash-6.3.2\config文件夹下)
3. 在mysql.conf中添加配置信息
input{
jdbc{
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务
schedule => "* * * * *"
# 生成索引的数据来源
statement => "SELECT * FROM department"
}
}
output{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "department"
# 生成根据dept_id,生成ES的 _id
document_id => "%{dept_id}"
# 类型
document_type => "dept_doc"
}
}
4. 启动logstash服务,使用我们自定义的配置启动
-- 使用cmd命令行定位到当前logstash目录的bin目录下使用命令为: logstash -f ../config/mysql.conf 启动服务
5. 等待索引创建完成(这种方式为全量方式)
安装/配置步骤(使用多张表生成索引)
input{
# 第1张表
jdbc{
# 设置当前jdbc的type
type => "department"
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务
schedule => "* * * * *"
# 生成索引的数据来源
statement => "SELECT * FROM department"
}
# 第2张表
jdbc{
# 设置当前jdbc的type
type => "student"
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务
schedule => "* * * * *"
# 生成索引的数据来源
statement => "SELECT * FROM student"
}
}
output{
# 根据jdbc中的type值指定输入的jdbc对应的输出为哪一个elasticsearch
if[type]=="department"{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "department"
# 生成根据dept_id,生成ES的 _id
document_id => "%{dept_id}"
# 类型
document_type => "dept_doc"
}
}
if[type]=="student"{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "student"
# 生成根据student_id,生成ES的 _id
document_id => "%{student_id}"
# 类型
document_type => "student_doc"
}
}
}
第4节 logstash配置(其他配置)
官网配置地址
1
https://www.elastic.co/guide/en/logstash/6.3/plugins-inputs-jdbc.html
4.1 使用时间字段进行追踪(一般使用更新时间字段)
1
在阿里巴巴嵩山版手册中强调,任何数据库表都必须有创建时间和更新时间字段,都是datetime类型,也就是保存时间为年月日时分秒
input{
# 第1张表
jdbc{
# 设置当前jdbc的type
type => "department"
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务,一分钟同步一次
schedule => "* * * * *"
# 如果sql语句使用sql_last_value属性要设置use_column_value为true,并且指定tracking_column属性,指定哪一列
use_column_value => true
# 指定sql_last_value参数使用哪一列
tracking_column => "create_time"
# 设置tracking_column属性名保存的状态
last_run_metadata_path => "D:\\soft\\logstash-6.3.2\\a\\a.txt"
# 是否保留前一次的运行状态,如果不保留下次重启服务器所以会被重新创建
clean_run => false
# 设置时间戳
tracking_column_type => "timestamp"
# 生成索引的数据来源
statement => "SELECT * FROM department WHERE create_time > :sql_last_value AND create_time < now() ORDER BY create_time desc"
}
# 第2张表
jdbc{
# 设置当前jdbc的type
type => "student"
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务,一分钟同步一次
schedule => "* * * * *"
# 如果sql语句使用sql_last_value属性要设置use_column_value为true
use_column_value => true
# 指定sql_last_value参数使用哪一列
tracking_column => "create_time"
# 设置tracking_column属性名保存的状态
last_run_metadata_path => "D:\\soft\\logstash-6.3.2\\b\\b.txt"
# 是否保留前一次的运行状态,如果不保留下次重启服务器所以会被重新创建
clean_run => false
# 设置时间戳
tracking_column_type => "timestamp"
# 生成索引的数据来源
statement => "SELECT * FROM student WHERE create_time > :sql_last_value AND create_time < now() ORDER BY create_time desc"
}
}
output{
# 根据jdbc中的type值指定输入的jdbc对应的输出为哪一个elasticsearch
if[type]=="department"{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "department"
# 生成根据dept_id,生成ES的 _id
document_id => "%{dept_id}"
# 类型
document_type => "dept_doc"
}
}
if[type]=="student"{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "student"
# 生成根据student_id,生成ES的 _id
document_id => "%{student_id}"
# 类型
document_type => "student_doc"
}
}
}
4.2 使用其他字段进行追踪(设置使用主键进行追踪)
1
在修改其他字段进行追踪时,一定不要忘记修改tracking_column_type类型,详情修改请参官网
input{
# 第1张表
jdbc{
# 设置当前jdbc的type
type => "department"
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务,一分钟同步一次
schedule => "* * * * *"
# 如果sql语句使用sql_last_value属性要设置use_column_value为true,并且指定tracking_column属性,指定哪一列
use_column_value => true
# 指定sql_last_value参数使用哪一列
tracking_column => "dept_id"
# 设置tracking_column属性名保存的状态
last_run_metadata_path => "D:\\soft\\logstash-6.3.2\\a\\a.txt"
# 是否保留前一次的运行状态,如果不保留下次重启服务器所以会被重新创建
clean_run => false
# 设置列的类型
tracking_column_type => "numeric"
# 生成索引的数据来源
statement => "SELECT * FROM department WHERE dept_id > :sql_last_value"
}
# 第2张表
jdbc{
# 设置当前jdbc的type
type => "student"
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务,一分钟同步一次
schedule => "* * * * *"
# 如果sql语句使用sql_last_value属性要设置use_column_value为true
use_column_value => true
# 指定sql_last_value参数使用哪一列
tracking_column => "student_id"
# 设置tracking_column属性名保存的状态
last_run_metadata_path => "D:\\soft\\logstash-6.3.2\\b\\b.txt"
# 是否保留前一次的运行状态,如果不保留下次重启服务器所以会被重新创建
clean_run => false
# 设置列的类型
tracking_column_type => "numeric"
# 生成索引的数据来源
statement => "SELECT * FROM student WHERE student_id > :sql_last_value"
}
}
output{
# 根据jdbc中的type值指定输入的jdbc对应的输出为哪一个elasticsearch
if[type]=="department"{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "department"
# 生成根据dept_id,生成ES的 _id
document_id => "%{dept_id}"
# 类型
document_type => "dept_doc"
}
}
if[type]=="student"{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "student"
# 生成根据student_id,生成ES的 _id
document_id => "%{student_id}"
# 类型
document_type => "student_doc"
}
}
}
注意: 日常bug记录与采坑指南,如果出现下面的BUG并且在进行配置查找的时候,无论怎么校验都不能从自己的配置中找到错误,但是服务器还启动不起来,那么可以尝试在错误信息中查找关键信息,经过反复校验,如果设置了last_run_metadata_path服务器可以正常启动,如果配置中少了last_run_metadata_path 属性服务器启动失败,就会报如下错误,并且在下面的错误中,有相关信息,我的报错信息如下,关键信息last_run_metadata_path=>"C:\\Users\\lenovo/.logstash_jdbc_last_run",说明在上次发生错的时候logstash对我上次的错误信息进行的保存,造成服务器每次启动读取错误信息,造成的服务器启动失败,去指定位置删除即可.
1
2
3
[2020-08-20T02:27:40,897][ERROR][logstash.pipeline ] Error registering plugin {:pipeline_id=>"main", :plugin=>"<LogStash::Inputs::Jdbc jdbc_driver_library=>\"D:\\\\\\\\soft\\\\\\\\logstash-6.3.2\\\\\\\\lib\\\\\\\\mysql-connector-java-5.1.47.jar\", jdbc_driver_class=>\"com.mysql.jdbc.Driver\", jdbc_connection_string=>\"jdbc:mysql://127.0.0.1:3306/gn_oa\", jdbc_user=>\"root\", jdbc_password=><password>, schedule=>\"* * * * *\", statement=>\"SELECT * FROM department\", id=>\"02efb29127a061192768f52b1a09e6638672c521b53edda48b19f6b9880e90d6\", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>\"plain_e50f7aa8-3c57-4f33-9000-86265d442ec7\", enable_metric=>true, charset=>\"UTF-8\">, jdbc_paging_enabled=>false, jdbc_page_size=>100000, jdbc_validate_connection=>false, jdbc_validation_timeout=>3600, jdbc_pool_timeout=>5, sql_log_level=>\"info\", connection_retry_attempts=>1, connection_retry_attempts_wait_time=>0.5, last_run_metadata_path=>\"C:\\\\Users\\\\lenovo/.logstash_jdbc_last_run\", use_column_value=>false, tracking_column_type=>\"numeric\", clean_run=>false, record_last_run=>true, lowercase_column_names=>true>", :error=>"can't dup Fixnum", :thread=>"#<Thread:0x703b95e6 run>"}
[2020-08-20T02:27:41,426][ERROR][logstash.pipeline ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<TypeError: can't dup Fixnum>, :backtrace=>["org/jruby/RubyKernel.java:1882:in `dup'", "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date/format.rb:838:in `_parse'", "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date.rb:1830:in `parse'", "D:/soft/logstash-6.3.2/vendor/bundle/jruby/2.3.0/gems/logstash-input-jdbc-4.3.9/lib/logstash/plugin_mixins/value_tracking.rb:87:in `set_value'", "D:/soft/logstash-6.3.2/vendor/bundle/jruby/2.3.0/gems/logstash-input-jdbc-4.3.9/lib/logstash/plugin_mixins/value_tracking.rb:36:in `initialize'", "D:/soft/logstash-6.3.2/vendor/bundle/jruby/2.3.0/gems/logstash-input-jdbc-4.3.9/lib/logstash/plugin_mixins/value_tracking.rb:29:in `build_last_value_tracker'", "D:/soft/logstash-6.3.2/vendor/bundle/jruby/2.3.0/gems/logstash-input-jdbc-4.3.9/lib/logstash/inputs/jdbc.rb:216:in `register'", "D:/soft/logstash-6.3.2/logstash-core/lib/logstash/pipeline.rb:340:in `register_plugin'", "D:/soft/logstash-6.3.2/logstash-core/lib/logstash/pipeline.rb:351:in `block in register_plugins'", "org/jruby/RubyArray.java:1734:in `each'", "D:/soft/logstash-6.3.2/logstash-core/lib/logstash/pipeline.rb:351:in `register_plugins'", "D:/soft/logstash-6.3.2/logstash-core/lib/logstash/pipeline.rb:498:in `start_inputs'", "D:/soft/logstash-6.3.2/logstash-core/lib/logstash/pipeline.rb:392:in `start_workers'", "D:/soft/logstash-6.3.2/logstash-core/lib/logstash/pipeline.rb:288:in `run'", "D:/soft/logstash-6.3.2/logstash-core/lib/logstash/pipeline.rb:248:in `block in start'"], :thread=>"#<Thread:0x703b95e6 run>"}
[2020-08-20T02:27:41,467][ERROR][logstash.agent ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>,
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack