| /* |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| |
| /* |
| * This file is available under and governed by the GNU General Public |
| * License version 2 only, as published by the Free Software Foundation. |
| * However, the following notice accompanied the original version of this |
| * file: |
| * |
| * Written by Doug Lea, Bill Scherer, and Michael Scott with |
| * assistance from members of JCP JSR-166 Expert Group and released to |
| * the public domain, as explained at |
| * http://creativecommons.org/publicdomain/zero/1.0/ |
| */ |
| |
| package java.util.concurrent; |
| |
| import java.lang.invoke.MethodHandles; |
| import java.lang.invoke.VarHandle; |
| import java.util.AbstractQueue; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.Objects; |
| import java.util.Spliterator; |
| import java.util.Spliterators; |
| import java.util.concurrent.locks.LockSupport; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| /** |
| * A {@linkplain BlockingQueue blocking queue} in which each insert |
| * operation must wait for a corresponding remove operation by another |
| * thread, and vice versa. A synchronous queue does not have any |
| * internal capacity, not even a capacity of one. You cannot |
| * {@code peek} at a synchronous queue because an element is only |
| * present when you try to remove it; you cannot insert an element |
| * (using any method) unless another thread is trying to remove it; |
| * you cannot iterate as there is nothing to iterate. The |
| * <em>head</em> of the queue is the element that the first queued |
| * inserting thread is trying to add to the queue; if there is no such |
| * queued thread then no element is available for removal and |
| * {@code poll()} will return {@code null}. For purposes of other |
| * {@code Collection} methods (for example {@code contains}), a |
| * {@code SynchronousQueue} acts as an empty collection. This queue |
| * does not permit {@code null} elements. |
| * |
| * <p>Synchronous queues are similar to rendezvous channels used in |
| * CSP and Ada. They are well suited for handoff designs, in which an |
| * object running in one thread must sync up with an object running |
| * in another thread in order to hand it some information, event, or |
| * task. |
| * |
| * <p>This class supports an optional fairness policy for ordering |
| * waiting producer and consumer threads. By default, this ordering |
| * is not guaranteed. However, a queue constructed with fairness set |
| * to {@code true} grants threads access in FIFO order. |
| * |
| * <p>This class and its iterator implement all of the <em>optional</em> |
| * methods of the {@link Collection} and {@link Iterator} interfaces. |
| * |
| * <p>This class is a member of the |
| * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework"> |
| * Java Collections Framework</a>. |
| * |
| * @since 1.5 |
| * @author Doug Lea and Bill Scherer and Michael Scott |
| * @param <E> the type of elements held in this queue |
| */ |
| public class SynchronousQueue<E> extends AbstractQueue<E> |
| implements BlockingQueue<E>, java.io.Serializable { |
| private static final long serialVersionUID = -3223113410248163686L; |
| |
| /* |
| * This class implements extensions of the dual stack and dual |
| * queue algorithms described in "Nonblocking Concurrent Objects |
| * with Condition Synchronization", by W. N. Scherer III and |
| * M. L. Scott. 18th Annual Conf. on Distributed Computing, |
| * Oct. 2004 (see also |
| * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html). |
| * The (Lifo) stack is used for non-fair mode, and the (Fifo) |
| * queue for fair mode. The performance of the two is generally |
| * similar. Fifo usually supports higher throughput under |
| * contention but Lifo maintains higher thread locality in common |
| * applications. |
| * |
| * A dual queue (and similarly stack) is one that at any given |
| * time either holds "data" -- items provided by put operations, |
| * or "requests" -- slots representing take operations, or is |
| * empty. A call to "fulfill" (i.e., a call requesting an item |
| * from a queue holding data or vice versa) dequeues a |
| * complementary node. The most interesting feature of these |
| * queues is that any operation can figure out which mode the |
| * queue is in, and act accordingly without needing locks. |
| * |
| * Both the queue and stack extend abstract class Transferer |
| * defining the single method transfer that does a put or a |
| * take. These are unified into a single method because in dual |
| * data structures, the put and take operations are symmetrical, |
| * so nearly all code can be combined. The resulting transfer |
| * methods are on the long side, but are easier to follow than |
| * they would be if broken up into nearly-duplicated parts. |
| * |
| * The queue and stack data structures share many conceptual |
| * similarities but very few concrete details. For simplicity, |
| * they are kept distinct so that they can later evolve |
| * separately. |
| * |
| * The algorithms here differ from the versions in the above paper |
| * in extending them for use in synchronous queues, as well as |
| * dealing with cancellation. The main differences include: |
| * |
| * 1. The original algorithms used bit-marked pointers, but |
| * the ones here use mode bits in nodes, leading to a number |
| * of further adaptations. |
| * 2. SynchronousQueues must block threads waiting to become |
| * fulfilled. |
| * 3. Support for cancellation via timeout and interrupts, |
| * including cleaning out cancelled nodes/threads |
| * from lists to avoid garbage retention and memory depletion. |
| * |
| * Blocking is mainly accomplished using LockSupport park/unpark, |
| * except that nodes that appear to be the next ones to become |
| * fulfilled first spin a bit (on multiprocessors only). On very |
| * busy synchronous queues, spinning can dramatically improve |
| * throughput. And on less busy ones, the amount of spinning is |
| * small enough not to be noticeable. |
| * |
| * Cleaning is done in different ways in queues vs stacks. For |
| * queues, we can almost always remove a node immediately in O(1) |
| * time (modulo retries for consistency checks) when it is |
| * cancelled. But if it may be pinned as the current tail, it must |
| * wait until some subsequent cancellation. For stacks, we need a |
| * potentially O(n) traversal to be sure that we can remove the |
| * node, but this can run concurrently with other threads |
| * accessing the stack. |
| * |
| * While garbage collection takes care of most node reclamation |
| * issues that otherwise complicate nonblocking algorithms, care |
| * is taken to "forget" references to data, other nodes, and |
| * threads that might be held on to long-term by blocked |
| * threads. In cases where setting to null would otherwise |
| * conflict with main algorithms, this is done by changing a |
| * node's link to now point to the node itself. This doesn't arise |
| * much for Stack nodes (because blocked threads do not hang on to |
| * old head pointers), but references in Queue nodes must be |
| * aggressively forgotten to avoid reachability of everything any |
| * node has ever referred to since arrival. |
| * |
| * The above steps improve throughput when many threads produce |
| * and/or consume data. But they don't help much with |
| * single-source / single-sink usages in which one side or the |
| * other is always transiently blocked, and so throughput is |
| * mainly a function of thread scheduling. This is not usually |
| * noticeably improved with bounded short spin-waits. Instead both |
| * forms of transfer try Thread.yield if apparently the sole |
| * waiter. This works well when there are more tasks than cores, |
| * which is expected to be the main usage context of this mode. In |
| * other cases, waiters may help with some bookkeeping, then |
| * park/unpark. |
| */ |
| |
| /** |
| * Shared internal API for dual stacks and queues. Both TransferStack |
| * and TransferQueue extend this class; put, offer, take and poll each |
| * delegate to a single call of transfer. |
| */ |
| abstract static class Transferer<E> { |
| /** |
| * Performs a put or take. |
| * |
| * @param e if non-null, the item to be handed to a consumer; |
| * if null, requests that transfer return an item |
| * offered by producer. |
| * @param timed if this operation should timeout |
| * @param nanos the timeout, in nanoseconds; both implementations |
| * refuse to wait when timed is true and nanos <= 0 |
| * @return if non-null, the item provided or received; if null, |
| * the operation failed due to timeout or interrupt -- |
| * the caller can distinguish which of these occurred |
| * by checking Thread.interrupted. |
| */ |
| abstract E transfer(E e, boolean timed, long nanos); |
| } |
| |
| /** |
| * The number of nanoseconds for which it is faster to spin |
| * rather than to use timed park. A rough estimate suffices. |
| * Timed waiters call parkNanos only while the remaining wait exceeds |
| * this value; at or below it they keep re-checking in their wait loop. |
| */ |
| static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L; |
| |
| /** |
| * Dual stack: the transferer used in non-fair (Lifo) mode. Waiting |
| * producers and consumers are pushed as SNodes; a thread of the |
| * complementary mode pushes a FULFILLING node on top of the stack and |
| * matches it to the node directly beneath. |
| */ |
| static final class TransferStack<E> extends Transferer<E> { |
| /* |
| * This extends Scherer-Scott dual stack algorithm, differing, |
| * among other ways, by using "covering" nodes rather than |
| * bit-marked pointers: Fulfilling operations push on marker |
| * nodes (with FULFILLING bit set in mode) to reserve a spot |
| * to match a waiting node. |
| */ |
| |
| /* Modes for SNodes, ORed together in node fields */ |
| /** Node represents an unfulfilled consumer */ |
| static final int REQUEST = 0; |
| /** Node represents an unfulfilled producer */ |
| static final int DATA = 1; |
| /** Node is fulfilling another unfulfilled DATA or REQUEST */ |
| static final int FULFILLING = 2; |
| |
| /** Returns true if m has fulfilling bit set. */ |
| static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } |
| |
| /** |
| * Node class for TransferStacks. A node is "matched" once its match |
| * field is non-null; a cancelled node is one matched to itself. |
| * The class also implements ManagedBlocker so that an untimed waiter |
| * can block through ForkJoinPool.managedBlock: isReleasable reports |
| * true once match is non-null or the current thread is interrupted, |
| * and block parks until isReleasable holds. |
| */ |
| static final class SNode implements ForkJoinPool.ManagedBlocker { |
| volatile SNode next; // next node in stack |
| volatile SNode match; // the node matched to this; this node itself if cancelled |
| volatile Thread waiter; // to control park/unpark; null until the waiter is about to park |
| Object item; // data; or null for REQUESTs |
| int mode; |
| // Note: item and mode fields don't need to be volatile |
| // since they are always written before, and read after, |
| // other volatile/atomic operations. |
| |
| SNode(Object item) { |
| this.item = item; |
| } |
| |
| boolean casNext(SNode cmp, SNode val) { |
| return cmp == next && |
| SNEXT.compareAndSet(this, cmp, val); |
| } |
| |
| /** |
| * Tries to match node s to this node, if so, waking up thread. |
| * Fulfillers call tryMatch to identify their waiters. |
| * Waiters block until they have been matched. |
| * |
| * @param s the node to match |
| * @return true if successfully matched to s, either by this call |
| * or because match already refers to s (for example |
| * when a helping thread performed the CAS first) |
| */ |
| boolean tryMatch(SNode s) { |
| SNode m; Thread w; |
| if ((m = match) == null) { |
| if (SMATCH.compareAndSet(this, null, s)) { |
| if ((w = waiter) != null) |
| LockSupport.unpark(w); |
| return true; |
| } |
| else |
| m = match; |
| } |
| return m == s; |
| } |
| |
| /** |
| * Tries to cancel a wait by matching node to itself. |
| * |
| * @return true if this call CASed match from null to this node; |
| * false if the node was already matched or cancelled |
| */ |
| boolean tryCancel() { |
| return SMATCH.compareAndSet(this, null, this); |
| } |
| |
| boolean isCancelled() { |
| return match == this; |
| } |
| |
| public final boolean isReleasable() { |
| return match != null || Thread.currentThread().isInterrupted(); |
| } |
| |
| public final boolean block() { |
| while (!isReleasable()) LockSupport.park(); |
| return true; |
| } |
| |
| void forgetWaiter() { |
| SWAITER.setOpaque(this, null); |
| } |
| |
| // VarHandle mechanics |
| private static final VarHandle SMATCH; |
| private static final VarHandle SNEXT; |
| private static final VarHandle SWAITER; |
| static { |
| try { |
| MethodHandles.Lookup l = MethodHandles.lookup(); |
| SMATCH = l.findVarHandle(SNode.class, "match", SNode.class); |
| SNEXT = l.findVarHandle(SNode.class, "next", SNode.class); |
| SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class); |
| } catch (ReflectiveOperationException e) { |
| throw new ExceptionInInitializerError(e); |
| } |
| } |
| } |
| |
| /** The head (top) of the stack; null when the stack is empty */ |
| volatile SNode head; |
| |
| boolean casHead(SNode h, SNode nh) { |
| return h == head && |
| SHEAD.compareAndSet(this, h, nh); |
| } |
| |
| /** |
| * Creates or resets fields of a node. Called only from transfer |
| * where the node to push on stack is lazily created and |
| * reused when possible to help reduce intervals between reads |
| * and CASes of head and to avoid surges of garbage when CASes |
| * to push nodes fail due to contention. |
| * |
| * @param s the node to reuse, or null to allocate a new one |
| * @param e the item stored when a new node is allocated; a reused |
| * node keeps the item it already holds |
| * @param next the value to store in the node's next field |
| * @param mode the value to store in the node's mode field |
| * @return s, or the newly created node if s was null |
| */ |
| static SNode snode(SNode s, Object e, SNode next, int mode) { |
| if (s == null) s = new SNode(e); |
| s.mode = mode; |
| s.next = next; |
| return s; |
| } |
| |
| /** |
| * Puts or takes an item. |
| * |
| * @param e the item to hand off, or null to request an item |
| * @param timed whether the wait is bounded by nanos |
| * @param nanos maximum wait in nanoseconds when timed; a timed call |
| * with nanos <= 0 never pushes a waiting node |
| * @return the item transferred (e itself for a put, the matched |
| * node's item for a take), or null if the wait timed out |
| * or the thread was interrupted before a match |
| */ |
| @SuppressWarnings("unchecked") |
| E transfer(E e, boolean timed, long nanos) { |
| /* |
| * Basic algorithm is to loop trying one of three actions: |
| * |
| * 1. If apparently empty or already containing nodes of same |
| * mode, try to push node on stack and wait for a match, |
| * returning it, or null if cancelled. |
| * |
| * 2. If apparently containing node of complementary mode, |
| * try to push a fulfilling node on to stack, match |
| * with corresponding waiting node, pop both from |
| * stack, and return matched item. The matching or |
| * unlinking might not actually be necessary because of |
| * other threads performing action 3: |
| * |
| * 3. If top of stack already holds another fulfilling node, |
| * help it out by doing its match and/or pop |
| * operations, and then continue. The code for helping |
| * is essentially the same as for fulfilling, except |
| * that it doesn't return the item. |
| */ |
| |
| SNode s = null; // constructed/reused as needed |
| int mode = (e == null) ? REQUEST : DATA; |
| |
| for (;;) { |
| SNode h = head; |
| if (h == null || h.mode == mode) { // empty or same-mode |
| if (timed && nanos <= 0L) { // can't wait |
| if (h != null && h.isCancelled()) |
| casHead(h, h.next); // pop cancelled node |
| else |
| return null; |
| } else if (casHead(h, s = snode(s, e, h, mode))) { |
| long deadline = timed ? System.nanoTime() + nanos : 0L; |
| Thread w = Thread.currentThread(); |
| int stat = -1; // -1: may yield, +1: park, else 0 |
| SNode m; // await fulfill or cancel |
| while ((m = s.match) == null) { |
| if ((timed && |
| (nanos = deadline - System.nanoTime()) <= 0) || |
| w.isInterrupted()) { |
| if (s.tryCancel()) { |
| clean(s); // wait cancelled |
| return null; |
| } |
| } else if ((m = s.match) != null) { |
| break; // recheck |
| } else if (stat <= 0) { |
| if (stat < 0 && h == null && head == s) { |
| stat = 0; // yield once if was empty |
| Thread.yield(); |
| } else { |
| stat = 1; |
| s.waiter = w; // enable signal |
| } |
| } else if (!timed) { |
| LockSupport.setCurrentBlocker(this); |
| try { |
| ForkJoinPool.managedBlock(s); |
| } catch (InterruptedException cannotHappen) { } |
| LockSupport.setCurrentBlocker(null); |
| } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) |
| LockSupport.parkNanos(this, nanos); |
| } |
| if (stat == 1) |
| s.forgetWaiter(); |
| Object result = (mode == REQUEST) ? m.item : s.item; |
| if (h != null && h.next == s) |
| casHead(h, s.next); // help fulfiller |
| return (E) result; |
| } |
| } else if (!isFulfilling(h.mode)) { // try to fulfill |
| if (h.isCancelled()) // already cancelled |
| casHead(h, h.next); // pop and retry |
| else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { |
| for (;;) { // loop until matched or waiters disappear |
| SNode m = s.next; // m is s's match |
| if (m == null) { // all waiters are gone |
| casHead(s, null); // pop fulfill node |
| s = null; // use new node next time |
| break; // restart main loop |
| } |
| SNode mn = m.next; |
| if (m.tryMatch(s)) { |
| casHead(s, mn); // pop both s and m |
| return (E) ((mode == REQUEST) ? m.item : s.item); |
| } else // lost match |
| s.casNext(m, mn); // help unlink |
| } |
| } |
| } else { // help a fulfiller |
| SNode m = h.next; // m is h's match |
| if (m == null) // waiter is gone |
| casHead(h, null); // pop fulfilling node |
| else { |
| SNode mn = m.next; |
| if (m.tryMatch(h)) // help match |
| casHead(h, mn); // pop both h and m |
| else // lost match |
| h.casNext(m, mn); // help unlink |
| } |
| } |
| } |
| } |
| |
| /** |
| * Unlinks s from the stack. |
| * |
| * @param s the node whose wait was cancelled; its item and waiter |
| * references are cleared before the traversal starts |
| */ |
| void clean(SNode s) { |
| s.item = null; // forget item |
| s.forgetWaiter(); |
| |
| /* |
| * At worst we may need to traverse entire stack to unlink |
| * s. If there are multiple concurrent calls to clean, we |
| * might not see s if another thread has already removed |
| * it. But we can stop when we see any node known to |
| * follow s. We use s.next unless it too is cancelled, in |
| * which case we try the node one past. We don't check any |
| * further because we don't want to doubly traverse just to |
| * find sentinel. |
| */ |
| |
| SNode past = s.next; |
| if (past != null && past.isCancelled()) |
| past = past.next; |
| |
| // Absorb cancelled nodes at head |
| SNode p; |
| while ((p = head) != null && p != past && p.isCancelled()) |
| casHead(p, p.next); |
| |
| // Unsplice embedded nodes |
| while (p != null && p != past) { |
| SNode n = p.next; |
| if (n != null && n.isCancelled()) |
| p.casNext(n, n.next); |
| else |
| p = n; |
| } |
| } |
| |
| // VarHandle mechanics |
| private static final VarHandle SHEAD; |
| static { |
| try { |
| MethodHandles.Lookup l = MethodHandles.lookup(); |
| SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class); |
| } catch (ReflectiveOperationException e) { |
| throw new ExceptionInInitializerError(e); |
| } |
| } |
| } |
| |
| /** |
| * Dual queue: the transferer used in fair (Fifo) mode. Waiting |
| * producers or consumers are appended at the tail; a thread of the |
| * complementary mode fulfills the node that follows the head. |
| */ |
| static final class TransferQueue<E> extends Transferer<E> { |
| /* |
| * This extends Scherer-Scott dual queue algorithm, differing, |
| * among other ways, by using modes within nodes rather than |
| * marked pointers. The algorithm is a little simpler than |
| * that for stacks because fulfillers do not need explicit |
| * nodes, and matching is done by CAS'ing QNode.item field |
| * from non-null to null (for put) or vice versa (for take). |
| */ |
| |
| /** |
| * Node class for TransferQueue. The item field doubles as the |
| * node's state: a data node is fulfilled once its item is null, a |
| * request node once its item is non-null, and a cancelled node has |
| * its item pointing at the node itself. isFulfilled tests for any |
| * of these. The class also implements ManagedBlocker so that an |
| * untimed waiter can block through ForkJoinPool.managedBlock: |
| * isReleasable adds an interrupt check to the isFulfilled test, |
| * and block parks until isReleasable holds. |
| */ |
| static final class QNode implements ForkJoinPool.ManagedBlocker { |
| volatile QNode next; // next node in queue; this node itself once off list |
| volatile Object item; // CAS'ed to or from null; this node itself if cancelled |
| volatile Thread waiter; // to control park/unpark |
| final boolean isData; |
| |
| QNode(Object item, boolean isData) { |
| this.item = item; |
| this.isData = isData; |
| } |
| |
| boolean casNext(QNode cmp, QNode val) { |
| return next == cmp && |
| QNEXT.compareAndSet(this, cmp, val); |
| } |
| |
| boolean casItem(Object cmp, Object val) { |
| return item == cmp && |
| QITEM.compareAndSet(this, cmp, val); |
| } |
| |
| /** |
| * Tries to cancel by CAS'ing ref to this as item. |
| * |
| * @param cmp the item value the node is expected to still hold |
| * @return true if the item was changed from cmp to this node |
| */ |
| boolean tryCancel(Object cmp) { |
| return QITEM.compareAndSet(this, cmp, this); |
| } |
| |
| boolean isCancelled() { |
| return item == this; |
| } |
| |
| /** |
| * Returns true if this node is known to be off the queue |
| * because its next pointer has been forgotten due to |
| * an advanceHead operation (which points next at the node |
| * itself). |
| */ |
| boolean isOffList() { |
| return next == this; |
| } |
| |
| void forgetWaiter() { |
| QWAITER.setOpaque(this, null); |
| } |
| |
| boolean isFulfilled() { |
| Object x; |
| return isData == ((x = item) == null) || x == this; |
| } |
| |
| public final boolean isReleasable() { |
| Object x; |
| return isData == ((x = item) == null) || x == this || |
| Thread.currentThread().isInterrupted(); |
| } |
| |
| public final boolean block() { |
| while (!isReleasable()) LockSupport.park(); |
| return true; |
| } |
| |
| // VarHandle mechanics |
| private static final VarHandle QITEM; |
| private static final VarHandle QNEXT; |
| private static final VarHandle QWAITER; |
| static { |
| try { |
| MethodHandles.Lookup l = MethodHandles.lookup(); |
| QITEM = l.findVarHandle(QNode.class, "item", Object.class); |
| QNEXT = l.findVarHandle(QNode.class, "next", QNode.class); |
| QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class); |
| } catch (ReflectiveOperationException e) { |
| throw new ExceptionInInitializerError(e); |
| } |
| } |
| } |
| |
| /** Head of queue; always a dummy or already-consumed node, so the first waiter is head.next */ |
| transient volatile QNode head; |
| /** Tail of queue; may lag behind the last node until advanceTail catches it up */ |
| transient volatile QNode tail; |
| /** |
| * Reference to a cancelled node that might not yet have been |
| * unlinked from queue because it was the last inserted node |
| * when it was cancelled. As used by clean, the value stored here |
| * is the predecessor of that cancelled node. |
| */ |
| transient volatile QNode cleanMe; |
| |
| TransferQueue() { |
| QNode h = new QNode(null, false); // initialize to dummy node. |
| head = h; |
| tail = h; |
| } |
| |
| /** |
| * Tries to cas nh as new head; if successful, unlink |
| * old head's next node to avoid garbage retention. The unlink |
| * is done by pointing the old head's next at itself, which is |
| * the condition isOffList tests for. |
| */ |
| void advanceHead(QNode h, QNode nh) { |
| if (h == head && |
| QHEAD.compareAndSet(this, h, nh)) |
| h.next = h; // forget old next |
| } |
| |
| /** |
| * Tries to cas nt as new tail. Failure is ignored; it means |
| * tail is no longer t. |
| */ |
| void advanceTail(QNode t, QNode nt) { |
| if (tail == t) |
| QTAIL.compareAndSet(this, t, nt); |
| } |
| |
| /** |
| * Tries to CAS cleanMe slot. |
| * |
| * @return true if cleanMe was changed from cmp to val |
| */ |
| boolean casCleanMe(QNode cmp, QNode val) { |
| return cleanMe == cmp && |
| QCLEANME.compareAndSet(this, cmp, val); |
| } |
| |
| /** |
| * Puts or takes an item. |
| * |
| * @param e the item to hand off, or null to request an item |
| * @param timed whether the wait is bounded by nanos |
| * @param nanos maximum wait in nanoseconds when timed; a timed call |
| * with nanos <= 0 returns null instead of enqueueing |
| * @return the item transferred (e itself for a put, the producer's |
| * item for a take), or null if the wait timed out or the |
| * thread was interrupted before being fulfilled |
| */ |
| @SuppressWarnings("unchecked") |
| E transfer(E e, boolean timed, long nanos) { |
| /* Basic algorithm is to loop trying to take either of |
| * two actions: |
| * |
| * 1. If queue apparently empty or holding same-mode nodes, |
| * try to add node to queue of waiters, wait to be |
| * fulfilled (or cancelled) and return matching item. |
| * |
| * 2. If queue apparently contains waiting items, and this |
| * call is of complementary mode, try to fulfill by CAS'ing |
| * item field of waiting node and dequeuing it, and then |
| * returning matching item. |
| * |
| * In each case, along the way, check for and try to help |
| * advance head and tail on behalf of other stalled/slow |
| * threads. |
| * |
| * The loop starts off with a null check guarding against |
| * seeing uninitialized head or tail values. This never |
| * happens in current SynchronousQueue, but could if |
| * callers held non-volatile/final ref to the |
| * transferer. The check is here anyway because it places |
| * null checks at top of loop, which is usually faster |
| * than having them implicitly interspersed. |
| */ |
| |
| QNode s = null; // constructed/reused as needed |
| boolean isData = (e != null); |
| for (;;) { |
| QNode t = tail, h = head, m, tn; // m is node to fulfill |
| if (t == null || h == null) |
| ; // inconsistent |
| else if (h == t || t.isData == isData) { // empty or same-mode |
| if (t != tail) // inconsistent |
| ; |
| else if ((tn = t.next) != null) // lagging tail |
| advanceTail(t, tn); |
| else if (timed && nanos <= 0L) // can't wait |
| return null; |
| else if (t.casNext(null, (s != null) ? s : |
| (s = new QNode(e, isData)))) { |
| advanceTail(t, s); |
| long deadline = timed ? System.nanoTime() + nanos : 0L; |
| Thread w = Thread.currentThread(); |
| int stat = -1; // same idea as TransferStack |
| Object item; |
| while ((item = s.item) == e) { |
| if ((timed && |
| (nanos = deadline - System.nanoTime()) <= 0) || |
| w.isInterrupted()) { |
| if (s.tryCancel(e)) { |
| clean(t, s); |
| return null; |
| } |
| } else if ((item = s.item) != e) { |
| break; // recheck |
| } else if (stat <= 0) { |
| if (t.next == s) { |
| if (stat < 0 && t.isFulfilled()) { |
| stat = 0; // yield once if first |
| Thread.yield(); |
| } |
| else { |
| stat = 1; |
| s.waiter = w; |
| } |
| } |
| } else if (!timed) { |
| LockSupport.setCurrentBlocker(this); |
| try { |
| ForkJoinPool.managedBlock(s); |
| } catch (InterruptedException cannotHappen) { } |
| LockSupport.setCurrentBlocker(null); |
| } |
| else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) |
| LockSupport.parkNanos(this, nanos); |
| } |
| if (stat == 1) |
| s.forgetWaiter(); |
| if (!s.isOffList()) { // not already unlinked |
| advanceHead(t, s); // unlink if head |
| if (item != null) // and forget fields |
| s.item = s; |
| } |
| return (item != null) ? (E)item : e; |
| } |
| |
| } else if ((m = h.next) != null && t == tail && h == head) { |
| Thread waiter; |
| Object x = m.item; |
| boolean fulfilled = ((isData == (x == null)) && |
| x != m && m.casItem(x, e)); |
| advanceHead(h, m); // (help) dequeue |
| if (fulfilled) { |
| if ((waiter = m.waiter) != null) |
| LockSupport.unpark(waiter); |
| return (x != null) ? (E)x : e; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Gets rid of cancelled node s with original predecessor pred. |
| * |
| * @param pred the node whose next field was CASed to s when s was |
| * enqueued |
| * @param s the cancelled node; its waiter reference is cleared first |
| */ |
| void clean(QNode pred, QNode s) { |
| s.forgetWaiter(); |
| /* |
| * At any given time, exactly one node on list cannot be |
| * deleted -- the last inserted node. To accommodate this, |
| * if we cannot delete s, we save its predecessor as |
| * "cleanMe", deleting the previously saved version |
| * first. At least one of node s or the node previously |
| * saved can always be deleted, so this always terminates. |
| */ |
| while (pred.next == s) { // Return early if already unlinked |
| QNode h = head; |
| QNode hn = h.next; // Absorb cancelled first node as head |
| if (hn != null && hn.isCancelled()) { |
| advanceHead(h, hn); |
| continue; |
| } |
| QNode t = tail; // Ensure consistent read for tail |
| if (t == h) |
| return; |
| QNode tn = t.next; |
| if (t != tail) |
| continue; |
| if (tn != null) { |
| advanceTail(t, tn); |
| continue; |
| } |
| if (s != t) { // If not tail, try to unsplice |
| QNode sn = s.next; |
| if (sn == s || pred.casNext(s, sn)) |
| return; |
| } |
| QNode dp = cleanMe; |
| if (dp != null) { // Try unlinking previous cancelled node |
| QNode d = dp.next; |
| QNode dn; |
| if (d == null || // d is gone or |
| d == dp || // d is off list or |
| !d.isCancelled() || // d not cancelled or |
| (d != t && // d not tail and |
| (dn = d.next) != null && // has successor |
| dn != d && // that is on list |
| dp.casNext(d, dn))) // d unspliced |
| casCleanMe(dp, null); |
| if (dp == pred) |
| return; // s is already saved node |
| } else if (casCleanMe(null, pred)) |
| return; // Postpone cleaning s |
| } |
| } |
| |
| // VarHandle mechanics |
| private static final VarHandle QHEAD; |
| private static final VarHandle QTAIL; |
| private static final VarHandle QCLEANME; |
| static { |
| try { |
| MethodHandles.Lookup l = MethodHandles.lookup(); |
| QHEAD = l.findVarHandle(TransferQueue.class, "head", |
| QNode.class); |
| QTAIL = l.findVarHandle(TransferQueue.class, "tail", |
| QNode.class); |
| QCLEANME = l.findVarHandle(TransferQueue.class, "cleanMe", |
| QNode.class); |
| } catch (ReflectiveOperationException e) { |
| throw new ExceptionInInitializerError(e); |
| } |
| } |
| } |
| |
| /** |
| * The transferer. Set only in constructor, but cannot be declared |
| * as final without further complicating serialization. Since |
| * each public method reads this field at most once, there isn't |
| * a noticeable performance penalty for using volatile instead of |
| * final here. |
| */ |
| private transient volatile Transferer<E> transferer; |
| |
| /** |
| * Creates a {@code SynchronousQueue} with nonfair access policy. |
| * Equivalent to calling the fairness constructor with {@code false}. |
| */ |
| public SynchronousQueue() { |
| this(false); |
| } |
| |
| /** |
| * Creates a {@code SynchronousQueue} using the given fairness policy. |
| * A fair queue is backed by the dual queue, a nonfair one by the |
| * dual stack. |
| * |
| * @param fair if true, waiting threads contend in FIFO order for |
| * access; otherwise the order is unspecified. |
| */ |
| public SynchronousQueue(boolean fair) { |
|     if (fair) { |
|         transferer = new TransferQueue<E>(); |
|     } else { |
|         transferer = new TransferStack<E>(); |
|     } |
| } |
| |
| /** |
| * Adds the specified element to this queue, waiting if necessary for |
| * another thread to receive it. |
| * |
| * @throws InterruptedException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| */ |
| public void put(E e) throws InterruptedException { |
| if (e == null) throw new NullPointerException(); |
| if (transferer.transfer(e, false, 0) == null) { |
| Thread.interrupted(); |
| throw new InterruptedException(); |
| } |
| } |
| |
| /** |
| * Hands the specified element to a consumer, waiting at most the |
| * given time for another thread to receive it. |
| * |
| * @return {@code true} if successful, or {@code false} if the |
| * specified waiting time elapses before a consumer appears |
| * @throws InterruptedException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| */ |
| @Override |
| public boolean offer(E e, long timeout, TimeUnit unit) |
|         throws InterruptedException { |
|     Objects.requireNonNull(e); |
|     E handedOff = transferer.transfer(e, true, unit.toNanos(timeout)); |
|     if (handedOff != null) { |
|         return true; |
|     } |
|     // Null means timeout or interrupt; the (cleared) interrupt |
|     // status tells the two apart. |
|     if (Thread.interrupted()) { |
|         throw new InterruptedException(); |
|     } |
|     return false; |
| } |
| |
| /** |
| * Inserts the specified element into this queue, if another thread is |
| * waiting to receive it. |
| * |
| * @param e the element to add |
| * @return {@code true} if the element was added to this queue, else |
| * {@code false} |
| * @throws NullPointerException if the specified element is null |
| */ |
| public boolean offer(E e) { |
| if (e == null) throw new NullPointerException(); |
| return transferer.transfer(e, true, 0) != null; |
| } |
| |
| /** |
| * Receives an element from a producer, blocking until another thread |
| * inserts one or the calling thread is interrupted. |
| * |
| * @return the head of this queue |
| * @throws InterruptedException {@inheritDoc} |
| */ |
| @Override |
| public E take() throws InterruptedException { |
|     E received = transferer.transfer(null, false, 0); |
|     if (received == null) { |
|         // Untimed transfer failed, so the wait was interrupted; |
|         // clear the status before throwing. |
|         Thread.interrupted(); |
|         throw new InterruptedException(); |
|     } |
|     return received; |
| } |
| |
| /** |
| * Receives an element from a producer, waiting at most the given |
| * time for another thread to insert one. |
| * |
| * @return the head of this queue, or {@code null} if the |
| * specified waiting time elapses before an element is present |
| * @throws InterruptedException {@inheritDoc} |
| */ |
| @Override |
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
|     E received = transferer.transfer(null, true, unit.toNanos(timeout)); |
|     // The interrupt status is consulted (and cleared) only when |
|     // nothing was received. |
|     if (received == null && Thread.interrupted()) { |
|         throw new InterruptedException(); |
|     } |
|     return received; |
| } |
| |
| /** |
| * Retrieves and removes the head of this queue, if another thread |
| * is currently making an element available. |
| * |
| * @return the head of this queue, or {@code null} if no |
| * element is available |
| */ |
| public E poll() { |
| return transferer.transfer(null, true, 0); |
| } |
| |
| /** |
| * Always returns {@code true}. |
| * A {@code SynchronousQueue} has no internal capacity, so the result |
| * is a constant and does not consult the transferer. |
| * |
| * @return {@code true} |
| */ |
| public boolean isEmpty() { |
| return true; |
| } |
| |
| /** |
| * Always returns zero. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @return zero |
| */ |
| public int size() { |
| return 0; |
| } |
| |
| /** |
| * Always returns zero. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @return zero |
| */ |
| public int remainingCapacity() { |
| return 0; |
| } |
| |
| /** |
| * Does nothing. |
| * A {@code SynchronousQueue} has no internal capacity. |
| */ |
| public void clear() { |
| } |
| |
| /** |
| * Always returns {@code false}. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param o the element |
| * @return {@code false} |
| */ |
| public boolean contains(Object o) { |
| return false; |
| } |
| |
| /** |
| * Always returns {@code false}. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param o the element to remove |
| * @return {@code false} |
| */ |
| public boolean remove(Object o) { |
| return false; |
| } |
| |
| /** |
| * Returns {@code false} unless the given collection is empty. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param c the collection |
| * @return {@code false} unless given collection is empty |
| */ |
| public boolean containsAll(Collection<?> c) { |
| return c.isEmpty(); |
| } |
| |
| /** |
| * Always returns {@code false}. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param c the collection |
| * @return {@code false} |
| */ |
| public boolean removeAll(Collection<?> c) { |
| return false; |
| } |
| |
| /** |
| * Always returns {@code false}. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param c the collection |
| * @return {@code false} |
| */ |
| public boolean retainAll(Collection<?> c) { |
| return false; |
| } |
| |
| /** |
| * Always returns {@code null}. |
| * A {@code SynchronousQueue} does not return elements |
| * unless actively waited on. |
| * |
| * @return {@code null} |
| */ |
| public E peek() { |
| return null; |
| } |
| |
| /** |
| * Returns an empty iterator in which {@code hasNext} always returns |
| * {@code false}. |
| * |
| * @return an empty iterator |
| */ |
| public Iterator<E> iterator() { |
| return Collections.emptyIterator(); |
| } |
| |
| /** |
| * Returns an empty spliterator in which calls to |
| * {@link Spliterator#trySplit() trySplit} always return {@code null}. |
| * |
| * @return an empty spliterator |
| * @since 1.8 |
| */ |
| public Spliterator<E> spliterator() { |
| return Spliterators.emptySpliterator(); |
| } |
| |
| /** |
| * Returns a zero-length array. |
| * @return a zero-length array |
| */ |
| public Object[] toArray() { |
| return new Object[0]; |
| } |
| |
| /** |
| * Sets the zeroth element of the specified array to {@code null} |
| * (if the array has non-zero length) and returns it. |
| * |
| * @param a the array |
| * @return the specified array |
| * @throws NullPointerException if the specified array is null |
| */ |
| public <T> T[] toArray(T[] a) { |
| if (a.length > 0) |
| a[0] = null; |
| return a; |
| } |
| |
| /** |
| * Always returns {@code "[]"}. |
| * @return {@code "[]"} |
| */ |
| public String toString() { |
| return "[]"; |
| } |
| |
| /** |
| * @throws UnsupportedOperationException {@inheritDoc} |
| * @throws ClassCastException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| * @throws IllegalArgumentException {@inheritDoc} |
| */ |
| public int drainTo(Collection<? super E> c) { |
| Objects.requireNonNull(c); |
| if (c == this) |
| throw new IllegalArgumentException(); |
| int n = 0; |
| for (E e; (e = poll()) != null; n++) |
| c.add(e); |
| return n; |
| } |
| |
| /** |
| * @throws UnsupportedOperationException {@inheritDoc} |
| * @throws ClassCastException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| * @throws IllegalArgumentException {@inheritDoc} |
| */ |
| public int drainTo(Collection<? super E> c, int maxElements) { |
| Objects.requireNonNull(c); |
| if (c == this) |
| throw new IllegalArgumentException(); |
| int n = 0; |
| for (E e; n < maxElements && (e = poll()) != null; n++) |
| c.add(e); |
| return n; |
| } |
| |
| /* |
| * To cope with serialization strategy in the 1.5 version of |
| * SynchronousQueue, we declare some unused classes and fields |
| * that exist solely to enable serializability across versions. |
| * These fields are never used, so are initialized only if this |
| * object is ever serialized or deserialized. |
| */ |
| |
| @SuppressWarnings("serial") |
| static class WaitQueue implements java.io.Serializable { } |
| static class LifoWaitQueue extends WaitQueue { |
| private static final long serialVersionUID = -3633113410248163686L; |
| } |
| static class FifoWaitQueue extends WaitQueue { |
| private static final long serialVersionUID = -3623113410248163686L; |
| } |
| private ReentrantLock qlock; |
| private WaitQueue waitingProducers; |
| private WaitQueue waitingConsumers; |
| |
| /** |
| * Saves this queue to a stream (that is, serializes it). |
| * @param s the stream |
| * @throws java.io.IOException if an I/O error occurs |
| */ |
| private void writeObject(java.io.ObjectOutputStream s) |
| throws java.io.IOException { |
| boolean fair = transferer instanceof TransferQueue; |
| if (fair) { |
| qlock = new ReentrantLock(true); |
| waitingProducers = new FifoWaitQueue(); |
| waitingConsumers = new FifoWaitQueue(); |
| } |
| else { |
| qlock = new ReentrantLock(); |
| waitingProducers = new LifoWaitQueue(); |
| waitingConsumers = new LifoWaitQueue(); |
| } |
| s.defaultWriteObject(); |
| } |
| |
| /** |
| * Reconstitutes this queue from a stream (that is, deserializes it). |
| * @param s the stream |
| * @throws ClassNotFoundException if the class of a serialized object |
| * could not be found |
| * @throws java.io.IOException if an I/O error occurs |
| */ |
| private void readObject(java.io.ObjectInputStream s) |
| throws java.io.IOException, ClassNotFoundException { |
| s.defaultReadObject(); |
| if (waitingProducers instanceof FifoWaitQueue) |
| transferer = new TransferQueue<E>(); |
| else |
| transferer = new TransferStack<E>(); |
| } |
| |
static {
    // Reference LockSupport here, at class initialization, to reduce the
    // risk of a rare but disastrous class load during the first call to
    // LockSupport.park.
    // See https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}
| } |