summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2023-05-22 15:23:51 +1000
committerjacqueline <me@jacqueline.id.au>2023-05-22 15:23:51 +1000
commit5ac4d3949cd7430e0d4c994bbc528e8e4fa91337 (patch)
treedcd30c252e45afa20f084e1a58cbabd861ca299c
parentb320a6a863cf1c10dc79254af41f573730935564 (diff)
downloadtangara-fw-5ac4d3949cd7430e0d4c994bbc528e8e4fa91337.tar.gz
Generalise worker tasks, and centralise task priorities + stacks
Includes making the display driver use a worker task for flushes, so that our double buffering actually does something useful /facepalm
-rw-r--r--src/audio/audio_task.cpp236
-rw-r--r--src/audio/include/audio_task.hpp14
-rw-r--r--src/database/CMakeLists.txt4
-rw-r--r--src/database/database.cpp88
-rw-r--r--src/database/db_task.cpp91
-rw-r--r--src/database/env_esp.cpp11
-rw-r--r--src/database/include/database.hpp6
-rw-r--r--src/database/include/db_task.hpp25
-rw-r--r--src/database/include/env_esp.hpp5
-rw-r--r--src/drivers/CMakeLists.txt2
-rw-r--r--src/drivers/display.cpp82
-rw-r--r--src/drivers/include/display.hpp3
-rw-r--r--src/main/main.cpp4
-rw-r--r--src/tasks/CMakeLists.txt2
-rw-r--r--src/tasks/tasks.cpp205
-rw-r--r--src/tasks/tasks.hpp106
-rw-r--r--src/ui/include/lvgl_task.hpp4
-rw-r--r--src/ui/lvgl_task.cpp44
18 files changed, 542 insertions, 390 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index b2a8062e..f0128bf3 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -35,157 +35,119 @@ namespace audio {
namespace task {
static const char* kTag = "task";
-static const std::size_t kStackSize = 24 * 1024;
-static const std::size_t kDrainStackSize = 1024;
-auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
- // Newly created task will free this.
- AudioTaskArgs* args = new AudioTaskArgs{.pipeline = pipeline, .sink = sink};
+void AudioTaskMain(std::unique_ptr<Pipeline> pipeline, IAudioSink* sink) {
+ std::optional<StreamInfo::Format> output_format;
+
+ std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
+ std::size_t max_inputs =
+ (*std::max_element(elements.begin(), elements.end(),
+ [](Pipeline const* first, Pipeline const* second) {
+ return first->NumInputs() < second->NumInputs();
+ }))
+ ->NumInputs();
+
+ // We need to be able to simultaneously map all of an element's inputs, plus
+ // its output. So preallocate that many ranges.
+ std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs);
+ MappableRegion<kPipelineBufferSize> out_region;
+ std::for_each(in_regions.begin(), in_regions.end(),
+ [](const auto& region) { assert(region.is_valid); });
+ assert(out_region.is_valid);
+
+ // Each element has exactly one output buffer.
+ std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size());
+ std::vector<StreamInfo> buffer_infos(buffers.size());
+ std::for_each(buffers.begin(), buffers.end(),
+ [](const HimemAlloc<kPipelineBufferSize>& alloc) {
+ assert(alloc.is_valid);
+ });
+
+ while (1) {
+ for (int i = 0; i < elements.size(); i++) {
+ std::vector<RawStream> raw_in_streams;
+ elements.at(i)->InStreams(&in_regions, &raw_in_streams);
+ RawStream raw_out_stream = elements.at(i)->OutStream(&out_region);
+
+ // Crop the input and output streams to the ranges that are safe to
+ // touch. For the input streams, this is the region that contains
+ // data. For the output stream, this is the region that does *not*
+ // already contain data.
+ std::vector<InputStream> in_streams;
+ std::for_each(raw_in_streams.begin(), raw_in_streams.end(),
+ [&](RawStream& s) { in_streams.emplace_back(&s); });
+ OutputStream out_stream(&raw_out_stream);
+
+ elements.at(i)->OutputElement()->Process(in_streams, &out_stream);
+
+ std::for_each(in_regions.begin(), in_regions.end(),
+ [](auto&& r) { r.Unmap(); });
+ out_region.Unmap();
+ }
- ESP_LOGI(kTag, "starting audio pipeline task");
- xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args,
- kTaskPriorityAudioPipeline, NULL, 1);
-}
+ RawStream raw_sink_stream = elements.front()->OutStream(&out_region);
+ InputStream sink_stream(&raw_sink_stream);
-auto StartDrain(IAudioSink* sink) -> void {
- auto command = new std::atomic<Command>(PLAY);
- // Newly created task will free this.
- AudioDrainArgs* drain_args = new AudioDrainArgs{
- .sink = sink,
- .command = command,
- };
+ if (sink_stream.info().bytes_in_stream == 0) {
+ out_region.Unmap();
+ vTaskDelay(pdMS_TO_TICKS(100));
+ continue;
+ }
- ESP_LOGI(kTag, "starting audio drain task");
- xTaskCreate(&AudioDrainMain, "drain", kDrainStackSize, drain_args,
- kTaskPriorityAudioDrain, NULL);
-}
+ if (!output_format || output_format != sink_stream.info().format) {
+ // The format of the stream within the sink stream has changed. We
+ // need to reconfigure the sink, but shouldn't do so until we've fully
+ // drained the current buffer.
+ if (xStreamBufferIsEmpty(sink->buffer())) {
+ ESP_LOGI(kTag, "reconfiguring dac");
+ output_format = sink_stream.info().format;
+ sink->Configure(*output_format);
+ }
+ }
-void AudioTaskMain(void* args) {
- // Nest the body within an additional scope to ensure that destructors are
- // called before the task quits.
- {
- AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args);
- std::unique_ptr<Pipeline> pipeline(real_args->pipeline);
- IAudioSink* sink = real_args->sink;
- delete real_args;
-
- std::optional<StreamInfo::Format> output_format;
-
- std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
- std::size_t max_inputs =
- (*std::max_element(elements.begin(), elements.end(),
- [](Pipeline const* first, Pipeline const* second) {
- return first->NumInputs() < second->NumInputs();
- }))
- ->NumInputs();
-
- // We need to be able to simultaneously map all of an element's inputs, plus
- // its output. So preallocate that many ranges.
- std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs);
- MappableRegion<kPipelineBufferSize> out_region;
- std::for_each(in_regions.begin(), in_regions.end(),
- [](const auto& region) { assert(region.is_valid); });
- assert(out_region.is_valid);
-
- // Each element has exactly one output buffer.
- std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size());
- std::vector<StreamInfo> buffer_infos(buffers.size());
- std::for_each(buffers.begin(), buffers.end(),
- [](const HimemAlloc<kPipelineBufferSize>& alloc) {
- assert(alloc.is_valid);
- });
-
- bool playing = true;
- bool quit = false;
- while (!quit) {
- if (playing) {
- for (int i = 0; i < elements.size(); i++) {
- std::vector<RawStream> raw_in_streams;
- elements.at(i)->InStreams(&in_regions, &raw_in_streams);
- RawStream raw_out_stream = elements.at(i)->OutStream(&out_region);
-
- // Crop the input and output streams to the ranges that are safe to
- // touch. For the input streams, this is the region that contains
- // data. For the output stream, this is the region that does *not*
- // already contain data.
- std::vector<InputStream> in_streams;
- std::for_each(raw_in_streams.begin(), raw_in_streams.end(),
- [&](RawStream& s) { in_streams.emplace_back(&s); });
- OutputStream out_stream(&raw_out_stream);
-
- elements.at(i)->OutputElement()->Process(in_streams, &out_stream);
-
- std::for_each(in_regions.begin(), in_regions.end(),
- [](auto&& r) { r.Unmap(); });
- out_region.Unmap();
- }
-
- RawStream raw_sink_stream = elements.front()->OutStream(&out_region);
- InputStream sink_stream(&raw_sink_stream);
-
- if (sink_stream.info().bytes_in_stream == 0) {
- out_region.Unmap();
- vTaskDelay(pdMS_TO_TICKS(100));
- continue;
- }
-
- if (!output_format || output_format != sink_stream.info().format) {
- // The format of the stream within the sink stream has changed. We
- // need to reconfigure the sink, but shouldn't do so until we've fully
- // drained the current buffer.
- if (xStreamBufferIsEmpty(sink->buffer())) {
- ESP_LOGI(kTag, "reconfiguring dac");
- output_format = sink_stream.info().format;
- sink->Configure(*output_format);
- }
- }
-
- // We've reconfigured the sink, or it was already configured correctly.
- // Send through some data.
- if (output_format == sink_stream.info().format &&
- !std::holds_alternative<std::monostate>(*output_format)) {
- // TODO: tune the delay on this, as it's currently the only way to
- // throttle this task's CPU time. Maybe also hold off on the pipeline
- // if the buffer is already close to full?
- std::size_t sent = xStreamBufferSend(
- sink->buffer(), sink_stream.data().data(),
- sink_stream.data().size_bytes(), pdMS_TO_TICKS(10));
- if (sent > 0) {
- ESP_LOGI(kTag, "sunk %u bytes out of %u (%d %%)", sent,
- sink_stream.info().bytes_in_stream,
- (int)(((float)sent /
- (float)sink_stream.info().bytes_in_stream) *
- 100));
- }
- sink_stream.consume(sent);
- }
-
- out_region.Unmap();
+ // We've reconfigured the sink, or it was already configured correctly.
+ // Send through some data.
+ if (output_format == sink_stream.info().format &&
+ !std::holds_alternative<std::monostate>(*output_format)) {
+ std::size_t sent =
+ xStreamBufferSend(sink->buffer(), sink_stream.data().data(),
+ sink_stream.data().size_bytes(), 0);
+ if (sent > 0) {
+ ESP_LOGI(
+ kTag, "sunk %u bytes out of %u (%d %%)", sent,
+ sink_stream.info().bytes_in_stream,
+ (int)(((float)sent / (float)sink_stream.info().bytes_in_stream) *
+ 100));
}
+ sink_stream.consume(sent);
}
+
+ out_region.Unmap();
}
- vTaskDelete(NULL);
}
static std::byte sDrainBuf[8 * 1024];
-void AudioDrainMain(void* args) {
- {
- AudioDrainArgs* real_args = reinterpret_cast<AudioDrainArgs*>(args);
- IAudioSink* sink = real_args->sink;
- std::atomic<Command>* command = real_args->command;
- delete real_args;
-
- // TODO(jacqueline): implement PAUSE without busy-waiting.
- while (*command != QUIT) {
- std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf,
- sizeof(sDrainBuf), portMAX_DELAY);
- if (len > 0) {
- sink->Send({sDrainBuf, len});
- }
+void AudioDrainMain(IAudioSink* sink) {
+ while (1) {
+ std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf,
+ sizeof(sDrainBuf), portMAX_DELAY);
+ if (len > 0) {
+ sink->Send({sDrainBuf, len});
}
}
- vTaskDelete(NULL);
+}
+
+auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
+ ESP_LOGI(kTag, "starting audio pipeline task");
+ tasks::StartPersistent<tasks::Type::kAudio>(
+ [=]() { AudioTaskMain(std::unique_ptr<Pipeline>(pipeline), sink); });
+}
+
+auto StartDrain(IAudioSink* sink) -> void {
+ ESP_LOGI(kTag, "starting audio drain task");
+ tasks::StartPersistent<tasks::Type::kAudioDrain>(
+ [=]() { AudioDrainMain(sink); });
}
} // namespace task
diff --git a/src/audio/include/audio_task.hpp b/src/audio/include/audio_task.hpp
index a7b7a0fa..8269c8d4 100644
--- a/src/audio/include/audio_task.hpp
+++ b/src/audio/include/audio_task.hpp
@@ -15,20 +15,6 @@ namespace audio {
namespace task {
-enum Command { PLAY, PAUSE, QUIT };
-
-struct AudioTaskArgs {
- Pipeline* pipeline;
- IAudioSink* sink;
-};
-struct AudioDrainArgs {
- IAudioSink* sink;
- std::atomic<Command>* command;
-};
-
-extern "C" void AudioTaskMain(void* args);
-extern "C" void AudioDrainMain(void* args);
-
auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void;
auto StartDrain(IAudioSink* sink) -> void;
diff --git a/src/database/CMakeLists.txt b/src/database/CMakeLists.txt
index 897bf029..c769fa33 100644
--- a/src/database/CMakeLists.txt
+++ b/src/database/CMakeLists.txt
@@ -1,7 +1,7 @@
idf_component_register(
- SRCS "env_esp.cpp" "database.cpp" "song.cpp" "db_task.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp"
+ SRCS "env_esp.cpp" "database.cpp" "song.cpp" "records.cpp" "file_gatherer.cpp" "tag_parser.cpp"
INCLUDE_DIRS "include"
- REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor")
+ REQUIRES "result" "span" "esp_psram" "fatfs" "libtags" "komihash" "cbor" "tasks")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})
diff --git a/src/database/database.cpp b/src/database/database.cpp
index f5fe5240..65a500d9 100644
--- a/src/database/database.cpp
+++ b/src/database/database.cpp
@@ -18,13 +18,13 @@
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"
-#include "db_task.hpp"
#include "env_esp.hpp"
#include "file_gatherer.hpp"
#include "records.hpp"
#include "result.hpp"
#include "song.hpp"
#include "tag_parser.hpp"
+#include "tasks.hpp"
namespace database {
@@ -62,33 +62,33 @@ auto Database::Open(IFileGatherer* gatherer, ITagParser* parser)
return cpp::fail(DatabaseError::ALREADY_OPEN);
}
- if (!StartDbTask()) {
- return cpp::fail(DatabaseError::ALREADY_OPEN);
- }
-
- return RunOnDbTask<cpp::result<Database*, DatabaseError>>(
- [=]() -> cpp::result<Database*, DatabaseError> {
- leveldb::DB* db;
- leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024);
- leveldb::Options options;
- options.env = sEnv.env();
- options.create_if_missing = true;
- options.write_buffer_size = 48 * 1024;
- options.max_file_size = 32;
- options.block_cache = cache;
- options.block_size = 512;
-
- auto status = leveldb::DB::Open(options, "/.db", &db);
- if (!status.ok()) {
- delete cache;
- ESP_LOGE(kTag, "failed to open db, status %s",
- status.ToString().c_str());
- return cpp::fail(FAILED_TO_OPEN);
- }
-
- ESP_LOGI(kTag, "Database opened successfully");
- return new Database(db, cache, gatherer, parser);
- })
+ std::shared_ptr<tasks::Worker> worker(
+ tasks::Worker::Start<tasks::Type::kDatabase>());
+ leveldb::sBackgroundThread = std::weak_ptr<tasks::Worker>(worker);
+ return worker
+ ->Dispatch<cpp::result<Database*, DatabaseError>>(
+ [&]() -> cpp::result<Database*, DatabaseError> {
+ leveldb::DB* db;
+ leveldb::Cache* cache = leveldb::NewLRUCache(24 * 1024);
+ leveldb::Options options;
+ options.env = sEnv.env();
+ options.create_if_missing = true;
+ options.write_buffer_size = 48 * 1024;
+ options.max_file_size = 32;
+ options.block_cache = cache;
+ options.block_size = 512;
+
+ auto status = leveldb::DB::Open(options, "/.db", &db);
+ if (!status.ok()) {
+ delete cache;
+ ESP_LOGE(kTag, "failed to open db, status %s",
+ status.ToString().c_str());
+ return cpp::fail(FAILED_TO_OPEN);
+ }
+
+ ESP_LOGI(kTag, "Database opened successfully");
+ return new Database(db, cache, gatherer, parser, worker);
+ })
.get();
}
@@ -101,9 +101,11 @@ auto Database::Destroy() -> void {
Database::Database(leveldb::DB* db,
leveldb::Cache* cache,
IFileGatherer* file_gatherer,
- ITagParser* tag_parser)
+ ITagParser* tag_parser,
+ std::shared_ptr<tasks::Worker> worker)
: db_(db),
cache_(cache),
+ worker_task_(worker),
file_gatherer_(file_gatherer),
tag_parser_(tag_parser) {}
@@ -113,12 +115,13 @@ Database::~Database() {
delete db_;
delete cache_;
- QuitDbTask();
+ leveldb::sBackgroundThread = std::weak_ptr<tasks::Worker>();
+
sIsDbOpen.store(false);
}
auto Database::Update() -> std::future<void> {
- return RunOnDbTask<void>([&]() -> void {
+ return worker_task_->Dispatch<void>([&]() -> void {
// Stage 1: verify all existing songs are still valid.
ESP_LOGI(kTag, "verifying existing songs");
const leveldb::Snapshot* snapshot = db_->GetSnapshot();
@@ -219,7 +222,7 @@ auto Database::Update() -> std::future<void> {
}
auto Database::GetSongs(std::size_t page_size) -> std::future<Result<Song>*> {
- return RunOnDbTask<Result<Song>*>([=, this]() -> Result<Song>* {
+ return worker_task_->Dispatch<Result<Song>*>([=, this]() -> Result<Song>* {
Continuation<Song> c{.iterator = nullptr,
.prefix = CreateDataPrefix().data,
.start_key = CreateDataPrefix().data,
@@ -232,21 +235,22 @@ auto Database::GetSongs(std::size_t page_size) -> std::future<Result<Song>*> {
auto Database::GetDump(std::size_t page_size)
-> std::future<Result<std::string>*> {
- return RunOnDbTask<Result<std::string>*>([=, this]() -> Result<std::string>* {
- Continuation<std::string> c{.iterator = nullptr,
- .prefix = "",
- .start_key = "",
- .forward = true,
- .was_prev_forward = true,
- .page_size = page_size};
- return dbGetPage(c);
- });
+ return worker_task_->Dispatch<Result<std::string>*>(
+ [=, this]() -> Result<std::string>* {
+ Continuation<std::string> c{.iterator = nullptr,
+ .prefix = "",
+ .start_key = "",
+ .forward = true,
+ .was_prev_forward = true,
+ .page_size = page_size};
+ return dbGetPage(c);
+ });
}
template <typename T>
auto Database::GetPage(Continuation<T>* c) -> std::future<Result<T>*> {
Continuation<T> copy = *c;
- return RunOnDbTask<Result<T>*>(
+ return worker_task_->Dispatch<Result<T>*>(
[=, this]() -> Result<T>* { return dbGetPage(copy); });
}
diff --git a/src/database/db_task.cpp b/src/database/db_task.cpp
deleted file mode 100644
index 5b4b34b5..00000000
--- a/src/database/db_task.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-#include "db_task.hpp"
-
-#include <functional>
-
-#include "esp_heap_caps.h"
-#include "freertos/FreeRTOS.h"
-#include "freertos/portmacro.h"
-#include "freertos/projdefs.h"
-#include "freertos/queue.h"
-#include "freertos/task.h"
-
-namespace database {
-
-static const std::size_t kDbStackSize = 256 * 1024;
-static StaticTask_t sDbStaticTask;
-static StackType_t* sDbStack = nullptr;
-
-static std::atomic<bool> sTaskRunning(false);
-static QueueHandle_t sWorkQueue;
-
-struct WorkItem {
- std::function<void(void)>* fn;
- bool quit;
-};
-
-auto SendToDbTask(std::function<void(void)> fn) -> void {
- WorkItem item{
- .fn = new std::function<void(void)>(fn),
- .quit = false,
- };
- xQueueSend(sWorkQueue, &item, portMAX_DELAY);
-}
-
-template <>
-auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void> {
- std::shared_ptr<std::promise<void>> promise =
- std::make_shared<std::promise<void>>();
- SendToDbTask([=]() {
- std::invoke(fn);
- promise->set_value();
- });
- return promise->get_future();
-}
-
-void DatabaseTaskMain(void* args) {
- while (true) {
- WorkItem item;
- if (xQueueReceive(sWorkQueue, &item, portMAX_DELAY)) {
- if (item.fn != nullptr) {
- std::invoke(*item.fn);
- delete item.fn;
- }
- if (item.quit) {
- break;
- }
- }
- }
- vQueueDelete(sWorkQueue);
- sTaskRunning.store(false);
- vTaskDelete(NULL);
-}
-
-auto StartDbTask() -> bool {
- if (sTaskRunning.exchange(true)) {
- return false;
- }
- if (sDbStack == nullptr) {
- sDbStack = reinterpret_cast<StackType_t*>(
- heap_caps_malloc(kDbStackSize, MALLOC_CAP_SPIRAM));
- }
- sWorkQueue = xQueueCreate(8, sizeof(WorkItem));
- xTaskCreateStatic(&DatabaseTaskMain, "DB", kDbStackSize, NULL, 1, sDbStack,
- &sDbStaticTask);
- return true;
-}
-
-auto QuitDbTask() -> void {
- if (!sTaskRunning.load()) {
- return;
- }
- WorkItem item{
- .fn = nullptr,
- .quit = true,
- };
- xQueueSend(sWorkQueue, &item, portMAX_DELAY);
- while (sTaskRunning.load()) {
- vTaskDelay(pdMS_TO_TICKS(1));
- }
-}
-
-} // namespace database
diff --git a/src/database/env_esp.cpp b/src/database/env_esp.cpp
index 71d4fcea..3bc68984 100644
--- a/src/database/env_esp.cpp
+++ b/src/database/env_esp.cpp
@@ -29,10 +29,12 @@
#include "leveldb/slice.h"
#include "leveldb/status.h"
-#include "db_task.hpp"
+#include "tasks.hpp"
namespace leveldb {
+std::weak_ptr<tasks::Worker> sBackgroundThread;
+
std::string ErrToStr(FRESULT err) {
switch (err) {
case FR_OK:
@@ -455,8 +457,11 @@ EspEnv::EspEnv() {}
void EspEnv::Schedule(
void (*background_work_function)(void* background_work_arg),
void* background_work_arg) {
- database::SendToDbTask(
- [=]() { std::invoke(background_work_function, background_work_arg); });
+ auto worker = sBackgroundThread.lock();
+ if (worker) {
+ worker->Dispatch<void>(
+ [=]() { std::invoke(background_work_function, background_work_arg); });
+ }
}
} // namespace leveldb
diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp
index da0ed083..ce2ca9fe 100644
--- a/src/database/include/database.hpp
+++ b/src/database/include/database.hpp
@@ -19,6 +19,7 @@
#include "result.hpp"
#include "song.hpp"
#include "tag_parser.hpp"
+#include "tasks.hpp"
namespace database {
@@ -90,6 +91,8 @@ class Database {
leveldb::DB* db_;
leveldb::Cache* cache_;
+ std::shared_ptr<tasks::Worker> worker_task_;
+
// Not owned.
IFileGatherer* file_gatherer_;
ITagParser* tag_parser_;
@@ -97,7 +100,8 @@ class Database {
Database(leveldb::DB* db,
leveldb::Cache* cache,
IFileGatherer* file_gatherer,
- ITagParser* tag_parser);
+ ITagParser* tag_parser,
+ std::shared_ptr<tasks::Worker> worker);
auto dbMintNewSongId() -> SongId;
auto dbEntomb(SongId song, uint64_t hash) -> void;
diff --git a/src/database/include/db_task.hpp b/src/database/include/db_task.hpp
deleted file mode 100644
index 39f932b0..00000000
--- a/src/database/include/db_task.hpp
+++ /dev/null
@@ -1,25 +0,0 @@
-#pragma once
-
-#include <functional>
-#include <future>
-#include <memory>
-
-namespace database {
-
-auto StartDbTask() -> bool;
-auto QuitDbTask() -> void;
-
-auto SendToDbTask(std::function<void(void)> fn) -> void;
-
-template <typename T>
-auto RunOnDbTask(std::function<T(void)> fn) -> std::future<T> {
- std::shared_ptr<std::promise<T>> promise =
- std::make_shared<std::promise<T>>();
- SendToDbTask([=]() { promise->set_value(std::invoke(fn)); });
- return promise->get_future();
-}
-
-template <>
-auto RunOnDbTask(std::function<void(void)> fn) -> std::future<void>;
-
-} // namespace database
diff --git a/src/database/include/env_esp.hpp b/src/database/include/env_esp.hpp
index cf5a20e1..6a415ce6 100644
--- a/src/database/include/env_esp.hpp
+++ b/src/database/include/env_esp.hpp
@@ -1,5 +1,6 @@
#pragma once
+#include <memory>
#include <mutex>
#include <set>
#include <string>
@@ -7,8 +8,12 @@
#include "leveldb/env.h"
#include "leveldb/status.h"
+#include "tasks.hpp"
+
namespace leveldb {
+extern std::weak_ptr<tasks::Worker> sBackgroundThread;
+
// Tracks the files locked by EspEnv::LockFile().
//
// We maintain a separate set instead of relying on fcntl(F_SETLK) because
diff --git a/src/drivers/CMakeLists.txt b/src/drivers/CMakeLists.txt
index 413e1ea0..30b002a0 100644
--- a/src/drivers/CMakeLists.txt
+++ b/src/drivers/CMakeLists.txt
@@ -2,5 +2,5 @@ idf_component_register(
SRCS "touchwheel.cpp" "dac.cpp" "gpio_expander.cpp" "battery.cpp" "storage.cpp" "i2c.cpp"
"spi.cpp" "display.cpp" "display_init.cpp" "driver_cache.cpp" "samd.cpp"
INCLUDE_DIRS "include"
- REQUIRES "esp_adc" "fatfs" "result" "lvgl" "span")
+ REQUIRES "esp_adc" "fatfs" "result" "lvgl" "span" "tasks")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})
diff --git a/src/drivers/display.cpp b/src/drivers/display.cpp
index f8594a5a..15c0af75 100644
--- a/src/drivers/display.cpp
+++ b/src/drivers/display.cpp
@@ -21,6 +21,7 @@
#include "display_init.hpp"
#include "gpio_expander.hpp"
#include "soc/soc.h"
+#include "tasks.hpp"
static const char* kTag = "DISPLAY";
@@ -49,6 +50,19 @@ static const int kDisplayBufferSize = (kDisplayWidth * kDisplayHeight) / 10;
DMA_ATTR static lv_color_t sBuffer1[kDisplayBufferSize];
DMA_ATTR static lv_color_t sBuffer2[kDisplayBufferSize];
+struct RenderTaskArgs {
+ std::atomic<bool>* quit;
+ QueueHandle_t work_queue;
+};
+
+struct FlushArgs {
+ lv_disp_drv_t* driver;
+ const lv_area_t* area;
+ lv_color_t* color_map;
+};
+
+void RenderMain(void* raw_args);
+
namespace drivers {
/*
@@ -138,7 +152,9 @@ auto Display::create(GpioExpander* expander,
}
Display::Display(GpioExpander* gpio, spi_device_handle_t handle)
- : gpio_(gpio), handle_(handle) {}
+ : gpio_(gpio),
+ handle_(handle),
+ worker_task_(tasks::Worker::Start<tasks::Type::kUiFlush>()) {}
Display::~Display() {}
@@ -225,31 +241,51 @@ void Display::SendTransaction(TransactionType type,
void Display::OnLvglFlush(lv_disp_drv_t* disp_drv,
const lv_area_t* area,
lv_color_t* color_map) {
- // Ideally we want to complete a single flush as quickly as possible, so grab
- // the bus for this entire transaction sequence.
- spi_device_acquire_bus(handle_, portMAX_DELAY);
-
- // First we need to specify the rectangle of the display we're writing into.
- uint16_t data[2] = {0, 0};
-
- data[0] = SPI_SWAP_DATA_TX(area->x1, 16);
- data[1] = SPI_SWAP_DATA_TX(area->x2, 16);
- SendCommandWithData(displays::ST77XX_CASET, reinterpret_cast<uint8_t*>(data),
- 4);
-
- data[0] = SPI_SWAP_DATA_TX(area->y1, 16);
- data[1] = SPI_SWAP_DATA_TX(area->y2, 16);
- SendCommandWithData(displays::ST77XX_RASET, reinterpret_cast<uint8_t*>(data),
- 4);
+ // area is stack-allocated, so it isn't safe to reference from the flush
+ // thread.
+ lv_area_t area_copy = *area;
+ worker_task_->Dispatch<void>([=, this]() {
+ // Ideally we want to complete a single flush as quickly as possible, so
+ // grab the bus for this entire transaction sequence.
+ spi_device_acquire_bus(handle_, portMAX_DELAY);
+
+ // First we need to specify the rectangle of the display we're writing into.
+ uint16_t data[2] = {0, 0};
+
+ data[0] = SPI_SWAP_DATA_TX(area_copy.x1, 16);
+ data[1] = SPI_SWAP_DATA_TX(area_copy.x2, 16);
+ SendCommandWithData(displays::ST77XX_CASET,
+ reinterpret_cast<uint8_t*>(data), 4);
+
+ data[0] = SPI_SWAP_DATA_TX(area_copy.y1, 16);
+ data[1] = SPI_SWAP_DATA_TX(area_copy.y2, 16);
+ SendCommandWithData(displays::ST77XX_RASET,
+ reinterpret_cast<uint8_t*>(data), 4);
+
+ // Now send the pixels for this region.
+ uint32_t size = lv_area_get_width(area) * lv_area_get_height(area);
+ SendCommandWithData(displays::ST77XX_RAMWR,
+ reinterpret_cast<uint8_t*>(color_map), size * 2);
+
+ spi_device_release_bus(handle_);
+
+ lv_disp_flush_ready(&driver_);
+ });
+}
- // Now send the pixels for this region.
- uint32_t size = lv_area_get_width(area) * lv_area_get_height(area);
- SendCommandWithData(displays::ST77XX_RAMWR,
- reinterpret_cast<uint8_t*>(color_map), size * 2);
+void RenderMain(void* raw_args) {
+ RenderTaskArgs* args = reinterpret_cast<RenderTaskArgs*>(raw_args);
+ QueueHandle_t queue = args->work_queue;
+ std::atomic<bool>* quit = args->quit;
+ delete args;
- spi_device_release_bus(handle_);
+ while (!quit->load()) {
+ // TODO: flush data here! Yay speed.
+ }
- lv_disp_flush_ready(&driver_);
+ vQueueDelete(queue);
+ delete quit;
+ vTaskDelete(NULL);
}
} // namespace drivers
diff --git a/src/drivers/include/display.hpp b/src/drivers/include/display.hpp
index 9e4a0224..04bdf669 100644
--- a/src/drivers/include/display.hpp
+++ b/src/drivers/include/display.hpp
@@ -6,6 +6,7 @@
#include "driver/spi_master.h"
#include "lvgl/lvgl.h"
#include "result.hpp"
+#include "tasks.hpp"
#include "display_init.hpp"
#include "gpio_expander.hpp"
@@ -37,6 +38,8 @@ class Display {
GpioExpander* gpio_;
spi_device_handle_t handle_;
+ std::unique_ptr<tasks::Worker> worker_task_;
+
lv_disp_draw_buf_t buffers_;
lv_disp_drv_t driver_;
lv_disp_t* display_ = nullptr;
diff --git a/src/main/main.cpp b/src/main/main.cpp
index 29ac2c7f..bd56eb87 100644
--- a/src/main/main.cpp
+++ b/src/main/main.cpp
@@ -77,9 +77,7 @@ extern "C" void app_main(void) {
std::shared_ptr<drivers::TouchWheel> touchwheel =
drivers->AcquireTouchWheel();
- std::atomic<bool> lvgl_quit;
- TaskHandle_t lvgl_task_handle;
- ui::StartLvgl(drivers.get(), &lvgl_quit, &lvgl_task_handle);
+ ui::StartLvgl(drivers.get());
std::unique_ptr<audio::AudioPlayback> playback;
if (storage) {
diff --git a/src/tasks/CMakeLists.txt b/src/tasks/CMakeLists.txt
index 0503d293..f7d7244f 100644
--- a/src/tasks/CMakeLists.txt
+++ b/src/tasks/CMakeLists.txt
@@ -1,2 +1,2 @@
-idf_component_register(SRCS "tasks.cpp" INCLUDE_DIRS ".")
+idf_component_register(SRCS "tasks.cpp" INCLUDE_DIRS "." REQUIRES "span")
target_compile_options(${COMPONENT_LIB} PRIVATE ${EXTRA_WARNINGS})
diff --git a/src/tasks/tasks.cpp b/src/tasks/tasks.cpp
index b9fce7ec..0d9d7881 100644
--- a/src/tasks/tasks.cpp
+++ b/src/tasks/tasks.cpp
@@ -1,5 +1,204 @@
#include "tasks.hpp"
+#include <functional>
+#include "esp_heap_caps.h"
+#include "freertos/FreeRTOS.h"
+#include "freertos/portmacro.h"
-const UBaseType_t kTaskPriorityLvgl = 4;
-const UBaseType_t kTaskPriorityAudioPipeline = 5;
-const UBaseType_t kTaskPriorityAudioDrain = 6;
+namespace tasks {
+
+template <Type t>
+auto Name() -> std::string;
+
+template <>
+auto Name<Type::kUi>() -> std::string {
+ return "LVGL";
+}
+template <>
+auto Name<Type::kUiFlush>() -> std::string {
+ return "DISPLAY";
+}
+template <>
+auto Name<Type::kAudio>() -> std::string {
+ return "AUDIO";
+}
+template <>
+auto Name<Type::kAudioDrain>() -> std::string {
+ return "DRAIN";
+}
+template <>
+auto Name<Type::kDatabase>() -> std::string {
+ return "DB";
+}
+
+template <Type t>
+auto AllocateStack() -> cpp::span<StackType_t>;
+
+// Decoders run on the audio task, and these sometimes require a fairly large
+// amount of stack space.
+template <>
+auto AllocateStack<Type::kAudio>() -> cpp::span<StackType_t> {
+ std::size_t size = 32 * 1024;
+ return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)),
+ size};
+}
+template <>
+auto AllocateStack<Type::kAudioDrain>() -> cpp::span<StackType_t> {
+ std::size_t size = 1024;
+ return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)),
+ size};
+}
+// LVGL requires only a relatively small stack. However, it can be allocated in
+// PSRAM so we give it a bit of headroom for safety.
+template <>
+auto AllocateStack<Type::kUi>() -> cpp::span<StackType_t> {
+ std::size_t size = 16 * 1024;
+ return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)),
+ size};
+}
+// UI flushes *must* be done from internal RAM. Thankfully, there is very little
+// stack required to perform them, and the amount of stack needed is fixed.
+template <>
+auto AllocateStack<Type::kUiFlush>() -> cpp::span<StackType_t> {
+ std::size_t size = 1024;
+ return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_DEFAULT)),
+ size};
+}
+// Leveldb is designed for non-embedded use cases, where stack space isn't so
+// much of a concern. It therefore uses an eye-wateringly large amount of stack.
+template <>
+auto AllocateStack<Type::kDatabase>() -> cpp::span<StackType_t> {
+ std::size_t size = 256 * 1024;
+ return {static_cast<StackType_t*>(heap_caps_malloc(size, MALLOC_CAP_SPIRAM)),
+ size};
+}
+
+// 2048 bytes in internal ram
+// 302 KiB in external ram.
+
+/*
+ * Please keep the priorities below in descending order for better readability.
+ */
+
+template <Type t>
+auto Priority() -> UBaseType_t;
+
+// Realtime audio is the entire point of this device, so give this task the
+// highest priority.
+template <>
+auto Priority<Type::kAudio>() -> UBaseType_t {
+ return 10;
+}
+template <>
+auto Priority<Type::kAudioDrain>() -> UBaseType_t {
+ return 10;
+}
+// After audio issues, UI jank is the most noticeable kind of scheduling-induced
+// slowness that the user is likely to notice or care about. Therefore we place
+// this task directly below audio in terms of priority.
+template <>
+auto Priority<Type::kUi>() -> UBaseType_t {
+ return 9;
+}
+// UI flushing should use the same priority as the UI task, so as to maximise
+// the chance of the happy case: one of our cores is writing to the screen,
+// whilst the other is simultaneously preparing the next buffer to be flushed.
+template <>
+auto Priority<Type::kUiFlush>() -> UBaseType_t {
+ return 9;
+}
+// Database interactions are all inherently async already, due to their
+// potential for disk access. The user likely won't notice or care about a
+// couple of ms extra delay due to scheduling, so give this task the lowest
+// priority.
+template <>
+auto Priority<Type::kDatabase>() -> UBaseType_t {
+ return 8;
+}
+
+template <Type t>
+auto WorkerQueueSize() -> std::size_t;
+
+template <>
+auto WorkerQueueSize<Type::kDatabase>() -> std::size_t {
+ return 8;
+}
+
+template <>
+auto WorkerQueueSize<Type::kUiFlush>() -> std::size_t {
+ return 2;
+}
+
+auto PersistentMain(void* fn) -> void {
+ auto* function = reinterpret_cast<std::function<void(void)>*>(fn);
+ std::invoke(*function);
+ assert("persistent task quit!" == 0);
+ vTaskDelete(NULL);
+}
+
+auto Worker::Main(void* instance) {
+ Worker* i = reinterpret_cast<Worker*>(instance);
+ while (1) {
+ WorkItem item;
+ if (xQueueReceive(i->queue_, &item, portMAX_DELAY)) {
+ if (item.quit) {
+ break;
+ } else if (item.fn != nullptr) {
+ std::invoke(*item.fn);
+ delete item.fn;
+ }
+ }
+ }
+ i->is_task_running_.store(false);
+ i->is_task_running_.notify_all();
+ // Wait for the instance's destructor to delete this task. We do this instead
+ // of just deleting ourselves so that it's 100% certain that it's safe to
+ // delete or reuse this task's stack.
+ while (1) {
+ vTaskDelay(portMAX_DELAY);
+ }
+}
+
+Worker::Worker(const std::string& name,
+ cpp::span<StackType_t> stack,
+ std::size_t queue_size,
+ UBaseType_t priority)
+ : stack_(stack.data()),
+ queue_(xQueueCreate(queue_size, sizeof(WorkItem))),
+ is_task_running_(true),
+ task_buffer_(),
+ task_(xTaskCreateStatic(&Main,
+ name.c_str(),
+ stack.size(),
+ this,
+ priority,
+ stack_,
+ &task_buffer_)) {}
+
+Worker::~Worker() {
+ WorkItem item{
+ .fn = nullptr,
+ .quit = true,
+ };
+ xQueueSend(queue_, &item, portMAX_DELAY);
+ is_task_running_.wait(true);
+ vTaskDelete(task_);
+ free(stack_);
+}
+
+template <>
+auto Worker::Dispatch(const std::function<void(void)>& fn)
+ -> std::future<void> {
+ std::shared_ptr<std::promise<void>> promise =
+ std::make_shared<std::promise<void>>();
+ WorkItem item{
+ .fn = new std::function<void(void)>([=]() {
+ std::invoke(fn);
+ promise->set_value();
+ }),
+ .quit = false,
+ };
+ xQueueSend(queue_, &item, portMAX_DELAY);
+ return promise->get_future();
+}
+
+} // namespace tasks
diff --git a/src/tasks/tasks.hpp b/src/tasks/tasks.hpp
index 47668aea..9f37131e 100644
--- a/src/tasks/tasks.hpp
+++ b/src/tasks/tasks.hpp
@@ -1,7 +1,107 @@
#pragma once
+#include <atomic>
+#include <functional>
+#include <future>
+#include <memory>
+#include <string>
+
+#include "freertos/FreeRTOS.h"
#include "freertos/portmacro.h"
+#include "freertos/projdefs.h"
+#include "freertos/queue.h"
+#include "freertos/task.h"
+#include "span.hpp"
+
+namespace tasks {
+
+/*
+ * Enumeration of every task (basically a thread) started within the firmware.
+ * These are centralised so that it is easier to reason about the relative
+ * priorities of tasks, as well as the amount and location of memory allocated
+ * to each one.
+ */
+enum class Type {
+ // The main UI task. This runs the LVGL main loop.
+ kUi,
+ // Task for flushing graphics buffers to the display.
+ kUiFlush,
+ // The main audio pipeline task.
+ kAudio,
+ // Task for flushing PCM samples to the current output.
+ kAudioDrain,
+ // Task for running database queries.
+ kDatabase,
+};
+
+template <Type t>
+auto Name() -> std::string;
+template <Type t>
+auto AllocateStack() -> cpp::span<StackType_t>;
+template <Type t>
+auto Priority() -> UBaseType_t;
+template <Type t>
+auto WorkerQueueSize() -> std::size_t;
+
+auto PersistentMain(void* fn) -> void;
+
+template <Type t>
+auto StartPersistent(const std::function<void(void)>& fn) -> void {
+ StaticTask_t* task_buffer = new StaticTask_t;
+ cpp::span<StackType_t> stack = AllocateStack<t>();
+ xTaskCreateStatic(&PersistentMain, Name<t>().c_str(), stack.size(),
+ new std::function<void(void)>(fn), Priority<t>(),
+ stack.data(), task_buffer);
+}
+
+class Worker {
+ private:
+ Worker(const std::string& name,
+ cpp::span<StackType_t> stack,
+ std::size_t queue_size,
+ UBaseType_t priority);
+
+ StackType_t* stack_;
+ QueueHandle_t queue_;
+ std::atomic<bool> is_task_running_;
+ StaticTask_t task_buffer_;
+ TaskHandle_t task_;
+
+ struct WorkItem {
+ std::function<void(void)>* fn;
+ bool quit;
+ };
+
+ public:
+ template <Type t>
+ static auto Start() -> Worker* {
+ return new Worker(Name<t>(), AllocateStack<t>(), WorkerQueueSize<t>(),
+ Priority<t>());
+ }
+
+ static auto Main(void* instance);
+
+ /*
+ * Schedules the given function to be executed on the worker task, and
+ * asynchronously returns the result as a future.
+ */
+ template <typename T>
+ auto Dispatch(const std::function<T(void)>& fn) -> std::future<T> {
+ std::shared_ptr<std::promise<T>> promise =
+ std::make_shared<std::promise<T>>();
+ WorkItem item{
+ .fn = new std::function([=]() { promise->set_value(std::invoke(fn)); }),
+ .quit = false,
+ };
+ xQueueSend(queue_, &item, portMAX_DELAY);
+ return promise->get_future();
+ }
+
+ ~Worker();
+};
+
+/* Specialisation of Evaluate for functions that return nothing. */
+template <>
+auto Worker::Dispatch(const std::function<void(void)>& fn) -> std::future<void>;
-extern const UBaseType_t kTaskPriorityLvgl;
-extern const UBaseType_t kTaskPriorityAudioPipeline;
-extern const UBaseType_t kTaskPriorityAudioDrain;
+} // namespace tasks
diff --git a/src/ui/include/lvgl_task.hpp b/src/ui/include/lvgl_task.hpp
index ca3fc771..b129e329 100644
--- a/src/ui/include/lvgl_task.hpp
+++ b/src/ui/include/lvgl_task.hpp
@@ -10,8 +10,6 @@
namespace ui {
-auto StartLvgl(drivers::DriverCache* drivers,
- std::atomic<bool>* quit,
- TaskHandle_t* handle) -> bool;
+auto StartLvgl(drivers::DriverCache* drivers) -> void;
} // namespace ui
diff --git a/src/ui/lvgl_task.cpp b/src/ui/lvgl_task.cpp
index 12dfd34e..0e96cd41 100644
--- a/src/ui/lvgl_task.cpp
+++ b/src/ui/lvgl_task.cpp
@@ -23,6 +23,7 @@
#include "misc/lv_color.h"
#include "misc/lv_style.h"
#include "misc/lv_timer.h"
+#include "tasks.hpp"
#include "widgets/lv_label.h"
#include "display.hpp"
@@ -37,24 +38,13 @@ auto tick_hook(TimerHandle_t xTimer) -> void {
lv_tick_inc(1);
}
-struct LvglArgs {
- drivers::DriverCache* drivers;
- std::atomic<bool>* quit;
-};
-
-void LvglMain(void* voidArgs) {
- LvglArgs* args = reinterpret_cast<LvglArgs*>(voidArgs);
- drivers::DriverCache* drivers = args->drivers;
- std::atomic<bool>* quit = args->quit;
- delete args;
-
+void LvglMain(drivers::DriverCache* drivers) {
{
ESP_LOGI(kTag, "init lvgl");
lv_init();
// LVGL has been initialised, so we can now start reporting ticks to it.
- TimerHandle_t tick_timer =
- xTimerCreate("lv_tick", pdMS_TO_TICKS(1), pdTRUE, NULL, &tick_hook);
+ xTimerCreate("lv_tick", pdMS_TO_TICKS(1), pdTRUE, NULL, &tick_hook);
ESP_LOGI(kTag, "init display");
std::shared_ptr<drivers::Display> display = drivers->AcquireDisplay();
@@ -72,37 +62,15 @@ void LvglMain(void* voidArgs) {
lv_obj_center(label);
lv_scr_load(label);
- while (!quit->load()) {
+ while (1) {
lv_timer_handler();
vTaskDelay(pdMS_TO_TICKS(10));
}
-
- // TODO(robin? daniel?): De-init the UI stack here.
- lv_obj_del(label);
- lv_style_reset(&style);
-
- xTimerDelete(tick_timer, portMAX_DELAY);
-
- lv_deinit();
}
-
- vTaskDelete(NULL);
}
-static const size_t kLvglStackSize = 8 * 1024;
-static StaticTask_t sLvglTaskBuffer = {};
-static StackType_t sLvglStack[kLvglStackSize] = {0};
-
-auto StartLvgl(drivers::DriverCache* drivers,
- std::atomic<bool>* quit,
- TaskHandle_t* handle) -> bool {
- LvglArgs* args = new LvglArgs();
- args->drivers = drivers;
- args->quit = quit;
-
- return xTaskCreateStaticPinnedToCore(&LvglMain, "LVGL", kLvglStackSize,
- reinterpret_cast<void*>(args), 1,
- sLvglStack, &sLvglTaskBuffer, 1);
+auto StartLvgl(drivers::DriverCache* drivers) -> void {
+ tasks::StartPersistent<tasks::Type::kUi>([=]() { LvglMain(drivers); });
}
} // namespace ui