| /* |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| |
| /* |
| * This file is available under and governed by the GNU General Public |
| * License version 2 only, as published by the Free Software Foundation. |
| * However, the following notice accompanied the original version of this |
| * file: |
| * |
| * Written by Doug Lea with assistance from members of JCP JSR-166 |
| * Expert Group and released to the public domain, as explained at |
| * http://creativecommons.org/publicdomain/zero/1.0/ |
| */ |
| |
| package java.util.concurrent; |
| |
| import static java.util.concurrent.TimeUnit.NANOSECONDS; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| /** |
| * Provides default implementations of {@link ExecutorService} |
| * execution methods. This class implements the {@code submit}, |
| * {@code invokeAny} and {@code invokeAll} methods using a |
| * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults |
| * to the {@link FutureTask} class provided in this package. For example, |
| * the implementation of {@code submit(Runnable)} creates an |
| * associated {@code RunnableFuture} that is executed and |
| * returned. Subclasses may override the {@code newTaskFor} methods |
| * to return {@code RunnableFuture} implementations other than |
| * {@code FutureTask}. |
| * |
| * <p><b>Extension example</b>. Here is a sketch of a class |
| * that customizes {@link ThreadPoolExecutor} to use |
| * a {@code CustomTask} class instead of the default {@code FutureTask}: |
| * <pre> {@code |
| * public class CustomThreadPoolExecutor extends ThreadPoolExecutor { |
| * |
| * static class CustomTask<V> implements RunnableFuture<V> {...} |
| * |
| * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { |
| * return new CustomTask<V>(c); |
| * } |
| * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) { |
| * return new CustomTask<V>(r, v); |
| * } |
| * // ... add constructors, etc. |
| * }}</pre> |
| * |
| * @since 1.5 |
| * @author Doug Lea |
| */ |
| public abstract class AbstractExecutorService implements ExecutorService { |
| |
| /** |
| * Returns a {@code RunnableFuture} for the given runnable and default |
| * value. |
| * |
| * @param runnable the runnable task being wrapped |
| * @param value the default value for the returned future |
| * @param <T> the type of the given value |
| * @return a {@code RunnableFuture} which, when run, will run the |
| * underlying runnable and which, as a {@code Future}, will yield |
| * the given value as its result and provide for cancellation of |
| * the underlying task |
| * @since 1.6 |
| */ |
| protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { |
| return new FutureTask<T>(runnable, value); |
| } |
| |
| /** |
| * Returns a {@code RunnableFuture} for the given callable task. |
| * |
| * @param callable the callable task being wrapped |
| * @param <T> the type of the callable's result |
| * @return a {@code RunnableFuture} which, when run, will call the |
| * underlying callable and which, as a {@code Future}, will yield |
| * the callable's result as its result and provide for |
| * cancellation of the underlying task |
| * @since 1.6 |
| */ |
| protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { |
| return new FutureTask<T>(callable); |
| } |
| |
| /** |
| * @throws RejectedExecutionException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| */ |
| public Future<?> submit(Runnable task) { |
| if (task == null) throw new NullPointerException(); |
| RunnableFuture<Void> ftask = newTaskFor(task, null); |
| execute(ftask); |
| return ftask; |
| } |
| |
| /** |
| * @throws RejectedExecutionException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| */ |
| public <T> Future<T> submit(Runnable task, T result) { |
| if (task == null) throw new NullPointerException(); |
| RunnableFuture<T> ftask = newTaskFor(task, result); |
| execute(ftask); |
| return ftask; |
| } |
| |
| /** |
| * @throws RejectedExecutionException {@inheritDoc} |
| * @throws NullPointerException {@inheritDoc} |
| */ |
| public <T> Future<T> submit(Callable<T> task) { |
| if (task == null) throw new NullPointerException(); |
| RunnableFuture<T> ftask = newTaskFor(task); |
| execute(ftask); |
| return ftask; |
| } |
| |
| /** |
| * the main mechanics of invokeAny. |
| */ |
| private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, |
| boolean timed, long nanos) |
| throws InterruptedException, ExecutionException, TimeoutException { |
| if (tasks == null) |
| throw new NullPointerException(); |
| int ntasks = tasks.size(); |
| if (ntasks == 0) |
| throw new IllegalArgumentException(); |
| ArrayList<Future<T>> futures = new ArrayList<>(ntasks); |
| ExecutorCompletionService<T> ecs = |
| new ExecutorCompletionService<T>(this); |
| |
| // For efficiency, especially in executors with limited |
| // parallelism, check to see if previously submitted tasks are |
| // done before submitting more of them. This interleaving |
| // plus the exception mechanics account for messiness of main |
| // loop. |
| |
| try { |
| // Record exceptions so that if we fail to obtain any |
| // result, we can throw the last exception we got. |
| ExecutionException ee = null; |
| final long deadline = timed ? System.nanoTime() + nanos : 0L; |
| Iterator<? extends Callable<T>> it = tasks.iterator(); |
| |
| // Start one task for sure; the rest incrementally |
| futures.add(ecs.submit(it.next())); |
| --ntasks; |
| int active = 1; |
| |
| for (;;) { |
| Future<T> f = ecs.poll(); |
| if (f == null) { |
| if (ntasks > 0) { |
| --ntasks; |
| futures.add(ecs.submit(it.next())); |
| ++active; |
| } |
| else if (active == 0) |
| break; |
| else if (timed) { |
| f = ecs.poll(nanos, NANOSECONDS); |
| if (f == null) |
| throw new TimeoutException(); |
| nanos = deadline - System.nanoTime(); |
| } |
| else |
| f = ecs.take(); |
| } |
| if (f != null) { |
| --active; |
| try { |
| return f.get(); |
| } catch (ExecutionException eex) { |
| ee = eex; |
| } catch (RuntimeException rex) { |
| ee = new ExecutionException(rex); |
| } |
| } |
| } |
| |
| if (ee == null) |
| ee = new ExecutionException(); |
| throw ee; |
| |
| } finally { |
| cancelAll(futures); |
| } |
| } |
| |
| public <T> T invokeAny(Collection<? extends Callable<T>> tasks) |
| throws InterruptedException, ExecutionException { |
| try { |
| return doInvokeAny(tasks, false, 0); |
| } catch (TimeoutException cannotHappen) { |
| assert false; |
| return null; |
| } |
| } |
| |
| public <T> T invokeAny(Collection<? extends Callable<T>> tasks, |
| long timeout, TimeUnit unit) |
| throws InterruptedException, ExecutionException, TimeoutException { |
| return doInvokeAny(tasks, true, unit.toNanos(timeout)); |
| } |
| |
| public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) |
| throws InterruptedException { |
| if (tasks == null) |
| throw new NullPointerException(); |
| ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); |
| try { |
| for (Callable<T> t : tasks) { |
| RunnableFuture<T> f = newTaskFor(t); |
| futures.add(f); |
| execute(f); |
| } |
| for (int i = 0, size = futures.size(); i < size; i++) { |
| Future<T> f = futures.get(i); |
| if (!f.isDone()) { |
| try { f.get(); } |
| catch (CancellationException ignore) {} |
| catch (ExecutionException ignore) {} |
| } |
| } |
| return futures; |
| } catch (Throwable t) { |
| cancelAll(futures); |
| throw t; |
| } |
| } |
| |
| public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, |
| long timeout, TimeUnit unit) |
| throws InterruptedException { |
| if (tasks == null) |
| throw new NullPointerException(); |
| final long nanos = unit.toNanos(timeout); |
| final long deadline = System.nanoTime() + nanos; |
| ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); |
| int j = 0; |
| timedOut: try { |
| for (Callable<T> t : tasks) |
| futures.add(newTaskFor(t)); |
| |
| final int size = futures.size(); |
| |
| // Interleave time checks and calls to execute in case |
| // executor doesn't have any/much parallelism. |
| for (int i = 0; i < size; i++) { |
| if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L) |
| break timedOut; |
| execute((Runnable)futures.get(i)); |
| } |
| |
| for (; j < size; j++) { |
| Future<T> f = futures.get(j); |
| if (!f.isDone()) { |
| try { f.get(deadline - System.nanoTime(), NANOSECONDS); } |
| catch (CancellationException ignore) {} |
| catch (ExecutionException ignore) {} |
| catch (TimeoutException timedOut) { |
| break timedOut; |
| } |
| } |
| } |
| return futures; |
| } catch (Throwable t) { |
| cancelAll(futures); |
| throw t; |
| } |
| // Timed out before all the tasks could be completed; cancel remaining |
| cancelAll(futures, j); |
| return futures; |
| } |
| |
| private static <T> void cancelAll(ArrayList<Future<T>> futures) { |
| cancelAll(futures, 0); |
| } |
| |
| /** Cancels all futures with index at least j. */ |
| private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) { |
| for (int size = futures.size(); j < size; j++) |
| futures.get(j).cancel(true); |
| } |
| } |