This article explains how to use OGG (Oracle GoldenGate) to deliver Oracle data through Flume to Kafka. The approach is simple, fast, and practical; follow along to set it up end to end.
Source-side test server:
Server environment deployment:
1. Create the oracle user and groups:
[root@test ~]# groupadd oinstall
[root@test ~]# groupadd dba
[root@test ~]# useradd -g oinstall -G dba oracle
[root@test ~]#
Modify permissions:
[root@test ~]# chown -R oracle:oinstall /data
[root@test ~]#
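A quick sanity check that the account and ownership came out as intended:
[root@test ~]# id oracle
[root@test ~]# ls -ld /data
id should list oinstall as the primary group and dba as a supplementary group, and /data should now be owned by oracle:oinstall.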
2. Set the global java environment variable
[root@test ~]# cat /etc/redhat-release
CentOS release 6.4 (Final)
[root@test ~]#
[oracle@test data]$ tar -zxvf jdk-8u60-linux-x64.tar.gz
Perform the configuration under root.
Set the java environment variables:
vi /etc/profile
# jdk
export JAVA_HOME=/data/jdk1.8.0_60
export JAVA_BIN=/data/jdk1.8.0_60/bin
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH
export LD_LIBRARY_PATH=/data/jdk1.8.0_60/jre/lib/amd64/server:$LD_LIBRARY_PATH
Switch to the oracle user and verify:
[root@test ~]# su - oracle
[oracle@test ~]$ java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
[oracle@test ~]$
If the correct version is not picked up, register the new JDK with alternatives:
alternatives --install /usr/bin/java java /data/jdk1.8.0_60/bin/java 100
alternatives --install /usr/bin/jar jar /data/jdk1.8.0_60/bin/jar 100
alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100
update-alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100
# /usr/sbin/alternatives --config java
[root@test1 data]# /usr/sbin/alternatives --config java
There are 4 programs which provide 'java'.
  Selection    Command
-----------------------------------------------
   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
*+ 2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java
   4           /data/jdk1.8.0_60/bin/java
Enter to keep the current selection[+], or type selection number: 4
[root@test1 data]# /usr/sbin/alternatives --config java
There are 4 programs which provide 'java'.
  Selection    Command
-----------------------------------------------
   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
*  2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java
+  4           /data/jdk1.8.0_60/bin/java
Enter to keep the current selection[+], or type selection number:
[root@test1 data]#
[root@test1 data]# java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
[root@test1 data]#
3. Modify the Flume parameter configuration:
[oracle@test1 conf]$ cat flume-conf.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# The configuration file needs to define the sources
# the channels and the sinks.
# Sources, channels and sinks are defined per agent
# in this case called 'agent'
agent.sources = r1
agent.channels = fileChannel
agent.sinks = kafkaSink
# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = fileChannel
#
agent.sources.r1.type = avro
agent.sources.r1.port = 14141
agent.sources.r1.bind = 192.168.88.66
agent.sources.r1.channels = fileChannel
# Each sink's type must be defined
agent.sinks.loggerSink.type = logger
# Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel
# kafka sink
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = my_schema
agent.sinks.kafkaSink.brokerList = 192.168.88.1:9092,192.168.88.2:9092,192.168.88.3:9092,192.168.88.4:9092
agent.sinks.kafkaSink.requiredAcks = 1
agent.sinks.kafkaSink.batchSize = 20
agent.sinks.kafkaSink.channel = fileChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
# File Channel
agent.channels.fileChannel.type = file
agent.channels.fileChannel.transactionCapacity = 20000000
agent.channels.fileChannel.capacity = 50000000
agent.channels.fileChannel.maxFileSize = 2147483648
agent.channels.fileChannel.minimumRequiredSpace = 52428800
agent.channels.fileChannel.keep-alive = 3
agent.channels.fileChannel.checkpointInterval = 20000
agent.channels.fileChannel.checkpointDir = /data/apache-flume-1.6.0-bin/CheckpointDir
agent.channels.fileChannel.dataDirs = /data/apache-flume-1.6.0-bin/DataDir
[oracle@test1 conf]$
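With the configuration in place, start the agent from the Flume home directory. A minimal sketch; note that the --name value must match the agent. property prefix used in flume-conf.properties, or no sources and sinks will be loaded:
[oracle@test1 apache-flume-1.6.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent -Dflume.root.logger=INFO,console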
4. Configure OGG
On the source (primary) database, create a new extract process:
dblogin userid goldengate, password goldengate
add extract EXTJMS, tranlog, threads 2, begin now
add exttrail /data/goldengate/dirdat/kf, extract EXTJMS, megabytes 200
add schematrandata my_schema
add trandata my_schema.*
Source extract parameter file:
extract EXTJMS
setenv (ORACLE_SID = "testdb")
setenv (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")
userid goldengate, password goldengate
TRANLOGOPTIONS DBLOGREADER
exttrail /data/goldengate/dirdat/kf
discardfile /data/goldengate/dirrpt/EXTJMS.dsc, append
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
numfiles 3000
CHECKPOINTSECS 20
DISCARDROLLOVER AT 05:30
dynamicresolution
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RECOVERYOPTIONS OVERWRITEMODE
ddl &
include mapped &
exclude objtype 'TRIGGER' &
exclude objtype 'PROCEDURE' &
exclude objtype 'FUNCTION' &
exclude objtype 'PACKAGE' &
exclude objtype 'PACKAGE BODY' &
exclude objtype 'TYPE' &
exclude objtype 'GRANT' &
exclude instr 'GRANT' &
exclude objtype 'DATABASE LINK' &
exclude objtype 'CONSTRAINT' &
exclude objtype 'JOB' &
exclude instr 'ALTER SESSION' &
exclude instr 'AS SELECT' &
exclude instr 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_UNCMP"
FETCHOPTIONS NOUSESNAPSHOT, USELATESTVERSION, MISSINGROW REPORT
TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*
-- extract table user
TABLE my_schema.*;
Enable supplemental logging:
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;
Database altered.
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Database altered.
SQL> select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, FORCE_LOGGING from v$database;
SUPPLEME SUP SUP FOR
-------- --- --- ---
YES      YES YES YES
SQL>
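With trandata and supplemental logging in place, the extract can be started and verified from GGSCI (a minimal sketch):
GGSCI> start extract EXTJMS
GGSCI> info extract EXTJMS, detail
GGSCI> info schematrandata my_schema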
Add a pump process on the source side for the my_schema test library:
add extract EDPKF, exttrailsource /data/goldengate/dirdat/kf, begin now
edit param EDPKF
EXTRACT EDPKF
setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RECOVERYOPTIONS OVERWRITEMODE
RMTHOST 192.168.88.66, MGRPORT 7839
RMTTRAIL /data/ogg_for_bigdata/dirdat/kp
DISCARDFILE ./dirrpt/EDPKF.dsc, APPEND, MEGABYTES 5
TABLE my_schema.*;
add rmttrail /data/ogg_for_bigdata/dirdat/kp, extract EDPKF, megabytes 200
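Start and check the pump the same way (a sketch); it will abend if the target manager on 192.168.88.66:7839 is not up yet, so bring up the target side first:
GGSCI> start extract EDPKF
GGSCI> info extract EDPKF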
edit param defgen
userid goldengate, password goldengate
defsfile dirdef/my_schema.def
TABLE my_schema.*;
Generate the definitions file:
./defgen paramfile ./dirprm/defgen.prm
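The generated file then has to be transferred to the target OGG home so the user exit can resolve the source metadata. A sketch, assuming the target OGG for Big Data home is /data/ogg_for_bigdata as used in the pump parameters:
scp dirdef/my_schema.def oracle@192.168.88.66:/data/ogg_for_bigdata/dirdef/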
Target side configuration.
MGR parameter file:
PORT 7839
DYNAMICPORTLIST 7840-7850
-- AUTOSTART replicat *
-- AUTORESTART replicat *, RETRIES 5, WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
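After saving the parameter file, start and verify the manager from GGSCI on the target; info mgr should report it RUNNING on port 7839:
GGSCI> start mgr
GGSCI> info mgr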
Add the UE (user exit) data pump on the target.
OGG for Big Data version used:
Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209
ADD EXTRACT JMSFLM, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kp
edit param JMSFLM
GGSCI (localhost.localdomain) 18> view param JMSFLM
EXTRACT JMSFLM
SETENV (GGS_USEREXIT_CONF = "dirprm/JMSFLM.props")
GETENV (JAVA_HOME)
GETENV (PATH)
GETENV (LD_LIBRARY_PATH)
SOURCEDEFS dirdef/my_schema.def
CUSEREXIT libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
NOTCPSOURCETIMER
TABLEEXCLUDE my_schema.MV*
TABLE my_schema.*;
-- alter prodjms extseqno 736, extrba 0
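Once the properties file below is in place, start the user-exit extract and check its throughput (a sketch):
GGSCI> start extract JMSFLM
GGSCI> info extract JMSFLM, detail
GGSCI> stats extract JMSFLM, totalsonly my_schema.*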
Note: there is no need to install the Oracle database on the target side; the OGG for Big Data instance can live alongside the Flume environment, and the data ultimately lands on the Kafka servers as messages.
This case relays the data through Flume, which works without any problem.
Of course, you can also deliver the data directly to Kafka and process the messages there; the principle is the same.
Broader big data integration is also a good plan going forward: MySQL, MongoDB, HDFS, and so on can all be combined in the same way.
Parameter file:
$ cat JMSFLM.props
gg.handlerlist=flumehandler
gg.handler.flumehandler.type=com.goldengate.delivery.handler.flume.FlumeHandler
gg.handler.flumehandler.host=192.168.88.66
gg.handler.flumehandler.port=14141
gg.handler.flumehandler.rpcType=avro
gg.handler.flumehandler.delimiter=\u0001
gg.handler.flumehandler.mode=op
gg.handler.flumehandler.includeOpType=true
# Indicates if the operation timestamp should be included as part of output in the delimited separated values
# true - Operation timestamp will be included in the output
# false - Operation timestamp will not be included in the output
# Default: true
# gg.handler.flumehandler.includeOpTimestamp=true
# gg.handler.name.deleteOpKey=D
# gg.handler.name.updateOpKey=U
# gg.handler.name.insertOpKey=I
# gg.handler.name.pKUpdateOpKey=P
# gg.handler.name.includeOpType=true
# Optional properties to use the transaction grouping functionality
# gg.handler.flumehandler.maxGroupSize=1000
# gg.handler.flumehandler.minGroupSize=1000
# native library config #
goldengate.userexit.nochkpt=TRUE
goldengate.userexit.timestamp=utc
goldengate.log.logname=cuserexit
goldengate.log.level=DEBUG
goldengate.log.tofile=true
goldengate.userexit.writers=javawriter
goldengate.log.level.JAVAUSEREXIT=DEBUG
# gg.brokentrail=true
gg.report.time=30sec
gg.classpath=/data/ogg_for_bigdata/dirprm/flumejar/*:/data/apache-flume-1.6.0-bin/lib/*
javawriter.stats.full=TRUE
javawriter.stats.display=TRUE
javawriter.bootoptions=-Xmx81920m -Xms20480m -Djava.class.path=/data/ogg_for_bigdata/ggjava/ggjava.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
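To verify the pipeline end to end, consume from the topic on one of the Kafka brokers. A sketch, assuming a broker version new enough for --bootstrap-server (older clients use --zookeeper instead):
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.1:9092 --topic my_schema --from-beginning
Each captured Oracle operation should appear as one delimited message, with fields separated by the \u0001 delimiter configured above.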
At this point, you should have a deeper understanding of how to use OGG to transfer Oracle data through Flume to Kafka. Try it out in practice, and follow us to keep learning!