The following is a collection of Flume configuration examples for various collection methods, ready for direct use.
1. The source type is netcat
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command: ./flume-ng agent -n a1 -f ../conf/netcat.conf -Dflume.root.logger=INFO,console
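To test, send a line to the listening port from another shell (this assumes the nc utility is available; the netcat source replies OK for each line it receives):
echo "hello flume" | nc linux1 666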
2. The source type is spooldir
a1.sources = r1 ## name the agent's source
a1.sinks = k1 ## name the agent's sink
a1.channels = c1 ## name the agent's channel
a1.sources.r1.type = spooldir ## monitor a spooling directory
a1.sources.r1.spoolDir = /root/flume ## the directory to be collected
a1.sources.r1.fileHeader = true ## whether to add a header storing the absolute path of each file
a1.sinks.k1.type = logger
a1.channels.c1.type = memory ## buffer the data in memory
a1.channels.c1.capacity = 1000 ## maximum number of events the channel can hold
a1.channels.c1.transactionCapacity = 100 ## maximum number of events per transaction
a1.sources.r1.channels = c1 ## connect the source to the channel
a1.sinks.k1.channel = c1 ## connect the sink to the channel
Command: ./bin/flume-ng agent -n a1 -f ../conf/spooldir.conf -Dflume.root.logger=INFO,console
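To test, drop a file into the spooled directory; with default settings Flume renames each fully ingested file with a .COMPLETED suffix:
echo "spool test" > /root/flume/demo.log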
3. The source type is avro
With an avro source, this agent acts as a server: it opens port 8088 and waits to receive data.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = linux1 ## the hostname/ip of the current machine
a1.sources.r1.port = 8088
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command: ./bin/flume-ng agent -n a1 -f ../conf/server.conf -Dflume.root.logger=INFO,console
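To push a file to this avro source you can use Flume's built-in avro client (the file path here is only an example):
./bin/flume-ng avro-client -H linux1 -p 8088 -F /root/test.txt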
4. The source type is syslogtcp (socket)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = linux1
a1.sources.r1.port = 8080
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
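The original gives no launch command for this agent; assuming the config is saved as ../conf/syslogtcp.conf, it follows the same pattern, and a syslog-formatted test line (the <13> prefix is the syslog priority field) can then be sent with nc:
Command: ./bin/flume-ng agent -n a1 -f ../conf/syslogtcp.conf -Dflume.root.logger=INFO,console
echo "<13>hello syslog" | nc linux1 8080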
5. The sink type is avro
With an avro sink, the agent acts as a client whose job is to send data, in this case to the avro source from section 3 (linux1:8088).
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2 ## the hostname/ip of the current machine
a1.sources.r1.port = 666
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command: ./bin/flume-ng agent -n a1 -f ../conf/client.conf -Dflume.root.logger=INFO,console
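With the section 3 server running on linux1 and this client on linux2, a line sent to linux2:666 should show up in the server's logger output (again assuming nc is available):
echo "relay test" | nc linux2 666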
6. The sink type is avro, with two flows in one agent
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2
a1.sources.r1.port = 666
a1.sources.r2.type = netcat
a1.sources.r2.bind = linux2
a1.sources.r2.port = 777
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux1
a1.sinks.k2.port = 8088
a1.sinks.k2.batch-size = 2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
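Both sinks here point at the same avro source (linux1:8088). The original omits the launch command; assuming the config is saved as ../conf/twoavro.conf, it follows the same pattern:
Command: ./bin/flume-ng agent -n a1 -f ../conf/twoavro.conf -Dflume.root.logger=INFO,console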
7. The sink type is hdfs
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = hdfs ## sink to hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
## filePrefix default value: FlumeData
## prefix for the file names written to hdfs; escape sequences such as the date or %{host} may be used
a1.sinks.k1.hdfs.filePrefix = events-
## default value: 30
## interval in seconds after which the hdfs sink rolls a temporary file over to the final target file
## if set to 0, files are not rolled based on time
## note: rolling (roll) means the hdfs sink renames the temporary file to the final target file
## and opens a new temporary file to write data
a1.sinks.k1.hdfs.rollInterval = 30
## default value: 1024
## when the temporary file reaches this size (in bytes), roll it over to the target file
## if set to 0, files are not rolled based on size
a1.sinks.k1.hdfs.rollSize = 0
## default value: 10
## when this many events have been written, roll the temporary file over to the target file
## if set to 0, files are not rolled based on event count
a1.sinks.k1.hdfs.rollCount = 0
## batchSize default value: 100
## number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1
## useLocalTimeStamp default value: false
## whether to use the local time (instead of the event's timestamp header) when replacing escape sequences in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
## the generated file type is SequenceFile by default; DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
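Once events have been written, the output can be checked from the HDFS side; directories follow the %y-%m-%d/%H%M pattern configured above:
hdfs dfs -ls /flume/events/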
8. The sink type is kafka
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = Hellokafka
a1.sinks.k1.brokerList = linux1:9092,linux2:9092,linux3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
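To verify delivery, consume the topic with Kafka's console consumer (flag names vary by Kafka version; --bootstrap-server applies to newer releases, older ones use --zookeeper):
kafka-console-consumer.sh --bootstrap-server linux1:9092 --topic Hellokafka --from-beginning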
9. The source type is mysql
a1.sources = r1
a1.sinks = k1
a1.channels = c1
## sources ##
a1.sources.r1.type = org.keedio.flume.source.SQLSource
a1.sources.r1.hibernate.connection.url = jdbc:mysql://localhost:3306/test
a1.sources.r1.hibernate.connection.user = root
a1.sources.r1.hibernate.connection.password = 123456
a1.sources.r1.hibernate.connection.autocommit = true
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.r1.run.query.delay = 10000
a1.sources.r1.status.file.path = /root/data/flume/
a1.sources.r1.status.file.name = sqlSource.status
a1.sources.r1.start.from = 0
a1.sources.r1.custom.query = select id,userName from user where id > $@$ order by id asc
a1.sources.r1.batch.size = 1000
a1.sources.r1.max.rows = 1000
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.r1.hibernate.c3p0.min_size = 1
a1.sources.r1.hibernate.c3p0.max_size = 10
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
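This source is not part of stock Flume: it assumes the keedio flume-ng-sql-source jar and the MySQL JDBC driver are on Flume's classpath (for example in its lib directory). The $@$ placeholder in custom.query is replaced with the last value recorded in the status file, so each run picks up only new rows. The launch command follows the same pattern; the config file name is illustrative:
Command: ./bin/flume-ng agent -n a1 -f ../conf/mysql.conf -Dflume.root.logger=INFO,console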