| /* |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| |
| /* |
| * This file is available under and governed by the GNU General Public |
| * License version 2 only, as published by the Free Software Foundation. |
| * However, the following notice accompanied the original version of this |
| * file: |
| * |
| * Written by Doug Lea, Bill Scherer, and Michael Scott with |
| * assistance from members of JCP JSR-166 Expert Group and released to |
| * the public domain, as explained at |
| * http://creativecommons.org/publicdomain/zero/1.0/ |
| */ |
| |
| package java.util.concurrent; |
| |
| import java.util.AbstractQueue; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.Spliterator; |
| import java.util.Spliterators; |
| import java.util.concurrent.locks.LockSupport; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| // BEGIN android-note |
| // removed link to collections framework docs |
| // END android-note |
| |
| /** |
| * A {@linkplain BlockingQueue blocking queue} in which each insert |
| * operation must wait for a corresponding remove operation by another |
| * thread, and vice versa. A synchronous queue does not have any |
| * internal capacity, not even a capacity of one. You cannot |
| * {@code peek} at a synchronous queue because an element is only |
| * present when you try to remove it; you cannot insert an element |
| * (using any method) unless another thread is trying to remove it; |
| * you cannot iterate as there is nothing to iterate. The |
| * <em>head</em> of the queue is the element that the first queued |
| * inserting thread is trying to add to the queue; if there is no such |
| * queued thread then no element is available for removal and |
| * {@code poll()} will return {@code null}. For purposes of other |
| * {@code Collection} methods (for example {@code contains}), a |
| * {@code SynchronousQueue} acts as an empty collection. This queue |
| * does not permit {@code null} elements. |
| * |
| * <p>Synchronous queues are similar to rendezvous channels used in |
| * CSP and Ada. They are well suited for handoff designs, in which an |
| * object running in one thread must sync up with an object running |
| * in another thread in order to hand it some information, event, or |
| * task. |
| * |
| * <p>This class supports an optional fairness policy for ordering |
| * waiting producer and consumer threads. By default, this ordering |
| * is not guaranteed. However, a queue constructed with fairness set |
| * to {@code true} grants threads access in FIFO order. |
| * |
| * <p>This class and its iterator implement all of the |
| * <em>optional</em> methods of the {@link Collection} and {@link |
| * Iterator} interfaces. |
| * |
| * @since 1.5 |
| * @author Doug Lea and Bill Scherer and Michael Scott |
| * @param <E> the type of elements held in this queue |
| */ |
| public class SynchronousQueue<E> extends AbstractQueue<E> |
| implements BlockingQueue<E>, java.io.Serializable { |
| private static final long serialVersionUID = -3223113410248163686L; |
| |
| /* |
| * This class implements extensions of the dual stack and dual |
| * queue algorithms described in "Nonblocking Concurrent Objects |
| * with Condition Synchronization", by W. N. Scherer III and |
| * M. L. Scott. 18th Annual Conf. on Distributed Computing, |
| * Oct. 2004 (see also |
| * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html). |
| * The (Lifo) stack is used for non-fair mode, and the (Fifo) |
| * queue for fair mode. The performance of the two is generally |
| * similar. Fifo usually supports higher throughput under |
| * contention but Lifo maintains higher thread locality in common |
| * applications. |
| * |
| * A dual queue (and similarly stack) is one that at any given |
| * time either holds "data" -- items provided by put operations, |
| * or "requests" -- slots representing take operations, or is |
| * empty. A call to "fulfill" (i.e., a call requesting an item |
| * from a queue holding data or vice versa) dequeues a |
| * complementary node. The most interesting feature of these |
| * queues is that any operation can figure out which mode the |
| * queue is in, and act accordingly without needing locks. |
| * |
| * Both the queue and stack extend abstract class Transferer |
| * defining the single method transfer that does a put or a |
| * take. These are unified into a single method because in dual |
| * data structures, the put and take operations are symmetrical, |
| * so nearly all code can be combined. The resulting transfer |
| * methods are on the long side, but are easier to follow than |
| * they would be if broken up into nearly-duplicated parts. |
| * |
| * The queue and stack data structures share many conceptual |
| * similarities but very few concrete details. For simplicity, |
| * they are kept distinct so that they can later evolve |
| * separately. |
| * |
| * The algorithms here differ from the versions in the above paper |
| * in extending them for use in synchronous queues, as well as |
| * dealing with cancellation. The main differences include: |
| * |
| * 1. The original algorithms used bit-marked pointers, but |
| * the ones here use mode bits in nodes, leading to a number |
| * of further adaptations. |
| * 2. SynchronousQueues must block threads waiting to become |
| * fulfilled. |
| * 3. Support for cancellation via timeout and interrupts, |
| * including cleaning out cancelled nodes/threads |
| * from lists to avoid garbage retention and memory depletion. |
| * |
| * Blocking is mainly accomplished using LockSupport park/unpark, |
| * except that nodes that appear to be the next ones to become |
| * fulfilled first spin a bit (on multiprocessors only). On very |
| * busy synchronous queues, spinning can dramatically improve |
| * throughput. And on less busy ones, the amount of spinning is |
| * small enough not to be noticeable. |
| * |
| * Cleaning is done in different ways in queues vs stacks. For |
| * queues, we can almost always remove a node immediately in O(1) |
| * time (modulo retries for consistency checks) when it is |
| * cancelled. But if it may be pinned as the current tail, it must |
| * wait until some subsequent cancellation. For stacks, we need a |
| * potentially O(n) traversal to be sure that we can remove the |
| * node, but this can run concurrently with other threads |
| * accessing the stack. |
| * |
| * While garbage collection takes care of most node reclamation |
| * issues that otherwise complicate nonblocking algorithms, care |
| * is taken to "forget" references to data, other nodes, and |
| * threads that might be held on to long-term by blocked |
| * threads. In cases where setting to null would otherwise |
| * conflict with main algorithms, this is done by changing a |
| * node's link to now point to the node itself. This doesn't arise |
| * much for Stack nodes (because blocked threads do not hang on to |
| * old head pointers), but references in Queue nodes must be |
| * aggressively forgotten to avoid reachability of everything any |
| * node has ever referred to since arrival. |
| */ |
| |
| /** |
| * Shared internal API for dual stacks and queues. |
| */ |
| abstract static class Transferer<E> { |
| /** |
| * Performs a put or take. |
| * |
| * @param e if non-null, the item to be handed to a consumer; |
| * if null, requests that transfer return an item |
| * offered by producer. |
| * @param timed if this operation should timeout |
| * @param nanos the timeout, in nanoseconds |
| * @return if non-null, the item provided or received; if null, |
| * the operation failed due to timeout or interrupt -- |
| * the caller can distinguish which of these occurred |
| * by checking Thread.interrupted. |
| */ |
| abstract E transfer(E e, boolean timed, long nanos); |
| } |
| |
| /** |
| * The number of times to spin before blocking in timed waits. |
| * The value is empirically derived -- it works well across a |
| * variety of processors and OSes. Empirically, the best value |
| * seems not to vary with number of CPUs (beyond 2) so is just |
| * a constant. |
| */ |
| static final int MAX_TIMED_SPINS = |
| (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32; |
| |
| /** |
| * The number of times to spin before blocking in untimed waits. |
| * This is greater than timed value because untimed waits spin |
| * faster since they don't need to check times on each spin. |
| */ |
| static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16; |
| |
| /** |
| * The number of nanoseconds for which it is faster to spin |
| * rather than to use timed park. A rough estimate suffices. |
| */ |
| static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; |
| |
| /** Dual stack */ |
| static final class TransferStack<E> extends Transferer<E> { |
| /* |
| * This extends Scherer-Scott dual stack algorithm, differing, |
| * among other ways, by using "covering" nodes rather than |
| * bit-marked pointers: Fulfilling operations push on marker |
| * nodes (with FULFILLING bit set in mode) to reserve a spot |
| * to match a waiting node. |
| */ |
| |
| /* Modes for SNodes, ORed together in node fields */ |
| /** Node represents an unfulfilled consumer */ |
| static final int REQUEST = 0; |
| /** Node represents an unfulfilled producer */ |
| static final int DATA = 1; |
| /** Node is fulfilling another unfulfilled DATA or REQUEST */ |
| static final int FULFILLING = 2; |
| |
| /** Returns true if m has fulfilling bit set. */ |
| static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } |
| |
| /** Node class for TransferStacks. */ |
| static final class SNode { |
| volatile SNode next; // next node in stack |
| volatile SNode match; // the node matched to this |
| volatile Thread waiter; // to control park/unpark |
| Object item; // data; or null for REQUESTs |
| int mode; |
| // Note: item and mode fields don't need to be volatile |
| // since they are always written before, and read after, |
| // other volatile/atomic operations. |
| |
| SNode(Object item) { |
| this.item = item; |
| } |
| |
| boolean casNext(SNode cmp, SNode val) { |
| return cmp == next && |
| U.compareAndSwapObject(this, NEXT, cmp, val); |
| } |
| |
| /** |
| * Tries to match node s to this node, if so, waking up thread. |
| * Fulfillers call tryMatch to identify their waiters. |
| * Waiters block until they have been matched. |
| * |
| * @param s the node to match |
| * @return true if successfully matched to s |
| */ |
| boolean tryMatch(SNode s) { |
| if (match == null && |
| U.compareAndSwapObject(this, MATCH, null, s)) { |
| Thread w = waiter; |
| if (w != null) { // waiters need at most one unpark |
| waiter = null; |
| LockSupport.unpark(w); |
| } |
| return true; |
| } |
| return match == s; |
| } |
| |
| /** |
| * Tries to cancel a wait by matching node to itself. |
| */ |
| void tryCancel() { |
| U.compareAndSwapObject(this, MATCH, null, this); |
| } |
| |
| boolean isCancelled() { |
| return match == this; |
| } |
| |
| // Unsafe mechanics |
| private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
| private static final long MATCH; |
| private static final long NEXT; |
| |
| static { |
| try { |
| MATCH = U.objectFieldOffset |
| (SNode.class.getDeclaredField("match")); |
| NEXT = U.objectFieldOffset |
| (SNode.class.getDeclaredField("next")); |
| } catch (ReflectiveOperationException e) { |
| throw new Error(e); |
| } |
| } |
| } |
| |
| /** The head (top) of the stack */ |
| volatile SNode head; |
| |
| boolean casHead(SNode h, SNode nh) { |
| return h == head && |
| U.compareAndSwapObject(this, HEAD, h, nh); |
| } |
| |
| /** |
| * Creates or resets fields of a node. Called only from transfer |
| * where the node to push on stack is lazily created and |
| * reused when possible to help reduce intervals between reads |
| * and CASes of head and to avoid surges of garbage when CASes |
| * to push nodes fail due to contention. |
| */ |
| static SNode snode(SNode s, Object e, SNode next, int mode) { |
| if (s == null) s = new SNode(e); |
| s.mode = mode; |
| s.next = next; |
| return s; |
| } |
| |
| /** |
| * Puts or takes an item. |
| */ |
| @SuppressWarnings("unchecked") |
| E transfer(E e, boolean timed, long nanos) { |
| /* |
| * Basic algorithm is to loop trying one of three actions: |
| * |
| * 1. If apparently empty or already containing nodes of same |
| * mode, try to push node on stack and wait for a match, |
| * returning it, or null if cancelled. |
| * |
| * 2. If apparently containing node of complementary mode, |
| * try to push a fulfilling node on to stack, match |
| * with corresponding waiting node, pop both from |
| * stack, and return matched item. The matching or |
| * unlinking might not actually be necessary because of |
| * other threads performing action 3: |
| * |
| * 3. If top of stack already holds another fulfilling node, |
| * help it out by doing its match and/or pop |
| * operations, and then continue. The code for helping |
| * is essentially the same as for fulfilling, except |
| * that it doesn't return the item. |
| */ |
| |
| SNode s = null; // constructed/reused as needed |
| int mode = (e == null) ? REQUEST : DATA; |
| |
| for (;;) { |
| SNode h = head; |
| if (h == null || h.mode == mode) { // empty or same-mode |
| if (timed && nanos <= 0L) { // can't wait |
| if (h != null && h.isCancelled()) |
| casHead(h, h.next); // pop cancelled node |
| else |
| return null; |
| } else if (casHead(h, s = snode(s, e, h, mode))) { |
| SNode m = awaitFulfill(s, timed, nanos); |
| if (m == s) { // wait was cancelled |
| clean(s); |
| return null; |
| } |
| if ((h = head) != null && h.next == s) |
| casHead(h, s.next); // help s's fulfiller |
| return (E) ((mode == REQUEST) ? m.item : s.item); |
| } |
| } else if (!isFulfilling(h.mode)) { // try to fulfill |
| if (h.isCancelled()) // already cancelled |
| casHead(h, h.next); // pop and retry |
| else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { |
| for (;;) { // loop until matched or waiters disappear |
| SNode m = s.next; // m is s's match |
| if (m == null) { // all waiters are gone |
| casHead(s, null); // pop fulfill node |
| s = null; // use new node next time |
| break; // restart main loop |
| } |
| SNode mn = m.next; |
| if (m.tryMatch(s)) { |
| casHead(s, mn); // pop both s and m |
| return (E) ((mode == REQUEST) ? m.item : s.item); |
| } else // lost match |
| s.casNext(m, mn); // help unlink |
| } |
| } |
| } else { // help a fulfiller |
| SNode m = h.next; // m is h's match |
| if (m == null) // waiter is gone |
| casHead(h, null); // pop fulfilling node |
| else { |
| SNode mn = m.next; |
| if (m.tryMatch(h)) // help match |
| casHead(h, mn); // pop both h and m |
| else // lost match |
| h.casNext(m, mn); // help unlink |
| } |
| } |
| } |
| } |
| |
| /** |
| * Spins/blocks until node s is matched by a fulfill operation. |
| * |
| * @param s the waiting node |
| * @param timed true if timed wait |
| * @param nanos timeout value |
| * @return matched node, or s if cancelled |
| */ |
| SNode awaitFulfill(SNode s, boolean timed, long nanos) { |
| /* |
| * When a node/thread is about to block, it sets its waiter |
| * field and then rechecks state at least one more time |
| * before actually parking, thus covering race vs |
| * fulfiller noticing that waiter is non-null so should be |
| * woken. |
| * |
| * When invoked by nodes that appear at the point of call |
| * to be at the head of the stack, calls to park are |
| * preceded by spins to avoid blocking when producers and |
| * consumers are arriving very close in time. This can |
| * happen enough to bother only on multiprocessors. |
| * |
| * The order of checks for returning out of main loop |
| * reflects fact that interrupts have precedence over |
| * normal returns, which have precedence over |
| * timeouts. (So, on timeout, one last check for match is |
| * done before giving up.) Except that calls from untimed |
| * SynchronousQueue.{poll/offer} don't check interrupts |
| * and don't wait at all, so are trapped in transfer |
| * method rather than calling awaitFulfill. |
| */ |
| final long deadline = timed ? System.nanoTime() + nanos : 0L; |
| Thread w = Thread.currentThread(); |
| int spins = shouldSpin(s) |
| ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS) |
| : 0; |
| for (;;) { |
| if (w.isInterrupted()) |
| s.tryCancel(); |
| SNode m = s.match; |
| if (m != null) |
| return m; |
| if (timed) { |
| nanos = deadline - System.nanoTime(); |
| if (nanos <= 0L) { |
| s.tryCancel(); |
| continue; |
| } |
| } |
| if (spins > 0) |
| spins = shouldSpin(s) ? (spins - 1) : 0; |
| else if (s.waiter == null) |
| s.waiter = w; // establish waiter so can park next iter |
| else if (!timed) |
| LockSupport.park(this); |
| else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) |
| LockSupport.parkNanos(this, nanos); |
| } |
| } |
| |
| /** |
| * Returns true if node s is at head or there is an active |
| * fulfiller. |
| */ |
| boolean shouldSpin(SNode s) { |
| SNode h = head; |
| return (h == s || h == null || isFulfilling(h.mode)); |
| } |
| |
| /** |
| * Unlinks s from the stack. |
| */ |
| void clean(SNode s) { |
| s.item = null; // forget item |
| s.waiter = null; // forget thread |
| |
| /* |
| * At worst we may need to traverse entire stack to unlink |
| * s. If there are multiple concurrent calls to clean, we |
| * might not see s if another thread has already removed |
| * it. But we can stop when we see any node known to |
| * follow s. We use s.next unless it too is cancelled, in |
| * which case we try the node one past. We don't check any |
| * further because we don't want to doubly traverse just to |
| * find sentinel. |
| */ |
| |
| SNode past = s.next; |
| if (past != null && past.isCancelled()) |
| past = past.next; |
| |
| // Absorb cancelled nodes at head |
| SNode p; |
| while ((p = head) != null && p != past && p.isCancelled()) |
| casHead(p, p.next); |
| |
| // Unsplice embedded nodes |
| while (p != null && p != past) { |
| SNode n = p.next; |
| if (n != null && n.isCancelled()) |
| p.casNext(n, n.next); |
| else |
| p = n; |
| } |
| } |
| |
| // Unsafe mechanics |
| private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
| private static final long HEAD; |
| static { |
| try { |
| HEAD = U.objectFieldOffset |
| (TransferStack.class.getDeclaredField("head")); |
| } catch (ReflectiveOperationException e) { |
| throw new Error(e); |
| } |
| } |
| } |
| |
| /** Dual Queue */ |
| static final class TransferQueue<E> extends Transferer<E> { |
| /* |
| * This extends Scherer-Scott dual queue algorithm, differing, |
| * among other ways, by using modes within nodes rather than |
| * marked pointers. The algorithm is a little simpler than |
| * that for stacks because fulfillers do not need explicit |
| * nodes, and matching is done by CAS'ing QNode.item field |
| * from non-null to null (for put) or vice versa (for take). |
| */ |
| |
| /** Node class for TransferQueue. */ |
| static final class QNode { |
| volatile QNode next; // next node in queue |
| volatile Object item; // CAS'ed to or from null |
| volatile Thread waiter; // to control park/unpark |
| final boolean isData; |
| |
| QNode(Object item, boolean isData) { |
| this.item = item; |
| this.isData = isData; |
| } |
| |
| boolean casNext(QNode cmp, QNode val) { |
| return next == cmp && |
| U.compareAndSwapObject(this, NEXT, cmp, val); |
| } |
| |
| boolean casItem(Object cmp, Object val) { |
| return item == cmp && |
| U.compareAndSwapObject(this, ITEM, cmp, val); |
| } |
| |
| /** |
| * Tries to cancel by CAS'ing ref to this as item. |
| */ |
| void tryCancel(Object cmp) { |
| U.compareAndSwapObject(this, ITEM, cmp, this); |
| } |
| |
| boolean isCancelled() { |
| return item == this; |
| } |
| |
| /** |
| * Returns true if this node is known to be off the queue |
| * because its next pointer has been forgotten due to |
| * an advanceHead operation. |
| */ |
| boolean isOffList() { |
| return next == this; |
| } |
| |
| // Unsafe mechanics |
| private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
| private static final long ITEM; |
| private static final long NEXT; |
| |
| static { |
| try { |
| ITEM = U.objectFieldOffset |
| (QNode.class.getDeclaredField("item")); |
| NEXT = U.objectFieldOffset |
| (QNode.class.getDeclaredField("next")); |
| } catch (ReflectiveOperationException e) { |
| throw new Error(e); |
| } |
| } |
| } |
| |
| /** Head of queue */ |
| transient volatile QNode head; |
| /** Tail of queue */ |
| transient volatile QNode tail; |
| /** |
| * Reference to a cancelled node that might not yet have been |
| * unlinked from queue because it was the last inserted node |
| * when it was cancelled. |
| */ |
| transient volatile QNode cleanMe; |
| |
| TransferQueue() { |
| QNode h = new QNode(null, false); // initialize to dummy node. |
| head = h; |
| tail = h; |
| } |
| |
| /** |
| * Tries to cas nh as new head; if successful, unlink |
| * old head's next node to avoid garbage retention. |
| */ |
| void advanceHead(QNode h, QNode nh) { |
| if (h == head && |
| U.compareAndSwapObject(this, HEAD, h, nh)) |
| h.next = h; // forget old next |
| } |
| |
| /** |
| * Tries to cas nt as new tail. |
| */ |
| void advanceTail(QNode t, QNode nt) { |
| if (tail == t) |
| U.compareAndSwapObject(this, TAIL, t, nt); |
| } |
| |
| /** |
| * Tries to CAS cleanMe slot. |
| */ |
| boolean casCleanMe(QNode cmp, QNode val) { |
| return cleanMe == cmp && |
| U.compareAndSwapObject(this, CLEANME, cmp, val); |
| } |
| |
| /** |
| * Puts or takes an item. |
| */ |
| @SuppressWarnings("unchecked") |
| E transfer(E e, boolean timed, long nanos) { |
| /* Basic algorithm is to loop trying to take either of |
| * two actions: |
| * |
| * 1. If queue apparently empty or holding same-mode nodes, |
| * try to add node to queue of waiters, wait to be |
| * fulfilled (or cancelled) and return matching item. |
| * |
| * 2. If queue apparently contains waiting items, and this |
| * call is of complementary mode, try to fulfill by CAS'ing |
| * item field of waiting node and dequeuing it, and then |
| * returning matching item. |
| * |
| * In each case, along the way, check for and try to help |
| * advance head and tail on behalf of other stalled/slow |
| * threads. |
| * |
| * The loop starts off with a null check guarding against |
| * seeing uninitialized head or tail values. This never |
| * happens in current SynchronousQueue, but could if |
| * callers held non-volatile/final ref to the |
| * transferer. The check is here anyway because it places |
| * null checks at top of loop, which is usually faster |
| * than having them implicitly interspersed. |
| */ |
| |
| QNode s = null; // constructed/reused as needed |
| boolean isData = (e != null); |
| |
| for (;;) { |
| QNode t = tail; |
| QNode h = head; |
| if (t == null || h == null) // saw uninitialized value |
| continue; // spin |
| |
| if (h == t || t.isData == isData) { // empty or same-mode |
| QNode tn = t.next; |
| if (t != tail) // inconsistent read |
| continue; |
| if (tn != null) { // lagging tail |
| advanceTail(t, tn); |
| continue; |
| } |
| if (timed && nanos <= 0L) // can't wait |
| return null; |
| if (s == null) |
| s = new QNode(e, isData); |
| if (!t.casNext(null, s)) // failed to link in |
| continue; |
| |
| advanceTail(t, s); // swing tail and wait |
| Object x = awaitFulfill(s, e, timed, nanos); |
| if (x == s) { // wait was cancelled |
| clean(t, s); |
| return null; |
| } |
| |
| if (!s.isOffList()) { // not already unlinked |
| advanceHead(t, s); // unlink if head |
| if (x != null) // and forget fields |
| s.item = s; |
| s.waiter = null; |
| } |
| return (x != null) ? (E)x : e; |
| |
| } else { // complementary-mode |
| QNode m = h.next; // node to fulfill |
| if (t != tail || m == null || h != head) |
| continue; // inconsistent read |
| |
| Object x = m.item; |
| if (isData == (x != null) || // m already fulfilled |
| x == m || // m cancelled |
| !m.casItem(x, e)) { // lost CAS |
| advanceHead(h, m); // dequeue and retry |
| continue; |
| } |
| |
| advanceHead(h, m); // successfully fulfilled |
| LockSupport.unpark(m.waiter); |
| return (x != null) ? (E)x : e; |
| } |
| } |
| } |
| |
| /** |
| * Spins/blocks until node s is fulfilled. |
| * |
| * @param s the waiting node |
| * @param e the comparison value for checking match |
| * @param timed true if timed wait |
| * @param nanos timeout value |
| * @return matched item, or s if cancelled |
| */ |
| Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { |
| /* Same idea as TransferStack.awaitFulfill */ |
| final long deadline = timed ? System.nanoTime() + nanos : 0L; |
| Thread w = Thread.currentThread(); |
| int spins = (head.next == s) |
| ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS) |
| : 0; |
| for (;;) { |
| if (w.isInterrupted()) |
| s.tryCancel(e); |
| Object x = s.item; |
| if (x != e) |
| return x; |
| if (timed) { |
| nanos = deadline - System.nanoTime(); |
| if (nanos <= 0L) { |
| s.tryCancel(e); |
| continue; |
| } |
| } |
| if (spins > 0) |
| --spins; |
| else if (s.waiter == null) |
| s.waiter = w; |
| else if (!timed) |
| LockSupport.park(this); |
| else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) |
| LockSupport.parkNanos(this, nanos); |
| } |
| } |
| |
| /** |
| * Gets rid of cancelled node s with original predecessor pred. |
| */ |
| void clean(QNode pred, QNode s) { |
| s.waiter = null; // forget thread |
| /* |
| * At any given time, exactly one node on list cannot be |
| * deleted -- the last inserted node. To accommodate this, |
| * if we cannot delete s, we save its predecessor as |
| * "cleanMe", deleting the previously saved version |
| * first. At least one of node s or the node previously |
| * saved can always be deleted, so this always terminates. |
| */ |
| while (pred.next == s) { // Return early if already unlinked |
| QNode h = head; |
| QNode hn = h.next; // Absorb cancelled first node as head |
| if (hn != null && hn.isCancelled()) { |
| advanceHead(h, hn); |
| continue; |
| } |
| QNode t = tail; // Ensure consistent read for tail |
| if (t == h) |
| return; |
| QNode tn = t.next; |
| if (t != tail) |
| continue; |
| if (tn != null) { |
| advanceTail(t, tn); |
| continue; |
| } |
| if (s != t) { // If not tail, try to unsplice |
| QNode sn = s.next; |
| if (sn == s || pred.casNext(s, sn)) |
| return; |
| } |
| QNode dp = cleanMe; |
| if (dp != null) { // Try unlinking previous cancelled node |
| QNode d = dp.next; |
| QNode dn; |
| if (d == null || // d is gone or |
| d == dp || // d is off list or |
| !d.isCancelled() || // d not cancelled or |
| (d != t && // d not tail and |
| (dn = d.next) != null && // has successor |
| dn != d && // that is on list |
| dp.casNext(d, dn))) // d unspliced |
| casCleanMe(dp, null); |
| if (dp == pred) |
| return; // s is already saved node |
| } else if (casCleanMe(null, pred)) |
| return; // Postpone cleaning s |
| } |
| } |
| |
| private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); |
| private static final long HEAD; |
| private static final long TAIL; |
| private static final long CLEANME; |
| static { |
| try { |
| HEAD = U.objectFieldOffset |
| (TransferQueue.class.getDeclaredField("head")); |
| TAIL = U.objectFieldOffset |
| (TransferQueue.class.getDeclaredField("tail")); |
| CLEANME = U.objectFieldOffset |
| (TransferQueue.class.getDeclaredField("cleanMe")); |
| } catch (ReflectiveOperationException e) { |
| throw new Error(e); |
| } |
| } |
| } |
| |
| /** |
| * The transferer. Set only in constructor, but cannot be declared |
| * as final without further complicating serialization. Since |
| * this is accessed only at most once per public method, there |
| * isn't a noticeable performance penalty for using volatile |
| * instead of final here. |
| */ |
| private transient volatile Transferer<E> transferer; |
| |
| /** |
| * Creates a {@code SynchronousQueue} with nonfair access policy. |
| */ |
| public SynchronousQueue() { |
| this(false); |
| } |
| |
| /** |
| * Creates a {@code SynchronousQueue} with the specified fairness policy. |
| * |
| * @param fair if true, waiting threads contend in FIFO order for |
| * access; otherwise the order is unspecified. |
| */ |
| public SynchronousQueue(boolean fair) { |
| transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); |
| } |
| |
| /** |
| * Adds the specified element to this queue, waiting if necessary for |
| * another thread to receive it. |
| * |
| * @throws InterruptedException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| */ |
| public void put(E e) throws InterruptedException { |
| if (e == null) throw new NullPointerException(); |
| if (transferer.transfer(e, false, 0) == null) { |
| Thread.interrupted(); |
| throw new InterruptedException(); |
| } |
| } |
| |
| /** |
| * Inserts the specified element into this queue, waiting if necessary |
| * up to the specified wait time for another thread to receive it. |
| * |
| * @return {@code true} if successful, or {@code false} if the |
| * specified waiting time elapses before a consumer appears |
| * @throws InterruptedException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| */ |
| public boolean offer(E e, long timeout, TimeUnit unit) |
| throws InterruptedException { |
| if (e == null) throw new NullPointerException(); |
| if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) |
| return true; |
| if (!Thread.interrupted()) |
| return false; |
| throw new InterruptedException(); |
| } |
| |
| /** |
| * Inserts the specified element into this queue, if another thread is |
| * waiting to receive it. |
| * |
| * @param e the element to add |
| * @return {@code true} if the element was added to this queue, else |
| * {@code false} |
| * @throws NullPointerException if the specified element is null |
| */ |
| public boolean offer(E e) { |
| if (e == null) throw new NullPointerException(); |
| return transferer.transfer(e, true, 0) != null; |
| } |
| |
| /** |
| * Retrieves and removes the head of this queue, waiting if necessary |
| * for another thread to insert it. |
| * |
| * @return the head of this queue |
| * @throws InterruptedException {@inheritDoc} |
| */ |
| public E take() throws InterruptedException { |
| E e = transferer.transfer(null, false, 0); |
| if (e != null) |
| return e; |
| Thread.interrupted(); |
| throw new InterruptedException(); |
| } |
| |
| /** |
| * Retrieves and removes the head of this queue, waiting |
| * if necessary up to the specified wait time, for another thread |
| * to insert it. |
| * |
| * @return the head of this queue, or {@code null} if the |
| * specified waiting time elapses before an element is present |
| * @throws InterruptedException {@inheritDoc} |
| */ |
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
| E e = transferer.transfer(null, true, unit.toNanos(timeout)); |
| if (e != null || !Thread.interrupted()) |
| return e; |
| throw new InterruptedException(); |
| } |
| |
| /** |
| * Retrieves and removes the head of this queue, if another thread |
| * is currently making an element available. |
| * |
| * @return the head of this queue, or {@code null} if no |
| * element is available |
| */ |
| public E poll() { |
| return transferer.transfer(null, true, 0); |
| } |
| |
| /** |
| * Always returns {@code true}. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @return {@code true} |
| */ |
| public boolean isEmpty() { |
| return true; |
| } |
| |
| /** |
| * Always returns zero. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @return zero |
| */ |
| public int size() { |
| return 0; |
| } |
| |
| /** |
| * Always returns zero. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @return zero |
| */ |
| public int remainingCapacity() { |
| return 0; |
| } |
| |
| /** |
| * Does nothing. |
| * A {@code SynchronousQueue} has no internal capacity. |
| */ |
| public void clear() { |
| } |
| |
| /** |
| * Always returns {@code false}. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param o the element |
| * @return {@code false} |
| */ |
| public boolean contains(Object o) { |
| return false; |
| } |
| |
| /** |
| * Always returns {@code false}. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param o the element to remove |
| * @return {@code false} |
| */ |
| public boolean remove(Object o) { |
| return false; |
| } |
| |
| /** |
| * Returns {@code false} unless the given collection is empty. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param c the collection |
| * @return {@code false} unless given collection is empty |
| */ |
| public boolean containsAll(Collection<?> c) { |
| return c.isEmpty(); |
| } |
| |
| /** |
| * Always returns {@code false}. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param c the collection |
| * @return {@code false} |
| */ |
| public boolean removeAll(Collection<?> c) { |
| return false; |
| } |
| |
| /** |
| * Always returns {@code false}. |
| * A {@code SynchronousQueue} has no internal capacity. |
| * |
| * @param c the collection |
| * @return {@code false} |
| */ |
| public boolean retainAll(Collection<?> c) { |
| return false; |
| } |
| |
| /** |
| * Always returns {@code null}. |
| * A {@code SynchronousQueue} does not return elements |
| * unless actively waited on. |
| * |
| * @return {@code null} |
| */ |
| public E peek() { |
| return null; |
| } |
| |
| /** |
| * Returns an empty iterator in which {@code hasNext} always returns |
| * {@code false}. |
| * |
| * @return an empty iterator |
| */ |
| public Iterator<E> iterator() { |
| return Collections.emptyIterator(); |
| } |
| |
| /** |
| * Returns an empty spliterator in which calls to |
| * {@link java.util.Spliterator#trySplit()} always return {@code null}. |
| * |
| * @return an empty spliterator |
| * @since 1.8 |
| */ |
| public Spliterator<E> spliterator() { |
| return Spliterators.emptySpliterator(); |
| } |
| |
| /** |
| * Returns a zero-length array. |
| * @return a zero-length array |
| */ |
| public Object[] toArray() { |
| return new Object[0]; |
| } |
| |
| /** |
| * Sets the zeroth element of the specified array to {@code null} |
| * (if the array has non-zero length) and returns it. |
| * |
| * @param a the array |
| * @return the specified array |
| * @throws NullPointerException if the specified array is null |
| */ |
| public <T> T[] toArray(T[] a) { |
| if (a.length > 0) |
| a[0] = null; |
| return a; |
| } |
| |
| /** |
| * Always returns {@code "[]"}. |
| * @return {@code "[]"} |
| */ |
| public String toString() { |
| return "[]"; |
| } |
| |
| /** |
| * @throws UnsupportedOperationException {@inheritDoc} |
| * @throws ClassCastException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| * @throws IllegalArgumentException {@inheritDoc} |
| */ |
| public int drainTo(Collection<? super E> c) { |
| if (c == null) |
| throw new NullPointerException(); |
| if (c == this) |
| throw new IllegalArgumentException(); |
| int n = 0; |
| for (E e; (e = poll()) != null;) { |
| c.add(e); |
| ++n; |
| } |
| return n; |
| } |
| |
| /** |
| * @throws UnsupportedOperationException {@inheritDoc} |
| * @throws ClassCastException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| * @throws IllegalArgumentException {@inheritDoc} |
| */ |
| public int drainTo(Collection<? super E> c, int maxElements) { |
| if (c == null) |
| throw new NullPointerException(); |
| if (c == this) |
| throw new IllegalArgumentException(); |
| int n = 0; |
| for (E e; n < maxElements && (e = poll()) != null;) { |
| c.add(e); |
| ++n; |
| } |
| return n; |
| } |
| |
| /* |
| * To cope with serialization strategy in the 1.5 version of |
| * SynchronousQueue, we declare some unused classes and fields |
| * that exist solely to enable serializability across versions. |
| * These fields are never used, so are initialized only if this |
| * object is ever serialized or deserialized. |
| */ |
| |
| @SuppressWarnings("serial") |
| static class WaitQueue implements java.io.Serializable { } |
| static class LifoWaitQueue extends WaitQueue { |
| private static final long serialVersionUID = -3633113410248163686L; |
| } |
| static class FifoWaitQueue extends WaitQueue { |
| private static final long serialVersionUID = -3623113410248163686L; |
| } |
| private ReentrantLock qlock; |
| private WaitQueue waitingProducers; |
| private WaitQueue waitingConsumers; |
| |
| /** |
| * Saves this queue to a stream (that is, serializes it). |
| * @param s the stream |
| * @throws java.io.IOException if an I/O error occurs |
| */ |
| private void writeObject(java.io.ObjectOutputStream s) |
| throws java.io.IOException { |
| boolean fair = transferer instanceof TransferQueue; |
| if (fair) { |
| qlock = new ReentrantLock(true); |
| waitingProducers = new FifoWaitQueue(); |
| waitingConsumers = new FifoWaitQueue(); |
| } |
| else { |
| qlock = new ReentrantLock(); |
| waitingProducers = new LifoWaitQueue(); |
| waitingConsumers = new LifoWaitQueue(); |
| } |
| s.defaultWriteObject(); |
| } |
| |
| /** |
| * Reconstitutes this queue from a stream (that is, deserializes it). |
| * @param s the stream |
| * @throws ClassNotFoundException if the class of a serialized object |
| * could not be found |
| * @throws java.io.IOException if an I/O error occurs |
| */ |
| private void readObject(java.io.ObjectInputStream s) |
| throws java.io.IOException, ClassNotFoundException { |
| s.defaultReadObject(); |
| if (waitingProducers instanceof FifoWaitQueue) |
| transferer = new TransferQueue<E>(); |
| else |
| transferer = new TransferStack<E>(); |
| } |
| |
| static { |
| // Reduce the risk of rare disastrous classloading in first call to |
| // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 |
| Class<?> ensureLoaded = LockSupport.class; |
| } |
| } |