blob: b34430c152793fb974d22e1bbc013ddf24a290e5 [file] [log] [blame]
#include "lmdb.h" // NOLINT
#if defined(_MSC_VER)
#include <direct.h>
#endif
#include <sys/stat.h>
#include <string>
#include "caffe2/core/db.h"
#include "caffe2/core/logging.h"
namespace caffe2 {
namespace db {
constexpr size_t LMDB_MAP_SIZE = 1099511627776; // 1 TB
inline void MDB_CHECK(int mdb_status) {
CAFFE_ENFORCE_EQ(mdb_status, MDB_SUCCESS, mdb_strerror(mdb_status));
}
class LMDBCursor : public Cursor {
public:
explicit LMDBCursor(MDB_env* mdb_env) : mdb_env_(mdb_env), valid_(false) {
MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, MDB_RDONLY, &mdb_txn_));
MDB_CHECK(mdb_dbi_open(mdb_txn_, NULL, 0, &mdb_dbi_));
MDB_CHECK(mdb_cursor_open(mdb_txn_, mdb_dbi_, &mdb_cursor_));
SeekToFirst();
}
~LMDBCursor() override {
mdb_cursor_close(mdb_cursor_);
mdb_dbi_close(mdb_env_, mdb_dbi_);
mdb_txn_abort(mdb_txn_);
}
void Seek(const string& key) override {
if (key.size() == 0) {
SeekToFirst();
return;
}
// a key of 16k size should be enough? I am not sure though.
mdb_key_.mv_size = key.size();
mdb_key_.mv_data = const_cast<char*>(key.c_str());
int mdb_status =
mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, MDB_SET_RANGE);
if (mdb_status == MDB_NOTFOUND) {
valid_ = false;
} else {
MDB_CHECK(mdb_status);
valid_ = true;
}
}
bool SupportsSeek() override {
return true;
}
void SeekToFirst() override {
SeekLMDB(MDB_FIRST);
}
void Next() override {
SeekLMDB(MDB_NEXT);
}
string key() override {
return string(static_cast<const char*>(mdb_key_.mv_data), mdb_key_.mv_size);
}
string value() override {
return string(
static_cast<const char*>(mdb_value_.mv_data), mdb_value_.mv_size);
}
bool Valid() override {
return valid_;
}
private:
void SeekLMDB(MDB_cursor_op op) {
int mdb_status = mdb_cursor_get(mdb_cursor_, &mdb_key_, &mdb_value_, op);
if (mdb_status == MDB_NOTFOUND) {
valid_ = false;
} else {
MDB_CHECK(mdb_status);
valid_ = true;
}
}
MDB_env* mdb_env_;
MDB_txn* mdb_txn_;
MDB_dbi mdb_dbi_;
MDB_cursor* mdb_cursor_;
MDB_val mdb_key_, mdb_value_;
bool valid_;
};
class LMDBTransaction final : public Transaction {
public:
explicit LMDBTransaction(MDB_env* mdb_env) : mdb_env_(mdb_env) {
MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, 0, &mdb_txn_));
MDB_CHECK(mdb_dbi_open(mdb_txn_, NULL, 0, &mdb_dbi_));
}
~LMDBTransaction() override {
MDB_CHECK(mdb_txn_commit(mdb_txn_));
mdb_dbi_close(mdb_env_, mdb_dbi_);
}
void Put(const string& key, string&& value) override;
void Commit() override {
MDB_CHECK(mdb_txn_commit(mdb_txn_));
mdb_dbi_close(mdb_env_, mdb_dbi_);
// Begin a new transaction.
MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, 0, &mdb_txn_));
MDB_CHECK(mdb_dbi_open(mdb_txn_, NULL, 0, &mdb_dbi_));
}
private:
MDB_env* mdb_env_;
MDB_dbi mdb_dbi_;
MDB_txn* mdb_txn_;
C10_DISABLE_COPY_AND_ASSIGN(LMDBTransaction);
};
class LMDB : public DB {
public:
LMDB(const string& source, Mode mode);
~LMDB() override {
Close();
}
void Close() override {
if (mdb_env_ != NULL) {
mdb_env_close(mdb_env_);
mdb_env_ = NULL;
}
}
unique_ptr<Cursor> NewCursor() override {
return make_unique<LMDBCursor>(mdb_env_);
}
unique_ptr<Transaction> NewTransaction() override {
return make_unique<LMDBTransaction>(mdb_env_);
}
private:
MDB_env* mdb_env_;
};
LMDB::LMDB(const string& source, Mode mode) : DB(source, mode) {
MDB_CHECK(mdb_env_create(&mdb_env_));
MDB_CHECK(mdb_env_set_mapsize(mdb_env_, LMDB_MAP_SIZE));
if (mode == NEW) {
#if defined(_MSC_VER)
CAFFE_ENFORCE_EQ(_mkdir(source.c_str()), 0, "mkdir ", source, " failed");
#else
CAFFE_ENFORCE_EQ(
mkdir(source.c_str(), 0744), 0, "mkdir ", source, " failed");
#endif
}
int flags = 0;
if (mode == READ) {
flags = MDB_RDONLY | MDB_NOTLS | MDB_NOLOCK;
}
MDB_CHECK(mdb_env_open(mdb_env_, source.c_str(), flags, 0664));
VLOG(1) << "Opened lmdb " << source;
}
void LMDBTransaction::Put(const string& key, string&& value) {
MDB_val mdb_key, mdb_value;
mdb_key.mv_data = const_cast<char*>(key.data());
mdb_key.mv_size = key.size();
mdb_value.mv_data = const_cast<char*>(value.data());
mdb_value.mv_size = value.size();
MDB_CHECK(mdb_put(mdb_txn_, mdb_dbi_, &mdb_key, &mdb_value, 0));
}
REGISTER_CAFFE2_DB(LMDB, LMDB);
REGISTER_CAFFE2_DB(lmdb, LMDB);
} // namespace db
} // namespace caffe2