资讯详情

Zookeeper-06-ZK的分布式锁实现

文章目录

  • 1:Zookeeper分布式分析
    • 1.zookeeper实现分布式锁方案
    • 2.Redis实现分布式锁方案
    • 3.对比zookeeper与redis实现分布式锁
  • 2:zookeeper锁的实现
    • 1:获取zk实例工具类
    • 2:测试类
    • 三、核心业务实现类

1:Zookeeper分布式锁分析

1.zookeeper实现分布式锁方案

a.只有一个人能得到锁

b.获,客户端问题,临时节点(session)

c.如何通知其他客户锁被释放和删除

c-1: 主动轮询,心跳:缺点:延迟,压力,

c-2: watch: 解决延迟问题。 弊端:压力

c-3: sequence watch:watch 前一个,最小的锁,一旦最小的锁释放,成本:zk只需回调第二次发送时间

2.Redis实现分布式锁方案

a.使用setnx()获取锁信息的方法

b.设置过期时间,防止客户端down机器,导致死锁

c.多线程(守护线程)、监控锁、业务未完成、锁过期、自动延迟

3.对比zookeeper与redis实现分布式锁

3.1 从获得锁的速度来看,redis的速度优于zookeeper

3.从方案实现的角度来看,zookeeper实现相对redis简单,zookeeper只要锁和回调,redis增加线程对锁监控。

2:zookeeper锁的实现

关于获取zk详细说明实例的代码。建议去看这篇文章。这篇文章省略了。只粘贴代码

1:获取zk实例的工具类

package com.example.zookeeperdemo.zookeeper.confog_wkl;  import org.apache.zookeeper.ZooKeeper;  import java.io.IOException; import java.util.concurrent.CountDownLatch;  /** * Description:获取zk实例 * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */ public class ZKUtils { 
              private  static ZooKeeper zk;     ///在端口号后面加/testConfig,后续程序中使用的根节点是testConfig     private static String adress = "192.168.138.128:2181/testConfig";     private static int timeout = 3000;      private  static  DefaultWatch defaultWatch = new DefaultWatch();     private static CountDownLatch downLatch = new CountDownLatch(1);      public static ZooKeeper getZK(
       
        )
        { 
          
        try 
        { 
          zk 
        = 
        new 
        ZooKeeper
        (adress
        ,timeout
        ,defaultWatch
        )
        ; defaultWatch
        .
        setCd
        (downLatch
        )
        ; 
        //因为zk连接是异步的,此处采用CountDownLatch downLatch
        .
        await
        (
        )
        ; 
        } 
        catch 
        (
        IOException e
        ) 
        { 
          e
        .
        printStackTrace
        (
        )
        ; 
        } 
        catch 
        (
        InterruptedException e
        ) 
        { 
          e
        .
        printStackTrace
        (
        )
        ; 
        } 
        return zk
        ; 
        } 
        } 
       

2:测试类

package com.example.zookeeperdemo.zookeeper.lock_wkl;


import com.example.zookeeperdemo.zookeeper.confog_wkl.ZKUtils;
import com.example.zookeeperdemo.zookeeper.lock.WatchCallBack;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/** * Description: * Date: 2022/5/8 - 上午 6:23 * author: wangkanglu * version: V1.0 */
public class TestLock { 
        

    ZooKeeper zk ;

    @Before
    public void conn (){ 
        
        zk  = ZKUtils.getZK();
    }

    @After
    public void close (){ 
        
        try { 
        
            zk.close();
        } catch (InterruptedException e) { 
        
            e.printStackTrace();
        }
    }

    @Test
    public void test(){ 
        
        for (int i = 0; i < 10; i++) { 
        
            new Thread(()->{ 
        
                LockWatchCallBack watchCallBack = new LockWatchCallBack();
                watchCallBack.setZk(zk);
                String threadName = Thread.currentThread().getName();
                watchCallBack.setThreName(threadName);
                //每一个线程:
                //抢锁
                watchCallBack.tryLock();
                //干活
                System.out.println(threadName+" working...");
                    try { 
        
                        Thread.sleep(1000);
                    } catch (InterruptedException e) { 
        
                        e.printStackTrace();
                    }
                //释放锁
                watchCallBack.unLock();
            }).start();
        }

        while(true){ 
        

        }
    }

}

3:核心业务实现类

package com.example.zookeeperdemo.zookeeper.lock_wkl;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** * Description: * Date: 2022/5/8 - 上午 6:27 * author: wangkanglu * version: V1.0 */
public class LockWatchCallBack implements AsyncCallback.StringCallback , AsyncCallback.Children2Callback,Watcher, AsyncCallback.StatCallback { 
        

    private ZooKeeper zk;
    private String threName;
    private String lockName;
    CountDownLatch cc = new CountDownLatch(1);

    public ZooKeeper getZk() { 
        
        return zk;
    }

    public void setZk(ZooKeeper zk) { 
        
        this.zk = zk;
    }

    public String getThreName() { 
        
        return threName;
    }

    public void setThreName(String threName) { 
        
        this.threName = threName;
    }

    //获取锁
    public void tryLock(){ 
        
        System.out.println(threName + " create....");
        zk.create("/lock",threName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, threName);
        try { 
        
            cc.await();
        } catch (InterruptedException e) { 
        
            e.printStackTrace();
        }
    }

    //释放锁
    public void unLock(){ 
        
        try { 
        
            zk.delete("/"+lockName,-1);
        } catch (InterruptedException e) { 
        
            e.printStackTrace();
        } catch (KeeperException e) { 
        
            e.printStackTrace();
        }
    }

    //创建节点时的回调函数
    @Override
    public void processResult(int rc, String path, Object ctx, String name) { 
        
        //每个线程启动后创建锁,然后get锁目录的所有孩子,不注册watch在锁目录
        System.out.println(ctx.toString()+" create path: "+ name);
        lockName = name.substring(1);
        zk.getChildren("/", false, this, ctx );
    }

    //获得所有子节点的回调函数
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { 
        
        Collections.sort(children);
        int i = children.indexOf(lockName.substring(1));


        //是不是第一个
        if(i <1 ){ 
        
            //yes
            System.out.println(lockName +" i am first....");
            try { 
        
                zk.setData("/",lockName.getBytes(),-1);
                //如果是第一个就直接放行
                cc.countDown();

            } catch (KeeperException e) { 
        
                e.printStackTrace();
            } catch (InterruptedException e) { 
        
                e.printStackTrace();
            }
        }else{ 
        
            //如果不是的话,就直接监控自己前一个几点的生命周期
            // 监控 回调
            zk.exists("/"+children.get(i-1),this,this,"sdf");
        }
    }

    监控回调
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) { 
        

    }

    //监控
    @Override
    public void process(WatchedEvent event) { 
        
        //如果第一个哥们,那个锁释放了,其实只有第二个收到了回调事件!!
        //如果,不是第一个哥们,某一个,挂了,也能造成他后边的收到这个通知,从而让他后边那个跟去watch挂掉这个哥们前边的。。。
        switch (event.getType()) { 
        
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                //如果自己前一个节点被删除了,那么还是检查下自己是否是第一个子节点
                zk.getChildren("/",false,this ,"sdf");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }
    }
}

标签: 温度传感器wkl

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台