We received a request from the business department to synchronize a table from an Oracle database to a MySQL database. We use Oracle GoldenGate with Kafka to bridge the heterogeneous environment; the specific configuration follows.
Due to business needs, we are applying to use the architecture group's data synchronization service to synchronize the following data to the butler MySQL database:
Agent user data:
a. Data source: SSP library AAA.system_user
b. Data target: MySQL DLS library DLS_SYSTEM_USER
c. Synchronization logic: none
d. Synchronization data and correspondence: see attachment
e. Whether sensitive information is involved: no
Preparation work: since the table already exists in the target MySQL library, we back it up and capture the table creation statement.
-- get the table creation statement
mysql> show create table dls_system_user;
-- export a single table's structure and data
mysqldump -uroot -p dls DLS_SYSTEM_USER > DLS_SYSTEM_USER_180622.sql
-- rename the existing table
ALTER TABLE DLS_SYSTEM_USER RENAME DLS_SYSTEM_USER_BAK0622;
-- create an empty table
CREATE TABLE dls_system_user (
  ID varchar(100) NOT NULL,
  ACCOUNT_EXPIRED int(1) NOT NULL DEFAULT '0',
  ACCOUNT_LOCKED int(1) NOT NULL DEFAULT '0',
  ENABLED int(1) NOT NULL DEFAULT '0',
  ORG_NO varchar(255) NOT NULL DEFAULT '',
  USER_CODE varchar(100) NOT NULL DEFAULT '',
  REMARK_NAME varchar(255) NOT NULL DEFAULT '',
  IS_CREATE_PERSON varchar(255) NOT NULL DEFAULT '',
  STATUS int(10) NOT NULL DEFAULT '0',
  PRIMARY KEY (ID),
  KEY IDX_DLS_SYSTEM_USER_USER_CODE (USER_CODE)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
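If the change ever needs to be rolled back, the dump taken above can be restored. A minimal sketch, assuming the same dls database and root account used for the export:
-- restore the backed-up structure and data into the dls library
mysql -uroot -p dls < DLS_SYSTEM_USER_180622.sql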
Oracle source GoldenGate configuration:
1. Add supplemental logging (trandata) for the table to be synchronized
dblogin userid ggs@ntestdb, password ggs
add trandata AAA.system_user
2. Add extraction process
add extract ext_kafb, tranlog, begin now
add exttrail ./dirdat/a2, extract ext_kafb, megabytes 200
edit params EXT_KAFB

extract EXT_KAFB
userid ggs@ntestdb, password ggs
LOGALLSUPCOLS
exttrail ./dirdat/a2, format release 11.2
table AAA.system_user;
3. Add delivery process:
add extract pmp_kafb, exttrailsource ./dirdat/a2
add rmttrail ./dirdat/b2, extract pmp_kafb, megabytes 200
edit params pmp_kafb

extract pmp_kafb
userid ggs@ntestdb, password ggs
PASSTHRU
rmthost 172.16.xxx.5, mgrport 9178   -- kafka server address
rmttrail ./dirdat/b2, format release 11.2
table AAA.system_user;
-- initialization parameter files are stored in /ggs/ggs12/dirprm/
4. Add initialization process
add extract ek_20, sourceistable   -- added at the source end
edit params ek_20

extract ek_20
userid ggs@ntestdb, password ggs
rmthost 172.16.154.5, mgrport 9178
rmtfile ./dirdat/lb, maxfiles 999, megabytes 500
table AAA.system_user;
5. Generate the def file:
GGSCI > edit param defgen_n9

userid ggs@ntestdb, password ggs
defsfile /goldengate/ggskafka/dirdef/defgen_n9.def, format release 11.2
table AAA.system_user;
Execute the following command under $OGG_HOME to generate the def file:
defgen paramfile /goldengate/ggskafka/dirprm/defgen_n9.prm
Transfer the generated def file to $OGG_HOME/dirdef on the kafka server, for example with scp as sketched below.
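A minimal transfer sketch, assuming the app user and an OGG home of /home/app/ogg/ggs12 on the kafka server (both taken from the target-side listings further below; adjust host, user, and paths to your environment):
-- copy the generated def file from the Oracle source host to the kafka server
scp /goldengate/ggskafka/dirdef/defgen_n9.def app@172.16.xxx.5:/home/app/ogg/ggs12/dirdef/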
-- the destination MySQL database address is 172.16.xxx.148; a new kafka user needs to be created:
grant select,insert,update,delete,create,drop on DLS.* to 'kafka'@'%' identified by 'jiubugaosuni';
-- kafka server GoldenGate operation
1. Add the initialization process (parameter files are in dirprm):
GGSCI > add replicat rn_3, specialrun
GGSCI > edit params rn_3

SPECIALRUN
end runtime
setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
EXTFILE ./dirdat/lb
reportcount every 1 minutes, rate
grouptransops 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;
2. Add replication process:
GGSCI > add replicat RN_KF3, exttrail ./dirdat/b2
GGSCI > edit params RN_KF3

REPLICAT RN_KF3
setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;
3. Parameter configuration:
cd /home/app/ogg/ggs12/dirprm
The custom_kafka_producer.properties file is as follows:
[app@test-datamanager dirprm]$ more custom_kafka_producer.properties
bootstrap.servers=172.16.xxx.5:9092,172.16.xxx.7:9092
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=0
-- create the corresponding handler properties file kafkat_n3.props with vi
The kafkat_n3.props file is as follows:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# the following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=DLS.DLS_MERCHANT_STATUS
# gg.handler.kafkahandler.format=avro_op
# specify the message format
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true
# specify the schema topic name
gg.handler.kafkahandler.SchemaTopicName=DLS.DLS_MERCHANT_STATUS
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
# Sample gg.classpath for Apache Kafka (path of the kafka client libraries)
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/
# Sample gg.classpath for HDP
# gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
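As a later sanity check, a few records can be consumed from the topic to confirm the handler is writing JSON. This is only a sketch: the console-consumer path follows the Cloudera parcel layout assumed by gg.classpath above, and on older Kafka releases --zookeeper must be used instead of --bootstrap-server.
# read a few records from the topic written by the kafka handler
/opt/cloudera/parcels/KAFKA/lib/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 172.16.xxx.5:9092 --topic DLS.DLS_MERCHANT_STATUS --from-beginning --max-messages 5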
At this point, our configuration is basically complete. Now let's start the processes and initialize the data.
1. Start the source-side capture (extract) process
GGSCI > start EXT_KAFB
2. Start the source delivery process
GGSCI > start pmp_kafb
3. Start the source initialization process
GGSCI > start ek_20
4. Start the initialization process on the target side
GGSCI > start rn_3
Execute the following command under $OGG_HOME:
./replicat paramfile ./dirprm/rn_3.prm reportfile ./dirrpt/rn_3.rpt -p INITIALDATALOAD
5. Start the replication process on the target side
GGSCI > start RN_KF3
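Once everything is started, a quick status and data check can be run; a minimal sketch using the process and connection details configured above (the row count query is only illustrative):
-- in GGSCI on both the source and the kafka server, confirm all processes are RUNNING
GGSCI > info all
-- on the target MySQL side, confirm rows have been loaded
mysql> select count(*) from DLS.DLS_SYSTEM_USER;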