How to run KafkaWordCount

Updated 2025-01-20. From: SLTechnology News&Howtos, Shulou (Shulou.com), 05/31

This article explains in detail how to run KafkaWordCount, the Spark Streaming example that counts words read from Kafka, for anyone looking for a simple way to get it working.

Summary

Spark application development is hands-on work, and much of the time can go into building and running the environment, so good guidance greatly shortens the development process. Spark Streaming integrates with many third-party programs, yet documentation on how the examples in the source code actually run is sparse and lacks detail.

The following mainly covers how to run KafkaWordCount. This involves setting up a Kafka cluster, so the more carefully each step is described, the better.

Set up Kafka cluster

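Step 1: download kafka 0.8.1.1 and extract it

wget "https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz"

Note that the closer.cgi address is a mirror-selection page. If wget saves an HTML page instead of the archive, download kafka_2.10-0.8.1.1.tgz from one of the mirrors that page lists.

tar zxvf kafka_2.10-0.8.1.1.tgz

cd kafka_2.10-0.8.1.1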

Step 2: start zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

Step 3: modify the configuration file config/server.properties by adding the following

host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=localhost
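
The edit in step 3 can also be scripted. The following is a minimal sketch, not part of the Kafka distribution: set_property is a hypothetical helper name, and it assumes entries are written as key=value with no spaces around the equals sign. It drops any existing uncommented entry for the key and appends the new one, so running it twice does not create duplicates.

```shell
#!/bin/sh
# set_property FILE KEY VALUE
# Hypothetical helper (not shipped with Kafka): sets KEY=VALUE in a Java
# properties file, dropping an existing uncommented KEY= entry first.
set_property() {
    file=$1 key=$2 value=$3
    tmp=$(mktemp) || return 1
    # Keep every line that does not start with "KEY=" (comments are kept too).
    awk -v k="$key" 'index($0, k "=") != 1 { print }' "$file" > "$tmp" || return 1
    # Append the new entry at the end of the file.
    printf '%s=%s\n' "$key" "$value" >> "$tmp"
    # Write back through redirection so the original file keeps its permissions.
    cat "$tmp" > "$file" && rm -f "$tmp"
}

# Example usage, run from the Kafka directory:
#   set_property config/server.properties host.name localhost
#   set_property config/server.properties advertised.host.name localhost
```

Because the match is anchored at the start of the line, setting host.name leaves an advertised.host.name entry untouched.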

Step 4: start Kafka server

bin/kafka-server-start.sh config/server.properties

Step 5: create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Verify that the topic was created successfully

bin/kafka-topics.sh --list --zookeeper localhost:2181

If everything is normal, test is returned.
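
The check above can be automated instead of read by eye. This is a small sketch under the assumption that the list command prints one topic name per line; topic_listed is a hypothetical helper name, not a Kafka tool.

```shell
#!/bin/sh
# topic_listed NAME
# Reads a topic listing (one name per line) on stdin and succeeds only if
# NAME appears as an exact, whole-line match.
topic_listed() {
    # -F: fixed string, -x: whole line, -q: quiet (exit status only)
    grep -Fxq -- "$1"
}

# Example usage, run from the Kafka directory with ZooKeeper on localhost:2181:
#   bin/kafka-topics.sh --list --zookeeper localhost:2181 | topic_listed test \
#       && echo "topic test exists"
```

The whole-line match keeps a topic such as testing from being mistaken for test.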

Step 6: open a producer and send messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

## After the producer starts, type the following lines to test

This is a message

This is another message

Step 7: open a consumer and receive messages

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

## After the consumer starts, the lines typed on the producer side are displayed if everything is normal

This is a message

This is another message

Run KafkaWordCount

The KafkaWordCount source file is located at examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

The source includes usage notes, shown below, but without some prior knowledge of Kafka it is hard to tell what these parameters mean or how to fill them in.

// Imports the excerpt relies on
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * Example:
 *    `$ bin/run-example \
 *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
 *      my-consumer-group topic1,topic2 1`
 */
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    // One batch every 2 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // Map each topic name to the number of consumer threads
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Keep only the message value of each (key, value) pair
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    // Count words over a 10-minute window that slides every 2 seconds
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
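
The reduceByKeyAndWindow call takes both an add function (_ + _) and an inverse function (_ - _), which lets each window's counts be derived from the previous window's counts rather than recomputed from scratch. The sketch below imitates that idea outside Spark, purely as an illustration: windowed_counts is a hypothetical helper, each input line stands for one batch, and the window is measured in batches (in the example above, a 10-minute window with 2-second batches spans 300 batches).

```shell
#!/bin/sh
# windowed_counts WINDOW
# Reads one batch of space-separated words per input line. After each batch it
# prints "batch word count" lines for the last WINDOW batches, sorted by batch
# number and then by word.
windowed_counts() {
    awk -v w="$1" '
    {
        batch[NR] = $0
        # Add step: count the words entering the window with the newest batch.
        n = split($0, entering, " ")
        for (i = 1; i <= n; i++) count[entering[i]]++
        # Inverse step: subtract the words of the batch that left the window.
        if (NR > w) {
            m = split(batch[NR - w], leaving, " ")
            for (i = 1; i <= m; i++) count[leaving[i]]--
            delete batch[NR - w]
        }
        # Report only words that are still present in the window.
        for (word in count)
            if (count[word] > 0) printf "%d %s %d\n", NR, word, count[word]
    }' | sort -k1,1n -k2,2
}

# Example usage: three batches with a window of two batches.
#   printf 'a b a\nb c\na\n' | windowed_counts 2
```

In the third batch of the usage example, the count for a falls from 3 to 1 because the two occurrences from the first batch are subtracted as that batch slides out.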

The steps to run KafkaWordCount are as follows.

Step 1: stop the running kafka-console-producer and kafka-console-consumer

Step 2: run KafkaWordCountProducer from the Spark directory

bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5

The parameters mean the following: localhost:9092 is the address and port of the Kafka broker that the producer connects to, test is the topic, 3 is the number of messages sent per second, and 5 is the number of words in each message.
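
To make the last two parameters concrete, the following sketch prints messages of the same shape to standard output. It rests on an assumption that should be checked against the KafkaWordCountProducer source: that each word is a random single digit from 0 to 9. The function name fake_messages is hypothetical and the script does not talk to Kafka.

```shell
#!/bin/sh
# fake_messages COUNT WORDS
# Prints COUNT lines, each holding WORDS random digits (0-9) separated by
# single spaces. Assumption: this mirrors the shape of the example
# producer's messages; it is not the producer itself.
fake_messages() {
    awk -v m="$1" -v n="$2" 'BEGIN {
        srand()                               # seed from the current time
        for (i = 1; i <= m; i++) {
            line = ""
            for (j = 1; j <= n; j++) {
                word = int(rand() * 10)       # integer in the range 0..9
                line = line (j > 1 ? " " : "") word
            }
            print line
        }
    }'
}

# Example usage: one batch matching the arguments "3 5" above.
#   fake_messages 3 5
# The lines can also be piped into the console producer from step 6:
#   fake_messages 3 5 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
```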

Step 3: run KafkaWordCount

bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1

The parameters mean the following: localhost:2181 is the address ZooKeeper listens on; test-consumer-group is the name of the consumer group, which must be consistent with the group.id setting in $KAFKA_HOME/config/consumer.properties; test is the topic; and 1 is the number of consumer threads.
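
The last two arguments are combined inside the program: the topic list is split on commas and every topic is paired with the same thread count, as the topics.split(",") line in the source shows. A minimal sketch of that pairing, using a hypothetical helper named topic_map:

```shell
#!/bin/sh
# topic_map TOPICS THREADS
# Splits the comma-separated TOPICS list and prints "topic -> THREADS" for
# each entry, mirroring how the example pairs every topic with one count.
topic_map() {
    printf '%s\n' "$1" | tr ',' '\n' | while IFS= read -r topic; do
        # Skip empty entries such as those left by a trailing comma.
        if [ -n "$topic" ]; then
            printf '%s -> %s\n' "$topic" "$2"
        fi
    done
}

# Example usage:
#   topic_map topic1,topic2 1
```

With the arguments used in step 3, the result is a single entry pairing test with 1.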

That covers how to run KafkaWordCount. I hope the steps above are of some help.
