summaryrefslogtreecommitdiff
path: root/src/tasks/tasks.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/tasks/tasks.cpp')
-rw-r--r--src/tasks/tasks.cpp119
1 files changed, 36 insertions, 83 deletions
diff --git a/src/tasks/tasks.cpp b/src/tasks/tasks.cpp
index 981ee20a..ea174039 100644
--- a/src/tasks/tasks.cpp
+++ b/src/tasks/tasks.cpp
@@ -31,14 +31,6 @@ template <>
auto Name<Type::kAudioConverter>() -> std::pmr::string {
return "audio_conv";
}
-template <>
-auto Name<Type::kDatabase>() -> std::pmr::string {
- return "db_fg";
-}
-template <>
-auto Name<Type::kBackgroundWorker>() -> std::pmr::string {
- return "bg_worker";
-}
template <Type t>
auto AllocateStack() -> cpp::span<StackType_t>;
@@ -68,14 +60,10 @@ auto AllocateStack<Type::kAudioConverter>() -> cpp::span<StackType_t> {
static StackType_t sStack[size];
return {sStack, size};
}
-// Leveldb is designed for non-embedded use cases, where stack space isn't so
-// much of a concern. It therefore uses an eye-wateringly large amount of stack.
-template <>
-auto AllocateStack<Type::kDatabase>() -> cpp::span<StackType_t> {
- std::size_t size = 256 * 1024;
- return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_SPIRAM)),
- size};
-}
+// Background workers receive huge stacks in PSRAM. This is mostly to faciliate
+// use of LevelDB from any bg worker; Leveldb is designed for non-embedded use
+// cases, where large stack usage isn't so much of a concern. It therefore uses
+// an eye-wateringly large amount of stack.
template <>
auto AllocateStack<Type::kBackgroundWorker>() -> cpp::span<StackType_t> {
std::size_t size = 256 * 1024;
@@ -115,26 +103,10 @@ auto Priority<Type::kUi>() -> UBaseType_t {
// couple of ms extra delay due to scheduling, so give this task the lowest
// priority.
template <>
-auto Priority<Type::kDatabase>() -> UBaseType_t {
- return 2;
-}
-template <>
auto Priority<Type::kBackgroundWorker>() -> UBaseType_t {
return 1;
}
-template <Type t>
-auto WorkerQueueSize() -> std::size_t;
-
-template <>
-auto WorkerQueueSize<Type::kDatabase>() -> std::size_t {
- return 8;
-}
-template <>
-auto WorkerQueueSize<Type::kBackgroundWorker>() -> std::size_t {
- return 8;
-}
-
auto PersistentMain(void* fn) -> void {
auto* function = reinterpret_cast<std::function<void(void)>*>(fn);
std::invoke(*function);
@@ -142,69 +114,50 @@ auto PersistentMain(void* fn) -> void {
vTaskDelete(NULL);
}
-auto Worker::Main(void* instance) {
- Worker* i = reinterpret_cast<Worker*>(instance);
+auto WorkerPool::Main(void* q) {
+ QueueHandle_t queue = reinterpret_cast<QueueHandle_t>(q);
while (1) {
WorkItem item;
- if (xQueueReceive(i->queue_, &item, portMAX_DELAY)) {
- if (item.quit) {
- break;
- } else if (item.fn != nullptr) {
- std::invoke(*item.fn);
- delete item.fn;
- }
+ if (xQueueReceive(queue, &item, portMAX_DELAY)) {
+ std::invoke(*item);
+ delete item;
}
}
- i->is_task_running_.store(false);
- i->is_task_running_.notify_all();
- // Wait for the instance's destructor to delete this task. We do this instead
- // of just deleting ourselves so that it's 100% certain that it's safe to
- // delete or reuse this task's stack.
- while (1) {
- vTaskDelay(portMAX_DELAY);
+}
+
+static constexpr size_t kNumWorkers = 3;
+static constexpr size_t kMaxPendingItems = 8;
+
+WorkerPool::WorkerPool()
+ : queue_(xQueueCreate(kMaxPendingItems, sizeof(WorkItem))) {
+ for (size_t i = 0; i < kNumWorkers; i++) {
+ auto stack = AllocateStack<Type::kBackgroundWorker>();
+ // Task buffers must be in internal ram. Thankfully they're fairly small.
+ auto buffer = reinterpret_cast<StaticTask_t*>(heap_caps_malloc(
+ sizeof(StaticTask_t), MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT));
+
+ std::string name = "worker_" + std::to_string(i);
+
+ xTaskCreateStatic(&Main, name.c_str(), stack.size(), queue_,
+ Priority<Type::kBackgroundWorker>(), stack.data(),
+ buffer);
}
}
-Worker::Worker(const std::pmr::string& name,
- cpp::span<StackType_t> stack,
- std::size_t queue_size,
- UBaseType_t priority)
- : stack_(stack.data()),
- queue_(xQueueCreate(queue_size, sizeof(WorkItem))),
- is_task_running_(true),
- task_buffer_(static_cast<StaticTask_t*>(
- heap_caps_malloc(sizeof(StaticTask_t),
- MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT))),
- task_(xTaskCreateStatic(&Main,
- name.c_str(),
- stack.size(),
- this,
- priority,
- stack_,
- task_buffer_)) {}
-
-Worker::~Worker() {
- WorkItem item{
- .fn = nullptr,
- .quit = true,
- };
- xQueueSend(queue_, &item, portMAX_DELAY);
- is_task_running_.wait(true);
- vTaskDelete(task_);
- free(stack_);
+WorkerPool::~WorkerPool() {
+ // This should never happen!
+ assert("worker pool destroyed" == 0);
}
template <>
-auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void> {
+auto WorkerPool::Dispatch(const std::function<void(void)> fn)
+ -> std::future<void> {
std::shared_ptr<std::promise<void>> promise =
std::make_shared<std::promise<void>>();
- WorkItem item{
- .fn = new std::function<void(void)>([=]() {
- std::invoke(fn);
- promise->set_value();
- }),
- .quit = false,
- };
+ WorkItem item = new std::function<void(void)>([=]() {
+ std::invoke(fn);
+ promise->set_value();
+ });
xQueueSend(queue_, &item, portMAX_DELAY);
return promise->get_future();
}