diff options
Diffstat (limited to 'src/database')
| -rw-r--r-- | src/database/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/database/database.cpp | 88 | ||||
| -rw-r--r-- | src/database/db_task.cpp | 91 | ||||
| -rw-r--r-- | src/database/env_esp.cpp | 11 | ||||
| -rw-r--r-- | src/database/include/database.hpp | 6 | ||||
| -rw-r--r-- | src/database/include/db_task.hpp | 25 | ||||
| -rw-r--r-- | src/database/include/env_esp.hpp | 5 |
7 files changed, 66 insertions, 164 deletions
diff --git a/src/database/CMakeLists.txt b/src/database/CMakeLists.txt index 897bf029..c769fa33 100644 --- a/src/database/CMakeLists.txt +++ b/src/database/CMakeLists.txt @@ -1,7 +1,7 @@ idf_component_register( - SRCS "env_esp.cpp" "database.cpp" "song.cpp" "db_task.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp" + SRCS "env_esp.cpp" "database.cpp" "song.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp" INCLUDE_DIRS "include" - REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor") + REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor" "tasks") target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS}) diff --git a/src/database/database.cpp b/src/database/database.cpp index f5fe5240..65a500d9 100644 --- a/src/database/database.cpp +++ b/src/database/database.cpp @@ -18,13 +18,13 @@ #include "leveldb/slice.h" #include "leveldb/write_batch.h" -#include "db_task.hpp" #include "env_esp.hpp" #include "file_gatherer.hpp" #include "records.hpp" #include "result.hpp" #include "song.hpp" #include "tag_parser.hpp" +#include "tasks.hpp" namespace database { @@ -62,33 +62,33 @@ auto Database::Open(IFileGatherer* gatherer, ITagParser* parser) return cpp::fail(DatabaseError::ALREADY_OPEN); } - if (!StartDbTask()) { - return cpp::fail(DatabaseError::ALREADY_OPEN); - } - - return RunOnDbTask<cpp::result<Database*, DatabaseError>>( - [=]() -> cpp::result<Database*, DatabaseError> { - leveldb::DB* db; - leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024); - leveldb::Options options; - options.env = sEnv.env(); - options.create_if_missing = true; - options.write_buffer_size = 48 * 1024; - options.max_file_size = 32; - options.block_cache = cache; - options.block_size = 512; - - auto status = leveldb::DB::Open(options, "/.db", &db); - if (!status.ok()) { - delete cache; - ESP_LOGE(kTag, "failed to open db, status %s", - status.ToString().c_str()); - return cpp::fail(FAILED_TO_OPEN); - } - - ESP_LOGI(kTag, "Database opened successfully"); - return new Database(db, cache, gatherer, parser); - }) + std::shared_ptr<tasks::Worker> worker( + tasks::Worker::Start<tasks::Type::kDatabase>()); + leveldb::sBackgroundThread = std::weak_ptr<tasks::Worker>(worker); + return worker + ->Dispatch<cpp::result<Database*, DatabaseError>>( + [&]() -> cpp::result<Database*, DatabaseError> { + leveldb::DB* db; + leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024); + leveldb::Options options; + options.env = sEnv.env(); + options.create_if_missing = true; + options.write_buffer_size = 48 * 1024; + options.max_file_size = 32; + options.block_cache = cache; + options.block_size = 512; + + auto status = leveldb::DB::Open(options, "/.db", &db); + if (!status.ok()) { + delete cache; + ESP_LOGE(kTag, "failed to open db, status %s", + status.ToString().c_str()); + return cpp::fail(FAILED_TO_OPEN); + } + + ESP_LOGI(kTag, "Database opened successfully"); + return new Database(db, cache, gatherer, parser, worker); + }) .get(); } @@ -101,9 +101,11 @@ auto Database::Destroy() -> void { Database::Database(leveldb::DB* db, leveldb::Cache* cache, IFileGatherer* file_gatherer, - ITagParser* tag_parser) + ITagParser* tag_parser, + std::shared_ptr<tasks::Worker> worker) : db_(db), cache_(cache), + worker_task_(worker), file_gatherer_(file_gatherer), tag_parser_(tag_parser) {} @@ -113,12 +115,13 @@ Database::~Database() { delete db_; delete cache_; - QuitDbTask(); + leveldb::sBackgroundThread = std::weak_ptr<tasks::Worker>(); + sIsDbOpen.store(false); } auto Database::Update() -> std::future<void> { - return RunOnDbTask<void>([&]() -> void { + return worker_task_->Dispatch<void>([&]() -> void { // Stage 1: verify all existing songs are still valid. ESP_LOGI(kTag, "verifying existing songs"); const leveldb::Snapshot* snapshot = db_->GetSnapshot(); @@ -219,7 +222,7 @@ auto Database::Update() -> std::future<void> { } auto Database::GetSongs(std::size_t page_size) -> std::future<Result<Song>*> { - return RunOnDbTask<Result<Song>*>([=, this]() -> Result<Song>* { + return worker_task_->Dispatch<Result<Song>*>([=, this]() -> Result<Song>* { Continuation<Song> c{.iterator = nullptr, .prefix = CreateDataPrefix().data, .start_key = CreateDataPrefix().data, @@ -232,21 +235,22 @@ auto Database::GetSongs(std::size_t page_size) -> std::future<Result<Song>*> { auto Database::GetDump(std::size_t page_size) -> std::future<Result<std::string>*> { - return RunOnDbTask<Result<std::string>*>([=, this]() -> Result<std::string>* { - Continuation<std::string> c{.iterator = nullptr, - .prefix = "", - .start_key = "", - .forward = true, - .was_prev_forward = true, - .page_size = page_size}; - return dbGetPage(c); - }); + return worker_task_->Dispatch<Result<std::string>*>( + [=, this]() -> Result<std::string>* { + Continuation<std::string> c{.iterator = nullptr, + .prefix = "", + .start_key = "", + .forward = true, + .was_prev_forward = true, + .page_size = page_size}; + return dbGetPage(c); + }); } template <typename T> auto Database::GetPage(Continuation<T>* c) -> std::future<Result<T>*> { Continuation<T> copy = *c; - return RunOnDbTask<Result<T>*>( + return worker_task_->Dispatch<Result<T>*>( [=, this]() -> Result<T>* { return dbGetPage(copy); }); } diff --git a/src/database/db_task.cpp b/src/database/db_task.cpp deleted file mode 100644 index 5b4b34b5..00000000 --- a/src/database/db_task.cpp +++ /dev/null @@ -1,91 +0,0 @@ -#include "db_task.hpp" - -#include <functional> - -#include "esp_heap_caps.h" -#include "freertos/FreeRTOS.h" -#include "freertos/portmacro.h" -#include "freertos/projdefs.h" -#include "freertos/queue.h" -#include "freertos/task.h" - -namespace database { - -static const std::size_t kDbStackSize = 256 * 1024; -static StaticTask_t sDbStaticTask; -static StackType_t* sDbStack = nullptr; - -static std::atomic<bool> sTaskRunning(false); -static QueueHandle_t sWorkQueue; - -struct WorkItem { - std::function<void(void)>* fn; - bool quit; -}; - -auto SendToDbTask(std::function<void(void)> fn) -> void { - WorkItem item{ - .fn = new std::function<void(void)>(fn), - .quit = false, - }; - xQueueSend(sWorkQueue, &item, portMAX_DELAY); -} - -template <> -auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void> { - std::shared_ptr<std::promise<void>> promise = - std::make_shared<std::promise<void>>(); - SendToDbTask([=]() { - std::invoke(fn); - promise->set_value(); - }); - return promise->get_future(); -} - -void DatabaseTaskMain(void* args) { - while (true) { - WorkItem item; - if (xQueueReceive(sWorkQueue, &item, portMAX_DELAY)) { - if (item.fn != nullptr) { - std::invoke(*item.fn); - delete item.fn; - } - if (item.quit) { - break; - } - } - } - vQueueDelete(sWorkQueue); - sTaskRunning.store(false); - vTaskDelete(NULL); -} - -auto StartDbTask() -> bool { - if (sTaskRunning.exchange(true)) { - return false; - } - if (sDbStack == nullptr) { - sDbStack = reinterpret_cast<StackType_t*>( - heap_caps_malloc(kDbStackSize, MALLOC_CAP_SPIRAM)); - } - sWorkQueue = xQueueCreate(8, sizeof(WorkItem)); - xTaskCreateStatic(&DatabaseTaskMain, "DB", kDbStackSize, NULL, 1, sDbStack, - &sDbStaticTask); - return true; -} - -auto QuitDbTask() -> void { - if (!sTaskRunning.load()) { - return; - } - WorkItem item{ - .fn = nullptr, - .quit = true, - }; - xQueueSend(sWorkQueue, &item, portMAX_DELAY); - while (sTaskRunning.load()) { - vTaskDelay(pdMS_TO_TICKS(1)); - } -} - -} // namespace database diff --git a/src/database/env_esp.cpp b/src/database/env_esp.cpp index 71d4fcea..3bc68984 100644 --- a/src/database/env_esp.cpp +++ b/src/database/env_esp.cpp @@ -29,10 +29,12 @@ #include "leveldb/slice.h" #include "leveldb/status.h" -#include "db_task.hpp" +#include "tasks.hpp" namespace leveldb { +std::weak_ptr<tasks::Worker> sBackgroundThread; + std::string ErrToStr(FRESULT err) { switch (err) { case FR_OK: @@ -455,8 +457,11 @@ EspEnv::EspEnv() {} void EspEnv::Schedule( void (*background_work_function)(void* background_work_arg), void* background_work_arg) { - database::SendToDbTask( - [=]() { std::invoke(background_work_function, background_work_arg); }); + auto worker = sBackgroundThread.lock(); + if (worker) { + worker->Dispatch<void>( + [=]() { std::invoke(background_work_function, background_work_arg); }); + } } } // namespace leveldb diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp index da0ed083..ce2ca9fe 100644 --- a/src/database/include/database.hpp +++ b/src/database/include/database.hpp @@ -19,6 +19,7 @@ #include "result.hpp" #include "song.hpp" #include "tag_parser.hpp" +#include "tasks.hpp" namespace database { @@ -90,6 +91,8 @@ class Database { leveldb::DB* db_; leveldb::Cache* cache_; + std::shared_ptr<tasks::Worker> worker_task_; + // Not owned. IFileGatherer* file_gatherer_; ITagParser* tag_parser_; @@ -97,7 +100,8 @@ class Database { Database(leveldb::DB* db, leveldb::Cache* cache, IFileGatherer* file_gatherer, - ITagParser* tag_parser); + ITagParser* tag_parser, + std::shared_ptr<tasks::Worker> worker); auto dbMintNewSongId() -> SongId; auto dbEntomb(SongId song, uint64_t hash) -> void; diff --git a/src/database/include/db_task.hpp b/src/database/include/db_task.hpp deleted file mode 100644 index 39f932b0..00000000 --- a/src/database/include/db_task.hpp +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include <functional> -#include <future> -#include <memory> - -namespace database { - -auto StartDbTask() -> bool; -auto QuitDbTask() -> void; - -auto SendToDbTask(std::function<void(void)> fn) -> void; - -template <typename T> -auto RunOnDbTask(std::function<T(void)> fn) -> std::future<T> { - std::shared_ptr<std::promise<T>> promise = - std::make_shared<std::promise<T>>(); - SendToDbTask([=]() { promise->set_value(std::invoke(fn)); }); - return promise->get_future(); -} - -template <> -auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void>; - -} // namespace database diff --git a/src/database/include/env_esp.hpp b/src/database/include/env_esp.hpp index cf5a20e1..6a415ce6 100644 --- a/src/database/include/env_esp.hpp +++ b/src/database/include/env_esp.hpp @@ -1,5 +1,6 @@ #pragma once +#include <memory> #include <mutex> #include <set> #include <string> @@ -7,8 +8,12 @@ #include "leveldb/env.h" #include "leveldb/status.h" +#include "tasks.hpp" + namespace leveldb { +extern std::weak_ptr<tasks::Worker> sBackgroundThread; + // Tracks the files locked by EspEnv::LockFile(). // // We maintain a separate set instead of relying on fcntl(F_SETLK) because |
