Lifecycle: detecting pending reads.

Once a pending read is detected, try to start the dataloader.

Bug: 153874006
Test: test PackageManagerShellCommandTest PackageManagerShellCommandIncrementalTest IncrementalServiceTest

Change-Id: Ia8169ccbb0f710317715e6fddb9bc6a718543766
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index c3c2157..d412a19 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -266,6 +266,7 @@
         mIncFs(sm.getIncFs()),
         mAppOpsManager(sm.getAppOpsManager()),
         mJni(sm.getJni()),
+        mLooper(sm.getLooper()),
         mIncrementalDir(rootDir) {
     if (!mVold) {
         LOG(FATAL) << "Vold service is unavailable";
@@ -276,12 +277,22 @@
     if (!mAppOpsManager) {
         LOG(FATAL) << "AppOpsManager is unavailable";
     }
+    if (!mJni) {
+        LOG(FATAL) << "JNI is unavailable";
+    }
+    if (!mLooper) {
+        LOG(FATAL) << "Looper is unavailable";
+    }
 
     mJobQueue.reserve(16);
     mJobProcessor = std::thread([this]() {
         mJni->initializeForCurrentThread();
         runJobProcessing();
     });
+    mCmdLooperThread = std::thread([this]() {
+        mJni->initializeForCurrentThread();
+        runCmdLooper();
+    });
 
     const auto mountedRootNames = adoptMountedInstances();
     mountExistingImages(mountedRootNames);
@@ -294,6 +305,7 @@
     }
     mJobCondition.notify_all();
     mJobProcessor.join();
+    mCmdLooperThread.join();
 }
 
 static const char* toString(IncrementalService::BindKind kind) {
@@ -1315,6 +1327,13 @@
     return true;
 }
 
+void IncrementalService::runCmdLooper() {  // Polls mLooper until mRunning clears.
+    constexpr auto kTimeoutMsecs = 1000;  // Timeout handed to every pollAll() call below.
+    while (mRunning.load(std::memory_order_relaxed)) {  // Re-checked after each pollAll() return.
+        mLooper->pollAll(kTimeoutMsecs);
+    }
+}
+
 IncrementalService::DataLoaderStubPtr IncrementalService::prepareDataLoader(
         IncFsMount& ifs, DataLoaderParamsParcel&& params,
         const DataLoaderStatusListener* externalListener) {
@@ -1337,8 +1356,16 @@
     fsControlParcel.incremental->log.reset(dup(ifs.control.logs()));
     fsControlParcel.service = new IncrementalServiceConnector(*this, ifs.mountId);
 
-    ifs.dataLoaderStub = new DataLoaderStub(*this, ifs.mountId, std::move(params),
-                                            std::move(fsControlParcel), externalListener);
+    incfs::UniqueControl healthControl = mIncFs->openMount(ifs.root.c_str());
+    if (healthControl.pendingReads() < 0) {
+        LOG(ERROR) << "Failed to open health control for: " << ifs.root << "("
+                   << healthControl.cmd() << ":" << healthControl.pendingReads() << ":"
+                   << healthControl.logs() << ")";
+    }
+
+    ifs.dataLoaderStub =
+            new DataLoaderStub(*this, ifs.mountId, std::move(params), std::move(fsControlParcel),
+                               std::move(healthControl), externalListener);
 }
 
 template <class Duration>
@@ -1658,24 +1685,34 @@
 IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
                                                    DataLoaderParamsParcel&& params,
                                                    FileSystemControlParcel&& control,
+                                                   incfs::UniqueControl&& healthControl,
                                                    const DataLoaderStatusListener* externalListener)
       : mService(service),
         mId(id),
         mParams(std::move(params)),
         mControl(std::move(control)),
+        mHealthControl(std::move(healthControl)),
         mListener(externalListener ? *externalListener : DataLoaderStatusListener()) {
+    addToCmdLooperLocked();  // Start watching mHealthControl's pending-reads fd, if it is valid.
 }
 
-IncrementalService::DataLoaderStub::~DataLoaderStub() = default;
+IncrementalService::DataLoaderStub::~DataLoaderStub() {  // Releases resources if still owned.
+    if (mId != kInvalidStorageId) {  // NOTE(review): presumably reset after cleanup; confirm.
+        cleanupResources();  // May block: waits up to 60s for DATA_LOADER_DESTROYED.
+    }
+}
 
 void IncrementalService::DataLoaderStub::cleanupResources() {
     requestDestroy();
 
     auto now = Clock::now();
-
     std::unique_lock lock(mMutex);
+
+    removeFromCmdLooperLocked();
+
     mParams = {};
     mControl = {};
+    mHealthControl = {};
     mStatusCondition.wait_until(lock, now + 60s, [this] {
         return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
     });
@@ -1710,21 +1747,19 @@
 }
 
 bool IncrementalService::DataLoaderStub::setTargetStatus(int newStatus) {
-    int oldStatus, curStatus;
     {
         std::unique_lock lock(mMutex);
-        oldStatus = mTargetStatus;
-        curStatus = mCurrentStatus;
         setTargetStatusLocked(newStatus);
     }
-    LOG(DEBUG) << "Target status update for DataLoader " << mId << ": " << oldStatus << " -> "
-               << newStatus << " (current " << curStatus << ")";
     return fsmStep();
 }
 
 void IncrementalService::DataLoaderStub::setTargetStatusLocked(int status) {
+    auto oldStatus = mTargetStatus;  // Saved before the overwrite so the log shows old -> new.
     mTargetStatus = status;
     mTargetStatusTs = Clock::now();
+    LOG(DEBUG) << "Target status update for DataLoader " << mId << ": " << oldStatus << " -> "
+               << status << " (current " << mCurrentStatus << ")";
 }
 
 bool IncrementalService::DataLoaderStub::bind() {
@@ -1860,12 +1895,75 @@
     return binder::Status::ok();
 }
 
+void IncrementalService::DataLoaderStub::addToCmdLooperLocked() {  // Watch the pending-reads fd.
+    const auto pendingReadsFd = mHealthControl.pendingReads();
+    if (pendingReadsFd < 0) {  // No usable health control: nothing to register.
+        return;
+    }
+
+    mService.mLooper->addFd(
+            pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT,
+            [](int, int, void* data) -> int {  // `data` is the `this` pointer passed below.
+                auto* self = static_cast<DataLoaderStub*>(data);
+                return self->onCmdLooperEvent();
+            },
+            this);
+    mService.mLooper->wake();  // Presumably nudges pollAll() to pick up the new fd — confirm.
+}
+
+void IncrementalService::DataLoaderStub::removeFromCmdLooperLocked() {  // Stop watching the fd.
+    const auto pendingReadsFd = mHealthControl.pendingReads();
+    if (pendingReadsFd < 0) {  // addToCmdLooperLocked() registers nothing for an invalid fd.
+        return;
+    }
+
+    mService.mLooper->removeFd(pendingReadsFd);
+    mService.mLooper->wake();  // Presumably nudges pollAll() to drop the fd promptly — confirm.
+}
+
+int IncrementalService::DataLoaderStub::onCmdLooperEvent() {  // Pending-reads fd callback.
+    if (!mService.mRunning.load(std::memory_order_relaxed)) {
+        return 0;  // Per the Looper callback contract, 0 unregisters this fd.
+    }
+
+    bool pendingReadsOccur = false;
+
+    {
+        std::unique_lock lock(mMutex);
+        const auto now = Clock::now();
+        if (now < mEarliestMissingPageTs) {
+            // Transition: duration::max -> now.
+            mEarliestMissingPageTs = now;
+            pendingReadsOccur = true;
+        }
+    }
+
+    if (pendingReadsOccur) {  // mMutex was released above; requestStart() runs unlocked.
+        LOG(INFO) << "Pending reads occurred, requesting start for: " << mId;
+        requestStart();
+    }
+
+    {
+        // Drop pending reads: at most kMaxDropIterations calls, stopping once one returns <= 0.
+        static constexpr auto kMaxDropIterations = 3;
+        std::unique_lock lock(mMutex);
+        for (int i = 0; i < kMaxDropIterations; ++i) {
+            if (IncFs_DropPendingReads(mHealthControl) <= 0) {
+                break;
+            }
+        }
+    }
+    return 1;  // 1 keeps the callback registered (Looper callback contract).
+}
+
 void IncrementalService::DataLoaderStub::onDump(int fd) {
     dprintf(fd, "    dataLoader: {\n");
     dprintf(fd, "      currentStatus: %d\n", mCurrentStatus);
     dprintf(fd, "      targetStatus: %d\n", mTargetStatus);
     dprintf(fd, "      targetStatusTs: %lldmcs\n",
             (long long)(elapsedMcs(mTargetStatusTs, Clock::now())));
+    dprintf(fd, "      earliestMissingPageTs: %lldmcs\n",
+            (long long)(elapsedMcs(mEarliestMissingPageTs, Clock::now())));
     const auto& params = mParams;
     dprintf(fd, "      dataLoaderParams: {\n");
     dprintf(fd, "        type: %s\n", toString(params.type).c_str());