文章目录
- 1:Zookeeper分布式锁分析
-
- 1.zookeeper实现分布式锁方案
- 2.Redis实现分布式锁方案
- 3.对比zookeeper与redis实现分布式锁
- 2:zookeeper锁的实现
-
- 1:获取zk实例工具类
- 2:测试类
- 三、核心业务实现类
1:Zookeeper分布式锁分析
1.zookeeper实现分布式锁方案
a.只有一个人能得到锁
b.获,客户端问题,临时节点(session)
c.如何通知其他客户锁被释放和删除
c-1: 主动轮询,心跳:缺点:延迟,压力,
c-2: watch: 解决延迟问题。 弊端:压力
c-3: sequence watch:watch 前一个,最小的锁,一旦最小的锁释放,成本:zk只需回调第二次发送时间
2.Redis实现分布式锁方案
a.使用setnx()获取锁信息的方法
b.设置过期时间,防止客户端down机器,导致死锁
c.多线程(守护线程)、监控锁、业务未完成、锁过期、自动延迟
3.对比zookeeper与redis实现分布式锁
3.1 从获得锁的速度来看,redis的速度优于zookeeper
3.从方案实现的角度来看,zookeeper实现相对redis简单,zookeeper只要锁和回调,redis增加线程对锁监控。
2:zookeeper锁的实现
关于获取zk详细说明实例的代码。建议去看这篇文章。这篇文章省略了。只粘贴代码
1:获取zk实例的工具类
package com.example.zookeeperdemo.zookeeper.confog_wkl; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; /** * Description:获取zk实例 * Date: 2022/5/8 - 上午 4:17 * author: wangkanglu * version: V1.0 */ public class ZKUtils {
private static ZooKeeper zk; ///在端口号后面加/testConfig,后续程序中使用的根节点是testConfig private static String adress = "192.168.138.128:2181/testConfig"; private static int timeout = 3000; private static DefaultWatch defaultWatch = new DefaultWatch(); private static CountDownLatch downLatch = new CountDownLatch(1); public static ZooKeeper getZK(
)
{
try
{
zk
=
new
ZooKeeper
(adress
,timeout
,defaultWatch
)
; defaultWatch
.
setCd
(downLatch
)
;
//因为zk连接是异步的,此处采用CountDownLatch downLatch
.
await
(
)
;
}
catch
(
IOException e
)
{
e
.
printStackTrace
(
)
;
}
catch
(
InterruptedException e
)
{
e
.
printStackTrace
(
)
;
}
return zk
;
}
}
2:测试类
package com.example.zookeeperdemo.zookeeper.lock_wkl;
import com.example.zookeeperdemo.zookeeper.confog_wkl.ZKUtils;
import com.example.zookeeperdemo.zookeeper.lock.WatchCallBack;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/** * Description: * Date: 2022/5/8 - 上午 6:23 * author: wangkanglu * version: V1.0 */
public class TestLock {
ZooKeeper zk ;
@Before
public void conn (){
zk = ZKUtils.getZK();
}
@After
public void close (){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void test(){
for (int i = 0; i < 10; i++) {
new Thread(()->{
LockWatchCallBack watchCallBack = new LockWatchCallBack();
watchCallBack.setZk(zk);
String threadName = Thread.currentThread().getName();
watchCallBack.setThreName(threadName);
//每一个线程:
//抢锁
watchCallBack.tryLock();
//干活
System.out.println(threadName+" working...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//释放锁
watchCallBack.unLock();
}).start();
}
while(true){
}
}
}
3:核心业务实现类
package com.example.zookeeperdemo.zookeeper.lock_wkl;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/** * Description: * Date: 2022/5/8 - 上午 6:27 * author: wangkanglu * version: V1.0 */
public class LockWatchCallBack implements AsyncCallback.StringCallback , AsyncCallback.Children2Callback,Watcher, AsyncCallback.StatCallback {
private ZooKeeper zk;
private String threName;
private String lockName;
CountDownLatch cc = new CountDownLatch(1);
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
public String getThreName() {
return threName;
}
public void setThreName(String threName) {
this.threName = threName;
}
//获取锁
public void tryLock(){
System.out.println(threName + " create....");
zk.create("/lock",threName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, threName);
try {
cc.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//释放锁
public void unLock(){
try {
zk.delete("/"+lockName,-1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
//创建节点时的回调函数
@Override
public void processResult(int rc, String path, Object ctx, String name) {
//每个线程启动后创建锁,然后get锁目录的所有孩子,不注册watch在锁目录
System.out.println(ctx.toString()+" create path: "+ name);
lockName = name.substring(1);
zk.getChildren("/", false, this, ctx );
}
//获得所有子节点的回调函数
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
Collections.sort(children);
int i = children.indexOf(lockName.substring(1));
//是不是第一个
if(i <1 ){
//yes
System.out.println(lockName +" i am first....");
try {
zk.setData("/",lockName.getBytes(),-1);
//如果是第一个就直接放行
cc.countDown();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
//如果不是的话,就直接监控自己前一个几点的生命周期
// 监控 回调
zk.exists("/"+children.get(i-1),this,this,"sdf");
}
}
监控回调
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
}
//监控
@Override
public void process(WatchedEvent event) {
//如果第一个哥们,那个锁释放了,其实只有第二个收到了回调事件!!
//如果,不是第一个哥们,某一个,挂了,也能造成他后边的收到这个通知,从而让他后边那个跟去watch挂掉这个哥们前边的。。。
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
//如果自己前一个节点被删除了,那么还是检查下自己是否是第一个子节点
zk.getChildren("/",false,this ,"sdf");
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
}
}
}