From f253d2ee7568b61ce2fab962f7328a50e2da6adf Mon Sep 17 00:00:00 2001 From: jacqueline Date: Tue, 27 Aug 2024 21:17:53 +1000 Subject: Timeout when writing output samples throughout the audio pipeline This allows the audio pipeline to remain responsive even when the drain buffer has completely filled. This in turn means that you now see the track info in the 'now playing' screen change if the current track changes whilst you are paused. Since I was fucking around a lot in the audio processor anyway, I also added mono->stereo expansion so that playing mono tracks on Bluetooth no longer destroys your ears. --- src/tangara/audio/processor.cpp | 409 +++++++++++++++++++++++++++------------- 1 file changed, 275 insertions(+), 134 deletions(-) (limited to 'src/tangara/audio/processor.cpp') diff --git a/src/tangara/audio/processor.cpp b/src/tangara/audio/processor.cpp index 81858110..29124232 100644 --- a/src/tangara/audio/processor.cpp +++ b/src/tangara/audio/processor.cpp @@ -10,60 +10,57 @@ #include #include #include +#include #include #include -#include "audio/audio_events.hpp" -#include "audio/audio_sink.hpp" -#include "drivers/i2s_dac.hpp" -#include "drivers/pcm_buffer.hpp" +#include "assert.h" #include "esp_heap_caps.h" #include "esp_log.h" -#include "events/event_queue.hpp" +#include "esp_timer.h" #include "freertos/portmacro.h" #include "freertos/projdefs.h" +#include "audio/audio_events.hpp" +#include "audio/audio_sink.hpp" +#include "audio/i2s_audio_output.hpp" #include "audio/resample.hpp" +#include "drivers/i2s_dac.hpp" +#include "drivers/pcm_buffer.hpp" +#include "events/event_queue.hpp" #include "sample.hpp" #include "tasks.hpp" [[maybe_unused]] static constexpr char kTag[] = "mixer"; -static constexpr std::size_t kSampleBufferLength = - drivers::kI2SBufferLengthFrames * sizeof(sample::Sample) * 2; -static constexpr std::size_t kSourceBufferLength = kSampleBufferLength * 2; +static const size_t kSampleBufferLength = drivers::kI2SBufferLengthFrames * 2; +static const size_t kSourceBufferLength = kSampleBufferLength * 2; namespace audio { +static const I2SAudioOutput::Format kTargetFormat{ + .sample_rate = 48000, + .num_channels = 2, + .bits_per_sample = 16, +}; + SampleProcessor::SampleProcessor(drivers::PcmBuffer& sink) - : commands_(xQueueCreate(1, sizeof(Args))), - resampler_(nullptr), - source_(xStreamBufferCreateWithCaps(kSourceBufferLength, - sizeof(sample::Sample) * 2, + : commands_(xQueueCreate(2, sizeof(Args))), + source_(xStreamBufferCreateWithCaps(kSourceBufferLength + 1, + sizeof(sample::Sample), MALLOC_CAP_DMA)), sink_(sink), - leftover_bytes_(0) { - input_buffer_ = { - reinterpret_cast(heap_caps_calloc( - kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_DMA)), - kSampleBufferLength}; - input_buffer_as_bytes_ = {reinterpret_cast(input_buffer_.data()), - input_buffer_.size_bytes()}; - - resampled_buffer_ = { - reinterpret_cast(heap_caps_calloc( - kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_DMA)), - kSampleBufferLength}; - + unprocessed_samples_(0) { tasks::StartPersistent([&]() { Main(); }); } SampleProcessor::~SampleProcessor() { vQueueDelete(commands_); - vStreamBufferDelete(source_); + vStreamBufferDeleteWithCaps(source_); } auto SampleProcessor::SetOutput(std::shared_ptr output) -> void { + output->Configure(kTargetFormat); // FIXME: We should add synchronisation here, but we should be careful // about not impacting performance given that the output will change only // very rarely (if ever). @@ -80,15 +77,30 @@ auto SampleProcessor::beginStream(std::shared_ptr track) -> void { xQueueSend(commands_, &args, portMAX_DELAY); } -auto SampleProcessor::continueStream(std::span input) -> void { +auto SampleProcessor::continueStream(std::span input) + -> std::span { + size_t bytes_sent = xStreamBufferSend(source_, input.data(), + input.size_bytes(), pdMS_TO_TICKS(100)); + if (!bytes_sent) { + // If nothing could be sent, then bail out early. We don't want to send a + // samples_available command with zero samples. + return input; + } + + // We should only ever be placing whole samples into the buffer. If half + // samples start being sent, then this indicates a serious bug somewhere. + size_t samples_sent = bytes_sent / sizeof(sample::Sample); + assert(samples_sent * sizeof(sample::Sample) == bytes_sent); + Args args{ .track = nullptr, - .samples_available = input.size(), + .samples_available = samples_sent, .is_end_of_stream = false, .clear_buffers = false, }; xQueueSend(commands_, &args, portMAX_DELAY); - xStreamBufferSend(source_, input.data(), input.size_bytes(), portMAX_DELAY); + + return input.subspan(samples_sent); } auto SampleProcessor::endStream(bool cancelled) -> void { @@ -101,152 +113,281 @@ auto SampleProcessor::endStream(bool cancelled) -> void { xQueueSend(commands_, &args, portMAX_DELAY); } +IRAM_ATTR auto SampleProcessor::Main() -> void { for (;;) { + // Block indefinitely if the processor is idle. Otherwise check briefly for + // new commands, then continue processing. + TickType_t wait = hasPendingWork() ? 0 : portMAX_DELAY; + Args args; - while (!xQueueReceive(commands_, &args, portMAX_DELAY)) { + if (xQueueReceive(commands_, &args, wait)) { + if (args.is_end_of_stream && args.clear_buffers) { + // The new command is telling us to clear our buffers! This includes + // discarding any commands that have backed up without being processed. + // Discard all the old commands, then immediately handle the end of + // stream. + while (!pending_commands_.empty()) { + Args discard = pending_commands_.front(); + pending_commands_.pop_front(); + discardCommand(discard); + } + handleEndStream(true); + } else { + pending_commands_.push_back(args); + } } - if (args.track) { - handleBeginStream(*args.track); - delete args.track; + // We need to finish flushing all processed samples before we can process + // more samples. + if (!output_buffer_.isEmpty() && flushOutputBuffer()) { + continue; } - if (args.samples_available) { - handleContinueStream(args.samples_available); + + // We need to finish processing all the samples we've been told about + // before we handle backed up commands. + if (unprocessed_samples_ && !processSamples(false)) { + continue; } - if (args.is_end_of_stream) { - handleEndStream(args.clear_buffers); + + while (!pending_commands_.empty()) { + args = pending_commands_.front(); + pending_commands_.pop_front(); + + if (args.track) { + handleBeginStream(*args.track); + delete args.track; + } + if (args.samples_available) { + unprocessed_samples_ += args.samples_available; + } + if (args.is_end_of_stream) { + if (processSamples(true) || args.clear_buffers) { + handleEndStream(args.clear_buffers); + } else { + // The output filled up while we were trying to flush the last + // samples of this stream, and we haven't been told to clear our + // buffers. Retry handling this command later. + pending_commands_.push_front(args); + break; + } + } } } } auto SampleProcessor::handleBeginStream(std::shared_ptr track) -> void { - if (track->format != source_format_) { - source_format_ = track->format; - // The new stream has a different format to the previous stream (or there - // was no previous stream). - // First, clean up our filters. - resampler_.reset(); - leftover_bytes_ = 0; - - // If the output is idle, then we can reconfigure it to the closest format - // to our new source. - // If the output *wasn't* idle, then we can't reconfigure without an - // audible gap in playback. So instead, we simply keep the same target - // format and begin resampling. - if (sink_.isEmpty()) { - target_format_ = output_->PrepareFormat(track->format); - output_->Configure(target_format_); + // If the new stream's sample rate doesn't match our canonical sample rate, + // then prepare to start resampling. + if (track->format.sample_rate != kTargetFormat.sample_rate) { + ESP_LOGI(kTag, "resampling %lu -> %lu", track->format.sample_rate, + kTargetFormat.sample_rate); + if (!resampler_ || resampler_->sourceRate() != track->format.sample_rate) { + // If there's already a resampler instance for this source rate, then + // reuse it to help gapless playback work smoothly. + resampler_.reset(new Resampler(track->format.sample_rate, + kTargetFormat.sample_rate, + track->format.num_channels)); } + } else { + resampler_.reset(); } + // If the new stream has only one channel, then we double it to get stereo + // audio. + // FIXME: If the Bluetooth stack allowed us to configure the number of + // channels, we could remove this. + double_samples_ = track->format.num_channels != kTargetFormat.num_channels; + events::Audio().Dispatch(internal::StreamStarted{ .track = track, - .sink_format = target_format_, + .sink_format = kTargetFormat, .cue_at_sample = sink_.totalSent(), }); } -auto SampleProcessor::handleContinueStream(size_t samples_available) -> void { - // Loop until we finish reading all the bytes indicated. There might be - // leftovers from each iteration, and from this process as a whole, - // depending on the resampling stage. - size_t bytes_read = 0; - size_t bytes_to_read = samples_available * sizeof(sample::Sample); - while (bytes_read < bytes_to_read) { - // First top up the input buffer, taking care not to overwrite anything - // remaining from a previous iteration. - size_t bytes_read_this_it = xStreamBufferReceive( - source_, input_buffer_as_bytes_.subspan(leftover_bytes_).data(), - std::min(input_buffer_as_bytes_.size() - leftover_bytes_, - bytes_to_read - bytes_read), - portMAX_DELAY); - bytes_read += bytes_read_this_it; - - // Calculate the number of whole samples that are now in the input buffer. - size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_; - size_t samples_in_buffer = bytes_in_buffer / sizeof(sample::Sample); - - size_t samples_used = handleSamples(input_buffer_.first(samples_in_buffer)); - - // Maybe the resampler didn't consume everything. Maybe the last few - // bytes we read were half a frame. Either way, we need to calculate the - // size of the remainder in bytes, then move it to the front of our - // buffer. - size_t bytes_used = samples_used * sizeof(sample::Sample); - assert(bytes_used <= bytes_in_buffer); - - leftover_bytes_ = bytes_in_buffer - bytes_used; - if (leftover_bytes_ > 0) { - std::memmove(input_buffer_as_bytes_.data(), - input_buffer_as_bytes_.data() + bytes_used, leftover_bytes_); - } - } -} +IRAM_ATTR +auto SampleProcessor::processSamples(bool finalise) -> bool { + for (;;) { + bool out_of_work = true; -auto SampleProcessor::handleSamples(std::span input) -> size_t { - if (source_format_ == target_format_) { - // The happiest possible case: the input format matches the output - // format already. - sink_.send(input); - return input.size(); - } + // First, fill up our input buffer with samples. + if (unprocessed_samples_ > 0) { + out_of_work = false; + auto input = input_buffer_.writeAcquire(); - size_t samples_used = 0; - while (samples_used < input.size()) { - std::span output_source; - if (source_format_.sample_rate != target_format_.sample_rate) { - if (resampler_ == nullptr) { - ESP_LOGI(kTag, "creating new resampler for %lu -> %lu", - source_format_.sample_rate, target_format_.sample_rate); - resampler_.reset(new Resampler(source_format_.sample_rate, - target_format_.sample_rate, - source_format_.num_channels)); - } + size_t bytes_received = xStreamBufferReceive( + source_, input.data(), + std::min(input.size_bytes(), + unprocessed_samples_ * sizeof(sample::Sample)), + 0); - size_t read, written; - std::tie(read, written) = resampler_->Process(input.subspan(samples_used), - resampled_buffer_, false); - samples_used += read; + // We should never receive a half sample. Blow up immediately if we do. + size_t samples_received = bytes_received / sizeof(sample::Sample); + assert(samples_received * sizeof(sample::Sample) == bytes_received); - if (read == 0 && written == 0) { - // Zero samples used or written. We need more input. - break; - } - output_source = resampled_buffer_.first(written); - } else { - output_source = input; - samples_used = input.size(); + unprocessed_samples_ -= samples_received; + input_buffer_.writeCommit(samples_received); } - sink_.send(output_source); - } + // Next, push input samples through the resampler. In the best case, this + // is a simple copy operation. + if (!input_buffer_.isEmpty()) { + out_of_work = false; + auto resample_input = input_buffer_.readAcquire(); + auto resample_output = resampled_buffer_.writeAcquire(); + + size_t read, wrote; + if (resampler_) { + std::tie(read, wrote) = + resampler_->Process(resample_input, resample_output, finalise); + } else { + read = wrote = std::min(resample_input.size(), resample_output.size()); + std::copy_n(resample_input.begin(), read, resample_output.begin()); + } - return samples_used; -} + input_buffer_.readCommit(read); + resampled_buffer_.writeCommit(wrote); + } -auto SampleProcessor::handleEndStream(bool clear_bufs) -> void { - if (resampler_ && !clear_bufs) { - size_t read, written; - std::tie(read, written) = resampler_->Process({}, resampled_buffer_, true); + // Next, we need to make sure the output is in stereo. This is also a simple + // copy in the best case. + if (!resampled_buffer_.isEmpty()) { + out_of_work = false; + auto channels_input = resampled_buffer_.readAcquire(); + auto channels_output = output_buffer_.writeAcquire(); + size_t read, wrote; + if (double_samples_) { + wrote = channels_output.size(); + read = wrote / 2; + if (read > channels_input.size()) { + read = channels_input.size(); + wrote = read * 2; + } + for (size_t i = 0; i < read; i++) { + channels_output[i * 2] = channels_input[i]; + channels_output[(i * 2) + 1] = channels_input[i]; + } + } else { + read = wrote = std::min(channels_input.size(), channels_output.size()); + std::copy_n(channels_input.begin(), read, channels_output.begin()); + } + resampled_buffer_.readCommit(read); + output_buffer_.writeCommit(wrote); + } - if (written > 0) { - sink_.send(resampled_buffer_.first(written)); + // Finally, flush whatever ended up in the output buffer. + if (flushOutputBuffer()) { + if (out_of_work) { + return true; + } + } else { + // The output is congested. Back off of processing for a moment. + return false; } } +} +auto SampleProcessor::handleEndStream(bool clear_bufs) -> void { if (clear_bufs) { sink_.clear(); - } - // FIXME: This discards any leftover samples, but there probably shouldn't be - // any leftover samples. Can this be an assert instead? - leftover_bytes_ = 0; + input_buffer_.clear(); + resampled_buffer_.clear(); + output_buffer_.clear(); + + size_t bytes_discarded = 0; + size_t bytes_to_discard = unprocessed_samples_ * sizeof(sample::Sample); + auto scratch_buf = output_buffer_.writeAcquire(); + while (bytes_discarded < bytes_to_discard) { + size_t bytes_read = + xStreamBufferReceive(source_, scratch_buf.data(), + std::min(scratch_buf.size_bytes(), + bytes_to_discard - bytes_discarded), + 0); + bytes_discarded += bytes_read; + } + unprocessed_samples_ = 0; + } events::Audio().Dispatch(internal::StreamEnded{ .cue_at_sample = sink_.totalSent(), }); } +auto SampleProcessor::hasPendingWork() -> bool { + return !pending_commands_.empty() || unprocessed_samples_ > 0 || + !input_buffer_.isEmpty() || !resampled_buffer_.isEmpty() || + !output_buffer_.isEmpty(); +} + +IRAM_ATTR +auto SampleProcessor::flushOutputBuffer() -> bool { + auto samples = output_buffer_.readAcquire(); + size_t sent = sink_.send(samples); + output_buffer_.readCommit(sent); + return output_buffer_.isEmpty(); +} + +auto SampleProcessor::discardCommand(Args& command) -> void { + if (command.track) { + delete command.track; + } + if (command.samples_available) { + unprocessed_samples_ += command.samples_available; + } + // End of stream commands can just be dropped. Without further actions. +} + +SampleProcessor::Buffer::Buffer() + : buffer_(reinterpret_cast( + heap_caps_calloc(kSampleBufferLength, + sizeof(sample::Sample), + MALLOC_CAP_DMA)), + kSampleBufferLength), + samples_in_buffer_() {} + +SampleProcessor::Buffer::~Buffer() { + heap_caps_free(buffer_.data()); +} + +auto SampleProcessor::Buffer::writeAcquire() -> std::span { + return buffer_.subspan(samples_in_buffer_.size()); +} + +auto SampleProcessor::Buffer::writeCommit(size_t samples) -> void { + if (samples == 0) { + return; + } + samples_in_buffer_ = buffer_.first(samples + samples_in_buffer_.size()); +} + +auto SampleProcessor::Buffer::readAcquire() -> std::span { + return samples_in_buffer_; +} + +auto SampleProcessor::Buffer::readCommit(size_t samples) -> void { + if (samples == 0) { + return; + } + samples_in_buffer_ = samples_in_buffer_.subspan(samples); + + // Move the leftover samples to the front of the buffer, so that we're setup + // for a new write. + if (!samples_in_buffer_.empty()) { + std::memmove(buffer_.data(), samples_in_buffer_.data(), + samples_in_buffer_.size_bytes()); + samples_in_buffer_ = buffer_.first(samples_in_buffer_.size()); + } +} + +auto SampleProcessor::Buffer::isEmpty() -> bool { + return samples_in_buffer_.empty(); +} + +auto SampleProcessor::Buffer::clear() -> void { + samples_in_buffer_ = {}; +} + } // namespace audio -- cgit v1.2.3 From 8f4e1ece7512c2b911491d87edc475b803c3989c Mon Sep 17 00:00:00 2001 From: jacqueline Date: Wed, 28 Aug 2024 09:43:41 +1000 Subject: Some minor cleanup, docs, assertions --- src/tangara/audio/processor.cpp | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'src/tangara/audio/processor.cpp') diff --git a/src/tangara/audio/processor.cpp b/src/tangara/audio/processor.cpp index 29124232..aa2604b5 100644 --- a/src/tangara/audio/processor.cpp +++ b/src/tangara/audio/processor.cpp @@ -1,11 +1,10 @@ /* - * Copyright 2023 jacqueline + * Copyright 2024 jacqueline * * SPDX-License-Identifier: GPL-3.0-only */ #include "audio/processor.hpp" -#include #include #include @@ -38,6 +37,11 @@ static const size_t kSourceBufferLength = kSampleBufferLength * 2; namespace audio { +/* + * The output format to convert all sources to. This is currently fixed because + * the Bluetooth output doesn't support runtime configuration of its input + * format. + */ static const I2SAudioOutput::Format kTargetFormat{ .sample_rate = 48000, .num_channels = 2, @@ -60,7 +64,10 @@ SampleProcessor::~SampleProcessor() { } auto SampleProcessor::SetOutput(std::shared_ptr output) -> void { + // Make sure our fixed output format is valid. + assert(output->PrepareFormat(kTargetFormat) == kTargetFormat); output->Configure(kTargetFormat); + // FIXME: We should add synchronisation here, but we should be careful // about not impacting performance given that the output will change only // very rarely (if ever). @@ -337,7 +344,7 @@ auto SampleProcessor::discardCommand(Args& command) -> void { if (command.samples_available) { unprocessed_samples_ += command.samples_available; } - // End of stream commands can just be dropped. Without further actions. + // End of stream commands can just be dropped without further action. } SampleProcessor::Buffer::Buffer() -- cgit v1.2.3