logstash 支持从一个 ES 集群中读取数据然后写入到另一个 ES 集群,因此可以使用 logstash 进行数据迁移,使用 logstash 进行迁移前,需要注意以下几点:
需要在和云上的 ES 集群相同的 VPC 下创建 服务器,部署 logstash,同时保证该 服务器能够访问到源 ES 集群。
用于部署 logstash 的服务器最好选择比较高的配置
logstash 应该和目标 ES 集群的主版本号相同,例如目标 ES 集群为6.8.2版本,则 logstash 也需要使用6.8版本。
需要特别注意索引 type 的问题,因为 ES 的不同版本对索引 type 的约束不同,跨大版本迁移 ES 集群时可能出现因为索引的 type 而导致写入目标集群失败等的问题。
一个常用的使用 logstash 进行跨集群数据迁移的配置文件如下:
input {
elasticsearch {
hosts => "1.1.1.1:9200"
index => "*"
docinfo => true
size => 5000
scroll => "5m"
}
}
output {
elasticsearch {
hosts => ["[http://2.2.2.2:9200]"]
user => "elastic"
password => "your_password"
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"
}
}
上述配置文件将源 ES 集群的所有索引同步到目标集群中,同时也可以设置只同步指定的索引,利用 logstash 进行迁移的更多功能可查阅 logstash-input-elasticsearch 和 logstash-output-elasticsearch
input {
elasticsearch {
hosts => "1.1.1.1:9200"
index => "es-runlog-2019.11.20"
#查询这个索引前5分钟的5000条数据
query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
size => 5000
scroll => "5m"
docinfo => true
schedule => "* * * * *" #定时任务,每分钟执行一次
}
}
filter {
mutate {
remove_field => ["source", "@version"]
}
}
output {
elasticsearch {
hosts => ["http://2.2.2.2:9200"]
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"
pipeline => "timezone-pipeline"
}
}
每分钟执行一次,从源集群中拉取5分钟前到当前分钟的所有数据,同步到新的集群中;因为查询的粒度为分钟,所以每次执行定时任务查询时会有一部分重叠的数据,所以需要在output中配置document_id参数避免重复写入到新集群中。
实施过程中遇到的问题有:
a.用于运行logstash的机器的规格要比较大,因为logstash比较消耗内存和cpu,机器性能不够,很可能出现数据同步延迟增大
b.可以通过比较新旧集群当天的索引每分钟doc数据量,判断同步的延迟情况,如果延迟较大,可以通过调整logstash配置或者使用更大的机器运行logstash确保同步过程顺利进行
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack