summaryrefslogtreecommitdiff
path: root/src/audio/audio_task.cpp
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2023-05-22 15:23:51 +1000
committerjacqueline <me@jacqueline.id.au>2023-05-22 15:23:51 +1000
commit5ac4d3949cd7430e0d4c994bbc528e8e4fa91337 (patch)
treedcd30c252e45afa20f084e1a58cbabd861ca299c /src/audio/audio_task.cpp
parentb320a6a863cf1c10dc79254af41f573730935564 (diff)
downloadtangara-fw-5ac4d3949cd7430e0d4c994bbc528e8e4fa91337.tar.gz
Generalise worker tasks, and centralise task priorities + stacks
Includes making the display driver use a worker task for flushes, so that our double buffering actually does something useful /facepalm
Diffstat (limited to 'src/audio/audio_task.cpp')
-rw-r--r--src/audio/audio_task.cpp236
1 files changed, 99 insertions, 137 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index b2a8062e..f0128bf3 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -35,157 +35,119 @@ namespace audio {
namespace task {
static const char* kTag = "task";
-static const std::size_t kStackSize = 24 * 1024;
-static const std::size_t kDrainStackSize = 1024;
-auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
- // Newly created task will free this.
- AudioTaskArgs* args = new AudioTaskArgs{.pipeline = pipeline, .sink = sink};
+void AudioTaskMain(std::unique_ptr<Pipeline> pipeline, IAudioSink* sink) {
+ std::optional<StreamInfo::Format> output_format;
+
+ std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
+ std::size_t max_inputs =
+ (*std::max_element(elements.begin(), elements.end(),
+ [](Pipeline const* first, Pipeline const* second) {
+ return first->NumInputs() < second->NumInputs();
+ }))
+ ->NumInputs();
+
+ // We need to be able to simultaneously map all of an element's inputs, plus
+ // its output. So preallocate that many ranges.
+ std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs);
+ MappableRegion<kPipelineBufferSize> out_region;
+ std::for_each(in_regions.begin(), in_regions.end(),
+ [](const auto& region) { assert(region.is_valid); });
+ assert(out_region.is_valid);
+
+ // Each element has exactly one output buffer.
+ std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size());
+ std::vector<StreamInfo> buffer_infos(buffers.size());
+ std::for_each(buffers.begin(), buffers.end(),
+ [](const HimemAlloc<kPipelineBufferSize>& alloc) {
+ assert(alloc.is_valid);
+ });
+
+ while (1) {
+ for (int i = 0; i < elements.size(); i++) {
+ std::vector<RawStream> raw_in_streams;
+ elements.at(i)->InStreams(&in_regions, &raw_in_streams);
+ RawStream raw_out_stream = elements.at(i)->OutStream(&out_region);
+
+ // Crop the input and output streams to the ranges that are safe to
+ // touch. For the input streams, this is the region that contains
+ // data. For the output stream, this is the region that does *not*
+ // already contain data.
+ std::vector<InputStream> in_streams;
+ std::for_each(raw_in_streams.begin(), raw_in_streams.end(),
+ [&](RawStream& s) { in_streams.emplace_back(&s); });
+ OutputStream out_stream(&raw_out_stream);
+
+ elements.at(i)->OutputElement()->Process(in_streams, &out_stream);
+
+ std::for_each(in_regions.begin(), in_regions.end(),
+ [](auto&& r) { r.Unmap(); });
+ out_region.Unmap();
+ }
- ESP_LOGI(kTag, "starting audio pipeline task");
- xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args,
- kTaskPriorityAudioPipeline, NULL, 1);
-}
+ RawStream raw_sink_stream = elements.front()->OutStream(&out_region);
+ InputStream sink_stream(&raw_sink_stream);
-auto StartDrain(IAudioSink* sink) -> void {
- auto command = new std::atomic<Command>(PLAY);
- // Newly created task will free this.
- AudioDrainArgs* drain_args = new AudioDrainArgs{
- .sink = sink,
- .command = command,
- };
+ if (sink_stream.info().bytes_in_stream == 0) {
+ out_region.Unmap();
+ vTaskDelay(pdMS_TO_TICKS(100));
+ continue;
+ }
- ESP_LOGI(kTag, "starting audio drain task");
- xTaskCreate(&AudioDrainMain, "drain", kDrainStackSize, drain_args,
- kTaskPriorityAudioDrain, NULL);
-}
+ if (!output_format || output_format != sink_stream.info().format) {
+ // The format of the stream within the sink stream has changed. We
+ // need to reconfigure the sink, but shouldn't do so until we've fully
+ // drained the current buffer.
+ if (xStreamBufferIsEmpty(sink->buffer())) {
+ ESP_LOGI(kTag, "reconfiguring dac");
+ output_format = sink_stream.info().format;
+ sink->Configure(*output_format);
+ }
+ }
-void AudioTaskMain(void* args) {
- // Nest the body within an additional scope to ensure that destructors are
- // called before the task quits.
- {
- AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args);
- std::unique_ptr<Pipeline> pipeline(real_args->pipeline);
- IAudioSink* sink = real_args->sink;
- delete real_args;
-
- std::optional<StreamInfo::Format> output_format;
-
- std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
- std::size_t max_inputs =
- (*std::max_element(elements.begin(), elements.end(),
- [](Pipeline const* first, Pipeline const* second) {
- return first->NumInputs() < second->NumInputs();
- }))
- ->NumInputs();
-
- // We need to be able to simultaneously map all of an element's inputs, plus
- // its output. So preallocate that many ranges.
- std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs);
- MappableRegion<kPipelineBufferSize> out_region;
- std::for_each(in_regions.begin(), in_regions.end(),
- [](const auto& region) { assert(region.is_valid); });
- assert(out_region.is_valid);
-
- // Each element has exactly one output buffer.
- std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size());
- std::vector<StreamInfo> buffer_infos(buffers.size());
- std::for_each(buffers.begin(), buffers.end(),
- [](const HimemAlloc<kPipelineBufferSize>& alloc) {
- assert(alloc.is_valid);
- });
-
- bool playing = true;
- bool quit = false;
- while (!quit) {
- if (playing) {
- for (int i = 0; i < elements.size(); i++) {
- std::vector<RawStream> raw_in_streams;
- elements.at(i)->InStreams(&in_regions, &raw_in_streams);
- RawStream raw_out_stream = elements.at(i)->OutStream(&out_region);
-
- // Crop the input and output streams to the ranges that are safe to
- // touch. For the input streams, this is the region that contains
- // data. For the output stream, this is the region that does *not*
- // already contain data.
- std::vector<InputStream> in_streams;
- std::for_each(raw_in_streams.begin(), raw_in_streams.end(),
- [&](RawStream& s) { in_streams.emplace_back(&s); });
- OutputStream out_stream(&raw_out_stream);
-
- elements.at(i)->OutputElement()->Process(in_streams, &out_stream);
-
- std::for_each(in_regions.begin(), in_regions.end(),
- [](auto&& r) { r.Unmap(); });
- out_region.Unmap();
- }
-
- RawStream raw_sink_stream = elements.front()->OutStream(&out_region);
- InputStream sink_stream(&raw_sink_stream);
-
- if (sink_stream.info().bytes_in_stream == 0) {
- out_region.Unmap();
- vTaskDelay(pdMS_TO_TICKS(100));
- continue;
- }
-
- if (!output_format || output_format != sink_stream.info().format) {
- // The format of the stream within the sink stream has changed. We
- // need to reconfigure the sink, but shouldn't do so until we've fully
- // drained the current buffer.
- if (xStreamBufferIsEmpty(sink->buffer())) {
- ESP_LOGI(kTag, "reconfiguring dac");
- output_format = sink_stream.info().format;
- sink->Configure(*output_format);
- }
- }
-
- // We've reconfigured the sink, or it was already configured correctly.
- // Send through some data.
- if (output_format == sink_stream.info().format &&
- !std::holds_alternative<std::monostate>(*output_format)) {
- // TODO: tune the delay on this, as it's currently the only way to
- // throttle this task's CPU time. Maybe also hold off on the pipeline
- // if the buffer is already close to full?
- std::size_t sent = xStreamBufferSend(
- sink->buffer(), sink_stream.data().data(),
- sink_stream.data().size_bytes(), pdMS_TO_TICKS(10));
- if (sent > 0) {
- ESP_LOGI(kTag, "sunk %u bytes out of %u (%d %%)", sent,
- sink_stream.info().bytes_in_stream,
- (int)(((float)sent /
- (float)sink_stream.info().bytes_in_stream) *
- 100));
- }
- sink_stream.consume(sent);
- }
-
- out_region.Unmap();
+ // We've reconfigured the sink, or it was already configured correctly.
+ // Send through some data.
+ if (output_format == sink_stream.info().format &&
+ !std::holds_alternative<std::monostate>(*output_format)) {
+ std::size_t sent =
+ xStreamBufferSend(sink->buffer(), sink_stream.data().data(),
+ sink_stream.data().size_bytes(), 0);
+ if (sent > 0) {
+ ESP_LOGI(
+ kTag, "sunk %u bytes out of %u (%d %%)", sent,
+ sink_stream.info().bytes_in_stream,
+ (int)(((float)sent / (float)sink_stream.info().bytes_in_stream) *
+ 100));
}
+ sink_stream.consume(sent);
}
+
+ out_region.Unmap();
}
- vTaskDelete(NULL);
}
static std::byte sDrainBuf[8 * 1024];
-void AudioDrainMain(void* args) {
- {
- AudioDrainArgs* real_args = reinterpret_cast<AudioDrainArgs*>(args);
- IAudioSink* sink = real_args->sink;
- std::atomic<Command>* command = real_args->command;
- delete real_args;
-
- // TODO(jacqueline): implement PAUSE without busy-waiting.
- while (*command != QUIT) {
- std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf,
- sizeof(sDrainBuf), portMAX_DELAY);
- if (len > 0) {
- sink->Send({sDrainBuf, len});
- }
+void AudioDrainMain(IAudioSink* sink) {
+ while (1) {
+ std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf,
+ sizeof(sDrainBuf), portMAX_DELAY);
+ if (len > 0) {
+ sink->Send({sDrainBuf, len});
}
}
- vTaskDelete(NULL);
+}
+
+auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
+ ESP_LOGI(kTag, "starting audio pipeline task");
+ tasks::StartPersistent<tasks::Type::kAudio>(
+ [=]() { AudioTaskMain(std::unique_ptr<Pipeline>(pipeline), sink); });
+}
+
+auto StartDrain(IAudioSink* sink) -> void {
+ ESP_LOGI(kTag, "starting audio drain task");
+ tasks::StartPersistent<tasks::Type::kAudioDrain>(
+ [=]() { AudioDrainMain(sink); });
}
} // namespace task