summaryrefslogtreecommitdiff
path: root/src/audio/audio_task.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/audio/audio_task.cpp')
-rw-r--r--src/audio/audio_task.cpp249
1 files changed, 137 insertions, 112 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index ce6d724e..b2a8062e 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -2,16 +2,23 @@
#include <stdlib.h>
+#include <algorithm>
+#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
+#include <variant>
+#include "audio_sink.hpp"
#include "cbor.h"
+#include "dac.hpp"
+#include "esp_err.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h"
+#include "pipeline.hpp"
#include "span.hpp"
#include "arena.hpp"
@@ -25,25 +32,32 @@
namespace audio {
+namespace task {
+
static const char* kTag = "task";
+static const std::size_t kStackSize = 24 * 1024;
+static const std::size_t kDrainStackSize = 1024;
+
+auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
+ // Newly created task will free this.
+ AudioTaskArgs* args = new AudioTaskArgs{.pipeline = pipeline, .sink = sink};
-auto StartAudioTask(const std::string& name,
- std::optional<BaseType_t> core_id,
- std::shared_ptr<IAudioElement> element) -> void {
- auto task_handle = std::make_unique<TaskHandle_t>();
+ ESP_LOGI(kTag, "starting audio pipeline task");
+ xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args,
+ kTaskPriorityAudioPipeline, NULL, 1);
+}
+auto StartDrain(IAudioSink* sink) -> void {
+ auto command = new std::atomic<Command>(PLAY);
// Newly created task will free this.
- AudioTaskArgs* args = new AudioTaskArgs{.element = element};
-
- ESP_LOGI(kTag, "starting audio task %s", name.c_str());
- if (core_id) {
- xTaskCreatePinnedToCore(&AudioTaskMain, name.c_str(),
- element->StackSizeBytes(), args, kTaskPriorityAudio,
- task_handle.get(), *core_id);
- } else {
- xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args,
- kTaskPriorityAudio, task_handle.get());
- }
+ AudioDrainArgs* drain_args = new AudioDrainArgs{
+ .sink = sink,
+ .command = command,
+ };
+
+ ESP_LOGI(kTag, "starting audio drain task");
+ xTaskCreate(&AudioDrainMain, "drain", kDrainStackSize, drain_args,
+ kTaskPriorityAudioDrain, NULL);
}
void AudioTaskMain(void* args) {
@@ -51,118 +65,129 @@ void AudioTaskMain(void* args) {
// called before the task quits.
{
AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args);
- std::shared_ptr<IAudioElement> element = std::move(real_args->element);
+ std::unique_ptr<Pipeline> pipeline(real_args->pipeline);
+ IAudioSink* sink = real_args->sink;
delete real_args;
- // Queue of events that we have received on our input queue, but not yet
- // processed.
- std::deque<std::unique_ptr<StreamEvent>> pending_events;
-
- // TODO(jacqueline): quit event
- while (true) {
- // First, we pull events from our input queue into pending_events. This
- // keeps us responsive to any events that need to be handled immediately.
- // Then we check if there's any events to flush downstream.
- // Then we pass anything requiring processing to the element.
-
- bool has_work_to_do =
- (!pending_events.empty() || element->HasUnflushedOutput() ||
- element->HasUnprocessedInput()) &&
- !element->IsOverBuffered();
-
- if (has_work_to_do) {
- ESP_LOGD(kTag, "checking for events");
- } else {
- ESP_LOGD(kTag, "waiting for events");
- }
-
- // If we have no new events to process and the element has nothing left to
- // do, then just delay forever waiting for a new event.
- TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY;
-
- StreamEvent* new_event = nullptr;
- bool has_event =
- xQueueReceive(element->InputEventQueue(), &new_event, ticks_to_wait);
-
- if (has_event) {
- if (new_event->tag == StreamEvent::UNINITIALISED) {
- ESP_LOGE(kTag, "discarding invalid event!!");
- } else if (new_event->tag == StreamEvent::CHUNK_NOTIFICATION) {
- delete new_event;
- } else if (new_event->tag == StreamEvent::LOG_STATUS) {
- element->ProcessLogStatus();
- if (element->OutputEventQueue() != nullptr) {
- xQueueSendToFront(element->OutputEventQueue(), &new_event, 0);
- } else {
- delete new_event;
- }
- } else {
- // This isn't an event that needs to be actioned immediately. Add it
- // to our work queue.
- pending_events.emplace_back(new_event);
- ESP_LOGD(kTag, "deferring event");
+ std::optional<StreamInfo::Format> output_format;
+
+ std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
+ std::size_t max_inputs =
+ (*std::max_element(elements.begin(), elements.end(),
+ [](Pipeline const* first, Pipeline const* second) {
+ return first->NumInputs() < second->NumInputs();
+ }))
+ ->NumInputs();
+
+ // We need to be able to simultaneously map all of an element's inputs, plus
+ // its output. So preallocate that many ranges.
+ std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs);
+ MappableRegion<kPipelineBufferSize> out_region;
+ std::for_each(in_regions.begin(), in_regions.end(),
+ [](const auto& region) { assert(region.is_valid); });
+ assert(out_region.is_valid);
+
+ // Each element has exactly one output buffer.
+ std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size());
+ std::vector<StreamInfo> buffer_infos(buffers.size());
+ std::for_each(buffers.begin(), buffers.end(),
+ [](const HimemAlloc<kPipelineBufferSize>& alloc) {
+ assert(alloc.is_valid);
+ });
+
+ bool playing = true;
+ bool quit = false;
+ while (!quit) {
+ if (playing) {
+ for (int i = 0; i < elements.size(); i++) {
+ std::vector<RawStream> raw_in_streams;
+ elements.at(i)->InStreams(&in_regions, &raw_in_streams);
+ RawStream raw_out_stream = elements.at(i)->OutStream(&out_region);
+
+ // Crop the input and output streams to the ranges that are safe to
+ // touch. For the input streams, this is the region that contains
+ // data. For the output stream, this is the region that does *not*
+ // already contain data.
+ std::vector<InputStream> in_streams;
+ std::for_each(raw_in_streams.begin(), raw_in_streams.end(),
+ [&](RawStream& s) { in_streams.emplace_back(&s); });
+ OutputStream out_stream(&raw_out_stream);
+
+ elements.at(i)->OutputElement()->Process(in_streams, &out_stream);
+
+ std::for_each(in_regions.begin(), in_regions.end(),
+ [](auto&& r) { r.Unmap(); });
+ out_region.Unmap();
}
- // Loop again, so that we service all incoming events before doing our
- // possibly expensive processing.
- continue;
- }
-
- if (element->HasUnflushedOutput()) {
- ESP_LOGD(kTag, "flushing output");
- }
- // We have no new events. Next, see if there's anything that needs to be
- // flushed.
- if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) {
- // We had things to flush, but couldn't send it all. This probably
- // implies that the downstream element is having issues servicing its
- // input queue, so hold off for a moment before retrying.
- ESP_LOGW(kTag, "failed to flush buffered output");
- vTaskDelay(pdMS_TO_TICKS(100));
- continue;
- }
+ RawStream raw_sink_stream = elements.front()->OutStream(&out_region);
+ InputStream sink_stream(&raw_sink_stream);
- if (element->HasUnprocessedInput()) {
- ESP_LOGD(kTag, "processing input events");
- element->Process();
- continue;
- }
-
- // The element ran out of data, so now it's time to let it process more
- // input.
- while (!pending_events.empty()) {
- std::unique_ptr<StreamEvent> event;
- pending_events.front().swap(event);
- pending_events.pop_front();
- ESP_LOGD(kTag, "processing event, tag %i", event->tag);
+ if (sink_stream.info().bytes_in_stream == 0) {
+ out_region.Unmap();
+ vTaskDelay(pdMS_TO_TICKS(100));
+ continue;
+ }
- if (event->tag == StreamEvent::STREAM_INFO) {
- ESP_LOGD(kTag, "processing stream info");
+ if (!output_format || output_format != sink_stream.info().format) {
+ // The format of the stream within the sink stream has changed. We
+ // need to reconfigure the sink, but shouldn't do so until we've fully
+ // drained the current buffer.
+ if (xStreamBufferIsEmpty(sink->buffer())) {
+ ESP_LOGI(kTag, "reconfiguring dac");
+ output_format = sink_stream.info().format;
+ sink->Configure(*output_format);
+ }
+ }
- element->ProcessStreamInfo(*event->stream_info);
+ // We've reconfigured the sink, or it was already configured correctly.
+ // Send through some data.
+ if (output_format == sink_stream.info().format &&
+ !std::holds_alternative<std::monostate>(*output_format)) {
+ // TODO: tune the delay on this, as it's currently the only way to
+ // throttle this task's CPU time. Maybe also hold off on the pipeline
+ // if the buffer is already close to full?
+ std::size_t sent = xStreamBufferSend(
+ sink->buffer(), sink_stream.data().data(),
+ sink_stream.data().size_bytes(), pdMS_TO_TICKS(10));
+ if (sent > 0) {
+ ESP_LOGI(kTag, "sunk %u bytes out of %u (%d %%)", sent,
+ sink_stream.info().bytes_in_stream,
+ (int)(((float)sent /
+ (float)sink_stream.info().bytes_in_stream) *
+ 100));
+ }
+ sink_stream.consume(sent);
+ }
- } else if (event->tag == StreamEvent::ARENA_CHUNK) {
- ESP_LOGD(kTag, "processing arena data");
+ out_region.Unmap();
+ }
+ }
+ }
+ vTaskDelete(NULL);
+}
- memory::ArenaRef ref(event->arena_chunk);
- auto callback =
- StreamEvent::CreateChunkNotification(element->InputEventQueue());
- if (!xQueueSend(event->source, &callback, 0)) {
- ESP_LOGW(kTag, "failed to send chunk notif");
- continue;
- }
+static std::byte sDrainBuf[8 * 1024];
- // TODO(jacqueline): Consider giving the element a full ArenaRef here,
- // so that it can hang on to it and potentially save an alloc+copy.
- element->ProcessChunk({ref.ptr.start, ref.ptr.used_size});
+void AudioDrainMain(void* args) {
+ {
+ AudioDrainArgs* real_args = reinterpret_cast<AudioDrainArgs*>(args);
+ IAudioSink* sink = real_args->sink;
+ std::atomic<Command>* command = real_args->command;
+ delete real_args;
- // TODO: think about whether to do the whole queue
- break;
- }
+ // TODO(jacqueline): implement PAUSE without busy-waiting.
+ while (*command != QUIT) {
+ std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf,
+ sizeof(sDrainBuf), portMAX_DELAY);
+ if (len > 0) {
+ sink->Send({sDrainBuf, len});
}
}
}
vTaskDelete(NULL);
}
+} // namespace task
+
} // namespace audio