
How to synchronize Oracle data to a Kafka message queue in real time through GoldenGate


This article mainly introduces how to synchronize Oracle data to a Kafka message queue in real time through GoldenGate. It is quite detailed and has reference value; interested readers should read it through to the end.

Environment introduction

Component versions

module                 | Version                                       | Description
Source-side Oracle     | Oracle 11.2.0.4 for Linux x64                 | Source-side Oracle database
Source-side OGG        | Oracle OGG 11.2.0.1.20 for Oracle, Linux x64  | Source-side OGG, used to extract the data changes of the source-side Oracle and send the change log to the destination side
Destination-side Kafka | kafka_2.11-0.11.0.2 for Linux x64             | Message queue, which receives the data pushed by the destination-side OGG
Destination-side OGG   | -                                             | Receives the Oracle transaction change log sent by the source and pushes the changes to the Kafka message queue

Overall architecture diagram

Terminology

1. OGG Manager

OGG Manager is used to configure and manage the other OGG components: it configures data extraction, data pumps, and data replication, starts and stops the related components, and reports their status.

2. Data extraction (Extract)

Extract captures changes (DML, DDL) to the source-side database. Data extraction mainly falls into the following types: local extraction, which captures incremental change data from the local database and writes it to local Trail files; data push (Data Pump), which reads data from the local Trail files and pushes it to the target; and initial-load extraction, which exports the full data of database tables for initial data loading.

3. Data push (Data Pump)

Data Pump is a special type of data extraction (Extract) that reads data from a local Trail file and sends it over the network to the destination OGG.

4. Trail file

The change information captured by Extract from the source database is written to Trail files.

5. Data reception (Collector)

The data receiving program runs on the target machine and is used to receive the Trail log sent by Data Pump and write the data to the local Trail file.

6. Data replication (Replicat)

Data replication runs on the target machine, reads the data changes from the Trail file, and applies the change data to the target data storage system. In this case, data replication pushes the data to the kafka message queue.

7. Checkpoint

Checkpoints record the read and write positions of the Extract and Replicat processes against the transaction logs and Trail files, so that a process can resume from where it stopped after a restart.

Operation steps

Source-side Oracle database configuration

1. Enable archive log mode on the source database

SQL> archive log list
Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            /u01/app/oracle/product/11.2.3/db_1/dbs/arch
Oldest online log sequence     12
Next log sequence to archive   17
Current log sequence           17

If archive logging is not enabled, enable it as follows:

conn / as sysdba                   (connect to the database as DBA)
shutdown immediate                 (shut down the database immediately)
startup mount                      (start the instance and mount the database without opening it)
alter database archivelog;         (switch the database to archive log mode)
alter database open;               (open the database)
alter system archive log start;    (enable automatic archiving)

2. OGG relies on supplemental logging for real-time capture, so the relevant logging must be enabled to ensure the full transaction content can be obtained. Check the current status with the following command:

SQL> select force_logging, supplemental_log_data_min, supplemental_log_data_all from v$database;

FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
---------- ------------------------
YES        YES

If supplemental logging is not enabled, enable it as follows:

SQL> alter database force logging;
SQL> alter database add supplemental log data;
SQL> alter database add supplemental log data (all) columns;

3. Enable the GoldenGate replication parameter

SQL> alter system set enable_goldengate_replication = true;

4. Create the source-side OGG database user

SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024m autoextend on;
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
SQL> grant dba to ggsadmin;

5. Create a test table (for testing only; can be skipped in production)

SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key (object_id);
SQL> select count(*) from baiyang.ora_to_kfk;

  COUNT(*)
----------
       436

Deploy OGG

Source side (Oracle)

1. Decompression

Create the ogg directory first:

mkdir -p /ogg
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /ogg
chown -R oracle:oinstall /ogg    (give the oracle user ownership of /ogg; some of the following steps must be executed as the oracle user to succeed)

2. Configure OGG environment variables

For simplicity and convenience, in production it is recommended to configure these in the oracle user's environment variable file /home/oracle/.bash_profile.

export JAVA_HOME=/usr/local/java1.8
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
export JAVA=$JAVA_HOME/bin/java
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so

Apply the environment variables:

source /home/oracle/.bash_profile

3. OGG initialization

ggsci

Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21

Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.

GGSCI (ambari.master.com) 1> create subdirs

Creating subdirectories under current directory /root

Parameter files                /root/dirprm: created
Report files                   /root/dirrpt: created
Checkpoint files               /root/dirchk: created
Process status files           /root/dirpcs: created
SQL script files               /root/dirsql: created
Database definitions files     /root/dirdef: created
Extract data files             /root/dirdat: created
Temporary files                /root/dirtmp: created
Stdout files                   /root/dirout: created

4. Configure source-side Manager

GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle

GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./GLOBALS

-- add
OGGSCHEMA ggsadmin

GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr

-- add
PORT 7810                                           -- default listening port
DYNAMICPORTLIST 7811-7820                           -- dynamic port list
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3     -- if a process fails, restart it every 3 minutes, up to 5 times
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7    -- purge trail files already checkpointed, keeping at least 7 days
LAGREPORTHOURS 1                                    -- report transmission lag every hour
LAGINFOMINUTES 30                                   -- write an informational message if lag exceeds 30 minutes
LAGCRITICALMINUTES 45                               -- write a warning message if lag exceeds 45 minutes
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7     -- periodically purge marker history
-- ACCESSRULE, PROG *, IPADDR 172.*, ALLOW          -- restrict which network segment may connect (left commented out here)

5. Enable table-level supplemental logging for the table to be synchronized

GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk

Destination side (Kafka)

1. Decompression

mkdir -p /ogg
unzip V839824-01.zip
tar xf ggs_Adapters_Linux_x64.tar -C /ogg/

2. Configure OGG environment variables

For simplicity and convenience, in production it is recommended to configure these in the oracle user's environment variable file /home/oracle/.bash_profile.

export JAVA_HOME=/usr/local/java1.8/jre
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/lib/amd64/server:$JAVA_HOME/lib/amd64:$LD_LIBRARY_PATH
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so

Apply the environment variables:

source /home/oracle/.bash_profile

3. OGG initialization

ggsci

Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21

Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.

GGSCI (ambari.master.com) 1> create subdirs

Creating subdirectories under current directory /root

Parameter files                /root/dirprm: created
Report files                   /root/dirrpt: created
Checkpoint files               /root/dirchk: created
Process status files           /root/dirpcs: created
SQL script files               /root/dirsql: created
Database definitions files     /root/dirdef: created
Extract data files             /root/dirdat: created
Temporary files                /root/dirtmp: created
Stdout files                   /root/dirout: created

4. Configure the destination-side Manager

GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr

-- add
PORT 7810                                           -- default listening port
DYNAMICPORTLIST 7811-7820                           -- dynamic port list
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3     -- if a process fails, restart it every 3 minutes, up to 5 times
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7     -- periodically purge marker history
-- ACCESSRULE, PROG *, IPADDR 172.*, ALLOW          -- restrict which network segment may connect (left commented out here)

GGSCI (172.16.101.242) 4> edit param ./GLOBALS

-- add

CHECKPOINTTABLE ggsadmin.checkpoint

Full data synchronization (oracle to kafka)

1. Configure source-side data initialization

1) configure the source initialization process

GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk, sourceistable

2) configure source initialization parameters

GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk

EXTRACT initkfk
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
USERID ggsadmin, PASSWORD oracle
RMTHOST 172.16.101.242, MGRPORT 7810
RMTFILE ./dirdat/ek, maxfiles 999, megabytes 500
TABLE baiyang.ora_to_kfk;

3) Generate the table definition file on the source side

GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk

-- add
defsfile /ogg/dirdef/define_kfk.txt
userid ggsadmin, password oracle
table baiyang.ora_to_kfk;

4) Run defgen to generate the table definitions for the full Oracle data

$ cd /ogg
$ ./defgen paramfile dirprm/define_kfk.prm

-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt

5) Transfer the generated table definition file to the destination side

-- transfer this file to the dirdef directory on the destination side
scp /ogg/dirdef/define_kfk.txt 172.16.101.242:/ogg/dirdef/define_kfk.txt

2. Configure the data initialization process on the target side

1) configure the initialization process on the target side

GGSCI (172.16.101.242) 3> add replicat initkfk, specialrun

2) configure the initialization parameters of the destination side

GGSCI (172.16.101.242) 6> edit params initkfk

-- add
SPECIALRUN
END RUNTIME
setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
targetdb libfile libggjava.so set property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/define_kfk.txt
REPLACEBADCHAR SKIP
SOURCECHARSET OVERRIDE ISO-8859-1
EXTFILE ./dirdat/ek
reportcount every 1 minutes, rate
grouptransops 10000
map baiyang.ora_to_kfk, target baiyang.ora_to_kfk;

3) Configure the OGG-for-Kafka parameters

vi ./dirprm/kafka.props

-- add
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicName=test_ogg                 -- parameter used by older versions (the one used here)
# gg.handler.kafkahandler.topicMappingTemplate=test_ogg    -- parameter used by newer versions
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/kafka/libs/*:/ogg/:/ogg/lib/*        -- location of the Kafka installation and of the OGG installation

Copy the ./dirprm/kafka.props file to the /ogg/AdapterExamples/big-data/kafka directory.

vi ./dirprm/custom_kafka_producer.properties

bootstrap.servers=172.16.101.242:9092    -- Kafka broker address
acks=-1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400     -- number of bytes of records to accumulate before sending
linger.ms=10000       -- wait up to 10 s to fill a batch, which delays delivery into Kafka

Copy the ./dirprm/custom_kafka_producer.properties file to /ogg/AdapterExamples/big-data/kafka.

3. Start the full extraction task

Source side:

GGSCI (dtproxy) 20> start mgr
GGSCI (dtproxy) 21> start initkfk

Apply the full data on the target side:

GGSCI (172.16.101.242) 13> start mgr

cd /ogg
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD

-- check the application log for errors
cd /opt/ogg/dirrpt
more init01.rpt

4. Verify the full data in Kafka

cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg --from-beginning

{"table": "BAIYANG.ORA_TO_KFK", "op_type": "I", "op_ts": "2019-11-11 20:23:19.703779", "current_ts": "2019-11-11T20:48:55.946000", "pos": "-00000000000000001", "after": {"OWNER": "SYS", "OBJECT_NAME": "C_OBJ#", "SUBOBJECT_NAME": null, "OBJECT_ID": 2, "DATA_OBJECT_ID": 2, "OBJECT_TYPE": "CLUSTER"}}

{"table": "BAIYANG.ORA_TO_KFK", "op_type": "I", "op_ts": "2019-11-11 20:23:19.703779", "current_ts": "2019-11-11T20:48:56.289000", "pos": "-00000000000000001", "after": {"OWNER": "SYS", "OBJECT_NAME": "I_OBJ#", "SUBOBJECT_NAME": null, "OBJECT_ID": 3, "DATA_OBJECT_ID": 3, "OBJECT_TYPE": "INDEX"}}

All data has been synchronized to the target Kafka topic test_ogg.
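As an optional check beyond the console consumer, the records can also be read programmatically. The following is a minimal Python sketch, assuming the kafka-python package is installed and reusing the broker address and topic name from above; it counts the initial-load records and prints a small sample.

# Minimal sketch: read the OGG JSON change records from Kafka (assumes kafka-python is installed).
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test_ogg",
    bootstrap_servers="172.16.101.242:9092",
    auto_offset_reset="earliest",    # read from the beginning, like --from-beginning
    enable_auto_commit=False,
    consumer_timeout_ms=10000,       # stop iterating after 10 s without new messages
)

count = 0
for msg in consumer:
    record = json.loads(msg.value.decode("utf-8"))
    count += 1
    if count <= 3:                   # print a small sample of the initial load
        print(record["table"], record["op_type"], record["after"]["OBJECT_NAME"])

print("records consumed:", count)    # should match the source row count (436 here)
consumer.close()

If the count matches the 436 rows in baiyang.ora_to_kfk, the initial load has arrived completely.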

Incremental data synchronization (oracle to kafka)

Source-side configuration

1. Source-side extraction process configuration

GGSCI (dtproxy) 9> edit param extkfk

-- add
extract extkfk
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggsadmin, password oracle
FETCHOPTIONS NOUSESNAPSHOT
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
exttrail ./dirdat/to
table baiyang.ora_to_kfk;

2. Add extract process

GGSCI (dtproxy) 10> add extract extkfk, tranlog, begin now
GGSCI (dtproxy) 11> add exttrail ./dirdat/to, extract extkfk

3. Configure the source push process

GGSCI (dtproxy) 12> edit param pupkfk

-- add
extract pupkfk
passthru
dynamicresolution
userid ggsadmin, password oracle
rmthost 172.16.101.242, mgrport 7810
rmttrail ./dirdat/to
table baiyang.ora_to_kfk;

4. Add delivery process

GGSCI (dtproxy) 13> add extract pupkfk, exttrailsource ./dirdat/to
GGSCI (dtproxy) 14> add rmttrail ./dirdat/to, extract pupkfk

Target side configuration

1. Configure the replication (Replicat) process on the target side

edit param repkfk

-- add
REPLICAT repkfk
SOURCEDEFS ./dirdef/define_kfk.txt
targetdb libfile libggjava.so set property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;

2. Add the trail file to the Replicat process

add replicat repkfk, exttrail ./dirdat/to, checkpointtable ggsadmin.checkpoint

Start incremental real-time data capture

Source side:

./ggsci

GGSCI (dtproxy) 5> start extkfk
Sending START request to MANAGER...
EXTRACT EXTKFK starting

GGSCI (dtproxy) 6> start pupkfk
Sending START request to MANAGER...
EXTRACT PUPKFK starting

GGSCI (dtproxy) 7> status all

Program     Status      Group       Lag at Chkpt      Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     EXTKFK      00:00:00          00:00:10
EXTRACT     RUNNING     PUPKFK      00:00:00          00:00:00

Destination side:

./ggsci

GGSCI (172.16.101.242) 7> start replicat repkfk
Sending START request to MANAGER...
REPLICAT REPKFK starting

GGSCI (172.16.101.242) 8> info all

Program     Status      Group       Lag at Chkpt      Time Since Chkpt
MANAGER     RUNNING
REPLICAT    RUNNING     REPKFK      00:00:00          00:00:00

Test incremental data fetching

Source side:

Insert incremental data into Oracle:

SQL> insert into baiyang.ora_to_kfk select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id > 500 and object_id < 1000;
SQL> commit;
SQL> select count(*) from baiyang.ora_to_kfk;

  COUNT(*)
----------
       905

Destination side:

Consume the data from the Kafka message queue:

cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg

{"table": "BAIYANG.ORA_TO_KFK", "op_type": "I", "op_ts": "2019-11-11 21:04:11.158786", "current_ts": "2019-11-11T21:10:54.042000", "pos": "000000000000075298", "after": {"OWNER": "SYS", "OBJECT_NAME": "APPLY$_READER_STATS", "SUBOBJECT_NAME": null, "OBJECT_ID": 998, "DATA_OBJECT_ID": 998, "OBJECT_TYPE": "TABLE"}}

{"table": "BAIYANG.ORA_TO_KFK", "op_type": "I", "op_ts": "2019-11-11 21:04", "current_ts": "2019-11-11T21:10:54.042001", "pos": "000000000000075459", "after": {"OWNER": "SYS", "OBJECT_NAME": "APPLY$_READER_STATS_I", "SUBOBJECT_NAME": null, "OBJECT_ID": 999, "DATA_OBJECT_ID": 999, "OBJECT_TYPE": "INDEX"}}
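Each incremental record carries an op_type of I, U, or D. Below is a minimal sketch of how a downstream consumer might dispatch on the operation type, again assuming kafka-python and the broker and topic used above; with GETUPDATEBEFORES enabled on the extract, update and delete records may also carry a "before" image.

# Sketch: dispatch OGG JSON change records by operation type (I = insert, U = update, D = delete).
import json
from kafka import KafkaConsumer

def apply_change(record):
    op = record["op_type"]
    if op == "I":
        print("insert:", record["after"])
    elif op == "U":
        print("update:", record.get("before"), "->", record["after"])
    elif op == "D":
        print("delete:", record.get("before"))

consumer = KafkaConsumer(
    "test_ogg",
    bootstrap_servers="172.16.101.242:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for msg in consumer:
    apply_change(msg.value)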

DDL operation

If fields are added, deleted, or modified on the source (Oracle) side table, that is, whenever the table structure is changed by a DDL operation, note that the stable release of OGG for Big Data 12.2 does not support replicating DDL statements; DDL support only arrives after version 12.3.

With OGG for Big Data 12.2, whenever DDL is executed on the source side, the source-side table definition file define_kfk.txt must be regenerated and transferred to the target side again.

Examples are as follows:

Source side (Oracle):

1) Add an ID field to the source table

alter table ORA_TO_KFK add id number;

2) Regenerate the table definition file on the OGG source side

mv /ogg/dirdef/define_kfk.txt /ogg/dirdef/define_kfk.txt.bak1
cd /ogg
./defgen paramfile dirprm/define_kfk.prm

3) scp the generated table definition file to the target side

cd /ogg
scp ./dirdef/define_kfk.txt root@192.168.56.57:/ogg/dirdef/

4) Restart the source extraction process.

GGSCI (edsir1p9) 2> stop EXTKFK
Sending STOP request to EXTRACT EXTKFK...
Request processed.

GGSCI (edsir1p9) 3> start EXTKFK
Sending START request to MANAGER...
EXTRACT EXTKFK starting

GGSCI (edsir1p9) 4> info all

Program     Status      Group       Lag at Chkpt      Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     EXTKFK      00:00:00          00:00:08
EXTRACT     RUNNING     PUPKFK      00:00:00          00:00:07

Destination side (Kafka):

1) Check the apply (Replicat) process on the target side; it has abended

GGSCI (node) 38> info all

Program     Status      Group       Lag at Chkpt      Time Since Chkpt
MANAGER     RUNNING
REPLICAT    ABENDED     REPKFK      00:10:27          00:05:29

2) start the replication process

GGSCI (node) 40> start REPKFK
Sending START request to MANAGER...
REPLICAT REPKFK starting

GGSCI (node) 9> info all

Program     Status      Group       Lag at Chkpt      Time Since Chkpt
MANAGER     RUNNING
REPLICAT    RUNNING     REPKFK      00:00:00          00:00:04

Test:

Insert a row on the source side:

SQL> insert into ORA_TO_KFK (OWNER, OBJECT_NAME, OBJECT_ID, ID) values ('gg', 'gg', 876, 9);
1 row created.

SQL> commit;

Destination side:

cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.57:9092 --topic ogg_test

The data is synchronized from the source-side Oracle to the destination-side Kafka; the column newly added in Oracle is also synchronized to Kafka normally.
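Because the DDL change simply makes the new ID column appear as an extra field in the JSON "after" image, downstream consumers should read newly added columns defensively. A minimal sketch, assuming kafka-python and the broker and topic used in this DDL example:

# Sketch: tolerate columns that only appear after a DDL change (e.g. the new ID field).
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "ogg_test",
    bootstrap_servers="192.168.56.57:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for msg in consumer:
    after = msg.value.get("after", {})
    # Records produced before the DDL have no ID field; .get() avoids a KeyError for them.
    print(after.get("OBJECT_ID"), after.get("OBJECT_NAME"), after.get("ID"))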

The above is the full content of "How to synchronize Oracle data to a Kafka message queue in real time through GoldenGate". Thank you for reading, and I hope the shared content is helpful to you.
