| /* |
| * Copyright 2018 The gRPC Authors |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.grpc; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| |
| import java.lang.Thread.UncaughtExceptionHandler; |
| import java.util.Queue; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.annotation.concurrent.ThreadSafe; |
| |
| /** |
| * A synchronization context is a queue of tasks that run in sequence. It offers following |
| * guarantees: |
| * |
| * <ul> |
| * <li>Ordering. Tasks are run in the same order as they are submitted via {@link #execute} |
| * and {@link #executeLater}.</li> |
| * <li>Serialization. Tasks are run in sequence and establish a happens-before relationship |
| * between them. </li> |
| * <li>Non-reentrancy. If a task running in a synchronization context executes or schedules |
| * another task in the same synchronization context, the latter task will never run |
| * inline. It will instead be queued and run only after the current task has returned.</li> |
| * </ul> |
| * |
| * <p>It doesn't own any thread. Tasks are run from caller's or caller-provided threads. |
| * |
| * <p>Conceptually, it is fairly accurate to think of {@code SynchronizationContext} like a cheaper |
| * {@code Executors.newSingleThreadExecutor()} when used for synchronization (not long-running |
| * tasks). Both use a queue for tasks that are run in order and neither guarantee that tasks have |
| * completed before returning from {@code execute()}. However, the behavior does diverge if locks |
| * are held when calling the context. So it is encouraged to avoid mixing locks and synchronization |
| * context except via {@link #executeLater}. |
| * |
| * <p>This class is thread-safe. |
| * |
| * @since 1.17.0 |
| */ |
| @ThreadSafe |
| public final class SynchronizationContext implements Executor { |
| private final UncaughtExceptionHandler uncaughtExceptionHandler; |
| |
| private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); |
| private final AtomicReference<Thread> drainingThread = new AtomicReference<>(); |
| |
| /** |
| * Creates a SynchronizationContext. |
| * |
| * @param uncaughtExceptionHandler handles exceptions thrown out of the tasks. Different from |
| * what's documented on {@link UncaughtExceptionHandler#uncaughtException}, the thread is |
| * not terminated when the handler is called. |
| */ |
| public SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler) { |
| this.uncaughtExceptionHandler = |
| checkNotNull(uncaughtExceptionHandler, "uncaughtExceptionHandler"); |
| } |
| |
| /** |
| * Run all tasks in the queue in the current thread, if no other thread is running this method. |
| * Otherwise do nothing. |
| * |
| * <p>Upon returning, it guarantees that all tasks submitted by {@code #executeLater} before it |
| * have been or will eventually be run, while not requiring any more calls to {@code drain()}. |
| */ |
| public final void drain() { |
| do { |
| if (!drainingThread.compareAndSet(null, Thread.currentThread())) { |
| return; |
| } |
| try { |
| Runnable runnable; |
| while ((runnable = queue.poll()) != null) { |
| try { |
| runnable.run(); |
| } catch (Throwable t) { |
| uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t); |
| } |
| } |
| } finally { |
| drainingThread.set(null); |
| } |
| // must check queue again here to catch any added prior to clearing drainingThread |
| } while (!queue.isEmpty()); |
| } |
| |
| /** |
| * Adds a task that will be run when {@link #drain} is called. |
| * |
| * <p>This is useful for cases where you want to enqueue a task while under a lock of your own, |
| * but don't want the tasks to be run under your lock (for fear of deadlock). You can call {@link |
| * #executeLater} in the lock, and call {@link #drain} outside the lock. |
| */ |
| public final void executeLater(Runnable runnable) { |
| queue.add(checkNotNull(runnable, "runnable is null")); |
| } |
| |
| /** |
| * Adds a task and run it in this synchronization context as soon as possible. The task may run |
| * inline. If there are tasks that are previously queued by {@link #executeLater} but have not |
| * been run, this method will trigger them to be run before the given task. This is equivalent to |
| * calling {@link #executeLater} immediately followed by {@link #drain}. |
| */ |
| @Override |
| public final void execute(Runnable task) { |
| executeLater(task); |
| drain(); |
| } |
| |
| /** |
| * Throw {@link IllegalStateException} if this method is not called from this synchronization |
| * context. |
| */ |
| public void throwIfNotInThisSynchronizationContext() { |
| checkState(Thread.currentThread() == drainingThread.get(), |
| "Not called from the SynchronizationContext"); |
| } |
| |
| /** |
| * Schedules a task to be added and run via {@link #execute} after a delay. |
| * |
| * @param task the task being scheduled |
| * @param delay the delay |
| * @param unit the time unit for the delay |
| * @param timerService the {@code ScheduledExecutorService} that provides delayed execution |
| * |
| * @return an object for checking the status and/or cancel the scheduled task |
| */ |
| public final ScheduledHandle schedule( |
| final Runnable task, long delay, TimeUnit unit, ScheduledExecutorService timerService) { |
| final ManagedRunnable runnable = new ManagedRunnable(task); |
| ScheduledFuture<?> future = timerService.schedule(new Runnable() { |
| @Override |
| public void run() { |
| execute(runnable); |
| } |
| |
| @Override |
| public String toString() { |
| return task.toString() + "(scheduled in SynchronizationContext)"; |
| } |
| }, delay, unit); |
| return new ScheduledHandle(runnable, future); |
| } |
| |
| /** |
| * Schedules a task to be added and run via {@link #execute} after an inital delay and then |
| * repeated after the delay until cancelled. |
| * |
| * @param task the task being scheduled |
| * @param initialDelay the delay before the first run |
| * @param delay the delay after the first run. |
| * @param unit the time unit for the delay |
| * @param timerService the {@code ScheduledExecutorService} that provides delayed execution |
| * |
| * @return an object for checking the status and/or cancel the scheduled task |
| */ |
| public final ScheduledHandle scheduleWithFixedDelay( |
| final Runnable task, long initialDelay, long delay, TimeUnit unit, |
| ScheduledExecutorService timerService) { |
| final ManagedRunnable runnable = new ManagedRunnable(task); |
| ScheduledFuture<?> future = timerService.scheduleWithFixedDelay(new Runnable() { |
| @Override |
| public void run() { |
| execute(runnable); |
| } |
| |
| @Override |
| public String toString() { |
| return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay |
| + ")"; |
| } |
| }, initialDelay, delay, unit); |
| return new ScheduledHandle(runnable, future); |
| } |
| |
| |
| private static class ManagedRunnable implements Runnable { |
| final Runnable task; |
| boolean isCancelled; |
| boolean hasStarted; |
| |
| ManagedRunnable(Runnable task) { |
| this.task = checkNotNull(task, "task"); |
| } |
| |
| @Override |
| public void run() { |
| // The task may have been cancelled after timerService calls SynchronizationContext.execute() |
| // but before the runnable is actually run. We must guarantee that the task will not be run |
| // in this case. |
| if (!isCancelled) { |
| hasStarted = true; |
| task.run(); |
| } |
| } |
| } |
| |
| /** |
| * Allows the user to check the status and/or cancel a task scheduled by {@link #schedule}. |
| * |
| * <p>This class is NOT thread-safe. All methods must be run from the same {@link |
| * SynchronizationContext} as which the task was scheduled in. |
| */ |
| public static final class ScheduledHandle { |
| private final ManagedRunnable runnable; |
| private final ScheduledFuture<?> future; |
| |
| private ScheduledHandle(ManagedRunnable runnable, ScheduledFuture<?> future) { |
| this.runnable = checkNotNull(runnable, "runnable"); |
| this.future = checkNotNull(future, "future"); |
| } |
| |
| /** |
| * Cancel the task if it's still {@link #isPending pending}. |
| */ |
| public void cancel() { |
| runnable.isCancelled = true; |
| future.cancel(false); |
| } |
| |
| /** |
| * Returns {@code true} if the task will eventually run, meaning that it has neither started |
| * running nor been cancelled. |
| */ |
| public boolean isPending() { |
| return !(runnable.hasStarted || runnable.isCancelled); |
| } |
| } |
| } |