From cd46d7bd203b69e6d163fd19e38600d9feae6e56 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Tue, 21 Nov 2023 16:20:01 +1100 Subject: Make lua db iterators async --- src/database/database.cpp | 58 +++++++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 22 deletions(-) (limited to 'src/database/database.cpp') diff --git a/src/database/database.cpp b/src/database/database.cpp index 0967eb95..88ae7bbe 100644 --- a/src/database/database.cpp +++ b/src/database/database.cpp @@ -858,7 +858,7 @@ auto IndexRecord::Expand(std::size_t page_size) const } Iterator::Iterator(std::weak_ptr db, const IndexInfo& idx) - : db_(db), prev_pos_(), current_pos_() { + : db_(db), pos_mutex_(), current_pos_(), prev_pos_() { std::string prefix = EncodeIndexPrefix( IndexKey::Header{.id = idx.id, .depth = 0, .components_hash = 0}); current_pos_ = Continuation{.prefix = {prefix.data(), prefix.size()}, @@ -869,36 +869,50 @@ Iterator::Iterator(std::weak_ptr db, const IndexInfo& idx) } Iterator::Iterator(std::weak_ptr db, const Continuation& c) - : db_(db), prev_pos_(), current_pos_(c) {} + : db_(db), pos_mutex_(), current_pos_(c), prev_pos_() {} -auto Iterator::Prev() -> std::optional { - if (!prev_pos_) { - return {}; - } +auto Iterator::Next(Callback cb) -> void { auto db = db_.lock(); if (!db) { - return {}; + InvokeNull(cb); + return; } - std::unique_ptr> res{ - db->GetPage(&*prev_pos_).get()}; - prev_pos_ = res->prev_page(); - current_pos_ = prev_pos_; - return *res->values()[0]; + db->worker_task_->Dispatch([=]() { + std::lock_guard lock{pos_mutex_}; + if (!current_pos_) { + InvokeNull(cb); + return; + } + std::unique_ptr> res{ + db->dbGetPage(*current_pos_)}; + prev_pos_ = current_pos_; + current_pos_ = res->next_page(); + std::invoke(cb, *res->values()[0]); + }); } -auto Iterator::Next() -> std::optional { - if (!current_pos_) { - return {}; - } +auto Iterator::Prev(Callback cb) -> void { auto db = db_.lock(); if (!db) { - return {}; + InvokeNull(cb); + return; } - std::unique_ptr> res{ - db->GetPage(&*current_pos_).get()}; - prev_pos_ = current_pos_; - current_pos_ = res->next_page(); - return *res->values()[0]; + db->worker_task_->Dispatch([=]() { + std::lock_guard lock{pos_mutex_}; + if (!prev_pos_) { + InvokeNull(cb); + return; + } + std::unique_ptr> res{ + db->dbGetPage(*current_pos_)}; + current_pos_ = prev_pos_; + prev_pos_ = res->prev_page(); + std::invoke(cb, *res->values()[0]); + }); +} + +auto Iterator::InvokeNull(Callback cb) -> void { + std::invoke(cb, std::optional{}); } } // namespace database -- cgit v1.2.3