2025-03-26 Update From: SLTechnology News&Howtos
Shulou (Shulou.com) 06/03 Report

Description
This article is a loose translation of Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions, chosen because its examples are easy to follow. I have also interspersed my own understanding of Hive UDAFs throughout the article.
A UDAF is a user-defined aggregation function in Hive; built-in UDAFs include sum() and count(). There are two ways to implement a UDAF: simple and generic. A simple UDAF loses performance because it relies on Java reflection, and some features are unavailable to it. In this post we focus on the generic custom aggregation function in Hive, GenericUDAF. UDAF development mainly involves the following two abstract classes:
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
Source code link
All the code and data in the blog post can be found at the following link: hive examples
Sample data preparation
Start by creating a table named people that holds the sample data. It has a single column, name, which contains one or more names per row; the table data is stored in the file people.txt.
~$ cat ./people.txt
John Smith
John and Ann White
Ted Green
Dorothy
Upload the file to the HDFS directory /user/matthew/people:
hadoop fs -mkdir people
hadoop fs -put ./people.txt people
Next, create an external Hive table for the data; run the following in the hive shell:
CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED FIELDS
  TERMINATED BY '\t'
  ESCAPED BY ''
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/matthew/people';
Introduction to related abstract classes
To create a GenericUDAF, you must first understand the following two abstract classes:
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
To better understand this API, remember that Hive essentially compiles queries to MapReduce: it writes and hides the MapReduce plumbing for us and exposes a concise SQL interface. So it helps to think in terms of Mapper, Combiner and Reducer. Also keep in mind that a Hadoop cluster has many machines, and Mapper and Reducer tasks run independently on different ones.
In general, a UDAF reads raw rows (mapper), aggregates a batch of mapper outputs into a partial aggregate (combiner), and finally produces the final aggregate (reducer). Because we aggregate across multiple combiners, we must be able to save and exchange partial aggregation results.
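The data flow above can be sketched without any Hive classes at all. The following is a plain-Java analogy (not Hive's API): each "mapper" reduces its split of rows to a partial character count, and the "reducer" merges the partials into the final answer.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy of the UDAF data flow (not Hive's API):
// each "mapper" aggregates its split into a partial result,
// and the "reducer" merges the partials into the final answer.
public class PartialAggregationSketch {

    // mapper side: aggregate one split of rows into a partial character count
    static int partialCharCount(List<String> split) {
        return split.stream().mapToInt(String::length).sum();
    }

    // reducer side: merge the partial results
    static int merge(int... partials) {
        return Arrays.stream(partials).sum();
    }

    public static void main(String[] args) {
        int p1 = partialCharCount(Arrays.asList("John Smith", "John and Ann White")); // 10 + 18
        int p2 = partialCharCount(Arrays.asList("Ted Green", "Dorothy"));             // 9 + 7
        System.out.println(merge(p1, p2)); // 44
    }
}
```

The partial results must be serializable values that can travel from mappers to the reducer, which is exactly the role terminatePartial() and merge() play in a real UDAF.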
AbstractGenericUDAFResolver
The Resolver is simple to implement: override the following method, which selects the Evaluator to use based on the argument types passed in from the SQL call.
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;
GenericUDAFEvaluator
The UDAF's logic lives in the Evaluator; several methods of this abstract class must be implemented.
Before diving into the Evaluator, you need to understand the ObjectInspector interface and GenericUDAFEvaluator's inner enum Mode.
ObjectInspector
Its main purpose is to decouple the use of data from its physical format, so a data stream can switch between different input and output formats, and different Operators can work with different formats. For background, see these two articles: "first post on Hive UDFs" and "the role of ObjectInspector in Hive", both of which introduce ObjectInspector.
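To see why this decoupling matters, here is a minimal plain-Java analogy (hypothetical, not Hive's ObjectInspector API): the aggregation logic asks an "inspector" for a plain Java value instead of assuming a wire format, so the same code can consume a raw string or a boxed partial count.

```java
// Minimal analogy of the inspector idea (NOT Hive's ObjectInspector API):
// the logic asks the inspector for a plain Java value instead of
// hard-coding how the incoming object is represented.
public class InspectorSketch {

    interface Inspector {
        Object getJavaObject(Object wrapped);
    }

    // one input arrives as a raw string, another as a boxed partial count
    static final Inspector STRING_OI = wrapped -> wrapped.toString();
    static final Inspector INT_OI = wrapped -> (Integer) wrapped;

    // the same logic works against either representation
    static int lengthOf(Inspector oi, Object value) {
        return String.valueOf(oi.getJavaObject(value)).length();
    }

    public static void main(String[] args) {
        System.out.println(lengthOf(STRING_OI, "Dorothy")); // 7
        System.out.println(lengthOf(INT_OI, 44));           // 2
    }
}
```

In real Hive code the inspectors additionally describe type categories and are chosen per execution stage, but the decoupling idea is the same.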
Mode
Mode represents the stage of the MapReduce pipeline in which the UDAF is currently running.
public static enum Mode {
    /**
     * PARTIAL1: the map phase of MapReduce: from raw data to a partial aggregate.
     * iterate() and terminatePartial() will be called.
     */
    PARTIAL1,
    /**
     * PARTIAL2: the map-side Combiner phase of MapReduce, which merges map output
     * on the map side: from a partial aggregate to a partial aggregate.
     * merge() and terminatePartial() will be called.
     */
    PARTIAL2,
    /**
     * FINAL: the reduce phase of MapReduce: from partial aggregates to the full aggregate.
     * merge() and terminate() will be called.
     */
    FINAL,
    /**
     * COMPLETE: if this phase occurs, the job is map-only, so the result is produced
     * directly on the map side: from raw data straight to the full aggregate.
     * iterate() and terminate() will be called.
     */
    COMPLETE
}
Generally speaking, a complete UDAF run is one MapReduce pass. With a mapper and a reducer it goes through PARTIAL1 (mapper) and FINAL (reducer); with a combiner as well, it goes through PARTIAL1 (mapper), PARTIAL2 (combiner) and FINAL (reducer).
In some map-only MapReduce jobs there is no reducer, so only the COMPLETE phase runs: raw data goes in and the final result comes out directly.
GenericUDAFEvaluator's methods
// determine the input and output ObjectInspectors for each stage
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
// return the class that holds the intermediate aggregation result
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
// reset the aggregation result
public void reset(AggregationBuffer agg) throws HiveException;
// map phase: iterate over the column values passed in from SQL
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
// called when the map or combiner stage finishes; returns the partial aggregation result
public Object terminatePartial(AggregationBuffer agg) throws HiveException;
// the combiner merges map output; the reducer merges mapper or combiner output
public void merge(AggregationBuffer agg, Object partial) throws HiveException;
// reduce phase: output the final result
public Object terminate(AggregationBuffer agg) throws HiveException;
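The method list above can be exercised end to end with a simplified, Hive-free evaluator. The sketch below (plain Java, hypothetical names) simulates the PARTIAL1 -> FINAL call sequence for the character-count aggregation: each "mapper" gets its own buffer, iterate() fills it, terminatePartial() ships the partial result, and the "reducer" buffer absorbs it via merge() before terminate() emits the answer.

```java
// Simulates the call sequence of the evaluator methods across
// PARTIAL1 (mapper) and FINAL (reducer) phases, without Hive classes.
public class EvaluatorLifecycleSketch {

    static class Buffer {
        int sum = 0;
    }

    static void iterate(Buffer agg, String row) { agg.sum += row.length(); } // PARTIAL1 / COMPLETE
    static int terminatePartial(Buffer agg)     { return agg.sum; }          // PARTIAL1 / PARTIAL2
    static void merge(Buffer agg, int partial)  { agg.sum += partial; }      // PARTIAL2 / FINAL
    static int terminate(Buffer agg)            { return agg.sum; }          // FINAL / COMPLETE

    static int run(String[][] splits) {
        Buffer finalAgg = new Buffer();                // reducer-side buffer
        for (String[] split : splits) {                // one "mapper" per split
            Buffer mapAgg = new Buffer();
            for (String row : split) iterate(mapAgg, row);
            merge(finalAgg, terminatePartial(mapAgg)); // partial shipped to the reducer
        }
        return terminate(finalAgg);
    }

    public static void main(String[] args) {
        String[][] splits = {
            {"John Smith", "John and Ann White"},
            {"Ted Green", "Dorothy"}
        };
        System.out.println(run(splits)); // 44
    }
}
```

Note how each phase only touches its own buffer; Hive supplies the buffers and drives the calls, which is why the real evaluator methods all take an AggregationBuffer argument rather than keeping state in fields.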
Illustrating the relationship between Mode and Evaluator
Each Mode stage corresponds to a particular sequence of Evaluator method calls, and together the Evaluator stages implement the MapReduce pipeline.
Example
Here is a UDAF example: we will count the characters in the name column of the people table.
The following code computes the total number of characters (including spaces) in the specified column.
Code
@Description(name = "letters", value = "_FUNC_(expr) - returns the total number of characters of all strings in the column")
public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly one argument is expected.");
        }

        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

        if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be PRIMITIVE, but "
                    + oi.getCategory().name()
                    + " was passed.");
        }

        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;

        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be String, but "
                    + inputOI.getPrimitiveCategory().name()
                    + " was passed.");
        }

        return new TotalNumOfLettersEvaluator();
    }
    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;
        ObjectInspector outputOI;
        PrimitiveObjectInspector integerOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);

            // in the map phase the SQL column is read, so the input is a String
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                // in the remaining phases the input is an Integer partial aggregate
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }

            // every stage outputs an Integer
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorOptions.JAVA);
            return outputOI;
        }

        /**
         * The class that stores the running total number of characters.
         */
        static class LetterSumAgg implements AggregationBuffer {
            int sum = 0;

            void add(int num) {
                sum += num;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new LetterSumAgg();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((LetterSumAgg) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                myagg.add(String.valueOf(p1).length());
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            return myagg.sum;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                myagg.add(partialSum);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            return myagg.sum;
        }
    }
}
Code description
There are good resources on combiners elsewhere; Philippe Adjiman's writeup explains them well.
AggregationBuffer lets us save intermediate results. By defining our own buffer we can store data in whatever shape we need; in this example the running character count is kept in the AggregationBuffer.
/**
 * The class that stores the running total number of characters.
 */
static class LetterSumAgg implements AggregationBuffer {
    int sum = 0;

    void add(int num) {
        sum += num;
    }
}
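Because the buffer is just a class we define, it can carry any intermediate state, not only a single number. As a hypothetical illustration (not part of the tutorial's code): averaging string lengths would need a buffer holding both a sum and a count, since an average of averages is not mergeable. In plain Java the idea looks like this:

```java
// Hypothetical buffer for averaging string lengths (not in the tutorial):
// an average cannot be merged from two averages, so the buffer must
// carry the sum and the count separately until the final result.
public class AvgLengthBuffer {
    int sum = 0;
    int count = 0;

    void add(int len) {
        sum += len;
        count++;
    }

    // merging two partial buffers just adds their components
    void merge(AvgLengthBuffer other) {
        sum += other.sum;
        count += other.count;
    }

    double result() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    // convenience helper for demonstration
    static double avgOf(int... lens) {
        AvgLengthBuffer b = new AvgLengthBuffer();
        for (int len : lens) b.add(len);
        return b.result();
    }

    public static void main(String[] args) {
        System.out.println(avgOf(9, 7)); // average of "Ted Green" (9) and "Dorothy" (7)
    }
}
```

In a real UDAF the partial state (here sum and count) would be what terminatePartial() emits and merge() consumes.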
This matters because the UDAF receives different inputs at different MapReduce stages. iterate() reads a row of our table (a value of the column, to be exact) and outputs an aggregate in some other data format.
terminatePartial() then hands over partial aggregates, which merge() combines into new aggregates of the same format; finally the reducer takes these aggregates and terminate() outputs the final result (whose format need not match that of the input data).
In the init() method we specify that the input is a string, the partial aggregate output is an integer (held in the aggregation buffer), and the final output is an integer; both terminate() and terminatePartial() return an integer.
// in init(), the input ObjectInspector is chosen according to the Mode
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
    inputOI = (PrimitiveObjectInspector) parameters[0];
} else {
    integerOI = (PrimitiveObjectInspector) parameters[0];
}
// the output data format is the same for every stage
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
        ObjectInspectorOptions.JAVA);
The iterate() function reads each row's column string and adds its length to the buffer:
public void iterate(AggregationBuffer agg, Object[] parameters)
        throws HiveException {
    ...
    Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
    myagg.add(String.valueOf(p1).length());
}
The merge() function adds a partial aggregate's total into the AggregationBuffer:
public void merge(AggregationBuffer agg, Object partial)
        throws HiveException {
    if (partial != null) {
        LetterSumAgg myagg = (LetterSumAgg) agg;
        Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
        myagg.add(partialSum);
    }
}
The terminate() function returns the contents of the AggregationBuffer; this is where the final result is produced:
public Object terminate(AggregationBuffer agg) throws HiveException {
    LetterSumAgg myagg = (LetterSumAgg) agg;
    return myagg.sum;
}
Use custom functions
ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION letters AS 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';

SELECT letters(name) FROM people;
OK
44
Time taken: 20.688 seconds
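As a sanity check, the expected result can be reproduced off-line in plain Java by summing the length of each line of people.txt:

```java
import java.util.stream.Stream;

// Reproduces the query result off-line: total characters (including spaces)
// across the four names in people.txt.
public class LettersCheck {

    static int totalLetters(Stream<String> names) {
        return names.mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        System.out.println(totalLetters(
                Stream.of("John Smith", "John and Ann White", "Ted Green", "Dorothy"))); // 44
    }
}
```

10 + 18 + 9 + 7 = 44, matching the query output above.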