From 46b995573b6ab00fa2ac936dba97a764f516d9b2 Mon Sep 17 00:00:00 2001 From: harish876 Date: Fri, 28 Mar 2025 05:44:14 +0000 Subject: [PATCH] Refactor MemTable to support secondary keys; update related methods and tests. Secondary MemTable added as a btree. Todo: understand changes in detail --- db/db_impl.cc | 32 +++---- db/memtable.cc | 188 +++++++++++++++++++++++++++-------------- db/memtable.h | 19 ++++- db/repair.cc | 3 +- db/version_set.cc | 2 +- db/write_batch_test.cc | 7 +- table/table_builder.cc | 7 ++ table/table_test.cc | 16 ++-- 8 files changed, 180 insertions(+), 94 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 42627fe..6b0d918 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -144,7 +144,9 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) db_lock_(nullptr), shutting_down_(false), background_work_finished_signal_(&mutex_), - mem_(nullptr), + mem_( + nullptr), // init memtable constructor here - + // https://github.com/mohiuddin-shuvo/LevelDB_Embedded-Secondary-Index/commit/ba1746a6a5b0e98b6d1c1d10df3cdb5c4bde0445#diff-6fdb755f590d9b01ecb89bd8ceb28577e85536d4472f8e4fc3addeb9a65f3645 imm_(nullptr), has_imm_(false), logfile_(nullptr), @@ -446,7 +448,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, WriteBatchInternal::SetContents(&batch, record); if (mem == nullptr) { - mem = new MemTable(internal_comparator_); + mem = new MemTable(internal_comparator_, this->options_.secondary_key); mem->Ref(); } status = WriteBatchInternal::InsertInto(&batch, mem); @@ -492,7 +494,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, mem = nullptr; } else { // mem can be nullptr if lognum exists but was empty. - mem_ = new MemTable(internal_comparator_); + mem_ = new MemTable(internal_comparator_, this->options_.secondary_key); mem_->Ref(); } } @@ -1204,13 +1206,13 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& s_key, LookupKey lkey(s_key, snapshot); std::unordered_set result_set; - mem->Get(lkey, acc, &s, this->options_.secondary_key, &result_set, - top_k_outputs, this); + mem->Get(s_key, snapshot, acc, &s, this->options_.secondary_key, + &result_set, top_k_outputs); if (imm != nullptr && top_k_outputs - acc->size() > 0) { int mem_size = acc->size(); - imm->Get(lkey, acc, &s, this->options_.secondary_key, &result_set, - top_k_outputs, this); + imm->Get(s_key, snapshot, acc, &s, this->options_.secondary_key, + &result_set, top_k_outputs); } if (top_k_outputs > (int)(acc->size())) { @@ -1218,16 +1220,13 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& s_key, top_k_outputs, &result_set, this); // have_stat_update = true; } - // if (top_k_outputs < acc->size()) { - // acc->erase(acc->begin() + top_k_outputs, acc->end()); - // } - // std::sort_heap(acc->begin(), acc->end(), NewestFirst); + std::sort_heap(acc->begin(), acc->end(), NewestFirst); mutex_.Lock(); } - if (have_stat_update && current->UpdateStats(stats)) { - MaybeScheduleCompaction(); - } + // /*if (have_stat_update && current->UpdateStats(stats)) { + // MaybeScheduleCompaction(); + // }*/ mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); @@ -1474,7 +1473,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { log_ = new log::Writer(lfile); imm_ = mem_; has_imm_.store(true, std::memory_order_release); - mem_ = new MemTable(internal_comparator_); + mem_ = new MemTable(internal_comparator_, this->options_.secondary_key); mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); @@ -1599,7 +1598,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { impl->logfile_ = lfile; impl->logfile_number_ = new_log_number; impl->log_ = new log::Writer(lfile); - impl->mem_ = new MemTable(impl->internal_comparator_); + impl->mem_ = + new MemTable(impl->internal_comparator_, options.secondary_key); impl->mem_->Ref(); } } diff --git a/db/memtable.cc b/db/memtable.cc index 71f1b17..60a2359 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -6,11 +6,13 @@ #include "db/db_impl.h" #include "db/dbformat.h" +#include #include #include "leveldb/comparator.h" #include "leveldb/env.h" #include "leveldb/iterator.h" +#include "leveldb/slice.h" #include "util/coding.h" #include "util/json_utils.h" @@ -24,10 +26,23 @@ static Slice GetLengthPrefixedSlice(const char* data) { return Slice(p, len); } -MemTable::MemTable(const InternalKeyComparator& comparator) - : comparator_(comparator), refs_(0), table_(comparator_, &arena_) {} +MemTable::MemTable(const InternalKeyComparator& comparator, + std::string secondary_key) + : comparator_(comparator), refs_(0), table_(comparator_, &arena_) { + secAttribute = secondary_key; +} -MemTable::~MemTable() { assert(refs_ == 0); } +MemTable::~MemTable() { + assert(refs_ == 0); + for (SecMemTable::iterator it = secTable_.begin(); it != secTable_.end(); + it++) { + std::pair*> pr = *it; + + std::vector* invertedList = pr.second; + invertedList->clear(); + delete invertedList; + } +} size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } @@ -103,6 +118,37 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, std::memcpy(p, value.data(), val_size); assert(p + val_size == buf + encoded_len); table_.Insert(buf); + + // SECONDARY MEMTABLE + // Ex: { id: 1, age: 30} we add this record with key age=30 + + /* + secTable_ = { + "30": ["1", "2"], + "25": ["3"] + } + */ + if (type == kTypeDeletion) { + return; + } + std::string secKey; + Status st = + ExtractKeyFromJSON(value.ToString().c_str(), secAttribute, &secKey); + if (!st.ok()) { + return; + } + SecMemTable::const_iterator lookup = secTable_.find(secKey); + if (lookup == secTable_.end()) { + std::vector* invertedList = new std::vector(); + invertedList->push_back(key.ToString()); + + secTable_.insert(std::make_pair(secKey, invertedList)); + } + + else { + std::pair*> pr = *lookup; + pr.second->push_back(key.ToString()); + } } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { @@ -141,74 +187,90 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { return false; } -bool MemTable::Get(const LookupKey& s_key, std::vector* acc, - Status* s, std::string secondary_key, - std::unordered_set* result_set, - int top_k_output, DBImpl* db) { - if (secondary_key.empty()) { - return false; - } - Slice memkey = s_key.memtable_key(); +bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, + uint64_t* tag) { + Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); - iter.SeekToFirst(); - bool found; - - // I believe we do a O(n) search for the actual key based on the secondary key - - while (iter.Valid()) { + iter.Seek(memkey.data()); + if (iter.Valid()) { + // entry format is: + // klength varint32 + // userkey char[klength] + // tag uint64 + // vlength varint32 + // value char[vlength] + // Check that it belongs to same user key. We do not check the + // sequence number since the Seek() call above should have skipped + // all entries with overly large sequence numbers. const char* entry = iter.key(); uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); - const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); - std::string val; - switch (static_cast(tag & 0xff)) { - case kTypeValue: { - Slice v = GetLengthPrefixedSlice(key_ptr + key_length); - val.assign(v.data(), v.size()); - std::string sec_key_attr; - Status s = ExtractKeyFromJSON(v, secondary_key, &sec_key_attr); - if (!s.ok()) { - break; + if (comparator_.comparator.user_comparator()->Compare( + Slice(key_ptr, key_length - 8), key.user_key()) == 0) { + // Correct user key + *tag = DecodeFixed64(key_ptr + key_length - 8); + switch (static_cast(*tag & 0xff)) { + case kTypeValue: { + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + value->assign(v.data(), v.size()); + return true; + } + case kTypeDeletion: + *s = Status::NotFound(Slice()); + return true; + } + } + } + return false; +} + +void MemTable::Get(const Slice& skey, SequenceNumber snapshot, + std::vector* acc, Status* s, + std::string secondary_key, + std::unordered_set* result_set, + int top_k_output) { + auto lookup = secTable_.find(skey.ToString()); + if (lookup == secTable_.end()) { + return; + } + std::pair*> pr = *lookup; + for (int i = pr.second->size() - 1; i >= 0; i--) { + if (acc->size() >= top_k_output) return; + + Slice pkey = pr.second->at(i); + LookupKey lkey(pkey, snapshot); + std::string secKeyVal; + std::string svalue; + Status s; + uint64_t tag; + if (!this->Get(lkey, &svalue, &s, &tag)) return; + if (s.IsNotFound()) return; + + Status st = ExtractKeyFromJSON(svalue, secAttribute, &secKeyVal); + if (!st.ok()) return; + if (comparator_.comparator.user_comparator()->Compare(secKeyVal, skey) == + 0) { + struct SKeyReturnVal newVal; + newVal.key = pr.second->at(i); + std::string temp; + + if (result_set->find(newVal.key) == result_set->end()) { + newVal.value = svalue; + newVal.sequence_number = tag; + + if (acc->size() < top_k_output) { + newVal.Push(acc, newVal); + result_set->insert(newVal.key); + + } else if (newVal.sequence_number > acc->front().sequence_number) { + newVal.Pop(acc); + newVal.Push(acc, newVal); + result_set->insert(newVal.key); + result_set->erase(result_set->find(acc->front().key)); } - if (comparator_.comparator.user_comparator()->Compare( - sec_key_attr, s_key.user_key()) == 0) { - struct SKeyReturnVal new_val; - new_val.key = Slice(key_ptr, key_length - 8).ToString(); - std::string temp; - - if (result_set->find(new_val.key) == result_set->end()) { - new_val.value = val; - new_val.sequence_number = tag; // not able to understand this - - if (acc->size() < top_k_output) { - Status st = db->Get(leveldb::ReadOptions(), new_val.key, &temp); - if (st.ok() && !st.IsNotFound() && temp == new_val.value) { - new_val.Push(acc, new_val); - result_set->insert(new_val.key); - } - } else if (new_val.sequence_number > acc->front().sequence_number) { - Status st = db->Get(leveldb::ReadOptions(), new_val.key, &temp); - if (st.ok() && !st.IsNotFound() && temp == new_val.value) { - new_val.Pop(acc); - new_val.Push(acc, new_val); - result_set->insert(new_val.key); - result_set->erase(result_set->find(acc->front().key)); - } - } - // value->push_back(newVal); - // kNoOfOutputs--; - // outputFile< #include +#include +#include #include "leveldb/db.h" +#include "leveldb/slice.h" #include "util/arena.h" @@ -24,7 +27,8 @@ class MemTable { public: // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. - explicit MemTable(const InternalKeyComparator& comparator); + explicit MemTable(const InternalKeyComparator& comparator, + std::string secondary_key); MemTable(const MemTable&) = delete; MemTable& operator=(const MemTable&) = delete; @@ -65,10 +69,12 @@ class MemTable { // Else, return false. bool Get(const LookupKey& key, std::string* value, Status* s); - bool Get(const LookupKey& s_key, std::vector* value, Status* s, + // Get methods for Secondary Memtable + bool Get(const LookupKey& key, std::string* value, Status* s, uint64_t* tag); + void Get(const Slice& s_key, SequenceNumber snapshot, + std::vector* value, Status* s, std::string secondary_key, - std::unordered_set* result_set, int top_k_value, - DBImpl* db); + std::unordered_set* result_set, int top_k_value); private: friend class MemTableIterator; @@ -88,6 +94,11 @@ class MemTable { int refs_; Arena arena_; Table table_; + + // SECONDARY MEMTABLE + typedef btree::btree_map*> SecMemTable; + SecMemTable secTable_; + std::string secAttribute; }; } // namespace leveldb diff --git a/db/repair.cc b/db/repair.cc index 97a27c6..6271ba7 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -34,6 +34,7 @@ #include "db/table_cache.h" #include "db/version_edit.h" #include "db/write_batch_internal.h" + #include "leveldb/comparator.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -177,7 +178,7 @@ class Repairer { std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = new MemTable(icmp_); + MemTable* mem = new MemTable(icmp_, this->options_.secondary_key); mem->Ref(); int counter = 0; while (reader.ReadRecord(&record, &scratch)) { diff --git a/db/version_set.cc b/db/version_set.cc index 585e11b..a1e3487 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -528,7 +528,7 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k, } // std::sort(acc->begin(), acc->end(), NewestFirstSequenceNumber); - if (acc->size() > 0) + if (acc->size() == 0) return Status::NotFound(Slice()); // Use an empty error message for speed else return s; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 1a3ea8f..502a5a2 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -2,18 +2,21 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "gtest/gtest.h" #include "db/memtable.h" #include "db/write_batch_internal.h" + #include "leveldb/db.h" #include "leveldb/env.h" + #include "util/logging.h" +#include "gtest/gtest.h" + namespace leveldb { static std::string PrintContents(WriteBatch* b) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable* mem = new MemTable(cmp); + MemTable* mem = new MemTable(cmp, ""); // REVISIT mem->Ref(); std::string state; Status s = WriteBatchInternal::InsertInto(b, mem); diff --git a/table/table_builder.cc b/table/table_builder.cc index 8925c37..ad1bf7d 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -131,6 +131,13 @@ void TableBuilder::Add(const Slice& key, const Slice& value) { Status s = ExtractKeyFromJSON(value, r->options.secondary_key, &secondary_key_attr); if (!s.ok()) { + r->last_key.assign(key.data(), key.size()); + r->num_entries++; + r->data_block.Add(key, value); + const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); + if (estimated_block_size >= r->options.block_size) { + Flush(); + } return; } r->secondary_filter_block->AddKey(secondary_key_attr); diff --git a/table/table_test.cc b/table/table_test.cc index aea0697..0bf431f 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -4,24 +4,26 @@ #include "leveldb/table.h" -#include -#include - -#include "gtest/gtest.h" #include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" +#include +#include + #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/iterator.h" #include "leveldb/options.h" #include "leveldb/table_builder.h" + #include "table/block.h" #include "table/block_builder.h" #include "table/format.h" #include "util/random.h" #include "util/testutil.h" +#include "gtest/gtest.h" + namespace leveldb { // Return reverse of "key". @@ -302,13 +304,13 @@ class MemTableConstructor : public Constructor { public: explicit MemTableConstructor(const Comparator* cmp) : Constructor(cmp), internal_comparator_(cmp) { - memtable_ = new MemTable(internal_comparator_); + memtable_ = new MemTable(internal_comparator_, ""); // REVISIT memtable_->Ref(); } ~MemTableConstructor() override { memtable_->Unref(); } Status FinishImpl(const Options& options, const KVMap& data) override { memtable_->Unref(); - memtable_ = new MemTable(internal_comparator_); + memtable_ = new MemTable(internal_comparator_, ""); // REVISIT memtable_->Ref(); int seq = 1; for (const auto& kvp : data) { @@ -724,7 +726,7 @@ TEST_F(Harness, RandomizedLongDB) { TEST(MemTableTest, Simple) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable* memtable = new MemTable(cmp); + MemTable* memtable = new MemTable(cmp, ""); // REVISIT memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100);