iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >Mysql数据库如何监听binlog的开启步骤
  • 313
分享到

Mysql数据库如何监听binlog的开启步骤

2024-04-02 19:04:59 313人浏览 独家记忆
摘要

这篇文章主要介绍Mysql数据库如何监听binlog的开启步骤,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!前言我们经常需要根据用户对自己数据的一些操作来做一些事情.比如如果用户删除

这篇文章主要介绍Mysql数据库如何监听binlog的开启步骤,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

前言

我们经常需要根据用户对自己数据的一些操作来做一些事情.

比如如果用户删除了自己的账号,我们就给他发短信骂他,去发短信求他回来.

类似于这种功能,当然可以在业务逻辑层实现,在收到用户的删除请求之后执行这一操作,但是数据库的binlog为我们提供了另外一种操作方法.

要监听binlog,需要两步,第一步当然是你的mysql需要开启这一个功能,第二个是要写程序来对日志进行读取.

mysql开启binlog.

首先mysql的binlog日常是不打开的,因此我们需要:

找到mysql的配置文件my.cnf,这个因操作系统不一样,位置也不一定一样,可以自己找一下,

在其中加入以下内容:

[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-fORMat = ROW

之后重启mysql.

ubuntu
service mysql restart
// Mac
mysql.server restart

监测是否开启成功

进入mysql命令行,执行:

show variables like '%log_bin%' ;

如果结果如下图,则说明成功了:

Mysql数据库如何监听binlog的开启步骤

查看正在写入的binlog状态:

Mysql数据库如何监听binlog的开启步骤

代码读取binlog

引入依赖

我们使用开源的一些实现,这里因为一些奇怪的原因,我选用了mysql-binlog-connector-java这个包,(官方GitHub仓库)[github.com/shyiko/mysq…]具体依赖如下:

<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
 <dependency>
 <groupId>com.github.shyiko</groupId>
 <artifactId>mysql-binlog-connector-java</artifactId>
 <version>0.17.0</version>
 </dependency>

当然,对binlog的处理有很多开源实现,阿里的cancl就是一个,也可以使用它.

写个demo

根据官方仓库中readme里面,来简单的写个demo.

 public static void main(String[] args) {
 BinaryLoGClient client = new BinaryLogClient("hostname", 3306, "username", "passwd");
 EventDeserializer eventDeserializer = new EventDeserializer();
 eventDeserializer.setCompatibilityMode(
 EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
 );
 client.setEventDeserializer(eventDeserializer);
 client.reGISterEventListener(new BinaryLogClient.EventListener() {

 @Override
 public void onEvent(Event event) {
 // TODO
 dosomething();
 logger.info(event.toString());
 }
 });
 client.connect();
 }

这个完全是根据官方教程里面写的,在onEvent里面可以写自己的业务逻辑,由于我只是测试,所以我在里面将每一个event都打印了出来.

之后我手动登录到mysql,分别进行了增加,修改,删除操作,监听到的log如下:

00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
    [[B@546a03af, 2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
    {before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
    [[B@1888ff2c, 3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}

根据自己的业务,封装一个更好使,更定制的工具类

开始的时候打算贴代码的,,,但是代码越写越多,索性传在github上了,这里只贴部分的实现.代码传送门

实现思路

  1. 支持对单个表的监听,因为我们不想真的对所有数据库中的所有数据表进行监听.

  2. 可以多线程消费.

  3. 把监听到的内容转换成我们喜闻乐见的形式(文中的数据结构不一定很好,我没想到更加合适的了).

所以实现思路大致如下:

  1. 封装个客户端,对外只提供获取方法,屏蔽掉初始化的细节代码.

  2. 提供注册监听器(伪)的方法,可以注册对某个表的监听(重新定义一个监听接口,所有注册的监听器实现这个就好).

  3. 真正的监听器只有客户端,他将此数据库实例上的所有操作,全部监听到并转换成我们想要的格式LogItem放进阻塞队列里面.

  4. 启动多个线程,消费阻塞队列,对某一个LogItem调用对应的数据表的监听器,做一些业务逻辑.

初始化代码:

 public MysqlBinLogListener(Conf conf) {
 BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);
 EventDeserializer eventDeserializer = new EventDeserializer();
 eventDeserializer.setCompatibilityMode(
 EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
 );
 client.setEventDeserializer(eventDeserializer);
 this.parseClient = client;
 this.queue = new ArrayBlockingQueue<>(1024);
 this.conf = conf;
 listeners = new ConcurrentHashMap<>();
 dbTableCols = new ConcurrentHashMap<>();
 this.consumer = Executors.newFixedThreadPool(consumerThreads);
 }

注册代码:

 public void regListener(String db, String table, BinLogListener listener) throws Exception {
 String dbTable = getdbTable(db, table);
 Class.forName("com.mysql.jdbc.Driver");
 // 保存当前注册的表的colum信息
 Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
 Map<String, Colum> cols = getColMap(connection, db, table);
 dbTableCols.put(dbTable, cols);

 // 保存当前注册的listener
 List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>());
 list.add(listener);
 listeners.put(dbTable, list);
 }

在这个步骤中,我们在注册监听者的同时,获得了该表的schema信息,并保存到map里面去,方便后续对数据进行处理.

监听代码:

 @Override
 public void onEvent(Event event) {
 EventType eventType = event.getHeader().getEventType();

 if (eventType == EventType.TABLE_MAP) {
 TableMapEventData tableData = event.getData();
 String db = tableData.getDatabase();
 String table = tableData.getTable();
 dbTable = getdbTable(db, table);
 }

 // 只处理添加删除更新三种操作
 if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
 if (isWrite(eventType)) {
 WriteRowsEventData data = event.getData();
 for (Serializable[] row : data.getRows()) {
  if (dbTableCols.containsKey(dbTable)) {
  LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
  e.setDbTable(dbTable);
  queue.add(e);
  }
 }
 }
 }
 }

我偷懒了,,,这里面只实现了对添加操作的处理,其他操作没有写.

消费代码:

 public void parse() throws IOException {
 parseClient.registerEventListener(this);

 for (int i = 0; i < consumerThreads; i++) {
 consumer.submit(() -> {
 while (true) {
  if (queue.size() > 0) {
  try {
  LogItem item = queue.take();
  String dbtable = item.getDbTable();
  listeners.get(dbtable).forEach(l -> {
  l.onEvent(item);
  });

  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  }
  Thread.sleep(1000);
 }
 });
 }
 parseClient.connect();
 }

消费时,从队列中获取item,之后获取对应的一个或者多个监听者,分别消费这个item.

测试代码:

 public static void main(String[] args) throws Exception {
 Conf conf = new Conf();
 conf.host = "hostname";
 conf.port = 3306;
 conf.username = conf.passwd = "hhsgsb";

 MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
 mysqlBinLogListener.parseArgsAndRun(args);
 mysqlBinLogListener.regListener("pf", "student", item -> {
 System.out.println(new String((byte[])item.getAfter().get("name")));
 logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());
 });
 mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ===="));

 mysqlBinLogListener.parse();
 }

在这段很少的代码里,注册了两个监听者,分别监听student和teacher表,并分别进行打印处理,经测试,在teacher表插入数据时,可以独立的运行定义的业务逻辑.

注意:这里的工具类并不能直接投入使用,因为里面有许多的异常处理没有做,且功能仅监听了插入语句,可以用来做实现的参考.

以上是“Mysql数据库如何监听binlog的开启步骤”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程网数据库频道!

您可能感兴趣的文档:

--结束END--

本文标题: Mysql数据库如何监听binlog的开启步骤

本文链接: https://www.lsjlt.com/news/57716.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Mysql数据库如何监听binlog的开启步骤
    这篇文章主要介绍Mysql数据库如何监听binlog的开启步骤,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!前言我们经常需要根据用户对自己数据的一些操作来做一些事情.比如如果用户删除...
    99+
    2024-04-02
  • Mysql 数据库开启 binlog
    1 简介 在MySQL中,binlog指的是binary log,二进制日志文件。这个文件记录了MySQL所有的DML操作。通过binlog日志,我们可以做数据恢复,做主从复制等等。对于运维或架构人员来说,开启binlog日志功能非常重要。...
    99+
    2023-08-18
    数据库 mysql
  • oracle数据库监听配置步骤是
    oracle 监听程序配置包含以下步骤:使用 netca 创建监听程序。将监听程序与实例关联,指定监听程序名称和实例名称。使用 lsnrctl 启动监听程序。使用客户端工具连接到数据库,...
    99+
    2024-05-11
    oracle
  • oracle数据库的监听怎么开启
    oracle 数据库监听开启方法 Oracle 数据库监听是数据库与客户端通信的门户。以下是如何开启 Oracle 监听: 步骤 1:验证监听状态 使用以下命令检查监听是否已启动: &l...
    99+
    2024-04-19
    oracle
  • linux如何重启oracle数据库监听
    要重启Oracle数据库监听程序,可以按照以下步骤进行操作:1. 使用root用户登录到Linux系统。2. 打开终端或命令行窗口。...
    99+
    2023-10-19
    linux oracle数据库
  • MySQL数据库通过Binlog恢复数据的详细步骤
    目录Mysql Binlog 简介开启Binlog使用Binlog恢复数据附录总结 Mysql Binlog 简介 Mysql Binlog是二进制格式的日志文件Binlo...
    99+
    2024-04-02
  • 如何开启MySQL的binlog日志
    这篇文章将为大家详细讲解有关如何开启MySQL的binlog日志,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。binlog 就是binary log,二进制日志文件,这个文件记录了mysql所有的dml操...
    99+
    2023-06-14
  • 步骤:如何备份MySQL数据库
    MySQL数据库备份的步骤,需要具体代码示例 数据库备份是非常重要的一项工作,它能帮助我们防止数据丢失以及在需要时恢复数据。对于MySQL数据库来说,备份数据是相对简单的操作。下面将详...
    99+
    2024-02-22
    mysql 步骤 数据库备份 sql语句 数据丢失
  • oracle数据库如何配置监听
    要配置 oracle 数据库监听器,请执行以下步骤:创建监听器,并指定其名称。编辑监听器配置文件,指定主机、端口和 ip 地址。启动监听器。验证监听器是否正在运行。 如何配置 Orac...
    99+
    2024-04-19
    oracle
  • php如何监听数据库变化
    要监听数据库的变化,可以使用以下两种方法:1. 轮询:在应用程序中设置一个定时器,定期查询数据库以检查是否有变化。可以使用定时器函数...
    99+
    2023-09-09
    php 数据库
  • 如何查看oracle数据库监听服务是否启动
    要查看 oracle 数据库监听服务是否启动,可以连接到数据库服务器并查询 v$listener 视图。结果集中 "status" 列将指示监听服务的状态:"listener" 表示已启...
    99+
    2024-04-19
    oracle linux
  • 在Windows系统中配置开启MySQL数据库日志的步骤
    第一步:在终端窗口中查看日志开启状态         1.首先Win+R,打开命令运行框,输入cmd打开终端窗口。           2.输入指令mysql -u root -p回车输入密码进入mysql数据库。          3...
    99+
    2023-09-04
    数学建模 mysql 数据库
  • MySQL数据库如何只监听某个特定地址
    本篇内容介绍了“MySQL数据库如何只监听某个特定地址”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!问:怎...
    99+
    2024-04-02
  • mysql创建数据库的步骤
    这篇文章主要讲解了“mysql创建数据库的步骤”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“mysql创建数据库的步骤”吧! ...
    99+
    2024-04-02
  • mysql建立数据库的步骤
    这篇文章给大家分享的是有关mysql建立数据库的步骤的内容。小编觉得挺实用的,因此分享给大家做个参考。一起跟随小编过来看看吧。mysql怎么建立数据库?mysql创建数据库有三种方式:1.使用 create...
    99+
    2024-04-02
  • 阿里云上启动MySQL数据库的详细步骤
    本文将详细地介绍在阿里云上启动MySQL数据库的步骤,包括环境配置、数据库安装、数据库配置、数据导入、权限设置等。 一、环境配置创建阿里云账号:如果你还没有阿里云账号,可以通过阿里云官网注册一个。登录阿里云账号:在浏览器中输入阿里云官网地址...
    99+
    2023-12-12
    阿里 步骤 数据库
  • 数据库中如何清理监听日志
    这篇文章主要为大家展示了“数据库中如何清理监听日志”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“数据库中如何清理监听日志”这篇文章吧。清理监听日志处理的方法:&...
    99+
    2024-04-02
  • 如何通过Java监听MySQL数据的变化
    目录原理开启MySQL的binlog功能Java监听MySQL的binlog实现监听数据变化总结原理 原理:java通过bin-log监控mysql数据变化 binlog :binl...
    99+
    2023-03-14
    java监听mysql数据表变化 java监听数据库变化 监听mysql数据变化
  • 一步步教你如何使用mysql binlog恢复数据
    如果想通过 mysql 的 binlog 恢复数据,首先要开启 binlog 。这里搭建一个测试的环境,了解一下 mysql binlog 是如何恢复数据库的。原理比较简单,binl...
    99+
    2023-05-14
    mysql使用binlog恢复数据 mysql binlog恢复数据 mysql恢复数据的语句
  • 开启阿里云数据库开放root权限的详细步骤
    在使用阿里云数据库时,有时候需要进行一些高级操作,如开启数据库的root权限。这篇文章将会详细说明如何开启阿里云数据库开放root权限。 阿里云数据库是一种基于云计算的数据库服务,提供了强大的计算和存储能力,可以方便地进行数据库管理和数据处...
    99+
    2023-10-30
    阿里 步骤 权限
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作