使用 Logstash 将 MySQL 数据同步到 Elasticsearch(版本6.5.1)

学习笔记 马富天 2020-02-12 13:34:24 42 0

【摘要】本文记录一下使用 elasticsearch 官方插件 logstash 实现将 mysql 数据同步到 es 中的具体步骤。

首先说明一下使用 logstash 进行数据同步的优缺点:

优点:

(1)无需开发,只需要安装和配置 logstash 即可;

(2)只要是通过 SQL 查询的数据 logstash 均可以实现数据同步;

(3)每次同步的时候,支持全量同步,也支持按照特定字段(例如递增ID,修改时间)实现增量同步;

(4)自动同步的频率可控,最快同步频率每分钟一次;

(5)作为 ES 官网的固有插件,版本更新迭代快,相对稳定,使用简单。

缺点:

(1)不能实现同步删除操作,即 MySQL 数据物理删除后,ES 中数据仍存在。

本文以 Elasticsearch 6.5.1 版本为例,记录一下使用 MySQL 数据同步的实例。

官方文档:

https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

部署环境

操作系统:windows 10

elasticsearch 版本:elasticsearch-6.5.1

logstash 版本:logstash-6.5.1(必须与 es 版本一致)

java 版本:Logstash requires Java 8 or Java 11(安装后添加 JAVA_HOME 环境变量)

logstash 连接 mysql 的 jar 包:mysql-connector-java-5.1.7-bin.jar

软件下载地址:

elasticsearch:https://www.elastic.co/cn/downloads/past-releases#elasticsearch

logstash:https://www.elastic.co/cn/downloads/past-releases#logstash

java:https://www.java.com/zh_CN/download/win10.jsp

mysql-connector-java 驱动包:http://www.java2s.com/Code/Jar/c/Downloadcloudmysqlconnectorjava517binjar.htm

mysql 数据准备

  1. CREATE TABLE `class` (
  2.   `class_id` int(11) NOT NULL AUTO_INCREMENT,
  3.   `class_name` varchar(32) NOT NULL COMMENT '用户名称',
  4.   PRIMARY KEY (`class_id`)
  5. ) ENGINE=InnoDB AUTO_INCREMENT=35 DEFAULT CHARSET=utf8;
  6. INSERT INTO `class` VALUES ('1', '数据结构与算法');
  7. INSERT INTO `class` VALUES ('2', 'Java');
  8. INSERT INTO `class` VALUES ('3', 'elasticsearch');
  9. CREATE TABLE `student` (
  10.   `id` int(11) NOT NULL AUTO_INCREMENT,
  11.   `class_id` char(1) DEFAULT NULL COMMENT '性别',
  12.   `username` varchar(32) NOT NULL COMMENT '用户名称',
  13.   `birthday` date DEFAULT NULL COMMENT '生日',
  14.   `address` varchar(256) DEFAULT NULL COMMENT '地址',
  15.   PRIMARY KEY (`id`)
  16. ) ENGINE=InnoDB AUTO_INCREMENT=34 DEFAULT CHARSET=utf8;
  17. INSERT INTO `student` VALUES ('1', '2', '王五', '2018-08-28', '上海');
  18. INSERT INTO `student` VALUES ('10', '1', '张三', '2014-07-10', '北京市');
  19. INSERT INTO `student` VALUES ('16', '1', '张小明', '2018-08-16', '河南郑州');
  20. INSERT INTO `student` VALUES ('22', '1', '陈小明', '2018-08-17', '河南郑州');
  21. INSERT INTO `student` VALUES ('24', '1', '张三丰', '2018-08-26', '河南郑州');
  22. INSERT INTO `student` VALUES ('25', '1', '陈小明', '2018-08-14', '河南郑州');
  23. INSERT INTO `student` VALUES ('26', '1', '王五', '2018-08-28', '北京');
  24. INSERT INTO `student` VALUES ('30', '1', '马小跳', '2018-08-28', '西安');
  25. INSERT INTO `student` VALUES ('33', '1', '马富天', '2020-02-11', '福建龙岩');

配置步骤

这里只需要配置 logstash 即可

(1)解压后,进入 bin 目录,并在其中新建目录 bin,将 mysql-connector-java-5.1.7-bin.jar 放进去;

请输入图片名称

(2)在 logstash-6.5.1/bin 目录下,创建配置文件 logstash.conf 和 sql 执行文件:jdbc.sql

请输入图片名称

jdbc.sql 文件中写入需要同步的 sql 语句:

  1. select id,username,class_name,birthday,address from student left join class using(class_id)

logstash.conf 文件内容:

  1. input {
  2.     stdin {
  3.     
  4.     }
  5.     jdbc {
  6.         # mysql 相关的 jdbc 配置,jy 是连接数据库名称
  7.         jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/jy?useUnicode=true&characterEncoding=utf-8&useSSL=false"
  8.         jdbc_user => "root"
  9.         jdbc_password => "root"
  10.         # mysql 连接包位置,注意路劲中不能包含中文
  11.         # 可以是绝对路径,也可以是相对路径
  12.         jdbc_driver_library => "bin/mysql-connector-java-5.1.7-bin.jar"
  13.         jdbc_driver_class => "com.mysql.jdbc.Driver"
  14.         jdbc_paging_enabled => "true"
  15.         jdbc_page_size => "50000"
  16.         
  17.         jdbc_default_timezone =>"Asia/Shanghai"
  18.         # 执行的 sql 文件
  19.         statement_filepath => "jdbc.sql"
  20.         # mysql文件, 也可以直接写SQL语句在此处,如下:
  21.         # statement => "select * from student;"
  22.         # 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,
  23.         # 全部为*默认含义为每分钟都更新
  24.         schedule => "* * * * *"
  25.         # 是否将字段名称转小写
  26.         lowercase_column_names => false
  27.     }
  28. }
  29. output {
  30.     elasticsearch {
  31.         # ES的IP地址及端口
  32.         hosts => ["localhost:9200"]
  33.         # 索引名称 可自定义
  34.         index => "test"
  35.         # 需要关联的数据库中有有一个id字段,对应类型中的id
  36.         document_id => "%{id}"
  37.         document_type => "test"
  38.     }
  39.     # 这里输出调试,正式运行时可以注释掉
  40.     stdout {
  41.         # JSON格式输出
  42.         codec => json_lines
  43.     }
  44. }

(3)执行 logstash,在命令行中进入 bin ,执行命令:logstash -f logstash.conf

请输入图片名称

执行成功如下图所示:

请输入图片名称请输入图片名称

执行后,每一分钟同步一次。

(4)执行命令可能会有错误提示:Unrecognized VM option 'UseParNewGC'

解决措施:注释掉 logstash-6.5.1/config/jvm.options 配置文件中:

# -XX:+UseParNewGC

请输入图片名称

以上就是完整的针对 logstash 同步 mysql 数据到 es 中的流程,最核心的部分就是 logstash.conf 配置文件的设置,上述例子中只是对单条 sql 进行同步,同时也就是只能同步单张表,下篇文章对多张表的同步进行相关简介。

另外是时区问题,会发现写入的时区不一样,这个问题之后再讨论。

最后小提示:logstash 的配置文件编码必须是 UTF-8 格式,不然会报错。

版权归 马富天个人博客 所有

本文标题:《使用 Logstash 将 MySQL 数据同步到 Elasticsearch(版本6.5.1)》

本文链接地址:http://www.mafutian.com/438.html

转载请务必注明出处,小生将不胜感激,谢谢! 喜欢本文或觉得本文对您有帮助,请分享给您的朋友 ^_^

0

0

上一篇《 MySQL 索引在 in 语句中失效的情况 》 下一篇《 logstash 同步 mysql 多张表到 es 》

暂无评论

评论审核未开启
表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情 表情
验证码

TOP10

  • 浏览最多
  • 评论最多