Refactor MemTable to support secondary keys; update related methods and tests.

Secondary MemTable added as a btree.

Todo: understand changes in detail
This commit is contained in:
harish876 2025-03-28 05:44:14 +00:00
parent 29e5789a57
commit 46b995573b
8 changed files with 180 additions and 94 deletions

View File

@ -144,7 +144,9 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
db_lock_(nullptr), db_lock_(nullptr),
shutting_down_(false), shutting_down_(false),
background_work_finished_signal_(&mutex_), background_work_finished_signal_(&mutex_),
mem_(nullptr), mem_(
nullptr), // init memtable constructor here -
// https://github.com/mohiuddin-shuvo/LevelDB_Embedded-Secondary-Index/commit/ba1746a6a5b0e98b6d1c1d10df3cdb5c4bde0445#diff-6fdb755f590d9b01ecb89bd8ceb28577e85536d4472f8e4fc3addeb9a65f3645
imm_(nullptr), imm_(nullptr),
has_imm_(false), has_imm_(false),
logfile_(nullptr), logfile_(nullptr),
@ -446,7 +448,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
WriteBatchInternal::SetContents(&batch, record); WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) { if (mem == nullptr) {
mem = new MemTable(internal_comparator_); mem = new MemTable(internal_comparator_, this->options_.secondary_key);
mem->Ref(); mem->Ref();
} }
status = WriteBatchInternal::InsertInto(&batch, mem); status = WriteBatchInternal::InsertInto(&batch, mem);
@ -492,7 +494,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
mem = nullptr; mem = nullptr;
} else { } else {
// mem can be nullptr if lognum exists but was empty. // mem can be nullptr if lognum exists but was empty.
mem_ = new MemTable(internal_comparator_); mem_ = new MemTable(internal_comparator_, this->options_.secondary_key);
mem_->Ref(); mem_->Ref();
} }
} }
@ -1204,13 +1206,13 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& s_key,
LookupKey lkey(s_key, snapshot); LookupKey lkey(s_key, snapshot);
std::unordered_set<std::string> result_set; std::unordered_set<std::string> result_set;
mem->Get(lkey, acc, &s, this->options_.secondary_key, &result_set, mem->Get(s_key, snapshot, acc, &s, this->options_.secondary_key,
top_k_outputs, this); &result_set, top_k_outputs);
if (imm != nullptr && top_k_outputs - acc->size() > 0) { if (imm != nullptr && top_k_outputs - acc->size() > 0) {
int mem_size = acc->size(); int mem_size = acc->size();
imm->Get(lkey, acc, &s, this->options_.secondary_key, &result_set, imm->Get(s_key, snapshot, acc, &s, this->options_.secondary_key,
top_k_outputs, this); &result_set, top_k_outputs);
} }
if (top_k_outputs > (int)(acc->size())) { if (top_k_outputs > (int)(acc->size())) {
@ -1218,16 +1220,13 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& s_key,
top_k_outputs, &result_set, this); top_k_outputs, &result_set, this);
// have_stat_update = true; // have_stat_update = true;
} }
// if (top_k_outputs < acc->size()) { std::sort_heap(acc->begin(), acc->end(), NewestFirst);
// acc->erase(acc->begin() + top_k_outputs, acc->end());
// }
// std::sort_heap(acc->begin(), acc->end(), NewestFirst);
mutex_.Lock(); mutex_.Lock();
} }
if (have_stat_update && current->UpdateStats(stats)) { // /*if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction(); // MaybeScheduleCompaction();
} // }*/
mem->Unref(); mem->Unref();
if (imm != nullptr) imm->Unref(); if (imm != nullptr) imm->Unref();
current->Unref(); current->Unref();
@ -1474,7 +1473,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
log_ = new log::Writer(lfile); log_ = new log::Writer(lfile);
imm_ = mem_; imm_ = mem_;
has_imm_.store(true, std::memory_order_release); has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_); mem_ = new MemTable(internal_comparator_, this->options_.secondary_key);
mem_->Ref(); mem_->Ref();
force = false; // Do not force another compaction if have room force = false; // Do not force another compaction if have room
MaybeScheduleCompaction(); MaybeScheduleCompaction();
@ -1599,7 +1598,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
impl->logfile_ = lfile; impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number; impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile); impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_); impl->mem_ =
new MemTable(impl->internal_comparator_, options.secondary_key);
impl->mem_->Ref(); impl->mem_->Ref();
} }
} }

View File

@ -6,11 +6,13 @@
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include <cstdint>
#include <unordered_set> #include <unordered_set>
#include "leveldb/comparator.h" #include "leveldb/comparator.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "leveldb/slice.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/json_utils.h" #include "util/json_utils.h"
@ -24,10 +26,23 @@ static Slice GetLengthPrefixedSlice(const char* data) {
return Slice(p, len); return Slice(p, len);
} }
MemTable::MemTable(const InternalKeyComparator& comparator) MemTable::MemTable(const InternalKeyComparator& comparator,
: comparator_(comparator), refs_(0), table_(comparator_, &arena_) {} std::string secondary_key)
: comparator_(comparator), refs_(0), table_(comparator_, &arena_) {
secAttribute = secondary_key;
}
MemTable::~MemTable() { assert(refs_ == 0); } MemTable::~MemTable() {
assert(refs_ == 0);
for (SecMemTable::iterator it = secTable_.begin(); it != secTable_.end();
it++) {
std::pair<std::string, std::vector<std::string>*> pr = *it;
std::vector<std::string>* invertedList = pr.second;
invertedList->clear();
delete invertedList;
}
}
size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); }
@ -103,6 +118,37 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
std::memcpy(p, value.data(), val_size); std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len); assert(p + val_size == buf + encoded_len);
table_.Insert(buf); table_.Insert(buf);
// SECONDARY MEMTABLE
// Ex: { id: 1, age: 30} we add this record with key age=30
/*
secTable_ = {
"30": ["1", "2"],
"25": ["3"]
}
*/
if (type == kTypeDeletion) {
return;
}
std::string secKey;
Status st =
ExtractKeyFromJSON(value.ToString().c_str(), secAttribute, &secKey);
if (!st.ok()) {
return;
}
SecMemTable::const_iterator lookup = secTable_.find(secKey);
if (lookup == secTable_.end()) {
std::vector<std::string>* invertedList = new std::vector<std::string>();
invertedList->push_back(key.ToString());
secTable_.insert(std::make_pair(secKey, invertedList));
}
else {
std::pair<std::string, std::vector<std::string>*> pr = *lookup;
pr.second->push_back(key.ToString());
}
} }
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
@ -141,74 +187,90 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
return false; return false;
} }
bool MemTable::Get(const LookupKey& s_key, std::vector<SKeyReturnVal>* acc, bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
Status* s, std::string secondary_key, uint64_t* tag) {
std::unordered_set<std::string>* result_set, Slice memkey = key.memtable_key();
int top_k_output, DBImpl* db) {
if (secondary_key.empty()) {
return false;
}
Slice memkey = s_key.memtable_key();
Table::Iterator iter(&table_); Table::Iterator iter(&table_);
iter.SeekToFirst(); iter.Seek(memkey.data());
bool found; if (iter.Valid()) {
// entry format is:
// I believe we do a O(n) search for the actual key based on the secondary key // klength varint32
// userkey char[klength]
while (iter.Valid()) { // tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key(); const char* entry = iter.key();
uint32_t key_length; uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); if (comparator_.comparator.user_comparator()->Compare(
std::string val; Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
switch (static_cast<ValueType>(tag & 0xff)) { // Correct user key
case kTypeValue: { *tag = DecodeFixed64(key_ptr + key_length - 8);
Slice v = GetLengthPrefixedSlice(key_ptr + key_length); switch (static_cast<ValueType>(*tag & 0xff)) {
val.assign(v.data(), v.size()); case kTypeValue: {
std::string sec_key_attr; Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
Status s = ExtractKeyFromJSON(v, secondary_key, &sec_key_attr); value->assign(v.data(), v.size());
if (!s.ok()) { return true;
break; }
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}
void MemTable::Get(const Slice& skey, SequenceNumber snapshot,
std::vector<SKeyReturnVal>* acc, Status* s,
std::string secondary_key,
std::unordered_set<std::string>* result_set,
int top_k_output) {
auto lookup = secTable_.find(skey.ToString());
if (lookup == secTable_.end()) {
return;
}
std::pair<std::string, std::vector<std::string>*> pr = *lookup;
for (int i = pr.second->size() - 1; i >= 0; i--) {
if (acc->size() >= top_k_output) return;
Slice pkey = pr.second->at(i);
LookupKey lkey(pkey, snapshot);
std::string secKeyVal;
std::string svalue;
Status s;
uint64_t tag;
if (!this->Get(lkey, &svalue, &s, &tag)) return;
if (s.IsNotFound()) return;
Status st = ExtractKeyFromJSON(svalue, secAttribute, &secKeyVal);
if (!st.ok()) return;
if (comparator_.comparator.user_comparator()->Compare(secKeyVal, skey) ==
0) {
struct SKeyReturnVal newVal;
newVal.key = pr.second->at(i);
std::string temp;
if (result_set->find(newVal.key) == result_set->end()) {
newVal.value = svalue;
newVal.sequence_number = tag;
if (acc->size() < top_k_output) {
newVal.Push(acc, newVal);
result_set->insert(newVal.key);
} else if (newVal.sequence_number > acc->front().sequence_number) {
newVal.Pop(acc);
newVal.Push(acc, newVal);
result_set->insert(newVal.key);
result_set->erase(result_set->find(acc->front().key));
} }
if (comparator_.comparator.user_comparator()->Compare(
sec_key_attr, s_key.user_key()) == 0) {
struct SKeyReturnVal new_val;
new_val.key = Slice(key_ptr, key_length - 8).ToString();
std::string temp;
if (result_set->find(new_val.key) == result_set->end()) {
new_val.value = val;
new_val.sequence_number = tag; // not able to understand this
if (acc->size() < top_k_output) {
Status st = db->Get(leveldb::ReadOptions(), new_val.key, &temp);
if (st.ok() && !st.IsNotFound() && temp == new_val.value) {
new_val.Push(acc, new_val);
result_set->insert(new_val.key);
}
} else if (new_val.sequence_number > acc->front().sequence_number) {
Status st = db->Get(leveldb::ReadOptions(), new_val.key, &temp);
if (st.ok() && !st.IsNotFound() && temp == new_val.value) {
new_val.Pop(acc);
new_val.Push(acc, new_val);
result_set->insert(new_val.key);
result_set->erase(result_set->find(acc->front().key));
}
}
// value->push_back(newVal);
// kNoOfOutputs--;
// outputFile<<key<<"\nfound"<<endl;
found = true;
}
}
break;
} }
case kTypeDeletion:
break;
} }
iter.Next();
} }
return found;
} }
} // namespace leveldb } // namespace leveldb

View File

@ -10,8 +10,11 @@
#include "db/skiplist.h" #include "db/skiplist.h"
#include <string> #include <string>
#include <unordered_set> #include <unordered_set>
#include <util/btree_map.h>
#include <vector>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/slice.h"
#include "util/arena.h" #include "util/arena.h"
@ -24,7 +27,8 @@ class MemTable {
public: public:
// MemTables are reference counted. The initial reference count // MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once. // is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator); explicit MemTable(const InternalKeyComparator& comparator,
std::string secondary_key);
MemTable(const MemTable&) = delete; MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete; MemTable& operator=(const MemTable&) = delete;
@ -65,10 +69,12 @@ class MemTable {
// Else, return false. // Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s); bool Get(const LookupKey& key, std::string* value, Status* s);
bool Get(const LookupKey& s_key, std::vector<SKeyReturnVal>* value, Status* s, // Get methods for Secondary Memtable
bool Get(const LookupKey& key, std::string* value, Status* s, uint64_t* tag);
void Get(const Slice& s_key, SequenceNumber snapshot,
std::vector<SKeyReturnVal>* value, Status* s,
std::string secondary_key, std::string secondary_key,
std::unordered_set<std::string>* result_set, int top_k_value, std::unordered_set<std::string>* result_set, int top_k_value);
DBImpl* db);
private: private:
friend class MemTableIterator; friend class MemTableIterator;
@ -88,6 +94,11 @@ class MemTable {
int refs_; int refs_;
Arena arena_; Arena arena_;
Table table_; Table table_;
// SECONDARY MEMTABLE
typedef btree::btree_map<std::string, std::vector<std::string>*> SecMemTable;
SecMemTable secTable_;
std::string secAttribute;
}; };
} // namespace leveldb } // namespace leveldb

View File

@ -34,6 +34,7 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "leveldb/comparator.h" #include "leveldb/comparator.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
@ -177,7 +178,7 @@ class Repairer {
std::string scratch; std::string scratch;
Slice record; Slice record;
WriteBatch batch; WriteBatch batch;
MemTable* mem = new MemTable(icmp_); MemTable* mem = new MemTable(icmp_, this->options_.secondary_key);
mem->Ref(); mem->Ref();
int counter = 0; int counter = 0;
while (reader.ReadRecord(&record, &scratch)) { while (reader.ReadRecord(&record, &scratch)) {

View File

@ -528,7 +528,7 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k,
} }
// std::sort(acc->begin(), acc->end(), NewestFirstSequenceNumber); // std::sort(acc->begin(), acc->end(), NewestFirstSequenceNumber);
if (acc->size() > 0) if (acc->size() == 0)
return Status::NotFound(Slice()); // Use an empty error message for speed return Status::NotFound(Slice()); // Use an empty error message for speed
else else
return s; return s;

View File

@ -2,18 +2,21 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "gtest/gtest.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "util/logging.h" #include "util/logging.h"
#include "gtest/gtest.h"
namespace leveldb { namespace leveldb {
static std::string PrintContents(WriteBatch* b) { static std::string PrintContents(WriteBatch* b) {
InternalKeyComparator cmp(BytewiseComparator()); InternalKeyComparator cmp(BytewiseComparator());
MemTable* mem = new MemTable(cmp); MemTable* mem = new MemTable(cmp, ""); // REVISIT
mem->Ref(); mem->Ref();
std::string state; std::string state;
Status s = WriteBatchInternal::InsertInto(b, mem); Status s = WriteBatchInternal::InsertInto(b, mem);

View File

@ -131,6 +131,13 @@ void TableBuilder::Add(const Slice& key, const Slice& value) {
Status s = ExtractKeyFromJSON(value, r->options.secondary_key, Status s = ExtractKeyFromJSON(value, r->options.secondary_key,
&secondary_key_attr); &secondary_key_attr);
if (!s.ok()) { if (!s.ok()) {
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
return; return;
} }
r->secondary_filter_block->AddKey(secondary_key_attr); r->secondary_filter_block->AddKey(secondary_key_attr);

View File

@ -4,24 +4,26 @@
#include "leveldb/table.h" #include "leveldb/table.h"
#include <map>
#include <string>
#include "gtest/gtest.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include <map>
#include <string>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "leveldb/options.h" #include "leveldb/options.h"
#include "leveldb/table_builder.h" #include "leveldb/table_builder.h"
#include "table/block.h" #include "table/block.h"
#include "table/block_builder.h" #include "table/block_builder.h"
#include "table/format.h" #include "table/format.h"
#include "util/random.h" #include "util/random.h"
#include "util/testutil.h" #include "util/testutil.h"
#include "gtest/gtest.h"
namespace leveldb { namespace leveldb {
// Return reverse of "key". // Return reverse of "key".
@ -302,13 +304,13 @@ class MemTableConstructor : public Constructor {
public: public:
explicit MemTableConstructor(const Comparator* cmp) explicit MemTableConstructor(const Comparator* cmp)
: Constructor(cmp), internal_comparator_(cmp) { : Constructor(cmp), internal_comparator_(cmp) {
memtable_ = new MemTable(internal_comparator_); memtable_ = new MemTable(internal_comparator_, ""); // REVISIT
memtable_->Ref(); memtable_->Ref();
} }
~MemTableConstructor() override { memtable_->Unref(); } ~MemTableConstructor() override { memtable_->Unref(); }
Status FinishImpl(const Options& options, const KVMap& data) override { Status FinishImpl(const Options& options, const KVMap& data) override {
memtable_->Unref(); memtable_->Unref();
memtable_ = new MemTable(internal_comparator_); memtable_ = new MemTable(internal_comparator_, ""); // REVISIT
memtable_->Ref(); memtable_->Ref();
int seq = 1; int seq = 1;
for (const auto& kvp : data) { for (const auto& kvp : data) {
@ -724,7 +726,7 @@ TEST_F(Harness, RandomizedLongDB) {
TEST(MemTableTest, Simple) { TEST(MemTableTest, Simple) {
InternalKeyComparator cmp(BytewiseComparator()); InternalKeyComparator cmp(BytewiseComparator());
MemTable* memtable = new MemTable(cmp); MemTable* memtable = new MemTable(cmp, ""); // REVISIT
memtable->Ref(); memtable->Ref();
WriteBatch batch; WriteBatch batch;
WriteBatchInternal::SetSequence(&batch, 100); WriteBatchInternal::SetSequence(&batch, 100);