OGG synchronizes ORACLE data to KAFKA

2025-01-19 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

Environment:

Source side: Oracle 12.2, OGG for Oracle 12.3

Target side: Kafka, OGG for Big Data 12.3

Goal: synchronize data from Oracle to Kafka through OGG.

Source configuration:

1. Enable supplemental logging (trandata) on the tables to be synchronized:

GGSCI > dblogin userid ogg@orclpdb, password ogg

GGSCI > add trandata scott.tab1

GGSCI > add trandata scott.tab2

2. Add the extract process:

GGSCI > add extract EXT_KAF1, integrated tranlog, begin now

GGSCI > add exttrail ./dirdat/k1, extract EXT_KAF1, megabytes 200

Edit the extract parameters:

GGSCI > edit params EXT_KAF1

EXTRACT EXT_KAF1
USERID c##ggadmin, PASSWORD ggadmin
LOGALLSUPCOLS
UPDATERECORDFORMAT COMPACT
EXTTRAIL ./dirdat/k1, FORMAT RELEASE 12.3
SOURCECATALOG orclpdb  -- specify the PDB
TABLE scott.tab1;
TABLE scott.tab2;

Register the extract with the database:

GGSCI > dblogin userid c##ggadmin, password ggadmin

GGSCI > register extract EXT_KAF1 database container (orclpdb)

3. Add the data pump (delivery) process:

GGSCI > add extract PMP_KAF1, exttrailsource ./dirdat/k1

GGSCI > add rmttrail ./dirdat/f1, extract PMP_KAF1, megabytes 200

Edit the pump parameters:

GGSCI > edit params PMP_KAF1

EXTRACT PMP_KAF1
USERID c##ggadmin, PASSWORD ggadmin
PASSTHRU
RMTHOST 10.1.1.247, MGRPORT 9178
RMTTRAIL ./dirdat/f1, FORMAT RELEASE 12.3
SOURCECATALOG orclpdb
TABLE scott.tab1;
TABLE scott.tab2;

4. Add the initial-load extract processes (Oracle initial load). Multiple tables can be initialized together or separately; separate initialization is used here.

GGSCI > add extract ek_01, sourceistable

Edit parameters:

GGSCI > edit params ek_01

EXTRACT ek_01
USERID c##ggadmin, PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/ka, MAXFILES 999, MEGABYTES 500, FORMAT RELEASE 12.3
SOURCECATALOG orclpdb
TABLE scott.tab1;

GGSCI > add extract ek_02, sourceistable

GGSCI > edit params ek_02

EXTRACT ek_02
USERID c##ggadmin, PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/kb, MAXFILES 999, MEGABYTES 500, FORMAT RELEASE 12.3
SOURCECATALOG orclpdb
TABLE scott.tab2;

5. Generate the definitions (def) file:

GGSCI > edit param defgen1

USERID c##ggadmin, PASSWORD ggadmin
DEFSFILE /home/oracle/ogg/ggs12/dirdef/defgen1.def, FORMAT RELEASE 12.3
SOURCECATALOG orclpdb
TABLE scott.tab1;
TABLE scott.tab2;

Run the following command under $OGG_HOME to generate the def file:

defgen paramfile dirprm/defgen1.prm

Transfer the generated def file to $OGG_HOME/dirdef on the target side.
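Concretely, the generation and transfer might look like this (the target host 10.1.1.247 comes from the RMTHOST entries above; the oracle user and the target-side OGG path are assumptions, adjust to your install):

```shell
# On the source side, from $OGG_HOME
cd $OGG_HOME
./defgen paramfile dirprm/defgen1.prm

# Ship the definitions file to the target side's $OGG_HOME/dirdef
# (user and destination path are assumptions)
scp dirdef/defgen1.def oracle@10.1.1.247:/u01/ogg/dirdef/
```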

Destination side configuration:

1. Copy all the files under $OGG_HOME/AdapterExamples/big-data/kafka to $OGG_HOME/dirprm:

cd $OGG_HOME/AdapterExamples/big-data/kafka
cp * $OGG_HOME/dirprm

2. Copy the tr000000000 sample trail file under $OGG_HOME/AdapterExamples/trail to $OGG_HOME/dirdat:

cd $OGG_HOME/AdapterExamples/trail
cp tr000000000 $OGG_HOME/dirdat

3. Add the initialization replicat processes (multiple tables can be initialized together or separately; separate initialization is used here):

GGSCI > add replicat rp_01, specialrun

GGSCI > edit params rp_01

SPECIALRUN
END RUNTIME
SETENV (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/ka
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI > add replicat rp_02, specialrun

GGSCI > edit params rp_02

SPECIALRUN
END RUNTIME
SETENV (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/kb
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

4. Add the replicat (apply) process:

GGSCI > add replicat r_kaf1, exttrail ./dirdat/f1

GGSCI > edit params r_kaf1

REPLICAT r_kaf1
SETENV (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI > add replicat r_kaf2, exttrail ./dirdat/f2

GGSCI > edit params r_kaf2

REPLICAT r_kaf2
SETENV (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

5. Parameter configuration:

The custom_kafka_producer.properties file is as follows:

# change this line to point at your Kafka broker addresses and ports
bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=10000
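Both configuration files use plain Java .properties syntax (key=value lines, # comments). As a sanity check outside OGG, here is a minimal Python sketch (not part of OGG) that parses such a file and reads back the producer settings:

```python
def parse_properties(text):
    """Parse simple key=value .properties content, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sample = """
# change this line to point at your Kafka brokers
bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200
acks=1
linger.ms=10000
"""

props = parse_properties(sample)
print(props["bootstrap.servers"])  # -> 10.1.1.246:9200,10.1.1.247:9200
```

Note that linger.ms=10000 batches records for up to 10 seconds before sending, which favors throughput over latency.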

The kafka1.props file is as follows:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# the topic name is resolved from the template below
gg.handler.kafkahandler.topicMappingTemplate=topic1
# gg.handler.kafkahandler.format=avro_op
# changed from the sample's avro_op so that messages are emitted as JSON
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
# include the primary key columns in each message
gg.handler.kafkahandler.format.includePrimaryKeys=true
# the target topic to publish schema messages to
gg.handler.kafkahandler.SchemaTopicName=topic1
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec

# Sample gg.classpath for Apache Kafka
# gg.classpath is important: it must point at the client libraries of your Kafka installation
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/*
# Sample gg.classpath for HDP
# gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
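With format=json and the op keys configured above, every Kafka message is a JSON change record whose op_type is I, U, D, or T. The field names below (table, op_type, primary_keys, before, after) follow the OGG JSON formatter's usual output; treat this sketch as an illustration of how a consumer might fold such records into current row state, not as a format specification:

```python
import json

# An example record shaped like the OGG JSON formatter's output (illustrative values)
raw = json.dumps({
    "table": "ORCLPDB.SCOTT.TAB1",
    "op_type": "U",                # updateOpKey=U in kafka1.props
    "primary_keys": ["ID"],        # present because includePrimaryKeys=true
    "before": {"ID": 1, "NAME": "old"},
    "after": {"ID": 1, "NAME": "new"},
})

def apply_op(state, message):
    """Fold one change record into an in-memory row store keyed by primary key."""
    doc = json.loads(message)
    # deletes carry the row in the before image; inserts/updates in the after image
    image = "before" if doc["op_type"] == "D" else "after"
    key = tuple(doc[image][k] for k in doc["primary_keys"])
    if doc["op_type"] in ("I", "U"):
        state[key] = doc["after"]
    elif doc["op_type"] == "D":
        state.pop(key, None)
    return state

rows = apply_op({}, raw)
print(rows)  # {(1,): {'ID': 1, 'NAME': 'new'}}
```

Because mode=op, each source operation becomes one Kafka message, so a consumer like this sees changes at row granularity.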

Start the processes:

1. Start the source-side extract process

GGSCI > start EXT_KAF1

2. Start the source-side pump process

GGSCI > start PMP_KAF1

3. Start the source initialization process

GGSCI > start ek_01

4. Run the initialization replicat on the target side

Execute the following command under $OGG_HOME:

./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD

5. Start the replicat process on the target side

GGSCI > start R_KAF1
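Once the replicat is running, a quick way to confirm records are flowing is the standard Kafka console consumer on one of the brokers (the script name and path depend on your Kafka distribution; the broker address and port match bootstrap.servers above):

```shell
kafka-console-consumer.sh --bootstrap-server 10.1.1.246:9200 \
  --topic topic1 --from-beginning --max-messages 5
```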

Errors encountered:

1. ERROR OGG-15050 Error loading Java VM runtime library (2 no such file or directory)

Cause: the class library could not be found (OGG's mgr process was not restarted after the environment variables were configured).

Solution: restart the MGR process.

2. ERROR OGG-15051 Java or JNI exception

Cause: the kafka.props shipped with OGG 12.3.1.1.1 was not used; instead, the kafka.props from OGG 12.2 was copied over, which caused the exception.

Solution: use the kafka.props that ships with OGG 12.3.1.1.1 and set the relevant properties.
