summaryrefslogtreecommitdiff
path: root/src/audio/audio_task.cpp
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2023-03-10 11:28:33 +1100
committerjacqueline <me@jacqueline.id.au>2023-04-19 10:27:59 +1000
commita9531c86a433c8b7ae1f77ff0266c27c39eca7f4 (patch)
tree11835552aa2ecb400537781d8eb3851118c47e61 /src/audio/audio_task.cpp
parent2a46eecdc6334c31cee2b40427d2536b48cbb6be (diff)
downloadtangara-fw-a9531c86a433c8b7ae1f77ff0266c27c39eca7f4.tar.gz
mostly single task pipeline
Diffstat (limited to 'src/audio/audio_task.cpp')
-rw-r--r--src/audio/audio_task.cpp212
1 files changed, 97 insertions, 115 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index ce6d724e..542bada8 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -2,16 +2,20 @@
#include <stdlib.h>
+#include <algorithm>
+#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include "cbor.h"
+#include "esp_err.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h"
+#include "pipeline.hpp"
#include "span.hpp"
#include "arena.hpp"
@@ -25,25 +29,26 @@
namespace audio {
+namespace task {
+
static const char* kTag = "task";
+static const std::size_t kStackSize = 24 * 1024;
+static const uint8_t kAudioCore = 0;
-auto StartAudioTask(const std::string& name,
- std::optional<BaseType_t> core_id,
- std::shared_ptr<IAudioElement> element) -> void {
- auto task_handle = std::make_unique<TaskHandle_t>();
+auto Start(Pipeline* pipeline) -> Handle* {
+ auto input_queue = xQueueCreate(8, 1);
// Newly created task will free this.
- AudioTaskArgs* args = new AudioTaskArgs{.element = element};
-
- ESP_LOGI(kTag, "starting audio task %s", name.c_str());
- if (core_id) {
- xTaskCreatePinnedToCore(&AudioTaskMain, name.c_str(),
- element->StackSizeBytes(), args, kTaskPriorityAudio,
- task_handle.get(), *core_id);
- } else {
- xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args,
- kTaskPriorityAudio, task_handle.get());
- }
+ AudioTaskArgs* args = new AudioTaskArgs{
+ .pipeline = pipeline,
+ .input = input_queue,
+ };
+
+ ESP_LOGI(kTag, "starting audio task");
+ xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args,
+ kTaskPriorityAudio, NULL, kAudioCore);
+
+ return new Handle(input_queue);
}
void AudioTaskMain(void* args) {
@@ -51,113 +56,88 @@ void AudioTaskMain(void* args) {
// called before the task quits.
{
AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args);
- std::shared_ptr<IAudioElement> element = std::move(real_args->element);
+ std::unique_ptr<Pipeline> pipeline(real_args->pipeline);
+ QueueHandle_t input;
+ StreamBufferHandle_t output;
delete real_args;
- // Queue of events that we have received on our input queue, but not yet
- // processed.
- std::deque<std::unique_ptr<StreamEvent>> pending_events;
-
- // TODO(jacqueline): quit event
- while (true) {
- // First, we pull events from our input queue into pending_events. This
- // keeps us responsive to any events that need to be handled immediately.
- // Then we check if there's any events to flush downstream.
- // Then we pass anything requiring processing to the element.
-
- bool has_work_to_do =
- (!pending_events.empty() || element->HasUnflushedOutput() ||
- element->HasUnprocessedInput()) &&
- !element->IsOverBuffered();
-
- if (has_work_to_do) {
- ESP_LOGD(kTag, "checking for events");
- } else {
- ESP_LOGD(kTag, "waiting for events");
- }
-
- // If we have no new events to process and the element has nothing left to
- // do, then just delay forever waiting for a new event.
- TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY;
-
- StreamEvent* new_event = nullptr;
- bool has_event =
- xQueueReceive(element->InputEventQueue(), &new_event, ticks_to_wait);
-
- if (has_event) {
- if (new_event->tag == StreamEvent::UNINITIALISED) {
- ESP_LOGE(kTag, "discarding invalid event!!");
- } else if (new_event->tag == StreamEvent::CHUNK_NOTIFICATION) {
- delete new_event;
- } else if (new_event->tag == StreamEvent::LOG_STATUS) {
- element->ProcessLogStatus();
- if (element->OutputEventQueue() != nullptr) {
- xQueueSendToFront(element->OutputEventQueue(), &new_event, 0);
- } else {
- delete new_event;
- }
- } else {
- // This isn't an event that needs to be actioned immediately. Add it
- // to our work queue.
- pending_events.emplace_back(new_event);
- ESP_LOGD(kTag, "deferring event");
+ std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
+ std::size_t max_inputs =
+ (*std::max_element(elements.begin(), elements.end(),
+ [](Pipeline const* first, Pipeline const* second) {
+ return first->NumInputs() < second->NumInputs();
+ }))
+ ->NumInputs();
+
+ // We need to be able to simultaneously map all of an element's inputs, plus
+ // its output. So preallocate that many ranges.
+ std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs);
+ MappableRegion<kPipelineBufferSize> out_region;
+ std::for_each(in_regions.begin(), in_regions.end(),
+ [](const MappableRegion<kBufferSize>& region) {
+ assert(region.is_valid);
+ });
+ assert(out_region.is_valid);
+
+ // Each element has exactly one output buffer.
+ std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size());
+ std::vector<StreamInfo> buffer_infos(buffers.size());
+ std::for_each(buffers.begin(), buffers.end(),
+ [](const HimemAlloc<kPipelineBufferSize>& alloc) {
+ assert(alloc.is_valid);
+ });
+
+ bool playing = true;
+ bool quit = false;
+ while (!quit) {
+ // TODO: full event here?
+ Command cmd;
+ bool has_cmd = xQueueReceive(input, &cmd, 0);
+ if (has_cmd) {
+ switch (cmd) {
+ case PLAY:
+ playing = true;
+ break;
+ case PAUSE:
+ playing = false;
+ break;
+ case QUIT:
+ quit = true;
+ break;
}
- // Loop again, so that we service all incoming events before doing our
- // possibly expensive processing.
- continue;
}
-
- if (element->HasUnflushedOutput()) {
- ESP_LOGD(kTag, "flushing output");
- }
-
- // We have no new events. Next, see if there's anything that needs to be
- // flushed.
- if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) {
- // We had things to flush, but couldn't send it all. This probably
- // implies that the downstream element is having issues servicing its
- // input queue, so hold off for a moment before retrying.
- ESP_LOGW(kTag, "failed to flush buffered output");
- vTaskDelay(pdMS_TO_TICKS(100));
- continue;
+ if (quit) {
+ break;
}
- if (element->HasUnprocessedInput()) {
- ESP_LOGD(kTag, "processing input events");
- element->Process();
- continue;
- }
-
- // The element ran out of data, so now it's time to let it process more
- // input.
- while (!pending_events.empty()) {
- std::unique_ptr<StreamEvent> event;
- pending_events.front().swap(event);
- pending_events.pop_front();
- ESP_LOGD(kTag, "processing event, tag %i", event->tag);
-
- if (event->tag == StreamEvent::STREAM_INFO) {
- ESP_LOGD(kTag, "processing stream info");
-
- element->ProcessStreamInfo(*event->stream_info);
-
- } else if (event->tag == StreamEvent::ARENA_CHUNK) {
- ESP_LOGD(kTag, "processing arena data");
-
- memory::ArenaRef ref(event->arena_chunk);
- auto callback =
- StreamEvent::CreateChunkNotification(element->InputEventQueue());
- if (!xQueueSend(event->source, &callback, 0)) {
- ESP_LOGW(kTag, "failed to send chunk notif");
- continue;
+ if (playing) {
+ for (int i = 0; i < elements.size(); i++) {
+ std::vector<MutableStream> in_streams;
+ elements.at(i)->InStreams(&in_regions, &in_streams);
+ MutableStream out_stream = elements.at(i)->OutStream(&out_region);
+
+ // Crop the input and output streams to the ranges that are safe to
+ // touch. For the input streams, this is the region that contains
+ // data. For the output stream, this is the region that does *not*
+ // already contain data.
+ std::vector<Stream> cropped_in_streams;
+ std::for_each(in_streams.begin(), in_streams.end(),
+ [&](MutableStream& s) {
+ cropped_in_streams.emplace_back(
+ s.info, s.data.first(s.info->bytes_in_stream));
+ });
+
+ elements.at(i)->OutputElement()->Process(&cropped_in_streams,
+ &out_stream);
+
+ for (int stream = 0; stream < in_streams.size(); stream++) {
+ MutableStream& orig_stream = in_streams.at(stream);
+ Stream& cropped_stream = cropped_in_streams.at(stream);
+ std::move(cropped_stream.data.begin(), cropped_stream.data.end(),
+ orig_stream.data.begin());
+ orig_stream.info->bytes_in_stream =
+ cropped_stream.data.size_bytes();
}
-
- // TODO(jacqueline): Consider giving the element a full ArenaRef here,
- // so that it can hang on to it and potentially save an alloc+copy.
- element->ProcessChunk({ref.ptr.start, ref.ptr.used_size});
-
- // TODO: think about whether to do the whole queue
- break;
}
}
}
@@ -165,4 +145,6 @@ void AudioTaskMain(void* args) {
vTaskDelete(NULL);
}
+} // namespace task
+
} // namespace audio