文章目录
- 1.什么是分布式配置监控?
- 2:实例编码
-
- 1:开发分析
- 2:创建zk实例
- 3:检查当前节点是否存在
- 4:当监控存在时,读取数据
- 五、写监控程序
- 6:优化代码-当无值的时候直接持续查询是否有值
- 7:进行测试
- 8:最后优化,包装所有方法,将值暴露给业务方法
- 三、附录:所有代码
-
- TestZk测试类
- ZKUtils:获取zk实例
- DefaultWatch:默认连接zk时的监控类
- MyConf:存储信息的类别
- WatchCallBack:核心工具类
1.什么是分布式配置监控?
如图所示:在分布式系统中,如果多个客户端(服务器)同时使用某些配置。然后,当配置发生变化时,我们需要通过心跳等方式主动监控它。那么,当数据发生变化时,有没有办法主动通知我们呢;所以zookeeper就可以办到;
2:实例编码
1:开发分析
1:场景,开发测试程序,模拟客户监控数据,当数据存在时不断输出,当数据更改时,输出最新值,当数据丢失时,线程堵塞。 2:本次开发采用响应式编程 3:这种发展的前提是你已经知道了zk基本使用客户端;如果没有,建议阅读本文Zookeeper-04-ZK的API基本使用 4:开发程序类目录说明 5:由于使用响应编程,异步处理来回跳转更烧脑,我试着每一步粘贴所有代码;我希望不要因为太多的代码而影响你的印象;
2:创建zk实例
1:创建测试类,前置调用测试类获取zk实例
package com.example.zookeeperdemo.zookeeper.confog_wkl; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; /** * Description: * Date: 2022/5/8 - 上午 4:10 * author: wangkanglu * version: V1.0 */ public class TestZk {
ZooKeeper zk; ///前置程序主要是为了获得zk连接 @Before public void conn(){
zk=ZKUtils.getZK(); } @After public void colse(){
try {
zk.close(); } catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void test(){
System.out.println("start----------");
}
}
2:zk实例由ZKUtils封装的静态方法获取,因为zk连接是异步的,此处利用CountDownLatch 来实现同步阻塞
package com.example.zookeeperdemo.zookeeper.confog_wkl;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/** * Description:获取zk实例 * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class ZKUtils {
private static ZooKeeper zk;
//在端口号后边加/testConfig,表示后续在程序中使用的根节点为testConfig
private static String adress = "192.168.138.128:2181/testConfig";
private static int timeout = 3000;
private static DefaultWatch defaultWatch = new DefaultWatch();
private static CountDownLatch downLatch = new CountDownLatch(1);
public static ZooKeeper getZK(){
try {
zk = new ZooKeeper(adress,timeout,defaultWatch);
defaultWatch.setCd(downLatch);
//因为zk连接是异步的,此处采用CountDownLatch
downLatch.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return zk;
}
}
3:因为zk实例连接时要设置监听器,正好在监听器中设置countDown
package com.example.zookeeperdemo.zookeeper.confog_wkl;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/** * Description: * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class DefaultWatch implements Watcher {
CountDownLatch cd;
public CountDownLatch getCd() {
return cd;
}
public void setCd(CountDownLatch cd) {
this.cd = cd;
}
@Override
public void process(WatchedEvent watchedEvent) {
switch (watchedEvent.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
System.out.println("zk连接成功了");
//成功建立连接后放行
cd.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
}
}
}
3:查看当前节点是否存在
package com.example.zookeeperdemo.zookeeper.confog_wkl; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; /** * Description: * Date: 2022/5/8 - 上午 4:10 * author: wangkanglu * version: V1.0 */ public class TestZk { ZooKeeper zk; //前置程序主要是为了获得zk连接 @Before public void conn(){ zk=ZKUtils.getZK(); } @After public void colse(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void test(){ System.out.println("start----------"); //因为zk获取数据和查询数据都需要一个
监控器和回调器,干脆做成一个公共的 WatchCallBack watchCallBack = new WatchCallBack(); //查看当前节点是否存在,参数需要一个监控器和异步回调器 zk.exists("/AppConf",watchCallBack,watchCallBack ,"ABC"); } }
因为zk获取数据和查询数据都需要一个监控器和回调器,干脆做成一个公共的
package com.example.zookeeperdemo.zookeeper.confog_wkl;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/** * Description: * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
ZooKeeper zk;
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
//查询状态回调器
@Override
public void processResult(int i, String s, Object o, Stat stat) {
if(stat !=null){
//不等于null,代表有数据
zk.getData("/AppConf",this,this,"ABC");
}
}
//监控器
@Override
public void process(WatchedEvent watchedEvent) {
}
//获取数据回调器
@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
}
}
4:当监控存在时,读取数据
当该节点存在时,从该节点读取数据
package com.example.zookeeperdemo.zookeeper.confog_wkl;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
/** * Description: * Date: 2022/5/8 - 上午 4:10 * author: wangkanglu * version: V1.0 */
public class TestZk {
ZooKeeper zk;
//前置程序主要是为了获得zk连接
@Before
public void conn(){
zk=ZKUtils.getZK();
}
@After
public void colse(){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void test(){
System.out.println("start----------");
//因为zk获取数据和查询数据都需要一个监控器和回调器,干脆做成一个公共的
WatchCallBack watchCallBack = new WatchCallBack();
MyConf myConf = new MyConf();
watchCallBack.setZk(zk);
watchCallBack.setConf(myConf);
CountDownLatch countDownLatch = new CountDownLatch(1);
//查看当前节点是否存在,参数需要一个监控器和异步回调器
zk.exists("/AppConf",watchCallBack,watchCallBack ,"ABC");
watchCallBack.setCountDownLatch(countDownLatch);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
zk.exists("/AppConf",watchCallBack,watchCallBack ,"ABC");
countDownLatch.await();
}
}
并将该数据存入myconf这个类中
package com.example.zookeeperdemo.zookeeper.confog_wkl;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/** * Description: * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
ZooKeeper zk;
MyConf conf ;
CountDownLatch countDownLatch;
public MyConf getConf() {
return conf;
}
public void setConf(MyConf conf) {
this.conf = conf;
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
//查询状态回调器
@Override
public void processResult(int i, String s, Object o, Stat stat) {
if(stat !=null){
//不等于null,代表有数据
zk.getData("/AppConf",this,this,"ABC");
}
}
//监控器
@Override
public void process(WatchedEvent watchedEvent) {
}
//获取数据回调器
@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
if(bytes!=null){
String data = new String(bytes);
//将数据存入myconf
conf.setName(data);
//当拿到数据后,countDown
countDownLatch.countDown();
}
}
}
package com.example.zookeeperdemo.zookeeper.confog_wkl;
/** * Description: * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class MyConf {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
5:写监控程序
代码中读取数据和查看节点都是用了同一个监控,那么我们可以同一个监控,对不同的监控类型做出处理,此处采用在节点数据改变和没有发现节点时,在此调用getData方法,这个方法可以可以一直循环监控数据
//监控器
@Override
public void process(WatchedEvent watchedEvent) {
switch (watchedEvent.getType()) {
case None:
break;
case NodeCreated:
zk.getData("/AppConf",this,this,"sdfs");
break;
case NodeDeleted:
//容忍性
conf.setName("");
countDownLatch = new CountDownLatch(1);
break;
case NodeDataChanged:
zk.getData("/AppConf",this,this,"sdfs");
break;
case NodeChildrenChanged:
break;
}
}
6:优化代码-当无值的时候直接持续查询是否有值
我们优化一下代码,当myConf中没有值得时候让线程阻塞等待,等待有值得时候直接输出
package com.example.zookeeperdemo.zookeeper.confog_wkl; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.CountDownLatch; /** * Description: * Date: 2022/5/8 - 上午 4:10 * author: wangkanglu * version: V1.0 */ public class TestZk { ZooKeeper zk; //前置程序主要是为了获得zk连接 @Before public void conn(){ zk=ZKUtils.getZK(); } @After public void colse(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void test(){ System.out.println("start----------"); //因为zk获取数据和查询数据都需要一个监控器和回调器,干脆做成一个公共的 WatchCallBack watchCallBack = new WatchCallBack(); MyConf myConf = new MyConf(); watchCallBack.setZk(zk); watchCallBack.setConf(myConf); CountDownLatch countDownLatch = new CountDownLatch(1); //查看当前节点是否存在,参数需要一个监控器和异步回调器 zk.exists("/AppConf",watchCallBack,watchCallBack ,"ABC"); watchCallBack.setCountDownLatch(countDownLatch); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } while 标签:
温度传感器wkl