diff options
| author | jacqueline <me@jacqueline.id.au> | 2023-04-26 08:49:02 +1000 |
|---|---|---|
| committer | jacqueline <me@jacqueline.id.au> | 2023-04-26 08:49:02 +1000 |
| commit | 7972bd4567a99179338259e9e6ce19168c2c0db3 (patch) | |
| tree | f46642afd36011d3d064e022232e77744b82c6ae /src/audio/audio_task.cpp | |
| parent | 4887f3789817f87bf1272af0b52684e3364270c2 (diff) | |
| parent | 5575378c1c8171cd716b79d3ab89df1e56ceb9d3 (diff) | |
| download | tangara-fw-7972bd4567a99179338259e9e6ce19168c2c0db3.tar.gz | |
Merge branch 'main' into leveldb
Diffstat (limited to 'src/audio/audio_task.cpp')
| -rw-r--r-- | src/audio/audio_task.cpp | 249 |
1 files changed, 137 insertions, 112 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index ce6d724e..b2a8062e 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -2,16 +2,23 @@ #include <stdlib.h> +#include <algorithm> +#include <cstddef> #include <cstdint> #include <deque> #include <memory> +#include <variant> +#include "audio_sink.hpp" #include "cbor.h" +#include "dac.hpp" +#include "esp_err.h" #include "esp_heap_caps.h" #include "esp_log.h" #include "freertos/portmacro.h" #include "freertos/projdefs.h" #include "freertos/queue.h" +#include "pipeline.hpp" #include "span.hpp" #include "arena.hpp" @@ -25,25 +32,32 @@ namespace audio { +namespace task { + static const char* kTag = "task"; +static const std::size_t kStackSize = 24 * 1024; +static const std::size_t kDrainStackSize = 1024; + +auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void { + // Newly created task will free this. + AudioTaskArgs* args = new AudioTaskArgs{.pipeline = pipeline, .sink = sink}; -auto StartAudioTask(const std::string& name, - std::optional<BaseType_t> core_id, - std::shared_ptr<IAudioElement> element) -> void { - auto task_handle = std::make_unique<TaskHandle_t>(); + ESP_LOGI(kTag, "starting audio pipeline task"); + xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args, + kTaskPriorityAudioPipeline, NULL, 1); +} +auto StartDrain(IAudioSink* sink) -> void { + auto command = new std::atomic<Command>(PLAY); // Newly created task will free this. - AudioTaskArgs* args = new AudioTaskArgs{.element = element}; - - ESP_LOGI(kTag, "starting audio task %s", name.c_str()); - if (core_id) { - xTaskCreatePinnedToCore(&AudioTaskMain, name.c_str(), - element->StackSizeBytes(), args, kTaskPriorityAudio, - task_handle.get(), *core_id); - } else { - xTaskCreate(&AudioTaskMain, name.c_str(), element->StackSizeBytes(), args, - kTaskPriorityAudio, task_handle.get()); - } + AudioDrainArgs* drain_args = new AudioDrainArgs{ + .sink = sink, + .command = command, + }; + + ESP_LOGI(kTag, "starting audio drain task"); + xTaskCreate(&AudioDrainMain, "drain", kDrainStackSize, drain_args, + kTaskPriorityAudioDrain, NULL); } void AudioTaskMain(void* args) { @@ -51,118 +65,129 @@ void AudioTaskMain(void* args) { // called before the task quits. { AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args); - std::shared_ptr<IAudioElement> element = std::move(real_args->element); + std::unique_ptr<Pipeline> pipeline(real_args->pipeline); + IAudioSink* sink = real_args->sink; delete real_args; - // Queue of events that we have received on our input queue, but not yet - // processed. - std::deque<std::unique_ptr<StreamEvent>> pending_events; - - // TODO(jacqueline): quit event - while (true) { - // First, we pull events from our input queue into pending_events. This - // keeps us responsive to any events that need to be handled immediately. - // Then we check if there's any events to flush downstream. - // Then we pass anything requiring processing to the element. - - bool has_work_to_do = - (!pending_events.empty() || element->HasUnflushedOutput() || - element->HasUnprocessedInput()) && - !element->IsOverBuffered(); - - if (has_work_to_do) { - ESP_LOGD(kTag, "checking for events"); - } else { - ESP_LOGD(kTag, "waiting for events"); - } - - // If we have no new events to process and the element has nothing left to - // do, then just delay forever waiting for a new event. - TickType_t ticks_to_wait = has_work_to_do ? 0 : portMAX_DELAY; - - StreamEvent* new_event = nullptr; - bool has_event = - xQueueReceive(element->InputEventQueue(), &new_event, ticks_to_wait); - - if (has_event) { - if (new_event->tag == StreamEvent::UNINITIALISED) { - ESP_LOGE(kTag, "discarding invalid event!!"); - } else if (new_event->tag == StreamEvent::CHUNK_NOTIFICATION) { - delete new_event; - } else if (new_event->tag == StreamEvent::LOG_STATUS) { - element->ProcessLogStatus(); - if (element->OutputEventQueue() != nullptr) { - xQueueSendToFront(element->OutputEventQueue(), &new_event, 0); - } else { - delete new_event; - } - } else { - // This isn't an event that needs to be actioned immediately. Add it - // to our work queue. - pending_events.emplace_back(new_event); - ESP_LOGD(kTag, "deferring event"); + std::optional<StreamInfo::Format> output_format; + + std::vector<Pipeline*> elements = pipeline->GetIterationOrder(); + std::size_t max_inputs = + (*std::max_element(elements.begin(), elements.end(), + [](Pipeline const* first, Pipeline const* second) { + return first->NumInputs() < second->NumInputs(); + })) + ->NumInputs(); + + // We need to be able to simultaneously map all of an element's inputs, plus + // its output. So preallocate that many ranges. + std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs); + MappableRegion<kPipelineBufferSize> out_region; + std::for_each(in_regions.begin(), in_regions.end(), + [](const auto& region) { assert(region.is_valid); }); + assert(out_region.is_valid); + + // Each element has exactly one output buffer. + std::vector<HimemAlloc<kPipelineBufferSize>> buffers(elements.size()); + std::vector<StreamInfo> buffer_infos(buffers.size()); + std::for_each(buffers.begin(), buffers.end(), + [](const HimemAlloc<kPipelineBufferSize>& alloc) { + assert(alloc.is_valid); + }); + + bool playing = true; + bool quit = false; + while (!quit) { + if (playing) { + for (int i = 0; i < elements.size(); i++) { + std::vector<RawStream> raw_in_streams; + elements.at(i)->InStreams(&in_regions, &raw_in_streams); + RawStream raw_out_stream = elements.at(i)->OutStream(&out_region); + + // Crop the input and output streams to the ranges that are safe to + // touch. For the input streams, this is the region that contains + // data. For the output stream, this is the region that does *not* + // already contain data. + std::vector<InputStream> in_streams; + std::for_each(raw_in_streams.begin(), raw_in_streams.end(), + [&](RawStream& s) { in_streams.emplace_back(&s); }); + OutputStream out_stream(&raw_out_stream); + + elements.at(i)->OutputElement()->Process(in_streams, &out_stream); + + std::for_each(in_regions.begin(), in_regions.end(), + [](auto&& r) { r.Unmap(); }); + out_region.Unmap(); } - // Loop again, so that we service all incoming events before doing our - // possibly expensive processing. - continue; - } - - if (element->HasUnflushedOutput()) { - ESP_LOGD(kTag, "flushing output"); - } - // We have no new events. Next, see if there's anything that needs to be - // flushed. - if (element->HasUnflushedOutput() && !element->FlushBufferedOutput()) { - // We had things to flush, but couldn't send it all. This probably - // implies that the downstream element is having issues servicing its - // input queue, so hold off for a moment before retrying. - ESP_LOGW(kTag, "failed to flush buffered output"); - vTaskDelay(pdMS_TO_TICKS(100)); - continue; - } + RawStream raw_sink_stream = elements.front()->OutStream(&out_region); + InputStream sink_stream(&raw_sink_stream); - if (element->HasUnprocessedInput()) { - ESP_LOGD(kTag, "processing input events"); - element->Process(); - continue; - } - - // The element ran out of data, so now it's time to let it process more - // input. - while (!pending_events.empty()) { - std::unique_ptr<StreamEvent> event; - pending_events.front().swap(event); - pending_events.pop_front(); - ESP_LOGD(kTag, "processing event, tag %i", event->tag); + if (sink_stream.info().bytes_in_stream == 0) { + out_region.Unmap(); + vTaskDelay(pdMS_TO_TICKS(100)); + continue; + } - if (event->tag == StreamEvent::STREAM_INFO) { - ESP_LOGD(kTag, "processing stream info"); + if (!output_format || output_format != sink_stream.info().format) { + // The format of the stream within the sink stream has changed. We + // need to reconfigure the sink, but shouldn't do so until we've fully + // drained the current buffer. + if (xStreamBufferIsEmpty(sink->buffer())) { + ESP_LOGI(kTag, "reconfiguring dac"); + output_format = sink_stream.info().format; + sink->Configure(*output_format); + } + } - element->ProcessStreamInfo(*event->stream_info); + // We've reconfigured the sink, or it was already configured correctly. + // Send through some data. + if (output_format == sink_stream.info().format && + !std::holds_alternative<std::monostate>(*output_format)) { + // TODO: tune the delay on this, as it's currently the only way to + // throttle this task's CPU time. Maybe also hold off on the pipeline + // if the buffer is already close to full? + std::size_t sent = xStreamBufferSend( + sink->buffer(), sink_stream.data().data(), + sink_stream.data().size_bytes(), pdMS_TO_TICKS(10)); + if (sent > 0) { + ESP_LOGI(kTag, "sunk %u bytes out of %u (%d %%)", sent, + sink_stream.info().bytes_in_stream, + (int)(((float)sent / + (float)sink_stream.info().bytes_in_stream) * + 100)); + } + sink_stream.consume(sent); + } - } else if (event->tag == StreamEvent::ARENA_CHUNK) { - ESP_LOGD(kTag, "processing arena data"); + out_region.Unmap(); + } + } + } + vTaskDelete(NULL); +} - memory::ArenaRef ref(event->arena_chunk); - auto callback = - StreamEvent::CreateChunkNotification(element->InputEventQueue()); - if (!xQueueSend(event->source, &callback, 0)) { - ESP_LOGW(kTag, "failed to send chunk notif"); - continue; - } +static std::byte sDrainBuf[8 * 1024]; - // TODO(jacqueline): Consider giving the element a full ArenaRef here, - // so that it can hang on to it and potentially save an alloc+copy. - element->ProcessChunk({ref.ptr.start, ref.ptr.used_size}); +void AudioDrainMain(void* args) { + { + AudioDrainArgs* real_args = reinterpret_cast<AudioDrainArgs*>(args); + IAudioSink* sink = real_args->sink; + std::atomic<Command>* command = real_args->command; + delete real_args; - // TODO: think about whether to do the whole queue - break; - } + // TODO(jacqueline): implement PAUSE without busy-waiting. + while (*command != QUIT) { + std::size_t len = xStreamBufferReceive(sink->buffer(), sDrainBuf, + sizeof(sDrainBuf), portMAX_DELAY); + if (len > 0) { + sink->Send({sDrainBuf, len}); } } } vTaskDelete(NULL); } +} // namespace task + } // namespace audio |
