iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >全量同步多张db表到ES同一个索引
  • 523
分享到

全量同步多张db表到ES同一个索引

摘要

一、演示场景: 演示的场景主要是解决MySQL多张业务大表进行多表join查询效率低下的问题。通过把MySQL的多张大表的数据同步到同一个ES索引中。(也就是有多表字段合并到es一张宽表来解决MySQL多表join效率低下的问题) 1.1、

全量同步多张db表到ES同一个索引[数据库教程]

一、演示场景:

演示的场景主要是解决MySQL多张业务大表进行多表join查询效率低下的问题。
通过把MySQL的多张大表的数据同步到同一个ES索引中。(也就是有多表字段合并到es一张宽表来解决MySQL多表join效率低下的问题)

1.1、演示环境

自建Mysql服务5.7.22

ES单实例版本6.2.4

服务器python环境2.7.5

部署同步服务mysqlmom程序

具体安装部署此处忽略。有需要可以查看本博客找

二、Mysqlmom具体配置文件

[[email protected] ~]# cat /data1/soft/mysqlsmom01/test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# 修改数据库连接
CONNECTioN = {
    ‘host‘: ‘172.16.0.197‘,
    ‘port‘: 3306,
    ‘user‘: ‘click_rep‘,
    ‘passwd‘: ‘Jwtest123456‘
}
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 50000
# 修改elasticsearch节点
#nodeS = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
# 同步stdb01.test03到es:
    {
        "stream": {
            "database": "test_db",  # 在此数据库执行sql语句
            "sql": "select * from test01",  # 将该sql语句选中的数据同步到 elasticsearch
            # "pk": {"field": "id", "type": "char"}  # 当主键id的类型是字符串时
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"only_fields": {"fields": ["id", "username"]}}, # 只同步 id 和 username字段
                    {"set_id": {"field": "id"}}  # 默认设置 id字段的值 为elasticsearch中的文档id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test01_company_index",   # 设置 index
                        "type": "test01",          # 设置 type
                        "nodes": NODES
                    }
                }
            }
        ]
    },
    {
        "stream": {
            "database": "test_db",  # 在此数据库执行sql语句
            "sql": "select * from company_staff",  # 将该sql语句选中的数据同步到 elasticsearch
            # "pk": {"field": "id", "type": "char"}  # 当主键id的类型是字符串时
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"only_fields": {"fields": ["id", "company_name", "company_staff", "channel", "url"]}}, # 只同步 id 和 username字段
                    {"set_id": {"field": "id"}}  # 默认设置 id字段的值 为elasticsearch中的文档id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test01_company_index",   # 设置 index
                        "type": "test01",          # 设置 type
                        "nodes": NODES
                    }
                }
            }
        ]
    }

]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"

?

三、mysql测试表建表语句,表数据以及同步程序启动

mysql测试表建表语句和表数据如下:

[email protected] 16:16:  [test_db]> show create table company_staffG
*************************** 1. row ***************************
       Table: company_staff
Create Table: CREATE TABLE `company_staff` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT ‘id‘,
  `company_name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT ‘‘ COMMENT ‘公司名‘,
  `company_staff` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT ‘‘ COMMENT ‘人员规模‘,
  `channel` varchar(10) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT ‘‘ COMMENT ‘来源‘,
  `url` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT ‘‘ COMMENT ‘url‘,
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间‘,
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ‘更新时间‘,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT=‘namelist人员规模表‘
1 row in set (0.00 sec)

[email protected] 16:17:  [test_db]> select * from company_staff;
+----+-----------------------------------------------------------------------------+---------------+-------------+-----------------------------------------+---------------------+---------------------+
| id | company_name                                                                | company_staff | channel     | url                                     | create_time         | update_time         |
+----+-----------------------------------------------------------------------------+---------------+-------------+-----------------------------------------+---------------------+---------------------+
|  1 | 永兴东润(中国)服饰有限公司北京海淀第四儿童服饰店                          | liepin        | 100-499人   | https://www.liepin.com/company/8321725/ | 2021-06-19 17:21:57 | 2021-06-19 17:21:57 |
|  2 | 东(中国)服饰有限公司北京海淀第四儿童服饰店                                 | liepin        | 100-499人   | https://www.liepin.com/company/8321725/ | 2021-06-19 17:21:57 | 2021-06-19 17:21:57 |
|  3 | 永兴东润(中国)服饰有限公司北京海淀第四儿童服饰店                           | liepin        | 100-499人   | https://www.liepin.com/company/8321725/ | 2021-06-19 17:21:57 | 2021-06-19 17:21:57 |
|  4 | 润(中国)                                                                   | liepin        | 100-499人   | https://www.liepin.com/company/8321725/ | 2021-06-19 17:21:57 | 2021-06-19 17:21:57 |
+----+-----------------------------------------------------------------------------+---------------+-------------+-----------------------------------------+---------------------+---------------------+
4 rows in set (0.00 sec)

[email protected] 16:17:  [test_db]> show create table test01G
*************************** 1. row ***************************
       Table: test01
Create Table: CREATE TABLE `test01` (
  `id` int(8) NOT NULL AUTO_INCREMENT,
  `username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
1 row in set (0.00 sec)


[email protected] 16:17:  [test_db]> select * from test01;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-03 23:51:17 |
|  2 | php      | xiao       | 2021-07-03 23:53:36 |
|  3 | fix      | xiao       | 2021-07-03 23:53:49 |
|  4 | java     | bai        | 2021-07-03 23:54:01 |
+----+----------+------------+---------------------+
4 rows in set (0.00 sec)



[[email protected] mysqlsmom01]# mom run -c ./test_mom/binlog_config.py >mysqlmom.log 2>&1 &
2021-08-08 16:23:57,873 root         INFO     {"username": "Tomcat", "_id": 1, "id": 1}
2021-08-08 16:23:57,874 root         INFO     {"username": "PHP", "_id": 2, "id": 2}
2021-08-08 16:23:57,874 root         INFO     {"username": "fix", "_id": 3, "id": 3}
2021-08-08 16:23:57,874 root         INFO     {"username": "java", "_id": 4, "id": 4}
2021-08-08 16:23:57,975 elasticsearch INFO     POST Http://172.16.0.247:9999/_bulk [status:200 request:0.101s]
2021-08-08 16:23:57,979 root         INFO     {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6c38u5174u4e1cu6da6uff08u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 1, "id": 1, "channel": "100-499u4eba "}
2021-08-08 16:23:57,979 root         INFO     {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u4e1c(u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 2, "id": 2, "channel": "100-499u4eba "}
2021-08-08 16:23:57,979 root         INFO     {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6c38u5174u4e1cu6da6(u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 3, "id": 3, "channel": "100-499u4eba "}
2021-08-08 16:23:57,979 root         INFO     {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6da6(u4e2du56fduff09", "_id": 4, "id": 4, "channel": "100-499u4eba "}
2021-08-08 16:23:58,007 elasticsearch INFO     POST http://172.16.0.247:9999/_bulk [status:200 request:0.027s]

real	0m0.637s
user	0m0.447s
sys	0m0.061s


全量同步启动命令如下:

?

[[email protected] mysqlsmom01]# mom run -c ./test_mom/init_config.py >mysqlmom.log 2>&1 &
2021-08-08 16:23:57,873 root         INFO     {"username": "tomcat", "_id": 1, "id": 1}
2021-08-08 16:23:57,874 root         INFO     {"username": "php", "_id": 2, "id": 2}
2021-08-08 16:23:57,874 root         INFO     {"username": "fix", "_id": 3, "id": 3}
2021-08-08 16:23:57,874 root         INFO     {"username": "java", "_id": 4, "id": 4}
2021-08-08 16:23:57,975 elasticsearch INFO     POST http://172.16.0.247:9999/_bulk [status:200 request:0.101s]
2021-08-08 16:23:57,979 root         INFO     {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6c38u5174u4e1cu6da6uff08u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 1, "id": 1, "channel": "100-499u4eba "}
2021-08-08 16:23:57,979 root         INFO     {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u4e1c(u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 2, "id": 2, "channel": "100-499u4eba "}
2021-08-08 16:23:57,979 root         INFO     {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6c38u5174u4e1cu6da6(u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 3, "id": 3, "channel": "100-499u4eba "}
2021-08-08 16:23:57,979 root         INFO     {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6da6(u4e2du56fduff09", "_id": 4, "id": 4, "channel": "100-499u4eba "}
2021-08-08 16:23:58,007 elasticsearch INFO     POST http://172.16.0.247:9999/_bulk [status:200 request:0.027s]

real	0m0.637s
user	0m0.447s
sys	0m0.061s

?

图示如下:
技术分享图片

到此时,全量同步多张db表数据到ES同一个索引演示完成

全量同步多张db表到ES同一个索引

原文:https://blog.51cto.com/wujianwei/3314093

您可能感兴趣的文档:

--结束END--

本文标题: 全量同步多张db表到ES同一个索引

本文链接: https://www.lsjlt.com/news/8636.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • oracle怎么查询当前用户所有的表
    要查询当前用户拥有的所有表,可以使用以下 sql 命令:select * from user_tables; 如何查询当前用户拥有的所有表 要查询当前用户拥有的所有表,可以使...
    99+
    2024-05-14
    oracle
  • oracle怎么备份表中数据
    oracle 表数据备份的方法包括:导出数据 (exp):将表数据导出到外部文件。导入数据 (imp):将导出文件中的数据导入表中。用户管理的备份 (umr):允许用户控制备份和恢复过程...
    99+
    2024-05-14
    oracle
  • oracle怎么做到数据实时备份
    oracle 实时备份通过持续保持数据库和事务日志的副本来实现数据保护,提供快速恢复。实现机制主要包括归档重做日志和 asm 卷管理系统。它最小化数据丢失、加快恢复时间、消除手动备份任务...
    99+
    2024-05-14
    oracle 数据丢失
  • oracle怎么查询所有的表空间
    要查询 oracle 中的所有表空间,可以使用 sql 语句 "select tablespace_name from dba_tablespaces",其中 dba_tabl...
    99+
    2024-05-14
    oracle
  • oracle怎么创建新用户并赋予权限设置
    答案:要创建 oracle 新用户,请执行以下步骤:以具有 create user 权限的用户身份登录;在 sql*plus 窗口中输入 create user identified ...
    99+
    2024-05-14
    oracle
  • oracle怎么建立新用户
    在 oracle 数据库中创建用户的方法:使用 sql*plus 连接数据库;使用 create user 语法创建新用户;根据用户需要授予权限;注销并重新登录以使更改生效。 如何在 ...
    99+
    2024-05-14
    oracle
  • oracle怎么创建新用户并赋予权限密码
    本教程详细介绍了如何使用 oracle 创建一个新用户并授予其权限:创建新用户并设置密码。授予对特定表的读写权限。授予创建序列的权限。根据需要授予其他权限。 如何使用 Oracle 创...
    99+
    2024-05-14
    oracle
  • oracle怎么查询时间段内的数据记录表
    在 oracle 数据库中查询指定时间段内的数据记录表,可以使用 between 操作符,用于比较日期或时间的范围。语法:select * from table_name wh...
    99+
    2024-05-14
    oracle
  • oracle怎么查看表的分区
    问题:如何查看 oracle 表的分区?步骤:查询数据字典视图 all_tab_partitions,指定表名。结果显示分区名称、上边界值和下边界值。 如何查看 Oracle 表的分区...
    99+
    2024-05-14
    oracle
  • oracle怎么导入dump文件
    要导入 dump 文件,请先停止 oracle 服务,然后使用 impdp 命令。步骤包括:停止 oracle 数据库服务。导航到 oracle 数据泵工具目录。使用 impdp 命令导...
    99+
    2024-05-14
    oracle
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作