In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
In-depth Core of Spark (2)
Tags (separated by spaces): portions of Spark
1: optimization of log cleaning 2: Spark RDD 3: SparkContext three major functions 4: Spark on YARN 5: spark RDD dependence 1, log cleaning optimization: 1.1 log cleaning dirty data problems hdfs dfs-mkdir / apachelog/hdfs dfs-put access_log / apachelogshdfs dfs-ls / apachelogs
An error was reported in the execution result.
LogAnalyzer.scalapackage com.ibeifeng.bigdata.spark.app.coreimport org.apache.spark. {SparkContext SparkConf} / * Created by zhangyy on 2016-7-16. * / object LogAnalyzer {def main (args: Array [String]) {/ / step 0: SparkContext val sparkConf = new SparkConf () .setAppName ("LogAnalyzer Applicaiton") / / name .setMaster ("local [2]") / /-- master local [2] | spark://xx:7077 | yarn / / Create SparkContext val sc = new SparkContext (sparkConf) / * * = / val logFile = "/ apachelogs/access_log" / / step 1: input data val accessLogs = sc.textFile (logFile) / / filer logs data .filter (ApacheAccessLog.isValidateLogLine) / / closures / * parse log * / .map (line = > ApacheAccessLog.parseLogLine (line)) / * The average Min, and max content size of responses returned from the server. * / val contentSizes = accessLogs.map (log = > log.contentSize) / / computeval avgContentSize = contentSizes.reduce (_ + _) / contentSizes.count () val minContentSize = contentSizes.min () val maxContentSize = contentSizes.max () / / println printf ("ContentSize Avg:% s, Min:% s, Max:% s" .format (avgContentSize, minContentSize MaxContentSize)) / * * A count of response code's returned * / val responseCodeToCount = accessLogs .map (log = > (log.responseCode, 1)) .reduceByKey (_ + _) .take (3) println (s "Response Code Count: ${responseCodeToCount.mkString (") ")}") / * All IPAddresses that have accessed this server more than N times * / val ipAddresses = accessLogs .map (log = > (log.ipAddress, 1)) .reduceByKey (_ + _) / / .filter (x = > (x.room2 > 10)) .take (5) println (s" IPAddress: ${ipAddresses.mkString (")
< ", ", " ," >")}") / * * The top endpoints requested by count * / val topEndpoints = accessLogs .map (log = > (log.endPoint, 1)) .reduceByKey (_ + _) .top (3) (OrderingUtils.SecondValueOrdering) / / .map (tuple = > (tuple._2) Tuple._1)) / / .sortByKey (false) / / .take (3) / / .map (tuple = > (tuple._2, tuple._1)) println (s "Top Endpoints: ${topEndpoints.mkString (" [",", ") "]")} ") / * * = * / Stop SparkContext sc.stop ()}} ApacheAccessLog.scalapackage com.ibeifeng.bigdata.spark.app.core/** * Created by zhangyy on 2016-7-16. * * 1.1.1.1-- [21/Jul/2014:10:00:00-0800] *" GET / chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer .java HTTP/1.1 "* 1234 * / case class ApacheAccessLog (ipAddress: String ClientIndentd: String, userId: String, dateTime:String, method: String, endPoint: String, protocol: String, responseCode: Int ContentSize: Long) object ApacheAccessLog {/ / regex / / 1.1.1.1-- [21/Jul/2014:10:00:00-0800] "GET / chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1" 200 1234 val PARTTERN = "" ^ (\ S+) (\ S+) (\ S+)\ [([\ wvehicle /]] +\ S [+\ -]\ d {4})\] "(\ S+) (\ S+) (\ d {3}) (\ d +)" .r / * @ param log * @ return * / def isValidateLogLine (log: String): Boolean = {/ / parse log val res = PARTTERN.findFirstMatchIn (log) / / invalidate if (res.isEmpty) {false} Else {true}} / * @ param log * @ return * / def parseLogLine (log: String): ApacheAccessLog = {/ / parse log val res = PARTTERN.findFirstMatchIn (log) / / invalidate if (res.isEmpty) {throw new RuntimeException ("Cannot parse log line:" + log)} / / get valueval m = res.get / / return ApacheAccessLog (/ / m.group (1) / / m.group (2), m.group (3), m.group (4), m.group (5), m.group (6), m.group (7), m.group (8). ToInt M.group (9) .toLong)} OrderingUtils.scalapackage com.ibeifeng.bigdata.spark.app.coreimport scala.math.Ordering/** * Created by zhangyy on 2016-7-16. * / object OrderingUtils {object SecondValueOrdering extends Ordering [(String, Int)] {/ * @ param x * @ param y * @ return * / override def compare (x: (String, Int), y: (String) Int): Int = {x._2.compare (.room2) / / x.room2 compare .room2 / / 1 to 10 | 1.to (10)}}
Second, the meaning of Spark RDD2.1:RDD: RDD, whose full name is Resilient Distributed Datasets, is a fault-tolerant and parallel data structure that allows users to explicitly store data to disk and memory, and to control the partition of the data. At the same time, RDD provides a rich set of operations to manipulate the data. Among these operations, transformation operations such as map, flatMap, filter, etc., implement the monad pattern, which fits well with the collection operation of Scala. In addition, RDD also provides more convenient operations such as join, groupBy, reduceByKey (note that reduceByKey is action, not transformation) to support common data operations 2.2 and the structure of RDD in hdfs
Val rdd = sc.textFile ("/ spark/rdd") rdd.partitions.lengthrdd.cacherdd.count one partition defaults to one task partition to handle the default is two partitions to handle
A list of partitions: (protected def getPartitions: Array [Partition]) A series of slices, such as 64m, similar to split2 in hadoop. A function ofr computing each split: (@ DeveloperApi def compute (split: Partition, context: TaskContext): Iterator [T]) there is a way on each shard to iterate / execute / calculate 3. A list of dependencies on other RDD: (protected def getDependencies: Seq [DependencyDependency[ _]] = deps) A series of dependencies: RDDa is converted to RDDb, converted to RDDc, so RDDc depends on RDDb RDDb also relies on RDDa-wordcount program: # # val rdd = sc.textFile ("xxxx") val wordRdd = rdd.flatMap (_ .split (")) val kvRdd = wordRdd.map ((_, 1)) val WordCountRdd = kvRdd.reduceByKey (_ + _) # wrodcountRdd.saveAsTextFile (" yy ") kvRdd tuple._2._2.isEmpty). Collectrepartition () Application: val rdd = sc.textFile (" / spark/rdd ") rdd.repartition (2) rdd.count
2.5.2: RDD Actions operation
Val list = List (("aa", 1), ("bb", 4), ("aa", 56), ("cc", 0), ("aa", 89), ("cc", 34)) val rdd = sc.parallelize (list) rdd.countByKey
Wordcount transforms val rdd = sc.textFile ("\ input") rdd.flatMap (_ .split (")) .map ((_, 1)) .countByKey
Foreach () uses val list = List (1Jing 2 Jing 3 Jing 4 Jing 5) val rdd = sc.parallelize (list) rdd.foreach (line = > println (line)) grouping topkeyaa 78bb 98aa 80cc 98aa 69cc 87bb 97cc 86aa 97bb 78bb 34cc 85bb 92cc 72bb 32bb 23val rdd = sc.textFile ("/ topkeytest") val topRdd = rdd.map (line = > line.split (")) .map (arr = > (arr (0), arr (1) .toInt). GroupByKey (). Map (tuple = > (tuple._1)) Tuple._2.toList.sorted.takeRight (3) .reverse) topRdd.collect
Third, the three major functions of SparkContext 3.1. no broadcast variables are used.
The function of SparkContext:-1, apply for resources from Master (master node, master node of cluster management), run all Executor-2, create entry sc.textFile ("") of RDD / / create sc.parxx () / / parallelization from external storage system Create-3 from a collection in Driver Scheduling management JOB runs DAGScheduler, TaskScheduler-- 3.1 builds DAG diagrams for each Job-- 3.2DAG diagrams are divided into Stage according to whether there is Shuffle backstepping between RDD (Stack)-- 3.3The Task code is the same in each phase of TaskSet in each Stage. Only deal with data differently 3.2 use broadcast variables
Val list = List (".", "?", "!", "#", "$") val braodCastList = sc.broadcast (list) val wordRdd = sc.textFile ("") wordRdd.filter (word = > {braodCastList.value.contains (word)}) 3.4 spark cluster mode
3.4.1 deployment mode for spark: the default mode for 1.spark is local mode spark-submint Scala_Project.jar
2. Spark job runs in client cluster mode: spark-submit-- master spark://192.168.3.1:7077-- deploy-mode cluster Scala_Project.jar
3.5 ways for spark to add external dependencies on jar packages:-- jars JARS Comma-separated list of local jars to include on the driver and executor classpaths. The location of the jar package must be written to determine the path. Method 2:-- driver-class-path Extra class path entries to pass to the driver. Note that jars added with-- jars are automatically included in the classpath. Method 3: SPARK_CLASSPATH configure this environment variable 3.5.1 Spark Application submission in the enterprise Shell script sparkripappsubmit.shvuzhuanxinxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx Tools/scalaProject.jar 4: Spark on YARN4.1 starts the YARN of hadoop cd / soft/hadoop/sbin starts rescouremanager:. / yarn-daemon.sh start resourcemanager starts nodemanger:./yarn-daemon.sh start nodemanager
4.2 Architecture of yarn
YARN-1, distributed resource management master node: ResouceManager slave node: NodeManager-> responsible for managing resources (memory and CPU Core)-2 on each machine, resource scheduling-1, container Container AM/Task-2, for each application running on YARN An application manager ApplicaitonMaster resource request and task scheduling 4.2 Spark ApplicationSpark Application-1 dint driver Program resource request and task scheduling-2 Executor executors each Executor is actually a JVM, that is, a process with spark deploy mode: client AM-- all allow Executors to run in Container in Container Similar to Map Task and Reduce Task in MapReduce tasks: Driver-> AM-> RM
4.3Operation spark-shell of spark on yarn-- master yarn
4.4 spark job on yarncd jars/spark-submit-master yarn-deploy-mode cluster Scala_Project.jar
Fifth, dependence of spark RDD 5.1 RDD Rependencies
Spark's wordcount## val rdd = sc.textFile ("/ input") # # val wordRdd = rdd.flatMap (_ .split (")) val kvRdd = wordRdd.map ((_, 1)) val wordcountRdd = kvRdd.reduceByKey (_ + _) # # wordcountRdd.collect- input-> wordRdd-> kvRdd: Stage-01-> ShuffleMapStage-> SMT- > wordcountRdd-> output: Stage-02-> ResultStage-> ResultTask1. Narrow dependency (narrow dependencies) 1.1: each partition of the child RDD depends on constant parent partitions (that is, independent of the data size) 1.2: input and output one-to-one operators, and the partition structure of the RDD remains unchanged, mainly map,flatMap 1.3: output one-to-one, the partition structure of the single result RDD changes, such as: union,coalesce 1.4: the operator that selects some elements from the input Such as filer,distinct,subtract,sample2. Wide dependency (wide dependencies) 2.1: each partition of the child RDD depends on all parent RDD partitions 2.2: key-based reorganization and reduce for a single RDD, such as groupByKey,reduceByKey 2.3: join and reorganization of two RDD based on key For example, how does join determine whether the RDD is narrow dependent or wide dependent: each partition data of the parent RDD gives each partition data of the child RDD 1-> 11-> N: the inherent principle of the shuffle5.2.1 spark shuffle of Shuffle5.2 spark in MapReduce in the MapReduce framework, shuffle is the bridge between Map and Reduce, and the output of Map must go through shuffle if it is used in Reduce. The performance of shuffle directly affects the performance and throughput of the whole program. As an implementation of MapReduce framework, Spark naturally implements the logic of shuffle. 5.2.2 shuffleShuffle is a specific phase in the MapReduce framework, which is between Map phase and Reduce phase. When the output result of Map is to be used by Reduce, the output result needs to be hashed by key and distributed to each Reducer. This process is shuffle. Because shuffle involves disk reading and writing and network transmission, the performance of shuffle directly affects the running efficiency of the whole program. The following diagram clearly describes the entire flow of the MapReduce algorithm, where shuffle phase is between Map phase and Reduce phase.
Conceptually, shuffle is a bridge to communicate data connections, so in fact, how is this part of shuffle (partition) realized? let's take Spark as an example to talk about the implementation of shuffle in Spark. 5.2.3 shuffle for spark
1. First of all, each Mapper will create a corresponding number of bucket,bucket according to the number of Reducer is M × RM × R, where MM is the number of Map and RR is the number of Reduce. two。 Secondly, the results produced by Mapper will be populated into each bucket according to the set partition algorithm. The partition algorithm here can be customized, and of course the default algorithm is to go to different bucket according to key hash. When Reducer starts, it fetches the corresponding bucket from the remote or local block manager according to the id of its own task and the id of the Mapper it depends on as input to the Reducer. Bucket here is an abstract concept. In the implementation, each bucket can correspond to a file, part of a file or something else. 3. The Shuffle process of Apache Spark is similar to the Shuffle process of Apache Hadoop. Some concepts can be applied directly. For example, in the Shuffle process, one end of providing data is called Map side, and each task of generating data on Map side is called Mapper, and the corresponding end of receiving data is called Reduce side. Every task of pulling data on Reduce side is called Reducer,Shuffle process, which essentially divides the data obtained by Map side using a divider. The process of sending data to the corresponding Reducer. Those operations can cause shuffle1. With readjusted partition operation, eg: repartition,coalese2. * ByKey eg: groupByKey,reduceByKey3. Associate operation eg:join,cogroup
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.