Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Parsing Spark in this way

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

To analyze Spark in this way, I believe that many inexperienced people are at a loss about it. Therefore, this paper summarizes the causes and solutions of the problem. Through this article, I hope you can solve this problem.

Spark scene

Spark is a memory-based iterative computing framework, which is suitable for applications that need to manipulate specific data sets multiple times. The more repeated operations are required, the greater the amount of data that needs to be read, the greater the benefit, and the smaller the amount of data but more computing-intensive situations, the benefit is relatively small. Due to the characteristics of RDD, Spark is not suitable for applications that update status with asynchronous fine granularity, such as web service storage or incremental web crawlers and indexes. That is, the incremental modified application model is not suitable for the large amount of data, but it requires real-time statistical analysis.

Spark Master mode (Url)

1. Local: in this way, a thread is started locally to run the job; 2. Local [N]: also in local mode, but N threads are started; 3. Local [*]: it is still in local mode, but all cores in the system are used; 4. Local [NJM]: there are two parameters, and the first represents the number of cores used. The second parameter allows the job to fail M times; 5, local-cluster [N, cores, memory]: local pseudo-cluster mode; 6, spark://: this is the Standalone mode of Spark; 7, (mesos | zk): / /: this is Mesos mode; 8, yarn\ yarn-cluster\ yarn-client: this is YARN mode. The first two represent the cluster mode; the latter represent the client mode; 9. Simr://: simr is actually an abbreviation for Spark In MapReduce

Spark deploy mode

1. Local mode local mode has a pseudo-cluster mode (local-cluster), and all local uses LocalBackend and TaskSchedulerImpl classes. LocalBackend receives receiveOffers () calls from TaskSchedulerImpl, generates WorkerOffer based on the CPU core passed in from running Application, calls scheduler.resourceOffers (offers) to generate Task, and finally executes these Task through executor.launchTask. 2. The Standalone Standalone pattern using SparkDeploySchedulerBackend and TaskSchedulerImpl,SparkDeploySchedulerBackend inherits from the CoarseGrainedSchedulerBackend class and overrides some of these methods. CoarseGrainedSchedulerBackend is a coarse-grained resource scheduling class that saves all the Executor during the entire run of the Spark job, and does not release the Executor or request a new Executor from the Scheduler when the task is finished. There are many ways to start Executor, which need to be judged according to the Master URL submitted by Application. A DriverActor class is encapsulated in CoarseGrainedSchedulerBackend that accepts Executor registrations (RegisterExecutor), status updates (StatusUpdate), responds to Scheduler ReviveOffers requests, kills Task, and so on. One or more CoarseGrainedExecutorBackend will be started in this mode. Specifically, the Master is requested to register the Application through the AppClient class. When the registration is successful, Master will give feedback to Client and call schedule to start Driver and CoarseGrainedExecutorBackend, and the launched Executor will register with DriverActor. CoarseGrainedExecutorBackend then launches the submitted Task through the aunchTask method. 3. The classes involved in yarn-cluster yarn-cluster cluster mode are YarnClusterScheduler and YarnClusterSchedulerBackend. YarnClusterSchedulerBackend also inherits from CoarseGrainedSchedulerBackend. YarnClusterScheduler, which inherits from TaskSchedulerImpl, simply encapsulates TaskSchedulerImpl and overrides the getRackForHost and postStartHook methods. The Client class launches a Container on the Hadoop cluster through YarnClient, runs ApplicationMaster in it, and launches multiple Container in the cluster to run CoarseGrainedExecutorBackend through the interface provided by Yarn, and registers with DriverActor in CoarseGrainedSchedulerBackend. 4. The classes involved in yarn-client yarn-cluster cluster mode are YarnClientClusterScheduler and YarnClientSchedulerBackend. YarnClientClusterScheduler inherits from TaskSchedulerImpl and overrides the getRackForHost method in it. In Yarn-client mode, an ExecutorLauncher is launched outside the cluster as driver, and the cluster is asked to apply for Container to start CoarseGrainedExecutorBackend and register with DriverActor in CoarseGrainedSchedulerBackend. 5. There are two scheduling modes of Mesos Mesos mode: coarse-grained and fine-grained. The classes involved in coarse granularity are CoarseMesosSchedulerBackend and TaskSchedulerImpl classes, while the classes involved in fine granularity are MesosSchedulerBackend and TaskSchedulerImpl classes. Both CoarseMesosSchedulerBackend and MesosSchedulerBackend inherit MScheduler (actually Mesos's Scheduler) and make it easy to register into the Mesos resource scheduling framework. Choose which mode can be configured through the spark.mesos.coarse parameter. The default is MesosSchedulerBackend.

The above involves many deployment models of Spark, it is difficult to say which model is good, depending on the requirements, if you are just testing Spark Application, you can choose the local mode. And if the amount of data is not very large, Standalone is a good choice. When you need to centrally manage cluster resources (Hadoop, Spark, etc.), you can choose Yarn, but the maintenance cost will be higher. There is still a big difference between the internal implementation of the yarn-cluster and yarn-client patterns. If you need to use it in a production environment, choose yarn-cluster;, and if it's just a Debug program, you can choose yarn-client.

Spark Jar/File Url format

The file:/ files are absolute paths, and file:/URI is downloaded through the drive's HTTP file server, and each executor pulls these files from the drive's HTTP server. Hdfs:/http:/https:/ftp: Spark will download the required files and jar packages from the specified URI location. Local:/ specifies local or shared files that are accessible on each worker node. This means that it does not take up network IO, especially for large files or jar packages, and it is best to use this approach, where files can be shared through NFS and GlusterFS when you need to push files to each work node.

Spark execution model

Dependency Dependency represents the dependency between RDD, that is, the consanguinity NarrowDependency represents narrow dependency, that is, the partition of the parent RDD, which is most used by one partition of the child RDD. Therefore, parallel computing is supported. OneToOneDependency indicates that the partition dependency of parent RDD and child RDD is one-to-one RangeDependency means that within a range range, the dependency is one-to-one, so there will be a range during initialization, the partitionId outside the scope, and the returned Nil Shuffle represents the wide dependency. The RDD is in the form of KV, requires a partitioner to specify the partition method, and requires a serialization tool class Partition Partition to specifically represent each data partition of RDD. PartitionerPartitioner determines how the RDD in the form of KV partition the accompanying object of the default PartitionerPartitioner according to key. The logic is: in the incoming RDD (at least two), traverse (the order is the number of partition from large to small) RDD, and if there is already a Partitioner, use it. If none of the RDD has Partitioner, the default HashPartitioner is used. The number of initialization partition of HashPartitioner depends on whether Spark.default.parallelism is set. If not, take the value of the largest number of partition in RDD HashPartitioner based on Java Object.hashCode. There will be a problem that Java's Array has its own hashCode and is not based on the content in Array, so there is a problem with using HashPartitioner for RDD [Array [_]] or RDD [(Array [_], _)]. The KV RDD handled by RangePartitioner requires Key to be sortable, that is, the default cache () process of the ordered [K] type Persist/Unpersist that satisfies Scala is to put RDD persist in memory, persist () operation can reassign StorageLevel (useDisk, useMemory, useOffHeap, deserialized, replication) for RDD, persist is not action, and will not trigger any calculation CheckpointRDD Actions api that provides the checkpoint () method, and will put this RDD save into the SparkContext CheckpointDir directory. It is recommended that the RDD is already persist in memory, otherwise recomputation is required. TransformationsRDD transformation under ActionsRDD action see Job- > Stage- > Task- > Transformations/Action the Job of a Spark is divided into multiple stage, the last stage will include one or more ResultTask, and the previous stages will include one or more ShuffleMapTasks. ResultTask executes and returns the result to driver application. ShuffleMapTask separates task's output into multiple buckets according to task's partition. A ShuffleMapTask corresponds to a partition of a ShuffleDependency, and the total partition number is the same as the parallelism and the number of reduce. The DAGScheduler-oriented scheduling layer generates a DAG composed of stage for the job, and submits the TaskSet to the TaskScheduler for execution in the unit of stage. Within each Stage, there is an independent tasks, and they execute the same compute function together and enjoy the same shuffledependencies. DAG is divided into stage according to the occurrence of shuffle as the limit. DAGSchedulerEventTaskSchedulerTaskScheduler receives task, receives assigned resources and executor, maintains information, deals with backend, assigns tasks SchedulableBuilderFIFO and Fair, and addTaskSetManager adds TaskSetManager to pool. There is only one pool for FIFO. There are multiple pool,Pool in Fair, which can be divided into two modes: FIFO and Fair. That is, Stage encapsulates all the tasks of a stage and submits it to TaskSchedulerResultTask to directly generate the result ShuffleMapTask corresponding to ShuffleMap Stage. As the input TaskSetManager of other stage, the resulting result is responsible for starting this batch of Tasks, failing to retry, perceiving localization, and so on. Each time the reourseOffer method finds a suitable (conditional execId, host, locality) Task and launches it TaskResultGetter to maintain a thread pool for deserialization and remote acquisition of task results BlockManagerMaster/BlockManagerWorkerTaskResult contains BolckId, and BlockManagerMaster obtains (deserializes) block data through the bolck locations,BlockManagerWorker of this blockId through these locations

Spark RDD

RDD is an abstract data structure type in Spark, and any data is represented as RDD in Spark. From a programming point of view, RDD can be simply thought of as an array. Unlike ordinary arrays, data in RDD is stored in partitions so that data from different partitions can be distributed on different machines and can be processed in parallel. What the Spark application does is to convert the data that needs to be processed into RDD, and then perform a series of transformations and operations on RDD to get the result a RDD object that contains the following five core properties. A list of partitions in which part of the data (or blocks) of RDD is in each partition. A dependency list that stores other RDD of dependencies. A calculation function named compute (implemented by a subclass) is used to calculate the values of each partition of the RDD. A divider (optional) for RDD of key / value types, such as a RDD that is partitioned by hash. A list of priority locations when calculating partitions (optional). For example, when generating RDD from files on HDFS, the location of the RDD partition gives priority to the node where the data is located, which avoids the overhead of data movement.

Work with RDD

Object array-> object list-> object rddobject array-> object list-> Row list-> Row rdd + StructType schema-> object dfobject arrays-> object lists-> object rdds-> object rdd queue-> object dstream

RDD Transformer

Map (func): use func for each element in the RDD dataset that calls map, and then return a new RDD, which returns a distributed dataset keyBy (f: t = > K) filter (func): use func for each element in the RDD dataset that calls filter, and then return a RDDflatMap (func) that contains elements that make func true: similar to map But flatMap generates multiple results mapPartitions (func): very similar to map, but map is each element, and mapPartitions is each partitionmapPartitionsWithSplit (func): very similar to mapPartitions, but func works on one of the split, so there should be indexsample (withReplacement,faction,seed): sampling union (otherDataset): Union in func, returning a new dataset Set intersection (otherDataset) containing the elements of the source dataset and the given dataset: intersection subtract (otherDataset): difference distinct ([numTasks]): returns a new dataset, this dataset contains the elementgroupByKey (numTasks) of the distinct in the source dataset: return (KMagerSeq [V]), that is, the key-valuelistreduceByKey (func, [numTasks]) accepted by the reduce function in hadoop: it is generated by reacting on groupByKey with a given reducefunc, such as summing. Find the average sortByKey ([ascending], [numTasks]): sort by key, ascending or descending, ascending is boolean type join (otherDataset, [numTasks]): when there are two KV dataset (KMagol V) and (KMagol W), the dataset,numTasks returned (K, (VMagneW) is the number of concurrent tasks cogroup (otherDataset, [numTasks]): when there are two KV dataset (KMagazine V) and (KMagol W) The dataset,numTasks returned is the number of concurrent tasks cartesian (otherDataset): the Cartesian product is m*npipe (command: String), which outputs RDD data through ProcessBuilder to create additional processes to output zip (ProcessBuilder [U]): RDD [(T, U)] the number of two RDD partitions is the same, and the number of data entries per partition is the same

RDD Action

Reduce (func): to put it bluntly, it is aggregation, but the incoming function returns a value from two parameter inputs. This function must be a special reduce of fold (zeroValue: t) (op: (T, T) = > T) that satisfies the commutative law and the associative law, with initial values. Functional semantic foldaggregate (zeroValue: U) (seqOp: (U, T) = > U, combOp: (U, U) = > U): an aggregation method with three complete conditions: initial value, reduce aggregation and merge aggregation. Rdd transfers the function to the partition to do the calculation, and finally summarizes the results of each partition to calculate subtract (RDD [T]): rdd is implemented as map (x = > (x, null)). SubtractByKey (other.map ((_, null)), p2). Keys and intersection are similar to collect (): usually when the result is filter or small enough Then use collect encapsulation to return an array count (): return the number of element in dataset first (): return the first element in dataset, top (n) (ordering): the handler function passed into top in each partition, get the heap of the partition, use rdd.reduce (), stack and sort each partition, take the first n take (n): return the first n elements, the takeSample returned by this driverprogram (withReplacement,num) Seed): sample returns num elements in a dataset, random seed seedsaveAsTextFile (path): write dataset to a textfile, or hdfs, or a file system supported by hdfs. Spark converts each record into a row of records, and then writes to file saveAsSequenceFile (path): can only be used on key-value pairs, and then generates SequenceFile writes to local or hadoop file system countByKey (): returns a map corresponding to the number of key Acting on a RDDcountByValue (): Map [T, Long] rdd is implemented as map (value = > (value, null)) .countByKey (): it is essentially a simple combineByKey that returns Map and puts the whole load into driver memory, requiring a smaller dataset foreach (func): use funcmax () / min () special reduce for each element in dataset Pass in the max/min comparison function PairRDDFunctions.DoubleRDDFunctionssum () rdd implementation is reduce (_ + _) stats () rdd implementation is mapPartitions (nums = > Iterator (StatCounter (nums). Reduce ((a, b) = > a.merge (b)) StatCounter counts the median, variance and count in one traversal Merge () is his internal method mean () rdd implementation is stats (). Meanvariance () / sampleVariance () rdd implementation is stats (). Variancestdev () / sampleStdev () rdd implementation is stats (). Stdev to find the standard deviation meanApprox () / sumApprox () call runApproximateJobhistogram () is a more complex calculation, rdd implementation is first mapPartitions and then reduce, including several recursions

Spark Core

The distributed parallel computing framework of directed acyclic graph (DAG) is provided, and the Cache mechanism is provided to support multiple iterative computing or data sharing, which greatly reduces the overhead of reading data offices between iterative computations. This greatly improves the performance of data mining and analysis that requires multiple iterations. RDD (Resilient Distributed Dataset) abstraction is introduced into Spark, which is a set of read-only objects distributed in a group of nodes. These sets are flexible, and if part of the dataset is lost, they can be rebuilt according to the "pedigree" to ensure the high fault tolerance of the data. Mobile computing instead of mobile data, RDD Partition can read blocks of data from the distributed file system to the memory of each node to calculate using the multi-thread pool model to reduce task startup by using fault-tolerant, highly scalable akka as the communication framework.

Spark SQL

A new RDD type SchemaRDD is introduced, which can be defined as a traditional database definition table that consists of row objects that define column data types. SchemaRDD can be converted from RDD, read from a Parquet file, or obtained from Hive using HiveQL. Embedded Catalyst query optimization framework, after parsing SQL into a logical execution plan, use some classes and interfaces in the Catalyst package to perform some simple execution plan optimization, and finally become the calculation of RDD in the application can be mixed with data from different sources, such as data from HiveQL and data from SQL for Join operations. Memory column storage (In-Memory Columnar Storage), the table data of sparkSQL is stored in memory not by the original JVM object storage mode, but by memory column storage; bytecode generation technology (Bytecode Generation), Spark1.1.0 adds a codegen module to the expressions of the Catalyst module, uses dynamic bytecode generation technology, and uses specific code to dynamically compile matching expressions. In addition, SQL expressions are optimized by CG, and the implementation of CG optimization mainly depends on Scala2.10 's runtime radiation mechanism (runtime reflection); Scala code optimization SparkSQL tries to avoid inefficient and easy GC code when writing code using Scala; although it increases the difficulty of writing code, the interface is unified for users.

Spark MLlib

MLBase is part of the Spark ecosystem that focuses on machine learning, making it easier for users who may not know about machine learning to use MLbase. MLBase is divided into four parts: MLlib, MLI, ML Optimizer and MLRuntime. ML Optimizer will select the machine learning algorithm and related parameters that it considers most suitable to be implemented internally to process the data entered by the user and return the results of the model or other help analysis. MLI is an API or platform for algorithm implementation of feature extraction and advanced ML programming abstraction. MLlib is a common machine learning algorithm and utility program implemented by Spark, including classification, regression, clustering, collaborative filtering, dimensionality reduction and underlying optimization. MLRuntime is based on Spark computing framework and applies Spark distributed computing to the field of machine learning.

Spark GraphX

GraphX is the API used for graph (e.g.Web-Graphs and Social Networks) and graph parallel computing (e.g.PageRank and Collaborative Filtering) in Spark, which can be regarded as the rewriting and optimization of GraphLab (C++) and Pregel (C++) on Spark (Scala). Compared with other distributed graph computing frameworks, GraphX's greatest contribution is to provide a stack of data solutions on top of Spark, which can easily and efficiently complete a set of pipelining operations for graph computing. GraphX started as a distributed graph computing framework project of Berkeley AMPLAB and later integrated into Spark as a core component. The core abstraction of GraphX is Resilient Distributed Property Graph, a directed multigraph with attributes on both vertices and edges. It extends the abstraction of Spark RDD, with both Table and Graph views, and requires only one physical storage. Both views have their own unique operators, resulting in flexible operation and execution efficiency. Just like the code of Spark,GraphX is very concise. The core code of GraphX is only more than 3, 000 lines, while the Pregel model implemented on it is only more than 20 lines. The overall code structure of GraphX is shown in the following figure, most of which are implemented around the optimization of Partition. To some extent, this shows that the storage of point segmentation and the corresponding computational optimization are indeed the key and difficult points of the graph computing framework. The underlying design of GraphX has the following key points. 1. All operations on the Graph view are eventually converted to the RDD operation of its associated Table view. In this way, the calculation of a graph is logically equivalent to a series of RDD conversion processes. Therefore, Graph finally has three key features of RDD: Immutable, Distributed, and Fault-Tolerant. The most important of these is Immutable (invariance). Logically, the transformation and operation of all graphs produce a new graph; physically, GraphX will have a certain degree of reuse optimization of invariant vertices and edges, which is transparent to users. two。 The physical data shared at the bottom of the two views consists of two RDD, RDD [VertexPartition] and RDD [EdgePartition]. Points and edges are not actually stored in the form of table Collection [tuple], but VertexPartition/EdgePartition stores a fragment of data with an indexed structure internally to speed up traversal in different views. The constant index structure is shared in the RDD conversion process, which reduces the computing and storage overhead. 3. The distributed storage of the graph adopts the point partition mode, and uses the partitionBy method, and different partition strategies (PartitionStrategy) are specified by the user. The partitioning strategy assigns edges to each EdgePartition, and vertex Master assignment to each VertexPartition,EdgePartition caches the Ghost copy of the local border point. Different partition strategies will affect the number of Ghost copies to be cached and the balance of the edges assigned by each EdgePartition. It is necessary to choose the best strategy according to the structural characteristics of the graph. There are four strategies: EdgePartition2d, EdgePartition1d, RandomVertexCut and CanonicalRandomVertexCut. In most scenes of Taobao, EdgePartition2d works best.

Spark Streaming

SparkStreaming is a high-throughput, fault-tolerant streaming system for real-time data streams. It can perform complex operations such as Map, Reduce and Join on a variety of data sources (such as Kdfka, Flume, Twitter, Zero and TCP sockets), and save the results to external file systems, databases or real-time dashboards. Calculation process: Spark Streaming is the decomposition of streaming computing into a series of short batch jobs. The batch engine here is Spark Core, that is, the input data of Spark Streaming is divided into segments of data (Discretized Stream) according to batch size (such as 1 second), each piece of data is converted into RDD (Resilient Distributed Dataset) in Spark, and then the Transformation operation on DStream in Spark Streaming is changed into Transformation operation on RDD in Spark, and the RDD is stored in memory after the operation. The whole streaming computing can overlay the intermediate results or store them to external devices according to the needs of the business. The following figure shows the entire process of Spark Streaming. Fault tolerance: fault tolerance is very important for streaming computing. First of all, we need to clarify the fault-tolerant mechanism of RDD in Spark. Each RDD is an immutable distributed recalculable data set, which records deterministic operational inheritance relations (lineage), so as long as the input data is fault-tolerant, the Partition of any RDD can be recalculated by conversion operations using the original input data. Real-time: the discussion of real-time will involve the application scenario of streaming framework. Spark Streaming decomposes streaming computing into multiple Spark Job, and the processing of each piece of data goes through the decomposition of Spark DAG diagram and the scheduling process of Spark task set. For the current version of Spark Streaming, the minimum Batch Size is chosen between 0.5 seconds and 2 seconds (the smallest delay of Storm is about 100ms), so Spark Streaming can meet all streaming quasi-real-time computing scenarios except for very high real-time requirements (such as high-frequency real-time transactions). Scalability and throughput: Spark has been able to scale linearly to 100 nodes (4Core per node) on EC2, can process the data volume of 6GB/s with a delay of several seconds (60m records/s), and its throughput is 2-5 times higher than the popular Storm. Berkeley uses WordCount and Grep use cases to test, in the Grep test, the throughput of each node in Spark Streaming is 670k records/s, while Storm is 115k records/s.

SparkR

SparkR is a R development package released by AMPLab, which makes R get rid of the fate of running on a single machine, and can run on the cluster as the job of Spark, which greatly expands the data processing capacity of R. Several features of SparkR: API of flexible distributed dataset (RDD) in Spark is provided, and users can run Spark job interactively on the cluster through R shell. Support for ordering closures, which can automatically serialize variables referenced in user-defined functions to other machines in the cluster. SparkR can also easily invoke the R development package by reading the R development package with includePackage before performing operations on the cluster, of course, by installing the R development package on the cluster.

SparkPython

Spark Python

Pom.xml

Org.scala-lang scala-library 2.11.8 org.apache.spark spark-mllib_2.11 2.0.0 com.typesafe.play play-json_2.11 2.3.9 net.alchim31.maven scala-maven-plugin 3.1.3 scala-compile-first process-resources add-source compile scala-test-compile process-test-resources testCompile Maven_central http://central.maven.org/maven2/ sonatype-nexus-snapshots https://oss.sonatype.org/content/repositories/snapshots Typesafe http://repo.typesafe.com/typesafe/releases/ maven_central http://central.maven.org / maven2/ sonatype-nexus-snapshots https://oss.sonatype.org/content/repositories/releases/

Tests.scala

Def listRdd () {var sc = new SparkContext ("local [1]", "spdb") var sqlContext = new SQLContext (sc) var listStr1 = "" zm,zn,zq "" var list = listStr1.split (","). ToList var rdd = sc.parallelize (list, 2) var max = rdd.max () println (max)}

After reading the above, have you mastered the method of parsing Spark in this way? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report