
How to Use the Table & SQL API in Apache Flink

2025-04-02 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/02 Report--

This article shows how to use the Table & SQL API in Apache Flink. The introduction is concise and easy to follow; I hope you take something useful away from it.

What is the Flink Relational API?

Flink already supports the DataSet and DataStream APIs, but is there a better way to program, one that does not require worrying about the concrete API implementation or knowing the details of Java and Scala?

Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.

The lowest level is stateful, event-driven stream processing. Developing directly at this level is cumbersome.

Although many tasks can be accomplished with the DataSet and DataStream APIs, becoming proficient with both is difficult and requires familiarity with Java or Scala. Moreover, a framework that cannot be driven through SQL is significantly limited in practice: this may not matter to the framework's developers, but it matters to its users. SQL is therefore an extremely popular language.

For example, on top of MapReduce there is Hive SQL, on top of Spark there is Spark SQL, and on top of Flink there is Flink SQL.

Flink supports both batch and stream processing, but how can the two be unified at the API level?

Thus Table and SQL came into being.

This is a relational API whose operations are as simple as working with MySQL.

Apache Flink features two relational APIs-the Table API and SQL-for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way.

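The relational operators the Table API exposes (selection, filter, join, aggregation) correspond to familiar collection operations. As a rough, Flink-free analogy, here is a plain `java.util.stream` sketch (not the Flink API itself; the `Sale` record and sample values are made up for illustration):

```java
import java.util.List;
import java.util.stream.Collectors;

public class RelationalOpsSketch {
    // Hypothetical row type standing in for a table row
    record Sale(String customerId, double amountPaid) {}

    static final List<Sale> SALES = List.of(
            new Sale("1", 100.0), new Sale("2", 505.0), new Sale("1", 510.0));

    // "filter": keep rows matching a predicate, like a SQL WHERE clause
    static List<Sale> bigSales() {
        return SALES.stream()
                .filter(s -> s.amountPaid() > 200)
                .collect(Collectors.toList());
    }

    // "selection": project a single column, like SELECT customerId
    static List<String> customerIds() {
        return SALES.stream()
                .map(Sale::customerId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(bigSales().size());  // 2
        System.out.println(customerIds());      // [1, 2, 1]
    }
}
```

The Table API expresses the same relational operations declaratively, and Flink's planner then decides how to execute them on a batch or stream runtime.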

Using the Table & SQL API requires additional dependencies.

Java:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Scala:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Programming with the Table & SQL API

First import the above dependencies, then read the sales.csv file, whose contents are as follows:

transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,1,3,510.0
114,2,4,600.0
115,3,2,500.0
116,4,2,500.0
117,1,2,500.0
118,1,2,500.0
119,1,3,500.0
120,1,2,500.0
121,2,4,500.0
122,1,2,500.0
123,1,4,500.0
124,1,2,500.0

Scala:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object TableSQLAPI {

  def main(args: Array[String]): Unit = {
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    val bTableEnv = BatchTableEnvironment.create(bEnv)
    val filePath = "E:/test/sales.csv"
    // DataSet
    val csv = bEnv.readCsvFile[SalesLog](filePath, ignoreFirstLine = true)
    // DataSet => Table
  }

  case class SalesLog(transactionId: String, customerId: String, itemId: String, amountPaid: Double)
}
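`readCsvFile` with `ignoreFirstLine = true` skips the header row and maps each remaining line onto the fields of `SalesLog`. A plain-Java sketch of what that parsing step does (a hypothetical helper for illustration, not Flink code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CsvParseSketch {
    // Mirrors the SalesLog case class from the Scala example
    record SalesLog(String transactionId, String customerId, String itemId, double amountPaid) {}

    // Parse CSV text, skipping the first (header) line as ignoreFirstLine does
    static List<SalesLog> parse(String csv) {
        return Arrays.stream(csv.split("\n"))
                .skip(1) // header row
                .map(line -> {
                    String[] f = line.split(",");
                    return new SalesLog(f[0], f[1], f[2], Double.parseDouble(f[3]));
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String csv = "transactionId,customerId,itemId,amountPaid\n"
                + "111,1,1,100.0\n112,2,2,505.0";
        List<SalesLog> rows = parse(csv);
        System.out.println(rows.size());              // 2
        System.out.println(rows.get(0).amountPaid()); // 100.0
    }
}
```

Flink does this mapping for you; the sketch only shows how a header-skipping CSV reader turns each line into one typed record.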

First obtain a DataSet, then convert it to a Table; after that you can execute SQL against it:

// DataSet => Table
val salesTable = bTableEnv.fromDataSet(csv)
// register the Table under the name "sales"
bTableEnv.registerTable("sales", salesTable)
// run SQL against the registered table
val resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")
bTableEnv.toDataSet[Row](resultTable).print()

The output is as follows:

4,500.0
3,500.0
1,4110.0
2,1605.0
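The query's GROUP BY sum can be checked by hand against the sample rows. A plain-Java recomputation of the same aggregation (not Flink code; it just redoes the arithmetic on the sample data):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupBySumSketch {
    // Same aggregation as:
    //   select customerId, sum(amountPaid) money from sales group by customerId
    static Map<String, Double> sumByCustomer(String[][] rows) {
        Map<String, Double> sums = new LinkedHashMap<>();
        for (String[] r : rows) {
            // r = {transactionId, customerId, itemId, amountPaid}
            sums.merge(r[1], Double.parseDouble(r[3]), Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"111","1","1","100.0"}, {"112","2","2","505.0"}, {"113","1","3","510.0"},
            {"114","2","4","600.0"}, {"115","3","2","500.0"}, {"116","4","2","500.0"},
            {"117","1","2","500.0"}, {"118","1","2","500.0"}, {"119","1","3","500.0"},
            {"120","1","2","500.0"}, {"121","2","4","500.0"}, {"122","1","2","500.0"},
            {"123","1","4","500.0"}, {"124","1","2","500.0"}
        };
        // customer 1 -> 4110.0, 2 -> 1605.0, 3 -> 500.0, 4 -> 500.0
        System.out.println(sumByCustomer(rows));
    }
}
```

The per-customer totals match the Flink output above (rows may print in a different order, since the result of a grouped query has no inherent ordering).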

In this way, SQL alone achieves what previously required writing MapReduce jobs, which greatly simplifies development.

Java:

package com.vincent.course06;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class JavaTableSQLAPI {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
        DataSource<Sales> salesDataSource = bEnv.readCsvFile("E:/test/sales.csv")
                .ignoreFirstLine()
                .pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");
        Table sales = bTableEnv.fromDataSet(salesDataSource);
        bTableEnv.registerTable("sales", sales);
        Table resultTable = bTableEnv.sqlQuery(
                "select customerId, sum(amountPaid) money from sales group by customerId");
        DataSet<Row> rowDataSet = bTableEnv.toDataSet(resultTable, Row.class);
        rowDataSet.print();
    }

    public static class Sales {
        public String transactionId;
        public String customerId;
        public String itemId;
        public Double amountPaid;

        @Override
        public String toString() {
            return "Sales{" +
                    "transactionId='" + transactionId + '\'' +
                    ", customerId='" + customerId + '\'' +
                    ", itemId='" + itemId + '\'' +
                    ", amountPaid=" + amountPaid +
                    '}';
        }
    }
}

The above is how to use the Table & SQL API in Apache Flink.
