diff options
Diffstat (limited to 'src/audio/audio_task.cpp')
| -rw-r--r-- | src/audio/audio_task.cpp | 129 |
1 files changed, 75 insertions, 54 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index 542bada8..e6c7778c 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -8,7 +8,9 @@ #include <deque> #include <memory> +#include "audio_sink.hpp" #include "cbor.h" +#include "dac.hpp" #include "esp_err.h" #include "esp_heap_caps.h" #include "esp_log.h" @@ -33,22 +35,29 @@ namespace task { static const char* kTag = "task"; static const std::size_t kStackSize = 24 * 1024; +static const std::size_t kDrainStackSize = 1024; static const uint8_t kAudioCore = 0; -auto Start(Pipeline* pipeline) -> Handle* { - auto input_queue = xQueueCreate(8, 1); - +auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void { // Newly created task will free this. - AudioTaskArgs* args = new AudioTaskArgs{ - .pipeline = pipeline, - .input = input_queue, - }; + AudioTaskArgs* args = new AudioTaskArgs{.pipeline = pipeline, .sink = sink}; - ESP_LOGI(kTag, "starting audio task"); + ESP_LOGI(kTag, "starting audio pipeline task"); xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args, kTaskPriorityAudio, NULL, kAudioCore); +} - return new Handle(input_queue); +auto StartDrain(IAudioSink* sink) -> void { + auto command = new std::atomic<Command>(PLAY); + // Newly created task will free this. + AudioDrainArgs* drain_args = new AudioDrainArgs{ + .sink = sink, + .command = command, + }; + + ESP_LOGI(kTag, "starting audio drain task"); + xTaskCreatePinnedToCore(&AudioDrainMain, "drain", kDrainStackSize, drain_args, + kTaskPriorityAudio, NULL, kAudioCore); } void AudioTaskMain(void* args) { @@ -57,10 +66,11 @@ void AudioTaskMain(void* args) { { AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args); std::unique_ptr<Pipeline> pipeline(real_args->pipeline); - QueueHandle_t input; - StreamBufferHandle_t output; + IAudioSink* sink = real_args->sink; delete real_args; + std::optional<StreamInfo::Format> output_format; + std::vector<Pipeline*> elements = pipeline->GetIterationOrder(); std::size_t max_inputs = (*std::max_element(elements.begin(), elements.end(), @@ -74,9 +84,7 @@ void AudioTaskMain(void* args) { std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs); MappableRegion<kPipelineBufferSize> out_region; std::for_each(in_regions.begin(), in_regions.end(), - [](const MappableRegion<kBufferSize>& region) { - assert(region.is_valid); - }); + [](const auto& region) { assert(region.is_valid); }); assert(out_region.is_valid); // Each element has exactly one output buffer. @@ -90,55 +98,68 @@ void AudioTaskMain(void* args) { bool playing = true; bool quit = false; while (!quit) { - // TODO: full event here? - Command cmd; - bool has_cmd = xQueueReceive(input, &cmd, 0); - if (has_cmd) { - switch (cmd) { - case PLAY: - playing = true; - break; - case PAUSE: - playing = false; - break; - case QUIT: - quit = true; - break; - } - } - if (quit) { - break; - } - if (playing) { for (int i = 0; i < elements.size(); i++) { - std::vector<MutableStream> in_streams; - elements.at(i)->InStreams(&in_regions, &in_streams); - MutableStream out_stream = elements.at(i)->OutStream(&out_region); + std::vector<RawStream> raw_in_streams; + elements.at(i)->InStreams(&in_regions, &raw_in_streams); + RawStream raw_out_stream = elements.at(i)->OutStream(&out_region); // Crop the input and output streams to the ranges that are safe to // touch. For the input streams, this is the region that contains // data. For the output stream, this is the region that does *not* // already contain data. - std::vector<Stream> cropped_in_streams; - std::for_each(in_streams.begin(), in_streams.end(), - [&](MutableStream& s) { - cropped_in_streams.emplace_back( - s.info, s.data.first(s.info->bytes_in_stream)); - }); - - elements.at(i)->OutputElement()->Process(&cropped_in_streams, - &out_stream); - - for (int stream = 0; stream < in_streams.size(); stream++) { - MutableStream& orig_stream = in_streams.at(stream); - Stream& cropped_stream = cropped_in_streams.at(stream); - std::move(cropped_stream.data.begin(), cropped_stream.data.end(), - orig_stream.data.begin()); - orig_stream.info->bytes_in_stream = - cropped_stream.data.size_bytes(); + std::vector<InputStream> in_streams; + std::for_each(raw_in_streams.begin(), raw_in_streams.end(), + [&](RawStream& s) { in_streams.emplace_back(&s); }); + OutputStream out_stream(&raw_out_stream); + + elements.at(i)->OutputElement()->Process(in_streams, &out_stream); + } + + RawStream raw_sink_stream = elements.back()->OutStream(&out_region); + InputStream sink_stream(&raw_sink_stream); + + if (!output_format || output_format != sink_stream.info().format) { + // The format of the stream within the sink stream has changed. We + // need to reconfigure the sink, but shouldn't do so until we've fully + // drained the current buffer. + if (xStreamBufferIsEmpty(sink->buffer())) { + output_format = sink_stream.info().format; + sink->Configure(*output_format); } } + + // We've reconfigured the sink, or it was already configured correctly. + // Send through some data. + if (output_format == sink_stream.info().format) { + // TODO: tune the delay on this, as it's currently the only way to + // throttle this task's CPU time. Maybe also hold off on the pipeline + // if the buffer is already close to full? + std::size_t sent = xStreamBufferSend( + sink->buffer(), sink_stream.data().data(), + sink_stream.data().size_bytes(), pdMS_TO_TICKS(10)); + sink_stream.consume(sent); + } + } + } + } + vTaskDelete(NULL); +} + +void AudioDrainMain(void* args) { + { + AudioDrainArgs* real_args = reinterpret_cast<AudioDrainArgs*>(args); + IAudioSink* sink = real_args->sink; + std::atomic<Command>* command = real_args->command; + delete real_args; + + // TODO(jacqueline): implement PAUSE without busy-waiting. + while (*command != QUIT) { + std::byte buf[64]; + std::size_t len = + xStreamBufferReceive(sink->buffer(), buf, sizeof(buf), portMAX_DELAY); + if (len > 0) { + sink->Send({buf, len}); } } } |
