Cancellation即取消机制,虽然AQS未提供公开cancel但它是真实的(可能有外包线程调用当前线程的中断方法), Cancellation 包括中断引起的失败和其他失败 取消和超时造成的;Cancellation使问题变得复杂, 因为它可能随时发生 ;
两个关键概念: 同步队列 和 条件队列。 两者绝对不同,不能混淆!!有必要引入这样的概念。
同步队列 命名来源于 sync queue; 为什么是sync,关键是对sync理解(这个词其实很难理解,英语意思是:同步;同步信号;同步域)表示与同步代码相关的东西;
wait queue 表示等待队列,强调等待状态,可以是同步队列,也可以是条件队列; 这取决于当时的语境
一个锁对象 Lock(通常内部有一个AQS)中, 同步队列是 main queue,只有一个; 位于同步队列Lock锁对象上方。
条件队列 位于锁对象创建的条件对象ConditionObject 以上可以有很多,因为lock对象可以随意创造许多条件!
特别注意: 条件队列 不直接位置Lock上面的锁对象!
spinForTimeoutThreshold 时间很短, 若超时时间小于此时间,则无需堵塞,自旋即可。否则需要堵塞;———— 为什么呢?因为阻塞涉及到内核空间的调用,所以是一个耗费性能的操作,要尽量避免!
自旋: spin,看起来很难理解; 其实就是说循环,也就是说 让线程在循环中执行;执行什么?通常效果是无效的;而且需要做一些判断、检测,如果满足了条件,则停止循环; 这个循环可以有各种条件,也可以是for(;;) 、while(true)或while(条件) 这种形式;———— 无论如何,循环必须有判断或判断 暂停条件。
阻塞状态 其实可以认为是 等待状态;
Synchronizer 协调器、电同步器、整步器 、网络同步装置;同步器同步装置;同步机———— 最好理解为同步器 synchronization 同时;同时;同步;同步控制;同步 ———— 最好理解同步机制 Lock ———— 最好理解为 同步锁
下面是 源码级别 逐字逐句 详细分析!
/** 提供封锁及相关框架。 同步器(semaphores),依赖 先入先出(FIFO)等待队列。 这种设计是为了 它是大多数依赖同步器的基础。 单个原子{@code int}值表示状态 改变这种状态的保护方法必须定义,改变这种状态的保护方法必须定义 定义这个状态对对象的意义是什么? 或释放。 考虑到这些,这一类的其他方法都携带了 排队和阻挡机制都出来了。子类可以保持 其他状态字段,但只有原子更新{@code int}。 使用方法{@link #getState}, {@link #setState}和{@link #compareAndSetState}是关于被追踪的 到同步。
<P>子类应定义为非公开的内部帮助程序。 类,用于实现同步属性。 附属班级。 类 {@codeAbstractQueuedSynchronizer}没有实现任何东西 同步接口。 相反,它定义了一些方法,比如 {@link #acquireInterruptibly},可以被调用为 具体的锁具和相关的同步器,以适当地 实现它们的公共方法。
<p>这类支持默认<em>排他性</em>,也支持默认<em>排他性</em>。 模式和<em>共享</em>模式。在独占模式下获得时。 试图获得其他线程的失败。共享模式 可能(但不需要)通过多线程获得成功。 除下列情况外,不理解这些差异: 机械地认为,当共享模式成功时,下一个 等待的线程(如果有的话)也必须确定是否可以 获得和。在不同模式下等待线程共享 同一FIFO队列。通常,实现子类只支持。 其中一种模式可以发挥作用,例如 {@link ReadWriteLock}。只支持排他性子类或 只有共享模式不需要定义支持未使用模式的方法。
<p>这类定义了嵌套{@link条件对象}类,它是 子类可以作为{@link通过子类实现条件} 支持独占模式的方法{@link #isHeldExclusively}报告同步是否特殊 持有当前线程的方法{@link #release}。 调用当前{@link #getState}值时完全释放 这个对象,象{@link #acquire},给定保存状态值。 最后,将对象恢复到以前的获取状态。 没有 {@code AbstractQueuedSynchronizer}方法,否则就会创建这样一个 因此,如果不能满足这种约束,就不要使用它。 的 {@link条件对象}的行为当然取决于 同步器实现的语义。
<p>该类提供检测、仪表和监控功能。 内部队列和类似内部队列的方法。 条件对象。这些对象可以根据需要导出类别 使用{@code AbstractQueuedSynchronizer}为他们的 同步机理。
<p>这种序列化只存储底层原子 整数维持状态,因此反序列化对象是空的 线程队列。典型的子类需要序列化 定义一个{@code readObject}该方法还原为已知的 反序列化的初始状态。
<h3>使用</h3></h3>。
<p>使用此类作为同步器的基础,请重新定义 以下方法适用于检查、/或修改: 1. 使用同步状态{@link #getState}, {@link #setState}和/或{@link #compareAndSetState}。
<ul <li> {@link #tryAcquire}{@link #tryAcquire} <li> {@link #tryRelease}{@link #tryRelease} <li> {@link #tryAcquireShared}{@link #tryAcquireShared} <li> {@link #tryReleaseShared}{@link #tryReleaseShared} <li> {@link #isHeldExclusively}{@link #isHeldExclusively} </ul>
默认情况下,这些方法中的每一种都会被抛出{@link UnsupportedOperationException}。 实现这些方法 一般来说,内部线程必须是安全的,应该是短的和 而不是阻塞。定义这些方法是<em>唯一支持的</em>。 使用这种方法。所有其他方法都声明了 {@code final},因为它们不能独立改变。
<p>你也可以从{@link AbstractOwnableSynchronizer}有用的线程跟踪器。 拥有专属同步器。 鼓励你使用它们 -- 这使得监控和诊断工具能够帮助用户监控和诊断以下方面: 确定哪些线程持有锁。
<p>虽然这一类是基于内部的FIFO队列,但它 不自动执行FIFO获取策略。 核心的 独占同步如下:
<pre> Acquire。 while (!tryAcquire(arg)) { <em>如果线程没有排队,则取消对线程的请求</em>。 <em>可能会阻止当前线程</em>。 }
释放。 if (tryRelease(arg)) <em>解锁第一排队的线程</em>。 </pre></pre>
共享模式相似,但可能涉及级联信号)。 因为入队前可以检查获取方法中的新获取线程<em>barge</em>领先于 其他被阻止和排队的人。 不过,如果你愿意,可以。 定义{@code tryAcquire}和/或{@code tryAcquireShared}为 通过内部调用其中一个或多个检查功能来禁止驳运 方法,从而提供了一个<em>公平</em>的FIFO采集顺序。 特别是,大多数公平同步器可以定义{@code tryAcquire}。 返回{@code false},如果{@link #hasQueuedPredecessors}(一个方法) 专为公平同步器设计的)返回到 {@code true}。 其他的变化是可能的。
<p>一般来说,吞吐量和可扩展性是最高的。 默认驳船(又称<em>greedy</em>。 <em>放弃</em>,和<em>回避</em>)策略。 虽然这不能保证公平或无饥饿感,但早期的 队列中的线程允许在后面的队列中的线程之前重新约定 每一次和解都有不偏不倚的成功机会 针对传入的线程。 此外,虽然获取方法不 "旋转"在通常的意义上,它们可以执行多个 调用{@code tryAcquire}的调用与其他 阻止之前的计算。 这就给了大部分的好处 独家同步时的旋转,只需短暂地举行一次,而不需 大部分的责任时,它不是。如果你愿意的话,可以 通过在前面调用获取方法时,用 "fast-path "检查,可能是预先检查{@link #hasContended}。 和/或{@link #hasQueuedThreads},只有在同步器的条件下,才会这样做。 很可能不会被争论。
<p>这个类为我们提供了一个有效的、可扩展的基础。 同步化的部分原因是,将其使用范围专门用于 可以依赖{@code int}状态的同步器,获取、和 释放参数,以及内部FIFO等待队列。当这样做的时候 还不够,你可以从更低的层面上用 {@link java.utilite.concurrent.atomic atomic}类,你自己的自定义的 {@link java.utilite.Queueue}类,和{@link LockSupport}阻塞 支持。
* * <h3>Usage Examples</h3> * * <p>Here is a non-reentrant mutual exclusion lock class that uses * the value zero to represent the unlocked state, and one to * represent the locked state. While a non-reentrant lock * does not strictly require recording of the current owner * thread, this class does so anyway to make usage easier to monitor. * It also supports conditions and exposes * one of the instrumentation methods: * * <pre> {@code * class Mutex implements Lock, java.io.Serializable { * * // Our internal helper class * private static class Sync extends AbstractQueuedSynchronizer { * // Reports whether in locked state * protected boolean isHeldExclusively() { * return getState() == 1; * } * * // Acquires the lock if state is zero * public boolean tryAcquire(int acquires) { * assert acquires == 1; // Otherwise unused * if (compareAndSetState(0, 1)) { * setExclusiveOwnerThread(Thread.currentThread()); * return true; * } * return false; * } * * // Releases the lock by setting state to zero * protected boolean tryRelease(int releases) { * assert releases == 1; // Otherwise unused * if (getState() == 0) throw new IllegalMonitorStateException(); * setExclusiveOwnerThread(null); * setState(0); * return true; * } * * // Provides a Condition * Condition newCondition() { return new ConditionObject(); } * * // Deserializes properly * private void readObject(ObjectInputStream s) * throws IOException, ClassNotFoundException { * s.defaultReadObject(); * setState(0); // reset to unlocked state * } * } * * // The sync object does all the hard work. We just forward to it. * private final Sync sync = new Sync(); * * public void lock() { sync.acquire(1); } * public boolean tryLock() { return sync.tryAcquire(1); } * public void unlock() { sync.release(1); } * public Condition newCondition() { return sync.newCondition(); } * public boolean isLocked() { return sync.isHeldExclusively(); } * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } * public void lockInterruptibly() throws InterruptedException { * sync.acquireInterruptibly(1); * } * public boolean tryLock(long timeout, TimeUnit unit) * throws InterruptedException { * return sync.tryAcquireNanos(1, unit.toNanos(timeout)); * } * }}</pre> * * <p>Here is a latch class that is like a * {@link java.util.concurrent.CountDownLatch CountDownLatch} * except that it only requires a single {@code signal} to * fire. Because a latch is non-exclusive, it uses the {@code shared} * acquire and release methods. * * <pre> {@code * class BooleanLatch { * * private static class Sync extends AbstractQueuedSynchronizer { * boolean isSignalled() { return getState() != 0; } * * protected int tryAcquireShared(int ignore) { * return isSignalled() ? 1 : -1; * } * * protected boolean tryReleaseShared(int ignore) { * setState(1); * return true; * } * } * * private final Sync sync = new Sync(); * public boolean isSignalled() { return sync.isSignalled(); } * public void signal() { sync.releaseShared(1); } * public void await() throws InterruptedException { * sync.acquireSharedInterruptibly(1); * } * }}</pre> * * @since 1.5 * @author Doug Lea */ AQS的关键属性: state。一般情况下state 是指资源,默认情况下当然state为0,普通竞争情况下可以表示等待锁的线程数量(假设一个线程最多只能获取一个资源)
AQS的Node对象难以理解,尤其是其state的状态,哦不, 是waitStatus,是waitStatus默认是0,只允许存在5个值 ; 但是可以看到, 都是常量,除了CANCELLED,都是负数,分别什么意思呢? 为什么这样设计呢?
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
/** * Creates a new {@code AbstractQueuedSynchronizer} instance * with initial synchronization state of zero. */ protected AbstractQueuedSynchronizer() { }
/** * Wait queue node class. * * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and * Hagersten) lock queue. CLH locks are normally used for * spinlocks. We instead use them for blocking synchronizers, but * use the same basic tactic of holding some of the control * information about a thread in the predecessor of its node. A * "status" field in each node keeps track of whether a thread * should block. A node is signalled when its predecessor * releases. Each node of the queue otherwise serves as a * specific-notification-style monitor holding a single waiting * thread. The status field does NOT control whether threads are * granted locks etc though. A thread may try to acquire if it is * first in the queue. But being first does not guarantee success; * it only gives the right to contend. So the currently released * contender thread may need to rewait. * * <p>To enqueue into a CLH lock, you atomically splice it in as new * tail. To dequeue, you just set the head field. * <pre> * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ 从上图来看,CLH锁,似乎只有一个prev,那么Node的next变量呢? * </pre> * * <p>Insertion into a CLH queue requires only a single atomic * operation on "tail", so there is a simple atomic point of * demarcation from unqueued to queued. Similarly, dequeuing * involves only updating the "head". However, it takes a bit * more work for nodes to determine who their successors are, * in part to deal with possible cancellation due to timeouts * and interrupts. * * <p>The "prev" links (not used in original CLH locks), are mainly * needed to handle cancellation. If a node is cancelled, its * successor is (normally) relinked to a non-cancelled * predecessor. For explanation of similar mechanics in the case * of spin locks, see the papers by Scott and Scherer at * http://www.cs.rochester.edu/u/scott/synchronization/ * * <p>We also use "next" links to implement blocking mechanics. * The thread id for each node is kept in its own node, so a * predecessor signals the next node to wake up by traversing * next link to determine which thread it is. Determination of * successor must avoid races with newly queued nodes to set * the "next" fields of their predecessors. This is solved * when necessary by checking backwards from the atomically * updated "tail" when a node's successor appears to be null. * (Or, said differently, the next-links are an optimization * so that we don't usually need a backward scan.) * * <p>Cancellation introduces some conservatism to the basic * algorithms. Since we must poll for cancellation of other * nodes, we can miss noticing whether a cancelled node is * ahead or behind us. This is dealt with by always unparking * successors upon cancellation, allowing them to stabilize on * a new predecessor, unless we can identify an uncancelled * predecessor who will carry this responsibility. * * <p>CLH queues need a dummy header node to get started. But * we don't create them on construction, because it would be wasted * effort if there is never contention. Instead, the node * is constructed and head and tail pointers are set upon first * contention. * * <p>Threads waiting on Conditions use the same nodes, but * use an additional link. Conditions only need to link nodes * in simple (non-concurrent) linked queues because they are * only accessed when exclusively held. Upon await, a node is * inserted into a condition queue. Upon signal, the node is * transferred to the main queue. A special value of status * field is used to mark which queue a node is on. * * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill * Scherer and Michael Scott, along with members of JSR-166 * expert group, for helpful ideas, discussions, and critiques * on the design of this class. /** 等待队列节点类。
<p>等待队列是 "CLH "的变体(Craig,Landin,和 Hagersten)锁队列。CLH锁通常用于 旋转锁。 我们反而用它们来阻断同步器,但 牵一发而动全身 节点的前辈中的线程信息。 每个节点中的 "状态 "字段记录了一个线程是否有一个 应该阻止。 当一个节点的前任 释放。 队列的每一个节点,否则就作为一个 专项通知式监控器,手持单机等待 线程。状态字段不控制线程是否为 授予锁等,但。 一个线程可以尝试获取,如果它是 拔得头筹 但第一并不能保证成功。 它只是赋予了竞争的权利。 所以,目前发布的 竞争者线程可能需要重新等待。
<p>要入队到一个CLH锁,你可以将其原子化拼接为new 尾部。要取消队列,只需设置头部字段即可。
<P>插入到CLH队列中只需要一个原子字段 尾巴 "上的操作,所以有一个简单的原子点的 从未排队到排队的分界。同样,取消排队 只涉及更新 "头"。然而,这需要一点 节点的工作更多的是确定谁是接班人。 部分原因是为了处理可能因超时而取消的问题。 和中断。
<p>"prev "链接(原CLH锁中没有使用),主要是指的是 处理取消所需。如果一个节点被取消,其 继承者(通常)重新链接到一个未取消的 前任。对于类似的机理的解释 的自旋锁,见Scott和Schererer的论文,在 http://www.cs.rochester.edu/u/scott/synchronization/
<P>我们还使用 "下一个 "链接来实现阻塞机制。 每个节点的线程id都保存在自己的节点中,所以一个 前任的节点通过遍历 下一个链接来确定是哪条线程。 判断是哪个线程 继任者必须避开与新排队节点的比赛,以设置 的 "下一个 "领域的前辈们。 这个问题就解决了 必要时,从原子上倒查 当一个节点的继任者似乎是空的时候,更新 "尾巴"。 (或者换个说法,下一个链接是一种优化的方法。 这样我们通常就不需要逆向扫描了)。
<P>取消引入了一些保守的基本的 算法。 由于我们必须对取消其他 节点,我们可能会错过注意到一个被取消的节点是否为 在我们的前面或后面。这个问题的处理方法是,始终不停车 取消后的继承人,使他们能够稳定在 新的前任,除非我们能确定一个未取消的 前辈将承担起这个责任。
<p>CLH队列需要一个假头节点来启动。但是 我们不在施工中创造它们,因为这样做是浪费了 努力,如果从来没有争论的话。相反,节点 构建,并在第一时间设置头部和尾部指针。 争论。
<p>等待条件的线程使用相同的节点,但 使用附加链接。条件只需要链接节点 在简单(非并发)链接队列中,因为它们是 只有在独占时才会被访问。 在等待时,一个节点被 插入到条件队列中。 一旦收到信号,该节点将 转移到主队列中。 状态的特殊值(即SHARED) 字段用于标记一个节点在哪个队列中。
<p>感谢 Dave Dice、Mark Moir、Victor Luchangco、Bill Schererer和Michael Scott,以及JSR-166成员 专家组,以便提出有益的意见、讨论和批评。 关于本班的设计。
通过www.DeepL.com/Translator(免费版)翻译
首先每个Node表示一个线程,除了第一个线程,后面都是在等待资源的状态,对于它们而言,它们的state肯定都是SIGNAL ,除非被主动的cancel; 那么呢 SIGNAL 就是表示说, 我(某个node上的线程)正在等待之中,CLH队列前面的线程执行完毕之后,记得一定要通知我,不然我就会一直处于等待状态; ———— 这个过程一定是前面 CLH队列前面的线程 来进行通知,Doug Lea的设计就是这样的,当然我们可能也可以使用其它的机制,比如,每个线程自己主动的去轮训一个变量,但是这样显然 会有一个轮训时间的 间隔;必然不是大家能够接受的; 所以还是 被动的等待前个线程来唤醒来得自然,而且没有误差;
———— 又理解错了!! SIGNAL表示的是某个线程t的下一个线程需要t来唤醒! 跟上面的理解是相反的! 可以认为 SIGNAL 状态的node,它本身是什么状态呢,其实是不确定的,它自己可能也在等待状态,也可能正在运行之中,但肯定是没有被取消的!
CONDITION 是表示某个线程t 处于 条件等待队列的状态; 这个比较特殊, 我一直都没有很好的理解; 非常坑爹!
PROPAGATE 状态也是比较难理解; 它表示下一个acquireShared 方法应该无条件的传播(或者说扩散), 为什么没有说下一个线程,而是仅仅只方法,为什么无条件? 为什么需要传播? 什么是传播?
*/ static final class Node { 为什么 mode的类型仍然是 Node,而不是直接使用int数字呢?更新:因为nextWaiter字段需要在条件等待队列中试用,条件等待队列的元素仍然还是Node~! /** Marker to indicate a node is waiting in shared mode */ 表明处于共享模式的等待状态的线程 static final Node SHARED = new Node(); 通过这种方式,确实有效的区分了独占、共享模式,因为 /** Marker to indicate a node is waiting in exclusive mode */ 表明处于独占模式的等待状态的线程 static final Node EXCLUSIVE = null; 为什么 EXCLUSIVE的值竟然是null? 太不可思议了! 不好理解啊!更新:: 其实这个时候 条件等待队列 还未开始建立起来,故EXCLUSIVE为null是合理的;
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
/** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ waitStatus 需要仔细的解释, doc文档也进行了详细的解释:(可见它的重要和复杂程度 ) SIGNAL 当前node的继任者(也就是下一个node)应该是一件处于阻塞状态,或者很快将会被阻塞,为什么这里说“很快将”呢,有点难理解; 所以呢,它在它需要释放资源或取消的时候 唤醒它的继任者,不然就永远无法得以唤醒!!(因为也没有了其它的手段) 。 为了避免竞争,acquire 方法应该首先表明它们需要一个信号,然后重新执行原子性的acquire方法,然后如果acquire失败,那么阻塞;———— 这里又有些不好理解了,为什么需要事先表明一下呢? CANCELLED 表示当前node由于超时或者中断被取消了,node一旦处于这个状态,那么它永远也不会改变这个状态,而且会最终被删除,因为已经取消了,线程也无法再次得到执行。 CONDITION 表示node正处于一个条件队列,它将不会被用于同步队列,除非被转移到了同步队列;( 也就是说AQS 其实内部维护着两个队列,一个是同步队列sync queue ,另外一个是条件队列condition queue ),转移的时候,waitStatus 会被设置为0,为什么0,这是为了简化(具体如何简化,未知。。) PROPAGATE 表示 releaseShared 方法应该被传播扩散到其他的所有的node,这个状态由doReleaseShared 方法来进行设置,以确保传播扩散得以继续下去, (为什么需要传播扩散? ) 即使被其他操作干涉了。 有点难以理解 0 表示普通状态,也就是默认的状态吧。 volatile int waitStatus;
/** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. 指向前任node,可以用来检查它的waitStatus。入队enq的时候进行赋值;仅在出队的时候进行GC nulled out;当前任node线程被取消时候,我们查找并重新设置一个未取消的node作为prev———— 这就是总是会成功的,因为head 也就是正在执行的线程 是永远不会被谁取消的,只有它去取消其他线程,而不是相反。只有成功获取资源之后一个node才会成为head,线程只会取消它自己, 不会被其他线程取消。———— 根本没有线程被其他线程取消的事情———— 好像跟自己之前的理解 有冲突。。 */ volatile Node prev;
/** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. 入队的时候赋值;绕过已取消节点或GC nulled out的时候进行调整, 入队方法enq并不会设置这个next字段(直到附件?),所以呢,如果看到某个node的next字段为null,也不一定就算说那个node处于CLH的尾部(??)然后,如果我们可以反向通过prev遍历.. 对于被取消的node, 它的next字段指向它自身而不是被设置为null, 这可以为isOnSyncQueue方法提供帮助。哦,原来如此! —— 这就是为什么unparkSuccessor 方法会反向通过prev指针遍历,因为next 靠不住啊... 更新: 这里的说法不准确,具体请查看unparkSuccessor 方法的更新 prev 是准确维护的,但是next 不是 */ volatile Node next; head tail 都是volatile 修饰的Node,但nextWaiter不是的,why?
/** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; 也是volatile,可能有多线程进行访问?也许吧
/** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. nextWaiter 也是Node类型变量,表明了下一个处于条件等待队列的node,或者特殊值 SHARED,(?更新: 这个英语的意思是: nextWaiter的值有两个可能: 1 执行下一个条件队列中等待的node, 2 特值SHARED) 因为条件等待队列只会存在于 独占模式下,我们只需一个 简单的 单链表 来保持那些真正条件等待的节点,它们会被转移到同步等待队列,然后重新获取资源,而且因为只能是独占模式,我们仅使用 特殊值 表明共享模式, 这个特殊值为什么是 SHARED 呢? 用来区分是否为共享模式,查看下面的isShared方法可知。 */ Node nextWaiter; AQS的条件等待队列,正是通过这个变量进行维护的!!!~~~。
/** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; 通过nextWaiter是不是SHARED来判断是不是 共享模式~ 因为这里是直接比较地址, 所以 SHARED的具体值是什么,其实无关紧要。只要地址不同就可以了.. }
/** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. null检查本也可以被取消,但是它主要是为了VM? * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node的三个构造方法,需要特别注意: Node() { // Used to establish initial head or SHARED marker }
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; // Node表示了一个线程,nextWaiter也是Node类型,它指明了下一个等待的线程,为什么需要把下一个也设置出来呢? 为什么不是上一个? 一般线程不是被添加到CLH的末端吗? this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; // this.thread = thread; } }
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head;
/** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail;
/** * The synchronization state. */ private volatile int state;
/** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; }
/** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; }
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
// Queuing utilities
/** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ static final long spinForTimeoutThreshold = 1000L;
/** * Inserts node into queue, initializing if necessary. See picture above. 插入node,如果有必须则先进行初始化 * @param node the node to insert * @return node's predecessor 返回入队成功后的 当前node的上一个节点 */ private Node enq(final Node node) { for (;;) { for循环保证了enq操作绝对会成功; Node t = tail; // 这里为什么需要引入局部变量t呢,而不直接使用tail 变量呢? 因为, if (t == null) { // Must initialize 第一次enq的时候,肯定t==null,那么需要设置tail = head if (compareAndSetHead(new Node())) // 这里采用了 cas 的方式设置head,也是保险起见,为什么呢,因为enq 方法是没有加锁的,不仅仅是enq方法,AQS整个类都没有加锁的地方,没有 synchronized 关键字,也没有其他锁机制,除了自身就剩下cas了,cas 是硬件实现的; 不能逻辑上保证只有一个一个线程执行到这里,所以这里采用了cas 需要特别注意的是, 这里的head 被设置为 new Node(), Node的这个构造器并没有指定一个线程,也是就没有线程,这么怎么理解呢? 可以认为 head 是当前获取锁成功,正在执行的那个线程,这是一个特例,故也不需要特别的设置这个node的对应的线程; 那么 当前执行enq 方法的线程,肯定是获取锁失败的,所以才会执行到这里的,所以呢, 这个仅仅是一个 初始化 CLH 的操作!! tail = head; 设置head成功之后,设置tail也指向head,因为此时head为null,当前CLH队列只有一个node,而既然CLH已经初始化了,那么tail head都是不能为空的,否则不好解释,说不过去。 } } else { node.prev = t; 把当前线程的prev指针执行tail, if (compareAndSetTail(t, node)) { 然后重新设置tail,也就是把tail指向当前线程, 为什么这里需要cas呢,因为执行到这里,仍然可能是多个线程的,因为确实存在竞争,保险起见,需要通过cas来设置; t.next = node;把tail 的 next设置为当前线程对应的node,即相当于把当前线程添加到了CLH的末尾; return t; 返回tail, } } } }
/** * Creates and enqueues node for current thread and given mode. 创建node并且把它入队: 通过当前线程和制定的mode 来进行创建 * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared mode 仅存在两个值,独占或者共享; 为什么 mode的类型仍然是 Node,而不是直接使用int数字呢? * @return the new node */ addWaiter 方法: 刚开始的时候 head tail 肯定是null,所以直接enq, private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); 通过当前线程和制定的mode 来进行创建 // Try the fast path of enq; backup to full enq on failure 对这个效果存疑,感觉没什么必要 Node pred = tail; if (pred != null) { 如果已经初始化过一次,那么就使用fast path方式,其实下面的4行代码和enq方法中的最后4行的效果是完全一模一样的!~ 这么做呢,仅仅是为了fast,其实我感觉也没有多少fast效果,直接调用enq方法不过是多了for包围.. node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
/** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. 设置头结点,然后出队, 仅在acquire方法中被取消, 同时null out无用的字段,为了GC。以及抑制无必要的唤醒信号和遍历。 * * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; 头结点的线程变量 无须设置,故干脆设置为null node.prev = null; 头结点的prev指针 必然是null }
/** * Wakes up node's successor, if one exists. 唤醒后继, 如果存在后继 * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) 如果是SIGNAL或者PROPAGATE,那么设置当前节点的waitStatus为0 compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. 需要被unpark的线程位于后继node,填充就下一个,但是不排除其他的方法取消了它,那么需要反向遍历一下,找到还没有被取消的后继node。为什么需要反向呢?为什么不是直接通过next指针顺序遍历? 因为 prev指针比next可靠? 更新: 其实不是这样的.. 见下面if里面的分析 the next node 可能已经被取消,或者就是null(什么情况下为null?xxx 好像只有获取到执行完毕出队之后) */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; 头结点的next变量即后继节点s 为null,或者其状态已经表明了它取消,故把它 设置为null, for (Node t = tail; t != null && t != node; t = t.prev) 为什么需要反向呢? 因为s都可能为null啊, 自然没法获取s的next, 那么保守起见,反向获取比较安全 if (t.waitStatus <= 0) s = t; } if (s != null) 找到了,那么就唤醒它吧 LockSupport.unpark(s.thread); node 的后继已经被唤醒,一般来说就表示了node对应的线程已经执行完毕,那么它应该立即出队,但是AQS不是这样做的, 而是呢,而是在后继node 醒来之后重新去获取资源,获取成功之后 才重新设置head,然后将当前node 出队。 具体参见 比如 acquireQueued }
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) 共享模式下释放操作,通知后继node,确保广播开来,(如果是独占模式,释放不多不少的资源量,然后如果有必要则执行head的解锁方法)? */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. 确保release操作也能够广播开来,即使其他的acquires/releases操作正在同步进行之中,如果有必要则对head的后继执行解锁方法,否则就设置head的状态为PROPAGATE 以确保在它释放资源的时候,广播操作仍会继续;另外,我们必须通过循环来进行这些操作以防执行当前方法的时候新的线程进入CLH尾部,同时,不同于其他情况的unparkSuccessor操作,我们需要在cas操作失败的时候进行重新检查 */ for (;;) { 自旋, cas 失败则需要进行自旋!! cas 失败说明其他线程也在修改同一个变量,但是它抢先修改成功了~~ Node h = head; if (h != null && h != tail) { 不为null,不是尾节点 int ws = h.waitStatus; if (ws == Node.SIGNAL) { 如果状态为SIGNAL,如果有必要则对head的后继执行解锁方法 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 把状态设置为0, 为什么要是0? 用以表明已经唤醒过一次后继了, 所以不再需要状态为SIGNAL了,否则还会进行下一次的unparkSuccessor continue; // loop to recheck cases 如果设置失败,跳出本次循环 unparkSuccessor(h); 设置成功,唤醒后继; 只要唤醒了一个后继,那么至少保证了不会死锁.. unparkSuccessor 方法其实也有一个cas设置waitStatus为0 的操作,所以,上面的if感觉不是很有必要?xxx 也不完全是,unparkSuccessor的cas 并没有强制的保证 } 否则就设置head的状态为PROPAGATE 以确保在它释放资源的时候,广播操作仍会继续 假设第一次自旋执行了上面的if,而且head没有发生变化,那么,第二次的自旋就肯定会执行到这里的else,那么肯定会把状态再次修改为PROPAGATE 更正: 如果head没有发生变化,这里的if else 之后执行一个,只会执行一次~! else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 如果状态已经是0,那么把状态设置为PROPAGATE 什么情况下head的状态是0呢?队列中只有一个node,而且clh 可能还没初始化呢。为什么把head的状态设置为PROPAGATE就完事了呢? 其实setHeadAndPropagate 方法判断依据是<0,自然包括了 PROPAGATE,然后再执行 doReleaseShared,doReleaseShared会进行对后继的唤醒操作 continue; // loop on failed CAS 如果设置失败,跳出本次循环 } if (h == head) // loop if head changed 这个地方 也是难以理解 break; head changed 不会跳出循环,但是如果head没有变,那么不在进行自旋,因为该完成的操作绝壁都已经成功执行了。 但是如果head changed,那说明 当前方法执行期间,其他方法对head进行了 干扰, 那么自旋,确保无论如何都要进行广播,因为head已经在本轮循环很短的时间内就发生变化,那么重来