| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "base/message_loop/incoming_task_queue.h" |
| |
| #include <limits> |
| #include <utility> |
| |
| #include "base/bind.h" |
| #include "base/callback_helpers.h" |
| #include "base/location.h" |
| #include "base/metrics/histogram_macros.h" |
| #include "base/synchronization/waitable_event.h" |
| #include "base/time/time.h" |
| #include "build/build_config.h" |
| |
| namespace base { |
| namespace internal { |
| |
| namespace { |
| |
| #if DCHECK_IS_ON() |
| // Delays larger than this are often bogus, and a warning should be emitted in |
| // debug builds to warn developers. http://crbug.com/450045 |
| constexpr TimeDelta kTaskDelayWarningThreshold = TimeDelta::FromDays(14); |
| #endif |
| |
| TimeTicks CalculateDelayedRuntime(TimeDelta delay) { |
| TimeTicks delayed_run_time; |
| if (delay > TimeDelta()) |
| delayed_run_time = TimeTicks::Now() + delay; |
| else |
| DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative"; |
| return delayed_run_time; |
| } |
| |
| } // namespace |
| |
| IncomingTaskQueue::IncomingTaskQueue( |
| std::unique_ptr<Observer> task_queue_observer) |
| : task_queue_observer_(std::move(task_queue_observer)), |
| triage_tasks_(this) { |
| // The constructing sequence is not necessarily the running sequence, e.g. in |
| // the case of a MessageLoop created unbound. |
| DETACH_FROM_SEQUENCE(sequence_checker_); |
| } |
| |
| IncomingTaskQueue::~IncomingTaskQueue() = default; |
| |
| bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here, |
| OnceClosure task, |
| TimeDelta delay, |
| Nestable nestable) { |
| // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 |
| // for details. |
| CHECK(task); |
| DLOG_IF(WARNING, delay > kTaskDelayWarningThreshold) |
| << "Requesting super-long task delay period of " << delay.InSeconds() |
| << " seconds from here: " << from_here.ToString(); |
| |
| PendingTask pending_task(from_here, std::move(task), |
| CalculateDelayedRuntime(delay), nestable); |
| #if defined(OS_WIN) |
| // We consider the task needs a high resolution timer if the delay is |
| // more than 0 and less than 32ms. This caps the relative error to |
| // less than 50% : a 33ms wait can wake at 48ms since the default |
| // resolution on Windows is between 10 and 15ms. |
| if (delay > TimeDelta() && |
| delay.InMilliseconds() < (2 * Time::kMinLowResolutionThresholdMs)) { |
| pending_task.is_high_res = true; |
| } |
| #endif |
| |
| if (!delay.is_zero()) |
| UMA_HISTOGRAM_LONG_TIMES("MessageLoop.DelayedTaskQueue.PostedDelay", delay); |
| |
| return PostPendingTask(&pending_task); |
| } |
| |
| void IncomingTaskQueue::Shutdown() { |
| AutoLock auto_lock(incoming_queue_lock_); |
| accept_new_tasks_ = false; |
| } |
| |
| void IncomingTaskQueue::ReportMetricsOnIdle() const { |
| UMA_HISTOGRAM_COUNTS_1M( |
| "MessageLoop.DelayedTaskQueue.PendingTasksCountOnIdle", |
| delayed_tasks_.Size()); |
| } |
| |
| IncomingTaskQueue::TriageQueue::TriageQueue(IncomingTaskQueue* outer) |
| : outer_(outer) {} |
| |
| IncomingTaskQueue::TriageQueue::~TriageQueue() = default; |
| |
| const PendingTask& IncomingTaskQueue::TriageQueue::Peek() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); |
| ReloadFromIncomingQueueIfEmpty(); |
| DCHECK(!queue_.empty()); |
| return queue_.front(); |
| } |
| |
| PendingTask IncomingTaskQueue::TriageQueue::Pop() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); |
| ReloadFromIncomingQueueIfEmpty(); |
| DCHECK(!queue_.empty()); |
| PendingTask pending_task = std::move(queue_.front()); |
| queue_.pop(); |
| return pending_task; |
| } |
| |
| bool IncomingTaskQueue::TriageQueue::HasTasks() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); |
| ReloadFromIncomingQueueIfEmpty(); |
| return !queue_.empty(); |
| } |
| |
| void IncomingTaskQueue::TriageQueue::Clear() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); |
| |
| // Clear() should be invoked before WillDestroyCurrentMessageLoop(). |
| DCHECK(outer_->accept_new_tasks_); |
| |
| // Delete all currently pending tasks but not tasks potentially posted from |
| // their destructors. See ~MessageLoop() for the full logic mitigating against |
| // infite loops when clearing pending tasks. The ScopedClosureRunner below |
| // will be bound to a task posted at the end of the queue. After it is posted, |
| // tasks will be deleted one by one, when the bound ScopedClosureRunner is |
| // deleted and sets |deleted_all_originally_pending|, we know we've deleted |
| // all originally pending tasks. |
| bool deleted_all_originally_pending = false; |
| ScopedClosureRunner capture_deleted_all_originally_pending(BindOnce( |
| [](bool* deleted_all_originally_pending) { |
| *deleted_all_originally_pending = true; |
| }, |
| Unretained(&deleted_all_originally_pending))); |
| outer_->AddToIncomingQueue( |
| FROM_HERE, |
| BindOnce([](ScopedClosureRunner) {}, |
| std::move(capture_deleted_all_originally_pending)), |
| TimeDelta(), Nestable::kNestable); |
| |
| while (!deleted_all_originally_pending) { |
| PendingTask pending_task = Pop(); |
| |
| if (!pending_task.delayed_run_time.is_null()) { |
| outer_->delayed_tasks().Push(std::move(pending_task)); |
| } |
| } |
| } |
| |
| void IncomingTaskQueue::TriageQueue::ReloadFromIncomingQueueIfEmpty() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_); |
| if (queue_.empty()) { |
| outer_->ReloadWorkQueue(&queue_); |
| } |
| } |
| |
| IncomingTaskQueue::DelayedQueue::DelayedQueue() { |
| DETACH_FROM_SEQUENCE(sequence_checker_); |
| } |
| |
| IncomingTaskQueue::DelayedQueue::~DelayedQueue() = default; |
| |
| void IncomingTaskQueue::DelayedQueue::Push(PendingTask pending_task) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| |
| if (pending_task.is_high_res) |
| ++pending_high_res_tasks_; |
| |
| queue_.push(std::move(pending_task)); |
| } |
| |
| const PendingTask& IncomingTaskQueue::DelayedQueue::Peek() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| DCHECK(!queue_.empty()); |
| return queue_.top(); |
| } |
| |
| PendingTask IncomingTaskQueue::DelayedQueue::Pop() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| DCHECK(!queue_.empty()); |
| PendingTask delayed_task = std::move(const_cast<PendingTask&>(queue_.top())); |
| queue_.pop(); |
| |
| if (delayed_task.is_high_res) |
| --pending_high_res_tasks_; |
| DCHECK_GE(pending_high_res_tasks_, 0); |
| |
| return delayed_task; |
| } |
| |
| bool IncomingTaskQueue::DelayedQueue::HasTasks() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| // TODO(robliao): The other queues don't check for IsCancelled(). Should they? |
| while (!queue_.empty() && Peek().task.IsCancelled()) |
| Pop(); |
| |
| return !queue_.empty(); |
| } |
| |
| void IncomingTaskQueue::DelayedQueue::Clear() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| while (!queue_.empty()) |
| Pop(); |
| } |
| |
| size_t IncomingTaskQueue::DelayedQueue::Size() const { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return queue_.size(); |
| } |
| |
| IncomingTaskQueue::DeferredQueue::DeferredQueue() { |
| DETACH_FROM_SEQUENCE(sequence_checker_); |
| } |
| |
| IncomingTaskQueue::DeferredQueue::~DeferredQueue() = default; |
| |
| void IncomingTaskQueue::DeferredQueue::Push(PendingTask pending_task) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| queue_.push(std::move(pending_task)); |
| } |
| |
| const PendingTask& IncomingTaskQueue::DeferredQueue::Peek() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| DCHECK(!queue_.empty()); |
| return queue_.front(); |
| } |
| |
| PendingTask IncomingTaskQueue::DeferredQueue::Pop() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| DCHECK(!queue_.empty()); |
| PendingTask deferred_task = std::move(queue_.front()); |
| queue_.pop(); |
| return deferred_task; |
| } |
| |
| bool IncomingTaskQueue::DeferredQueue::HasTasks() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return !queue_.empty(); |
| } |
| |
| void IncomingTaskQueue::DeferredQueue::Clear() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| while (!queue_.empty()) |
| Pop(); |
| } |
| |
| bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) { |
| // Warning: Don't try to short-circuit, and handle this thread's tasks more |
| // directly, as it could starve handling of foreign threads. Put every task |
| // into this queue. |
| bool accept_new_tasks; |
| bool was_empty = false; |
| { |
| AutoLock auto_lock(incoming_queue_lock_); |
| accept_new_tasks = accept_new_tasks_; |
| if (accept_new_tasks) { |
| was_empty = |
| PostPendingTaskLockRequired(pending_task) && triage_queue_empty_; |
| } |
| } |
| |
| if (!accept_new_tasks) { |
| // Clear the pending task outside of |incoming_queue_lock_| to prevent any |
| // chance of self-deadlock if destroying a task also posts a task to this |
| // queue. |
| pending_task->task.Reset(); |
| return false; |
| } |
| |
| // Let |task_queue_observer_| know of the queued task. This is done outside |
| // |incoming_queue_lock_| to avoid conflating locks (DidQueueTask() can also |
| // use a lock). |
| task_queue_observer_->DidQueueTask(was_empty); |
| |
| return true; |
| } |
| |
| bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) { |
| incoming_queue_lock_.AssertAcquired(); |
| |
| // Initialize the sequence number. The sequence number is used for delayed |
| // tasks (to facilitate FIFO sorting when two tasks have the same |
| // delayed_run_time value) and for identifying the task in about:tracing. |
| pending_task->sequence_num = next_sequence_num_++; |
| |
| task_queue_observer_->WillQueueTask(pending_task); |
| |
| bool was_empty = incoming_queue_.empty(); |
| incoming_queue_.push(std::move(*pending_task)); |
| return was_empty; |
| } |
| |
| void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| |
| // Make sure no tasks are lost. |
| DCHECK(work_queue->empty()); |
| |
| // Acquire all we can from the inter-thread queue with one lock acquisition. |
| AutoLock lock(incoming_queue_lock_); |
| incoming_queue_.swap(*work_queue); |
| triage_queue_empty_ = work_queue->empty(); |
| } |
| |
| } // namespace internal |
| } // namespace base |