How to send Oracle data to kafka to transfer data

2025-01-19 Update From: SLTechnology News&Howtos


Shulou (Shulou.com) 06/01 Report --

This article introduces how to send Oracle data to Kafka. Since many people have questions about this process, we have compiled a simple, easy-to-follow set of steps, and we hope it helps answer them. Follow along with the walkthrough below.

Configuration

OGG Adapter for Kafka

Required Kafka packages:

Kafka 0.8.2.1
kafka-clients-0.8.2.1.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
snappy-java-1.1.1.6.jar

# Configure OGG

Main library (source side):

dblogin userid goldengate, password oggpassword
add extract EXTJMS, tranlog, threads 3, begin now
add exttrail /data/oggsrc/dirdat/jm, extract EXTJMS, megabytes 200

add trandata testKAFKA.*
add schematrandata testKAFKA

Extraction process:

edit param extjms

EXTRACT EXTJMS

SETENV (ORACLE_SID = "rac3")

SETENV (ORACLE_HOME=/data/app/oracle/product/10.2.0/db_1)

SETENV (NLS_LANG= "AMERICAN_AMERICA.AL32UTF8")

DBOPTIONS ALLOWUNUSEDCOLUMN, FETCHBATCHSIZE 1500

USERID goldengate, PASSWORD oggpassword

EXTTRAIL /data/oggsrc/dirdat/jm, FORMAT RELEASE 9.5

DISCARDFILE /data/oggsrc/dirtmp/EXTJMS.dsc, APPEND, MEGABYTES 500

TRANLOGOPTIONS ASMUSER SYS@rac_asm, ASMPASSWORD oracle_123

THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000

WARNLONGTRANS 30MIN, CHECKINTERVAL 3MIN

CHECKPOINTSECS 5

FLUSHCSECS 80

GETUPDATEBEFORES

NOCOMPRESSUPDATES

NOCOMPRESSDELETES

RecoveryOptions OverwriteMode

-- DDL INCLUDE ALL

DDL INCLUDE MAPPED &

EXCLUDE OBJNAME testKAFKA.PK_CATEGORY_RANKLIST &
EXCLUDE OBJTYPE 'PACKAGE' &
EXCLUDE OBJTYPE 'PACKAGE BODY' &
EXCLUDE INSTR 'REPLACE SYNONYM' &
EXCLUDE INSTR 'CREATE OR REPLACE PACKAGE' &
EXCLUDE OBJTYPE 'PROCEDURE' &
EXCLUDE OBJTYPE 'FUNCTION' &
EXCLUDE OBJTYPE 'TYPE' &
EXCLUDE OBJTYPE 'TRIGGER' &
EXCLUDE OBJTYPE 'GRANT' &
EXCLUDE INSTR 'GRANT' &
EXCLUDE OBJTYPE 'DATABASE LINK' &
EXCLUDE OBJTYPE 'CONSTRAINT' &
EXCLUDE OBJTYPE 'JOB' &
EXCLUDE INSTR 'ALTER SESSION' &
EXCLUDE INSTR 'MATERIALIZED VIEW' &
EXCLUDE INSTR 'AS SELECT' &
EXCLUDE INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_UNCMP"

-- GETUPDATEBEFORES

-- ddloptions addtrandata,REPORT

FETCHOPTIONS, USESNAPSHOT, NOUSELATESTVERSION, MISSINGROW REPORT

DYNAMICRESOLUTION

EOFDELAYCSECS 5

TABLEEXCLUDE testKAFKA.RULE_ACTION_LOG

TABLE testKAFKA.*

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;

Database altered.

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Database altered.

SQL> select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, FORCE_LOGGING from v$database;

SUPPLEME SUP SUP FOR
-------- --- --- ---
YES      YES YES YES

Add a new pump process on the source side:

Add the pump process in the testKAFKA source library (test):

add extract EDPKK, exttrailsource /data/oggsrc/dirdat/jm, begin now
edit param EDPKK

EXTRACT EDPKK

SETENV (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)

PASSTHRU

GETUPDATEBEFORES

RecoveryOptions OverwriteMode

RMTHOST 192.168.0.3, MGRPORT 7839

RMTTRAIL /data/ogg_for_bigdata/dirdat/kk

RMTTRAIL /data/ogg_for_kafka/dirdat/kk

DISCARDFILE ./dirrpt/EDPKK.dsc, APPEND, MEGABYTES 5

TABLE testKAFKA.*;

add rmttrail /data/ogg_for_bigdata/dirdat/kk, extract EDPKK, megabytes 200
add rmttrail /data/ogg_for_kafka/dirdat/kk, extract EDPKK, megabytes 200

Edit the definition file:

USERID goldengate, PASSWORD oggpassword

DEFSFILE dirdef/testKAFKA.def

TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*

TABLE testKAFKA.*

Pass the definition file:

./defgen paramfile ./dirprm/defgen.prm

cd dirdef/

[oracle@vm01 dirdef]$ scp dirdef/testKAFKA.def oracle@192.168.0.3:/data/ogg_for_bigdata/dirdef

The authenticity of host '192.168.0.3 (192.168.0.3)' can't be established.

RSA key fingerprint is 46:8c:35:61:74:ca:43:e0:b0:74:d5:ff:0c:2f:67:8a.

Are you sure you want to continue connecting (yes/no)? yes

Warning: Permanently added '192.168.0.3' (RSA) to the list of known hosts.

Reverse mapping checking getaddrinfo for bogon failed-POSSIBLE BREAK-IN ATTEMPT!

oracle@192.168.0.3's password:

testKAFKA.def 100% 899KB 898.7KB/s 00:00

[oracle@vm01 dirdef]$

Destination side:

mgr:

PORT 7839

DYNAMICPORTLIST 7840-7850

-- AUTOSTART replicat *

-- AUTORESTART replicat *, RETRIES 5, WAITMINUTES 2

AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10

PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2

PURGEOLDEXTRACTS /data/ogg_for_kafka/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2

LAGREPORTHOURS 1

LAGINFOMINUTES 30

LAGCRITICALMINUTES 45
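The three LAG* settings above form a simple threshold policy: lag is reported hourly, flagged at the info level past 30 minutes, and flagged as critical past 45. As a rough illustration of that policy (a hypothetical helper, not part of OGG), the classification logic could be sketched as:

```python
# Thresholds mirroring the manager parameters above:
# LAGINFOMINUTES 30 and LAGCRITICALMINUTES 45.
LAG_INFO_MINUTES = 30
LAG_CRITICAL_MINUTES = 45

def classify_lag(lag_minutes: float) -> str:
    """Return the report level the manager would use for a given lag."""
    if lag_minutes >= LAG_CRITICAL_MINUTES:
        return "WARNING"
    if lag_minutes >= LAG_INFO_MINUTES:
        return "INFO"
    return "OK"

print(classify_lag(10))   # -> OK (below both thresholds)
print(classify_lag(35))   # -> INFO
print(classify_lag(50))   # -> WARNING
```

Tuning LAGCRITICALMINUTES too low produces noisy warnings during normal catch-up; the values shown in the manager file are a reasonable middle ground for this setup.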

Add UE DATA PUMP:

Version used:

Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209

Add a new port 7889 on the destination side:

ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kk

ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_kafka/dirdat/kk

edit param repkk

GGSCI (localhost.localdomain) 18> view param repkk

EXTRACT repkk

SETENV (GGS_USEREXIT_CONF = "dirprm/repkk.props")

GetEnv (JAVA_HOME)

GetEnv (PATH)

GetEnv (LD_LIBRARY_PATH)

SourceDefs dirdef/testKAFKA.def

CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores

GetUpdateBefores

NoCompressDeletes

NoCompressUpdates

NoTcpSourceTimer

TABLEEXCLUDE testKAFKA.MV*

TABLE testKAFKA.*

[oracle@repvm dirdef]$ cp testKAFKA.def /data/ogg_for_kafka/dirdef

Configuration file:

[oracle@repvm dirprm]$ cat repkk.props

gg.handlerlist=kafkahandler
# gg.handler.kafkahandler.type=oracle.goldengate.handler.kafka.KafkaHandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=core_kafka_producer.properties
gg.handler.kafkahandler.TopicName=zqtest
gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=tx
# gg.handler.kafkahandler.maxGroupSize=100, 1Mb
# gg.handler.kafkahandler.minGroupSize=50, 500Kb

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
# gg.log.level=INFO
gg.log.level=DEBUG
gg.report.time=30sec

# gg.classpath=dirprm/:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar:/data/ogg_for_kafka/dirprm/kafka_jar/*:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*
gg.classpath=dirprm:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*

javawriter.bootoptions=-Xmx4096m -Xms4096m -Djava.class.path=/data/ogg_for_kafka/ggjava/ggjava.jar:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties

[oracle@repvm dirprm]$

Note: javawriter.bootoptions must contain the lib package of OGG for Kafka.
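A frequent cause of startup failures here is a gg.classpath entry that points at a missing directory or an empty wildcard. A small hypothetical helper (not part of OGG, paths are assumptions from the config above) can sanity-check the classpath before starting the extract:

```python
import glob
import os

def check_classpath(classpath: str) -> list:
    """Expand each ':'-separated classpath entry (including trailing '/*'
    wildcards, as used in gg.classpath) and return the entries that match
    nothing on disk."""
    missing = []
    for entry in classpath.split(":"):
        if not entry:
            continue
        if "*" in entry:
            matches = glob.glob(entry)
        else:
            matches = [entry] if os.path.exists(entry) else []
        if not matches:
            missing.append(entry)
    return missing

# Example with the (assumed) paths from repkk.props; on a machine where
# these directories do not exist, they are reported as missing:
cp = "dirprm:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*"
print(check_classpath(cp))
```

Running this before restarting the repkk extract makes "ClassNotFoundException"-style failures much easier to diagnose than digging through the adapter's log4j output.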

Properties file for kafka:

bootstrap.servers=localhost:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
max.request.size=5024000
send.buffer.bytes=5024000
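The file the handler's KafkaProducerConfigFile setting points to is a plain Java-style properties file: one `key=value` per line, with `#` comments. As an illustration (a minimal sketch, not OGG or Kafka code), parsing such a file looks like:

```python
def load_properties(text: str) -> dict:
    """Parse simple 'key=value' lines (Java .properties style), skipping
    blank lines and '#' comments -- the format used by OGG's
    KafkaProducerConfigFile and by Kafka's own config files."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sample = """\
bootstrap.servers=localhost:9092
acks=1
# 100KB per partition
batch.size=102400
"""
print(load_properties(sample)["batch.size"])  # -> 102400
```

This also explains why stray whitespace around `=` is harmless in these files: both sides are trimmed when the properties are loaded.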

The compression.type parameter defaults to none (gzip is enabled here).

Configure Kafka:

[oracle@repvm kafka_2.10-0.8.2.2]$ pwd

/data/kafka_2.10-0.8.2.2

[oracle@repvm kafka_2.10-0.8.2.2]$

[oracle@repvm kafka_2.10-0.8.2.2]$ grep -v '^$\|^\s*#' config/server.properties

broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

[oracle@repvm kafka_2.10-0.8.2.2]$

[oracle@repvm libs]$ ll

total 17452

-rw-r--r-- 1 oracle oinstall 53244 Aug 31 2014 jopt-simple-3.2.jar

-rw-r--r-- 1 oracle oinstall 3991269 Sep 3 2015 kafka_2.10-0.8.2.2.jar

-rw-r--r-- 1 oracle oinstall 37748 Sep 3 2015 kafka_2.10-0.8.2.2-javadoc.jar

-rw-r--r-- 1 oracle oinstall 2324165 Sep 3 2015 kafka_2.10-0.8.2.2-scaladoc.jar

-rw-r--r-- 1 oracle oinstall 521466 Sep 3 2015 kafka_2.10-0.8.2.2-sources.jar

-rw-r--r-- 1 oracle oinstall 1233391 Sep 3 2015 kafka_2.10-0.8.2.2-test.jar

-rw-r--r-- 1 oracle oinstall 324016 Sep 3 2015 kafka-clients-0.8.2.2.jar

-rw-r--r-- 1 oracle oinstall 481535 Aug 31 2014 log4j-1.2.16.jar

-rw-r--r-- 1 oracle oinstall 165505 Aug 31 2014 lz4-1.2.0.jar

-rw-r--r-- 1 oracle oinstall 82123 Aug 31 2014 metrics-core-2.2.0.jar

-rw-r--r-- 1 oracle oinstall 7126372 Nov 25 2014 scala-library-2.10.4.jar

-rw-r--r-- 1 oracle oinstall 28688 Aug 31 2014 slf4j-api-1.7.6.jar

-rw-r--r-- 1 oracle oinstall 9753 Aug 31 2014 slf4j-log4j12-1.6.1.jar

-rw-r--r-- 1 oracle oinstall 594033 May 29 2015 snappy-java-1.1.1.7.jar

-rw-r--r-- 1 oracle oinstall 64009 Aug 31 2014 zkclient-0.3.jar

-rw-r--r-- 1 oracle oinstall 792964 Aug 31 2014 zookeeper-3.4.6.jar

[oracle@repvm libs]$

The required jars (note the versions):

kafka-clients-0.8.2.1.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
snappy-java-1.1.1.6.jar

[oracle@repvm bin]$ pwd

/data/kafka_2.10-0.8.2.2/bin

[oracle@repvm bin]$

nohup sh kafka-server-start.sh ../config/server.properties > /tmp/server.log &

Start zookeeper first:

[oracle@repvm kafka_2.10-0.8.2.2]$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

[1] 18645

Nohup: ignoring input and appending output to `nohup.out'

[oracle@repvm kafka_2.10-0.8.2.2]$ tail -f nohup.out

[2016-06-02 12:22:…] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:…] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:…] INFO Server environment:os.version=2.6.32-358.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:…] INFO Server environment:user.name=oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:…] INFO Server environment:user.home=/home/oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.dir=/data/kafka_2.10-0.8.2.2 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,035] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
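When watching these startup logs for the "binding to port" line, a small helper to split each entry into timestamp, level, and message can be handy. This is a hypothetical convenience script, not part of ZooKeeper or Kafka; it only assumes the standard `[timestamp] LEVEL message` layout shown above:

```python
import re

# Matches lines like: [2016-06-02 12:22:43,013] INFO tickTime set to 3000 (...)
LOG_RE = re.compile(r"^\[(?P<ts>[^\]]+)\]\s+(?P<level>\w+)\s+(?P<msg>.*)$")

def parse_log_line(line: str):
    """Return (timestamp, level, message) for a ZooKeeper/Kafka log line,
    or None if the line does not match the expected layout."""
    m = LOG_RE.match(line)
    if m is None:
        return None
    return m.group("ts"), m.group("level"), m.group("msg")

ts, level, msg = parse_log_line(
    "[2016-06-02 12:22:43,013] INFO tickTime set to 3000 "
    "(org.apache.zookeeper.server.ZooKeeperServer)")
print(level)  # -> INFO
print(msg.startswith("tickTime"))  # -> True
```

Piping `nohup.out` through such a filter makes it easy to stop tailing as soon as the bind line at the INFO level appears.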

Start kafka:

[oracle@repvm kafka_2.10-0.8.2.2]$ nohup bin/kafka-server-start.sh config/server.properties &

[1] 18845

Nohup: ignoring input and appending output to `nohup.out'

[oracle@repvm kafka_2.10-0.8.2.2]$

Create topic:

[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zqtest

Created topic "zqtest".

[oracle@repvm kafka_2.10-0.8.2.2] $

View:

[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --list --zookeeper localhost:2181

zqtest


Test send message:

[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zqtest

[2016-06-02 14:33:50,690] WARN Property topic is not valid (kafka.utils.VerifiableProperties)

Receiving end:

[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zqtest --from-beginning

ds

The test message "ds" was received successfully!

At this point, the walkthrough of how to send Oracle data to Kafka is complete. We hope it has resolved your doubts; pairing theory with practice is the best way to learn, so go and try it! To keep learning more related knowledge, please continue to follow the site.
