2025-03-10 Update From: SLTechnology News&Howtos
Shulou (Shulou.com) 06/03 Report --
(1) Custom UDF

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SQLContext, SparkSession}

    object SparkSqlTest {
      def main(args: Array[String]): Unit = {
        // suppress redundant logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest").setMaster("local[2]")
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        // create the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        /* Register the UDF we defined:
         * in the type parameters [Int, String], the first is the return type;
         * it can be followed by one or more method parameter types.
         */
        sqlContext.udf.register[Int, String]("strLen", strLen)
        val sql =
          """
            |select strLen("zhangsan")
          """.stripMargin
        spark.sql(sql).show()
      }

      // custom UDF method
      def strLen(str: String): Int = {
        str.length
      }
    }

(2) Custom UDAF
The example here implements a count:
Define the custom UDAF class:
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

    class MyCountUDAF extends UserDefinedAggregateFunction {
      // input data type of the UDAF
      override def inputSchema: StructType = {
        StructType(List(StructField("age", DataTypes.IntegerType)))
      }
      // data type of the aggregation buffer
      override def bufferSchema: StructType = {
        StructType(List(StructField("age", DataTypes.IntegerType)))
      }
      // output data type of the UDAF
      override def dataType: DataType = DataTypes.IntegerType
      // deterministic: the same input always produces the same output
      override def deterministic: Boolean = true
      // initialize the aggregation buffer
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer.update(0, 0)
      }
      /* Aggregate data within a partition.
       * @param buffer the temporary buffer initialized in the initialize method
       * @param input  the new row passed in by the aggregation
       */
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val oldValue = buffer.getInt(0)
        buffer.update(0, oldValue + 1)
      }
      /* Aggregate between partitions.
       * @param buffer1 temporary result of one partition's aggregation
       * @param buffer2 temporary result of another partition's aggregation
       */
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val p1 = buffer1.getInt(0)
        val p2 = buffer2.getInt(0)
        buffer1.update(0, p1 + p2)
      }
      // final output value of the aggregate function
      override def evaluate(buffer: Row): Any = {
        buffer.get(0)
      }
    }
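The contract Spark expects from this class can be illustrated without a Spark session: initialize creates a per-partition buffer, update folds each input row into it, merge combines partition buffers, and evaluate reads out the result. A minimal plain-Scala sketch of that lifecycle for the count above (the buffer is modeled as a single Int; the two "partitions" are an assumption for illustration):

```scala
// Model the UDAF buffer as a single Int counter.
def initialize(): Int = 0                  // zero the buffer
def update(buffer: Int): Int = buffer + 1  // called once per input row
def merge(b1: Int, b2: Int): Int = b1 + b2 // combine partition buffers
def evaluate(buffer: Int): Int = buffer    // read out the final result

// Two hypothetical partitions holding 3 and 2 rows respectively:
val part1 = (1 to 3).foldLeft(initialize())((b, _) => update(b))
val part2 = (1 to 2).foldLeft(initialize())((b, _) => update(b))
println(evaluate(merge(part1, part2))) // prints 5
```

This mirrors why update only ever sees one partition's buffer while merge must combine two: Spark runs update in parallel per partition, then merges the partial results.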
Call:
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}

    object SparkSqlTest {
      def main(args: Array[String]): Unit = {
        // suppress redundant logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // build the programming entry point
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
          .setMaster("local[2]")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
        // create the SQLContext object
        val sqlContext: SQLContext = spark.sqlContext
        // register the UDAF
        sqlContext.udf.register("myCount", new MyCountUDAF())
        val stuList = List(
          Student("xx", 18),
          Student("Wu xx", 18),
          Student("Qi xx", 18),
          Student("Wang xx", 19),
          Student("Xue xx", 19)
        )
        import spark.implicits._
        val stuDS: Dataset[Student] = sqlContext.createDataset(stuList)
        stuDS.createTempView("student")
        val sql =
          """
            |select myCount(1) counts
            |from student
            |group by age
            |order by counts
          """.stripMargin
        spark.sql(sql).show()
      }
    }

    case class Student(name: String, age: Int)
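Given the five students above (three aged 18, two aged 19), the query groups rows by age and counts each group, so it should produce the counts 2 and 3, ordered ascending. The same grouping can be sketched in plain Scala to check the expectation (the tuple list is a stand-in for the student view):

```scala
// (name, age) pairs mirroring the rows registered in the "student" view.
val students = List(("xx", 18), ("Wu xx", 18), ("Qi xx", 18), ("Wang xx", 19), ("Xue xx", 19))

// Equivalent of: select myCount(1) counts from student group by age order by counts
val counts = students.groupBy(_._2).map { case (_, rows) => rows.size }.toList.sorted
println(counts) // prints List(2, 3)
```

Note that myCount(1) is called with a constant argument: like count(1) in SQL, the UDAF ignores the input value and simply counts one per row.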
© 2024 shulou.com SLNews company. All rights reserved.