From dad159dc3c4ebc89d395b4b695a0626b7be10578 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Thu, 1 Feb 2024 10:54:20 +1100 Subject: Use a single pool of bg workers instead of separate tasks per use case Also: bump the number of workers tasks up to 3 from 2! This makes bg db updates + playback work :) --- src/tasks/tasks.cpp | 119 ++++++++++++++++------------------------------------ src/tasks/tasks.hpp | 44 +++++-------------- 2 files changed, 47 insertions(+), 116 deletions(-) (limited to 'src/tasks') diff --git a/src/tasks/tasks.cpp b/src/tasks/tasks.cpp index 981ee20a..ea174039 100644 --- a/src/tasks/tasks.cpp +++ b/src/tasks/tasks.cpp @@ -31,14 +31,6 @@ template <> auto Name() -> std::pmr::string { return "audio_conv"; } -template <> -auto Name() -> std::pmr::string { - return "db_fg"; -} -template <> -auto Name() -> std::pmr::string { - return "bg_worker"; -} template auto AllocateStack() -> cpp::span; @@ -68,14 +60,10 @@ auto AllocateStack() -> cpp::span { static StackType_t sStack[size]; return {sStack, size}; } -// Leveldb is designed for non-embedded use cases, where stack space isn't so -// much of a concern. It therefore uses an eye-wateringly large amount of stack. -template <> -auto AllocateStack() -> cpp::span { - std::size_t size = 256 * 1024; - return {static_cast(heap_caps_malloc(size, MALLOC_CAP_SPIRAM)), - size}; -} +// Background workers receive huge stacks in PSRAM. This is mostly to faciliate +// use of LevelDB from any bg worker; Leveldb is designed for non-embedded use +// cases, where large stack usage isn't so much of a concern. It therefore uses +// an eye-wateringly large amount of stack. template <> auto AllocateStack() -> cpp::span { std::size_t size = 256 * 1024; @@ -115,26 +103,10 @@ auto Priority() -> UBaseType_t { // couple of ms extra delay due to scheduling, so give this task the lowest // priority. template <> -auto Priority() -> UBaseType_t { - return 2; -} -template <> auto Priority() -> UBaseType_t { return 1; } -template -auto WorkerQueueSize() -> std::size_t; - -template <> -auto WorkerQueueSize() -> std::size_t { - return 8; -} -template <> -auto WorkerQueueSize() -> std::size_t { - return 8; -} - auto PersistentMain(void* fn) -> void { auto* function = reinterpret_cast*>(fn); std::invoke(*function); @@ -142,69 +114,50 @@ auto PersistentMain(void* fn) -> void { vTaskDelete(NULL); } -auto Worker::Main(void* instance) { - Worker* i = reinterpret_cast(instance); +auto WorkerPool::Main(void* q) { + QueueHandle_t queue = reinterpret_cast(q); while (1) { WorkItem item; - if (xQueueReceive(i->queue_, &item, portMAX_DELAY)) { - if (item.quit) { - break; - } else if (item.fn != nullptr) { - std::invoke(*item.fn); - delete item.fn; - } + if (xQueueReceive(queue, &item, portMAX_DELAY)) { + std::invoke(*item); + delete item; } } - i->is_task_running_.store(false); - i->is_task_running_.notify_all(); - // Wait for the instance's destructor to delete this task. We do this instead - // of just deleting ourselves so that it's 100% certain that it's safe to - // delete or reuse this task's stack. - while (1) { - vTaskDelay(portMAX_DELAY); +} + +static constexpr size_t kNumWorkers = 3; +static constexpr size_t kMaxPendingItems = 8; + +WorkerPool::WorkerPool() + : queue_(xQueueCreate(kMaxPendingItems, sizeof(WorkItem))) { + for (size_t i = 0; i < kNumWorkers; i++) { + auto stack = AllocateStack(); + // Task buffers must be in internal ram. Thankfully they're fairly small. + auto buffer = reinterpret_cast(heap_caps_malloc( + sizeof(StaticTask_t), MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT)); + + std::string name = "worker_" + std::to_string(i); + + xTaskCreateStatic(&Main, name.c_str(), stack.size(), queue_, + Priority(), stack.data(), + buffer); } } -Worker::Worker(const std::pmr::string& name, - cpp::span stack, - std::size_t queue_size, - UBaseType_t priority) - : stack_(stack.data()), - queue_(xQueueCreate(queue_size, sizeof(WorkItem))), - is_task_running_(true), - task_buffer_(static_cast( - heap_caps_malloc(sizeof(StaticTask_t), - MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT))), - task_(xTaskCreateStatic(&Main, - name.c_str(), - stack.size(), - this, - priority, - stack_, - task_buffer_)) {} - -Worker::~Worker() { - WorkItem item{ - .fn = nullptr, - .quit = true, - }; - xQueueSend(queue_, &item, portMAX_DELAY); - is_task_running_.wait(true); - vTaskDelete(task_); - free(stack_); +WorkerPool::~WorkerPool() { + // This should never happen! + assert("worker pool destroyed" == 0); } template <> -auto Worker::Dispatch(const std::function fn) -> std::future { +auto WorkerPool::Dispatch(const std::function fn) + -> std::future { std::shared_ptr> promise = std::make_shared>(); - WorkItem item{ - .fn = new std::function([=]() { - std::invoke(fn); - promise->set_value(); - }), - .quit = false, - }; + WorkItem item = new std::function([=]() { + std::invoke(fn); + promise->set_value(); + }); xQueueSend(queue_, &item, portMAX_DELAY); return promise->get_future(); } diff --git a/src/tasks/tasks.hpp b/src/tasks/tasks.hpp index 6a3a0d09..1623a8d8 100644 --- a/src/tasks/tasks.hpp +++ b/src/tasks/tasks.hpp @@ -48,8 +48,6 @@ template auto AllocateStack() -> cpp::span; template auto Priority() -> UBaseType_t; -template -auto WorkerQueueSize() -> std::size_t; auto PersistentMain(void* fn) -> void; @@ -74,32 +72,15 @@ auto StartPersistent(BaseType_t core, const std::function& fn) Priority(), stack.data(), task_buffer, core); } -class Worker { +class WorkerPool { private: - Worker(const std::pmr::string& name, - cpp::span stack, - std::size_t queue_size, - UBaseType_t priority); - - StackType_t* stack_; QueueHandle_t queue_; - std::atomic is_task_running_; - StaticTask_t *task_buffer_; - TaskHandle_t task_; - - struct WorkItem { - std::function* fn; - bool quit; - }; + using WorkItem = std::function*; + static auto Main(void* instance); public: - template - static auto Start() -> Worker* { - return new Worker(Name(), AllocateStack(), WorkerQueueSize(), - Priority()); - } - - static auto Main(void* instance); + WorkerPool(); + ~WorkerPool(); /* * Schedules the given function to be executed on the worker task, and @@ -109,22 +90,19 @@ class Worker { auto Dispatch(const std::function fn) -> std::future { std::shared_ptr> promise = std::make_shared>(); - WorkItem item{ - .fn = new std::function([=]() { promise->set_value(std::invoke(fn)); }), - .quit = false, - }; + WorkItem item = + new std::function([=]() { promise->set_value(std::invoke(fn)); }); xQueueSend(queue_, &item, portMAX_DELAY); return promise->get_future(); } - ~Worker(); - - Worker(const Worker&) = delete; - Worker& operator=(const Worker&) = delete; + WorkerPool(const WorkerPool&) = delete; + WorkerPool& operator=(const WorkerPool&) = delete; }; /* Specialisation of Evaluate for functions that return nothing. */ template <> -auto Worker::Dispatch(const std::function fn) -> std::future; +auto WorkerPool::Dispatch(const std::function fn) + -> std::future; } // namespace tasks -- cgit v1.2.3