summaryrefslogtreecommitdiff
path: root/src/database
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2023-05-22 15:34:40 +1000
committerjacqueline <me@jacqueline.id.au>2023-05-22 15:35:50 +1000
commitda977aaa3feb397c426563da8897d51ebeba8d1f (patch)
tree3daf9278518b2cace7f26ec67e7eb486ba6adcf2 /src/database
parenta6ab1504058304012791281f9eb42c262745888f (diff)
parent5ac4d3949cd7430e0d4c994bbc528e8e4fa91337 (diff)
downloadtangara-fw-da977aaa3feb397c426563da8897d51ebeba8d1f.tar.gz
Merge branch 'main' into state-machines
Diffstat (limited to 'src/database')
-rw-r--r--src/database/CMakeLists.txt4
-rw-r--r--src/database/database.cpp88
-rw-r--r--src/database/db_task.cpp91
-rw-r--r--src/database/env_esp.cpp11
-rw-r--r--src/database/include/database.hpp6
-rw-r--r--src/database/include/db_task.hpp25
-rw-r--r--src/database/include/env_esp.hpp5
7 files changed, 66 insertions, 164 deletions
diff --git a/src/database/CMakeLists.txt b/src/database/CMakeLists.txt
index 897bf029..c769fa33 100644
--- a/src/database/CMakeLists.txt
+++ b/src/database/CMakeLists.txt
@@ -1,7 +1,7 @@
idf_component_register(
- SRCS "env_esp.cpp" "database.cpp" "song.cpp" "db_task.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp"
+ SRCS "env_esp.cpp" "database.cpp" "song.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp"
INCLUDE_DIRS "include"
- REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor")
+ REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor" "tasks")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})
diff --git a/src/database/database.cpp b/src/database/database.cpp
index f5fe5240..65a500d9 100644
--- a/src/database/database.cpp
+++ b/src/database/database.cpp
@@ -18,13 +18,13 @@
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"
-#include "db_task.hpp"
#include "env_esp.hpp"
#include "file_gatherer.hpp"
#include "records.hpp"
#include "result.hpp"
#include "song.hpp"
#include "tag_parser.hpp"
+#include "tasks.hpp"
namespace database {
@@ -62,33 +62,33 @@ auto Database::Open(IFileGatherer* gatherer, ITagParser* parser)
return cpp::fail(DatabaseError::ALREADY_OPEN);
}
- if (!StartDbTask()) {
- return cpp::fail(DatabaseError::ALREADY_OPEN);
- }
-
- return RunOnDbTask<cpp::result<Database*, DatabaseError>>(
- [=]() -> cpp::result<Database*, DatabaseError> {
- leveldb::DB* db;
- leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024);
- leveldb::Options options;
- options.env = sEnv.env();
- options.create_if_missing = true;
- options.write_buffer_size = 48 * 1024;
- options.max_file_size = 32;
- options.block_cache = cache;
- options.block_size = 512;
-
- auto status = leveldb::DB::Open(options, "/.db", &db);
- if (!status.ok()) {
- delete cache;
- ESP_LOGE(kTag, "failed to open db, status %s",
- status.ToString().c_str());
- return cpp::fail(FAILED_TO_OPEN);
- }
-
- ESP_LOGI(kTag, "Database opened successfully");
- return new Database(db, cache, gatherer, parser);
- })
+ std::shared_ptr<tasks::Worker> worker(
+ tasks::Worker::Start<tasks::Type::kDatabase>());
+ leveldb::sBackgroundThread = std::weak_ptr<tasks::Worker>(worker);
+ return worker
+ ->Dispatch<cpp::result<Database*, DatabaseError>>(
+ [&]() -> cpp::result<Database*, DatabaseError> {
+ leveldb::DB* db;
+ leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024);
+ leveldb::Options options;
+ options.env = sEnv.env();
+ options.create_if_missing = true;
+ options.write_buffer_size = 48 * 1024;
+ options.max_file_size = 32;
+ options.block_cache = cache;
+ options.block_size = 512;
+
+ auto status = leveldb::DB::Open(options, "/.db", &db);
+ if (!status.ok()) {
+ delete cache;
+ ESP_LOGE(kTag, "failed to open db, status %s",
+ status.ToString().c_str());
+ return cpp::fail(FAILED_TO_OPEN);
+ }
+
+ ESP_LOGI(kTag, "Database opened successfully");
+ return new Database(db, cache, gatherer, parser, worker);
+ })
.get();
}
@@ -101,9 +101,11 @@ auto Database::Destroy() -> void {
Database::Database(leveldb::DB* db,
leveldb::Cache* cache,
IFileGatherer* file_gatherer,
- ITagParser* tag_parser)
+ ITagParser* tag_parser,
+ std::shared_ptr<tasks::Worker> worker)
: db_(db),
cache_(cache),
+ worker_task_(worker),
file_gatherer_(file_gatherer),
tag_parser_(tag_parser) {}
@@ -113,12 +115,13 @@ Database::~Database() {
delete db_;
delete cache_;
- QuitDbTask();
+ leveldb::sBackgroundThread = std::weak_ptr<tasks::Worker>();
+
sIsDbOpen.store(false);
}
auto Database::Update() -> std::future<void> {
- return RunOnDbTask<void>([&]() -> void {
+ return worker_task_->Dispatch<void>([&]() -> void {
// Stage 1: verify all existing songs are still valid.
ESP_LOGI(kTag, "verifying existing songs");
const leveldb::Snapshot* snapshot = db_->GetSnapshot();
@@ -219,7 +222,7 @@ auto Database::Update() -> std::future<void> {
}
auto Database::GetSongs(std::size_t page_size) -> std::future<Result<Song>*> {
- return RunOnDbTask<Result<Song>*>([=, this]() -> Result<Song>* {
+ return worker_task_->Dispatch<Result<Song>*>([=, this]() -> Result<Song>* {
Continuation<Song> c{.iterator = nullptr,
.prefix = CreateDataPrefix().data,
.start_key = CreateDataPrefix().data,
@@ -232,21 +235,22 @@ auto Database::GetSongs(std::size_t page_size) -> std::future<Result<Song>*> {
auto Database::GetDump(std::size_t page_size)
-> std::future<Result<std::string>*> {
- return RunOnDbTask<Result<std::string>*>([=, this]() -> Result<std::string>* {
- Continuation<std::string> c{.iterator = nullptr,
- .prefix = "",
- .start_key = "",
- .forward = true,
- .was_prev_forward = true,
- .page_size = page_size};
- return dbGetPage(c);
- });
+ return worker_task_->Dispatch<Result<std::string>*>(
+ [=, this]() -> Result<std::string>* {
+ Continuation<std::string> c{.iterator = nullptr,
+ .prefix = "",
+ .start_key = "",
+ .forward = true,
+ .was_prev_forward = true,
+ .page_size = page_size};
+ return dbGetPage(c);
+ });
}
template <typename T>
auto Database::GetPage(Continuation<T>* c) -> std::future<Result<T>*> {
Continuation<T> copy = *c;
- return RunOnDbTask<Result<T>*>(
+ return worker_task_->Dispatch<Result<T>*>(
[=, this]() -> Result<T>* { return dbGetPage(copy); });
}
diff --git a/src/database/db_task.cpp b/src/database/db_task.cpp
deleted file mode 100644
index 5b4b34b5..00000000
--- a/src/database/db_task.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-#include "db_task.hpp"
-
-#include <functional>
-
-#include "esp_heap_caps.h"
-#include "freertos/FreeRTOS.h"
-#include "freertos/portmacro.h"
-#include "freertos/projdefs.h"
-#include "freertos/queue.h"
-#include "freertos/task.h"
-
-namespace database {
-
-static const std::size_t kDbStackSize = 256 * 1024;
-static StaticTask_t sDbStaticTask;
-static StackType_t* sDbStack = nullptr;
-
-static std::atomic<bool> sTaskRunning(false);
-static QueueHandle_t sWorkQueue;
-
-struct WorkItem {
- std::function<void(void)>* fn;
- bool quit;
-};
-
-auto SendToDbTask(std::function<void(void)> fn) -> void {
- WorkItem item{
- .fn = new std::function<void(void)>(fn),
- .quit = false,
- };
- xQueueSend(sWorkQueue, &item, portMAX_DELAY);
-}
-
-template <>
-auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void> {
- std::shared_ptr<std::promise<void>> promise =
- std::make_shared<std::promise<void>>();
- SendToDbTask([=]() {
- std::invoke(fn);
- promise->set_value();
- });
- return promise->get_future();
-}
-
-void DatabaseTaskMain(void* args) {
- while (true) {
- WorkItem item;
- if (xQueueReceive(sWorkQueue, &item, portMAX_DELAY)) {
- if (item.fn != nullptr) {
- std::invoke(*item.fn);
- delete item.fn;
- }
- if (item.quit) {
- break;
- }
- }
- }
- vQueueDelete(sWorkQueue);
- sTaskRunning.store(false);
- vTaskDelete(NULL);
-}
-
-auto StartDbTask() -> bool {
- if (sTaskRunning.exchange(true)) {
- return false;
- }
- if (sDbStack == nullptr) {
- sDbStack = reinterpret_cast<StackType_t*>(
- heap_caps_malloc(kDbStackSize, MALLOC_CAP_SPIRAM));
- }
- sWorkQueue = xQueueCreate(8, sizeof(WorkItem));
- xTaskCreateStatic(&DatabaseTaskMain, "DB", kDbStackSize, NULL, 1, sDbStack,
- &sDbStaticTask);
- return true;
-}
-
-auto QuitDbTask() -> void {
- if (!sTaskRunning.load()) {
- return;
- }
- WorkItem item{
- .fn = nullptr,
- .quit = true,
- };
- xQueueSend(sWorkQueue, &item, portMAX_DELAY);
- while (sTaskRunning.load()) {
- vTaskDelay(pdMS_TO_TICKS(1));
- }
-}
-
-} // namespace database
diff --git a/src/database/env_esp.cpp b/src/database/env_esp.cpp
index 71d4fcea..3bc68984 100644
--- a/src/database/env_esp.cpp
+++ b/src/database/env_esp.cpp
@@ -29,10 +29,12 @@
#include "leveldb/slice.h"
#include "leveldb/status.h"
-#include "db_task.hpp"
+#include "tasks.hpp"
namespace leveldb {
+std::weak_ptr<tasks::Worker> sBackgroundThread;
+
std::string ErrToStr(FRESULT err) {
switch (err) {
case FR_OK:
@@ -455,8 +457,11 @@ EspEnv::EspEnv() {}
void EspEnv::Schedule(
void (*background_work_function)(void* background_work_arg),
void* background_work_arg) {
- database::SendToDbTask(
- [=]() { std::invoke(background_work_function, background_work_arg); });
+ auto worker = sBackgroundThread.lock();
+ if (worker) {
+ worker->Dispatch<void>(
+ [=]() { std::invoke(background_work_function, background_work_arg); });
+ }
}
} // namespace leveldb
diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp
index da0ed083..ce2ca9fe 100644
--- a/src/database/include/database.hpp
+++ b/src/database/include/database.hpp
@@ -19,6 +19,7 @@
#include "result.hpp"
#include "song.hpp"
#include "tag_parser.hpp"
+#include "tasks.hpp"
namespace database {
@@ -90,6 +91,8 @@ class Database {
leveldb::DB* db_;
leveldb::Cache* cache_;
+ std::shared_ptr<tasks::Worker> worker_task_;
+
// Not owned.
IFileGatherer* file_gatherer_;
ITagParser* tag_parser_;
@@ -97,7 +100,8 @@ class Database {
Database(leveldb::DB* db,
leveldb::Cache* cache,
IFileGatherer* file_gatherer,
- ITagParser* tag_parser);
+ ITagParser* tag_parser,
+ std::shared_ptr<tasks::Worker> worker);
auto dbMintNewSongId() -> SongId;
auto dbEntomb(SongId song, uint64_t hash) -> void;
diff --git a/src/database/include/db_task.hpp b/src/database/include/db_task.hpp
deleted file mode 100644
index 39f932b0..00000000
--- a/src/database/include/db_task.hpp
+++ /dev/null
@@ -1,25 +0,0 @@
-#pragma once
-
-#include <functional>
-#include <future>
-#include <memory>
-
-namespace database {
-
-auto StartDbTask() -> bool;
-auto QuitDbTask() -> void;
-
-auto SendToDbTask(std::function<void(void)> fn) -> void;
-
-template <typename T>
-auto RunOnDbTask(std::function<T(void)> fn) -> std::future<T> {
- std::shared_ptr<std::promise<T>> promise =
- std::make_shared<std::promise<T>>();
- SendToDbTask([=]() { promise->set_value(std::invoke(fn)); });
- return promise->get_future();
-}
-
-template <>
-auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void>;
-
-} // namespace database
diff --git a/src/database/include/env_esp.hpp b/src/database/include/env_esp.hpp
index cf5a20e1..6a415ce6 100644
--- a/src/database/include/env_esp.hpp
+++ b/src/database/include/env_esp.hpp
@@ -1,5 +1,6 @@
#pragma once
+#include <memory>
#include <mutex>
#include <set>
#include <string>
@@ -7,8 +8,12 @@
#include "leveldb/env.h"
#include "leveldb/status.h"
+#include "tasks.hpp"
+
namespace leveldb {
+extern std::weak_ptr<tasks::Worker> sBackgroundThread;
+
// Tracks the files locked by EspEnv::LockFile().
//
// We maintain a separate set instead of relying on fcntl(F_SETLK) because