Healthcheck: proper timed job scheduling and tests.
Bug: 153874006
Test: atest PackageManagerShellCommandTest PackageManagerShellCommandIncrementalTest IncrementalServiceTest
Change-Id: Iede1f2297cc4f8e3c3f0acd43cee597f75dff179
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index 2fcb005..885f4d2 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -268,22 +268,14 @@
mAppOpsManager(sm.getAppOpsManager()),
mJni(sm.getJni()),
mLooper(sm.getLooper()),
+ mTimedQueue(sm.getTimedQueue()),
mIncrementalDir(rootDir) {
- if (!mVold) {
- LOG(FATAL) << "Vold service is unavailable";
- }
- if (!mDataLoaderManager) {
- LOG(FATAL) << "DataLoaderManagerService is unavailable";
- }
- if (!mAppOpsManager) {
- LOG(FATAL) << "AppOpsManager is unavailable";
- }
- if (!mJni) {
- LOG(FATAL) << "JNI is unavailable";
- }
- if (!mLooper) {
- LOG(FATAL) << "Looper is unavailable";
- }
+ CHECK(mVold) << "Vold service is unavailable";
+ CHECK(mDataLoaderManager) << "DataLoaderManagerService is unavailable";
+ CHECK(mAppOpsManager) << "AppOpsManager is unavailable";
+ CHECK(mJni) << "JNI is unavailable";
+ CHECK(mLooper) << "Looper is unavailable";
+ CHECK(mTimedQueue) << "TimedQueue is unavailable";
mJobQueue.reserve(16);
mJobProcessor = std::thread([this]() {
@@ -294,10 +286,6 @@
mJni->initializeForCurrentThread();
runCmdLooper();
});
- mTimerThread = std::thread([this]() {
- mJni->initializeForCurrentThread();
- runTimers();
- });
const auto mountedRootNames = adoptMountedInstances();
mountExistingImages(mountedRootNames);
@@ -310,10 +298,8 @@
}
mJobCondition.notify_all();
mJobProcessor.join();
- mTimerCondition.notify_all();
- mTimerThread.join();
mCmdLooperThread.join();
- mTimedJobs.clear();
+ mTimedQueue->stop();
// Ensure that mounts are destroyed while the service is still valid.
mBindsByPath.clear();
mMounts.clear();
@@ -1710,53 +1696,18 @@
}
}
-void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) {
+void IncrementalService::addTimedJob(MountId id, Milliseconds after, Job what) {
if (id == kInvalidStorageId) {
return;
}
- {
- std::unique_lock lock(mTimerMutex);
- mTimedJobs.insert(TimedJob{id, when, std::move(what)});
- }
- mTimerCondition.notify_all();
+ mTimedQueue->addJob(id, after, std::move(what));
}
void IncrementalService::removeTimedJobs(MountId id) {
if (id == kInvalidStorageId) {
return;
}
- {
- std::unique_lock lock(mTimerMutex);
- std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; });
- }
-}
-
-void IncrementalService::runTimers() {
- static constexpr TimePoint kInfinityTs{Clock::duration::max()};
- TimePoint nextTaskTs = kInfinityTs;
- for (;;) {
- std::unique_lock lock(mTimerMutex);
- mTimerCondition.wait_until(lock, nextTaskTs, [this]() {
- auto now = Clock::now();
- return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now);
- });
- if (!mRunning) {
- return;
- }
-
- auto now = Clock::now();
- auto it = mTimedJobs.begin();
- // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
- for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) {
- auto job = it->what;
- mTimedJobs.erase(it);
-
- lock.unlock();
- job();
- lock.lock();
- }
- nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs;
- }
+ mTimedQueue->removeJobs(id);
}
IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
@@ -2029,8 +1980,8 @@
mHealthBase = {now, kernelTsUs};
}
- if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now ||
- mHealthBase.kernelTsUs > kernelTsUs) {
+ if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.kernelTsUs == kMaxBootClockTsUs ||
+ mHealthBase.userTs > now) {
LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait.";
registerForPendingReads();
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
@@ -2056,6 +2007,9 @@
return;
}
+ // Don't schedule timer job less than 500ms in advance.
+ static constexpr auto kTolerance = 500ms;
+
const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs);
const auto unhealthyTimeout =
std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs);
@@ -2065,31 +2019,28 @@
const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs;
const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs);
- const auto delta = now - userTs;
+ const auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(now - userTs);
- TimePoint whenToCheckBack;
- if (delta < blockedTimeout) {
+ Milliseconds checkBackAfter;
+ if (delta + kTolerance < blockedTimeout) {
LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status.";
- whenToCheckBack = userTs + blockedTimeout;
+ checkBackAfter = blockedTimeout - delta;
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
- } else if (delta < unhealthyTimeout) {
+ } else if (delta + kTolerance < unhealthyTimeout) {
LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy.";
- whenToCheckBack = userTs + unhealthyTimeout;
+ checkBackAfter = unhealthyTimeout - delta;
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
} else {
LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring.";
- whenToCheckBack = now + unhealthyMonitoring;
+ checkBackAfter = unhealthyMonitoring;
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
}
- LOG(DEBUG) << id() << ": updateHealthStatus in "
- << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
- now)
- .count()) /
- 1000.0
+ LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0
<< "secs";
- mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); });
+ mService.addTimedJob(id(), checkBackAfter, [this]() { updateHealthStatus(); });
}
+ // With kTolerance we are expecting these to execute before the next update.
if (healthStatusToReport != -1) {
onHealthStatus(healthListener, healthStatusToReport);
}
@@ -2178,6 +2129,16 @@
dprintf(fd, " targetStatus: %d\n", mTargetStatus);
dprintf(fd, " targetStatusTs: %lldmcs\n",
(long long)(elapsedMcs(mTargetStatusTs, Clock::now())));
+ dprintf(fd, " health: {\n");
+ dprintf(fd, " path: %s\n", mHealthPath.c_str());
+ dprintf(fd, " base: %lldmcs (%lld)\n",
+ (long long)(elapsedMcs(mHealthBase.userTs, Clock::now())),
+ (long long)mHealthBase.kernelTsUs);
+ dprintf(fd, " blockedTimeoutMs: %d\n", int(mHealthCheckParams.blockedTimeoutMs));
+ dprintf(fd, " unhealthyTimeoutMs: %d\n", int(mHealthCheckParams.unhealthyTimeoutMs));
+ dprintf(fd, " unhealthyMonitoringMs: %d\n",
+ int(mHealthCheckParams.unhealthyMonitoringMs));
+ dprintf(fd, " }\n");
const auto& params = mParams;
dprintf(fd, " dataLoaderParams: {\n");
dprintf(fd, " type: %s\n", toString(params.type).c_str());