资讯详情

Zookeeper-05-ZK的分布式配置实例(watch监控)

文章目录

  • 1.什么是分布式配置监控?
  • 2:实例编码
    • 1:开发分析
    • 2:创建zk实例
    • 3:检查当前节点是否存在
    • 4:当监控存在时,读取数据
    • 五、写监控程序
    • 6:优化代码-当无值的时候直接持续查询是否有值
    • 7:进行测试
    • 8:最后优化,包装所有方法,将值暴露给业务方法
  • 三、附录:所有代码
    • TestZk测试类
    • ZKUtils:获取zk实例
    • DefaultWatch:默认连接zk时的监控类
    • MyConf:存储信息的类别
    • WatchCallBack:核心工具类

1.什么是分布式配置监控?

在这里插入图片描述 如图所示:在分布式系统中,如果多个客户端(服务器)同时使用某些配置。然后,当配置发生变化时,我们需要通过心跳等方式主动监控它。那么,当数据发生变化时,有没有办法主动通知我们呢;所以zookeeper就可以办到;

2:实例编码

1:开发分析

1:场景,开发测试程序,模拟客户监控数据,当数据存在时不断输出,当数据更改时,输出最新值,当数据丢失时,线程堵塞。 2:本次开发采用响应式编程 3:这种发展的前提是你已经知道了zk基本使用客户端;如果没有,建议阅读本文Zookeeper-04-ZK的API基本使用 4:开发程序类目录说明 5:由于使用响应编程,异步处理来回跳转更烧脑,我试着每一步粘贴所有代码;我希望不要因为太多的代码而影响你的印象;

2:创建zk实例

1:创建测试类,前置调用测试类获取zk实例

package com.example.zookeeperdemo.zookeeper.confog_wkl;  import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test;  /** * Description: * Date: 2022/5/8 - 上午 4:10 * author: wangkanglu * version: V1.0 */ public class TestZk { 
              ZooKeeper zk;      ///前置程序主要是为了获得zk连接     @Before     public void conn(){ 
                 zk=ZKUtils.getZK();     }      @After     public void colse(){ 
                 try { 
                     zk.close();         } catch (InterruptedException e) { 
        
            e.printStackTrace();
        }
    }

    @Test
    public void test(){ 
        
        System.out.println("start----------");
    }
}

2:zk实例由ZKUtils封装的静态方法获取,因为zk连接是异步的,此处利用CountDownLatch 来实现同步阻塞

package com.example.zookeeperdemo.zookeeper.confog_wkl;

import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/** * Description:获取zk实例 * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class ZKUtils { 
        

    private  static ZooKeeper zk;
    //在端口号后边加/testConfig,表示后续在程序中使用的根节点为testConfig
    private static String adress = "192.168.138.128:2181/testConfig";
    private static int timeout = 3000;

    private  static  DefaultWatch defaultWatch = new DefaultWatch();
    private static CountDownLatch downLatch = new CountDownLatch(1);

    public static ZooKeeper getZK(){ 
        
        try { 
        
            zk = new ZooKeeper(adress,timeout,defaultWatch);
            defaultWatch.setCd(downLatch);
            //因为zk连接是异步的,此处采用CountDownLatch
            downLatch.await();
        } catch (IOException e) { 
        
            e.printStackTrace();
        } catch (InterruptedException e) { 
        
            e.printStackTrace();
        }
        return zk;
    }
}

3:因为zk实例连接时要设置监听器,正好在监听器中设置countDown

package com.example.zookeeperdemo.zookeeper.confog_wkl;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

/** * Description: * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class DefaultWatch implements Watcher { 
        

    CountDownLatch cd;

    public CountDownLatch getCd() { 
        
        return cd;
    }

    public void setCd(CountDownLatch cd) { 
        
        this.cd = cd;
    }

    @Override
    public void process(WatchedEvent watchedEvent) { 
        
        switch (watchedEvent.getState()) { 
        
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                System.out.println("zk连接成功了");
                //成功建立连接后放行
                cd.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
        }
    }
}

3:查看当前节点是否存在

 package com.example.zookeeperdemo.zookeeper.confog_wkl;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/** * Description: * Date: 2022/5/8 - 上午 4:10 * author: wangkanglu * version: V1.0 */
public class TestZk { 
        

    ZooKeeper zk;

    //前置程序主要是为了获得zk连接
    @Before
    public void conn(){ 
        
        zk=ZKUtils.getZK();
    }

    @After
    public void colse(){ 
        
        try { 
        
            zk.close();
        } catch (InterruptedException e) { 
        
            e.printStackTrace();
        }
    }

    @Test
    public void test(){ 
        
        System.out.println("start----------");

        //因为zk获取数据和查询数据都需要一个监控器和回调器,干脆做成一个公共的
        WatchCallBack watchCallBack = new WatchCallBack();

        //查看当前节点是否存在,参数需要一个监控器和异步回调器
        zk.exists("/AppConf",watchCallBack,watchCallBack ,"ABC");
    }
}

因为zk获取数据和查询数据都需要一个监控器和回调器,干脆做成一个公共的

package com.example.zookeeperdemo.zookeeper.confog_wkl;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/** * Description: * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { 
        

    ZooKeeper zk;

    public ZooKeeper getZk() { 
        
        return zk;
    }

    public void setZk(ZooKeeper zk) { 
        
        this.zk = zk;
    }

    //查询状态回调器
    @Override
    public void processResult(int i, String s, Object o, Stat stat) { 
        
        if(stat !=null){ 
        
            //不等于null,代表有数据
            zk.getData("/AppConf",this,this,"ABC");
        }
    }

    //监控器
    @Override
    public void process(WatchedEvent watchedEvent) { 
        

    }

    //获取数据回调器
    @Override
    public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) { 
        

    }
}

4:当监控存在时,读取数据

当该节点存在时,从该节点读取数据

package com.example.zookeeperdemo.zookeeper.confog_wkl;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

/** * Description: * Date: 2022/5/8 - 上午 4:10 * author: wangkanglu * version: V1.0 */
public class TestZk { 
        

    ZooKeeper zk;

    //前置程序主要是为了获得zk连接
    @Before
    public void conn(){ 
        
        zk=ZKUtils.getZK();
    }

    @After
    public void colse(){ 
        
        try { 
        
            zk.close();
        } catch (InterruptedException e) { 
        
            e.printStackTrace();
        }
    }

    @Test
    public void test(){ 
        
        System.out.println("start----------");

        //因为zk获取数据和查询数据都需要一个监控器和回调器,干脆做成一个公共的
        WatchCallBack watchCallBack = new WatchCallBack();
        MyConf myConf = new MyConf();

        watchCallBack.setZk(zk);
        watchCallBack.setConf(myConf);
        CountDownLatch countDownLatch = new CountDownLatch(1);


        //查看当前节点是否存在,参数需要一个监控器和异步回调器
        zk.exists("/AppConf",watchCallBack,watchCallBack ,"ABC");
        watchCallBack.setCountDownLatch(countDownLatch);
        try { 
        
            countDownLatch.await();
        } catch (InterruptedException e) { 
        
            e.printStackTrace();
        }

		zk.exists("/AppConf",watchCallBack,watchCallBack ,"ABC");
		countDownLatch.await();
		
        
    }
}

并将该数据存入myconf这个类中

package com.example.zookeeperdemo.zookeeper.confog_wkl;


import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/** * Description: * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { 
        

    ZooKeeper zk;
    MyConf conf ;
    CountDownLatch countDownLatch;

    public MyConf getConf() { 
        
        return conf;
    }

    public void setConf(MyConf conf) { 
        
        this.conf = conf;
    }

    public ZooKeeper getZk() { 
        
        return zk;
    }

    public void setZk(ZooKeeper zk) { 
        
        this.zk = zk;
    }

    //查询状态回调器
    @Override
    public void processResult(int i, String s, Object o, Stat stat) { 
        
        if(stat !=null){ 
        
            //不等于null,代表有数据
            zk.getData("/AppConf",this,this,"ABC");           

        }
    }

    //监控器
    @Override
    public void process(WatchedEvent watchedEvent) { 
        

    }

    //获取数据回调器
    @Override
    public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) { 
        
        if(bytes!=null){ 
        
            String data = new String(bytes);
            //将数据存入myconf
            conf.setName(data);
            //当拿到数据后,countDown
            countDownLatch.countDown();
        }
    }
}

package com.example.zookeeperdemo.zookeeper.confog_wkl;

/** * Description: * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */
public class MyConf { 
        
    private String name;

    public String getName() { 
        
        return name;
    }

    public void setName(String name) { 
        
        this.name = name;
    }
}

5:写监控程序

代码中读取数据和查看节点都是用了同一个监控,那么我们可以同一个监控,对不同的监控类型做出处理,此处采用在节点数据改变和没有发现节点时,在此调用getData方法,这个方法可以可以一直循环监控数据

 //监控器
    @Override
    public void process(WatchedEvent watchedEvent) { 
        
        switch (watchedEvent.getType()) { 
        
            case None:
                break;
            case NodeCreated:
                zk.getData("/AppConf",this,this,"sdfs");
                break;
            case NodeDeleted:
                //容忍性
                conf.setName("");
                countDownLatch = new CountDownLatch(1);
                break;
            case NodeDataChanged:
                zk.getData("/AppConf",this,this,"sdfs");
                break;
            case NodeChildrenChanged:
                break;
        }
    }

6:优化代码-当无值的时候直接持续查询是否有值

我们优化一下代码,当myConf中没有值得时候让线程阻塞等待,等待有值得时候直接输出

package com.example.zookeeperdemo.zookeeper.confog_wkl;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

/** * Description: * Date: 2022/5/8 - 上午 4:10 * author: wangkanglu * version: V1.0 */
public class TestZk { 
        

    ZooKeeper zk;

    //前置程序主要是为了获得zk连接
    @Before
    public void conn(){ 
        
        zk=ZKUtils.getZK();
    }

    @After
    public void colse(){ 
        
        try { 
        
            zk.close();
        } catch (InterruptedException e) { 
        
            e.printStackTrace();
        }
    }

    @Test
    public void test(){ 
        
        System.out.println("start----------");

        //因为zk获取数据和查询数据都需要一个监控器和回调器,干脆做成一个公共的
        WatchCallBack watchCallBack = new WatchCallBack();
        MyConf myConf = new MyConf();

        watchCallBack.setZk(zk);
        watchCallBack.setConf(myConf);
        CountDownLatch countDownLatch = new CountDownLatch(1);


        //查看当前节点是否存在,参数需要一个监控器和异步回调器
        zk.exists("/AppConf",watchCallBack,watchCallBack ,"ABC");
        watchCallBack.setCountDownLatch(countDownLatch);
        try { 
        
            countDownLatch.await();
        } catch (InterruptedException e) { 
        
            e.printStackTrace();
        }

        while 
        标签: 温度传感器wkl

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台