广告
返回顶部
首页 > 资讯 > 数据库 >OGG同步ORACLE数据到KAFKA
  • 549
分享到

OGG同步ORACLE数据到KAFKA

2024-04-02 19:04:59 549人浏览 八月长安
摘要

环境:源端:oracle12.2 ogg for oracle 12.3目标端:kafka ogg for bigdata 12.3将oracle中的数据通过OGG同步到KAFKA 源

环境:
源端:oracle12.2 ogg for oracle 12.3
目标端:kafka ogg for bigdata 12.3
将oracle中的数据通过OGG同步到KAFKA

源端配置:
1、为要同步的表添加附加日志
dblogin USERID ogg@orclpdb, PASSWord ogg
add trandata scott.tab1
add trandata scott.tab2

2、 添加抽取进程
GGSCI>add extract EXT_KAF1,integrated tranlog, begin now
GGSCI>add EXTTRaiL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200

编辑抽取进程参数:
GGSCI> edit params EXT_KAF1

extract EXT_KAF1
userid C##ggadmin,PASSWORD ggadmin
LOGALLSUPCOLS
UPDATERECORDFORMAT COMPACT
exttrail ./dirdat/k1,FORMAT RELEASE 12.3
SOURCECATALOG orclpdb --(指定pdb)
table scott.tab1;
table scott.tab2;

注册进程
GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin
GGSCI> reGISter extract EXT_KAF1 database container (orclpdb)

3、添加投递进程:
GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1
GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200

编辑投递进程参数:
GGSCI>edit param PMP_KAF1

EXTRACT PMP_KAF1
USERID c##ggadmin,PASSWORD ggadmin
PASSTHRU
RMTHOST 10.1.1.247, MGRPORT 9178
RMTTRAIL ./dirdat/f1,format release 12.3
SOURCECATALOG orclpdb
TABLE scott.tab1;
table scott.tab2;

4、添加数据初始化进程(Oracle initial load) 可以多个表分开初始化也可以一起初始化,此处选择分开初始化
GGSCI> add extract ek_01, sourceistable

编辑参数:
GGSCI> EDIT PARAMS ek_01

EXTRACT ek_01
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;

GGSCI> add extract ek_02, sourceistable

EDIT PARAMS ek_02
EXTRACT ek_02
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab2;

5、生成def文件:
GGSCI> edit param defgen1

USERID c##ggadmin,PASSWORD ggadmin
defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;
table scott.tab2;

在OGG_HOME下执行如下命令生成def文件
defgen paramfile dirprm/defgen1.prm

将生成的def文件传到目标端$OGG_HOME/dirdef下

目标端配置:
1、将$OGG_HOME/AdapterExamples/big-data/kafka下的所有文件copy到$OGG_HOME/dirprm下
cd $OGG_HOME/AdapterExamples/big-data/kafka
cp * $OGG_HOME/dirprm

2、将$ORACLE_HOME/AdapterExamples/trail下的文件tr000000000 copy到$OGG_HOME/dirdat下
cd $ORACLE_HOME/AdapterExamples/trail
cp tr000000000 $OGG_HOME/dirdat

3、添加初始化进程:(可以多表一起初始化也可以分开初始化,此处选择单独初始化)
GGSCI> ADD replicat rp_01, specialrun

GGSCI> EDIT PARAMS rp_01
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/ka
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> ADD replicat rp_02, specialrun
GGSCI> EDIT PARAMS rp_02

SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/kb
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

4、添加恢复进程:
GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1
GGSCI>edit params r_kaf1

REPLICAT r_kaf1
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISioNS
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2
GGSCI> edit params r_kaf2

REPLICAT r_kaf2
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

5、参数配置:
custom_kafka_producer.properties文件内容如下:

bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 --只需要改动这一行就行,指定kafka的地址和端口号
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=10000

kafka1.props文件内容如下:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= topic1
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =JSON --这里做了改动,指定格式为json格式
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主键
gg.handler.kafkahandler.SchemaTopicName= topic1 --此处指定为要同步到的目标topic名字
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
Goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --指定classpath,这里很重要,必须有kafka安装文件的类库。
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

启动进程进程恢复:
1、启动源端抓取进程
GGSCI> start EXT_KAF1
2、启动源端投递进程
GGSCI> start PMP_KAF1
3、启动源端初始化进程
GGSCI> start ek_01
4、启动目标端初始化进程
在$OGG_HOME下执行如下命令:
./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD
5、启动目标端恢复进程
GGSCI> start R_KAF1

遇到的错误:
1、ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)
OGG同步ORACLE数据到KAFKA
原因:找不到类库(配置好环境变量之后,OGG的mgr进程没有重启,导致的)
解决:重启MGR进程

2、ERROR OG-15051 Java or JNI exception
OGG同步ORACLE数据到KAFKA
原因:没有使用ogg12.3.1.1.1自带的kafka.props,而是copy了ogg12.2的kafka.props,导致出现异常。
解决:使用ogg12.3.1.1.1自带的kafka.props,并指定相关的属性,解决。

您可能感兴趣的文档:

--结束END--

本文标题: OGG同步ORACLE数据到KAFKA

本文链接: https://www.lsjlt.com/news/36796.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • OGG同步ORACLE数据到KAFKA
    环境:源端:oracle12.2 ogg for oracle 12.3目标端:KAFKA ogg for bigdata 12.3将oracle中的数据通过OGG同步到KAFKA 源...
    99+
    2022-10-18
  • Oracle同步数据到kafka的方法
    目录环境准备软件准备下载地址实施过程Oracle主机(A)配置Kafka主机(B)配置配置apache-maven工具配置Kafka 2.13-2.6.0配置kafka-connec...
    99+
    2022-11-13
  • ogg 同步kafka OGG-15051 Java or JNI exception:
    2018-08-09 16:02:01  ERROR   OGG-15051  Java or JNI exception: oracle.goldengate.util...
    99+
    2022-10-18
  • 如何使用ogg将Oracle数据传输到flume刷到kafka
    本篇内容主要讲解“如何使用ogg将Oracle数据传输到flume刷到kafka”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何使用ogg将Oracle数据传输到flume刷到kafka”吧!...
    99+
    2023-06-02
  • cancel框架同步mysql数据到kafka
    下载cancel https://github.com/alibaba/canal/releases/tag/canal-1.1.5 修改conf文件夹下的canal.properties配置文件 c...
    99+
    2023-10-07
    mysql kafka 数据库
  • Oracle+Ogg 归档丢失 重新导数据建立ogg同步步骤
    1031 ogg 恢复操作 1、在目标端停掉复制进程,删除目标端OGG的复制进程 ggsci>  dblogin userid ggtarget,password ggtarget ggsc...
    99+
    2022-10-18
  • 如何使用OGG同步ORACLE ACTIVE DATAGUARD数据到CLOUDERA HBASE测试
    这篇文章主要介绍了如何使用OGG同步ORACLE ACTIVE DATAGUARD数据到CLOUDERA HBASE测试,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让...
    99+
    2022-10-19
  • OGG怎么实现两台Oracle数据库的同步
    本篇内容介绍了“OGG怎么实现两台Oracle数据库的同步”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!首...
    99+
    2022-10-18
  • oracle 11.2.0.4 ogg for centos6.8安装及数据半同步测试
    目的:本博文给出11.2.0.4 oracle数据库ogg搭建过程中,备库的准备过程包括2种方式:第一种是主库rman全量备份后恢复,第二种是expdp从主库导出用户然后在备库导入。 对ogg数据...
    99+
    2022-10-18
  • oracle数据如何通过goldengate实时同步到kafka消息队列中
    这篇文章主要介绍oracle数据如何通过goldengate实时同步到kafka消息队列中,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!环境介绍组件版本组件版本描述源端oracleo...
    99+
    2022-10-19
  • 使用ogg从oracle 同步mysql会遇到什么问题
    这篇文章主要介绍了使用ogg从oracle 同步mysql会遇到什么问题,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。ORACLE ...
    99+
    2022-10-18
  • 基于 Dinky + FlinkSQL + Flink CDC 同步 MySQL 数据到 Elasticsearch、Kafka
    一、概述 Dinky 是一个开箱即用的一站式实时计算平台以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架致力于流批一体和湖仓一体的建设与实践。本文以此为FlinkSQL可视化工...
    99+
    2023-09-27
    kafka mysql flink
  • 详解Flink同步Kafka数据到ClickHouse分布式表
    目录引言什么是ClickHouse?创建复制表通过jdbc写入引言 业务需要一种OLAP引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定ClickHouse 什么是ClickHouse?...
    99+
    2022-12-01
    Flink数据同步KafkaClickHouse ClickHouse分布式表
  • MySQL 到Oracle 实时数据同步
    目录第一步:配置MySQL 连接第二步:配置 Oracle连接第四步:进行数据校验其他数据库的同步操作摘要:很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同...
    99+
    2022-11-12
  • 使用ogg将Oracle数据传输到flume刷到kafka中的内存设置是怎样的
    本篇文章为大家展示了使用ogg将Oracle数据传输到flume刷到kafka中的内存设置是怎样的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。flume 环境可以根据不同的需求启动多个进程,并配置...
    99+
    2023-06-02
  • Canal+Kafka实现Mysql数据同步
    Canal介绍 canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。 canal是应阿里巴巴...
    99+
    2023-09-05
    kafka mysql 分布式
  • ogg怎么防止数据死循环同步
    这篇文章主要讲解了“ogg怎么防止数据死循环同步”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“ogg怎么防止数据死循环同步”吧!最近在ORACLE 到MYS...
    99+
    2022-10-18
  • 从Oracle用goldengate抽取数据到kafka
    Goldengate到kafka配置详解 环境介绍: 源端数据库版本 源端OGG版本 ...
    99+
    2022-10-18
  • 使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表
    使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表 package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEn...
    99+
    2023-08-30
    mysql kafka 数据库
  • Oracle数据怎么发送到kafka传输数据
    这篇文章主要介绍“Oracle数据怎么发送到kafka传输数据”,在日常操作中,相信很多人在Oracle数据怎么发送到kafka传输数据问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Oracle数据怎么发送到...
    99+
    2023-06-03
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作