summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2024-02-01 10:54:20 +1100
committerjacqueline <me@jacqueline.id.au>2024-02-01 10:54:20 +1100
commitdad159dc3c4ebc89d395b4b695a0626b7be10578 (patch)
treee46c9b00f6a2879088b164347f9f35a5636ddad6 /src
parentfde45dba39152064add0379a8ae68b905adff9b9 (diff)
downloadtangara-fw-dad159dc3c4ebc89d395b4b695a0626b7be10578.tar.gz
Use a single pool of bg workers instead of separate tasks per use case
Also: bump the number of workers tasks up to 3 from 2! This makes bg db updates + playback work :)
Diffstat (limited to 'src')
-rw-r--r--src/audio/fatfs_audio_input.cpp2
-rw-r--r--src/audio/include/fatfs_audio_input.hpp4
-rw-r--r--src/audio/include/readahead_source.hpp4
-rw-r--r--src/audio/include/track_queue.hpp4
-rw-r--r--src/audio/readahead_source.cpp2
-rw-r--r--src/audio/track_queue.cpp2
-rw-r--r--src/database/database.cpp4
-rw-r--r--src/database/env_esp.cpp2
-rw-r--r--src/database/include/database.hpp2
-rw-r--r--src/database/include/env_esp.hpp2
-rw-r--r--src/system_fsm/booting.cpp3
-rw-r--r--src/system_fsm/include/service_locator.hpp14
-rw-r--r--src/tasks/tasks.cpp119
-rw-r--r--src/tasks/tasks.hpp44
14 files changed, 67 insertions, 141 deletions
diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp
index 1dcab97a..de64b14b 100644
--- a/src/audio/fatfs_audio_input.cpp
+++ b/src/audio/fatfs_audio_input.cpp
@@ -44,7 +44,7 @@
namespace audio {
FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser,
- tasks::Worker& bg_worker)
+ tasks::WorkerPool& bg_worker)
: IAudioSource(),
tag_parser_(tag_parser),
bg_worker_(bg_worker),
diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp
index f9635b86..4cccbb46 100644
--- a/src/audio/include/fatfs_audio_input.hpp
+++ b/src/audio/include/fatfs_audio_input.hpp
@@ -31,7 +31,7 @@ namespace audio {
*/
class FatfsAudioInput : public IAudioSource {
public:
- explicit FatfsAudioInput(database::ITagParser&, tasks::Worker&);
+ explicit FatfsAudioInput(database::ITagParser&, tasks::WorkerPool&);
~FatfsAudioInput();
/*
@@ -55,7 +55,7 @@ class FatfsAudioInput : public IAudioSource {
-> std::optional<codecs::StreamType>;
database::ITagParser& tag_parser_;
- tasks::Worker& bg_worker_;
+ tasks::WorkerPool& bg_worker_;
std::mutex new_stream_mutex_;
std::shared_ptr<TaggedStream> new_stream_;
diff --git a/src/audio/include/readahead_source.hpp b/src/audio/include/readahead_source.hpp
index 58076350..3e18a989 100644
--- a/src/audio/include/readahead_source.hpp
+++ b/src/audio/include/readahead_source.hpp
@@ -27,7 +27,7 @@ namespace audio {
*/
class ReadaheadSource : public codecs::IStream {
public:
- ReadaheadSource(tasks::Worker&, std::unique_ptr<codecs::IStream>);
+ ReadaheadSource(tasks::WorkerPool&, std::unique_ptr<codecs::IStream>);
~ReadaheadSource();
auto Read(cpp::span<std::byte> dest) -> ssize_t override;
@@ -48,7 +48,7 @@ class ReadaheadSource : public codecs::IStream {
private:
auto BeginReadahead() -> void;
- tasks::Worker& worker_;
+ tasks::WorkerPool& worker_;
std::unique_ptr<codecs::IStream> wrapped_;
bool readahead_enabled_;
diff --git a/src/audio/include/track_queue.hpp b/src/audio/include/track_queue.hpp
index 24b4fe48..0ff72021 100644
--- a/src/audio/include/track_queue.hpp
+++ b/src/audio/include/track_queue.hpp
@@ -57,7 +57,7 @@ class RandomIterator {
*/
class TrackQueue {
public:
- TrackQueue(tasks::Worker& bg_worker);
+ TrackQueue(tasks::WorkerPool& bg_worker);
/* Returns the currently playing track. */
auto current() const -> std::optional<database::TrackId>;
@@ -105,7 +105,7 @@ class TrackQueue {
private:
mutable std::shared_mutex mutex_;
- tasks::Worker& bg_worker_;
+ tasks::WorkerPool& bg_worker_;
size_t pos_;
std::pmr::vector<database::TrackId> tracks_;
diff --git a/src/audio/readahead_source.cpp b/src/audio/readahead_source.cpp
index aae878f8..c7b960d2 100644
--- a/src/audio/readahead_source.cpp
+++ b/src/audio/readahead_source.cpp
@@ -27,7 +27,7 @@ namespace audio {
static constexpr char kTag[] = "readahead";
static constexpr size_t kBufferSize = 1024 * 512;
-ReadaheadSource::ReadaheadSource(tasks::Worker& worker,
+ReadaheadSource::ReadaheadSource(tasks::WorkerPool& worker,
std::unique_ptr<codecs::IStream> wrapped)
: IStream(wrapped->type()),
worker_(worker),
diff --git a/src/audio/track_queue.cpp b/src/audio/track_queue.cpp
index 6bab60e7..33858e0a 100644
--- a/src/audio/track_queue.cpp
+++ b/src/audio/track_queue.cpp
@@ -74,7 +74,7 @@ auto notifyChanged(bool current_changed) -> void {
events::Audio().Dispatch(ev);
}
-TrackQueue::TrackQueue(tasks::Worker& bg_worker)
+TrackQueue::TrackQueue(tasks::WorkerPool& bg_worker)
: mutex_(),
bg_worker_(bg_worker),
pos_(0),
diff --git a/src/database/database.cpp b/src/database/database.cpp
index bb26bc96..6b879710 100644
--- a/src/database/database.cpp
+++ b/src/database/database.cpp
@@ -126,14 +126,14 @@ static auto CheckDatabase(leveldb::DB& db, locale::ICollator& col) -> bool {
auto Database::Open(IFileGatherer& gatherer,
ITagParser& parser,
locale::ICollator& collator,
- tasks::Worker& bg_worker)
+ tasks::WorkerPool& bg_worker)
-> cpp::result<Database*, DatabaseError> {
if (sIsDbOpen.exchange(true)) {
return cpp::fail(DatabaseError::ALREADY_OPEN);
}
if (!leveldb::sBackgroundThread) {
- leveldb::sBackgroundThread = tasks::Worker::Start<tasks::Type::kDatabase>();
+ leveldb::sBackgroundThread = &bg_worker;
}
return bg_worker
diff --git a/src/database/env_esp.cpp b/src/database/env_esp.cpp
index 8cd11727..f7a5637a 100644
--- a/src/database/env_esp.cpp
+++ b/src/database/env_esp.cpp
@@ -41,7 +41,7 @@
namespace leveldb {
-tasks::Worker *sBackgroundThread = nullptr;
+tasks::WorkerPool *sBackgroundThread = nullptr;
std::string ErrToStr(FRESULT err) {
switch (err) {
diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp
index c4da3dc1..783d3872 100644
--- a/src/database/include/database.hpp
+++ b/src/database/include/database.hpp
@@ -56,7 +56,7 @@ class Database {
static auto Open(IFileGatherer& file_gatherer,
ITagParser& tag_parser,
locale::ICollator& collator,
- tasks::Worker& bg_worker)
+ tasks::WorkerPool& bg_worker)
-> cpp::result<Database*, DatabaseError>;
static auto Destroy() -> void;
diff --git a/src/database/include/env_esp.hpp b/src/database/include/env_esp.hpp
index dd0159b2..472a72a6 100644
--- a/src/database/include/env_esp.hpp
+++ b/src/database/include/env_esp.hpp
@@ -18,7 +18,7 @@
namespace leveldb {
-extern tasks::Worker* sBackgroundThread;
+extern tasks::WorkerPool* sBackgroundThread;
// Tracks the files locked by EspEnv::LockFile().
//
diff --git a/src/system_fsm/booting.cpp b/src/system_fsm/booting.cpp
index 898eb6aa..e6267879 100644
--- a/src/system_fsm/booting.cpp
+++ b/src/system_fsm/booting.cpp
@@ -68,8 +68,7 @@ auto Booting::entry() -> void {
}
ESP_LOGI(kTag, "starting bg worker");
- sServices->bg_worker(std::unique_ptr<tasks::Worker>{
- tasks::Worker::Start<tasks::Type::kBackgroundWorker>()});
+ sServices->bg_worker(std::make_unique<tasks::WorkerPool>());
ESP_LOGI(kTag, "installing remaining drivers");
drivers::spiffs_mount();
diff --git a/src/system_fsm/include/service_locator.hpp b/src/system_fsm/include/service_locator.hpp
index 4aa57df0..5978578c 100644
--- a/src/system_fsm/include/service_locator.hpp
+++ b/src/system_fsm/include/service_locator.hpp
@@ -80,13 +80,9 @@ class ServiceLocator {
touchwheel_ = std::move(i);
}
- auto haptics() -> drivers::Haptics& {
- return *haptics_;
- }
+ auto haptics() -> drivers::Haptics& { return *haptics_; }
- auto haptics(std::unique_ptr<drivers::Haptics> i) {
- haptics_ = std::move(i);
- }
+ auto haptics(std::unique_ptr<drivers::Haptics> i) { haptics_ = std::move(i); }
auto database() -> std::weak_ptr<database::Database> { return database_; }
@@ -121,12 +117,12 @@ class ServiceLocator {
collator_ = std::move(i);
}
- auto bg_worker() -> tasks::Worker& {
+ auto bg_worker() -> tasks::WorkerPool& {
assert(bg_worker_ != nullptr);
return *bg_worker_;
}
- auto bg_worker(std::unique_ptr<tasks::Worker> w) -> void {
+ auto bg_worker(std::unique_ptr<tasks::WorkerPool> w) -> void {
bg_worker_ = std::move(w);
}
@@ -149,7 +145,7 @@ class ServiceLocator {
std::unique_ptr<database::ITagParser> tag_parser_;
std::unique_ptr<locale::ICollator> collator_;
- std::unique_ptr<tasks::Worker> bg_worker_;
+ std::unique_ptr<tasks::WorkerPool> bg_worker_;
drivers::SdState sd_;
};
diff --git a/src/tasks/tasks.cpp b/src/tasks/tasks.cpp
index 981ee20a..ea174039 100644
--- a/src/tasks/tasks.cpp
+++ b/src/tasks/tasks.cpp
@@ -31,14 +31,6 @@ template <>
auto Name<Type::kAudioConverter>() -> std::pmr::string {
return "audio_conv";
}
-template <>
-auto Name<Type::kDatabase>() -> std::pmr::string {
- return "db_fg";
-}
-template <>
-auto Name<Type::kBackgroundWorker>() -> std::pmr::string {
- return "bg_worker";
-}
template <Type t>
auto AllocateStack() -> cpp::span<StackType_t>;
@@ -68,14 +60,10 @@ auto AllocateStack<Type::kAudioConverter>() -> cpp::span<StackType_t> {
static StackType_t sStack[size];
return {sStack, size};
}
-// Leveldb is designed for non-embedded use cases, where stack space isn't so
-// much of a concern. It therefore uses an eye-wateringly large amount of stack.
-template <>
-auto AllocateStack<Type::kDatabase>() -> cpp::span<StackType_t> {
- std::size_t size = 256 * 1024;
- return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_SPIRAM)),
- size};
-}
+// Background workers receive huge stacks in PSRAM. This is mostly to faciliate
+// use of LevelDB from any bg worker; Leveldb is designed for non-embedded use
+// cases, where large stack usage isn't so much of a concern. It therefore uses
+// an eye-wateringly large amount of stack.
template <>
auto AllocateStack<Type::kBackgroundWorker>() -> cpp::span<StackType_t> {
std::size_t size = 256 * 1024;
@@ -115,26 +103,10 @@ auto Priority<Type::kUi>() -> UBaseType_t {
// couple of ms extra delay due to scheduling, so give this task the lowest
// priority.
template <>
-auto Priority<Type::kDatabase>() -> UBaseType_t {
- return 2;
-}
-template <>
auto Priority<Type::kBackgroundWorker>() -> UBaseType_t {
return 1;
}
-template <Type t>
-auto WorkerQueueSize() -> std::size_t;
-
-template <>
-auto WorkerQueueSize<Type::kDatabase>() -> std::size_t {
- return 8;
-}
-template <>
-auto WorkerQueueSize<Type::kBackgroundWorker>() -> std::size_t {
- return 8;
-}
-
auto PersistentMain(void* fn) -> void {
auto* function = reinterpret_cast<std::function<void(void)>*>(fn);
std::invoke(*function);
@@ -142,69 +114,50 @@ auto PersistentMain(void* fn) -> void {
vTaskDelete(NULL);
}
-auto Worker::Main(void* instance) {
- Worker* i = reinterpret_cast<Worker*>(instance);
+auto WorkerPool::Main(void* q) {
+ QueueHandle_t queue = reinterpret_cast<QueueHandle_t>(q);
while (1) {
WorkItem item;
- if (xQueueReceive(i->queue_, &item, portMAX_DELAY)) {
- if (item.quit) {
- break;
- } else if (item.fn != nullptr) {
- std::invoke(*item.fn);
- delete item.fn;
- }
+ if (xQueueReceive(queue, &item, portMAX_DELAY)) {
+ std::invoke(*item);
+ delete item;
}
}
- i->is_task_running_.store(false);
- i->is_task_running_.notify_all();
- // Wait for the instance's destructor to delete this task. We do this instead
- // of just deleting ourselves so that it's 100% certain that it's safe to
- // delete or reuse this task's stack.
- while (1) {
- vTaskDelay(portMAX_DELAY);
+}
+
+static constexpr size_t kNumWorkers = 3;
+static constexpr size_t kMaxPendingItems = 8;
+
+WorkerPool::WorkerPool()
+ : queue_(xQueueCreate(kMaxPendingItems, sizeof(WorkItem))) {
+ for (size_t i = 0; i < kNumWorkers; i++) {
+ auto stack = AllocateStack<Type::kBackgroundWorker>();
+ // Task buffers must be in internal ram. Thankfully they're fairly small.
+ auto buffer = reinterpret_cast<StaticTask_t*>(heap_caps_malloc(
+ sizeof(StaticTask_t), MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT));
+
+ std::string name = "worker_" + std::to_string(i);
+
+ xTaskCreateStatic(&Main, name.c_str(), stack.size(), queue_,
+ Priority<Type::kBackgroundWorker>(), stack.data(),
+ buffer);
}
}
-Worker::Worker(const std::pmr::string& name,
- cpp::span<StackType_t> stack,
- std::size_t queue_size,
- UBaseType_t priority)
- : stack_(stack.data()),
- queue_(xQueueCreate(queue_size, sizeof(WorkItem))),
- is_task_running_(true),
- task_buffer_(static_cast<StaticTask_t*>(
- heap_caps_malloc(sizeof(StaticTask_t),
- MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT))),
- task_(xTaskCreateStatic(&Main,
- name.c_str(),
- stack.size(),
- this,
- priority,
- stack_,
- task_buffer_)) {}
-
-Worker::~Worker() {
- WorkItem item{
- .fn = nullptr,
- .quit = true,
- };
- xQueueSend(queue_, &item, portMAX_DELAY);
- is_task_running_.wait(true);
- vTaskDelete(task_);
- free(stack_);
+WorkerPool::~WorkerPool() {
+ // This should never happen!
+ assert("worker pool destroyed" == 0);
}
template <>
-auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void> {
+auto WorkerPool::Dispatch(const std::function<void(void)> fn)
+ -> std::future<void> {
std::shared_ptr<std::promise<void>> promise =
std::make_shared<std::promise<void>>();
- WorkItem item{
- .fn = new std::function<void(void)>([=]() {
- std::invoke(fn);
- promise->set_value();
- }),
- .quit = false,
- };
+ WorkItem item = new std::function<void(void)>([=]() {
+ std::invoke(fn);
+ promise->set_value();
+ });
xQueueSend(queue_, &item, portMAX_DELAY);
return promise->get_future();
}
diff --git a/src/tasks/tasks.hpp b/src/tasks/tasks.hpp
index 6a3a0d09..1623a8d8 100644
--- a/src/tasks/tasks.hpp
+++ b/src/tasks/tasks.hpp
@@ -48,8 +48,6 @@ template <Type t>
auto AllocateStack() -> cpp::span<StackType_t>;
template <Type t>
auto Priority() -> UBaseType_t;
-template <Type t>
-auto WorkerQueueSize() -> std::size_t;
auto PersistentMain(void* fn) -> void;
@@ -74,32 +72,15 @@ auto StartPersistent(BaseType_t core, const std::function<void(void)>& fn)
Priority<t>(), stack.data(), task_buffer, core);
}
-class Worker {
+class WorkerPool {
private:
- Worker(const std::pmr::string& name,
- cpp::span<StackType_t> stack,
- std::size_t queue_size,
- UBaseType_t priority);
-
- StackType_t* stack_;
QueueHandle_t queue_;
- std::atomic<bool> is_task_running_;
- StaticTask_t *task_buffer_;
- TaskHandle_t task_;
-
- struct WorkItem {
- std::function<void(void)>* fn;
- bool quit;
- };
+ using WorkItem = std::function<void(void)>*;
+ static auto Main(void* instance);
public:
- template <Type t>
- static auto Start() -> Worker* {
- return new Worker(Name<t>(), AllocateStack<t>(), WorkerQueueSize<t>(),
- Priority<t>());
- }
-
- static auto Main(void* instance);
+ WorkerPool();
+ ~WorkerPool();
/*
* Schedules the given function to be executed on the worker task, and
@@ -109,22 +90,19 @@ class Worker {
auto Dispatch(const std::function<T(void)> fn) -> std::future<T> {
std::shared_ptr<std::promise<T>> promise =
std::make_shared<std::promise<T>>();
- WorkItem item{
- .fn = new std::function([=]() { promise->set_value(std::invoke(fn)); }),
- .quit = false,
- };
+ WorkItem item =
+ new std::function([=]() { promise->set_value(std::invoke(fn)); });
xQueueSend(queue_, &item, portMAX_DELAY);
return promise->get_future();
}
- ~Worker();
-
- Worker(const Worker&) = delete;
- Worker& operator=(const Worker&) = delete;
+ WorkerPool(const WorkerPool&) = delete;
+ WorkerPool& operator=(const WorkerPool&) = delete;
};
/* Specialisation of Evaluate for functions that return nothing. */
template <>
-auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void>;
+auto WorkerPool::Dispatch(const std::function<void(void)> fn)
+ -> std::future<void>;
} // namespace tasks