In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article introduces what the real-time streaming process based on Flume+Kafka+Spark-Streaming is like, the content is very detailed, interested friends can refer to, hope to be helpful to you.
Complete flow of real-time streaming processing based on Flume+Kafka+Spark-Streaming
1. Environment preparation, four test servers
Three spark clusters, spark1,spark2,spark3
Three kafka clusters, spark1,spark2,spark3
Three zookeeper clusters, spark1,spark2,spark3
Log receiving server, spark1
Log collection server, redis (this machine is used for redis development, now used for log collection testing, the host name will not be changed)
Log collection process:
Log Collection Server-> Log receiver Server-> kafka Cluster-> spark Cluster processing
Description: log collection server, in the actual production is likely to be an application system server, log receiving server is one of the big data servers, logs are transmitted to the log receiving server through the network, and then processed in the cluster.
Because, in the production environment, the network is often only one-way open to a certain port of a server to access.
Flume version: apache-flume-1.5.0-cdh6.4.9, which has better integrated support for kafka
2. Log collection server (summary side)
Configure flume to dynamically collect specific logs. The collect.conf configuration is as follows:
# Name the components on this agenta1.sources = tailsource-1a1.sinks = remotesinka1.channels = memoryChnanel-1# Describe/configure the sourcea1.sources.tailsource-1.type = execa1.sources.tailsource-1.command = tail-F / opt/modules/tmpdata/logs/1.loga1.sources.tailsource-1.channels = memoryChnanel-1# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.memoryChnanel-1.type = memorya1.channels.memoryChnanel-1.keep-alive = 10a1.channels .memoryChnanel-1.capacity = 100000a1.channels.memoryChnanel-1.transactionCapacity = 10000 million Bind the source and sink to the channela1.sinks.remotesink.type = avroa1.sinks.remotesink.hostname = spark1a1.sinks.remotesink.port = 666a1.sinks.remotesink.channel = memoryChnanel-1
After the log is monitored in real time, it is transferred to port 666 of the spark1 server through the network avro type.
Start the log collection script:
Bin/flume-ng agent-conf conf--conf-file conf/collect.conf-name A1-Dflume.root.logger=INFO,console
3. Log receiving server
Configure flume to receive logs in real time. Collect.conf is configured as follows:
# agent section producer.sources = s producer.channels = cproducer.sinks = r # source section producer.sources.s.type = avroproducer.sources.s.bind = spark1producer.sources.s.port = 666producer.sources.s.channels = c # Each sink's type must be defined producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSinkproducer.sinks.r.topic = mytopicproducer.sinks.r.brokerList = spark1:9092,spark2:9092 Spark3:9092producer.sinks.r.requiredAcks = 1producer.sinks.r.batchSize = 20producer.sinks.r.channel = C1 # Specify the channel the sink should use producer.sinks.r.channel = c # Each channel's type is defined. Producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannelproducer.channels.c.capacity = 10000producer.channels.c.transactionCapacity = 1000producer.channels.c.brokerListspark1pur9092Park2VOListspark2Park2Park2181Park2Junction 2181Park2Park2Junction 2181Park2Park2Park2181
The key is to specify that if the source is the data from the receiving network port, and enter the cluster of kafka, you need to configure the address of topic and zk.
Start the receiver script:
Bin/flume-ng agent-conf conf--conf-file conf/receive.conf-name producer-Dflume.root.logger=INFO,console
4. Spark cluster processes received data
Import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.StreamingContextimport kafka.serializer.StringDecoderimport scala.collection.immutable.HashMapimport org.apache.log4j.Levelimport org.apache.log4j.Logger/** * @ author Administrator * / object KafkaDataTest {def main (args: Array [String]): Unit = {Logger.getLogger ("org.apache.spark") .setLevel (Level.WARN) Logger.getLogger ("org.eclipse.jetty.server") .setLevel (Level.ERROR) Val conf = new SparkConf (). SetAppName ("stocker"). SetMaster ("local [2]") val sc = new SparkContext (conf) val ssc = new StreamingContext (sc, Seconds (1)) / / Kafka configurations val topics = Set ("mytopic") val brokers = "spark1:9092,spark2:9092,spark3:9092" val kafkaParams = Map [String, String] ("metadata.broker.list"-> brokers "serializer.class"-> "kafka.serializer.StringEncoder") / / Create a direct stream val kafkaStream = KafkaUtils.createDirectStream [String, String, StringDecoder, StringDecoder] (ssc, kafkaParams, topics) val urlClickLogPairsDStream = kafkaStream.flatMap (_ _ 2.split ("). Map ((, 1)) val urlClickCountDaysDStream = urlClickLogPairsDStream.reduceByKeyAndWindow ((v1: Int, v2: Int) = > {v1 + v2}, Seconds (60), Seconds (5)) UrlClickCountDaysDStream.print (); ssc.start () ssc.awaitTermination ()}}
Spark-streaming receives the data from the kafka cluster and calculates the wordcount value within 60s every 5s.
5. Test results
Add logs to the past log three times in turn
The result of spark-streaming processing is as follows:
(hive,1)
(spark,2)
(hadoop,2)
(storm,1)
-
(hive,1)
(spark,3)
(hadoop,3)
(storm,1)
-
(hive,2)
(spark,5)
(hadoop,5)
(storm,2)
As expected, it fully embodies the characteristics of spark-streaming sliding window.
On the Flume+Kafka+Spark-Streaming-based real-time streaming process is shared here, I hope that the above content can be of some help to you, can learn more knowledge. If you think the article is good, you can share it for more people to see.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.