Zookeeper
一 Zookeeper介绍
ZooKeeper是一个分布式服务协调框架
,主要用于解决分布式应用中的一些数据管理问题,如:统一命名服务、状态同步服务、应用配置
项目管理等。
ZooKeeper由雅虎研究院开发,后来托管到Apache,2010年11月正式成立Apache顶级项目。
在大数据生态系统中,许多组件被命名为动物,如hadoop就是??,hive就是??。Zookeeper用于管理这些组件,即动物园经理。
官方网址:https://zookeeper.apache.org/
二 Zookeeper安装
1 服务端
配置(我已经配置好了)
- 进入到zookeeper的conf目录,复制
zoo_sample.cfg
为zoo.cfg
- 编辑
zoo.cfg
文件,修改dataDir
的值为../data
这是zookeeper存储数据的位置
启动
进入安装路径bin目录,双击zkServer.cmd
即可启动zookeeper服务
2 命令行客户端
-
进入安装路径bin目录,双击
zkCli.cmd
即可启动zookeeper命令行客户端 -
输入客户端
create /itcast ceshi
,没有错误,代表客户端可以成功操作服务器
若要连接其他人zk需要使用服务器cmd命令运行zkCli.cmd
zkCli.cmd -server ip:端口号
3 UI客户端
安装
将资料中的ZooInspector.zip复制到没有中文、空格和特殊符号的目录,然后解压ZooInspector的软件
软件的安装build执行在目录下java -jar zookeeper-dev-ZooInspector.jar
命令可以启动软件界面
连接
-
点击连接按钮,输入zk点击服务地址OK连接
-
连接成功后,可点击itcast查看节点存储的信息
三 Zookeeper数据结构重点
1 数据结构
Zookeeper数据节点可视为树形(或目录)结构,树中的每个节点都被称为znode(即zookeeper node),一个znode可以有多个子节点。
Zookeeper节点在结构上呈树形,使用路径path来定位某个znode,比如/itcast/tj/java
,此处itcast、tj、java一级节点,二级节点,三级节点
znode(节点)具有文件和目录两个特点:像文件一样维护数据和元信息ACL、时间戳等数据结构也可以像目录一样挂载子目录。
一个znode一般分为三个部分:
- data:当前节点的数据
- children:当前节点的子节点
- stat:用于描述当前节点的创建、修改记录等
# 在zookeeper shell中使用get检查指定路径节点的命令data、stat信息: [zk: localhost:2181(CONNECTED) 3] get /itcast/bj/java javadata cZxid = 0x6fc ctime = Sun Dec 13 17:56:13 CST 2020 mZxid = 0x6fc mtime = Sun Dec 13 17:56:13 CST 2020 pZxid = 0x6fc cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 8 numChildren = 0 # cZxid:创建数据节点时的事务 ID # ctime:创建数据节点时间 # mZxid:最后一次更新数据节点时的事务 ID # mtime:最后一次更新数据节点时间 # pZxid:当数据节点的子节点最后一次被修改时 ID # cversion:子节点的变化次数 # dataVersion:节点数据的更改次数 # aclVersion: 更改权限的次数 # ephemeralOwner: 持久节点还是临时节点 # dataLength:数据内容的长度 # numChildren:当前数据节点的子节点数
2 节点类型
对于Zookeeper节点有两种分类方法,一种是根据节点是否持久化分类
,一是根据节点顺序分类
。
根据节点是否持久分类,可分为持久节点和临时节点
-
:(默认)节点的生命周期不依赖会话,只有在客户端删除时才能删除
-
:该节点的生命周期取决于创建它们的对话。会话一结束,临时节点就会自动删除,当然也可以手动删除
尽管每一个临时的Znode会绑定到客户端会话,但对所有客户端都是可见的。ZooKeeper不允许有子节点的临时节点
有序节点和无序节点可根据节点是否有序分类
- :每个节点都将维护其子节点的顺序
- :(默认)节点不会维护子节点的顺序
知识小结
- (默认)持久无序节点 :节点创建后将永远存在zookeeper直到主动删除服务器 - 节点持久有序 :在持久无序节点的基础上,在每个节点的子节点上顺序特性 - 临时无序节点 : 当客户端会话失败时,临时节点的生命周期与客户端会话一致,自动清理节点 - 节点临时有序 : 在临时节点上多了一个顺序特性,当客户端会话失效,自动清理节点
四 Zookeeper常见命令(理解)
1 创建节点
create [-s] [-e] path data [acl]
path:节点名称(绝对路径) data:节点数据 acl: 权限 -s:有序节点 -e:临时节点
# 1)创建 /node节点 [zk: localhost2181(CONNECTED) 4] create /node node
Created /node
# 2)创建 /node/data1节点(无序持久节点)
[zk: localhost:2181(CONNECTED) 5] create /node/node1 data
Created /node/node1
# 3)创建 /node/data2节点(无序临时节点)
# 注意: 一旦创建此节点的连接断开(命令行退出),节点会自动删除
[zk: localhost:2181(CONNECTED) 6] create -e /node/node2 data
Created /node/node2
# 4) 创建 /node/data3节点(有序持久节点)
# 注意: 此类节点创建出来后会在节点名称后面自动跟随一串递增的数字
[zk: localhost:2181(CONNECTED) 7] create -s /node/node3 data
Created /node/node30000000002
[zk: localhost:2181(CONNECTED) 8] create -s /node/node3 data
Created /node/node30000000003
# 4) 创建 /node/data4节点(有序临时节点)
# 注意: 此类节点创建出来后会在节点名称后面自动跟随一串递增的数字
# 一旦创建此节点的连接断开(命令行退出),节点会自动删除
[zk: localhost:2181(CONNECTED) 9] create -s -e /node/node4 data
Created /node/node40000000004
[zk: localhost:2181(CONNECTED) 10] create -s -e /node/node4 data
Created /node/node40000000005
2 查看节点
查看节点信息
get path : 获取节点的状态信息和数据信息
stat path : 获取节点的状态信息
[zk: localhost:2181(CONNECTED) 3] get /node/node1
data # 节点数据
cZxid = 0x21 # 数据节点创建时的事务ID
ctime = Sun Oct 11 22:35:27 CST 2020 # 数据节点创建时的时间
mZxid = 0x21 # 数据节点最后一次更新时的事务ID
mtime = Sun Oct 11 22:35:27 CST 2020 # 数据节点最后一次更新时的时间
pZxid = 0x21 # 数据节点的子节点最后一次被修改时的事务ID
cversion = 0 # 子节点的更改次数
dataVersion = 0 # 节点数据的更改次数
aclVersion = 0 # 权限的更改次数
ephemeralOwner = 0x0 # 如果节点为临时节点,那么它的值为这个节点拥有者的sessionID;如果该节点不是临时节点, 值为0
dataLength = 4 # 数据内容的长度
numChildren = 0 # 数据节点当前的子节点个数
查看节点列表
ls path: 查看指定节点的子节点列表
ls2 path: 查看指定节点的状态信息ls 以及子节点列表 (ls + stat)
#1) ls
[zk: localhost:2181(CONNECTED) 11] ls /
[node, zookeeper, itcast]
[zk: localhost:2181(CONNECTED) 12] ls /node
[node30000000003, node30000000002, node1]
#2)ls2
[zk: localhost:2181(CONNECTED) 13] ls2 /
[node, zookeeper, itcast]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x1f
cversion = 3
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 3
3 修改节点
set path data [version]
# 1) set path data 直接使用set命令对指定节点内容进行修改
# 注意: 在修改的时候,除了数据内容变化之外,还有一个关键属性变化了
# dataVersion = 1 指的是数据版本,每次数据被修改,版本自动加1
[zk: localhost:2181(CONNECTED) 14] set /node/node1 mydata
cZxid = 0x20
ctime = Wed Nov 04 21:07:17 CST 2020
mZxid = 0x26
mtime = Wed Nov 04 21:14:14 CST 2020
pZxid = 0x20
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
# 2) set path data version 基于版本号对指定节点内容进行修改
# 即修改数据的时候,要传入要修改数据的版本号, 如果传入的版本号和当前的版本号不符合时,zookeeper会拒绝本次修改
[zk: localhost:2181(CONNECTED) 15] set /node/node1 mydatadata 0
version No is not valid : /node/node1
[zk: localhost:2181(CONNECTED) 16] set /node/node1 mydatadata 1
cZxid = 0x20
ctime = Wed Nov 04 21:07:17 CST 2020
mZxid = 0x28
mtime = Wed Nov 04 21:15:07 CST 2020
pZxid = 0x20
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 0
4 删除节点
delete path [version]
rmr path (递归)
#1) delete path 使用delete命令删除指定节点
[zk: localhost:2181(CONNECTED) 18] delete /node/node30000000002
#2) delete path version 基于版本号删除指定节点
# 即删除节点的时候,要传入要删除节点的版本号, 如果传入的版本号和当前的版本号不符合时,zookeeper会拒绝本次删除
[zk: localhost:2181(CONNECTED) 19] delete /node/node1 1
version No is not valid : /node/node1
[zk: localhost:2181(CONNECTED) 20] delete /node/node1 2
#3) rmr path 递归删除
# 删除非空节点(含有子节点)失败
[zk: localhost:2181(CONNECTED) 21] delete /itcast
Node not empty: /itcast
# 递归删除
[zk: localhost:2181(CONNECTED) 22] rmr /itcast
5 监听节点 (理解)
在zookeeper中还支持一种watch(监听)机制,它允许对zookeeper注册监听,当监听的对象发生指定的事件的时候,zookeeper就会返回一个通知。
- None:客户端和服务端连接状态发生变化
- NodeCreated:创建节点
- NodeDeleted:删除节点
- NodeDataChanged:节点数据发生变化(dataVersion)
- NodeChildrenChanged:子节点发生变化(创建\删除)
get/stat path watch: 注册的监听器能够在监听节点的`内容发生改变`或者`节点被删除`的时候,向客户端发出通知
ls/ls2 path watch: 注册的监听器能够在监听节点的`子节点新增和删除`的时候,向客户端发出通知
# 注意: 下面的操作,需要使用两个客户端来完成,一个负责注册监听,令一个负责修改
# 1) 对/node (内容)监听
[zk: localhost:2181(CONNECTED) 12] get /node watch
mydata
cZxid = 0x6fd
ctime = Sun Dec 13 17:57:06 CST 2020
mZxid = 0x70d
mtime = Sun Dec 13 18:07:37 CST 2020
pZxid = 0x710
cversion = 11
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 3
# 2) 此处,新开一个客户端,去修改/node的内容
[zk: localhost:2181(CONNECTED) 0] set /node mydata
cZxid = 0x6fd
ctime = Sun Dec 13 17:57:06 CST 2020
mZxid = 0x70d
mtime = Sun Dec 13 18:07:37 CST 2020
pZxid = 0x705
cversion = 9
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 3
# 3) 然后就会发现控制台返回了一个NodeDataChanged(节点数据变化)的通知
[zk: localhost:2181(CONNECTED) 13]
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/node
# 1) 对/node注册ls(子节点新增或者删除)监听
[zk: localhost:2181(CONNECTED) 13] ls /node watch
[node30000000003, node30000000002, node1]
# 2) 此处,新开一个客户端,去添加/node/data5的子节点
[zk: localhost:2181(CONNECTED) 0] create /node/node5 test
Created /node/node5/test
# 3) 然后就会发现控制台返回了一个NodeChildrenChanged(子节点变化)的通知
[zk: localhost:2181(CONNECTED) 14]
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/node
1. 当监听器监听的事件被触发,服务端会发送通知给客户端,但通知信息中不包括事件的具体内容。
以监听ZNode结点数据变化为例,当Znode的数据被改变,客户端会收到事件类型为NodeDataChanged的通知,但该Znode的数据改变成了什么客户端无法从通知中获取,需要客户端在收到通知后手动去获取。
2. Watcher是一次性的。一旦被触发将会失效。如果需要反复进行监听就需要反复进行注册。
这么设计是为了
减轻服务端的压力
,但是对开发者而言却是相当不友好,不过不用着急,可以通过一些Zookeeper的开源客户端轻松实现对某一事件的永久监听。
五 Java操作Zookeeper(演示)
1 API
ZooKeeper 原生Java API(不推荐使用)
Apache Curator(推荐使用)
- Apache Curator是 Apache ZooKeeper的Java客户端库,简化ZooKeeper客户端的使用
- Apache Curator最初是
Netflix
研发的,后来捐献了 Apache基金会,目前是 Apache的顶级项目
2 环境搭建
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.7</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!--测试-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 设置编译版本为1.8 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
3 远程连接
public class BaseDemo {
//连接ZK
public CuratorFramework getClient() {
//1.设置重试策略
RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
//2.创建客户端
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 1000, retryPolicy);
//3.启动客户端
client.start();
//4.返回客户端连接
return client;
}
}
4 节点操作
//创建节点
@Test
public void testCreateNodes() throws Exception {
String path = null;
//1. 创建一个空节点a, 值默认为请求端ip
//path = getClient().create().forPath("/a");
//2. 创建一个有内容的b节点
//path = getClient().create().forPath("/b", "data".getBytes());
//3. 创建多层节点
//creatingParentsIfNeeded() 递归创建父目录
//withMode() 创建模式
//path = getClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/c/c1/c2", "data".getBytes());
//4. 创建带有的序号的节点
//path = getClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/d");
//5. 创建临时节点(客户端关闭,节点消失),设置延时5秒关闭
//path = getClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/e");
//6. 创建临时带序号节点(客户端关闭,节点消失),设置延时5秒关闭
//path = getClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/f");
System.out.println(path);
//关闭客户端
getClient().close();
}
//更新节点
@Test
public void testUpdateNodes() throws Exception {
//更新节点数据
//getClient().setData().forPath("/a", "itheima".getBytes());
//根据版本号更新节点数据
//getClient().setData().withVersion(2).forPath("/a", "itheima".getBytes());
getClient().close();
}
//获取节点
@Test
public void testGetNodes() throws Exception {
//查询节点数据
byte[] bytes = getClient().getData().forPath("/a");
System.out.println(new String(bytes));
// 包含状态查询
Stat stat = new Stat();
getClient().getData().storingStatIn(stat).forPath("/a");
System.out.println(stat.getVersion());
}
//删除数据
@Test
public void testDeleteNodes() throws Exception {
//删除节点
//getClient().delete().forPath("/c");
//删除节点并递归删除其子节点
//getClient().delete().deletingChildrenIfNeeded().forPath("/c");
}
5 节点监听
zookeeperwatch机制,一个轻量级的设计。因为它采用了一种
推拉
结合的模式。 一旦服务端感知主题变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级的,这就是所谓的"推"部分。然后,收到变更通知的客户端需要自己去拉变更的数据,这就是"拉"部分。
Curator在这方面做了优化,它引入了Cache的概念用来实现对ZooKeeper服务器端进行事件监听。 Cache是Curator对事件监听的包装,其对事件的监听可以近似看做是一个本地缓存视图和远程ZooKeeper视图的对比过程。 而且Curator会
自动的再次监听
,我们就不需要自己手动的重复监听了。Curator中的cache共有三种: NodeCache: 用来监听节点的创建和数据变化 PathChildrenCache: 用来监听指定节点的子节点增删和数据变化 【推荐】TreeCache: 像上面两种Cache的结合体,能够监听自身节点的变化、也能够监听子节点的变化 * 这哥们可以实现持续监听和主动拉取
public class WatchDemo { //连接ZK public CuratorFramework getClient() { //1.设置重试策略 RetryPolicy retryPolicy = new RetryNTimes(3, 3000); //2.创建客户端 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 1000, retryPolicy); //3.启动客户端 client.start(); //4.返回客户端连接 return client; } //只监测当前节点数据 @Test public void testNodeCache() throws Exception { //创建节点数据监听对象 final NodeCache nodeCache = new NodeCache(getClient(), "/a"); nodeCache.start(true); //开始缓存 System.out.println(new String(nodeCache.getCurrentData().getData())); //添加监听对象 nodeCache.getListenable().addListener(new NodeCacheListener() { //如果节点数据有变化,会回调该方法 public void nodeChanged() throws Exception { System.out.println("数据Watcher:路径=" + nodeCache.getCurrentData().getPath() + ",data=" + new String(nodeCache.getCurrentData().getData())); } }); System.in.read(); } //监测所有子节点数据 @Test public void testPathChildrenCache() throws Exception { //监听指定节点的子节点变化情况包括?新增子节点 子节点数据变更 和子节点删除 PathChildrenCache childrenCache = new PathChildrenCache(getClient(), "/a", true); //在启动时缓存子节点数据,提示初始化 childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); //添加监听 childrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { System.out.println(event.getData().getPath() + "子节点添加:" + new String(event.getData().getData())); } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) { System.out.println(event.getData().getPath() + "子节点移除:" + new String(event.getData().getData())); } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) { System.out.println(event.getData().getPath() + "子节点修改:" + new String(event.getData().getData())); } else if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) { System.out.println("初始化完成"); } else if (event.getType() == PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED) { System.out.println("连接过时"); } else if (event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED) { System.out.println("重新连接"); } else if (event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST) { System.out.println("连接过时一段时间"); } } }); System.in.read(); } //监测当前节点和所有的子节点 @Test public void testTreeCache() throws Exception { TreeCache treeCache = new TreeCache(getClient(), "/a"); treeCache.start(); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception