kazoo是一个Python目的是使库Python使用方便方便zookeeper。
kazoo的安装
使用pip安装kazoo:
pip install kazoo
因为kazoo使用纯python实现zookeeper因此,无需安装协议Python zookeeper C各种依赖。
基本用法
连接处理
使用kazoo,首先需要实例化KazooClient代码如下:
from kazoo.client import KazooClient zk = KazooClient(hosts="127.0.0.1:2181") zk.start()
默认情况下,KazooClient会连接本地zookeeper服务,端口号2181。
当zookeeper服务异常时(服务异常或服务未启动等),zk.start()在连接加连接,直到连接超时。
无论是间歇性连接丢失(网络闪断等。zookeeper会话过期,KazooClient将继续尝试重新连接。
我们可以通过stop命令显式中断连接:
zk.stop()
会话状态
Kazoo客户端在和zookeeper在服务会话过程中,通常在以下三种状态之间切换:CONNECTED
、SUSPENDED
、LOST
。
当KazooClient例子第一次创建时,它的状态是LOST,一旦连接成功建立,状态立即切换CONNECTED。
在整个会话生命周期中,伴随着网络闪断和服务端zookeeper异常或其他原因导致客户端和服务端断开,KazooClient状态切换成SUSPENDED,与此同时,KazooClient您将继续尝试重新连接服务端。一旦连接成功,状态将再次恢复CONNECTED。
kazoo状态监听
添加状态监控事件,实时监控客户端和服务端的会话状态,使我们能够及时处理连接中断、连接恢复或会话过期其使用方法如下:
def connection_listener(state): if state == "LOST": # Register somewhere that the session was lost pass elif state == "SUSPENDED": # Handle being disconnected from Zookeeper pass else: # Handle being connected/reconnected to Zookeeper pass zk = KazooClient(hosts="127.0.0.1:2181") zk.add_listener(connection_listener)
当使用kazoo.recipe.lock.Lock或者在创建临时节点时,强烈建议状态监控,以便我们的代码能够正确处理连接中断或Zookeeper会话丢失。
zookeeper的增删改查
创建节点
方法:
- ensure_path():递归创建节点路径,只能设置权限,不能添加数据。
- create():如果父节点必须存在,不能递归创建,可以同时添加数据和监控事件。
用法:
zk.ensure_path("/china/henan") zk.create("/china", b"this is china node.")
读取数据
方法:
- exists():检查节点是否存在
- get():获取节点数据以及节点状态的详细信息
- get_children():获取指定节点的所有子节点
更新数据
方法:
- set():更新指定节点的信息。
删除节点
方法:
- delete():删除指定节点。
监听器
kazoo当节点或节点的子节点发生变化时,可以在节点上添加监控器进行触发。
kazoo支持两种方式添加监听器,一种是zookeeper原生支持的,其用法如下:
def test_watch_data(event): print("this is a watcher for node data.") zk.get_children("/china", watch=test_watch_children)
另一种方法是通过python支持该功能的方法有:
- ChildrenWatch:当子节点发生变化时触发
- DataWatch:当节点数据发生变化时触发
用法如下:
@zk.ChildrenWatch("/china") def watch_china_children(children): print("this is watch_china_children %s" % children) @zk.DataWatch("/china") def watch_china_node(data, state): print("china node is %s" % data)
kazoo事务
自v3.4以后,zookeeper支持一次发送多个命令,作为原子提交,要么成功执行,要么失败。
使用方法如下:
transaction = zk.transaction() transaction.check('/china/hebei', version=3) transaction.create('/china/shanxi', b"thi is shanxi.") results = transaction.commit()