summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/drivers/include/drivers/pcm_buffer.hpp8
-rw-r--r--src/drivers/pcm_buffer.cpp9
-rw-r--r--src/tangara/audio/audio_decoder.cpp46
-rw-r--r--src/tangara/audio/audio_decoder.hpp2
-rw-r--r--src/tangara/audio/audio_fsm.cpp1
-rw-r--r--src/tangara/audio/processor.cpp409
-rw-r--r--src/tangara/audio/processor.hpp70
-rw-r--r--src/tangara/audio/resample.cpp18
-rw-r--r--src/tangara/audio/resample.hpp3
9 files changed, 407 insertions, 159 deletions
diff --git a/src/drivers/include/drivers/pcm_buffer.hpp b/src/drivers/include/drivers/pcm_buffer.hpp
index 6630f720..8f53317e 100644
--- a/src/drivers/include/drivers/pcm_buffer.hpp
+++ b/src/drivers/include/drivers/pcm_buffer.hpp
@@ -28,8 +28,12 @@ class PcmBuffer {
PcmBuffer(size_t size_in_samples);
~PcmBuffer();
- /* Adds samples to the buffer. */
- auto send(std::span<const int16_t>) -> void;
+ /*
+ * Adds samples to the buffer. Returns the number of samples that were added,
+ * which may be less than the number of samples given if this PcmBuffer is
+ * close to full.
+ */
+ auto send(std::span<const int16_t>) -> size_t;
/*
* Fills the given span with samples. If enough samples are available in
diff --git a/src/drivers/pcm_buffer.cpp b/src/drivers/pcm_buffer.cpp
index 25762c50..071f5cea 100644
--- a/src/drivers/pcm_buffer.cpp
+++ b/src/drivers/pcm_buffer.cpp
@@ -17,6 +17,7 @@
#include "freertos/FreeRTOS.h"
#include "esp_heap_caps.h"
+#include "freertos/projdefs.h"
#include "freertos/ringbuf.h"
#include "portmacro.h"
@@ -39,9 +40,13 @@ PcmBuffer::~PcmBuffer() {
heap_caps_free(buf_);
}
-auto PcmBuffer::send(std::span<const int16_t> data) -> void {
- xRingbufferSend(ringbuf_, data.data(), data.size_bytes(), portMAX_DELAY);
+auto PcmBuffer::send(std::span<const int16_t> data) -> size_t {
+ if (!xRingbufferSend(ringbuf_, data.data(), data.size_bytes(),
+ pdMS_TO_TICKS(100))) {
+ return 0;
+ }
sent_ += data.size();
+ return data.size();
}
IRAM_ATTR auto PcmBuffer::receive(std::span<int16_t> dest, bool isr)
diff --git a/src/tangara/audio/audio_decoder.cpp b/src/tangara/audio/audio_decoder.cpp
index ee06d984..992444f0 100644
--- a/src/tangara/audio/audio_decoder.cpp
+++ b/src/tangara/audio/audio_decoder.cpp
@@ -48,7 +48,7 @@ static const char* kTag = "decoder";
* increasing its size.
*/
static constexpr std::size_t kCodecBufferLength =
- drivers::kI2SBufferLengthFrames * sizeof(sample::Sample);
+ drivers::kI2SBufferLengthFrames * 2;
auto Decoder::Start(std::shared_ptr<SampleProcessor> sink) -> Decoder* {
Decoder* task = new Decoder(sink);
@@ -78,11 +78,17 @@ Decoder::Decoder(std::shared_ptr<SampleProcessor> processor)
* Main decoding loop. Handles watching for new streams, or continuing to nudge
* along the current stream if we have one.
*/
+IRAM_ATTR
void Decoder::Main() {
for (;;) {
- // Check whether there's a new stream to begin. If we're idle, then we
- // simply park and wait forever for a stream to arrive.
- TickType_t wait_time = stream_ ? 0 : portMAX_DELAY;
+ // How long should we spend waiting for a command? By default, assume we're
+ // idle and wait forever.
+ TickType_t wait_time = portMAX_DELAY;
+ if (!leftover_samples_.empty() || stream_) {
+ // If we have work to do, then don't block waiting for a new stream.
+ wait_time = 0;
+ }
+
NextStream* next;
if (xQueueReceive(next_stream_, &next, wait_time)) {
// Copy the data out of the queue, then clean up the item.
@@ -103,8 +109,15 @@ void Decoder::Main() {
// Start decoding the new stream.
prepareDecode(new_stream);
+
+ // Keep handling commands until the command queue is empty.
+ continue;
}
+ // We should always have a stream if we returned from xQueueReceive without
+ // receiving a new stream.
+ assert(stream_);
+
if (!continueDecode()) {
finishDecode(false);
}
@@ -167,16 +180,36 @@ auto Decoder::prepareDecode(std::shared_ptr<TaggedStream> stream) -> void {
}
auto Decoder::continueDecode() -> bool {
+ // First, see if we have any samples from a previous decode that still need
+ // to be sent.
+ if (!leftover_samples_.empty()) {
+ leftover_samples_ = processor_->continueStream(leftover_samples_);
+ return true;
+ }
+
+ // We might have already cleaned up the codec if the last decode pass of the
+ // stream resulted in leftoverr samples.
+ if (!codec_) {
+ return false;
+ }
+
auto res = codec_->DecodeTo(codec_buffer_);
if (res.has_error()) {
return false;
}
if (res->samples_written > 0) {
- processor_->continueStream(codec_buffer_.first(res->samples_written));
+ leftover_samples_ =
+ processor_->continueStream(codec_buffer_.first(res->samples_written));
+ }
+
+ if (res->is_stream_finished) {
+ // The codec has finished, so make sure we don't call it again.
+ codec_.reset();
}
- return !res->is_stream_finished;
+ // We're done iff the codec has finished and we sent everything.
+ return codec_ || !leftover_samples_.empty();
}
auto Decoder::finishDecode(bool cancel) -> void {
@@ -191,6 +224,7 @@ auto Decoder::finishDecode(bool cancel) -> void {
processor_->endStream(cancel);
// Clean up after ourselves.
+ leftover_samples_ = {};
stream_.reset();
codec_.reset();
track_.reset();
diff --git a/src/tangara/audio/audio_decoder.hpp b/src/tangara/audio/audio_decoder.hpp
index 64561d9d..9f20ec59 100644
--- a/src/tangara/audio/audio_decoder.hpp
+++ b/src/tangara/audio/audio_decoder.hpp
@@ -15,6 +15,7 @@
#include "audio/processor.hpp"
#include "codec.hpp"
#include "database/track.hpp"
+#include "sample.hpp"
#include "types.hpp"
namespace audio {
@@ -55,6 +56,7 @@ class Decoder {
std::shared_ptr<TrackInfo> track_;
std::span<sample::Sample> codec_buffer_;
+ std::span<sample::Sample> leftover_samples_;
};
} // namespace audio
diff --git a/src/tangara/audio/audio_fsm.cpp b/src/tangara/audio/audio_fsm.cpp
index 16c16002..54ea5b6c 100644
--- a/src/tangara/audio/audio_fsm.cpp
+++ b/src/tangara/audio/audio_fsm.cpp
@@ -216,6 +216,7 @@ void AudioState::react(const internal::StreamStarted& ev) {
}
sStreamCues.addCue(ev.track, ev.cue_at_sample);
+ sStreamCues.update(sDrainBuffer->totalReceived());
if (!sIsPaused && !is_in_state<states::Playback>()) {
transit<states::Playback>();
diff --git a/src/tangara/audio/processor.cpp b/src/tangara/audio/processor.cpp
index 81858110..29124232 100644
--- a/src/tangara/audio/processor.cpp
+++ b/src/tangara/audio/processor.cpp
@@ -10,60 +10,57 @@
#include <algorithm>
#include <cmath>
#include <cstdint>
+#include <cstring>
#include <limits>
#include <span>
-#include "audio/audio_events.hpp"
-#include "audio/audio_sink.hpp"
-#include "drivers/i2s_dac.hpp"
-#include "drivers/pcm_buffer.hpp"
+#include "assert.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
-#include "events/event_queue.hpp"
+#include "esp_timer.h"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
+#include "audio/audio_events.hpp"
+#include "audio/audio_sink.hpp"
+#include "audio/i2s_audio_output.hpp"
#include "audio/resample.hpp"
+#include "drivers/i2s_dac.hpp"
+#include "drivers/pcm_buffer.hpp"
+#include "events/event_queue.hpp"
#include "sample.hpp"
#include "tasks.hpp"
[[maybe_unused]] static constexpr char kTag[] = "mixer";
-static constexpr std::size_t kSampleBufferLength =
- drivers::kI2SBufferLengthFrames * sizeof(sample::Sample) * 2;
-static constexpr std::size_t kSourceBufferLength = kSampleBufferLength * 2;
+static const size_t kSampleBufferLength = drivers::kI2SBufferLengthFrames * 2;
+static const size_t kSourceBufferLength = kSampleBufferLength * 2;
namespace audio {
+static const I2SAudioOutput::Format kTargetFormat{
+ .sample_rate = 48000,
+ .num_channels = 2,
+ .bits_per_sample = 16,
+};
+
SampleProcessor::SampleProcessor(drivers::PcmBuffer& sink)
- : commands_(xQueueCreate(1, sizeof(Args))),
- resampler_(nullptr),
- source_(xStreamBufferCreateWithCaps(kSourceBufferLength,
- sizeof(sample::Sample) * 2,
+ : commands_(xQueueCreate(2, sizeof(Args))),
+ source_(xStreamBufferCreateWithCaps(kSourceBufferLength + 1,
+ sizeof(sample::Sample),
MALLOC_CAP_DMA)),
sink_(sink),
- leftover_bytes_(0) {
- input_buffer_ = {
- reinterpret_cast<sample::Sample*>(heap_caps_calloc(
- kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_DMA)),
- kSampleBufferLength};
- input_buffer_as_bytes_ = {reinterpret_cast<std::byte*>(input_buffer_.data()),
- input_buffer_.size_bytes()};
-
- resampled_buffer_ = {
- reinterpret_cast<sample::Sample*>(heap_caps_calloc(
- kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_DMA)),
- kSampleBufferLength};
-
+ unprocessed_samples_(0) {
tasks::StartPersistent<tasks::Type::kAudioConverter>([&]() { Main(); });
}
SampleProcessor::~SampleProcessor() {
vQueueDelete(commands_);
- vStreamBufferDelete(source_);
+ vStreamBufferDeleteWithCaps(source_);
}
auto SampleProcessor::SetOutput(std::shared_ptr<IAudioOutput> output) -> void {
+ output->Configure(kTargetFormat);
// FIXME: We should add synchronisation here, but we should be careful
// about not impacting performance given that the output will change only
// very rarely (if ever).
@@ -80,15 +77,30 @@ auto SampleProcessor::beginStream(std::shared_ptr<TrackInfo> track) -> void {
xQueueSend(commands_, &args, portMAX_DELAY);
}
-auto SampleProcessor::continueStream(std::span<sample::Sample> input) -> void {
+auto SampleProcessor::continueStream(std::span<sample::Sample> input)
+ -> std::span<sample::Sample> {
+ size_t bytes_sent = xStreamBufferSend(source_, input.data(),
+ input.size_bytes(), pdMS_TO_TICKS(100));
+ if (!bytes_sent) {
+ // If nothing could be sent, then bail out early. We don't want to send a
+ // samples_available command with zero samples.
+ return input;
+ }
+
+ // We should only ever be placing whole samples into the buffer. If half
+ // samples start being sent, then this indicates a serious bug somewhere.
+ size_t samples_sent = bytes_sent / sizeof(sample::Sample);
+ assert(samples_sent * sizeof(sample::Sample) == bytes_sent);
+
Args args{
.track = nullptr,
- .samples_available = input.size(),
+ .samples_available = samples_sent,
.is_end_of_stream = false,
.clear_buffers = false,
};
xQueueSend(commands_, &args, portMAX_DELAY);
- xStreamBufferSend(source_, input.data(), input.size_bytes(), portMAX_DELAY);
+
+ return input.subspan(samples_sent);
}
auto SampleProcessor::endStream(bool cancelled) -> void {
@@ -101,152 +113,281 @@ auto SampleProcessor::endStream(bool cancelled) -> void {
xQueueSend(commands_, &args, portMAX_DELAY);
}
+IRAM_ATTR
auto SampleProcessor::Main() -> void {
for (;;) {
+ // Block indefinitely if the processor is idle. Otherwise check briefly for
+ // new commands, then continue processing.
+ TickType_t wait = hasPendingWork() ? 0 : portMAX_DELAY;
+
Args args;
- while (!xQueueReceive(commands_, &args, portMAX_DELAY)) {
+ if (xQueueReceive(commands_, &args, wait)) {
+ if (args.is_end_of_stream && args.clear_buffers) {
+ // The new command is telling us to clear our buffers! This includes
+ // discarding any commands that have backed up without being processed.
+ // Discard all the old commands, then immediately handle the end of
+ // stream.
+ while (!pending_commands_.empty()) {
+ Args discard = pending_commands_.front();
+ pending_commands_.pop_front();
+ discardCommand(discard);
+ }
+ handleEndStream(true);
+ } else {
+ pending_commands_.push_back(args);
+ }
}
- if (args.track) {
- handleBeginStream(*args.track);
- delete args.track;
+ // We need to finish flushing all processed samples before we can process
+ // more samples.
+ if (!output_buffer_.isEmpty() && flushOutputBuffer()) {
+ continue;
}
- if (args.samples_available) {
- handleContinueStream(args.samples_available);
+
+ // We need to finish processing all the samples we've been told about
+ // before we handle backed up commands.
+ if (unprocessed_samples_ && !processSamples(false)) {
+ continue;
}
- if (args.is_end_of_stream) {
- handleEndStream(args.clear_buffers);
+
+ while (!pending_commands_.empty()) {
+ args = pending_commands_.front();
+ pending_commands_.pop_front();
+
+ if (args.track) {
+ handleBeginStream(*args.track);
+ delete args.track;
+ }
+ if (args.samples_available) {
+ unprocessed_samples_ += args.samples_available;
+ }
+ if (args.is_end_of_stream) {
+ if (processSamples(true) || args.clear_buffers) {
+ handleEndStream(args.clear_buffers);
+ } else {
+ // The output filled up while we were trying to flush the last
+ // samples of this stream, and we haven't been told to clear our
+ // buffers. Retry handling this command later.
+ pending_commands_.push_front(args);
+ break;
+ }
+ }
}
}
}
auto SampleProcessor::handleBeginStream(std::shared_ptr<TrackInfo> track)
-> void {
- if (track->format != source_format_) {
- source_format_ = track->format;
- // The new stream has a different format to the previous stream (or there
- // was no previous stream).
- // First, clean up our filters.
- resampler_.reset();
- leftover_bytes_ = 0;
-
- // If the output is idle, then we can reconfigure it to the closest format
- // to our new source.
- // If the output *wasn't* idle, then we can't reconfigure without an
- // audible gap in playback. So instead, we simply keep the same target
- // format and begin resampling.
- if (sink_.isEmpty()) {
- target_format_ = output_->PrepareFormat(track->format);
- output_->Configure(target_format_);
+ // If the new stream's sample rate doesn't match our canonical sample rate,
+ // then prepare to start resampling.
+ if (track->format.sample_rate != kTargetFormat.sample_rate) {
+ ESP_LOGI(kTag, "resampling %lu -> %lu", track->format.sample_rate,
+ kTargetFormat.sample_rate);
+ if (!resampler_ || resampler_->sourceRate() != track->format.sample_rate) {
+ // If there's already a resampler instance for this source rate, then
+ // reuse it to help gapless playback work smoothly.
+ resampler_.reset(new Resampler(track->format.sample_rate,
+ kTargetFormat.sample_rate,
+ track->format.num_channels));
}
+ } else {
+ resampler_.reset();
}
+ // If the new stream has only one channel, then we double it to get stereo
+ // audio.
+ // FIXME: If the Bluetooth stack allowed us to configure the number of
+ // channels, we could remove this.
+ double_samples_ = track->format.num_channels != kTargetFormat.num_channels;
+
events::Audio().Dispatch(internal::StreamStarted{
.track = track,
- .sink_format = target_format_,
+ .sink_format = kTargetFormat,
.cue_at_sample = sink_.totalSent(),
});
}
-auto SampleProcessor::handleContinueStream(size_t samples_available) -> void {
- // Loop until we finish reading all the bytes indicated. There might be
- // leftovers from each iteration, and from this process as a whole,
- // depending on the resampling stage.
- size_t bytes_read = 0;
- size_t bytes_to_read = samples_available * sizeof(sample::Sample);
- while (bytes_read < bytes_to_read) {
- // First top up the input buffer, taking care not to overwrite anything
- // remaining from a previous iteration.
- size_t bytes_read_this_it = xStreamBufferReceive(
- source_, input_buffer_as_bytes_.subspan(leftover_bytes_).data(),
- std::min(input_buffer_as_bytes_.size() - leftover_bytes_,
- bytes_to_read - bytes_read),
- portMAX_DELAY);
- bytes_read += bytes_read_this_it;
-
- // Calculate the number of whole samples that are now in the input buffer.
- size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_;
- size_t samples_in_buffer = bytes_in_buffer / sizeof(sample::Sample);
-
- size_t samples_used = handleSamples(input_buffer_.first(samples_in_buffer));
-
- // Maybe the resampler didn't consume everything. Maybe the last few
- // bytes we read were half a frame. Either way, we need to calculate the
- // size of the remainder in bytes, then move it to the front of our
- // buffer.
- size_t bytes_used = samples_used * sizeof(sample::Sample);
- assert(bytes_used <= bytes_in_buffer);
-
- leftover_bytes_ = bytes_in_buffer - bytes_used;
- if (leftover_bytes_ > 0) {
- std::memmove(input_buffer_as_bytes_.data(),
- input_buffer_as_bytes_.data() + bytes_used, leftover_bytes_);
- }
- }
-}
+IRAM_ATTR
+auto SampleProcessor::processSamples(bool finalise) -> bool {
+ for (;;) {
+ bool out_of_work = true;
-auto SampleProcessor::handleSamples(std::span<sample::Sample> input) -> size_t {
- if (source_format_ == target_format_) {
- // The happiest possible case: the input format matches the output
- // format already.
- sink_.send(input);
- return input.size();
- }
+ // First, fill up our input buffer with samples.
+ if (unprocessed_samples_ > 0) {
+ out_of_work = false;
+ auto input = input_buffer_.writeAcquire();
- size_t samples_used = 0;
- while (samples_used < input.size()) {
- std::span<sample::Sample> output_source;
- if (source_format_.sample_rate != target_format_.sample_rate) {
- if (resampler_ == nullptr) {
- ESP_LOGI(kTag, "creating new resampler for %lu -> %lu",
- source_format_.sample_rate, target_format_.sample_rate);
- resampler_.reset(new Resampler(source_format_.sample_rate,
- target_format_.sample_rate,
- source_format_.num_channels));
- }
+ size_t bytes_received = xStreamBufferReceive(
+ source_, input.data(),
+ std::min(input.size_bytes(),
+ unprocessed_samples_ * sizeof(sample::Sample)),
+ 0);
- size_t read, written;
- std::tie(read, written) = resampler_->Process(input.subspan(samples_used),
- resampled_buffer_, false);
- samples_used += read;
+ // We should never receive a half sample. Blow up immediately if we do.
+ size_t samples_received = bytes_received / sizeof(sample::Sample);
+ assert(samples_received * sizeof(sample::Sample) == bytes_received);
- if (read == 0 && written == 0) {
- // Zero samples used or written. We need more input.
- break;
- }
- output_source = resampled_buffer_.first(written);
- } else {
- output_source = input;
- samples_used = input.size();
+ unprocessed_samples_ -= samples_received;
+ input_buffer_.writeCommit(samples_received);
}
- sink_.send(output_source);
- }
+ // Next, push input samples through the resampler. In the best case, this
+ // is a simple copy operation.
+ if (!input_buffer_.isEmpty()) {
+ out_of_work = false;
+ auto resample_input = input_buffer_.readAcquire();
+ auto resample_output = resampled_buffer_.writeAcquire();
+
+ size_t read, wrote;
+ if (resampler_) {
+ std::tie(read, wrote) =
+ resampler_->Process(resample_input, resample_output, finalise);
+ } else {
+ read = wrote = std::min(resample_input.size(), resample_output.size());
+ std::copy_n(resample_input.begin(), read, resample_output.begin());
+ }
- return samples_used;
-}
+ input_buffer_.readCommit(read);
+ resampled_buffer_.writeCommit(wrote);
+ }
-auto SampleProcessor::handleEndStream(bool clear_bufs) -> void {
- if (resampler_ && !clear_bufs) {
- size_t read, written;
- std::tie(read, written) = resampler_->Process({}, resampled_buffer_, true);
+ // Next, we need to make sure the output is in stereo. This is also a simple
+ // copy in the best case.
+ if (!resampled_buffer_.isEmpty()) {
+ out_of_work = false;
+ auto channels_input = resampled_buffer_.readAcquire();
+ auto channels_output = output_buffer_.writeAcquire();
+ size_t read, wrote;
+ if (double_samples_) {
+ wrote = channels_output.size();
+ read = wrote / 2;
+ if (read > channels_input.size()) {
+ read = channels_input.size();
+ wrote = read * 2;
+ }
+ for (size_t i = 0; i < read; i++) {
+ channels_output[i * 2] = channels_input[i];
+ channels_output[(i * 2) + 1] = channels_input[i];
+ }
+ } else {
+ read = wrote = std::min(channels_input.size(), channels_output.size());
+ std::copy_n(channels_input.begin(), read, channels_output.begin());
+ }
+ resampled_buffer_.readCommit(read);
+ output_buffer_.writeCommit(wrote);
+ }
- if (written > 0) {
- sink_.send(resampled_buffer_.first(written));
+ // Finally, flush whatever ended up in the output buffer.
+ if (flushOutputBuffer()) {
+ if (out_of_work) {
+ return true;
+ }
+ } else {
+ // The output is congested. Back off of processing for a moment.
+ return false;
}
}
+}
+auto SampleProcessor::handleEndStream(bool clear_bufs) -> void {
if (clear_bufs) {
sink_.clear();
- }
- // FIXME: This discards any leftover samples, but there probably shouldn't be
- // any leftover samples. Can this be an assert instead?
- leftover_bytes_ = 0;
+ input_buffer_.clear();
+ resampled_buffer_.clear();
+ output_buffer_.clear();
+
+ size_t bytes_discarded = 0;
+ size_t bytes_to_discard = unprocessed_samples_ * sizeof(sample::Sample);
+ auto scratch_buf = output_buffer_.writeAcquire();
+ while (bytes_discarded < bytes_to_discard) {
+ size_t bytes_read =
+ xStreamBufferReceive(source_, scratch_buf.data(),
+ std::min(scratch_buf.size_bytes(),
+ bytes_to_discard - bytes_discarded),
+ 0);
+ bytes_discarded += bytes_read;
+ }
+ unprocessed_samples_ = 0;
+ }
events::Audio().Dispatch(internal::StreamEnded{
.cue_at_sample = sink_.totalSent(),
});
}
+auto SampleProcessor::hasPendingWork() -> bool {
+ return !pending_commands_.empty() || unprocessed_samples_ > 0 ||
+ !input_buffer_.isEmpty() || !resampled_buffer_.isEmpty() ||
+ !output_buffer_.isEmpty();
+}
+
+IRAM_ATTR
+auto SampleProcessor::flushOutputBuffer() -> bool {
+ auto samples = output_buffer_.readAcquire();
+ size_t sent = sink_.send(samples);
+ output_buffer_.readCommit(sent);
+ return output_buffer_.isEmpty();
+}
+
+auto SampleProcessor::discardCommand(Args& command) -> void {
+ if (command.track) {
+ delete command.track;
+ }
+ if (command.samples_available) {
+ unprocessed_samples_ += command.samples_available;
+ }
+ // End of stream commands can just be dropped. Without further actions.
+}
+
+SampleProcessor::Buffer::Buffer()
+ : buffer_(reinterpret_cast<sample::Sample*>(
+ heap_caps_calloc(kSampleBufferLength,
+ sizeof(sample::Sample),
+ MALLOC_CAP_DMA)),
+ kSampleBufferLength),
+ samples_in_buffer_() {}
+
+SampleProcessor::Buffer::~Buffer() {
+ heap_caps_free(buffer_.data());
+}
+
+auto SampleProcessor::Buffer::writeAcquire() -> std::span<sample::Sample> {
+ return buffer_.subspan(samples_in_buffer_.size());
+}
+
+auto SampleProcessor::Buffer::writeCommit(size_t samples) -> void {
+ if (samples == 0) {
+ return;
+ }
+ samples_in_buffer_ = buffer_.first(samples + samples_in_buffer_.size());
+}
+
+auto SampleProcessor::Buffer::readAcquire() -> std::span<sample::Sample> {
+ return samples_in_buffer_;
+}
+
+auto SampleProcessor::Buffer::readCommit(size_t samples) -> void {
+ if (samples == 0) {
+ return;
+ }
+ samples_in_buffer_ = samples_in_buffer_.subspan(samples);
+
+ // Move the leftover samples to the front of the buffer, so that we're setup
+ // for a new write.
+ if (!samples_in_buffer_.empty()) {
+ std::memmove(buffer_.data(), samples_in_buffer_.data(),
+ samples_in_buffer_.size_bytes());
+ samples_in_buffer_ = buffer_.first(samples_in_buffer_.size());
+ }
+}
+
+auto SampleProcessor::Buffer::isEmpty() -> bool {
+ return samples_in_buffer_.empty();
+}
+
+auto SampleProcessor::Buffer::clear() -> void {
+ samples_in_buffer_ = {};
+}
+
} // namespace audio
diff --git a/src/tangara/audio/processor.hpp b/src/tangara/audio/processor.hpp
index 5c4ad0fa..f1b1d921 100644
--- a/src/tangara/audio/processor.hpp
+++ b/src/tangara/audio/processor.hpp
@@ -8,6 +8,8 @@
#include <stdint.h>
#include <cstdint>
+#include <functional>
+#include <list>
#include <memory>
#include "audio/audio_events.hpp"
@@ -33,18 +35,43 @@ class SampleProcessor {
auto SetOutput(std::shared_ptr<IAudioOutput>) -> void;
+ /*
+ * Signals to the sample processor that a new discrete stream of audio is now
+ * being sent. This will typically represent a new track being played.
+ */
auto beginStream(std::shared_ptr<TrackInfo>) -> void;
- auto continueStream(std::span<sample::Sample>) -> void;
+
+ /*
+ * Sends a span of PCM samples to the processor. Returns a subspan of the
+ * given span containing samples that were not able to be sent during this
+ * call, e.g. because of congestion downstream from the processor.
+ */
+ auto continueStream(std::span<sample::Sample>) -> std::span<sample::Sample>;
+
+ /*
+ * Signals to the sample processor that the current stream is ending. This
+ * can either be because the stream has naturally finished, or because it is
+ * being interrupted.
+ * If `cancelled` is false, the sample processor will ensure all previous
+ * samples are processed and sent before communicating the end of the stream
+ * onwards. If `cancelled` is true, any samples from the current stream that
+ * have not yet been played will be discarded.
+ */
auto endStream(bool cancelled) -> void;
+ SampleProcessor(const SampleProcessor&) = delete;
+ SampleProcessor& operator=(const SampleProcessor&) = delete;
+
private:
auto Main() -> void;
auto handleBeginStream(std::shared_ptr<TrackInfo>) -> void;
- auto handleContinueStream(size_t samples_available) -> void;
auto handleEndStream(bool cancel) -> void;
- auto handleSamples(std::span<sample::Sample>) -> size_t;
+ auto processSamples(bool finalise) -> bool;
+
+ auto hasPendingWork() -> bool;
+ auto flushOutputBuffer() -> bool;
struct Args {
std::shared_ptr<TrackInfo>* track;
@@ -53,21 +80,44 @@ class SampleProcessor {
bool clear_buffers;
};
QueueHandle_t commands_;
+ std::list<Args> pending_commands_;
- std::unique_ptr<Resampler> resampler_;
+ auto discardCommand(Args& command) -> void;
StreamBufferHandle_t source_;
drivers::PcmBuffer& sink_;
- std::span<sample::Sample> input_buffer_;
- std::span<std::byte> input_buffer_as_bytes_;
+ class Buffer {
+ public:
+ Buffer();
+ ~Buffer();
+
+ auto writeAcquire() -> std::span<sample::Sample>;
+ auto writeCommit(size_t) -> void;
+
+ auto readAcquire() -> std::span<sample::Sample>;
+ auto readCommit(size_t) -> void;
- std::span<sample::Sample> resampled_buffer_;
+ auto isEmpty() -> bool;
+ auto clear() -> void;
+
+ Buffer(const Buffer&) = delete;
+ Buffer& operator=(const Buffer&) = delete;
+
+ private:
+ std::span<sample::Sample> buffer_;
+ std::span<sample::Sample> samples_in_buffer_;
+ };
+
+ Buffer input_buffer_;
+ Buffer resampled_buffer_;
+ Buffer output_buffer_;
+
+ std::unique_ptr<Resampler> resampler_;
+ bool double_samples_;
std::shared_ptr<IAudioOutput> output_;
- IAudioOutput::Format source_format_;
- IAudioOutput::Format target_format_;
- size_t leftover_bytes_;
+ size_t unprocessed_samples_;
};
} // namespace audio
diff --git a/src/tangara/audio/resample.cpp b/src/tangara/audio/resample.cpp
index 143ce230..d6369022 100644
--- a/src/tangara/audio/resample.cpp
+++ b/src/tangara/audio/resample.cpp
@@ -4,6 +4,7 @@
* SPDX-License-Identifier: GPL-3.0-only
*/
#include "audio/resample.hpp"
+#include <stdint.h>
#include <algorithm>
#include <cmath>
@@ -31,6 +32,7 @@ Resampler::Resampler(uint32_t source_sample_rate,
kQuality,
&err_)),
num_channels_(num_channels) {
+ speex_resampler_skip_zeros(resampler_);
assert(err_ == 0);
}
@@ -38,18 +40,24 @@ Resampler::~Resampler() {
speex_resampler_destroy(resampler_);
}
+auto Resampler::sourceRate() -> uint32_t {
+ uint32_t input = 0;
+ uint32_t output = 0;
+ speex_resampler_get_rate(resampler_, &input, &output);
+ return input;
+}
+
auto Resampler::Process(std::span<sample::Sample> input,
std::span<sample::Sample> output,
bool end_of_data) -> std::pair<size_t, size_t> {
- uint32_t samples_used = input.size() / num_channels_;
- uint32_t samples_produced = output.size() / num_channels_;
+ uint32_t frames_used = input.size() / num_channels_;
+ uint32_t frames_produced = output.size() / num_channels_;
int err = speex_resampler_process_interleaved_int(
- resampler_, input.data(), &samples_used, output.data(),
- &samples_produced);
+ resampler_, input.data(), &frames_used, output.data(), &frames_produced);
assert(err == 0);
- return {samples_used * num_channels_, samples_produced * num_channels_};
+ return {frames_used * num_channels_, frames_produced * num_channels_};
}
} // namespace audio
diff --git a/src/tangara/audio/resample.hpp b/src/tangara/audio/resample.hpp
index 4d48d47f..df285020 100644
--- a/src/tangara/audio/resample.hpp
+++ b/src/tangara/audio/resample.hpp
@@ -6,6 +6,7 @@
#pragma once
+#include <stdint.h>
#include <cstdint>
#include <span>
#include <vector>
@@ -24,6 +25,8 @@ class Resampler {
~Resampler();
+ auto sourceRate() -> uint32_t;
+
auto Process(std::span<sample::Sample> input,
std::span<sample::Sample> output,
bool end_of_data) -> std::pair<size_t, size_t>;