Healthcheck: proper job allocation and test.

Bug: 153874006
Test: atest PackageManagerShellCommandTest PackageManagerShellCommandIncrementalTest IncrementalServiceTest

Change-Id: Iede1f2297cc4f8e3c3f0acd43cee597f75dff179
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index 2fcb005..885f4d2 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -268,22 +268,14 @@
         mAppOpsManager(sm.getAppOpsManager()),
         mJni(sm.getJni()),
         mLooper(sm.getLooper()),
+        mTimedQueue(sm.getTimedQueue()),
         mIncrementalDir(rootDir) {
-    if (!mVold) {
-        LOG(FATAL) << "Vold service is unavailable";
-    }
-    if (!mDataLoaderManager) {
-        LOG(FATAL) << "DataLoaderManagerService is unavailable";
-    }
-    if (!mAppOpsManager) {
-        LOG(FATAL) << "AppOpsManager is unavailable";
-    }
-    if (!mJni) {
-        LOG(FATAL) << "JNI is unavailable";
-    }
-    if (!mLooper) {
-        LOG(FATAL) << "Looper is unavailable";
-    }
+    CHECK(mVold) << "Vold service is unavailable";
+    CHECK(mDataLoaderManager) << "DataLoaderManagerService is unavailable";
+    CHECK(mAppOpsManager) << "AppOpsManager is unavailable";
+    CHECK(mJni) << "JNI is unavailable";
+    CHECK(mLooper) << "Looper is unavailable";
+    CHECK(mTimedQueue) << "TimedQueue is unavailable";
 
     mJobQueue.reserve(16);
     mJobProcessor = std::thread([this]() {
@@ -294,10 +286,6 @@
         mJni->initializeForCurrentThread();
         runCmdLooper();
     });
-    mTimerThread = std::thread([this]() {
-        mJni->initializeForCurrentThread();
-        runTimers();
-    });
 
     const auto mountedRootNames = adoptMountedInstances();
     mountExistingImages(mountedRootNames);
@@ -310,10 +298,8 @@
     }
     mJobCondition.notify_all();
     mJobProcessor.join();
-    mTimerCondition.notify_all();
-    mTimerThread.join();
     mCmdLooperThread.join();
-    mTimedJobs.clear();
+    mTimedQueue->stop();
     // Ensure that mounts are destroyed while the service is still valid.
     mBindsByPath.clear();
     mMounts.clear();
@@ -1710,53 +1696,18 @@
     }
 }
 
-void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) {
+void IncrementalService::addTimedJob(MountId id, Milliseconds after, Job what) {
     if (id == kInvalidStorageId) {
         return;
     }
-    {
-        std::unique_lock lock(mTimerMutex);
-        mTimedJobs.insert(TimedJob{id, when, std::move(what)});
-    }
-    mTimerCondition.notify_all();
+    mTimedQueue->addJob(id, after, std::move(what));
 }
 
 void IncrementalService::removeTimedJobs(MountId id) {
     if (id == kInvalidStorageId) {
         return;
     }
-    {
-        std::unique_lock lock(mTimerMutex);
-        std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; });
-    }
-}
-
-void IncrementalService::runTimers() {
-    static constexpr TimePoint kInfinityTs{Clock::duration::max()};
-    TimePoint nextTaskTs = kInfinityTs;
-    for (;;) {
-        std::unique_lock lock(mTimerMutex);
-        mTimerCondition.wait_until(lock, nextTaskTs, [this]() {
-            auto now = Clock::now();
-            return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now);
-        });
-        if (!mRunning) {
-            return;
-        }
-
-        auto now = Clock::now();
-        auto it = mTimedJobs.begin();
-        // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
-        for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) {
-            auto job = it->what;
-            mTimedJobs.erase(it);
-
-            lock.unlock();
-            job();
-            lock.lock();
-        }
-        nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs;
-    }
+    mTimedQueue->removeJobs(id);
 }
 
 IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
@@ -2029,8 +1980,8 @@
             mHealthBase = {now, kernelTsUs};
         }
 
-        if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now ||
-            mHealthBase.kernelTsUs > kernelTsUs) {
+        if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.kernelTsUs == kMaxBootClockTsUs ||
+            mHealthBase.userTs > now) {
             LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait.";
             registerForPendingReads();
             healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
@@ -2056,6 +2007,9 @@
             return;
         }
 
+        // Don't schedule timer job less than 500ms in advance.
+        static constexpr auto kTolerance = 500ms;
+
         const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs);
         const auto unhealthyTimeout =
                 std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs);
@@ -2065,31 +2019,28 @@
 
         const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs;
         const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs);
-        const auto delta = now - userTs;
+        const auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(now - userTs);
 
-        TimePoint whenToCheckBack;
-        if (delta < blockedTimeout) {
+        Milliseconds checkBackAfter;
+        if (delta + kTolerance < blockedTimeout) {
             LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status.";
-            whenToCheckBack = userTs + blockedTimeout;
+            checkBackAfter = blockedTimeout - delta;
             healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
-        } else if (delta < unhealthyTimeout) {
+        } else if (delta + kTolerance < unhealthyTimeout) {
             LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy.";
-            whenToCheckBack = userTs + unhealthyTimeout;
+            checkBackAfter = unhealthyTimeout - delta;
             healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
         } else {
             LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring.";
-            whenToCheckBack = now + unhealthyMonitoring;
+            checkBackAfter = unhealthyMonitoring;
             healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
         }
-        LOG(DEBUG) << id() << ": updateHealthStatus in "
-                   << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
-                                                                                   now)
-                                     .count()) /
-                        1000.0
+        LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0
                    << "secs";
-        mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); });
+        mService.addTimedJob(id(), checkBackAfter, [this]() { updateHealthStatus(); });
     }
 
+    // With kTolerance we are expecting these to execute before the next update.
     if (healthStatusToReport != -1) {
         onHealthStatus(healthListener, healthStatusToReport);
     }
@@ -2178,6 +2129,16 @@
     dprintf(fd, "      targetStatus: %d\n", mTargetStatus);
     dprintf(fd, "      targetStatusTs: %lldmcs\n",
             (long long)(elapsedMcs(mTargetStatusTs, Clock::now())));
+    dprintf(fd, "      health: {\n");
+    dprintf(fd, "        path: %s\n", mHealthPath.c_str());
+    dprintf(fd, "        base: %lldmcs (%lld)\n",
+            (long long)(elapsedMcs(mHealthBase.userTs, Clock::now())),
+            (long long)mHealthBase.kernelTsUs);
+    dprintf(fd, "        blockedTimeoutMs: %d\n", int(mHealthCheckParams.blockedTimeoutMs));
+    dprintf(fd, "        unhealthyTimeoutMs: %d\n", int(mHealthCheckParams.unhealthyTimeoutMs));
+    dprintf(fd, "        unhealthyMonitoringMs: %d\n",
+            int(mHealthCheckParams.unhealthyMonitoringMs));
+    dprintf(fd, "      }\n");
     const auto& params = mParams;
     dprintf(fd, "      dataLoaderParams: {\n");
     dprintf(fd, "        type: %s\n", toString(params.type).c_str());