Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What are the five ways to perform aggregation in Apache Spark

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

What this article shares to you is about what are the five ways to implement aggregation in Apache Spark. The editor thinks it is very practical, so I share it with you to learn. I hope you can get something after reading this article.

Aggregation is a widely used operator in data analysis tasks, and Spark provides a solid framework for this. Here are five different ways to aggregate for big data using Spark.

GroupByKey or ReduceByKey transformations on RDD: RDD is the earliest representation of distributed data collection in Spark, where the data is represented by any Java object of type "T". Aggregation on RDD is similar to the concept of reduce in the map-reduce framework, where the reduce function (which acts on two input records to generate aggregate records) is the key to aggregation. With RDD, aggregation can be performed through GroupByKey or ReduceByKey conversions, but these conversions are limited to Pair RDD (a collection of tuple objects, each consisting of a key object of type "K" and a value object of type "V").

In the case of aggregation through GroupByKey, the transformation results in a tuple object having a collection of key objects and all value objects for that key object. Therefore, you then need to apply a mapper (mapping transformation via map,maptoPair or mapPartitions) to reduce the collection of value objects for each Tuple object to an aggregate value object.

> Aggregation on a Pair RDD (with 2 partitions) via GroupByKey followed via either of map, maptopair

Mappers, such as map,maptoPair and mappartitions conversions, contain aggregate functions to reduce the collection of value objects of type "V" to aggregate objects of type "U". The aggregate function can be any function and does not need to follow the correlation or exchange properties. There are three styles of GroupByKey transformations, which differ in the partition specification of RDD due to the application of GroupByKey transformations. GroupByKey can be summarized as follows:

GroupByKey (PairRDD) = > PairRDD > Map (PairRDD >) = > PairRDD

If aggregation is done through ReduceByKey, the transformation directly results in tuple objects with key objects and aggregate objects for that key object. Like GroupByKey, there is no need for a mapper after ReduceByKey. The ReduceByKey transformation uses associated and exchangeable aggregate functions so that records in the same partition can be aggregated locally before they are aggregated across partitions. Similarly, the aggregate function accepts two value objects of type "V" and returns an object of type "V". Similar to GroupByKey, ReduceByKey transformations have three styles, and the difference between them is the RDD partition specification resulting from the application of ReduceByKey transformations. ReduceByKey can be summarized as follows:

ReduceByKey (PairRDD, Function) = > PairRDD

In GroupByKey and ReduceByKey, the former is more generic and can be used with any aggregate function, while the latter is more efficient, but only for the type of aggregate function described earlier.

Mappartitions on RDD or dataset: as mentioned in previous blogs, Mappartitions is one of the powerful narrow transformations that can be used on RDD and Dataset (data representation in Spark) to perform various operations wisely. One of such operations also includes aggregation. However, the only condition that needs to be met is that records belonging to the same grouping keyword should be in a single partition. This condition can be implicitly satisfied in a RDD or dataset (to be aggregated) implemented in a mixed operation involving a grouped key. Similarly, this condition can be explicitly achieved by first repartitioning the RDD or dataset based on the packet key.

Within a mappartitions for a typical aggregation flow, you must first instantiate a Hashmap to store the aggregated Value Objects corresponding to the Hashmap and the corresponding grouping key. This Hashmap is then updated repeatedly as you iterate over the data collection for the underlying partition. Finally, the iterator of the aggregate value / object (optional and associated grouping key) contained in the mapping is returned.

Because Mappartitions-based aggregation involves keeping Hashmap in memory to hold keys and aggregated Value objects, if a large number of unique grouping keys reside in the underlying partition, Hashmap will require a large amount of heap memory, which may result in the risk of out-of-memory termination of the corresponding executing program. Henceforth, grouped key distribution across partitions should not be distorted, otherwise executor memory will be wasted due to excessive provision of executor memory to handle skew. In addition, due to the need for an aggregate hash graph based on heap memory, there is more relative memory allocation to memory than the dedicated aggregation operator in Spark, but if memory is not a constraint, Mappartitions-based aggregation can provide a good performance boost.

UDAF for data frames or datasets: unlike the above approach, UDAF implements aggregation based on the concept of aggregation buffers and a set of methods running on this buffer.

> Aggregation buffer based aggregation flow in Spark (for Datasets and Dataframe)

By far, UDAF is by far the most common way to write aggregation logic for Dataframe or Dataset representations of distributed data collection in Spark. UDAF works on an untyped view of data collection, where the data record is treated as a row (of a table), and its schema defines the type and nullability of each column in that row. You can create a UDAF in Spark by extending the "UserDefinedAggregationFunction" class that exists in the package "org.apache.spark.sql.expressions" and overriding the implementation of the following methods in the base class:

/ * Return schema for input column (s) to the UDAF, schema being built using StructType*/ = > public StructType inputSchema () / * Return schema of aggregation buffer, schema being built using StructType*/ = > public StructType bufferSchema () / * DataType of final aggregation result*/ = > public DataType dataType () / * Initialize aggregation buffer*/ = > public void initialize (MutableAggregationBuffer buffer) / * Update aggregation buffer for each of the untyped view (Row) of an input object*/ = > public void update (MutableAggregationBuffer buffer) Row row) / * Update current aggregation buffer with a partially aggregated buffer*/ = > public void merge (MutableAggregationBuffer buffer, Row buffer) / * Evaluate final aggregation buffer and return the evaluated value of DataType declared earlier * / = > public Object evaluate (Row buffer)

In addition to overriding the above methods, you can always declare other fields (using optional initialization in the UDAF constructor) and customize other methods in the UDAF class so that they can be used in the override methods to achieve the aggregation goal.

Before you can use UDAF, you must register the same instance in the Spark framework:

Spark.udf.register ('sampleUDAF, new SampleUDAF ())

After registration, you can use UDAF in the Spark SQL query to aggregate the entire dataset / dataset or group of records in the dataset / dataset (grouped by one or more columns). In addition to being used directly in Spark SQL queries, you can also use UDAF through data box / dataset aggregation API (for example, "agg").

Although UDAF is a popular way to define custom aggregations, you can encounter performance problems when using complex data types (arrays or mappings) in the aggregation buffer. This is due to the fact that converting the scala data type (user-specific) to the corresponding catalyst data type (catalyst internal data type) (and vice versa) becomes very expensive for complex data types during each update operation in UDAF. From a memory and computing point of view, this cost is higher.

Aggregator for datasets: aggregators are the latest methods for aggregating datasets, similar to UDAF, which are also based on the concept of aggregate buffers and a set of methods that run on that buffer. However, the way aggregators aggregate is called typed aggregation because it involves manipulating various types of objects / using various types of objects. The input of the aggregator, the aggregate buffer, and the final aggregate output (derived from the buffer) are all certain types of objects with corresponding Spark encoders. Users can define their own custom Aggregator by extending the abstract generic 'Aggregator' class (provided in the package 'org.apache.spark.sql.expressions) with the type defined for IN (input record type), the type defined for BUF (aggregation buffer) and the type defined for OUT (output record type), and overriding the implementation of the following methods in the base class:

/ * return Encoder for aggregation buffer of type BUF. This is required for buffer ser/deser during shuffling or disk spilling * / = > public Encoder bufferEncoder () / * return Encoder for output object of type OUT after aggregation is performed * / = > public Encoder outputEncoder () / * return updated aggregation buffer object of type BUF after aggregating the existing buffer object of type BUF with the input object of type IN*/ = > public BUF reduce (BUF buffer, IN input) () / * return updated aggregation buffer of type BUF after merging two partially aggregated buffer objects of type BUF * / = > public BUF merge (BUF buffer1 BUF buffer2) / * return output object of type OUT from evaluation of aggregation buffer of type BUF * / = > public OUT finish (BUF arg0) / * return buffer object of type BUF after initializing the same * / = > public BUF zero ()

Because Aggregator natively supports aggregate buffers as objects, it is efficient and does not require unnecessary overhead (like UDAF) associated with converting from Scala types to catalyst types (and vice versa). Similarly, the aggregator's aggregation approach provides more flexibility and programmatic beauty when writing aggregation logic. Aggregators have also been integrated into typeless aggregation streams to support SQL, such as queries in upcoming releases.

Predefined aggregation capabilities: Spark provides a variety of pre-built aggregation capabilities that can be used for data boxes or dataset representations of distributed data collection. These pre-built functions can be used in SPARK SQL query expressions or with aggregate API defined for Dataframe or Dataset. In the org.apache.spark.sql package, all pre-built aggregate functions are defined as static methods of the function class. An underlined link lists all of these features.

Predefined aggregate functions are highly optimized and can be used directly with the Spark tungusten format in most cases. Therefore, if there are pre-built aggregate functions in the "functions" class, Spark programmers should always prefer to use them. In case there is no required aggregate function, only one can resort to writing a custom aggregate function.

These are the five ways to perform aggregation in Apache Spark. The editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report