一、NiFi概念
NiFi用于处理数据集成场景的数据分发,BS图形化结构。Apache NiFi 它是一个易于处理和分发数据的系统,功能强大可靠。可自动管理系统之间的数据流。它使用高度可配置的指示图来管理数据路由、转换和系统中介逻辑,并支持从各种数据源中动态提取数据。NiFi原来是NSA目前代码已经开源的项目是Apache顶级项目之一的基金会。 NiFi是基于Java的,使用Maven支持包的建设和管理。 NiFi基于Web工作方式,后台调度服务器。用户可以将数据处理定义为一个过程,然后进行处理,NiFi后台有数据处理引擎、任务调度等组件。 简单的说,NiFi它是为了解决不同系统之间的数据自动流通问题而建立的。 虽然dataflow这个术语在各种场景中使用,但我们在这里使用它来表示不同系统之间自动化的可管理信息流。自从企业拥有多个系统以来,一些系统将生成数据,一些系统将消耗数据,并出现不同系统之间的数据流通问题。这些问题的相应解决方案已被广泛研究和讨论,包括企业集成eip是一个全面易用的方案。
- 高可用
- 并发性能高
- 错误纠察
- 快速响应
- 兼容各种数据格式
- 安全性
- 方便迁移
二、NiFi核心组件的概念
- FlowFile Processor:处理FlowFile,逆行逻辑判断、路由、转换等操作。共同属性(是的NiFi生成服务的,不能修改),提取属性,添加自定义属性(UpdateAttribute)、属性路由(RouteOnAttribute)、表达式语言(${filename:toLower():contains(‘r’)})、NiFi表达式使用自定义属性
- Connection:连接处理器。连接不同的连接Processor,充当队列。
- Flow Controller:代理角色,调度促进处理器流文件的交换。
- Process Group:包含一组特定的Processor和Connection,组也可以连接和传输,形成新的组。
三、NiFi关键特性
- 流管理
- 易用性
- 灵活的缩放模型
四、NiFi处理器的类别(NiFi的功能)
数据转换、路由、数据库操作、属性提取、系统交互、数据提取、数据监控和发送、拆分和聚合HTTP。用户必须了解可用的处理器类型,才能创建有效的数据流处理流程。NiFi它包含许多不同的处理器。这些处理器提供了从许多不同系统中提取数据、路由、转换、处理、拆分和聚合数据以及将数据分发到多个系统的功能。 几乎每个NiFi版本中可用的处理器数量正在增加。因此,我们不会试图在这里介绍每个可用的处理器,但我们将重点介绍一些最常用的处理器,并根据其功能进行分类。
数据转换
- CompressContent:压缩或解压
- ConvertCharacterSet:将用于编码内容的字符集从一个字符集转换为另一个字符集
- EncryptContent:加密或解密
- ReplaceText:使用正则表达式修改文本内容
- TransformXml:应用XSLT转换XML内容
- JoltTransformJSON:应用JOLT规范来转换JSON内容
路由和调解
- ControlRate:数据在限制过程中流经某一部分的速率
- DetectDuplicate:根据一些用户定义的标准去监视发现重复的FlowFiles。通常与HashContent一起使用
- DistributeLoad:负载平衡或数据抽样是通过将部分数据分发给每个用户来实现的
- MonitorActivity:当用户定义的时间段过去,没有数据流经此节点时发送通知。(可选)在数据流恢复时发送通知。
- RouteOnAttribute:根据FlowFile属性路由包含FlowFile。
- ScanAttribute:扫描FlowFile检查用户定义的属性集是否与字典匹配。
- RouteOnContent:根据FlowFile内容是否与用户定制的正则表达式相匹配。如果匹配,则FlowFile将路由到已配置的关系。
- ScanContent:搜索用户在流文件内容中定义字典中的术语,并根据这些术语的存在或不存在进行路由。字典可以由文本条目或二进制条目组成。
- ValidateXml:以XML模式验证XML内容; 根据用户定义XML Schema,判断FlowFile的内容是否有效,进而来路由FlowFile。1
数据库访问
- ConvertJSONToSQL:将JSON文档转换为SQL INSERT或UPDATE命令可以传递给PutSQL Processor
- ExecuteSQL:执行用户定义SQL SELECT命令,结果是Avro格式的FlowFile
- PutSQL:通过执行FlowFile内容定义的SQL DDM语句来更新数据库
- SelectHiveQL:对Apache Hive数据库执行用户定义HiveQL SELECT命令,结果是Avro或CSV格式的FlowFile
- PutHiveQL:通过执行FlowFile内容定义的HiveQL DDM语句来更新Hive数据库
属性提取
- EvaluateJsonPath:用户提供JSONPath表达式(类似XPath,用于XML根据分析/提取)JSON用结果值代替这些表达式进行内容评估FlowFile将结果值提取到用户自己命名的内容或Attribute中。
- EvaluateXPath:用户提供XPath表达式,然后根据XML用结果值代替这些表达式进行内容评估FlowFile将结果值提取到用户自己命名的内容或Attribute中。
- EvaluateXQuery:用户提供XQuery查询,然后根据XML本查询的内容评估用结果值代替FlowFile将结果值提取到用户自己命名的内容或Attribute中。
- ExtractText:用户FlowFile评估文本内容,然后提取用户自己命名的结果值Attribute中。
- HashAttribute:串联用户定义的现有属性列表hash。
- HashContent:对FlowFile的内容进行hash,并将得到的hash值添加到Attribute中。
- IdentifyMimeType:评估FlowFile确定内容FlowFile包装文件类型。这个处理器可以检测到许多不同的MIME类型,如图像、文本处理器文档、文本和压缩格式,只有几例。
- UpdateAttribute:向FlowFile添加或更新任何数量的用户定义属性。这对添加静态属性值和使用表达语言动态计算的属性值非常有用。处理器还提供"高级用户界面(Advanced User Interface)",允许用户根据用户提供的规则更新属性。
系统交互
- ExecuteProcess:操作用户自定义的操作系统命令。进程的StdOut为了重定向StdOut内容输出为FlowFile的内容。该处理器是源处理器(不接受数据流输入,无上游组件) - 其输出预计将产生新的FlowFile,并且系统调用不会接收任何输入。为过程提供输入,请使用ExecuteStreamCommand Processor。
- ExecuteStreamCommand:操作系统命令由用户定义。FlowFile流式传输到过程的内容可选StdIn。StdOut内容输出为FlowFile内容。此处理器不能用作源处理器 - 必须传入FlowFiles才能执行。
数据提取
- GetFile:流式将文件内容从本地磁盘(或网络连接的磁盘)传输到NiFi,然后删除原始文件。该处理器应将文件从一个位置移动到另一个位置,而不是复制数据。
- GetFTP:通过FTP下载远程文件的内容NiFi然后删除原始文件。该处理器应将文件从一个位置移动到另一个位置,而不是复制数据。
- GetSFTP:通过SFTP下载远程文件的内容NiFi然后删除原始文件。该处理器应将文件从一个位置移动到另一个位置,而不是复制数据。
- GetJMSQueue:从JMS队列下载消息,并根据JMS创建新闻内容FlowFile。可选地,JMS属性也可以复制为属性。
- GetJMSTopic:从JMS主题下载消息,并根据JMS创建新闻内容FlowFile。可选地,JMS属性也可以复制为属性。该处理器支持持久订阅和非持久订阅。
- GetHTTP:将基于HTTP或HTTPS的远程URL下载请求内容NiFi记住处理器ETag和Last-Modified Date,确保数据不会持续摄入。
- LisenHTTP:启动HTTP(或HTTPS)服务器并侦听传入连接。对于任何传入的POST请求,请求的内容将作为FlowFile写出,并返回200响应。
- ListenUDP:侦听传入的UDP数据包,并为每个数据包或每个数据包创建一个FlowFile(取决于配置),并将FlowFile发送到"success"。
- GetHDFS:监视HDFS中用户指定的目录。每当新文件进入HDFS时,它将被复制到NiFi并从HDFS中删除。此处理器应将文件从一个位置移动到另一个位置,而不是用于复制数据。如果在集群中运行,此处理器需仅在主节点上运行。要从HDFS复制数据并使其保持原状,或者从群集中的多个节点流式传输数据,请参阅ListHDFS处理器。
- ListHDFS / FetchHDFS:ListHDFS监视HDFS中用户指定的目录,并发出一个FlowFile,其中包含它遇到的每个文件的文件名。然后,它通过分布式缓存在整个NiFi集群中保持此状态。然后可以在集群中,将其发送到FetchHDFS处理器,后者获取这些文件的实际内容并发出包含从HDFS获取的内容的FlowFiles。
- GetKafka:从Apache Kafka获取消息,特别是0.8.x版本。消息可以作为每个消息的FlowFile发出,也可以使用用户指定的分隔符一起进行批处理。
- GetMongo:对MongoDB执行用户指定的查询,并将内容写入新的FlowFile。
数据出口/发送数据
- PutEmail:向配置的收件人发送电子邮件。FlowFile的内容可选择作为附件发送。
- PutFile:将FlowFile的内容写入本地(或网络连接)文件系统上的目录。
- PutFTP:将FlowFile的内容复制到远程FTP服务器。
- PutSFTP:将FlowFile的内容复制到远程SFTP服务器。
- PutJMS:将FlowFile的内容作为JMS消息发送到JMS代理,可选择将Attributes添加JMS属性。
- PutSQL:将FlowFile的内容作为SQL DDL语句(INSERT,UPDATE或DELETE)执行。FlowFile的内容必须是有效的SQL语句。属性可以用作参数,FlowFile的内容可以是参数化的SQL语句,以避免SQL注入攻击。
- PutKafka:将FlowFile的内容作为消息发送到Apache Kafka,特别是0.8.x版本。FlowFile可以作为单个消息或分隔符发送,例如可以指定换行符,以便为单个FlowFile发送许多消息。
- PutMongo:将FlowFile的内容作为INSERT或UPDATE发送到Mongo。
分裂和聚合
- SplitText:SplitText接收单个FlowFile,其内容为文本,并根据配置的行数将其拆分为1个或多个FlowFiles。例如,可以将处理器配置为将FlowFile拆分为多个FlowFile,每个FlowFile只有一行。
- SplitJson:允许用户将包含数组或许多子对象的JSON对象拆分为每个JSON元素的FlowFile。
- SplitXml:允许用户将XML消息拆分为多个FlowFiles,每个FlowFiles包含原始段。这通常在多个XML元素与"wrapper"元素连接在一起时使用。然后,此处理器允许将这些元素拆分为单独的XML元素。
- UnpackContent:解压缩不同类型的存档格式,例如ZIP和TAR。然后,归档中的每个文件都作为单个FlowFile传输。
- SegmentContent:根据某些已配置的数据大小将FlowFile划分为可能的许多较小的FlowFile。不对任何类型的分界符执行拆分,而是仅基于字节偏移执行拆分。这是在传输FlowFiles之前使用的,以便通过并行发送许多不同的部分来提供更低的延迟。而另一方面,MergeContent处理器可以使用碎片整理模式重新组装这些FlowFiles。
- MergeContent:此处理器负责将许多FlowFiles合并到一个FlowFile中。可以通过将其内容与可选的页眉,页脚和分界符连接在一起,或者通过指定存档格式(如ZIP或TAR)来合并FlowFiles。FlowFiles可以根据公共属性进行分箱(binned),或者如果这些流是被其他组件拆分的,则可以进行"碎片整理(defragmented)"。根据元素的数量或FlowFiles内容的总大小(每个bin的最小和最大大小是用户指定的)并且还可以配置可选的Timeout属性,即FlowFiles等待其bin变为配置的上限值最大时间。
- SplitContent:将单个FlowFile拆分为可能的许多FlowFile,类似于SegmentContent。但是,对于SplitContent,不会在任意字节边界上执行拆分,而是指定要拆分内容的字节序列。
HTTP
- GetHTTP:将基于HTTP或HTTPS的远程URL的内容下载到NiFi中。处理器将记住ETag和Last-Modified Date,以确保不会持续摄取数据。
- ListenHTTP:启动HTTP(或HTTPS)服务器并侦听传入连接。对于任何传入的POST请求,请求的内容将作为FlowFile写出,并返回200响应。
- InvokeHTTP:执行用户配置的HTTP请求。此处理器比GetHTTP和PostHTTP更通用,但需要更多配置。此处理器不能用作源处理器,并且需要具有传入的FlowFiles才能被触发以执行其任务。
- PostHTTP:执行HTTP POST请求,将FlowFile的内容作为消息正文发送。这通常与ListenHTTP结合使用,以便在无法使用s2s的情况下在两个不同的NiFi实例之间传输数据(例如,节点无法直接访问并且能够通过HTTP进行通信时代理)。 注意:除了现有的RAW套接字传输之外,HTTP还可用作s2s传输协议。它还支持HTTP代理。建议使用HTTP s2s,因为它更具可扩展性,并且可以使用具有更好用户身份验证和授权的输入/输出端口的方式来提供双向数据传输。
- HandleHttpRequest / HandleHttpResponse:HandleHttpRequest Processor是一个源处理器,与ListenHTTP类似,启动嵌入式HTTP(S)服务器。但是,它不会向客户端发送响应(比如200响应)。相反,流文件是以HTTP请求的主体作为其内容发送的,所有典型servlet参数、头文件等的属性作为属性。然后,HandleHttpResponse能够在FlowFile完成处理后将响应发送回客户端。这些处理器总是彼此结合使用,并允许用户在NiFi中可视化地创建Web服务。这对于将前端添加到非基于Web的协议或围绕已经由NiFi执行的某些功能(例如数据格式转换)添加简单的Web服务特别有用。
五、使用属性
每个FlowFile都拥有多个属性,这些属性将在FlowFile的生命周期中发生变化。FlowFile的概念非常强大,并提供三个主要优点。
- 首先,它允许用户在流中做出路由决策,以便满足某些条件的FlowFiles可以与其他FlowFiles进行不同地处理。这可以由RouteOnAttribute和其他类似的处理器完成的。
- 其次,利用属性配置处理器:处理器的配置依赖于数据本身。例如,PutFile能够使用Attributes来知道每个FlowFile的存储位置,而每个FlowFile的目录和文件名属性可能不同(结合表达式语言,比如每个流都有filename属性,组件中就可以这样配置文件名:${filename},就可以获取到当前FlowFIle中filename的属性值)。
- 最后,属性提供了有关数据的极有价值的上下文。在查看FlowFile的Provenance数据时非常有用,它允许用户搜索符合特定条件的Provenance数据,并且还允许用户在检查Provenance事件的详细信息时查看此上下文。通过简单地浏览该上下文,用户能够知道为什么以这样或那样的方式处理数据。
共同属性
每个FlowFile都有一组属性:
- filename:可用于将数据存储到本地或远程文件系统的文件名。
- path:可用于将数据存储到本地或远程文件系统的目录的名称。
- uuid:一个通用唯一标识符,用于区分FlowFile与系统中的其他FlowFiles。
- entryDate:FlowFile进入系统的日期和时间(即已创建)。此属性的值是一个数字,表示自1970年1月1日午夜(UTC)以来的毫秒数。
- lineageStartDate:任何时候克隆,合并或拆分FlowFile,都会导致创建子FlowFile。该值表示当前FlowFile最早的祖先进入系统的日期和时间。该值是一个数字,表示自1970年1月1日午夜(UTC)以来的毫秒数。
- fileSize:此属性表示FlowFile内容占用的字节数。
需要注意的是uuid,entryDate,lineageStartDate,和fileSize属性是系统生成的,不能改变。
提取属性
NiFi提供了几种不同的处理器,用于从FlowFiles中提取属性。我们在之前的处理器分类中已经提到过。这是构建自定义处理器的一个非常常见的用例,其实编写处理器是为了理解特定的数据格式,并从FlowFile的内容中提取相关信息,创建属性来保存该信息,以便可以决定如何路由或处理数据。
添加用户自定义的属性
NIFI除了提供能够将特定信息从FlowFile内容提取到属性中的处理器之外,NIFI还允许用户将自定义属性添加到每个FlowFile中的特定位置。UpdateAttribute就是专为此目的而设计。用户可以通过单击属性选项卡右上角的+按钮,在配置对话框中向处理器添加新属性。然后UI会提示用户输入属性的名称,然后输入值。对于此UpdateAttribute处理的每个FlowFile,都会添加用户自定义属性。Attribute的名称将与添加的属性的名称相同。
属性的值也可以包含表达式语言。这样就允许基于其他属性修改或添加属性。例如,如果我们想要将处理文件的主机名和日期添加到文件名之前,我们可以通过添加 h o s t n a m e ( ) − {hostname()}- hostname()−{now():format(‘yyyy-dd-MM’)}-${filename}来实现来实现。刚开始大家可能不太理解这是什么意思,在后续的课程中我们会进行讲解。
除了添加一组自定义的属性外,UpdateAttribute还具有一个高级UI,允许用户配置一组规则,以便在何时添加属性。要访问此功能,请在配置对话框的属性选项卡中,单击Advanced对话框底部的按钮。将弹出此处理器特定的UI界面。在此UI中,用户可以配置规则引擎,实质上是指定必须匹配的规则,以便将已配置的属性添加到FlowFile。
属性路由
NiFi最强大的功能之一是能够根据属性路由FlowFiles。执行此操作的主要机制是RouteOnAttribute。此处理器与UpdateAttribute一样,通过添加用户自定义的属性进行配置。通过单击处理器的配置对话框中属性选项卡右上角的+按钮,可以添加任意数量的属性。
每个FlowFile的属性将与配置的属性进行比较,以确定FlowFile是否满足指定的条件。每个属性的值应该是一个表达式语言并返回一个布尔值。下面的【表达式语言/在Property值中使用attribute】会对表达式语言进行补充。
在评估针对FlowFile的属性提供的表达式语言之后,处理器根据所选的路由策略确定如何路由FlowFile。最常见的策略是"Route to Property name"策略。选择此策略后,处理器将为配置的每个属性公开关系(可拖拽出去指向下一个处理器)。如果FlowFile的属性满足给定的表达式,则FlowFile的副本将路由到相应的Relationship。例如,如果我们有一个名为"begin-with-r"的新属性和值"$ {filename:startsWith(‘r’)}"的表达式,那么任何文件名以字母’r’开头的FlowFile将是路由到那个关系。所有其他FlowFiles将被路由到"unmatched"关系。
表达式语言/在Property值中使用attribute
当我们从FlowFiles的内容中提取属性并添加用户定义的属性时,除非我们有一些可以使用它们的机制,否则它们不会作为运算符进行计算。NiFi表达式语言允许我们在配置流时访问和操作FlowFile属性值。并非所有处理器属性都允许使用表达式语言,但很多处理器都可以。为了确定属性是否支持表达式语言,用户可以将鼠标悬停在处理器配置对话框的属性选项卡中的[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Jcq68veG-1622119703458)(./images/问号.png)]图标上,然后会有一个提示,显示属性的描述,默认值(如果有)以及属性是否支持表达式语言。
对于支持表达式语言的属性,可以通过在 开始标记 ${ 和结束标记 } 中添加表达式来使用它。表达式可以像属性名一样简单。例如,要引用uuid Attribute,我们可以简单地使用 u u i d 。 如 果 属 性 名 称 以 字 母 以 外 的 任 何 字 符 开 头 , 或 者 包 含 除 数 字 , 字 母 , 句 号 ( . ) 或 下 划 线 ( ) 以 外 的 字 符 , 则 需 要 加 引 号 。 例 如 , {uuid}。如果属性名称以字母以外的任何字符开头,或者包含除数字,字母,句号(.)或下划线(_)以外的字符,则需要加引号。例如, uuid。如果属性名称以字母以外的任何字符开头,或者包含除数字,字母,句号(.)或下划线()以外的字符,则需要加引号。例如,{My Attribute Name} 将无效,但${‘My Attribute Name’}将引用属性My Attribute Name。
除了引用属性值之外,我们还可以对这些属性执行许多功能和比较。例如,如果我们想检查filename属性是否不分大小写(大写或小写)地包含字母’r’,我们可以使用表达式来完成${filename:toLower():contains(‘r’)}。请注意,函数由冒号分隔。我们可以将任意数量的函数链接在一起,以构建更复杂的表达式。重要的是要明白,即使我们正在调用filename:toLower(),这也不会改变filename属性的值,而只是返回给我们一个新的值。
我们也可以在一个表达式中嵌入另一个表达式。例如,如果我们想要将attr1 Attribute 的值与attr2 Attribute的值进行比较,我们可以使用以下表达式来执行此操作:${attr1:equals( ${attr2} )}。
表达式语言包含许多不同的函数,官方文档Expression Language Guide。
此外,此表达式语言指南内置于应用程序中,以便用户可以轻松查看哪些功能可用,并在输入时查看其文档。设置支持表达式语言的属性的值时,如果光标位于表达式语言的开始和结束标记内,则在关键字上按 Ctrl + Space 将弹出所有可用的函数(快捷键冲突被占用会无法使用此功能),并将提供自动填充的功能。单击或使用键盘上下键指向弹出窗口中列出的某个功能会有提示,提示解释了该功能的作用,它所期望的参数以及函数的返回类型。
表达式语言中的自定义属性
除了使用FlowFile属性外,还可以定义表达式语言使用的自定义属性。定义自定义属性为处理和配置数据流提供了额外的灵活性。
六、常用处理器
: 执行脚本处理器, 支持: clojure, ecmascript, groovy, lua, python, ruby
: 数据库查询处理器, 支持: mysql
: avro 数据格式转换为 json
: 将JSON文件拆分为多个单独的FlowFiles, 用于由JsonPath表达式指定的数组元素。
: 根据FlowFile的内容评估一个或多个JsonPath表达式。这些表达式的结果将分配给FlowFile属性,或者写入FlowFile本身的内容,具体取决于处理器的配置。
: 文本组装与替换, 支持正则表达式
: 将FlowFile数据写入Hadoop分布式文件系统(HDFS)
: 执行hive ddl/dml命令, 如: insert, update
: 根据配置将消息发送到kafka topic
: 执行hive select 语句并获取结果
: 执行SQL的insert或update命令
: 从目录中的文件创建FlowFiles。
: 将FlowFile数据写入文件
: 从Hadoop分布式文件系统获取文件
: 从MySQL数据库中检索更改数据捕获(CDC)事件。CDC事件包括INSERT,UPDATE,DELETE操作。事件作为单个流文件输出,这些文件按操作发生的时间排序。
: 一般用于执行sh脚本
七、监控NiFi
当数据在NiFi中流经您的数据流处理流程时,了解您的系统执行情况以评估您是否需要更多资源以及评估当前资源的运行状况非常重要。NiFi提供了一些监控系统的机制。
状态栏
在组件工具栏下的NiFi屏幕顶部附近有一个条形,称为状态栏。它包含一些关于NiFi当前健康状况的重要统计数据。活动线程的数量可以指示NiFi当前的工作状态,排队统计数据表示当前在整个流程中排队的FlowFile数量以及这些FlowFiles的总大小。
如果NiFi实例位于群集中,我们还会在此处看到一个指示器,告诉我们群集中有多少节点以及当前连接的节点数。在这种情况下,活动线程的数量和队列大小指示当前连接的所有节点的所有总和。
组件统计
画布上的每个处理器,进程组(Group)和远程进程组都提供了有关组件处理了多少数据的若干统计信息。这些统计信息提供有关在过去五分钟内处理了多少数据的信息。这是一个滚动窗口,允许我们查看处理器消耗的FlowFiles数量,以及处理器发出的FlowFiles数量。
处理器之间的连接还会显示当前排队的项目数。
查看这些指标的历史值以及(如果是群集的)不同节点相互比较也可能很有价值。我们可以右键单击组件并选择Stats菜单项查看此信息,nifi会向我们展示一个图表,该图表涵盖自NiFi启动以来的时间,或最多24小时,以较少者为准(通过更改属性文件中的配置,可以扩展或减少此处显示的时间量)
在此对话框的右上角有一个下拉列表,允许用户选择他们正在查看的指标。底部的图表允许用户选择图表的较小部分进行放大。
公告
除了为每个组件提供的统计信息之外,用户还想知道流程是否出现问题。虽然我们可以监视日志中的任何内容,但在屏幕上弹出通知会更方便。如果处理器将日志级别设置为WARNING或ERROR,我们将在处理器的右上角看到"Bulletin Indicator"。此指示器看起来像一个粘滞便笺,将在事件发生后持续显示五分钟。将鼠标悬停在公告上可提供有关所发生情况的信息,以便用户无需筛选日志消息即可查找。如果是在集群中,公告还会指示是集群中的哪个节点发布了公告。我们还可以在处理器的"配置"对话框的"设置"选项卡中更改公告的日志级别。
如果框架发布了公告,我们还会在屏幕右上方突出显示公告指示符。在全局菜单中是公告板选项(Bulletin Board)。单击此选项我们将看到公告板,在那里我们可以看到NiFi实例中出现的所有公告,并可以根据组件,消息等进行过滤。
八、数据来源
NiFi对其摄取的每个数据保持非常精细的细节。当数据通过系统处理并被转换,路由,拆分,聚合和分发到其他端点时,这些信息都存储在NiFi的Provenance Repository中。为了搜索和查看此信息,我们可以从全局菜单中选择数据源(Data Provenance)。会弹出一个表格,列出我们搜索过的Provenance事件:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hhChdR96-1622119768301)(./images/provenance-table.png)]
此表列出了最近发生的1,000个Provenance事件(尽管事件发生后可能需要几秒钟才能处理信息)。在此对话框中,有一个Search按钮,允许用户搜索特定处理器发生的事件,按文件名或UUID或其他几个字段搜索特定的FlowFile。在nifi.properties文件中提供了配置这些属性中的哪些属性可编入索引或可作搜索条件的功能。此外,配置文件还允许您指定将被索引的FlowFile属性。因此,您可以指定哪些属性对您的特定数据流很重要,并使这些属性可搜索。
事件详情
一旦我们执行了搜索,我们的表格将仅展示与搜索条件匹配的事件。在这里,我们可以选择细节图标[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4QuQG0E6-1622119768303)()]来查看该事件的详细信息:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tWG2lnSQ-1622119768305)(./images/event-details.png)]
在这里,我们可以确切地看到该事件发生的时间,事件影响的FlowFile,事件执行的组件(处理器等),事件花费的时间以及事件发生时NiFi数据的总体时间(总潜伏期)。
下一个选项卡提供事件发生时FlowFile上存在的所有属性的列表:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-W5PPL3dg-1622119768306)(./images/event-attributes.png)]
在这里,我们可以看到事件发生时FlowFile上存在的所有属性,以及这些属性的先前值。我们可以知道哪些属性因此事件而发生变化以及它们如何变化。此外,在右侧角是一个复选框,允许用户仅查看那些已更改的属性。如果FlowFile只有少量属性,这可能不是特别有用,但当FlowFile有数百个属性时,它可能非常有用。
这非常重要,因为它允许用户理解FlowFile处理的确切上下文,对理解FlowFile的处理逻辑是有帮助的,特别是在使用表达式语言配置处理器时。
最后,还有Content选项卡:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QAMuHHdv-1622119768312)(./images/event-content.png)]
此选项卡向我们提供有关存储FlowFile content的内容存储库位置的信息。如果事件修改了FlowFile的内容,我们将看到’input claim和’outputclaim’。如果数据格式是NiFi了可以识别的可以呈现的数据格式,我们可以选择下载或查看NiFi内部的内容。
此外,在选项卡的重播部分,有一个Replay按钮,允许用户将FlowFile重新插入到流中,并从事件发生的时间点重新处理它。这提供了一种非常强大的机制,因为我们能够实时修改流程,重新处理FlowFile,然后查看结果。如果它们不符合预期,我们可以再次修改流程,并再次重新处理FlowFile。我们能够执行流程的迭代开发,直到它完全按照预期处理数据。
谱系图
除了查看Provenance事件的详细信息之外,我们还可以通过单击视图中的Lineage图标[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IzCedhrQ-1622119768313)()]来查看所涉及的FlowFile的血缘关系。
这为我们提供了一个图形表示,说明了在遍历系统时该数据发生了什么:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2PmtXlaD-1622119768316)(./images/lineage-graph-annotated.png)]
在这里,我们可以右键单击任何事件,然后单击View Details菜单项以查看【事件详情】。此图形表示向我们准确显示了数据发生的事件。有一些"特殊"事件类型需要注意。如果我们看到JOIN,FORK或CLONE事件,我们可以右键单击并选择Find Parents或Expand。这允许我们查看父FlowFiles和创建的子FlowFiles的血缘关系。
左下角的滑块允许我们查看这些事件发生的时间。通过左右滑动,我们可以看到哪些事件花费了较长的时间,这样我们可以分析瓶颈,得知哪些节点需要更多资源,例如配置处理器的并发任务数。它也可能揭示其他信息,例如,大多数延迟是由JOIN事件引入的,我们在等待更多的FlowFiles连接在一起。在任何一种情况下,都能够轻松查看数据处理发生的位置是一项非常强大的功能,可帮助用户了解企业的运营方式。
九、术语
:DataFlow Manager(DFM)是NiFi用户,具有添加,删除和修改NiFi数据流组件的权限。
:FlowFile代表NiFi中的单个数据。FlowFile由两个组件组成:FlowFile属性(attribute)和FlowFile内容(content)。内容是FlowFile表示的数据。属性是提供有关数据的信息或上下文的特征,它们由键值对组成。所有FlowFiles都具有以下标准属性:
- uuid:一个通用唯一标识符,用于区分FlowFile与系统中的其他FlowFiles
- filename:在将数据存储到磁盘或外部服务时可以使用的可读文件名
- path:在将数据存储到磁盘或外部服务时可以使用的分层结构值,以便数据不存储在单个目录中
:处理器是NiFi组件,用于监听传入数据、从外部来源提取数据、将数据发布到外部来源、路由,转换或提取FlowFiles。
:每个处理器都为其定义了零个或多个关系。命名这些关系以指示处理FlowFile的结果含义。处理器处理完FlowFile后,它会将FlowFile路由(传输)到其中一个关系。DFM能够将每一个关系连接到其他组件,以指定FlowFile应该在哪里进行下一步处理。
:DFM通过将组件从NiFi工具栏的Components部分拖动到画布,然后通过Connections将组件连接在一起来创建自动的数据处理流程。每个连接由一个或多个关系组成。对于每个Connection,DFM都可以为其确定使用哪些关系。这样我们可以基于其处理结果的不同来以不同的方式路由数据。每个连接都包含一个FlowFile队列。将FlowFile传输到特定关系时,会将其添加到属于当前Connection的队列中。
:控制器服务是扩展点,在用户界面中由DFM添加和配置后,将在NiFi启动时启动,并提供给其他组件(如处理器或其他控制器服务)需要的信息。常见Controller Service比如StandardSSLContextService,它提供了一次配置密钥库和/或信任库属性的能力,并在整个应用程序中重用该配置。我们的想法是,控制器服务不是在每个可能需要它的处理器中配置这些信息,而是根据需要为任何处理器提供。
:漏斗是一个NiFi组件,用于将来自多个Connections的数据合并到一个Connection中。
:当数据流变得复杂时,在更高,更抽象的层面上推断数据流是很有用的。NiFi允许将多个组件(如处理器)组合到一个过程组中。然后,DFM可以在NiFi用户界面轻松地将多个流程组连接到逻辑数据处理流程中,DFM还可以进入流程组查看和操作流程组中的组件。
:使用一个或多个进程组构建的数据流需要一种方法将进程组连接到其他数据流组件。这是通过使用Ports实现的。DFM可以向进程组添加任意数量的输入端口和输出端口,并相应地命名这些端口。
:正如数据传输进出进程组一样,有时需要将数据从一个NiFi实例传输到另一个NIFI实例。虽然NiFi提供了许多不同的机制来将数据从一个系统传输到另一个系统,但是如果将数据传输到另一个NiFi实例,远程进程组通常是实现此目的的最简单方法。
:NiFi用户界面提供了大量有关应用程序当前状态的监视和反馈。除了滚动统计信息和为每个组件提供的当前状态之外,组件还能够报告公告。每当组件报告公告时,该组件上都会显示公告图标(处理器右上角红色的图标)。系统级公告显示在页面顶部附近的状态栏上。使用鼠标悬停在该图标上将提供一个工具提示,显示公告的时间和严重性(Debug, Info, Warning, Error)以及公告的消息。也可以在全局菜单中的公告板页面中查看和过滤所有组件的公告。
:通常,DataFlow由许多可以重用的组件组成。NiFi允许DFM选择DataFlow的一部分(或整个DataFlow)并创建模板。此模板具有名称,然后可以像其他组件一样拖动到画布上。最终,可以将若干组件组合在一起以形成更大的构建块,然后从该构建块创建数据流处理流程。这些模板也可以导出为XML并导入到另一个NiFi实例中,从而可以共享这些构建块。
:DFM放入NiFi用户界面画布的所有内容都实时写入一个名为flow.xml.gz的文件。该文件默认位于conf目录中。在画布上进行的任何更改都会自动保存到此文件中,而无需用户单击保存按钮。此外,NiFi在更新时会自动在归档目录中创建此文件的备份副本。您可以使用这些归档文件来回滚配置,如果想要回滚,先停止NiFi,将flow.xml.gz替换为所需的备份副本,然后重新启动NiFi。在集群环境中,停止整个NiFi集群,替换其中一个节点的flow.xml.gz,删除自其他节点的flow.xml.gz,然后重新启动该节点。确认此节点启动为单节点集群后,然后启动其他节点。替换的流配置将在集群中同步。flow.xml.gz的名称和位置以及自动存档行为是可配置的。
十、Linux配置优化
如果您在Linux上运行,请考虑这些最佳实践。典型的Linux默认设置不一定能够满足像NiFi这样的IO密集型应用程序的需求。对于这些最佳实践,NIFI所在的Linux发行版的实际情况可能会有所不同,可以参考下面的介绍,但是请参考特定发行版的文档。
NiFi在任何时候都可能会打开非常大量的文件句柄。通过编辑 /etc/security/limits.conf 来增加限制,添加类似的内容
* hard nofile 50000 * soft nofile 50000
NiFi可以配置生成大量的线程。要增加Linux允许的数量,请编辑 /etc/security/limits.conf
* hard nproc 10000 * soft nproc 10000
你的发行版Linux可能需要通过添加来编辑 /etc/security/limits.d/20-nproc.conf
* soft nproc 10000
如果你的流程会在很短的时间内设置并拆除大量socket,这一点尤为重要。
sudo sysctl -w net.ipv4.ip_local_port_range ="10000 65000"
考虑到你希望能够快速设置和拆卸新套接字,你不希望您的套接字停留太长时间。最好多阅读一下并调整类似的东西
sudo sysctl -w net.ipv4.netfilter.ip_conntrack_tcp_timeout_time_wait ="1"
对于某些应用程序来说,swapping非常棒。对于像NiFi一样想要运行的程序并不好。要告诉Linux你想关掉swapping,你可以编辑 /etc/sysctl.conf 来添加以下行
vm.swappiness = 0
对于处理各种NiFi repos的分区,请关闭诸如atime
之类的选项。这样做会导致吞吐量的惊人提高。编辑/etc/fstab
文件,对于感兴趣的分区,添加noatime
选项。
比如我要在根文件系统使用noatime,可以编辑/etc/fstab文件,如下:
/dev/mapper/centos-root / xfs defaults,noatime 0 0 UUID=47f23406-2cda-4601-93b6-09030b30e2dd /boot xfs defaults 0 0 /dev/mapper/centos-swap swap swap defaults 0 0
修改后重新挂载
mount -o remount / 或者 mount -o remount /boot
十一、NIFI集群
为什么集群?
DFM可能会发现在单个服务器上使用一个NiFi实例不足以处理他们拥有的数据量。因此,一种解决方案是在多个NiFi服务器上运行相同的数据流。但是,这会产生管理问题,因为每次DFM想要更改或更新数据流时,他们必须在每个服务器上进行这些更改,然后逐个监视每个服务器。而集群NiFi服务器,可以增加处理能力同时,支持单接口控制,通过该接口可以更改整个集群数据流并监控数据流。集群允许DFM只进行一次更改,然后将更改的内容复制到集群的所有节点。通过单一接口,DFM还可以监视所有节点的健康状况和状态。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ajJGIJEJ-1622119880962)(.\images\nifi集群架构.png)]
零主集群
NiFi采用Zero-Master Clustering范例。集群中的每个节点都对数据执行相同的任务,但每个节点都在不同的数据集上运行。其中一个节点自动被选择(通过Apache ZooKeeper)作为集群协调器。然后,集群中的所有节点都会向此节点发送心跳/状态信息,并且此节点负责断开在一段时间内未报告任何心跳状态的节点。此外,当新节点选择加入集群时,新节点必须首先连接到当前选定的集群协调器,以获取最新的流。如果集群协调器确定允许该节点加入(基于其配置的防火墙文件),则将当前流提供给该节点,并且该节点能够加入集群,假设节点的流副本与集群协调器提供的副本匹配。如果节点的流配置版本与集群协调器的版本不同,则该节点将不会加入集群。
术语
NiFi Clustering是独一无二的,有自己的术语。在设置集群之前了解以下术语非常重要:
:NiFi集群协调器是NiFi集群中的节点,负责管理集群中允许执行任务的节点,并为新加入的节点提供最新的数据流量。当DataFlow Manager管理集群中的数据流时,可以通过集群中任何节点的用户界面执行此操作。然后,所做的任何更改都将复制到集群中的所有节点。
:每个集群由一个或多个节点组成。节点执行实际的数据处理。
:每个集群都有一个主节点。在此节点上,可以运行"隔离处理器"(见下文)。ZooKeeper用于自动选择主节点。如果该节点由于任何原因断开与集群的连接,将自动选择新的主节点。用户可以通过查看用户界面的"集群管理"页面来确定当前选择哪个节点作为主节点。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aCuQhTJU-1622119880965)(.\images\集群管理页面.png)]
:在NiFi集群中,相同的数据流会在所有节点上运行。但是,可能存在DFM不希望每个处理器在每个节点上运行的情况。最常见的情况是使用的处理器存在与外部服务进行通信得的情况。例如,GetSFTP处理器从远程目录中提取。如果GetSFTP处理器在集群中的每个节点上运行并同时尝试从同一个远程目录中提取,则可能存在重复读取。因此,DFM可以将主节点上的GetSFTP配置为独立运行,这意味着它仅在该节点上运行。通过适当的数据流配置,它可以提取数据并在集群中的其余节点之间对其进行负载平衡。注意,虽然存在此功能,但仅使用独立的NiFi实例来提取数据并将其输出内容分发给集群也很常见。它仅取决于可用资源以及管理员配置集群的方式。
:节点通过"心跳"将其健康状况和状态传达给当前选定的集群协调器,这使协调器知道它们仍然处于连接状态并正常工作。默认情况下,节点每5秒发出一次心跳,如果集群协调器在40秒内没有从节点收到心跳,则由于"缺乏心跳"而断开节点。5秒设置可在_nifi.properties_文件中配置。集群协调器断开节点的原因是协调器需要确保集群中的每个节点都处于同步状态,并且如果没有定期收听到节点,协调器无法确定它是否仍与其余节点同步。如果在40秒后节点发送新的心跳,协调器将自动把请求节点重新加入集群。一旦接收到心跳,由于心跳不足导致的断开连接和重新连接信息都会报告给用户界面中的DFM。
集群安装
环境基础
1、系统:CentOS 7.4
2、Java环境:JDK8
使用NiFi集成的zookeeper
NiFi依赖于ZooKeeper以实现集群配置。但是,在有些环境中,部署了NiFi,而没有维护现有的ZooKeeper集合。为了避免强迫管理员维护单独的ZooKeeper实例的负担,NiFi提供了嵌入式ZooKeeper服务器的选项。
nifi.state.management.embedded.zookeeper.start |
指定此NiFi实例是否应运行嵌入式ZooKeeper服务器 |
nifi.state.management.embedded.zookeeper.properties |
如果nifi.state.management.embedded.zookeeper.start 设置为true ,则要提供使用的ZooKeeper属性的属性文件 |
通过设置 nifi.properties 中的nifi.state.management.embedded.zookeeper.start
属性为true
来运行嵌入式的ZooKeeper服务器。
通常建议在3或5个节点上运行ZooKeeper。在少于3个节点上运行可在遇到故障时提供较低的耐用性。在超过5个节点上运行通常会产生不必要的网络流量。此外,在4个节点上运行ZooKeeper并不会比在3个节点上运行有优势,ZooKeeper要求大多数节点处于活动状态才能运行。
如果nifi.state.management.embedded.zookeeper.start
属性设置为true
,则 nifi.properties 中的nifi.state.management.embedded.zookeeper.properties
属性也需要设置。它用来指定要使用的ZooKeeper属性文件。这个属性文件至少需要配置ZooKeeper的服务器列表。另注意,由于ZooKeeper将侦听这些端口,因此可能需要将防火墙配置为打开这些端口。默认值为2181
,但可以通过_zookeeper.properties_文件中的_clientPort_属性进行配置。
使用嵌入式ZooKeeper时,/ _conf / zookeeper.properties_文件具有名为dataDir
的属性。默认情况下,此值为./state/zookeeper
。如果多个NiFi节点正在运行嵌入式ZooKeeper,则必须告知服务器它是哪一个。通过创建名为_myid_的文件 并将其放在ZooKeeper的数据目录中来实现。此文件的内容应该是不同服务器的唯一索引值。因此,对于某一个ZooKeeper服务器,我们将通过执行以下命令来完成此任务:
cd $NIFI_HOME
mkdir state
mkdir state/zookeeper
echo 1 > state/zookeeper/myid
对于将运行ZooKeeper的下一个NiFi节点,我们可以通过执行以下命令来实现此目的:
cd $NIFI_HOME
mkdir state
mkdir state/zookeeper
echo 2 > state/zookeeper/myid
我们采用三个节点的集群,且在一台机器上搭建,所以不同节点的端口会不同,如果搭建在三台机器上,IP不同,那么端口可以相同。
上传资料中提供的nifi-1.9.2-bin.tar.gz文件到服务器的/export/download目录下,并进行解压:
tar -zxvf nifi-1.9.2-bin.tar.gz
移动并复制,共三个副本。
mv nifi-1.9.2 ../soft/nifi-1.9.2-18001
cd ../soft
cp -r nifi-1.9.2-18001/ nifi-1.9.2-18002
cp -r nifi-1.9.2-18001/ nifi-1.9.2-18003
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HloeLu0b-1622119880967)(.\images\目录结构.png)]
# zk客户端连接接口:1节点12181,2节点12182,1节点12183
clientPort=12181
# 不同服务的IP和选举端口号
server.1=192.168.52.150:12888:13888
server.2=192.168.52.150:14888:15888
server.3=192.168.52.150:16888:17888
1
####################
# State Management #
####################
nifi.state.management.configuration.file=./conf/state-management.xml
nifi.state.management.provider.local=local-provider
nifi.state.management.provider.cluster=zk-provider
# 指定此NiFi实例是否应运行嵌入式ZooKeeper服务器,默认是false
nifi.state.management.embedded.zookeeper.start=true
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
# web properties #
nifi.web.war.directory=./lib
# HTTP主机。默认为空白
nifi.web.http.host=192.168.52.150
# HTTP端口。默认值为8080;修改为18001、18002、18003
nifi.web.http.port=18001
# cluster node properties (only configure for cluster nodes) #
# 如果实例是群集中的节点,请将此设置为true。默认值为false
nifi.cluster.is.node=true
# 节点的完全限定地址。默认为空白
nifi.cluster.node.address=192.168.52.150
# 节点的协议端口。默认为空白,修改为:28001、28002、28003
nifi.cluster.node.protocol.port=28001
# 指定在选择Flow作为“正确”流之前等待的时间量。如果已投票的节点数等于nifi.cluster.flow.election.max.candidates属性指定的数量,则群集将不会等待这么长时间。默认值为5 mins
nifi.cluster.flow.election.max.wait.time=1 mins
# 指定群集中所需的节点数,以便提前选择流。这允许群集中的节点避免在开始处理之前等待很长时间,如果我们至少达到群集中的此数量的节点
nifi.cluster.flow.election.max.candidates=1
# cluster load balancing properties #
nifi.cluster.load.balance.host=
# 修改为:16342、26342、36342
nifi.cluster.load.balance.port=16342
# zookeeper properties, used for cluster management #
# 连接到Apache ZooKeeper所需的连接字符串。这是一个以逗号分隔的hostname:port对列表
nifi.zookeeper.connect.string=192.168.52.150:12181,192.168.52.150:12182,192.168.52.150:12183
nifi.zookeeper.connect.timeout=3 secs
nifi.zookeeper.session.timeout=3 secs
nifi.zookeeper.root.node=/nifi
节点2,节点3内容跟节点1相同,只是nifi.web.http.port,nifi.cluster.node.protocol.port,nifi.cluster.load.balance.port,这三个端口区分开来,避免端口重复
<cluster-provider> <id>zk-provider</id> <class> org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider </class> <property name="Connect String"> 192.168.52.150:12181,192.168.52.150:12182,192.168.52.150:12183 </property> <property name="Root Node">/nifi</property> <property name="Session Timeout">10 seconds 标签:
tlk2711连接器4034连接器xg4a