diff options
Diffstat (limited to 'src/audio/audio_task.cpp')
| -rw-r--r-- | src/audio/audio_task.cpp | 212 |
1 files changed, 97 insertions, 115 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index ce6d724e..542bada8 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -2,16 +2,20 @@ #include <stdlib.h> +#include <algorithm> +#include <cstddef> #include <cstdint> #include <deque> #include <memory> #include "cbor.h" +#include "esp_err.h" #include "esp_heap_caps.h" #include "esp_log.h" #include "freertos/portmacro.h" #include "freertos/projdefs.h" #include "freertos/queue.h" +#include "pipeline.hpp" #include "span.hpp" #include "arena.hpp" @@ -25,25 +29,26 @@ namespace audio { +namespace task { + static const char* kTag = "task"; +static const std::size_t kStackSize = 24 * 1024; +static const uint8_t kAudioCore = 0; -auto StartAudioTask(const std::string& name, - std::optional<BaseType_t> core_id, - std::shared_ptr<IAudioElement> element) -> void { - auto task_handle = std::make_unique<TaskHandle_t>(); +auto Start(Pipeline* pipeline) -> Handle* { + auto input_queue = xQueueCreate(8, 1); // Newly created task will free this. - AudioTaskArgs* args = new AudioTaskArgs{.element = element}; - - ESP_LOGI(kTag, "starting audio task %s", name.c_str()); - if (core_id) { - xTaskCreatePinnedToCore(&AudioTaskMain, name.c_str(), - element->StackSizeBytes(), args, kTaskPriorityAudio, - task_handle.get(), *core_id); - } else { - xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args, - kTaskPriorityAudio, task_handle.get()); - } + AudioTaskArgs* args = new AudioTaskArgs{ + .pipeline = pipeline, + .input = input_queue, + }; + + ESP_LOGI(kTag, "starting audio task"); + xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args, + kTaskPriorityAudio, NULL, kAudioCore); + + return new Handle(input_queue); } void AudioTaskMain(void* args) { @@ -51,113 +56,88 @@ void AudioTaskMain(void* args) { // called before the task quits. { AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args); - std::shared_ptr<IAudioElement> element = std::move(real_args->element); + std::unique_ptr<Pipeline> pipeline(real_args->pipeline); + QueueHandle_t input; + StreamBufferHandle_t output; delete real_args; - // Queue of events that we have received on our input queue, but not yet - // processed. - std::deque<std::unique_ptr<StreamEvent>> pending_events; - - // TODO(jacqueline): quit event - while (true) { - // First, we pull events from our input queue into pending_events. This - // keeps us responsive to any events that need to be handled immediately. - // Then we check if there's any events to flush downstream. - // Then we pass anything requiring processing to the element. - - bool has_work_to_do = - (!pending_events.empty() || element->HasUnflushedOutput() || - element->HasUnprocessedInput()) && - !element->IsOverBuffered(); - - if (has_work_to_do) { - ESP_LOGD(kTag, "checking for events"); - } else { - ESP_LOGD(kTag, "waiting for events"); - } - - // If we have no new events to process and the element has nothing left to - // do, then just delay forever waiting for a new event. - TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY; - - StreamEvent* new_event = nullptr; - bool has_event = - xQueueReceive(element->InputEventQueue(), &new_event, ticks_to_wait); - - if (has_event) { - if (new_event->tag == StreamEvent::UNINITIALISED) { - ESP_LOGE(kTag, "discarding invalid event!!"); - } else if (new_event->tag == StreamEvent::CHUNK_NOTIFICATION) { - delete new_event; - } else if (new_event->tag == StreamEvent::LOG_STATUS) { - element->ProcessLogStatus(); - if (element->OutputEventQueue() != nullptr) { - xQueueSendToFront(element->OutputEventQueue(), &new_event, 0); - } else { - delete new_event; - } - } else { - // This isn't an event that needs to be actioned immediately. Add it - // to our work queue. - pending_events.emplace_back(new_event); - ESP_LOGD(kTag, "deferring event"); + std::vector<Pipeline*> elements = pipeline->GetIterationOrder(); + std::size_t max_inputs = + (*std::max_element(elements.begin(), elements.end(), + [](Pipeline const* first, Pipeline const* second) { + return first->NumInputs() < second->NumInputs(); + })) + ->NumInputs(); + + // We need to be able to simultaneously map all of an element's inputs, plus + // its output. So preallocate that many ranges. + std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs); + MappableRegion<kPipelineBufferSize> out_region; + std::for_each(in_regions.begin(), in_regions.end(), + [](const MappableRegion<kBufferSize>& region) { + assert(region.is_valid); + }); + assert(out_region.is_valid); + + // Each element has exactly one output buffer. + std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size()); + std::vector<StreamInfo> buffer_infos(buffers.size()); + std::for_each(buffers.begin(), buffers.end(), + [](const HimemAlloc<kPipelineBufferSize>& alloc) { + assert(alloc.is_valid); + }); + + bool playing = true; + bool quit = false; + while (!quit) { + // TODO: full event here? + Command cmd; + bool has_cmd = xQueueReceive(input, &cmd, 0); + if (has_cmd) { + switch (cmd) { + case PLAY: + playing = true; + break; + case PAUSE: + playing = false; + break; + case QUIT: + quit = true; + break; } - // Loop again, so that we service all incoming events before doing our - // possibly expensive processing. - continue; } - - if (element->HasUnflushedOutput()) { - ESP_LOGD(kTag, "flushing output"); - } - - // We have no new events. Next, see if there's anything that needs to be - // flushed. - if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) { - // We had things to flush, but couldn't send it all. This probably - // implies that the downstream element is having issues servicing its - // input queue, so hold off for a moment before retrying. - ESP_LOGW(kTag, "failed to flush buffered output"); - vTaskDelay(pdMS_TO_TICKS(100)); - continue; + if (quit) { + break; } - if (element->HasUnprocessedInput()) { - ESP_LOGD(kTag, "processing input events"); - element->Process(); - continue; - } - - // The element ran out of data, so now it's time to let it process more - // input. - while (!pending_events.empty()) { - std::unique_ptr<StreamEvent> event; - pending_events.front().swap(event); - pending_events.pop_front(); - ESP_LOGD(kTag, "processing event, tag %i", event->tag); - - if (event->tag == StreamEvent::STREAM_INFO) { - ESP_LOGD(kTag, "processing stream info"); - - element->ProcessStreamInfo(*event->stream_info); - - } else if (event->tag == StreamEvent::ARENA_CHUNK) { - ESP_LOGD(kTag, "processing arena data"); - - memory::ArenaRef ref(event->arena_chunk); - auto callback = - StreamEvent::CreateChunkNotification(element->InputEventQueue()); - if (!xQueueSend(event->source, &callback, 0)) { - ESP_LOGW(kTag, "failed to send chunk notif"); - continue; + if (playing) { + for (int i = 0; i < elements.size(); i++) { + std::vector<MutableStream> in_streams; + elements.at(i)->InStreams(&in_regions, &in_streams); + MutableStream out_stream = elements.at(i)->OutStream(&out_region); + + // Crop the input and output streams to the ranges that are safe to + // touch. For the input streams, this is the region that contains + // data. For the output stream, this is the region that does *not* + // already contain data. + std::vector<Stream> cropped_in_streams; + std::for_each(in_streams.begin(), in_streams.end(), + [&](MutableStream& s) { + cropped_in_streams.emplace_back( + s.info, s.data.first(s.info->bytes_in_stream)); + }); + + elements.at(i)->OutputElement()->Process(&cropped_in_streams, + &out_stream); + + for (int stream = 0; stream < in_streams.size(); stream++) { + MutableStream& orig_stream = in_streams.at(stream); + Stream& cropped_stream = cropped_in_streams.at(stream); + std::move(cropped_stream.data.begin(), cropped_stream.data.end(), + orig_stream.data.begin()); + orig_stream.info->bytes_in_stream = + cropped_stream.data.size_bytes(); } - - // TODO(jacqueline): Consider giving the element a full ArenaRef here, - // so that it can hang on to it and potentially save an alloc+copy. - element->ProcessChunk({ref.ptr.start, ref.ptr.used_size}); - - // TODO: think about whether to do the whole queue - break; } } } @@ -165,4 +145,6 @@ void AudioTaskMain(void* args) { vTaskDelete(NULL); } +} // namespace task + } // namespace audio |
