开篇词-克服实时流计算难点,掌握未来大数据
我曾经在华为工作过 2012 实验室高斯部负责实时分析内存数据库 RTANA、华为公有云 RDS 服务的研发。目前,我专注于移动反欺诈解决方案的研发。根据公司的业务需求,我开发了一个实时流量计算系统,并在此基础上完成了风险控制系统的研发。最后,该系统被独角兽收购。
近两年来,越来越多的业务和数据分析对实时性提出了更高的要求,解决实时计算问题的流量计算框架也开始流行起来。
由于工作原因,人们经常问我关于实时流计算系统的问题。经过整体观察,我发现很多时候,他们。
业务功能要求实时,我该如何着陆?
在此之前,我想先说一点:如果你的业务比较简单,你可以通过查看数据库来毫秒返回,那么就没有必要研究更复杂的技术。俗话说没必要就不要增加实体,保持一切简单。
但是,当有大量的请求、大量的数据和非常严格的请求延迟时,例如,必须在毫秒甚至微秒返回,问题变得复杂。
反欺诈或风险控制系统实时检测异常; 大屏系统实时显示业务报表; 推荐系统实时计算用户的兴趣偏好; 实时统计过车流量的智能交通系统。
面对上述业务场景,如果按照传统的数据库添加、删除和修改方法,则需要将所有数据记录在数据库中,然后在查询过程中立即复制和计算。显然,该方案的存储空间和计算时间都非常昂贵,无法实时有效计算。
因此,原本习惯做增删查改业务逻辑开发的人员,。
-
统计时间窗口长,数据量大。 同样的设备在 3 此时,如果您想实时计算结果,则无法通过遍历数据库实现注册事件的次数。
-
变量需要统计,其值域非常大。 同一用户在 6 一个月内使用不同 IP 如果是数亿用户和数亿用户 IP,还能用集合来记录这些不同的值吗?更何况还需要在规定的时间范围内计算。
-
一个完整的业务可能需要计算数十个甚至数百个特征。例如,在实时风险控制系统中,风险控制模型的输入就是这样。为了确保用户体验,风险控制系统必须在几秒甚至几百毫秒内返回。
-
有些问题的算法自然非常复杂,数据量非常大。如何实时计算?例如,社交网络的二次相关分析,以及许多复杂的统计学习和机器学习模型。
-
甚至有时,产品和开发人员也不知道是否需要或可以使用实时流计算技术。这可能令人难以置信,但这样的公司和开发人员并不少。
如果你想有效地解决这些问题,你需要通过现象看到本质。在我看来,上述问题主要是因为以下问题:
-
一是,缺乏对实时流计算技术以及它的适用场景的整体认识;
-
第二,不知道怎么用 “流” 实现各种业务逻辑的异步和高并发计算;
-
第三,不知道怎么针对。 “流” 设计实时算法的独特数据模式;
-
第四,对各种流量计算框架的理解只停留在 API 调用层,不了解背后的设计原理,即 “流” 核心概念和关键技术点是这种计算模式;
-
第五,对一些现有案例缺乏参考和思考。
如何解决实时流计算问题?
既然问题已经明确下一步该怎么克服?我想我们可以从突破的两个方面。
系统架构
从架构师的角度来看,为产品设计一个好的实现方案不仅要有足够的技术储备,还要充分了解具体的业务问题。通过分析各种实时业务场景,我们可以发现大多数方案都是基于 “流计算” 技术的。
本质上,流计算是一种 “异步” 编程方法。业务数据图像 “流水” 同样,通过管道,即队列,不断流入各个环节的子系统,然后由各个环节的子系统独立处理。因此,为了更快地处理流动
目前,开源的流计算框架虽然有许多(比如 Storm、Spark Streaming、Samza 和 Flink),但事实上,这些主流框架背后有一套类似的设计理念和架构模式。它们都涉及流数据状态、流信息状态、反向压力、信息可靠性等概念。。
实时算法
毕竟,传统 “块数据” 流数据需要连续实时处理。
实时流计算算算法的核心问题是解决 大数据量 和“实时计算”之间的矛盾。数据量一大,几乎所有事情都会变得复杂和缓慢。大数据量的问题,集中在四个方面:时间窗口很长、业务请求量很大、内存受限、数据跨网络访问。
为了实现 实时计算 效果要求你对算法进行非常仔细的设计。幸运的是, 您只需要掌握几种特定类型的算法,如计数、求和、平均值、方差、直方图、分位数HyperLogLog 等等。对于更复杂的算法,如果不能直接实时计算,我们可以通过 Lambda 解决架构!
课程设计理念
这门课是从 “” 和 “” 这两个方面将带您了解实时流计算系统。为此,我为您设计了以下学习路径。(注意模块三是 “” 部分,其余模块 “” 相关。)
,以及入门流计算前需要掌握的
因此,您将对实时流计算系统有一个全面的了解 “流” 初步了解本质。
希望能帮助您理解流计算系统的核心概念和实现原理。
因此,您将掌握实时流计算中涉及的各种算法,这将有助于您解决实时业务场景中的各种问题。
另外,我会通过的
讲师寄语
本课程对实时流量计算技术的关键点进行了总结分析和解释。我希望你能从点到面了解整体情况,快速理解大多数流量计算框架的本质,并在方案选择和软件开发有信心。
在中国流量计算技术兴起之前,我从零开始设计并实现了我自己的流量计算框架。这是我实践经验的总结,它经得起事实的验证。
未来,实时流计算技术将不可避免地成为大数据的主流模式,数据不仅仅是基于 “流” 处理方法,还可以 “流” 储存方式。希望这门课能给你真正的帮助。
01-通用架构实时流计算
为什么要把这节课作为第一节课?因为通过这节课,你将建立对流计算技术和系统的整体理解,这不仅可以为下一节课奠定基础,还可以启发实时流计算应用的设计和开发。
任何系统的产生都是为了解决一个特定的问题。实时流计算技术的诞生是为了更快、更完整地获取数据,更快、更充分地挖掘数据价值。
让我们来看看实时流计算技术的几个应用场景。根据这些场景,我们可以大致了解它的一般架构。
实时流计算技术应用场景
图 1 是某打车软件公司交通热点路段分析及可视化系统的示意图。
在这个系统中,从车载设备上发出的数据,被一个基于 Kafka API 的数据采集模块接收,然后发送到 Spark Streaming 模块进行处理,并且还使用机器学习模型进行分析,然后分析的结果以 JSON 的形式存储到数据库中,并提供给可视化模块进行展示和分析。
我们再来看另一个金融风控的例子。图 2 是一个基于 Flink 的实时欺诈检测平台。
在这个平台中,从手机等各种支付渠道产生的交易数据,被数据采集服务器收集起来,并发送到 Kafka。然后 Flink 从 Kafka 中将交易数据取出来,采用基于机器学习的风控模型,进行风险分析和评估。然后分析的结果再次发送到 Kafka,后续支付网关就可以根据这些交易的欺诈风险等级,来允许或阻止交易进行。
实时流计算系统通用架构
比较上面两个场景的流计算系统组成,我们不难发现这些系统,都包含了五个部分:
事实上,也正是这五个部分,构成了一般通用的实时流计算系统,它们之间的组成关系如下图 3 所示。
在上图 3 中,数据采集模块用于接收来自各种数据源的数据,比如互联网上的各种移动设备、物联网上的各种传感器,内部网络中部署在各个服务模块上的日志代理等。数据采集模块收集到这些数据后,对数据进行一定整理,再将数据发送到数据传输模块。
数据传输模块通常是消息中间件,比如 Kafka,之后再由数据处理模块从数据传输模块中取出数据来进行处理。数据处理模块是流计算系统的核心,在这个模块中会实现流计算应用的各种业务功能。
之后,计算结果被重新发送到数据传输模块,并由数据存储模块取出后,保存到各种类型的数据库中。最后,数据展示模块会通过 API 或者 UI 的方式对结果进行展示。
下面我来逐一详细介绍下通用架构的五个部分。
数据采集
俗话说 “巧妇难为无米之炊”,有数据了,我们才能进行流计算,所以我们先来看看应该怎样采集数据。
数据采集,就是从各种数据源收集数据的过程,比如浏览器、手机、工业传感器、日志代理等。怎样开发一个数据采集服务呢?最简单的方式,就是用 Spring Boot 开发一个 REST 服务,这样,我们就可以用 HTTP 请求的方式,从浏览器、手机等终端设备,将数据发送到数据采集服务器。
这么一看,数据采集服务器似乎很简单!其实不然,这中间还是有很多问题需要认真考虑。如果考虑不周的话,很可能你花冤枉钱买了许多服务器,但是系统的性能却依旧十分可怜。
为了避免在以后的开发中出现这种问题,这里我想跟你分享下我在日常开发 Web 服务时考虑的五个关键点。
-
第一点是
吞吐量 。我们一般用 TPS(Transactions Per Second),也就是每秒处理事务数,来描述系统的吞吐量。当吞吐量要求不高时,选择的余地往往更大些。你可以随意采用阻塞 IO ,或非阻塞 IO 的编程框架。但是当吞吐量要求很高时,通常就只能选择非阻塞 IO 的编程框架了。如果采用阻塞 IO 方式时,需要开启数千个线程,才能使吞吐量最大化,就可以考虑换成非阻塞 IO 的方案了。 -
第二点是
时延 。当吞吐量和时延同时有性能要求时,我一般是先保证能够满足时延要求,然后在此基础上,再尽可能提高吞吐量。如果一个服务实例的吞吐量,满足不了要求,就部署多个服务实例。对于互联网上的应用,如果吞吐量很大,为保证时延,还需要使用类似于 CDN 的方案。 -
第三点是
发送方式 。数据可以逐条发送,也可以批次发送。相比逐条发送而言,批次发送每次的网络 IO 耗时更多,为了提升接收服务器的吞吐能力,我一般也会采用 Netty 这样的非阻塞 IO 框架。 -
第四点是
连接方式 。使用长连接还是短连接,一般由具体的场景决定。当有大量连接需要维持时,就需要使用非阻塞 IO 服务器框架,比如 Netty。而当连接数量较少时,采用长连接和连接池的方案,一般也会非常显著提升请求处理的性能。 -
第五点是
连接数量 。如果数据源相对固定,比如微服务之间的调用,那我们可以采用长连接配合连接池的方案,这样一般会非常显著地提升请求处理的性能。但当数据源很多或经常变化时,应该将连接保持时间(Keep Alive Timeout)设置为一个合理的值。
总的来说,在大多数情况下,数据接收服务器选择诸如 Netty 的非阻塞 IO 方案,都会更加合适。
数据采集之后,我们一般还需要做些简单的处理,比如提取出感兴趣的字段,或者对字段进行调整等,然后再将调整好的字段,组成格式统一的数据,比如 JSON、AVRO、Protobuf 等。最后将整理好的数据,发往到数据传输系统。
数据传输
我们这里说的数据传输,是指流数据在各个模块间流转的过程。
流计算系统中,一般是采用消息中间件进行数据传输的,比如 Apache Kafka、RabbitMQ 等,在微服务系统中一般是采用 HTTP 或 RPC 的方式进行数据传输。这是流计算系统与微服务系统最明显的区别。
在选择消息中间件时,你需要重点考虑五个方面的问题:
这是因为,
而高可用和持久化,则是保证我们系统,能够正确稳定运行的重要因素。
总的来说,数据传输系统就像人体的血管,承载了实时流计算系统中数据的传输。一个高吞吐、低时延、支持高可用和持久化,且能水平扩展的数据传输系统,是构建优秀实时流计算应用的基础。目前,像 Kafka 和 Pulsar 都是不错的数据传输系统选择。
数据处理
接下来,我们来看下流计算系统的核心模块,即数据处理。为什么说数据处理,是流计算系统的核心呢?这是因为在数据处理模块,我们将实现各种业务功能,比如数据过滤、聚合计算、CEP、模型训练等。
我们构建实时流计算系统的目的,就是为了解决具体的业务问题。总的来说,这些业务问题可以分为以下四类。
-
第一类是
数据转化 。数据转化包括对流数据的抽取、清洗、转换和加载。比如使用 filter 函数过滤出符合条件的流数据,使用 map 函数给流数据增加新的字段。再比如更复杂的 Flink SQL CDC,也属于数据转化的内容。 -
第二类是在
流数据上,统计各种指标 ,比如计数、求和、均值、标准差、极值、聚合、关联、直方图等。 -
第三类是
模式匹配 。模式匹配是指在流数据上,寻找预先设定的事件序列模式。比如我们常说的 CEP,也就是复杂事件处理,就属于模式匹配。 -
第四类是
模型学习和预测 。基于流的模型学习算法,可以实时动态地训练或更新模型参数,继而根据模型做出预测,能更加准确地描述数据背后当时正在发生的事情。
数据处理是流计算的核心,也是一个流计算应用开发人员最应该掌握的知识点。这部分的内容是非常丰富且有一定难度的,我将在本课程的模块三中,对数据处理问题进行详细讲解。
数据存储
使用实时流计算技术,一顿操作猛如虎,结果不记录,或者不输出结果的话,那就是算了个寂寞。所以数据处理过程中,必然会涉及,数据存储的问题。而数据存储,是一个非常麻烦的问题,特别是在实时流计算领域,这种大数据、低时延、高吞吐的场景,对我们的数据存储方案,挑战是非常大的。
不知道你是否考虑过这个问题,为什么软件行业,有那么多不同种类的数据库?MySQL、MongoDB、Redis、HBase、ElasticSearch、CockroachDB…… 随便想一下,就可以列举出数十种数据库。
这是因为每种数据库,其实都有其擅长的使用场景,没有一种数据库能够在所有场景下都能胜任,所以我在这里先抛砖引玉,针对实时流计算中几种最常见的场景,讲解下应该选择怎样的存储方案。
在实时风控场景下,我们经常需要计算诸如 “过去一天同一设备上登录的不同用户数” 这种类型的查询。在数据量较小时,使用传统关系型数据库和结构化查询语言是个不错的选择。
但当数据量变得很大后,这种基于关系型数据库的方案会变得越来越吃力,直到最后根本不可能在实时级别的时延内完成计算。这个时候,如果采用像 Redis 这样的 NoSQL 数据库并结合优化的算法设计,就能够做到实时查询,并获得更高的吞吐能力。所以相比传统 SQL 数据库,实时流计算中会更多地使用 NoSQL 数据库。
很多时候,我们需要将实时流计算的状态或者结果存储下来,以供其他服务根据一个或多个健,来查询一条特定的,实时计算记录。那这个时候,我们可以选择像 MongoDB 这样的 NoSQL 数据库。当然,这个时候如果在 MongoDB 之上,再配上一个 Redis 缓存也是极好的。
还有些时候,我们需要在 UI 上展现实时流计算的结果。不知道其他人是怎样想的,反正在我这个后端开发眼里,那些产品同学总喜欢在 UI 上设计一些 “莫名其妙” 的交互式查询,比如任意可选的查询条件和查询方式。那这个时候,我们选择的存储方案,就一定不能太“僵硬”,此时采用像 ElasticSearch 这样搜索引擎一类的存储方案,一定是个明智的选择。
总的来说,在相对复杂的业务场景下,实时流计算可能只是系统中的一个环节。我们需要针对不同的计算类型和查询目的,选择合适的存储方案。当一种数据库满足不了业务的需求时,我们还会将相同的数据,存入多种不同的存储。毕竟到目前为止,还没有一种能称之为 “银弹” 的数据库。
数据展现
最后就是数据展现模块了,数据展现是将数据呈现给最终用户的过程。
数据展现的形式,可以是 API,也可以是 UI。
-
API 相对简单,比如用 Spring Boot 就很容易开发一个 REST API 服务。
-
UI 目前越来越多的是采用 Web UI 的方式。
基于 Web 的 UI 有很多优点。一方面,其部署和访问都非常简单,只需要启动 Web 服务,然后在浏览器访问即可。另一方面,各种丰富的前端框架和数据可视化框架,为开发提供了更多的便利和选择,比如前端常用的框架,就有 React、Vue、Angular 等,然后常用的数据可视化框架,则有 ECharts、D3.js 等。
由于数据展示,更加偏向于前端(包括 UI 设计、JS、CSS 和 HTML 等),这与实时流计算的主体,并无太强关联,所以我在本课程中,不会专门讨论数据展示的内容。
小结
今天,我依据几种不同场景的流计算系统,总结了一个通用的流计算系统架构,然后带你了解了这个架构中各个模块在整个系统中起到的作用。
相信你在以后开发实时流计算应用时,十有八九会用到上面这种架构,并且一定会碰到我跟你讲的这些问题,尤其是数据采集、数据处理和数据存储三个模块。
-
数据采集模块的难点,一定会在高并发和高吞吐场景下暴露出来,这点需要你对 NIO 和异步编程有非常深刻的理解。
-
数据处理模块的难点,则主要表现在与业务的贴合。这要求你对流计算能够解决哪些问题有比较深刻的理解,并需要熟练掌握解决这些问题的算法。
-
数据存储模块的难点,则主要表现在能否根据具体的使用场景,选择最合适的存储方案。而实时流计算中,会涉及多种不同类型的数据存储问题。
不过不用担心,在接下来的课程中,我将会为你详细讲解 NIO 和异步编程的问题。至于数据处理和数据存储的内容,则会在本课程的模块三进行详细讨论。
那么你在工作中,有没有遇到比较难解决的实时计算或流计算问题呢?你可以先把这些问题放在留言区,我会时刻关注,并在后续文章为你着重强调哦!
本课时精华:
02-异步和高并发:为什么 NIO 是异步和高并发编程的基础
为什么在讲流计算之前,要先讲异步和高并发的问题呢?
-
其一,是因为 “流” 本质是异步的,可以说 “流计算” 也是一种形式的异步编程。
-
其二,是因为对于一个流计算系统而言,其起点一定是数据采集,没数据就什么事情都做不了,而数据采集通常就会涉及 IO 问题,如何设计一个高性能的 IO 密集型应用,异步和并发编程既是过不去的坎,也是我们掌握高性能 Java 编程的基础。
所以,在这个课时中,我们就从数据采集模块切入,通过开发一个高性能的数据采集模块,从实战中理解 NIO、异步和高并发的原理。这样,当你以后开发高性能服务时,比如需要支持数万甚至数百万并发连接的 Web 服务时,就知道如何充分发挥出硬件资源的能力,就可以用最低的硬件成本,来达到业务的性能要求。
为了更方便地说明问题,我们今天的讨论,以从互联网上采集数据为例。具体来说,如下图 1 所示,数据通过 REST 接口,从手机或网页端,发送到数据采集服务器。
图 1 基于 REST 协议的数据采集服务器
BIO 连接器的问题
由于是面向互联网采集数据,所以我们要实现的数据采集服务器,就是一个常见的 Web 服务。说到 Web 服务开发,作为 Java 开发人员,十有八九会用到 Tomcat。毕竟 Tomcat 一直是 Spring 生态的默认 Web 服务器,使用面是非常广的。
但使用 Tomcat 需要注意一个问题。在 Tomcat 7 及之前的版本中, Tomcat 默认使用的是 BIO 连接器, BIO 连接器的工作原理如下图 2 所示。
图 2 BIO 连接器工作原理
当使用 BIO 连接器时,Tomcat 会为每个客户端请求,分配一个独立的工作线程进行处理。这样,如果有 100 个客户端同时发送请求,就需要同时创建 100 个工作线程。如果有 1 万个客户端同时请求,就需要创建 1 万个工作线程。而如果是 100 万个客户端同时请求呢?是不是需要创建 100 万个工作线程?
所以,
所以,我们需要采取新的方案,这就是 Tomcat NIO 连接器。
使用 NIO 支持百万连接
毫无意外的是,从 Tomcat 8 开始,Tomcat 已经将 NIO 设置成了它的默认连接器。所以,如果你此时还在使用 Tomcat 7 或之前的版本的话,需要检查下你的服务器,究竟使用的是哪种连接器。
图 3 NIO 连接器工作原理
图 3 是 NIO 连接器的工作原理。可以看出,
-
一是,使用 “队列” 将请求接收器和工作线程隔开; -
二是,引入选择器来更加精细地管理连接套接字。
NIO 连接器的这两点改进,带来了两个非常大的好处。
-
一方面,将请求接收器和工作线程隔离开,可以让接收器和工作线程,各自尽其所能地工作,从而更加充分地使用 IO 和 CPU 资源。
-
另一方面,NIO 连接器能够保持的并发连接数,不再受限于工作线程数量,这样无须分配大量线程,数据接收服务器就能支持大量并发连接了。
所以,使用 NIO 连接器,我们解决了百万并发连接的问题。但想要实现一个高性能的数据采集服务器,光使用 NIO 连接器还不够。因为当系统支持百万并发连接时,也就意味着我们的系统是一个吞吐量非常高的系统。这就要求我们在实现业务逻辑时,需要更加精细地使用 CPU 和 IO 资源。否则,千辛万苦改成 NIO 的努力,就都白白浪费了。
如何优化 IO 和 CPU 都密集的任务
考虑实际的应用场景,当数据采集服务器在接收到数据后,往往还需要做三件事情:
-
一是,对数据进行解码;
-
二是,对数据进行规整化,包括字段提取、类型统一、过滤无效数据等;
-
三是,将规整化的数据发送到下游,比如消息中间件 Kafka。
在这三个步骤中,1 和 2 主要是纯粹的 CPU 计算,占用的是 CPU 资源,而 3 则是 IO 输出,占用的是 IO 资源。每接收到一条数据,我们都会执行以上三个步骤,所以也就构成了类似于图 4 所示的这种循环。
图 4 CPU 和 IO 都密集型任务
从图 4 可以看出,数据采集服务器是一个对 CPU 和 IO 资源的使用都比较密集的场景。为什么我们会强调这种
如果想提高 IO 利用率,一种简单且行之有效的方式,是使用更多的线程。这是因为当线程执行到涉及 IO 操作或 sleep 之类的函数时,会触发系统调用。线程执行系统调用,会从用户态进入内核态,之后在其准备从内核态返回用户态时,操作系统将触发一次线程调度的机会。对于正在执行 IO 操作的线程,操作系统很有可能将其调度出去。这是因为触发 IO 请求的线程,通常需要等待 IO 操作完成,操作系统就会暂时让其在一旁等着,先调度其他线程执行。当 IO 请求的数据准备好之后,线程才再次获得被调度的机会,然后继续之前的执行流程。
但是,是不是能够一直将线程的数量增加下去呢?不是的!如果线程过多,操作系统就会频繁地进行线程调度和上下文切换,这样 CPU 会浪费很多的时间在线程调度和上下文切换上,使得用于有效计算的时间变少,从而造成另一种形式的 CPU 资源浪费。
所以,
那具体如何做到这两点呢?这就是接下来要讲的,“NIO”结合 “异步” 方法了。
NIO 结合异步编程
既然要说异步,那什么是异步?举个生活中的例子。当我们做饭时,在把米和水放到电饭锅,并按下电源开关后,不会干巴巴站在一旁等米饭煮熟,而是会利用这段时间去炒菜。当电饭锅的米饭煮熟之后,它会发出嘟嘟的声音,通知我们米饭已经煮好。同时,这个时候我们的菜肴,也差不多做好了。
在这个例子中,我们没有等待电饭锅煮饭,而是让其在饭熟后提醒我们,这种做事方式就是 “异步” 的。反过来,如果我们一直等到米饭煮熟之后再做菜,这就是 “同步” 的做事方式。
对应到程序中,我们的角色就相当于 CPU ,电饭锅煮饭的过程,就相当于一次耗时的 IO 操作,而炒菜的过程,就相当于在执行一段算法。很显然,异步的方式能更加有效地使用 CPU 资源。
那在 Java 中,应该怎样完美地将 NIO 和异步编程结合起来呢?这里我采用了 Netty 框架,和 CompletableFuture 异步编程工具类。具体可以看看这段代码(完整代码):
Executor decoderExecutor = ExecutorHelper.createExecutor(2, "decoder");
Executor ectExecutor = ExecutorHelper.createExecutor(8, "ect");
Executor senderExecutor = ExecutorHelper.createExecutor(2, "sender");
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
CompletableFuture
.supplyAsync(() -> this.decode(ctx, req), this.decoderExecutor)
.thenApplyAsync(e -> this.doExtractCleanTransform(ctx, req, e), this.ectExecutor)
.thenApplyAsync(e -> this.send(ctx, req, e), this.senderExecutor);
}
在上面的代码中,由于 Netty 框架本身已经处理好 NIO 的问题,所以我们的工作重点放在实现 “异步” 处理上。Netty 框架里的 channelRead0 函数,是实现业务逻辑的地方,于是我在这个函数中,将请求处理逻辑细分为,解码(decode)、规整化(doExtractCleanTransform)、发送(send)三个步骤,然后使用 CompletableFuture 类的方法,将这三个步骤串联起来,构成了最终的异步调用链。
至此,我们终于将数据采集服务的整个请求处理过程,都彻彻底底地异步化。所有 CPU 密集型任务和 IO 密集性任务都被隔离开,在各自分配的线程里独立运行,彼此互不影响。这样, CPU 和 IO 资源,都能够得到充分利用,程序的性能也能够彻底释放出来。
小结
今天,我们为了实现了高性能的数据采集服务器,详细分析了 NIO 和异步编程的工作原理,其中,还涉及了一些有关操作系统进行线程调度的知识。我们实现的基于 Netty 的,数据采集服务器,将 NIO 和异步编程技术结合起来,整个请求处理过程都是异步的,最大限度地发挥出, CPU 和 IO 资源的使用效率。
但是,有关异步的内容,还没完全讨论完。在接下来的课程中,我们将着重讨论异步系统的一些问题。我们后面会发现,异步系统的这些问题,也会出现在流计算系统中。
相信通过今天的学习,你对高并发的基础,也就是 NIO 和异步编程,已经有一定理解。那你知道如何在 Spring 框架下,实现 NIO 和异步编程吗?在留言区写出你的想法吧。
本课时精华:
异步编程
在02课我们使用了 Netty 并配合 Java 8 中的 CompletableFuture 类,构建了一个完全异步执行的数据采集服务器。经过这种改造,CPU 和 IO 的使用效率被充分发挥出来,显著提高了服务器在高并发场景下的性能。
异步系统中的 OOM 问题
回想下02课中,基于 Netty 和 CompletableFuture 类的数据采集服务器,关键是下面这部分代码(请参见完整代码):
public static ExecutorService createExecutor(int nThreads, String threadNamePrefix) {
return Executors.newFixedThreadPool(nThreads, threadNameThreadFactory(threadNamePrefix));
}
final private Executor decoderExecutor = createExecutor(2, "decoder");
final private Executor ectExecutor = createExecutor(8, "ect");
final private Executor senderExecutor = createExecutor(2, "sender");
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) {
CompletableFuture
.supplyAsync(() -> this.decode(ctx, req), this.decoderExecutor)
.thenApplyAsync(e -> this.doExtractCleanTransform(ctx, req, e), this.ectExecutor)
.thenApplyAsync(e -> this.send(ctx, req, e), this.senderExecutor);
}
从上面的代码可以看出,我们在进行请求处理时,采用了 CompletableFuture 类提供的异步执行框架。在整个执行过程中,请求的处理逻辑都是提交给每个步骤各自的执行器,来进行处理,比如 decoderExecutor、ectExecutor 和 senderExecutor。
仔细分析下这些执行器你就会发现,在上面异步执行的过程中,没有任何阻塞的地方。只不过每个步骤都将它要处理的任务,存放在了执行器的任务队列中。每个执行器,如果它处理得足够快,那么任务队列里的任务都会被及时处理。这种情况下不存在什么问题。
但是,一旦有某个步骤处理的速度比较慢,比如在图 1 中,process 的速度比不上 decode 的速度,那么,消息就会在 process 的输入队列中积压。而由于执行器的任务队列,默认是非阻塞且不限容量的。这样,任务队列里积压的任务,就会越来越多。终有一刻,JVM 的内存会被耗尽,然后抛出 OOM 异常,程序就退出了。
所以,为了避免 OOM 的问题,我们必须对上游输出给下游的速度做流量控制。那怎么进行流量控制呢?
一种方式,是
所以,我们该如何进行流量控制呢?这里有一种更优雅的方法,也就是
反向压力原理
在反向压力的方案中,上游能够根据下游的处理能力,动态地调整输出速度。当下游处理不过来时,上游就减慢发送速度,当下游处理能力提高时,上游就加快发送速度。
反向压力的思想,已经成为流计算领域的共识,并且形成了反向压力相关的标准,也就是 Reactive Streams。
上面的图 2 描述了 Reactive Streams 的工作原理。当下游的消息订阅者,从上游的消息发布者接收消息前,会先通知消息发布者自己能够接收多少消息。然后消息发布者就按照这个数量,向下游的消息订阅者发送消息。这样,整个消息传递的过程都是量力而行的,就不存在上下游之间因为处理速度不匹配,而造成的 OOM 问题了。
目前,一些主流的异步框架都开始支持 Reactive Streams 标准,比如 RxJava、Reactor、Akka Streams、Vert.x 等。这足以说明, OOM 和反向压力问题在异步系统中是多么重要!
实现反向压力
现在,我们回到 Netty 数据采集服务器。那究竟该怎样为这个服务器加上反向压力的功能呢?
前面我们分析了异步执行的过程,之所以会出现 OOM 问题,主要还是因为,接收线程在接收到新的请求后,触发了一系列任务。这些任务都会被存放在任务队列中,并且这些任务队列,都是非阻塞且不限容量的。
因此,要实现反向压力的功能,只需要从两个方面来进行控制。
-
其一是,执行器的任务队列,它的容量必须是有限的。
-
其二是,当执行器的任务队列已经满了时,就阻止上游继续提交新的任务,直到任务队列,重新有新的空间可用为止。
按照上面这种思路,我们就可以很容易地实现反向压力。下面的图 3 就展示了,使用容量有限的阻塞队列,实现反向压力的过程。
当 process 比 decode 慢时,运行一段时间后,位于 process 前的任务队列就会被填满。当 decode 继续往里面提交任务时,就会被阻塞,直到 process 从这个任务队列中取走任务为止。
以上说的都是实现原理。那具体用代码该怎样实现呢?下面就是这样一个具备反向压力能力的 ExecutorService 的具体实现。
private final List<ExecutorService> executors;
private final Partitioner partitioner;
private Long rejectSleepMills = 1L;
public BackPressureExecutor(String name, int executorNumber, int coreSize, int maxSize, int capacity, long rejectSleepMills) {
this.rejectSleepMills = rejectSleepMills;
this.executors = new ArrayList<>(executorNumber);
for (int i = 0; i < executorNumber; i++) {
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity);
this.executors.add(new ThreadPoolExecutor(
coreSize, maxSize, 0L, TimeUnit.MILLISECONDS,
queue,
new ThreadFactoryBuilder().setNameFormat(name + "-" + i + "-%d").build(),
new ThreadPoolExecutor.AbortPolicy()));
}
this.partitioner = new RoundRobinPartitionSelector(executorNumber);
}
@Override
public void execute(Runnable command) {
boolean rejected;
do {
try {
rejected = false;
executors.get(partitioner.getPartition()).execute(command);
} catch (RejectedExecutionException e) {
rejected = true;
try {
TimeUnit.MILLISECONDS.sleep(rejectSleepMills);
} catch (InterruptedException e1) {
logger.warn("Reject sleep has been interrupted.", e1);
}
}
} while (rejected);
}
在上面的代码中,BackPressureExecutor 类在初始化时,新建一个或多个 ThreadPoolExecutor 对象,作为执行任务的线程池。这里面的关键点有两个。
-
第一个是,在创建 ThreadPoolExecutor 对象时,采用 ArrayBlockingQueue。这是一个容量有限的阻塞队列。因此,当任务队列已经满了时,就会停止继续往队列里添加新的任务,从而避免内存无限大,造成 OOM 问题。
-
第二个是,将 ThreadPoolExecutor 拒绝任务时,采用的策略设置为 AbortPolicy。这就意味着,在任务队列已经满了的时候,如果再向任务队列提交任务,就会抛出 RejectedExecutionException 异常。之后,我们再通过一个 while 循环,在循环体内,捕获 RejectedExecutionException 异常,并不断尝试,重新提交任务,直到成功为止。
这样,经过上面的改造,当下游的步骤执行较慢时,它的任务队列就会占满。这个时候,如果上游继续往下游提交任务,它就会不停重试。这样,自然而然地降低了上游步骤的处理速度,从而起到了流量控制的作用。
接下来,我们就可以在数据接收服务器中,使用这个带有反向压力功能的 BackPressureExecutor 了(请参见完整代码)。
final private Executor decoderExecutor = new BackPressureExecutor("decoderExecutor",
1, 2, 1024, 1024, 1);
final private Executor ectExecutor = new BackPressureExecutor("ectExecutor",
1, 8, 1024, 1024, 1);
final private Executor senderExecutor = new BackPressureExecutor("senderExecutor",
1, 2, 1024, 1024, 1);
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) {
CompletableFuture
.supplyAsync(() -> this.decode(ctx, req), this.decoderExecutor)
.thenApplyAsync(e -> this.doExtractCleanTransform(ctx, req, e), this.ectExecutor)
.thenApplyAsync(e -> this.send(ctx, req, e), this.senderExecutor);
}
从上面的代码可以看出,我们只需把 decode、doExtractCleanTransform 和 send 等每一个步骤用到的执行器,都替换成 BackPressureExecutor 即可。这样,就实现了反向压力功能,其他部分的代码,不需要做任何改变!
最后,还需要说明下的是,在 BackPressureExecutor 的实现中,为什么需要封装多个执行器呢?这是因为,使用 M * N 个线程,有三种不同的方法:
-
第一种是,每个执行器使用 1 个线程,然后使用个 M * N 执行器;
-
第二种是,每个执行器使用 M * N 个线程,然后使用 1 个执行器;
-
第三种是,每个执行器使用 M 个线程,然后使用 N 个执行器。
在不同场景下,三种使用方式的性能表现会有所不同。根据我的经验,主要是因为,队列的生产者之间,存在着相互竞争,然后队列的消费者之间,也存在着相互竞争。所以,如果你要使用这个类的话,还是需要根据实际的使用场景,分配合适的队列数和线程数,避免对同一个队列的竞争,过于激烈。这样,有利于提升程序的性能。
小结
今天,我用反向压力的功能进行流量控制,解决了异步系统中的 OOM 问题。对于一个能够在生产环境上稳定运行的系统来说,任何使用了异步技术的地方,都需要尤其注意 OOM 问题。
其实,解决异步系统 OOM 问题的方法,并不限于反向压力。比如,我们在使用线程池时,设置线程的数量,这也是一种保护措施。但是,我们今天着重强调的是反向压力的方法。这是因为,反向压力在流计算系统中,有着非常重要的地位。像目前的流计算框架,比如 Flink、Spark Streaming 等,都支持反向压力。可以说,如果没有反向压力的功能,任何一个流计算系统,都会时时刻刻有着 OOM 崩溃的风险。
在今天的讨论中,我们已经多次用到了上游、下游,甚至是 Reactive Streams 这种,直接与 “流” 相关的字眼。我们已经隐隐约约感受到,“流”与 “异步” 之间,有着千丝万缕的关系。在接下来的课程中,我们还会专门讨论到,它们之间的关联关系。
相信通过今天的课程,你在以后使用异步编程时,一定会注意到系统的 OOM 问题。你在以往的编程中,有没有遇到过 OOM 问题呢?有的话,可以在评论区留言,我看到后会和你一起分析解决!
本课时精华:
04-流与异步:为什么说掌握流计算先要理解异步编程
在前面的课时中,我们详细分析了 “异步” 的工作原理,并且在解决异步系统的 OOM 问题时,使用了 “反向压力” 的方法。在讨论过程中,我们已经明确地使用到,诸如上游、下游、streams 这样的概念都暗示着我们,“流”和 “异步” 之间有着某种关联。
所以今天,我们就借助于目前四种主流的异步编程方案,来详细分析下 “流” 和“异步”之间这种紧密关系。
异步编程框架
说到 “异步编程” 或者“高并发编程”,你首先想到的是什么呢?
根据我以往当面试官的经验:
-
青铜级 的求职者,一般会说多线程、synchronized、锁等知识,更有甚者还会扯到 Redis 神马的。很显然,这类求职者对异步和高并发编程,其实是没有什么概念的; -
白银级 的求职者,则会说线程池、executor、ConcurrentHashMap 等,这类同学对异步和高并发编程,已经有了初步认识,但却还不够深入; -
王者级别 的求职者,则会对 NIO、Netty、CompletableFuture 等技术如数家珍,甚至还会谈到 Fiber。
其实很多时候,我问求职者的问题,都是在实际开发过程中,需要使用或注意的知识点,要求并不苛刻。毕竟面试的目的,是尽快招到合适的开发人员一起做事,而不是为了刁难人家。但可惜的是,我遇到最多的是青铜,少有白银,王者则更是稀有了。我自己也曾面试过某 BAT 大厂之一,记得当时最后一轮技术面,是三个不同部门的老大同时面我。他们问了我很多问题,其中印象最深的一个,就是关于异步和高并发编程的问题。当时我从 “流” 的角度,结合 NIO 和 CompletableFuture 等工具,跟他们详细讲解了我在平时开发过程中,总结出的最佳实践方案。最后,我顺利拿到了 offer。
所以,回到问题本身,当我们谈论 “异步” 和“高并发”编程时,到底是在说什么呢?通过第 02 课时的学习,我们已经知道,
所以进一步地,我们的问题落在了选择 “异步” 编程方案上。那究竟怎样实现异步编程呢?其实,异步编程的框架非常多,目前主流的异步编程可以分为四类模式:Promise、Actor、ReactiveX 和纤程(或者说协程)。下面我们逐一讨论。
Promise 模式
Promise 模式是非常基本的异步编程模式,在 JavaScript、Java、Python、C++、C# 等语言中,都有 Promise 模式的实现。
Promise 模式在前端 JavaScript 开发中,是非常常见的。这是因为 JavaScript 本身是单线程的,为了解决诸如并发网络请求的问题,JavaScript 使用了大量异步编程的技巧。早期的 JavaScript 还不支持 Promise 模式,为了实现异步编程,采用的都是回调的方式。但是回调会有一个问题,就是所谓的 “回调陷阱”。
举个例子,当你需要依次调用 A、B、C、D 四次网络请求时,如果采用回调的编程方式,那么四次网络请求的回调函数,会依次嵌套起来。这样,整个回调函数的实现会非常长,逻辑会异常复杂,不容易理解和维护。
为了解决 “回调陷阱” 的问题,JavaScript 引入了 Promise 模式。类似于下面这样:
let myPromise = new Promise(function(myResolve, myReject) {
setTimeout(function() { myResolve("Hello World!"); }, 5000);
});
myPromise.then(function(value) {
document.getElementById("test").innerHTML = value;
})
在上面的这段 JavaScript 代码中,实现了一个异步的定时器。定时器定时 5 秒后返回,然后将 id 为 “test” 的元素设置为 “Hello World!”。
可以看出,Promise 模式将嵌套的回调过程,变成了平铺直叙的 Promise 链,这极大地简化了异步编程的复杂程度。
那在 Java 中的 Promise 模式呢?在 Java 8 之前,JDK 是不支持 Promise 模式的。好在 Java 8 为我们带来了 CompletableFuture 类,这就是 Promise 模式的实现。比如在 03 课时的异步执行代码,正是一个 Promise 链。
CompletableFuture
.supplyAsync(() -> this.decode(ctx, req), this.decoderExecutor)
.thenApplyAsync(e -> this.doExtractCleanTransform(ctx, req, e), this.ectExecutor)
.thenApplyAsync(e -> this.send(ctx, req, e), this.senderExecutor);
在上面的代码中,我们使用的 executor 都是带队列的线程池,也就是类似于下面这样。
从上面的图 1 可以看出,这个过程有生产者,有队列,有消费者,是不是已经非常像 “流”?
当然,CompletableFuture 类也可以使用其他类型的 executor,比如,使用栈管理线程的 executor。在这种 executor 的实现中,每次调用 execute() 方法时,都是从栈中取出一个线程来执行任务,像这种不带任务队列的执行器,就和 “流” 相差甚远了。
Actor 模式
Actor 模式是另外一种非常著名的异步编程模式。在这种模式中,用 Actor 来表示一个个的活动实体,这些活动实体之间以消息的方式,进行通信和交互。
Actor 模式非常适用的一种场景是游戏开发。比如 DotA 游戏里的小兵,就可以用一个个 Actor 表示。如果要小兵去攻击防御塔,就给这个小兵 Actor 发一条消息,让它移动到塔下,再发一条消息,让它攻击塔。
必须强调的是,Actor 模式最好是构建在纤程上,这样 Actor 才能随心所欲地想干吗就干吗,你写代码时就不会有过多的约束。
如果 Actor 是基于线程构建,那么当存在较多 Actor 时,Actor 的代码就不宜做过多 IO 或 sleep 操作。但大多数情况下,IO 操作都是难以避免的,所以为了减少 IO 和 sleep 操作对其他 Actor 的影响,应将涉及 IO 操作的 Actor 与其他非 IO 操作的 Actor 隔离开。给涉及 IO 操作的 Actor 分配专门的线程,不让这些 Actor 和其他非 IO 操作的 Actor 分配到相同的线程。这样可以保证 CPU 和 IO 资源,都能充分利用,提高了程序的性能。
在 JVM 平台上,比较有名的 Actor 模式是 Akka。但是 Akka 是构建在线程而非纤程上,所以使用起来就存在上面说的这些问题。
如果你要用 Akka 的话,需要注意给以 IO 操作为主的 Actor ,分配专门的线程池。另外,Akka 自身不具备反向压力功能,所以使用起来时,还需要自己进行流量控制才行。
我自己曾经实现过,感觉还是有点小麻烦的。主要的问题在于 Actor 系统对邮箱的定位,已经要求邮箱,也就是 Actor 用于接收消息的队列,最好不要阻塞。所以如果是做流量控制的话,就不能直接将邮箱,设置为容量有限的阻塞队列,这样在 Actor 系统中,非常容易造成死锁。
ReactiveX 模式
ReactiveX 模式又称之为响应式扩展,它是一种观察者模式。在 Java 中,ReactiveX 模式的实现是 RxJava。ReactiveX 模式的核心是观察者(Observer)和被观察者(Observable)。被观察者(Observable)产生一系列的事件,比如网络请求、数据库操作、文件读取等,然后观察者会观察到这些事件,之后就触发一系列后续动作。
下面就是使用 RxJava 编写的一段异步执行代码。
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onNext("!!!");
subscriber.onCompleted();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "onNext: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "onCompleted called");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "onError called");
}
};
observable.subscribe(observer);
在上面的代码中,被观察者依次发出 “Hello”“World”“!!!” 三个事件,然后观察者观察到这三个事件后,就将每个事件打印出来。
非常有趣的是,ReactiveX 将其自身定义为一个异步编程库,却明确地将被观察者的事件序列,按照 “无限流”(infinite streams)的方式来进行处理,还实现了 Reactive-Streams 标准,支持反向压力功能。
你说,是不是他们也发现了,流和异步之间有着相通之处呢?
不过,相比 Java 8 的 CompletableFuture,我觉得这个 RxJava 还是显得有些复杂,理解和使用起来都更加麻烦,但明显的优势又没有,所以我不太推荐使用这种异步编程模式。
我在之前的工作中,也有见过其他同事在 Android 开发时使用这种模式。所以,如果你感兴趣的话,也可以了解一下。如果是我,我就直接使用 CompletableFuture 了。
纤程 / 协程模式
最后是纤程(fiber)模式,也称之为协程(coroutine)模式。应该说纤程是最理想的异步编程方案了,没有之一!它是用 “同步方式写异步代码” 的最高级别形态。
下面的图 3 是纤程的工作原理,纤程是一种用户态的线程,其调度逻辑在用户态实现,从而避免过多地进出内核态,进行调度和上下文切换。
实现纤程的关键,是要在执行过程中,能够在恰当的时刻和地方中断,并将 CPU 让给其他纤程使用。具体实现起来就是,将 IO 操作委托给少量固定线程,再使用另外少量线程负责 IO 状态检查和纤程调度,再使用另外一批线程执行纤程。
这样,少量线程就可以支撑大量纤程的执行,从而保证了 CPU 和 IO 资源的使用效率,提升了程序的性能。
使用纤程还可以极大地降低异步和并发编程的难度。但可惜的是,当前 Java 还不支持纤程。Java 对纤程的支持还在路上,你可以查阅一个被称之为 Loom 的项目来跟踪进度。所以这里,我就先借助当前另外一款火爆的编程语言 Golang,来对纤程做一番演示。
下面就是一个 Golang 协程的示例代码。
package main
import (
"fmt"
"time"
)
func produce(queue chan int) {
for i := 0; true; i++ {
time.Sleep(1 * time.Second)
queue <- i
fmt.Printf("produce item[%d]\n", i)
}
}
func consume(queue chan int) {
for {
e := <-queue
fmt.Printf("consume item[%d]\n", e)
}
}
func waitForever() {
for {
time.Sleep(1 * time.Second)
}
}
func main() {
queue := make(chan int, 10)
go produce(queue)
go consume(queue)
waitForever()
}
在上面的代码中,我们使用了 Golang 最核心的两个概念,即 goroutine 和 channel。Golang 最推崇的并发编程思路就是,通过通信来共享内存,而不是通过共享内存来进行通信。所以,我们可以非常直观地看到,这里的 channel 就是一个容量有限的阻塞队列,天然就具备了反向压力的能力。
所以,Golang 这种生产者、队列、消费者的模式,不就是 “流” 的一种雏形吗?
异步和流之间的关系
至此,我们已经讨论了四种不同的异步编程模式。除了像 async/await 这样的异步编程语法糖外,上面讨论的四种模式,基本覆盖了当前所有主流的异步编程模式。这里稍微提一下,async/await 这个异步编程语法糖,还是非常有趣的,Python 和 JavaScript 都支持,建议你了解一下。
我们再回过头来看下,这四种异步编程模式,它们都已经暗含了 “流” 的影子。
首先是 Promise 模式,当 CompletableFuture 使用的执行器,是带队列的线程池时,Promise 异步调用链的过程,在底层就是事件在队列中 “流” 转的过程。
然后是 Actor 模式,每个 Actor 的邮箱就是一个非阻塞的队列,Actor 之间的通信过程,就是消息在这些非阻塞队列之间 “流” 转的过程。
接下来就是 ReactiveX 模式,将自己定义为异步编程库的 ReactiveX,明确地将事件按照 “无限流” 的方式来处理,还实现了 Reactive-Streams 标准,支持反向压力功能。
最后是纤程和协程,Golang 语言明确将 “队列” 作为了异步和并发编程时最主要的通信模式,甚至将“通过通信来共享内存,而不是通过共享内存来进行通信”,作为一种编程哲学思想来进行推崇。
所以,在