blob: cf2d44793f881787c9164265088a40d2503d3ddd [file] [log] [blame]
/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
24
/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
35
36package java.util.concurrent;
37
38import java.util.AbstractQueue;
39import java.util.Collection;
40import java.util.Iterator;
41import java.util.NoSuchElementException;
42import java.util.Spliterator;
43import java.util.Spliterators;
44import java.util.concurrent.atomic.AtomicInteger;
45import java.util.concurrent.locks.Condition;
46import java.util.concurrent.locks.ReentrantLock;
47import java.util.function.Consumer;
48
49// BEGIN android-note
50// removed link to collections framework docs
51// END android-note
52
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
80public class LinkedBlockingQueue<E> extends AbstractQueue<E>
81 implements BlockingQueue<E>, java.io.Serializable {
82 private static final long serialVersionUID = -6903933977591709194L;
83
/*
 * A variant of the "two lock queue" algorithm.  The putLock gates
 * entry to put (and offer), and has an associated condition for
 * waiting puts.  Similarly for the takeLock.  The "count" field
 * that they both rely on is maintained as an atomic to avoid
 * needing to get both locks in most cases. Also, to minimize need
 * for puts to get takeLock and vice-versa, cascading notifies are
 * used. When a put notices that it has enabled at least one take,
 * it signals taker. That taker in turn signals others if more
 * items have been entered since the signal. And symmetrically for
 * takes signalling puts. Operations such as remove(Object) and
 * iterators acquire both locks.
 *
 * Visibility between writers and readers is provided as follows:
 *
 * Whenever an element is enqueued, the putLock is acquired and
 * count updated.  A subsequent reader guarantees visibility to the
 * enqueued Node by either acquiring the putLock (via fullyLock)
 * or by acquiring the takeLock, and then reading n = count.get();
 * this gives visibility to the first n items.
 *
 * To implement weakly consistent iterators, it appears we need to
 * keep all Nodes GC-reachable from a predecessor dequeued Node.
 * That would cause two problems:
 * - allow a rogue Iterator to cause unbounded memory retention
 * - cause cross-generational linking of old Nodes to new Nodes if
 *   a Node was tenured while live, which generational GCs have a
 *   hard time dealing with, causing repeated major collections.
 * However, only non-deleted Nodes need to be reachable from
 * dequeued Nodes, and reachability does not necessarily have to
 * be of the kind understood by the GC.  We use the trick of
 * linking a Node that has just been dequeued to itself.  Such a
 * self-link implicitly means to advance to head.next.
 */
118
119 /**
120 * Linked list node class.
121 */
122 static class Node<E> {
123 E item;
124
125 /**
126 * One of:
127 * - the real successor Node
128 * - this Node, meaning the successor is head.next
129 * - null, meaning there is no successor (this is the last node)
130 */
131 Node<E> next;
132
133 Node(E x) { item = x; }
134 }
135
/** The capacity bound, or Integer.MAX_VALUE if none. Fixed at construction. */
private final int capacity;

/**
 * Current number of elements. Kept as an atomic so that put-side and
 * take-side code can both read and update it while holding only their
 * own lock.
 */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list; a sentinel node whose successor is the first
 * real element.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list; new nodes are linked after it.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes; a condition of takeLock */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts; a condition of putLock */
private final Condition notFull = putLock.newCondition();
165
166 /**
167 * Signals a waiting take. Called only from put/offer (which do not
168 * otherwise ordinarily lock takeLock.)
169 */
170 private void signalNotEmpty() {
171 final ReentrantLock takeLock = this.takeLock;
172 takeLock.lock();
173 try {
174 notEmpty.signal();
175 } finally {
176 takeLock.unlock();
177 }
178 }
179
180 /**
181 * Signals a waiting put. Called only from take/poll.
182 */
183 private void signalNotFull() {
184 final ReentrantLock putLock = this.putLock;
185 putLock.lock();
186 try {
187 notFull.signal();
188 } finally {
189 putLock.unlock();
190 }
191 }
192
193 /**
194 * Links node at end of queue.
195 *
196 * @param node the node
197 */
198 private void enqueue(Node<E> node) {
199 // assert putLock.isHeldByCurrentThread();
200 // assert last.next == null;
201 last = last.next = node;
202 }
203
204 /**
205 * Removes a node from head of queue.
206 *
207 * @return the node
208 */
209 private E dequeue() {
210 // assert takeLock.isHeldByCurrentThread();
211 // assert head.item == null;
212 Node<E> h = head;
213 Node<E> first = h.next;
214 h.next = h; // help GC
215 head = first;
216 E x = first.item;
217 first.item = null;
218 return x;
219 }
220
221 /**
222 * Locks to prevent both puts and takes.
223 */
224 void fullyLock() {
225 putLock.lock();
226 takeLock.lock();
227 }
228
229 /**
230 * Unlocks to allow both puts and takes.
231 */
232 void fullyUnlock() {
233 takeLock.unlock();
234 putLock.unlock();
235 }
236
237// /**
238// * Tells whether both locks are held by current thread.
239// */
240// boolean isFullyLocked() {
241// return (putLock.isHeldByCurrentThread() &&
242// takeLock.isHeldByCurrentThread());
243// }
244
/**
 * Creates an empty {@code LinkedBlockingQueue} whose capacity is
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
252
253 /**
254 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
255 *
256 * @param capacity the capacity of this queue
257 * @throws IllegalArgumentException if {@code capacity} is not greater
258 * than zero
259 */
260 public LinkedBlockingQueue(int capacity) {
261 if (capacity <= 0) throw new IllegalArgumentException();
262 this.capacity = capacity;
263 last = head = new Node<E>(null);
264 }
265
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}, pre-populated with the elements of the
 * given collection in the order its iterator returns them.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        final Iterator<? extends E> source = c.iterator();
        while (source.hasNext()) {
            final E element = source.next();
            if (element == null) {
                throw new NullPointerException();
            }
            if (added == capacity) {
                throw new IllegalStateException("Queue full");
            }
            enqueue(new Node<E>(element));
            added++;
        }
        count.set(added);
    } finally {
        lock.unlock();
    }
}
295
296 // this doc comment is overridden to remove the reference to collections
297 // greater in size than Integer.MAX_VALUE
298 /**
299 * Returns the number of elements in this queue.
300 *
301 * @return the number of elements in this queue
302 */
303 public int size() {
304 return count.get();
305 }
306
307 // this doc comment is a modified copy of the inherited doc comment,
308 // without the reference to unlimited queues.
309 /**
310 * Returns the number of additional elements that this queue can ideally
311 * (in the absence of memory or resource constraints) accept without
312 * blocking. This is always equal to the initial capacity of this queue
313 * less the current {@code size} of this queue.
314 *
315 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
316 * an element will succeed by inspecting {@code remainingCapacity}
317 * because it may be the case that another thread is about to
318 * insert or remove an element.
319 */
320 public int remainingCapacity() {
321 return capacity - count.get();
322 }
323
324 /**
325 * Inserts the specified element at the tail of this queue, waiting if
326 * necessary for space to become available.
327 *
328 * @throws InterruptedException {@inheritDoc}
329 * @throws NullPointerException {@inheritDoc}
330 */
331 public void put(E e) throws InterruptedException {
332 if (e == null) throw new NullPointerException();
333 // Note: convention in all put/take/etc is to preset local var
334 // holding count negative to indicate failure unless set.
335 int c = -1;
336 Node<E> node = new Node<E>(e);
337 final ReentrantLock putLock = this.putLock;
338 final AtomicInteger count = this.count;
339 putLock.lockInterruptibly();
340 try {
341 /*
342 * Note that count is used in wait guard even though it is
343 * not protected by lock. This works because count can
344 * only decrease at this point (all other puts are shut
345 * out by lock), and we (or some other waiting put) are
346 * signalled if it ever changes from capacity. Similarly
347 * for all other uses of count in other wait guards.
348 */
349 while (count.get() == capacity) {
350 notFull.await();
351 }
352 enqueue(node);
353 c = count.getAndIncrement();
354 if (c + 1 < capacity)
355 notFull.signal();
356 } finally {
357 putLock.unlock();
358 }
359 if (c == 0)
360 signalNotEmpty();
361 }
362
363 /**
364 * Inserts the specified element at the tail of this queue, waiting if
365 * necessary up to the specified wait time for space to become available.
366 *
367 * @return {@code true} if successful, or {@code false} if
368 * the specified waiting time elapses before space is available
369 * @throws InterruptedException {@inheritDoc}
370 * @throws NullPointerException {@inheritDoc}
371 */
372 public boolean offer(E e, long timeout, TimeUnit unit)
373 throws InterruptedException {
374
375 if (e == null) throw new NullPointerException();
376 long nanos = unit.toNanos(timeout);
377 int c = -1;
378 final ReentrantLock putLock = this.putLock;
379 final AtomicInteger count = this.count;
380 putLock.lockInterruptibly();
381 try {
382 while (count.get() == capacity) {
383 if (nanos <= 0L)
384 return false;
385 nanos = notFull.awaitNanos(nanos);
386 }
387 enqueue(new Node<E>(e));
388 c = count.getAndIncrement();
389 if (c + 1 < capacity)
390 notFull.signal();
391 } finally {
392 putLock.unlock();
393 }
394 if (c == 0)
395 signalNotEmpty();
396 return true;
397 }
398
399 /**
400 * Inserts the specified element at the tail of this queue if it is
401 * possible to do so immediately without exceeding the queue's capacity,
402 * returning {@code true} upon success and {@code false} if this queue
403 * is full.
404 * When using a capacity-restricted queue, this method is generally
405 * preferable to method {@link BlockingQueue#add add}, which can fail to
406 * insert an element only by throwing an exception.
407 *
408 * @throws NullPointerException if the specified element is null
409 */
410 public boolean offer(E e) {
411 if (e == null) throw new NullPointerException();
412 final AtomicInteger count = this.count;
413 if (count.get() == capacity)
414 return false;
415 int c = -1;
416 Node<E> node = new Node<E>(e);
417 final ReentrantLock putLock = this.putLock;
418 putLock.lock();
419 try {
420 if (count.get() < capacity) {
421 enqueue(node);
422 c = count.getAndIncrement();
423 if (c + 1 < capacity)
424 notFull.signal();
425 }
426 } finally {
427 putLock.unlock();
428 }
429 if (c == 0)
430 signalNotEmpty();
431 return c >= 0;
432 }
433
434 public E take() throws InterruptedException {
435 E x;
436 int c = -1;
437 final AtomicInteger count = this.count;
438 final ReentrantLock takeLock = this.takeLock;
439 takeLock.lockInterruptibly();
440 try {
441 while (count.get() == 0) {
442 notEmpty.await();
443 }
444 x = dequeue();
445 c = count.getAndDecrement();
446 if (c > 1)
447 notEmpty.signal();
448 } finally {
449 takeLock.unlock();
450 }
451 if (c == capacity)
452 signalNotFull();
453 return x;
454 }
455
456 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
457 E x = null;
458 int c = -1;
459 long nanos = unit.toNanos(timeout);
460 final AtomicInteger count = this.count;
461 final ReentrantLock takeLock = this.takeLock;
462 takeLock.lockInterruptibly();
463 try {
464 while (count.get() == 0) {
465 if (nanos <= 0L)
466 return null;
467 nanos = notEmpty.awaitNanos(nanos);
468 }
469 x = dequeue();
470 c = count.getAndDecrement();
471 if (c > 1)
472 notEmpty.signal();
473 } finally {
474 takeLock.unlock();
475 }
476 if (c == capacity)
477 signalNotFull();
478 return x;
479 }
480
481 public E poll() {
482 final AtomicInteger count = this.count;
483 if (count.get() == 0)
484 return null;
485 E x = null;
486 int c = -1;
487 final ReentrantLock takeLock = this.takeLock;
488 takeLock.lock();
489 try {
490 if (count.get() > 0) {
491 x = dequeue();
492 c = count.getAndDecrement();
493 if (c > 1)
494 notEmpty.signal();
495 }
496 } finally {
497 takeLock.unlock();
498 }
499 if (c == capacity)
500 signalNotFull();
501 return x;
502 }
503
504 public E peek() {
505 if (count.get() == 0)
506 return null;
507 final ReentrantLock takeLock = this.takeLock;
508 takeLock.lock();
509 try {
510 return (count.get() > 0) ? head.next.item : null;
511 } finally {
512 takeLock.unlock();
513 }
514 }
515
516 /**
517 * Unlinks interior Node p with predecessor trail.
518 */
519 void unlink(Node<E> p, Node<E> trail) {
520 // assert isFullyLocked();
521 // p.next is not changed, to allow iterators that are
522 // traversing p to maintain their weak-consistency guarantee.
523 p.item = null;
524 trail.next = p.next;
525 if (last == p)
526 last = trail;
527 if (count.getAndDecrement() == capacity)
528 notFull.signal();
529 }
530
531 /**
532 * Removes a single instance of the specified element from this queue,
533 * if it is present. More formally, removes an element {@code e} such
534 * that {@code o.equals(e)}, if this queue contains one or more such
535 * elements.
536 * Returns {@code true} if this queue contained the specified element
537 * (or equivalently, if this queue changed as a result of the call).
538 *
539 * @param o element to be removed from this queue, if present
540 * @return {@code true} if this queue changed as a result of the call
541 */
542 public boolean remove(Object o) {
543 if (o == null) return false;
544 fullyLock();
545 try {
546 for (Node<E> trail = head, p = trail.next;
547 p != null;
548 trail = p, p = p.next) {
549 if (o.equals(p.item)) {
550 unlink(p, trail);
551 return true;
552 }
553 }
554 return false;
555 } finally {
556 fullyUnlock();
557 }
558 }
559
560 /**
561 * Returns {@code true} if this queue contains the specified element.
562 * More formally, returns {@code true} if and only if this queue contains
563 * at least one element {@code e} such that {@code o.equals(e)}.
564 *
565 * @param o object to be checked for containment in this queue
566 * @return {@code true} if this queue contains the specified element
567 */
568 public boolean contains(Object o) {
569 if (o == null) return false;
570 fullyLock();
571 try {
572 for (Node<E> p = head.next; p != null; p = p.next)
573 if (o.equals(p.item))
574 return true;
575 return false;
576 } finally {
577 fullyUnlock();
578 }
579 }
580
581 /**
582 * Returns an array containing all of the elements in this queue, in
583 * proper sequence.
584 *
585 * <p>The returned array will be "safe" in that no references to it are
586 * maintained by this queue. (In other words, this method must allocate
587 * a new array). The caller is thus free to modify the returned array.
588 *
589 * <p>This method acts as bridge between array-based and collection-based
590 * APIs.
591 *
592 * @return an array containing all of the elements in this queue
593 */
594 public Object[] toArray() {
595 fullyLock();
596 try {
597 int size = count.get();
598 Object[] a = new Object[size];
599 int k = 0;
600 for (Node<E> p = head.next; p != null; p = p.next)
601 a[k++] = p.item;
602 return a;
603 } finally {
604 fullyUnlock();
605 }
606 }
607
608 /**
609 * Returns an array containing all of the elements in this queue, in
610 * proper sequence; the runtime type of the returned array is that of
611 * the specified array. If the queue fits in the specified array, it
612 * is returned therein. Otherwise, a new array is allocated with the
613 * runtime type of the specified array and the size of this queue.
614 *
615 * <p>If this queue fits in the specified array with room to spare
616 * (i.e., the array has more elements than this queue), the element in
617 * the array immediately following the end of the queue is set to
618 * {@code null}.
619 *
620 * <p>Like the {@link #toArray()} method, this method acts as bridge between
621 * array-based and collection-based APIs. Further, this method allows
622 * precise control over the runtime type of the output array, and may,
623 * under certain circumstances, be used to save allocation costs.
624 *
625 * <p>Suppose {@code x} is a queue known to contain only strings.
626 * The following code can be used to dump the queue into a newly
627 * allocated array of {@code String}:
628 *
629 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
630 *
631 * Note that {@code toArray(new Object[0])} is identical in function to
632 * {@code toArray()}.
633 *
634 * @param a the array into which the elements of the queue are to
635 * be stored, if it is big enough; otherwise, a new array of the
636 * same runtime type is allocated for this purpose
637 * @return an array containing all of the elements in this queue
638 * @throws ArrayStoreException if the runtime type of the specified array
639 * is not a supertype of the runtime type of every element in
640 * this queue
641 * @throws NullPointerException if the specified array is null
642 */
643 @SuppressWarnings("unchecked")
644 public <T> T[] toArray(T[] a) {
645 fullyLock();
646 try {
647 int size = count.get();
648 if (a.length < size)
649 a = (T[])java.lang.reflect.Array.newInstance
650 (a.getClass().getComponentType(), size);
651
652 int k = 0;
653 for (Node<E> p = head.next; p != null; p = p.next)
654 a[k++] = (T)p.item;
655 if (a.length > k)
656 a[k] = null;
657 return a;
658 } finally {
659 fullyUnlock();
660 }
661 }
662
663 public String toString() {
664 return Helpers.collectionToString(this);
665 }
666
667 /**
668 * Atomically removes all of the elements from this queue.
669 * The queue will be empty after this call returns.
670 */
671 public void clear() {
672 fullyLock();
673 try {
674 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
675 h.next = h;
676 p.item = null;
677 }
678 head = last;
679 // assert head.item == null && head.next == null;
680 if (count.getAndSet(0) == capacity)
681 notFull.signal();
682 } finally {
683 fullyUnlock();
684 }
685 }
686
687 /**
688 * @throws UnsupportedOperationException {@inheritDoc}
689 * @throws ClassCastException {@inheritDoc}
690 * @throws NullPointerException {@inheritDoc}
691 * @throws IllegalArgumentException {@inheritDoc}
692 */
693 public int drainTo(Collection<? super E> c) {
694 return drainTo(c, Integer.MAX_VALUE);
695 }
696
697 /**
698 * @throws UnsupportedOperationException {@inheritDoc}
699 * @throws ClassCastException {@inheritDoc}
700 * @throws NullPointerException {@inheritDoc}
701 * @throws IllegalArgumentException {@inheritDoc}
702 */
703 public int drainTo(Collection<? super E> c, int maxElements) {
704 if (c == null)
705 throw new NullPointerException();
706 if (c == this)
707 throw new IllegalArgumentException();
708 if (maxElements <= 0)
709 return 0;
710 boolean signalNotFull = false;
711 final ReentrantLock takeLock = this.takeLock;
712 takeLock.lock();
713 try {
714 int n = Math.min(maxElements, count.get());
715 // count.get provides visibility to first n Nodes
716 Node<E> h = head;
717 int i = 0;
718 try {
719 while (i < n) {
720 Node<E> p = h.next;
721 c.add(p.item);
722 p.item = null;
723 h.next = h;
724 h = p;
725 ++i;
726 }
727 return n;
728 } finally {
729 // Restore invariants even if c.add() threw
730 if (i > 0) {
731 // assert h.item == null;
732 head = h;
733 signalNotFull = (count.getAndAdd(-i) == capacity);
734 }
735 }
736 } finally {
737 takeLock.unlock();
738 if (signalNotFull)
739 signalNotFull();
740 }
741 }
742
743 /**
744 * Returns an iterator over the elements in this queue in proper sequence.
745 * The elements will be returned in order from first (head) to last (tail).
746 *
747 * <p>The returned iterator is
748 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
749 *
750 * @return an iterator over the elements in this queue in proper sequence
751 */
752 public Iterator<E> iterator() {
753 return new Itr();
754 }
755
756 private class Itr implements Iterator<E> {
757 /*
758 * Basic weakly-consistent iterator. At all times hold the next
759 * item to hand out so that if hasNext() reports true, we will
760 * still have it to return even if lost race with a take etc.
761 */
762
763 private Node<E> current;
764 private Node<E> lastRet;
765 private E currentElement;
766
767 Itr() {
768 fullyLock();
769 try {
770 current = head.next;
771 if (current != null)
772 currentElement = current.item;
773 } finally {
774 fullyUnlock();
775 }
776 }
777
778 public boolean hasNext() {
779 return current != null;
780 }
781
782 public E next() {
783 fullyLock();
784 try {
785 if (current == null)
786 throw new NoSuchElementException();
787 lastRet = current;
788 E item = null;
789 // Unlike other traversal methods, iterators must handle both:
790 // - dequeued nodes (p.next == p)
791 // - (possibly multiple) interior removed nodes (p.item == null)
792 for (Node<E> p = current, q;; p = q) {
793 if ((q = p.next) == p)
794 q = head.next;
795 if (q == null || (item = q.item) != null) {
796 current = q;
797 E x = currentElement;
798 currentElement = item;
799 return x;
800 }
801 }
802 } finally {
803 fullyUnlock();
804 }
805 }
806
807 public void remove() {
808 if (lastRet == null)
809 throw new IllegalStateException();
810 fullyLock();
811 try {
812 Node<E> node = lastRet;
813 lastRet = null;
814 for (Node<E> trail = head, p = trail.next;
815 p != null;
816 trail = p, p = p.next) {
817 if (p == node) {
818 unlink(p, trail);
819 break;
820 }
821 }
822 } finally {
823 fullyUnlock();
824 }
825 }
826 }
827
828 /** A customized variant of Spliterators.IteratorSpliterator */
829 static final class LBQSpliterator<E> implements Spliterator<E> {
830 static final int MAX_BATCH = 1 << 25; // max batch array size;
831 final LinkedBlockingQueue<E> queue;
832 Node<E> current; // current node; null until initialized
833 int batch; // batch size for splits
834 boolean exhausted; // true when no more nodes
835 long est; // size estimate
836 LBQSpliterator(LinkedBlockingQueue<E> queue) {
837 this.queue = queue;
838 this.est = queue.size();
839 }
840
841 public long estimateSize() { return est; }
842
843 public Spliterator<E> trySplit() {
844 Node<E> h;
845 final LinkedBlockingQueue<E> q = this.queue;
846 int b = batch;
847 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
848 if (!exhausted &&
849 ((h = current) != null || (h = q.head.next) != null) &&
850 h.next != null) {
851 Object[] a = new Object[n];
852 int i = 0;
853 Node<E> p = current;
854 q.fullyLock();
855 try {
856 if (p != null || (p = q.head.next) != null) {
857 do {
858 if ((a[i] = p.item) != null)
859 ++i;
860 } while ((p = p.next) != null && i < n);
861 }
862 } finally {
863 q.fullyUnlock();
864 }
865 if ((current = p) == null) {
866 est = 0L;
867 exhausted = true;
868 }
869 else if ((est -= i) < 0L)
870 est = 0L;
871 if (i > 0) {
872 batch = i;
873 return Spliterators.spliterator
874 (a, 0, i, (Spliterator.ORDERED |
875 Spliterator.NONNULL |
876 Spliterator.CONCURRENT));
877 }
878 }
879 return null;
880 }
881
882 public void forEachRemaining(Consumer<? super E> action) {
883 if (action == null) throw new NullPointerException();
884 final LinkedBlockingQueue<E> q = this.queue;
885 if (!exhausted) {
886 exhausted = true;
887 Node<E> p = current;
888 do {
889 E e = null;
890 q.fullyLock();
891 try {
892 if (p == null)
893 p = q.head.next;
894 while (p != null) {
895 e = p.item;
896 p = p.next;
897 if (e != null)
898 break;
899 }
900 } finally {
901 q.fullyUnlock();
902 }
903 if (e != null)
904 action.accept(e);
905 } while (p != null);
906 }
907 }
908
909 public boolean tryAdvance(Consumer<? super E> action) {
910 if (action == null) throw new NullPointerException();
911 final LinkedBlockingQueue<E> q = this.queue;
912 if (!exhausted) {
913 E e = null;
914 q.fullyLock();
915 try {
916 if (current == null)
917 current = q.head.next;
918 while (current != null) {
919 e = current.item;
920 current = current.next;
921 if (e != null)
922 break;
923 }
924 } finally {
925 q.fullyUnlock();
926 }
927 if (current == null)
928 exhausted = true;
929 if (e != null) {
930 action.accept(e);
931 return true;
932 }
933 }
934 return false;
935 }
936
937 public int characteristics() {
938 return Spliterator.ORDERED | Spliterator.NONNULL |
939 Spliterator.CONCURRENT;
940 }
941 }
942
943 /**
944 * Returns a {@link Spliterator} over the elements in this queue.
945 *
946 * <p>The returned spliterator is
947 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
948 *
949 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
950 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
951 *
952 * @implNote
953 * The {@code Spliterator} implements {@code trySplit} to permit limited
954 * parallelism.
955 *
956 * @return a {@code Spliterator} over the elements in this queue
957 * @since 1.8
958 */
959 public Spliterator<E> spliterator() {
960 return new LBQSpliterator<E>(this);
961 }
962
963 /**
964 * Saves this queue to a stream (that is, serializes it).
965 *
966 * @param s the stream
967 * @throws java.io.IOException if an I/O error occurs
968 * @serialData The capacity is emitted (int), followed by all of
969 * its elements (each an {@code Object}) in the proper order,
970 * followed by a null
971 */
972 private void writeObject(java.io.ObjectOutputStream s)
973 throws java.io.IOException {
974
975 fullyLock();
976 try {
977 // Write out any hidden stuff, plus capacity
978 s.defaultWriteObject();
979
980 // Write out all elements in the proper order.
981 for (Node<E> p = head.next; p != null; p = p.next)
982 s.writeObject(p.item);
983
984 // Use trailing null as sentinel
985 s.writeObject(null);
986 } finally {
987 fullyUnlock();
988 }
989 }
990
991 /**
992 * Reconstitutes this queue from a stream (that is, deserializes it).
993 * @param s the stream
994 * @throws ClassNotFoundException if the class of a serialized object
995 * could not be found
996 * @throws java.io.IOException if an I/O error occurs
997 */
998 private void readObject(java.io.ObjectInputStream s)
999 throws java.io.IOException, ClassNotFoundException {
1000 // Read in capacity, and any hidden stuff
1001 s.defaultReadObject();
1002
1003 count.set(0);
1004 last = head = new Node<E>(null);
1005
1006 // Read in all elements and place in queue
1007 for (;;) {
1008 @SuppressWarnings("unchecked")
1009 E item = (E)s.readObject();
1010 if (item == null)
1011 break;
1012 add(item);
1013 }
1014 }
1015}