iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >通过logstash实现mysql与es的双向数据同步
  • 844
分享到

通过logstash实现mysql与es的双向数据同步

logstashmysqleselasticSearchlinux 2023-09-11 09:09:23 844人浏览 安东尼
摘要

参考题目 一种基于Mysql和elasticsearch的数据同步方法及系统基于mysql和Elasticsearch的数据同步方法一种基于Mysql和Elasticsearch的数据同步系统基于MySQL和Elasticsearch

参考题目

  1. 一种基于Mysqlelasticsearch的数据同步方法及系统
  2. 基于mysql和Elasticsearch的数据同步方法
  3. 一种基于Mysql和Elasticsearch的数据同步系统
  4. 基于MySQL和Elasticsearch的数据同步技术

目录

1【理论调研】

方案1:使用Logstash实现数据同步

方案2:使用Canal实现数据同步

方案3:使用Debezium实现数据同步

使用其他工具

2【使用Logstash实现MySQL和ES之间的双向数据同步】

2.0【MySQL测试数据库sql导入代码】

2.1【Logstash实现MySQL数据同步至ES】

2.2【Logstash实现ES数据同步至MySQL】

2.2.1【Bug记录】

2.2.2【参考文章】


1【理论调研】

实现MySQL和ES的双向数据同步,可以考虑使用以下几种解决方案:

实现MySQL和Elasticsearch(ES)之间的双向数据同步,需要使用一些工具和技术。以下是一些可能的方法:

方案1:使用Logstash实现数据同步

Logstash是一种流处理工具,可以从不同的来源获取数据并将其转换为指定格式输出到目标存储中,它支持从MySQL数据库读取数据,并将数据写入ES中,也可以从ES中读取数据并将数据写入MySQL数据库中。使用Logstash实现MySQL和ES的双向数据同步,可以按照以下步骤进行:

  1. 在MySQL和ES上安装Logstash;
  2. 配置MySQL和ES的连接信息,包括主机地址、端口、用户名、密码等;
  3. 配置Logstash的输入和输出插件,从MySQL中读取数据并写入ES中,同时从ES中读取数据并写入MySQL中;
  4. 启动Logstash并监控同步过程。

Logstash是一个流处理引擎,可以轻松地将数据从MySQL和ES之间传输。使用Logstash,您可以轻松地将MySQL表的数据导入到ES中,也可以将ES中的数据写回MySQL表中。您可以使用以下配置文件将数据从MySQL同步到ES:

input {  jdbc {    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"    jdbc_user => "myuser"    jdbc_password => "mypassword"    jdbc_driver_library => "/path/to/mysql-connector-java.jar"    jdbc_driver_class => "com.mysql.jdbc.Driver"    statement => "SELECT * FROM mytable"  }}output {  elasticsearch {    hosts => ["localhost:9200"]    index => "myindex"    document_id => "%{id}"  }}

这将从MySQL的“mytable”表中选择所有行,并将它们写入名为“myindex”的ES索引中。

如果您想将ES中的数据写回MySQL表中,您可以使用类似以下的配置文件:

input {  elasticsearch {    hosts => ["localhost:9200"]    index => "myindex"    query => '{"query": {"match_all": {}}}'    scroll => "5m"    docinfo => true  }}output {  jdbc {    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"    jdbc_user => "myuser"    jdbc_password => "mypassword"    jdbc_driver_library => "/path/to/mysql-connector-java.jar"    jdbc_driver_class => "com.mysql.jdbc.Driver"    statement => "UPDATE mytable SET myfield = ? WHERE id = ?"    prepared_statement_bind_values => ["%{myfield}", "%{[@metadata][_id]}"]  }}

这将从名为“myindex”的ES索引中选择所有文档,并将它们写回名为“mytable”的MySQL表中。

方案2:使用Canal实现数据同步

Canal是阿里巴巴开源的一款基于数据库增量日志解析,提供增量数据订阅和消费的组件,它支持从MySQL中读取增量数据,并将数据写入ES中,同时支持从ES中读取数据并将数据写入MySQL中。使用Canal实现MySQL和ES的双向数据同步,可以按照以下步骤进行:

  1. 在MySQL和ES上安装Canal;
  2. 配置MySQL和ES的连接信息,包括主机地址、端口、用户名、密码等;
  3. 配置Canal的实例,包括MySQL的binlog信息、ES的索引信息等;
  4. 启动Canal并监控同步过程。

需要注意的是,在使用Logstash或Canal进行数据同步时,可能会出现数据类型不匹配、数据格式错误、数据丢失等问题,需要根据具体情况进行调整和优化。同时,为了确保数据同步的实时性和准确性,可以考虑增加监控和告警机制。

方案3:使用Debezium实现数据同步

Debezium是一个开源的分布式平台,可在数据源和目标之间实现实时数据流。它支持MySQL和ES之间的数据同步,并支持双向同步。使用Debezium,您可以在MySQL和ES之间实时同步数据更改。您可以按照以下步骤使用Debezium进行双向数据同步:

  • 下载并安装Debezium
  • 配置Debezium以监视MySQL表的更改
  • 配置Debezium以将更改写入ES
  • 配置Debezium以监视ES的更改
  • 配置Debezium以将更改写回MySQL

使用其他工具

除了Logstash和Debezium之外,还有一些其他工具可用于MySQL和ES之间的数据同步。例如,您可以使用StreamSets Data Collector或Apache Nifi来将数据从MySQL导入到ES,并将数据从ES写回MySQL。您还可以编写自己的脚本来执行此操作。无论您选择哪种方法,确保您的同步逻辑能够处理。

2【使用Logstash实现MySQL和ES之间的双向数据同步】

软件版本:

  1. logstash -f ../config/newsManager/mysql2es.conf
  2. logstash -f ../config/newsManager/es2mysql.conf

2.0【MySQL测试数据库sql导入代码】

  1. MySQL数据库名称:news_manager
  2. MySQL数据库版本:5.5.40
;;;;;;CREATE DATABASE `news_manager` ;USE `news_manager`;DROP TABLE IF EXISTS `item_user`;CREATE TABLE `item_user` (  `item_user_id` int(11) NOT NULL AUTO_INCREMENT,  `user_id` int(11) DEFAULT NULL,  `item_id` int(11) DEFAULT NULL,  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',  PRIMARY KEY (`item_user_id`),  KEY `FK_Reference_2` (`user_id`),  KEY `FK_Reference_3` (`item_id`),  CONSTRaiNT `FK_Reference_2` FOREIGN KEY (`user_id`) REFERENCES `user_info` (`user_id`),  CONSTRAINT `FK_Reference_3` FOREIGN KEY (`item_id`) REFERENCES `news_item` (`item_id`)) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;insert  into `item_user`(`item_user_id`,`user_id`,`item_id`,`create_time`,`update_time`,`status`) values (1,1,2,'2020-11-23 11:24:16','2020-11-25 10:27:54',1),(2,2,4,NULL,'2020-11-25 09:38:17',1),(3,1,1,'2020-11-24 09:19:58','2020-11-25 09:38:21',1),(5,1,18,NULL,'2020-11-25 09:44:16',1),(6,1,27,'2020-11-25 11:11:35','2020-11-25 11:11:35',1),(7,1,28,'2020-11-25 11:17:59','2020-11-25 11:17:59',1),(8,1,29,'2020-11-25 11:29:14','2020-11-25 11:29:14',1),(9,1,30,'2020-11-25 11:30:54','2020-11-25 11:30:54',1),(10,1,31,'2020-11-25 11:36:51','2020-11-25 11:36:51',1),(11,1,32,'2020-11-25 16:26:23','2020-11-25 16:26:23',1),(12,1,33,'2020-11-25 16:26:37','2020-11-25 16:26:37',1),(13,1,34,'2020-11-26 10:01:29','2020-11-26 10:01:29',1),(14,1,35,'2020-11-26 10:28:53','2020-11-26 10:28:53',1);DROP TABLE IF EXISTS `logs_info`;CREATE TABLE `logs_info` (  `logs_id` int(11) NOT NULL AUTO_INCREMENT,  `user_id` int(11) DEFAULT NULL,  `logs_content` text COLLATE utf8mb4_hungarian_ci,  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',  PRIMARY KEY (`logs_id`),  KEY `FK_Reference_1` (`user_id`),  CONSTRAINT `FK_Reference_1` FOREIGN KEY (`user_id`) REFERENCES `user_info` (`user_id`)) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;insert  into `logs_info`(`logs_id`,`user_id`,`logs_content`,`create_time`,`update_time`) values (1,1,NULL,NULL,'2020-11-24 09:27:05'),(2,2,NULL,NULL,'2020-11-24 09:27:12'),(3,4,NULL,NULL,'2020-11-23 11:29:06'),(14,1,'woshishenren','2020-11-24 09:24:52','2020-11-24 09:24:52'),(15,1,'woshishenren','2020-11-24 09:25:58','2020-11-24 09:25:58');DROP TABLE IF EXISTS `news_info`;CREATE TABLE `news_info` (  `new_id` int(11) NOT NULL AUTO_INCREMENT,  `item_id` int(11) DEFAULT NULL,  `news_title` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,  `news_image` varchar(255) COLLATE utf8mb4_hungarian_ci DEFAULT NULL,  `news_content` text COLLATE utf8mb4_hungarian_ci,  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',  PRIMARY KEY (`new_id`),  KEY `FK_Reference_4` (`item_id`),  CONSTRAINT `FK_Reference_4` FOREIGN KEY (`item_id`) REFERENCES `news_item` (`item_id`)) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;insert  into `news_info`(`new_id`,`item_id`,`news_title`,`news_image`,`news_content`,`create_time`,`update_time`) values (1,2,'蓝桥杯比赛',NULL,NULL,NULL,'2020-11-23 09:27:39'),(2,3,'新学期学费',NULL,NULL,NULL,'2020-11-23 09:28:10'),(3,1,'拔河比赛',NULL,'拔河比赛要使劲!!!','2020-11-25 14:57:28','2020-11-25 14:57:32'),(4,18,'街舞比赛',NULL,'一起摇摆~','2020-11-25 15:54:09','2020-11-25 15:54:11'),(10,27,'数学建模',NULL,'一起加油!','2020-11-25 16:10:02','2020-11-25 22:17:19'),(11,29,'班班唱',NULL,'《走向复兴》','2020-11-25 16:12:23','2020-11-25 16:12:23'),(12,1,'篮球比赛',NULL,'冲冲冲~','2020-11-25 16:13:04','2020-11-25 16:13:04'),(13,1,'NECCS',NULL,'冲呀~','2020-11-25 16:27:22','2020-11-26 08:38:03'),(14,18,'卓见杯',NULL,'啦啦啦~','2020-11-25 17:41:32','2020-11-25 22:17:56'),(15,33,'动则升阳',NULL,'年轻不养生,年老养医生!','2020-11-26 00:12:42','2020-11-26 00:12:42'),(16,33,'11月26日',NULL,'筑基修士','2020-11-26 10:02:20','2020-11-26 10:02:20'),(17,35,'大家好',NULL,'333','2020-11-26 10:29:35','2020-11-26 10:29:35'),(18,35,'大家好!!!',NULL,'333','2020-11-26 10:29:45','2020-11-26 10:29:45'),(19,35,'我是新增数据!',NULL,'我是新增数据!','2023-03-15 16:43:01','2023-03-15 16:43:02');DROP TABLE IF EXISTS `news_item`;CREATE TABLE `news_item` (  `item_id` int(11) NOT NULL AUTO_INCREMENT,  `item_name` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',  PRIMARY KEY (`item_id`)) ENGINE=InnoDB AUTO_INCREMENT=36 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;insert  into `news_item`(`item_id`,`item_name`,`create_time`,`update_time`,`status`) values (1,'呵呵哒','2020-11-24 15:47:00','2020-11-26 10:28:39',1),(2,'党支部','2020-11-24 15:47:03','2020-11-25 14:44:31',0),(3,'分团委','2020-11-24 15:47:05','2020-11-25 14:43:54',1),(4,'院团委','2020-11-24 15:47:08','2020-11-25 14:44:38',1),(5,'111','2020-11-23 15:22:54','2020-11-25 14:45:55',1),(6,'学生会','2020-11-24 09:27:36','2020-11-25 14:46:01',1),(8,'党支部','2020-11-24 13:51:13','2020-11-25 14:46:07',1),(18,'党支部','2020-11-25 09:11:51','2020-11-25 15:49:06',1),(19,'院团委','2020-11-25 10:42:54','2020-11-25 14:46:16',1),(20,'111','2020-11-25 10:54:12','2020-11-25 14:46:19',1),(21,'学生会','2020-11-25 10:56:21','2020-11-25 14:46:35',1),(22,'党支部','2020-11-25 10:57:35','2020-11-25 14:46:43',1),(23,'分团委','2020-11-25 11:00:20','2020-11-25 14:46:48',1),(24,'院团委','2020-11-25 11:00:47','2020-11-25 14:46:55',1),(25,'qweqwe','2020-11-25 11:01:37','2020-11-25 11:01:37',1),(26,'eqweqweqwe','2020-11-25 11:01:53','2020-11-25 11:01:53',1),(27,'分团委','2020-11-25 11:11:35','2020-11-25 15:49:18',1),(28,'sadsads','2020-11-25 11:17:59','2020-11-25 11:18:40',0),(29,'院团委','2020-11-25 11:29:13','2020-11-25 15:49:25',1),(30,'789','2020-11-25 11:30:54','2020-11-25 11:37:19',0),(31,'zyk','2020-11-25 11:36:51','2020-11-25 11:37:19',0),(32,'委员会','2020-11-25 16:26:23','2020-11-26 08:37:48',0),(33,'委员会~~~','2020-11-25 16:26:37','2020-11-26 10:01:40',1),(34,'演示~','2020-11-26 10:01:29','2020-11-26 10:01:33',0),(35,'筑基修士!!!修改!','2020-11-26 10:28:53','2023-03-15 16:44:39',1);DROP TABLE IF EXISTS `user_info`;CREATE TABLE `user_info` (  `user_id` int(11) NOT NULL AUTO_INCREMENT,  `user_name` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,  `user_pwd` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',  PRIMARY KEY (`user_id`)) ENGINE=InnoDB AUTO_INCREMENT=121 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;insert  into `user_info`(`user_id`,`user_name`,`user_pwd`,`create_time`,`update_time`,`status`) values (1,'宋书航','1','2020-11-23 09:30:16','2020-11-25 22:16:25',1),(2,'雨柔子','1','2020-11-23 11:25:41','2020-11-25 22:16:25',1),(4,'王五','1','2020-11-23 11:25:58','2020-11-25 22:16:26',0),(5,'赵柳','1','2020-11-23 11:26:12','2020-11-25 22:16:26',0),(8,'田七','1','2020-11-23 11:26:29','2020-11-25 22:16:27',0),(9,'田七','1','2020-11-23 15:03:23','2020-11-25 22:16:28',0),(10,'田七','1','2020-11-23 15:03:43','2020-11-25 22:16:28',0),(11,'戴沐白','1','2020-11-24 10:45:06','2020-11-25 22:16:29',1),(12,'张小凡','1','2020-11-24 10:45:29','2020-11-25 22:16:29',1),(13,'userName2','1','2020-11-24 10:45:29','2020-11-25 22:16:30',0),(15,'碧瑶','1','2020-11-24 10:45:29','2020-11-25 22:16:31',1),(16,'赵恋凡','1','2020-11-24 10:45:29','2020-11-25 22:16:31',1),(17,'李长寿','1','2020-11-24 10:45:29','2020-11-25 22:16:32',1),(18,'蓝梦娥','1','2020-11-24 10:45:29','2020-11-25 22:16:33',1),(22,'路明非','123456','2020-11-24 10:45:29','2020-11-25 17:44:31',1),(23,'楚子航','123456','2020-11-24 10:45:29','2020-11-25 22:14:26',1),(33,'乔微尼','123456','2020-11-24 10:45:29','2020-11-25 23:05:48',1),(97,'userName86','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(98,'userName87','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(99,'userName88','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(100,'userName89','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(101,'2020年好运来~','123456','2020-11-24 10:45:32','2020-11-26 10:01:06',1),(102,'333','123456','2020-11-24 10:45:32','2020-11-26 10:28:14',1),(103,'666','888','2020-11-24 10:45:32','2020-11-26 10:28:26',1),(104,'userName93','123456','2020-11-24 10:45:32','2020-11-26 10:00:54',0),(105,'userName94','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(106,'userName95','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(107,'userName96','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(108,'userName97','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(109,'userName98','123456','2020-11-24 10:45:32','2020-11-25 10:12:58',0),(110,'userName99','123456','2020-11-24 10:45:32','2020-11-25 10:12:58',0),(111,'userName100','123456','2020-11-24 10:45:33','2020-11-25 10:12:58',0),(115,'萧潜','1','2020-11-25 23:00:16','2020-11-25 23:00:16',1),(116,'演示视频','haha','2020-11-26 10:00:33','2020-11-26 10:27:37',0),(117,'啦啦啦','1','2020-11-26 10:27:14','2020-11-26 10:27:37',0),(118,'演示视频','222','2020-11-26 10:27:23','2020-11-26 10:27:37',0),(119,'实训小组hyy','111','2020-11-26 14:37:14','2020-11-26 14:37:14',1),(120,'我是新增数据!修改!','1111','2023-04-18 20:38:41','2023-04-18 20:48:13',1);;;;;

2.1【Logstash实现MySQL数据同步至ES】

先启动es,在启动logstash。

大数据周会-本周学习内容总结06【Linux启动ELK步骤】

input {stdin {    }    jdbc { # 01# 配置MySQL数据库链接,变量为数据库名jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"# 配置MySQL数据库用户名和密码jdbc_user => "root"jdbc_password => "root"# MySQL驱动jar包存放位置jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"# MySQL驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"# 执行的sql,文件路径+名称:statement_filepath# statement_filepath => ""# 要执行的sql语句statement => "select * from item_user"# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新schedule => "* * * * *"# 索引类型type => "item_user"}    jdbc { # 02# 配置MySQL数据库链接,变量为数据库名jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"# 配置MySQL数据库用户名和密码jdbc_user => "root"jdbc_password => "root"# MySQL驱动jar包存放位置jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"# MySQL驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"# 执行的sql,文件路径+名称:statement_filepath# statement_filepath => ""# 要执行的sql语句statement => "select * from logs_info"# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新schedule => "* * * * *"# 索引类型type => "logs_info"}    jdbc { # 03# 配置MySQL数据库链接,变量为数据库名jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"# 配置MySQL数据库用户名和密码jdbc_user => "root"jdbc_password => "root"# MySQL驱动jar包存放位置jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"# MySQL驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"# 执行的sql,文件路径+名称:statement_filepath# statement_filepath => ""# 要执行的sql语句statement => "select * from news_item"# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新schedule => "* * * * *"# 索引类型type => "news_item"}    jdbc { # 04# 配置MySQL数据库链接,变量为数据库名jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"# 配置MySQL数据库用户名和密码jdbc_user => "root"jdbc_password => "root"# MySQL驱动jar包存放位置jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"# MySQL驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"# 执行的sql,文件路径+名称:statement_filepath# statement_filepath => ""# 要执行的sql语句statement => "select * from news_info"# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新schedule => "* * * * *"# 索引类型type => "news_info"}    jdbc { # 05# 配置MySQL数据库链接,变量为数据库名jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"# 配置MySQL数据库用户名和密码jdbc_user => "root"jdbc_password => "root"# MySQL驱动jar包存放位置jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"# MySQL驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"# 执行的sql,文件路径+名称:statement_filepath# statement_filepath => ""# 要执行的sql语句statement => "select * from user_info"# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新schedule => "* * * * *"# 索引类型type => "user_info"}}filter {    json {        source => "message"        remove_field => ["message"]    }}output {if[type] == "item_user" { # 01elasticsearch {hosts => ["1.2.3.4:9200"]index => "test_item_user"# document_id => "%{id}"}}if[type] == "logs_info" { # 02elasticsearch {hosts => ["1.2.3.4:9200"]index => "test_logs_info"# document_id => "%{id}"}}if[type] == "news_item" { # 03elasticsearch {hosts => ["1.2.3.4:9200"]index => "test_news_item"# document_id => "%{id}"}}if[type] == "news_info" { # 04elasticsearch {hosts => ["1.2.3.4:9200"]index => "test_news_info"# document_id => "%{id}"}}if[type] == "user_info" { # 05elasticsearch {hosts => ["1.2.3.4:9200"]index => "test_user_info"# document_id => "%{id}"}}    stdout {        codec => JSON_lines    }}

2.2【Logstash实现ES数据同步至MySQL】

  1. logstash-plugin install --no-verify logstash-output-jdbc   # Logstash安装插件logstash-output-jdbc
  2. logstash-plugin list   # 查看Logstash已安装的插件

【es与mysql双向同步-通过logstash将es同步至mysql】功能已实现,但是只进行了简单测试。问题包括但不限于:中文乱码、时间戳字段插入错误等。

input {elasticsearch {hosts => ["hadoop100:9200"]index => "test_user_info"query => '{ "query": { "match_all": {} } }'schedule => "* * * * *"}}output {jdbc {driver_jar_path => "/opt/jar/mysql-connector-java-5.1.31.jar"driver_class => "com.mysql.jdbc.Driver"# user => "root"# password => "root"# "jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/douyin?autoReconnect=true&user=xxxx@xxxx&password=xxxxx"connection_string => "jdbc:mysql://1.2.3.4:3306/school_matriculate?autoReconnect=true&user=root&password=root&useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8"# statement => ["insert into tb_videos(md5, Id, view, timestamp) values(?,?,?,?)","[md5]", "[Id]", "[view]", "[timestamp]"]# statement => ["INSERT INTO user_info (user_name, user_pwd, create_time, update_time, status) VALUES (?, ?, ?, ?, ?)", "[user_name]", "[user_pwd]", "[create_time]", "[update_time]", "[status]"]statement => ["INSERT INTO user_info (user_name, user_pwd) VALUES (?, ?)", "[user_name]", "[user_pwd]"]}}

2.2.1【Bug记录】

Unknown setting 'jdbc_user' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'prepared_statement_bind_values' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_passWord' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_driver_library' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_connection_string' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_driver_class' for jdbc

[2023-04-19T20:27:23,669][ERROR][logstash.agent           ] Failed to execute action

java.sql.SQLException: Access denied for user ''@'upward' (using password: NO)

        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1094) ~[mysql-connector-java-5.1.31.jar:?]

[2023-04-19T21:48:53,751][ERROR][com.zaxxer.hikari.pool.HikariPool][main] HikariPool-1 - Exception during pool initialization.

com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up.

2.2.2【参考文章】

  1. Java:Logstash如何安装插件logstash-output-jdbc_netyeaxi的博客-CSDN博客
  2. logstash的logstash-output-jdbc插件安装_logstash output jdbc_&捕风的汉子&的博客-CSDN博客
  3. logstash-output-jdbc使用
  4. https://GitHub.com/theangryangel/logstash-output-jdbc

  5. Https://www.elastic.co/guide/en/logstash/current/index.html

来源地址:https://blog.csdn.net/weixin_44949135/article/details/130215181

您可能感兴趣的文档:

--结束END--

本文标题: 通过logstash实现mysql与es的双向数据同步

本文链接: https://www.lsjlt.com/news/402891.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • sql怎么查看表的索引
    通过查询系统表,可以获取表的索引信息,包括索引名称、是否唯一、索引类型、索引列和行数。常用系统表有:mysql 的 information_schema.statistics、postg...
    99+
    2024-05-14
    mysql oracle
  • sql怎么查看索引
    您可以使用 sql 通过以下方法查看索引:show indexes 语句:显示表中定义的索引列表及其信息。explain 语句:显示查询计划,其中包含用于执行查询的索引。informat...
    99+
    2024-05-14
  • sql怎么查看存储过程
    如何查看 sql 存储过程的源代码:使用 show create procedure 语句直接获取创建脚本。查询 information_schema.routines 表的 routi...
    99+
    2024-05-14
  • sql怎么查看视图表
    要查看视图表,可以使用以下步骤:使用 select 语句获取视图中的数据。使用 desc 语句查看视图的架构。使用 explain 语句分析视图的执行计划。使用 dbms 提供...
    99+
    2024-05-14
    oracle python
  • sql怎么查看创建的视图
    可以通过sql查询查看已创建的视图,具体步骤包括:连接到数据库并执行查询select * from information_schema.views;查询结果将显示视图的名称、...
    99+
    2024-05-14
    mysql
  • sql怎么用循环语句实现查询
    可以通过 do 和 while 语句创建循环,并在循环内执行查询,详细步骤包括:定义循环变量设置循环初始值循环执行查询更新循环变量执行查询循环退出条件 SQL 中使用循环语句实现查询 ...
    99+
    2024-05-14
  • sql怎么用代码修改表中数据
    通过 sql 代码修改表中数据的方法包括:修改单个记录:使用 update 语句设置列值并指定条件。修改多条记录:在 update 语句中指定多个条件来修改满足条件的所有记录。增加新列:...
    99+
    2024-05-14
  • sql怎么用命令创建数据库
    在 sql 中使用 create database 命令创建新数据库,其语法包含以下步骤:指定数据库名称。指定数据库文件和日志文件的位置(可选)。指定数据库大小、最大大小和文件增长(可选...
    99+
    2024-05-14
  • sql怎么用身份证提取年龄
    sql 中提取身份证号码中的年龄的方法:提取出生日期部分(身份证号码中第 7-14 位);使用 to_date 函数转换为日期格式;使用 extract 函数计算与当前日期之间的年差。 ...
    99+
    2024-05-14
  • sql怎么看字段长度
    有两种方法可查看 sql 中的字段长度:使用 information_schema 架构,其中包含元数据信息,可用于查询字段长度。使用内建函数,如 length(),其适用于字符串数据类...
    99+
    2024-05-14
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作