
Using GoldenGate to synchronize heterogeneous databases, part 1: Kafka middleware

2025-04-06 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

We received a request from the business department to synchronize a table from an Oracle database to a MySQL database. Because the two databases are heterogeneous, we use Kafka as the middleware between them. The specific configuration follows.

The request: due to business needs, we are applying to use the architecture group's data synchronization service to synchronize the following data to the butler MySQL database.

Agent user data:

a. Data source: SSP database, table AAA.system_user

b. Data target: MySQL DLS database, table DLS_SYSTEM_USER

c. Synchronization logic: none

d. Synchronization data and correspondence: see attachment

e. Whether sensitive information is involved: no

Preparation: the table already exists in the target MySQL database, so we first back it up and save its table creation statement.

-- get the table creation statement

mysql> show create table dls_system_user;

-- export the structure and data of the single table

mysqldump -uroot -p dls DLS_SYSTEM_USER > DLS_SYSTEM_USER_180622.sql

-- rename the table

ALTER TABLE DLS_SYSTEM_USER RENAME DLS_SYSTEM_USER_BAK0622;

-- create an empty table

CREATE TABLE dls_system_user (
  ID varchar(100) NOT NULL,
  ACCOUNT_EXPIRED int(1) NOT NULL DEFAULT '0',
  ACCOUNT_LOCKED int(1) NOT NULL DEFAULT '0',
  ENABLED int(1) NOT NULL DEFAULT '0',
  ORG_NO varchar(255) NOT NULL DEFAULT '',
  USER_CODE varchar(100) NOT NULL DEFAULT '',
  REMARK_NAME varchar(255) NOT NULL DEFAULT '',
  IS_CREATE_PERSON varchar(255) NOT NULL DEFAULT '',
  STATUS int(10) NOT NULL DEFAULT '0',
  PRIMARY KEY (ID),
  KEY IDX_DLS_SYSTEM_USER_USER_CODE (USER_CODE)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
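The dump file and the backup table above both carry a date suffix (180622 and BAK0622). A minimal Python sketch of deriving both names from one date is shown here. It assumes the suffixes stand for 22 June 2018 in yymmdd and mmdd form; the helper name backup_plan is hypothetical, and the code only builds strings, it does not touch MySQL.

```python
from datetime import date
import shlex


def backup_plan(db: str, table: str, day: date, user: str = "root") -> dict:
    """Build the dump command and rename statement for a table backup.

    Assumption: the dump file uses a yymmdd suffix and the backup table
    uses a BAK + mmdd suffix, as the names in the article suggest.
    """
    dump_file = f"{table}_{day:%y%m%d}.sql"
    backup_table = f"{table}_BAK{day:%m%d}"
    argv = ["mysqldump", f"-u{user}", "-p", db, table]
    dump_command = " ".join(shlex.quote(a) for a in argv) + " > " + shlex.quote(dump_file)
    return {
        "dump_file": dump_file,
        "dump_command": dump_command,
        "rename_sql": f"ALTER TABLE {table} RENAME {backup_table};",
    }


plan = backup_plan("dls", "DLS_SYSTEM_USER", date(2018, 6, 22))
print(plan["dump_command"])
print(plan["rename_sql"])
```

Deriving both names from the same date keeps the dump file and the renamed table easy to match up later.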

Oracle source GoldenGate configuration:

1. Add supplemental logging for the table to be synchronized

GGSCI> dblogin userid ggs@ntestdb, password ggs
GGSCI> add trandata AAA.system_user

2. Add the extract process

GGSCI> add extract ext_kafb, tranlog, begin now
GGSCI> add exttrail ./dirdat/a2, extract ext_kafb, megabytes 200
GGSCI> edit params EXT_KAFB

EXTRACT EXT_KAFB
USERID ggs@ntestdb, PASSWORD ggs
LOGALLSUPCOLS
EXTTRAIL ./dirdat/a2, FORMAT RELEASE 11.2
TABLE AAA.system_user;

3. Add the data pump (delivery) process:

GGSCI> add extract pmp_kafb, exttrailsource ./dirdat/a2
GGSCI> add rmttrail ./dirdat/b2, extract pmp_kafb, megabytes 200
GGSCI> edit params pmp_kafb

EXTRACT pmp_kafb
USERID ggs@ntestdb, PASSWORD ggs
PASSTHRU
-- kafka server address
RMTHOST 172.16.xxx.5, MGRPORT 9178
RMTTRAIL ./dirdat/b2, FORMAT RELEASE 11.2
TABLE AAA.system_user;

-- initialization parameter files are stored in /ggs/ggs12/dirprm/

4. Add the initial-load process

-- added at the source end
GGSCI> add extract ek_20, sourceistable
GGSCI> edit params ek_20

EXTRACT ek_20
USERID ggs@ntestdb, PASSWORD ggs
RMTHOST 172.16.154.5, MGRPORT 9178
RMTFILE ./dirdat/lb, MAXFILES 999, MEGABYTES 500
TABLE AAA.system_user;

5. Generate the definitions (def) file:

GGSCI> edit param defgen_n9

USERID ggs@ntestdb, PASSWORD ggs
DEFSFILE /goldengate/ggskafka/dirdef/defgen_n9.def, FORMAT RELEASE 11.2
TABLE AAA.system_user;

Run the following command under $OGG_HOME to generate the def file:

defgen paramfile /goldengate/ggskafka/dirprm/defgen_n9.prm

Then transfer the generated def file to $OGG_HOME/dirdef on the kafka server.
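The source-side steps above create the groups ext_kafb, pmp_kafb and ek_20 and the trails ./dirdat/a2 and ./dirdat/b2. In the classic GoldenGate architecture a group name may be at most eight characters and a trail name exactly two, so names are easy to get wrong when adapting these steps. A small Python sketch of a pre-check follows. The function names are hypothetical, and restricting group names to letters, digits and underscores is a deliberately conservative assumption, not the full GoldenGate rule.

```python
import re
from pathlib import PurePosixPath

MAX_GROUP_LEN = 8    # classic Extract/Replicat group names: at most 8 characters
TRAIL_NAME_LEN = 2   # trail names (the last path component): exactly 2 characters


def valid_group(name: str) -> bool:
    """Conservative check: letters, digits, underscore, 1 to 8 characters."""
    return re.fullmatch(rf"[A-Za-z0-9_]{{1,{MAX_GROUP_LEN}}}", name) is not None


def valid_trail(path: str) -> bool:
    """Check that the file-name part of a trail path has two characters."""
    return len(PurePosixPath(path).name) == TRAIL_NAME_LEN


groups = ["ext_kafb", "pmp_kafb", "ek_20"]
trails = ["./dirdat/a2", "./dirdat/b2"]

for g in groups:
    print(g, "ok" if valid_group(g) else "invalid")
for t in trails:
    print(t, "ok" if valid_trail(t) else "invalid")
```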

-- The target MySQL database is at 172.16.xxx.148; a new kafka user must be created on it

grant select,insert,update,delete,create,drop on DLS.* to 'kafka'@'%' identified by 'jiubugaosuni';

-- kafka server GoldenGate operation

1. Add the initial-load process (parameter file under dirprm):

GGSCI> add replicat rn_3, specialrun
GGSCI> edit params rn_3

SPECIALRUN
END RUNTIME
SETENV (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
EXTFILE ./dirdat/lb
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;

2. Add the replication (replicat) process:

GGSCI> add replicat RN_KF3, exttrail ./dirdat/b2
GGSCI> edit params RN_KF3

REPLICAT RN_KF3
SETENV (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;

3. Parameter configuration:

cd /home/app/ogg/ggs12/dirprm

The custom_kafka_producer.properties file is as follows:

[app@test-datamanager dirprm]$ more custom_kafka_producer.properties
bootstrap.servers=172.16.xxx.5:9092,172.16.xxx.7:9092
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=0
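Kafka producer property keys are case-sensitive and lowercase, and bootstrap.servers expects a comma-separated list of host:port pairs, so a missing comma or a capitalized key silently breaks the producer configuration. A minimal Python sketch that parses a properties file and checks the broker list is given below. The function names are hypothetical, the parser handles only simple key=value lines, and IPv6 addresses are not supported.

```python
from typing import Dict, List, Tuple


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def split_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a bootstrap.servers value into (host, port) pairs.

    Raises ValueError when an entry is not a single host:port pair,
    for example when two entries were fused without a comma.
    """
    servers = []
    for item in value.split(","):
        host, sep, port = item.strip().rpartition(":")
        if not sep or not host or ":" in host or not port.isdigit():
            raise ValueError(f"not a host:port pair: {item!r}")
        servers.append((host, int(port)))
    return servers


# Sample content using the masked addresses from the article.
SAMPLE = """\
bootstrap.servers=172.16.xxx.5:9092,172.16.xxx.7:9092
acks=1
# 100KB per partition
batch.size=16384
linger.ms=0
"""

props = parse_properties(SAMPLE)
servers = split_bootstrap_servers(props["bootstrap.servers"])
print(servers)
```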

-- Use vi to create the corresponding file kafkat_n3.props

The kafka.props file is as follows:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=DLS.DLS_MERCHANT_STATUS
#gg.handler.kafkahandler.format=avro_op
# specify the output format
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true
# specify the topic name
gg.handler.kafkahandler.SchemaTopicName=DLS.DLS_MERCHANT_STATUS
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec

# Sample gg.classpath for Apache Kafka
# path to the Kafka libraries
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/
# Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
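With format=json, mode=op and the operation keys I, U, D and T set above, each Kafka message describes one row operation. The Python sketch below shows how a downstream consumer might interpret such messages, applying them to an in-memory dictionary keyed by primary key. The message layout (op_type, primary_keys, before, after) is an assumption based on the JSON formatter's usual operation-mode output and should be verified against real messages. The function name apply_op and the sample values are hypothetical, and updates that change the primary key are not handled.

```python
import json
from typing import Dict, Tuple

Row = Dict[str, object]


def apply_op(rows: Dict[Tuple, Row], message: str) -> Dict[Tuple, Row]:
    """Apply one JSON operation record to an in-memory table image."""
    record = json.loads(message)
    op = record["op_type"]
    keys = record.get("primary_keys", [])
    if op in ("I", "U"):
        after = record["after"]
        key = tuple(after[c] for c in keys)
        # Merge so that an update carrying only some columns keeps the rest.
        rows[key] = {**rows.get(key, {}), **after}
    elif op == "D":
        before = record["before"]
        key = tuple(before[c] for c in keys)
        rows.pop(key, None)
    elif op == "T":
        rows.clear()
    else:
        raise ValueError(f"unknown op_type: {op!r}")
    return rows


# Hypothetical sample messages for the table in this article.
insert_msg = json.dumps({
    "table": "AAA.SYSTEM_USER", "op_type": "I", "primary_keys": ["ID"],
    "after": {"ID": "u1", "USER_CODE": "A001", "STATUS": 0},
})
update_msg = json.dumps({
    "table": "AAA.SYSTEM_USER", "op_type": "U", "primary_keys": ["ID"],
    "before": {"ID": "u1", "STATUS": 0},
    "after": {"ID": "u1", "STATUS": 1},
})
delete_msg = json.dumps({
    "table": "AAA.SYSTEM_USER", "op_type": "D", "primary_keys": ["ID"],
    "before": {"ID": "u1", "USER_CODE": "A001", "STATUS": 1},
})

table: Dict[Tuple, Row] = {}
apply_op(table, insert_msg)
apply_op(table, update_msg)
print(table)
```

A real consumer would replace the dictionary writes with statements against the MySQL target table; the dispatch on op_type stays the same.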

At this point the configuration is basically complete. Now we start the processes and initialize the data.

1. Start the source-side extract (capture) process

GGSCI> start EXT_KAFB

2. Start the source-side data pump (delivery) process

GGSCI> start pmp_kafb

3. Start the source-side initial-load process

GGSCI> start ek_20

4. Start the initial-load process on the target side

GGSCI> start rn_3

Execute the following command under $OGG_HOME:

./replicat paramfile ./dirprm/rn_3.prm reportfile ./dirrpt/rn_3.rpt -p INITIALDATALOAD

5. Start the replication (replicat) process on the target side

GGSCI> start RN_KF3
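The command-line invocation in step 4 follows a fixed pattern: the parameter file and the report file are both named after the replicat group. A short Python sketch that builds that argument list for a given group is shown below. The helper name initial_load_command is hypothetical, the dirprm and dirrpt locations are taken from the command above, and the sketch only builds the arguments; actually running them is left as a commented-out line.

```python
from typing import List


def initial_load_command(group: str) -> List[str]:
    """Build the argv for running an initial-load replicat from $OGG_HOME.

    Assumption: parameter and report files are named after the group,
    as in ./dirprm/rn_3.prm and ./dirrpt/rn_3.rpt.
    """
    name = group.lower()
    return [
        "./replicat",
        "paramfile", f"./dirprm/{name}.prm",
        "reportfile", f"./dirrpt/{name}.rpt",
        "-p", "INITIALDATALOAD",
    ]


argv = initial_load_command("rn_3")
print(" ".join(argv))

# To execute it, run from the GoldenGate home directory, for example:
# import os, subprocess
# subprocess.run(argv, cwd=os.environ["OGG_HOME"], check=True)
```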
