How to use OGG to send Oracle data through Flume to Kafka

2025-03-01 Update From: SLTechnology News&Howtos shulou

Shulou(Shulou.com)06/01 Report--

This article shows how to use Oracle GoldenGate (OGG) to capture Oracle data, pass it to Flume, and have Flume write it to Kafka. The method is simple, fast and practical.

Source-side test server:

1. Server environment deployment

Create the groups and the oracle user:

[root@test ~]# groupadd oinstall
[root@test ~]# groupadd dba
[root@test ~]# useradd -g oinstall -G dba oracle

Modify permissions:

[root@test ~]# chown -R oracle:oinstall /data

2. Set the global Java environment variables

[root@test ~]# cat /etc/redhat-release
CentOS release 6.4 (Final)

[oracle@test data]$ tar -zxvf jdk-8u60-linux-x64.tar.gz

As root, set the Java environment variables in /etc/profile:

vi /etc/profile

# jdk
export JAVA_HOME=/data/jdk1.8.0_60
export JAVA_BIN=/data/jdk1.8.0_60/bin
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH
export LD_LIBRARY_PATH=/data/jdk1.8.0_60/jre/lib/amd64/server:$LD_LIBRARY_PATH

Switch to the oracle user and check:

[root@test ~]# su - oracle
[oracle@test ~]$ java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

If that does not work, register the JDK with alternatives and select it:

alternatives --install /usr/bin/java java /data/jdk1.8.0_60/bin/java 100
alternatives --install /usr/bin/jar jar /data/jdk1.8.0_60/bin/jar 100
alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100
update-alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100

# /usr/sbin/alternatives --config java

[root@test1 data]# /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

  Selection    Command
   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
*+ 2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java
   4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number: 4

[root@test1 data]# /usr/sbin/alternatives --config java

There are 4 programs which provide 'java'.

  Selection    Command
   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
*  2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java
 + 4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number:

[root@test1 data]# java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
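The Java settings above (JAVA_HOME, PATH and LD_LIBRARY_PATH) can be checked from a script before going further. The following is a minimal Python sketch, not part of the original procedure: the function name check_java_env is hypothetical, and the jre/lib/amd64/server sub-directory is assumed from the LD_LIBRARY_PATH value used in /etc/profile.

```python
import os


def check_java_env(env):
    """Return a list of problems found in a JDK environment mapping.

    Assumes a Linux JDK 8 layout (jre/lib/amd64/server), as in the
    profile settings of this article.
    """
    problems = []
    java_home = env.get("JAVA_HOME", "").rstrip("/")
    if not java_home:
        problems.append("JAVA_HOME is not set")
        return problems

    # PATH must contain $JAVA_HOME/bin so that "java" resolves to this JDK.
    path_dirs = env.get("PATH", "").split(":")
    if java_home + "/bin" not in path_dirs:
        problems.append("PATH does not contain " + java_home + "/bin")

    # LD_LIBRARY_PATH must contain the directory that holds libjvm.so.
    jvm_dir = java_home + "/jre/lib/amd64/server"
    lib_dirs = env.get("LD_LIBRARY_PATH", "").split(":")
    if jvm_dir not in lib_dirs:
        problems.append("LD_LIBRARY_PATH does not contain " + jvm_dir)
    return problems


if __name__ == "__main__":
    for problem in check_java_env(os.environ):
        print("WARNING:", problem)
```

Run it as the oracle user after logging in again, so that the new /etc/profile settings are loaded; no output means the three variables are consistent.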

Modify the flume parameter configuration:

[oracle@test1 conf]$ cat flume-conf.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = r1
agent.channels = fileChannel
agent.sinks = kafkaSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = fileChannel

#
agent.sources.r1.type = avro
agent.sources.r1.port = 14141
agent.sources.r1.bind = 192.168.88.66
agent.sources.r1.channels = fileChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger

# Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# kafka sink
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = my_schema
agent.sinks.kafkaSink.brokerList = 192.168.88.1:9092,192.168.88.2:9092,192.168.88.3:9092,192.168.88.4:9092
agent.sinks.kafkaSink.requiredAcks = 1
agent.sinks.kafkaSink.batchSize = 20
agent.sinks.kafkaSink.channel = fileChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

# File Channel
agent.channels.fileChannel.type = file
agent.channels.fileChannel.transactionCapacity = 20000000
agent.channels.fileChannel.capacity = 50000000
agent.channels.fileChannel.maxFileSize = 2147483648
agent.channels.fileChannel.minimumRequiredSpace = 52428800
agent.channels.fileChannel.keep-alive = 3
agent.channels.fileChannel.checkpointInterval = 20000
agent.channels.fileChannel.checkpointDir = /data/apache-flume-1.6.0-bin/CheckpointDir
agent.channels.fileChannel.dataDirs = /data/apache-flume-1.6.0-bin/DataDir
[oracle@test1 conf]$
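Component names in a Flume properties file are case-sensitive, and a source or sink only works if it is listed on the agent's sources/sinks line and bound to a channel listed on the channels line. The following Python sketch checks that wiring for a file like the one above. It is only an illustration under stated assumptions: parse_properties and check_flume_agent are hypothetical helper names, the parser handles plain "key = value" lines only, and the agent name defaults to 'agent' as in this configuration.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_flume_agent(props, agent="agent"):
    """Return a list of wiring problems for one Flume agent."""
    problems = []
    # Names declared on the agent.sources / agent.channels / agent.sinks lines.
    names = {
        kind: props.get(f"{agent}.{kind}", "").split()
        for kind in ("sources", "channels", "sinks")
    }
    channels = set(names["channels"])

    for src in names["sources"]:
        if f"{agent}.sources.{src}.type" not in props:
            problems.append(f"source {src}: no type defined")
        # A source may feed several channels (space-separated list).
        for ch in props.get(f"{agent}.sources.{src}.channels", "").split():
            if ch not in channels:
                problems.append(f"source {src}: unknown channel {ch}")

    for sink in names["sinks"]:
        if f"{agent}.sinks.{sink}.type" not in props:
            problems.append(f"sink {sink}: no type defined")
        # A sink reads from exactly one channel.
        ch = props.get(f"{agent}.sinks.{sink}.channel", "")
        if ch not in channels:
            problems.append(f"sink {sink}: unknown channel {ch!r}")
    return problems


if __name__ == "__main__":
    import sys

    with open(sys.argv[1]) as handle:
        for problem in check_flume_agent(parse_properties(handle.read())):
            print("WARNING:", problem)
```

Called with the path to flume-conf.properties as its only argument, the script prints one warning per problem. Template leftovers such as seqGenSrc, loggerSink and memoryChannel are not reported, because they are not listed on the agent's sources, sinks or channels lines.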

Configure OGG

The main library is in

Create a new extract process on the source database (in GGSCI):

dblogin userid goldengate, password goldengate
add extract EXTJMS, tranlog, threads 2, begin now
add exttrail /data/goldengate/dirdat/kf, extract EXTJMS, megabytes 200
add schematrandata my_schema
add trandata my_schema.*

Parameter file of the primary extract process:

EXTRACT EXTJMS
SETENV (ORACLE_SID = "testdb")
SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")
USERID goldengate, PASSWORD goldengate
TRANLOGOPTIONS DBLOGREADER
EXTTRAIL /data/goldengate/dirdat/kf
DISCARDFILE /data/goldengate/dirrpt/EXTJMS.dsc, APPEND
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
NUMFILES 3000
CHECKPOINTSECS 20
DISCARDROLLOVER AT 05:30
DYNAMICRESOLUTION
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RECOVERYOPTIONS OVERWRITEMODE
DDL &
INCLUDE MAPPED &
EXCLUDE OBJTYPE 'TRIGGER' &
EXCLUDE OBJTYPE 'PROCEDURE' &
EXCLUDE OBJTYPE 'FUNCTION' &
EXCLUDE OBJTYPE 'PACKAGE' &
EXCLUDE OBJTYPE 'PACKAGE BODY' &
EXCLUDE OBJTYPE 'TYPE' &
EXCLUDE OBJTYPE 'GRANT' &
EXCLUDE INSTR 'GRANT' &
EXCLUDE OBJTYPE 'DATABASE LINK' &
EXCLUDE OBJTYPE 'CONSTRAINT' &
EXCLUDE OBJTYPE 'JOB' &
EXCLUDE INSTR 'ALTER SESSION' &
EXCLUDE INSTR 'AS SELECT' &
EXCLUDE INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_UNCMP"
FETCHOPTIONS NOUSESNAPSHOT, USELATESTVERSION, MISSINGROW REPORT
TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*
-- extract table user
TABLE my_schema.*;

Enable supplemental logging on the source database and verify it:

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;

Database altered.

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Database altered.

SQL> select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, FORCE_LOGGING from v$database;

SUPPLEME SUP SUP FOR
YES      YES YES YES

SQL>

Add a new pump process on the source side (the my_schema source database on host test):

add extract EDPKF, exttrailsource /data/goldengate/dirdat/kf, begin now

edit param EDPKF

EXTRACT EDPKF
SETENV (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RECOVERYOPTIONS OVERWRITEMODE
RMTHOST 192.168.88.66, MGRPORT 7839
RMTTRAIL /data/ogg_for_bigdata/dirdat/kp
DISCARDFILE ./dirrpt/EDPKF.dsc, APPEND, MEGABYTES 5
TABLE my_schema.*;

add rmttrail /data/ogg_for_bigdata/dirdat/kp, extract EDPKF, megabytes 200
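The pump can only deliver the trail if the target manager named in RMTHOST (192.168.88.66, MGRPORT 7839) is reachable from the source host. A quick way to test this before starting EDPKF is a plain TCP connection attempt. The sketch below is an illustration only: port_open is a hypothetical helper name, and a successful connection shows that something is listening on the port, not that it is an OGG manager.

```python
import socket


def port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
```

For the setup in this article the call would be port_open("192.168.88.66", 7839), run from the source server.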

edit param defgen

USERID goldengate, PASSWORD goldengate
DEFSFILE dirdef/my_schema.def
TABLE my_schema.*;

Generate the definitions file, then transfer it to the dirdef directory on the target:

./defgen paramfile ./dirprm/defgen.prm

Target side:

mgr:

PORT 7839
DYNAMICPORTLIST 7840-7850
-- AUTOSTART replicat *
-- AUTORESTART replicat *, RETRIES 5, WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

Add the user exit (UE) data pump:

Version used:

Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209

ADD EXTRACT JMSFLM, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kp

edit param JMSFLM

GGSCI (localhost.localdomain) 18> view param JMSFLM

EXTRACT JMSFLM
SETENV (GGS_USEREXIT_CONF = "dirprm/JMSFLM.props")
GETENV (JAVA_HOME)
GETENV (PATH)
GETENV (LD_LIBRARY_PATH)
SOURCEDEFS dirdef/my_schema.def
CUSEREXIT libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
NOTCPSOURCETIMER
TABLEEXCLUDE my_schema.MV*
TABLE my_schema.*;
-- alter prodjms extseqno 736, extrba 0

Note: the target side does not need an Oracle database installed. The target OGG instance can run on the same host as Flume, which then pushes the data to the Kafka servers that receive the messages.

In this example the data is relayed through Flume, and it works without problems.

You can also send the data from OGG directly to Kafka and process the messages there; the principle is the same.

The same approach is a good basis for further big data integration: targets such as MySQL, MongoDB and HDFS can be combined with it in the same way.

Parameter file:

$ cat JMSFLM.props
gg.handlerlist=flumehandler
gg.handler.flumehandler.type=com.goldengate.delivery.handler.flume.FlumeHandler
gg.handler.flumehandler.host=192.168.88.66
gg.handler.flumehandler.port=14141
gg.handler.flumehandler.rpcType=avro
gg.handler.flumehandler.delimiter=\u0001
gg.handler.flumehandler.mode=op
gg.handler.flumehandler.includeOpType=true

# Indicates if the operation timestamp should be included as part of output in the delimited separated values
# true - Operation timestamp will be included in the output
# false - Operation timestamp will not be included in the output
# Default: true
# gg.handler.flumehandler.includeOpTimestamp=true

# gg.handler.name.deleteOpKey=D
# gg.handler.name.updateOpKey=U
# gg.handler.name.insertOpKey=I
# gg.handler.name.pKUpdateOpKey=P
# gg.handler.name.includeOpType=true

# Optional properties to use the transaction grouping functionality
# gg.handler.flumehandler.maxGroupSize=1000
# gg.handler.flumehandler.minGroupSize=1000

# native library config #
goldengate.userexit.nochkpt=TRUE
goldengate.userexit.timestamp=utc
goldengate.log.logname=cuserexit
goldengate.log.level=DEBUG
goldengate.log.tofile=true
goldengate.userexit.writers=javawriter
goldengate.log.level.JAVAUSEREXIT=DEBUG

# gg.brokentrail=true
gg.report.time=30sec
gg.classpath=/data/ogg_for_bigdata/dirprm/flumejar/*:/data/apache-flume-1.6.0-bin/lib/*

javawriter.stats.full=TRUE
javawriter.stats.display=TRUE
javawriter.bootoptions=-Xmx81920m -Xms20480m -Djava.class.path=/data/ogg_for_bigdata/ggjava/ggjava.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
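The handler's host, port and rpcType in JMSFLM.props have to point at the Avro source that Flume listens on (192.168.88.66:14141 in this setup). The Python sketch below compares the two files. It is an illustration under assumptions: parse_properties and endpoints_match are hypothetical helper names, the defaults (handler 'flumehandler', agent 'agent', source 'r1') follow this article's files, and a Flume source bound to 0.0.0.0 would need a looser host comparison than the exact match used here.

```python
def parse_properties(text):
    """Parse simple 'key=value' lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def endpoints_match(gg, flume, handler="flumehandler", agent="agent", source="r1"):
    """Return True if the OGG Flume handler targets the given Flume source."""
    gg_prefix = f"gg.handler.{handler}."
    flume_prefix = f"{agent}.sources.{source}."
    # (handler property, Flume source property) pairs that must agree.
    pairs = [("rpcType", "type"), ("host", "bind"), ("port", "port")]
    for gg_key, flume_key in pairs:
        value = gg.get(gg_prefix + gg_key)
        # A missing handler property counts as a mismatch.
        if value is None or value != flume.get(flume_prefix + flume_key):
            return False
    return True
```

Both arguments are the dictionaries returned by parse_properties for JMSFLM.props and flume-conf.properties respectively.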

You should now have a clearer picture of how to use OGG to send Oracle data through Flume to Kafka. The best way to consolidate it is to try the steps in practice.
