Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Spark-sql custom function

2025-03-10 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

(1) Custom UDFobject SparkSqlTest {def main (args: Array [String]): Unit = {/ / screen redundant logs Logger.getLogger ("org.apache.hadoop") .setLevel (Level.WARN) Logger.getLogger ("org.apache.spark") .setLevel (Level.WARN) Logger.getLogger ("org.project-spark") .setLevel (Level.WARN) / / build the programming entry Val conf: SparkConf = new SparkConf () conf.setAppName ("SparkSqlTest") .setmaster ("local [2]") val spark: SparkSession = SparkSession.builder () .config (conf) .getOrCreate () / create sqlcontext object val sqlContext: SQLContext = spark.sqlContext / * register the defined UDF: * generic [Int] here String] * the first is the return value type It can be followed by one or more Is the method parameter type * / sqlContext.udf.register [Int,String] ("strLen", strLen) val sql= "" | select strLen ("zhangsan") "" .stripMargin spark.sql (sql). Show ()} / / Custom UDF method def strLen (str:String): Integer= {str.length}} (2) Custom UDAF

The example here is to implement a count:

Customize the UDAF class:

Class MyCountUDAF extends UserDefinedAggregateFunction {/ / the data type entered by the UDAF override def inputSchema: StructType = {StructType (List (StructField ("age", DataTypes.IntegerType))} / / the data type aggregated in the UDAF override def bufferSchema: StructType = {StructType (StructField ("age")) DataTypes.IntegerType))} / / the data type of the UDAF output override def dataType: DataType = DataTypes.IntegerType / / deterministic judgment Usually, the types of specific inputs and outputs are the same override def deterministic: Boolean = true / / buffer: Buffer override def initialize (buffer: MutableAggregationBuffer): Unit = {buffer.update (0line 0)} / * data aggregation merge in partition * @ param buffer: this is the temporary buffer that we declare initialized in the initialize method. Zone * @ param input: the new value passed in by the aggregation operation * / override def update (buffer: MutableAggregationBuffer Input: Row): Unit = {val oldValue=buffer.getInt (0) buffer.update (0jdValueValue1)} / * aggregation between partitions * @ param buffer1: temporary result of partition-aggregation * @ param buffer2 Temporary result of partition dimerization * / override def merge (buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {val p1=buffer1.getInt (0) val p2=buffer2.getInt (0) buffer1.update (0rec p1aggregp2)} / / the final output value of the aggregate function override def evaluate (buffer: Row): Any = {buffer.get (0)}}

Call:

Object SparkSqlTest {def main (args: Array [String]): Unit = {/ / screen redundant logs Logger.getLogger ("org.apache.hadoop") .setLevel (Level.WARN) Logger.getLogger ("org.apache.spark") .setLevel (Level.WARN) Logger.getLogger ("org.project-spark") .setLevel (Level.WARN) / / build programming entry val conf: SparkConf = new SparkConf () conf.setAppName ("SparkSqlTest") .setMaster ("local [2]") .set ("spark.serializer" "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses (Array) val spark: SparkSession = SparkSession.builder (). Config (conf) .getOrCreate () / create the sqlcontext object val sqlContext: SQLContext = spark.sqlContext / / register UDAF sqlContext.udf.register ("myCount", new MyCountUDAF ()) val stuList = List (new Student ("xx") 18), new Student ("Wu xx", 18), new Student ("Qi xx", 18), new Student ("Wang xx", 19), new Student ("Xue xx" 19) import spark.implicits._ val stuDS: Dataset [Student] = sqlContext.createDataset (stuList) stuDS.createTempView ("student") val sql= "" | select myCount (1) counts | from student | group by age | order by counts "" .stripMargin spark.sql (sql) ). Show ()}} case class Student (name:String Age:Int)

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report