This commit is contained in:
Kyle Zhang 2025-02-06 19:42:28 +08:00 committed by GitHub
commit 663a499b45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -4,6 +4,8 @@
#include "table/merger.h" #include "table/merger.h"
#include <vector>
#include "leveldb/comparator.h" #include "leveldb/comparator.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "table/iterator_wrapper.h" #include "table/iterator_wrapper.h"
@ -15,39 +17,49 @@ class MergingIterator : public Iterator {
public: public:
MergingIterator(const Comparator* comparator, Iterator** children, int n) MergingIterator(const Comparator* comparator, Iterator** children, int n)
: comparator_(comparator), : comparator_(comparator),
children_(new IteratorWrapper[n]), children_(n, nullptr),
n_(n), n_(n),
current_(nullptr), current_(nullptr),
direction_(kForward) { direction_(kForward) {
for (int i = 0; i < n; i++) { for (int i = 0; i < n_; i++) {
children_[i].Set(children[i]); children_[i] = new IteratorWrapper(children[i]);
} }
} }
~MergingIterator() override { delete[] children_; } ~MergingIterator() override {
for (int i = 0; i < n_; i++) {
delete children_[i];
}
}
bool Valid() const override { return (current_ != nullptr); } bool Valid() const override { return (current_ != nullptr); }
void SeekToFirst() override { void SeekToFirst() override {
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
children_[i].SeekToFirst(); children_[i]->SeekToFirst();
} }
SortChildren();
FindSmallest(); FindSmallest();
direction_ = kForward; direction_ = kForward;
} }
void SeekToLast() override { void SeekToLast() override {
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
children_[i].SeekToLast(); children_[i]->SeekToLast();
} }
SortChildren();
FindLargest(); FindLargest();
direction_ = kReverse; direction_ = kReverse;
} }
void Seek(const Slice& target) override { void Seek(const Slice& target) override {
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
children_[i].Seek(target); children_[i]->Seek(target);
} }
SortChildren();
FindSmallest(); FindSmallest();
direction_ = kForward; direction_ = kForward;
} }
@ -55,14 +67,17 @@ class MergingIterator : public Iterator {
void Next() override { void Next() override {
assert(Valid()); assert(Valid());
bool need_sort = false;
// Ensure that all children are positioned after key(). // Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already // If we are moving in the forward direction, it is already
// true for all of the non-current_ children since current_ is // true for all of the non-current_ children since current_ is
// the smallest child and key() == current_->key(). Otherwise, // the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children. // we explicitly position the non-current_ children.
if (direction_ != kForward) { if (direction_ != kForward) {
need_sort = true;
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i]; IteratorWrapper* child = children_[i];
if (child != current_) { if (child != current_) {
child->Seek(key()); child->Seek(key());
if (child->Valid() && if (child->Valid() &&
@ -75,20 +90,30 @@ class MergingIterator : public Iterator {
} }
current_->Next(); current_->Next();
if (need_sort) {
SortChildren();
} else {
AdjustCurrentByNext();
}
FindSmallest(); FindSmallest();
} }
void Prev() override { void Prev() override {
assert(Valid()); assert(Valid());
bool need_sort = false;
// Ensure that all children are positioned before key(). // Ensure that all children are positioned before key().
// If we are moving in the reverse direction, it is already // If we are moving in the reverse direction, it is already
// true for all of the non-current_ children since current_ is // true for all of the non-current_ children since current_ is
// the largest child and key() == current_->key(). Otherwise, // the largest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children. // we explicitly position the non-current_ children.
if (direction_ != kReverse) { if (direction_ != kReverse) {
need_sort = true;
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i]; IteratorWrapper* child = children_[i];
if (child != current_) { if (child != current_) {
child->Seek(key()); child->Seek(key());
if (child->Valid()) { if (child->Valid()) {
@ -104,6 +129,13 @@ class MergingIterator : public Iterator {
} }
current_->Prev(); current_->Prev();
if (need_sort) {
SortChildren();
} else {
AdjustCurrentByPrev();
}
FindLargest(); FindLargest();
} }
@ -120,7 +152,7 @@ class MergingIterator : public Iterator {
Status status() const override { Status status() const override {
Status status; Status status;
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
status = children_[i].status(); status = children_[i]->status();
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
@ -135,44 +167,92 @@ class MergingIterator : public Iterator {
void FindSmallest(); void FindSmallest();
void FindLargest(); void FindLargest();
void SortChildren();
void AdjustCurrentByNext();
void AdjustCurrentByPrev();
// We might want to use a heap in case there are lots of children. // We might want to use a heap in case there are lots of children.
// For now we use a simple array since we expect a very small number // For now we use a simple array since we expect a very small number
// of children in leveldb. // of children in leveldb.
const Comparator* comparator_; const Comparator* comparator_;
IteratorWrapper* children_; std::vector<IteratorWrapper*> children_;
int n_; int n_;
IteratorWrapper* current_; IteratorWrapper* current_;
int current_idx_;
Direction direction_; Direction direction_;
}; };
void MergingIterator::FindSmallest() { void MergingIterator::FindSmallest() {
IteratorWrapper* smallest = nullptr; current_ = nullptr;
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i]; IteratorWrapper* child = children_[i];
if (child->Valid()) { if (child->Valid()) {
if (smallest == nullptr) { current_ = child;
smallest = child; current_idx_ = i;
} else if (comparator_->Compare(child->key(), smallest->key()) < 0) { return;
smallest = child;
} }
} }
}
current_ = smallest;
} }
void MergingIterator::FindLargest() { void MergingIterator::FindLargest() {
IteratorWrapper* largest = nullptr; current_ = nullptr;
for (int i = n_ - 1; i >= 0; i--) { for (int i = n_ - 1; i >= 0; i--) {
IteratorWrapper* child = &children_[i]; IteratorWrapper* child = children_[i];
if (child->Valid()) { if (child->Valid()) {
if (largest == nullptr) { current_ = child;
largest = child; current_idx_ = i;
} else if (comparator_->Compare(child->key(), largest->key()) > 0) { return;
largest = child;
} }
} }
}
void MergingIterator::SortChildren() {
std::sort(children_.begin(), children_.end(),
[this](const IteratorWrapper* a, const IteratorWrapper* b) {
// Order of invalid children are not important. They are just
// skipped.
if (!a->Valid()) return false;
if (!b->Valid()) return true;
return comparator_->Compare(a->key(), b->key()) < 0;
});
}
void MergingIterator::AdjustCurrentByNext() {
if (!current_->Valid()) return;
for (int next_idx = current_idx_ + 1; next_idx < n_; next_idx++) {
IteratorWrapper* next_child = children_[next_idx];
if (!next_child->Valid()) continue;
if (comparator_->Compare(current_->key(), next_child->key()) > 0) {
children_[current_idx_] = next_child;
current_idx_ = next_idx;
children_[next_idx] = current_;
} else {
break;
}
}
}
void MergingIterator::AdjustCurrentByPrev() {
if (!current_->Valid()) return;
for (int next_idx = current_idx_ - 1; next_idx >= 0 ; next_idx--) {
IteratorWrapper* next_child = children_[next_idx];
if (!next_child->Valid()) continue;
if (comparator_->Compare(current_->key(), next_child->key()) < 0) {
children_[current_idx_] = next_child;
current_idx_ = next_idx;
children_[next_idx] = current_;
} else {
break;
}
} }
current_ = largest;
} }
} // namespace } // namespace