第七章 使用zmq高级框架
·如何安全地从创造力过渡到工作的原型?(MOPED模式) ·作为数据zmq不同的消息序列化方式 ·如何用代码生成二进制序列化编解码器 ·如何使用GSL建立自定义代码生成器的工具 ·如何写和许可协议规范 ·如何在zmq可快速重新启动的文件传输 ·如何实现基于信用的流量控制? ·如何将协议的服务器和客户端构建为状态机 ·如何制作一个在zmq上述安全协议 ·大型文件发布系统(FileMQ)
MOPED目标是定义一个过程,可以获得新分布式应用的粗略用例。MOPED,过早优化的风险是通过专注于合同而不是实现来避免的。在添加更多功能之前,通过基于测试的短迭代来驱动设计过程可以更肯定现有的工作。 分布五个具体步骤: 1.内部化zmq语义。 2.描述一个粗略的结构。 三、决定合同。 4.编写最小端到端解决方案。 5.解决问题,重复。 1.1 第一步:内部化zmq语义 学习一门语言的唯一方法就是学习和消化套接字模式及其工作方法。(增加代码量) 1.2 第二步:描述粗略的结构 隔离每层,可以廉价替换整层。选择要解决的核心问题,忽略任何不必要的问题:将来添加。为了简化,架构可以随着时间的推移而变得完整和现实:例如,添加多个工人,增加客户端和API,处理故障等。 1.3 第三步:决定合同 两种分布式系统合同: ·客户端应用程序API,API尽量绝对简单、一致、熟悉。 ·连接部件的协议(一个简单的技能)称为unprotocols(反协议)。 1.4 第四步:编写最小端到端解决方案 如果你想在编写代码时测试代码,官方意思是编写最小的骨架系统应用程序来测试代码。目标是让最简单的测试案例在没有任何额外功能的情况下工作。在要做的事情列表中,切断所有可以切断的任务。随时添加功能比较容易,但目标是把整体规模保持在最小值。 1.5 第五步:解决一个问题,重复一遍 与其增加功能,不如开始解决有形问题。写清楚解释问题的问题,并为每个问题提出解决方案。当设计API记住命名标准、一致性和行为。用简单的文字写下这些内容有助于保持清晰。从这里可以通过运行测试案例验证架构和代码的每一个变化。如果不能正常工作,可以修改,这样可以来回工作,直到能正常工作。 通过这种方式,我们可以遍历整个周期(根据需要扩展测试案例)API、更新协议,扩展代码),每次选择一个问题并单独测试解决方案。
2 协议
zmq它提供了一个成功的协议抽象层,它使用随机传输多部分信息的工作模式。zmq默默地处理组帧、连接和路由,所以在zmq很容易在第四章和第五章中编写完整的协议规范。 2.2 合同很难 编写合同可能是大规模架构中最困难的部分。unprotocol尽可能多地消除不必要的摩擦。但剩下的仍然是一系列难以解决的问题。好的合同是否是APl、协议或租赁协议必须简单、清晰、可靠,易于执行。 总结编写协议: ·需求从简单开始,逐步发展。不要解决尚未遇到的问题。 ·使用非常清晰和一致的语言。该协议通常分解为命令和字段,以使用清晰和简短的名称。 ·尽量避免发明概念。从现有的规范中重用一些东西。 ·不做不能证明任何迫切需要的事情。规范可以解决问题,范可以解决问题的每个问题提供最简单可行的解决方案的功能。 ·为了了解每个选择的技术后果,同时构建协议。使用一种使其难以实现的语言(如C),它不是一种容易实现的语言(如Python)。 ·在构建测试规范的同时。对一个规范的最佳反馈是试图在别人没有的假设和知识中实现它。 ·快速一致的交叉测试,与他人客户端连接的服务器反之亦然。 ·准备把它扔出去,并根据需要随时重新启动。为此制定计划,例如,通过分层架构来保留一个API,但改变底层协议。 ·仅使用独立于编程语言和操作系统的结构。 ·分层解决大问题,使每一层都是独立的标准。小心创建一个巨大的整体协议。考虑如何重用每一层。思考不同的团队如何在每一层建立竞争标准。 最重要的是写下来。代码不是标准。通过写一个标准,你会发现灰色地带在代码中是看不到的。 使用zmq一个不明显的好处是,它减少了90%以上编写协议规范的必要努力,因为它已经处理了帧、路由、排队等。这意味着你可以快速尝试,低成本犯错误,从而快速学习。
·封面部分:一行中写有摘要和规范URL、正式名称、版本、负责人。 ·正文许可证:绝对需要公共规范。 ·变更过程:如何解决规范中的问题? ·语言的使用:必要、可能、应该等,对RFC2119的引用。·成熟度指标:这是一个实验、草稿、稳定、遗产,还是已退休的版本? ·协议目标:它试图解决哪些问题? ·正式语法:防止不同文本解释引起的争论。 ·技术描述:语义、错误处理等。 ·安全讨论:协议的安全程度明确。·参考文献:引用其他文件、协议等。 以下是协议的一些关键点: ·只要过程是开放的,就不需要需要一个委员会:只是做一个干净、简单的设计,以确保任何人都能自由地改进它们。 ·若使用现有许可证,则无法律后顾之忧。建议公开规范使用GPLv3。 ·形式有价值。也就是说,学会写正式的语法,比如ABNF(扩展巴科斯-诺尔范式)并使用它完全记录信息。 ·使用类似Digistan的COSS市场驱动的生命周期过程,使人在成熟(或不成熟)时正确把握规范。
当实现一个GPLv在3的规范中,他们可以以任何他们喜欢的方式许可它。但有两件事是肯定的。首先,该规范将永远不会被纳入和扩展到专有形式。本规范的任何衍生形式也必须是GPLv3.第二,曾经实现或使用该协议的人永远不会攻击它所涵盖的任何东西。
官方文件表示最喜欢的语法是由RFC2234定义的ABNF,因为它可能是用于定义双向通信协议的最简单和最广泛使用的正式语言。大多数IETF(互联网工程任务组)规范使用ABNF,这是一个值得合作的好伙伴。 给出一个写作ABNF速成教程。把语法写成规则。每个规则的形式是名称=元素”。一个元素也可以是其他规则(下面定义为另一个规则的东西),也可以是预定义的终端(如CRLF,八位字节),或一个数字。RFC所有终端都列在中间。以元素/元素的形式定义可选元素。定义重复,使用*”(请参考RFC,因为它不直观)。括号用于元素分组。 这是一个名字NOM的协议的ABNF: nom-protocol = open-peering *use-peering open-peering = C:OHAI ( S:OHAI-OK / S:WTF ) use-peering = C:ICANHAZ / S:CHEEZBURGER / C:HUGZ S:HUGZ-OK / S:HUGZ C:HUGZ-OK
用传统的客户端/服务器用例来解释。客户端连接到服务器并进行身份验证。然后,它需要一些资源。服务器做出回应,然后开始向客户端发送数据。最后,客户端断开连接或服务器完成,对话结束。 在设计这些信息之前,比较控制对话和数据流: ·控制对话持续时间短,信息少。数据流可能持续数小时或数天,并涉及数十亿条新闻。 ·所有正常错误都会发生在控制对话中,如身份验证不合格、找不到、付费、审查等。数据流过程中的任何错误都是例外(磁盘已满,服务器崩溃)。 ·当添加更多的选项或参数时,控制对话中的东西会随着时间的推移而变化。数据流几乎不应该随着时间而改变,因为一个资源的语义在一段时间内是相当恒定的。 ·控制对话本质上是一个同步的请求——回应对话。数据流基本上是单向异步流。 这些差异很重要。因此,在谈论性能时,它只适用于数据流。快速设计一次性控制对话。当谈到序列化的成本时,这只适用于数据流。编码/解码控制流的成本可能是巨大的,在许多情况下,它不会改变任何事情。因此,通过廉价的方式控制编码,并通过讨厌的方式编码数据流。 廉价的方法本质上是同步的、详细的、描述性的和灵活的。一个廉价的新闻充满了丰富的信息,可以改变每个应用程序。设计目标是使这些信息易于编码和分析,实验或增长易于扩展,变化非常强劲,前后兼容。该协议的廉价部分看起来如下: ·它使用简单的自描述数据的结构化编码,无论是XML、JSON、HTTP式的标题,或者是一些其他的编码。任何编码方式都是好的,只要的目标语言对它有标准简单的解析器。 ·它采用了直接的请求响应模型,每个请求都有成功/失败的响应。这使得它很容易编写正确的廉价对话客户端和服务器。 ·它不会尝试得很快,甚至没有轻微的尝试。当做一些只有一次性或每次会话几次的事情时,性能并不重要。 便宜的分析器是从架子上取下后将数据丢失的东西。它不应该崩溃,不应该有内存泄漏,应该高度宽容,应该更容易使用。这很好。 但讨厌的方式本质上是异步、简洁、沉默和不灵活的。一个讨厌的新闻携带最简洁的信息,几乎从未改变过。设计目标是快速分析这些信息,甚至可能无法扩展和测试。理想的讨厌模式看起来像这样: ·对数据采用手工优化的二进制布局,每一个二进制位都是精心设计的。 ·它需确认,采用纯异步模型,其中一两个对等节点发送数据。 ·它不会尝试对人友好,甚至没有轻微的尝试。当你每秒做几百万次的事情时,性能就是一切。 讨厌的分析器是手工编写的,它单独和准确地写入或读取二进制位、字节、单词和整数。它拒绝任何它不喜欢的东西,不执行内存分配,永不崩溃。 廉价或讨厌并不是一种常见的模式,也不是所有的协议都有这种二分法。此外,如何使用廉价或令人讨厌的方法将取决于具体情况。在某些情况下,它可以是一个单独协议的两部分。在其他情况下,它可以是两个协议,一个分层,另一个分层。
使用廉价方式或讨厌方式做错误处理相当简单。它具有两种命令和两种引发错误信号的办法: 1)同步控制命令 错误是正常的:每个请求都有一个响应,要么是OK,要么是错误响应。 2)异步数据命令 错误是异常的:坏的命令要么被悄悄丢弃,要么导致整个连接被关闭。 区分几种错误通常是很好的,但要始终使它保持最精简,并只添加所需要的东西。。
3 序列化数据 3.1 zmq组帧 用于zmq应用程序的最简单、最广泛使用的序列化格式是zmq自己的多部分组帧。 例如,下面是管家协议定义请求的方式: 第0帧:空帧 第1帧:“MDPW01“(6个字节,相当于MDP/工人V0.1) 第2帧:0X02(一个字节,相当于REQUEST) 第3帧:客户端地址(封包栈) 第4帧:空(零字节,封包分隔符) 第5帧以上:请求正文(不透明的二进制码) 在代码中读取和写入这些是很容易的。但是这是控制流的一个经典例子(整个MDP是一个经典的例子,因为它是一个同步交互的请求-应答协议)。当改善MDP的第二个版本时必须改变这种组帧。向后兼容性是很难的,但将zmq组帧用于控制流并没有好处。下面是官方意见,应该怎么设计这个协议。它分解成一个廉价的部分和一个讨厌的部分,并且它使用zmq组帧来区分这些: 第0帧:“MDP/2.0"的协议名称和版本 第1帧:命令标头 第2帧:命令正文 期望在各种中介(客户端APl、代理以及工人APl)中解析命令标头,并将命令正文体原封不动地从一个应用程序传递到另一个应用程序。 3.2 序列化语言 各种序列化语言都有自己的风格。XML在流行的时候曾经是大型的,然后陷入了“企业信息架构师”之手,从那以后它就不再有活力了,今天的XML是“小型、优雅的语言试图逃脱的某处困境”的缩影。 尽管如此XML仍是比它的前辈更好的方法。因此序列化语言的历史似乎是一个逐渐显现理智的过程, JSON从JavaScript的世界杀出来。JSON只是表达得像JavaScript源代码的最精简的XML。 这是在Cheap协议中使用JSON的简单示例:
"protocol": {
"name": "MTL",
"version": 1
},
"virtual-host": "test-env"
XML中的数据将是相同的(XML迫使发明单个顶级实体):
<command>
<protocol name = "MTL" version = "1" />
<virtual-host>test-env</virtual-host>
</command>
这里使用的是普通的HTTP样式的标头: Protocol: MTL/1.0 Virtual-host: test-env 这些都是等价的,只要不针对验证解析器、模式走极端。一种廉价方式的序列化语言所提供自由实验的空(忽略任何不认识的元素/属性/标头),并且容易编写通用的解析器,例如将一个命令转换到一个散列表或反向转换。 但是这不是完美的,虽然现代的脚本语言能足够轻松地支持JSON和XML,但旧的语言不能。如果使用XML或JSON,就产生了不寻常的依赖。这也是一个有点像在C语言中处理树形结构数据时的痛苦。 所以可以根据的目标语言驱动的选择,如果的环境是一种脚本语言那么去用JSON。如果的目标是建立更广泛的系统中使用的协议,那就让事情对C语言开发者保持简单,并坚持采用HTTP风格的标头。 3.3 序列化库 这就像JSON既快速又小巧。MessagePack序列化库是一种高效的二进制序列化格式。它允许将数据在类似JSON的多种语言之间交换,但它的速度更快、更小巧。 这是使用MessagePack接口定义语言(IDL)描述典型消息的方式:
message Person {
1: string surname
2: string firstname
3: optional string email
}
现在,使用Google协议的同一条消息将缓冲IDL:
message Person {
required string surname = 1;
required string firstname = 2;
optional string email = 3;
} 它可以工作但是在大多数实际情况下,以手工编写或机械生成的适当规范为后盾的序列化语言几乎不会带来什么好处。将要付出的代价是一个额外的依赖性而且很可能会比使用Cheap或Nasty时的整体性能更差。 3.4 手写的二进制序列化 当关心序列化的速度和/或结果的大小(通常这些是相互矛盾的)时,需要手写的二进制序列化。 编写一个高效讨厌的编码器/解码器(CODEC)的基本流程是: ·构建有代表性的数据集和测试应用程序,它们可以对编解码器进行压力测试。 ·编写编解码器的第一个哑巴版本。 ·测试、测量、改进和重复,直到用完了时间和/或金钱。。 下面是一些用来改善编解码器的技术: ·使用剖析器。原因很简单,没有办法知道的代码在做什么,除非已经剖析出它的函数执行次数和每个函数的CPU开销。 ·消除内存分配。在现代的Linux内核中堆是非常快的,但它仍然是最简陋的编解码器的瓶颈。在较旧版本的内核中堆可能会非常慢,需要在代码中尽可能使用局部变量(栈)来取代堆。 ·在不同的平台上用不同的编译器和编译器选项测试。除了堆之外还有许多其他的差异。 ·使用状态来更好地压缩。如果担心编解码器的性能那么几乎肯定是将相同类型的数据发送了很多次。数据实例之间将有冗余。可以检测这些冗余并用它来压缩。 ·了解的数据。最好的压缩技术(在紧凑性的CPU的成本方面)需要知道数据的相关信息。例如,用于压缩一个单词列表、视频和股票市场数据流的技术都不同。 ·准备好打破规则。真的需要用大端网络字节顺序对整数编码吗?×86和ARM占了几乎所有现代的CPU的数量,但它们使用小端字节顺序(ARM实际上是双端的,但Android与Windows和iOS一样,是小端的)。 3.5 代码生成 所有这些都将从代码生成中受益,但是没有通用模型。因此诀窍是根据需要设计自己的模型,然后使代码生成器成为该模型的廉价编译器。 编写GSL模型时可以使用任何喜欢的语义,换句话说可以当场发明特定领域的语言。这里将发明一对夫妇-看看是否能猜出它们代表什么:
slideshow
name = Cookery level 3
page
title = French Cuisine
item = Overview
item = The historical cuisine
item = The nouvelle cuisine
item = Why the French live longer
page
title = Overview
item = Soups and salads
item = Le plat principal
item = Béchamel and other sauces
item = Pastries, cakes, and quiches
item = Soufflé: cheese to strawberry
还有这个:
table
name = person
column
name = firstname
type = string
column
name = lastname
type = string
column
name = rating
type = integer
可以将第一段汇编成演示文稿。第二段可以编译成SQL以创建和使用数据库表。因此对于本练习模型由“类”组成,这些“类”包含“消息”,这些“消息”包含各种类型的“字段”。这是故意设计成熟悉样子的。这是MDP客户端协议:
<class name = "mdp_client">
MDP/Client
<header>
<field name = "empty" type = "string" value = ""
>Empty frame</field>
<field name = "protocol" type = "string" value = "MDPC01"
>Protocol identifier</field>
</header>
<message name = "request">
Client request to broker
<field name = "service" type = "string">Service name</field>
<field name = "body" type = "frame">Request body</field>
</message>
<message name = "reply">
Response back to client
<field name = "service" type = "string">Service name</field>
<field name = "body" type = "frame">Response body</field>
</message>
</class>
这是MDP工作程序协议:
<class name = "mdp_worker">
MDP/Worker
<header>
<field name = "empty" type = "string" value = ""
>Empty frame</field>
<field name = "protocol" type = "string" value = "MDPW01"
>Protocol identifier</field>
<field name = "id" type = "octet">Message identifier</field>
</header>
<message name = "ready" id = "1">
Worker tells broker it is ready
<field name = "service" type = "string">Service name</field>
</message>
<message name = "request" id = "2">
Client request to broker
<field name = "client" type = "frame">Client address</field>
<field name = "body" type = "frame">Request body</field>
</message>
<message name = "reply" id = "3">
Worker returns reply to broker
<field name = "client" type = "frame">Client address</field>
<field name = "body" type = "frame">Request body</field>
</message>
<message name = "hearbeat" id = "4">
Either peer tells the other it's still alive
</message>
<message name = "disconnect" id = "5">
Either peer tells other the party is over
</message>
</class>
GSL使用XML作为其建模语言。XML的声誉很差,它被太多的企业下水道所吸引,难以闻到香甜,但是只要保持简单,它就会带来一些积极的影响。编写项目和属性的自描述层次结构的任何方法都是可行的。 现在这里是用GSL编写的简短IDL生成器,它将协议模型转变为文档:
.# Trivial IDL generator (specs.gsl)
.#
.output "$(class.name).md"
## The $(string.trim (class.?''):left) Protocol
.for message
. frames = count (class->header.field) + count (field)
A $(message.NAME) command consists of a multipart message of $(frames)
frames:
. for class->header.field
. if name = "id"
* Frame $(item ()): 0x$(message.id:%02x) (1 byte, $(message.NAME))
. else
* Frame $(item ()): "$(value:)" ($(string.length ("$(value)")) \
bytes, $(field.:))
. endif
. endfor
. index = count (class->header.field) + 1
. for field
* Frame $(index): $(field.?'') \
. if type = "string"
(printable string)
. elsif type = "frame"
(opaque binary)
. index += 1
. else
. echo "E: unknown field type: $(type)"
. endif
. index += 1
. endfor
.endfor
XML模型和此脚本在子目录examples / models中。为了进行代码生成给出以下命令:
gsl -script:specs mdp_client.xml mdp_worker.xml
这是为工作程序协议获得的Markdown文本:
## The MDP/Worker Protocol
A READY command consists of a multipart message of 4 frames:
* Frame 1: "" (0 bytes, Empty frame)
* Frame 2: "MDPW01" (6 bytes, Protocol identifier)
* Frame 3: 0x01 (1 byte, READY)
* Frame 4: Service name (printable string)
A REQUEST command consists of a multipart message of 5 frames:
* Frame 1: "" (0 bytes, Empty frame)
* Frame 2: "MDPW01" (6 bytes, Protocol identifier)
* Frame 3: 0x02 (1 byte, REQUEST)
* Frame 4: Client address (opaque binary)
* Frame 6: Request body (opaque binary)
A REPLY command consists of a multipart message of 5 frames:
* Frame 1: "" (0 bytes, Empty frame)
* Frame 2: "MDPW01" (6 bytes, Protocol identifier)
* Frame 3: 0x03 (1 byte, REPLY)
* Frame 4: Client address (opaque binary)
* Frame 6: Request body (opaque binary)
A HEARBEAT command consists of a multipart message of 3 frames:
* Frame 1: "" (0 bytes, Empty frame)
* Frame 2: "MDPW01" (6 bytes, Protocol identifier)
* Frame 3: 0x04 (1 byte, HEARBEAT)
A DISCONNECT command consists of a multipart message of 3 frames:
* Frame 1: "" (0 bytes, Empty frame)
* Frame 2: "MDPW01" (6 bytes, Protocol identifier)
* Frame 3: 0x05 (1 byte, DISCONNECT)
这接近于原始规范中手工编写的内容。现在如果已经克隆了zguide存储库,并且正在查看examples / models中的代码,则可以生成MDP客户端和辅助编解码器。将相同的两个模型传递给不同的代码生成器: gsl -script:codec_c mdp_client.xml mdp_worker.xml 这给了mdp_client和mdp_worker类。实际上MDP非常简单,以至于编写代码生成器几乎不值得。当想要更改协议时(从独立的Majordomo项目开始),就可以获利。修改协议,运行命令,然后弹出更完美的代码。 该codec_c.gsl代码生成不算短,但所产生的编解码器比手写代码原本放在一起的管家要好得多。例如,手写代码没有错误检查,如果将其传递给伪造的消息它将死掉。 不要发明概念。设计人员的任务是消除问题,而不是添加功能。 首先将阐述面向模型的代码生成的优点: 可以创建映射到现实世界的近乎完美的抽象。因此协议模型将100%映射到Majordomo的“真实世界”。没有自由调整和更改模型的自由,这将是不可能的。 可以快速,廉价地开发这些完美的模型。 可以生成任何文本输出。从单个模型可以创建文档,任何语言的代码,测试工具-从字面上可以想到的任何输出。 可以生成完美的输出,因为将代码生成器提高到所需的任何级别都是很便宜的。 将获得一个结合了规范和语义的单一来源。 可以利用规模较小的团队。 缺点: 将工具依赖项添加到项目中。 可能会无所适从,为纯粹的创建模型而创建模型。 对新手不友好。 可能会给别人一个很大的借口,不要投资项目。 如果确实使用GSL并希望围绕工作创建开放的社区,这是的建议: 仅在可能会手工编写繁琐的代码的地方使用它。 设计人们期望看到的自然模型。 首先手动编写代码,以便知道生成什么。 不要过度使用。把事情简单化! 逐步引入一个项目。 将生成的代码放入存储库。 已经在围绕Zerzmq的一些项目中使用GSL。例如高级C绑定Czmq使用GSL生成套接字选项类(zsockopt)。 4 传输文件 “如何发送文件?”是zmq邮件列表中一个常见的问题。zmq在发送事件和任务方面是预制得很不错的,但它不擅长发送文件。 如果建立了一个适当的文件服务器,就会发现简单地将大量数据发送到许多客户端会产生下面这种情况,在技术的说法下喜欢把它叫作“由于所有可用堆内存被一个设计得糟糕的应用程序耗尽而导致的服务器崩溃”。一个适当的文件传输协议,需要注意内存的使用。 将正确地逐步解决这些问题,这应该有希望产生一个在zmq上运行良好和正确的文件传输协议。首先,用随机数据产生1GB的测试文件: dd if=/dev/urandom of=testdata bs=1M count=1024 当有很多客户端同时请求同一个文件的时候,这个文件大得足以造成麻烦,并在许多机器上,无论如何,1GB的内存都会因为太大了而无法分配成功。作为基本参考,让衡量将此文件从磁盘复制到磁盘需要多长时间。这将知道文件传输协议额外增加了多少(包括网络开销):
$ time cp testdata testdata2
real 0m7.143s
user 0m0.012s
sys 0m1.188s
预计有上下25%的波动。
本示例显示了代码初稿,其中客户端请求测试数据,而服务器只是把它当作一系列的消息一气呵成地发送,其中每个消息各执一个“块”。
# File Transfer model #1
#
# In which the server sends the entire file to the client in
# large chunks with no attempt at flow control.
from __future__ import print_function
from threading import Thread
import zmq
from zhelpers import socket_set_hwm, zpipe
CHUNK_SIZE = 250000
def client_thread(ctx, pipe):
dealer = ctx.socket(zmq.DEALER)
dealer.connect("tcp://127.0.0.1:6000")
dealer.send(b"fetch")
total = 0 # Total bytes received
chunks = 0 # Total chunks received
while True:
try:
chunk = dealer.recv()
except zmq.zmqError as e:
if e.errno == zmq.ETERM:
return # shutting down, quit
else:
raise
chunks += 1
size = len(chunk)
total += size
if size == 0:
break # whole file received
print ("%i chunks received, %i bytes" % (chunks, total))
pipe.send(b"OK")
# File server thread
# The server thread reads the file from disk in chunks, and sends
# each chunk to the client as a separate message. We only have one
# test file, so open that once and then serve it out as needed:
def server_thread(ctx):
file = open("testdata", "rb")
router = ctx.socket(zmq.ROUTER)
# Default HWM is 1000, which will drop messages here
# since we send more than 1,000 chunks of test data,
# so set an infinite HWM as a simple, stupid solution:
socket_set_hwm(router, 0)
router.bind("tcp://*:6000")
while True:
# First frame in each message is the sender identity
# Second frame is "fetch" command
try:
identity, command = router.recv_multipart()
except zmq.zmqError as e:
if e.errno == zmq.ETERM:
return # shutting down, quit
else:
raise
assert command == b"fetch"
while True:
data = file.read(CHUNK_SIZE)
router.send_multipart([identity, data])
if not data:
break
# File main thread
# The main task starts the client and server threads; it's easier
# to test this as a single process with threads, than as multiple
# processes:
def main():
# Start child threads
ctx = zmq.Context()
a,b = zpipe(ctx)
client = Thread(target=client_thread, args=(ctx, b))
server = Thread(target=server_thread, args=(ctx,))
client.start()
server.start()
# loop until client tells us it's done
try:
print (a.recv())
except KeyboardInterrupt:
pass
del a,b
ctx.term()
if __name__ == '__main__':
main()
遇到了一个问题:如果发送过多的数据到ROUTER套接字很容易就会溢出。简单而愚蠢的解决办法是把一个无限的高水位标记放在套接字中。但因为现在已经失去防止耗尽服务器内存的保护措施。然而,如果没有一个无限的HWM,又可能会丢失大量的文件块。 若把HWM设置为1000(在zmqv3.x版本是默认的),然后将块大小减小为100K,所以一次传送10K个块。运行测试会看到它永远都不会执行完成。因为zmqsocket()手册页写道,对于ROUTER套接字:“zmq_HWM选项的操作:删除”。 必须预先控制服务器发送的数据量。发送超过网络处理能力的数据量是没有意义的。尝试一次发送一个块。在这个版本的协议中,客户端会明确地说,“给块N”,而服务器会从磁盘读取特定块并将其发送。 该示例给出了改进的第二个模型,其中客户端一次请求一个块,而服务器只发送从客户端获得的每一个请求索要的一个块。
# File Transfer model #3
#
# In which the client requests each chunk individually, using
# command pipelining to give us a credit-based flow control.
from __future__ import print_function
import os
from threading import Thread
import zmq
from zhelpers import socket_set_hwm, zpipe
CHUNK_SIZE = 250000
def client_thread(ctx, pipe):
dealer = ctx.socket(zmq.DEALER)
socket_set_hwm(dealer, 1)
dealer.connect("tcp://127.0.0.1:6000")
total = 0 # Total bytes received
chunks = 0 # Total chunks received
while True:
# ask for next chunk
dealer.send_multipart
标签: mtl温度变送器mtl5573