diff options
| author | jacqueline <me@jacqueline.id.au> | 2023-08-10 15:33:00 +1000 |
|---|---|---|
| committer | jacqueline <me@jacqueline.id.au> | 2023-08-10 15:33:00 +1000 |
| commit | d8fc77101dcf80a3643a00b3446dca1e390ce997 (patch) | |
| tree | 9e03881f3857c7b4c6a0b6e3a062947daecc69d1 /src/audio/sink_mixer.cpp | |
| parent | 67caeb6e3cda44205ba8fe783274b20dc7ea216e (diff) | |
| download | tangara-fw-d8fc77101dcf80a3643a00b3446dca1e390ce997.tar.gz | |
Give codecs complete control of their input files
Diffstat (limited to 'src/audio/sink_mixer.cpp')
| -rw-r--r-- | src/audio/sink_mixer.cpp | 263 |
1 files changed, 130 insertions, 133 deletions
diff --git a/src/audio/sink_mixer.cpp b/src/audio/sink_mixer.cpp index 6c72c8b0..9f973d4b 100644 --- a/src/audio/sink_mixer.cpp +++ b/src/audio/sink_mixer.cpp @@ -7,6 +7,7 @@ #include "sink_mixer.hpp" #include <stdint.h> +#include <algorithm> #include <cmath> #include "esp_heap_caps.h" @@ -23,20 +24,28 @@ static constexpr char kTag[] = "mixer"; static constexpr std::size_t kSourceBufferLength = 8 * 1024; -static constexpr std::size_t kSampleBufferLength = 240 * 2 * sizeof(int32_t); +static constexpr std::size_t kSampleBufferLength = 240 * 2; namespace audio { -SinkMixer::SinkMixer(StreamBufferHandle_t dest) +SinkMixer::SinkMixer(IAudioSink* sink) : commands_(xQueueCreate(1, sizeof(Args))), - is_idle_(xSemaphoreCreateBinary()), resampler_(nullptr), source_(xStreamBufferCreateWithCaps(kSourceBufferLength, 1, MALLOC_CAP_SPIRAM)), - sink_(dest) { - input_stream_.reset(new RawStream(kSampleBufferLength)); - resampled_stream_.reset(new RawStream(kSampleBufferLength)); + sink_(sink) { + input_buffer_ = { + reinterpret_cast<sample::Sample*>(heap_caps_calloc( + kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)), + kSampleBufferLength}; + input_buffer_as_bytes_ = {reinterpret_cast<std::byte*>(input_buffer_.data()), + input_buffer_.size_bytes()}; + + resampled_buffer_ = { + reinterpret_cast<sample::Sample*>(heap_caps_calloc( + kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)), + kSampleBufferLength}; // Pin to CORE0 because we need the FPU. // FIXME: A fixed point implementation could run freely on either core, @@ -46,168 +55,156 @@ SinkMixer::SinkMixer(StreamBufferHandle_t dest) SinkMixer::~SinkMixer() { vQueueDelete(commands_); - vSemaphoreDelete(is_idle_); vStreamBufferDelete(source_); } -auto SinkMixer::MixAndSend(InputStream& input, const StreamInfo::Pcm& target) - -> std::size_t { - if (input.info().format_as<StreamInfo::Pcm>() != - input_stream_->info().format_as<StreamInfo::Pcm>()) { - xSemaphoreTake(is_idle_, portMAX_DELAY); - Args args{ - .cmd = Command::kSetSourceFormat, - .format = input.info().format_as<StreamInfo::Pcm>().value(), - }; - xQueueSend(commands_, &args, portMAX_DELAY); - xSemaphoreGive(is_idle_); - } - if (target_format_ != target) { - xSemaphoreTake(is_idle_, portMAX_DELAY); - Args args{ - .cmd = Command::kSetTargetFormat, - .format = target, - }; - xQueueSend(commands_, &args, portMAX_DELAY); - xSemaphoreGive(is_idle_); - } - +auto SinkMixer::MixAndSend(cpp::span<sample::Sample> input, + const IAudioSink::Format& format, + bool is_eos) -> void { Args args{ - .cmd = Command::kReadBytes, - .format = {}, + .format = format, + .samples_available = input.size(), + .is_end_of_stream = is_eos, }; xQueueSend(commands_, &args, portMAX_DELAY); - auto buf = input.data(); - std::size_t bytes_sent = - xStreamBufferSend(source_, buf.data(), buf.size_bytes(), portMAX_DELAY); - input.consume(bytes_sent); - return bytes_sent; + cpp::span<std::byte> input_as_bytes = { + reinterpret_cast<std::byte*>(input.data()), input.size_bytes()}; + size_t bytes_sent = 0; + while (bytes_sent < input_as_bytes.size()) { + bytes_sent += + xStreamBufferSend(source_, input_as_bytes.subspan(bytes_sent).data(), + input_as_bytes.size() - bytes_sent, portMAX_DELAY); + } } auto SinkMixer::Main() -> void { - OutputStream input_receiver{input_stream_.get()}; - xSemaphoreGive(is_idle_); - for (;;) { Args args; while (!xQueueReceive(commands_, &args, portMAX_DELAY)) { } - switch (args.cmd) { - case Command::kSetSourceFormat: - ESP_LOGI(kTag, "setting source format"); - input_receiver.prepare(args.format, {}); - resampler_.reset(); - break; - case Command::kSetTargetFormat: - ESP_LOGI(kTag, "setting target format"); - target_format_ = args.format; - resampler_.reset(); - break; - case Command::kReadBytes: - xSemaphoreTake(is_idle_, 0); - while (!xStreamBufferIsEmpty(source_)) { - auto buf = input_receiver.data(); - std::size_t bytes_received = xStreamBufferReceive( - source_, buf.data(), buf.size_bytes(), portMAX_DELAY); - input_receiver.add(bytes_received); - HandleBytes(); + if (args.format != source_format_) { + resampler_.reset(); + source_format_ = args.format; + leftover_bytes_ = 0; + leftover_offset_ = 0; + + auto new_target = sink_->PrepareFormat(args.format); + if (new_target != target_format_) { + // The new format is different to the old one. Wait for the sink to + // drain before continuing. + while (!xStreamBufferIsEmpty(sink_->stream())) { + ESP_LOGI(kTag, "waiting for sink stream to drain..."); + // TODO(jacqueline): Get the sink drain ISR to notify us of this + // via semaphore instead of busy-ish waiting. + vTaskDelay(pdMS_TO_TICKS(10)); } - xSemaphoreGive(is_idle_); - break; + + ESP_LOGI(kTag, "configuring sink"); + sink_->Configure(new_target); + } + target_format_ = new_target; } - } -} -auto SinkMixer::HandleBytes() -> void { - InputStream input{input_stream_.get()}; - auto pcm = input.info().format_as<StreamInfo::Pcm>(); - if (!pcm) { - ESP_LOGE(kTag, "mixer got unsupported data"); - return; + // Loop until we finish reading all the bytes indicated. There might be + // leftovers from each iteration, and from this process as a whole, + // depending on the resampling stage. + size_t bytes_read = 0; + size_t bytes_to_read = args.samples_available * sizeof(sample::Sample); + while (bytes_read < bytes_to_read) { + // First top up the input buffer, taking care not to overwrite anything + // remaining from a previous iteration. + size_t bytes_read_this_it = xStreamBufferReceive( + source_, + input_buffer_as_bytes_.subspan(leftover_offset_ + leftover_bytes_) + .data(), + std::min(input_buffer_as_bytes_.size() - leftover_offset_ - + leftover_bytes_, + bytes_to_read - bytes_read), + portMAX_DELAY); + bytes_read += bytes_read_this_it; + + // Calculate the number of whole samples that are now in the input buffer. + size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_; + size_t samples_in_buffer = bytes_in_buffer / sizeof(sample::Sample); + + size_t samples_used = HandleSamples( + input_buffer_.subspan(leftover_offset_).first(samples_in_buffer), + args.is_end_of_stream && bytes_read == bytes_to_read); + + // Maybe the resampler didn't consume everything. Maybe the last few + // bytes we read were half a frame. Either way, we need to calculate the + // size of the remainder in bytes. + size_t bytes_used = samples_used * sizeof(sample::Sample); + leftover_bytes_ = bytes_in_buffer - bytes_used; + if (leftover_bytes_ == 0) { + leftover_offset_ = 0; + } else { + leftover_offset_ += bytes_used; + } + } } +} - if (*pcm == target_format_) { +auto SinkMixer::HandleSamples(cpp::span<sample::Sample> input, bool is_eos) + -> size_t { + if (source_format_ == target_format_) { // The happiest possible case: the input format matches the output - // format already. Streams like this should probably have bypassed the - // mixer. - // TODO(jacqueline): Make this an error; it's slow to use the mixer in this - // case, compared to just writing directly to the sink. - auto buf = input.data(); - std::size_t bytes_sent = - xStreamBufferSend(sink_, buf.data(), buf.size_bytes(), portMAX_DELAY); - input.consume(bytes_sent); - return; + // format already. + std::size_t bytes_sent = xStreamBufferSend( + sink_->stream(), input.data(), input.size_bytes(), portMAX_DELAY); + return bytes_sent / sizeof(sample::Sample); } - while (input_stream_->info().bytes_in_stream() >= sizeof(sample::Sample)) { - RawStream* output_source; - if (pcm->sample_rate != target_format_.sample_rate) { - OutputStream resampled_writer{resampled_stream_.get()}; - if (Resample(input, resampled_writer)) { + size_t samples_used = 0; + while (input.size() < samples_used) { + cpp::span<sample::Sample> output_source; + if (source_format_.sample_rate != target_format_.sample_rate) { + if (resampler_ == nullptr) { + ESP_LOGI(kTag, "creating new resampler"); + resampler_.reset(new Resampler(source_format_.sample_rate, + target_format_.sample_rate, + source_format_.num_channels)); + } + + size_t read, written; + std::tie(read, written) = + resampler_->Process(input, resampled_buffer_, is_eos); + samples_used += read; + if (read == 0 && written == 0) { // Zero samples used or written. We need more input. break; } - output_source = resampled_stream_.get(); + output_source = resampled_buffer_.first(written); } else { - output_source = input_stream_.get(); + output_source = input; + samples_used = input.size(); } - size_t bytes_consumed = output_source->info().bytes_in_stream(); - size_t bytes_to_send = output_source->info().bytes_in_stream(); - if (target_format_.bits_per_sample == 16) { - // This is slightly scary; we're basically reaching into the internals of - // the stream buffer to do in-place conversion of samples. Saving an - // extra buffer + copy into that buffer is certainly worth it however. - cpp::span<sample::Sample> src = - output_source->data_as<sample::Sample>().first( - output_source->info().bytes_in_stream() / sizeof(sample::Sample)); - cpp::span<int16_t> dest{reinterpret_cast<int16_t*>(src.data()), - src.size()}; - - ApplyDither(src, 16); - Downscale(src, dest); - - bytes_consumed = src.size_bytes(); - bytes_to_send = src.size_bytes() / 2; - } + // FIXME: The source should have some kind of hint indicating whether it + // needs dither, since some codecs (e.g. opus) apply their own dither. + ApplyDither(output_source, 16); + + cpp::span<int16_t> dest{reinterpret_cast<int16_t*>(output_source.data()), + output_source.size()}; + for (size_t i = 0; i < output_source.size(); i++) { + dest[i] = sample::ToSigned16Bit(output_source[i]); + } - InputStream output{output_source}; - cpp::span<const std::byte> buf = output.data(); + output_source = output_source.first(output_source.size() / 2); + } size_t bytes_sent = 0; + size_t bytes_to_send = output_source.size_bytes(); while (bytes_sent < bytes_to_send) { - auto cropped = buf.subspan(bytes_sent, bytes_to_send - bytes_sent); - bytes_sent += xStreamBufferSend(sink_, cropped.data(), - cropped.size_bytes(), portMAX_DELAY); + bytes_sent += xStreamBufferSend( + sink_->stream(), + reinterpret_cast<std::byte*>(output_source.data()) + bytes_sent, + bytes_to_send - bytes_sent, portMAX_DELAY); } - output.consume(bytes_consumed); - } -} - -auto SinkMixer::Resample(InputStream& in, OutputStream& out) -> bool { - if (resampler_ == nullptr) { - ESP_LOGI(kTag, "creating new resampler"); - auto format = in.info().format_as<StreamInfo::Pcm>(); - resampler_.reset(new Resampler( - format->sample_rate, target_format_.sample_rate, format->channels)); - } - - auto res = resampler_->Process(in.data_as<sample::Sample>(), - out.data_as<sample::Sample>(), false); - - in.consume(res.first * sizeof(sample::Sample)); - out.add(res.second * sizeof(sample::Sample)); - - return res.first == 0 && res.second == 0; -} - -auto SinkMixer::Downscale(cpp::span<sample::Sample> samples, - cpp::span<int16_t> output) -> void { - for (size_t i = 0; i < samples.size(); i++) { - output[i] = sample::ToSigned16Bit(samples[i]); } + return samples_used; } auto SinkMixer::ApplyDither(cpp::span<sample::Sample> samples, |
