diff options
| author | jacqueline <me@jacqueline.id.au> | 2023-08-10 15:33:00 +1000 |
|---|---|---|
| committer | jacqueline <me@jacqueline.id.au> | 2023-08-10 15:33:00 +1000 |
| commit | d8fc77101dcf80a3643a00b3446dca1e390ce997 (patch) | |
| tree | 9e03881f3857c7b4c6a0b6e3a062947daecc69d1 /src/audio | |
| parent | 67caeb6e3cda44205ba8fe783274b20dc7ea216e (diff) | |
| download | tangara-fw-d8fc77101dcf80a3643a00b3446dca1e390ce997.tar.gz | |
Give codecs complete control of their input files
Diffstat (limited to 'src/audio')
| -rw-r--r-- | src/audio/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/audio/audio_fsm.cpp | 4 | ||||
| -rw-r--r-- | src/audio/audio_task.cpp | 274 | ||||
| -rw-r--r-- | src/audio/fatfs_audio_input.cpp | 294 | ||||
| -rw-r--r-- | src/audio/fatfs_source.cpp | 70 | ||||
| -rw-r--r-- | src/audio/fatfs_source.hpp | 44 | ||||
| -rw-r--r-- | src/audio/i2s_audio_output.cpp | 25 | ||||
| -rw-r--r-- | src/audio/include/audio_fsm.hpp | 1 | ||||
| -rw-r--r-- | src/audio/include/audio_sink.hpp | 13 | ||||
| -rw-r--r-- | src/audio/include/audio_source.hpp | 24 | ||||
| -rw-r--r-- | src/audio/include/audio_task.hpp | 27 | ||||
| -rw-r--r-- | src/audio/include/fatfs_audio_input.hpp | 79 | ||||
| -rw-r--r-- | src/audio/include/i2s_audio_output.hpp | 6 | ||||
| -rw-r--r-- | src/audio/include/sink_mixer.hpp | 38 | ||||
| -rw-r--r-- | src/audio/sink_mixer.cpp | 263 |
15 files changed, 424 insertions, 739 deletions
diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt index bd4ba32d..02e84c3f 100644 --- a/src/audio/CMakeLists.txt +++ b/src/audio/CMakeLists.txt @@ -6,6 +6,7 @@ idf_component_register( SRCS "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp" "stream_message.cpp" "i2s_audio_output.cpp" "stream_buffer.cpp" "track_queue.cpp" "stream_event.cpp" "stream_info.cpp" "audio_fsm.cpp" "sink_mixer.cpp" "resample.cpp" + "fatfs_source.cpp" INCLUDE_DIRS "include" REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory" "tinyfsm" "database" "system_fsm" "playlist") diff --git a/src/audio/audio_fsm.cpp b/src/audio/audio_fsm.cpp index 79080c9a..617272b3 100644 --- a/src/audio/audio_fsm.cpp +++ b/src/audio/audio_fsm.cpp @@ -97,6 +97,10 @@ void Standby::react(const PlayFile& ev) { sFileSource->SetPath(ev.filename); } +void Playback::react(const PlayFile& ev) { + sFileSource->SetPath(ev.filename); +} + void Standby::react(const internal::InputFileOpened& ev) { transit<Playback>(); } diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp index 046df378..d880e6b1 100644 --- a/src/audio/audio_task.cpp +++ b/src/audio/audio_task.cpp @@ -46,6 +46,7 @@ #include "stream_message.hpp" #include "sys/_stdint.h" #include "tasks.hpp" +#include "track.hpp" #include "types.hpp" #include "ui_fsm.hpp" @@ -53,7 +54,7 @@ namespace audio { static const char* kTag = "audio_dec"; -static constexpr std::size_t kSampleBufferSize = 16 * 1024; +static constexpr std::size_t kCodecBufferLength = 240 * 4; Timer::Timer(const StreamInfo::Pcm& format, const Duration& duration) : format_(format), current_seconds_(0), current_sample_in_second_(0) { @@ -120,260 +121,69 @@ AudioTask::AudioTask(IAudioSource* source, IAudioSink* sink) : source_(source), sink_(sink), codec_(), - mixer_(new SinkMixer(sink->stream())), + mixer_(new SinkMixer(sink)), timer_(), - has_begun_decoding_(false), - current_input_format_(), - current_output_format_(), - codec_buffer_(new RawStream(kSampleBufferSize, MALLOC_CAP_8BIT)) {} + current_format_() { + codec_buffer_ = { + reinterpret_cast<sample::Sample*>(heap_caps_calloc( + kCodecBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)), + kCodecBufferLength}; +} void AudioTask::Main() { for (;;) { - source_->Read( - [this](IAudioSource::Flags flags, InputStream& stream) -> void { - if (flags.is_start()) { - has_begun_decoding_ = false; - if (!HandleNewStream(stream)) { - return; - } - } - - auto pcm = stream.info().format_as<StreamInfo::Pcm>(); - if (pcm) { - if (ForwardPcmStream(*pcm, stream.data())) { - stream.consume(stream.data().size_bytes()); - } - return; - } - - if (!stream.info().format_as<StreamInfo::Encoded>() || !codec_) { - // Either unknown stream format, or it's encoded but we don't have - // a decoder that supports it. Either way, bail out. - return; - } - - if (!has_begun_decoding_) { - if (BeginDecoding(stream)) { - has_begun_decoding_ = true; - } else { - return; - } - } - - // At this point the decoder has been initialised, and the sink has - // been correctly configured. All that remains is to throw samples - // into the sink as fast as possible. - if (!ContinueDecoding(stream)) { - codec_.reset(); - } + if (source_->HasNewStream() || !stream_) { + std::shared_ptr<codecs::IStream> new_stream = source_->NextStream(); + if (new_stream && BeginDecoding(new_stream)) { + stream_ = new_stream; + } else { + continue; + } + } - if (flags.is_end()) { - FinishDecoding(stream); - events::Audio().Dispatch(internal::InputFileFinished{}); - } - }, - portMAX_DELAY); + if (ContinueDecoding()) { + events::Audio().Dispatch(internal::InputFileFinished{}); + stream_.reset(); + } } } -auto AudioTask::HandleNewStream(const InputStream& stream) -> bool { - // This must be a new stream of data. Reset everything to prepare to - // handle it. - current_input_format_ = stream.info().format(); - codec_.reset(); - - // What kind of data does this new stream contain? - auto pcm = stream.info().format_as<StreamInfo::Pcm>(); - auto encoded = stream.info().format_as<StreamInfo::Encoded>(); - if (pcm) { - // It's already decoded! We can always handle this. - return true; - } else if (encoded) { - // The stream has some kind of encoding. Whether or not we can - // handle it is entirely down to whether or not we have a codec for - // it. - has_begun_decoding_ = false; - auto codec = codecs::CreateCodecForType(encoded->type); - if (codec) { - ESP_LOGI(kTag, "successfully created codec for stream"); - codec_.reset(*codec); - return true; - } else { - ESP_LOGE(kTag, "stream has unknown encoding"); - return false; - } - } else { - // programmer error / skill issue :( - ESP_LOGE(kTag, "stream has unknown format"); +auto AudioTask::BeginDecoding(std::shared_ptr<codecs::IStream> stream) -> bool { + codec_.reset(codecs::CreateCodecForType(stream->type()).value_or(nullptr)); + if (!codec_) { + ESP_LOGE(kTag, "no codec found"); return false; } -} - -auto AudioTask::BeginDecoding(InputStream& stream) -> bool { - auto res = codec_->BeginStream(stream.data()); - stream.consume(res.first); - if (res.second.has_error()) { - if (res.second.error() == codecs::ICodec::Error::kOutOfInput) { - // Running out of input is fine; just return and we will try beginning the - // stream again when we have more data. - return false; - } - // Decoding the header failed, so we can't actually deal with this stream - // after all. It could be malformed. - ESP_LOGE(kTag, "error beginning stream"); - codec_.reset(); + auto open_res = codec_->OpenStream(stream); + if (open_res.has_error()) { + ESP_LOGE(kTag, "codec failed to start: %s", + codecs::ICodec::ErrorString(open_res.error()).c_str()); return false; } - codecs::ICodec::OutputFormat format = res.second.value(); - StreamInfo::Pcm new_format{ - .channels = format.num_channels, + current_sink_format_ = IAudioSink::Format{ + .sample_rate = open_res->sample_rate_hz, + .num_channels = open_res->num_channels, .bits_per_sample = 32, - .sample_rate = format.sample_rate_hz, }; - - Duration duration; - if (format.duration_seconds) { - duration.src = Duration::Source::kCodec; - duration.duration = *format.duration_seconds; - } else if (stream.info().total_length_seconds()) { - duration.src = Duration::Source::kLibTags; - duration.duration = *stream.info().total_length_seconds(); - } else { - duration.src = Duration::Source::kFileSize; - duration.duration = *stream.info().total_length_bytes(); - } - - if (!ConfigureSink(new_format, duration)) { - return false; - } - - OutputStream writer{codec_buffer_.get()}; - writer.prepare(new_format, {}); - - return true; -} - -auto AudioTask::ContinueDecoding(InputStream& stream) -> bool { - while (!stream.data().empty()) { - OutputStream writer{codec_buffer_.get()}; - - auto res = - codec_->ContinueStream(stream.data(), writer.data_as<sample::Sample>()); - - stream.consume(res.first); - - if (res.second.has_error()) { - if (res.second.error() == codecs::ICodec::Error::kOutOfInput) { - return true; - } else { - return false; - } - } else { - writer.add(res.second->samples_written * sizeof(sample::Sample)); - - InputStream reader{codec_buffer_.get()}; - SendToSink(reader); - } - } + ESP_LOGI(kTag, "stream started ok"); + events::Audio().Dispatch(internal::InputFileOpened{}); return true; } -auto AudioTask::FinishDecoding(InputStream& stream) -> void { - // HACK: libmad requires each frame passed to it to have an additional - // MAD_HEADER_GUARD (8) bytes after the end of the frame. Without these extra - // bytes, it will not decode the frame. - // The is fine for most of the stream, but at the end of the stream we don't - // get a trailing 8 bytes for free. - if (stream.info().format_as<StreamInfo::Encoded>()->type == - codecs::StreamType::kMp3) { - ESP_LOGI(kTag, "applying MAD_HEADER_GUARD fix"); - - std::unique_ptr<RawStream> mad_buffer; - mad_buffer.reset(new RawStream(stream.data().size_bytes() + 8)); - - OutputStream mad_writer{mad_buffer.get()}; - std::copy(stream.data().begin(), stream.data().end(), - mad_writer.data().begin()); - std::fill(mad_writer.data().begin(), mad_writer.data().end(), std::byte{0}); - InputStream padded_stream{mad_buffer.get()}; - - OutputStream writer{codec_buffer_.get()}; - auto res = - codec_->ContinueStream(stream.data(), writer.data_as<sample::Sample>()); - if (res.second.has_error()) { - return; - } - - writer.add(res.second->samples_written * sizeof(sample::Sample)); - - InputStream reader{codec_buffer_.get()}; - SendToSink(reader); - } -} - -auto AudioTask::ForwardPcmStream(StreamInfo::Pcm& format, - cpp::span<const std::byte> samples) -> bool { - // First we need to reconfigure the sink for this sample format. - if (format != current_output_format_) { - Duration d{ - .src = Duration::Source::kFileSize, - .duration = samples.size_bytes(), - }; - if (!ConfigureSink(format, d)) { - return false; - } +auto AudioTask::ContinueDecoding() -> bool { + auto res = codec_->DecodeTo(codec_buffer_); + if (res.has_error()) { + return true; } - // Stream the raw samples directly to the sink. - xStreamBufferSend(sink_->stream(), samples.data(), samples.size_bytes(), - portMAX_DELAY); - timer_->AddBytes(samples.size_bytes()); - InputStream reader{codec_buffer_.get()}; - SendToSink(reader); - - return true; -} - -auto AudioTask::ConfigureSink(const StreamInfo::Pcm& format, - const Duration& duration) -> bool { - if (format != current_output_format_) { - current_output_format_ = format; - StreamInfo::Pcm new_sink_format = sink_->PrepareFormat(format); - if (new_sink_format != current_sink_format_) { - current_sink_format_ = new_sink_format; - - // The new format is different to the old one. Wait for the sink to drain - // before continuing. - while (!xStreamBufferIsEmpty(sink_->stream())) { - ESP_LOGI(kTag, "waiting for sink stream to drain..."); - // TODO(jacqueline): Get the sink drain ISR to notify us of this - // via semaphore instead of busy-ish waiting. - vTaskDelay(pdMS_TO_TICKS(10)); - } - - ESP_LOGI(kTag, "configuring sink"); - sink_->Configure(new_sink_format); - } + if (res->samples_written > 0) { + mixer_->MixAndSend(codec_buffer_.first(res->samples_written), + current_sink_format_.value(), res->is_stream_finished); } - current_output_format_ = format; - timer_.reset(new Timer(format, duration)); - return true; -} - -auto AudioTask::SendToSink(InputStream& stream) -> void { - std::size_t bytes_to_send = stream.data().size_bytes(); - std::size_t bytes_sent; - if (stream.info().format_as<StreamInfo::Pcm>() == current_sink_format_) { - bytes_sent = xStreamBufferSend(sink_->stream(), stream.data().data(), - bytes_to_send, portMAX_DELAY); - stream.consume(bytes_sent); - } else { - bytes_sent = mixer_->MixAndSend(stream, current_sink_format_.value()); - } - timer_->AddBytes(bytes_sent); + return res->is_stream_finished; } } // namespace audio diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index d5194821..cae552db 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -19,8 +19,10 @@ #include <string> #include <variant> +#include "codec.hpp" #include "esp_heap_caps.h" #include "esp_log.h" +#include "fatfs_source.hpp" #include "ff.h" #include "audio_events.hpp" @@ -41,294 +43,124 @@ static const char* kTag = "SRC"; namespace audio { -static constexpr UINT kFileBufferSize = 8 * 1024; -static constexpr UINT kStreamerBufferSize = 64 * 1024; - -static StreamBufferHandle_t sForwardDest = nullptr; - -auto forward_cb(const BYTE* buf, UINT buf_length) -> UINT { - if (buf_length == 0) { - return !xStreamBufferIsFull(sForwardDest); - } else { - return xStreamBufferSend(sForwardDest, buf, buf_length, 0); - } -} - -FileStreamer::FileStreamer(StreamBufferHandle_t dest, - SemaphoreHandle_t data_was_read) - : control_(xQueueCreate(1, sizeof(Command))), - destination_(dest), - data_was_read_(data_was_read), - has_data_(false), - file_(), - next_file_() { - assert(sForwardDest == nullptr); - sForwardDest = dest; - tasks::StartPersistent<tasks::Type::kFileStreamer>([this]() { Main(); }); -} - -FileStreamer::~FileStreamer() { - sForwardDest = nullptr; - Command quit = kQuit; - xQueueSend(control_, &quit, portMAX_DELAY); - vQueueDelete(control_); -} - -auto FileStreamer::Main() -> void { - for (;;) { - Command cmd; - xQueueReceive(control_, &cmd, portMAX_DELAY); - - if (cmd == kQuit) { - break; - } else if (cmd == kRestart) { - CloseFile(); - xStreamBufferReset(destination_); - file_ = std::move(next_file_); - has_data_ = file_ != nullptr; - } else if (cmd == kRefillBuffer && file_) { - UINT bytes_sent = 0; // Unused. - // Use f_forward to push bytes directly from FATFS internal buffers into - // the destination. This has the nice side effect of letting FATFS decide - // the most efficient way to pull in data from disk; usually one whole - // sector at a time. Consult the FATFS lib application notes if changing - // this to use f_read. - FRESULT res = f_forward(file_.get(), forward_cb, UINT_MAX, &bytes_sent); - if (res != FR_OK || f_eof(file_.get())) { - CloseFile(); - has_data_ = false; - } - if (bytes_sent > 0) { - xSemaphoreGive(data_was_read_); - } - } - } - - ESP_LOGW(kTag, "quit file streamer"); - CloseFile(); - vTaskDelete(NULL); -} - -auto FileStreamer::Fetch() -> void { - if (!has_data_.load()) { - return; - } - Command refill = kRefillBuffer; - xQueueSend(control_, &refill, portMAX_DELAY); -} - -auto FileStreamer::HasFinished() -> bool { - return !has_data_.load(); -} - -auto FileStreamer::Restart(std::unique_ptr<FIL> new_file) -> void { - next_file_ = std::move(new_file); - Command restart = kRestart; - xQueueSend(control_, &restart, portMAX_DELAY); - Command fill = kRefillBuffer; - xQueueSend(control_, &fill, portMAX_DELAY); -} - -auto FileStreamer::CloseFile() -> void { - if (!file_) { - return; - } - ESP_LOGI(kTag, "closing file"); - f_close(file_.get()); - file_ = {}; - events::Audio().Dispatch(internal::InputFileClosed{}); -} - FatfsAudioInput::FatfsAudioInput( std::shared_ptr<database::ITagParser> tag_parser) : IAudioSource(), tag_parser_(tag_parser), - has_data_(xSemaphoreCreateBinary()), - streamer_buffer_(xStreamBufferCreateWithCaps(kStreamerBufferSize, - 1, - MALLOC_CAP_SPIRAM)), - streamer_(new FileStreamer(streamer_buffer_, has_data_)), - input_buffer_(new RawStream(kFileBufferSize)), - source_mutex_(), - pending_path_(), - is_first_read_(false) {} + new_stream_mutex_(), + new_stream_(), + has_new_stream_(xSemaphoreCreateBinary()), + pending_path_() {} FatfsAudioInput::~FatfsAudioInput() { - streamer_.reset(); - vStreamBufferDelete(streamer_buffer_); - vSemaphoreDelete(has_data_); + vSemaphoreDelete(has_new_stream_); } auto FatfsAudioInput::SetPath(std::future<std::optional<std::string>> fut) -> void { - std::lock_guard<std::mutex> lock{source_mutex_}; - - CloseCurrentFile(); + std::lock_guard<std::mutex> guard{new_stream_mutex_}; pending_path_.reset( new database::FutureFetcher<std::optional<std::string>>(std::move(fut))); - xSemaphoreGive(has_data_); + xSemaphoreGive(has_new_stream_); } auto FatfsAudioInput::SetPath(const std::string& path) -> void { - std::lock_guard<std::mutex> lock{source_mutex_}; - - CloseCurrentFile(); - OpenFile(path); + std::lock_guard<std::mutex> guard{new_stream_mutex_}; + if (OpenFile(path)) { + xSemaphoreGive(has_new_stream_); + } } auto FatfsAudioInput::SetPath() -> void { - std::lock_guard<std::mutex> lock{source_mutex_}; - CloseCurrentFile(); + std::lock_guard<std::mutex> guard{new_stream_mutex_}; + new_stream_.reset(); + xSemaphoreGive(has_new_stream_); } -auto FatfsAudioInput::Read(std::function<void(Flags, InputStream&)> read_cb, - TickType_t max_wait) -> void { - // Wait until we have data to return. - xSemaphoreTake(has_data_, portMAX_DELAY); - - // Ensure the file doesn't change whilst we're trying to get data about it. - std::lock_guard<std::mutex> source_lock{source_mutex_}; - - // If the path is a future, then wait for it to complete. - // TODO(jacqueline): We should really make some kind of FreeRTOS-integrated - // way to block a task whilst awaiting a future. - if (pending_path_) { - while (!pending_path_->Finished()) { - vTaskDelay(pdMS_TO_TICKS(100)); - } - auto res = pending_path_->Result(); - pending_path_.reset(); - - if (res && *res) { - OpenFile(**res); - } - - // Bail out now that we've resolved the future. If we end up successfully - // readinig from the path, then has_data will be flagged again. - return; +auto FatfsAudioInput::HasNewStream() -> bool { + bool res = xSemaphoreTake(has_new_stream_, 0); + if (res) { + xSemaphoreGive(has_new_stream_); } + return res; +} + +auto FatfsAudioInput::NextStream() -> std::shared_ptr<codecs::IStream> { + while (true) { + xSemaphoreTake(has_new_stream_, portMAX_DELAY); + + { + std::lock_guard<std::mutex> guard{new_stream_mutex_}; + // If the path is a future, then wait for it to complete. + // TODO(jacqueline): We should really make some kind of + // FreeRTOS-integrated way to block a task whilst awaiting a future. + if (pending_path_) { + while (!pending_path_->Finished()) { + vTaskDelay(pdMS_TO_TICKS(100)); + } + auto res = pending_path_->Result(); + pending_path_.reset(); + + if (res && *res) { + OpenFile(**res); + } + } - // Move data from the file streamer's buffer into our file buffer. We need our - // own buffer so that we can handle concatenating smaller file chunks into - // complete frames for the decoder. - OutputStream writer{input_buffer_.get()}; - std::size_t bytes_added = - xStreamBufferReceive(streamer_buffer_, writer.data().data(), - writer.data().size_bytes(), pdMS_TO_TICKS(0)); - writer.add(bytes_added); - - bool has_data_remaining = HasDataRemaining(); - - InputStream reader{input_buffer_.get()}; - auto data_for_cb = reader.data(); - if (!data_for_cb.empty()) { - std::invoke(read_cb, Flags{is_first_read_, !has_data_remaining}, reader); - is_first_read_ = false; - } + if (new_stream_ == nullptr) { + continue; + } - if (!has_data_remaining) { - // Out of data. We're finished. Note we don't care about anything left in - // the file buffer at this point; the callback as seen it, so if it didn't - // consume it then presumably whatever is left isn't enough to form a - // complete frame. - ESP_LOGI(kTag, "finished streaming file"); - CloseCurrentFile(); - } else { - // There is still data to be read, or sitting in the buffer. - streamer_->Fetch(); - xSemaphoreGive(has_data_); + auto stream = new_stream_; + new_stream_ = nullptr; + return stream; + } } } -auto FatfsAudioInput::OpenFile(const std::string& path) -> void { +auto FatfsAudioInput::OpenFile(const std::string& path) -> bool { ESP_LOGI(kTag, "opening file %s", path.c_str()); - FILINFO info; - if (f_stat(path.c_str(), &info) != FR_OK) { - ESP_LOGE(kTag, "failed to stat file"); - return; - } - database::TrackTags tags; if (!tag_parser_->ReadAndParseTags(path, &tags)) { ESP_LOGE(kTag, "failed to read tags"); - return; + return false; } auto stream_type = ContainerToStreamType(tags.encoding()); if (!stream_type.has_value()) { ESP_LOGE(kTag, "couldn't match container to stream"); - return; - } - - StreamInfo::Format format; - if (*stream_type == codecs::StreamType::kPcm) { - if (tags.channels && tags.bits_per_sample && tags.channels) { - format = StreamInfo::Pcm{ - .channels = static_cast<uint8_t>(*tags.channels), - .bits_per_sample = static_cast<uint8_t>(*tags.bits_per_sample), - .sample_rate = static_cast<uint32_t>(*tags.sample_rate)}; - } else { - ESP_LOGW(kTag, "pcm stream missing format info"); - return; - } - } else { - format = StreamInfo::Encoded{.type = *stream_type}; + return false; } std::unique_ptr<FIL> file = std::make_unique<FIL>(); FRESULT res = f_open(file.get(), path.c_str(), FA_READ); if (res != FR_OK) { ESP_LOGE(kTag, "failed to open file! res: %i", res); - return; - } - - OutputStream writer{input_buffer_.get()}; - writer.prepare(format, info.fsize); - if (tags.duration) { - writer.info().total_length_seconds() = *tags.duration; + return false; } - streamer_->Restart(std::move(file)); - is_first_read_ = true; - events::Audio().Dispatch(internal::InputFileOpened{}); + new_stream_.reset(new FatfsSource(stream_type.value(), std::move(file))); + return true; } -auto FatfsAudioInput::CloseCurrentFile() -> void { - streamer_->Restart({}); - xStreamBufferReset(streamer_buffer_); -} - -auto FatfsAudioInput::HasDataRemaining() -> bool { - return !streamer_->HasFinished() || !xStreamBufferIsEmpty(streamer_buffer_); -} - -auto FatfsAudioInput::ContainerToStreamType(database::Encoding enc) +auto FatfsAudioInput::ContainerToStreamType(database::Container enc) -> std::optional<codecs::StreamType> { switch (enc) { - case database::Encoding::kMp3: + case database::Container::kMp3: return codecs::StreamType::kMp3; - case database::Encoding::kWav: + case database::Container::kWav: return codecs::StreamType::kPcm; - case database::Encoding::kOgg: + case database::Container::kOgg: return codecs::StreamType::kVorbis; - case database::Encoding::kFlac: + case database::Container::kFlac: return codecs::StreamType::kFlac; - case database::Encoding::kOpus: + case database::Container::kOpus: return codecs::StreamType::kOpus; - case database::Encoding::kUnsupported: + case database::Container::kUnsupported: default: return {}; } } -auto FatfsAudioInput::IsCurrentFormatMp3() -> bool { - auto format = input_buffer_->info().format_as<StreamInfo::Encoded>(); - if (!format) { - return false; - } - return format->type == codecs::StreamType::kMp3; -} - } // namespace audio diff --git a/src/audio/fatfs_source.cpp b/src/audio/fatfs_source.cpp new file mode 100644 index 00000000..6a9aea47 --- /dev/null +++ b/src/audio/fatfs_source.cpp @@ -0,0 +1,70 @@ +/* + * Copyright 2023 jacqueline <me@jacqueline.id.au> + * + * SPDX-License-Identifier: GPL-3.0-only + */ + +#include "fatfs_source.hpp" +#include <sys/_stdint.h> + +#include <cstddef> +#include <cstdint> +#include <memory> + +#include "esp_log.h" +#include "ff.h" + +#include "audio_source.hpp" +#include "codec.hpp" +#include "types.hpp" + +namespace audio { + +static constexpr char kTag[] = "fatfs_src"; + +FatfsSource::FatfsSource(codecs::StreamType t, std::unique_ptr<FIL> file) + : IStream(t), file_(std::move(file)) {} + +FatfsSource::~FatfsSource() { + f_close(file_.get()); +} + +auto FatfsSource::Read(cpp::span<std::byte> dest) -> ssize_t { + if (f_eof(file_.get())) { + ESP_LOGI(kTag, "read from empty file"); + return 0; + } + UINT bytes_read = 0; + FRESULT res = f_read(file_.get(), dest.data(), dest.size(), &bytes_read); + if (res != FR_OK) { + ESP_LOGE(kTag, "error reading from file"); + return -1; + } + ESP_LOGI(kTag, "read %u bytes into %p (%u)", bytes_read, dest.data(), + dest.size_bytes()); + return bytes_read; +} + +auto FatfsSource::CanSeek() -> bool { + return true; +} + +auto FatfsSource::SeekTo(int64_t destination, SeekFrom from) -> void { + ESP_LOGI(kTag, "seeking to %llu", destination); + switch (from) { + case SeekFrom::kStartOfStream: + f_lseek(file_.get(), destination); + break; + case SeekFrom::kEndOfStream: + f_lseek(file_.get(), f_size(file_.get()) + destination); + break; + case SeekFrom::kCurrentPosition: + f_lseek(file_.get(), f_tell(file_.get()) + destination); + break; + } +} + +auto FatfsSource::CurrentPosition() -> int64_t { + return f_tell(file_.get()); +} +} // namespace audio diff --git a/src/audio/fatfs_source.hpp b/src/audio/fatfs_source.hpp new file mode 100644 index 00000000..e4187d60 --- /dev/null +++ b/src/audio/fatfs_source.hpp @@ -0,0 +1,44 @@ +/* + * Copyright 2023 jacqueline <me@jacqueline.id.au> + * + * SPDX-License-Identifier: GPL-3.0-only + */ + +#pragma once + +#include <cstddef> +#include <cstdint> +#include <memory> + +#include "codec.hpp" +#include "ff.h" + +#include "audio_source.hpp" + +namespace audio { + +/* + * Handles coordination with a persistent background task to asynchronously + * read files from disk into a StreamBuffer. + */ +class FatfsSource : public codecs::IStream { + public: + FatfsSource(codecs::StreamType, std::unique_ptr<FIL> file); + ~FatfsSource(); + + auto Read(cpp::span<std::byte> dest) -> ssize_t override; + + auto CanSeek() -> bool override; + + auto SeekTo(int64_t destination, SeekFrom from) -> void override; + + auto CurrentPosition() -> int64_t override; + + FatfsSource(const FatfsSource&) = delete; + FatfsSource& operator=(const FatfsSource&) = delete; + + private: + std::unique_ptr<FIL> file_; +}; + +} // namespace audio
\ No newline at end of file diff --git a/src/audio/i2s_audio_output.cpp b/src/audio/i2s_audio_output.cpp index e53dbe2a..b7fcf104 100644 --- a/src/audio/i2s_audio_output.cpp +++ b/src/audio/i2s_audio_output.cpp @@ -115,26 +115,25 @@ auto I2SAudioOutput::AdjustVolumeDown() -> bool { return true; } -auto I2SAudioOutput::PrepareFormat(const StreamInfo::Pcm& orig) - -> StreamInfo::Pcm { - return StreamInfo::Pcm{ - .channels = std::min<uint8_t>(orig.channels, 2), - .bits_per_sample = std::clamp<uint8_t>(orig.bits_per_sample, 16, 32), +auto I2SAudioOutput::PrepareFormat(const Format& orig) -> Format { + return Format{ .sample_rate = std::clamp<uint32_t>(orig.sample_rate, 8000, 96000), + .num_channels = std::min<uint8_t>(orig.num_channels, 2), + .bits_per_sample = std::clamp<uint8_t>(orig.bits_per_sample, 16, 32), }; } -auto I2SAudioOutput::Configure(const StreamInfo::Pcm& pcm) -> void { - if (current_config_ && pcm == *current_config_) { +auto I2SAudioOutput::Configure(const Format& fmt) -> void { + if (current_config_ && fmt == *current_config_) { ESP_LOGI(kTag, "ignoring unchanged format"); return; } - ESP_LOGI(kTag, "incoming audio stream: %u ch %u bpp @ %lu Hz", pcm.channels, - pcm.bits_per_sample, pcm.sample_rate); + ESP_LOGI(kTag, "incoming audio stream: %u ch %u bpp @ %lu Hz", + fmt.num_channels, fmt.bits_per_sample, fmt.sample_rate); drivers::I2SDac::Channels ch; - switch (pcm.channels) { + switch (fmt.num_channels) { case 1: ch = drivers::I2SDac::CHANNELS_MONO; break; @@ -147,7 +146,7 @@ auto I2SAudioOutput::Configure(const StreamInfo::Pcm& pcm) -> void { } drivers::I2SDac::BitsPerSample bps; - switch (pcm.bits_per_sample) { + switch (fmt.bits_per_sample) { case 16: bps = drivers::I2SDac::BPS_16; break; @@ -163,7 +162,7 @@ auto I2SAudioOutput::Configure(const StreamInfo::Pcm& pcm) -> void { } drivers::I2SDac::SampleRate sample_rate; - switch (pcm.sample_rate) { + switch (fmt.sample_rate) { case 8000: sample_rate = drivers::I2SDac::SAMPLE_RATE_8; break; @@ -188,7 +187,7 @@ auto I2SAudioOutput::Configure(const StreamInfo::Pcm& pcm) -> void { } dac_->Reconfigure(ch, bps, sample_rate); - current_config_ = pcm; + current_config_ = fmt; } } // namespace audio diff --git a/src/audio/include/audio_fsm.hpp b/src/audio/include/audio_fsm.hpp index cc3dae0e..d10f31e1 100644 --- a/src/audio/include/audio_fsm.hpp +++ b/src/audio/include/audio_fsm.hpp @@ -95,6 +95,7 @@ class Playback : public AudioState { void entry() override; void exit() override; + void react(const PlayFile&) override; void react(const QueueUpdate&) override; void react(const PlaybackUpdate&) override; diff --git a/src/audio/include/audio_sink.hpp b/src/audio/include/audio_sink.hpp index 28acdc31..2fb4bf63 100644 --- a/src/audio/include/audio_sink.hpp +++ b/src/audio/include/audio_sink.hpp @@ -7,6 +7,7 @@ #pragma once #include <stdint.h> +#include <cstdint> #include "audio_element.hpp" #include "esp_heap_caps.h" #include "freertos/FreeRTOS.h" @@ -37,8 +38,16 @@ class IAudioSink { virtual auto AdjustVolumeUp() -> bool = 0; virtual auto AdjustVolumeDown() -> bool = 0; - virtual auto PrepareFormat(const StreamInfo::Pcm&) -> StreamInfo::Pcm = 0; - virtual auto Configure(const StreamInfo::Pcm& format) -> void = 0; + struct Format { + uint32_t sample_rate; + uint_fast8_t num_channels; + uint_fast8_t bits_per_sample; + + bool operator==(const Format&) const = default; + }; + + virtual auto PrepareFormat(const Format&) -> Format = 0; + virtual auto Configure(const Format& format) -> void = 0; auto stream() -> StreamBufferHandle_t { return stream_; } }; diff --git a/src/audio/include/audio_source.hpp b/src/audio/include/audio_source.hpp index 055a92cd..6c54a882 100644 --- a/src/audio/include/audio_source.hpp +++ b/src/audio/include/audio_source.hpp @@ -15,7 +15,10 @@ #include "freertos/portmacro.h" #include "freertos/semphr.h" +#include "codec.hpp" #include "stream_info.hpp" +#include "track.hpp" +#include "types.hpp" namespace audio { @@ -23,25 +26,8 @@ class IAudioSource { public: virtual ~IAudioSource() {} - class Flags { - public: - Flags(bool is_start, bool is_end) { - flags_[0] = is_start; - flags_[1] = is_end; - } - - auto is_start() -> bool { return flags_[0]; } - auto is_end() -> bool { return flags_[1]; } - - private: - std::bitset<2> flags_; - }; - - /* - * Synchronously fetches data from this source. - */ - virtual auto Read(std::function<void(Flags, InputStream&)>, TickType_t) - -> void = 0; + virtual auto HasNewStream() -> bool = 0; + virtual auto NextStream() -> std::shared_ptr<codecs::IStream> = 0; }; } // namespace audio diff --git a/src/audio/include/audio_task.hpp b/src/audio/include/audio_task.hpp index b27aa039..48f5502c 100644 --- a/src/audio/include/audio_task.hpp +++ b/src/audio/include/audio_task.hpp @@ -16,6 +16,8 @@ #include "pipeline.hpp" #include "sink_mixer.hpp" #include "stream_info.hpp" +#include "track.hpp" +#include "types.hpp" namespace audio { @@ -52,32 +54,27 @@ class AudioTask { auto Main() -> void; + AudioTask(const AudioTask&) = delete; + AudioTask& operator=(const AudioTask&) = delete; + private: AudioTask(IAudioSource* source, IAudioSink* sink); - auto HandleNewStream(const InputStream&) -> bool; - - auto BeginDecoding(InputStream&) -> bool; - auto ContinueDecoding(InputStream&) -> bool; - auto FinishDecoding(InputStream&) -> void; - - auto ForwardPcmStream(StreamInfo::Pcm&, cpp::span<const std::byte>) -> bool; - - auto ConfigureSink(const StreamInfo::Pcm&, const Duration&) -> bool; - auto SendToSink(InputStream&) -> void; + auto BeginDecoding(std::shared_ptr<codecs::IStream>) -> bool; + auto ContinueDecoding() -> bool; IAudioSource* source_; IAudioSink* sink_; + + std::shared_ptr<codecs::IStream> stream_; std::unique_ptr<codecs::ICodec> codec_; std::unique_ptr<SinkMixer> mixer_; std::unique_ptr<Timer> timer_; - bool has_begun_decoding_; - std::optional<StreamInfo::Format> current_input_format_; - std::optional<StreamInfo::Pcm> current_output_format_; - std::optional<StreamInfo::Pcm> current_sink_format_; + std::optional<codecs::ICodec::OutputFormat> current_format_; + std::optional<IAudioSink::Format> current_sink_format_; - std::unique_ptr<RawStream> codec_buffer_; + cpp::span<sample::Sample> codec_buffer_; }; } // namespace audio diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp index e13e49e2..df40696a 100644 --- a/src/audio/include/fatfs_audio_input.hpp +++ b/src/audio/include/fatfs_audio_input.hpp @@ -12,6 +12,7 @@ #include <memory> #include <string> +#include "codec.hpp" #include "ff.h" #include "audio_source.hpp" @@ -24,54 +25,6 @@ namespace audio { /* - * Handles coordination with a persistent background task to asynchronously - * read files from disk into a StreamBuffer. - */ -class FileStreamer { - public: - FileStreamer(StreamBufferHandle_t dest, SemaphoreHandle_t first_read); - ~FileStreamer(); - - /* - * Continues reading data into the destination buffer until the destination - * is full. - */ - auto Fetch() -> void; - - /* Returns true if the streamer has run out of data from the current file. */ - auto HasFinished() -> bool; - - /* - * Clears any remaining buffered data, and begins reading again from the - * given file. This function respects any seeking/reading that has already - * been done on the new source file. - */ - auto Restart(std::unique_ptr<FIL>) -> void; - - FileStreamer(const FileStreamer&) = delete; - FileStreamer& operator=(const FileStreamer&) = delete; - - private: - // Note: private methods here should only be called from the streamer's task. - - auto Main() -> void; - auto CloseFile() -> void; - - enum Command { - kRestart, - kRefillBuffer, - kQuit, - }; - QueueHandle_t control_; - StreamBufferHandle_t destination_; - SemaphoreHandle_t data_was_read_; - - std::atomic<bool> has_data_; - std::unique_ptr<FIL> file_; - std::unique_ptr<FIL> next_file_; -}; - -/* * Audio source that fetches data from a FatFs (or exfat i guess) filesystem. * * All public methods are safe to call from any task. @@ -89,43 +42,27 @@ class FatfsAudioInput : public IAudioSource { auto SetPath(const std::string&) -> void; auto SetPath() -> void; - auto Read(std::function<void(Flags, InputStream&)>, TickType_t) - -> void override; + auto HasNewStream() -> bool override; + auto NextStream() -> std::shared_ptr<codecs::IStream> override; FatfsAudioInput(const FatfsAudioInput&) = delete; FatfsAudioInput& operator=(const FatfsAudioInput&) = delete; private: - // Note: private methods assume that the appropriate locks have already been - // acquired. - - auto OpenFile(const std::string& path) -> void; - auto CloseCurrentFile() -> void; - auto HasDataRemaining() -> bool; + auto OpenFile(const std::string& path) -> bool; - auto ContainerToStreamType(database::Encoding) + auto ContainerToStreamType(database::Container) -> std::optional<codecs::StreamType>; - auto IsCurrentFormatMp3() -> bool; std::shared_ptr<database::ITagParser> tag_parser_; - // Semaphore used to block when this source is out of data. This should be - // acquired before attempting to read data, and returned after each incomplete - // read. - SemaphoreHandle_t has_data_; - - StreamBufferHandle_t streamer_buffer_; - std::unique_ptr<FileStreamer> streamer_; - - std::unique_ptr<RawStream> input_buffer_; + std::mutex new_stream_mutex_; + std::shared_ptr<codecs::IStream> new_stream_; - // Mutex guarding the current file/stream associated with this source. Must be - // held during readings, and before altering the current file. - std::mutex source_mutex_; + SemaphoreHandle_t has_new_stream_; std::unique_ptr<database::FutureFetcher<std::optional<std::string>>> pending_path_; - bool is_first_read_; }; } // namespace audio diff --git a/src/audio/include/i2s_audio_output.hpp b/src/audio/include/i2s_audio_output.hpp index e0f791c5..717e6519 100644 --- a/src/audio/include/i2s_audio_output.hpp +++ b/src/audio/include/i2s_audio_output.hpp @@ -35,8 +35,8 @@ class I2SAudioOutput : public IAudioSink { auto AdjustVolumeUp() -> bool override; auto AdjustVolumeDown() -> bool override; - auto PrepareFormat(const StreamInfo::Pcm&) -> StreamInfo::Pcm override; - auto Configure(const StreamInfo::Pcm& format) -> void override; + auto PrepareFormat(const Format&) -> Format override; + auto Configure(const Format& format) -> void override; I2SAudioOutput(const I2SAudioOutput&) = delete; I2SAudioOutput& operator=(const I2SAudioOutput&) = delete; @@ -45,7 +45,7 @@ class I2SAudioOutput : public IAudioSink { drivers::IGpios* expander_; std::shared_ptr<drivers::I2SDac> dac_; - std::optional<StreamInfo::Pcm> current_config_; + std::optional<Format> current_config_; int_fast8_t left_difference_; uint16_t current_volume_; uint16_t max_volume_; diff --git a/src/audio/include/sink_mixer.hpp b/src/audio/include/sink_mixer.hpp index d1e9aa8a..b4d25781 100644 --- a/src/audio/include/sink_mixer.hpp +++ b/src/audio/include/sink_mixer.hpp @@ -28,44 +28,42 @@ namespace audio { */ class SinkMixer { public: - SinkMixer(StreamBufferHandle_t dest); + SinkMixer(IAudioSink* sink); ~SinkMixer(); - auto MixAndSend(InputStream&, const StreamInfo::Pcm&) -> std::size_t; + auto MixAndSend(cpp::span<sample::Sample>, + const IAudioSink::Format& format, + bool is_eos) -> void; private: auto Main() -> void; auto SetTargetFormat(const StreamInfo::Pcm& format) -> void; - auto HandleBytes() -> void; + auto HandleSamples(cpp::span<sample::Sample>, bool) -> size_t; - auto Resample(InputStream&, OutputStream&) -> bool; auto ApplyDither(cpp::span<sample::Sample> samples, uint_fast8_t bits) -> void; - auto Downscale(cpp::span<sample::Sample>, cpp::span<int16_t>) -> void; - - enum class Command { - kReadBytes, - kSetSourceFormat, - kSetTargetFormat, - }; struct Args { - Command cmd; - StreamInfo::Pcm format; + IAudioSink::Format format; + size_t samples_available; + bool is_end_of_stream; }; - QueueHandle_t commands_; - SemaphoreHandle_t is_idle_; std::unique_ptr<Resampler> resampler_; - std::unique_ptr<RawStream> input_stream_; - std::unique_ptr<RawStream> resampled_stream_; - - StreamInfo::Pcm target_format_; StreamBufferHandle_t source_; - StreamBufferHandle_t sink_; + cpp::span<sample::Sample> input_buffer_; + cpp::span<std::byte> input_buffer_as_bytes_; + + cpp::span<sample::Sample> resampled_buffer_; + + IAudioSink* sink_; + IAudioSink::Format source_format_; + IAudioSink::Format target_format_; + size_t leftover_bytes_; + size_t leftover_offset_; }; } // namespace audio diff --git a/src/audio/sink_mixer.cpp b/src/audio/sink_mixer.cpp index 6c72c8b0..9f973d4b 100644 --- a/src/audio/sink_mixer.cpp +++ b/src/audio/sink_mixer.cpp @@ -7,6 +7,7 @@ #include "sink_mixer.hpp" #include <stdint.h> +#include <algorithm> #include <cmath> #include "esp_heap_caps.h" @@ -23,20 +24,28 @@ static constexpr char kTag[] = "mixer"; static constexpr std::size_t kSourceBufferLength = 8 * 1024; -static constexpr std::size_t kSampleBufferLength = 240 * 2 * sizeof(int32_t); +static constexpr std::size_t kSampleBufferLength = 240 * 2; namespace audio { -SinkMixer::SinkMixer(StreamBufferHandle_t dest) +SinkMixer::SinkMixer(IAudioSink* sink) : commands_(xQueueCreate(1, sizeof(Args))), - is_idle_(xSemaphoreCreateBinary()), resampler_(nullptr), source_(xStreamBufferCreateWithCaps(kSourceBufferLength, 1, MALLOC_CAP_SPIRAM)), - sink_(dest) { - input_stream_.reset(new RawStream(kSampleBufferLength)); - resampled_stream_.reset(new RawStream(kSampleBufferLength)); + sink_(sink) { + input_buffer_ = { + reinterpret_cast<sample::Sample*>(heap_caps_calloc( + kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)), + kSampleBufferLength}; + input_buffer_as_bytes_ = {reinterpret_cast<std::byte*>(input_buffer_.data()), + input_buffer_.size_bytes()}; + + resampled_buffer_ = { + reinterpret_cast<sample::Sample*>(heap_caps_calloc( + kSampleBufferLength, sizeof(sample::Sample), MALLOC_CAP_SPIRAM)), + kSampleBufferLength}; // Pin to CORE0 because we need the FPU. // FIXME: A fixed point implementation could run freely on either core, @@ -46,168 +55,156 @@ SinkMixer::SinkMixer(StreamBufferHandle_t dest) SinkMixer::~SinkMixer() { vQueueDelete(commands_); - vSemaphoreDelete(is_idle_); vStreamBufferDelete(source_); } -auto SinkMixer::MixAndSend(InputStream& input, const StreamInfo::Pcm& target) - -> std::size_t { - if (input.info().format_as<StreamInfo::Pcm>() != - input_stream_->info().format_as<StreamInfo::Pcm>()) { - xSemaphoreTake(is_idle_, portMAX_DELAY); - Args args{ - .cmd = Command::kSetSourceFormat, - .format = input.info().format_as<StreamInfo::Pcm>().value(), - }; - xQueueSend(commands_, &args, portMAX_DELAY); - xSemaphoreGive(is_idle_); - } - if (target_format_ != target) { - xSemaphoreTake(is_idle_, portMAX_DELAY); - Args args{ - .cmd = Command::kSetTargetFormat, - .format = target, - }; - xQueueSend(commands_, &args, portMAX_DELAY); - xSemaphoreGive(is_idle_); - } - +auto SinkMixer::MixAndSend(cpp::span<sample::Sample> input, + const IAudioSink::Format& format, + bool is_eos) -> void { Args args{ - .cmd = Command::kReadBytes, - .format = {}, + .format = format, + .samples_available = input.size(), + .is_end_of_stream = is_eos, }; xQueueSend(commands_, &args, portMAX_DELAY); - auto buf = input.data(); - std::size_t bytes_sent = - xStreamBufferSend(source_, buf.data(), buf.size_bytes(), portMAX_DELAY); - input.consume(bytes_sent); - return bytes_sent; + cpp::span<std::byte> input_as_bytes = { + reinterpret_cast<std::byte*>(input.data()), input.size_bytes()}; + size_t bytes_sent = 0; + while (bytes_sent < input_as_bytes.size()) { + bytes_sent += + xStreamBufferSend(source_, input_as_bytes.subspan(bytes_sent).data(), + input_as_bytes.size() - bytes_sent, portMAX_DELAY); + } } auto SinkMixer::Main() -> void { - OutputStream input_receiver{input_stream_.get()}; - xSemaphoreGive(is_idle_); - for (;;) { Args args; while (!xQueueReceive(commands_, &args, portMAX_DELAY)) { } - switch (args.cmd) { - case Command::kSetSourceFormat: - ESP_LOGI(kTag, "setting source format"); - input_receiver.prepare(args.format, {}); - resampler_.reset(); - break; - case Command::kSetTargetFormat: - ESP_LOGI(kTag, "setting target format"); - target_format_ = args.format; - resampler_.reset(); - break; - case Command::kReadBytes: - xSemaphoreTake(is_idle_, 0); - while (!xStreamBufferIsEmpty(source_)) { - auto buf = input_receiver.data(); - std::size_t bytes_received = xStreamBufferReceive( - source_, buf.data(), buf.size_bytes(), portMAX_DELAY); - input_receiver.add(bytes_received); - HandleBytes(); + if (args.format != source_format_) { + resampler_.reset(); + source_format_ = args.format; + leftover_bytes_ = 0; + leftover_offset_ = 0; + + auto new_target = sink_->PrepareFormat(args.format); + if (new_target != target_format_) { + // The new format is different to the old one. Wait for the sink to + // drain before continuing. + while (!xStreamBufferIsEmpty(sink_->stream())) { + ESP_LOGI(kTag, "waiting for sink stream to drain..."); + // TODO(jacqueline): Get the sink drain ISR to notify us of this + // via semaphore instead of busy-ish waiting. + vTaskDelay(pdMS_TO_TICKS(10)); } - xSemaphoreGive(is_idle_); - break; + + ESP_LOGI(kTag, "configuring sink"); + sink_->Configure(new_target); + } + target_format_ = new_target; } - } -} -auto SinkMixer::HandleBytes() -> void { - InputStream input{input_stream_.get()}; - auto pcm = input.info().format_as<StreamInfo::Pcm>(); - if (!pcm) { - ESP_LOGE(kTag, "mixer got unsupported data"); - return; + // Loop until we finish reading all the bytes indicated. There might be + // leftovers from each iteration, and from this process as a whole, + // depending on the resampling stage. + size_t bytes_read = 0; + size_t bytes_to_read = args.samples_available * sizeof(sample::Sample); + while (bytes_read < bytes_to_read) { + // First top up the input buffer, taking care not to overwrite anything + // remaining from a previous iteration. + size_t bytes_read_this_it = xStreamBufferReceive( + source_, + input_buffer_as_bytes_.subspan(leftover_offset_ + leftover_bytes_) + .data(), + std::min(input_buffer_as_bytes_.size() - leftover_offset_ - + leftover_bytes_, + bytes_to_read - bytes_read), + portMAX_DELAY); + bytes_read += bytes_read_this_it; + + // Calculate the number of whole samples that are now in the input buffer. + size_t bytes_in_buffer = bytes_read_this_it + leftover_bytes_; + size_t samples_in_buffer = bytes_in_buffer / sizeof(sample::Sample); + + size_t samples_used = HandleSamples( + input_buffer_.subspan(leftover_offset_).first(samples_in_buffer), + args.is_end_of_stream && bytes_read == bytes_to_read); + + // Maybe the resampler didn't consume everything. Maybe the last few + // bytes we read were half a frame. Either way, we need to calculate the + // size of the remainder in bytes. + size_t bytes_used = samples_used * sizeof(sample::Sample); + leftover_bytes_ = bytes_in_buffer - bytes_used; + if (leftover_bytes_ == 0) { + leftover_offset_ = 0; + } else { + leftover_offset_ += bytes_used; + } + } } +} - if (*pcm == target_format_) { +auto SinkMixer::HandleSamples(cpp::span<sample::Sample> input, bool is_eos) + -> size_t { + if (source_format_ == target_format_) { // The happiest possible case: the input format matches the output - // format already. Streams like this should probably have bypassed the - // mixer. - // TODO(jacqueline): Make this an error; it's slow to use the mixer in this - // case, compared to just writing directly to the sink. - auto buf = input.data(); - std::size_t bytes_sent = - xStreamBufferSend(sink_, buf.data(), buf.size_bytes(), portMAX_DELAY); - input.consume(bytes_sent); - return; + // format already. + std::size_t bytes_sent = xStreamBufferSend( + sink_->stream(), input.data(), input.size_bytes(), portMAX_DELAY); + return bytes_sent / sizeof(sample::Sample); } - while (input_stream_->info().bytes_in_stream() >= sizeof(sample::Sample)) { - RawStream* output_source; - if (pcm->sample_rate != target_format_.sample_rate) { - OutputStream resampled_writer{resampled_stream_.get()}; - if (Resample(input, resampled_writer)) { + size_t samples_used = 0; + while (input.size() < samples_used) { + cpp::span<sample::Sample> output_source; + if (source_format_.sample_rate != target_format_.sample_rate) { + if (resampler_ == nullptr) { + ESP_LOGI(kTag, "creating new resampler"); + resampler_.reset(new Resampler(source_format_.sample_rate, + target_format_.sample_rate, + source_format_.num_channels)); + } + + size_t read, written; + std::tie(read, written) = + resampler_->Process(input, resampled_buffer_, is_eos); + samples_used += read; + if (read == 0 && written == 0) { // Zero samples used or written. We need more input. break; } - output_source = resampled_stream_.get(); + output_source = resampled_buffer_.first(written); } else { - output_source = input_stream_.get(); + output_source = input; + samples_used = input.size(); } - size_t bytes_consumed = output_source->info().bytes_in_stream(); - size_t bytes_to_send = output_source->info().bytes_in_stream(); - if (target_format_.bits_per_sample == 16) { - // This is slightly scary; we're basically reaching into the internals of - // the stream buffer to do in-place conversion of samples. Saving an - // extra buffer + copy into that buffer is certainly worth it however. - cpp::span<sample::Sample> src = - output_source->data_as<sample::Sample>().first( - output_source->info().bytes_in_stream() / sizeof(sample::Sample)); - cpp::span<int16_t> dest{reinterpret_cast<int16_t*>(src.data()), - src.size()}; - - ApplyDither(src, 16); - Downscale(src, dest); - - bytes_consumed = src.size_bytes(); - bytes_to_send = src.size_bytes() / 2; - } + // FIXME: The source should have some kind of hint indicating whether it + // needs dither, since some codecs (e.g. opus) apply their own dither. + ApplyDither(output_source, 16); + + cpp::span<int16_t> dest{reinterpret_cast<int16_t*>(output_source.data()), + output_source.size()}; + for (size_t i = 0; i < output_source.size(); i++) { + dest[i] = sample::ToSigned16Bit(output_source[i]); + } - InputStream output{output_source}; - cpp::span<const std::byte> buf = output.data(); + output_source = output_source.first(output_source.size() / 2); + } size_t bytes_sent = 0; + size_t bytes_to_send = output_source.size_bytes(); while (bytes_sent < bytes_to_send) { - auto cropped = buf.subspan(bytes_sent, bytes_to_send - bytes_sent); - bytes_sent += xStreamBufferSend(sink_, cropped.data(), - cropped.size_bytes(), portMAX_DELAY); + bytes_sent += xStreamBufferSend( + sink_->stream(), + reinterpret_cast<std::byte*>(output_source.data()) + bytes_sent, + bytes_to_send - bytes_sent, portMAX_DELAY); } - output.consume(bytes_consumed); - } -} - -auto SinkMixer::Resample(InputStream& in, OutputStream& out) -> bool { - if (resampler_ == nullptr) { - ESP_LOGI(kTag, "creating new resampler"); - auto format = in.info().format_as<StreamInfo::Pcm>(); - resampler_.reset(new Resampler( - format->sample_rate, target_format_.sample_rate, format->channels)); - } - - auto res = resampler_->Process(in.data_as<sample::Sample>(), - out.data_as<sample::Sample>(), false); - - in.consume(res.first * sizeof(sample::Sample)); - out.add(res.second * sizeof(sample::Sample)); - - return res.first == 0 && res.second == 0; -} - -auto SinkMixer::Downscale(cpp::span<sample::Sample> samples, - cpp::span<int16_t> output) -> void { - for (size_t i = 0; i < samples.size(); i++) { - output[i] = sample::ToSigned16Bit(samples[i]); } + return samples_used; } auto SinkMixer::ApplyDither(cpp::span<sample::Sample> samples, |
