本文共 7799 字,大约阅读时间需要 25 分钟。
Condition可以阻塞或唤醒线程,配合lock使用达到类似于wait()和notify()的效果。本文主要根据jdk源码讲解condition的实现原理。
Condition是一个接口,首先看看接口中定义的方法列表public interface Condition { void await() throws InterruptedException;//类似于wait(),可以响应中断 void awaitUninterruptibly();//不响应中断的等待 long awaitNanos(long nanosTimeout) throws InterruptedException;//等待指定时间(单位是纳秒),在接到信号、被中断或到达指定等待时间之前一直处于等待状态。方法返回被唤醒后的剩余等待时间,若返回值小于等于0则代表此次等待超时。boolean await(long time, TimeUnit unit) throws InterruptedException;//指定时间到达前结束等待返回true,否则返回falseboolean awaitUntil(Date deadline) throws InterruptedException;//指定日期到达前被唤醒返回true,否则返回falsevoid signal();//唤醒一个等待中的线程,类似于notify()void signalAll();//唤醒所有等待中的线程,类似于notifyAll()}
以AbstractQueuedSynchronizer,看看Condition中几个主要方法的实现原理。
首先看一下await()public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
一步一步分析,先看addConditionWaiter()
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
代码目的很明显了,将当前线程加入到等待节点中。在看看下一步fullyRelease
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
可以看到这部分代码是释放当前线程所持有的锁,在release()方法中同时会唤醒等待锁的线程,这个在上一篇将lock的文章分析过,这里就不阐述了。
继续往下,将执行这段循环while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
看一看其中的isOnSyncQueue方法
final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
这段就是判断是否节点已经加入了AbstractQueuedSynchronizer的队列中。直到节点从队列中移除后才能成功挂起。
接下来是从挂起状态被唤醒后重新获取锁和一系列状态更新的操作。if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
由此便完成了await过程。下面我们来看看signal
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
通过transferForSignal方法将头节点唤醒,也使用了cas操作保证一致性。
final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
唤醒后将使下一个节点成为头节点。至此signal过程结束。
下面看看signalAllpublic final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
代码逻辑很清晰,从头节点开始唤醒所有节点。
下面我们在来看看awaitNanos方法,该方法可以让线程在指定的时间后被唤醒,或是被signal唤醒。public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
重点看一下这段
while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); }
当nanosTimeout小于0即等待时间已到或已超过时,执行transferAfterCancelledWait方法:
final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
尝试更新节点状态,若更新失败的话直到节点从队列移除后返回。
在往下,spinForTimeoutThreshold值为1000,在这里的话就是1毫秒,因此在这里我们可以看到:当等待时间超过1毫秒时将线程挂起,但是挂起的这段时间可能被其他线程唤醒,因此在不足1毫秒时用忙等的方式等待等待时间的结束。最后方法返回实际的等待时间,转载地址:http://irsws.baihongyu.com/