How to Use FlinkSQL Built-in Functions
This article introduces how to use FlinkSQL built-in functions. In practical use many people run into difficulties, so the cases below walk through the typical situations step by step. I hope you read carefully and come away with something useful!
Preface
Flink Table API and SQL come with many built-in functions; when they do not cover a requirement, you can implement a user-defined function (UDF) instead.
I. System built-in functions
Flink Table API and SQL provide users with a set of built-in functions for data transformation. Many of the functions commonly supported in SQL have already been implemented in both Table API and SQL, and others are under rapid development.
The table below shows some typical functions, all of them built-in; the complete list can be found in the official documentation.
Type       | Table API                | SQL
-----------|--------------------------|----------------------------------
Comparison | ANY1 === ANY2            | value1 = value2
Comparison | ANY1 > ANY2              | value1 > value2
Logical    | BOOLEAN1 || BOOLEAN2     | boolean1 OR boolean2
Logical    | BOOLEAN.isFalse          | boolean IS FALSE
Logical    | !BOOLEAN                 | NOT boolean
Arithmetic | NUMERIC1 + NUMERIC2      | numeric1 + numeric2
Arithmetic | NUMERIC1.power(NUMERIC2) | POWER(numeric1, numeric2)
String     | STRING1 + STRING2        | string1 || string2
String     | STRING.upperCase()       | UPPER(string)
String     | STRING.charLength()      | CHAR_LENGTH(string)
Temporal   | STRING.toDate            | DATE string
Temporal   | currentTime()            | CURRENT_TIME
Temporal   | NUMERIC.days             | INTERVAL string range
Temporal   | NUMERIC.minutes          | INTERVAL string range
Aggregate  | FIELD.sum0               | SUM([ALL | DISTINCT] expression)
Aggregate  |                          | RANK()
Aggregate  |                          | ROW_NUMBER()
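As a quick illustration, here is a hedged sketch of calling a couple of the built-in string functions above from both dialects. It assumes a StreamTableEnvironment named tableEnv and a registered table "sensor" with a STRING column id, as created in the examples later in this article; treat it as an illustration, not a complete program.

import org.apache.flink.table.api.scala._

// Hedged sketch: built-in string functions in Table API vs. SQL.
// Assumes tableEnv (a StreamTableEnvironment) and a registered table
// "sensor" with a STRING column id, as in the examples below.
val apiResult = tableEnv
  .from("sensor")
  .select('id.upperCase() as 'upperId, 'id.charLength() as 'idLen)

val sqlResult = tableEnv.sqlQuery(
  """
    |select UPPER(id) as upperId, CHAR_LENGTH(id) as idLen
    |from sensor
  """.stripMargin)

// Both results contain the upper-cased id and its character length.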
II. Flink UDF
User-defined functions (User-Defined Functions, UDF) are an important feature because they significantly extend the expressive power of queries. Requirements that the system's built-in functions cannot satisfy can be implemented as UDFs.
2.1 Registering a user-defined function (UDF)
In most cases, a user-defined function must be registered before it can be used in a query. (Scala's Table API can use a function object directly, without registration.)
A function is registered by calling the registerFunction() method of the TableEnvironment. When a user-defined function is registered, it is inserted into the TableEnvironment's function catalog, so that the Table API or SQL parser can recognize and interpret it correctly.
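A minimal sketch of registration, assuming a StreamTableEnvironment named tableEnv and a hypothetical ScalarFunction subclass MyScalarFunction (both placeholders for the concrete examples that follow):

// Hedged sketch: registering and using a UDF. MyScalarFunction is a
// hypothetical placeholder; tableEnv is assumed to exist.
val myFunc = new MyScalarFunction()

// Registration inserts the function into the TableEnvironment's
// function catalog under the given name.
tableEnv.registerFunction("myFunc", myFunc)

// SQL resolves the function by its registered name:
val sqlResult = tableEnv.sqlQuery("select myFunc(id) from sensor")

// The Scala Table API can call the function object directly,
// which is why registration is optional there:
val apiResult = tableEnv.from("sensor").select(myFunc('id))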
2.2 Scalar functions (Scalar Functions)
A user-defined scalar function maps zero, one, or more scalar values to a new scalar value.
To define a scalar function, extend the base class ScalarFunction in org.apache.flink.table.functions and implement one or more evaluation methods. The behavior of a scalar function is determined by its evaluation method, which must be declared public and named eval (declared directly with def, no override). The parameter types and the return type of the evaluation method determine the parameter types and return type of the scalar function.
In the following code, we define our own HashCode function, register it in TableEnvironment, and call it in the query.
Prepare data
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
The code is as follows
package udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
 * @Package udf
 * @File: FlinkSqlUdfHashCode.java
 * @author Brother big data
 * @date 2020-12-29 21:58
 * @version V1.0
 */
object FlinkSqlUdfHashCode {
  def main(args: Array[String]): Unit = {
    // 1. Build the runtime environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // set parallelism to 1
    // 2. Build the TableEnv
    val tableEnv = StreamTableEnvironment.create(env)
    // 3. Build the data source
    tableEnv.connect(new FileSystem().path("./data/sensor.txt"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.INT())
        .field("temperature", DataTypes.DOUBLE()))
      .createTemporaryTable("sensor")
    // Convert to a Table
    val tableSensor = tableEnv.from("sensor")
    // Create the UDF instance
    val code = new HashCode()
    // Test with the Table API: the function object is called directly
    val tableRes = tableSensor.select('id, code('id))
    // Register the UDF so that SQL can resolve it by name
    tableEnv.registerFunction("code", code)
    val tableSql = tableEnv.sqlQuery(
      """
        |select id, code(id)
        |from sensor
      """.stripMargin)
    // Output
    tableRes.toAppendStream[Row].print("tableAPI")
    tableSql.toAppendStream[Row].print("tableSql")
    env.execute("FlinkSqlUdfHashCode")
  }

  class HashCode() extends ScalarFunction {
    def eval(s: String): String = s.hashCode.toString
  }
}

Run the program to see each id printed together with its hash code.
2.3 Table functions (Table Functions)
Similar to a user-defined scalar function, a user-defined table function takes zero, one, or more scalar values as input parameters.
Unlike a scalar function, however, it can return any number of rows as output instead of a single value. To define a table function, extend the base class TableFunction in org.apache.flink.table.functions and implement one or more evaluation methods. The behavior of a table function is determined by its evaluation methods, which must be public and named eval. The parameter types of the evaluation methods determine all valid parameters of the table function.
The type of the returned table is determined by the generic type parameter of TableFunction. The evaluation methods emit output rows using the protected collect(T) method.
In the Table API, a table function is used with .joinLateral or .leftOuterJoinLateral.
The joinLateral operator joins each row of the outer table with all the rows produced by the table function, which is invoked with an expression over that row.
The leftOuterJoinLateral operator is a left outer join: it also joins each row of the outer table with all rows produced by the table function, and it keeps the outer rows for which the table function returns an empty result.
In SQL, use LATERAL TABLE(<TableFunction>), or a LEFT JOIN with an ON TRUE join condition.
In the following code, we will define a table function, register it in the table environment, and call it in the query.
Data preparation
hello|word,hello|spark
hello|Flink,hello|java,hello|Brother big data
Write code
package udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

/**
 * @Package udf
 * @File: FlinkSqlUDFTableFunction.java
 * @author Brother big data
 * @date 2020-12-29 23:10
 * @version V1.0
 */
object FlinkSqlUDFTableFunction {
  def main(args: Array[String]): Unit = {
    // 1. Build the runtime environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // set parallelism to 1
    // 2. Build the TableEnv
    val tableEnv = StreamTableEnvironment.create(env)
    // 3. Build the data source
    val data = env.readTextFile("./data/words.txt")
    // Parse the data: split each line on commas
    val wordData: DataStream[String] = data.flatMap(_.split(","))
    // Type conversion: DataStream -> Table with a single column id
    val tableWord = tableEnv.fromDataStream(wordData, 'id)
    // Create the TableFunction instance
    val split = new Split()
    // Table API, variant 1: joinLateral
    val resTable1 = tableWord
      .joinLateral(split('id) as ('word, 'length))
      .select('id, 'word, 'length)
    // Table API, variant 2: leftOuterJoinLateral
    val resTable2 = tableWord
      .leftOuterJoinLateral(split('id) as ('word, 'length))
      .select('id, 'word, 'length)
    // Register the table and the UDF for use in SQL
    tableEnv.createTemporaryView("sensor", tableWord)
    tableEnv.registerFunction("split", split)
    // SQL, variant 1: LATERAL TABLE
    val tableSQL1 = tableEnv.sqlQuery(
      """
        |select id, word, length
        |from sensor, LATERAL TABLE(split(id)) AS newsensor(word, length)
      """.stripMargin)
    // SQL, variant 2: LEFT JOIN ... ON TRUE
    val tableSQL2 = tableEnv.sqlQuery(
      """
        |select id, word, length
        |from sensor
        |LEFT JOIN LATERAL TABLE(split(id)) AS newsensor(word, length) ON TRUE
      """.stripMargin)
    // Output
    resTable1.toAppendStream[Row].print("resTable1")
    resTable2.toAppendStream[Row].print("resTable2")
    tableSQL1.toAppendStream[Row].print("tableSQL1")
    tableSQL2.toAppendStream[Row].print("tableSQL2")
    env.execute("FlinkSqlUDFTableFunction")
  }

  class Split() extends TableFunction[(String, Int)] {
    // Split on "|" and emit a (word, length) row for every part
    def eval(str: String): Unit = {
      str.split("\\|").foreach(word => collect((word, word.length)))
    }
  }
}
2.4 Aggregate functions (Aggregate Functions)
A user-defined aggregate function (User-Defined Aggregate Functions, UDAGGs) aggregates the data in a table to a single scalar value. User-defined aggregate functions are implemented by extending the AggregateFunction abstract class.
An example of how aggregation works is shown below.
Suppose there is a table containing data about various drinks. The table consists of three columns (id, name, and price) and five rows of data. Now we need to find the highest price of all the drinks in the table, that is, perform a max() aggregation; the result will be a single number. AggregateFunction works as follows:
First, it needs an accumulator that holds the data structure (state) of the intermediate aggregation result. An empty accumulator is created by calling the createAccumulator() method of AggregateFunction.
Then, the accumulator is updated by calling the function's accumulate() method for each input row.
After all rows have been processed, the function's getValue() method is called to compute and return the final result. AggregateFunction requires these three methods to be implemented: createAccumulator(), accumulate(), and getValue().
In addition to the methods above, there are some optional methods. Some of them allow the system to execute queries more efficiently, while others are required for certain scenarios. For example, the merge() method is required if the aggregate function is applied in the context of a session group window. A sketch of these optional methods follows the list below.
retract()
merge()
resetAccumulator()
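Here is a minimal sketch of what these optional methods can look like, using a simple sum-and-count accumulator for an average. The signatures follow the AggregateFunction contract of this Flink version, but treat the sketch as illustrative rather than definitive; AvgAcc and AvgWithOptionalMethods are placeholders invented for this illustration.

import org.apache.flink.table.functions.AggregateFunction

// Hedged sketch: the optional methods on a simple average aggregate.
// AvgAcc is a throwaway accumulator defined only for this illustration.
class AvgAcc {
  var sum: Double = 0.0
  var count: Int = 0
}

class AvgWithOptionalMethods extends AggregateFunction[Double, AvgAcc] {
  override def createAccumulator(): AvgAcc = new AvgAcc

  override def getValue(acc: AvgAcc): Double =
    if (acc.count == 0) 0.0 else acc.sum / acc.count

  def accumulate(acc: AvgAcc, value: Double): Unit = {
    acc.sum += value
    acc.count += 1
  }

  // retract(): undoes a previously accumulated row, e.g. for
  // aggregations over bounded OVER windows
  def retract(acc: AvgAcc, value: Double): Unit = {
    acc.sum -= value
    acc.count -= 1
  }

  // merge(): folds a group of partial accumulators into acc; required
  // when the function is used in a session group window
  def merge(acc: AvgAcc, it: java.lang.Iterable[AvgAcc]): Unit = {
    val iter = it.iterator()
    while (iter.hasNext) {
      val other = iter.next()
      acc.sum += other.sum
      acc.count += other.count
    }
  }

  // resetAccumulator(): clears the accumulator so it can be reused
  def resetAccumulator(acc: AvgAcc): Unit = {
    acc.sum = 0.0
    acc.count = 0
  }
}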
Next, let's write a custom AggregateFunction that computes the average price for each id.
Data preparation
1,Latte,6
2,Milk,3
3,Breve,5
4,Mocha,8
5,Tea,4
The code is as follows
package udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row

/**
 * @Package udf
 * @File: FlinkSQUDFAggregateFunction.java
 * @author Brother big data
 * @date 2020-12-30 22:06
 * @version V1.0
 */
object FlinkSQUDFAggregateFunction {
  def main(args: Array[String]): Unit = {
    // 1. Build the runtime environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // set parallelism to 1
    // 2. Build the TableEnv
    val tableEnv = StreamTableEnvironment.create(env)
    // 3. Build the data source
    tableEnv.connect(new FileSystem().path("./data/datas"))
      .withFormat(new Csv)
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("name", DataTypes.STRING())
        .field("price", DataTypes.DOUBLE()))
      .createTemporaryTable("datas")

    val avgTemp = new AvgTemp()
    val table = tableEnv.from("datas")

    // Table API
    val resTableApi = table
      .groupBy('id)
      .aggregate(avgTemp('price) as 'avgprice)
      .select('id, 'avgprice)

    // Register the UDF and query with SQL
    tableEnv.registerFunction("avgTemp", avgTemp)
    val tablesql = tableEnv.sqlQuery(
      """
        |select id, avgTemp(price)
        |from datas
        |group by id
      """.stripMargin)

    resTableApi.toRetractStream[Row].print("resTableApi")
    tablesql.toRetractStream[Row].print("tablesql")
    env.execute("FlinkSQUDFAggregateFunction")
  }

  // The accumulator: holds the running sum and count
  class AvgTempAcc {
    var sum: Double = 0.0
    var count: Int = 0
  }

  class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
    override def createAccumulator(): AvgTempAcc = new AvgTempAcc()

    override def getValue(acc: AvgTempAcc): Double = acc.sum / acc.count

    // Called once per input row
    def accumulate(acc: AvgTempAcc, price: Double): Unit = {
      acc.sum += price
      acc.count += 1
    }
  }
}
2.5 Table aggregate functions (Table Aggregate Functions)
A user-defined table aggregate function (User-Defined Table Aggregate Functions, UDTAGGs) aggregates the data in a table into a result table with multiple rows and columns. This is very similar to AggregateFunction, except that the aggregate result is now a table rather than a scalar value.
For example, suppose we need to find the top two highest prices of all the drinks in the table, that is, perform a top2() table aggregation. We need to examine all five rows, and the result will be a table containing the top two values. User-defined table aggregate functions are implemented by extending the TableAggregateFunction abstract class. TableAggregateFunction works as follows:
First, it also needs an accumulator (Accumulator), the data structure that holds the intermediate results of the aggregation. An empty accumulator is created by calling the createAccumulator() method of TableAggregateFunction.
Then, the accumulator is updated by calling the function's accumulate() method for each input row.
After all rows have been processed, the function's emitValue() method is called to compute and return the final results. In addition to the methods above, there are some optional methods; a sketch of the two emit variants follows the list below.
retract()
merge()
resetAccumulator()
emitValue()
emitUpdateWithRetract()
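The difference between the two emit methods: emitValue() always emits the full result, while emitUpdateWithRetract() emits only the rows that changed, retracting the outdated ones through a RetractableCollector. Below is a hedged sketch on a single-maximum aggregate; the signatures follow this Flink version's TableAggregateFunction contract, but MaxAcc and MaxTableAgg are placeholders invented for this illustration.

import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector

// Hedged sketch: a table aggregate tracking one maximum, with both
// emit variants. MaxAcc is defined only for this illustration.
class MaxAcc {
  var max: Double = Double.MinValue
  var last: Double = Double.MinValue // last emitted value, for retraction
}

class MaxTableAgg extends TableAggregateFunction[Double, MaxAcc] {
  override def createAccumulator(): MaxAcc = new MaxAcc

  def accumulate(acc: MaxAcc, value: Double): Unit = {
    if (value > acc.max) acc.max = value
  }

  // emitValue(): emit the complete result after every update
  def emitValue(acc: MaxAcc, out: Collector[Double]): Unit = {
    out.collect(acc.max)
  }

  // emitUpdateWithRetract(): retract the previously emitted value and
  // emit the new one, but only when the result actually changed
  def emitUpdateWithRetract(
      acc: MaxAcc,
      out: TableAggregateFunction.RetractableCollector[Double]): Unit = {
    if (acc.max != acc.last) {
      if (acc.last != Double.MinValue) out.retract(acc.last)
      out.collect(acc.max)
      acc.last = acc.max
    }
  }
}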
Next, let's write a custom TableAggregateFunction that extracts the two highest prices for each id.
Data preparation
1,Latte,6
2,Milk,3
3,Breve,5
4,Mocha,8
5,Tea,4
The code is as follows
package udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

/**
 * @Package udf
 * @File: FlinkSqlUDFTableAggregateFunction.java
 * @author Brother big data
 * @date 2020-12-30 22:53
 * @version V1.0
 */
object FlinkSqlUDFTableAggregateFunction {
  def main(args: Array[String]): Unit = {
    // 1. Build the runtime environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // set parallelism to 1
    // 2. Build the TableEnv
    val tableEnv = StreamTableEnvironment.create(env)
    // 3. Build the data source
    tableEnv.connect(new FileSystem().path("./data/datas"))
      .withFormat(new Csv)
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("name", DataTypes.STRING())
        .field("price", DataTypes.DOUBLE()))
      .createTemporaryTable("datas")

    val table = tableEnv.from("datas")
    val temp = new Top2Temp()

    // Table API: flatAggregate emits multiple rows per group
    val tableApi = table
      .groupBy('id)
      .flatAggregate(temp('price) as ('tmpprice, 'rank))
      .select('id, 'tmpprice, 'rank)

    tableEnv.registerFunction("temp", temp)

    tableApi.toRetractStream[Row].print()
    env.execute("FlinkSqlUDFTableAggregateFunction")
  }

  // The accumulator: keeps the two highest prices seen so far
  class Top2TempAcc {
    var highestPrice: Double = Double.MinValue
    var secondHighestPrice: Double = Double.MinValue
  }

  class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {
    override def createAccumulator(): Top2TempAcc = new Top2TempAcc

    // Called once per input row; shifts the top-2 values as needed
    def accumulate(acc: Top2TempAcc, temp: Double): Unit = {
      if (temp > acc.highestPrice) {
        acc.secondHighestPrice = acc.highestPrice
        acc.highestPrice = temp
      } else if (temp > acc.secondHighestPrice) {
        acc.secondHighestPrice = temp
      }
    }

    // Emits the final result: one row per rank
    def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {
      out.collect((acc.highestPrice, 1))
      out.collect((acc.secondHighestPrice, 2))
    }
  }
}

This is where "how to use FlinkSQL built-in functions" ends. Thank you for reading. If you want to learn more about the industry, you can follow the site, where the editor will keep publishing practical, high-quality articles.