In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Network Security >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces "how to send Oracle data to kafka to transmit data". In daily operation, I believe many people have doubts about how to send Oracle data to kafka to transmit data. Xiaobian consulted all kinds of materials and sorted out simple and easy-to-use methods of operation. I hope it will be helpful to answer the question of "how to send Oracle data to kafka to transmit data". Next, please follow the editor to study!
Configuration
OGG ADPATER FOR KAFKA
Required kafka package:
Kafka 0.8.2.1
Kafka-clients-0.8.2.1.jar
Lz4-1.2.0.jar
Slf4j-api-1.7.6.jar
Snappy-java-1.1.1.6.jar
# configure OGG
Main library
Dblogin userid goldengate, password oggpassword add extract EXTJMS,tranlog, threads 3,begin now add exttrail / data/oggsrc/dirdat/jm, extract EXTJMS megabytes 200
Add trandata testKAFKA.*-add schematrandata testKAFKA
Extraction process:
Edit param extjms EXTRACT EXTJMS
SETENV (ORACLE_SID = "rac3")
SETENV (ORACLE_HOME=/data/app/oracle/product/10.2.0/db_1)
SETENV (NLS_LANG= "AMERICAN_AMERICA.AL32UTF8")
DBOPTIONS ALLOWUNUSEDCOLUMN, FETCHBATCHSIZE 1500
Userid goldengate, password oggpassword
EXTTRAIL / data/oggsrc/dirdat/jm, FORMAT RELEASE 9.5
DISCARDFILE / data/oggsrc/dirtmp/EXTJMS.dsc, APPEND, MEGABYTES 500
Tranlogoptions asmuser SYS@rac_asm, ASMPASSWORD oracle_123
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
WARNLONGTRANS 30MIN, CHECKINTERVAL 3MIN
CHECKPOINTSECS 5
FLUSHCSECS 80
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
-- DDL INCLUDE ALL
DDL INCLUDE MAPPED &
Exclude objname testKAFKA.PK_CATEGORY_RANKLIST &
Exclude objtype 'PACKAGE' &
Exclude objtype 'PACKAGE BODY' &
Exclude INSTR 'REPLACE SYNONYM' &
Exclude INSTR 'CREATE OR REPLACE PACKAGE' &
Exclude objtype 'PROCEDURE' &
Exclude objtype 'FUNCTION' &
Exclude objtype 'TYPE' &
Exclude objtype 'TRIGGER' &
Exclude objtype 'GRANT' &
Exclude instr 'GRANT' &
Exclude objtype 'DATABASE LINK' &
Exclude objtype 'CONSTRAINT' &
Exclude objtype 'JOB' &
Exclude instr 'ALTER SESSION' &
Exclude instr 'MATERIALIZED VIEW' &
Exclude INSTR'AS SELECT' &
Exclude INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_UNCMP"
-- GETUPDATEBEFORES
-- ddloptions addtrandata,REPORT
FETCHOPTIONS, USESNAPSHOT, NOUSELATESTVERSION, MISSINGROW REPORT
Dynamicresolution
EOFDELAYCSECS 5
TABLEEXCLUDE testKAFKA.RULE_ACTION_LOG
TABLE testKAFKA.*
SQL > ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS; Database altered. SQL > ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS; Database altered. SQL > select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI, FORCE_LOGGING from vested database; SUPPLEME SUP SUP FOR
YES YES YES YES SQL > add a new pump process on the source side:
Add the pump process in the testKAFKA source library test:
Add a pump process:
Add a new pump:
Add extract EDPKK,exttrailsource / data/oggsrc/dirdat/jm, begin now edit param EDPKK EXTRACT EDPKK
Setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
RecoveryOptions OverwriteMode
RMTHOST 192.168.0.3, MGRPORT 7839
RMTTRAIL / data/ogg_for_bigdata/dirdat/kk
RMTTRAIL / data/ogg_for_kafka/dirdat/kk
DISCARDFILE. / dirrpt/EDPKK.dsc,APPEND,MEGABYTES 5
TABLE testKAFKA.*; add rmttrail / data/ogg_for_bigdata/dirdat/kk, extract EDPKK megabytes 200add rmttrail / data/ogg_for_kafka/dirdat/kk,extract EDPKK megabytes 200Editing the definition file:
Userid goldengate, password oggpassword
Defsfile dirdef/testKAFKA.def
TABLEEXCLUDE * .DBMS_TABCOMP_TEMP*
TABLE testKAFKA.*
Pass the definition file:
. / defgen paramfile. / dirprm/defgen.prm
Cd dirdef/
[oracle@vm01 dirdef] $scp dirdef/testKAFKA.def oracle@192.168.0.3:/data/ogg_for_bigdata/dirdef
The authenticity of host '192.168.0.3 (192.168.0.3)' can't be established.
RSA key fingerprint is 46:8c:35:61:74:ca:43:e0:b0:74:d5:ff:0c:2f:67:8a.
Are you sure you want to continue connecting (yes/no)? Yes
Warning: Permanently added '192.168.0.3' (RSA) to the list of known hosts.
Reverse mapping checking getaddrinfo for bogon failed-POSSIBLE BREAK-IN ATTEMPT!
Oracle@192.168.0.3's password:
TestKAFKA.def 100% 899KB 898.7KB/s 00:00
[oracle@vm01 dirdef] $destination side Direct side
Mgr: PORT 7839
DYNAMICPORTLIST 7840-7850
-- AUTOSTART replicat *
-- AUTORESTART replicat *, RETRIES 5 Magi WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS / data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
PURGEOLDEXTRACTS / data/ogg_for_kafka/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
Add UE DATA PUMP:
Use version:
Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209 new port 7889 on the destination side:
ADD EXTRACT repkk, EXTTRAILSOURCE / data/ogg_for_bigdata/dirdat/kk
ADD EXTRACT repkk, EXTTRAILSOURCE / data/ogg_for_kafka/dirdat/kk edit param repkk GGSCI (localhost.localdomain) 18 > view param repkk EXTRACT repkk
SETENV (GGS_USEREXIT_CONF = "dirprm/repkk.props")
GetEnv (JAVA_HOME)
GetEnv (PATH)
GetEnv (LD_LIBRARY_PATH)
SourceDefs dirdef/testKAFKA.def
CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores
GetUpdateBefores
NoCompressDeletes
NoCompressUpdates
NoTcpSourceTimer
TABLEEXCLUDE testKAFKA.MV*
TABLE testKAFKA.*
[oracle@repvm dirdef] $cp testKAFKA.def / data/ogg_for_kafka/dirdef
Configuration file:
[oracle@repvm dirprm] $cat repkk.props
Gg.handlerlist = kafkahandler
# gg.handler.kafkahandler.type=oracle.goldengate.handler.kafka.KafkaHandler
Gg.handler.kafkahandler.type=kafka
Gg.handler.kafkahandler.KafkaProducerConfigFile=core_kafka_producer.properties
Gg.handler.kafkahandler.TopicName = zqtest
Gg.handler.kafkahandler.format = avro_op
Gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
Gg.handler.kafkahandler.BlockingSend = false
Gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode = tx
# gg.handler.kafkahandler.maxGroupSize = 100,100, 1Mb
# gg.handler.kafkahandler.minGroupSize = 50, 500Kb goldengate.userexit.timestamp=utc
Goldengate.userexit.writers=javawriter
Javawriter.stats.display=TRUE
Javawriter.stats.full=TRUE gg.log=log4j
# gg.log.level=INFO
Gg.log.level=DEBUG gg.report.time=30sec # gg.classpath=dirprm/:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar:/data/ogg_for_kafka/dirprm/kafka_jar/*:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2 amp libs *
Gg.classpath=dirprm:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2 * javawriter.bootoptions=-Xmx4096m-Xms4096m-Djava.class.path=/data/ogg_for_kafka/ggjava/ggjava.jar:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar- Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
[oracle@repvm dirprm] $javawriter.bootoptions must contain the lib package of ogg for kafka
Properties file for kafka:
Bootstrap.servers=localhost:9092
Acks = 1
Compression.type = gzip
Reconnect.backoff.ms = 1000
Value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
Key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
Batch.size = 102400
Linger.ms = 10000
Max.request.size = 5024000
Send.buffer.bytes = 5024000
Compression.type parameter default: configure kafka:
[oracle@repvm kafka_2.10-0.8.2.2] $pwd
/ data/kafka_2.10-0.8.2.2
[oracle@repvm kafka_2.10-0.8.2.2] $
[oracle@repvm kafka_2.10-0.8.2.2] $grep-v'^ $\ | ^\ s *\ # 'config/server.properties
Broker.id=0
Port=9092
Num.network.threads=3
Num.io.threads=8
Socket.send.buffer.bytes=102400
Socket.receive.buffer.bytes=102400
Socket.request.max.bytes=104857600
Log.dirs=/tmp/kafka-logs
Num.partitions=1
Num.recovery.threads.per.data.dir=1
Log.retention.hours=168
Log.segment.bytes=1073741824
Log.retention.check.interval.ms=300000
Log.cleaner.enable=false
Zookeeper.connect=localhost:2181
Zookeeper.connection.timeout.ms=6000
[oracle@repvm kafka_2.10-0.8.2.2] $
[oracle@repvm libs] $ll
Total 17452
-rw-r--r-- 1 oracle oinstall 53244 Aug 31 2014 jopt-simple-3.2.jar
-rw-r--r-- 1 oracle oinstall 3991269 Sep 3 2015 kafka_2.10-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall 37748 Sep 3 2015 kafka_2.10-0.8.2.2-javadoc.jar
-rw-r--r-- 1 oracle oinstall 2324165 Sep 3 2015 kafka_2.10-0.8.2.2-scaladoc.jar
-rw-r--r-- 1 oracle oinstall 521466 Sep 3 2015 kafka_2.10-0.8.2.2-sources.jar
-rw-r--r-- 1 oracle oinstall 1233391 Sep 3 2015 kafka_2.10-0.8.2.2-test.jar
-rw-r--r-- 1 oracle oinstall 324016 Sep 3 2015 kafka-clients-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall 481535 Aug 31 2014 log4j-1.2.16.jar
-rw-r--r-- 1 oracle oinstall 165505 Aug 31 2014 lz4-1.2.0.jar
-rw-r--r-- 1 oracle oinstall 82123 Aug 31 2014 metrics-core-2.2.0.jar
-rw-r--r-- 1 oracle oinstall 7126372 Nov 25 2014 scala-library-2.10.4.jar
-rw-r--r-- 1 oracle oinstall 28688 Aug 31 2014 slf4j-api-1.7.6.jar
-rw-r--r-- 1 oracle oinstall 9753 Aug 31 2014 slf4j-log4j12-1.6.1.jar
-rw-r--r-- 1 oracle oinstall 594033 May 29 2015 snappy-java-1.1.1.7.jar
-rw-r--r-- 1 oracle oinstall 64009 Aug 31 2014 zkclient-0.3.jar
-rw-r--r-- 1 oracle oinstall 792964 Aug 31 2014 zookeeper-3.4.6.jar
[oracle@repvm libs] $
Kafka-clients-0.8.2.1.jar
Lz4-1.2.0.jar
Slf4j-api-1.7.6.jar
Snappy-java-1.1.1.6.jar
[oracle@repvm bin] $pwd
/ data/kafka_2.10-0.8.2.2/bin
[oracle@repvm bin] $
Nohup sh kafka-server-start.sh.. / config/server.properties > / tmp/server.log &
Start zookeeper first:
[oracle@repvm kafka_2.10-0.8.2.2] $nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
[1] 18645
Nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2] $tail-f nohup.out
[2016-06-02 12 INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12 INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12 INFO Server environment:os.version=2.6.32-358.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12 INFO Server environment:user.name=oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12 INFO Server environment:user.home=/home/oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12 purl 22 42981] INFO Server environment:user.dir=/data/kafka_2.10-0.8.2.2 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12 22 43013] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12 22 14 013] INFO minSessionTimeout set to-1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12 22 14 013] INFO maxSessionTimeout set to-1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12 22 43035] INFO binding to port 0.0.0.0 Unix 0.0.0.0 purl 2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Start kafka:
[oracle@repvm kafka_2.10-0.8.2.2] $nohup bin/kafka-server-start.sh config/server.properties &
[1] 18845
Nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2] $create topic:
[oracle@repvm kafka_2.10-0.8.2.2] $bin/kafka-topics.sh-create-zookeeper localhost:2181-replication-factor 1-partitions 1-topic zqtest
Created topic "zqtest".
[oracle@repvm kafka_2.10-0.8.2.2] $
View:
[oracle@repvm kafka_2.10-0.8.2.2] $bin/kafka-topics.sh-- list-- zookeeper localhost:2181
Zqtest
[oracle@repvm kafka_2.10-0.8.2.2] $. / kafka-topics.sh-create-zookeeper localhost:2181-replication-factor 1-partitions 1-topic zqtest
Test send message:
[oracle@repvm kafka_2.10-0.8.2.2] $bin/kafka-console-producer.sh-- broker-list localhost:9092-- topic zqtest
[2016-06-02 14 ds 33 ds] WARN Property topic is not valid (kafka.utils.VerifiableProperties) 50690
Receiving end:
[oracle@repvm kafka_2.10-0.8.2.2] $bin/kafka-console-consumer.sh-- zookeeper localhost:2181-- topic zqtest-- from-beginning ds successfully accepted!
At this point, the study on "how to send Oracle data to kafka to transmit data" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.