Healthcheck: proper job allocation and test.

Bug: 153874006
Test: atest PackageManagerShellCommandTest PackageManagerShellCommandIncrementalTest IncrementalServiceTest

Change-Id: Iede1f2297cc4f8e3c3f0acd43cee597f75dff179
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index 2fcb005..885f4d2 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -268,22 +268,14 @@
         mAppOpsManager(sm.getAppOpsManager()),
         mJni(sm.getJni()),
         mLooper(sm.getLooper()),
+        mTimedQueue(sm.getTimedQueue()),
         mIncrementalDir(rootDir) {
-    if (!mVold) {
-        LOG(FATAL) << "Vold service is unavailable";
-    }
-    if (!mDataLoaderManager) {
-        LOG(FATAL) << "DataLoaderManagerService is unavailable";
-    }
-    if (!mAppOpsManager) {
-        LOG(FATAL) << "AppOpsManager is unavailable";
-    }
-    if (!mJni) {
-        LOG(FATAL) << "JNI is unavailable";
-    }
-    if (!mLooper) {
-        LOG(FATAL) << "Looper is unavailable";
-    }
+    CHECK(mVold) << "Vold service is unavailable";
+    CHECK(mDataLoaderManager) << "DataLoaderManagerService is unavailable";
+    CHECK(mAppOpsManager) << "AppOpsManager is unavailable";
+    CHECK(mJni) << "JNI is unavailable";
+    CHECK(mLooper) << "Looper is unavailable";
+    CHECK(mTimedQueue) << "TimedQueue is unavailable";
 
     mJobQueue.reserve(16);
     mJobProcessor = std::thread([this]() {
@@ -294,10 +286,6 @@
         mJni->initializeForCurrentThread();
         runCmdLooper();
     });
-    mTimerThread = std::thread([this]() {
-        mJni->initializeForCurrentThread();
-        runTimers();
-    });
 
     const auto mountedRootNames = adoptMountedInstances();
     mountExistingImages(mountedRootNames);
@@ -310,10 +298,8 @@
     }
     mJobCondition.notify_all();
     mJobProcessor.join();
-    mTimerCondition.notify_all();
-    mTimerThread.join();
     mCmdLooperThread.join();
-    mTimedJobs.clear();
+    mTimedQueue->stop();
     // Ensure that mounts are destroyed while the service is still valid.
     mBindsByPath.clear();
     mMounts.clear();
@@ -1710,53 +1696,18 @@
     }
 }
 
-void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) {
+void IncrementalService::addTimedJob(MountId id, Milliseconds after, Job what) {
     if (id == kInvalidStorageId) {
         return;
     }
-    {
-        std::unique_lock lock(mTimerMutex);
-        mTimedJobs.insert(TimedJob{id, when, std::move(what)});
-    }
-    mTimerCondition.notify_all();
+    mTimedQueue->addJob(id, after, std::move(what));
 }
 
 void IncrementalService::removeTimedJobs(MountId id) {
     if (id == kInvalidStorageId) {
         return;
     }
-    {
-        std::unique_lock lock(mTimerMutex);
-        std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; });
-    }
-}
-
-void IncrementalService::runTimers() {
-    static constexpr TimePoint kInfinityTs{Clock::duration::max()};
-    TimePoint nextTaskTs = kInfinityTs;
-    for (;;) {
-        std::unique_lock lock(mTimerMutex);
-        mTimerCondition.wait_until(lock, nextTaskTs, [this]() {
-            auto now = Clock::now();
-            return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now);
-        });
-        if (!mRunning) {
-            return;
-        }
-
-        auto now = Clock::now();
-        auto it = mTimedJobs.begin();
-        // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
-        for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) {
-            auto job = it->what;
-            mTimedJobs.erase(it);
-
-            lock.unlock();
-            job();
-            lock.lock();
-        }
-        nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs;
-    }
+    mTimedQueue->removeJobs(id);
 }
 
 IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
@@ -2029,8 +1980,8 @@
             mHealthBase = {now, kernelTsUs};
         }
 
-        if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now ||
-            mHealthBase.kernelTsUs > kernelTsUs) {
+        if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.kernelTsUs == kMaxBootClockTsUs ||
+            mHealthBase.userTs > now) {
             LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait.";
             registerForPendingReads();
             healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
@@ -2056,6 +2007,9 @@
             return;
         }
 
+        // Don't schedule timer job less than 500ms in advance.
+        static constexpr auto kTolerance = 500ms;
+
         const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs);
         const auto unhealthyTimeout =
                 std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs);
@@ -2065,31 +2019,28 @@
 
         const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs;
         const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs);
-        const auto delta = now - userTs;
+        const auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(now - userTs);
 
-        TimePoint whenToCheckBack;
-        if (delta < blockedTimeout) {
+        Milliseconds checkBackAfter;
+        if (delta + kTolerance < blockedTimeout) {
             LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status.";
-            whenToCheckBack = userTs + blockedTimeout;
+            checkBackAfter = blockedTimeout - delta;
             healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
-        } else if (delta < unhealthyTimeout) {
+        } else if (delta + kTolerance < unhealthyTimeout) {
             LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy.";
-            whenToCheckBack = userTs + unhealthyTimeout;
+            checkBackAfter = unhealthyTimeout - delta;
             healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
         } else {
             LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring.";
-            whenToCheckBack = now + unhealthyMonitoring;
+            checkBackAfter = unhealthyMonitoring;
             healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
         }
-        LOG(DEBUG) << id() << ": updateHealthStatus in "
-                   << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
-                                                                                   now)
-                                     .count()) /
-                        1000.0
+        LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0
                    << "secs";
-        mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); });
+        mService.addTimedJob(id(), checkBackAfter, [this]() { updateHealthStatus(); });
     }
 
+    // With kTolerance we are expecting these to execute before the next update.
     if (healthStatusToReport != -1) {
         onHealthStatus(healthListener, healthStatusToReport);
     }
@@ -2178,6 +2129,16 @@
     dprintf(fd, "      targetStatus: %d\n", mTargetStatus);
     dprintf(fd, "      targetStatusTs: %lldmcs\n",
             (long long)(elapsedMcs(mTargetStatusTs, Clock::now())));
+    dprintf(fd, "      health: {\n");
+    dprintf(fd, "        path: %s\n", mHealthPath.c_str());
+    dprintf(fd, "        base: %lldmcs (%lld)\n",
+            (long long)(elapsedMcs(mHealthBase.userTs, Clock::now())),
+            (long long)mHealthBase.kernelTsUs);
+    dprintf(fd, "        blockedTimeoutMs: %d\n", int(mHealthCheckParams.blockedTimeoutMs));
+    dprintf(fd, "        unhealthyTimeoutMs: %d\n", int(mHealthCheckParams.unhealthyTimeoutMs));
+    dprintf(fd, "        unhealthyMonitoringMs: %d\n",
+            int(mHealthCheckParams.unhealthyMonitoringMs));
+    dprintf(fd, "      }\n");
     const auto& params = mParams;
     dprintf(fd, "      dataLoaderParams: {\n");
     dprintf(fd, "        type: %s\n", toString(params.type).c_str());
diff --git a/services/incremental/IncrementalService.h b/services/incremental/IncrementalService.h
index 57e4669..918531b 100644
--- a/services/incremental/IncrementalService.h
+++ b/services/incremental/IncrementalService.h
@@ -56,8 +56,6 @@
 using FileId = incfs::FileId;
 using BlockIndex = incfs::BlockIndex;
 using RawMetadata = incfs::RawMetadata;
-using Clock = std::chrono::steady_clock;
-using TimePoint = std::chrono::time_point<Clock>;
 using Seconds = std::chrono::seconds;
 using BootClockTsUs = uint64_t;
 
@@ -338,8 +336,6 @@
     bool unregisterAppOpsCallback(const std::string& packageName);
     void onAppOpChanged(const std::string& packageName);
 
-    using Job = std::function<void()>;
-
     void runJobProcessing();
     void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry,
                         const incfs::FileId& libFileId, std::string_view targetLibPath,
@@ -347,9 +343,8 @@
 
     void runCmdLooper();
 
-    void addTimedJob(MountId id, TimePoint when, Job what);
+    void addTimedJob(MountId id, Milliseconds after, Job what);
     void removeTimedJobs(MountId id);
-    void runTimers();
 
 private:
     const std::unique_ptr<VoldServiceWrapper> mVold;
@@ -358,6 +353,7 @@
     const std::unique_ptr<AppOpsManagerWrapper> mAppOpsManager;
     const std::unique_ptr<JniWrapper> mJni;
     const std::unique_ptr<LooperWrapper> mLooper;
+    const std::unique_ptr<TimedQueueWrapper> mTimedQueue;
     const std::string mIncrementalDir;
 
     mutable std::mutex mLock;
@@ -380,19 +376,6 @@
     std::thread mJobProcessor;
 
     std::thread mCmdLooperThread;
-
-    struct TimedJob {
-        MountId id;
-        TimePoint when;
-        Job what;
-        friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
-            return lhs.when < rhs.when;
-        }
-    };
-    std::set<TimedJob> mTimedJobs;
-    std::condition_variable mTimerCondition;
-    std::mutex mTimerMutex;
-    std::thread mTimerThread;
 };
 
 } // namespace android::incremental
diff --git a/services/incremental/ServiceWrappers.cpp b/services/incremental/ServiceWrappers.cpp
index a76aa62..99a35ad 100644
--- a/services/incremental/ServiceWrappers.cpp
+++ b/services/incremental/ServiceWrappers.cpp
@@ -25,6 +25,8 @@
 #include <binder/AppOpsManager.h>
 #include <utils/String16.h>
 
+#include <thread>
+
 #include "IncrementalServiceValidation.h"
 
 using namespace std::literals;
@@ -181,6 +183,88 @@
     }
 };
 
+static JNIEnv* getOrAttachJniEnv(JavaVM* jvm);
+
+class RealTimedQueueWrapper : public TimedQueueWrapper {
+public:
+    RealTimedQueueWrapper(JavaVM* jvm) {
+        mThread = std::thread([this, jvm]() {
+            (void)getOrAttachJniEnv(jvm);
+            runTimers();
+        });
+    }
+    ~RealTimedQueueWrapper() final {
+        CHECK(!mRunning) << "call stop first";
+        CHECK(!mThread.joinable()) << "call stop first";
+    }
+
+    void addJob(MountId id, Milliseconds after, Job what) final {
+        const auto now = Clock::now();
+        {
+            std::unique_lock lock(mMutex);
+            mJobs.insert(TimedJob{id, now + after, std::move(what)});
+        }
+        mCondition.notify_all();
+    }
+    void removeJobs(MountId id) final {
+        std::unique_lock lock(mMutex);
+        std::erase_if(mJobs, [id](auto&& item) { return item.id == id; });
+    }
+    void stop() final {
+        {
+            std::unique_lock lock(mMutex);
+            mRunning = false;
+        }
+        mCondition.notify_all();
+        mThread.join();
+        mJobs.clear();
+    }
+
+private:
+    void runTimers() {
+        static constexpr TimePoint kInfinityTs{Clock::duration::max()};
+        TimePoint nextJobTs = kInfinityTs;
+        std::unique_lock lock(mMutex);
+        for (;;) {
+            mCondition.wait_until(lock, nextJobTs, [this, nextJobTs]() {
+                const auto now = Clock::now();
+                const auto firstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs;
+                return !mRunning || firstJobTs <= now || firstJobTs < nextJobTs;
+            });
+            if (!mRunning) {
+                return;
+            }
+
+            const auto now = Clock::now();
+            auto it = mJobs.begin();
+            // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
+            for (; it != mJobs.end() && it->when <= now; it = mJobs.begin()) {
+                auto job = std::move(it->what);
+                mJobs.erase(it);
+
+                lock.unlock();
+                job();
+                lock.lock();
+            }
+            nextJobTs = it != mJobs.end() ? it->when : kInfinityTs;
+        }
+    }
+
+    struct TimedJob {
+        MountId id;
+        TimePoint when;
+        Job what;
+        friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
+            return lhs.when < rhs.when;
+        }
+    };
+    bool mRunning = true;
+    std::set<TimedJob> mJobs;
+    std::condition_variable mCondition;
+    std::mutex mMutex;
+    std::thread mThread;
+};
+
 RealServiceManager::RealServiceManager(sp<IServiceManager> serviceManager, JNIEnv* env)
       : mServiceManager(std::move(serviceManager)), mJvm(RealJniWrapper::getJvm(env)) {}
 
@@ -228,6 +312,10 @@
     return std::make_unique<RealLooperWrapper>();
 }
 
+std::unique_ptr<TimedQueueWrapper> RealServiceManager::getTimedQueue() {
+    return std::make_unique<RealTimedQueueWrapper>(mJvm);
+}
+
 static JavaVM* getJavaVm(JNIEnv* env) {
     CHECK(env);
     JavaVM* jvm = nullptr;
diff --git a/services/incremental/ServiceWrappers.h b/services/incremental/ServiceWrappers.h
index a935ab9..8cd726fd 100644
--- a/services/incremental/ServiceWrappers.h
+++ b/services/incremental/ServiceWrappers.h
@@ -35,6 +35,11 @@
 
 namespace android::incremental {
 
+using Clock = std::chrono::steady_clock;
+using TimePoint = std::chrono::time_point<Clock>;
+using Milliseconds = std::chrono::milliseconds;
+using Job = std::function<void()>;
+
 // --- Wrapper interfaces ---
 
 using MountId = int32_t;
@@ -121,6 +126,14 @@
     virtual int pollAll(int timeoutMillis) = 0;
 };
 
+class TimedQueueWrapper {
+public:
+    virtual ~TimedQueueWrapper() = default;
+    virtual void addJob(MountId id, Milliseconds after, Job what) = 0;
+    virtual void removeJobs(MountId id) = 0;
+    virtual void stop() = 0;
+};
+
 class ServiceManagerWrapper {
 public:
     virtual ~ServiceManagerWrapper() = default;
@@ -130,6 +143,7 @@
     virtual std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() = 0;
     virtual std::unique_ptr<JniWrapper> getJni() = 0;
     virtual std::unique_ptr<LooperWrapper> getLooper() = 0;
+    virtual std::unique_ptr<TimedQueueWrapper> getTimedQueue() = 0;
 };
 
 // --- Real stuff ---
@@ -144,6 +158,7 @@
     std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() final;
     std::unique_ptr<JniWrapper> getJni() final;
     std::unique_ptr<LooperWrapper> getLooper() final;
+    std::unique_ptr<TimedQueueWrapper> getTimedQueue() final;
 
 private:
     template <class INTERFACE>
diff --git a/services/incremental/test/IncrementalServiceTest.cpp b/services/incremental/test/IncrementalServiceTest.cpp
index 84ec7d3..26b5094 100644
--- a/services/incremental/test/IncrementalServiceTest.cpp
+++ b/services/incremental/test/IncrementalServiceTest.cpp
@@ -22,6 +22,7 @@
 #include <gtest/gtest.h>
 #include <utils/Log.h>
 
+#include <chrono>
 #include <future>
 
 #include "IncrementalService.h"
@@ -295,9 +296,21 @@
     void openMountSuccess() {
         ON_CALL(*this, openMount(_)).WillByDefault(Invoke(this, &MockIncFs::openMountForHealth));
     }
-    void waitForPendingReadsSuccess() {
+
+    // 1000ms
+    void waitForPendingReadsSuccess(uint64_t ts = 0) {
         ON_CALL(*this, waitForPendingReads(_, _, _))
-                .WillByDefault(Invoke(this, &MockIncFs::waitForPendingReadsForHealth));
+                .WillByDefault(
+                        Invoke([ts](const Control& control, std::chrono::milliseconds timeout,
+                                    std::vector<incfs::ReadInfo>* pendingReadsBuffer) {
+                            pendingReadsBuffer->push_back({.bootClockTsUs = ts});
+                            return android::incfs::WaitResult::HaveData;
+                        }));
+    }
+
+    void waitForPendingReadsTimeout() {
+        ON_CALL(*this, waitForPendingReads(_, _, _))
+                .WillByDefault(Return(android::incfs::WaitResult::Timeout));
     }
 
     static constexpr auto kPendingReadsFd = 42;
@@ -305,13 +318,6 @@
         return UniqueControl(IncFs_CreateControl(-1, kPendingReadsFd, -1));
     }
 
-    WaitResult waitForPendingReadsForHealth(
-            const Control& control, std::chrono::milliseconds timeout,
-            std::vector<incfs::ReadInfo>* pendingReadsBuffer) const {
-        pendingReadsBuffer->push_back({.bootClockTsUs = 0});
-        return android::incfs::WaitResult::HaveData;
-    }
-
     RawMetadata getMountInfoMetadata(const Control& control, std::string_view path) {
         metadata::Mount m;
         m.mutable_storage()->set_id(100);
@@ -371,7 +377,7 @@
 public:
     MOCK_CONST_METHOD0(initializeForCurrentThread, void());
 
-    MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); }
+    MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); }
 };
 
 class MockLooperWrapper : public LooperWrapper {
@@ -385,7 +391,7 @@
         ON_CALL(*this, addFd(_, _, _, _, _))
                 .WillByDefault(Invoke(this, &MockLooperWrapper::storeCallback));
         ON_CALL(*this, removeFd(_)).WillByDefault(Invoke(this, &MockLooperWrapper::clearCallback));
-        ON_CALL(*this, pollAll(_)).WillByDefault(Invoke(this, &MockLooperWrapper::sleepFor));
+        ON_CALL(*this, pollAll(_)).WillByDefault(Invoke(this, &MockLooperWrapper::wait10Ms));
     }
 
     int storeCallback(int, int, int, android::Looper_callbackFunc callback, void* data) {
@@ -400,8 +406,10 @@
         return 0;
     }
 
-    int sleepFor(int timeoutMillis) {
-        std::this_thread::sleep_for(std::chrono::milliseconds(timeoutMillis));
+    int wait10Ms(int) {
+        // This is called from a loop in runCmdLooper.
+        // Sleeping for 10ms only to avoid busy looping.
+        std::this_thread::sleep_for(10ms);
         return 0;
     }
 
@@ -409,6 +417,55 @@
     void* mCallbackData = nullptr;
 };
 
+class MockTimedQueueWrapper : public TimedQueueWrapper {
+public:
+    MOCK_METHOD3(addJob, void(MountId, Milliseconds, Job));
+    MOCK_METHOD1(removeJobs, void(MountId));
+    MOCK_METHOD0(stop, void());
+
+    MockTimedQueueWrapper() {
+        ON_CALL(*this, addJob(_, _, _))
+                .WillByDefault(Invoke(this, &MockTimedQueueWrapper::storeJob));
+        ON_CALL(*this, removeJobs(_)).WillByDefault(Invoke(this, &MockTimedQueueWrapper::clearJob));
+    }
+
+    void storeJob(MountId id, Milliseconds after, Job what) {
+        mId = id;
+        mAfter = after;
+        mWhat = std::move(what);
+    }
+
+    void clearJob(MountId id) {
+        if (mId == id) {
+            mAfter = {};
+            mWhat = {};
+        }
+    }
+
+    MountId mId = -1;
+    Milliseconds mAfter;
+    Job mWhat;
+};
+
+class MockStorageHealthListener : public os::incremental::BnStorageHealthListener {
+public:
+    MOCK_METHOD2(onHealthStatus, binder::Status(int32_t storageId, int32_t status));
+
+    MockStorageHealthListener() {
+        ON_CALL(*this, onHealthStatus(_, _))
+                .WillByDefault(Invoke(this, &MockStorageHealthListener::storeStorageIdAndStatus));
+    }
+
+    binder::Status storeStorageIdAndStatus(int32_t storageId, int32_t status) {
+        mStorageId = storageId;
+        mStatus = status;
+        return binder::Status::ok();
+    }
+
+    int32_t mStorageId = -1;
+    int32_t mStatus = -1;
+};
+
 class MockServiceManager : public ServiceManagerWrapper {
 public:
     MockServiceManager(std::unique_ptr<MockVoldService> vold,
@@ -416,13 +473,15 @@
                        std::unique_ptr<MockIncFs> incfs,
                        std::unique_ptr<MockAppOpsManager> appOpsManager,
                        std::unique_ptr<MockJniWrapper> jni,
-                       std::unique_ptr<MockLooperWrapper> looper)
+                       std::unique_ptr<MockLooperWrapper> looper,
+                       std::unique_ptr<MockTimedQueueWrapper> timedQueue)
           : mVold(std::move(vold)),
             mDataLoaderManager(std::move(dataLoaderManager)),
             mIncFs(std::move(incfs)),
             mAppOpsManager(std::move(appOpsManager)),
             mJni(std::move(jni)),
-            mLooper(std::move(looper)) {}
+            mLooper(std::move(looper)),
+            mTimedQueue(std::move(timedQueue)) {}
     std::unique_ptr<VoldServiceWrapper> getVoldService() final { return std::move(mVold); }
     std::unique_ptr<DataLoaderManagerWrapper> getDataLoaderManager() final {
         return std::move(mDataLoaderManager);
@@ -431,6 +490,7 @@
     std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() final { return std::move(mAppOpsManager); }
     std::unique_ptr<JniWrapper> getJni() final { return std::move(mJni); }
     std::unique_ptr<LooperWrapper> getLooper() final { return std::move(mLooper); }
+    std::unique_ptr<TimedQueueWrapper> getTimedQueue() final { return std::move(mTimedQueue); }
 
 private:
     std::unique_ptr<MockVoldService> mVold;
@@ -439,6 +499,7 @@
     std::unique_ptr<MockAppOpsManager> mAppOpsManager;
     std::unique_ptr<MockJniWrapper> mJni;
     std::unique_ptr<MockLooperWrapper> mLooper;
+    std::unique_ptr<MockTimedQueueWrapper> mTimedQueue;
 };
 
 // --- IncrementalServiceTest ---
@@ -460,6 +521,8 @@
         mJni = jni.get();
         auto looper = std::make_unique<NiceMock<MockLooperWrapper>>();
         mLooper = looper.get();
+        auto timedQueue = std::make_unique<NiceMock<MockTimedQueueWrapper>>();
+        mTimedQueue = timedQueue.get();
         mIncrementalService =
                 std::make_unique<IncrementalService>(MockServiceManager(std::move(vold),
                                                                         std::move(
@@ -467,7 +530,8 @@
                                                                         std::move(incFs),
                                                                         std::move(appOps),
                                                                         std::move(jni),
-                                                                        std::move(looper)),
+                                                                        std::move(looper),
+                                                                        std::move(timedQueue)),
                                                      mRootDir.path);
         mDataLoaderParcel.packageName = "com.test";
         mDataLoaderParcel.arguments = "uri";
@@ -503,6 +567,7 @@
     NiceMock<MockAppOpsManager>* mAppOpsManager = nullptr;
     NiceMock<MockJniWrapper>* mJni = nullptr;
     NiceMock<MockLooperWrapper>* mLooper = nullptr;
+    NiceMock<MockTimedQueueWrapper>* mTimedQueue = nullptr;
     NiceMock<MockDataLoader>* mDataLoader = nullptr;
     std::unique_ptr<IncrementalService> mIncrementalService;
     TemporaryDir mRootDir;
@@ -710,6 +775,136 @@
     mLooper->mCallback(-1, -1, mLooper->mCallbackData);
 }
 
+TEST_F(IncrementalServiceTest, testStartDataLoaderUnhealthyStorage) {
+    mVold->mountIncFsSuccess();
+    mIncFs->makeFileSuccess();
+    mIncFs->openMountSuccess();
+    mVold->bindMountSuccess();
+    mDataLoaderManager->bindToDataLoaderSuccess();
+    mDataLoaderManager->getDataLoaderSuccess();
+    EXPECT_CALL(*mDataLoaderManager, bindToDataLoader(_, _, _, _)).Times(1);
+    EXPECT_CALL(*mDataLoaderManager, unbindFromDataLoader(_)).Times(1);
+    EXPECT_CALL(*mDataLoader, create(_, _, _, _)).Times(1);
+    EXPECT_CALL(*mDataLoader, start(_)).Times(1);
+    EXPECT_CALL(*mDataLoader, destroy(_)).Times(1);
+    EXPECT_CALL(*mVold, unmountIncFs(_)).Times(2);
+    EXPECT_CALL(*mLooper, addFd(MockIncFs::kPendingReadsFd, _, _, _, _)).Times(2);
+    EXPECT_CALL(*mLooper, removeFd(MockIncFs::kPendingReadsFd)).Times(2);
+    EXPECT_CALL(*mTimedQueue, addJob(_, _, _)).Times(4);
+
+    sp<NiceMock<MockStorageHealthListener>> listener{new NiceMock<MockStorageHealthListener>};
+    NiceMock<MockStorageHealthListener>* listenerMock = listener.get();
+    EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_OK))
+            .Times(2);
+    EXPECT_CALL(*listenerMock,
+                onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_READS_PENDING))
+            .Times(1);
+    EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_BLOCKED))
+            .Times(1);
+    EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_UNHEALTHY))
+            .Times(2);
+
+    StorageHealthCheckParams params;
+    params.blockedTimeoutMs = 10000;
+    params.unhealthyTimeoutMs = 20000;
+    params.unhealthyMonitoringMs = 30000;
+
+    using MS = std::chrono::milliseconds;
+    using MCS = std::chrono::microseconds;
+
+    const auto blockedTimeout = MS(params.blockedTimeoutMs);
+    const auto unhealthyTimeout = MS(params.unhealthyTimeoutMs);
+    const auto unhealthyMonitoring = MS(params.unhealthyMonitoringMs);
+
+    const uint64_t kFirstTimestampUs = 1000000000ll;
+    const uint64_t kBlockedTimestampUs =
+            kFirstTimestampUs - std::chrono::duration_cast<MCS>(blockedTimeout).count();
+    const uint64_t kUnhealthyTimestampUs =
+            kFirstTimestampUs - std::chrono::duration_cast<MCS>(unhealthyTimeout).count();
+
+    TemporaryDir tempDir;
+    int storageId = mIncrementalService->createStorage(tempDir.path, std::move(mDataLoaderParcel),
+                                                       IncrementalService::CreateOptions::CreateNew,
+                                                       {}, std::move(params), listener);
+    ASSERT_GE(storageId, 0);
+
+    // Healthy state, registered for pending reads.
+    ASSERT_NE(nullptr, mLooper->mCallback);
+    ASSERT_NE(nullptr, mLooper->mCallbackData);
+    ASSERT_EQ(storageId, listener->mStorageId);
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_OK, listener->mStatus);
+
+    // Looper/epoll callback.
+    mIncFs->waitForPendingReadsSuccess(kFirstTimestampUs);
+    mLooper->mCallback(-1, -1, mLooper->mCallbackData);
+
+    // Unregister from pending reads and wait.
+    ASSERT_EQ(nullptr, mLooper->mCallback);
+    ASSERT_EQ(nullptr, mLooper->mCallbackData);
+    ASSERT_EQ(storageId, listener->mStorageId);
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_READS_PENDING, listener->mStatus);
+    // Timed callback present.
+    ASSERT_EQ(storageId, mTimedQueue->mId);
+    ASSERT_GE(mTimedQueue->mAfter, blockedTimeout);
+    auto timedCallback = mTimedQueue->mWhat;
+    mTimedQueue->clearJob(storageId);
+
+    // Timed job callback for blocked.
+    mIncFs->waitForPendingReadsSuccess(kBlockedTimestampUs);
+    timedCallback();
+
+    // Still not registered, and blocked.
+    ASSERT_EQ(nullptr, mLooper->mCallback);
+    ASSERT_EQ(nullptr, mLooper->mCallbackData);
+    ASSERT_EQ(storageId, listener->mStorageId);
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_BLOCKED, listener->mStatus);
+    // Timed callback present.
+    ASSERT_EQ(storageId, mTimedQueue->mId);
+    ASSERT_GE(mTimedQueue->mAfter, 1000ms);
+    timedCallback = mTimedQueue->mWhat;
+    mTimedQueue->clearJob(storageId);
+
+    // Timed job callback for unhealthy.
+    mIncFs->waitForPendingReadsSuccess(kUnhealthyTimestampUs);
+    timedCallback();
+
+    // Still not registered, and blocked.
+    ASSERT_EQ(nullptr, mLooper->mCallback);
+    ASSERT_EQ(nullptr, mLooper->mCallbackData);
+    ASSERT_EQ(storageId, listener->mStorageId);
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY, listener->mStatus);
+    // Timed callback present.
+    ASSERT_EQ(storageId, mTimedQueue->mId);
+    ASSERT_GE(mTimedQueue->mAfter, unhealthyMonitoring);
+    timedCallback = mTimedQueue->mWhat;
+    mTimedQueue->clearJob(storageId);
+
+    // One more unhealthy.
+    mIncFs->waitForPendingReadsSuccess(kUnhealthyTimestampUs);
+    timedCallback();
+
+    // Still not registered, and blocked.
+    ASSERT_EQ(nullptr, mLooper->mCallback);
+    ASSERT_EQ(nullptr, mLooper->mCallbackData);
+    ASSERT_EQ(storageId, listener->mStorageId);
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY, listener->mStatus);
+    // Timed callback present.
+    ASSERT_EQ(storageId, mTimedQueue->mId);
+    ASSERT_GE(mTimedQueue->mAfter, unhealthyMonitoring);
+    timedCallback = mTimedQueue->mWhat;
+    mTimedQueue->clearJob(storageId);
+
+    // And now healthy.
+    mIncFs->waitForPendingReadsTimeout();
+    timedCallback();
+
+    // Healthy state, registered for pending reads.
+    ASSERT_NE(nullptr, mLooper->mCallback);
+    ASSERT_NE(nullptr, mLooper->mCallbackData);
+    ASSERT_EQ(storageId, listener->mStorageId);
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_OK, listener->mStatus);
+}
+
 TEST_F(IncrementalServiceTest, testSetIncFsMountOptionsSuccess) {
     mVold->mountIncFsSuccess();
     mIncFs->makeFileSuccess();