// blob: 0070b12bb933a9c3363f0d30cd08aa961bcc3820 [file] [log] [blame]
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

#include "random_generator.h"
#include "sampler.h"
namespace dist_proc {
namespace aggregation {
namespace internal {
// Forward declaration. CompactorStack only stores a
// std::unique_ptr<KllSampler> and compares it against nullptr in this header,
// so the declaration below does not need the complete type.
// NOTE(review): "sampler.h" is included above and presumably provides the
// full definition -- confirm before removing either one.
class KllSampler;
// Hierarchy of compactors, which store items from the stream and 'compact'
// them when necessary (i.e., keep every second item in a sorted compactor)
// and add them to the compactor one level up.
//
// The stack holds its sampler through a std::unique_ptr and therefore cannot
// be copied; the copy operations are deleted explicitly to make that visible.
class CompactorStack {
public:
    // Constructs a compactor stack.
    // NOTE(review): inv_eps and inv_delta are presumably the inverses of the
    // sketch's error and failure-probability parameters, and k a size
    // parameter for the compactors -- confirm against the constructor
    // definitions, which are not part of this header.
    // NOTE(review): `random` is kept as a raw pointer; whether the stack takes
    // ownership of it is not visible from this header -- confirm.
    CompactorStack(int64_t inv_eps, int64_t inv_delta, RandomGenerator* random);
    CompactorStack(int64_t inv_eps, int64_t inv_delta, int k, RandomGenerator* random);
    ~CompactorStack();

    // Not copyable (see class comment).
    CompactorStack(const CompactorStack&) = delete;
    CompactorStack& operator=(const CompactorStack&) = delete;

    // Initialize or reset the compactor stack and all counters and thresholds.
    void Reset();

    // Adds a single item to the compactor stack.
    void Add(int64_t value);

    // Adds an item to the compactor stack with weight >= 1.
    // Does nothing if weight <= 0.
    void AddWithWeight(int64_t value, int weight);

    // Ensures that the contents of each compactor are sorted.
    void SortCompactorContents();

    // Target capacity of compactor with index h. If this capacity is exceeded,
    // the compactor will be lazily compacted in one of the next CompactStack()
    // runs. I.e., this capacity can be temporarily exceeded.
    int TargetCapacityAtLevel(int h) const;

    // NOTE(review): presumably doubles the capacity reported by
    // sampler_capacity() -- confirm against the definition.
    void DoubleSamplerCapacity();

    // Number of items currently stored in the stack.
    // NOTE(review): whether a sampled item is counted is not visible here.
    int num_stored_items() const;

    // The item currently held by the sampler together with its weight.
    // NOTE(review): presumably std::nullopt when there is no sampled item --
    // confirm against the definition.
    std::optional<std::pair<const int64_t, int64_t>> sampled_item_and_weight() const;

    // Returns the lowest active level in the compactor stack, which is
    // identical to the number of replaced levels, or log2(sampler_capacity()).
    int lowest_active_level() const;

    int64_t sampler_capacity() const;

    // For testing

    // True when a sampler object has been created.
    bool IsSamplerOn() const {
        return sampler_ != nullptr;
    }

    // Read-only view of the compactors, one inner vector per level.
    const std::vector<std::vector<int64_t>>& compactors() const {
        return compactors_;
    }

    // The random generator pointer passed at construction. The accessor is
    // const like the other accessors; the pointee is not const.
    RandomGenerator* random() const {
        return random_;
    }

    int k() const {
        return k_;
    }

private:
    void ClearCompactors();

    // Adds a new compactor at the highest level. To be called when the
    // currently topmost compactor is full.
    void AddLevel();

    // Called when at least one level in the compactor stack is above capacity.
    // Iterates from bottom to top through the compactors and compacts the
    // first one that is over its capacity by halving its contents and adding
    // them to the compactor one level higher.
    void CompactStack();

    void CompactLevel(int level);

    // To compact the items in a compactor to roughly half the size,
    // sorts the items and adds every even or odd item (determined randomly)
    // to the up_compactor.
    void Halve(std::vector<int64_t>* down_compactor, std::vector<int64_t>* up_compactor);

    // Default member initializers below keep the scalar and pointer members
    // from holding indeterminate values; constructors may overwrite them.
    std::vector<std::vector<int64_t>> compactors_;
    int k_ = 0;
    // NOTE(review): presumably the per-level capacity scaling factor used by
    // TargetCapacityAtLevel() -- confirm against the definition.
    const double c_ = 2.0 / 3.0;
    int overall_capacity_ = 0;
    int num_items_in_compactors_ = 0;
    RandomGenerator* random_ = nullptr;
    std::unique_ptr<KllSampler> sampler_;
};
} // namespace internal
} // namespace aggregation
} // namespace dist_proc