In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-20 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article mainly introduces how to use Scala language to develop Spark applications, the article is very detailed, has a certain reference value, interested friends must read it!
The Spark kernel is developed by the Scala language, so it is natural to use Scala to develop Spark applications. If you are not familiar with the Scala language, you can read the online tutorial A Scala Tutorial for Java Programmers or related Scala books to learn.
This article will introduce three Scala Spark programming examples, namely WordCount, TopK and SparkJoin, which represent three typical applications of Spark.
1. WordCount programming example
WordCount is the simplest distributed application example. The main function is to count the total number of occurrences of all words in the input directory. The steps are as follows:
Step 1: create a SparkContext object with four parameters: Spark master location, application name, Spark installation directory and jar location. For Spark On YARN, the first two parameters are the most important. The * parameter is specified as "yarn-standalone", and the second parameter is a custom string, as an example:
Val sc = new SparkContext (args (0), "WordCount", System.getenv ("SPARK_HOME"), Seq (System.getenv ("SPARK_TEST_JAR")
Step 2: read the input data. To read text data from HDFS, you can use SparkCon
Val textFile = sc.textFile (args (1))
Of course, Spark allows you to use any Hadoop InputFormat, such as the binary input format SequenceFileInputFormat, and you can use the hadoopRDD function in SparkContext, as an example:
Val inputFormatClass = classOf [SequenceFileInputFormat [Text,Text]] var hadoopRdd = sc.hadoopRDD (conf, inputFormatClass, classOf [Text], classOf [Text])
Or create a HadoopRDD object directly:
Var hadoopRdd = new HadoopRDD (sc, conf, classOf [SequenceFileInputFormat [Text,Text, classOf [Text], classOf [Text])
Val result = hadoopRdd.flatMap {case (key, value) = > value.toString (). Split ("\\ s +");} .map (word = > (word, 1)). ReduceByKey (_ + _)
Among them, flatMap function can convert a record into multiple records (one-to-many relationship), map function converts one record into another record (one-to-one relationship), reduceByKey function divides the data with the same key into a bucket and calculates them in groups with key as units. The specific meaning of these functions can be referred to: Spark Transformation.
Step 4: save the resulting RDD dataset to HDFS. You can use the number of saveAsTextFile hats in SparkContext to save the dataset to the HDFS directory. By default, the TextOutputFormat provided by Hadoop is used, and each record is printed in the form of "(key,value)". You can also use the saveAsSequenceFile function to save the data in SequenceFile format, as an example:
Result.saveAsSequenceFile (args (2))
Of course, generally speaking, when we write Spark programs, we need to include the following two header files:
Import org.apache.spark._ import SparkContext._
The complete WordCount program has been introduced in the article "Apache Spark Learning: building a Spark Integrated Development Environment with Eclipse", which will not be repeated again.
It should be noted that when specifying input and output files, you need to specify the URI of hdfs, such as the input directory is hdfs://hadoop-test/tmp/input, and the output directory is hdfs://hadoop-test/tmp/output, in which "hdfs://hadoop-test" is specified by the parameter fs.default.name in the Hadoop configuration file core- site.xml, which can be replaced with your configuration.
2. TopK programming example
The task of the TopK program is to count the word frequency of a pile of text and return K words with a frequency of occurrence. If you use MapReduce implementation, you need to write two assignments: WordCount and TopK, while using Spark requires only one job, of which the WordCount part has been implemented previously, and then follow the previous implementation to find the word TopK. Note that the implementation of this article is not * *, there is a lot of room for improvement.
Step 1: first, all words need to be sorted by word frequency, as follows:
Val sorted = result.map {case (key, value) = > (value, key); / / exchange key and value} .sortByKey (true, 1)
Step 2: return to the first K:
Val topK = sorted.top (args (3) .toInt)
Step 3: print out the K words:
TopK.foreach (println)
Note that for the content of the application's standard output, YARN will be saved to the stdout log of Container. In YARN, there are three log files for each Container, namely stdout, stderr, and syslog. The first two save the content generated by standard output, and the third saves the log printed by log4j, which usually contains only the third log.
The complete code of this program, the compiled jar package and the running script can be downloaded from here. After downloading, follow the operation flow of "Apache Spark Learning: building a Spark Integrated Development Environment with Eclipse".
3. SparkJoin programming example
In the recommended field, there is a famous open test set given by movielens. The download link is: http://grouplens.org/datasets/movielens/, the test set contains three files, namely ratings.dat, sers.dat, and movies.dat. The specific introduction can be read: README.txt. The SparkJoin example given in this section gets a list of movies with an average score of more than 4.0 by connecting ratings.dat and movies.dat files, using the dataset: ml-1m. The program code is as follows:
Import org.apache.spark._ import SparkContext._ object SparkJoin {def main (args: Array [String]) {if (args.length! = 4) {println ("usage is org.test.WordCount") return} val sc = new SparkContext (args (0), "WordCount", System.getenv ("SPARK_HOME") Seq (System.getenv ("SPARK_TEST_JAR")) / / Read rating from HDFS fileval textFile = sc.textFile (args (1)) / / extract (movieid, rating) val rating = textFile.map (line = > {val fileds = line.split ("::") (fileds (1). ToInt Fileds (2) .toDouble}) val movieScores = rating .groupByKey () .map (data = > {val avg = data._2.sum / data._2.size (data._1) Avg)}) / / Read movie from HDFS fileval movies = sc.textFile (args (2)) val movieskey = movies.map (line = > {val fileds = line.split ("::") (fileds (0). ToInt, fileds (1)}) .keyby (tup = > tup._1) / / by join We get val result = movieScores .keyby (tup = > tup._1) .join (movieskey) .filter (f = > f.room2.room1.room2 > 4.0) .map (f = > (f.room1, f.room2.room1.class2) .map ) result.saveAsTextFile (args (3))}} these are all the contents of the article "how to develop Spark applications using Scala language" Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.