In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-11 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
What is Spark?
A) it is a general big data computing framework.
B) Spark Core offline calculation
Spark SQL interactive query
Spark Streaming real-time streaming computing
Spark MLlib machine learning
Spark GraphX diagram calculation
C) Features:
I. One-stop shop: a technology stack to solve the computing problems in big data's field
Ii. Based on memory
D) Spark2009 was born in the AMPLab Laboratory at the University of Berkeley
The Spark project was officially opened up in 2010.
In 2013, Spark became a project under Apache
It developed rapidly in 2014 and became the top project of Apache.
Rose in China in 2015 to replace mr,hive,storm and so on.
Author: Xin Shi (shi)
E) Spark and Hive:
Advantages of Spark:
I. High speed
Ii. Spark SQL supports a large number of different data sources
F) Spark and Storm
I. The calculation model is different
Ii. Large throughput of Spark
G) Features: fast, easy to use, universal, compatibility
H) spark operation mode
I. local (local)
Ii. Standalone (Cluster)
Iii. On yarn (yarn is responsible for task scheduling and computing as the resource scheduling Spark)
Iv. On mesos (with mesos as the resource scheduling S)
V. on cloud ()
I) configuration steps
= on yarn=
[description]
1. Spark tasks run on yarn, and yarn schedules and manages resources. Spark is only responsible for task scheduling and calculation.
two。 No need to configure and start the spark cluster
3. You only need to install and configure spark on yarn mode on the node where the task is submitted
4. Must find a node to install spark
5. Steps:
i. Install and configure JDK
ii. Vi spark-env.sh
1. Export JAVA_HOME=/opt/modules/jdk1.7_6.0
2. Export HADOOP_CONF_DIR = / opt/modules/hadoop/etc/hadoop
iii. Test whether the spark on yarn mode is installed successfully
iv. Network testing: http://hadoop-yarn1.beicai.com:8088
= sdandalone mode =
[description]
1. Spark runs on the spark cluster, and spark is responsible for resource scheduling and management. At the same time, it is also responsible for task scheduling and computing.
two。 Need to configure and start the spark cluster
3. Steps:
i. Install and configure JDK
ii. Upload and extract Spark
iii. Establish a soft connection ln-s spark spark or change the name
iv. Configure environment variables
v. Install configuration Spark and modify spark configuration files (spark-env.sh, slaves)
1. Vi spark-env.sh
A) export JAVA_HOME=/opt/modules/jdk (jdk location)
B) export SPARK_MASTER_IP=hadoop-yarn1.beicai.com
C) export SPARK_MASTER_PORT=7077
2. Vi slaves (used to specify which nodes to start worker on)
A) hadoop-yarn2.beicai.com
Hadoop-yarn3.beicai.com
vi. Send spark to another host
vii. Start
/ opt/modules/spark/bin/start-all.sh
vii. Check out the SparkUI interface: http://hadoop-yarn1.beicai.com:8080
4.
j)
I. Spark principle
1. The operation principle of Spark
I, distributed
Ii, primarily memory-based (in a few cases disk-based)
Iii, iterative calculation
2. Spark Computing Mode comparison of VS MapReduce Computing Mode
The computing model of Mr is relatively fixed, and there are only two stages, the map phase and the reduce phase. When the two phases are over, the task ends, which means that our operation is very limited, only in the map phase and reduce phase, and it also means that multiple mr tasks may be required to complete the job.
Spark is iterative computing. After a phase ends, there can be multiple stages until the task is completed, which means that we can do a lot of operations. This is where the Spark computing model is more powerful than mr.
What is Spark RDD?
1. What is RDD?
Resilient, distributed, dataset
(RDD logically represents a file on a HDFS, which is divided into multiple partitions and scattered on multiple nodes of the Spark.)
3. RDD---- elasticity
When the data of a partition of RDD is saved to a node, and when the memory of this node is limited, Spark will selectively save part of the data to the hard disk. For example, when the memory of worker can only hold 20w pieces of data, but this partition of RDD has 30w pieces of data, then Spark will save the extra 10w pieces of data to the hard disk. This selective tradeoff between memory and hard disk of Spark is the elastic feature of RDD.
4. Fault tolerance of Spark
The most important feature of RDD is that it provides fault tolerance and can automatically recover from the failed node, that is, if the RDD partition (data) on a node is lost, then RDD will automatically recalculate the partition through its own data source, which is transparent to the user.
2. The development type of Spark
(1) Core development: offline batch / demonstrative interactive data processing
(2), SQL query: the underlying layer is RDD and computing operations
(3) the bottom layer is RDD and computing operations.
(4) Machine learning
(5), figure calculation
3. Spark core development (Spark-core = = Spark-RDD) steps
(1) create the initial RDD
(2) convert the initial RDD to form a new RDD, and then operate the new RDD until the operation calculation is completed.
(3) save the data of the last RDD to a certain medium (hive, hdfs,MySQL, hbase...)
5. Spark principle
Connections between nodes in Driver,Master,Worker,Executor,Task
The role of each node in Spark:
1. The role of driver:
(1) Registration of tasks with master
(2) build the basic environment for running tasks.
(3) reverse registration of the executor that accepts the task
(4) assign tasks to the executor that belongs to the task
2. What is driver?
After the program we write is packed into a jar package, we find a driver that can connect to the nodes of the spark cluster to do the task, specifically SparkSubmit.
3. The function of Master:
(1) Monitoring cluster
(2) the up and down line of dynamic aware worker
(3) accept the driver registration request
(4) scheduling of task resources
4. The function of Worker:
(1) report the status to master regularly.
(2) accept the master resource scheduling command to schedule resources.
(3) Container Executor to start the task
5. The function of Executor:
(1) Save the calculated RDD partition data
(2) reverse registration with Driver
(3) accept the task Task sent by Driver, and execute it on RDD.
The flow of Spark programming:
1. The program we wrote is packaged into a jar package, and then the Spark-Submit script is called to submit the task.
2. Start driver to initialize the task
3. Driver will encapsulate the task and its parameters (parameters related to core,memory,driver) into ApplicationDescript and submit them to Master through taskSchedulerImpl
4. When Master receives a request to register a task on the driver side, it parses the request parameters, encapsulates them into APP, persists them, and adds them to the waitingAPPs in its task queue.
5. When it is our turn to submit a task to run, master will call the schedule () method to schedule task resources.
6. Master encapsulates the scheduled resources into launchExecutor and sends them to the specified worker
7. When Worker receives the sent launchExecutor, it parses it and encapsulates it into ExecutorRunner, and then calls the start method to start Executor
8. After Executor is started, reverse registration will be made to the Driver of the task
9. When all executor belonging to this task are successfully started and reverse registered, driver will end the initialization of the SparkContext object
10. When sc is initialized successfully, it means that the basic environment for running the task is ready, and driver will continue to run the code we have written.
11. Start registering the initial RDD, and continue to perform conversion operations. When an action operator is triggered, it means that a job is triggered. At this time, driver divides the dependency between RDD into a stage, encapsulates the stage into a taskset, then serializes each task in the taskset, encapsulates it into a launchtask, and sends it to the specified executor for execution.
12. Executor receives the task task sent by driver, deserializes task, and then sets the corresponding operator (flatmap,map,reduceByKey.). ) acts on the RDD partition
VI. Detailed explanation of RDD
1. What is RDD?
RDD (Resilient Disttibuted Dataset), called elastic distributed dataset, is the most basic data abstraction in spark. It represents an immutable, partitioned set of elements that can be computed in parallel.
2. The characteristics of RDD:
Automatic fault tolerance
Location-aware scheduling
Flexibility
3. Attributes of RDD:
(1) A group of fragments (partition), that is, the basic unit of the data set. For RDD, each shard is processed by a computing task and determines the granularity of parallel computing. Users can specify the number of RDD shards when creating RDD. If not, the default value will be used. The default value is the number of CPU Core assigned by the program.
(2) A function that calculates each partition. RDD in Spark is calculated in fragments, and each RDD implements the computer function to achieve this purpose. The Computer function composes iterators, eliminating the need to save the results of each calculation.
(3) the dependency relationship between RDD. Each transformation of RDD generates a new RDD, so there is a pipelined before-and-after dependency between RDD. When some partition data is lost, Spark can recalculate the lost partition data through this dependency instead of recalculating all partitions of RDD.
(4) A partition, that is, the slicing function of RDD. At present, two types of sharding functions are implemented in Spark, one is based on hashPartitioner, and the other is based on scope RangePartitioner. Partitioner is available only for the RDD of key-value, and the value of Partitioner for non-key-value RDD is None. The Partitioner function determines not only the number of slices for RDD itself, but also the number of slices for partition RDD Shuffle output.
(5) A list that stores the priority location (preferred location) for accessing each Partition. For a HDFD file. This list holds the fast location of each Partition. According to the concept of "mobile data is not as good as mobile computing". When scheduling tasks, Spark assigns computing tasks to the storage location of the data blocks to be processed as much as possible.
4. Creation of RDD:
When doing Spark core programming, the first thing to do is to create an initial RDD. Spark Core provides three ways to create a RDD:
(1) use the collection in the program to create a RDD (call the parallelize () method)
(2) create RDD with local file (call textFile () method)
(3) use HDFD file to create RDD (call textFile () method)
7. Operator
1. What is operator?
Is a function defined in RDD that acts on each RDD shard. It can convert and manipulate the data in RDD.
2. Classification of RDD operators.
(1), Transformation operator, this kind of operator transformation does not trigger the submission job (the characteristic is lazy)
A RDD is returned
(2), Action operator, which triggers SparkContext to submit a job (triggers the run of a spark job, which triggers the execution of all previous transformation of the action)
A spark object is returned
3. Commonly used Transformation operators
8. RDD partition sorting
I, zoning
Two implementation methods: coalesce and repartition (the underlying call coalesce)
Coalesce (numPartitons,isShuffle)
The first parameter is the number of repartitions, and the second parameter is whether to perform shuffle.
If there were N partitions, there will be M partitions after the re-partition.
If M > N, the second parameter must be set to true (that is, shuffle), which is equivalent to repartition (numPartitons). If it is false, it will not work.
If M
< N 100-->The number of partitions after 10 multiple partitions is much smaller than the original, so it takes so long to use shuffle, that is, set to true.
The number of partitions after 100-90 multiple partitions is about the same as the original, so there is no need to use shuffle, that is, set to false.
II, sorting
SortBy (x = > x) is an operator with implicit conversion parameters.
X can sort (compare sizes), so this class must have a relatively large function, that is, it implements compareTo or compare.
There are two ways to achieve secondary sorting:
1. Inherit Comparable API or Ordered
2. Implicit conversion: implicit conversion function (Ordered) or implicit conversion value (Ordering) can be defined.
IX. Custom zoning
Custom Partition
Requirement: output the corresponding value to the specified partition according to key
Explanation: customize a custom partition class, inherit partitioner, and implement its two methods
1 、 numPartitions
2 、 getPartition
Customize the implementation of the specific function according to the requirements of the project, then call the partitionBy method, new the custom class, and pass in the parameters.
IX. RDD persistence principle
1. Persistence scenario: for a rdd, it will be referenced many times, and the rdd calculation process is complex, and the calculation time is very time-consuming.
2. How to persist, call the rdd.persist method or the cache method. The bottom layer of the cache method is to call the persist method.
* persist (StorageLevel.MEMORY_ONLY) *
If RDD is persisted, the default persistence level is storageLevel.MEMORY_ONLY, that is, persistence to memory. This persistence level is the most efficient, but because it is a pure Java object and saved to memory, the amount of memory that may be saved will be less.
* persist (StorageLevel.MEMORY_ONLY_SER) *
If we use MEMORY_ONLY_SER when our cluster resources are limited, we can use MEMORY_ONLY_SER, that is, to serialize Java objects and persist them into memory. The advantage of this persistence is that more data can be persisted into memory, but because serialization is needed during persistence and deserialization is needed after it is taken out, this process consumes CPU computing resources. Performance is slightly weaker than the persistence level of MEMORY_ONLY, but it is still more efficient
3. How to choose the RDD persistence strategy?
Spark provides a variety of persistence levels, mainly to make a trade-off between CPU and memory consumption. Here are some general persistence level selection recommendations:
1) give priority to using MEMORY_ONLY, and use this strategy if all data can be cached, because pure memory is the fastest and there is no serialization, so there is no need to consume CPU for deserialization
2) if the MEMORY_ONLY policy cannot store all the data, then using MEMORY_ONLY_SER to serialize the data, the pure memory operation is still very fast, but it consumes CPU for deserialization
3) if a fast failure recovery is needed, choose the policy with the suffix _ 2 to back up the data, so that in the event of failure, there is no need to recalculate
4. Do not use DISK-related strategies if you can. Sometimes, if you read data from disk, you might as well recalculate it.
11. Shared variables
1. There are two kinds of shared variables: broadcast variables and accumulators
Broadcast variable (broadcast)
2. Daily problems
Because each task needs to copy such a copy to executor for execution, we can imagine that if there are 1000 task executed on a worker, and this copy has 100m, it means that we need to copy 100g of data to be executed on a worker, which will greatly consume our network traffic and increase the memory consumption of executor, thus increasing the running time of our spark jobs. It greatly reduces the running efficiency of spark jobs and increases the probability of job failure.
3. How to solve the above problems, that is, when to use broadcast variables?
When RDD refers to an external variable and the amount of data of this external variable is large, and the number of task corresponding to this RDD is very large, then it is more appropriate to use broadcast shared variables.
We can make such a large external variable into a broadcast variable, and when the external variable is made into a broadcast variable, then there will be only one external variable in the memory of each executor, and this copy is shared for all task, so that the network traffic consumption is reduced, the memory consumption of executor is reduced, the running efficiency and running time of spark jobs are improved, and the probability of job failure is reduced.
4. The process of using broadcast variables:
1) if the first task of an executor is executed first, it will first look for an external variable from its own blockManager. If it is not available, the external variable will be obtained from the memory of the neighbor's executor's blockManager. If it is still not available, it will be obtained from the driver side and copied to the local executor's blockManager.
2) when other task of this executor is executed, there is no need to obtain a copy of this external variable from the outside, but directly from the local blockManager
5. How to get the value of broadcast variable?
You can call the value () method of the broadcast variable directly.
[note] broadcast variables are read-only and not writable
Accumulator (Accumulator)
The Accumulator provided by Spark is mainly used for multiple nodes to share a variable, while Accumulator only provides cumulative functions. But it provides us with the function of multiple task to operate on a variable in parallel, but task can only accumulate operations on Accumulator.
[note] task can only add classes to Accumulator, and only Driver programs can read the value of Accumulator
Explanation of RDD Partition and Fault tolerance Mechanism
1. RDD's Lineage bloodline
RDD only supports coarse-grained transformations, that is, a single operation performed on a large number of records, recording a series of Lineage (lineages) that created the RDD. In order to recover the lost partition
2. The dependence of RDD
There are two different types of relationships between RDD and its parent RDD:
1), narrow dependence (one-to-one, many-to-one)
Figurative metaphor: only child
2), wide dependence (many to many)
The metaphor of image: Chaosheng
Note: stage is divided on the basis of wide dependency, that is, whether there is a shuffle,shuffle process between RDD is a wide dependency process. The tasks before shuffle belongs to a stage,shuffle and the operations before and after stage,shuffle are narrow dependencies.
[note] shuffle process is divided into shuffle Write process and shuffle read process.
4. Generation of DAG (directed acyclic graph) and partition of tasks
DAG (Directed Acyclic Graph) is called a directed acyclic graph (a directed acyclic graph)
5. How many RDD will be generated by a wordCount process?
At least five RDD will be generated.
First, after loading from HDFS, you get a RDD (that is, using the sc.textFile () operator), that is, HadoopRDD
In the sc.textFile () process, a RDD (calling the map operator) is also generated, resulting in a MapPartitionRDD.
Second, use the flatMap operator to get a MapPartitionRDD
Third, use the map operator to get a MapPartitionRDD
Fourth, use the reduceByKey operator, that is, you will get a shuffledRDD after the shuffle process.
Fifth, use the saveAsTextFile operator to generate another MapPartitionRDD
Explanation of spark Program submission process
Brief introduction to Spark tasks:
Spark-submit--- > SparkSubmit-- > main-- > submit-- > doRunMain-- > RunMain-- > create the instance object of the main class we wrote through reflection, call the main method-- > start executing the code we wrote-- > initialize the SparkContext object-- > create the initial RDD-- > trigger the action operator-- > submit job-- > worker to execute the task-- > end of the task.
Detailed description of Spark tasks:
1) package the program we wrote into a jar package
2) call the spark-submit script to submit the task and run it on the cluster
3) run the main method of sparkSubmit, in which you create an instance object of the main class we wrote by reflection, and then call the main method to start executing our code (note that driver in our spark program runs in the sparkSubmit process)
4) when the contemporary code runs to create the SparkContext object, it starts to initialize the SparkContext object
5) when initializing the SparkContext object, two particularly important objects are created: DAGScheduler
And TaskScheduler
[role of DAGScheduler] split the dependencies of RDD into stage, and then submit stage to DriverActor as taskSet
6) while building taskScheduler, two very important objects will be created, namely DriverActor and ClientActor
[role of clientActor] tasks submitted to master registered users
[role of DriverActor] accept the reverse registration of executor and submit the task to executor
7) when clientActor starts, the task submitted by the user and related parameters will be encapsulated in the ApplicationDescription object, and then submitted to master for task registration
8) when master receives a task request submitted by clientActor, it parses the request parameters, encapsulates them into Application, persists them, and then adds them to the task queue waitingApps
9) when it is our turn to submit a task to run, we start to call schedule () to schedule task resources
10), master encapsulates the scheduled resources into launchExecutor and sends them to the specified worker
11) when worker receives the launchExecutor sent by Maseter, it decompresses it and encapsulates it into ExecutorRunner, and then calls the object's start () to start Executor
12), Executor will reverse register with DriverActor after startup
13), driverActor will send a message of successful registration to Executor
14) after receiving the message that DriverActor registration is successful, Executor will create a thread pool to execute the task task sent by DriverActor
15) when all the Executor belonging to the task are started and registered in reverse, it means that the environment in which the task is running is ready, and the driver ends the initialization of the SparkContext object, which means that the new SparkContext code is finished
16) after initializing the sc successfully, the driver side will continue to run the code we wrote, then start to create the initial RDD, and then perform a series of conversion operations, which means that a job is triggered when an action operator is encountered
17), driver will submit this job to DAGScheduler
18), DAGScheduler deduces the received job from the last operator, divides the DAG into a stage according to the wide dependency, then encapsulates the stage into a taskSet, and submits the task in the taskSet to DriverActor
19), DriverActor receives the task sent by DAGScheduler, gets a serializer, serializes the task, encapsulates the serialized task into launchTask, and then sends the launchTask to the specified Executor
20) when Executor receives the launchTask sent by DriverActor, it gets a deserializer, deserializes the launchTask, encapsulates it into TaskRunner, and then gets a thread from the thread pool of Executor, and acts the operators in the deserialized task on the partition corresponding to RDD.
[note]
There are two types of tasks for Spark:
A, previous tasks of shuffleMapTask:shuffle
B, tasks after resultTask:shuffle
The nature of the Spark task:
Split the dependency of RDD into a stage, and then send stage to Executor as TaskSet batches for execution.
XIII. Checkpoint
1. Scenarios where checkpoint is used:
A RDD will be referenced many times, and the calculation is very complex and time-consuming.
It is worried that some key RDD, which will be used repeatedly later, may lead to the loss of persistent data because of node failure.
2. How to checkpoint RDD?
1). Set the restore point directory and checkpoint directory
2) call the checkpoint of RDD to checkpoint the RDD
3. The principle of checkpoint
1) after RDD calls the checkpoint method, it accepts the management of the RDDCheckpointData object
2) the RDDCheckpointData object is responsible for setting the state of the RDD that calls checkpoint to MarkedForCheckpoint
3) when the job where the RDD is located is finished, the doCheckpoint of the last RDD will be called to look up according to its pedigree to find the RDD marked as MarkedForCheckpoint, and change its state to checkpointingInProgress
4) start a separate job to checkpoint the RDD marked checkpointingInProgress in the pedigree, that is, write the data of RDD to the directory of checkpoint
5) when a node fails, resulting in the loss of all data, including persistence, the data of each partition of RDD will be restored from the restore point directory, so that there is no need to calculate from scratch
4. Checkpoint needs to pay attention to.
Because when RDD does checkpoint, it will launch a separate job to recalculate the RDD that needs to be checkpoint, which will increase the running time of spark job, so spark strongly recommends that the RDD that needs to be checkpoint should be persisted (that is, called .cache) before doing checkpoint.
5. The difference between checkpoint and persistence
1) whether or not to change the bloodline:
Persistence (.cache): does not change the dependency of RDD, that is, does not change its pedigree
Checkpoint: changes the pedigree of RDD, RDD with checkpoint clears all dependencies, sets its parent RDD force to checkpointRDD, and changes the state of RDD to checkpointed
2) Reliability of RDD data:
Persistence: just persist the RDD data to memory or disk, but if the node fails, the persisted data will still be lost
The data of Checkpoint:checkpoint is saved in a highly reliable third-party distributed file system, and the machine test node fails and the data will not be lost, so checkpoint is more reliable than persistence.
6. Follow-up
After we have implemented checkpoint, when a task calls the iterator () method of the RDD, we implement a high fault tolerance mechanism. Even if the persistent data of the RDD is lost, or there is no persistence at all, we can still use the readCheckpointOrComputer () method to first read the data from the parent RDD-checkpointRDD, HDFS (external file system).
Part II spark-sql
1. Spark-SQL 's previous life and present life
1. The characteristics of Spark SQL
1) support multiple data sources: Hive, RDD, Parquet, JSON, JDBC, etc.
2), a variety of performance optimization techniques: in-memory columnar storage, byte-code generation, cost model dynamic evaluation and so on.
3) component extensibility: for SQL's syntax parser, parser and optimizer, users can redevelop and dynamically extend them.
2. Brief introduction of Spark SQL performance optimization technology.
1), memory column storage (in-memory columnar storage)
2), bytecode generation technology (byte-code generation)
3), optimization of Scala code writing
3 、 Spark SQL and DataFrame
Spark SQL is a module in Spark, which is mainly used to deal with structured data. The core programming abstraction it provides is DataFrame. At the same time, Spark SQL can be used as a distributed SQL query engine. One of the most important functions of Spark SQL is to query data from Hive.
DataFrame can be understood as a distributed collection of data organized in columns. It is actually very similar to tables in relational databases, but a lot of optimizations are made at the bottom. DataFrame can be built from many sources, including structured data files, tables in Hive, external relational databases, and RDD.
II. The use of Spark-sql
1. Convert RDD to DataFrame (two)
1) use reflection to infer the metadata of an RDD containing a specific data type
2) create DataFrame through programming interface
2. UDF custom function and UDAF custom aggregate function
UDF, in fact, is more for single-line input and returns an output
UDAF, the aggregate calculation can be carried out for multiple lines of input, and an output is returned, which is more powerful
3. Working principle of Spark-SQL
SqlParse-> parser
Analyser-> Analyzer
Optimizer-> optimizer
SparkPlan-> Physics Plan
Process:
1) the SQL statement written by yourself
You should know that as long as the database type of technology, such as: the most traditional MySQL,Oracle, including the current big data field data warehouse, such as hive, his basic SQL execution model, are similar, first of all to generate an SQL statement execution plan
2). Generate unparsed logical plan (unresolved LogicalPlan) through SqlParser (parser)
3) generate the parsed logical plan (resolved LogicalPlan) through Analyzer (Analyzer)
4) generate the optimized logical plan (optimized LogicalPlan) through Optimizer (optimizer)
In fact, databases such as traditional Oracle usually generate multiple execution plans, and then there is an optimizer to select the best plan for multiple plans. The optimization here in SparkSql refers to, for example, the performance of some areas in the newly generated execution plan is obvious and not very good, for example:
For example, we have a SQL statement, select name from (select... From.) Where.. =.
At this time, when the execution plan is parsed, it is actually parsed into an executable plan according to its original appearance. However, Optimizer will actually optimize the execution plan here. For example, it is found that the where condition can actually be placed in the subquery. In this way, the number of subqueries becomes much smaller and the execution speed can be optimized. It might look like this: select name from (select name from... where.. =..)
5) generate the final physical plan (PhysicalPlan) through SparkPlan
When it comes to the physics plan, it is actually a very "approachable" plan. That is to say, it is already clear what data to read from those files, how to read them, how to associate them, and so on.
6) execute the physical plan in executor
The execution plan of logic is more inclined to logic, for example, it is roughly like this.
From table students= > filter... = > select name...
Here, basically, logical plans use Tree, a tree structure.
7), generate RDD
Select name from students = > parsing, where to query, students table, in which file, which data is queried from which file, such as name column, in addition, complex SQL, and, for example, whether to filter and filter the data in the table when querying, not to mention, when complex, you need to have multi-table JOIN (in my traditional database, such as MySQL, the execution plan also involves how to scan and utilize the index)
4. Spark-SQL performance optimization.
1). Set the parallelism of the shuffle process: spark.sql.shuffle.partitions (SQLContext.setConf ())
2) in the process of building a hive data warehouse, set the data type reasonably, for example, if it can be set to int, do not set it to bigInt to reduce the unnecessary memory overhead caused by the data type.
3) when writing SQL, try to give a clear column name, such as select name from students. Don't write select * the way.
4) parallel processing of query results: for spark-SQL query results, if the amount of data is relatively large, such as more than 1000 pieces, then do not reprocess the query results from collect () to driver at one time, and use the foreach () operator to process the query results in parallel.
5) caching tables: for tables that may be used by a SQL statement, you can cache them using sqlContext.cacheTable (tableName) or DataFrame.cache (). Spark-SQL will cache the tables in the format stored in memory columns, and then spark-sql can scan only the columns you need and automatically optimize compression to minimize memory usage and GC overhead. SQLContext.uncacheTable (tableName) can remove tables from the cache. Use SQLContext. SetConf (), sets the spark.sql.inMemoryColumnarStorage.batchSize parameter (default 10000), and sets the unit of column storage
6). Broadcast join table: spark.sql.autoBroadcastJoinThreshold. Default is 10485760 (10 MB). If there is enough memory, you can increase its size. The parameter sets the maximum size of a table in join, which can be broadcast to optimize performance.
5. Hive on Spark configuration
1) secure transfer and configure Hive and Spark
2), Set hive.execution.engine=spark
3), set spark.master=spark://mini1:7077
Part III spark-streaming
1, Dstream
Dstream is the data model of sparkStreaming, which is essentially a series of continuous RDD, but it is a RDD of a period of time. The RDD of these periods is constantly connected.
This time can be set by yourself, the shorter the time setting, the higher the real-time performance, but the greater the performance consumption.
2. What are the ways in which spark streaming can obtain data from kafka?
There are two ways:
1. Through the receiver way
2. Through the direct way, the dirrect way needs to manage the offset itself.
3, the difference between sparkStreaming and storm
SparkStreaming is a component of spark that does streaming quasi-real-time computing. The data structure it uses is rdd with a series of time slices in Dstream,Dstream.
Compared with storm,sparkStreaming in real-time and ensuring no data loss, the advantage of spark streaming in the eyes of spark supporters is that sparkStreaming has high throughput. In essence, the advantage of sparkStreaming over storm is that sparkStreaming can be seamlessly integrated with spark core,spark SQL.
4. For those that need to be referenced many times, and the dstream computing time is particularly time-consuming, and the data is particularly important, then we need to checkpoint the dstream (only multiple references can be persisted), because even if the dstream is persisted, the data may be lost, and the possibility of checkpoint data loss is small, but this will affect the data throughput of spark-streaming, because at the same time of calculation. Data also needs to be written to an external storage system, which can degrade spark performance and affect throughput. It is not recommended when not necessary.
5. How to checkpoint dstream
First set the restore point directory, and then call the checkpoint method of dstream
[note]: the checkpoint cycle of dstream must produce an integral multiple of the batch time, and the spark official recommends that the checkpoint time be set to at least 10 seconds. In general, the checkpoint interval is set to 5-10 times the sliding interval for window operations
When the 6.spark program starts, it will go to this checkpointPath directory to see if there is any saved driver metadata (1.dstream operation conversion relationship, 2. Unprocessed batch) information, when the spark-streaming program starts for the second time, it will go to the checkpointPath directory to restore the program, load the unprocessed batch metadata information and recover it in memory, and continue the task processing.
7. In order to ensure that the spark-streaming program runs 24 hours a day, then our program should have high reliability, how to have high reliability?
a. The program failed and the driver died. The streaming program should have the function of automatic restart.
b. If the rdd is not completed, after the program stops abnormally, the unprocessed rdd will be processed after the next startup.
[note]: to add the-- deploy-mode parameter to spark_submit, the default value is client, that is, to start driver on the machine submitting the application, but to be able to restart driver automatically, you must set its value to cluster;. In addition, you need to add the-- supervise parameter, and restart automatically after failure.
/ / spark_submit-- executor-memory 1g-- total-execute-cores 5-- deploy-model cluster-- supervise
8. Enable the pre-write mechanism
a. The pre-writing logging mechanism, abbreviated as WAL and full name Write Ahead Log, has introduced the WAL mechanism based on fault-tolerant file system since the spark1.2 version. If this mechanism is enabled, all data received by Receiver is written to the write-ahead log in the configured checkpoint directory. This mechanism allows driver to avoid data loss during recovery, and ensures zero data loss during the whole real-time computing process.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.