Lifecycle: detecting blocked and unhealthy, part 2.
Part 2: continuous health checking, blocked and unhealthy states.
Bug: 153874006
Test: atest PackageManagerShellCommandTest PackageManagerShellCommandIncrementalTest IncrementalServiceTest
Change-Id: Ifdbff7dfa24b3bd9b96c534a40bb1226082749ca
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index 66c7717..2fcb005 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -294,6 +294,10 @@
mJni->initializeForCurrentThread();
runCmdLooper();
});
+ mTimerThread = std::thread([this]() {
+ mJni->initializeForCurrentThread();
+ runTimers();
+ });
const auto mountedRootNames = adoptMountedInstances();
mountExistingImages(mountedRootNames);
@@ -306,7 +310,13 @@
}
mJobCondition.notify_all();
mJobProcessor.join();
+ mTimerCondition.notify_all();
+ mTimerThread.join();
mCmdLooperThread.join();
+ mTimedJobs.clear();
+ // Ensure that mounts are destroyed while the service is still valid.
+ mBindsByPath.clear();
+ mMounts.clear();
}
static const char* toString(IncrementalService::BindKind kind) {
@@ -1700,6 +1710,55 @@
}
}
+void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) {
+ if (id == kInvalidStorageId) {
+ return;
+ }
+ {
+ std::unique_lock lock(mTimerMutex);
+ mTimedJobs.insert(TimedJob{id, when, std::move(what)});
+ }
+ mTimerCondition.notify_all();
+}
+
+void IncrementalService::removeTimedJobs(MountId id) {
+ if (id == kInvalidStorageId) {
+ return;
+ }
+ {
+ std::unique_lock lock(mTimerMutex);
+ std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; });
+ }
+}
+
+void IncrementalService::runTimers() {
+ static constexpr TimePoint kInfinityTs{Clock::duration::max()};
+ TimePoint nextTaskTs = kInfinityTs;
+ for (;;) {
+ std::unique_lock lock(mTimerMutex);
+ mTimerCondition.wait_until(lock, nextTaskTs, [this]() {
+ auto now = Clock::now();
+ return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now);
+ });
+ if (!mRunning) {
+ return;
+ }
+
+ auto now = Clock::now();
+ auto it = mTimedJobs.begin();
+ // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
+ for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) {
+ auto job = it->what;
+ mTimedJobs.erase(it);
+
+ lock.unlock();
+ job();
+ lock.lock();
+ }
+ nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs;
+ }
+}
+
IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
DataLoaderParamsParcel&& params,
FileSystemControlParcel&& control,
@@ -1713,10 +1772,17 @@
mControl(std::move(control)),
mStatusListener(statusListener ? *statusListener : DataLoaderStatusListener()),
mHealthListener(healthListener ? *healthListener : StorageHealthListener()),
- mHealthPath(std::move(healthPath)) {
- // TODO(b/153874006): enable external health listener.
- mHealthListener = {};
- healthStatusOk();
+ mHealthPath(std::move(healthPath)),
+ mHealthCheckParams(std::move(healthCheckParams)) {
+ if (mHealthListener) {
+ if (!isHealthParamsValid()) {
+ mHealthListener = {};
+ }
+ } else {
+ // Disable advanced health check statuses.
+ mHealthCheckParams.blockedTimeoutMs = -1;
+ }
+ updateHealthStatus();
}
IncrementalService::DataLoaderStub::~DataLoaderStub() {
@@ -1726,21 +1792,29 @@
}
void IncrementalService::DataLoaderStub::cleanupResources() {
+ auto now = Clock::now();
+ {
+ std::unique_lock lock(mMutex);
+ mHealthPath.clear();
+ unregisterFromPendingReads();
+ resetHealthControl();
+ mService.removeTimedJobs(mId);
+ }
+
requestDestroy();
- auto now = Clock::now();
- std::unique_lock lock(mMutex);
-
- unregisterFromPendingReads();
-
- mParams = {};
- mControl = {};
- mStatusCondition.wait_until(lock, now + 60s, [this] {
- return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
- });
- mStatusListener = {};
- mHealthListener = {};
- mId = kInvalidStorageId;
+ {
+ std::unique_lock lock(mMutex);
+ mParams = {};
+ mControl = {};
+ mHealthControl = {};
+ mHealthListener = {};
+ mStatusCondition.wait_until(lock, now + 60s, [this] {
+ return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
+ });
+ mStatusListener = {};
+ mId = kInvalidStorageId;
+ }
}
sp<content::pm::IDataLoader> IncrementalService::DataLoaderStub::getDataLoader() {
@@ -1838,7 +1912,7 @@
targetStatus = mTargetStatus;
}
- LOG(DEBUG) << "fsmStep: " << mId << ": " << currentStatus << " -> " << targetStatus;
+ LOG(DEBUG) << "fsmStep: " << id() << ": " << currentStatus << " -> " << targetStatus;
if (currentStatus == targetStatus) {
return true;
@@ -1920,42 +1994,167 @@
return binder::Status::ok();
}
-void IncrementalService::DataLoaderStub::healthStatusOk() {
- LOG(DEBUG) << "healthStatusOk: " << mId;
- std::unique_lock lock(mMutex);
- registerForPendingReads();
+bool IncrementalService::DataLoaderStub::isHealthParamsValid() const {
+ return mHealthCheckParams.blockedTimeoutMs > 0 &&
+ mHealthCheckParams.blockedTimeoutMs < mHealthCheckParams.unhealthyTimeoutMs;
}
-void IncrementalService::DataLoaderStub::healthStatusReadsPending() {
- LOG(DEBUG) << "healthStatusReadsPending: " << mId;
- requestStart();
-
- std::unique_lock lock(mMutex);
- unregisterFromPendingReads();
+void IncrementalService::DataLoaderStub::onHealthStatus(StorageHealthListener healthListener,
+ int healthStatus) {
+ LOG(DEBUG) << id() << ": healthStatus: " << healthStatus;
+ if (healthListener) {
+ healthListener->onHealthStatus(id(), healthStatus);
+ }
}
-void IncrementalService::DataLoaderStub::healthStatusBlocked() {}
+void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
+ LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : "");
-void IncrementalService::DataLoaderStub::healthStatusUnhealthy() {}
+ int healthStatusToReport = -1;
+ StorageHealthListener healthListener;
-void IncrementalService::DataLoaderStub::registerForPendingReads() {
- auto pendingReadsFd = mHealthControl.pendingReads();
- if (pendingReadsFd < 0) {
- mHealthControl = mService.mIncFs->openMount(mHealthPath);
- pendingReadsFd = mHealthControl.pendingReads();
- if (pendingReadsFd < 0) {
- LOG(ERROR) << "Failed to open health control for: " << mId << ", path: " << mHealthPath
- << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":"
- << mHealthControl.logs() << ")";
+ {
+ std::unique_lock lock(mMutex);
+ unregisterFromPendingReads();
+
+ healthListener = mHealthListener;
+
+ // Healthcheck depends on timestamp of the oldest pending read.
+ // To get it, we need to re-open a pendingReads FD to get a full list of reads.
+ // Additionally we need to re-register for epoll with fresh FDs in case there are no reads.
+ const auto now = Clock::now();
+ const auto kernelTsUs = getOldestPendingReadTs();
+ if (baseline) {
+ // Updating baseline only on looper/epoll callback, i.e. on new set of pending reads.
+ mHealthBase = {now, kernelTsUs};
+ }
+
+ if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now ||
+ mHealthBase.kernelTsUs > kernelTsUs) {
+ LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait.";
+ registerForPendingReads();
+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
+ lock.unlock();
+ onHealthStatus(healthListener, healthStatusToReport);
return;
}
+
+ resetHealthControl();
+
+ // Always make sure the data loader is started.
+ setTargetStatusLocked(IDataLoaderStatusListener::DATA_LOADER_STARTED);
+
+ // Skip any further processing if health check params are invalid.
+ if (!isHealthParamsValid()) {
+ LOG(DEBUG) << id()
+ << ": Skip any further processing if health check params are invalid.";
+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
+ lock.unlock();
+ onHealthStatus(healthListener, healthStatusToReport);
+ // Triggering data loader start. This is a one-time action.
+ fsmStep();
+ return;
+ }
+
+ const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs);
+ const auto unhealthyTimeout =
+ std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs);
+ const auto unhealthyMonitoring =
+ std::max(1000ms,
+ std::chrono::milliseconds(mHealthCheckParams.unhealthyMonitoringMs));
+
+ const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs;
+ const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs);
+ const auto delta = now - userTs;
+
+ TimePoint whenToCheckBack;
+ if (delta < blockedTimeout) {
+ LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status.";
+ whenToCheckBack = userTs + blockedTimeout;
+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
+ } else if (delta < unhealthyTimeout) {
+ LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy.";
+ whenToCheckBack = userTs + unhealthyTimeout;
+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
+ } else {
+ LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring.";
+ whenToCheckBack = now + unhealthyMonitoring;
+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
+ }
+ LOG(DEBUG) << id() << ": updateHealthStatus in "
+ << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
+ now)
+ .count()) /
+ 1000.0
+ << "secs";
+ mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); });
}
+ if (healthStatusToReport != -1) {
+ onHealthStatus(healthListener, healthStatusToReport);
+ }
+
+ fsmStep();
+}
+
+const incfs::UniqueControl& IncrementalService::DataLoaderStub::initializeHealthControl() {
+ if (mHealthPath.empty()) {
+ resetHealthControl();
+ return mHealthControl;
+ }
+ if (mHealthControl.pendingReads() < 0) {
+ mHealthControl = mService.mIncFs->openMount(mHealthPath);
+ }
+ if (mHealthControl.pendingReads() < 0) {
+ LOG(ERROR) << "Failed to open health control for: " << id() << ", path: " << mHealthPath
+ << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":"
+ << mHealthControl.logs() << ")";
+ }
+ return mHealthControl;
+}
+
+void IncrementalService::DataLoaderStub::resetHealthControl() {
+ mHealthControl = {};
+}
+
+BootClockTsUs IncrementalService::DataLoaderStub::getOldestPendingReadTs() {
+ auto result = kMaxBootClockTsUs;
+
+ const auto& control = initializeHealthControl();
+ if (control.pendingReads() < 0) {
+ return result;
+ }
+
+ std::vector<incfs::ReadInfo> pendingReads;
+ if (mService.mIncFs->waitForPendingReads(control, 0ms, &pendingReads) !=
+ android::incfs::WaitResult::HaveData ||
+ pendingReads.empty()) {
+ return result;
+ }
+
+ LOG(DEBUG) << id() << ": pendingReads: " << control.pendingReads() << ", "
+ << pendingReads.size() << ": " << pendingReads.front().bootClockTsUs;
+
+ for (auto&& pendingRead : pendingReads) {
+ result = std::min(result, pendingRead.bootClockTsUs);
+ }
+ return result;
+}
+
+void IncrementalService::DataLoaderStub::registerForPendingReads() {
+ const auto pendingReadsFd = mHealthControl.pendingReads();
+ if (pendingReadsFd < 0) {
+ return;
+ }
+
+ LOG(DEBUG) << id() << ": addFd(pendingReadsFd): " << pendingReadsFd;
+
mService.mLooper->addFd(
pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT,
[](int, int, void* data) -> int {
auto&& self = (DataLoaderStub*)data;
- return self->onPendingReads();
+ self->updateHealthStatus(/*baseline=*/true);
+ return 0;
},
this);
mService.mLooper->wake();
@@ -1967,19 +2166,10 @@
return;
}
+ LOG(DEBUG) << id() << ": removeFd(pendingReadsFd): " << pendingReadsFd;
+
mService.mLooper->removeFd(pendingReadsFd);
mService.mLooper->wake();
-
- mHealthControl = {};
-}
-
-int IncrementalService::DataLoaderStub::onPendingReads() {
- if (!mService.mRunning.load(std::memory_order_relaxed)) {
- return 0;
- }
-
- healthStatusReadsPending();
- return 0;
}
void IncrementalService::DataLoaderStub::onDump(int fd) {
diff --git a/services/incremental/IncrementalService.h b/services/incremental/IncrementalService.h
index 05f62b9..57e4669 100644
--- a/services/incremental/IncrementalService.h
+++ b/services/incremental/IncrementalService.h
@@ -35,6 +35,7 @@
#include <limits>
#include <map>
#include <mutex>
+#include <set>
#include <span>
#include <string>
#include <string_view>
@@ -186,17 +187,12 @@
void onDump(int fd);
- MountId id() const { return mId; }
+ MountId id() const { return mId.load(std::memory_order_relaxed); }
const content::pm::DataLoaderParamsParcel& params() const { return mParams; }
private:
binder::Status onStatusChanged(MountId mount, int newStatus) final;
- void registerForPendingReads();
- void unregisterFromPendingReads();
- int onPendingReads();
-
- bool isValid() const { return mId != kInvalidStorageId; }
sp<content::pm::IDataLoader> getDataLoader();
bool bind();
@@ -208,21 +204,27 @@
void setTargetStatusLocked(int status);
bool fsmStep();
+ bool fsmStep(int currentStatus, int targetStatus);
- // Watching for pending reads.
- void healthStatusOk();
- // Pending reads detected, waiting for Xsecs to confirm blocked state.
- void healthStatusReadsPending();
- // There are reads pending for X+secs, waiting for additional Ysecs to confirm unhealthy
- // state.
- void healthStatusBlocked();
- // There are reads pending for X+Ysecs, marking storage as unhealthy.
- void healthStatusUnhealthy();
+ void onHealthStatus(StorageHealthListener healthListener, int healthStatus);
+ void updateHealthStatus(bool baseline = false);
+
+ bool isValid() const { return id() != kInvalidStorageId; }
+
+ bool isHealthParamsValid() const;
+
+ const incfs::UniqueControl& initializeHealthControl();
+ void resetHealthControl();
+
+ BootClockTsUs getOldestPendingReadTs();
+
+ void registerForPendingReads();
+ void unregisterFromPendingReads();
IncrementalService& mService;
std::mutex mMutex;
- MountId mId = kInvalidStorageId;
+ std::atomic<MountId> mId = kInvalidStorageId;
content::pm::DataLoaderParamsParcel mParams;
content::pm::FileSystemControlParcel mControl;
DataLoaderStatusListener mStatusListener;
@@ -235,6 +237,11 @@
std::string mHealthPath;
incfs::UniqueControl mHealthControl;
+ struct {
+ TimePoint userTs;
+ BootClockTsUs kernelTsUs;
+ } mHealthBase = {TimePoint::max(), kMaxBootClockTsUs};
+ StorageHealthCheckParams mHealthCheckParams;
};
using DataLoaderStubPtr = sp<DataLoaderStub>;
@@ -331,6 +338,8 @@
bool unregisterAppOpsCallback(const std::string& packageName);
void onAppOpChanged(const std::string& packageName);
+ using Job = std::function<void()>;
+
void runJobProcessing();
void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry,
const incfs::FileId& libFileId, std::string_view targetLibPath,
@@ -338,6 +347,10 @@
void runCmdLooper();
+ void addTimedJob(MountId id, TimePoint when, Job what);
+ void removeTimedJobs(MountId id);
+ void runTimers();
+
private:
const std::unique_ptr<VoldServiceWrapper> mVold;
const std::unique_ptr<DataLoaderManagerWrapper> mDataLoaderManager;
@@ -360,7 +373,6 @@
std::atomic_bool mRunning{true};
- using Job = std::function<void()>;
std::unordered_map<MountId, std::vector<Job>> mJobQueue;
MountId mPendingJobsMount = kInvalidStorageId;
std::condition_variable mJobCondition;
@@ -368,6 +380,19 @@
std::thread mJobProcessor;
std::thread mCmdLooperThread;
+
+ struct TimedJob {
+ MountId id;
+ TimePoint when;
+ Job what;
+ friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
+ return lhs.when < rhs.when;
+ }
+ };
+ std::set<TimedJob> mTimedJobs;
+ std::condition_variable mTimerCondition;
+ std::mutex mTimerMutex;
+ std::thread mTimerThread;
};
} // namespace android::incremental
diff --git a/services/incremental/test/IncrementalServiceTest.cpp b/services/incremental/test/IncrementalServiceTest.cpp
index 2948b6a..84ec7d3 100644
--- a/services/incremental/test/IncrementalServiceTest.cpp
+++ b/services/incremental/test/IncrementalServiceTest.cpp
@@ -371,7 +371,7 @@
public:
MOCK_CONST_METHOD0(initializeForCurrentThread, void());
- MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); }
+ MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); }
};
class MockLooperWrapper : public LooperWrapper {