Flink 1.13 发布了!Flink 1.13 包括了超过 200 由名贡献者提交 1000 多次修复和优化。
在这个版本中,Flink 一个主要目标取得了重要进展。Flink 1.13 新引入的被动膨胀容量使流动操作的膨胀容量与其他应用程序一样简单,用户只需要修改和发展。
这个版本还包括一系列重要的变化。当流量操作性能低于预期时,这些变化可以使用户更好地分析原因。这些变化包括识别瓶颈节点的负载和反压可视化,并分析算子热代码 CPU 火焰图及分析 State Backend 状态的 State 访问性能指标。
除此之外,Flink 社区还增加了大量的其他优化,我们将在本文的后续讨论中讨论其中一些。我们希望用户能享受到新版本和特带来的便利。在本文的结尾,我们还将介绍升级Flink一些版本需要注意的变化。
鼓励用户下载试用新版 Flink 并通过邮件列表和 JIRA 反馈遇到的问题。
重要特性
Flink 该项目的初始目标之一是流处理应用程序可以像普通应用程序一样简单和自然,被动扩展是 Flink 针对这一目标的最新进展。
在考虑资源管理和部分时,Flink 有两种可能的模式。用户可以将 Flink 应用部署到 k8s、yarn 在资源管理系统之上,由 Flink 积极管理资源,按需分配和释放资源。这种模式对经常改变资源需求的操作和应用非常有用,如批操作和实时操作 SQL 查询。在这种模式下,Flink 所启动的 Worker 数量由应用程序设置的并发度决定。在 Flink 我们称这种模式为主动扩缩容。
对于长期运行的流处理应用,一个更合适的模型是用户只需要像其他长期运行服务一样启动操作,而不需要考虑部署 k8s、yarn 在其他资源管理平台上,不需要考虑申请资源的数量。相反,它的规模是分配的 worker 数量决定。当 worker 当数量发生变化时,Flink 并发度的自动变化。在 Flink 我们称这种模式为被动扩缩容。
Flink 的 Application 开启了部署模式 Flink 作业更接近普通应用(即启动) Flink 作业不需要执行两个独立的步骤来启动集群和提交应用)的努力,而被动扩缩容完成了这一目标:用户不再需要使用额外的工具(如脚本、K8s 算子)来让 worker 数量与应用并发设置一致。
现在用户可以应用自动扩展容量的工具 Flink 在应用程序之上,就像普通的应用程序一样,只要用户知道扩展容量的成本:当扩展容量时,需要重新分发状态流应用程序。
如果你想尝试被动扩展,用户可以增加 scheduler-mode: reactive 该配置项,然后启动应用集群(Standalone 或者 K8s)。更多细节见被动扩。
对于所有应用程序来说,简单地分析和理解应用程序的性能是一个非常关键的功能。该功能适用于 Flink 更加重要,因为 Flink 应用程序通常是数据密集的(即需要处理大量数据),并且需要在(近)实时延迟中给出结果。
当 Flink 当应用程序无法跟上数据输入的速度时,或者当应用程序占用的资源超过预期时,以下工具可以帮助您分析原因。
瓶颈检测和反压监测
Flink 性能分析首先要解决的问题往往是:哪个算子是瓶颈?
回答这个问题,Flink 引入了描述操作繁忙程度的指标(即处理数据)和反向压力(因为下游算子不能及时处理结果而无法继续输出)。应用程序中可能的瓶颈是上游繁忙和反向压力的算子。
Flink 1.13 优化反压检测的逻辑(基于任务) Mailbox 计时,而不是堆栈采样),重新实现作业图 UI 展示:Flink 现在在 UI 繁忙和反压的程度通过颜色和值来显示。
Web UI 中的 CPU 火焰图
Flink 另一个经常需要回答的性能问题:瓶颈算子计算逻辑消耗的哪一部分?
有效的可视化工具是火焰图。它可以帮助回答以下问题:
- 哪种方法现在被占用? CPU?
- 占用方法不同 CPU 比例如何?
- 用一种方法调用的栈是什么样子的?
火焰图是通过堆栈重复采样线程来构建的。在火焰图中,每种方法都被称为矩形,矩形的长度与该方法在采样中出现的次数成正比。 UI 上面的例子如下图所示。
火焰图的文档包括使用此功能的更多细节和指令。
State 访问延迟指标
另一个可能的性能瓶颈是 state backend,尤其是作业 state 超过内存容量必须使用 RocksDB state backend 时。
我不想在这里说 RocksDB 性能不够好(我们非常喜欢) RocksDB!),但它需要满足一些条件才能达到最佳性能。例如,用户可能很容易遇到由于使用了错误的磁盘资源类型而无法满足的非故意云 RockDB 的 IO 性能要求。
基于 CPU 火焰图,新的 State Backend 延迟指标可以帮助用户更好地判断性能是否不符合预期 State Backend 例如,如果用户发现了它。 RocksDB 单次访问需要几毫秒,因此需要检查内存和 I/O 配置。通过设置这些指标 state.backend.rocksdb.latency-track-enabled 使用此选项。这些指标是通过采样来监控性能的,所以它们是 RocksDB State Backend 性能影响微不足道。
现在用户可以从一个 Savepoint 重启时切换一个 Flink 应用的 State Backend。这使得 Flink 应用程序不再受限制,只能在应用程序首次运行时选择 State Backend。
基于此功能,用户现在可以先使用一个 HashMap State Backend(纯内存的 State Backend),如果后续状态变大,切换到 RocksDB State Backend 中。
在实现层,Flink 现在统一了所有 State Backend 的 Savepoint 实现此功能的格式。
原生 kubernetes 部署(Flink 主动要求 K8s 来启动 Pod)现在可以使用自定义 Pod 模板。
使用这些模板,用户可以使用更合适的模板 K8s 设置方法 JM 和 TM 的 Pod,这种方式比 Flink K8s 集成内置配置项更加灵活。
Unaligned Checkpoint 目前已达到生产可用状态,鼓励用户在反压条件下尝试此功能。
具体来说,Flink 1.13 这些功能被引入 Unaligned Checkpoint 使用方便:
- 现在用户使用 Unaligned Checkpoint 还可以扩展容量应用程序。如果用户因性能原因需要使用 Savepoint而必须使用 Retained checkpoint 这个功能会很方便。
- 对于无反压应用,启用 Unaligned Checkpoint 现在成本更低了。Unaligned Checkpoint 现在可以通过加班自动触发,即默认使用一个应用程序 Aligned Checkpoint(不存储传输中的数据),只在对齐超过一定时间范围时自动切换 Unaligned Checkpoint(存储传输中的数据)。
关于如何启用 Unaligned Checkpoint 可参考相关文件。
为了加速 Flink 机器学习器学习的进展(流批统一机器学习) Flink 机器学习开启了新的 flink-ml 仓库 Stateful Function 通过使用单独的仓库,简化代码合并流程,并可以单独发布版本,从而提高开发效率。
用户可以关注 Flink 机器学习的进展,如 Alink(Flink 常用的机器学习算法套件) Flink 与 Tensorflow 的集成。
SQL / Table API 进展
类似于之前的版本,SQL 和 Table API 在所有开发中仍占很大比例。
通过 Table-valued 定义时间窗口的函数
在流式 SQL 定义时间窗口是最常用的查询之一。Flink 1.13 通过引入一种新的定义窗口: Table-valued 函数。这种方法不仅具有更强的表达能力(允许用户定义新的窗口类型),而且与 SQL 标准更一致。
Flink 1.13 支持新语法 TUMBLE 和 HOP 在后续版本中后续版本 SESSION 窗口。我们通过以下两个例子来展示这种方法的表达能力:
- 例 一、新引进的 CUMULATE 在达到最大窗口大小之前,可以支持按特定步长扩展的窗口函数:
SELECT window_time, window_start, window_end, SUM(price) AS total_price FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL 10' MINUTES))
GROUP BY window_start, window_end, window_time;
- 例 2:用户在 table-valued 窗口函数中可以访问窗口的起始和终止时间,从而使用户可以实现新的功能。例如,除了常规的基于窗口的聚合和 Join 之外,用户现在也可以实现基于窗口的 Top-K 聚合:
SELECT window_time, ...
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC)
as rank
FROM t
) WHERE rank <= 100;
这一版本极大的简化了 DataStream API 与 Table API 混合的程序。
Table API 是一种非常方便的应用开发接口,因为这经支持表达式的程序编写并提供了大量的内置函数。但是有时候用户也需要切换回 DataStream,例如当用户存在表达能力、灵活性或者 State 访问的需求时。
Flink 新引入的 StreamTableEnvironment.toDataStream()/.fromDataStream() 可以将一个 DataStream API 声明的 Source 或者 Sink 当作 Table 的 Source 或者 Sink 来使用。主要的优化包括:
- DataStream 与 Table API 类型系统的自动转换。
- Event Time 配置的无缝集成,Watermark 行为的高度一致性。
- Row 类型(即 Table API 中数据的表示)有了极大的增强,包括 toString() / hashCode() 和 equals() 方法的优化,按名称访问字段值的支持与稀疏表示的支持。
Table table = tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
DataStream<Row> dataStream = tableEnv.toDataStream(table)
.keyBy(r -> r.getField("user"))
.window(...);
SQL Client 是一种直接运行和部署 SQL 流或批作业的简便方式,用户不需要编写代码就可以从命令行调用 SQL,或者作为 CI / CD 流程的一部分。
这个版本极大的提高了 SQL Client 的功能。现在基于所有通过 Java 编程(即通过编程的方式调用 TableEnvironment 来发起查询)可以支持的语法,现在 SQL Client 和 SQL 脚本都可以支持。这意味着 SQL 用户不再需要添加胶水代码来部署他们的SQL作业。
配置简化和代码共享
Flink 后续将不再支持通过 Yaml 的方式来配置 SQL Client(注:目前还在支持,但是已经被标记为废弃)。作为替代,SQL Client 现在支持使用一个初始化脚本在主 SQL 脚本执行前来配置环境。
这些初始化脚本通常可以在不同团队/部署之间共享。它可以用来加载常用的 catalog,应用通用的配置或者定义标准的视图。
./sql-client.sh -i init1.sql init2.sql -f sqljob.sql
更多的配置项
通过增加配置项,优化 SET / RESET 命令,用户可以更方便的在 SQL Client 和 SQL 脚本内部来控制执行的流程。
通过语句集合来支持多查询
多查询允许用户在一个 Flink 作业中执行多个 SQL 查询(或者语句)。这对于长期运行的流式 SQL 查询非常有用。
语句集可以用来将一组查询合并为一组同时执行。
以下是一个可以通过 SQL Client 来执行的 SQL 脚本的例子。它初始化和配置了执行多查询的环境。这一脚本包括了所有的查询和所有的环境初始化和配置的工作,从而使它可以作为一个自包含的部署组件。
-- set up a catalog
CREATE CATALOG hive_catalog WITH ('type' = 'hive');
USE CATALOG hive_catalog;
-- or use temporary objects
CREATE TEMPORARY TABLE clicks (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = '...',
'format' = 'avro'
);
-- set the execution mode for jobs
SET execution.runtime-mode=streaming;
-- set the sync/async mode for INSERT INTOs
SET table.dml-sync=false;
-- set the job's parallelism
SET parallism.default=10;
-- set the job name
SET pipeline.name = my_flink_job;
-- restore state from the specific savepoint path
SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab;
BEGIN STATEMENT SET;
INSERT INTO pageview_pv_sink
SELECT page_id, count(1) FROM clicks GROUP BY page_id;
INSERT INTO pageview_uv_sink
SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
用户现在在 Flink 上也可以使用 Hive SQL 语法。除了 Hive DDL 方言之外,Flink现在也支持常用的 Hive DML 和 DQL 方言。
为了使用 Hive SQL 方言,需要设置 table.sql-dialect 为 hive 并且加载 HiveModule。后者非常重要,因为必须要加载 Hive 的内置函数后才能正确实现对 Hive 语法和语义的兼容性。例子如下:
CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog
USE CATALOG myhive;
LOAD MODULE hive; -- setup HiveModule
USE MODULES hive,core;
SET table.sql-dialect = hive; -- enable Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries
需要注意的是, Hive 方言中不再支持 Flink 语法的 DML 和 DQL 语句。如果要使用 Flink 语法,需要切换回 default 的方言配置。
在数据处理中时间处理是一个重要的任务。但是与此同时,处理不同的时区、日期和时间是一个日益复杂的任务。
在 Flink 1.13 中,我们投入了大量的精力来简化时间函数的使用。我们调整了时间相关函数的返回类型使其更加精确,例如 PROCTIME(),CURRENT_TIMESTAMP() 和 NOW()。
其次,用户现在还可以基于一个 TIMESTAMP_LTZ 类型的列来定义 Event Time 属性,从而可以优雅的在窗口处理中支持夏令时。
用户可以参考 Release Note 来查看该部分的完整变更。
PyFlink 核心优化
这个版本对 PyFlink 的改进主要是使基于 Python 的 DataStream API 与 Table API 与 Java/scala 版本的对应功能更加一致。
在 Flink 1.13 中,Python 程序员可以享受到 Flink 状态处理 API 的所有能力。在 Flink 1.12 版本重构过的 Python DataStream API 现在已经拥有完整的状态访问能力,从而使用户可以将数据的信息记录到 state 中并且在后续访问。
带状态的处理能力是许多依赖跨记录状态共享(例如 Window Operator)的复杂数据处理场景的基础。
以下例子展示了一个自定义的计算窗口的实现:
class CountWindowAverage(FlatMapFunction):
def __init__(self, window_size):
self.window_size = window_size
def open(self, runtime_context: RuntimeContext):
descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()]))
self.sum = runtime_context.get_state(descriptor)
def flat_map(self, value):
current_sum = self.sum.value()
if current_sum is None:
current_sum = (0, 0)
# update the count
current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
# if the count reaches window_size, emit the average and clear the state
if current_sum[0] >= self.window_size:
self.sum.clear()
yield value[0], current_sum[1] // current_sum[0]
else:
self.sum.update(current_sum)
ds = ... # type: DataStream
ds.key_by(lambda row: row[0]) \
.flat_map(CountWindowAverage(5))
Flink 1.13 中 PyFlink DataStream 接口增加了对用户自定义窗口的支持,现在用户可以使用标准窗口之外的窗口定义。
由于窗口是处理无限数据流的核心机制 (通过将流切分为多个有限的『桶』),这一功能极大的提高的 API 的表达能力。
Python Table API 现在支持基于行的操作,例如用户对行数据的自定义函数。这一功能使得用户可以使用非内置的数据处理函数。
一个使用 map() 操作的 Python Table API 示例如下:
@udf(result_type=DataTypes.ROW(
[DataTypes.FIELD("c1", DataTypes.BIGINT()),
DataTypes.FIELD("c2", DataTypes.STRING())]))
def increment_column(r: Row) -> Row:
return Row(r[0] + 1, r[1])
table = ... # type: Table
mapped_result = table.map(increment_column)
除了 map(),这一 API 还支持 flat_map(),aggregate(),flat_aggregate() 和其它基于行的操作。这使 Python Table API 的功能与 Java Table API 的功能更加接近。
对于有限流,PyFlink DataStream API 现在已经支持 Flink 1.12 DataStream API 中引入的 Batch 执行模式。
通过复用数据有限性来跳过 State backend 和 Checkpoint 的处理,Batch 执行模式可以简化运维,并且提高有限流处理的性能。
其它优化
Flink 文档从 JekyII 迁移到了 Hugo。如果您发现有问题,请务必通知我们,我们非常期待用户对新的界面的感受。
Flink Web UI 现在可以展示导致作业失败的 n 次历史异常,从而提升在一个异常导致多个后续异常的场景下的调试体验。用户可以在异常历史中找到根异常。
Flink 现在提供了失败或被取消的 Checkpoint 的统计,从而使用户可以更简单的判断 Checkpoint 失败的原因,而不需要去查看日志。
Flink 之前的版本只有在 Checkpoint 成功的时候才会汇报指标(例如持久化数据的大小、触发时间等)。
从 1.13 开始,通过使用事务提交数据,JDBC Sink 可以对支持 XA 事务的数据库提供『恰好一次』的一致性支持。这一特性要求目标数据库必须有(或链接到)一个 XA 事务处理器。
这一 Sink 现在只能在 DataStream API 中使用。用户可以通过 JdbcSink.exactlyOnceSink(…) 来创建这一 Sink(或者通过显式初始化一个 JdbcXaSinkFunction)。
PyFlink Table API 现在对 Group 窗口同时支持基于 Python 的用户自定义聚合函数(User-defined Aggregate Functions, UDAFs)以及 Pandas UDAFs。这些函数对许多数据分析或机器学习训练的程序非常重要。
在 Flink 1.13 之前,这些函数仅能在无限的 Group-by 聚合场景下使用。Flink 1.13 优化了这一限制。
Flink 1.13 优化了针对批处理程序的 Sort-merge Blocking Shuffle 的性能和内存占用情况。这一 Shuffle 模式是在Flink 1.12 的 FLIP-148 中引入的。
这一优化避免了大规模作业下不断出现 OutOfMemoryError: Direct Memory 的问题,并且通过 I/O 调度和 broadcast 优化提高了性能(尤其是在机械硬盘上)。
HBase Lookup Table Source 现在可以支持异步查询模式和查询缓存。这极大的提高了使用这一 Source 的 Table / SQL 维表 Join 的性能,并且在一些典型情况下可以减少对 HBase 的 I/O 请求数量。
在之前的版本中,HBase Lookup Source 仅支持同步通信,从而导致作业吞吐以及资源利用率降低。
升级 Flink 1.13 需要注意的改动
- FLINK-21709 – 老的 Table & SQL API 计划器已经被标记为废弃,并且将在 Flink 1.14 中被删除。Blink 计划器在若干版本之前已经被设置为默认计划器,并且将成为未来版本中的唯一计划器。这意味着 BatchTableEnvironment 和 DataSet API 互操作后续也将不再支持。用户需要切换到统一的 TableEnvironment 来编写流或者批的作业。
- FLINK-22352 – Flink 社区决定废弃对 Apache mesos 的支持,未来有可能会进一步删除这部分功能。用户最好能够切换到其它的资源管理系统上。
- FLINK-21935 – state.backend.async 这一配置已经被禁用了,因为现在 Flink 总是会异步的来保存快照(即之前的配置默认值),并且现在没有实现可以支持同步的快照保存操作。
- FLINK-17012 – Task 的 RUNNING 状态被细分为两步:INITIALIZING 和 RUNNING。Task 的 INITIALIZING 阶段包括加载 state 和在启用 unaligned checkpoint 时恢复 In-flight 数据的过程。通过显式区分这两种状态,监控系统可以更好的区分任务是否已经在实际工作。
- FLINK-21698 – NUMERIC 和 TIMESTAMP 类型之间的直接转换存在问题,现在已经被禁用,例如 CAST(numeric AS TIMESTAMP(3))。用户应该使用 TO_TIMESTAMP(FROM_UNIXTIME(numeric)) 来代替。
- FLINK-22133 – 新的 Source 接口有一个小的不兼容的修改,即 SplitEnumerator.snapshotState() 方法现在多接受一个 checkpoint id 参数来表示正在进行的 snapshot 操作所属的 checkpoint 的 id。
- FLINK-19463 – 由于老的 Statebackend 接口承载了过多的语义并且容易引起困惑,这一接口被标记为废弃。这是一个纯 API 层的改动,而并不会影响应用运行时。对于如何升级现有作业,请参考作业迁移指引 。
翻译 | 高赟 Review | 朱翥、马国维