Healthcheck: proper job allocation and test.
Bug: 153874006
Test: atest PackageManagerShellCommandTest PackageManagerShellCommandIncrementalTest IncrementalServiceTest
Change-Id: Iede1f2297cc4f8e3c3f0acd43cee597f75dff179
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index 2fcb005..885f4d2 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -268,22 +268,14 @@
mAppOpsManager(sm.getAppOpsManager()),
mJni(sm.getJni()),
mLooper(sm.getLooper()),
+ mTimedQueue(sm.getTimedQueue()),
mIncrementalDir(rootDir) {
- if (!mVold) {
- LOG(FATAL) << "Vold service is unavailable";
- }
- if (!mDataLoaderManager) {
- LOG(FATAL) << "DataLoaderManagerService is unavailable";
- }
- if (!mAppOpsManager) {
- LOG(FATAL) << "AppOpsManager is unavailable";
- }
- if (!mJni) {
- LOG(FATAL) << "JNI is unavailable";
- }
- if (!mLooper) {
- LOG(FATAL) << "Looper is unavailable";
- }
+ CHECK(mVold) << "Vold service is unavailable";
+ CHECK(mDataLoaderManager) << "DataLoaderManagerService is unavailable";
+ CHECK(mAppOpsManager) << "AppOpsManager is unavailable";
+ CHECK(mJni) << "JNI is unavailable";
+ CHECK(mLooper) << "Looper is unavailable";
+ CHECK(mTimedQueue) << "TimedQueue is unavailable";
mJobQueue.reserve(16);
mJobProcessor = std::thread([this]() {
@@ -294,10 +286,6 @@
mJni->initializeForCurrentThread();
runCmdLooper();
});
- mTimerThread = std::thread([this]() {
- mJni->initializeForCurrentThread();
- runTimers();
- });
const auto mountedRootNames = adoptMountedInstances();
mountExistingImages(mountedRootNames);
@@ -310,10 +298,8 @@
}
mJobCondition.notify_all();
mJobProcessor.join();
- mTimerCondition.notify_all();
- mTimerThread.join();
mCmdLooperThread.join();
- mTimedJobs.clear();
+ mTimedQueue->stop();
// Ensure that mounts are destroyed while the service is still valid.
mBindsByPath.clear();
mMounts.clear();
@@ -1710,53 +1696,18 @@
}
}
-void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) {
+void IncrementalService::addTimedJob(MountId id, Milliseconds after, Job what) {
if (id == kInvalidStorageId) {
return;
}
- {
- std::unique_lock lock(mTimerMutex);
- mTimedJobs.insert(TimedJob{id, when, std::move(what)});
- }
- mTimerCondition.notify_all();
+ mTimedQueue->addJob(id, after, std::move(what));
}
void IncrementalService::removeTimedJobs(MountId id) {
if (id == kInvalidStorageId) {
return;
}
- {
- std::unique_lock lock(mTimerMutex);
- std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; });
- }
-}
-
-void IncrementalService::runTimers() {
- static constexpr TimePoint kInfinityTs{Clock::duration::max()};
- TimePoint nextTaskTs = kInfinityTs;
- for (;;) {
- std::unique_lock lock(mTimerMutex);
- mTimerCondition.wait_until(lock, nextTaskTs, [this]() {
- auto now = Clock::now();
- return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now);
- });
- if (!mRunning) {
- return;
- }
-
- auto now = Clock::now();
- auto it = mTimedJobs.begin();
- // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
- for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) {
- auto job = it->what;
- mTimedJobs.erase(it);
-
- lock.unlock();
- job();
- lock.lock();
- }
- nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs;
- }
+ mTimedQueue->removeJobs(id);
}
IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
@@ -2029,8 +1980,8 @@
mHealthBase = {now, kernelTsUs};
}
- if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now ||
- mHealthBase.kernelTsUs > kernelTsUs) {
+ if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.kernelTsUs == kMaxBootClockTsUs ||
+ mHealthBase.userTs > now) {
LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait.";
registerForPendingReads();
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
@@ -2056,6 +2007,9 @@
return;
}
+ // Don't schedule timer job less than 500ms in advance.
+ static constexpr auto kTolerance = 500ms;
+
const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs);
const auto unhealthyTimeout =
std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs);
@@ -2065,31 +2019,28 @@
const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs;
const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs);
- const auto delta = now - userTs;
+ const auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(now - userTs);
- TimePoint whenToCheckBack;
- if (delta < blockedTimeout) {
+ Milliseconds checkBackAfter;
+ if (delta + kTolerance < blockedTimeout) {
LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status.";
- whenToCheckBack = userTs + blockedTimeout;
+ checkBackAfter = blockedTimeout - delta;
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
- } else if (delta < unhealthyTimeout) {
+ } else if (delta + kTolerance < unhealthyTimeout) {
LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy.";
- whenToCheckBack = userTs + unhealthyTimeout;
+ checkBackAfter = unhealthyTimeout - delta;
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
} else {
LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring.";
- whenToCheckBack = now + unhealthyMonitoring;
+ checkBackAfter = unhealthyMonitoring;
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
}
- LOG(DEBUG) << id() << ": updateHealthStatus in "
- << double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
- now)
- .count()) /
- 1000.0
+ LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0
<< "secs";
- mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); });
+ mService.addTimedJob(id(), checkBackAfter, [this]() { updateHealthStatus(); });
}
+ // With kTolerance we are expecting these to execute before the next update.
if (healthStatusToReport != -1) {
onHealthStatus(healthListener, healthStatusToReport);
}
@@ -2178,6 +2129,16 @@
dprintf(fd, " targetStatus: %d\n", mTargetStatus);
dprintf(fd, " targetStatusTs: %lldmcs\n",
(long long)(elapsedMcs(mTargetStatusTs, Clock::now())));
+ dprintf(fd, " health: {\n");
+ dprintf(fd, " path: %s\n", mHealthPath.c_str());
+ dprintf(fd, " base: %lldmcs (%lld)\n",
+ (long long)(elapsedMcs(mHealthBase.userTs, Clock::now())),
+ (long long)mHealthBase.kernelTsUs);
+ dprintf(fd, " blockedTimeoutMs: %d\n", int(mHealthCheckParams.blockedTimeoutMs));
+ dprintf(fd, " unhealthyTimeoutMs: %d\n", int(mHealthCheckParams.unhealthyTimeoutMs));
+ dprintf(fd, " unhealthyMonitoringMs: %d\n",
+ int(mHealthCheckParams.unhealthyMonitoringMs));
+ dprintf(fd, " }\n");
const auto& params = mParams;
dprintf(fd, " dataLoaderParams: {\n");
dprintf(fd, " type: %s\n", toString(params.type).c_str());
diff --git a/services/incremental/IncrementalService.h b/services/incremental/IncrementalService.h
index 57e4669..918531b 100644
--- a/services/incremental/IncrementalService.h
+++ b/services/incremental/IncrementalService.h
@@ -56,8 +56,6 @@
using FileId = incfs::FileId;
using BlockIndex = incfs::BlockIndex;
using RawMetadata = incfs::RawMetadata;
-using Clock = std::chrono::steady_clock;
-using TimePoint = std::chrono::time_point<Clock>;
using Seconds = std::chrono::seconds;
using BootClockTsUs = uint64_t;
@@ -338,8 +336,6 @@
bool unregisterAppOpsCallback(const std::string& packageName);
void onAppOpChanged(const std::string& packageName);
- using Job = std::function<void()>;
-
void runJobProcessing();
void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry,
const incfs::FileId& libFileId, std::string_view targetLibPath,
@@ -347,9 +343,8 @@
void runCmdLooper();
- void addTimedJob(MountId id, TimePoint when, Job what);
+ void addTimedJob(MountId id, Milliseconds after, Job what);
void removeTimedJobs(MountId id);
- void runTimers();
private:
const std::unique_ptr<VoldServiceWrapper> mVold;
@@ -358,6 +353,7 @@
const std::unique_ptr<AppOpsManagerWrapper> mAppOpsManager;
const std::unique_ptr<JniWrapper> mJni;
const std::unique_ptr<LooperWrapper> mLooper;
+ const std::unique_ptr<TimedQueueWrapper> mTimedQueue;
const std::string mIncrementalDir;
mutable std::mutex mLock;
@@ -380,19 +376,6 @@
std::thread mJobProcessor;
std::thread mCmdLooperThread;
-
- struct TimedJob {
- MountId id;
- TimePoint when;
- Job what;
- friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
- return lhs.when < rhs.when;
- }
- };
- std::set<TimedJob> mTimedJobs;
- std::condition_variable mTimerCondition;
- std::mutex mTimerMutex;
- std::thread mTimerThread;
};
} // namespace android::incremental
diff --git a/services/incremental/ServiceWrappers.cpp b/services/incremental/ServiceWrappers.cpp
index a76aa62..99a35ad 100644
--- a/services/incremental/ServiceWrappers.cpp
+++ b/services/incremental/ServiceWrappers.cpp
@@ -25,6 +25,8 @@
#include <binder/AppOpsManager.h>
#include <utils/String16.h>
+#include <thread>
+
#include "IncrementalServiceValidation.h"
using namespace std::literals;
@@ -181,6 +183,88 @@
}
};
+static JNIEnv* getOrAttachJniEnv(JavaVM* jvm);
+
+// TimedQueueWrapper implementation that runs delayed jobs on its own worker thread.
+// Jobs are kept ordered by deadline. stop() must be called before the object is destroyed.
+class RealTimedQueueWrapper : public TimedQueueWrapper {
+public:
+    // Starts the worker thread; it calls getOrAttachJniEnv(jvm) once and then serves jobs.
+    explicit RealTimedQueueWrapper(JavaVM* jvm) {
+        mThread = std::thread([this, jvm]() {
+            (void)getOrAttachJniEnv(jvm);
+            runTimers();
+        });
+    }
+    ~RealTimedQueueWrapper() final {
+        CHECK(!mRunning) << "call stop first";
+        CHECK(!mThread.joinable()) << "call stop first";
+    }
+
+    // Queues |what| to run once |after| has elapsed from now; |id| tags it for removeJobs().
+    void addJob(MountId id, Milliseconds after, Job what) final {
+        const auto now = Clock::now();
+        {
+            std::unique_lock lock(mMutex);
+            mJobs.insert(TimedJob{id, now + after, std::move(what)});
+        }
+        mCondition.notify_all();
+    }
+    // Drops every queued job tagged with |id|.
+    void removeJobs(MountId id) final {
+        std::unique_lock lock(mMutex);
+        std::erase_if(mJobs, [id](auto&& item) { return item.id == id; });
+    }
+    // Stops the worker thread and discards the remaining jobs.
+    void stop() final {
+        {
+            std::unique_lock lock(mMutex);
+            mRunning = false;
+        }
+        mCondition.notify_all();
+        // join() on a thread that is not joinable throws, so a repeated stop() must skip it.
+        if (mThread.joinable()) {
+            mThread.join();
+        }
+        mJobs.clear();
+    }
+
+private:
+    // Worker loop: waits until the earliest deadline (or until addJob()/stop() wakes it up),
+    // then runs every job that is due, releasing the lock while each job executes.
+    void runTimers() {
+        static constexpr TimePoint kInfinityTs{Clock::duration::max()};
+        TimePoint nextJobTs = kInfinityTs;
+        std::unique_lock lock(mMutex);
+        for (;;) {
+            mCondition.wait_until(lock, nextJobTs, [this, nextJobTs]() {
+                const auto now = Clock::now();
+                const auto firstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs;
+                // Also wake up when a job earlier than the current deadline was added.
+                return !mRunning || firstJobTs <= now || firstJobTs < nextJobTs;
+            });
+            if (!mRunning) {
+                return;
+            }
+
+            const auto now = Clock::now();
+            auto it = mJobs.begin();
+            // Always acquire begin(). We can't use it after unlock as mJobs can change.
+            for (; it != mJobs.end() && it->when <= now; it = mJobs.begin()) {
+                // Elements are const inside the container, so std::move(it->what) would copy.
+                // Extracting the node gives mutable access and lets the job really be moved.
+                auto node = mJobs.extract(it);
+                auto job = std::move(node.value().what);
+
+                lock.unlock();
+                job();
+                lock.lock();
+            }
+            nextJobTs = it != mJobs.end() ? it->when : kInfinityTs;
+        }
+    }
+
+    struct TimedJob {
+        MountId id;
+        TimePoint when;
+        Job what;
+        friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
+            return lhs.when < rhs.when;
+        }
+    };
+    bool mRunning = true;
+    // multiset, not set: the ordering looks only at |when|, so a std::set would treat two jobs
+    // with the same deadline as duplicates and silently drop the second one on insert.
+    std::multiset<TimedJob> mJobs;
+    std::condition_variable mCondition;
+    std::mutex mMutex;
+    std::thread mThread;
+};
+
RealServiceManager::RealServiceManager(sp<IServiceManager> serviceManager, JNIEnv* env)
: mServiceManager(std::move(serviceManager)), mJvm(RealJniWrapper::getJvm(env)) {}
@@ -228,6 +312,10 @@
return std::make_unique<RealLooperWrapper>();
}
+std::unique_ptr<TimedQueueWrapper> RealServiceManager::getTimedQueue() {
+ return std::make_unique<RealTimedQueueWrapper>(mJvm);
+}
+
static JavaVM* getJavaVm(JNIEnv* env) {
CHECK(env);
JavaVM* jvm = nullptr;
diff --git a/services/incremental/ServiceWrappers.h b/services/incremental/ServiceWrappers.h
index a935ab9..8cd726fd 100644
--- a/services/incremental/ServiceWrappers.h
+++ b/services/incremental/ServiceWrappers.h
@@ -35,6 +35,11 @@
namespace android::incremental {
+using Clock = std::chrono::steady_clock;
+using TimePoint = std::chrono::time_point<Clock>;
+using Milliseconds = std::chrono::milliseconds;
+using Job = std::function<void()>;
+
// --- Wrapper interfaces ---
using MountId = int32_t;
@@ -121,6 +126,14 @@
virtual int pollAll(int timeoutMillis) = 0;
};
+class TimedQueueWrapper {
+public:
+ virtual ~TimedQueueWrapper() = default;
+ virtual void addJob(MountId id, Milliseconds after, Job what) = 0;
+ virtual void removeJobs(MountId id) = 0;
+ virtual void stop() = 0;
+};
+
class ServiceManagerWrapper {
public:
virtual ~ServiceManagerWrapper() = default;
@@ -130,6 +143,7 @@
virtual std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() = 0;
virtual std::unique_ptr<JniWrapper> getJni() = 0;
virtual std::unique_ptr<LooperWrapper> getLooper() = 0;
+ virtual std::unique_ptr<TimedQueueWrapper> getTimedQueue() = 0;
};
// --- Real stuff ---
@@ -144,6 +158,7 @@
std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() final;
std::unique_ptr<JniWrapper> getJni() final;
std::unique_ptr<LooperWrapper> getLooper() final;
+ std::unique_ptr<TimedQueueWrapper> getTimedQueue() final;
private:
template <class INTERFACE>
diff --git a/services/incremental/test/IncrementalServiceTest.cpp b/services/incremental/test/IncrementalServiceTest.cpp
index 84ec7d3..26b5094 100644
--- a/services/incremental/test/IncrementalServiceTest.cpp
+++ b/services/incremental/test/IncrementalServiceTest.cpp
@@ -22,6 +22,7 @@
#include <gtest/gtest.h>
#include <utils/Log.h>
+#include <chrono>
#include <future>
#include "IncrementalService.h"
@@ -295,9 +296,21 @@
void openMountSuccess() {
ON_CALL(*this, openMount(_)).WillByDefault(Invoke(this, &MockIncFs::openMountForHealth));
}
- void waitForPendingReadsSuccess() {
+
+ // 1000ms
+ void waitForPendingReadsSuccess(uint64_t ts = 0) {
ON_CALL(*this, waitForPendingReads(_, _, _))
- .WillByDefault(Invoke(this, &MockIncFs::waitForPendingReadsForHealth));
+ .WillByDefault(
+ Invoke([ts](const Control& control, std::chrono::milliseconds timeout,
+ std::vector<incfs::ReadInfo>* pendingReadsBuffer) {
+ pendingReadsBuffer->push_back({.bootClockTsUs = ts});
+ return android::incfs::WaitResult::HaveData;
+ }));
+ }
+
+ void waitForPendingReadsTimeout() {
+ ON_CALL(*this, waitForPendingReads(_, _, _))
+ .WillByDefault(Return(android::incfs::WaitResult::Timeout));
}
static constexpr auto kPendingReadsFd = 42;
@@ -305,13 +318,6 @@
return UniqueControl(IncFs_CreateControl(-1, kPendingReadsFd, -1));
}
- WaitResult waitForPendingReadsForHealth(
- const Control& control, std::chrono::milliseconds timeout,
- std::vector<incfs::ReadInfo>* pendingReadsBuffer) const {
- pendingReadsBuffer->push_back({.bootClockTsUs = 0});
- return android::incfs::WaitResult::HaveData;
- }
-
RawMetadata getMountInfoMetadata(const Control& control, std::string_view path) {
metadata::Mount m;
m.mutable_storage()->set_id(100);
@@ -371,7 +377,7 @@
public:
MOCK_CONST_METHOD0(initializeForCurrentThread, void());
- MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); }
+ MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); }
};
class MockLooperWrapper : public LooperWrapper {
@@ -385,7 +391,7 @@
ON_CALL(*this, addFd(_, _, _, _, _))
.WillByDefault(Invoke(this, &MockLooperWrapper::storeCallback));
ON_CALL(*this, removeFd(_)).WillByDefault(Invoke(this, &MockLooperWrapper::clearCallback));
- ON_CALL(*this, pollAll(_)).WillByDefault(Invoke(this, &MockLooperWrapper::sleepFor));
+ ON_CALL(*this, pollAll(_)).WillByDefault(Invoke(this, &MockLooperWrapper::wait10Ms));
}
int storeCallback(int, int, int, android::Looper_callbackFunc callback, void* data) {
@@ -400,8 +406,10 @@
return 0;
}
- int sleepFor(int timeoutMillis) {
- std::this_thread::sleep_for(std::chrono::milliseconds(timeoutMillis));
+ int wait10Ms(int) {
+ // This is called from a loop in runCmdLooper.
+ // Sleeping for 10ms only to avoid busy looping.
+ std::this_thread::sleep_for(10ms);
return 0;
}
@@ -409,6 +417,55 @@
void* mCallbackData = nullptr;
};
+// gMock TimedQueueWrapper. By default addJob() records its latest arguments in mId/mAfter/mWhat
+// and removeJobs() clears the recorded delay and job when the id matches, so a test can inspect
+// the scheduled job and invoke it by hand.
+class MockTimedQueueWrapper : public TimedQueueWrapper {
+public:
+    MOCK_METHOD3(addJob, void(MountId, Milliseconds, Job));
+    MOCK_METHOD1(removeJobs, void(MountId));
+    MOCK_METHOD0(stop, void());
+
+    MockTimedQueueWrapper() {
+        ON_CALL(*this, addJob(_, _, _))
+                .WillByDefault(Invoke(this, &MockTimedQueueWrapper::storeJob));
+        ON_CALL(*this, removeJobs(_)).WillByDefault(Invoke(this, &MockTimedQueueWrapper::clearJob));
+    }
+
+    // Remembers the most recently scheduled job, overwriting any previous one.
+    void storeJob(MountId id, Milliseconds after, Job what) {
+        mId = id;
+        mAfter = after;
+        mWhat = std::move(what);
+    }
+
+    // Forgets the remembered delay and job if they belong to |id|; mId itself is kept.
+    void clearJob(MountId id) {
+        if (mId == id) {
+            mAfter = {};
+            mWhat = {};
+        }
+    }
+
+    MountId mId = -1;
+    // Value-initialized: a default-initialized duration leaves its tick count indeterminate,
+    // which would make reading mAfter before the first addJob() undefined.
+    Milliseconds mAfter{};
+    Job mWhat;
+};
+
+class MockStorageHealthListener : public os::incremental::BnStorageHealthListener {
+public:
+ MOCK_METHOD2(onHealthStatus, binder::Status(int32_t storageId, int32_t status));
+
+ MockStorageHealthListener() {
+ ON_CALL(*this, onHealthStatus(_, _))
+ .WillByDefault(Invoke(this, &MockStorageHealthListener::storeStorageIdAndStatus));
+ }
+
+ binder::Status storeStorageIdAndStatus(int32_t storageId, int32_t status) {
+ mStorageId = storageId;
+ mStatus = status;
+ return binder::Status::ok();
+ }
+
+ int32_t mStorageId = -1;
+ int32_t mStatus = -1;
+};
+
class MockServiceManager : public ServiceManagerWrapper {
public:
MockServiceManager(std::unique_ptr<MockVoldService> vold,
@@ -416,13 +473,15 @@
std::unique_ptr<MockIncFs> incfs,
std::unique_ptr<MockAppOpsManager> appOpsManager,
std::unique_ptr<MockJniWrapper> jni,
- std::unique_ptr<MockLooperWrapper> looper)
+ std::unique_ptr<MockLooperWrapper> looper,
+ std::unique_ptr<MockTimedQueueWrapper> timedQueue)
: mVold(std::move(vold)),
mDataLoaderManager(std::move(dataLoaderManager)),
mIncFs(std::move(incfs)),
mAppOpsManager(std::move(appOpsManager)),
mJni(std::move(jni)),
- mLooper(std::move(looper)) {}
+ mLooper(std::move(looper)),
+ mTimedQueue(std::move(timedQueue)) {}
std::unique_ptr<VoldServiceWrapper> getVoldService() final { return std::move(mVold); }
std::unique_ptr<DataLoaderManagerWrapper> getDataLoaderManager() final {
return std::move(mDataLoaderManager);
@@ -431,6 +490,7 @@
std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() final { return std::move(mAppOpsManager); }
std::unique_ptr<JniWrapper> getJni() final { return std::move(mJni); }
std::unique_ptr<LooperWrapper> getLooper() final { return std::move(mLooper); }
+ std::unique_ptr<TimedQueueWrapper> getTimedQueue() final { return std::move(mTimedQueue); }
private:
std::unique_ptr<MockVoldService> mVold;
@@ -439,6 +499,7 @@
std::unique_ptr<MockAppOpsManager> mAppOpsManager;
std::unique_ptr<MockJniWrapper> mJni;
std::unique_ptr<MockLooperWrapper> mLooper;
+ std::unique_ptr<MockTimedQueueWrapper> mTimedQueue;
};
// --- IncrementalServiceTest ---
@@ -460,6 +521,8 @@
mJni = jni.get();
auto looper = std::make_unique<NiceMock<MockLooperWrapper>>();
mLooper = looper.get();
+ auto timedQueue = std::make_unique<NiceMock<MockTimedQueueWrapper>>();
+ mTimedQueue = timedQueue.get();
mIncrementalService =
std::make_unique<IncrementalService>(MockServiceManager(std::move(vold),
std::move(
@@ -467,7 +530,8 @@
std::move(incFs),
std::move(appOps),
std::move(jni),
- std::move(looper)),
+ std::move(looper),
+ std::move(timedQueue)),
mRootDir.path);
mDataLoaderParcel.packageName = "com.test";
mDataLoaderParcel.arguments = "uri";
@@ -503,6 +567,7 @@
NiceMock<MockAppOpsManager>* mAppOpsManager = nullptr;
NiceMock<MockJniWrapper>* mJni = nullptr;
NiceMock<MockLooperWrapper>* mLooper = nullptr;
+ NiceMock<MockTimedQueueWrapper>* mTimedQueue = nullptr;
NiceMock<MockDataLoader>* mDataLoader = nullptr;
std::unique_ptr<IncrementalService> mIncrementalService;
TemporaryDir mRootDir;
@@ -710,6 +775,136 @@
mLooper->mCallback(-1, -1, mLooper->mCallbackData);
}
+TEST_F(IncrementalServiceTest, testStartDataLoaderUnhealthyStorage) {
+ mVold->mountIncFsSuccess();
+ mIncFs->makeFileSuccess();
+ mIncFs->openMountSuccess();
+ mVold->bindMountSuccess();
+ mDataLoaderManager->bindToDataLoaderSuccess();
+ mDataLoaderManager->getDataLoaderSuccess();
+ EXPECT_CALL(*mDataLoaderManager, bindToDataLoader(_, _, _, _)).Times(1);
+ EXPECT_CALL(*mDataLoaderManager, unbindFromDataLoader(_)).Times(1);
+ EXPECT_CALL(*mDataLoader, create(_, _, _, _)).Times(1);
+ EXPECT_CALL(*mDataLoader, start(_)).Times(1);
+ EXPECT_CALL(*mDataLoader, destroy(_)).Times(1);
+ EXPECT_CALL(*mVold, unmountIncFs(_)).Times(2);
+ EXPECT_CALL(*mLooper, addFd(MockIncFs::kPendingReadsFd, _, _, _, _)).Times(2);
+ EXPECT_CALL(*mLooper, removeFd(MockIncFs::kPendingReadsFd)).Times(2);
+ EXPECT_CALL(*mTimedQueue, addJob(_, _, _)).Times(4);
+
+ sp<NiceMock<MockStorageHealthListener>> listener{new NiceMock<MockStorageHealthListener>};
+ NiceMock<MockStorageHealthListener>* listenerMock = listener.get();
+ EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_OK))
+ .Times(2);
+ EXPECT_CALL(*listenerMock,
+ onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_READS_PENDING))
+ .Times(1);
+ EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_BLOCKED))
+ .Times(1);
+ EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_UNHEALTHY))
+ .Times(2);
+
+ StorageHealthCheckParams params;
+ params.blockedTimeoutMs = 10000;
+ params.unhealthyTimeoutMs = 20000;
+ params.unhealthyMonitoringMs = 30000;
+
+ using MS = std::chrono::milliseconds;
+ using MCS = std::chrono::microseconds;
+
+ const auto blockedTimeout = MS(params.blockedTimeoutMs);
+ const auto unhealthyTimeout = MS(params.unhealthyTimeoutMs);
+ const auto unhealthyMonitoring = MS(params.unhealthyMonitoringMs);
+
+ const uint64_t kFirstTimestampUs = 1000000000ll;
+ const uint64_t kBlockedTimestampUs =
+ kFirstTimestampUs - std::chrono::duration_cast<MCS>(blockedTimeout).count();
+ const uint64_t kUnhealthyTimestampUs =
+ kFirstTimestampUs - std::chrono::duration_cast<MCS>(unhealthyTimeout).count();
+
+ TemporaryDir tempDir;
+ int storageId = mIncrementalService->createStorage(tempDir.path, std::move(mDataLoaderParcel),
+ IncrementalService::CreateOptions::CreateNew,
+ {}, std::move(params), listener);
+ ASSERT_GE(storageId, 0);
+
+ // Healthy state, registered for pending reads.
+ ASSERT_NE(nullptr, mLooper->mCallback);
+ ASSERT_NE(nullptr, mLooper->mCallbackData);
+ ASSERT_EQ(storageId, listener->mStorageId);
+ ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_OK, listener->mStatus);
+
+ // Looper/epoll callback.
+ mIncFs->waitForPendingReadsSuccess(kFirstTimestampUs);
+ mLooper->mCallback(-1, -1, mLooper->mCallbackData);
+
+ // Unregister from pending reads and wait.
+ ASSERT_EQ(nullptr, mLooper->mCallback);
+ ASSERT_EQ(nullptr, mLooper->mCallbackData);
+ ASSERT_EQ(storageId, listener->mStorageId);
+ ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_READS_PENDING, listener->mStatus);
+ // Timed callback present.
+ ASSERT_EQ(storageId, mTimedQueue->mId);
+ ASSERT_GE(mTimedQueue->mAfter, blockedTimeout);
+ auto timedCallback = mTimedQueue->mWhat;
+ mTimedQueue->clearJob(storageId);
+
+ // Timed job callback for blocked.
+ mIncFs->waitForPendingReadsSuccess(kBlockedTimestampUs);
+ timedCallback();
+
+ // Still not registered, and blocked.
+ ASSERT_EQ(nullptr, mLooper->mCallback);
+ ASSERT_EQ(nullptr, mLooper->mCallbackData);
+ ASSERT_EQ(storageId, listener->mStorageId);
+ ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_BLOCKED, listener->mStatus);
+ // Timed callback present.
+ ASSERT_EQ(storageId, mTimedQueue->mId);
+ ASSERT_GE(mTimedQueue->mAfter, 1000ms);
+ timedCallback = mTimedQueue->mWhat;
+ mTimedQueue->clearJob(storageId);
+
+ // Timed job callback for unhealthy.
+ mIncFs->waitForPendingReadsSuccess(kUnhealthyTimestampUs);
+ timedCallback();
+
+    // Still not registered, and unhealthy.
+ ASSERT_EQ(nullptr, mLooper->mCallback);
+ ASSERT_EQ(nullptr, mLooper->mCallbackData);
+ ASSERT_EQ(storageId, listener->mStorageId);
+ ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY, listener->mStatus);
+ // Timed callback present.
+ ASSERT_EQ(storageId, mTimedQueue->mId);
+ ASSERT_GE(mTimedQueue->mAfter, unhealthyMonitoring);
+ timedCallback = mTimedQueue->mWhat;
+ mTimedQueue->clearJob(storageId);
+
+ // One more unhealthy.
+ mIncFs->waitForPendingReadsSuccess(kUnhealthyTimestampUs);
+ timedCallback();
+
+    // Still not registered, and unhealthy.
+ ASSERT_EQ(nullptr, mLooper->mCallback);
+ ASSERT_EQ(nullptr, mLooper->mCallbackData);
+ ASSERT_EQ(storageId, listener->mStorageId);
+ ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY, listener->mStatus);
+ // Timed callback present.
+ ASSERT_EQ(storageId, mTimedQueue->mId);
+ ASSERT_GE(mTimedQueue->mAfter, unhealthyMonitoring);
+ timedCallback = mTimedQueue->mWhat;
+ mTimedQueue->clearJob(storageId);
+
+ // And now healthy.
+ mIncFs->waitForPendingReadsTimeout();
+ timedCallback();
+
+ // Healthy state, registered for pending reads.
+ ASSERT_NE(nullptr, mLooper->mCallback);
+ ASSERT_NE(nullptr, mLooper->mCallbackData);
+ ASSERT_EQ(storageId, listener->mStorageId);
+ ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_OK, listener->mStatus);
+}
+
TEST_F(IncrementalServiceTest, testSetIncFsMountOptionsSuccess) {
mVold->mountIncFsSuccess();
mIncFs->makeFileSuccess();