大数据重新开始-20Table API 和 Flink SQL
文章目录
- 大数据重新开始-20Table API 和 Flink SQL
- 一、整体介绍
-
- 1.1 什么是 Table API 和 Flink SQL
- 1.2 依赖需要引入
-
- 1.2.1 牛刀小试
- 1.3 两种planner(old & blink)的区别
- 二、API调用
-
- 2.1 基本程序结构
- 2.2 创建表环境
- 2.3 在Catalog中注册表
-
- 2.3.1 表(Table)的概念
- 2.3.2 读取文件数据(Csv格式)
- 2.3.3 读取Kafka数据
- 2.4 表的查询
-
- 2.4.1 Table API的调用
- 2.4.2 SQL查询
- 2.5 将DataStream 转换成表
-
- 2.5.1 代码表达
- 2.5.2 数据类型与 Table schema的对应
- 2.6. 创建临时视图(Temporary View)
- 2.7. 输出表
-
- 2.7.1 将表转换成DataStream打印
- 2.7.2 输出到文件
- 2.7.3 更新模式(Update Mode)
- 2.7.4 输出到Kafka
- 2.7.5 输出到ElasticSearch
- 2.7.6 输出到MySql
- 2.8 Query解释和执行
- 三、流处理中的特殊概念
-
- 3.1 流处理和关系代数(表,及SQL)的区别
- 3.2 动态表(Dynamic Tables)
- 3.3 连续查询流式的过程
-
- 3.3.1 将流转换成表(Table)
- 3.3.2 持续查询(Continuous Query)
- 3.3.3 将动态表转换为流
- 3.4 时间特性
-
- 3.4.1 处理时间(Processing Time)
- 3.4.2 事件时间(Event Time)
- 四、窗口(Windows)
-
- 4.1 Group Windows
-
- 4.1.1 滚动窗口TableAPI方式
- 4.1.2 滑动窗口TableAPI方式
- 4.1.3 会话窗口TableAPI方式
- 4.2 Group Windows案例实操
-
- 4.2.1 滚动窗(时间)
- 4.2.2 滑动窗(时间)
- 4.2.3 会话窗(时间)
- 4.2.4 滚动窗(计数)
- 4.2.5 滑动窗口(计数)
- 4.3 Over Windows
-
- 4.3.1 FlinkSQL方式
- 4.3.2 TableAPI方式
- 4.4 Over Windows案例实操
- 五、函数(Functions)
-
- 5.2 UDF
-
- 5.2.1 注册用户自定义函数UDF
- 5.2.2 标量函数(Scalar Functions)
- 5.2.3 表函数(Table Functions)
- 5.2.4 聚合函数(Aggregate Functions)
- 5.2.5 表聚合函数(Table Aggregate Functions)
一、整体介绍
1.1 什么是 Table API 和 Flink SQL
Flink本身就是批流统一的处理框架,所以Table API和SQL,批流统一的上层处理API。 功能尚未完善,正处于活跃的发展阶段。 Table API一套内嵌Java和Scala语言查询API,它允许我们以一种非常直观的方式些关系运算符的查询结合起来(例如select、filter和join)。 而对于Flink SQL,也就是说,你可以直接在代码中写作SQL,实现一些查询(Query)操作。Flink的SQL基于实现的支持SQL标准的Apache Calcite(Apache开源SQL分析工具)。 无论输入是批输入还是流式输入,这两套API指定的查询语义相同,结果相同。
1.2 依赖需要引入
Table API和SQL有两种依赖需要引入:planner和bridge。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.10.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
flink-table-planner:planner计划器,是table API最主要的部分,提供了运行时环境和生成程序执行计划的planner; flink-table-api-scala-bridge, flink-table-api-java-bridge:bridge桥接器,主要负责table API和 DataStream/DataSet API的连接支持,按照语言分java和scala。 这里的两个依赖,是IDE环境下运行需要添加的;如果是生产环境,lib目录下默认已经有了planner,就只需要有bridge就可以了。 当然,如果想使用用户自定义函数,或是跟kafka做连接,需要有一个SQL client,这个包含在flink-table-common里。
1.2.1 牛刀小试
import com.atguigu.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkSQL_Test01 {
public static void main(String[] args) throws Exception {
//1.获取执行环境并设置并行度
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.读取文本数据创建流
DataStreamSource<String> readTextFile = env.readTextFile("sensor");
//3.将每一行数据转换为JavaBean
SingleOutputStreamOperator<SensorReading> sensorDataStream = readTextFile.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
});
//4.创建TableAPI执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//5.从流中创建表
Table table = tableEnv.fromDataStream(sensorDataStream);
//6.转换数据
//6.1 使用TableAPI转换数据
Table result = table.select("id,temp").filter("id = 'sensor_1'");
//6.2 使用FlinkSQL转换数据
tableEnv.createTemporaryView("sensor", sensorDataStream);
Table sqlResult = tableEnv.sqlQuery("select id,temp from sensor where id='sensor_1'");
//7.转换为流输出数据
tableEnv.toAppendStream(result, Row.class).print("result");
tableEnv.toAppendStream(sqlResult, Row.class).print("sql");
//8.启动任务
env.execute();
}
}
1.3 两种planner(old & blink)的区别
:Blink将批处理作业,视为流式处理的特殊情况。所以,blink不支持表和DataSet之间的转换,批处理作业将不转换为DataSet应用程序,而是跟流处理一样,转换为DataStream程序来处理。 2. 因为批流统一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替。 3. Blink planner只支持全新的目录,不支持已弃用的ExternalCatalog。 4. 旧planner和Blink planner的FilterableTableSource实现不兼容。旧的planner会把PlannerExpressions下推到filterableTableSource中,而blink planner则会把Expressions下推。 5. 基于字符串的键值配置选项仅适用于Blink planner。 6. PlannerConfig在两个planner中的实现不同。 7. Blink planner会将多个sink优化在一个DAG中(仅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而旧planner的优化总是将每一个sink放在一个新的DAG中,其中所有DAG彼此独立。 8. 旧的planner不支持目录统计,而Blink planner支持。 9. 使用Blink所需依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.1</version>
</dependency>
二、API调用
2.1 基本程序结构
Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么几步:首先创建执行环境,然后定义source、transform和sink。 具体操作流程如下:
StreamTableEnvironment tableEnv = ... // 创建表的执行环境
// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");
// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);
// 通过 SQL查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");
// 将结果表写入输出表中
result.insertInto("outputTable");
2.2 创建表环境
创建表环境最简单的方式,就是基于流处理执行环境,调create方法直接创建:
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
表环境(TableEnvironment)是flink中集成Table API & SQL的核心概念。它负责: 注册catalog 在内部 catalog 中注册表 执行 SQL 查询 注册用户自定义函数 将 DataStream 或 DataSet 转换为表 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
在创建TableEnv的时候,可以多传入一个EnvironmentSettings或者TableConfig参数,可以用来配置 TableEnvironment的一些特性。比如:
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useOldPlanner() // 使用老版本planner
.inStreamingMode() // 流处理模式
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment;
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
2.3 在Catalog中注册表
2.3.1 表(Table)的概念
TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表。它会维护一个Catalog-Table表之间的map。 表(Table)是由一个“标识符”来指定的,由3部分组成:Catalog名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。 表可以是常规的(Table,表),或者虚拟的(View,视图)。 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来。 视图可以从现有的表中创建,通常是table API或者SQL查询的一个结果。
2.3.2 读取文件数据(Csv格式)
连接外部系统在Catalog中注册表,直接调用tableEnv.connect()就可以,里面参数要传入一个ConnectorDescriptor,也就是connector描述器。对于文件系统的connector而言,flink内部已经提供了,就叫做FileSystem()。 代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class FlinkSQL_Test02 {
public static void main(String[] args) throws Exception {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.设置并行度
env.setParallelism(1);
//3.创建
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//4.读取文件数据创建表
tableEnv.connect(new FileSystem().path("input/sensor.txt"))
.withFormat(new OldCsv())
.withSchema(new Schema().field("id", DataTypes.STRING())
.field("ts", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable");
//5.执行SQL查询数据
Table table = tableEnv.sqlQuery("select id,temp from inputTable where id='sensor_1'");
//6.将表转换为追加流并打印
tableEnv.toAppendStream(table, Row.class).print();
//7.执行任务
env.execute();
}
}
这是旧版本的csv格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被弃用,以后会被一个符合RFC-4180标准的新format描述器取代。新的描述器就叫Csv(),但flink没有直接提供,需要引入依赖flink-csv:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
代码非常类似,只需要把withFormat里的OldCsv改成Csv就可以了。
2.3.3 读取Kafka数据
kafka的连接器flink-kafka-connector中,1.10版本的已经提供了Table API的支持。我们可以在 connect方法中直接传入一个叫做Kafka的类,这就是kafka连接器的描述器ConnectorDescriptor。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class FlinkSQL_Test03 {
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.定义Kafka连接描述器
Kafka kafka = new Kafka()
.version("0.11")
.topic("test")
.property("bootstrap.servers", "hadoop102:9092");
//3.定义表的Schema信息
Schema schema = new Schema()
.field("id", DataTypes.STRING())
.field("ts", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE());
//4.读取Kafka数据创建临时表
tableEnv.connect(kafka).withFormat(new Csv()).withSchema(schema).createTemporaryTable("KafkaTable");
//5.执行SQL查询数据
Table table = tableEnv.sqlQuery("select id,temp from KafkaTable where id='sensor_1'");
//6.将表转换为追加流进行打印
tableEnv.toAppendStream(table, Row.class).print();
//7.执行任务
env.execute();
}
}
当然也可以连接到ElasticSearch、MySql、HBase、Hive等外部系统,实现方式基本上是类似的。
2.4 表的查询
利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。 Flink给我们提供了两种查询方式:Table API和 SQL。
2.4.1 Table API的调用
Table API是集成在Scala和Java语言内的查询API。与SQL不同,Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的。 Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段,filter(…)表示筛选条件。 代码中的实现如下:
Table sensorTable = tableEnv.from("inputTable");
Table resultTable = senorTable
.select("id, temperature")
.filter("id ='sensor_1'");
2.4.2 SQL查询
Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。 代码实现如下:
Table resultSqlTable = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'");
当然,也可以加上聚合操作,比如我们统计每个sensor温度数据出现的个数,做个count统计:
Table aggResultTable = sensorTable
.groupBy("id")
.select("id, id.count as count");
SQL的实现:
Table aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");
2.5 将DataStream 转换成表
Flink允许我们把Table和DataStream做转换:我们可以基于一个DataStream,先流式地读取数据源,然后map成POJO,再把它转成Table。Table的列字段(column fields),就是POJO里的字段,这样就不用再麻烦地定义schema了。
2.5.1 代码表达
代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。 这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次map操作(或者Table API的 select操作)。 代码具体如下:
DataStream<String> inputStream = env.readTextFile("sensor.txt");
DataStream<SensorReading> dataStream = inputStream
.map( line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
} );
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");
2.5.2 数据类型与 Table schema的对应
在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。 基于名称的对应:
Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");
Flink的DataStream和 DataSet API支持多种类型。 组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。
2.6. 创建临时视图(Temporary View)
创建临时视图的第一种方式,就是直接从DataStream转换而来。同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段