2025-01-20 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains in detail how to run KafkaWordCount, with the corresponding analysis and answers, in the hope of giving readers who face this problem a simpler way to solve it.
Summary
Spark application development is very hands-on. Much of the time may go into setting up and running the environment, so good guidance can greatly shorten the development process. Spark Streaming integrates with many third-party programs, yet there is little detailed documentation on how to actually run the examples in the source code.
The following mainly describes how to run KafkaWordCount. This involves setting up a Kafka cluster, so the more carefully each step is followed, the better.
Set up Kafka cluster
Step 1: download Kafka 0.8.1.1 and extract it
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
tar zvxf kafka_2.10-0.8.1.1.tgz
cd kafka_2.10-0.8.1.1
Step 2: start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
Step 3: modify the configuration file config/server.properties by adding the following
host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=localhost
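Step 3 can also be scripted instead of edited by hand. The following is only a minimal sketch and not part of the original procedure: set_prop is a hypothetical helper name, and it assumes a plain key=value properties file such as config/server.properties.

```shell
# set_prop FILE KEY VALUE
# Hypothetical helper: removes any existing "KEY=" line from FILE and
# appends "KEY=VALUE", so running it twice does not duplicate the setting.
# Commented-out lines (starting with "#") are left untouched.
set_prop() {
  file=$1
  key=$2
  value=$3
  tmp="${file}.tmp.$$"
  # Keep every line that does not start with the literal text "KEY=".
  awk -v k="${key}=" 'index($0, k) != 1' "$file" > "$tmp"
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  mv "$tmp" "$file"
}

# Usage (run from the Kafka directory):
#   set_prop config/server.properties host.name localhost
#   set_prop config/server.properties advertised.host.name localhost
```

The file is rewritten through a temporary copy, so it is worth keeping a backup of config/server.properties before trying this.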
Step 4: start Kafka server
bin/kafka-server-start.sh config/server.properties
Step 5: create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Verify that the topic is created successfully
bin/kafka-topics.sh --list --zookeeper localhost:2181
If everything is normal, test is returned.
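When this check is part of a script, the listing can be tested rather than read by eye. This is a rough sketch under an assumption: topic_exists is a hypothetical helper, and it assumes the list command prints one topic name per line.

```shell
# topic_exists NAME
# Hypothetical helper: reads a topic listing on stdin (assumed to be one
# topic name per line) and succeeds only if NAME appears as a whole line.
topic_exists() {
  grep -Fxq -- "$1"
}

# Usage (run from the Kafka directory):
#   bin/kafka-topics.sh --list --zookeeper localhost:2181 | topic_exists test \
#     && echo "topic test exists"
```

Matching the whole line with -x avoids a false positive from a longer topic name such as test2.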
Step 6: open producer and send a message
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# After it starts successfully, enter the following content to test
This is a message
This is another message
Step 7: open consumer and receive messages
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
# After it starts successfully, the input from the producer side is displayed if everything is normal.
This is a message
This is another message
Run KafkaWordCount
The KafkaWordCount source file is located at examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
The source does include usage instructions (see below), but without some prior knowledge of Kafka it is hard to know what these parameters mean or how to fill them in.
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * Example:
 *    `$ bin/run-example \
 *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
 *      my-consumer-group topic1,topic2 1`
 */
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    // One batch every 2 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // Map each topic name to the number of consumer threads
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Keep only the message value of each (key, value) pair
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    // Count words over a 10-minute window that slides every 2 seconds
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
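In this listing, reduceByKeyAndWindow keeps per-word counts over a 10-minute window that slides every 2 seconds. Counts from batches entering the window are added with _ + _, and counts from batches leaving it are subtracted with the inverse function _ - _. To get a feel for the split-and-count part alone, here is a small local analogue. It is only a sketch: word_count is a hypothetical helper, it involves no Spark, Kafka or windowing, and it splits on any whitespace rather than on a single space.

```shell
# word_count
# Local analogue of the split-and-count logic only: reads lines on stdin,
# splits them on whitespace and prints "word count" pairs sorted by word.
word_count() {
  awk '{ for (i = 1; i <= NF; i++) counts[$i]++ }
       END { for (w in counts) print w, counts[w] }' | sort
}

# Usage:
#   printf 'This is a message\nThis is another message\n' | word_count
```

The streaming job prints pairs of the same kind, a word and its count, except that its counts cover the current window rather than a fixed input.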
Take a look at how to run KafkaWordCount
Step 1: stop running kafka-console-producer and kafka-console-consumer
Step 2: run KafkaWordCountProducer
bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
Parameter meanings: localhost:9092 is the address and port of the Kafka broker that the producer connects to, test is the topic, 3 is the number of messages sent per second, and 5 is the number of words in each message.
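To make the last two parameters concrete, the sketch below prints one round of messages of the same shape. It rests on an assumption about the bundled producer, namely that each word is a random single digit. gen_messages is a hypothetical helper and sends nothing to Kafka.

```shell
# gen_messages COUNT WORDS
# Hypothetical helper: prints COUNT lines, each made of WORDS random
# digits (0-9) separated by single spaces. Nothing is sent to Kafka.
gen_messages() {
  awk -v n="$1" -v w="$2" 'BEGIN {
    srand()
    for (i = 1; i <= n; i++) {
      line = ""
      sep = ""
      for (j = 1; j <= w; j++) {
        line = line sep int(rand() * 10)
        sep = " "
      }
      print line
    }
  }'
}

# Usage: three messages of five words each, matching the arguments "3 5"
#   gen_messages 3 5
```

With input like this, the word counts printed by KafkaWordCount would be counts of the digits 0 to 9.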
Step 3: run KafkaWordCount
bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1
Parameter meanings: localhost:2181 is the address that ZooKeeper listens on, test-consumer-group is the name of the consumer group, kept consistent with the group.id setting in $KAFKA_HOME/config/consumer.properties, test is the topic, and 1 is the number of threads.
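To see which group name that file currently sets, the value can be read with a small helper. This is a sketch only: get_prop is a hypothetical name, and it assumes the file uses plain key=value lines.

```shell
# get_prop FILE KEY
# Hypothetical helper: prints the value of the last "KEY=VALUE" line in
# FILE (an empty line if the key is not set).
get_prop() {
  awk -v k="$2=" '
    index($0, k) == 1 { v = substr($0, length(k) + 1) }
    END { print v }
  ' "$1"
}

# Usage:
#   get_prop "$KAFKA_HOME/config/consumer.properties" group.id
```

The printed value is what would be passed as the second argument to KafkaWordCount.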
That is how to run KafkaWordCount. I hope the above is of some help to you.