This article describes how to synchronize Oracle data to a Kafka message queue in real time with Oracle GoldenGate (OGG). The walkthrough is detailed; readers interested in this setup are encouraged to follow it end to end.
Environment
Component versions:
1. Source-side Oracle: Oracle 11.2.0.4 for Linux x64 (the source database).
2. Source-side OGG: Oracle OGG 11.2.0.1.20 for Oracle, Linux x64 (extracts the data changes of the source Oracle and ships the change log to the destination).
3. Destination-side Kafka: kafka_2.11-0.11.0.2 for Linux x64 (the message queue that receives the data pushed by the destination OGG).
4. Destination-side OGG: Oracle GoldenGate Application Adapters (receives the Oracle transaction change log sent by the source and pushes the changes to the Kafka message queue).
Overall architecture diagram (figure not reproduced here)
Terminology
1. OGG Manager
The Manager configures and manages the other OGG components: it sets up data extraction, data pumps, and replication; starts and stops the related processes; and reports on their status.
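All of these operations are issued from the GGSCI command prompt; for example (illustrative, the concrete sessions appear later in this article):
ggsci
GGSCI> start mgr
GGSCI> info mgr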
2. Data extraction (Extract)
Extract captures changes (DML and DDL) from the source database. It comes in several forms:
Local extract: captures incremental change data from the local database and writes it to local trail files.
Data pump: reads data from the local trail files and pushes it to the target.
Initial-load extract: pulls the full contents of database tables for initial data loading.
3. Data pump (Data Pump)
A data pump is a special type of extract that reads data from a local trail file and sends it over the network to the destination OGG.
4. Trail files
The change information that Extract captures from the source database is written to trail files.
5. Data reception (Collector)
The Collector runs on the target host; it receives the trail data sent by the data pump and writes it to local trail files.
6. Data replication (Replicat)
Replicat runs on the target host, reads the data changes from the trail files, and applies them to the target data store. In this article, Replicat pushes the data to the Kafka message queue.
7. Checkpoints (Checkpoint)
Checkpoints record how far each OGG process has read and written (in the transaction log and in the trail files), so that a process can resume from where it left off after an interruption.
Operation steps
Source-side Oracle database configuration
1. Enable archive log mode on the source. First check the current state:
SQL> archive log list
Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            /u01/app/oracle/product/11.2.3/db_1/dbs/arch
Oldest online log sequence     12
Next log sequence to archive   17
Current log sequence           17
If the database is not yet in archive log mode, enable it as follows:
SQL> conn / as sysdba                  (connect to the database as SYSDBA)
SQL> shutdown immediate                (shut down the database)
SQL> startup mount                     (start the instance and mount, but do not open, the database)
SQL> alter database archivelog;        (switch the database to archive log mode)
SQL> alter database open;              (open the database)
SQL> alter system archive log start;   (enable automatic archiving)
2. OGG real-time capture relies on supplemental logging (and force logging), so these must be enabled to guarantee that the full transaction content can be read. Check the current state with:
SQL> select force_logging, supplemental_log_data_min, supplemental_log_data_all from v$database;

FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
---------- ------------------------
YES        YES
If supplemental logging is not enabled, turn it on:
SQL> alter database force logging;
SQL> alter database add supplemental log data;
SQL> alter database add supplemental log data (all) columns;
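Re-running the check from above should now report YES for the logging flags:
SQL> select force_logging, supplemental_log_data_min, supplemental_log_data_all from v$database;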
3. Enable the GoldenGate replication parameter
SQL> alter system set enable_goldengate_replication = true;
4. Create the source-side OGG database user
SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024m autoextend on;
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
SQL> grant dba to ggsadmin;   (DBA is granted here for simplicity; in production, grant only the privileges OGG needs)
5. Create a test table (for this walkthrough only; skip in production)
SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key (object_id);
SQL> select count(*) from baiyang.ora_to_kfk;
  COUNT(*)
----------
       436
Deploy OGG
Source side (Oracle)
1. Unpack the software
First create the OGG directory:
mkdir -p /ogg
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /ogg
chown -R oracle:oinstall /ogg   (give the oracle user ownership; some later steps only succeed when run as the oracle user)
2. Configure the OGG environment variables
For simplicity, set these in the oracle user's environment file /home/oracle/.bash_profile:
export JAVA_HOME=/usr/local/java1.8
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jar
export JAVA=$JAVA_HOME/bin/java
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so
Apply the environment variables:
source /home/oracle/.bash_profile
3. Initialize the OGG directories
Start GGSCI and create the working subdirectories:
ggsci
GGSCI> create subdirs
A session looks like this:
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (ambari.master.com) 1> create subdirs
Creating subdirectories under current directory /root
Parameter files              /root/dirprm: created
Report files                 /root/dirrpt: created
Checkpoint files             /root/dirchk: created
Process status files         /root/dirpcs: created
SQL script files             /root/dirsql: created
Database definitions files   /root/dirdef: created
Extract data files           /root/dirdat: created
Temporary files              /root/dirtmp: created
Stdout files                 /root/dirout: created
(Note: create subdirs creates the directories under the current working directory; run it from $OGG_HOME so they land under /ogg. The transcript above was captured from /root.)
4. Configure the source-side Manager
GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./GLOBALS
-- add
GGSCHEMA ggsadmin
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
-- add
PORT 7810                                        -- default listening port
DYNAMICPORTLIST 7811-7820                        -- dynamic port list
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3  -- if a process fails, retry a restart every 3 minutes, up to 5 times
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7   -- purge trail files already checkpointed, keeping 7 days
LAGREPORTHOURS 1                                 -- report lag once per hour
LAGINFOMINUTES 30                                -- write an informational message when lag exceeds 30 minutes
LAGCRITICALMINUTES 45                            -- write a critical message when lag exceeds 45 minutes
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7  -- periodically clean up marker history
-- ACCESSRULE, PROG *, IPADDR 172.*, ALLOW       -- (optional) restrict which network segments may connect
5. Add table-level supplemental logging for the table to be synchronized
GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
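If trandata was added successfully, info trandata reports something like the following (illustrative; the exact wording varies by OGG version):
Logging of supplemental redo log data is enabled for table BAIYANG.ORA_TO_KFK.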
Destination side (Kafka)
1. Unpack the software
mkdir -p /ogg
unzip V839824-01.zip
tar xf ggs_Adapters_Linux_x64.tar -C /ogg/
2. Configure the OGG environment variables
As on the source, set these in /home/oracle/.bash_profile:
export JAVA_HOME=/usr/local/java1.8/jre
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/lib/amd64/server:$JAVA_HOME/lib/amd64:$LD_LIBRARY_PATH
export OGG_HOME=/ogg
export PATH=$PATH:$OGG_HOME
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so
Apply the environment variables:
source /home/oracle/.bash_profile
3. Initialize the OGG directories
ggsci
GGSCI> create subdirs
A session looks like this:
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (ambari.master.com) 1> create subdirs
Creating subdirectories under current directory /root
Parameter files              /root/dirprm: created
Report files                 /root/dirrpt: created
Checkpoint files             /root/dirchk: created
Process status files         /root/dirpcs: created
SQL script files             /root/dirsql: created
Database definitions files   /root/dirdef: created
Extract data files           /root/dirdat: created
Temporary files              /root/dirtmp: created
Stdout files                 /root/dirout: created
4. Configure the destination-side Manager
GGSCI> edit param mgr
-- add
PORT 7810                                        -- default listening port
DYNAMICPORTLIST 7811-7820                        -- dynamic port list
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3  -- if a process fails, retry a restart every 3 minutes, up to 5 times
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7   -- purge trail files already checkpointed, keeping 7 days
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7  -- periodically clean up marker history
-- ACCESSRULE, PROG *, IPADDR 172.*, ALLOW       -- (optional) restrict which network segments may connect
GGSCI (172.16.101.242) 4> edit param ./GLOBALS
-- add
CHECKPOINTTABLE ggsadmin.checkpoint
Full data synchronization (Oracle to Kafka)
1. Configure source-side data initialization
1) Configure the source-side initial-load extract:
GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk, sourceistable
2) Configure the initial-load extract parameters:
GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk
EXTRACT initkfk
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
USERID ggsadmin, PASSWORD oracle
RMTHOST 172.16.101.242, MGRPORT 7810
RMTFILE ./dirdat/ek, MAXFILES 999, MEGABYTES 500
TABLE baiyang.ora_to_kfk;
3) Generate the table-definition parameter file on the source:
GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk
-- add
DEFSFILE /ogg/dirdef/define_kfk.txt
USERID ggsadmin, PASSWORD oracle
TABLE baiyang.ora_to_kfk;
4) Run defgen to produce the table-definition file:
$ cd /ogg
$ ./defgen paramfile dirprm/define_kfk.prm
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt
5) Transfer the definition file to the dirdef directory on the destination side:
scp /ogg/dirdef/define_kfk.txt 172.16.101.242:/ogg/dirdef/define_kfk.txt
2. Configure the data initialization process on the target side
1) Add the initial-load replicat on the target:
GGSCI (172.16.101.242) 3> add replicat initkfk, specialrun
2) Configure the initial-load replicat parameters:
GGSCI (172.16.101.242) 6> edit params initkfk
-- add
SPECIALRUN
END RUNTIME
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/define_kfk.txt
REPLACEBADCHAR SKIP
SOURCECHARSET OVERRIDE ISO-8859-1
EXTFILE ./dirdat/ek
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
3) Configure the OGG Kafka handler:
vi ./dirprm/kafka.props
-- add
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicName=test_ogg
#gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/kafka/libs/*:/ogg/:/ogg/lib/*
Note: topicName is the older-style parameter and is the one used by this version; newer releases use topicMappingTemplate instead (left commented out above). gg.classpath must point at the Kafka client libraries of the Kafka installation and at the OGG installation directories.
Copy ./dirprm/kafka.props to the /ogg/AdapterExamples/big-data/kafka directory.
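Since a wrong gg.classpath is a common cause of an abended replicat, it is worth confirming that the Kafka client jars actually resolve (a quick sanity check, assuming Kafka is installed under /kafka as configured above):
ls /kafka/libs/kafka-clients*.jar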
vi ./dirprm/custom_kafka_producer.properties
bootstrap.servers=172.16.101.242:9092
acks=-1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
Note: bootstrap.servers is the Kafka broker address. batch.size is the maximum number of bytes the producer accumulates into one batch, and linger.ms is how long it waits before sending a batch, so messages may reach Kafka with up to that much delay; together they trade latency for throughput.
Copy ./dirprm/custom_kafka_producer.properties to /ogg/AdapterExamples/big-data/kafka.
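This walkthrough assumes the topic test_ogg already exists or that broker auto-creation is enabled; if not, create it first (a sketch for Kafka 0.11, assuming ZooKeeper runs on the broker host at its default port):
cd /kafka
bin/kafka-topics.sh --create --zookeeper 172.16.101.242:2181 --replication-factor 1 --partitions 1 --topic test_ogg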
3. Run the full data load
Source side:
GGSCI (dtproxy) 20> start mgr
GGSCI (dtproxy) 21> start initkfk
Apply the full data on the target side:
GGSCI (172.16.101.242) 13> start mgr
cd /ogg
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
-- check the report file for errors
cd /ogg/dirrpt
more init01.rpt
4. Verify the full data in Kafka
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg --from-beginning
{"table": "BAIYANG.ORA_TO_KFK", "op_type": "I", "op_ts": "2019-11-11 20:23:19.703779", "current_ts": "2019-11-11T20:48:55.946000", "pos": "-00000000000000001", "after": {"OWNER": "SYS", "OBJECT_NAME": "C_OBJ#", "SUBOBJECT_NAME": null, "OBJECT_ID": 2, "DATA_OBJECT_ID": 2, "OBJECT_TYPE": "CLUSTER"}}
{"table": "BAIYANG.ORA_TO_KFK", "op_type": "I", "op_ts": "2019-11-11 20:23:19.703779", "current_ts": "2019-11-11T20:48:56.289000", "pos": "-00000000000000001", "after": {"OWNER": "SYS", "OBJECT_NAME": "I_OBJ#", "SUBOBJECT_NAME": null, "OBJECT_ID": 3, "DATA_OBJECT_ID": 3, "OBJECT_TYPE": "INDEX"}}
The full data set has now been synchronized to the target Kafka topic test_ogg.
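To cross-check the 436 source rows against the number of messages in the topic, Kafka's GetOffsetShell tool can print the log-end offsets (a sketch; with a single-partition topic the sum of the offsets should match the row count):
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.16.101.242:9092 --topic test_ogg --time -1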
Incremental data synchronization (Oracle to Kafka)
Source-side configuration
1. Configure the source-side extract process:
GGSCI (dtproxy) 9> edit param extkfk
-- add
EXTRACT extkfk
DYNAMICRESOLUTION
SETENV (ORACLE_SID="orcl")
SETENV (NLS_LANG="american_america.AL32UTF8")
USERID ggsadmin, PASSWORD oracle
FETCHOPTIONS NOUSESNAPSHOT
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
EXTTRAIL ./dirdat/to
TABLE baiyang.ora_to_kfk;
2. Add the extract process:
GGSCI (dtproxy) 10> add extract extkfk, tranlog, begin now
GGSCI (dtproxy) 11> add exttrail ./dirdat/to, extract extkfk
3. Configure the source-side data pump process:
GGSCI (dtproxy) 12> edit param pupkfk
-- add
EXTRACT pupkfk
PASSTHRU
DYNAMICRESOLUTION
USERID ggsadmin, PASSWORD oracle
RMTHOST 172.16.101.242, MGRPORT 7810
RMTTRAIL ./dirdat/to
TABLE baiyang.ora_to_kfk;
4. Add the data pump process:
GGSCI (dtproxy) 13> add extract pupkfk, exttrailsource ./dirdat/to
GGSCI (dtproxy) 14> add rmttrail ./dirdat/to, extract pupkfk
Target-side configuration
1. Configure the target-side replicat process:
GGSCI> edit param repkfk
-- add
REPLICAT repkfk
SOURCEDEFS ./dirdef/define_kfk.txt
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
2. Add the replicat process and bind it to the trail:
GGSCI> add replicat repkfk, exttrail ./dirdat/to, checkpointtable ggsadmin.checkpoint
Start incremental real-time data capture
Source side:
./ggsci
GGSCI (dtproxy) 5> start extkfk
Sending START request to MANAGER...
EXTRACT EXTKFK starting
GGSCI (dtproxy) 6> start pupkfk
Sending START request to MANAGER...
EXTRACT PUPKFK starting
GGSCI (dtproxy) 7> info all
Program     Status      Group       Lag at Chkpt    Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     EXTKFK      00:00:00        00:00:10
EXTRACT     RUNNING     PUPKFK      00:00:00        00:00:00
Destination side:
./ggsci
GGSCI (172.16.101.242) 7> start replicat repkfk
Sending START request to MANAGER...
REPLICAT REPKFK starting
GGSCI (172.16.101.242) 8> info all
Program     Status      Group       Lag at Chkpt    Time Since Chkpt
MANAGER     RUNNING
REPLICAT    RUNNING     REPKFK      00:00:00        00:00:00
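Once everything is RUNNING, the replicat's processing counters can be polled from GGSCI to watch operations flow through (illustrative):
GGSCI (172.16.101.242) 9> stats replicat repkfk, total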
Test incremental data capture
Source side:
Insert incremental data into Oracle:
SQL> insert into baiyang.ora_to_kfk select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id > 500 and object_id < 1000;
SQL> commit;
SQL> select count(*) from baiyang.ora_to_kfk;
  COUNT(*)
----------
       905
Destination side:
Consume the data from the Kafka topic:
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 172.16.101.242:9092 --topic test_ogg
{"table": "BAIYANG.ORA_TO_KFK", "op_type": "I", "op_ts": "2019-11-11 21:04:11.158786", "current_ts": "2019-11-11T21:10:54.042000", "pos": "000000000000075298", "after": {"OWNER": "SYS", "OBJECT_NAME": "APPLY$_READER_STATS", "SUBOBJECT_NAME": null, "OBJECT_ID": 998, "DATA_OBJECT_ID": 998, "OBJECT_TYPE": "TABLE"}}
{"table": "BAIYANG.ORA_TO_KFK", "op_type": "I", "op_ts": "2019-11-11 21:04:11.158786", "current_ts": "2019-11-11T21:10:54.042001", "pos": "000000000000075459", "after": {"OWNER": "SYS", "OBJECT_NAME": "APPLY$_READER_STATS_I", "SUBOBJECT_NAME": null, "OBJECT_ID": 999, "DATA_OBJECT_ID": 999, "OBJECT_TYPE": "INDEX"}}
DDL operations
If the table structure on the source (Oracle) side changes, for example a column is added, dropped, or modified, that is a DDL operation. The stable 12.2 release of OGG for Big Data does not support synchronizing DDL statements; DDL synchronization is supported from version 12.3 onward.
With OGG for Big Data 12.2, whenever DDL is executed on the source, the source-side table definition file define_kfk.txt must be regenerated and transferred to the target side again.
The procedure is as follows.
Source side (Oracle):
1) Add an id column to the source table:
SQL> alter table ORA_TO_KFK add id number;
2) Regenerate the table definition file on the source OGG side:
mv /ogg/dirdef/define_kfk.txt /ogg/dirdef/define_kfk.txt.bak1
cd /ogg
./defgen paramfile dirprm/define_kfk.prm
3) Copy the new definition file to the target side:
cd /ogg
scp ./dirdef/define_kfk.txt root@192.168.56.57:/ogg/dirdef/
4) Restart the source-side extract process:
GGSCI (edsir1p9) 2> stop EXTKFK
Sending STOP request to EXTRACT EXTKFK...
Request processed.
GGSCI (edsir1p9) 3> start EXTKFK
Sending START request to MANAGER...
EXTRACT EXTKFK starting
GGSCI (edsir1p9) 4> info all
Program     Status      Group       Lag at Chkpt    Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     EXTKFK      00:00:00        00:00:08
EXTRACT     RUNNING     PUPKFK      00:00:00        00:00:07
Destination side (Kafka):
1) The replicat process on the target side will have abended:
GGSCI (node) 38> info all
Program     Status      Group       Lag at Chkpt    Time Since Chkpt
MANAGER     RUNNING
REPLICAT    ABENDED     REPKFK      00:10:27        00:05:29
2) Restart the replicat process:
GGSCI (node) 40> start REPKFK
Sending START request to MANAGER...
REPLICAT REPKFK starting
GGSCI (node) 9> info all
Program     Status      Group       Lag at Chkpt    Time Since Chkpt
MANAGER     RUNNING
REPLICAT    RUNNING     REPKFK      00:00:00        00:00:04
Test:
Insert a row on the source side:
SQL> insert into ORA_TO_KFK (OWNER, OBJECT_NAME, OBJECT_ID, ID) values ('gg', 'gg', 876, 9);
1 row created.
SQL> commit;
Destination side:
cd /kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.57:9092 --topic test_ogg
The data is synchronized from the source Oracle to the destination Kafka, and the newly added column replicates to Kafka correctly.
That is the whole of "how Oracle data is synchronized to the Kafka message queue in real time through GoldenGate". Thank you for reading; we hope the content has been helpful.