资讯详情

大数据再出发-20Table API 和 Flink SQL

大数据重新开始-20Table API 和 Flink SQL

文章目录


一、整体介绍

1.1 什么是 Table API 和 Flink SQL

Flink本身就是批流统一的处理框架,所以Table API和SQL,批流统一的上层处理API。 功能尚未完善,正处于活跃的发展阶段。 Table API一套内嵌Java和Scala语言查询API,它允许我们以一种非常直观的方式些关系运算符的查询结合起来(例如select、filter和join)。 而对于Flink SQL,也就是说,你可以直接在代码中写作SQL,实现一些查询(Query)操作。Flink的SQL基于实现的支持SQL标准的Apache Calcite(Apache开源SQL分析工具)。 无论输入是批输入还是流式输入,这两套API指定的查询语义相同,结果相同。

1.2 依赖需要引入

Table API和SQL有两种依赖需要引入:planner和bridge。

<dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-table-planner_2.12</artifactId>     <version>1.10.1</version> </dependency> <dependency>     <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

flink-table-planner:planner计划器,是table API最主要的部分,提供了运行时环境和生成程序执行计划的planner; flink-table-api-scala-bridge, flink-table-api-java-bridge:bridge桥接器,主要负责table API和 DataStream/DataSet API的连接支持,按照语言分java和scala。 这里的两个依赖,是IDE环境下运行需要添加的;如果是生产环境,lib目录下默认已经有了planner,就只需要有bridge就可以了。 当然,如果想使用用户自定义函数,或是跟kafka做连接,需要有一个SQL client,这个包含在flink-table-common里。

1.2.1 牛刀小试

import com.atguigu.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQL_Test01 { 
        

    public static void main(String[] args) throws Exception { 
        

        //1.获取执行环境并设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.读取文本数据创建流
        DataStreamSource<String> readTextFile = env.readTextFile("sensor");

        //3.将每一行数据转换为JavaBean
        SingleOutputStreamOperator<SensorReading> sensorDataStream = readTextFile.map(line -> { 
        
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        //4.创建TableAPI执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //5.从流中创建表
        Table table = tableEnv.fromDataStream(sensorDataStream);

        //6.转换数据
        //6.1 使用TableAPI转换数据
        Table result = table.select("id,temp").filter("id = 'sensor_1'");

        //6.2 使用FlinkSQL转换数据
        tableEnv.createTemporaryView("sensor", sensorDataStream);
        Table sqlResult = tableEnv.sqlQuery("select id,temp from sensor where id='sensor_1'");

        //7.转换为流输出数据
        tableEnv.toAppendStream(result, Row.class).print("result");
        tableEnv.toAppendStream(sqlResult, Row.class).print("sql");

        //8.启动任务
        env.execute();
    }

}

1.3 两种planner(old & blink)的区别

:Blink将批处理作业,视为流式处理的特殊情况。所以,blink不支持表和DataSet之间的转换,批处理作业将不转换为DataSet应用程序,而是跟流处理一样,转换为DataStream程序来处理。 2. 因为批流统一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替。 3. Blink planner只支持全新的目录,不支持已弃用的ExternalCatalog。 4. 旧planner和Blink planner的FilterableTableSource实现不兼容。旧的planner会把PlannerExpressions下推到filterableTableSource中,而blink planner则会把Expressions下推。 5. 基于字符串的键值配置选项仅适用于Blink planner。 6. PlannerConfig在两个planner中的实现不同。 7. Blink planner会将多个sink优化在一个DAG中(仅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而旧planner的优化总是将每一个sink放在一个新的DAG中,其中所有DAG彼此独立。 8. 旧的planner不支持目录统计,而Blink planner支持。 9. 使用Blink所需依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

二、API调用

2.1 基本程序结构

Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么几步:首先创建执行环境,然后定义source、transform和sink。 具体操作流程如下:

StreamTableEnvironment  tableEnv = ...     // 创建表的执行环境
// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");
// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);
// 通过 SQL查询语句,得到一张结果表
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");
// 将结果表写入输出表中
result.insertInto("outputTable");

2.2 创建表环境

创建表环境最简单的方式,就是基于流处理执行环境,调create方法直接创建:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

表环境(TableEnvironment)是flink中集成Table API & SQL的核心概念。它负责: 注册catalog 在内部 catalog 中注册表 执行 SQL 查询 注册用户自定义函数 将 DataStream 或 DataSet 转换为表 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在创建TableEnv的时候,可以多传入一个EnvironmentSettings或者TableConfig参数,可以用来配置 TableEnvironment的一些特性。比如:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
  .useOldPlanner()      // 使用老版本planner
  .inStreamingMode()    // 流处理模式
  .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment;
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

2.3 在Catalog中注册表

2.3.1 表(Table)的概念

TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表。它会维护一个Catalog-Table表之间的map。 表(Table)是由一个“标识符”来指定的,由3部分组成:Catalog名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。 表可以是常规的(Table,表),或者虚拟的(View,视图)。 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来。 视图可以从现有的表中创建,通常是table API或者SQL查询的一个结果。

2.3.2 读取文件数据(Csv格式)

连接外部系统在Catalog中注册表,直接调用tableEnv.connect()就可以,里面参数要传入一个ConnectorDescriptor,也就是connector描述器。对于文件系统的connector而言,flink内部已经提供了,就叫做FileSystem()。 代码如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class FlinkSQL_Test02 { 
        

    public static void main(String[] args) throws Exception { 
        

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.设置并行度
        env.setParallelism(1);

        //3.创建
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //4.读取文件数据创建表
        tableEnv.connect(new FileSystem().path("input/sensor.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema().field("id", DataTypes.STRING())
                        .field("ts", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");

        //5.执行SQL查询数据
        Table table = tableEnv.sqlQuery("select id,temp from inputTable where id='sensor_1'");

        //6.将表转换为追加流并打印
        tableEnv.toAppendStream(table, Row.class).print();

        //7.执行任务
        env.execute();
    }

}

这是旧版本的csv格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被弃用,以后会被一个符合RFC-4180标准的新format描述器取代。新的描述器就叫Csv(),但flink没有直接提供,需要引入依赖flink-csv:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>

代码非常类似,只需要把withFormat里的OldCsv改成Csv就可以了。

2.3.3 读取Kafka数据

kafka的连接器flink-kafka-connector中,1.10版本的已经提供了Table API的支持。我们可以在 connect方法中直接传入一个叫做Kafka的类,这就是kafka连接器的描述器ConnectorDescriptor。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class FlinkSQL_Test03 { 
        

    public static void main(String[] args) throws Exception { 
        

        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.定义Kafka连接描述器
        Kafka kafka = new Kafka()
                .version("0.11")
                .topic("test")
                .property("bootstrap.servers", "hadoop102:9092");

        //3.定义表的Schema信息
        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("temp", DataTypes.DOUBLE());

        //4.读取Kafka数据创建临时表
        tableEnv.connect(kafka).withFormat(new Csv()).withSchema(schema).createTemporaryTable("KafkaTable");

        //5.执行SQL查询数据
        Table table = tableEnv.sqlQuery("select id,temp from KafkaTable where id='sensor_1'");

        //6.将表转换为追加流进行打印
        tableEnv.toAppendStream(table, Row.class).print();

        //7.执行任务
        env.execute();

    }

}

当然也可以连接到ElasticSearch、MySql、HBase、Hive等外部系统,实现方式基本上是类似的。

2.4 表的查询

利用外部系统的连接器connector,我们可以读写数据,并在环境的Catalog中注册表。接下来就可以对表做查询转换了。 Flink给我们提供了两种查询方式:Table API和 SQL。

2.4.1 Table API的调用

Table API是集成在Scala和Java语言内的查询API。与SQL不同,Table API的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的。 Table API基于代表一张“表”的Table类,并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段,filter(…)表示筛选条件。 代码中的实现如下:

Table sensorTable = tableEnv.from("inputTable");

Table resultTable = senorTable
.select("id, temperature")
.filter("id ='sensor_1'");

2.4.2 SQL查询

Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL标准。在Flink中,用常规字符串来定义SQL查询语句。SQL 查询的结果,是一个新的 Table。 代码实现如下:

Table resultSqlTable = tableEnv.sqlQuery("select id, temperature from inputTable where id ='sensor_1'");

当然,也可以加上聚合操作,比如我们统计每个sensor温度数据出现的个数,做个count统计:

Table aggResultTable = sensorTable
.groupBy("id")
.select("id, id.count as count");

SQL的实现:

Table aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");

2.5 将DataStream 转换成表

Flink允许我们把Table和DataStream做转换:我们可以基于一个DataStream,先流式地读取数据源,然后map成POJO,再把它转成Table。Table的列字段(column fields),就是POJO里的字段,这样就不用再麻烦地定义schema了。

2.5.1 代码表达

代码中实现非常简单,直接用tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。 这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次map操作(或者Table API的 select操作)。 代码具体如下:

DataStream<String> inputStream = env.readTextFile("sensor.txt");

DataStream<SensorReading> dataStream = inputStream
        .map( line -> { 
        
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        } );

Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");

2.5.2 数据类型与 Table schema的对应

在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按照类中的字段名来对应的(name-based mapping),所以还可以用as做重命名。 基于名称的对应:

Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");

Flink的DataStream和 DataSet API支持多种类型。 组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等,允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问。其他类型,则被视为原子类型。

2.6. 创建临时视图(Temporary View)

创建临时视图的第一种方式,就是直接从DataStream转换而来。同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段

标签: ts03s型传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

 锐单商城 - 一站式电子元器件采购平台  

 深圳锐单电子有限公司