资讯详情

Flink Sql(二) Kafka连接器

Kafka连接器

在 Table API 和 SQL 编写的 Flink 在创建表格时,可以使用程序 WITH 子句指定连接器 (connector),这样,数据交互就可以连接到外部系统。

架构中的 TableSource 从外部系统中读取数据并转换成表,TableSink 结果表负责 写入外部系统。在 Flink 1.13 的 API 调用中没有区分 TableSource 和 TableSink,我们只需要建立外部系统的连接并创建表,Flink 从程序处理逻辑中自动分析其用途。

Flink 的 Table API 和 SQL 支持各种连接器。当然,最简单的就是连接到控制台打印输出:

CREATE TABLE ResultTable (     user STRING,     cnt BIGINT WITH (  'connector' = 'print' ); 

只需要在这里 WITH 中定义 connector 为 print 可以。对于其他外部系统,需要添加一些配置项。让我们分别介绍一下。

Kafka 的 SQL 连接器可以从 Kafka 的主题(topic)读取数据并将其转换为表数据 写入 Kafka 主题。换句话说,在创建表格时,将连接器指定为 Kafka,该表可用作输入表或输出表。

1.引入依赖

想要在 Flink 程序中使用 Kafka 连接器,需要引入如下依赖:

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>  <version>${flink.version}</version> </dependency> 

我们在这里介绍的 Flink 和 Kafka 和以前一样 DataStream API 中间引入的连接器是一样的。如果你想 SQL 使用客户端 Kafka 还需要下载相应的连接器 jar 包放到 lib 目录下。

另外,Flink 为各种连接器提供一系列表格式(table formats),比如 CSV、JSON、 Avro、Parquet 等等。这些表格式定义了底层存储的二进制数据与表的列之间的转换,相当于表的序列化工具。对于 Kafka 而言,CSV、JSON、Avro 支持主要格式, 根据 Kafka 对于配置在连接器中的格式,我们可能需要引入相应的依赖支持。

以 CSV 为例:

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-csv</artifactId>  <version>${flink.version}</version> </dependency> 

由于 SQL 客户端已内置 CSV、JSON 因此,使用时不需要特别介绍支持;对于 没有内置支持的格式(如 Avro),还是要下载相应的 jar 包。关于连接器的格式细节详见官网说明,我们后面就不再讨论了。

2. 创建连接到 Kafka 的表

创建连接到 Kafka 表,需要在 CREATE TABLE 的 DDL 中在 WITH 在句子中指定连接 器为 Kafka,并定义必要的配置参数。

​ 下面是一个具体示例:

CREATE TABLE KafkaTable (
`user` STRING,
 `url` STRING,
 `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
 'connector' = 'kafka',
 'topic' = 'events',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'csv'
)

​ 这里定义了 Kafka 连接器对应的主题(topic),Kafka 服务器,消费者组 ID,消费者起始 模式以及表格式。需要特别说明的是,在 KafkaTable 的字段中有一个 ts,它的声明中用到了 METADATA FROM,这是表示一个“元数据列”(metadata column),它是由 Kafka 连接器的 元数据“timestamp”生成的。这里的 timestamp 其实就是 Kafka 中数据自带的时间戳,我们把 它直接作为元数据提取出来,转换成一个新的字段 ts.

3. Upsert Kafka

​ 正常情况下,Kafka 作为保持数据顺序的消息队列,读取和写入都应该是流式的数据,对 应在表中就是仅追加(append-only)模式。如果我们想要将有更新操作(比如分组聚合)的结 果表写入 Kafka,就会因为 Kafka 无法识别撤回(retract)或更新插入(upsert)消息而导致异常。

​ 为了解决这个问题,Flink 专门增加了一个“更新插入 Kafka”(Upsert Kafka)连接器。这 个连接器支持以更新插入(UPSERT)的方式向 Kafka 的 topic 中读写数据。

​ 具体来说,Upsert Kafka 连接器处理的是更新日志(changlog)流。如果作为 TableSource, 连接器会将读取到的 topic中的数据(key, value),解释为对当前 key 的数据值的更新(UPDATE), 也就是查找动态表中 key 对应的一行数据,将 value 更新为最新的值;因为是 Upsert 操作,所 以如果没有 key 对应的行,那么也会执行插入(INSERT)操作。另外,如果遇到 value 为空 (null),连接器就把这条数据理解为对相应 key 那一行的删除(DELETE)操作。

​ 如果作为 TableSink,Upsert Kafka 连接器会将有更新操作的结果表,转换成更新日志 (changelog)流。如果遇到插入(INSERT)或者更新后(UPDATE_AFTER)的数据,对应 的是一个添加(add)消息,那么就直接正常写入 Kafka 主题;如果是删除(DELETE)或者 更新前的数据,对应是一个撤回(retract)消息,那么就把 value 为空(null)的数据写入 Kafka。 由于 Flink 是根据键(key)的值对数据进行分区的,这样就可以保证同一个 key 上的更新和删除消息都会落到同一个分区中。

​ 下面是一个创建和使用 Upsert Kafka 表的例子:

CREATE TABLE pageviews_per_region (
 user_region STRING,
 pv BIGINT,
 uv BIGINT,
 PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'pageviews_per_region',
 'properties.bootstrap.servers' = '...',
 'key.format' = 'avro',
 'value.format' = 'avro'
);
CREATE TABLE pageviews (
 user_id BIGINT,
 page_id BIGINT,
 viewtime TIMESTAMP,
 user_region STRING,
 WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'pageviews',
 'properties.bootstrap.servers' = '...',
 'format' = 'json'
);
-- 计算 pv、uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region
SELECT
 user_region,
 COUNT(*),
 COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

​ 这里我们从 Kafka 表 pageviews 中读取数据,统计每个区域的 PV(全部浏览量)和 UV (对用户去重),这是一个分组聚合的更新查询,得到的结果表会不停地更新数据。为了将结 果表写入 Kafka 的 pageviews_per_region 主题,我们定义了一个 Upsert Kafka 表,它的字段中 需要用PRIMARY KEY来指定主键,并且在WITH子句中分别指定key和value的序列化格式。

标签: uv连接器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台