Introduces an option to set a dump latency requirement.

We are currently dumping invalid data for pulled metrics. Pulled metrics
require a new pull when flushing a bucket. We should either do another
pull or invalidate the previous bucket.

There are cases where we cannot afford to do another pull, e.g. statsd
being killed. If we do not have enough time, we'll just invalidte the
bucket to make sure we have correct data.

Bug: 123866830
Test: atest statsd_test
Change-Id: I090127cace3b7265032ebb2c9bddae976c883771
diff --git a/bin/src/StatsLogProcessor.cpp b/bin/src/StatsLogProcessor.cpp
index 653ef2e..a6699e7 100644
--- a/bin/src/StatsLogProcessor.cpp
+++ b/bin/src/StatsLogProcessor.cpp
@@ -298,7 +298,7 @@
 void StatsLogProcessor::OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
                                         const StatsdConfig& config) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
-    WriteDataToDiskLocked(key, timestampNs, CONFIG_UPDATED);
+    WriteDataToDiskLocked(key, timestampNs, CONFIG_UPDATED, NO_TIME_CONSTRAINTS);
     OnConfigUpdatedLocked(timestampNs, key, config);
 }
 
@@ -355,6 +355,7 @@
                                      const bool include_current_partial_bucket,
                                      const bool erase_data,
                                      const DumpReportReason dumpReportReason,
+                                     const DumpLatency dumpLatency,
                                      ProtoOutputStream* proto) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
 
@@ -378,8 +379,10 @@
         // Start of ConfigMetricsReport (reports).
         uint64_t reportsToken =
                 proto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_REPORTS);
-        onConfigMetricsReportLocked(key, dumpTimeStampNs, include_current_partial_bucket,
-                                    erase_data, dumpReportReason, proto);
+        onConfigMetricsReportLocked(key, dumpTimeStampNs,
+                                    include_current_partial_bucket,
+                                    erase_data, dumpReportReason,
+                                    dumpLatency, proto);
         proto->end(reportsToken);
         // End of ConfigMetricsReport (reports).
     } else {
@@ -394,10 +397,11 @@
                                      const bool include_current_partial_bucket,
                                      const bool erase_data,
                                      const DumpReportReason dumpReportReason,
+                                     const DumpLatency dumpLatency,
                                      vector<uint8_t>* outData) {
     ProtoOutputStream proto;
     onDumpReport(key, dumpTimeStampNs, include_current_partial_bucket, erase_data,
-                 dumpReportReason, &proto);
+                 dumpReportReason, dumpLatency, &proto);
 
     if (outData != nullptr) {
         outData->clear();
@@ -423,6 +427,7 @@
                                                     const bool include_current_partial_bucket,
                                                     const bool erase_data,
                                                     const DumpReportReason dumpReportReason,
+                                                    const DumpLatency dumpLatency,
                                                     ProtoOutputStream* proto) {
     // We already checked whether key exists in mMetricsManagers in
     // WriteDataToDisk.
@@ -438,7 +443,7 @@
     // First, fill in ConfigMetricsReport using current data on memory, which
     // starts from filling in StatsLogReport's.
     it->second->onDumpReport(dumpTimeStampNs, include_current_partial_bucket,
-                             erase_data, &str_set, proto);
+                             erase_data, dumpLatency, &str_set, proto);
 
     // Fill in UidMap if there is at least one metric to report.
     // This skips the uid map if it's an empty config.
@@ -492,7 +497,7 @@
         }
     }
     if (configKeysTtlExpired.size() > 0) {
-        WriteDataToDiskLocked(CONFIG_RESET);
+        WriteDataToDiskLocked(CONFIG_RESET, NO_TIME_CONSTRAINTS);
         resetConfigsLocked(timestampNs, configKeysTtlExpired);
     }
 }
@@ -501,7 +506,8 @@
     std::lock_guard<std::mutex> lock(mMetricsMutex);
     auto it = mMetricsManagers.find(key);
     if (it != mMetricsManagers.end()) {
-        WriteDataToDiskLocked(key, getElapsedRealtimeNs(), CONFIG_REMOVED);
+        WriteDataToDiskLocked(key, getElapsedRealtimeNs(), CONFIG_REMOVED, 
+                              NO_TIME_CONSTRAINTS);
         mMetricsManagers.erase(it);
         mUidMap->OnConfigRemoved(key);
     }
@@ -572,14 +578,15 @@
 
 void StatsLogProcessor::WriteDataToDiskLocked(const ConfigKey& key,
                                               const int64_t timestampNs,
-                                              const DumpReportReason dumpReportReason) {
+                                              const DumpReportReason dumpReportReason,
+                                              const DumpLatency dumpLatency) {
     if (mMetricsManagers.find(key) == mMetricsManagers.end() ||
         !mMetricsManagers.find(key)->second->shouldWriteToDisk()) {
         return;
     }
     ProtoOutputStream proto;
     onConfigMetricsReportLocked(key, timestampNs, true /* include_current_partial_bucket*/,
-                                true /* erase_data */, dumpReportReason, &proto);
+                                true /* erase_data */, dumpReportReason, dumpLatency, &proto);
     string file_name = StringPrintf("%s/%ld_%d_%lld", STATS_DATA_DIR,
          (long)getWallClockSec(), key.GetUid(), (long long)key.GetId());
     android::base::unique_fd fd(open(file_name.c_str(),
@@ -658,7 +665,8 @@
     StorageManager::deleteFile(file_name.c_str());
 }
 
-void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportReason) {
+void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportReason,
+                                              const DumpLatency dumpLatency) {
     const int64_t timeNs = getElapsedRealtimeNs();
     // Do not write to disk if we already have in the last few seconds.
     // This is to avoid overwriting files that would have the same name if we
@@ -671,13 +679,14 @@
     }
     mLastWriteTimeNs = timeNs;
     for (auto& pair : mMetricsManagers) {
-        WriteDataToDiskLocked(pair.first, timeNs, dumpReportReason);
+        WriteDataToDiskLocked(pair.first, timeNs, dumpReportReason, dumpLatency);
     }
 }
 
-void StatsLogProcessor::WriteDataToDisk(const DumpReportReason dumpReportReason) {
+void StatsLogProcessor::WriteDataToDisk(const DumpReportReason dumpReportReason, 
+                                        const DumpLatency dumpLatency) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
-    WriteDataToDiskLocked(dumpReportReason);
+    WriteDataToDiskLocked(dumpReportReason, dumpLatency);
 }
 
 void StatsLogProcessor::informPullAlarmFired(const int64_t timestampNs) {
diff --git a/bin/src/StatsLogProcessor.h b/bin/src/StatsLogProcessor.h
index ea9c6e7..e92b897 100644
--- a/bin/src/StatsLogProcessor.h
+++ b/bin/src/StatsLogProcessor.h
@@ -66,10 +66,14 @@
 
     void onDumpReport(const ConfigKey& key, const int64_t dumpTimeNs,
                       const bool include_current_partial_bucket, const bool erase_data,
-                      const DumpReportReason dumpReportReason, vector<uint8_t>* outData);
+                      const DumpReportReason dumpReportReason, 
+                      const DumpLatency dumpLatency,
+                      vector<uint8_t>* outData);
     void onDumpReport(const ConfigKey& key, const int64_t dumpTimeNs,
                       const bool include_current_partial_bucket, const bool erase_data,
-                      const DumpReportReason dumpReportReason, ProtoOutputStream* proto);
+                      const DumpReportReason dumpReportReason,
+                      const DumpLatency dumpLatency,
+                      ProtoOutputStream* proto);
 
     /* Tells MetricsManager that the alarms in alarmSet have fired. Modifies anomaly alarmSet. */
     void onAnomalyAlarmFired(
@@ -82,7 +86,8 @@
             unordered_set<sp<const InternalAlarm>, SpHash<InternalAlarm>> alarmSet);
 
     /* Flushes data to disk. Data on memory will be gone after written to disk. */
-    void WriteDataToDisk(const DumpReportReason dumpReportReason);
+    void WriteDataToDisk(const DumpReportReason dumpReportReason,
+                         const DumpLatency dumpLatency);
 
     /* Persist metric activation status onto disk. */
     void WriteMetricsActivationToDisk(int64_t currentTimeNs);
@@ -153,14 +158,17 @@
 
     void GetActiveConfigsLocked(const int uid, vector<int64_t>& outActiveConfigs);
 
-    void WriteDataToDiskLocked(const DumpReportReason dumpReportReason);
+    void WriteDataToDiskLocked(const DumpReportReason dumpReportReason,
+                               const DumpLatency dumpLatency);
     void WriteDataToDiskLocked(const ConfigKey& key, const int64_t timestampNs,
-                               const DumpReportReason dumpReportReason);
+                               const DumpReportReason dumpReportReason,
+                               const DumpLatency dumpLatency);
 
     void onConfigMetricsReportLocked(const ConfigKey& key, const int64_t dumpTimeStampNs,
                                      const bool include_current_partial_bucket,
                                      const bool erase_data,
                                      const DumpReportReason dumpReportReason,
+                                     const DumpLatency dumpLatency,
                                      util::ProtoOutputStream* proto);
 
     /* Check if we should send a broadcast if approaching memory limits and if we're over, we
diff --git a/bin/src/StatsService.cpp b/bin/src/StatsService.cpp
index c542b62..4deb8bd 100644
--- a/bin/src/StatsService.cpp
+++ b/bin/src/StatsService.cpp
@@ -313,7 +313,9 @@
                 proto.start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_REPORTS_LIST);
         mProcessor->onDumpReport(configKey, getElapsedRealtimeNs(),
                                  true /* includeCurrentBucket */, false /* erase_data */,
-                                 ADB_DUMP, &proto);
+                                 ADB_DUMP,
+                                 FAST,
+                                 &proto);
         proto.end(reportsListToken);
         proto.flush(out);
         proto.clear();
@@ -694,7 +696,9 @@
         if (good) {
             vector<uint8_t> data;
             mProcessor->onDumpReport(ConfigKey(uid, StrToInt64(name)), getElapsedRealtimeNs(),
-                                     includeCurrentBucket, eraseData, ADB_DUMP, &data);
+                                     includeCurrentBucket, eraseData, ADB_DUMP,
+                                     NO_TIME_CONSTRAINTS,
+                                     &data);
             if (proto) {
                 for (size_t i = 0; i < data.size(); i ++) {
                     dprintf(out, "%c", data[i]);
@@ -758,7 +762,7 @@
 
 status_t StatsService::cmd_write_data_to_disk(int out) {
     dprintf(out, "Writing data to disk\n");
-    mProcessor->WriteDataToDisk(ADB_DUMP);
+    mProcessor->WriteDataToDisk(ADB_DUMP, NO_TIME_CONSTRAINTS);
     return NO_ERROR;
 }
 
@@ -958,7 +962,7 @@
 Status StatsService::informDeviceShutdown() {
     ENFORCE_UID(AID_SYSTEM);
     VLOG("StatsService::informDeviceShutdown");
-    mProcessor->WriteDataToDisk(DEVICE_SHUTDOWN);
+    mProcessor->WriteDataToDisk(DEVICE_SHUTDOWN, FAST);
     mProcessor->WriteMetricsActivationToDisk(getElapsedRealtimeNs());
     return Status::ok();
 }
@@ -1000,7 +1004,7 @@
 void StatsService::Terminate() {
     ALOGI("StatsService::Terminating");
     if (mProcessor != nullptr) {
-        mProcessor->WriteDataToDisk(TERMINATION_SIGNAL_RECEIVED);
+        mProcessor->WriteDataToDisk(TERMINATION_SIGNAL_RECEIVED, FAST);
     }
 }
 
@@ -1017,8 +1021,10 @@
     IPCThreadState* ipc = IPCThreadState::self();
     VLOG("StatsService::getData with Pid %i, Uid %i", ipc->getCallingPid(), ipc->getCallingUid());
     ConfigKey configKey(ipc->getCallingUid(), key);
+    // The dump latency does not matter here since we do not include the current bucket, we do not
+    // need to pull any new data anyhow.
     mProcessor->onDumpReport(configKey, getElapsedRealtimeNs(), false /* include_current_bucket*/,
-                             true /* erase_data */, GET_DATA_CALLED, output);
+                             true /* erase_data */, GET_DATA_CALLED, FAST, output);
     return Status::ok();
 }
 
@@ -1312,7 +1318,7 @@
     StatsdStats::getInstance().noteSystemServerRestart(getWallClockSec());
     if (mProcessor != nullptr) {
         ALOGW("Reset statsd upon system server restarts.");
-        mProcessor->WriteDataToDisk(STATSCOMPANION_DIED);
+        mProcessor->WriteDataToDisk(STATSCOMPANION_DIED, FAST);
         mProcessor->resetConfigs();
     }
     mAnomalyAlarmMonitor->setStatsCompanionService(nullptr);
diff --git a/bin/src/metrics/CountMetricProducer.cpp b/bin/src/metrics/CountMetricProducer.cpp
index e84f88d..96d1447 100644
--- a/bin/src/metrics/CountMetricProducer.cpp
+++ b/bin/src/metrics/CountMetricProducer.cpp
@@ -139,13 +139,13 @@
 
 
 void CountMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
-    flushIfNeededLocked(dumpTimeNs);
     mPastBuckets.clear();
 }
 
 void CountMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                              const bool include_current_partial_bucket,
                                              const bool erase_data,
+                                             const DumpLatency dumpLatency,
                                              std::set<string> *str_set,
                                              ProtoOutputStream* protoOutput) {
     if (include_current_partial_bucket) {
@@ -319,7 +319,7 @@
         return;
     }
 
-    flushCurrentBucketLocked(eventTimeNs);
+    flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
     // Setup the bucket start time and number.
     int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
     mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
@@ -328,7 +328,8 @@
          (long long)mCurrentBucketStartTimeNs);
 }
 
-void CountMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
+void CountMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                                   const int64_t& nextBucketStartTimeNs) {
     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
     CountBucket info;
     info.mBucketStartNs = mCurrentBucketStartTimeNs;
diff --git a/bin/src/metrics/CountMetricProducer.h b/bin/src/metrics/CountMetricProducer.h
index 1ac4426..b4a910c 100644
--- a/bin/src/metrics/CountMetricProducer.h
+++ b/bin/src/metrics/CountMetricProducer.h
@@ -57,6 +57,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
 
@@ -78,7 +79,8 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const int64_t& newEventTime) override;
 
-    void flushCurrentBucketLocked(const int64_t& eventTimeNs) override;
+    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                  const int64_t& nextBucketStartTimeNs) override;
 
     std::unordered_map<MetricDimensionKey, std::vector<CountBucket>> mPastBuckets;
 
diff --git a/bin/src/metrics/DurationMetricProducer.cpp b/bin/src/metrics/DurationMetricProducer.cpp
index da6b97c..5e4594b 100644
--- a/bin/src/metrics/DurationMetricProducer.cpp
+++ b/bin/src/metrics/DurationMetricProducer.cpp
@@ -456,6 +456,7 @@
 void DurationMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                                 const bool include_current_partial_bucket,
                                                 const bool erase_data,
+                                                const DumpLatency dumpLatency,
                                                 std::set<string> *str_set,
                                                 ProtoOutputStream* protoOutput) {
     if (include_current_partial_bucket) {
@@ -581,7 +582,8 @@
     mCurrentBucketNum += numBucketsForward;
 }
 
-void DurationMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
+void DurationMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                                      const int64_t& nextBucketStartTimeNs) {
     for (auto whatIt = mCurrentSlicedDurationTrackerMap.begin();
             whatIt != mCurrentSlicedDurationTrackerMap.end();) {
         for (auto it = whatIt->second.begin(); it != whatIt->second.end();) {
diff --git a/bin/src/metrics/DurationMetricProducer.h b/bin/src/metrics/DurationMetricProducer.h
index ec561b5..f711df2 100644
--- a/bin/src/metrics/DurationMetricProducer.h
+++ b/bin/src/metrics/DurationMetricProducer.h
@@ -64,6 +64,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
 
@@ -88,7 +89,8 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const int64_t& eventTime);
 
-    void flushCurrentBucketLocked(const int64_t& eventTimeNs) override;
+    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                  const int64_t& nextBucketStartTimeNs) override;
 
     const DurationMetric_AggregationType mAggregationType;
 
diff --git a/bin/src/metrics/EventMetricProducer.cpp b/bin/src/metrics/EventMetricProducer.cpp
index 3b4af65..5435c84 100644
--- a/bin/src/metrics/EventMetricProducer.cpp
+++ b/bin/src/metrics/EventMetricProducer.cpp
@@ -108,6 +108,7 @@
 void EventMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                              const bool include_current_partial_bucket,
                                              const bool erase_data,
+                                             const DumpLatency dumpLatency,
                                              std::set<string> *str_set,
                                              ProtoOutputStream* protoOutput) {
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
diff --git a/bin/src/metrics/EventMetricProducer.h b/bin/src/metrics/EventMetricProducer.h
index 96adfdd..74e6bc8 100644
--- a/bin/src/metrics/EventMetricProducer.h
+++ b/bin/src/metrics/EventMetricProducer.h
@@ -48,6 +48,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
     void clearPastBucketsLocked(const int64_t dumpTimeNs) override;
diff --git a/bin/src/metrics/GaugeMetricProducer.cpp b/bin/src/metrics/GaugeMetricProducer.cpp
index 6301793..7b001b3 100644
--- a/bin/src/metrics/GaugeMetricProducer.cpp
+++ b/bin/src/metrics/GaugeMetricProducer.cpp
@@ -184,6 +184,7 @@
 void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                              const bool include_current_partial_bucket,
                                              const bool erase_data,
+                                             const DumpLatency dumpLatency,
                                              std::set<string> *str_set,
                                              ProtoOutputStream* protoOutput) {
     VLOG("Gauge metric %lld report now...", (long long)mMetricId);
@@ -528,7 +529,7 @@
         return;
     }
 
-    flushCurrentBucketLocked(eventTimeNs);
+    flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
 
     // Adjusts the bucket start and end times.
     int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
@@ -538,7 +539,8 @@
          (long long)mCurrentBucketStartTimeNs);
 }
 
-void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
+void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                                   const int64_t& nextBucketStartTimeNs) {
     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
 
     GaugeBucket info;
diff --git a/bin/src/metrics/GaugeMetricProducer.h b/bin/src/metrics/GaugeMetricProducer.h
index 64a1833..9b99fb1 100644
--- a/bin/src/metrics/GaugeMetricProducer.h
+++ b/bin/src/metrics/GaugeMetricProducer.h
@@ -82,8 +82,7 @@
             // Flush full buckets on the normal path up to the latest bucket boundary.
             flushIfNeededLocked(eventTimeNs);
         }
-        flushCurrentBucketLocked(eventTimeNs);
-        mCurrentBucketStartTimeNs = eventTimeNs;
+        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
         if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
             pullAndMatchEventsLocked(eventTimeNs);
         }
@@ -99,6 +98,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
     void clearPastBucketsLocked(const int64_t dumpTimeNs) override;
@@ -119,7 +119,8 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const int64_t& eventTime) override;
 
-    void flushCurrentBucketLocked(const int64_t& eventTimeNs) override;
+    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                  const int64_t& nextBucketStartTimeNs) override;
 
     void pullAndMatchEventsLocked(const int64_t timestampNs);
 
diff --git a/bin/src/metrics/MetricProducer.h b/bin/src/metrics/MetricProducer.h
index 8ab3b06..99cb5d4 100644
--- a/bin/src/metrics/MetricProducer.h
+++ b/bin/src/metrics/MetricProducer.h
@@ -46,6 +46,16 @@
     kActiveOnBoot = 2,
 };
 
+enum DumpLatency {
+    // In some cases, we only have a short time range to do the dump, e.g. statsd is being killed.
+    // We might be able to return all the data in this mode. For instance, pull metrics might need
+    // to be pulled when the current bucket is requested.
+    FAST = 1,
+    // In other cases, it is fine for a dump to take more than a few milliseconds, e.g. config
+    // updates.
+    NO_TIME_CONSTRAINTS = 2
+};
+
 // A MetricProducer is responsible for compute one single metrics, creating stats log report, and
 // writing the report to dropbox. MetricProducers should respond to package changes as required in
 // PackageInfoListener, but if none of the metrics are slicing by package name, then the update can
@@ -87,8 +97,7 @@
             flushIfNeededLocked(eventTimeNs);
         }
         // Now flush a partial bucket.
-        flushCurrentBucketLocked(eventTimeNs);
-        mCurrentBucketStartTimeNs = eventTimeNs;
+        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
         // Don't update the current bucket number so that the anomaly tracker knows this bucket
         // is a partial bucket and can merge it with the previous bucket.
     };
@@ -135,11 +144,12 @@
     void onDumpReport(const int64_t dumpTimeNs,
                       const bool include_current_partial_bucket,
                       const bool erase_data,
+                      const DumpLatency dumpLatency,
                       std::set<string> *str_set,
                       android::util::ProtoOutputStream* protoOutput) {
         std::lock_guard<std::mutex> lock(mMutex);
         return onDumpReportLocked(dumpTimeNs, include_current_partial_bucket, erase_data,
-                str_set, protoOutput);
+                dumpLatency, str_set, protoOutput);
     }
 
     void clearPastBuckets(const int64_t dumpTimeNs) {
@@ -239,6 +249,7 @@
     virtual void onDumpReportLocked(const int64_t dumpTimeNs,
                                     const bool include_current_partial_bucket,
                                     const bool erase_data,
+                                    const DumpLatency dumpLatency,
                                     std::set<string> *str_set,
                                     android::util::ProtoOutputStream* protoOutput) = 0;
     virtual void clearPastBucketsLocked(const int64_t dumpTimeNs) = 0;
@@ -270,7 +281,7 @@
      */
     virtual void flushLocked(const int64_t& eventTimeNs) {
         flushIfNeededLocked(eventTimeNs);
-        flushCurrentBucketLocked(eventTimeNs);
+        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
     };
 
     /**
@@ -283,7 +294,8 @@
      * flushIfNeededLocked or the app upgrade handler; the caller MUST update the bucket timestamp
      * and bucket number as needed.
      */
-    virtual void flushCurrentBucketLocked(const int64_t& eventTimeNs){};
+    virtual void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                          const int64_t& nextBucketStartTimeNs) {};
 
     // Convenience to compute the current bucket's end time, which is always aligned with the
     // start time of the metric.
diff --git a/bin/src/metrics/MetricsManager.cpp b/bin/src/metrics/MetricsManager.cpp
index 4851a8d..4b3bfd3 100644
--- a/bin/src/metrics/MetricsManager.cpp
+++ b/bin/src/metrics/MetricsManager.cpp
@@ -217,6 +217,7 @@
 void MetricsManager::onDumpReport(const int64_t dumpTimeStampNs,
                                   const bool include_current_partial_bucket,
                                   const bool erase_data,
+                                  const DumpLatency dumpLatency,
                                   std::set<string> *str_set,
                                   ProtoOutputStream* protoOutput) {
     VLOG("=========================Metric Reports Start==========================");
@@ -227,10 +228,10 @@
                     FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_METRICS);
             if (mHashStringsInReport) {
                 producer->onDumpReport(dumpTimeStampNs, include_current_partial_bucket, erase_data,
-                                       str_set, protoOutput);
+                                       dumpLatency, str_set, protoOutput);
             } else {
                 producer->onDumpReport(dumpTimeStampNs, include_current_partial_bucket, erase_data,
-                                       nullptr, protoOutput);
+                                       dumpLatency, nullptr, protoOutput);
             }
             protoOutput->end(token);
         } else {
diff --git a/bin/src/metrics/MetricsManager.h b/bin/src/metrics/MetricsManager.h
index eab1f76..3904460 100644
--- a/bin/src/metrics/MetricsManager.h
+++ b/bin/src/metrics/MetricsManager.h
@@ -121,6 +121,7 @@
     virtual void onDumpReport(const int64_t dumpTimeNs,
                               const bool include_current_partial_bucket,
                               const bool erase_data,
+                              const DumpLatency dumpLatency,
                               std::set<string> *str_set,
                               android::util::ProtoOutputStream* protoOutput);
 
diff --git a/bin/src/metrics/ValueMetricProducer.cpp b/bin/src/metrics/ValueMetricProducer.cpp
index 3cf378d..e94b75c 100644
--- a/bin/src/metrics/ValueMetricProducer.cpp
+++ b/bin/src/metrics/ValueMetricProducer.cpp
@@ -188,13 +188,28 @@
 void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                              const bool include_current_partial_bucket,
                                              const bool erase_data,
+                                             const DumpLatency dumpLatency,
                                              std::set<string> *str_set,
                                              ProtoOutputStream* protoOutput) {
     VLOG("metric %lld dump report now...", (long long)mMetricId);
+    flushIfNeededLocked(dumpTimeNs);
     if (include_current_partial_bucket) {
-        flushLocked(dumpTimeNs);
-    } else {
-        flushIfNeededLocked(dumpTimeNs);
+        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
+        // current bucket will have incomplete data and the next will have the wrong snapshot to do
+        // a diff against. If the condition is false, we are fine since the base data is reset and
+        // we are not tracking anything.
+        bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
+        if (pullNeeded) {
+            switch (dumpLatency) {
+                case FAST:
+                    invalidateCurrentBucket();
+                    break;
+                case NO_TIME_CONSTRAINTS:
+                    pullAndMatchEventsLocked(dumpTimeNs);
+                    break;
+            }
+        } 
+        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
     }
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
     protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
@@ -712,23 +727,21 @@
         return;
     }
 
-    flushCurrentBucketLocked(eventTimeNs);
-
     int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
-    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
-    mCurrentBucketNum += numBucketsForward;
+    int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
+    flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
 
+    mCurrentBucketNum += numBucketsForward;
     if (numBucketsForward > 1) {
         VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
         StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
         // take base again in future good bucket.
         resetBase();
     }
-    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
-         (long long)mCurrentBucketStartTimeNs);
 }
 
-void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
+void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                                   const int64_t& nextBucketStartTimeNs) {
     if (mCondition == ConditionState::kUnknown) {
         StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
     }
@@ -758,6 +771,9 @@
     }
     initCurrentSlicedBucket();
     mCurrentBucketIsInvalid = false;
+    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
+    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
+         (long long)mCurrentBucketStartTimeNs);
 }
 
 ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
diff --git a/bin/src/metrics/ValueMetricProducer.h b/bin/src/metrics/ValueMetricProducer.h
index f26ad85..696d4fa 100644
--- a/bin/src/metrics/ValueMetricProducer.h
+++ b/bin/src/metrics/ValueMetricProducer.h
@@ -65,8 +65,7 @@
         if (mIsPulled && mCondition) {
             pullAndMatchEventsLocked(eventTimeNs - 1);
         }
-        flushCurrentBucketLocked(eventTimeNs);
-        mCurrentBucketStartTimeNs = eventTimeNs;
+        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
     };
 
 protected:
@@ -79,6 +78,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
     void clearPastBucketsLocked(const int64_t dumpTimeNs) override;
@@ -97,7 +97,8 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const int64_t& eventTime) override;
 
-    void flushCurrentBucketLocked(const int64_t& eventTimeNs) override;
+    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                  const int64_t& nextBucketStartTimeNs) override;
 
     void dropDataLocked(const int64_t dropTimeNs) override;
 
diff --git a/bin/tests/StatsLogProcessor_test.cpp b/bin/tests/StatsLogProcessor_test.cpp
index 4579ca6..88aa180 100644
--- a/bin/tests/StatsLogProcessor_test.cpp
+++ b/bin/tests/StatsLogProcessor_test.cpp
@@ -173,7 +173,7 @@
 
     // Expect to get no metrics, but snapshot specified above in uidmap.
     vector<uint8_t> bytes;
-    p.onDumpReport(key, 1, false, true, ADB_DUMP, &bytes);
+    p.onDumpReport(key, 1, false, true, ADB_DUMP, FAST, &bytes);
 
     ConfigMetricsReportList output;
     output.ParseFromArray(bytes.data(), bytes.size());
@@ -204,7 +204,7 @@
 
     // Expect to get no metrics, but snapshot specified above in uidmap.
     vector<uint8_t> bytes;
-    p.onDumpReport(key, 1, false, true, ADB_DUMP, &bytes);
+    p.onDumpReport(key, 1, false, true, ADB_DUMP, FAST, &bytes);
 
     ConfigMetricsReportList output;
     output.ParseFromArray(bytes.data(), bytes.size());
@@ -235,7 +235,7 @@
 
     // Expect to get no metrics, but snapshot specified above in uidmap.
     vector<uint8_t> bytes;
-    p.onDumpReport(key, 1, false, true, ADB_DUMP, &bytes);
+    p.onDumpReport(key, 1, false, true, ADB_DUMP, FAST, &bytes);
 
     ConfigMetricsReportList output;
     output.ParseFromArray(bytes.data(), bytes.size());
@@ -269,21 +269,21 @@
     ConfigMetricsReportList output;
 
     // Dump report WITHOUT erasing data.
-    processor->onDumpReport(cfgKey, 3, true, false /* Do NOT erase data. */, ADB_DUMP, &bytes);
+    processor->onDumpReport(cfgKey, 3, true, false /* Do NOT erase data. */, ADB_DUMP, FAST, &bytes);
     output.ParseFromArray(bytes.data(), bytes.size());
     EXPECT_EQ(output.reports_size(), 1);
     EXPECT_EQ(output.reports(0).metrics_size(), 1);
     EXPECT_EQ(output.reports(0).metrics(0).count_metrics().data_size(), 1);
 
     // Dump report WITH erasing data. There should be data since we didn't previously erase it.
-    processor->onDumpReport(cfgKey, 4, true, true /* DO erase data. */, ADB_DUMP, &bytes);
+    processor->onDumpReport(cfgKey, 4, true, true /* DO erase data. */, ADB_DUMP, FAST, &bytes);
     output.ParseFromArray(bytes.data(), bytes.size());
     EXPECT_EQ(output.reports_size(), 1);
     EXPECT_EQ(output.reports(0).metrics_size(), 1);
     EXPECT_EQ(output.reports(0).metrics(0).count_metrics().data_size(), 1);
 
     // Dump report again. There should be no data since we erased it.
-    processor->onDumpReport(cfgKey, 5, true, true /* DO erase data. */, ADB_DUMP, &bytes);
+    processor->onDumpReport(cfgKey, 5, true, true /* DO erase data. */, ADB_DUMP, FAST, &bytes);
     output.ParseFromArray(bytes.data(), bytes.size());
     // We don't care whether statsd has a report, as long as it has no count metrics in it.
     bool noData = output.reports_size() == 0
diff --git a/bin/tests/e2e/Attribution_e2e_test.cpp b/bin/tests/e2e/Attribution_e2e_test.cpp
index a9841c9..3382525 100644
--- a/bin/tests/e2e/Attribution_e2e_test.cpp
+++ b/bin/tests/e2e/Attribution_e2e_test.cpp
@@ -147,7 +147,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 4 * bucketSizeNs + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -295,7 +295,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 4 * bucketSizeNs + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
diff --git a/bin/tests/e2e/DimensionInCondition_e2e_combination_AND_cond_test.cpp b/bin/tests/e2e/DimensionInCondition_e2e_combination_AND_cond_test.cpp
index a8914da..e4186b7 100644
--- a/bin/tests/e2e/DimensionInCondition_e2e_combination_AND_cond_test.cpp
+++ b/bin/tests/e2e/DimensionInCondition_e2e_combination_AND_cond_test.cpp
@@ -212,7 +212,7 @@
                 ConfigMetricsReportList reports;
                 vector<uint8_t> buffer;
                 processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false,
-                                        true, ADB_DUMP, &buffer);
+                                        true, ADB_DUMP, FAST, &buffer);
                 EXPECT_TRUE(buffer.size() > 0);
                 EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
                 backfillDimensionPath(&reports);
@@ -548,7 +548,7 @@
             ConfigMetricsReportList reports;
             vector<uint8_t> buffer;
             processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false,
-                                    true, ADB_DUMP, &buffer);
+                                    true, ADB_DUMP, FAST, &buffer);
             EXPECT_TRUE(buffer.size() > 0);
             EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
             backfillDimensionPath(&reports);
@@ -798,7 +798,7 @@
             ConfigMetricsReportList reports;
             vector<uint8_t> buffer;
             processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false,
-                                    true, ADB_DUMP, &buffer);
+                                    true, ADB_DUMP, FAST, &buffer);
             EXPECT_TRUE(buffer.size() > 0);
             EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
             backfillDimensionPath(&reports);
diff --git a/bin/tests/e2e/DimensionInCondition_e2e_combination_OR_cond_test.cpp b/bin/tests/e2e/DimensionInCondition_e2e_combination_OR_cond_test.cpp
index 621b6ed..f3ecd56 100644
--- a/bin/tests/e2e/DimensionInCondition_e2e_combination_OR_cond_test.cpp
+++ b/bin/tests/e2e/DimensionInCondition_e2e_combination_OR_cond_test.cpp
@@ -131,7 +131,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -347,7 +347,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -531,7 +531,7 @@
         ConfigMetricsReportList reports;
         vector<uint8_t> buffer;
         processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true,
-                                ADB_DUMP, &buffer);
+                                ADB_DUMP, FAST, &buffer);
         EXPECT_TRUE(buffer.size() > 0);
         EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
         backfillDimensionPath(&reports);
@@ -733,7 +733,7 @@
         ConfigMetricsReportList reports;
         vector<uint8_t> buffer;
         processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true,
-                                ADB_DUMP, &buffer);
+                                ADB_DUMP, FAST, &buffer);
         EXPECT_TRUE(buffer.size() > 0);
         EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
         backfillDimensionPath(&reports);
diff --git a/bin/tests/e2e/DimensionInCondition_e2e_simple_cond_test.cpp b/bin/tests/e2e/DimensionInCondition_e2e_simple_cond_test.cpp
index 9f8acaf..489bb0b 100644
--- a/bin/tests/e2e/DimensionInCondition_e2e_simple_cond_test.cpp
+++ b/bin/tests/e2e/DimensionInCondition_e2e_simple_cond_test.cpp
@@ -143,7 +143,7 @@
             ConfigMetricsReportList reports;
             vector<uint8_t> buffer;
             processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false,
-                                    true, ADB_DUMP, &buffer);
+                                    true, ADB_DUMP, FAST, &buffer);
             EXPECT_TRUE(buffer.size() > 0);
             EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
             backfillDimensionPath(&reports);
@@ -438,7 +438,7 @@
             ConfigMetricsReportList reports;
             vector<uint8_t> buffer;
             processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false,
-                                    true, ADB_DUMP, &buffer);
+                                    true, ADB_DUMP, FAST, &buffer);
             EXPECT_TRUE(buffer.size() > 0);
             EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
             backfillDimensionPath(&reports);
@@ -659,7 +659,7 @@
         ConfigMetricsReportList reports;
         vector<uint8_t> buffer;
         processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true,
-                                ADB_DUMP, &buffer);
+                                ADB_DUMP, FAST, &buffer);
         EXPECT_TRUE(buffer.size() > 0);
         EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
         backfillDimensionPath(&reports);
diff --git a/bin/tests/e2e/GaugeMetric_e2e_pull_test.cpp b/bin/tests/e2e/GaugeMetric_e2e_pull_test.cpp
index 24a9980..946eccf 100644
--- a/bin/tests/e2e/GaugeMetric_e2e_pull_test.cpp
+++ b/bin/tests/e2e/GaugeMetric_e2e_pull_test.cpp
@@ -125,7 +125,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, configAddedTimeNs + 7 * bucketSizeNs + 10, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -248,7 +248,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, configAddedTimeNs + 8 * bucketSizeNs + 10, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -352,7 +352,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, configAddedTimeNs + 7 * bucketSizeNs + 10, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
diff --git a/bin/tests/e2e/GaugeMetric_e2e_push_test.cpp b/bin/tests/e2e/GaugeMetric_e2e_push_test.cpp
index 3af8212..cd80310 100644
--- a/bin/tests/e2e/GaugeMetric_e2e_push_test.cpp
+++ b/bin/tests/e2e/GaugeMetric_e2e_push_test.cpp
@@ -150,7 +150,7 @@
         ConfigMetricsReportList reports;
         vector<uint8_t> buffer;
         processor->onDumpReport(cfgKey, bucketStartTimeNs + 3 * bucketSizeNs, false, true,
-                                ADB_DUMP, &buffer);
+                                ADB_DUMP, FAST, &buffer);
         EXPECT_TRUE(buffer.size() > 0);
         EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
         backfillDimensionPath(&reports);
diff --git a/bin/tests/e2e/MetricActivation_e2e_test.cpp b/bin/tests/e2e/MetricActivation_e2e_test.cpp
index 85d8a56..7fb43f8 100644
--- a/bin/tests/e2e/MetricActivation_e2e_test.cpp
+++ b/bin/tests/e2e/MetricActivation_e2e_test.cpp
@@ -211,7 +211,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor.onDumpReport(cfgKey, bucketStartTimeNs + NS_PER_SEC * 60 * 15 + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
diff --git a/bin/tests/e2e/MetricConditionLink_e2e_test.cpp b/bin/tests/e2e/MetricConditionLink_e2e_test.cpp
index 9349c85..78fb391 100644
--- a/bin/tests/e2e/MetricConditionLink_e2e_test.cpp
+++ b/bin/tests/e2e/MetricConditionLink_e2e_test.cpp
@@ -200,7 +200,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs - 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -319,7 +319,7 @@
     vector<uint8_t> buffer;
 
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
diff --git a/bin/tests/e2e/PartialBucket_e2e_test.cpp b/bin/tests/e2e/PartialBucket_e2e_test.cpp
index aee0c1f..ef3643f 100644
--- a/bin/tests/e2e/PartialBucket_e2e_test.cpp
+++ b/bin/tests/e2e/PartialBucket_e2e_test.cpp
@@ -46,7 +46,7 @@
     IPCThreadState* ipc = IPCThreadState::self();
     ConfigKey configKey(ipc->getCallingUid(), kConfigKey);
     processor->onDumpReport(configKey, timestamp, include_current /* include_current_bucket*/,
-                            true /* erase_data */, ADB_DUMP, &output);
+                            true /* erase_data */, ADB_DUMP, FAST, &output);
     ConfigMetricsReportList reports;
     reports.ParseFromArray(output.data(), output.size());
     EXPECT_EQ(1, reports.reports_size());
diff --git a/bin/tests/e2e/ValueMetric_pull_e2e_test.cpp b/bin/tests/e2e/ValueMetric_pull_e2e_test.cpp
index f3c4e12..cdb5a78 100644
--- a/bin/tests/e2e/ValueMetric_pull_e2e_test.cpp
+++ b/bin/tests/e2e/ValueMetric_pull_e2e_test.cpp
@@ -121,7 +121,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, configAddedTimeNs + 7 * bucketSizeNs + 10, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -228,7 +228,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, configAddedTimeNs + 9 * bucketSizeNs + 10, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
diff --git a/bin/tests/e2e/WakelockDuration_e2e_test.cpp b/bin/tests/e2e/WakelockDuration_e2e_test.cpp
index 16be3d7..e13bf14 100644
--- a/bin/tests/e2e/WakelockDuration_e2e_test.cpp
+++ b/bin/tests/e2e/WakelockDuration_e2e_test.cpp
@@ -128,7 +128,7 @@
     vector<uint8_t> buffer;
     ConfigMetricsReportList reports;
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs - 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -165,7 +165,7 @@
     vector<uint8_t> buffer;
     ConfigMetricsReportList reports;
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -216,7 +216,7 @@
     }
 
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 6 * bucketSizeNs + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -249,7 +249,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs - 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
 
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
@@ -279,7 +279,7 @@
     ConfigMetricsReportList reports;
     vector<uint8_t> buffer;
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
@@ -325,7 +325,7 @@
     }
 
     processor->onDumpReport(cfgKey, bucketStartTimeNs + 6 * bucketSizeNs + 1, false, true,
-                            ADB_DUMP, &buffer);
+                            ADB_DUMP, FAST, &buffer);
     EXPECT_TRUE(buffer.size() > 0);
     EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size()));
     backfillDimensionPath(&reports);
diff --git a/bin/tests/metrics/ValueMetricProducer_test.cpp b/bin/tests/metrics/ValueMetricProducer_test.cpp
index 7e7ffed..a9d2c88 100644
--- a/bin/tests/metrics/ValueMetricProducer_test.cpp
+++ b/bin/tests/metrics/ValueMetricProducer_test.cpp
@@ -2797,7 +2797,6 @@
     EXPECT_EQ(1, valueProducer.mPastBuckets.begin()->second[0].values[0].long_value);
 }
 
-
 TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries) {
     ValueMetric metric;
     metric.set_id(metricId);
@@ -2864,6 +2863,187 @@
     EXPECT_EQ(true, valueProducer.mHasGlobalBase);
 }
 
+static StatsLogReport outputStreamToProto(ProtoOutputStream* proto) {
+    vector<uint8_t> bytes;
+    bytes.resize(proto->size());
+    size_t pos = 0;
+    auto iter = proto->data();
+    while (iter.readBuffer() != NULL) {
+        size_t toRead = iter.currentToRead();
+        std::memcpy(&((bytes)[pos]), iter.readBuffer(), toRead);
+        pos += toRead;
+        iter.rp()->move(toRead);
+    }
+
+    StatsLogReport report;
+    report.ParseFromArray(bytes.data(), bytes.size());
+    return report;
+}
+
+TEST(ValueMetricProducerTest, TestPullNeededFastDump) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Initial pull.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(1);
+                event->write(1);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer.onDumpReport(bucketStartTimeNs + 10, 
+                               true /* include recent buckets */, true,
+                               FAST, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    // Bucket is invalid since we did not pull when dump report was called.
+    EXPECT_EQ(0, report.value_metrics().data_size());
+}
+
+TEST(ValueMetricProducerTest, TestFastDumpWithoutCurrentBucket) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Initial pull.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(1);
+                event->write(1);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(tagId);
+    event->write(2);
+    event->write(2);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer.onDumpReport(bucket4StartTimeNs, 
+                               false /* include recent buckets */, true,
+                               FAST, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    // Previous bucket is part of the report.
+    EXPECT_EQ(1, report.value_metrics().data_size());
+    EXPECT_EQ(0, report.value_metrics().data(0).bucket_info(0).bucket_num());
+}
+
+TEST(ValueMetricProducerTest, TestPullNeededNoTimeConstraints) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Initial pull.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(1);
+                event->write(1);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+                event->write(tagId);
+                event->write(3);
+                event->write(3);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer.onDumpReport(bucketStartTimeNs + 10, 
+                               true /* include recent buckets */, true,
+                               NO_TIME_CONSTRAINTS, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_EQ(1, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().data(0).bucket_info_size());
+    EXPECT_EQ(2, report.value_metrics().data(0).bucket_info(0).values(0).value_long());
+}
+
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android