blob: ea6b24039e0f3f9c44ecfe338d72ee515c01ca64 [file] [log] [blame]
#include "caffe2/operators/dataset_ops.h"
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "caffe2/core/blob_serialization.h"
#include "caffe2/core/operator.h"
#include "caffe2/core/tensor.h"
#include "caffe2/utils/string_utils.h"
namespace caffe2 {
CAFFE_KNOWN_TYPE(std::unique_ptr<dataset_ops::TreeCursor>);
CAFFE_KNOWN_TYPE(dataset_ops::TensorVectorPtr);
CAFFE_KNOWN_TYPE(dataset_ops::SharedTensorVectorPtr);
CAFFE_KNOWN_TYPE(dataset_ops::Shared2DTensorVectorPtr);
namespace dataset_ops {
namespace {
const char kDatasetFieldSeparator = ':';
const char* kDatasetLengthField = "lengths";
// how much percent to grow the dataset when needed
const int kDatasetGrowthPct = 40;
} // namespace
TreeIterator::TreeIterator(const std::vector<std::string>& fields) {
// populate field vector and split field names
fields_.resize(fields.size());
std::vector<std::vector<std::string>> nameParts(fields_.size());
for (size_t i = 0; i < fields.size(); ++i) {
auto& field = fields_.at(i);
field.name = fields[i];
field.id = i;
field.lengthFieldId = -1;
nameParts.at(i) = split(kDatasetFieldSeparator, field.name);
}
// populate lengthFields
for (const auto& field : fields_) {
const auto& parts = nameParts.at(field.id);
if (!parts.empty() && parts.back() == kDatasetLengthField) {
lengthFieldIds_.push_back(field.id);
}
}
// find length-field with maximum prefix matching for each field
for (auto& field : fields_) {
// by default, we are matching against the root domain
size_t maxMatchLevel = 1;
int maxMatchLengthFieldId = -1;
for (int j = 0; j < numLengthFields(); ++j) {
const auto& lenField = lengthField(j);
// a length field can't have itself as its length field
if (field.id == lenField.id) {
continue;
}
auto lf = nameParts.at(lenField.id);
auto lfEnd = lf.end() - 1;
// check whether this lengthField is a prefix for this field name
if (std::mismatch(lf.begin(), lfEnd, nameParts.at(field.id).begin())
.first != lfEnd) {
continue;
}
if (lf.size() > maxMatchLevel) {
maxMatchLevel = lf.size();
maxMatchLengthFieldId = j;
}
}
field.lengthFieldId = maxMatchLengthFieldId;
}
// check that fields are topologically sorted
// (no length field depends on a length defined afterwards)
for (const auto& field : fields_) {
const auto* lengthField = lengthFieldFor(field);
CAFFE_ENFORCE(
(lengthField == nullptr) || (lengthField->id < field.id),
"Error: Field ",
field.id,
" (",
field.name,
") ",
"depends on a field defined afterwards: ",
lengthField->id,
" (",
lengthField->name,
").");
}
}
void TreeIterator::advance(
const std::vector<const TLength*>& lengths,
std::vector<TOffset>& offsets,
std::vector<TOffset>& sizes,
std::vector<TOffset>& limits,
TOffset num) {
std::vector<TOffset> newOffsets;
CAFFE_ENFORCE_EQ(lengths.size(), numLengthFields());
CAFFE_ENFORCE_EQ(offsets.size(), numOffsetFields());
sizes.resize(offsets.size());
newOffsets.resize(offsets.size());
// first index, top level
{
auto limit = limits[0];
auto offset = offsets[0];
CAFFE_ENFORCE(limit >= offset, "Tried to advance past end of cursor.");
TOffset total = std::min(limit - offset, num);
sizes[0] = total;
newOffsets[0] = offset + total;
}
// child indices
for (int j = 1; j < numOffsetFields(); ++j) {
TOffset total = 0;
int parentOffsetId = offsetFieldIdFor(lengthField(j - 1));
const TLength* length = lengths[j - 1] + offsets[parentOffsetId];
for (int k = 0; k < sizes[parentOffsetId]; ++k) {
total += *(length++);
}
auto offset = offsets[j];
CAFFE_ENFORCE(
offset + total <= limits[j],
"Inconsistent field length: ",
"tried to advance past the end of field ",
j);
sizes[j] = total;
newOffsets[j] = offset + total;
}
offsets = newOffsets;
}
TreeWalker::TreeWalker(const vector<const Blob*>& inputs, TreeCursor& cursor)
: inputs_(inputs), cursor_(cursor), sizes_(cursor.it.numOffsetFields()) {
CAFFE_ENFORCE_EQ(inputs.size(), cursor.it.fields().size());
if (cursor.offsets.empty()) {
cursor.offsets.assign(cursor.it.numOffsetFields(), 0);
}
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
fields_.emplace_back(*this, fieldId);
}
gatherLengthData();
gatherSizeLimits();
// The invariant we hold is that we are always one step ahead
advance();
}
void TreeWalker::advance() {
prevOffsets_ = cursor_.offsets;
cursor_.it.advance(lengths_, cursor_.offsets, sizes_, limits_, 1);
}
std::vector<int64_t> TreeWalker::fieldDim(int fieldId) const {
auto tensorDim = input(fieldId).sizes().vec();
tensorDim[0] = sizes_[lengthIdx(fieldId)];
return tensorDim;
}
void* TreeWalker::fieldPtr(int fieldId) const {
auto& in = input(fieldId);
return (char*)in.raw_data() +
offset(fieldId) * in.size_from_dim(1) * in.dtype().itemsize();
}
void TreeWalker::gatherLengthData() {
static const TLength lenZero = 0;
lengths_.resize(cursor_.it.numLengthFields());
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < lengths_.size(); ++i) {
auto& in = input(cursor_.it.lengthField(i).id);
if (in.numel() > 0) {
lengths_[i] = in.data<int>();
} else {
lengths_[i] = &lenZero;
}
}
}
void TreeWalker::gatherSizeLimits() {
limits_.assign(sizes_.size(), std::numeric_limits<TOffset>::max());
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (auto fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) {
auto lengthFieldIdx = lengthIdx(fieldId);
limits_[lengthFieldIdx] =
std::min(limits_[lengthFieldIdx], (TOffset)input(fieldId).sizes()[0]);
}
}
namespace {
class CreateTreeCursorOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit CreateTreeCursorOp(Args&&... args)
: Operator(std::forward<Args>(args)...),
fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}
bool RunOnDevice() override {
*OperatorBase::Output<std::unique_ptr<TreeCursor>>(0) =
// NOLINTNEXTLINE(modernize-make-unique)
std::unique_ptr<TreeCursor>(new TreeCursor(TreeIterator(fields_)));
return true;
}
private:
std::vector<std::string> fields_;
};
class GetCursorOffsetOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit GetCursorOffsetOp(Args&&... args)
: Operator(std::forward<Args>(args)...) {}
bool RunOnDevice() override {
auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
Output(0)->Resize(cursor->offsets.size());
auto* output = Output(0)->template mutable_data<int>();
for (size_t i = 0; i < cursor->offsets.size(); ++i) {
output[i] = cursor->offsets[i];
}
return true;
}
};
class ResetCursorOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit ResetCursorOp(Args&&... args)
: Operator(std::forward<Args>(args)...) {}
bool RunOnDevice() override {
auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
std::lock_guard<std::mutex> lock(cursor->mutex_);
cursor->offsets.clear();
return true;
}
};
class CheckDatasetConsistencyOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit CheckDatasetConsistencyOp(Args&&... args)
: Operator(std::forward<Args>(args)...),
iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}
bool RunOnDevice() override {
std::vector<const TLength*> lengths;
std::vector<TOffset> limits;
std::vector<TOffset> sizes;
std::vector<TOffset> offsets;
CAFFE_ENFORCE(
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
InputSize() == iterator_.fields().size(),
"Invalid number of fields. Expected ",
iterator_.fields().size(),
", got ",
InputSize());
sizes.resize(iterator_.numOffsetFields());
// gather length data
lengths.resize(iterator_.numLengthFields());
for (size_t i = 0; i < lengths.size(); ++i) {
lengths[i] = Input(iterator_.lengthField(i).id).data<TLength>();
}
// gather size limits
limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
for (size_t i = 0; i < iterator_.fields().size(); ++i) {
int lengthIdx = iterator_.fields()[i].lengthFieldId + 1;
CAFFE_ENFORCE_GT(Input(i).dim(), 0);
TOffset size = (TOffset)Input(i).sizes()[0];
if (limits[lengthIdx] == std::numeric_limits<TOffset>::max()) {
limits[lengthIdx] = size;
} else {
CAFFE_ENFORCE(
limits[lengthIdx] == size,
"Inconsistent sizes for fields belonging to same domain.",
" Field: ",
i,
" (",
iterator_.fields()[i].name,
"); Length field index: ",
lengthIdx,
"); Previous size: ",
limits[lengthIdx],
"; New size: ",
size);
}
}
// advance to the end
offsets.assign(sizes.size(), 0);
iterator_.advance(lengths, offsets, sizes, limits, limits[0]);
for (size_t i = 0; i < limits.size(); ++i) {
CAFFE_ENFORCE(limits[i] == offsets[i]);
}
return true;
}
private:
TreeIterator iterator_;
};
class PackRecordsOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit PackRecordsOp(Args&&... args)
: Operator(std::forward<Args>(args)...),
fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")),
packToSingleSharedPtr_(OperatorBase::GetSingleArgument<int>(
"pack_to_single_shared_ptr",
0)) {}
bool RunOnDevice() override {
// There should be one input per field
CAFFE_ENFORCE_EQ(InputSize(), fields_.size());
CAFFE_ENFORCE_EQ(OutputSize(), 1);
TreeCursor cursor((TreeIterator(fields_)));
TreeWalker walker(Inputs(), cursor);
if (packToSingleSharedPtr_) {
Output(0)->Resize(1);
auto* dst = Output(0)->template mutable_data<Shared2DTensorVectorPtr>();
dst[0] = std::make_shared<Tensor2DVector>();
dst[0]->resize(walker.size());
for (int batchId = 0; batchId < walker.size(); ++batchId) {
std::vector<TensorCPU>& tensors = dst[0]->at(batchId);
tensors.reserve(walker.fields().size());
for (const auto& field : walker.fields()) {
tensors.emplace_back(field.dim(), CPU);
auto& tensor = tensors.back();
context_.CopyItemsSameDevice(
field.meta(),
tensor.numel(),
field.ptr() /* src */,
tensor.raw_mutable_data(field.meta()) /* dst */);
}
walker.advance();
}
} else {
Output(0)->Resize(walker.size());
auto* dst = Output(0)->template mutable_data<SharedTensorVectorPtr>();
for (int batchId = 0; batchId < walker.size(); ++batchId) {
dst[batchId] = std::make_shared<std::vector<TensorCPU>>();
dst[batchId]->reserve(walker.fields().size());
for (const auto& field : walker.fields()) {
dst[batchId]->emplace_back(field.dim(), CPU);
auto& tensor = dst[batchId]->back();
context_.CopyItemsSameDevice(
field.meta(),
tensor.numel(),
field.ptr() /* src */,
tensor.raw_mutable_data(field.meta()) /* dst */);
}
walker.advance();
}
}
return true;
}
private:
std::vector<std::string> fields_;
const bool packToSingleSharedPtr_;
};
class UnPackRecordsOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit UnPackRecordsOp(Args&&... args)
: Operator(std::forward<Args>(args)...),
fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {}
bool RunOnDevice() override {
size_t numRows = 0;
Shared2DTensorVectorPtr data_ptr = nullptr;
if (Input(0).IsType<SharedTensorVectorPtr>()) {
numRows = Input(0).numel();
CAFFE_ENFORCE_GE(numRows, 0);
data_ptr = std::make_shared<Tensor2DVector>();
data_ptr->reserve(numRows);
const auto* inputs = Input(0).template data<SharedTensorVectorPtr>();
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < numRows; i++) {
data_ptr->emplace_back(*inputs[i]);
}
} else if (Input(0).IsType<Shared2DTensorVectorPtr>()) {
CAFFE_ENFORCE_EQ(Input(0).numel(), 1);
const auto* inputs = Input(0).template data<Shared2DTensorVectorPtr>();
CAFFE_ENFORCE(inputs[0] != nullptr);
data_ptr = inputs[0];
numRows = inputs[0]->size();
CAFFE_ENFORCE_GE(numRows, 0);
} else {
// input contains a single tensor
CAFFE_ENFORCE_EQ(InputSize(), 1);
CAFFE_ENFORCE_EQ(OutputSize(), 1);
Output(0)->CopyFrom(Input(0));
return true;
}
auto numTensors = OutputSize();
// Precomputer the output sizes to avoid resizing
std::vector<std::vector<int64_t>> outputDims(numTensors);
std::vector<TypeMeta> metas(numTensors);
CAFFE_ENFORCE(
numRows > 0 || InputSize() > 1,
"Unpacking empty record without shape will leave output blobs in "
"undefined state.");
if (InputSize() == 1) {
getShapeAndMetaFromInput(data_ptr, outputDims, metas);
} else {
getShapeAndMetaFromPrototypeBlobs(outputDims, metas);
}
// inputs contains a single shared_ptr of vector<vector<caffe2::TensorCPU>>
auto& tensors = *data_ptr;
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < numRows; ++i) {
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int j = 0; j < tensors[i].size(); ++j) {
const auto& input = tensors[i][j];
// Checks to ensure that dimensions/sizes match
CAFFE_ENFORCE_EQ(outputDims[j].size(), input.dim());
CAFFE_ENFORCE(metas[j] == input.dtype());
// We look from first dimension, because we concat on the first.
for (int k = 1; k < input.dim(); ++k) {
CAFFE_ENFORCE_EQ(input.sizes()[k], outputDims[j][k]);
}
outputDims[j][0] += input.size(0);
}
}
// Resize to the final output size
std::vector<void*> destinations(numTensors);
for (int i = 0; i < numTensors; ++i) {
Output(i)->Resize(outputDims[i]);
destinations[i] = Output(i)->raw_mutable_data(metas[i]);
}
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < numRows; ++i) {
for (int j = 0; j < numTensors; ++j) {
const auto& input = tensors[i][j];
context_.CopyItemsSameDevice(
metas[j],
input.numel(),
input.raw_data() /* src */,
destinations[j] /* dst */
);
destinations[j] =
(char*)destinations[j] + input.numel() * input.itemsize();
}
}
return true;
}
private:
void getShapeAndMetaFromInput(
const Shared2DTensorVectorPtr& inputs,
std::vector<std::vector<int64_t>>& outputDims,
std::vector<TypeMeta>& metas) {
const auto& inputZero = inputs->at(0);
const auto numTensors = inputZero.size();
CAFFE_ENFORCE_EQ(numTensors, fields_.size());
CAFFE_ENFORCE_EQ(numTensors, OutputSize());
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < numTensors; ++i) {
outputDims[i] = inputZero[i].sizes().vec();
outputDims[i][0] = 0;
metas[i] = inputZero[i].dtype();
}
}
void getShapeAndMetaFromPrototypeBlobs(
std::vector<std::vector<int64_t>>& outputDims,
std::vector<TypeMeta>& metas) {
const auto numTensors = fields_.size();
CAFFE_ENFORCE_EQ(numTensors, InputSize() - 1);
CAFFE_ENFORCE_EQ(numTensors, OutputSize());
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < numTensors; ++i) {
const auto& input = Input(i + 1);
outputDims[i] = input.sizes().vec();
outputDims[i][0] = 0;
metas[i] = input.dtype();
}
}
std::vector<std::string> fields_;
};
class ReadNextBatchOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit ReadNextBatchOp(Args&&... args)
: Operator(std::forward<Args>(args)...),
batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
enforceBatchSize_(OperatorBase::GetSingleArgument<bool>(
"enforce_batch_size",
false)) {}
bool RunOnDevice() override {
auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
std::vector<const TLength*> lengths;
std::vector<TOffset> limits;
std::vector<TOffset> sizes;
std::vector<TOffset> offsets;
TLength lenZero = 0;
sizes.resize(cursor->it.numOffsetFields());
// gather length data
lengths.resize(cursor->it.numLengthFields());
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < lengths.size(); ++i) {
auto& a = Input(cursor->it.lengthField(i).id + 1);
if (a.numel() > 0) {
lengths[i] = a.data<int>();
} else {
lengths[i] = &lenZero;
}
}
// gather size limits
limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < cursor->it.fields().size(); ++i) {
int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
limits[lengthFieldIdx] =
std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).sizes()[0]);
}
// advance cursor
{
std::lock_guard<std::mutex> lock(cursor->mutex_);
if (cursor->offsets.empty()) {
cursor->offsets.assign(sizes.size(), 0);
}
offsets = cursor->offsets;
cursor->it.advance(lengths, cursor->offsets, sizes, limits, batchSize_);
if (enforceBatchSize_ && sizes[0] < batchSize_) {
// if we enforce batch_size but don't have enough rows left to
// complete a full batch, return empty for all columns.
// This signals end of dataset to the caller.
sizes.assign(sizes.size(), 0);
}
}
// gather data
std::vector<int64_t> outDim;
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < cursor->it.fields().size(); ++i) {
auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
auto size = sizes[lengthIdx];
auto offset = offsets[lengthIdx];
auto& in = Input(i + 1);
auto innerSize = in.size_from_dim(1);
outDim = in.sizes().vec();
outDim[0] = size;
auto* out = Output(i);
out->Resize(outDim);
void* src =
(char*)in.raw_data() + offset * innerSize * in.dtype().itemsize();
void* dst = out->raw_mutable_data(in.dtype()); // create the tensor
if (out->numel() == 0) {
continue;
}
context_.CopyItemsSameDevice(in.dtype(), out->numel(), src, dst);
}
return true;
}
int batchSize_;
bool enforceBatchSize_;
};
class ComputeOffsetOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit ComputeOffsetOp(Args&&... args)
: Operator(std::forward<Args>(args)...) {}
bool RunOnDevice() override {
auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
auto* out = Output(0);
std::vector<const TLength*> lengths;
std::vector<TOffset> limits;
std::vector<TOffset> sizes;
std::vector<TOffset> offsets;
TLength lenZero = 0;
sizes.resize(cursor->it.numOffsetFields());
// gather length data
lengths.resize(cursor->it.numLengthFields());
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < lengths.size(); ++i) {
auto& a = Input(cursor->it.lengthField(i).id + 1);
if (a.numel() > 0) {
lengths[i] = a.data<int>();
} else {
lengths[i] = &lenZero;
}
}
// gather size limits
limits.assign(sizes.size(), std::numeric_limits<TOffset>::max());
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < cursor->it.fields().size(); ++i) {
int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1;
limits[lengthFieldIdx] =
std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).sizes()[0]);
}
out->Resize(limits.at(0) + 1, sizes.size());
auto* out_data = out->template mutable_data<int64_t>();
for (int k = 0; k <= limits.at(0); k++) {
// advance cursor
if (cursor->offsets.empty()) {
cursor->offsets.assign(sizes.size(), 0);
}
// write output
std::copy(cursor->offsets.begin(), cursor->offsets.end(), out_data);
out_data += sizes.size();
cursor->it.advance(lengths, cursor->offsets, sizes, limits, 1);
}
cursor->offsets.assign(sizes.size(), 0); // reSet after getting meta info
return true;
}
};
class SortAndShuffleOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit SortAndShuffleOp(Args&&... args)
: Operator(std::forward<Args>(args)...),
sort_by_field_idx_(
OperatorBase::GetSingleArgument<int>("sort_by_field_idx", 1)),
batch_size_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
shuffle_size_(OperatorBase::GetSingleArgument<int>("shuffle_size", 1)) {
}
bool RunOnDevice() override {
auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1);
CAFFE_ENFORCE(-1 <= sort_by_field_idx_);
CAFFE_ENFORCE(cursor->it.fields().size() - sort_by_field_idx_ > 0);
// NOLINTNEXTLINE(cppcoreguidelines-init-variables)
int size;
if (sort_by_field_idx_ != -1) {
size = Input(sort_by_field_idx_ + 1).sizes()[0];
} else {
size = Input(1).sizes()[0];
}
CAFFE_ENFORCE(
batch_size_ > 0 && shuffle_size_ > 0 &&
0 < batch_size_ * shuffle_size_);
// adjust shuffle_size_ if it is too large
if (batch_size_ * shuffle_size_ > size) {
shuffle_size_ = size / batch_size_;
}
int num_batch = size / batch_size_;
auto* out = Output(0);
out->Resize(size);
auto* out_data = out->template mutable_data<int64_t>();
vector<int> shuffle_idx(size);
iota(shuffle_idx.begin(), shuffle_idx.end(), 0);
if (sort_by_field_idx_ != -1) {
auto& sortblob = Input(sort_by_field_idx_ + 1);
auto* sortdata = sortblob.data<int>();
// must sort by a field at the root level
CAFFE_ENFORCE(
cursor->it.fields()[sort_by_field_idx_].lengthFieldId == -1);
sort(shuffle_idx.begin(), shuffle_idx.end(), [&sortdata](int i1, int i2) {
return sortdata[i1] < sortdata[i2];
});
}
if (batch_size_ * shuffle_size_ > 1) {
int offset = 0;
while (offset + batch_size_ * shuffle_size_ < size) {
std::shuffle(
shuffle_idx.begin() + offset,
shuffle_idx.begin() + offset + batch_size_ * shuffle_size_,
std::default_random_engine());
offset += batch_size_ * shuffle_size_;
}
}
vector<int> batch_idx(num_batch);
iota(batch_idx.begin(), batch_idx.end(), 0);
std::shuffle(
batch_idx.begin(), batch_idx.end(), std::default_random_engine());
for (int i = 0; i < num_batch; i++) {
std::copy(
shuffle_idx.begin() + batch_idx[i] * batch_size_,
shuffle_idx.begin() + (batch_idx[i] + 1) * batch_size_,
out_data);
out_data += batch_size_;
}
std::copy(
shuffle_idx.begin() + num_batch * batch_size_,
shuffle_idx.end(),
out_data);
return true;
}
int sort_by_field_idx_;
int batch_size_;
int shuffle_size_;
};
class ReadRandomBatchOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit ReadRandomBatchOp(Args&&... args)
: Operator(std::forward<Args>(args)...),
batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)),
enforceBatchSize_(
OperatorBase::GetSingleArgument<bool>("enforce_batch_size", false)),
loopOver_(OperatorBase::GetSingleArgument<bool>("loop_over", false)) {}
bool RunOnDevice() override {
auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0);
auto& idxblob = Input(1);
auto& offsetsmat = Input(2);
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 3);
auto idxvec = idxblob.template data<int64_t>();
auto offsetdim = offsetsmat.sizes();
// gather data
std::vector<int64_t> outDim;
// NOLINTNEXTLINE(cppcoreguidelines-init-variables)
int64_t idx;
{
std::lock_guard<std::mutex> lock(cursor->mutex_);
cursor->offsets.resize(1);
idx = cursor->offsets.at(0);
// if we want to enforce batch size but we dont have a complete
// batch, skip the last rows.
if (enforceBatchSize_ && idx + batchSize_ > idxblob.numel()) {
idx = idxblob.numel();
}
if (loopOver_ && idx >= idxblob.numel()) {
cursor->offsets.at(0) = 0;
idx = 0;
}
cursor->offsets.at(0) += batchSize_;
}
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 0; i < cursor->it.fields().size(); ++i) {
auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1;
auto& in = Input(i + 3);
outDim = in.sizes().vec();
outDim.at(0) = 0;
auto idxbegin = idx;
for (int j = 0; j < batchSize_; ++j) {
if (idx >= idxblob.numel()) {
break;
}
CAFFE_ENFORCE(
(idxvec[idx] + 1) * offsetdim[1] + lengthIdx < offsetsmat.numel(),
"Out of bound when trying to get elem from offsetsmat");
auto offsetptr = offsetsmat.template data<TOffset>() +
idxvec[idx] * offsetdim[1] + lengthIdx;
auto offset = *offsetptr;
auto size = *(offsetptr + offsetdim[1]) - offset;
outDim.at(0) += size; // accumulate over the batch
idx++;
}
idx = idxbegin; // reSet
auto* out = Output(i);
out->Resize(outDim);
if (out->numel() == 0) {
continue;
}
auto dst = static_cast<char*>(out->raw_mutable_data(in.dtype()));
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions,bugprone-narrowing-conversions)
int block_size = in.numel() / in.size(0);
auto block_bytesize = in.size_from_dim(1) * in.dtype().itemsize();
CAFFE_ENFORCE(
block_bytesize == in.nbytes() / in.size(0),
"block_bytesize should be consistent with data dim");
auto src_base = static_cast<const char*>(in.raw_data());
int start = 0;
for (int j = 0; j < batchSize_; ++j) {
if (idx >= idxblob.numel()) {
break;
}
auto offsetptr = offsetsmat.template data<TOffset>() +
idxvec[idx] * offsetdim[1] + lengthIdx;
auto offset = *offsetptr;
auto size = *(offsetptr + offsetdim[1]) - offset;
// copy data
auto src = src_base + offset * block_bytesize;
context_.CopyItemsSameDevice(
in.dtype(), size * block_size, src, dst + start * block_bytesize);
start += size;
idx++;
}
idx = idxbegin; // reSet
}
return true;
}
int batchSize_;
bool enforceBatchSize_;
bool loopOver_;
};
template <class Context>
class AppendOp final : public Operator<Context> {
public:
USE_OPERATOR_CONTEXT_FUNCTIONS;
template <class... Args>
explicit AppendOp(Args&&... args)
: Operator<Context>(std::forward<Args>(args)...) {}
bool RunOnDevice() override {
auto& a = Input(0);
auto& b = Input(1);
auto* c = Output(0);
CAFFE_ENFORCE(b.dim() >= 1);
if (a.numel() == 0 && a.size(0) == 0) {
c->CopyFrom(b);
return true;
}
CAFFE_ENFORCE(&a == c, "First argument must be in-place.");
CAFFE_ENFORCE(c->dim() == b.dim());
CAFFE_ENFORCE(b.dim() == c->dim());
CAFFE_ENFORCE(a.dtype() == b.dtype());
for (int i = 1; i < a.dim(); ++i) {
CAFFE_ENFORCE(a.sizes()[i] == b.sizes()[i]);
}
auto oldSize = c->numel();
c->Extend(b.sizes()[0], kDatasetGrowthPct);
auto* dst = (char*)c->raw_mutable_data() + oldSize * b.dtype().itemsize();
context_.CopyItemsSameDevice(b.dtype(), b.numel(), b.raw_data(), dst);
return true;
}
};
template <class Context>
class AtomicAppendOp final : public Operator<Context> {
public:
USE_OPERATOR_CONTEXT_FUNCTIONS;
template <class... Args>
explicit AtomicAppendOp(Args&&... args)
: Operator<Context>(std::forward<Args>(args)...) {}
bool RunOnDevice() override {
auto& mutex = OperatorBase::Input<std::unique_ptr<std::mutex>>(0);
const auto numFields = (InputSize() - 1) / 2;
CAFFE_ENFORCE(OutputSize() == numFields);
std::lock_guard<std::mutex> guard(*mutex);
// 1: checks
for (int i = 0; i < numFields; ++i) {
auto& a = Input(1 + i);
auto& b = Input(1 + i + numFields);
auto* c = Output(i);
CAFFE_ENFORCE(b.dim() >= 1);
if (a.numel() == 0) {
continue;
}
CAFFE_ENFORCE(
(void*)&a == (void*)c, "Appended-to arguments must be in-place.");
CAFFE_ENFORCE(c->dim() == b.dim());
CAFFE_ENFORCE(b.dim() == c->dim());
CAFFE_ENFORCE(a.dtype() == b.dtype());
for (int j = 1; j < a.dim(); ++j) {
CAFFE_ENFORCE(a.sizes()[j] == b.sizes()[j]);
}
}
// 2: copies
for (int i = 0; i < numFields; ++i) {
auto& a = Input(1 + i);
auto& b = Input(1 + i + numFields);
auto* c = Output(i);
if (a.numel() == 0 && a.size(0) == 0) {
c->CopyFrom(b);
continue;
}
auto oldSize = c->numel();
c->Extend(b.sizes()[0], kDatasetGrowthPct);
auto* dst = (char*)c->raw_mutable_data() + oldSize * b.dtype().itemsize();
context_.CopyItemsSameDevice(b.dtype(), b.numel(), b.raw_data(), dst);
}
return true;
}
};
template <class Context>
class CreateTensorVectorOp final : public Operator<Context> {
public:
USE_OPERATOR_CONTEXT_FUNCTIONS;
using Operator<Context>::Operator;
bool RunOnDevice() override {
auto ptr = make_unique<std::vector<Tensor>>();
*OperatorBase::Output<TensorVectorPtr>(TENSOR_VECTOR) = std::move(ptr);
return true;
}
private:
OUTPUT_TAGS(TENSOR_VECTOR);
};
template <class Context>
class TensorVectorSizeOp final : public Operator<Context> {
public:
USE_OPERATOR_CONTEXT_FUNCTIONS;
USE_SIMPLE_CTOR_DTOR(TensorVectorSizeOp);
bool RunOnDevice() override {
auto& vector_ptr = OperatorBase::Input<TensorVectorPtr>(TENSOR_VECTOR);
auto* size = Output(SIZE);
size->Resize();
// 32-bit should be enough here
*size->template mutable_data<int32_t>() = vector_ptr->size();
return true;
}
private:
INPUT_TAGS(TENSOR_VECTOR);
OUTPUT_TAGS(SIZE);
};
template <class Context>
class ConcatTensorVectorOp final : public Operator<Context> {
public:
USE_OPERATOR_CONTEXT_FUNCTIONS;
using Operator<Context>::Operator;
bool RunOnDevice() override {
const TensorVectorPtr& tensorVector =
OperatorBase::Input<TensorVectorPtr>(TENSOR_VECTOR);
auto* tensor = Output(TENSOR);
CAFFE_ENFORCE(!tensorVector->empty());
vector<int64_t> outputDims(tensorVector->at(0).sizes().vec());
CAFFE_ENFORCE(outputDims.size() > 0);
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int i = 1; i < tensorVector->size(); i++) {
// the tensor shapes are the same except for the first dimension
for (int j = 1; j < tensorVector->at(i).dim(); j++) {
CAFFE_ENFORCE(outputDims[j] == tensorVector->at(i).sizes()[j]);
}
CAFFE_ENFORCE(tensorVector->at(0).dtype() == tensorVector->at(i).dtype());
outputDims[0] += tensorVector->at(i).sizes()[0];
}
tensor->Resize(outputDims);
int64_t offset = 0;
auto* dst = (char*)tensor->raw_mutable_data(tensorVector->at(0).dtype());
for (const auto& t : *tensorVector) {
context_.CopyItemsSameDevice(
t.dtype(), t.numel(), t.raw_data(), dst + offset);
offset += t.nbytes();
}
return true;
}
private:
INPUT_TAGS(TENSOR_VECTOR);
OUTPUT_TAGS(TENSOR);
};
template <class Context>
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init)
class CollectTensorOp final : public Operator<Context> {
public:
USE_OPERATOR_CONTEXT_FUNCTIONS;
template <class... Args>
explicit CollectTensorOp(Args&&... args)
: Operator<Context>(std::forward<Args>(args)...),
numToCollect_(
OperatorBase::GetSingleArgument<int>("num_to_collect", -1)),
numVisited_(0) {
CAFFE_ENFORCE(numToCollect_ > 0);
}
bool RunOnDevice() override {
int pos = -1;
if (numVisited_ < numToCollect_) {
// append
pos = numVisited_;
} else {
// uniform between [0, numVisited_]
at::uniform_int_from_to_distribution<int> uniformDist(numVisited_+1, 0);
pos = uniformDist(context_.RandGenerator());
if (pos >= numToCollect_) {
// discard
pos = -1;
}
}
for (int i = 0; i < OutputSize(); ++i) {
// TENSOR_VECTOR_IN is enforced inplace with TENSOR_VECTOR_OUT
TensorVectorPtr& tensorVector = *OperatorBase::Output<TensorVectorPtr>(i);
if (numVisited_ >= numToCollect_) {
CAFFE_ENFORCE(
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
tensorVector->size() == numToCollect_,
"TensorVector size = ",
tensorVector->size(),
" is different from numToCollect = ",
numToCollect_);
}
const auto& tensor = Input(OutputSize() + i);
if (pos < 0) {
// discard
CAFFE_ENFORCE(numVisited_ >= numToCollect_);
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
} else if (pos >= tensorVector->size()) {
// append
tensorVector->emplace_back();
ReinitializeAndCopyFrom(
&tensorVector->back(),
Context::GetDeviceType(),
tensor); // sync copy
} else {
// replace
tensorVector->at(pos).CopyFrom(tensor); // sync copy
}
}
numVisited_++;
return true;
}
private:
// number of tensors to collect
int numToCollect_;
// number of tensors visited
int numVisited_;
};
class TrimDatasetOp : public Operator<CPUContext> {
public:
template <class... Args>
explicit TrimDatasetOp(Args&&... args)
: Operator(std::forward<Args>(args)...),
iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")),
multiple_of_(OperatorBase::GetSingleArgument<int>("multiple_of", 1)) {
CAFFE_ENFORCE_GE(multiple_of_, 1);
}
bool RunOnDevice() override {
TreeCursor cursor(iterator_);
TreeWalker walker(Inputs(), cursor);
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions,bugprone-narrowing-conversions)
int trimmedSize = (walker.size() / multiple_of_) * multiple_of_;
if (trimmedSize == walker.size()) {
// we already satisfy the condition
return true;
}
// advance desired number of records
for (int i = 0; i < trimmedSize; ++i) {
walker.advance();
}
// trim each column to the offset
// NOLINTNEXTLINE(clang-diagnostic-sign-compare)
for (int col = 0; col < walker.fields().size(); ++col) {
auto newOuterSize = walker.fields().at(col).offset();
Output(col)->ShrinkTo(newOuterSize);
}
return true;
}
private:
TreeIterator iterator_;
int multiple_of_;
};
REGISTER_CPU_OPERATOR(CreateTreeCursor, CreateTreeCursorOp);
REGISTER_CPU_OPERATOR(ResetCursor, ResetCursorOp);
REGISTER_CPU_OPERATOR(ReadNextBatch, ReadNextBatchOp);
REGISTER_CPU_OPERATOR(GetCursorOffset, GetCursorOffsetOp);
REGISTER_CPU_OPERATOR(ComputeOffset, ComputeOffsetOp);
REGISTER_CPU_OPERATOR(SortAndShuffle, SortAndShuffleOp);
REGISTER_CPU_OPERATOR(ReadRandomBatch, ReadRandomBatchOp);
REGISTER_CPU_OPERATOR(CheckDatasetConsistency, CheckDatasetConsistencyOp);
REGISTER_CPU_OPERATOR(Append, AppendOp<CPUContext>);
REGISTER_CPU_OPERATOR(AtomicAppend, AtomicAppendOp<CPUContext>);
REGISTER_CPU_OPERATOR(CreateTensorVector, CreateTensorVectorOp<CPUContext>);
REGISTER_CPU_OPERATOR(TensorVectorSize, TensorVectorSizeOp<CPUContext>);
REGISTER_CPU_OPERATOR(ConcatTensorVector, ConcatTensorVectorOp<CPUContext>);
REGISTER_CPU_OPERATOR(CollectTensor, CollectTensorOp<CPUContext>);
REGISTER_CPU_OPERATOR(PackRecords, PackRecordsOp);
REGISTER_CPU_OPERATOR(UnPackRecords, UnPackRecordsOp);
REGISTER_CPU_OPERATOR(TrimDataset, TrimDatasetOp);
OPERATOR_SCHEMA(CreateTreeCursor)
.NumInputs(0)
.NumOutputs(1)
.SetDoc(R"DOC(
Creates a cursor to iterate through a list of tensors, where some of those
tensors contain the lengths in a nested schema. The schema is determined by
the `fields` arguments.
For example, to represent the following schema:
Struct(
a=Int(),
b=List(List(Int)),
c=List(
Struct(
c1=String,
c2=List(Int),
),
),
)
the field list will be:
[
"a",
"b:lengths",
"b:values:lengths",
"b:values:values",
"c:lengths",
"c:c1",
"c:c2:lengths",
"c:c2:values",
]
And for the following instance of the struct:
Struct(
a=3,
b=[[4, 5], [6, 7, 8], [], [9]],
c=[
Struct(c1='alex', c2=[10, 11]),
Struct(c1='bob', c2=[12]),
],
)
The values of the fields will be:
{
"a": [3],
"b:lengths": [4],
"b:values:lengths": [2, 3, 0, 1],
"b:values:values": [4, 5, 6, 7, 8, 9],
"c:lengths": [2],
"c:c1": ["alex", "bob"],
"c:c2:lengths": [2, 1],
"c:c2:values", [10, 11, 12],
}
In general, every field name in the format "{prefix}:lengths" defines a domain
"{prefix}", and every subsequent field in the format "{prefix}:{field}" will
be in that domain, and the length of the domain is provided for each entry of
the parent domain. In the example, "b:lengths" defines a domain of length 4, so
every field under domain "b" will have 4 entries.
The "lengths" field for a given domain must appear before any reference to
that domain.
Returns a pointer to an instance of the Cursor, which keeps the current offset
on each of the domains defined by `fields`. Cursor also ensures thread-safety
such that ReadNextBatch and ResetCursor can be used safely in parallel.
A cursor does not contain data per se, so calls to ReadNextBatch actually need
to pass a list of blobs containing the data to read for each one of the fields.
)DOC")
.Output(0, "cursor", "A blob pointing to an instance of a new TreeCursor.")
.Arg(
"fields",
"A list of strings each one representing a field of the dataset.");
OPERATOR_SCHEMA(ResetCursor)
.NumInputs(1)
.NumOutputs(0)
.SetDoc(R"DOC(
Resets the offsets for the given TreeCursor. This operation is thread safe.
)DOC")
.Input(0, "cursor", "A blob containing a pointer to the cursor.");
OPERATOR_SCHEMA(ReadNextBatch)
.NumInputs(1, INT_MAX)
.NumOutputs(1, INT_MAX)
.SetDoc(R"DOC(
Read the next batch of examples out of the given cursor and data blobs.
Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] a list of tensors containing the data for
each field of the dataset.
ReadNextBatch is thread safe.
)DOC")
.Input(0, "cursor", "A blob containing a pointer to the cursor.")
.Input(1, "dataset_field_0", "First dataset field")
.Output(0, "field_0", "Tensor containing the next batch for field 0.")
.Arg("batch_size", "Number of top-level entries to read.");
OPERATOR_SCHEMA(GetCursorOffset)
.NumInputs(1)
.NumOutputs(1)
.SetDoc("Get the current offset in the cursor.")
.Input(0, "cursor", "A blob containing a pointer to the cursor.")
.Output(0, "offsets", "Tensor containing the offsets for the cursor.");
OPERATOR_SCHEMA(ComputeOffset)
.NumInputs(1, INT_MAX)
.NumOutputs(1)
.SetDoc(R"DOC(
Compute the offsets matrix given cursor and data blobs. Need to be ran at
beginning or after reseting cursor
Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] a list of tensors containing the data for
each field of the dataset.
ComputeOffset is thread safe.
)DOC")
.Input(0, "cursor", "A blob containing a pointer to the cursor.")
.Input(1, "dataset_field_0", "First dataset field")
.Output(0, "field_0", "Tensor containing offset info for this chunk.");
OPERATOR_SCHEMA(SortAndShuffle)
.NumInputs(1, INT_MAX)
.NumOutputs(1)
.SetDoc(R"DOC(
Compute the sorted indices given a field index to sort by and break the sorted
indices into chunks of shuffle_size * batch_size and shuffle each chunk,
finally we shuffle between batches. If sort_by_field_idx is -1 we skip sort.
For example, we have data sorted as
1,2,3,4,5,6,7,8,9,10,11,12
and batchSize = 2 and shuffleSize = 3, when we shuffle we get:
[3,1,4,6,5,2] [12,10,11,8,9,7]
After this we will shuffle among different batches with size 2
[3,1],[4,6],[5,2],[12,10],[11,8],[9,7]
We may end up with something like
[9,7],[5,2],[12,10],[4,6],[3,1],[11,8]
Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] a list of tensors containing the data for
each field of the dataset.
SortAndShuffle is thread safe.
)DOC")
.Input(0, "cursor", "A blob containing a pointer to the cursor.")
.Input(1, "dataset_field_0", "First dataset field")
.Output(0, "indices", "Tensor containing sorted indices.");
OPERATOR_SCHEMA(ReadRandomBatch)
.NumInputs(1, INT_MAX)
.NumOutputs(1, INT_MAX)
.SetDoc(R"DOC(
Read the next batch of examples out of the given cursor,
idx blob, offset matrix and data blobs.
Input(0) is a blob pointing to a TreeCursor,
Input(1) is a blob pointing to the shuffled idx
Input(2) is a blob pointing to the offset matrix and
[Input(3),... Input(num_fields)] a list of tensors containing the data for
each field of the dataset.
ReadRandomBatch is thread safe.
)DOC")
.Input(0, "cursor", "A blob containing a pointer to the cursor.")
.Input(1, "idx", "idx with a shuffled order.")
.Input(2, "offsetsmat", "offset matrix containing length offset info.")
.Input(3, "dataset_field_0", "First dataset field")
.Output(0, "field_0", "Tensor containing the next batch for field 0.")
.Arg("batch_size", "Number of top-level entries to read.")
.Arg("loop_over", "(bool) Repeat the dataset indefinitely");
OPERATOR_SCHEMA(CheckDatasetConsistency)
.NumInputs(1, INT_MAX)
.NumOutputs(0)
.SetDoc(R"DOC(
Checks that the given data fields represents a consistent dataset under
the schema specified by the `fields` argument. Operator fails if the fields
are not consistent. If data is consistent, each field's data can be safely
appended to an existing dataset, keeping it consistent.
)DOC")
.Input(0, "field_0", "Data for field 0.")
.Arg(
"fields",
"List of strings representing the string names in the format"
"specified in the doc for CreateTreeCursor.");
OPERATOR_SCHEMA(Append)
.NumInputs(2)
.NumOutputs(1)
.EnforceInplace({{0, 0}})
.SetDoc(R"DOC(
Append input `B` to the end of input `A`.
- It is required that this operation run in-place, meaning that the input `A` blob must match the output blob.
- All except the outer-most dimension must be the same between `A` and `B`.
- Input `A` may have to be re-allocated in order for accommodate to the new size. Currently, an exponential growth ratio is used in order to ensure amortized constant time complexity.
Github Links:
- https://github.com/pytorch/pytorch/blob/main/caffe2/operators/dataset_ops.cc
<details>
<summary> <b>Example</b> </summary>
**Code**
```
workspace.ResetWorkspace()
op = core.CreateOperator(
"Append",
["A", "B"],
["A"],
)
workspace.FeedBlob("A", np.random.randint(10, size=(1,3,3)))
workspace.FeedBlob("B", np.random.randint(10, size=(2,3,3)))
print("A:", workspace.FetchBlob("A"))
print("B:", workspace.FetchBlob("B"))
workspace.RunOperatorOnce(op)
print("A:", workspace.FetchBlob("A"))
```
**Result**
```
A:
[[[3 8 7]
[1 6 6]
[5 0 6]]]
B:
[[[4 3 1]
[7 9 6]
[9 4 5]]
[[7 7 4]
[9 8 7]
[1 6 6]]]
A:
[[[3 8 7]
[1 6 6]
[5 0 6]]
[[4 3 1]
[7 9 6]
[9 4 5]]
[[7 7 4]
[9 8 7]
[1 6 6]]]
```
</details>
)DOC")
.Input(
0,
"A",
"(*Tensor*): base input tensor of shape $(N, d_1, d_2, ..., d_n)$")
.Input(
1,
"B",
"(*Tensor*): second input tensor of shape $(M, d_1, d_2, ..., d_n)$ to be appended to the base")
.Output(
0,
"A",
"(*Tensor*): output tensor of shape $(N+M, d_1, d_2, ..., d_n)$");
OPERATOR_SCHEMA(AtomicAppend)
.NumInputs(3, INT_MAX)
.NumOutputs(1, INT_MAX)
.AllowInplace([](int in, int out) { return in == out + 1; });
OPERATOR_SCHEMA(CreateTensorVector)
.NumInputs(0)
.NumOutputs(1)
.SetDoc("Create a std::unique_ptr<std::vector<Tensor> >");
OPERATOR_SCHEMA(TensorVectorSize)
.NumInputs(1)
.NumOutputs(1)
.SetDoc("Get the size of the input vector")
.Input(0, "tensor vector", "std::unique_ptr<std::vector<Tensor> >")
.Output(0, "size", "int32_t size");
OPERATOR_SCHEMA(ConcatTensorVector)
.NumInputs(1)
.NumOutputs(1)
.SetDoc(R"DOC(
Concat Tensors in the std::unique_ptr<std::vector<Tensor> >
along the first dimension.
)DOC")
.Input(0, "vector of Tensor", "std::unique_ptr<std::vector<Tensor> >")
.Output(0, "tensor", "tensor after concatenating");
OPERATOR_SCHEMA(CollectTensor)
.NumInputs([](int n) { return n > 0 && n % 2 == 0; })
.NumOutputs(1, INT_MAX)
.NumInputsOutputs([](int in, int out) { return in == out * 2; })
.EnforceInplace([](int in, int out) { return in == out; })
.SetDoc(R"DOC(
Collect tensor into tensor vector by reservoir sampling,
argument num_to_collect indicates the max number of tensors that will be
collected. The first half of the inputs are tensor vectors, which are also the
outputs. The second half of the inputs are the tensors to be collected into each
vector (in the same order). The input tensors are collected in all-or-none
manner. If they are collected, they will be placed at the same index in the
output vectors.
)DOC")
.Arg("num_to_collect", "The max number of tensors to collect");
OPERATOR_SCHEMA(PackRecords)
.NumInputs(1, INT_MAX)
.NumOutputs(1)
.SetDoc(R"DOC(
Given a dataset under a schema specified by the `fields` argument, pack all
the input tensors into one, where each tensor element represents a row of data
(batch of size 1). This format allows easier use with the rest of Caffe2
operators.
)DOC")
.Arg(
"fields",
"List of strings representing the string names in the format"
"specified in the doc for CreateTreeCursor.")
.Output(
0,
"tensor",
"One dimensional tensor having a complex type of SharedTensorVectorPtr."
" In order to reverse it back to the original input it has to be "
"inserted into UnPackRecordsOp.");
OPERATOR_SCHEMA(TrimDataset)
.NumInputs(1, INT_MAX)
.NumOutputs(1, INT_MAX)
.SetDoc(R"DOC(
Trim the given dataset inplace, given the dataset blobs and the field specs.
Trimming happens such that the dataset will contain the largest possible number
of records that is a multiple of the 'multiple_of' argument.
)DOC")
.EnforceInplace([](int input, int output) { return input == output; })
.Arg(
"fields",
"List of strings representing the string names in the format"
"specified in the doc for CreateTreeCursor.");
OPERATOR_SCHEMA(UnPackRecords)
.NumInputs(1, INT_MAX)
.NumOutputs(1, INT_MAX)
.SetDoc(R"DOC(
Given a packed dataset (packed by the PackRecordsOp) and the `fields` argument
describing the datasets schema, return the original dataset format. Number of
returned tensors is equal to the number of fields in the `fields` argument.
The first input is the packed tensor to be unpacked. Optionally, you can provide
prototype tensors to give the expected shapes of the output tensors. This is
helpful when you expected to unpack empty tensor, e.g., output of a sampling
process.
)DOC")
.Arg(
"fields",
"List of strings representing the string names in the format"
"specified in the doc for CreateTreeCursor.")
.Input(0, "packed_tensor", "The tensor to be unpacked");
SHOULD_NOT_DO_GRADIENT(CreateTreeCursor);
SHOULD_NOT_DO_GRADIENT(ResetCursor);
SHOULD_NOT_DO_GRADIENT(ReadNextBatch);
SHOULD_NOT_DO_GRADIENT(ComputeOffset);
SHOULD_NOT_DO_GRADIENT(ReadRandomBatch);
SHOULD_NOT_DO_GRADIENT(CheckDatasetConsistency);
SHOULD_NOT_DO_GRADIENT(Append);
SHOULD_NOT_DO_GRADIENT(AtomicAppend);
SHOULD_NOT_DO_GRADIENT(CreateTensorVector);
SHOULD_NOT_DO_GRADIENT(TensorVectorSize);
SHOULD_NOT_DO_GRADIENT(ConcatTensorVector);
SHOULD_NOT_DO_GRADIENT(CollectTensor);
SHOULD_NOT_DO_GRADIENT(UnPackRecords);
SHOULD_NOT_DO_GRADIENT(PackRecords);
class TreeCursorSerializer : public BlobSerializerBase {
public:
// NOLINTNEXTLINE(modernize-use-equals-default)
TreeCursorSerializer() {}
// NOLINTNEXTLINE(modernize-use-equals-default)
~TreeCursorSerializer() override {}
void Serialize(
const void* pointer,
TypeMeta typeMeta,
const string& name,
SerializationAcceptor acceptor) override {
CAFFE_ENFORCE(typeMeta.Match<std::unique_ptr<TreeCursor>>());
const auto& cursor =
*static_cast<const std::unique_ptr<TreeCursor>*>(pointer);
BlobProto blob_proto;
// serialize offsets as a tensor
if (cursor->offsets.size() > 0) {
Blob offsets_blob;
auto* offsets = BlobGetMutableTensor(&offsets_blob, CPU);
offsets->Resize(cursor->offsets.size());
std::copy(
cursor->offsets.begin(),
cursor->offsets.end(),
offsets->template mutable_data<TOffset>());
TensorSerializer ser;
ser.Serialize(
*offsets, name, blob_proto.mutable_tensor(), 0, offsets->numel());
}
blob_proto.set_name(name);
blob_proto.set_type("std::unique_ptr<TreeCursor>");
// serialize field names in the content
std::ostringstream os;
for (const auto& field : cursor->it.fields()) {
os << field.name << " ";
}
blob_proto.set_content(os.str());
acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto));
}
};
class TreeCursorDeserializer : public BlobDeserializerBase {
public:
void Deserialize(const BlobProto& proto, Blob* blob) override {
// Deserialize the field names
std::vector<std::string> fieldNames;
std::istringstream is(proto.content());
std::string field;
while (true) {
is >> field;
if (is.eof()) {
break;
}
fieldNames.push_back(field);
}
TreeIterator it(fieldNames);
auto* base = blob->template GetMutable<std::unique_ptr<TreeCursor>>();
CAFFE_ENFORCE(base != nullptr, "TreeCursor doesn't exist.");
// NOLINTNEXTLINE(modernize-make-unique)
(*base).reset(new TreeCursor(it));
// Deserialize the offset vector when it is not empty. The proto.tensor()
// function will return a TensorProto associated with offset vector. The
// offset vector contains fields of type int64_t, and we verify it is not
// empty before calling the deserializer.
if (proto.tensor().int64_data().size() > 0) {
TensorDeserializer deser;
Blob offset_blob;
deser.Deserialize(proto, &offset_blob);
auto& offsets = offset_blob.template Get<Tensor>();
auto* offsets_ptr = offsets.data<TOffset>();
(*base)->offsets.assign(offsets_ptr, offsets_ptr + offsets.numel());
}
}
};
REGISTER_BLOB_SERIALIZER(
(TypeMeta::Id<std::unique_ptr<TreeCursor>>()),
TreeCursorSerializer);
REGISTER_BLOB_DESERIALIZER(std::unique_ptr<TreeCursor>, TreeCursorDeserializer);
} // namespace
void SharedTensorVectorPtrSerializer::Serialize(
const void* pointer,
TypeMeta typeMeta,
const string& name,
BlobSerializerBase::SerializationAcceptor acceptor) {
/* This is dummy serialize that doesn't save anything. If saving the content
is desired in future use case, you can change this serializer. Note: special
care need to be taken for the parameter initialization of
LastNWindowCollectorOp and ReservoirSamplingOp if this serializer actually
saves the content.
*/
CAFFE_ENFORCE(typeMeta.Match<std::shared_ptr<std::vector<TensorCPU>>>());
BlobProto blob_proto;
blob_proto.set_name(name);
blob_proto.set_type("std::shared_ptr<std::vector<TensorCPU>>");
blob_proto.set_content("");
acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto));
};
void SharedTensorVectorPtrDeserializer::Deserialize(
const BlobProto& /* unused */,
Blob* blob) {
/* This is dummy deserialize which creates a nullptr
*/
blob->GetMutable<std::shared_ptr<std::vector<TensorCPU>>>();
}
REGISTER_BLOB_SERIALIZER(
(TypeMeta::Id<std::shared_ptr<std::vector<TensorCPU>>>()),
SharedTensorVectorPtrSerializer);
REGISTER_BLOB_DESERIALIZER(
std::shared_ptr<std::vector<TensorCPU>>,
SharedTensorVectorPtrDeserializer);
} // namespace dataset_ops
} // namespace caffe2