summaryrefslogtreecommitdiff
path: root/src/audio/audio_task.cpp
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2023-04-01 13:22:21 +1100
committerjacqueline <me@jacqueline.id.au>2023-04-19 10:29:38 +1000
commit7c6fd654f50e6665efa4226c6b927f9762734182 (patch)
tree58ccb69068c550e9c2223c1b510cfa525690b731 /src/audio/audio_task.cpp
parent3817ec0c77b8d44e54b35ea9f76e7bb4666c6c08 (diff)
downloadtangara-fw-7c6fd654f50e6665efa4226c6b927f9762734182.tar.gz
New pipeline building, still needs proper control
Diffstat (limited to 'src/audio/audio_task.cpp')
-rw-r--r--src/audio/audio_task.cpp129
1 files changed, 75 insertions, 54 deletions
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index 542bada8..e6c7778c 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -8,7 +8,9 @@
#include <deque>
#include <memory>
+#include "audio_sink.hpp"
#include "cbor.h"
+#include "dac.hpp"
#include "esp_err.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
@@ -33,22 +35,29 @@ namespace task {
static const char* kTag = "task";
static const std::size_t kStackSize = 24 * 1024;
+static const std::size_t kDrainStackSize = 1024;
static const uint8_t kAudioCore = 0;
-auto Start(Pipeline* pipeline) -> Handle* {
- auto input_queue = xQueueCreate(8, 1);
-
+auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
// Newly created task will free this.
- AudioTaskArgs* args = new AudioTaskArgs{
- .pipeline = pipeline,
- .input = input_queue,
- };
+ AudioTaskArgs* args = new AudioTaskArgs{.pipeline = pipeline, .sink = sink};
- ESP_LOGI(kTag, "starting audio task");
+ ESP_LOGI(kTag, "starting audio pipeline task");
xTaskCreatePinnedToCore(&AudioTaskMain, "pipeline", kStackSize, args,
kTaskPriorityAudio, NULL, kAudioCore);
+}
- return new Handle(input_queue);
+auto StartDrain(IAudioSink* sink) -> void {
+ auto command = new std::atomic<Command>(PLAY);
+ // Newly created task will free this.
+ AudioDrainArgs* drain_args = new AudioDrainArgs{
+ .sink = sink,
+ .command = command,
+ };
+
+ ESP_LOGI(kTag, "starting audio drain task");
+ xTaskCreatePinnedToCore(&AudioDrainMain, "drain", kDrainStackSize, drain_args,
+ kTaskPriorityAudio, NULL, kAudioCore);
}
void AudioTaskMain(void* args) {
@@ -57,10 +66,11 @@ void AudioTaskMain(void* args) {
{
AudioTaskArgs* real_args = reinterpret_cast<AudioTaskArgs*>(args);
std::unique_ptr<Pipeline> pipeline(real_args->pipeline);
- QueueHandle_t input;
- StreamBufferHandle_t output;
+ IAudioSink* sink = real_args->sink;
delete real_args;
+ std::optional<StreamInfo::Format> output_format;
+
std::vector<Pipeline*> elements = pipeline->GetIterationOrder();
std::size_t max_inputs =
(*std::max_element(elements.begin(), elements.end(),
@@ -74,9 +84,7 @@ void AudioTaskMain(void* args) {
std::vector<MappableRegion<kPipelineBufferSize>> in_regions(max_inputs);
MappableRegion<kPipelineBufferSize> out_region;
std::for_each(in_regions.begin(), in_regions.end(),
- [](const MappableRegion<kBufferSize>& region) {
- assert(region.is_valid);
- });
+ [](const auto& region) { assert(region.is_valid); });
assert(out_region.is_valid);
// Each element has exactly one output buffer.
@@ -90,55 +98,68 @@ void AudioTaskMain(void* args) {
bool playing = true;
bool quit = false;
while (!quit) {
- // TODO: full event here?
- Command cmd;
- bool has_cmd = xQueueReceive(input, &cmd, 0);
- if (has_cmd) {
- switch (cmd) {
- case PLAY:
- playing = true;
- break;
- case PAUSE:
- playing = false;
- break;
- case QUIT:
- quit = true;
- break;
- }
- }
- if (quit) {
- break;
- }
-
if (playing) {
for (int i = 0; i < elements.size(); i++) {
- std::vector<MutableStream> in_streams;
- elements.at(i)->InStreams(&in_regions, &in_streams);
- MutableStream out_stream = elements.at(i)->OutStream(&out_region);
+ std::vector<RawStream> raw_in_streams;
+ elements.at(i)->InStreams(&in_regions, &raw_in_streams);
+ RawStream raw_out_stream = elements.at(i)->OutStream(&out_region);
// Crop the input and output streams to the ranges that are safe to
// touch. For the input streams, this is the region that contains
// data. For the output stream, this is the region that does *not*
// already contain data.
- std::vector<Stream> cropped_in_streams;
- std::for_each(in_streams.begin(), in_streams.end(),
- [&](MutableStream& s) {
- cropped_in_streams.emplace_back(
- s.info, s.data.first(s.info->bytes_in_stream));
- });
-
- elements.at(i)->OutputElement()->Process(&cropped_in_streams,
- &out_stream);
-
- for (int stream = 0; stream < in_streams.size(); stream++) {
- MutableStream& orig_stream = in_streams.at(stream);
- Stream& cropped_stream = cropped_in_streams.at(stream);
- std::move(cropped_stream.data.begin(), cropped_stream.data.end(),
- orig_stream.data.begin());
- orig_stream.info->bytes_in_stream =
- cropped_stream.data.size_bytes();
+ std::vector<InputStream> in_streams;
+ std::for_each(raw_in_streams.begin(), raw_in_streams.end(),
+ [&](RawStream& s) { in_streams.emplace_back(&s); });
+ OutputStream out_stream(&raw_out_stream);
+
+ elements.at(i)->OutputElement()->Process(in_streams, &out_stream);
+ }
+
+ RawStream raw_sink_stream = elements.back()->OutStream(&out_region);
+ InputStream sink_stream(&raw_sink_stream);
+
+ if (!output_format || output_format != sink_stream.info().format) {
+ // The format of the stream within the sink stream has changed. We
+ // need to reconfigure the sink, but shouldn't do so until we've fully
+ // drained the current buffer.
+ if (xStreamBufferIsEmpty(sink->buffer())) {
+ output_format = sink_stream.info().format;
+ sink->Configure(*output_format);
}
}
+
+ // We've reconfigured the sink, or it was already configured correctly.
+ // Send through some data.
+ if (output_format == sink_stream.info().format) {
+ // TODO: tune the delay on this, as it's currently the only way to
+ // throttle this task's CPU time. Maybe also hold off on the pipeline
+ // if the buffer is already close to full?
+ std::size_t sent = xStreamBufferSend(
+ sink->buffer(), sink_stream.data().data(),
+ sink_stream.data().size_bytes(), pdMS_TO_TICKS(10));
+ sink_stream.consume(sent);
+ }
+ }
+ }
+ }
+ vTaskDelete(NULL);
+}
+
+void AudioDrainMain(void* args) {
+ {
+ AudioDrainArgs* real_args = reinterpret_cast<AudioDrainArgs*>(args);
+ IAudioSink* sink = real_args->sink;
+ std::atomic<Command>* command = real_args->command;
+ delete real_args;
+
+ // TODO(jacqueline): implement PAUSE without busy-waiting.
+ while (*command != QUIT) {
+ std::byte buf[64];
+ std::size_t len =
+ xStreamBufferReceive(sink->buffer(), buf, sizeof(buf), portMAX_DELAY);
+ if (len > 0) {
+ sink->Send({buf, len});
}
}
}