diff options
| author | jacqueline <me@jacqueline.id.au> | 2024-02-01 10:54:20 +1100 |
|---|---|---|
| committer | jacqueline <me@jacqueline.id.au> | 2024-02-01 10:54:20 +1100 |
| commit | dad159dc3c4ebc89d395b4b695a0626b7be10578 (patch) | |
| tree | e46c9b00f6a2879088b164347f9f35a5636ddad6 /src | |
| parent | fde45dba39152064add0379a8ae68b905adff9b9 (diff) | |
| download | tangara-fw-dad159dc3c4ebc89d395b4b695a0626b7be10578.tar.gz | |
Use a single pool of bg workers instead of separate tasks per use case
Also: bump the number of workers tasks up to 3 from 2!
This makes bg db updates + playback work :)
Diffstat (limited to 'src')
| -rw-r--r-- | src/audio/fatfs_audio_input.cpp | 2 | ||||
| -rw-r--r-- | src/audio/include/fatfs_audio_input.hpp | 4 | ||||
| -rw-r--r-- | src/audio/include/readahead_source.hpp | 4 | ||||
| -rw-r--r-- | src/audio/include/track_queue.hpp | 4 | ||||
| -rw-r--r-- | src/audio/readahead_source.cpp | 2 | ||||
| -rw-r--r-- | src/audio/track_queue.cpp | 2 | ||||
| -rw-r--r-- | src/database/database.cpp | 4 | ||||
| -rw-r--r-- | src/database/env_esp.cpp | 2 | ||||
| -rw-r--r-- | src/database/include/database.hpp | 2 | ||||
| -rw-r--r-- | src/database/include/env_esp.hpp | 2 | ||||
| -rw-r--r-- | src/system_fsm/booting.cpp | 3 | ||||
| -rw-r--r-- | src/system_fsm/include/service_locator.hpp | 14 | ||||
| -rw-r--r-- | src/tasks/tasks.cpp | 119 | ||||
| -rw-r--r-- | src/tasks/tasks.hpp | 44 |
14 files changed, 67 insertions, 141 deletions
diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index 1dcab97a..de64b14b 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -44,7 +44,7 @@ namespace audio { FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser, - tasks::Worker& bg_worker) + tasks::WorkerPool& bg_worker) : IAudioSource(), tag_parser_(tag_parser), bg_worker_(bg_worker), diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index f9635b86..4cccbb46 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -31,7 +31,7 @@ namespace audio { */ class FatfsAudioInput : public IAudioSource { public: - explicit FatfsAudioInput(database::ITagParser&, tasks::Worker&); + explicit FatfsAudioInput(database::ITagParser&, tasks::WorkerPool&); ~FatfsAudioInput(); /* @@ -55,7 +55,7 @@ class FatfsAudioInput : public IAudioSource { -> std::optional<codecs::StreamType>; database::ITagParser& tag_parser_; - tasks::Worker& bg_worker_; + tasks::WorkerPool& bg_worker_; std::mutex new_stream_mutex_; std::shared_ptr<TaggedStream> new_stream_; diff --git a/src/audio/include/readahead_source.hpp b/src/audio/include/readahead_source.hpp index 58076350..3e18a989 100644 --- a/src/audio/include/readahead_source.hpp +++ b/src/audio/include/readahead_source.hpp @@ -27,7 +27,7 @@ namespace audio { */ class ReadaheadSource : public codecs::IStream { public: - ReadaheadSource(tasks::Worker&, std::unique_ptr<codecs::IStream>); + ReadaheadSource(tasks::WorkerPool&, std::unique_ptr<codecs::IStream>); ~ReadaheadSource(); auto Read(cpp::span<std::byte> dest) -> ssize_t override; @@ -48,7 +48,7 @@ class ReadaheadSource : public codecs::IStream { private: auto BeginReadahead() -> void; - tasks::Worker& worker_; + tasks::WorkerPool& worker_; std::unique_ptr<codecs::IStream> wrapped_; bool readahead_enabled_; diff --git a/src/audio/include/track_queue.hpp b/src/audio/include/track_queue.hpp index 24b4fe48..0ff72021 100644 --- a/src/audio/include/track_queue.hpp +++ b/src/audio/include/track_queue.hpp @@ -57,7 +57,7 @@ class RandomIterator { */ class TrackQueue { public: - TrackQueue(tasks::Worker& bg_worker); + TrackQueue(tasks::WorkerPool& bg_worker); /* Returns the currently playing track. */ auto current() const -> std::optional<database::TrackId>; @@ -105,7 +105,7 @@ class TrackQueue { private: mutable std::shared_mutex mutex_; - tasks::Worker& bg_worker_; + tasks::WorkerPool& bg_worker_; size_t pos_; std::pmr::vector<database::TrackId> tracks_; diff --git a/src/audio/readahead_source.cpp b/src/audio/readahead_source.cpp index aae878f8..c7b960d2 100644 --- a/src/audio/readahead_source.cpp +++ b/src/audio/readahead_source.cpp @@ -27,7 +27,7 @@ namespace audio { static constexpr char kTag[] = "readahead"; static constexpr size_t kBufferSize = 1024 * 512; -ReadaheadSource::ReadaheadSource(tasks::Worker& worker, +ReadaheadSource::ReadaheadSource(tasks::WorkerPool& worker, std::unique_ptr<codecs::IStream> wrapped) : IStream(wrapped->type()), worker_(worker), diff --git a/src/audio/track_queue.cpp b/src/audio/track_queue.cpp index 6bab60e7..33858e0a 100644 --- a/src/audio/track_queue.cpp +++ b/src/audio/track_queue.cpp @@ -74,7 +74,7 @@ auto notifyChanged(bool current_changed) -> void { events::Audio().Dispatch(ev); } -TrackQueue::TrackQueue(tasks::Worker& bg_worker) +TrackQueue::TrackQueue(tasks::WorkerPool& bg_worker) : mutex_(), bg_worker_(bg_worker), pos_(0), diff --git a/src/database/database.cpp b/src/database/database.cpp index bb26bc96..6b879710 100644 --- a/src/database/database.cpp +++ b/src/database/database.cpp @@ -126,14 +126,14 @@ static auto CheckDatabase(leveldb::DB& db, locale::ICollator& col) -> bool { auto Database::Open(IFileGatherer& gatherer, ITagParser& parser, locale::ICollator& collator, - tasks::Worker& bg_worker) + tasks::WorkerPool& bg_worker) -> cpp::result<Database*, DatabaseError> { if (sIsDbOpen.exchange(true)) { return cpp::fail(DatabaseError::ALREADY_OPEN); } if (!leveldb::sBackgroundThread) { - leveldb::sBackgroundThread = tasks::Worker::Start<tasks::Type::kDatabase>(); + leveldb::sBackgroundThread = &bg_worker; } return bg_worker diff --git a/src/database/env_esp.cpp b/src/database/env_esp.cpp index 8cd11727..f7a5637a 100644 --- a/src/database/env_esp.cpp +++ b/src/database/env_esp.cpp @@ -41,7 +41,7 @@ namespace leveldb { -tasks::Worker *sBackgroundThread = nullptr; +tasks::WorkerPool *sBackgroundThread = nullptr; std::string ErrToStr(FRESULT err) { switch (err) { diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp index c4da3dc1..783d3872 100644 --- a/src/database/include/database.hpp +++ b/src/database/include/database.hpp @@ -56,7 +56,7 @@ class Database { static auto Open(IFileGatherer& file_gatherer, ITagParser& tag_parser, locale::ICollator& collator, - tasks::Worker& bg_worker) + tasks::WorkerPool& bg_worker) -> cpp::result<Database*, DatabaseError>; static auto Destroy() -> void; diff --git a/src/database/include/env_esp.hpp b/src/database/include/env_esp.hpp index dd0159b2..472a72a6 100644 --- a/src/database/include/env_esp.hpp +++ b/src/database/include/env_esp.hpp @@ -18,7 +18,7 @@ namespace leveldb { -extern tasks::Worker* sBackgroundThread; +extern tasks::WorkerPool* sBackgroundThread; // Tracks the files locked by EspEnv::LockFile(). // diff --git a/src/system_fsm/booting.cpp b/src/system_fsm/booting.cpp index 898eb6aa..e6267879 100644 --- a/src/system_fsm/booting.cpp +++ b/src/system_fsm/booting.cpp @@ -68,8 +68,7 @@ auto Booting::entry() -> void { } ESP_LOGI(kTag, "starting bg worker"); - sServices->bg_worker(std::unique_ptr<tasks::Worker>{ - tasks::Worker::Start<tasks::Type::kBackgroundWorker>()}); + sServices->bg_worker(std::make_unique<tasks::WorkerPool>()); ESP_LOGI(kTag, "installing remaining drivers"); drivers::spiffs_mount(); diff --git a/src/system_fsm/include/service_locator.hpp b/src/system_fsm/include/service_locator.hpp index 4aa57df0..5978578c 100644 --- a/src/system_fsm/include/service_locator.hpp +++ b/src/system_fsm/include/service_locator.hpp @@ -80,13 +80,9 @@ class ServiceLocator { touchwheel_ = std::move(i); } - auto haptics() -> drivers::Haptics& { - return *haptics_; - } + auto haptics() -> drivers::Haptics& { return *haptics_; } - auto haptics(std::unique_ptr<drivers::Haptics> i) { - haptics_ = std::move(i); - } + auto haptics(std::unique_ptr<drivers::Haptics> i) { haptics_ = std::move(i); } auto database() -> std::weak_ptr<database::Database> { return database_; } @@ -121,12 +117,12 @@ class ServiceLocator { collator_ = std::move(i); } - auto bg_worker() -> tasks::Worker& { + auto bg_worker() -> tasks::WorkerPool& { assert(bg_worker_ != nullptr); return *bg_worker_; } - auto bg_worker(std::unique_ptr<tasks::Worker> w) -> void { + auto bg_worker(std::unique_ptr<tasks::WorkerPool> w) -> void { bg_worker_ = std::move(w); } @@ -149,7 +145,7 @@ class ServiceLocator { std::unique_ptr<database::ITagParser> tag_parser_; std::unique_ptr<locale::ICollator> collator_; - std::unique_ptr<tasks::Worker> bg_worker_; + std::unique_ptr<tasks::WorkerPool> bg_worker_; drivers::SdState sd_; }; diff --git a/src/tasks/tasks.cpp b/src/tasks/tasks.cpp index 981ee20a..ea174039 100644 --- a/src/tasks/tasks.cpp +++ b/src/tasks/tasks.cpp @@ -31,14 +31,6 @@ template <> auto Name<Type::kAudioConverter>() -> std::pmr::string { return "audio_conv"; } -template <> -auto Name<Type::kDatabase>() -> std::pmr::string { - return "db_fg"; -} -template <> -auto Name<Type::kBackgroundWorker>() -> std::pmr::string { - return "bg_worker"; -} template <Type t> auto AllocateStack() -> cpp::span<StackType_t>; @@ -68,14 +60,10 @@ auto AllocateStack<Type::kAudioConverter>() -> cpp::span<StackType_t> { static StackType_t sStack[size]; return {sStack, size}; } -// Leveldb is designed for non-embedded use cases, where stack space isn't so -// much of a concern. It therefore uses an eye-wateringly large amount of stack. -template <> -auto AllocateStack<Type::kDatabase>() -> cpp::span<StackType_t> { - std::size_t size = 256 * 1024; - return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_SPIRAM)), - size}; -} +// Background workers receive huge stacks in PSRAM. This is mostly to faciliate +// use of LevelDB from any bg worker; Leveldb is designed for non-embedded use +// cases, where large stack usage isn't so much of a concern. It therefore uses +// an eye-wateringly large amount of stack. template <> auto AllocateStack<Type::kBackgroundWorker>() -> cpp::span<StackType_t> { std::size_t size = 256 * 1024; @@ -115,26 +103,10 @@ auto Priority<Type::kUi>() -> UBaseType_t { // couple of ms extra delay due to scheduling, so give this task the lowest // priority. template <> -auto Priority<Type::kDatabase>() -> UBaseType_t { - return 2; -} -template <> auto Priority<Type::kBackgroundWorker>() -> UBaseType_t { return 1; } -template <Type t> -auto WorkerQueueSize() -> std::size_t; - -template <> -auto WorkerQueueSize<Type::kDatabase>() -> std::size_t { - return 8; -} -template <> -auto WorkerQueueSize<Type::kBackgroundWorker>() -> std::size_t { - return 8; -} - auto PersistentMain(void* fn) -> void { auto* function = reinterpret_cast<std::function<void(void)>*>(fn); std::invoke(*function); @@ -142,69 +114,50 @@ auto PersistentMain(void* fn) -> void { vTaskDelete(NULL); } -auto Worker::Main(void* instance) { - Worker* i = reinterpret_cast<Worker*>(instance); +auto WorkerPool::Main(void* q) { + QueueHandle_t queue = reinterpret_cast<QueueHandle_t>(q); while (1) { WorkItem item; - if (xQueueReceive(i->queue_, &item, portMAX_DELAY)) { - if (item.quit) { - break; - } else if (item.fn != nullptr) { - std::invoke(*item.fn); - delete item.fn; - } + if (xQueueReceive(queue, &item, portMAX_DELAY)) { + std::invoke(*item); + delete item; } } - i->is_task_running_.store(false); - i->is_task_running_.notify_all(); - // Wait for the instance's destructor to delete this task. We do this instead - // of just deleting ourselves so that it's 100% certain that it's safe to - // delete or reuse this task's stack. - while (1) { - vTaskDelay(portMAX_DELAY); +} + +static constexpr size_t kNumWorkers = 3; +static constexpr size_t kMaxPendingItems = 8; + +WorkerPool::WorkerPool() + : queue_(xQueueCreate(kMaxPendingItems, sizeof(WorkItem))) { + for (size_t i = 0; i < kNumWorkers; i++) { + auto stack = AllocateStack<Type::kBackgroundWorker>(); + // Task buffers must be in internal ram. Thankfully they're fairly small. + auto buffer = reinterpret_cast<StaticTask_t*>(heap_caps_malloc( + sizeof(StaticTask_t), MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT)); + + std::string name = "worker_" + std::to_string(i); + + xTaskCreateStatic(&Main, name.c_str(), stack.size(), queue_, + Priority<Type::kBackgroundWorker>(), stack.data(), + buffer); } } -Worker::Worker(const std::pmr::string& name, - cpp::span<StackType_t> stack, - std::size_t queue_size, - UBaseType_t priority) - : stack_(stack.data()), - queue_(xQueueCreate(queue_size, sizeof(WorkItem))), - is_task_running_(true), - task_buffer_(static_cast<StaticTask_t*>( - heap_caps_malloc(sizeof(StaticTask_t), - MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT))), - task_(xTaskCreateStatic(&Main, - name.c_str(), - stack.size(), - this, - priority, - stack_, - task_buffer_)) {} - -Worker::~Worker() { - WorkItem item{ - .fn = nullptr, - .quit = true, - }; - xQueueSend(queue_, &item, portMAX_DELAY); - is_task_running_.wait(true); - vTaskDelete(task_); - free(stack_); +WorkerPool::~WorkerPool() { + // This should never happen! + assert("worker pool destroyed" == 0); } template <> -auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void> { +auto WorkerPool::Dispatch(const std::function<void(void)> fn) + -> std::future<void> { std::shared_ptr<std::promise<void>> promise = std::make_shared<std::promise<void>>(); - WorkItem item{ - .fn = new std::function<void(void)>([=]() { - std::invoke(fn); - promise->set_value(); - }), - .quit = false, - }; + WorkItem item = new std::function<void(void)>([=]() { + std::invoke(fn); + promise->set_value(); + }); xQueueSend(queue_, &item, portMAX_DELAY); return promise->get_future(); } diff --git a/src/tasks/tasks.hpp b/src/tasks/tasks.hpp index 6a3a0d09..1623a8d8 100644 --- a/src/tasks/tasks.hpp +++ b/src/tasks/tasks.hpp @@ -48,8 +48,6 @@ template <Type t> auto AllocateStack() -> cpp::span<StackType_t>; template <Type t> auto Priority() -> UBaseType_t; -template <Type t> -auto WorkerQueueSize() -> std::size_t; auto PersistentMain(void* fn) -> void; @@ -74,32 +72,15 @@ auto StartPersistent(BaseType_t core, const std::function<void(void)>& fn) Priority<t>(), stack.data(), task_buffer, core); } -class Worker { +class WorkerPool { private: - Worker(const std::pmr::string& name, - cpp::span<StackType_t> stack, - std::size_t queue_size, - UBaseType_t priority); - - StackType_t* stack_; QueueHandle_t queue_; - std::atomic<bool> is_task_running_; - StaticTask_t *task_buffer_; - TaskHandle_t task_; - - struct WorkItem { - std::function<void(void)>* fn; - bool quit; - }; + using WorkItem = std::function<void(void)>*; + static auto Main(void* instance); public: - template <Type t> - static auto Start() -> Worker* { - return new Worker(Name<t>(), AllocateStack<t>(), WorkerQueueSize<t>(), - Priority<t>()); - } - - static auto Main(void* instance); + WorkerPool(); + ~WorkerPool(); /* * Schedules the given function to be executed on the worker task, and @@ -109,22 +90,19 @@ class Worker { auto Dispatch(const std::function<T(void)> fn) -> std::future<T> { std::shared_ptr<std::promise<T>> promise = std::make_shared<std::promise<T>>(); - WorkItem item{ - .fn = new std::function([=]() { promise->set_value(std::invoke(fn)); }), - .quit = false, - }; + WorkItem item = + new std::function([=]() { promise->set_value(std::invoke(fn)); }); xQueueSend(queue_, &item, portMAX_DELAY); return promise->get_future(); } - ~Worker(); - - Worker(const Worker&) = delete; - Worker& operator=(const Worker&) = delete; + WorkerPool(const WorkerPool&) = delete; + WorkerPool& operator=(const WorkerPool&) = delete; }; /* Specialisation of Evaluate for functions that return nothing. */ template <> -auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void>; +auto WorkerPool::Dispatch(const std::function<void(void)> fn) + -> std::future<void>; } // namespace tasks |
