| /* |
| * Copyright (C) 2021 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #define STATSD_DEBUG false // STOPSHIP if true |
| #include "Log.h" |
| |
| #include "NumericValueMetricProducer.h" |
| |
| #include <stdlib.h> |
| |
| #include <algorithm> |
| |
| #include "FieldValue.h" |
| #include "guardrail/StatsdStats.h" |
| #include "metrics/HistogramValue.h" |
| #include "metrics/NumericValue.h" |
| #include "metrics/parsing_utils/metrics_manager_util.h" |
| #include "stats_log_util.h" |
| |
| using android::util::FIELD_COUNT_REPEATED; |
| using android::util::FIELD_TYPE_BOOL; |
| using android::util::FIELD_TYPE_DOUBLE; |
| using android::util::FIELD_TYPE_INT32; |
| using android::util::FIELD_TYPE_INT64; |
| using android::util::FIELD_TYPE_MESSAGE; |
| using android::util::FIELD_TYPE_STRING; |
| using android::util::ProtoOutputStream; |
| using std::shared_ptr; |
| using std::string; |
| using std::unordered_map; |
| |
| namespace android { |
| namespace os { |
| namespace statsd { |
| |
| namespace { // anonymous namespace |
| // for StatsLogReport |
| const int FIELD_ID_VALUE_METRICS = 7; |
| // for ValueBucketInfo |
| const int FIELD_ID_VALUE_INDEX = 1; |
| const int FIELD_ID_VALUE_LONG = 2; |
| const int FIELD_ID_VALUE_DOUBLE = 3; |
| const int FIELD_ID_VALUE_HISTOGRAM = 5; |
| const int FIELD_ID_VALUE_SAMPLESIZE = 4; |
| const int FIELD_ID_VALUES = 9; |
| const int FIELD_ID_BUCKET_NUM = 4; |
| const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; |
| const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; |
| const int FIELD_ID_CONDITION_TRUE_NS = 10; |
| const int FIELD_ID_CONDITION_CORRECTION_NS = 11; |
| |
| const NumericValue ZERO_LONG((int64_t)0); |
| const NumericValue ZERO_DOUBLE((double)0); |
| |
| double toDouble(const NumericValue& value) { |
| return value.is<int64_t>() ? value.getValue<int64_t>() : value.getValueOrDefault<double>(0); |
| } |
| |
| } // anonymous namespace |
| |
| // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently |
| NumericValueMetricProducer::NumericValueMetricProducer( |
| const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash, |
| const PullOptions& pullOptions, const BucketOptions& bucketOptions, |
| const WhatOptions& whatOptions, const ConditionOptions& conditionOptions, |
| const StateOptions& stateOptions, const ActivationOptions& activationOptions, |
| const GuardrailOptions& guardrailOptions, |
| const wp<ConfigMetadataProvider> configMetadataProvider) |
| : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions, |
| conditionOptions, stateOptions, activationOptions, guardrailOptions, |
| configMetadataProvider), |
| mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()), |
| mAggregationTypes(whatOptions.aggregationTypes), |
| mIncludeSampleSize(metric.has_include_sample_size() |
| ? metric.include_sample_size() |
| : hasAvgAggregationType(whatOptions.aggregationTypes)), |
| mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()), |
| mValueDirection(metric.value_direction()), |
| mSkipZeroDiffOutput(metric.skip_zero_diff_output()), |
| mUseZeroDefaultBase(metric.use_zero_default_base()), |
| mHasGlobalBase(false), |
| mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC |
| : StatsdStats::kPullMaxDelayNs), |
| mDedupedFieldMatchers(dedupFieldMatchers(whatOptions.fieldMatchers)), |
| mBinStartsList(whatOptions.binStartsList) { |
| // TODO(b/186677791): Use initializer list to initialize mUploadThreshold. |
| if (metric.has_threshold()) { |
| mUploadThreshold = metric.threshold(); |
| } |
| } |
| |
| void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs, |
| const BucketDropReason reason) { |
| ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason); |
| |
| switch (reason) { |
| case BucketDropReason::DUMP_REPORT_REQUESTED: |
| case BucketDropReason::EVENT_IN_WRONG_BUCKET: |
| case BucketDropReason::CONDITION_UNKNOWN: |
| case BucketDropReason::PULL_FAILED: |
| case BucketDropReason::PULL_DELAYED: |
| case BucketDropReason::DIMENSION_GUARDRAIL_REACHED: |
| resetBase(); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| void NumericValueMetricProducer::resetBase() { |
| for (auto& [_, dimInfo] : mDimInfos) { |
| for (NumericValue& base : dimInfo.dimExtras) { |
| base.reset(); |
| } |
| } |
| mHasGlobalBase = false; |
| } |
| |
| void NumericValueMetricProducer::writePastBucketAggregateToProto( |
| const int aggIndex, const NumericValue& value, const int sampleSize, |
| ProtoOutputStream* const protoOutput) const { |
| uint64_t valueToken = |
| protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES); |
| protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex); |
| if (mIncludeSampleSize) { |
| protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_SAMPLESIZE, sampleSize); |
| } |
| if (value.is<int64_t>()) { |
| const int64_t val = value.getValue<int64_t>(); |
| protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)val); |
| VLOG("\t\t value %d: %lld", aggIndex, (long long)val); |
| } else if (value.is<double>()) { |
| const double val = value.getValue<double>(); |
| protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, val); |
| VLOG("\t\t value %d: %.2f", aggIndex, val); |
| } else if (value.is<HistogramValue>()) { |
| const HistogramValue& val = value.getValue<HistogramValue>(); |
| const uint64_t histToken = |
| protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_HISTOGRAM); |
| val.toProto(*protoOutput); |
| protoOutput->end(histToken); |
| VLOG("\t\t value %d: %s", aggIndex, val.toString().c_str()); |
| } else { |
| VLOG("Wrong value type for ValueMetric output"); |
| } |
| protoOutput->end(valueToken); |
| } |
| |
| void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs, |
| const bool isActive) { |
| // When active state changes from true to false for pulled metric, clear diff base but don't |
| // reset other counters as we may accumulate more value in the bucket. |
| if (mUseDiff && !isActive) { |
| resetBase(); |
| } |
| } |
| |
| // Only called when mIsActive and the event is NOT too late. |
| void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition, |
| const ConditionState newCondition, |
| const int64_t eventTimeNs) { |
| // For metrics that use diff, when condition changes from true to false, |
| // clear diff base but don't reset other counts because we may accumulate |
| // more value in the bucket. |
| if (mUseDiff && |
| (oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) { |
| resetBase(); |
| } |
| } |
| |
| void NumericValueMetricProducer::prepareFirstBucketLocked() { |
| // Kicks off the puller immediately if condition is true and diff based. |
| if (mIsActive && isPulled() && mCondition == ConditionState::kTrue && mUseDiff) { |
| pullAndMatchEventsLocked(mCurrentBucketStartTimeNs); |
| } |
| } |
| |
| void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { |
| vector<shared_ptr<LogEvent>> allData; |
| if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) { |
| ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs); |
| invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED); |
| return; |
| } |
| |
| accumulateEvents(allData, timestampNs, timestampNs); |
| } |
| |
| int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { |
| return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; |
| } |
| |
| // By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely |
| // to be delayed. Other events like condition changes or app upgrade which are not based on |
| // AlarmManager might have arrived earlier and close the bucket. |
| void NumericValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, |
| PullResult pullResult, int64_t originalPullTimeNs) { |
| lock_guard<mutex> lock(mMutex); |
| if (mCondition == ConditionState::kTrue) { |
| // If the pull failed, we won't be able to compute a diff. |
| if (pullResult == PullResult::PULL_RESULT_FAIL) { |
| invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED); |
| } else if (pullResult == PullResult::PULL_RESULT_SUCCESS) { |
| bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs(); |
| if (isEventLate) { |
| // If the event is late, we are in the middle of a bucket. Just |
| // process the data without trying to snap the data to the nearest bucket. |
| accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs); |
| } else { |
| // For scheduled pulled data, the effective event time is snap to the nearest |
| // bucket end. In the case of waking up from a deep sleep state, we will |
| // attribute to the previous bucket end. If the sleep was long but not very |
| // long, we will be in the immediate next bucket. Previous bucket may get a |
| // larger number as we pull at a later time than real bucket end. |
| // |
| // If the sleep was very long, we skip more than one bucket before sleep. In |
| // this case, if the diff base will be cleared and this new data will serve as |
| // new diff base. |
| int64_t bucketEndTimeNs = calcPreviousBucketEndTime(originalPullTimeNs) - 1; |
| StatsdStats::getInstance().noteBucketBoundaryDelayNs( |
| mMetricId, originalPullTimeNs - bucketEndTimeNs); |
| accumulateEvents(allData, originalPullTimeNs, bucketEndTimeNs); |
| } |
| } |
| } |
| |
| // We can probably flush the bucket. Since we used bucketEndTimeNs when calling |
| // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed. |
| flushIfNeededLocked(originalPullTimeNs); |
| } |
| |
| void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues, |
| const LogEvent& newEvent, |
| const vector<int>& newValueIndices) const { |
| if (eventValues.second.size() != newValueIndices.size()) { |
| ALOGE("NumericValueMetricProducer value indices sizes don't match"); |
| return; |
| } |
| vector<FieldValue>* const aggregateFieldValues = eventValues.first.getMutableValues(); |
| const vector<FieldValue>& newFieldValues = newEvent.getValues(); |
| for (size_t i = 0; i < eventValues.second.size(); ++i) { |
| if (newValueIndices[i] != -1 && eventValues.second[i] != -1) { |
| (*aggregateFieldValues)[eventValues.second[i]].mValue += |
| newFieldValues[newValueIndices[i]].mValue; |
| } |
| } |
| } |
| |
| // Process events retrieved from a pull. |
| void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData, |
| int64_t originalPullTimeNs, |
| int64_t eventElapsedTimeNs) { |
| if (isEventLateLocked(eventElapsedTimeNs)) { |
| VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", |
| (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs); |
| StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); |
| invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET); |
| return; |
| } |
| |
| const int64_t elapsedRealtimeNs = getElapsedRealtimeNs(); |
| const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs; |
| StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs); |
| if (pullDelayNs > mMaxPullDelayNs) { |
| ALOGE("Pull finish too late for atom %d, longer than %lld", mPullAtomId, |
| (long long)mMaxPullDelayNs); |
| StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId); |
| // We are missing one pull from the bucket which means we will not have a complete view of |
| // what's going on. |
| invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED); |
| return; |
| } |
| |
| mMatchedMetricDimensionKeys.clear(); |
| if (mUseDiff) { |
| // An extra aggregation step is needed to sum values with matching dimensions |
| // before calculating the diff between sums of consecutive pulls. |
| std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents; |
| for (const auto& data : allData) { |
| const auto [matchResult, transformedEvent] = |
| mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex); |
| if (matchResult != MatchingState::kMatched) { |
| continue; |
| } |
| |
| // Get dimensions_in_what key and value indices. |
| HashableDimensionKey dimensionsInWhat; |
| vector<int> valueIndices(mDedupedFieldMatchers.size(), -1); |
| const LogEvent& eventRef = transformedEvent == nullptr ? *data : *transformedEvent; |
| if (!filterValues(mDimensionsInWhat, mDedupedFieldMatchers, eventRef.getValues(), |
| dimensionsInWhat, valueIndices)) { |
| StatsdStats::getInstance().noteBadValueType(mMetricId); |
| } |
| |
| // Store new event in map or combine values in existing event. |
| auto it = aggregateEvents.find(dimensionsInWhat); |
| if (it == aggregateEvents.end()) { |
| aggregateEvents.emplace(std::piecewise_construct, |
| std::forward_as_tuple(dimensionsInWhat), |
| std::forward_as_tuple(eventRef, valueIndices)); |
| } else { |
| combineValueFields(it->second, eventRef, valueIndices); |
| } |
| } |
| |
| for (auto& [dimKey, eventInfo] : aggregateEvents) { |
| eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs); |
| onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first); |
| } |
| } else { |
| for (const auto& data : allData) { |
| const auto [matchResult, transformedEvent] = |
| mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex); |
| if (matchResult == MatchingState::kMatched) { |
| LogEvent localCopy = transformedEvent == nullptr ? *data : *transformedEvent; |
| localCopy.setElapsedTimestampNs(eventElapsedTimeNs); |
| onMatchedLogEventLocked(mWhatMatcherIndex, localCopy); |
| } |
| } |
| } |
| |
| // If a key that is: |
| // 1. Tracked in mCurrentSlicedBucket and |
| // 2. A superset of the current mStateChangePrimaryKey |
| // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys) |
| // then we clear the data from mDimInfos to reset the base and current state key. |
| for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) { |
| const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat(); |
| bool presentInPulledData = |
| mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end(); |
| if (!presentInPulledData && |
| containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks, |
| mStateChangePrimaryKey.first)) { |
| auto it = mDimInfos.find(whatKey); |
| if (it != mDimInfos.end()) { |
| mDimInfos.erase(it); |
| } |
| // Turn OFF condition timer for keys not present in pulled data. |
| currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs); |
| } |
| } |
| mMatchedMetricDimensionKeys.clear(); |
| mHasGlobalBase = true; |
| |
| // If we reach the guardrail, we might have dropped some data which means the bucket is |
| // incomplete. |
| // |
| // The base also needs to be reset. If we do not have the full data, we might |
| // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key |
| // might be missing from mCurrentSlicedBucket. |
| if (hasReachedGuardRailLimit()) { |
| invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED); |
| mCurrentSlicedBucket.clear(); |
| } |
| } |
| |
| bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) { |
| // ===========GuardRail============== |
| // 1. Report the tuple count if the tuple count > soft limit |
| if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) { |
| return false; |
| } |
| if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) { |
| size_t newTupleCount = mCurrentFullBucket.size() + 1; |
| // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. |
| if (newTupleCount > mDimensionHardLimit) { |
| if (!mHasHitGuardrail) { |
| ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s", |
| (long long)mMetricId, newKey.toString().c_str()); |
| mHasHitGuardrail = true; |
| } |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| namespace { |
| NumericValue getAggregationInputValue(const LogEvent& event, const Matcher& matcher) { |
| if (matcher.hasAllPositionMatcher()) { // client-aggregated histogram |
| vector<int> binCounts; |
| for (const FieldValue& value : event.getValues()) { |
| if (!value.mField.matches(matcher)) { |
| continue; |
| } |
| if (value.mValue.getType() == INT) { |
| binCounts.push_back(value.mValue.int_value); |
| } else { |
| return NumericValue{}; |
| } |
| } |
| return NumericValue(HistogramValue(binCounts)); |
| } |
| |
| for (const FieldValue& value : event.getValues()) { |
| if (!value.mField.matches(matcher)) { |
| continue; |
| } |
| switch (value.mValue.type) { |
| case INT: |
| return NumericValue((int64_t)value.mValue.int_value); |
| case LONG: |
| return NumericValue((int64_t)value.mValue.long_value); |
| case FLOAT: |
| return NumericValue((double)value.mValue.float_value); |
| case DOUBLE: |
| return NumericValue((double)value.mValue.double_value); |
| default: |
| return NumericValue{}; |
| } |
| } |
| return NumericValue{}; |
| } |
| |
| void addValueToHistogram(const NumericValue& value, const optional<const BinStarts>& binStarts, |
| HistogramValue& histValue) { |
| if (binStarts == nullopt) { |
| ALOGE("Missing bin configuration!"); |
| return; |
| } |
| histValue.addValue(static_cast<float>(toDouble(value)), *binStarts); |
| } |
| |
| } // anonymous namespace |
| |
| bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs, |
| const MetricDimensionKey& eventKey, |
| const LogEvent& event, vector<Interval>& intervals, |
| Bases& bases) { |
| if (bases.size() < mFieldMatchers.size()) { |
| VLOG("Resizing number of bases to %zu", mFieldMatchers.size()); |
| bases.resize(mFieldMatchers.size()); |
| } |
| |
| // We only use anomaly detection under certain cases. |
| // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics |
| // containing multiple values. We tried to retain all previous behaviour, but we are unsure the |
| // previous behaviour was correct. At the time of the fix, anomaly detection had no owner. |
| // Whoever next works on it should look into the cases where it is triggered in this function. |
| // Discussion here: http://ag/6124370. |
| bool useAnomalyDetection = true; |
| bool seenNewData = false; |
| for (size_t i = 0; i < mFieldMatchers.size(); i++) { |
| const Matcher& matcher = mFieldMatchers[i]; |
| Interval& interval = intervals[i]; |
| interval.aggIndex = i; |
| NumericValue& base = bases[i]; |
| NumericValue value = getAggregationInputValue(event, matcher); |
| if (!value.hasValue()) { |
| VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str()); |
| StatsdStats::getInstance().noteBadValueType(mMetricId); |
| return seenNewData; |
| } |
| |
| if (value.is<HistogramValue>() && !value.getValue<HistogramValue>().isValid()) { |
| ALOGE("Invalid histogram at %zu from event %s", i, event.ToString().c_str()); |
| StatsdStats::getInstance().noteBadValueType(mMetricId); |
| if (mUseDiff) { |
| base.reset(); |
| } |
| continue; |
| } |
| |
| if (mUseDiff) { |
| if (!base.hasValue()) { |
| if (mHasGlobalBase && mUseZeroDefaultBase) { |
| // The bucket has global base. This key does not. |
| // Optionally use zero as base. |
| if (value.is<int64_t>()) { |
| base = ZERO_LONG; |
| } else if (value.is<double>()) { |
| base = ZERO_DOUBLE; |
| } else if (value.is<HistogramValue>()) { |
| base = HistogramValue(); |
| } |
| } else { |
| // no base. just update base and return. |
| base = value; |
| |
| // If we're missing a base, do not use anomaly detection on incomplete data |
| useAnomalyDetection = false; |
| |
| seenNewData = true; |
| // Continue (instead of return) here in order to set base value for other bases |
| continue; |
| } |
| } |
| NumericValue diff{}; |
| if (value.is<HistogramValue>()) { |
| diff = value - base; |
| seenNewData = true; |
| base = value; |
| if (diff == HistogramValue::ERROR_BINS_MISMATCH) { |
| ALOGE("Value %zu from event %s does not have enough bins", i, |
| event.ToString().c_str()); |
| StatsdStats::getInstance().noteBadValueType(mMetricId); |
| continue; |
| } |
| if (diff == HistogramValue::ERROR_BIN_COUNT_TOO_HIGH) { |
| ALOGE("Value %zu from event %s has decreasing bin count", i, |
| event.ToString().c_str()); |
| StatsdStats::getInstance().noteBadValueType(mMetricId); |
| continue; |
| } |
| } else { |
| seenNewData = true; |
| switch (mValueDirection) { |
| case ValueMetric::INCREASING: |
| if (value >= base) { |
| diff = value - base; |
| } else if (mUseAbsoluteValueOnReset) { |
| diff = value; |
| } else { |
| VLOG("Unexpected decreasing value"); |
| StatsdStats::getInstance().notePullDataError(mPullAtomId); |
| base = value; |
| // If we've got bad data, do not use anomaly detection |
| useAnomalyDetection = false; |
| continue; |
| } |
| break; |
| case ValueMetric::DECREASING: |
| if (base >= value) { |
| diff = base - value; |
| } else if (mUseAbsoluteValueOnReset) { |
| diff = value; |
| } else { |
| VLOG("Unexpected increasing value"); |
| StatsdStats::getInstance().notePullDataError(mPullAtomId); |
| base = value; |
| // If we've got bad data, do not use anomaly detection |
| useAnomalyDetection = false; |
| continue; |
| } |
| break; |
| case ValueMetric::ANY: |
| diff = value - base; |
| break; |
| default: |
| break; |
| } |
| base = value; |
| } |
| value = diff; |
| } |
| |
| const ValueMetric::AggregationType aggType = getAggregationTypeLocked(i); |
| if (interval.hasValue()) { |
| switch (aggType) { |
| case ValueMetric::SUM: |
| // for AVG, we add up and take average when flushing the bucket |
| case ValueMetric::AVG: |
| interval.aggregate += value; |
| break; |
| case ValueMetric::MIN: |
| interval.aggregate = min(value, interval.aggregate); |
| break; |
| case ValueMetric::MAX: |
| interval.aggregate = max(value, interval.aggregate); |
| break; |
| case ValueMetric::HISTOGRAM: |
| if (value.is<HistogramValue>()) { |
| // client-aggregated histogram: add the corresponding bin counts. |
| NumericValue sum = interval.aggregate + value; |
| if (sum == HistogramValue::ERROR_BINS_MISMATCH) { |
| ALOGE("Value %zu from event %s has too many bins", i, |
| event.ToString().c_str()); |
| StatsdStats::getInstance().noteBadValueType(mMetricId); |
| continue; |
| } |
| interval.aggregate = sum; |
| } else { |
| // statsd-aggregated histogram: add the raw value to histogram. |
| addValueToHistogram(value, getBinStarts(i), |
| interval.aggregate.getValue<HistogramValue>()); |
| } |
| break; |
| default: |
| break; |
| } |
| } else if (aggType == ValueMetric::HISTOGRAM && !value.is<HistogramValue>()) { |
| // statsd-aggregated histogram: add raw value to histogram. |
| interval.aggregate = HistogramValue(); |
| addValueToHistogram(value, getBinStarts(i), |
| interval.aggregate.getValue<HistogramValue>()); |
| } else { |
| interval.aggregate = value; |
| } |
| seenNewData = true; |
| interval.sampleSize += 1; |
| } |
| |
| // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due |
| // to MULTIPLE_BUCKETS_SKIPPED. |
| if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) { |
| // TODO: propgate proper values down stream when anomaly support doubles |
| long wholeBucketVal = intervals[0].aggregate.getValueOrDefault<int64_t>(0); |
| auto prev = mCurrentFullBucket.find(eventKey); |
| if (prev != mCurrentFullBucket.end()) { |
| wholeBucketVal += prev->second; |
| } |
| for (auto& tracker : mAnomalyTrackers) { |
| tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey, |
| wholeBucketVal); |
| } |
| } |
| return seenNewData; |
| } |
| |
| PastBucket<NumericValue> NumericValueMetricProducer::buildPartialBucket( |
| int64_t bucketEndTimeNs, vector<Interval>& intervals) { |
| PastBucket<NumericValue> bucket; |
| bucket.mBucketStartNs = mCurrentBucketStartTimeNs; |
| bucket.mBucketEndNs = bucketEndTimeNs; |
| |
| // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold, |
| // then all interval values are discarded for this bucket. |
| if (intervals.empty() || (intervals[0].hasValue() && !valuePassesThreshold(intervals[0]))) { |
| return bucket; |
| } |
| |
| for (const Interval& interval : intervals) { |
| // skip the output if the diff is zero |
| if (!interval.hasValue() || |
| (mSkipZeroDiffOutput && mUseDiff && interval.aggregate.isZero())) { |
| continue; |
| } |
| |
| bucket.aggIndex.push_back(interval.aggIndex); |
| bucket.aggregates.push_back(getFinalValue(interval)); |
| if (mIncludeSampleSize) { |
| bucket.sampleSizes.push_back(interval.sampleSize); |
| } |
| } |
| return bucket; |
| } |
| |
| // Also invalidates current bucket if multiple buckets have been skipped |
| void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs, |
| const int64_t nextBucketStartTimeNs) { |
| ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs); |
| if (mAnomalyTrackers.size() > 0) { |
| appendToFullBucket(eventTimeNs > getCurrentBucketEndTimeNs()); |
| } |
| } |
| |
| void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) { |
| ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs); |
| |
| // If we do not have a global base when the condition is true, |
| // we will have incomplete bucket for the next bucket. |
| if (mUseDiff && !mHasGlobalBase && mCondition) { |
| // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here. |
| mCurrentBucketIsSkipped = false; |
| } |
| } |
| |
| void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { |
| if (mCurrentBucketIsSkipped) { |
| if (isFullBucketReached) { |
| // If the bucket is invalid, we ignore the full bucket since it contains invalid data. |
| mCurrentFullBucket.clear(); |
| } |
| // Current bucket is invalid, we do not add it to the full bucket. |
| return; |
| } |
| |
| if (isFullBucketReached) { // If full bucket, send to anomaly tracker. |
| // Accumulate partial buckets with current value and then send to anomaly tracker. |
| if (mCurrentFullBucket.size() > 0) { |
| for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) { |
| if (hitFullBucketGuardRailLocked(metricDimensionKey) || |
| currentBucket.intervals.empty()) { |
| continue; |
| } |
| // TODO: fix this when anomaly can accept double values |
| auto& interval = currentBucket.intervals[0]; |
| if (interval.hasValue()) { |
| mCurrentFullBucket[metricDimensionKey] += |
| interval.aggregate.getValueOrDefault<int64_t>(0); |
| } |
| } |
| for (const auto& [metricDimensionKey, value] : mCurrentFullBucket) { |
| for (auto& tracker : mAnomalyTrackers) { |
| if (tracker != nullptr) { |
| tracker->addPastBucket(metricDimensionKey, value, mCurrentBucketNum); |
| } |
| } |
| } |
| mCurrentFullBucket.clear(); |
| } else { |
| // Skip aggregating the partial buckets since there's no previous partial bucket. |
| for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) { |
| for (auto& tracker : mAnomalyTrackers) { |
| if (tracker != nullptr && !currentBucket.intervals.empty()) { |
| // TODO: fix this when anomaly can accept double values |
| auto& interval = currentBucket.intervals[0]; |
| if (interval.hasValue()) { |
| const int64_t longVal = |
| interval.aggregate.getValueOrDefault<int64_t>(0); |
| tracker->addPastBucket(metricDimensionKey, longVal, mCurrentBucketNum); |
| } |
| } |
| } |
| } |
| } |
| } else { |
| // Accumulate partial bucket. |
| for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) { |
| if (!currentBucket.intervals.empty()) { |
| // TODO: fix this when anomaly can accept double values |
| auto& interval = currentBucket.intervals[0]; |
| if (interval.hasValue()) { |
| mCurrentFullBucket[metricDimensionKey] += |
| interval.aggregate.getValueOrDefault<int64_t>(0); |
| } |
| } |
| } |
| } |
| } |
| |
| const optional<const BinStarts>& NumericValueMetricProducer::getBinStarts( |
| int valueFieldIndex) const { |
| return mBinStartsList.size() == 1 ? mBinStartsList[0] : mBinStartsList[valueFieldIndex]; |
| } |
| |
| // Estimate for the size of NumericValues. |
| size_t NumericValueMetricProducer::getAggregatedValueSize(const NumericValue& value) const { |
| size_t valueSize = 0; |
| // Index |
| valueSize += sizeof(int32_t); |
| |
| // Value |
| valueSize += value.getSize(); |
| |
| // Sample Size |
| if (mIncludeSampleSize) { |
| valueSize += sizeof(int32_t); |
| } |
| return valueSize; |
| } |
| |
| size_t NumericValueMetricProducer::byteSizeLocked() const { |
| sp<ConfigMetadataProvider> configMetadataProvider = getConfigMetadataProvider(); |
| if (configMetadataProvider != nullptr && configMetadataProvider->useV2SoftMemoryCalculation()) { |
| bool dimensionGuardrailHit = StatsdStats::getInstance().hasHitDimensionGuardrail(mMetricId); |
| return computeOverheadSizeLocked(!mPastBuckets.empty() || !mSkippedBuckets.empty(), |
| dimensionGuardrailHit) + |
| mTotalDataSize; |
| } |
| size_t totalSize = 0; |
| for (const auto& [_, buckets] : mPastBuckets) { |
| totalSize += buckets.size() * kBucketSize; |
| // TODO(b/189283526): Add bytes used to store PastBucket.aggIndex vector |
| } |
| return totalSize; |
| } |
| |
| bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const { |
| if (mUploadThreshold == nullopt) { |
| return true; |
| } |
| |
| double doubleValue = toDouble(getFinalValue(interval)); |
| |
| switch (mUploadThreshold->value_comparison_case()) { |
| case UploadThreshold::kLtInt: |
| return doubleValue < (double)mUploadThreshold->lt_int(); |
| case UploadThreshold::kGtInt: |
| return doubleValue > (double)mUploadThreshold->gt_int(); |
| case UploadThreshold::kLteInt: |
| return doubleValue <= (double)mUploadThreshold->lte_int(); |
| case UploadThreshold::kGteInt: |
| return doubleValue >= (double)mUploadThreshold->gte_int(); |
| case UploadThreshold::kLtFloat: |
| return doubleValue <= (double)mUploadThreshold->lt_float(); |
| case UploadThreshold::kGtFloat: |
| return doubleValue >= (double)mUploadThreshold->gt_float(); |
| default: |
| ALOGE("Value metric no upload threshold type used"); |
| return false; |
| } |
| } |
| |
| NumericValue NumericValueMetricProducer::getFinalValue(const Interval& interval) const { |
| if (interval.aggregate.is<HistogramValue>()) { |
| return interval.aggregate.getValue<HistogramValue>().getCompactedHistogramValue(); |
| } |
| if (getAggregationTypeLocked(interval.aggIndex) != ValueMetric::AVG) { |
| return interval.aggregate; |
| } else { |
| double sum = toDouble(interval.aggregate); |
| return NumericValue(sum / interval.sampleSize); |
| } |
| } |
| |
| NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const { |
| return {FIELD_ID_VALUE_METRICS, |
| FIELD_ID_BUCKET_NUM, |
| FIELD_ID_START_BUCKET_ELAPSED_MILLIS, |
| FIELD_ID_END_BUCKET_ELAPSED_MILLIS, |
| FIELD_ID_CONDITION_TRUE_NS, |
| FIELD_ID_CONDITION_CORRECTION_NS}; |
| } |
| |
| MetricProducer::DataCorruptionSeverity NumericValueMetricProducer::determineCorruptionSeverity( |
| int32_t atomId, DataCorruptedReason /*reason*/, LostAtomType atomType) const { |
| switch (atomType) { |
| case LostAtomType::kWhat: |
| return mUseDiff ? DataCorruptionSeverity::kUnrecoverable |
| : DataCorruptionSeverity::kResetOnDump; |
| case LostAtomType::kCondition: |
| case LostAtomType::kState: |
| return DataCorruptionSeverity::kUnrecoverable; |
| }; |
| return DataCorruptionSeverity::kNone; |
| }; |
| |
| } // namespace statsd |
| } // namespace os |
| } // namespace android |