diff options
| author | jacqueline <me@jacqueline.id.au> | 2023-11-21 16:20:01 +1100 |
|---|---|---|
| committer | jacqueline <me@jacqueline.id.au> | 2023-11-21 16:20:21 +1100 |
| commit | cd46d7bd203b69e6d163fd19e38600d9feae6e56 (patch) | |
| tree | 0e10f7fbc07ab80a58964f52a229c91cbc706833 | |
| parent | 5afdb89f9ac63f4892257a4eaf1c65a3b49f3284 (diff) | |
| download | tangara-fw-cd46d7bd203b69e6d163fd19e38600d9feae6e56.tar.gz | |
Make lua db iterators async
| -rw-r--r-- | src/database/database.cpp | 58 | ||||
| -rw-r--r-- | src/database/include/database.hpp | 14 | ||||
| -rw-r--r-- | src/lua/lua_database.cpp | 27 |
3 files changed, 66 insertions, 33 deletions
diff --git a/src/database/database.cpp b/src/database/database.cpp index 0967eb95..88ae7bbe 100644 --- a/src/database/database.cpp +++ b/src/database/database.cpp @@ -858,7 +858,7 @@ auto IndexRecord::Expand(std::size_t page_size) const } Iterator::Iterator(std::weak_ptr<Database> db, const IndexInfo& idx) - : db_(db), prev_pos_(), current_pos_() { + : db_(db), pos_mutex_(), current_pos_(), prev_pos_() { std::string prefix = EncodeIndexPrefix( IndexKey::Header{.id = idx.id, .depth = 0, .components_hash = 0}); current_pos_ = Continuation{.prefix = {prefix.data(), prefix.size()}, @@ -869,36 +869,50 @@ Iterator::Iterator(std::weak_ptr<Database> db, const IndexInfo& idx) } Iterator::Iterator(std::weak_ptr<Database> db, const Continuation& c) - : db_(db), prev_pos_(), current_pos_(c) {} + : db_(db), pos_mutex_(), current_pos_(c), prev_pos_() {} -auto Iterator::Prev() -> std::optional<IndexRecord> { - if (!prev_pos_) { - return {}; - } +auto Iterator::Next(Callback cb) -> void { auto db = db_.lock(); if (!db) { - return {}; + InvokeNull(cb); + return; } - std::unique_ptr<Result<IndexRecord>> res{ - db->GetPage<IndexRecord>(&*prev_pos_).get()}; - prev_pos_ = res->prev_page(); - current_pos_ = prev_pos_; - return *res->values()[0]; + db->worker_task_->Dispatch<void>([=]() { + std::lock_guard lock{pos_mutex_}; + if (!current_pos_) { + InvokeNull(cb); + return; + } + std::unique_ptr<Result<IndexRecord>> res{ + db->dbGetPage<IndexRecord>(*current_pos_)}; + prev_pos_ = current_pos_; + current_pos_ = res->next_page(); + std::invoke(cb, *res->values()[0]); + }); } -auto Iterator::Next() -> std::optional<IndexRecord> { - if (!current_pos_) { - return {}; - } +auto Iterator::Prev(Callback cb) -> void { auto db = db_.lock(); if (!db) { - return {}; + InvokeNull(cb); + return; } - std::unique_ptr<Result<IndexRecord>> res{ - db->GetPage<IndexRecord>(&*current_pos_).get()}; - prev_pos_ = current_pos_; - current_pos_ = res->next_page(); - return *res->values()[0]; + db->worker_task_->Dispatch<void>([=]() { + std::lock_guard lock{pos_mutex_}; + if (!prev_pos_) { + InvokeNull(cb); + return; + } + std::unique_ptr<Result<IndexRecord>> res{ + db->dbGetPage<IndexRecord>(*current_pos_)}; + current_pos_ = prev_pos_; + prev_pos_ = res->prev_page(); + std::invoke(cb, *res->values()[0]); + }); +} + +auto Iterator::InvokeNull(Callback cb) -> void { + std::invoke(cb, std::optional<IndexRecord>{}); } } // namespace database diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp index 972871db..63014bed 100644 --- a/src/database/include/database.hpp +++ b/src/database/include/database.hpp @@ -129,6 +129,8 @@ class Database { Database& operator=(const Database&) = delete; private: + friend class Iterator; + // Owned. Dumb pointers because destruction needs to be done in an explicit // order. leveldb::DB* db_; @@ -191,13 +193,19 @@ class Iterator { Iterator(std::weak_ptr<Database>, const IndexInfo&); Iterator(std::weak_ptr<Database>, const Continuation&); - auto Prev() -> std::optional<IndexRecord>; - auto Next() -> std::optional<IndexRecord>; + using Callback = std::function<void(std::optional<IndexRecord>)>; + + auto Next(Callback) -> void; + auto Prev(Callback) -> void; private: + auto InvokeNull(Callback) -> void; + std::weak_ptr<Database> db_; - std::optional<Continuation> prev_pos_; + + std::mutex pos_mutex_; std::optional<Continuation> current_pos_; + std::optional<Continuation> prev_pos_; }; } // namespace database diff --git a/src/lua/lua_database.cpp b/src/lua/lua_database.cpp index 545dcd31..d8ae86f6 100644 --- a/src/lua/lua_database.cpp +++ b/src/lua/lua_database.cpp @@ -56,17 +56,28 @@ static const struct luaL_Reg kDatabaseFuncs[] = {{"indexes", indexes}, {NULL, NULL}}; static auto db_iterate(lua_State* state) -> int { + luaL_checktype(state, 1, LUA_TFUNCTION); + int callback_ref = luaL_ref(state, LUA_REGISTRYINDEX); + database::Iterator* it = *reinterpret_cast<database::Iterator**>( lua_touserdata(state, lua_upvalueindex(1))); - auto res = it->Next(); - if (res) { - database::IndexRecord** record = reinterpret_cast<database::IndexRecord**>( - lua_newuserdata(state, sizeof(uintptr_t))); - *record = new database::IndexRecord(*res); - luaL_setmetatable(state, kDbRecordMetatable); - return 1; - } + it->Next([=](std::optional<database::IndexRecord> res) { + events::Ui().RunOnTask([=]() { + lua_rawgeti(state, LUA_REGISTRYINDEX, callback_ref); + if (res) { + database::IndexRecord** record = + reinterpret_cast<database::IndexRecord**>( + lua_newuserdata(state, sizeof(uintptr_t))); + *record = new database::IndexRecord(*res); + luaL_setmetatable(state, kDbRecordMetatable); + } else { + lua_pushnil(state); + } + lua_call(state, 1, 0); + luaL_unref(state, LUA_REGISTRYINDEX, callback_ref); + }); + }); return 0; } |
