资讯详情

浅谈(Java)JUC锁核心类AQS

??

?

??

??

??相遇是缘分,既然来了就拿着小板凳坐下来聊一会儿,如果在文中有所收获,请不要忘记一键三连,??,你的鼓励,是我创作的动力??!


文章目录

  • JUC: 核心类AQS详解
    • AbstractQueuedSynchronizer简介
      • AQS 核心思想
      • AQS 共享资源的方式
      • AQS模板采用模板方法模式
    • AbstractQueuedSynchronizer数据结构
    • AbstractQueuedSynchronizer源码分析
      • 类继承关系
      • 类的内部类 - Node类
      • 类的内部类 - ConditionObject类
      • 类的属性
      • 类的结构方法
      • 类的核心方法 - acquire方法
      • 类的核心方法 - release方法
    • AbstractQueuedSynchronizer示例详解一
    • AbstractQueuedSynchronizer示例详解二
    • AbstractQueuedSynchronizer总结

JUC锁: 锁核心类AQS详解

AbstractQueuedSynchronizer抽象是核心,需要掌握。它提供了一个基础FIFO可用于构建锁或其他相关同步装置的基本框架。

AbstractQueuedSynchronizer简介

AQS用于构建锁和同步器的框架AQS它可以简单高效地构建大量应用广泛的同步器,如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等都是基于的AQS的。当然,我们也可以利用自己AQS根据我们自己的需要,很容易构建同步器。

AQS 核心思想

AQS核心思想是,如果要求的共享资源是免费的,则将当前要求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态。如果要求的共享资源被占用,则需要一套线程阻塞等待和锁分配机制,该机制被唤醒AQS是用CLH队列锁实现了,将暂时无法获得的线程加入队列。

CLH(Craig,Landin,and Hagersten)队列是虚拟的双向队列(虚拟的双向队列没有队列实例,只有结点之间的关系)。AQS将每个要求共享资源的线程包装成一个CLH锁队列的一个结点(Node)实现锁的分配。

AQS使用一个int成员变量通过内置来表示同步状态FIFO排队完成获取资源线程的排队工作。AQS使用CAS对同步状态进行原子操作,以修改其值。

private volatile int state;//共享变量,使用volatile保证线程可见性  

通过状态信息procted类型的getState,setState,compareAndSetState进行操作

////返回同步状态的当前值 protected final int getState() { 
                   return state; }  // 设置同步状态值 protected final void setState(int newState) { 
                  state = newState; } //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态等于expect(期望值) protected final boolean compareAndSetState(int expect, int update) { 
                 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }  

AQS 共享资源的方式

AQS定义两种资源共享方式

  • Exclusive(独占):只能执行一个线程,如ReentrantLock。可分为公平锁和非公平锁:
  • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  • Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
  • ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。

    不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

    AQS底层使用了模板方法模式

    同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

    使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

    AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

    isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
    tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
    tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。 
    

    默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

    以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

    AbstractQueuedSynchronizer数据结构

    AbstractQueuedSynchronizer类底层的数据结构是使用CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。

    image.png

    AbstractQueuedSynchronizer源码分析

    类的继承关系

    AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable   
    

    其中AbstractOwnableSynchronizer抽象类的源码如下:

    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { 
            
        
        // 版本序列号
        private static final long serialVersionUID = 3737899427754241961L;
        // 构造方法
        protected AbstractOwnableSynchronizer() { 
             }
        // 独占模式下的线程
        private transient Thread exclusiveOwnerThread;
        
        // 设置独占线程 
        protected final void setExclusiveOwnerThread(Thread thread) { 
            
            exclusiveOwnerThread = thread;
        }
        
        // 获取独占线程 
        protected final Thread getExclusiveOwnerThread() { 
            
            return exclusiveOwnerThread;
        }
    }  
    

    AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThread与getExclusiveOwnerThread方法,这两个方法会被子类调用。

    AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。

    类的内部类 - Node类

    static final class Node { 
            
        // 模式,分为共享与独占
        // 共享模式
        static final Node SHARED = new Node();
        // 独占模式
        static final Node EXCLUSIVE = null;        
        // 结点状态
        // CANCELLED,值为1,表示当前的线程被取消
        // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
        // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
        // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
        // 值为0,表示当前节点在sync队列中,等待着获取锁
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;        
    
        // 结点状态
        volatile int waitStatus;        
        // 前驱结点
        volatile Node prev;    
        // 后继结点
        volatile Node next;        
        // 结点所对应的线程
        volatile Thread thread;        
        // 下一个等待者
        Node nextWaiter;
        
        // 结点是否在共享模式下等待
        final boolean isShared() { 
            
            return nextWaiter == SHARED;
        }
        
        // 获取前驱结点,若前驱结点为空,抛出异常
        final Node predecessor() throws NullPointerException { 
            
            // 保存前驱结点
            Node p = prev; 
            if (p == null) // 前驱结点为空,抛出异常
                throw new NullPointerException();
            else // 前驱结点不为空,返回
                return p;
        }
        
        // 无参构造方法
        Node() { 
                // Used to establish initial head or SHARED marker
        }
        
        // 构造方法
            Node(Thread thread, Node mode) { 
                // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        
        // 构造方法
        Node(Thread thread, int waitStatus) { 
             // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    } 
    

    每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

    • CANCELLED,值为1,表示当前的线程被取消。
    • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。
    • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。
    • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。
    • 值为0,表示当前节点在sync queue中,等待着获取锁。

    类的内部类 - ConditionObject类

    这个类有点长,耐心看下:

    // 内部类
    public class ConditionObject implements Condition, java.io.Serializable { 
            
        // 版本号
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        // condition队列的头结点
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        // condition队列的尾结点
        private transient Node lastWaiter;
    
        /** * Creates a new {@code ConditionObject} instance. */
        // 构造方法
        public ConditionObject() { 
             }
    
        // Internal methods
    
        /** * Adds a new waiter to wait queue. * @return its new wait node */
        // 添加新的waiter到wait队列
        private Node addConditionWaiter() { 
            
            // 保存尾结点
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) { 
             // 尾结点不为空,并且尾结点的状态不为CONDITION
                // 清除状态为CONDITION的结点
                unlinkCancelledWaiters(); 
                // 将最后一个结点重新赋值给t
                t = lastWaiter;
            }
            // 新建一个结点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null) // 尾结点为空
                // 设置condition队列的头结点
                firstWaiter = node;
            else // 尾结点不为空
                // 设置为节点的nextWaiter域为node结点
                t.nextWaiter = node;
            // 更新condition队列的尾结点
            lastWaiter = node;
            return node;
        }
    
        /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */
        private void doSignal(Node first) { 
            
            // 循环
            do { 
            
                if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空
                    // 设置尾结点为空
                    lastWaiter = null;
                // 设置first结点的nextWaiter域
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                        (first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环
        }
    
        /** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */
        private void doSignalAll(Node first) { 
            
            // condition队列的头结点尾结点都设置为空
            lastWaiter = firstWaiter = null;
            // 循环
            do { 
            
                // 获取first结点的nextWaiter域结点
                Node next = first.nextWaiter;
                // 设置first结点的nextWaiter域为空
                first.nextWaiter = null;
                // 将first结点从condition队列转移到sync队列
                transferForSignal(first);
                // 重新设置first
                first = next;
            } while (first != null);
        }
    
        /** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */
        // 从condition队列中清除状态为CANCEL的结点
        private void unlinkCancelledWaiters() { 
            
            // 保存condition队列头结点
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) { 
             // t不为空
                // 下一个结点
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) { 
             // t结点的状态不为CONDTION状态
                    // 设置t节点的额nextWaiter域为空
                    t.nextWaiter = null;
                    if (trail == null) // trail为空
                        // 重新设置condition队列的头结点
                        firstWaiter = next;
                    else // trail不为空
                        // 设置trail结点的nextWaiter域为next结点
                        trail.nextWaiter = next;
                    if (next == null) // next结点为空
                        // 设置condition队列的尾结点
                        lastWaiter = trail;
                }
                else // t结点的状态为CONDTION状态
                    // 设置trail结点
                    trail = t;
                // 设置t结点
                t = next;
            }
        }
    
        // public methods
    
        /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */
        // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
        public final void signal() { 
            
            if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
                throw new IllegalMonitorStateException();
            // 保存condition队列头结点
            Node first = firstWaiter;
            if (first != null) // 头结点不为空
                // 唤醒一个等待线程
                doSignal(first);
        }
    
        /** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */
        // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
        public final void signalAll() { 
            
            if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
                throw new IllegalMonitorStateException();
            // 保存condition队列头结点
            Node first = firstWaiter;
            if (first != null) // 头结点不为空
                // 唤醒所有等待线程
                doSignalAll(first);
        }
    
        /** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */
        // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
        public final void awaitUninterruptibly() { 
            
            // 添加一个结点到等待队列
            Node node = addConditionWaiter();
            // 获取释放的状态
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) { 
             // 
                // 阻塞当前线程
                LockSupport.park(this);
                if (Thread.interrupted()) // 当前线程被中断
                    // 设置interrupted状态
                    interrupted = true; 
            }
            if (acquireQueued(node, savedState) || interrupted) // 
                selfInterrupt();
        }
    
        /* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */
    
        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE    = -1;
    
        /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */
        private int checkInterruptWhileWaiting(Node node) { 
            
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0; 
        }
    
        /** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException { 
            
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }
    
        /** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */
        // // 等待,当前线程在接到信号或被中断之前一直处于等待状态
        public final void await() throws InterruptedException { 
            
            if (Thread.interrupted()) // 当前线程被中断,抛出异常
                throw new InterruptedException();
            // 在wait队列上添加一个结点
            Node node = addConditionWaiter();
            // 
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) { 
            
                // 阻塞当前线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    
        /** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */
        // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException { 
            
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) { 
            
                if (nanosTimeout <= 0L) { 
            
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime() 

    标签: rp5n连接电缆

    锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

    锐单商城 - 一站式电子元器件采购平台