
What are the basic knowledge points of Spark


This article explains the basic knowledge points of Spark: its features, deployment on YARN, core concepts and architecture, RDDs, Spark SQL, Spark Streaming, and Structured Streaming.

Spark basics

Features of Spark

Spark is written in the concise and elegant Scala language, offers an interactive programming experience based on Scala, and provides a variety of easy-to-use APIs. Following the design philosophy of "one software stack for different application scenarios", Spark has gradually formed a complete ecosystem, including the in-memory computing framework (Spark Core), ad hoc SQL queries (Spark SQL), stream computing (Spark Streaming), machine learning (MLlib), and graph computing (GraphX). Spark can be deployed on the YARN resource manager to provide a one-stop big data solution, supporting batch processing, streaming, and interactive queries at the same time.

The MapReduce computing model has high latency and cannot meet the needs of real-time, fast computation, so it is only suitable for offline scenarios. Spark draws on the MapReduce computing model but has the following advantages (fast, easy to use, comprehensive):

Spark provides more types of dataset operations, and its programming model is more flexible than MapReduce. Spark offers in-memory computation and keeps intermediate results in memory, which reduces the IO overhead of iterative computation and yields higher execution efficiency. Spark's task scheduling and execution are based on a DAG, so iteration is more efficient. In actual development, MapReduce requires writing a lot of low-level code; Spark provides a variety of high-level, concise APIs, so an application with the same functionality needs far less code than with MapReduce.

As a computing framework, Spark only replaces the MapReduce computing framework in the Hadoop ecosystem. It still relies on HDFS for distributed data storage, and the other Hadoop components continue to play an important role in enterprise big data systems.

Shortcomings of Spark: although Spark is very fast, it is still not fully satisfactory in production environments; scalability, stability, manageability, and other aspects need further improvement. At the same time, Spark's capabilities in stream processing are limited: if you need sub-second latency or very high-volume data ingestion and processing, you need other streaming products.

Cloudera aims to make Spark streaming technology suitable for 80% of use cases; in real-time analysis scenarios (rather than simple data filtering or distribution), many implementations that previously used streaming engines such as S4 or Storm have gradually been replaced by Kafka + Spark Streaming.

Hadoop's popularity rests on three pieces: HDFS, MapReduce, and YARN. Spark will gradually send MapReduce and Tez to the museum; however, Spark is only a computing engine with better performance than MapReduce, and its storage and scheduling still depend on HDFS and YARN. Spark does have its own scheduling framework, but it is immature and rarely used commercially.

Spark deployment (on YARN)

YARN enables a cluster to host multiple frameworks: a unified resource scheduling and management framework is deployed on the cluster, and various computing frameworks are deployed on top of it. YARN provides unified resource scheduling and management services for these computing frameworks and can adjust resources according to the load of each framework, achieving cluster resource sharing and elasticity.

In addition, YARN allows different application workloads to be mixed on the cluster, which effectively improves cluster utilization, and the different computing frameworks can share the underlying storage, avoiding moving datasets across clusters.

Here, the Spark on YARN mode is used for deployment. Configuring on-yarn mode requires only minor configuration changes and does not require starting a Spark cluster with a command; you only need to specify yarn as the master when submitting a task.

Spark requires the Scala language to run. Download Scala and Spark and extract them to the home directory, set the current user's environment variables (~/.bash_profile), add the SCALA_HOME and SPARK_HOME paths, and make them take effect immediately. Run the scala command and the spark-shell command to verify the installation. Spark's configuration changes are hard to follow from the official website tutorial alone; the configuration here was completed by referring to blogs and by debugging.

Spark needs two configuration files modified: spark-env.sh and spark-default.conf. The former specifies the configuration file paths of Hadoop's HDFS and YARN and the SPARK_MASTER_HOST address, while the latter specifies the jar package address.

The spark-env.sh configuration file is modified as follows:

export JAVA_HOME=/home/stream/jdk1.8.0_144
export SCALA_HOME=/home/stream/scala-2.11.12
export HADOOP_HOME=/home/stream/hadoop-3.0.3
export HADOOP_CONF_DIR=/home/stream/hadoop-3.0.3/etc/hadoop
export YARN_CONF_DIR=/home/stream/hadoop-3.0.3/etc/hadoop
export SPARK_MASTER_HOST=xx
export SPARK_LOCAL_IP=xxx

The spark-default.conf configuration is modified as follows:

# add the jar package address
spark.yarn.jars=hdfs://1xxx/spark_jars/*

This setting means the jar location is defined on HDFS: all jar packages under the ~/spark/jars path must be uploaded to the /spark_jars/ path on HDFS (for example with hdfs dfs -put ~/spark/jars/* /spark_jars/), otherwise an error is reported that the compiled jar packages cannot be found.

Spark startup and verification

Starting ./spark-shell directly without parameters runs in local mode:

Starting ./spark-shell --master yarn runs in yarn mode, provided that YARN is successfully configured and available:

Create a file README.md in the HDFS file system and read it into an RDD. Using the built-in RDD transformations, the RDD defaults to one record per line:

Use ./spark-shell --master yarn to start spark and run the command val textFile = sc.textFile("README.md") to read the README.md file on HDFS into an RDD, then use the built-in functions to test it; if this works, the spark on yarn configuration is successful.

Common problems

When starting spark-shell, an error reports that the maximum allocatable memory configured in yarn-site.xml (yarn.scheduler.maximum-allocation-mb) is insufficient. Increase this value to 2048 MB; it takes effect after restarting YARN.

The configured HDFS addresses conflict: the hdfs-site.xml setting in the HDFS configuration does not contain a port, but the spark.yarn.jars value in spark-default.conf does. Modify the address in spark-default.conf so that it matches the former:

Basic principles of Spark

In practical applications, big data processing mainly falls into three types: complex batch data processing, where the time span is usually tens of minutes to several hours; interactive queries over historical data, where the time span is usually tens of seconds to several minutes; and processing of real-time data streams, where the time span is usually hundreds of milliseconds to several seconds.

To cover these scenarios, multiple components such as MapReduce, Impala, and Storm need to be deployed at the same time, which inevitably brings problems: input and output data cannot be shared seamlessly between different scenarios and usually require format conversion; different software requires different development and maintenance teams, which raises costs; and it is difficult to coordinate and allocate resources uniformly among the systems in the same cluster.

The design of Spark follows the philosophy of "one software stack for different application scenarios" and has gradually formed a complete ecosystem, including the Spark Core, Spark SQL, Spark Streaming (Structured Streaming), MLlib, and GraphX components. It provides not only an in-memory computing framework but also support for ad hoc SQL queries, real-time stream computing, machine learning, and graph computing.

Spark can be deployed on top of the resource manager YARN to provide a one-stop big data solution; therefore the ecosystem Spark provides is sufficient to cope with the three scenarios above, namely batch processing, interactive queries, and stream data processing.

Spark concepts / architecture design

RDD: short for Resilient Distributed Dataset, a distributed-memory abstraction that provides a highly restricted shared-memory model.

DAG: short for Directed Acyclic Graph, which reflects the dependency relationships between RDDs.

Executor: a process running on a worker node (WorkerNode), responsible for running Tasks.

Application: a Spark application written by the user.

Task: a unit of work running on an Executor.

Job: a job contains multiple RDDs and the various operations acting on them.

Stage: the basic scheduling unit of a job. A job is divided into multiple groups of tasks; each group is called a stage, or a task set, and represents a set of related tasks with no shuffle dependencies between them.

The Spark runtime architecture includes a cluster resource manager (Cluster Manager), worker nodes that run job tasks (Worker Node), a task control node (Driver) for each application, and an execution process (Executor) for specific tasks on each worker node. The resource manager can be Spark's own or Mesos/YARN.

An application consists of a Driver and several jobs, a job consists of multiple stages, and a stage consists of multiple tasks with no shuffle relationships between them.

When executing an application, the Driver requests resources from the cluster manager, starts Executors, sends the application code and files to the Executors, and then executes tasks on the Executors. When execution finishes, the results are returned to the Driver or written to HDFS or other databases.

Spark running process

The SparkContext object represents a connection to a cluster:

(1) First, build the basic running environment for the application: the Driver creates a SparkContext, which applies for resources, assigns tasks, and monitors them.

(2) The resource manager allocates resources to the Executors and starts the Executor processes.

(3) SparkContext builds the DAG according to the RDD dependencies; the DAG is submitted to the DAGScheduler, which parses it into stages, and each TaskSet is then submitted to the underlying TaskScheduler. Executors apply to SparkContext for tasks, and the TaskScheduler issues the tasks to the Executors to run, together with the application code.

(4) Tasks run on the Executors, and the execution results are fed back to the TaskScheduler and then to the DAGScheduler. After the run is finished, the data is written out and all resources are released.

Spark RDD

RDD concepts / features

Many iterative algorithms (such as machine learning and graph algorithms) and interactive data-mining tools have in common that intermediate results are reused across computation phases. The MapReduce framework writes intermediate results to stable storage (such as disk), which brings a lot of data replication, disk IO, and serialization overhead.

RDD emerged to meet this demand. It provides an abstract data architecture: developers do not have to worry about the distributed characteristics of the underlying data and only need to express the application logic as a series of transformations. The transformations between different RDDs form dependencies that can be pipelined, avoiding the storage of intermediate data. An RDD is a collection of distributed objects and is essentially a read-only set of partitioned records. Each RDD can be divided into multiple partitions, each partition is a fragment of the dataset, and different partitions of an RDD can be stored on different nodes in the cluster, so that computation can be carried out in parallel on different nodes.

RDD provides a highly constrained shared-memory model: an RDD is a read-only collection of partitioned records that cannot be modified directly. An RDD can only be created from a dataset in stable physical storage, or by performing transformation operations (such as map, join, and groupBy) on other RDDs.

RDD provides a wealth of operations to support common data manipulation, divided into two types: "Transformation" and "Action". The transformation interfaces provided by RDD are very simple; they are coarse-grained data transformations such as map, filter, groupBy, and join, rather than fine-grained modifications of individual data items (so RDD is not suited to applications such as web crawlers). On the surface, RDD may seem limited and not powerful enough, but in practice RDD has been shown to express the programming models of many frameworks (such as MapReduce, SQL, and Pregel) efficiently. Spark implements the RDD API in Scala, and programmers implement various operations on RDDs by calling this API.

The typical execution process of an RDD is as follows; this series of steps is called a Lineage, that is, the result of a DAG topological sort:

An RDD is created by reading an external data source; the RDD goes through a series of transformation operations, each of which produces a different RDD to be used by the next transformation; the last RDD is processed by an "action" operation and output to an external data source. The benefits are lazy evaluation and pipelining, which avoid synchronous waiting and the need to save intermediate results, and keep the operations simple.

The main reasons why Spark can achieve efficient computation after adopting RDD are:

(1) High fault tolerance: thanks to lineage, lost partitions are recomputed without rolling back the whole system, the recomputation runs in parallel across nodes, and only coarse-grained operations are recorded.

(2) Intermediate results are persisted to memory: data is passed between RDD operations in memory, avoiding unnecessary disk read and write overhead.

(3) The stored data are Java objects: this avoids unnecessary object serialization and deserialization.

RDD dependency relationship

Spark generates a DAG by analyzing the dependencies of each RDD and divides a job into several stages according to those dependencies. Stage division is based on narrow and wide dependencies: narrow dependencies allow pipeline optimization, while wide dependencies involve a shuffle and cannot be pipelined.

A narrow dependency means that a partition of a parent RDD corresponds to a single partition of the child RDD (or partitions of multiple parent RDDs correspond to a single partition of the child RDD), while a wide dependency means that a partition of a parent RDD corresponds to multiple partitions of the child RDD.
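To make this concrete, here is a minimal spark-shell sketch (the input values are made up for illustration): map keeps a narrow dependency and can be pipelined, while reduceByKey introduces a wide dependency and therefore starts a new stage.

scala> val pairs = sc.parallelize(List("a", "b", "a", "c")).map(word => (word, 1))  // narrow dependency: each parent partition feeds exactly one child partition
scala> val counts = pairs.reduceByKey(_ + _)  // wide dependency: a shuffle redistributes records by key across partitions
scala> counts.toDebugString  // prints the lineage; the ShuffledRDD marks the boundary where the stage is split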

Logically, each RDD operation is a fork/join (a pattern for executing tasks in parallel): the computation is forked to each RDD partition, the results of the partitions are joined after the computation completes, and then the next RDD operation is forked/joined again.

RDD stage division: Spark generates the DAG by analyzing the dependencies of each RDD, and then determines how to divide stages by analyzing the dependencies between the partitions of each RDD. The specific method:

Perform reverse parsing of the DAG: break the graph whenever a wide dependency is encountered, add the current RDD to the stage when a narrow dependency is encountered, and put narrow dependencies into the same stage as far as possible, so that pipelined computation can be realized.

RDD running process

Based on the introduction above to RDD concepts, dependencies, and stage division, combined with the basic Spark execution process, the running process of an RDD in the Spark architecture can be summarized as follows:

(1) Create an RDD object.

(2) SparkContext is responsible for computing the dependencies between RDDs and building the DAG.

(3) The DAGScheduler is responsible for decomposing the DAG into multiple stages, each containing multiple tasks, and each task is distributed by the TaskScheduler to an Executor on a worker node for execution.

RDD creation

An RDD can be created by loading data from the file system or from a parallelized collection (array). Spark uses the textFile() method to load data from the file system and create an RDD; it takes the URI of the file as a parameter, which can be an address on the local file system or on the distributed file system HDFS.

Load data from the file system:

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")

Load data from HDFS:

scala> val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")

You can also call the parallelize method of SparkContext on a collection (array) that already exists in the Driver:

scala> val array = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(array)

Or create from the list:

scala> val list = List(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(list)

RDD operations

For an RDD, each transformation operation produces a different RDD to be consumed by the next transformation. Transformed RDDs are lazily evaluated: the whole transformation process only records the trajectory of the transformations and no real computation takes place. Only when an action operation occurs does real computation happen, starting the physical transformations from the beginning of the lineage.

Common RDD transformation operations are summarized as follows:

filter(func): filters out the elements that satisfy the function func and returns a new dataset.

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")

scala> val linesWithSpark = lines.filter(line => line.contains("Spark"))

map(func): passes each element to the function func and returns the results as a new dataset.

scala> val data = Array(1, 2, 3, 4, 5)

scala> val rdd1 = sc.parallelize(data)

scala> val rdd2 = rdd1.map(x => x * 10)

Another example:

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")

scala> val words = lines.map(line => line.split(" "))

flatMap(func): like map, but flattens the results.

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")

scala> val words = lines.flatMap(line => line.split(" "))

groupByKey(): when applied to a dataset of key-value pairs, returns a new dataset in the form (K, Iterable[V]).

reduceByKey(func): when applied to a dataset of (K, V) key-value pairs, returns a new dataset of (K, V) pairs, where each value is the result of aggregating the values of each key with the function func:
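A minimal spark-shell sketch contrasting groupByKey and reduceByKey (the input pairs are made up for illustration):

scala> val pairs = sc.parallelize(List(("spark", 1), ("hadoop", 1), ("spark", 1)))
scala> pairs.groupByKey().collect()  // e.g. Array((spark,CompactBuffer(1, 1)), (hadoop,CompactBuffer(1)))
scala> pairs.reduceByKey(_ + _).collect()  // e.g. Array((spark,2), (hadoop,1))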

Actions are where computation is really triggered. A Spark program only performs real computation when it executes an action; this is the lazy-evaluation mechanism. "Lazy evaluation" means the whole transformation process only records the trajectory of the transformations and no real computation happens; only when an action is encountered is the real "end-to-end" computation triggered. The common action operations are listed below:
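The original table of actions is missing here; as a sketch, the most common action operations from the RDD API are:

rdd.count()  // return the number of elements in the dataset
rdd.collect()  // return all elements of the dataset to the driver as an array
rdd.first()  // return the first element
rdd.take(n)  // return the first n elements as an array
rdd.reduce(func)  // aggregate the elements of the dataset with the function func
rdd.foreach(func)  // apply the function func to each element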

RDD persistence

Spark RDDs use a lazy-evaluation mechanism, but every time an action is encountered the computation is performed from scratch. Each call to an action triggers a computation from the beginning, which is very expensive for iterative computation that often reuses the same set of data:

scala> val list = List("Hadoop", "Spark", "Hive")

scala> val rdd = sc.parallelize(list)

scala> println(rdd.count())  // action, triggers a real computation from beginning to end

scala> println(rdd.collect().mkString(","))  // action, triggers a real computation from beginning to end

The overhead of repeated computation can be avoided through the persistence (cache) mechanism. The persist() method marks an RDD as persistent. We say "marks" because where the persist() statement appears, the RDD is not computed and persisted immediately; only when the first action triggers the real computation is the result persisted. The persisted RDD is kept in the memory of the compute nodes and reused by subsequent actions.

The parentheses of persist() take a persistence-level parameter. persist(MEMORY_ONLY) stores the RDD in the JVM as deserialized objects; if memory is insufficient, cached content is evicted according to the LRU principle. persist(MEMORY_AND_DISK) stores the RDD in the JVM as deserialized objects, and if memory is insufficient the excess partitions are stored on disk. The cache() method is equivalent to calling persist(MEMORY_ONLY), and a persisted RDD can be manually removed from the cache with the unpersist() method.

For the above example, the execution process after adding the persistence statement is as follows:

scala> val list = List("Hadoop", "Spark", "Hive")

scala> val rdd = sc.parallelize(list)

scala> rdd.cache()  // calls persist(MEMORY_ONLY); the rdd is not cached when this statement executes, because it has not been computed yet

scala> println(rdd.count())  // the first action triggers a real computation from beginning to end; only now does the rdd.cache() above take effect and put the rdd in the cache

scala> println(rdd.collect().mkString(","))  // the second action does not need to recompute from the beginning; it just reuses the rdd in the cache above

RDD partitioning

An RDD is a resilient distributed dataset. An RDD is usually very large and is divided into many partitions stored on different nodes. Partitions serve to (1) increase parallelism and (2) reduce communication overhead. The principle of RDD partitioning is to make the number of partitions as close as possible to the number of CPU cores in the cluster. For the different Spark deployment modes (local, Standalone, YARN, Mesos), you can configure the default number of partitions by setting the parameter spark.default.parallelism. Generally speaking:

Local mode: the default is the number of CPUs of the local machine; if local[N] is set, the default is N.

Standalone or YARN: the default is the larger of "the total number of CPU cores in the cluster" and 2.

There are two ways to set the number of partitions: specify it manually when creating the RDD, or reset it using the repartition method.

Manually specify the number of partitions when creating an RDD: the textFile() and parallelize() methods accept a partition count. The syntax is sc.textFile(path, partitionNum), where the path parameter specifies the address of the file to load and the partitionNum parameter specifies the number of partitions.

scala> val array = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(array, 2)  // set two partitions

The repartition method resets the number of partitions: after obtaining a new RDD through a transformation, you can call the repartition method directly, for example:

scala> val data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt", 2)

scala> data.partitions.size  // displays the number of partitions of the RDD data

scala> val rdd = data.repartition(1)  // repartition the RDD data

scala> rdd.partitions.size

res4: Int = 1

Spark-shell batch processing

After completing the Spark deployment, use the spark-shell command to enter the Scala interactive programming interface. spark-shell creates a SparkContext (sc) by default; when spark-shell starts you can check whether it is running in on-yarn or local mode. In the interactive interface the sc variable can be used directly.

Example of using spark-shell to process data: read a file in the HDFS file system and implement a WordCount word count:

sc.textFile("hdfs://172.22.241.183:8020/user/spark/yzg_test.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect()

Here map((_, 1)) is equivalent to map(x => (x, 1)).

Use the saveAsTextFile() function to save the results to the file system.
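For example, the word count above can be written back to HDFS instead of collected to the driver (the output directory wc_output is a made-up path for illustration):

sc.textFile("hdfs://172.22.241.183:8020/user/spark/yzg_test.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile("hdfs://172.22.241.183:8020/user/spark/wc_output")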

Scala and functional programming

Spark is written in the Scala language. During development you need to be familiar with functional programming ideas and proficient in Scala. The amount of code for a Spark application developed in Scala is much smaller than the same application developed in Java.

Functional programming features

Functional programming is a kind of declarative programming that describes computation as the evaluation of mathematical functions. Functional programming has no precise definition; it is a collection of ideas rather than rules that must be strictly followed. It can be understood as treating the program as a mathematical function: input the independent variables, output the dependent variables, and complete the computation through expressions. More and more imperative languages now support some functional programming features.

In functional programming, functions are first-class citizens: a function behaves no differently from an ordinary variable, so it can be passed as a function parameter, and a function can be declared inside another function; a function that takes a function as a parameter or returns a function is called a higher-order function.

Currying in functional programming: transform a function that accepts multiple parameters into a function that accepts a single parameter and returns a new function that accepts the remaining parameters and returns the result.

Functional programming requires all variables to be constants (the word "variable" is not accurate here; it is just for ease of understanding); Erlang is a typical such language. Although many languages support some functional programming features, they do not require variables to be constant. This property increases the complexity of programming, but it makes code free of side effects and brings a great benefit: it greatly simplifies concurrent programming.

The most common concurrency model in Java is the shared-memory model, which relies on threads and locks. If the code is not written properly, deadlocks and race conditions occur, and as the number of threads grows, a lot of system resources are consumed. In functional programming, there is no need to consider situations such as deadlocks because everything is constant. Why does assignment increase the complexity of programming? Because all variables are constants, we cannot change the value of a variable, and loops do not make much sense, so recursion is used instead of loops in Haskell and Erlang.

Scala syntax

Scala, short for Scalable Language, is a multi-paradigm programming language similar to Java. It is designed to integrate the features of object-oriented programming and functional programming.

Scala functions: first-class citizens

In Scala, a function is a first-class citizen: like a variable, it can be passed as a function parameter or assigned to a variable, and creating a function does not depend on a class, abstract class, or interface as it does in Java. There are two styles of Scala function definition:

The regular function definition, in which the last line of the body is the return value:

The simplified function definition, which can take just one line:

You can also use val to define the function directly as a variable, for example a function addInt with two input parameters, both of type Int, whose return value is their sum, also of type Int:
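The original code snippets are missing here; the following is a minimal sketch of the three styles just described (the suffixed names addInt2 and addInt3 only avoid a name clash):

// regular definition: the last line of the body is the return value
def addInt(a: Int, b: Int): Int = {
  val sum = a + b
  sum
}

// simplified one-line definition
def addInt2(a: Int, b: Int): Int = a + b

// defining the function as a variable with val
val addInt3 = (a: Int, b: Int) => a + b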

Scala anonymous functions (function literals)

A Scala anonymous function, also called a function literal, can be passed as a function parameter or assigned to a variable. In the definition of an anonymous function, "=>" can be read as a converter: it uses the algorithm on its right to transform the input on its left into new output. Anonymous functions make the code more concise.

val test = (x: Int) => x + 1

Scala higher-order functions

Scala uses the term "higher-order function" for methods and functions that take a function as a parameter or return a function as a result. For example, common functions such as map, filter, and reduce can take a function as an argument.
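A minimal sketch of built-in and user-defined higher-order functions (applyTwice is a made-up name for illustration):

// map and filter take a function as a parameter
List(1, 2, 3, 4).map(x => x * 2)  // List(2, 4, 6, 8)
List(1, 2, 3, 4).filter(_ % 2 == 0)  // List(2, 4)

// a user-defined higher-order function: applies f twice
def applyTwice(f: Int => Int, x: Int): Int = f(f(x))
applyTwice(_ + 3, 10)  // 16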

Scala closures

A closure in Scala means that a function can still access a variable even when that variable is outside its original scope. A closure in Scala captures the variable itself rather than just its value: when the free variable changes, the closure sees the change, and if the free variable is changed inside the closure, the change is also reflected in the value of the free variable defined outside the function.
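A minimal closure sketch illustrating that the closure captures the variable itself (the names are illustrative):

var factor = 3
val multiplier = (i: Int) => i * factor  // multiplier closes over the free variable factor

multiplier(10)  // 30
factor = 5
multiplier(10)  // 50 -- the closure sees the new value of factor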

Scala partially applied functions

A partially applied function provides only some of the parameters of an existing function and uses underscores in place of the missing ones, creating a new "function value". When using this function value (the partially applied function), you only need to supply the parameters corresponding to the underscores. A partially applied function is essentially an expression of a value type; you do not need to provide all the parameters, only some of them.
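A minimal sketch of a partially applied function (the log function and its parameters are made up for illustration):

def log(time: String, message: String): Unit = println(time + " " + message)

// the underscore stands for the parameter that is not supplied yet
val logAtStart = log("2019-01-01 10:00", _: String)

logAtStart("job started")  // prints: 2019-01-01 10:00 job started
logAtStart("job finished")  // prints: 2019-01-01 10:00 job finished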

Scala curried functions

Currying in Scala refers to turning a function that takes two parameters into a new function that takes one parameter; the new function returns a function that takes the second parameter as its argument.

def someAction(f: (Double) => Double) = f(10)

As long as the function argument takes a Double and returns a Double, it can be passed as the f value.
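Note that someAction above is itself a higher-order function rather than a curried function; a minimal currying sketch (the names are illustrative):

// curried definition: two parameter lists
def add(x: Int)(y: Int): Int = x + y

val addFive = add(5) _  // partially apply the first parameter list; addFive is a function Int => Int
addFive(3)  // 8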

Spark SQL

Shark and Spark SQL

With the emergence of Shark, the performance of SQL-on-Hadoop became 10-100 times higher than that of Hive, but Shark's design led to two problems:

First, execution-plan optimization depended entirely on Hive, making it inconvenient to add new optimization strategies. Second, because Spark uses thread-level parallelism while MapReduce uses process-level parallelism, Spark had thread-safety problems when reusing Hive's implementation, so Shark had to maintain a separate, patched branch of the Hive source code.

At the Hive compatibility level, Spark SQL only relies on HiveQL parsing and Hive metadata; from the point where HQL is parsed into an abstract syntax tree (AST), everything is taken over by Spark SQL. Catalyst (a functional relational query optimization framework) is responsible for generating and optimizing Spark SQL execution plans.

DataFrame and RDD

Spark SQL adds DataFrame (an RDD with schema information), which enables users to execute SQL statements in Spark SQL. The data can come from RDDs, from external data sources such as Hive, HDFS, and Cassandra, or from JSON files. Spark SQL currently supports Scala, Java, Python, and the SQL-92 specification.

With the introduction of DataFrame, Spark gained the ability to process large-scale structured data; it is not only easier to use than the original RDD transformations but also achieves higher computing performance. Spark can easily convert data from MySQL into a DataFrame and supports SQL queries. An RDD is a distributed collection of Java objects, but the internal structure of those objects is opaque to the RDD; a DataFrame is a distributed dataset based on RDD that provides detailed structural information.

An RDD is like a room: to find something in the room you have to search it. A DataFrame is like putting shelves in the room: you just say which shelf the item is on. DataFrame adds columns on top of RDD, so processing data is like processing a two-dimensional table.

The main difference between DataFrame and RDD is that the former carries schema metadata, that is, each column of the two-dimensional table represented by a DataFrame has a name and a type. This gives Spark SQL insight into more structural information, so it can optimize both the data sources behind the DataFrame and the transformations acting on it, ultimately improving runtime efficiency greatly. In contrast, for RDD, since there is no way to know the internal structure of the stored data elements, Spark Core can only perform simple, general pipeline optimization at the stage level.

The creation of DataFrame

Starting from Spark 2.0, Spark uses the new SparkSession interface to replace the SQLContext and HiveContext interfaces of Spark 1.6 for loading, converting, and processing data. SparkSession implements all the functionality of SQLContext and HiveContext.

SparkSession supports loading data from different data sources into a DataFrame, converting a DataFrame into a table, and then manipulating the data with SQL statements. SparkSession also provides support for HiveQL and other Hive-dependent features. You can create a SparkSession object with the following statements:

scala> import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().getOrCreate()

Before creating a DataFrame, in order to support converting RDDs to DataFrames and the subsequent SQL operations, you need to import the corresponding package with an import statement (that is, import spark.implicits._) to enable implicit conversion.

When creating a DataFrame, you can use the spark.read operation to load data from different types of files, for example: spark.read.json("people.json") reads a people.json file to create a DataFrame; when reading a local file or an HDFS file, be careful to give the correct file path; spark.read.csv("people.csv") reads a people.csv file to create a DataFrame.

Read a json file on HDFS and print it. The json file is:

{"name": "Michael"}

{"name": "Andy", "age": 30}

{"name": "Justin", "age": 19}

The code to read it is:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

import spark.implicits._

val df = spark.read.json("hdfs://172.22.241.183:8020/user/spark/json_sparksql.json")

df.show()

Converting RDD to DataFrame

The Spark official website gives two ways to convert an RDD to a DataFrame: ① use reflection to infer the schema of an RDD containing objects of a specific type, which is suitable for RDDs whose data structure is known; ② use the programming interface to construct a schema and apply it to a known RDD.
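A minimal sketch of the reflection-based approach ①, assuming a comma-separated people.txt file with name and age columns (the path and the Person fields are made up for illustration):

import spark.implicits._

// the case class defines the schema, which is inferred by reflection
case class Person(name: String, age: Long)

val peopleDF = sc.textFile("hdfs://xxx/user/spark/people.txt")
  .map(_.split(","))
  .map(attrs => Person(attrs(0), attrs(1).trim.toLong))
  .toDF()

peopleDF.createOrReplaceTempView("people")
spark.sql("select name, age from people where age > 20").show()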

Spark-sql ad hoc queries

SparkSQL metadata can be in two states: ① in_memory, where it is lost after use; ② persisted through Hive, in which case wherever Hive's metadata is stored, SparkSQL's metadata exists as well. The SparkSQL data warehouse is built on top of Hive, so building a data warehouse with SparkSQL requires Hive.

The spark-sql command line provides ad hoc query capability and can manipulate data sources in a SQL-like way, with higher efficiency than Hive.

Spark Streaming

Spark Streaming is a high-throughput, fault-tolerant real-time processing engine extended from Spark Core. The biggest difference from Storm is that Spark Streaming cannot achieve millisecond-level computation, while Storm can respond within milliseconds. Spark Streaming is implemented with batch computation: the stream is cut into static data according to time slices, and efficient fault-tolerant processing is easier on top of RDD datasets. The input and output data sources of Spark Streaming can be of many kinds. Spark Streaming reads data in real time, divides it into small batches, and then processes the resulting batch result sets in the Spark engine. Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous data stream. DStreams can be created from input streams from sources such as Kafka and Flume, or by applying high-level operations to other DStreams. Internally, a DStream is represented as a sequence of RDDs.

Let's start with an example. StreamingContext is the main entry point of all streaming computation. Create a local StreamingContext with two execution threads and a 1-second batch interval, then create a DStream (lines) that listens on a TCP port. Each line in lines is an RDD; the flatMap function decomposes an RDD into multiple records and is a one-to-many DStream operation. Here spaces are used to split lines into words, and words is mapped into (word, 1) pairs, which are then used for word-frequency counting. The example code is as follows:

import org.apache.spark._

import org.apache.spark.streaming._

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)

val words = lines.flatMap(_.split(" "))

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print()

ssc.start()

ssc.awaitTermination()

Streaming principle

Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous data stream. Internally, a DStream is represented as a sequence of RDDs, and each RDD contains the data for one time interval.

All operations on a DStream are translated into operations on the underlying RDDs. In the example above, the flatMap operation is applied to each RDD in lines to generate the RDDs of words.

The basic steps for writing a Spark Streaming program are summarized as follows:

1. Define the input sources by creating input DStreams.

2. Define the stream computation by applying transformation and output operations to the DStreams.

3. Start receiving data and processing it with streamingContext.start().

4. Wait for processing to stop (manually or because of an error) with streamingContext.awaitTermination().

5. The stream computation can be ended manually with streamingContext.stop().

StreamingContext

There are two ways to create a StreamingContext: from a SparkConf or from a SparkContext.

Creation from SparkConf:

val conf = new SparkConf().setAppName(appName).setMaster(master)

val ssc = new StreamingContext(conf, Seconds(1))

appName is the name of the application shown on the Spark UI; master is the URL of a Spark, Mesos, or YARN cluster, or local[*]. The batch interval can be set according to the latency requirements of your application and the available cluster resources.

Creation from SparkContext:

val sc = new SparkContext(conf)

val ssc = new StreamingContext(sc, Seconds(1))

Input DStreams and Receivers

In the previous example, lines is an input DStream obtained from a source. An input DStream corresponds to a receiver object that receives messages from the source and stores them in Spark memory for processing. Spark Streaming provides two kinds of streaming sources:

Basic sources: sources directly available from the StreamingContext API, such as files and sockets. Advanced sources: Kafka and Flume sources obtained by referencing additional dependency classes. Multiple input DStreams can be created in one application to read several data streams at the same time. A receiver runs as a long-lived task in a worker/executor, so it occupies a core allocated to the application; Spark Streaming therefore needs enough cores to run the receivers and also process the received data.

When running a Spark Streaming program locally, do not use "local" or "local[1]" as the master URL. Either of these means that only one thread is used to run tasks locally. If you use a receiver-based input DStream (such as Kafka or Flume), that single thread will run the receiver, leaving no thread to process the received data. So when running locally, always use "local[n]" as the master URL, where n must be greater than the number of receivers, otherwise the system will receive data but be unable to process it.

Sources such as Kafka and Flume require external dependency packages, and some of these libraries have complex dependencies that are not available in the Spark shell, so applications based on these advanced sources cannot be tested in spark-shell unless the packages are introduced manually.

From the point of view of reliability, data sources can be divided into two categories: reliable receivers send an acknowledgement to the source (such as Kafka or Flume) after the Receiver receives the data and stores it in Spark, while unreliable receivers do not send acknowledgements to the source.

DStream transformations

Similar to RDDs, transformations allow you to modify the data of the input DStream; they include stateless transformations and stateful transformations.

Example of stateless transformations: the word-frequency statistics of the "socket stream" in the spark-shell section below use stateless transformations. Each batch only counts the word frequencies of the words that arrived in the current batch; this has nothing to do with previous batches and is not accumulated.

Examples of stateful transformations: sliding-window transformations and the updateStateByKey operation.

Some common transformations are as follows:
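The original table is missing here; as a sketch, the common stateless transformations mirror their RDD counterparts (using the lines DStream from the earlier example):

val words = lines.flatMap(_.split(" "))  // one-to-many mapping
val longLines = lines.filter(_.length > 10)  // keep only matching elements
val pairs = words.map(word => (word, 1))  // element-wise mapping
val counts = pairs.reduceByKey(_ + _)  // per-key aggregation within each batch
val lineCounts = lines.count()  // number of elements in each RDD of the stream
val merged = lines.union(lines)  // union of two DStreams
val upper = lines.transform(rdd => rdd.map(_.toUpperCase))  // arbitrary RDD-to-RDD function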

Window operations

Each time the window slides over the source DStream, the source RDDs that fall within the window are combined and operated on to produce the RDDs of the windowed DStream. In the original figure, the data of the last three time units is operated on, and the window slides by two time units.

Any window operation needs two parameters: the window length, i.e. the duration of the window (3 in the figure), and the sliding interval, i.e. the interval at which the window operation is performed (2 in the figure). Both parameters must be multiples of the batch interval of the source DStream (1 in the figure).

An example of a window operation: extend the previous example by generating word counts over the last 30 seconds of data every 10 seconds. To do this, apply the reduceByKey operation to the (word, 1) key-value DStream over the last 30 seconds of data, using the reduceByKeyAndWindow operation:

val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(10))

All sliding-window operations take the parameters windowLength (window length) and slideInterval (sliding interval). The common window operations are summarized below; their meanings correspond to the RDD transformations of the same name:

window: computes a new DStream based on the windowed batches of the source DStream.

countByWindow: returns a sliding-window count of the elements in the DStream.

reduceByWindow: returns a single-element stream. The function func is used to aggregate the elements of the stream over the sliding interval to create this single-element stream; func must be associative so that the computation can be done in parallel.

reduceByKeyAndWindow (three parameters): when applied to a DStream of key-value pairs, returns a new DStream of key-value pairs in which the value of each key is aggregated over the window with the given reduce function (func). Note: by default, this operator uses Spark's default number of concurrent tasks for grouping; a different number can be specified with the numTasks parameter.

reduceByKeyAndWindow (four parameters): a more efficient version of the three-parameter reduceByKeyAndWindow. The reduce value of each window is computed incrementally from the reduce value of the previous window: it applies the reduce operation to the new data entering the sliding window and an "inverse reduce" to the old data leaving the window. It can only be used with "invertible reduce functions", that is, reduce functions that have a corresponding "inverse reduce function" (passed as the invFunc argument).

countByValueAndWindow: when applied to a DStream of key-value pairs, returns a new DStream of key-value pairs in which the value of each key is its frequency within the sliding window.

updateStateByKey: when you need to maintain state across batches, you must use the updateStateByKey operation.

Multi-stream joins

Join operations are very useful in window computation. Different types of joins, including leftOuterJoin, rightOuterJoin, and fullOuterJoin, can easily be performed in Spark Streaming. The RDD generated by stream1 in each batch interval is joined with the RDD generated by stream2:

val stream1: DStream[(String, String)] = ...

val stream2: DStream[(String, String)] = ...

val joinedStream = stream1.join(stream2)

DStream output

Output operations allow DStream data to be pushed to external systems such as databases or files. Output operations trigger the actual execution of all the DStream transformations (similar to actions on RDDs) and allow external systems to consume the transformed data.

Among the output operations, dstream.foreachRDD is a powerful primitive.

DataFrame and SQL operations

You can easily use DataFrames and SQL operations on streaming data, but you must create the SparkSession with the SparkContext that the StreamingContext is using. The following example uses DataFrames and SQL to generate word counts: each RDD is converted to a DataFrame, registered as a temporary table, and queried with SQL:

val words: DStream[String] = ...

words.foreachRDD { rdd =>

val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

import spark.implicits._

val wordsDataFrame = rdd.toDF("word")

wordsDataFrame.createOrReplaceTempView("words")

val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")

wordCountsDataFrame.show()

}

Spark-shell stream processing

After entering spark-shell, a SparkContext (sc) is available by default, and a StreamingContext object can be created from it in spark-shell as follows:

scala> import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc, Seconds(1))

If you write a standalone Spark Streaming program instead of running it in spark-shell, you need to create the StreamingContext object as follows:

import org.apache.spark._

import org.apache.spark.streaming._

val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")

val ssc = new StreamingContext(conf, Seconds(1))

File streams

A file stream can read local files or files on HDFS. If Spark is deployed in on-yarn mode, starting spark-shell reads files under hdfs:xxxx/user/xx/ on HDFS by default.

scala> import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc, Seconds(5))

scala> val lines = ssc.textFileStream("hdfs://xxx/yzg_test.txt")

scala> val Counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

scala> Counts.saveAsTextFiles("hdfs://xxx/bendi")

scala> ssc.start()

scala> ssc.awaitTermination()

scala> ssc.stop()

After the above code runs in spark-shell, the files on HDFS are read and counted every 5 seconds and written to "bendi-timestamp" folders on HDFS until ssc.stop() is called; Counts.saveAsTextFiles("file://xxx/bendi") and Counts.print() write to the local file system and to standard output, respectively.

Socket streams

Spark Streaming can listen on a socket port and receive data for real-time computation. The steps are as follows:

The driver side creates a StreamingContext object; when the context starts, it creates a JobScheduler and a ReceiverTracker in turn and calls their start methods. In its start method the ReceiverTracker sends a start-receiver message to the remote Executor, and the message contains the address of the ServerSocket. On the executor side, the ReceiverTrackerEndpoint receives the message, extracts its content, creates a ReceiverRDD object using the SparkContext and the message content, and finally submits the RDD to the Spark cluster. In the code below, use nc -lk 9999 to open listening port 9999 on the host 172.22.241.184 and keep writing data to it; then use spark-shell to implement the listening side. The input source is a socket source; after a simple word-frequency count, the results are written to the HDFS file system:

scala> import org.apache.spark._

scala> import org.apache.spark.streaming._

scala> import org.apache.spark.storage.StorageLevel

scala> val ssc = new StreamingContext(sc, Seconds(5))

scala> val lines = ssc.socketTextStream("172.22.241.184", 9999, StorageLevel.MEMORY_AND_DISK_SER)

scala> val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

scala> wordCounts.saveAsTextFiles("hdfs://xxx/bendi-socket")

scala> ssc.start()

scala> ssc.awaitTermination()

scala> ssc.stop()

Kafka stream (window)

Advanced input sources such as Kafka and Flume depend on separate libraries (jar files). If you use spark-shell to read advanced input sources such as Kafka, you need to put the corresponding dependency jars under Spark's dependency folder lib.

According to the Kafka version currently in use, choose the matching spark-streaming-kafka dependency version and download it from the Maven repository: https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10/1.2.1

Place the corresponding dependency jars under CDH Spark's dependency folder lib, and verify the setup by importing the packages:

scala> import org.apache.spark._

scala> import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.kafka._

scala> val ssc = new StreamingContext(sc, Seconds(5))

scala> ssc.checkpoint("hdfs://usr/spark/kafka/checkpoint")

scala> val zkQuorum = "172.22.241.186:2181"

scala> val group = "test-consumer-group"

scala> val topics = "yzg_spark"

scala> val numThreads = 1

scala> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

scala> val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

scala> val pair = lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1))

scala> val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)

scala> wordCounts.print

scala> ssc.start

updateStateByKey operation

When Spark Streaming needs to maintain state across batches, the updateStateByKey operation must be used. Taking word-frequency statistics as an example, in a stateful transformation the word counts of the current batch are continuously accumulated on top of the word counts of previous batches, so the final word frequency is the total over all batches.

val updateFunc = (values: Seq[Int], state: Option[Int]) => {

val currentCount = values.foldLeft(0)(_ + _)

val previousCount = state.getOrElse(0)

Some(currentCount + previousCount)

}

The full implementation:

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.storage.StorageLevel

val ssc = new StreamingContext(sc, Seconds(5))

ssc.checkpoint("hdfs://172.22.241.184:8020/usr/spark/checkpoint")

val lines = ssc.socketTextStream("172.22.241.184", 9999, StorageLevel.MEMORY_AND_DISK_SER)

val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey[Int](updateFunc)

wordCounts.saveAsTextFiles("hdfs://172.22.241.184:8020/user/spark/bendi-socket")

ssc.start()

ssc.awaitTermination()

ssc.stop()

Streaming interaction with Kafka: creating a DStream

Regarding the SparkStreaming real-time computation framework reading data from Kafka in real time and then computing on it: since Spark 1.3, KafkaUtils has provided two ways to create a DStream, one is KafkaUtils.createStream and the other is KafkaUtils.createDirectStream.

KafkaUtils.createStream mode

Its signature is KafkaUtils.createStream(ssc, [zk], [consumer group id], [per-topic partitions]). It uses receivers to receive data with Kafka's high-level consumer API; all the data received by the receivers is saved in the Spark executors, and Spark Streaming then starts jobs to process the data. By default the data can be lost; the WAL can be enabled to synchronously save the received data to a distributed file system such as HDFS, so that the data can be recovered in the event of an error.

a. A receiver is created to pull data from Kafka periodically. The RDD partitions of ssc and the topic partitions of Kafka are not the same concept, so increasing the number of threads that consume a specific topic only increases the number of consuming threads inside one receiver; it does not increase Spark's parallelism in processing the data.

b. For different groups and topics, multiple receivers can be used to create different DStreams.

c. If WAL is enabled (spark.streaming.receiver.writeAheadLog.enable=true),

the storage level also needs to be set (default StorageLevel.MEMORY_AND_DISK_SER_2), namely KafkaUtils.createStream(… , StorageLevel.MEMORY_AND_DISK_SER).

KafkaUtils.createDirectStream mode

After Spark 1.3, Direct mode was introduced. Unlike Receiver mode, Direct mode has no receiver layer; it periodically fetches the latest offsets of each partition of each topic in Kafka and then processes each batch according to the configured maxRatePerPartition.

The advantages of this approach over the Receiver approach are:

Simplified parallelism: in the Receiver approach, multiple receivers are created and then unioned into one DStream to increase data-transfer parallelism. In Direct mode, the Kafka partitions are read in parallel with a one-to-one mapping to RDD partitions, and this mapping is easier to understand and optimize.

Efficiency: in the Receiver approach, achieving zero data loss requires storing the data in a Write Ahead Log, so two copies of the data are kept, one in Kafka and one in the log, which is wasteful. The Direct approach does not have this problem: as long as Kafka retains the data long enough, it can be recovered from Kafka.

Exactly-once semantics: the Receiver approach uses Kafka's high-level API to store the consumed offsets in Zookeeper, which is the traditional way to read data from Kafka, but it occasionally causes data to be consumed twice, because the data consumed by Spark Streaming can get out of sync with the offsets recorded in Zookeeper. The Direct approach uses the simple low-level Kafka API, and the offsets are recorded by Spark Streaming's checkpoints, which eliminates this inconsistency.

The disadvantage of this approach is that it does not update the offsets in Zookeeper, so Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed in each batch yourself and update Zookeeper manually.

Location strategies

According to the official API documentation (http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html), a location strategy controls on which executor a given topic partition is consumed, that is, how consumers are scheduled onto executors for topic partitions; the choice of location is relative. There are three location strategies:

1. PreferBrokers: prefer the Kafka brokers. This strategy can only be used when the Kafka brokers and the executors are on the same hosts.

2. PreferConsistent: prefer consistency. This is used most of the time: all partitions of the Kafka topics are distributed evenly across all available executors, which makes full use of the cluster's computing resources.

3. PreferFixed: prefer a fixed placement. If the load is unbalanced, this strategy can be used to place specified topic partitions on specific nodes. This is a manual-control scheme, and partitions without an explicit placement fall back to strategy (2).

Consumer strategies

A consumer strategy controls how consumer objects are created and configured, or which Kafka messages are consumed, for example partitions 0 and 1 of topic t1, or specific message segments of specific partitions. The class can be extended with your own implementation. A combined createDirectStream sketch follows the list below.

1. ConsumerStrategies.Assign: specify a fixed set of partitions:

def Assign[K, V](
  topicPartitions: Iterable[TopicPartition],
  kafkaParams: collection.Map[String, Object],
  offsets: collection.Map[TopicPartition, Long])

2. ConsumerStrategies.Subscribe: subscribe to a fixed collection of topics.

3. ConsumerStrategies.SubscribePattern: use a regular expression to specify the collection of topics of interest.
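A minimal sketch of createDirectStream with the 0-10 integration, combining PreferConsistent and Subscribe (the broker address, topic, and group id are placeholders taken from the earlier examples):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// consumer configuration passed straight through to the Kafka consumer
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "172.22.241.186:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("yzg_spark"), kafkaParams)
)

// each record is a ConsumerRecord; take the value and count words as before
stream.map(record => record.value).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()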

Spark Streaming development

IDEA is used as the development tool, with Maven for unified dependency management; configure the Scala development environment and develop against the Spark Streaming API.

Download and install IDEA, and add the Chinese language pack to lib; restart for it to take effect.

Import the offline Scala plug-in into IDEA: make sure Scala has been downloaded and installed on the current Windows host and the environment variables are set. First download the Scala plug-in for IDEA (do not unzip it), then add it in IDEA via Settings -> Plugins -> install plugin from disk.

Common IDEA shortcuts:

Shift pressed twice: search classes and plug-ins

Shift+Ctrl+Enter: complete the current line and automatically add the semicolon

Shift+Alt+S: open Settings

Alt+Enter: complete the thrown exception

Alt+Insert: automatically generate getters, setters, constructors, etc.

Ctrl+X: delete the current line

Ctrl+R: replace

Ctrl+/: line-comment multiple lines of code, one comment symbol per line

Ctrl+Shift+/: block-comment multiple lines of code, with comment symbols only at the beginning and end

New Maven project: File -> New -> Project -> Maven (select the quickstart archetype). GroupId and ArtifactId identify the Java project.

Maven automatically generates the pom.xml configuration file for declaring and importing dependencies. If you package with Maven, you need to introduce the Maven packaging plug-ins: use the maven-compiler-plugin and maven-jar-plugin plug-ins and add the configuration that specifies the program entry point in pom.xml. For details see https://blog.csdn.net/qq_17348297/article/details/79092383.

Set mainClass to HelloWorld (the main class), click maven -> package in the right-hand window to generate the jar package, and then submit and run the jar with the spark-submit command:

spark-submit --class "JSONRead" /usr/local/spark/mycode/json/target/scala-2.11/json-project_2.11-1.0.jar

If a "cannot find main class" error occurs, remove the --class xx option; if an "Invalid signature file digest for Manifest main" error occurs, run zip -d xxx.jar 'META-INF/*.SF' 'META-INF/*.RSA' 'META-INF/*SF' to delete the .SF/.RSA signature files from the jar. Check the YARN resource manager to see how the task is running.

Structured Streaming

In Spark 2.x, Spark introduced Structured Streaming, a stream-processing component for unbounded streams based on DataFrame. Previously, when using streaming, all the data of the current batch was processed at once. To do statistics such as pv/uv you had to rely on stateful DStreams or on external distributed caches such as Redis, it was very inconvenient to do streaming operations similar to group by, and for complex streaming scenarios it was hard to support aggregation logic based on event_time windows.

In Structured Streaming, a steady stream of data is "appended" or "updated" into an unbounded DataFrame through a fixed schema. The rest of the work is the same as with a normal DataFrame: you can map, filter, or groupBy().count(), and you can even join the streaming DataFrame with other "static" DataFrames. In addition, window-time-based streaming is also provided. In short, Structured Streaming provides fast, scalable, highly available, and highly reliable stream processing.

Structured Streaming is built on top of the Spark SQL engine and lets you express stream computation the same way you express computation on static data. As stream data flows in, the Spark SQL engine incrementally and continuously processes it and updates the results. The Dataset/DataFrame API can be used for streaming aggregations, event-time windows, stream-to-batch joins, and so on, and the computation runs on the same optimized Spark SQL engine. Through checkpointing and Write Ahead Logs, the system can guarantee end-to-end, exactly-once processing with fault tolerance.
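A minimal sketch of the canonical Structured Streaming word count, reading from a socket and printing the complete counts to the console (host and port are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._

// an unbounded DataFrame of lines arriving from the socket
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

// the same DataFrame/Dataset operations as on static data
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// the result table is updated incrementally as new data arrives
val query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()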

At this point, you should have a deeper understanding of the basic knowledge points of Spark; the best way to consolidate them is to try the examples in practice.
