Lifecycle: detecting blocked and unhealthy, part 2.

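Part 2: continuous health checking. A new timer thread re-checks the oldest
pending read and reports the reads-pending, blocked and unhealthy states to the
storage health listener.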

Bug: 153874006
Test: atest PackageManagerShellCommandTest PackageManagerShellCommandIncrementalTest IncrementalServiceTest
Change-Id: Ifdbff7dfa24b3bd9b96c534a40bb1226082749ca
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index 66c7717..2fcb005 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -294,6 +294,10 @@
         mJni->initializeForCurrentThread();
         runCmdLooper();
     });
+    mTimerThread = std::thread([this]() {
+        mJni->initializeForCurrentThread();
+        runTimers();
+    });
 
     const auto mountedRootNames = adoptMountedInstances();
     mountExistingImages(mountedRootNames);
@@ -306,7 +310,13 @@
     }
     mJobCondition.notify_all();
     mJobProcessor.join();
+    mTimerCondition.notify_all();
+    mTimerThread.join();
     mCmdLooperThread.join();
+    mTimedJobs.clear();
+    // Ensure that mounts are destroyed while the service is still valid.
+    mBindsByPath.clear();
+    mMounts.clear();
 }
 
 static const char* toString(IncrementalService::BindKind kind) {
@@ -1700,6 +1710,55 @@
     }
 }
 
+void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) {
+    if (id == kInvalidStorageId) {  // Jobs are keyed by mount; ignore ones without a valid id.
+        return;
+    }
+    {
+        std::unique_lock lock(mTimerMutex);  // mTimedJobs is shared with runTimers().
+        mTimedJobs.insert(TimedJob{id, when, std::move(what)});  // Kept sorted by |when|.
+    }
+    mTimerCondition.notify_all();  // Wake runTimers() so it re-evaluates the earliest deadline.
+}
+
+void IncrementalService::removeTimedJobs(MountId id) {  // Drops every queued job owned by |id|.
+    if (id == kInvalidStorageId) {
+        return;
+    }
+    {  // Only queued jobs are removed; one already dequeued by runTimers() still runs.
+        std::unique_lock lock(mTimerMutex);
+        std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; });
+    }
+}
+
+void IncrementalService::runTimers() {  // Timer thread body: runs due jobs until mRunning clears.
+    static constexpr TimePoint kInfinityTs{Clock::duration::max()};  // "No deadline" marker.
+    TimePoint nextTaskTs = kInfinityTs;
+    for (;;) {
+        std::unique_lock lock(mTimerMutex);
+        mTimerCondition.wait_until(lock, nextTaskTs, [this]() {  // Sleep until stop or a due job.
+            auto now = Clock::now();
+            return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now);
+        });
+        if (!mRunning) {
+            return;
+        }
+
+        auto now = Clock::now();  // Sampled once per batch; not refreshed between jobs.
+        auto it = mTimedJobs.begin();
+        // Re-read begin() every pass: mTimedJobs can change while the lock is dropped for job().
+        for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) {
+            auto job = it->what;  // Copy the callback out before erasing its node.
+            mTimedJobs.erase(it);
+
+            lock.unlock();  // Jobs run unlocked so they can call addTimedJob() themselves.
+            job();
+            lock.lock();
+        }
+        nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs;  // Next wake-up deadline.
+    }
+}
+
 IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
                                                    DataLoaderParamsParcel&& params,
                                                    FileSystemControlParcel&& control,
@@ -1713,10 +1772,17 @@
         mControl(std::move(control)),
         mStatusListener(statusListener ? *statusListener : DataLoaderStatusListener()),
         mHealthListener(healthListener ? *healthListener : StorageHealthListener()),
-        mHealthPath(std::move(healthPath)) {
-    // TODO(b/153874006): enable external health listener.
-    mHealthListener = {};
-    healthStatusOk();
+        mHealthPath(std::move(healthPath)),
+        mHealthCheckParams(std::move(healthCheckParams)) {
+    if (mHealthListener) {
+        if (!isHealthParamsValid()) {
+            mHealthListener = {};
+        }
+    } else {
+        // Disable advanced health check statuses.
+        mHealthCheckParams.blockedTimeoutMs = -1;
+    }
+    updateHealthStatus();
 }
 
 IncrementalService::DataLoaderStub::~DataLoaderStub() {
@@ -1726,21 +1792,29 @@
 }
 
 void IncrementalService::DataLoaderStub::cleanupResources() {
+    auto now = Clock::now();  // Taken at entry; the 60s wait below is measured from here.
+    {
+        std::unique_lock lock(mMutex);
+        mHealthPath.clear();  // With an empty path initializeHealthControl() won't reopen the mount.
+        unregisterFromPendingReads();
+        resetHealthControl();
+        mService.removeTimedJobs(mId);  // Cancel queued health re-checks for this mount.
+    }
+
     requestDestroy();
 
-    auto now = Clock::now();
-    std::unique_lock lock(mMutex);
-
-    unregisterFromPendingReads();
-
-    mParams = {};
-    mControl = {};
-    mStatusCondition.wait_until(lock, now + 60s, [this] {
-        return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
-    });
-    mStatusListener = {};
-    mHealthListener = {};
-    mId = kInvalidStorageId;
+    {
+        std::unique_lock lock(mMutex);
+        mParams = {};
+        mControl = {};
+        mHealthControl = {};
+        mHealthListener = {};
+        mStatusCondition.wait_until(lock, now + 60s, [this] {  // Wait for DESTROYED, bounded.
+            return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
+        });
+        mStatusListener = {};  // Cleared only after the wait above has finished.
+        mId = kInvalidStorageId;  // From here on id() reports an invalid mount.
+    }
 }
 
 sp<content::pm::IDataLoader> IncrementalService::DataLoaderStub::getDataLoader() {
@@ -1838,7 +1912,7 @@
         targetStatus = mTargetStatus;
     }
 
-    LOG(DEBUG) << "fsmStep: " << mId << ": " << currentStatus << " -> " << targetStatus;
+    LOG(DEBUG) << "fsmStep: " << id() << ": " << currentStatus << " -> " << targetStatus;
 
     if (currentStatus == targetStatus) {
         return true;
@@ -1920,42 +1994,167 @@
     return binder::Status::ok();
 }
 
-void IncrementalService::DataLoaderStub::healthStatusOk() {
-    LOG(DEBUG) << "healthStatusOk: " << mId;
-    std::unique_lock lock(mMutex);
-    registerForPendingReads();
+bool IncrementalService::DataLoaderStub::isHealthParamsValid() const {  // 0 < blocked < unhealthy
+    return mHealthCheckParams.blockedTimeoutMs > 0 &&
+            mHealthCheckParams.blockedTimeoutMs < mHealthCheckParams.unhealthyTimeoutMs;
 }
 
-void IncrementalService::DataLoaderStub::healthStatusReadsPending() {
-    LOG(DEBUG) << "healthStatusReadsPending: " << mId;
-    requestStart();
-
-    std::unique_lock lock(mMutex);
-    unregisterFromPendingReads();
+void IncrementalService::DataLoaderStub::onHealthStatus(StorageHealthListener healthListener,
+                                                        int healthStatus) {
+    LOG(DEBUG) << id() << ": healthStatus: " << healthStatus;
+    if (healthListener) {  // The listener is optional; without one the status is only logged.
+        healthListener->onHealthStatus(id(), healthStatus);
+    }
 }
 
-void IncrementalService::DataLoaderStub::healthStatusBlocked() {}
+void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
+    LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : "");
 
-void IncrementalService::DataLoaderStub::healthStatusUnhealthy() {}
+    int healthStatusToReport = -1;  // -1 means there is nothing to report.
+    StorageHealthListener healthListener;  // Copied under mMutex, invoked after unlocking.
 
-void IncrementalService::DataLoaderStub::registerForPendingReads() {
-    auto pendingReadsFd = mHealthControl.pendingReads();
-    if (pendingReadsFd < 0) {
-        mHealthControl = mService.mIncFs->openMount(mHealthPath);
-        pendingReadsFd = mHealthControl.pendingReads();
-        if (pendingReadsFd < 0) {
-            LOG(ERROR) << "Failed to open health control for: " << mId << ", path: " << mHealthPath
-                       << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":"
-                       << mHealthControl.logs() << ")";
+    {
+        std::unique_lock lock(mMutex);
+        unregisterFromPendingReads();  // Re-registered below only when waiting for new reads.
+
+        healthListener = mHealthListener;
+
+        // Healthcheck depends on timestamp of the oldest pending read.
+        // To get it, we need to re-open a pendingReads FD to get a full list of reads.
+        // Additionally we need to re-register for epoll with fresh FDs in case there are no reads.
+        const auto now = Clock::now();
+        const auto kernelTsUs = getOldestPendingReadTs();
+        if (baseline) {
+            // Updating baseline only on looper/epoll callback, i.e. on new set of pending reads.
+            mHealthBase = {now, kernelTsUs};
+        }
+
+        if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now ||
+            mHealthBase.kernelTsUs > kernelTsUs) {
+            LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait.";
+            registerForPendingReads();  // The looper callback re-enters with baseline=true.
+            healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
+            lock.unlock();  // Never call the listener while holding mMutex.
+            onHealthStatus(healthListener, healthStatusToReport);
             return;
         }
+
+        resetHealthControl();  // Reads are pending: drop the control; the next check reopens it.
+
+        // Always make sure the data loader is started.
+        setTargetStatusLocked(IDataLoaderStatusListener::DATA_LOADER_STARTED);
+
+        // Skip any further processing if health check params are invalid.
+        if (!isHealthParamsValid()) {
+            LOG(DEBUG) << id()
+                       << ": Skip any further processing if health check params are invalid.";
+            healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
+            lock.unlock();
+            onHealthStatus(healthListener, healthStatusToReport);
+            // Triggering data loader start. This is a one-time action.
+            fsmStep();
+            return;
+        }
+
+        const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs);
+        const auto unhealthyTimeout =
+                std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs);
+        const auto unhealthyMonitoring =
+                std::max(1000ms,
+                         std::chrono::milliseconds(mHealthCheckParams.unhealthyMonitoringMs));
+
+        const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs;
+        const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs);
+        const auto delta = now - userTs;  // How long the oldest pending read has been waiting.
+
+        TimePoint whenToCheckBack;
+        if (delta < blockedTimeout) {
+            LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status.";
+            whenToCheckBack = userTs + blockedTimeout;
+            healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
+        } else if (delta < unhealthyTimeout) {
+            LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy.";
+            whenToCheckBack = userTs + unhealthyTimeout;
+            healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
+        } else {
+            LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring.";
+            whenToCheckBack = now + unhealthyMonitoring;  // Floor of 1000ms, see std::max above.
+            healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
+        }
+        LOG(DEBUG) << id() << ": updateHealthStatus in "
+                   << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
+                                                                                   now)
+                                     .count()) /
+                        1000.0
+                   << "secs";
+        mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); });
     }
 
+    if (healthStatusToReport != -1) {
+        onHealthStatus(healthListener, healthStatusToReport);
+    }
+
+    fsmStep();  // Drive the loader towards the STARTED target set above.
+}
+
+const incfs::UniqueControl& IncrementalService::DataLoaderStub::initializeHealthControl() {
+    if (mHealthPath.empty()) {  // No path (e.g. after cleanupResources()): keep control reset.
+        resetHealthControl();
+        return mHealthControl;
+    }
+    if (mHealthControl.pendingReads() < 0) {  // Not open yet, or reset earlier: open the mount.
+        mHealthControl = mService.mIncFs->openMount(mHealthPath);
+    }
+    if (mHealthControl.pendingReads() < 0) {  // Still invalid: only log; callers check the FD.
+        LOG(ERROR) << "Failed to open health control for: " << id() << ", path: " << mHealthPath
+                   << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":"
+                   << mHealthControl.logs() << ")";
+    }
+    return mHealthControl;
+}
+
+void IncrementalService::DataLoaderStub::resetHealthControl() {
+    mHealthControl = {};  // Presumably releases the FDs held by UniqueControl - TODO confirm.
+}
+
+BootClockTsUs IncrementalService::DataLoaderStub::getOldestPendingReadTs() {
+    auto result = kMaxBootClockTsUs;  // Sentinel: the caller treats it as "no pending reads".
+
+    const auto& control = initializeHealthControl();
+    if (control.pendingReads() < 0) {  // Health control could not be opened.
+        return result;
+    }
+
+    std::vector<incfs::ReadInfo> pendingReads;  // Filled by the 0ms-timeout wait below.
+    if (mService.mIncFs->waitForPendingReads(control, 0ms, &pendingReads) !=
+                android::incfs::WaitResult::HaveData ||
+        pendingReads.empty()) {
+        return result;
+    }
+
+    LOG(DEBUG) << id() << ": pendingReads: " << control.pendingReads() << ", "
+               << pendingReads.size() << ": " << pendingReads.front().bootClockTsUs;
+
+    for (auto&& pendingRead : pendingReads) {  // Keep the smallest (oldest) timestamp.
+        result = std::min(result, pendingRead.bootClockTsUs);
+    }
+    return result;
+}
+
+void IncrementalService::DataLoaderStub::registerForPendingReads() {
+    const auto pendingReadsFd = mHealthControl.pendingReads();
+    if (pendingReadsFd < 0) {
+        return;
+    }
+
+    LOG(DEBUG) << id() << ": addFd(pendingReadsFd): " << pendingReadsFd;
+
     mService.mLooper->addFd(
             pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT,
             [](int, int, void* data) -> int {
                 auto&& self = (DataLoaderStub*)data;
-                return self->onPendingReads();
+                self->updateHealthStatus(/*baseline=*/true);
+                return 0;
             },
             this);
     mService.mLooper->wake();
@@ -1967,19 +2166,10 @@
         return;
     }
 
+    LOG(DEBUG) << id() << ": removeFd(pendingReadsFd): " << pendingReadsFd;
+
     mService.mLooper->removeFd(pendingReadsFd);
     mService.mLooper->wake();
-
-    mHealthControl = {};
-}
-
-int IncrementalService::DataLoaderStub::onPendingReads() {
-    if (!mService.mRunning.load(std::memory_order_relaxed)) {
-        return 0;
-    }
-
-    healthStatusReadsPending();
-    return 0;
 }
 
 void IncrementalService::DataLoaderStub::onDump(int fd) {
diff --git a/services/incremental/IncrementalService.h b/services/incremental/IncrementalService.h
index 05f62b9..57e4669 100644
--- a/services/incremental/IncrementalService.h
+++ b/services/incremental/IncrementalService.h
@@ -35,6 +35,7 @@
 #include <limits>
 #include <map>
 #include <mutex>
+#include <set>
 #include <span>
 #include <string>
 #include <string_view>
@@ -186,17 +187,12 @@
 
         void onDump(int fd);
 
-        MountId id() const { return mId; }
+        MountId id() const { return mId.load(std::memory_order_relaxed); }
         const content::pm::DataLoaderParamsParcel& params() const { return mParams; }
 
     private:
         binder::Status onStatusChanged(MountId mount, int newStatus) final;
 
-        void registerForPendingReads();
-        void unregisterFromPendingReads();
-        int onPendingReads();
-
-        bool isValid() const { return mId != kInvalidStorageId; }
         sp<content::pm::IDataLoader> getDataLoader();
 
         bool bind();
@@ -208,21 +204,27 @@
         void setTargetStatusLocked(int status);
 
         bool fsmStep();
+        bool fsmStep(int currentStatus, int targetStatus);
 
-        // Watching for pending reads.
-        void healthStatusOk();
-        // Pending reads detected, waiting for Xsecs to confirm blocked state.
-        void healthStatusReadsPending();
-        // There are reads pending for X+secs, waiting for additional Ysecs to confirm unhealthy
-        // state.
-        void healthStatusBlocked();
-        // There are reads pending for X+Ysecs, marking storage as unhealthy.
-        void healthStatusUnhealthy();
+        void onHealthStatus(StorageHealthListener healthListener, int healthStatus);
+        void updateHealthStatus(bool baseline = false);
+
+        bool isValid() const { return id() != kInvalidStorageId; }
+
+        bool isHealthParamsValid() const;
+
+        const incfs::UniqueControl& initializeHealthControl();
+        void resetHealthControl();
+
+        BootClockTsUs getOldestPendingReadTs();
+
+        void registerForPendingReads();
+        void unregisterFromPendingReads();
 
         IncrementalService& mService;
 
         std::mutex mMutex;
-        MountId mId = kInvalidStorageId;
+        std::atomic<MountId> mId = kInvalidStorageId;
         content::pm::DataLoaderParamsParcel mParams;
         content::pm::FileSystemControlParcel mControl;
         DataLoaderStatusListener mStatusListener;
@@ -235,6 +237,11 @@
 
         std::string mHealthPath;
         incfs::UniqueControl mHealthControl;
+        struct {  // Baseline pairing a user-clock time with a kernel boot-clock timestamp.
+            TimePoint userTs;  // Clock::now() when updateHealthStatus(baseline=true) ran.
+            BootClockTsUs kernelTsUs;  // Oldest pending read timestamp seen at that moment.
+        } mHealthBase = {TimePoint::max(), kMaxBootClockTsUs};
+        StorageHealthCheckParams mHealthCheckParams;
     };
     using DataLoaderStubPtr = sp<DataLoaderStub>;
 
@@ -331,6 +338,8 @@
     bool unregisterAppOpsCallback(const std::string& packageName);
     void onAppOpChanged(const std::string& packageName);
 
+    using Job = std::function<void()>;
+
     void runJobProcessing();
     void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry,
                         const incfs::FileId& libFileId, std::string_view targetLibPath,
@@ -338,6 +347,10 @@
 
     void runCmdLooper();
 
+    void addTimedJob(MountId id, TimePoint when, Job what);
+    void removeTimedJobs(MountId id);
+    void runTimers();
+
 private:
     const std::unique_ptr<VoldServiceWrapper> mVold;
     const std::unique_ptr<DataLoaderManagerWrapper> mDataLoaderManager;
@@ -360,7 +373,6 @@
 
     std::atomic_bool mRunning{true};
 
-    using Job = std::function<void()>;
     std::unordered_map<MountId, std::vector<Job>> mJobQueue;
     MountId mPendingJobsMount = kInvalidStorageId;
     std::condition_variable mJobCondition;
@@ -368,6 +380,19 @@
     std::thread mJobProcessor;
 
     std::thread mCmdLooperThread;
+
+    struct TimedJob {  // A deferred callback owned by a mount; ordered by (when, id).
+        MountId id;      // Owning mount; removeTimedJobs() matches on it.
+        TimePoint when;  // Earliest time at which runTimers() may run the job.
+        Job what;        // The callback to invoke.
+        friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
+            return lhs.when != rhs.when ? lhs.when < rhs.when : lhs.id < rhs.id;
+        }  // Tie-break on id: std::set drops equivalent keys, losing same-time jobs otherwise.
+    };
+    std::set<TimedJob> mTimedJobs;
+    std::condition_variable mTimerCondition;
+    std::mutex mTimerMutex;
+    std::thread mTimerThread;
 };
 
 } // namespace android::incremental
diff --git a/services/incremental/test/IncrementalServiceTest.cpp b/services/incremental/test/IncrementalServiceTest.cpp
index 2948b6a..84ec7d3 100644
--- a/services/incremental/test/IncrementalServiceTest.cpp
+++ b/services/incremental/test/IncrementalServiceTest.cpp
@@ -371,7 +371,7 @@
 public:
     MOCK_CONST_METHOD0(initializeForCurrentThread, void());
 
-    MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); }
+    MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); }
 };
 
 class MockLooperWrapper : public LooperWrapper {