[incfs] Allow multiple timed jobs at the same time point

Old code had a tiny chance of ignoring a job if it happens to
be scheduled to the exactly the same time as one already in the
queue. Not that it will ever happen, but better to fix it.

+ make the worker thread code slightly easier to reason about

Bug: 183243150
Test: atest IncrementalServiceTest
Change-Id: Ia3126d30e19edfd17f7c8da368e9763ca5501e84
diff --git a/services/incremental/ServiceWrappers.cpp b/services/incremental/ServiceWrappers.cpp
index 7e85f9d..3465499 100644
--- a/services/incremental/ServiceWrappers.cpp
+++ b/services/incremental/ServiceWrappers.cpp
@@ -255,7 +255,7 @@
 
 static JNIEnv* getOrAttachJniEnv(JavaVM* jvm);
 
-class RealTimedQueueWrapper : public TimedQueueWrapper {
+class RealTimedQueueWrapper final : public TimedQueueWrapper {
 public:
     RealTimedQueueWrapper(JavaVM* jvm) {
         mThread = std::thread([this, jvm]() {
@@ -268,11 +268,11 @@
         CHECK(!mThread.joinable()) << "call stop first";
     }
 
-    void addJob(MountId id, Milliseconds after, Job what) final {
+    void addJob(MountId id, Milliseconds timeout, Job what) final {
         const auto now = Clock::now();
         {
             std::unique_lock lock(mMutex);
-            mJobs.insert(TimedJob{id, now + after, std::move(what)});
+            mJobs.insert(TimedJob{id, now + timeout, std::move(what)});
         }
         mCondition.notify_all();
     }
@@ -293,29 +293,28 @@
 private:
     void runTimers() {
         static constexpr TimePoint kInfinityTs{Clock::duration::max()};
-        TimePoint nextJobTs = kInfinityTs;
         std::unique_lock lock(mMutex);
         for (;;) {
-            mCondition.wait_until(lock, nextJobTs, [this, nextJobTs]() {
+            const TimePoint nextJobTs = mJobs.empty() ? kInfinityTs : mJobs.begin()->when;
+            mCondition.wait_until(lock, nextJobTs, [this, oldNextJobTs = nextJobTs]() {
                 const auto now = Clock::now();
-                const auto firstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs;
-                return !mRunning || firstJobTs <= now || firstJobTs < nextJobTs;
+                const auto newFirstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs;
+                return newFirstJobTs <= now || newFirstJobTs < oldNextJobTs || !mRunning;
             });
             if (!mRunning) {
                 return;
             }
 
             const auto now = Clock::now();
-            auto it = mJobs.begin();
-            // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
-            for (; it != mJobs.end() && it->when <= now; it = mJobs.begin()) {
+            // Always re-acquire begin(). We can't use it after unlock as mTimedJobs can change.
+            for (auto it = mJobs.begin(); it != mJobs.end() && it->when <= now;
+                 it = mJobs.begin()) {
                 auto jobNode = mJobs.extract(it);
 
                 lock.unlock();
                 jobNode.value().what();
                 lock.lock();
             }
-            nextJobTs = it != mJobs.end() ? it->when : kInfinityTs;
         }
     }
 
@@ -328,7 +327,7 @@
         }
     };
     bool mRunning = true;
-    std::set<TimedJob> mJobs;
+    std::multiset<TimedJob> mJobs;
     std::condition_variable mCondition;
     std::mutex mMutex;
     std::thread mThread;