iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >如何使用ogg将Oracle数据传输到flume刷到kafka
  • 287
分享到

如何使用ogg将Oracle数据传输到flume刷到kafka

2023-06-02 16:06:30 287人浏览 薄情痞子
摘要

本篇内容主要讲解“如何使用ogg将oracle数据传输到flume刷到kafka”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何使用ogg将Oracle数据传输到flume刷到kafka”吧!

本篇内容主要讲解“如何使用ogg将oracle数据传输到flume刷到kafka”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何使用ogg将Oracle数据传输到flume刷到kafka”吧!

源端测试服务器

服务器环境部署:

命令步骤如下:

[root@test ~]# groupadd oinstall

[root@test ~]# groupadd dba

[root@test ~]# useradd -g oinstall -G dba oracle

[root@test ~]#

修改权限:

[root@test ~]#  chown -R oracle:oinstall /data

[root@test ~]# 

2. 设置全局java环境变量

[root@test ~]# cat /etc/redhat-release 

Centos release 6.4 (Final)

[root@test ~]#

[oracle@test data]$ tar -zxvf jdk-8u60-linux-x64.tar.gz

在root下执行配置:

设置java环境变量:

vi /etc/profile

###jdk

export JAVA_HOME=/data/jdk1.8.0_60

export JAVA_BIN=/data/jdk1.8.0_60/bin

export PATH=$PATH:$JAVA_HOME/bin

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export JAVA_HOME JAVA_BIN PATH CLASSPATH

export LD_LIBRARY_PATH=/data/jdk1.8.0_60/jre/lib/amd64/server:$LD_LIBRARY_PATH

切换Oracle用户核对:

[root@test ~]# su - oracle

[oracle@test ~]$ java -version

java version "1.8.0_60"

Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

[oracle@test ~]$

如果不生效:

修改java环境变量:

alternatives --install /usr/bin/java java /data/jdk1.8.0_60/bin/java 100 

alternatives --install /usr/bin/jar jar /data/jdk1.8.0_60/bin/jar 100 

alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100 

update-alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100

#  /usr/sbin/alternatives --config java

[root@test1 data]#   /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

  Selection    Command

-----------------------------------------------

   1           /usr/lib/JVM/jre-1.6.0-openjdk.x86_64/bin/java

*+ 2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

   3           /usr/lib/jvm/jre-1.5.0-GCj/bin/java

   4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number: 4

[root@test1 data]# /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

  Selection    Command

-----------------------------------------------

   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java

*  2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java

   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java

 + 4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number: 

[root@test1 data]# 

[root@test1 data]# java -version

java version "1.8.0_60"

Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

[root@test1 data]# 

修改flume 参数配置:

[oracle@test1 conf]$ cat flume-conf.properties

# Licensed to the Apache Software Foundation (ASF) under one

# or more contributor license agreements.  See the NOTICE file

# distributed with this work for additional infORMation

# regarding copyright ownership.  The ASF licenses this file

# to you under the Apache License, Version 2.0 (the

# "License"); you may not use this file except in compliance

# with the License.  You may obtain a copy of the License at

#

Http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing,

# software distributed under the License is distributed on an

# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

# KIND, either express or implied.  See the License for the

# specific language Governing permissions and limitations

# under the License.

# The configuration file needs to define the sources, 

# the channels and the sinks.

# Sources, channels and sinks are defined per agent, 

# in this case called 'agent'

agent.sources = r1

agent.channels = fileChannel

agent.sinks = kafkaSink

# For each one of the sources, the type is defined

agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.

agent.sources.seqGenSrc.channels = fileChannel

#

agent.sources.r1.type = avro

agent.sources.r1.port = 14141

agent.sources.r1.bind = 192.168.88.66

agent.sources.r1.channels = fileChannel

# Each sink's type must be defined

agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use

agent.sinks.loggerSink.channel = memoryChannel

#kafka sink

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.kafkaSink.topic = my_schema 

agent.sinks.kafkaSink.brokerList = 192.168.88.1:9092,192.168.88.2:9092,192.168.88.3:9092,192.168.88.4:9092

agent.sinks.kafkaSink.requiredAcks = 1

agent.sinks.kafkaSink.batchSize = 20

agent.sinks.kafkaSink.channel = fileChannel

# Each channel's type is defined.

agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent.channels.memoryChannel.capacity = 100

#File Channel

agent.channels.fileChannel.type = file

agent.channels.fileChannel.transactionCapacity = 20000000

agent.channels.fileChannel.capacity = 50000000

agent.channels.fileChannel.maxFileSize = 2147483648

agent.channels.fileChannel.minimumRequiredSpace = 52428800

agent.channels.fileChannel.keep-alive = 3

agent.channels.fileChannel.checkpointInterval = 20000

agent.channels.fileChannel.checkpointDir = /data/apache-flume-1.6.0-bin/CheckpointDir

agent.channels.fileChannel.dataDirs = /data/apache-flume-1.6.0-bin/DataDir

[oracle@test1 conf]$ 

############配置OGG

主库在

源库创建新的抽取进程:

dblogin userid goldengate, passWord goldengate

add extract EXTJMS,tranlog, threads 2,begin now

add exttrail /data/goldengate/dirdat/kf, extract EXTJMS megabytes 200

add schematrandata my_schema

add trandata my_schema.*

原抽取进程:

extract EXTJMS

setenv (ORACLE_SID="testdb")

setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8") 

userid goldengate, password goldengate

TRANLOGOPTIONS DBLOGREADER

exttrail /data/goldengate/dirdat/kf

discardfile  /data/goldengate/dirrpt/EXTJMS.dsc,append

THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000

numfiles 3000

CHECKPOINTSECS 20

DISCARDROLLOVER AT 05:30

dynamicresolution

GETUPDATEBEFORES

NOCOMPRESSUPDATES

NOCOMPRESSDELETES

RecoveryOptions OverwriteMode

ddl &

include mapped &

exclude objtype 'TRIGGER' &

exclude objtype 'PROCEDURE' &

exclude objtype 'FUNCTION' &

exclude objtype 'PACKAGE' &

exclude objtype 'PACKAGE BODY' &

exclude objtype 'TYPE' &

exclude objtype 'GRANT' &

exclude instr 'GRANT' &

exclude objtype 'DATABASE LINK' &

exclude objtype 'CONSTRAINT' &

exclude objtype 'JOB' &

exclude instr 'ALTER SESSION' &

exclude INSTR 'AS SELECT' &

exclude INSTR 'REPLACE SYNONYM' &

EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_CMP" &

EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_UNCMP"

FETCHOPTIONS  NOUSESNAPSHOT, USELATESTVERSION, MISSINGROW REPORT

TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;

--extract table user

TABLE my_schema.*;

sql>  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;

Database altered.

SQL>  ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS;

Database altered.

SQL> select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI ,FORCE_LOGGING from v$database;

SUPPLEME SUP SUP FOR

-------- --- --- ---

YES      YES YES YES

SQL> 

源端添加新的pump进程:

在my_schema源库测试添加pump进程:

添加pump进程:

添加新的pump:

add extract EDPKF,exttrailsource /data/goldengate/dirdat/kf, begin now

edit param EDPKF

EXTRACT EDPKF

setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)

PASSTHRU

GETUPDATEBEFORES

NOCOMPRESSUPDATES

NOCOMPRESSDELETES

RecoveryOptions OverwriteMode

RMTHOST 192.168.88.66, MGRPORT 7839

RMTTRAIL /data/ogg_for_bigdata/dirdat/kp

DISCARDFILE ./dirrpt/EDPKF.dsc,APPEND,MEGABYTES 5

TABLE my_schema.* ;

add rmttrail /data/ogg_for_bigdata/dirdat/kp, extract EDPKF megabytes 200

edit  param defgen

userid goldengate, password goldengate

defsfile dirdef/my_schema.def

TABLE my_schema.*;

传递定义文件:

./defgen paramfile ./dirprm/defgen.prm

目标端直接端

mgr:

PORT 7839

DYNAMICPORTLIST 7840-7850

--AUTOSTART replicat *

--AUTORESTART replicat *,RETRIES 5,WAITMINUTES 2

AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10

PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2

LAGREPORTHOURS 1

LAGINFOMINUTES 30

LAGCRITICALMINUTES 45

添加 UE DATA PUMP:

 使用版本:

Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209

ADD EXTRACT LOANFLM, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kp

edit param JMSFLM

GGSCI (localhost.localdomain) 18> view param JMSFLM

EXTRACT JMSFLM

SETENV (GGS_USEREXIT_CONF ="dirprm/JMSFLM.props")

GetEnv (JAVA_HOME)

GetEnv (PATH)

GetEnv (LD_LIBRARY_PATH)

SourceDefs dirdef/my_schema.def

CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores

GetUpdateBefores

NoCompressDeletes

NoCompressUpdates

NotcpSourceTimer

TABLEEXCLUDE my_schema.MV*;

TABLE my_schema.*;

--alter prodjms extseqno 736, extrba 0

注释: 在目标端完全可以不安装Oracle数据库,可以和flume环境放在一起,最终刷数据到kafka的服务器接收消息。

本案例是 通过flume中转实现的,完全没有问题。

当然也可以直接将数据传输到kafka处理消息,原理都是一样的。

未来更多的大数据融合也是一个不错的方案,无论是mysqlmongodbhdfs等都可以完美结合。

 参数文件:

$ cat JMSFLM.props

gg.handlerlist=flumehandler

gg.handler.flumehandler.type=com.goldengate.delivery.handler.flume.FlumeHandler

gg.handler.flumehandler.host=192.168.88.66

gg.handler.flumehandler.port=14141

gg.handler.flumehandler.rpcType=avro

gg.handler.flumehandler.delimiter=\u0001

gg.handler.flumehandler.mode=op

gg.handler.flumehandler.includeOpType=true

# Indicates if the operation timestamp should be included as part of output in the delimited separated values

# true - Operation timestamp will be included in the output

# false - Operation timestamp will not be included in the output

# Default :- true

#gg.handler.flumehandler.includeOpTimestamp=true

#gg.handler.name.deleteOpKey=D

#gg.handler.name.updateOpKey=U

#gg.handler.name.insertOpKey=I

#gg.handler.name.pKUpdateOpKey=P

#gg.handler.name.includeOpType=true

# Optional properties to use the transaction grouping functionality

#gg.handler.flumehandler.maxGroupSize=1000

#gg.handler.flumehandler.minGroupSize=1000

### native library config ###

goldengate.userexit.nochkpt=TRUE

goldengate.userexit.timestamp=utc

goldengate.log.logname=cuserexit

goldengate.log.level=DEBUG

goldengate.log.tofile=true

goldengate.userexit.writers=javawriter

goldengate.log.level.JAVAUSEREXIT=DEBUG

#gg.brokentrail=true

gg.report.time=30sec

gg.classpath=/data/ogg_for_bigdata/dirprm/flumejar/*:/data/apache-flume-1.6.0-bin/lib/*

javawriter.stats.full=TRUE

javawriter.stats.display=TRUE

javawriter.bootoptions=-Xmx81920m -Xms20480m -Djava.class.path=/data/ogg_for_bigdata/ggjava/ggjava.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties

到此,相信大家对“如何使用ogg将Oracle数据传输到flume刷到kafka”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

--结束END--

本文标题: 如何使用ogg将Oracle数据传输到flume刷到kafka

本文链接: https://www.lsjlt.com/news/230607.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 如何使用ogg将Oracle数据传输到flume刷到kafka
    本篇内容主要讲解“如何使用ogg将Oracle数据传输到flume刷到kafka”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何使用ogg将Oracle数据传输到flume刷到kafka”吧!...
    99+
    2023-06-02
  • 使用ogg将Oracle数据传输到flume刷到kafka中的内存设置是怎样的
    本篇文章为大家展示了使用ogg将Oracle数据传输到flume刷到kafka中的内存设置是怎样的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。flume 环境可以根据不同的需求启动多个进程,并配置...
    99+
    2023-06-02
  • Oracle数据怎么发送到kafka传输数据
    这篇文章主要介绍“Oracle数据怎么发送到kafka传输数据”,在日常操作中,相信很多人在Oracle数据怎么发送到kafka传输数据问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Oracle数据怎么发送到...
    99+
    2023-06-03
  • Kafka中如何将数据导入到Elasticsearch
    本篇文章为大家展示了Kafka中如何将数据导入到Elasticsearch,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和...
    99+
    2023-06-02
  • 如何使用OGG同步ORACLE ACTIVE DATAGUARD数据到CLOUDERA HBASE测试
    这篇文章主要介绍了如何使用OGG同步ORACLE ACTIVE DATAGUARD数据到CLOUDERA HBASE测试,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让...
    99+
    2024-04-02
  • 如何使用mysqlimport将数据上传到MySQL表中?
    For uploading the data into MySQL tables by using mysqlimport we need to follow following steps −Step-1 − Creating the t...
    99+
    2023-10-22
  • 如何将数据从阿里云服务器传输到电脑
    在云计算时代,越来越多的企业和个人选择将数据存储在云端服务器上,以实现数据的高效管理和共享。然而,有时候我们需要将数据从阿里云服务器传输到个人电脑,以便进行进一步处理或备份。本文将介绍几种常见的方法,帮助您轻松完成数据传输任务。 1. 使用...
    99+
    2024-01-14
    阿里 如何将 服务器
  • JavaScript如何将数据输出到控制台
    在 JavaScript 中打印可以是一个简单而重要的任务。这个功能不仅可以帮助我们调试错误,还可以输出数据给用户或者将生成的信息发送到日志文件中。在这篇文章中,我们将介绍几种打印技术和方法,通过它们我们可以将 JavaScript 中的数...
    99+
    2023-05-14
  • 如何使用mysqlimport将数据上传到多个MySQL表中?
    通过mysqlimport的帮助,我们可以将数据上传到多个MySQL表中。下面的示例中有说明:示例假设我们想要从两个数据文件student1_tbl.txt上传以下数据:1 Saurav 11th 2 Sahil ...
    99+
    2023-10-22
  • python如何将数据输出到文件中
    要将数据输出到文件中,可以使用Python中的文件对象和相关的方法。首先,需要使用内置的`open()`函数来打开一个文件,并传入文...
    99+
    2023-09-20
    python
  • 如何将数据上传到云服务器
    将数据上传到云服务器可以通过以下步骤实现:1. 选择云服务提供商:选择一个可靠的云服务提供商。2. 创建云服务器实例:在选择的云服务...
    99+
    2023-09-27
    云服务器
  • 如何使用阿里云主机将数据库中的文件传输到电脑上
    本文将介绍如何使用阿里云主机将数据库中的文件传输到电脑上的方法。通过以下步骤,您可以在不破坏数据库结构的情况下,方便地将文件从数据库中提取出来,并保存到您的电脑上。 1. 登录阿里云主机首先,您需要登录到您的阿里云主机。在浏览器中输入您的主...
    99+
    2023-12-28
    阿里 数据库中 如何使用
  • socket传输接收不到数据如何解决
    有几种可能的原因导致无法接收到数据:1. 网络连接问题:检查网络连接是否正常,确保服务器和客户端能够互相通信。2. 端口号问题:确保...
    99+
    2023-09-12
    socket
  • springMVC如何将controller中数据传递到jsp页面
    1> 将方法的返回值该为ModelAndView在返回时,将数据存储在ModelAndView对象中如:newModelAndView("/WEBINF/jsp/showData.jsp","message",message)...
    99+
    2023-05-31
    springmvc controller jsp
  • 如何将matlab数据导入到Python中使用
    相信不少小伙伴都遇到过和我一样的问题,就是在尝试使用scipy.io.loadmat将matlab类型的数据导入python中的时候遇到如下错误提示。 import scipy as...
    99+
    2022-12-15
    matlab数据导入到Python中使用 matlab导入Python
  • 如何使用UniApp传递数据到服务器
    UniApp是一款开发跨平台应用的框架,使用它可以快速地开发出同时在不同平台上工作的应用程序。在应用程序中,我们经常需要向服务器传递数据,这里我们介绍如何使用UniApp传递数据到服务器。一、UniApp发送网络请求UniApp提供了一个A...
    99+
    2023-05-14
  • 如何使用sqoop从oracle导数据到Hbase
    这篇文章主要为大家展示了“如何使用sqoop从oracle导数据到Hbase”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“如何使用sqoop从oracle导数据...
    99+
    2024-04-02
  • pytest如何使用parametrize将参数化变量传递到fixture
    本篇内容介绍了“pytest如何使用parametrize将参数化变量传递到fixture”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、...
    99+
    2023-06-30
  • 如何从mysql中将数据导入到oracle数据库中
    这篇文章主要讲解了“如何从mysql中将数据导入到oracle数据库中”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何从mysql中将数据导入到oracl...
    99+
    2024-04-02
  • 如何使用 Go 将用户输入插入到 Postgres Db
    php小编百草为你介绍如何使用Go将用户输入插入到Postgres数据库。在现代应用程序开发中,数据库是必不可少的一部分。Postgres是一个强大的开源关系型数据库管理系统,而Go是...
    99+
    2024-02-12
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作