summaryrefslogtreecommitdiff
path: root/src/audio/sink_mixer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/audio/sink_mixer.cpp')
-rw-r--r--src/audio/sink_mixer.cpp263
1 files changed, 130 insertions, 133 deletions
diff --git a/src/audio/sink_mixer.cpp b/src/audio/sink_mixer.cpp
index 6c72c8b0..9f973d4b 100644
--- a/src/audio/sink_mixer.cpp
+++ b/src/audio/sink_mixer.cpp
@@ -7,6 +7,7 @@
#include "sink_mixer.hpp"
#include <stdint.h>
+#include <algorithm>
#include <cmath>
#include "esp_heap_caps.h"
@@ -23,20 +24,28 @@
static constexpr char kTag[] = "mixer";
static constexpr std::size_t kSourceBufferLength = 8 * 1024;
-static constexpr std::size_t kSampleBufferLength = 240 * 2 * sizeof(int32_t);
+static constexpr std::size_t kSampleBufferLength = 240 * 2;
namespace audio {
-SinkMixer::SinkMixer(StreamBufferHandle_t dest)
+SinkMixer::SinkMixer(IAudioSink* sink)
: commands_(xQueueCreate(1, sizeof(Args))),
- is_idle_(xSemaphoreCreateBinary()),
resampler_(nullptr),
source_(xStreamBufferCreateWithCaps(kSourceBufferLength,
1,
MALLOC_CAP_SPIRAM)),
- sink_(dest) {
- input_stream_.reset(new RawStream(kSampleBufferLength));
- resampled_stream_.reset(new RawStream(kSampleBufferLength));
+ sink_(sink) {
+ input_buffer_ = {
+ reinterpret_cast<sample::Sample*>(heap_caps_calloc(
+ kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)),
+ kSampleBufferLength};
+ input_buffer_as_bytes_ = {reinterpret_cast<std::byte*>(input_buffer_.data()),
+ input_buffer_.size_bytes()};
+
+ resampled_buffer_ = {
+ reinterpret_cast<sample::Sample*>(heap_caps_calloc(
+ kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)),
+ kSampleBufferLength};
// Pin to CORE0 because we need the FPU.
// FIXME: A fixed point implementation could run freely on either core,
@@ -46,168 +55,156 @@ SinkMixer::SinkMixer(StreamBufferHandle_t dest)
SinkMixer::~SinkMixer() {
vQueueDelete(commands_);
- vSemaphoreDelete(is_idle_);
vStreamBufferDelete(source_);
}
-auto SinkMixer::MixAndSend(InputStream& input, const StreamInfo::Pcm& target)
- -> std::size_t {
- if (input.info().format_as<StreamInfo::Pcm>() !=
- input_stream_->info().format_as<StreamInfo::Pcm>()) {
- xSemaphoreTake(is_idle_, portMAX_DELAY);
- Args args{
- .cmd = Command::kSetSourceFormat,
- .format = input.info().format_as<StreamInfo::Pcm>().value(),
- };
- xQueueSend(commands_, &args, portMAX_DELAY);
- xSemaphoreGive(is_idle_);
- }
- if (target_format_ != target) {
- xSemaphoreTake(is_idle_, portMAX_DELAY);
- Args args{
- .cmd = Command::kSetTargetFormat,
- .format = target,
- };
- xQueueSend(commands_, &args, portMAX_DELAY);
- xSemaphoreGive(is_idle_);
- }
-
+auto SinkMixer::MixAndSend(cpp::span<sample::Sample> input,
+ const IAudioSink::Format& format,
+ bool is_eos) -> void {
Args args{
- .cmd = Command::kReadBytes,
- .format = {},
+ .format = format,
+ .samples_available = input.size(),
+ .is_end_of_stream = is_eos,
};
xQueueSend(commands_, &args, portMAX_DELAY);
- auto buf = input.data();
- std::size_t bytes_sent =
- xStreamBufferSend(source_, buf.data(), buf.size_bytes(), portMAX_DELAY);
- input.consume(bytes_sent);
- return bytes_sent;
+ cpp::span<std::byte> input_as_bytes = {
+ reinterpret_cast<std::byte*>(input.data()), input.size_bytes()};
+ size_t bytes_sent = 0;
+ while (bytes_sent < input_as_bytes.size()) {
+ bytes_sent +=
+ xStreamBufferSend(source_, input_as_bytes.subspan(bytes_sent).data(),
+ input_as_bytes.size() - bytes_sent, portMAX_DELAY);
+ }
}
auto SinkMixer::Main() -> void {
- OutputStream input_receiver{input_stream_.get()};
- xSemaphoreGive(is_idle_);
-
for (;;) {
Args args;
while (!xQueueReceive(commands_, &args, portMAX_DELAY)) {
}
- switch (args.cmd) {
- case Command::kSetSourceFormat:
- ESP_LOGI(kTag, "setting source format");
- input_receiver.prepare(args.format, {});
- resampler_.reset();
- break;
- case Command::kSetTargetFormat:
- ESP_LOGI(kTag, "setting target format");
- target_format_ = args.format;
- resampler_.reset();
- break;
- case Command::kReadBytes:
- xSemaphoreTake(is_idle_, 0);
- while (!xStreamBufferIsEmpty(source_)) {
- auto buf = input_receiver.data();
- std::size_t bytes_received = xStreamBufferReceive(
- source_, buf.data(), buf.size_bytes(), portMAX_DELAY);
- input_receiver.add(bytes_received);
- HandleBytes();
+ if (args.format != source_format_) {
+ resampler_.reset();
+ source_format_ = args.format;
+ leftover_bytes_ = 0;
+ leftover_offset_ = 0;
+
+ auto new_target = sink_->PrepareFormat(args.format);
+ if (new_target != target_format_) {
+ // The new format is different to the old one. Wait for the sink to
+ // drain before continuing.
+ while (!xStreamBufferIsEmpty(sink_->stream())) {
+ ESP_LOGI(kTag, "waiting for sink stream to drain...");
+ // TODO(jacqueline): Get the sink drain ISR to notify us of this
+ // via semaphore instead of busy-ish waiting.
+ vTaskDelay(pdMS_TO_TICKS(10));
}
- xSemaphoreGive(is_idle_);
- break;
+
+ ESP_LOGI(kTag, "configuring sink");
+ sink_->Configure(new_target);
+ }
+ target_format_ = new_target;
}
- }
-}
-auto SinkMixer::HandleBytes() -> void {
- InputStream input{input_stream_.get()};
- auto pcm = input.info().format_as<StreamInfo::Pcm>();
- if (!pcm) {
- ESP_LOGE(kTag, "mixer got unsupported data");
- return;
+ // Loop until we finish reading all the bytes indicated. There might be
+ // leftovers from each iteration, and from this process as a whole,
+ // depending on the resampling stage.
+ size_t bytes_read = 0;
+ size_t bytes_to_read = args.samples_available * sizeof(sample::Sample);
+ while (bytes_read < bytes_to_read) {
+ // First top up the input buffer, taking care not to overwrite anything
+ // remaining from a previous iteration.
+ size_t bytes_read_this_it = xStreamBufferReceive(
+ source_,
+ input_buffer_as_bytes_.subspan(leftover_offset_ + leftover_bytes_)
+ .data(),
+ std::min(input_buffer_as_bytes_.size() - leftover_offset_ -
+ leftover_bytes_,
+ bytes_to_read - bytes_read),
+ portMAX_DELAY);
+ bytes_read += bytes_read_this_it;
+
+ // Calculate the number of whole samples that are now in the input buffer.
+ size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_;
+ size_t samples_in_buffer = bytes_in_buffer / sizeof(sample::Sample);
+
+ size_t samples_used = HandleSamples(
+ input_buffer_.subspan(leftover_offset_).first(samples_in_buffer),
+ args.is_end_of_stream && bytes_read == bytes_to_read);
+
+ // Maybe the resampler didn't consume everything. Maybe the last few
+ // bytes we read were half a frame. Either way, we need to calculate the
+ // size of the remainder in bytes.
+ size_t bytes_used = samples_used * sizeof(sample::Sample);
+ leftover_bytes_ = bytes_in_buffer - bytes_used;
+ if (leftover_bytes_ == 0) {
+ leftover_offset_ = 0;
+ } else {
+ leftover_offset_ += bytes_used;
+ }
+ }
}
+}
- if (*pcm == target_format_) {
+auto SinkMixer::HandleSamples(cpp::span<sample::Sample> input, bool is_eos)
+ -> size_t {
+ if (source_format_ == target_format_) {
// The happiest possible case: the input format matches the output
- // format already. Streams like this should probably have bypassed the
- // mixer.
- // TODO(jacqueline): Make this an error; it's slow to use the mixer in this
- // case, compared to just writing directly to the sink.
- auto buf = input.data();
- std::size_t bytes_sent =
- xStreamBufferSend(sink_, buf.data(), buf.size_bytes(), portMAX_DELAY);
- input.consume(bytes_sent);
- return;
+ // format already.
+ std::size_t bytes_sent = xStreamBufferSend(
+ sink_->stream(), input.data(), input.size_bytes(), portMAX_DELAY);
+ return bytes_sent / sizeof(sample::Sample);
}
- while (input_stream_->info().bytes_in_stream() >= sizeof(sample::Sample)) {
- RawStream* output_source;
- if (pcm->sample_rate != target_format_.sample_rate) {
- OutputStream resampled_writer{resampled_stream_.get()};
- if (Resample(input, resampled_writer)) {
+ size_t samples_used = 0;
+ while (input.size() < samples_used) {
+ cpp::span<sample::Sample> output_source;
+ if (source_format_.sample_rate != target_format_.sample_rate) {
+ if (resampler_ == nullptr) {
+ ESP_LOGI(kTag, "creating new resampler");
+ resampler_.reset(new Resampler(source_format_.sample_rate,
+ target_format_.sample_rate,
+ source_format_.num_channels));
+ }
+
+ size_t read, written;
+ std::tie(read, written) =
+ resampler_->Process(input, resampled_buffer_, is_eos);
+ samples_used += read;
+ if (read == 0 && written == 0) {
// Zero samples used or written. We need more input.
break;
}
- output_source = resampled_stream_.get();
+ output_source = resampled_buffer_.first(written);
} else {
- output_source = input_stream_.get();
+ output_source = input;
+ samples_used = input.size();
}
- size_t bytes_consumed = output_source->info().bytes_in_stream();
- size_t bytes_to_send = output_source->info().bytes_in_stream();
-
if (target_format_.bits_per_sample == 16) {
- // This is slightly scary; we're basically reaching into the internals of
- // the stream buffer to do in-place conversion of samples. Saving an
- // extra buffer + copy into that buffer is certainly worth it however.
- cpp::span<sample::Sample> src =
- output_source->data_as<sample::Sample>().first(
- output_source->info().bytes_in_stream() / sizeof(sample::Sample));
- cpp::span<int16_t> dest{reinterpret_cast<int16_t*>(src.data()),
- src.size()};
-
- ApplyDither(src, 16);
- Downscale(src, dest);
-
- bytes_consumed = src.size_bytes();
- bytes_to_send = src.size_bytes() / 2;
- }
+ // FIXME: The source should have some kind of hint indicating whether it
+ // needs dither, since some codecs (e.g. opus) apply their own dither.
+ ApplyDither(output_source, 16);
+
+ cpp::span<int16_t> dest{reinterpret_cast<int16_t*>(output_source.data()),
+ output_source.size()};
+ for (size_t i = 0; i < output_source.size(); i++) {
+ dest[i] = sample::ToSigned16Bit(output_source[i]);
+ }
- InputStream output{output_source};
- cpp::span<const std::byte> buf = output.data();
+ output_source = output_source.first(output_source.size() / 2);
+ }
size_t bytes_sent = 0;
+ size_t bytes_to_send = output_source.size_bytes();
while (bytes_sent < bytes_to_send) {
- auto cropped = buf.subspan(bytes_sent, bytes_to_send - bytes_sent);
- bytes_sent += xStreamBufferSend(sink_, cropped.data(),
- cropped.size_bytes(), portMAX_DELAY);
+ bytes_sent += xStreamBufferSend(
+ sink_->stream(),
+ reinterpret_cast<std::byte*>(output_source.data()) + bytes_sent,
+ bytes_to_send - bytes_sent, portMAX_DELAY);
}
- output.consume(bytes_consumed);
- }
-}
-
-auto SinkMixer::Resample(InputStream& in, OutputStream& out) -> bool {
- if (resampler_ == nullptr) {
- ESP_LOGI(kTag, "creating new resampler");
- auto format = in.info().format_as<StreamInfo::Pcm>();
- resampler_.reset(new Resampler(
- format->sample_rate, target_format_.sample_rate, format->channels));
- }
-
- auto res = resampler_->Process(in.data_as<sample::Sample>(),
- out.data_as<sample::Sample>(), false);
-
- in.consume(res.first * sizeof(sample::Sample));
- out.add(res.second * sizeof(sample::Sample));
-
- return res.first == 0 && res.second == 0;
-}
-
-auto SinkMixer::Downscale(cpp::span<sample::Sample> samples,
- cpp::span<int16_t> output) -> void {
- for (size_t i = 0; i < samples.size(); i++) {
- output[i] = sample::ToSigned16Bit(samples[i]);
}
+ return samples_used;
}
auto SinkMixer::ApplyDither(cpp::span<sample::Sample> samples,