Lifecycle: detecting pending reads.
Once a pending read is detected, try to start the DataLoader.
Bug: 153874006
Test: test PackageManagerShellCommandTest PackageManagerShellCommandIncrementalTest IncrementalServiceTest
Change-Id: Ia8169ccbb0f710317715e6fddb9bc6a718543766
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index c3c2157..d412a19 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -266,6 +266,7 @@
mIncFs(sm.getIncFs()),
mAppOpsManager(sm.getAppOpsManager()),
mJni(sm.getJni()),
+ mLooper(sm.getLooper()),
mIncrementalDir(rootDir) {
if (!mVold) {
LOG(FATAL) << "Vold service is unavailable";
@@ -276,12 +277,22 @@
if (!mAppOpsManager) {
LOG(FATAL) << "AppOpsManager is unavailable";
}
+ if (!mJni) {
+ LOG(FATAL) << "JNI is unavailable";
+ }
+ if (!mLooper) {
+ LOG(FATAL) << "Looper is unavailable";
+ }
mJobQueue.reserve(16);
mJobProcessor = std::thread([this]() {
mJni->initializeForCurrentThread();
runJobProcessing();
});
+ mCmdLooperThread = std::thread([this]() {
+ mJni->initializeForCurrentThread();
+ runCmdLooper();
+ });
const auto mountedRootNames = adoptMountedInstances();
mountExistingImages(mountedRootNames);
@@ -294,6 +305,7 @@
}
mJobCondition.notify_all();
mJobProcessor.join();
+ mCmdLooperThread.join();
}
static const char* toString(IncrementalService::BindKind kind) {
@@ -1315,6 +1327,13 @@
return true;
}
+// Body of mCmdLooperThread: keeps polling mLooper until mRunning is cleared.
+// Each poll uses a bounded timeout, so the flag is re-checked at least once per
+// timeout period even when no looper events arrive.
+void IncrementalService::runCmdLooper() {
+    constexpr int kPollTimeoutMs = 1000;
+    for (;;) {
+        if (!mRunning.load(std::memory_order_relaxed)) {
+            return;
+        }
+        mLooper->pollAll(kPollTimeoutMs);
+    }
+}
+
IncrementalService::DataLoaderStubPtr IncrementalService::prepareDataLoader(
IncFsMount& ifs, DataLoaderParamsParcel&& params,
const DataLoaderStatusListener* externalListener) {
@@ -1337,8 +1356,16 @@
fsControlParcel.incremental->log.reset(dup(ifs.control.logs()));
fsControlParcel.service = new IncrementalServiceConnector(*this, ifs.mountId);
- ifs.dataLoaderStub = new DataLoaderStub(*this, ifs.mountId, std::move(params),
- std::move(fsControlParcel), externalListener);
+ incfs::UniqueControl healthControl = mIncFs->openMount(ifs.root.c_str());
+ if (healthControl.pendingReads() < 0) {
+ LOG(ERROR) << "Failed to open health control for: " << ifs.root << "("
+ << healthControl.cmd() << ":" << healthControl.pendingReads() << ":"
+ << healthControl.logs() << ")";
+ }
+
+ ifs.dataLoaderStub =
+ new DataLoaderStub(*this, ifs.mountId, std::move(params), std::move(fsControlParcel),
+ std::move(healthControl), externalListener);
}
template <class Duration>
@@ -1658,24 +1685,34 @@
IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
DataLoaderParamsParcel&& params,
FileSystemControlParcel&& control,
+ incfs::UniqueControl&& healthControl,  // control whose pendingReads() fd is watched for pending reads
const DataLoaderStatusListener* externalListener)
: mService(service),
mId(id),
mParams(std::move(params)),
mControl(std::move(control)),
+ mHealthControl(std::move(healthControl)),  // the stub takes over the moved-in control
mListener(externalListener ? *externalListener : DataLoaderStatusListener()) {
+ addToCmdLooperLocked();  // registers the pending-reads fd with the service looper; returns early if the fd is negative
}
-IncrementalService::DataLoaderStub::~DataLoaderStub() = default;
+// Runs cleanupResources() on destruction unless mId is kInvalidStorageId.
+// NOTE(review): presumably an invalid id means cleanup has already happened -
+// confirm against cleanupResources().
+IncrementalService::DataLoaderStub::~DataLoaderStub() {
+    if (mId == kInvalidStorageId) {
+        return;
+    }
+    cleanupResources();
+}
void IncrementalService::DataLoaderStub::cleanupResources() {
requestDestroy();
auto now = Clock::now();
-
std::unique_lock lock(mMutex);
+
+ removeFromCmdLooperLocked();
+
mParams = {};
mControl = {};
+ mHealthControl = {};
mStatusCondition.wait_until(lock, now + 60s, [this] {
return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
});
@@ -1710,21 +1747,19 @@
}
+ // Records newStatus as the target status while holding mMutex, then runs fsmStep() and returns its result.
bool IncrementalService::DataLoaderStub::setTargetStatus(int newStatus) {
- int oldStatus, curStatus;
{
std::unique_lock lock(mMutex);
- oldStatus = mTargetStatus;
- curStatus = mCurrentStatus;
setTargetStatusLocked(newStatus);
}
- LOG(DEBUG) << "Target status update for DataLoader " << mId << ": " << oldStatus << " -> "
- << newStatus << " (current " << curStatus << ")";
return fsmStep();
}
void IncrementalService::DataLoaderStub::setTargetStatusLocked(int status) {
+ auto oldStatus = mTargetStatus;  // keep the previous target so the log below can show the transition
mTargetStatus = status;
mTargetStatusTs = Clock::now();
+ LOG(DEBUG) << "Target status update for DataLoader " << mId << ": " << oldStatus << " -> "  // logs old -> new target
+ << status << " (current " << mCurrentStatus << ")";  // plus the current status as read at this point
}
bool IncrementalService::DataLoaderStub::bind() {
@@ -1860,12 +1895,75 @@
return binder::Status::ok();
}
+// Registers the health control's pending-reads fd with the service's command looper so
+// that onCmdLooperEvent() runs when the fd becomes readable. Does nothing when the fd is
+// negative (the health control is not open).
+//
+// The looper is given a raw `this` pointer as callback data, so the registration has to be
+// removed (see removeFromCmdLooperLocked()) before this object goes away.
+// NOTE(review): the "Locked" suffix suggests mMutex should be held by the caller; the
+// constructor calls this without taking it - confirm that is intentional.
+void IncrementalService::DataLoaderStub::addToCmdLooperLocked() {
+    const auto pendingReadsFd = mHealthControl.pendingReads();
+    if (pendingReadsFd < 0) {
+        return;
+    }
+
+    mService.mLooper->addFd(
+            pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT,
+            [](int, int, void* data) -> int {
+                // `data` is the `this` pointer passed to addFd() below.
+                auto* self = static_cast<DataLoaderStub*>(data);
+                return self->onCmdLooperEvent();
+            },
+            this);
+    // Wake the looper after changing its fd set.
+    mService.mLooper->wake();
+}
+
+// Unregisters the health control's pending-reads fd from the service's command looper and
+// wakes the looper. Does nothing when the fd is negative.
+void IncrementalService::DataLoaderStub::removeFromCmdLooperLocked() {
+    if (const auto fd = mHealthControl.pendingReads(); fd >= 0) {
+        mService.mLooper->removeFd(fd);
+        mService.mLooper->wake();
+    }
+}
+
+// Looper callback for the pending-reads fd (installed by addToCmdLooperLocked()).
+//
+// On the first event, i.e. while mEarliestMissingPageTs is still later than "now", stamps
+// the current time and asks the data loader to start. On every event, drops the pending
+// reads from the health control, trying at most kMaxDropIterations times.
+//
+// Returns 0 when the service is no longer running, 1 otherwise.
+// NOTE(review): per the Looper callback contract, 0 should unregister the fd and 1 keep
+// it registered - confirm against the Looper wrapper in use.
+int IncrementalService::DataLoaderStub::onCmdLooperEvent() {
+    if (!mService.mRunning.load(std::memory_order_relaxed)) {
+        return 0;
+    }
+
+    // Decide under the lock, act outside of it: requestStart() is called unlocked.
+    const bool firstPendingRead = [this] {
+        std::unique_lock lock(mMutex);
+        const auto now = Clock::now();
+        if (now >= mEarliestMissingPageTs) {
+            return false;
+        }
+        // Transition: initial (far-future) timestamp -> now.
+        mEarliestMissingPageTs = now;
+        return true;
+    }();
+
+    if (firstPendingRead) {
+        LOG(INFO) << "Pending reads occur for, requesting start for: " << mId;
+        requestStart();
+    }
+
+    // Drop pending reads; stop early once a call reports nothing (or an error).
+    constexpr int kMaxDropIterations = 3;
+    std::unique_lock lock(mMutex);
+    int attempt = 0;
+    while (attempt < kMaxDropIterations && IncFs_DropPendingReads(mHealthControl) > 0) {
+        ++attempt;
+    }
+    return 1;
+}
+
void IncrementalService::DataLoaderStub::onDump(int fd) {
dprintf(fd, " dataLoader: {\n");
dprintf(fd, " currentStatus: %d\n", mCurrentStatus);
dprintf(fd, " targetStatus: %d\n", mTargetStatus);
dprintf(fd, " targetStatusTs: %lldmcs\n",
(long long)(elapsedMcs(mTargetStatusTs, Clock::now())));
+ dprintf(fd, " earliestMissingPageTs: %lldmcs\n",
+ (long long)(elapsedMcs(mEarliestMissingPageTs, Clock::now())));
const auto& params = mParams;
dprintf(fd, " dataLoaderParams: {\n");
dprintf(fd, " type: %s\n", toString(params.type).c_str());