资讯详情

Java 多线程共享模型之管程(下)

共享模型管程

wait、notify

wait、notify 原理

  • Owner线程发现条件不满足,调用 wait 方法,可以进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 线程堵塞,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 释放时唤醒线程
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时间唤醒,但唤醒后并不意味着立即获得锁,仍需进入EntryList 重新竞争

API 介绍

  • obj.wait() 让进入 object 线程到监视器 waitSet 等待
  • obj.notify() 在 object 上正在 waitSet 在等待线程中选择唤醒
  • obj.notifyAll() 让 object 上正在 waitSet 所有等待的线程都被唤醒

它们都是线程之间合作的手段,都属于 Object 对象的方法。在调用这些方法之前,必须获得对象的锁。

package WaNo;  import lombok.extern.slf4j.Slf4j;  @Slf4j(topic = "c.demo2") public class demo2 {   static final Object lock = new Object();   public static void main(String[] args) throws InterruptedException {   new Thread(() -> {    synchronized (lock){     log.debug("执行");     try {      lock.wait();     } catch (InterruptedException e) {      e.printStackTrace();     }     log.debug("其他代码");    }   },"t1").start();    new Thread(() -> {    synchronized (lock){     log.debug("执行");     try {      lock.wait();     } catch (InterruptedException e) {      e.printStackTrace();     }     log.debug("其他代码");    }   },"t2").start();    Thread.sleep(2000);   log.debug("唤醒 lock 上其他线程");   synchronized (lock){    lock.notify(); //唤醒 lock 上一个线程(随机)    //lock.notifyAll(); //唤醒 lock 所有线程   }  } } 
  • 20:20:58 [t1] c.demo2 - 执行 20:20:58 [t2] c.demo2 - 执行 20:21:00 [main] c.demo2 - 唤醒 lock 上其他线程 20:21:00 [t1] c.demo2 - 其他代码 
  • 20:22:04 [t1] c.demo2 - 执行 20:22:04 [t2] c.demo2 - 执行 20:22:06 [main] c.demo2 - 唤醒 lock 上其他线程 20:22:06 [t2] c.demo2 - 其他代码 20:22:06 [t1] c.demo2 - 其他代码 

wait() 方**释放对象的锁,进入 WaitSet 等待区让其他线程有机会获得对象的锁。notify 为止

wait(long n) 有时限等待, 到 n 毫秒后结束等待,或者被人接受 notify

wait、notify 正确使用

sleep vs. wait

  • sleep 是 Thread 方法,而 wait 是 Object 的方法
  • sleep 不需要强制和 synchronized 但是 wait 需要和 synchronized 一起用
  • sleep 睡觉时,物体锁不会释放,但 wait 等待时释放物体锁
  • 它们状态 TIMED_WAITING

step 1

想想下面的解决方案,为什么?

package WaNo;  import lombok.extern.slf4j.Slf4j;  @Slf4j(topic = "c.demo4") public class demo4 {  static final Object room = new Object();  static boolean hasCigarette = false;  static boolean hasTakeOut = false;   public static void main(String[] args) throws InterruptedException {   new Thread(() -> {    synchronized (room){     log.debug("有烟没?[{}]",hasCigarette);     if(![{}]",hasCigarette);     if(!hasCigarette){      log.debug("没烟,睡觉!");      try {       Thread.sleep(2000);      } catch (InterruptedException e) {       e.printStackTrace();      }     }     log.debug("有烟没?[{}]",hasCigarette);     if(hasCigarette){      log.debug("开始干活!");     }    }   },"小南").start();    for(int i=0;i<5;i  ){    new Thread(() -> {     synchronized (room){      log.debug("开始干活!");     }    },"其他人").start();   }    Thread.sleep(1000);   new Thread(() -> {    hasCigarette = true;    log.debug("烟到了!");   },"送烟的").start();  } } 

输出:

20:41:09 [小南] c.demo4 - 有烟没?[false] 20:41:09 [小南] c.demo4 - 没烟,睡觉! 20:41:10 [送烟的] c.demo4 - 烟到了! 20:41:11 [小南] c.demo4 - 有烟没?[true] 20:41:11 [小南] c.demo4 - 开始干活! 20:41:11 [其他人] c.demo4 - 开始干活! 20:41:11 [其他人] c.demo4 - 开始干活! 20:41:11 [其他人] c.demo4 - 开始干活! 20:41:11 [其他人] c.demo4 - 开始干活! 20:41:11 [其他人] c.demo4 - 开始干活! 
  • 其他工作线程应始终堵塞,效率过低
  • 必须睡足小南线程 2s 即使烟提前送来,也不能马上醒来
  • 加了 synchronized (room) 之后,就像小南把门锁在里面睡觉,烟根本送不进门,main 没加synchronized 就好像 main 翻窗进来的线程
  • 解决方案,使用 wait - notify 机制

step 2

想想下面的实现,为什么?

package WaNo.step;  import lombok.extern.slf4j.Slf4j;  @Slf4j(topic = "c.demo4") public class step2 {  static final Object room = new Object();  static boolean hasCigarette = false;  static boolean hasTakeOut = false;   public static void main(String[] args) throws InterruptedException {   new Thread(() -> {    synchronized (room){     log.debug("有烟没?[{}]",hasCigarette);     if(![{}]",hasCigarette);     if(!hasCigarette){      log.debug("没烟,睡觉!");      try {       room.wait();      } catch (InterruptedException e) {       e.printStackTrace();      }     }     log.debug("有烟没?[{}]",hasCigarette);     if(hasCigarette){      log.debug("开始干活!");     }    }   },"小南").start();    for(int i=0;i<5;i  ){    new Thread(() -> {     synchronized (room){      log.debug("开始干活!");     }    },"其他人").start();   }    Thread.sleep(1000);   new Thread(() > {
   synchronized (room){
    hasCigarette = true;
    log.debug("烟到了!");
    room.notify();
   }
  },"送烟的").start();
 }
}

输出:

20:46:32 [小南] c.demo4 - 有烟没?[false]
20:46:32 [小南] c.demo4 - 没烟,睡会!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:33 [送烟的] c.demo4 - 烟到了!
20:46:33 [小南] c.demo4 - 有烟没?[true]
20:46:33 [小南] c.demo4 - 开始干活!
  • 解决了其它干活的线程阻塞的问题
  • 但如果有其它线程也在等待条件呢?

step 3

package WaNo.step;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class step3 {
 static final Object room = new Object();
 static boolean hasCigarette = false;
 static boolean hasTakeOut = false;

 public static void main(String[] args) throws InterruptedException {
  new Thread(() -> {
   synchronized (room){
    log.debug("有烟没?[{}]",hasCigarette);
    if(!hasCigarette){
     log.debug("没烟,睡会!");
     try {
      room.wait();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    log.debug("有烟没?[{}]",hasCigarette);
    if(hasCigarette){
     log.debug("开始干活!");
    } else {
     log.debug("没干成活...");
    }
   }
  },"小南").start();

  new Thread(() -> {
   synchronized (room) {
    Thread thread = Thread.currentThread();
    log.debug("外卖送到没?[{}]", hasTakeOut);
    if (!hasTakeOut) {
     log.debug("没外卖,先歇会!");
     try {
      room.wait();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    log.debug("外卖送到没?[{}]", hasTakeOut);
    if (hasTakeOut) {
     log.debug("可以开始干活了");
    } else {
     log.debug("没干成活...");
    }
   }
  }, "小女").start();

  for(int i=0;i<5;i++){
   new Thread(() -> {
    synchronized (room){
     log.debug("开始干活!");
    }
   },"其他人").start();
  }

  Thread.sleep(1000);
  new Thread(() -> {
   synchronized (room){
    hasCigarette = true;
    log.debug("烟到了!");
    room.notify();
   }
  },"送烟的").start();
 }
}

输出:

20:53:12.173 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:53:12.176 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:53:12.176 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
20:53:12.176 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:53:13.174 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:53:13.174 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:53:13.174 [小南] c.TestCorrectPosture - 没干成活...

notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】

解决方法,改为 notifyAll

step 4

new Thread(() -> {
 	synchronized (room) {
 		hasTakeout = true;
 		log.debug("外卖到了噢!");
 		room.notifyAll();
 	}
}, "送外卖的").start();

输出:

20:55:23.978 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:55:23.982 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:55:23.982 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
20:55:23.982 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:55:24.979 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:55:24.979 [小女] c.TestCorrectPosture - 外卖送到没?[true] 
20:55:24.980 [小女] c.TestCorrectPosture - 可以开始干活了
20:55:24.980 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:55:24.980 [小南] c.TestCorrectPosture - 没干成活...

用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了

解决方法,用 while + wait,当条件不成立,再次 wait

step 5

将 if 改为 while

while (!hasCigarette) {
 	log.debug("没烟,先歇会!");
 	try {
 		room.wait();
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		}
}

输出:

20:58:34.322 [小南] c.TestCorrectPosture - 有烟没?[false] 
20:58:34.326 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:58:34.326 [小女] c.TestCorrectPosture - 外卖送到没?[false] 
20:58:34.326 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:58:35.323 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:58:35.324 [小女] c.TestCorrectPosture - 外卖送到没?[true] 
20:58:35.324 [小女] c.TestCorrectPosture - 可以开始干活了
20:58:35.324 [小南] c.TestCorrectPosture - 没烟,先歇会!

套路总结

synchronized(lock) {
 	while(条件不成立) {
 		lock.wait();
 	}
 	// 干活
}

//另一个线程
synchronized(lock) {
 	lock.notifyAll();
}

同步模式之保护性暂停

定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

实现

package WaNo.step;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

@Slf4j(topic = "c.demo4")
public class demo4 {
 public static void main(String[] args) {
  //线程1 等待线程2 的下载结果
  GuardedObject guardedObject = new GuardedObject();
  new Thread(() -> {
   List<String> list = (List<String>) guardedObject.get();
   log.debug("结果的大小是:{}",list.size());
  },"t1").start();

  new Thread(() -> {
   log.debug("执行下载");
   try {
    Thread.sleep(5000);
    List<String> list = new ArrayList<>();
    list.add("1");
    guardedObject.complete(list);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  },"t2").start();
 }
}

class GuardedObject {
 //结果
 private Object response;

 //获取结果
 public Object get() {
  synchronized (this){
   //还没有结果
   while (response == null){
    try {
     this.wait();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   return response;
  }
 }

 //产生结果
 public void complete(Object response){
  synchronized (this){
   //给结果成员变量赋值
   this.response = response;
   this.notifyAll();
  }
 }
}

输出:

16:47:15 [t2] c.demo4 - 执行下载
16:47:20 [t1] c.demo4 - 结果的大小是:1

异步模式之生产者/消费者

要点

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

package WaNo;

import lombok.AllArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;

@Slf4j(topic = "c.demo5")
public class demo5 {
 public static void main(String[] args) {
  MessageQueue queue = new MessageQueue(2);

  for (int i = 0; i < 3; i++) {
   int id = i;
   new Thread(() -> {
    queue.put(new Message(id,"值"+id));
   },"生产者" + i).start();
  }

  new Thread(() -> {
   while (true){
    try {
     Thread.sleep(1000);
     Message message = queue.take();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  },"消费者").start();
 }
}

//消息队列类(线程间通信)
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
 //消息队列集合
 private LinkedList<Message> list = new LinkedList<>();
 //队列容量
 private int capcity;

 public MessageQueue(int capcity){
  this.capcity = capcity;
 }

 //获取消息
 public Message take(){
  //检查队列是否为空
  synchronized (list){
   while (list.isEmpty()){
    try {
     log.debug("队列为空,消费者线程等待");
     list.wait();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   //从队列头部获取消息返回
   Message message = list.removeFirst();
   log.debug("已消费消息 {}",message);
   list.notifyAll();
   return message;
  }
 }

 //存入消息
 public void put(Message message){
  synchronized (list){
   //检查队列是否已满
   while (list.size() == capcity){
    try {
     log.debug("队列已满,生产者线程等待");
     list.wait();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   //将消息加入队列的尾部
   list.addLast(message);
   log.debug("已生产消息 {}",message);
   list.notifyAll();
  }
 }
}

@Setter
@AllArgsConstructor
@ToString
@Slf4j(topic = "c.Message")
final class Message {
 private int id;
 private Object value;
}

输出:

17:18:49 [生产者0] c.MessageQueue - 已生产消息 Message(id=0, value=值0)
17:18:49 [生产者2] c.MessageQueue - 已生产消息 Message(id=2, value=值2)
17:18:49 [生产者1] c.MessageQueue - 队列已满,生产者线程等待
17:18:50 [消费者] c.MessageQueue - 已消费消息 Message(id=0, value=值0)
17:18:50 [生产者1] c.MessageQueue - 已生产消息 Message(id=1, value=值1)
17:18:51 [消费者] c.MessageQueue - 已消费消息 Message(id=2, value=值2)
17:18:52 [消费者] c.MessageQueue - 已消费消息 Message(id=1, value=值1)
17:18:53 [消费者] c.MessageQueue - 队列为空,消费者线程等待

park、unpark

基本使用">基本使用

它们是 LockSupport 类中的方法

// 暂停当前线程
LockSupport.park(); 

// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

Thread t1 = new Thread(() -> {
 	log.debug("start...");
 	sleep(1);
 	log.debug("park...");
 	LockSupport.park();
 	log.debug("resume...");
},"t1");
t1.start();

Thread.sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

输出:

18:42:52.585 c.TestParkUnpark [t1] - start... 
18:42:53.589 c.TestParkUnpark [t1] - park... 
18:42:54.583 c.TestParkUnpark [main] - unpark... 
18:42:54.583 c.TestParkUnpark [t1] - resume...

Thread t1 = new Thread(() -> {
 	log.debug("start...");
  sleep(2);
  log.debug("park...");
  LockSupport.park();
  log.debug("resume...");
}, "t1");
t1.start();

sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

输出:

18:43:50.765 c.TestParkUnpark [t1] - start... 
18:43:51.764 c.TestParkUnpark [main] - unpark... 
18:43:52.769 c.TestParkUnpark [t1] - park... 
18:43:52.769 c.TestParkUnpark [t1] - resume...

特点

与 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

原理

每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex

  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0

重新理解六种状态

假设有线程 Thread t

情况一

NEW --> RUNNABLE

当调用 t.start() 方法时,由 NEW --> RUNNABLE

情况二

​ RUNNABLE <--> WAITING

 用 synchronized(obj) 获取了对象锁后

  • 调用 obj.wait() 方法时, 从 RUNNABLE --> WAITING
  • 调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
    • 竞争锁成功, 从 WAITING --> RUNNABLE
    • 竞争锁失败, 从 WAITING --> BLOCKED

情况三

RUNNABLE <--> WAITING

  • 调用 t.join() 方法时,从 RUNNABLE --> WAITING
    • 注意是 的监视器上等待
  •  运行结束,或调用了的 interrupt() 时,从 WAITING --> RUNNABLE

情况四

RUNNABLE <--> WAITING

  • 当前线程调用 LockSupport.park() 方**让当前线程从 RUNNABLE --> WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING --> RUNNABLE

情况五

​ RUNNABLE <--> TIMED_WAITING

 用 synchronized(obj) 获取了对象锁后

  • 调用 obj.wait(long n) 方法时, 从 RUNNABLE --> TIMED_WAITING
  •  等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
    • 竞争锁成功, 从 TIMED_WAITING --> RUNNABLE
    • 竞争锁失败, 从 TIMED_WAITING --> BLOCKED

情况六

RUNNABLE <--> TIMED_WAITING

  • 调用 t.join(long n) 方法时,从 RUNNABLE --> TIMED_WAITING
    • 注意是 的监视器上等待
  • 等待时间超过了 n 毫秒,或 运行结束,或调用了的 interrupt() 时,从TIMED_WAITING --> RUNNABLE

情况七

RUNNABLE <--> TIMED_WAITING

  • 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
  • 等待时间超过了 n 毫秒,从 TIMED_WAITING --> RUNNABLE

情况八

RUNNABLE <--> TIMED_WAITING

  • 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,从 RUNNABLE --> TIMED_WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从TIMED_WAITING--> RUNNABLE

情况九

RUNNABLE <--> BLOCKED

  •  用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE --> BLOCKED
  • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中  竞争成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然 BLOCKED

情况十

RUNNABLE <--> TERMINATED

当前线程所有代码运行完毕,进入 TERMINATED

多把锁

package WaNo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo6")
public class demo6 {
 public static void main(String[] args) {
  BigRoom bigRoom = new BigRoom();
  new Thread(() -> {
   bigRoom.study();
  },"r1").start();

  new Thread(() -> {
   bigRoom.sleep();
  },"r2").start();
 }
}

@Slf4j(topic = "c.BigRoom")
class BigRoom {
 private final Object studyRoom = new Object();
 private final Object bedRoom = new Object();

 public void sleep(){
  synchronized (bedRoom){
   log.debug("sleep two hours");
   try {
    Thread.sleep(2000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }

 public void study(){
  synchronized (studyRoom){
   log.debug("study one hour");
   try {
    Thread.sleep(1000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }
}

输出:

20:01:42 [r2] c.BigRoom - sleep two hours
20:01:42 [r1] c.BigRoom - study one hour

将锁的粒度细分

  • 好处,是可以增强并发度
  • 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁

活跃性

死锁

有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁

t1 线程 获得 A对象 锁,接下来想获取 B对象 的锁 t2 线程 获得 B对象 锁,接下来想获取 A对象 的锁

Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
 	synchronized (A) {
 		log.debug("lock A");
 		sleep(1);
		synchronized (B) {
 			log.debug("lock B");
 			log.debug("操作...");
 		}
 	}
}, "t1");

Thread t2 = new Thread(() -> {
 	synchronized (B) {
 		log.debug("lock B");
 		sleep(0.5);
 		synchronized (A) {
 			log.debug("lock A");
 			log.debug("操作...");
 		}
 	}
}, "t2");
t1.start();
t2.start();

输出:

12:22:06.962 [t2] c.TestDeadLock - lock B 
12:22:06.962 [t1] c.TestDeadLock - lock A

哲学家进餐问题

有五位哲学家,围坐在圆桌旁。

  • 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。
  • 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。
  • 如果筷子被身边的人拿着,自己就得等待

活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束

public class TestLiveLock {
 	static volatile int count = 10;
 	static final Object lock = new Object();
 	public static void main(String[] args) {
 	new Thread(() -> {
 		// 期望减到 0 退出循环
 		while (count > 0) {
 			sleep(0.2);
 			count--;
 			log.debug("count: {}", count);
 		}
 	}, "t1").start();
 	
 	new Thread(() -> {
 		// 期望超过 20 退出循环
 		while (count < 20) {
 			sleep(0.2);
 			count++;
 			log.debug("count: {}", count);
 		}
 	}, "t2").start();
 }

饥饿

一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束

ReentrantLock

相对于 synchronized 它具备如下特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量

与 synchronized 一样,都支持可重入

基本语法

// 获取锁
reentrantLock.lock();
try {
 	// 临界区
} finally {
 	// 释放锁
 	reentrantLock.unlock();
}

可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo1")
public class demo1 {
 private static ReentrantLock lock = new ReentrantLock();
 public static void main(String[] args) {

  lock.lock();
  try {
   log.debug("enter main");
   m1();
  }finally {
   lock.unlock();
  }
 }

 public static void m1(){
  lock.lock();
  try {
   log.debug("enter m1");
   m2();
  }finally {
   lock.unlock();
  }
 }

 public static void m2(){
  lock.lock();
  try {
   log.debug("enter m2");
  }finally {
   lock.unlock();
  }
 }
}

输出:

20:19:19 [main] c.demo1 - enter main
20:19:19 [main] c.demo1 - enter m1
20:19:19 [main] c.demo1 - enter m2

可打断

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo2")
public class demo2 {
 private static ReentrantLock lock = new ReentrantLock();
 public static void main(String[] args) throws InterruptedException {
  Thread t1 = new Thread(() -> {
   try {
    //如果没有竞争,此方**获取对象的锁
    //如果有竞争,就进入阻塞队列,可以被其他线程用 interrupt 打断
    log.debug("尝试获得锁");
    lock.lockInterruptibly();
   } catch (InterruptedException e) {
    e.printStackTrace();
    log.debug("未获得锁,返回");
    return;
   }
   try {
    log.debug("获取到锁");
   }finally {
    lock.unlock();
   }
  }, "t1");

  lock.lock();
  t1.start();
  Thread.sleep(1000);
  log.debug("打断t1");
  t1.interrupt();
 }
}

输出:

20:26:05 [t1] c.demo2 - 尝试获得锁
20:26:06 [main] c.demo2 - 打断t1
20:26:06 [t1] c.demo2 - 未获得锁,返回
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
	at ReentrantLockDemo.demo2.lambda$main$0(demo2.java:16)
	at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 0

注意如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断

锁超时

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo3")
public class demo3 {
 private static ReentrantLock lock = new ReentrantLock();
 public static void main(String[] args) {
  Thread t1 = new Thread(() -> {
   log.debug("尝试获得锁");
   if(!lock.tryLock()){
    log.debug("获取不到锁");
    return;
   }
   try {
    log.debug("获得到锁");
   }finally {
    lock.unlock();
   }
  },"t1");

  lock.lock();
  log.debug("获得到锁");
  t1.start();
 }
}

输出:

20:31:15 [main] c.demo3 - 获得到锁
20:31:15 [t1] c.demo3 - 尝试获得锁
20:31:15 [t1] c.demo3 - 获取不到锁

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;
import sun.reflect.generics.tree.Tree;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo3")
public class demo3 {
 private static ReentrantLock lock = new ReentrantLock();
 public static void main(String[] args) throws InterruptedException {
  Thread t1 = new Thread(() -> {
   log.debug("尝试获得锁");
   try {
    if(!lock.tryLock(1, TimeUnit.SECONDS)){
     log.debug("获取不到锁");
     return;
    }
   } catch (InterruptedException e) {
    e.printStackTrace();
    log.debug("获取不到锁");
    return;
   }
   try {
    log.debug("获得到锁");
   }finally {
    lock.unlock();
   }
  },"t1");

  lock.lock();
  log.debug("获得到锁");
  Thread.sleep(1000);
  lock.unlock();
  t1.start();
 }
}

输出:

20:34:03 [main] c.demo3 - 获得到锁
20:34:04 [t1] c.demo3 - 尝试获得锁
20:34:04 [t1] c.demo3 - 获得到锁

公平锁

ReentrantLock 默认是不公平的

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class demo4 {
 public static void main(String[] args) {
  ReentrantLock lock = new ReentrantLock(false);
  lock.lock();
  for (int i = 0; i < 500; i++) {
   new Thread(() -> {
    lock.lock();
    try {
     System.out.println(Thread.currentThread().getName() + " running...");
    } finally {
     lock.unlock();
    }
   }, "t" + i).start();
  }
// 1s 之后去争抢锁
  Thread.sleep(1000);
  new Thread(() -> {
   System.out.println(Thread.currentThread().getName() + " start...");
   lock.lock();
   try {
    System.out.println(Thread.currentThread().getName() + " running...");
   } finally {
    lock.unlock();
   }
  }, "强行插入").start();
  lock.unlock();
 }
}

强行插入,有机会在中间输出

t39 running... 
t40 running... 
t41 running... 
t42 running... 
t43 running... 
强行插入 start... 
强行插入 running... 
t44 running... 
t45 running... 
t46 running... 
t47 running... 
t49 running...

改为公平锁后

ReentrantLock lock = new ReentrantLock(true);

强行插入,总是在最后输出

t465 running... 
t464 running... 
t477 running... 
t442 running... 
t468 running... 
t493 running... 
t482 running... 
t485 running... 
t481 running... 
强行插入 running...

公平锁一般没有必要,会降低并发度

条件变量

ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒

使用要点:

  • await 前需要获得锁
  • await 执行后,会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
  • 竞争 lock 锁成功后,从 await 后继续执行
package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo4")
public class demo4 {
 private static ReentrantLock lock = new ReentrantLock();
 public static void main(String[] args) {
  //创建一个新的条件变量(休息室)
  Condition condition1 = lock.newCondition();
  Condition condition2 = lock.newCondition();

  lock.lock();
  //进入休息室等待
  condition1.await();
  
  condition1.signal();
  //condition1.signalAll();
 }
}

同步模式之顺序控制

固定运行顺序

比如,必须先 2 后 1 打印

wait notify版

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class demo4 {
 static final Object lock = new Object();
 //表示 t2 是否被运行过
 static boolean t2runned = false;
 public static void main(String[] args) {
  Thread t1 = new Thread(() -> {
   synchronized (lock){
    while (!t2runned){
     try {
      lock.wait();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    log.debug("1");
   }
  },"t1");

  Thread t2 = new Thread(() -> {
   synchronized (lock){
    log.debug("2");
    t2runned = true;
    lock.notify();
   }
  },"t2");

  t1.start();
  t2.start();
 }
}

输出:

20:49:28 [t2] c.demo4 - 2
20:49:28 [t1] c.demo4 - 1

park unpark版

可以看到,实现上很麻烦:

  • 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait
  • 第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决此问题
  • 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个

可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

@Slf4j(topic = "demo5")
public class demo5 {
 public static void main(String[] args) {
  Thread t1 = new Thread(() -> {
   LockSupport.park();
   log.debug("1");
  }, "t1");

  t1.start();

  new Thread(() -> {
   log.debug("2");
   LockSupport.unpark(t1);
  },"t2").start();
 }
}

交替输出

线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现

wait notify版

package ReentrantLockDemo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

@Slf4j(topic = "demo5")
public class demo5 {
 public static void main(String[] args) {
  WaitNotify wn = new WaitNotify(1,5);
  new Thread(() -> {
   wn.print("a",1,2);
  }).start();
  new Thread(() -> {
   wn.print("b",2,3);
  }).start();
  new Thread(() -> {
   wn.print("c",3,1);
  }).start();
 }
}
/*
 输出内容 等待标记 下一个标记
 a   1   2
 b   2   3
 c   3   1
 */
@AllArgsConstructor
class WaitNotify{
 //等待标记
 private int flag;
 //循环次数
 private int loopNumber;

 //打印
 public void print(String str,int waitFlag,int nextFlag){
  for (int i = 0; i < loopNumber; i++) {
   synchronized (this){
    while (flag != waitFlag){
     try {
      this.wait();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    System.out.print(str);
    flag = nextFlag;
    this.notifyAll();
   }
  }
 }
}

输出:

abcabcabcabcabc

ReentrantLock版

package ReentrantLockDemo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo6")
public class demo6 {
 public static void main(String[] args) throws InterruptedException {
  AwaitSignal awaitSignal = new AwaitSignal(5);
  Condition a = awaitSignal.newCondition();
  Condition b = awaitSignal.newCondition();
  Condition c = awaitSignal.newCondition();
  new Thread(() -> {
   awaitSignal.print("a", a, b);
  }).start();
  new Thread(() -> {
   awaitSignal.print("b", b, c);
  }).start();
  new Thread(() -> {
   awaitSignal.print("c", c, a);
  }).start();

  Thread.sleep(1000);
  awaitSignal.lock();
  try {
   System.out.println("开始。。。");
   a.signal();
  }finally {
   awaitSignal.unlock();
  }
 }
}

@AllArgsConstructor
class AwaitSignal extends ReentrantLock {
 private int loopNumber;
 /**
  * @param str 打印内容
  * @param current 进入哪一间休息室
  * @param next 下一间休息室
  */
 public void print(String str,Condition current,Condition next){
  for (int i = 0; i < loopNumber; i++) {
   lock();
   try {
    try {
     current.await();
     System.out.print(str);
     next.signal();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }finally {
    unlock();
   }
  }
 }
}

输出:

开始。。。
abcabcabcabcabc

park unpark版

package ReentrantLockDemo;

import lombok.AllArgsConstructor;

import java.util.concurrent.locks.LockSupport;

public class demo7 {

 static Thread t1;
 static Thread t2;
 static Thread t3;

 public static void main(String[] args) {
  ParkUnpark pu = new ParkUnpark(5);
  t1 = new Thread(() -> {
   pu.print("a", t2);
  },"t1");
  t2 = new Thread(() -> {
   pu.print("b", t3);
  },"t2");
  t3 = new Thread(() -> {
   pu.print("c", t1);
  },"t3");

  t1.start();
  t2.start();
  t3.start();

  LockSupport.unpark(t1);
 }
}

@AllArgsConstructor
class ParkUnpark{
 private int loopNumber;

 public void print(String str,Thread next){
  for (int i = 0; i < loopNumber; i++) {
   LockSupport.park();
   System.out.print(str);
   LockSupport.unpark(next);
  }
 }
}

输出:

abcabcabcabcabc

“做程序员,圈子和学习最重要”因为有有了圈子可以让你少走弯路,扩宽人脉,扩展思路,学习他人的一些经验及学习方法!同时在这分享一下是一直以来整理的Java后端进阶笔记文档和学习资料免费分享给大家!需要资料的朋友私信我扣【06】 转载请保留本文网址: http://www.shaoqun.com/a/1926525.html

标签: t40fm扭矩传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台