Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Detailed explanation of Hive UDAF Development

2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Description

This article is a lax translation from Hadoop Hive UDAF Tutorial-Extending Hive with Aggregation Functions: because the examples of the translated articles are easy to understand. In addition, I interspersed my UDAF understanding of Hive into the article.

Udfa is a user-defined aggregation function in Hive. The built-in UDAF function of hive includes sum () and count (). There are simple and general ways to implement UDAF. Simple UDAF has lost performance due to the use of Java reflection, and some features can not be used. In this blog, we will focus on the custom clustering function in Hive-GenericUDAF,UDAF development mainly involves the following two abstract classes:

[java] view plain copy

Org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver

Org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

Source code link

All the code and data in the blog post can be found at the following link: hive examples

Sample data preparation

Start by creating a table containing the sample data: people, which has only one column name, which contains one or more names, and the table data is saved in the people.txt file.

[plain] view plain copy

~ $cat. / people.txt

John Smith

John and Ann White

Ted Green

Dorothy

Upload the file to the hdfs directory / user/matthew/people:

[plain] view plain copy

Hadoop fs-mkdir people

Hadoop fs-put. / people.txt people

Next, you will create an external table for hive, which will be executed in hive shell

[sql] view plain copy

CREATE EXTERNAL TABLE people (name string)

ROW FORMAT DELIMITED FIELDS

TERMINATED BY'\ t'

ESCAPED BY''

LINES TERMINATED BY'\ n'

STORED AS TEXTFILE

LOCATION'/ user/matthew/people'

Introduction to related abstract classes

To create a GenericUDAF, you must first understand the following two abstract classes:

[java] view plain copy

Org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver

[java] view plain copy

Org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

In order to better understand the API of the above abstract class, remember that hive is just a mapreduce function, but hive has helped us write and hide mapreduce and provide a concise sql function, so we need to combine Mapper, Combiner and Reducer to help us understand this function. Keep in mind that there are several machines in the Hadoop cluster, and the Mapper and Reducer tasks run independently on different machines.

So in general, the UDAF function reads the data (mapper), aggregates a bunch of mapper output to a partial aggregate result (combiner), and finally creates a final aggregate result (reducer). Because we aggregate across multiple combiner of a domain, we need to save some of the aggregation results.

AbstractGenericUDAFResolver

Resolver is simple to override and implement the following method, which specifies which Evaluator is called for processing according to the parameter data format of the sql descendant.

[java] view plain copy

Public GenericUDAFEvaluator getEvaluator (TypeInfo [] parameters) throws SemanticException

GenericUDAFEvaluator

UDAF logic processing mainly occurs in Evaluator, and several methods of this abstract class are to be implemented.

Before you can understand Evaluator, you must understand the objectInspector interface and the inner class Model in GenericUDAFEvaluator.

ObjectInspector

The main function is to decouple the use of data and data format, make the data stream switch between different input and output formats on the input and output side, and use different formats on different Operator. You can refer to these two articles: first post on Hive UDFs, the role of ObjectInspector in Hive, which has an introduction to objectinspector.

Model

Model represents the various stages of UDAF in mapreduce.

[java] view plain copy

Public static enum Mode {

/ * *

* PARTIAL1: this is the map phase of mapreduce: from raw data to partial data aggregation

* iterate () and terminatePartial () will be called

, /

PARTIAL1

/ * *

* PARTIAL2: this is the Combiner phase on the map side of mapreduce, which is responsible for merging map data on the map side:: from partial data aggregation to partial data aggregation:

* merge () and terminatePartial () will be called

, /

PARTIAL2

/ * *

* FINAL: reduce phase of mapreduce: from partial data aggregation to full aggregation

* merge () and terminate () will be called

, /

FINAL

/ * *

* COMPLETE: if this stage occurs, it means that the mapreduce has only map and no reduce, so the result will be directly obtained on the map side: from the original data directly to the full aggregation.

* iterate () and terminate () will be called

, /

COMPLETE

}

Generally speaking, the complete UDAF logic is a mapreduce process. If you have mapper and reducer, you will go through PARTIAL1 (mapper), FINAL (reducer), and if you have combiner, you will go through PARTIAL1 (mapper), PARTIAL2 (combiner), FINAL (reducer).

In some cases of mapreduce, there is only mapper, but not reducer, so there is only the COMPLETE phase, which directly enters the original data and produces the result.

GenericUDAFEvaluator's method

[java] view plain copy

/ / determine the data format ObjectInspectors for the input and output parameters of each stage

Public ObjectInspector init (Mode m, ObjectInspector [] parameters) throws HiveException

/ / the class that saves the results of data aggregation

Abstract AggregationBuffer getNewAggregationBuffer () throws HiveException

/ / reset the aggregation result

Public void reset (AggregationBuffer agg) throws HiveException

/ / map phase, iteratively processes the column data passed by the input sql

Public void iterate (AggregationBuffer agg, Object [] parameters) throws HiveException

/ / the result is returned when map and combiner are finished, and some data aggregation results are obtained.

Public Object terminatePartial (AggregationBuffer agg) throws HiveException

/ / combiner merges the results returned by map, and the results returned by reducer merging mapper or combiner.

Public void merge (AggregationBuffer agg, Object partial) throws HiveException

/ / reducer stage, output the final result

Public Object terminate (AggregationBuffer agg) throws HiveException

Illustrating the relationship between Model and Evaluator

Each stage of Model corresponds to the call of Evaluator method

Deal with mapreduce process in each stage of Evaluator

Example

Here is an example of the aggregate function UDAF, where we will calculate the number of name column letters in the people table.

The following function code calculates the total number of characters in the specified column (including spaces)

Code

[java] view plain copy

@ Description (name = "letters", value = "_ FUNC_ (expr)-returns the total number of characters of all strings in the column")

Public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

@ Override

Public GenericUDAFEvaluator getEvaluator (TypeInfo [] parameters)

Throws SemanticException {

If (parameters.length! = 1) {

Throw new UDFArgumentTypeException (parameters.length-1)

"Exactly one argument is expected."

}

ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo (parameters [0])

If (oi.getCategory ()! = ObjectInspector.Category.PRIMITIVE) {

Throw new UDFArgumentTypeException (0

"Argument must be PRIMITIVE, but"

+ oi.getCategory () .name ()

+ "was passed.")

}

PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi

If (inputOI.getPrimitiveCategory ()! = PrimitiveObjectInspector.PrimitiveCategory.STRING) {

Throw new UDFArgumentTypeException (0

"Argument must be String, but"

+ inputOI.getPrimitiveCategory () .name ()

+ "was passed.")

}

Return new TotalNumOfLettersEvaluator ()

}

Public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

PrimitiveObjectInspector inputOI

ObjectInspector outputOI

PrimitiveObjectInspector integerOI

Int total = 0

@ Override

Public ObjectInspector init (Mode m, ObjectInspector [] parameters)

Throws HiveException {

Assert (parameters.length = = 1)

Super.init (m, parameters)

/ / the sql column is read in the map phase, and the input is in String basic data format

If (m = = Mode.PARTIAL1 | | m = = Mode.COMPLETE) {

InputOI = (PrimitiveObjectInspector) parameters [0]

} else {

/ / for the rest of the phase, input is in Integer basic data format

IntegerOI = (PrimitiveObjectInspector) parameters [0]

}

/ / specify that the output data format of each stage is of Integer type

OutputOI = ObjectInspectorFactory.getReflectionObjectInspector (Integer.class

ObjectInspectorOptions.JAVA)

Return outputOI

}

/ * *

* the class that stores the current total number of characters

, /

Static class LetterSumAgg implements AggregationBuffer {

Int sum = 0

Void add (int num) {

Sum + = num

}

}

@ Override

Public AggregationBuffer getNewAggregationBuffer () throws HiveException {

LetterSumAgg result = new LetterSumAgg ()

Return result

}

@ Override

Public void reset (AggregationBuffer agg) throws HiveException {

LetterSumAgg myagg = new LetterSumAgg ()

}

Private boolean warned = false

@ Override

Public void iterate (AggregationBuffer agg, Object [] parameters)

Throws HiveException {

Assert (parameters.length = = 1)

If (parameters [0]! = null) {

LetterSumAgg myagg = (LetterSumAgg) agg

Object p1 = ((PrimitiveObjectInspector) inputOI) .getPrivveJavaObject (parameters [0])

Myagg.add (String.valueOf (p1). Length ())

}

}

@ Override

Public Object terminatePartial (AggregationBuffer agg) throws HiveException {

LetterSumAgg myagg = (LetterSumAgg) agg

Total + = myagg.sum

Return total

}

@ Override

Public void merge (AggregationBuffer agg, Object partial)

Throws HiveException {

If (partial! = null) {

LetterSumAgg myagg1 = (LetterSumAgg) agg

Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject (partial)

LetterSumAgg myagg2 = new LetterSumAgg ()

Myagg2.add (partialSum)

Myagg1.add (myagg2.sum)

}

}

@ Override

Public Object terminate (AggregationBuffer agg) throws HiveException {

LetterSumAgg myagg = (LetterSumAgg) agg

Total = myagg.sum

Return myagg.sum

}

}

}

Code description

Here are some resources about combiner, and Philippe Adjiman speaks well.

AggregationBuffer allows us to save intermediate results, and by defining our buffer, we can process data in any format, and in the code example the total number of characters is saved in AggregationBuffer.

[java] view plain copy

/ * *

* A class that saves the current total number of characters

, /

Static class LetterSumAgg implements AggregationBuffer {

Int sum = 0

Void add (int num) {

Sum + = num

}

}

This means that UDAF receives different inputs at different mapreduce stages. Iterate reads a row in our table (or the table to be exact) and then outputs the aggregate results in other data formats.

ArtialAggregation merges these aggregation results into new aggregation results in the same format, and then the final reducer takes these aggregation results and outputs the final result (which may not be consistent with the format of the received data).

In the init () method, we specify that the input is string, the result output format is integer, and the partial aggregate output format is integer (saved in aggregation buffer); both terminate () and terminatePartial () output an integer.

[java] view plain copy

/ / the output data format objectinspector is specified according to different mode in the init method.

If (m = = Mode.PARTIAL1 | | m = = Mode.COMPLETE) {

InputOI = (PrimitiveObjectInspector) parameters [0]

} else {

IntegerOI = (PrimitiveObjectInspector) parameters [0]

}

/ / output data formats for different model stages

OutputOI = ObjectInspectorFactory.getReflectionObjectInspector (Integer.class

ObjectInspectorOptions.JAVA)

The iterate () function reads the string of columns in each row, calculates and saves the length of the string

[java] view plain copy

Public void iterate (AggregationBuffer agg, Object [] parameters)

Throws HiveException {

...

Object p1 = ((PrimitiveObjectInspector) inputOI) .getPrivveJavaObject (parameters [0])

Myagg.add (String.valueOf (p1). Length ())

}

}

The Merge function increases the total number of partial aggregations to AggregationBuffer

[java] view plain copy

Public void merge (AggregationBuffer agg, Object partial)

Throws HiveException {

If (partial! = null) {

LetterSumAgg myagg1 = (LetterSumAgg) agg

Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject (partial)

LetterSumAgg myagg2 = new LetterSumAgg ()

Myagg2.add (partialSum)

Myagg1.add (myagg2.sum)

}

}

The Terminate () function returns the contents of the AggregationBuffer, where the final result is produced.

[java] view plain copy

Public Object terminate (AggregationBuffer agg) throws HiveException {

LetterSumAgg myagg = (LetterSumAgg) agg

Total = myagg.sum

Return myagg.sum

}

Use custom functions

[plain] view plain copy

ADD JAR. / hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar

CREATE TEMPORARY FUNCTION letters as' com.matthewrathbone.example.TotalNumOfLettersGenericUDAF'

SELECT letters (name) FROM people

OK

forty-four

Time taken: 20.688 seconds

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report