本文共 26468 字,大约阅读时间需要 88 分钟。
点关注,不迷路!如果本文对你有帮助的话不要忘记点赞支持哦!
Condition是JUC里面提供于控制线程释放锁, 然后进行等待其他获取锁的线程发送 signal 信号来进行唤醒的工具类.
主要特点:先看一下一个常见的 demo
import org.apache.log4j.Logger;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * 此demo用于测试 condition * Created by xujiankang on 2017/2/8. */public class ConditionTest { private static final Logger logger = Logger.getLogger(ConditionTest.class); static final Lock lock = new ReentrantLock(); static final Condition condition = lock.newCondition(); public static void main(String[] args) throws Exception{ final Thread thread1 = new Thread("Thread 1 "){ @Override public void run() { lock.lock(); // 线程 1获取 lock logger.info(Thread.currentThread().getName() + " 正在运行 ....."); try { Thread.sleep(2 * 1000); logger.info(Thread.currentThread().getName() + " 停止运行, 等待一个 signal "); condition.await(); // 调用 condition.await 进行释放锁, 将当前节点封装成一个 Node 放入 Condition Queue 里面, 等待唤醒 } catch (InterruptedException e) { e.printStackTrace(); } logger.info(Thread.currentThread().getName() + " 获取一个 signal, 继续执行 "); lock.unlock(); // 释放锁 } }; thread1.start(); // 线程 1 线运行 Thread.sleep(1 * 1000); Thread thread2 = new Thread("Thread 2 "){ @Override public void run() { lock.lock(); // 线程 2获取lock logger.info(Thread.currentThread().getName() + " 正在运行....."); thread1.interrupt(); // 对线程1 进行中断 看看中断后会怎么样? 结果 线程 1还是获取lock, 并且最后还进行 lock.unlock()操作 try { Thread.sleep(2 * 1000); }catch (Exception e){ } condition.signal(); // 发送唤醒信号 从 AQS 的 Condition Queue 里面转移 Node 到 Sync Queue logger.info(Thread.currentThread().getName() + " 发送一个 signal "); logger.info(Thread.currentThread().getName() + " 发送 signal 结束"); lock.unlock(); // 线程 2 释放锁 } }; thread2.start(); }}
整个执行步骤
执行结果
[2017-02-08 22:43:09,557] INFO Thread 1 (ConditionTest.java:26) - Thread 1 正在运行 .....[2017-02-08 22:43:11,565] INFO Thread 1 (ConditionTest.java:30) - Thread 1 停止运行, 等待一个 signal [2017-02-08 22:43:11,565] INFO Thread 2 (ConditionTest.java:48) - Thread 2 正在运行.....java.lang.InterruptedException[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:57) - Thread 2 发送一个 signal at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:58) - Thread 2 发送 signal 结束 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)[2017-02-08 22:43:13,567] INFO Thread 1 (ConditionTest.java:35) - Thread 1 获取一个 signal, 继续执行 at com.lami.tuomatuo.search.base.concurrent.aqs.ConditionTest$1.run(ConditionTest.java:31)
主要是Condition Queue 的头尾节点(这里头尾节点不需要进行初始化)
/** First node of condition queue *//** Condition Queue 里面的头节点 */private transient Node firstWaiter;/** Last node of condition queue *//** Condition Queue 里面的尾节点 */private transient Node lastWaiter;/** Creates a new {@code ConditionObject} instance *//** 构造函数 */public ConditionObject(){}
addConditionWaiter方法主要用于调用 Condition.await 时将当前节点封装成 一个Node, 加入到 Condition Queue里面
大家可以注意下, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况
/** * Adds a new waiter to wait queue * 将当前线程封装成一个 Node 节点 放入大 Condition Queue 里面 * 大家可以注意到, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况 * @return */private Node addConditionWaiter(){ Node t = lastWaiter; // 1. Condition queue 的尾节点 // If lastWaiter is cancelled, clean out // 2.尾节点已经Cancel, 直接进行清除, // 这里有1个问题, 1 何时出现t.waitStatus != Node.CONDITION -> 在对线程进行中断时 ConditionObject -> await -> checkInterruptWhileWaiting -> transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 导致这种情况一般是 线程中断或 await 超时 // 一个注意点: 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其他 await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal 进行唤醒时 node.nextWaiter 会被置空, 而中断和超时时不会) if(t != null && t.waitStatus != Node.CONDITION){ unlinkCancelledWaiters(); // 3. 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt)) t = lastWaiter; // 4. 获取最新的 lastWaiter } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 5. 将线程封装成 node 准备放入 Condition Queue 里面 if(t == null){ firstWaiter = node; // 6 .Condition Queue 是空的 }else{ t.nextWaiter = node; // 7. 最加到 queue 尾部 } lastWaiter = node; // 8. 重新赋值 lastWaiter return node;}
这里的唤醒指的是将节点从 Condition Queue 转移到 Sync Queue 里面
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters * @param first *//** * 唤醒 Condition Queue 里面的头节点, 注意这里的唤醒只是将 Node 从 Condition Queue 转到 Sync Queue 里面(这时的 Node 也还是能被 Interrupt) */private void doSignal(Node first){ do{ if((firstWaiter = first.nextWaiter) == null){ // 1. 将 first.nextWaiter 赋值给 nextWaiter 为下次做准备 lastWaiter = null; // 2. 这时若 nextWaiter == null, 则说明 Condition 为空了, 所以直接置空 lastWaiter } first.nextWaiter = null; // 3. first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt }while(!transferForSignal(first) && (first = firstWaiter) != null); // 4. 调用 transferForSignal将 first 转移到 Sync Queue 里面, 返回不成功的话, 将 firstWaiter 赋值给 first}
/** * Removes and transfers all nodes * @param first (non-null) the first node on condition queue *//** * 唤醒 Condition Queue 里面的所有的节点 */private void doSignalAll(Node first){ lastWaiter = firstWaiter = null; // 1. 将 lastWaiter, firstWaiter 置空 do{ Node next = first.nextWaiter; // 2. 初始化下个换新的节点 first.nextWaiter = null; // 3. first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt transferForSignal(first); // 4. 调用 transferForSignal将 first 转移到 Sync Queue 里面 first = next; // 5. 开始换新 next 节点 }while(first != null);}
一般的节点都会被 signal 唤醒, 从 Condition Queue 转移到 Sync Queue, 而若遇到 interrupt 或 等待超时, 则直接改变 node 的状态(从 CONDITION 变成 0), 并直接放入 Sync 里面, 而不清理Condition Queue 里面的节点, 所以需要下面的函数
/** * http://czj4451.iteye.com/blog/1483264 * * Unlinks cancelled waiter nodes from condition queue * Called only while holding lock. This is called when * cancellation occured during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes intot play when * timeouts or cancellations all nodes rather than stoppping at a * particular target to unlink all pointers to garbege nodes * without requiring many re-traversals during cancellation * storms *//** * 在 调用 addConditionWaiter 将线程放入 Condition Queue 里面时 或 awiat 方法获取 差不多结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点 * 这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点 */private void unlinkCancelledWaiters(){ Node t = firstWaiter; Node trail = null; while(t != null){ Node next = t.nextWaiter; // 1. 先初始化 next 节点 if(t.waitStatus != Node.CONDITION){ // 2. 节点不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的) t.nextWaiter = null; // 3. Node.nextWaiter 置空 if(trail == null){ // 4. 一次都没有遇到有效的节点 firstWaiter = next; // 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理) }else{ trail.nextWaiter = next; // 6. next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t } if(next == null){ // 7. next == null 说明 已经 traverse 完了 Condition Queue lastWaiter = trail; } }else{ trail = t; // 8. 将有效节点赋值给 trail } t = next; }}
毫无疑问, 这是一段非常精巧的queue节点删除, 主要还是在 节点 trail 上, trail 节点可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock * * @throws IllegalMonitorStateException if{@link #isHeldExclusively()} * returns {@code false} *//** * 将 Condition queue 的头节点转移到 Sync Queue 里面 * 在进行调用 signal 时, 当前的线程必须获取了 独占的锁 */@Overridepublic void signal() { if(!isHeldExclusively()){ // 1. 判断当前的线程是否已经获取 独占锁 throw new IllegalMonitorStateException(); } Node first = firstWaiter; if(first != null){ doSignal(first); // 2. 调用 doSignal 进行转移 }}
上述面试题答案都整理成文档笔记。 也还整理了一些面试资料&最新2020收集的一些大厂的面试真题(都整理成文档,小部分截图),有需要的可以 。
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock * * @throws IllegalMonitorStateException if {@link #isHeldExclusively()} * return {@code false} *//** * 将 Condition Queue 里面的节点都转移到 Sync Queue 里面 */public final void signalAll(){ if(!isHeldExclusively()){ throw new IllegalMonitorStateException(); } Node first = firstWaiter; if(first != null){ doSignalAll(first); }}
awaitUninterruptibly 方法是一个不响应 中断的方法
整个流程
/** * Implements uninterruptible condition wait *
下面两个是用于追踪 调用 awaitXXX 方法时线程有没有被中断过
主要的区别是 1. REINTERRUPT: 代表线程是在 signal 后被中断的 (REINTERRUPT = re-interrupt 再次中断 最后会调用 selfInterrupt) 2. THROW_IE: 代表在接受 signal 前被中断的, 则直接抛出异常 (Throw_IE = throw inner exception)
/** * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire *//** * 下面两个是用于追踪 调用 awaitXXX 方法时线程有没有被中断过 * 主要的区别是 * REINTERRUPT: 代表线程是在 signal 后被中断的 (REINTERRUPT = re-interrupt 再次中断 最后会调用 selfInterrupt) * THROW_IE: 代表在接受 signal 前被中断的, 则直接抛出异常 (Throw_IE = throw inner exception) *//** Mode meaning to reinterrupt on exit from wait *//** 在离开 awaitXX方法, 退出前再次 自我中断 (调用 selfInterrupt)*/private static final int REINTERRUPT = 1;/** Mode meaning to throw InterruptedException on exit from wait *//** 在离开 awaitXX方法, 退出前再次, 以为在 接受 signal 前被中断, 所以需要抛出异常 */private static final int THROW_IE = -1;
该方法主要是查 在 awaitXX 方法中的这次唤醒是否是中断引起的, 若是中断引起的, 就进行 Node 的转移
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted *//** * 检查 在 awaitXX 方法中的这次唤醒是否是中断引起的 * 若是中断引起的, 则将 Node 从 Condition Queue 转移到 Sync Queue 里面 * 返回值的区别: * 0: 此次唤醒是通过 signal -> LockSupport.unpark * THROW_IE: 此次的唤醒是通过 interrupt, 并且 在 接受 signal 之前 * REINTERRUPT: 线程的唤醒是 接受过 signal 而后又被中断 */private int checkInterruptWhileWaiting(Node node){ return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;}
/** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode *//** * 这个方法是在 awaitXX 方法离开前调用的, 主要是根据 * interrupMode 判断是抛出异常, 还是自我再中断一下 */private void reportInterruptAfterWait(int interrupMode) throws InterruptedException{ if(interrupMode == THROW_IE){ throw new InterruptedException(); } else if(interrupMode == REINTERRUPT){ selfInterrupt(); }}
await 此方法响应中断请求, 当接受到中断请求后会将节点从 Condition Queue 转移到 Sync Queue
/** * Implements interruptible condition wait * *
awaitNanos 具有超时功能, 与响应中断的功能, 不管中断还是超时都会 将节点从 Condition Queue 转移到 Sync Queue
/** * Impelemnts timed condition wait * *
/** * Implements absolute timed condition wait *
/** * Returns true if this condition was created by the given * synchronization object *//**判断当前 condition 是否获取 独占锁 */final boolean isOwnedBy(KAbstractOwnableSynchronizer sync){ return sync == KAbstractQueuedSynchronizer.this;}/** * Quires whether any threads are waiting on this condition * Implements {@link KAbstractOwnableSynchronizer#"hasWaiters(ConditionObject)} * * @return {@code true} if there are any waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively()} * returns {@code false} *//** * 查看 Condition Queue 里面是否有 CONDITION 的节点 */protected final boolean hasWaiters(){ if(!isHeldExclusively()){ throw new IllegalMonitorStateException(); } for(Node w = firstWaiter; w != null; w = w.nextWaiter ){ if(w.waitStatus == Node.CONDITION){ return true; } } return false;}/** * Returns an estimate of the number of threads waiting on * this condition * Implements {@link KAbstractOwnableSynchronizer#"getWaitQueueLength()} * * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively()} * return {@code false} *//** * 获取 Condition queue 里面的 CONDITION 的节点的个数 */protected final int getWaitQueueLength(){ if(!isHeldExclusively()){ throw new IllegalMonitorStateException(); } int n = 0; for(Node w = firstWaiter; w != null; w = w.nextWaiter){ if(w.waitStatus == Node.CONDITION){ ++n; } } return n;}/** * Returns a collection containing those threads that may be * waiting on this Condition * Implements {@link KAbstractOwnableSynchronizer#'getWaitingThreads} * * @return the collection of thread * @throws IllegalMonitorStateException if {@link #isHeldExclusively()} * returns {@code false} *//** * 获取 等待的线程 */protected final CollectiongetWaitingThreads(){ if(!isHeldExclusively()){ throw new IllegalMonitorStateException(); } ArrayList list = new ArrayList<>(); for(Node w = firstWaiter; w != null; w = w.nextWaiter){ if(w.waitStatus == Node.CONDITION){ Thread t = w.thread; if(t != null){ list.add(t); } } } return list;}
到此这篇关于文章就结束了!
点关注,不迷路!如果本文对你有帮助的话不要忘记点赞支持哦!
上述面试题答案都整理成文档笔记。 也还整理了一些面试资料&最新2020收集的一些大厂的面试真题(都整理成文档,小部分截图),有需要的可以 。
希望对大家有所帮助,有用的话点赞给我支持!
转载地址:http://dxfvi.baihongyu.com/