summaryrefslogtreecommitdiff
path: root/src/audio
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2023-07-25 17:42:36 +1000
committerjacqueline <me@jacqueline.id.au>2023-07-25 17:43:12 +1000
commit80d7df910987db5201402fe987124f29f09344f3 (patch)
tree7e8c1e04ab40026087343efee95a771c7839b32f /src/audio
parent7b72e5479ee6d11f76c49f7463ba0e7f4e5165c5 (diff)
downloadtangara-fw-80d7df910987db5201402fe987124f29f09344f3.tar.gz
fuck off
Diffstat (limited to 'src/audio')
-rw-r--r--src/audio/CMakeLists.txt2
-rw-r--r--src/audio/audio_fsm.cpp42
-rw-r--r--src/audio/audio_task.cpp388
-rw-r--r--src/audio/fatfs_audio_input.cpp354
-rw-r--r--src/audio/i2s_audio_output.cpp42
-rw-r--r--src/audio/include/audio_decoder.hpp7
-rw-r--r--src/audio/include/audio_events.hpp4
-rw-r--r--src/audio/include/audio_fsm.hpp6
-rw-r--r--src/audio/include/audio_sink.hpp32
-rw-r--r--src/audio/include/audio_source.hpp33
-rw-r--r--src/audio/include/audio_task.hpp45
-rw-r--r--src/audio/include/fatfs_audio_input.hpp125
-rw-r--r--src/audio/include/stream_info.hpp4
-rw-r--r--src/audio/track_queue.cpp27
14 files changed, 725 insertions, 386 deletions
diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt
index 2501f773..6ef144ac 100644
--- a/src/audio/CMakeLists.txt
+++ b/src/audio/CMakeLists.txt
@@ -3,7 +3,7 @@
# SPDX-License-Identifier: GPL-3.0-only
idf_component_register(
- SRCS "audio_decoder.cpp" "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp"
+ SRCS "audio_task.cpp" "chunk.cpp" "fatfs_audio_input.cpp"
"stream_message.cpp" "i2s_audio_output.cpp" "stream_buffer.cpp" "track_queue.cpp"
"stream_event.cpp" "pipeline.cpp" "stream_info.cpp" "audio_fsm.cpp"
INCLUDE_DIRS "include"
diff --git a/src/audio/audio_fsm.cpp b/src/audio/audio_fsm.cpp
index 5f4f8783..c3313820 100644
--- a/src/audio/audio_fsm.cpp
+++ b/src/audio/audio_fsm.cpp
@@ -14,6 +14,8 @@
#include "esp_log.h"
#include "event_queue.hpp"
#include "fatfs_audio_input.hpp"
+#include "freertos/portmacro.h"
+#include "future_fetcher.hpp"
#include "i2s_audio_output.hpp"
#include "i2s_dac.hpp"
#include "pipeline.hpp"
@@ -29,14 +31,16 @@ drivers::IGpios* AudioState::sIGpios;
std::shared_ptr<drivers::I2SDac> AudioState::sDac;
std::weak_ptr<database::Database> AudioState::sDatabase;
+std::unique_ptr<AudioTask> AudioState::sTask;
std::unique_ptr<FatfsAudioInput> AudioState::sFileSource;
std::unique_ptr<I2SAudioOutput> AudioState::sI2SOutput;
-std::vector<std::unique_ptr<IAudioElement>> AudioState::sPipeline;
TrackQueue* AudioState::sTrackQueue;
+std::optional<database::TrackId> AudioState::sCurrentTrack;
auto AudioState::Init(drivers::IGpios* gpio_expander,
std::weak_ptr<database::Database> database,
+ std::shared_ptr<database::ITagParser> tag_parser,
TrackQueue* queue) -> bool {
sIGpios = gpio_expander;
sTrackQueue = queue;
@@ -48,19 +52,10 @@ auto AudioState::Init(drivers::IGpios* gpio_expander,
sDac.reset(dac.value());
sDatabase = database;
- sFileSource.reset(new FatfsAudioInput());
+ sFileSource.reset(new FatfsAudioInput(tag_parser));
sI2SOutput.reset(new I2SAudioOutput(sIGpios, sDac));
- // Perform initial pipeline configuration.
- // TODO(jacqueline): Factor this out once we have any kind of dynamic
- // reconfiguration.
- AudioDecoder* codec = new AudioDecoder();
- sPipeline.emplace_back(codec);
-
- Pipeline* pipeline = new Pipeline(sPipeline.front().get());
- pipeline->AddInput(sFileSource.get());
-
- task::StartPipeline(pipeline, sI2SOutput.get());
+ AudioTask::Start(sFileSource.get(), sI2SOutput.get());
return true;
}
@@ -85,9 +80,9 @@ void AudioState::react(const system_fsm::KeyDownChanged& ev) {
void AudioState::react(const system_fsm::HasPhonesChanged& ev) {
if (ev.falling) {
- ESP_LOGI(kTag, "headphones in!");
+ // ESP_LOGI(kTag, "headphones in!");
} else {
- ESP_LOGI(kTag, "headphones out!");
+ // ESP_LOGI(kTag, "headphones out!");
}
}
@@ -107,13 +102,15 @@ void Standby::react(const QueueUpdate& ev) {
return;
}
+ sCurrentTrack = current_track;
+
auto db = sDatabase.lock();
if (!db) {
ESP_LOGW(kTag, "database not open; ignoring play request");
return;
}
- sFileSource->OpenFile(db->GetTrackPath(*current_track));
+ sFileSource->SetPath(db->GetTrackPath(*current_track));
}
void Playback::entry() {
@@ -127,20 +124,25 @@ void Playback::exit() {
}
void Playback::react(const QueueUpdate& ev) {
+ if (!ev.current_changed) {
+ return;
+ }
auto current_track = sTrackQueue->GetCurrent();
if (!current_track) {
- // TODO: return to standby?
+ sFileSource->SetPath();
+ sCurrentTrack.reset();
+ transit<Standby>();
return;
}
+ sCurrentTrack = current_track;
+
auto db = sDatabase.lock();
if (!db) {
return;
}
- // TODO: what if we just finished this, and are preemptively loading the next
- // one?
- sFileSource->OpenFile(db->GetTrackPath(*current_track));
+ sFileSource->SetPath(db->GetTrackPath(*current_track));
}
void Playback::react(const PlaybackUpdate& ev) {
@@ -161,7 +163,7 @@ void Playback::react(const internal::InputFileClosed& ev) {
return;
}
ESP_LOGI(kTag, "preemptively opening next file");
- sFileSource->OpenFile(db->GetTrackPath(upcoming.front()));
+ sFileSource->SetPath(db->GetTrackPath(upcoming.front()));
}
void Playback::react(const internal::InputFileFinished& ev) {
diff --git a/src/audio/audio_task.cpp b/src/audio/audio_task.cpp
index babe6849..dbe5d50e 100644
--- a/src/audio/audio_task.cpp
+++ b/src/audio/audio_task.cpp
@@ -9,23 +9,29 @@
#include <stdlib.h>
#include <algorithm>
+#include <cmath>
#include <cstddef>
#include <cstdint>
+#include <cstring>
#include <deque>
#include <memory>
#include <variant>
+#include "audio_decoder.hpp"
#include "audio_events.hpp"
#include "audio_fsm.hpp"
#include "audio_sink.hpp"
#include "cbor.h"
+#include "codec.hpp"
#include "esp_err.h"
#include "esp_heap_caps.h"
#include "esp_log.h"
#include "event_queue.hpp"
+#include "fatfs_audio_input.hpp"
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "freertos/queue.h"
+#include "freertos/ringbuf.h"
#include "pipeline.hpp"
#include "span.hpp"
@@ -41,193 +47,209 @@
namespace audio {
-namespace task {
-
-static const char* kTag = "task";
-
-// The default amount of time to wait between pipeline iterations for a single
-// track.
-static constexpr uint_fast16_t kDefaultDelayTicks = pdMS_TO_TICKS(5);
-static constexpr uint_fast16_t kMaxDelayTicks = pdMS_TO_TICKS(10);
-static constexpr uint_fast16_t kMinDelayTicks = pdMS_TO_TICKS(1);
-
-void AudioTaskMain(std::unique_ptr<Pipeline> pipeline, IAudioSink* sink) {
- // The stream format for bytes currently in the sink buffer.
- std::optional<StreamInfo::Format> output_format;
-
- // How long to wait between pipeline iterations. This is reset for each track,
- // and readjusted on the fly to maintain a reasonable amount playback buffer.
- // Buffering too much will mean we process samples inefficiently, wasting CPU
- // time, whilst buffering too little will affect the quality of the output.
- uint_fast16_t delay_ticks = kDefaultDelayTicks;
-
- std::vector<Pipeline*> all_elements = pipeline->GetIterationOrder();
-
- float current_sample_in_second = 0;
- uint32_t previous_second = 0;
- uint32_t current_second = 0;
-
- bool previously_had_work = false;
- events::EventQueue& event_queue = events::EventQueue::GetInstance();
- while (1) {
- // First, see if we actually have any pipeline work to do in this iteration.
- bool has_work = false;
- // We always have work to do if there's still bytes to be sunk.
- has_work = all_elements.back()->OutStream().info->bytes_in_stream > 0;
- if (!has_work) {
- for (Pipeline* p : all_elements) {
- has_work = p->OutputElement()->NeedsToProcess();
- if (has_work) {
- break;
- }
- }
- }
-
- if (!has_work) {
- has_work = !xStreamBufferIsEmpty(sink->buffer());
- }
-
- if (previously_had_work && !has_work) {
- events::Dispatch<internal::AudioPipelineIdle, AudioState>({});
- }
- previously_had_work = has_work;
-
- // See if there's any new events.
- event_queue.ServiceAudio(has_work ? delay_ticks : portMAX_DELAY);
-
- if (!has_work) {
- // See if we've been given work by this event.
- for (Pipeline* p : all_elements) {
- has_work = p->OutputElement()->NeedsToProcess();
- if (has_work) {
- delay_ticks = kDefaultDelayTicks;
- break;
- }
- }
- if (!has_work) {
- continue;
- }
- }
-
- // We have work to do! Allow each element in the pipeline to process one
- // chunk. We iterate from input nodes first, so this should result in
- // samples in the output buffer.
-
- for (int i = 0; i < all_elements.size(); i++) {
- std::vector<RawStream> raw_in_streams;
- all_elements.at(i)->InStreams(&raw_in_streams);
- RawStream raw_out_stream = all_elements.at(i)->OutStream();
-
- // Crop the input and output streams to the ranges that are safe to
- // touch. For the input streams, this is the region that contains
- // data. For the output stream, this is the region that does *not*
- // already contain data.
- std::vector<InputStream> in_streams;
- std::for_each(raw_in_streams.begin(), raw_in_streams.end(),
- [&](RawStream& s) { in_streams.emplace_back(&s); });
- OutputStream out_stream(&raw_out_stream);
-
- all_elements.at(i)->OutputElement()->Process(in_streams, &out_stream);
- }
-
- RawStream raw_sink_stream = all_elements.back()->OutStream();
- InputStream sink_stream(&raw_sink_stream);
-
- if (sink_stream.info().bytes_in_stream == 0) {
- if (sink_stream.is_producer_finished()) {
- sink_stream.mark_consumer_finished();
-
- if (current_second > 0 || current_sample_in_second > 0) {
- events::Dispatch<internal::InputFileFinished, AudioState>({});
- }
-
- current_second = 0;
- previous_second = 0;
- current_sample_in_second = 0;
- } else {
- // The user is probably about to hear a skip :(
- ESP_LOGW(kTag, "!! audio sink is underbuffered !!");
- }
- // No new bytes to sink, so skip sinking completely.
- continue;
- }
-
- if (!output_format || output_format != sink_stream.info().format) {
- // The format of the stream within the sink stream has changed. We
- // need to reconfigure the sink, but shouldn't do so until we've fully
- // drained the current buffer.
- if (xStreamBufferIsEmpty(sink->buffer())) {
- ESP_LOGI(kTag, "reconfiguring dac");
- output_format = sink_stream.info().format;
- sink->Configure(*output_format);
- } else {
- ESP_LOGI(kTag, "waiting to reconfigure");
- continue;
- }
- }
-
- // We've reconfigured the sink, or it was already configured correctly.
- // Send through some data.
- std::size_t bytes_sunk =
- xStreamBufferSend(sink->buffer(), sink_stream.data().data(),
- sink_stream.data().size_bytes(), 0);
-
- if (std::holds_alternative<StreamInfo::Pcm>(*output_format)) {
- StreamInfo::Pcm pcm = std::get<StreamInfo::Pcm>(*output_format);
-
- float samples_sunk = bytes_sunk;
- samples_sunk /= pcm.channels;
-
- // Samples must be aligned to 16 bits. The number of actual bytes per
- // sample is therefore the bps divided by 16, rounded up (align to word),
- // times two (convert to bytes).
- uint8_t bytes_per_sample = ((pcm.bits_per_sample + 16 - 1) / 16) * 2;
- samples_sunk /= bytes_per_sample;
-
- current_sample_in_second += samples_sunk;
- while (current_sample_in_second >= pcm.sample_rate) {
- current_second++;
- current_sample_in_second -= pcm.sample_rate;
- }
- if (previous_second != current_second) {
- events::Dispatch<PlaybackUpdate, AudioState, ui::UiState>({
- .seconds_elapsed = current_second,
- .seconds_total =
- sink_stream.info().duration_seconds.value_or(current_second),
- });
- }
- previous_second = current_second;
- }
-
- // Adjust how long we wait for the next iteration if we're getting too far
- // ahead or behind.
- float sunk_percent = static_cast<float>(bytes_sunk) /
- static_cast<float>(sink_stream.info().bytes_in_stream);
-
- if (sunk_percent > 0.66f) {
- // We're sinking a lot of the output buffer per iteration, so we need to
- // be running faster.
- delay_ticks--;
- } else if (sunk_percent < 0.33f) {
- // We're not sinking much of the output buffer per iteration, so we can
- // slow down to save some cycles.
- delay_ticks++;
- }
- delay_ticks = std::clamp(delay_ticks, kMinDelayTicks, kMaxDelayTicks);
-
- // Finally, actually mark the bytes we sunk as consumed.
- if (bytes_sunk > 0) {
- sink_stream.consume(bytes_sunk);
- }
+static const char* kTag = "audio_dec";
+
+static constexpr std::size_t kSampleBufferSize = 16 * 1024;
+
+Timer::Timer(StreamInfo::Pcm format)
+ : format_(format),
+ last_seconds_(0),
+ total_duration_seconds_(0),
+ current_seconds_(0) {}
+
+auto Timer::SetLengthSeconds(uint32_t len) -> void {
+ total_duration_seconds_ = len;
+}
+
+auto Timer::SetLengthBytes(uint32_t len) -> void {
+ total_duration_seconds_ = 0;
+}
+
+auto Timer::AddBytes(std::size_t bytes) -> void {
+ float samples_sunk = bytes;
+ samples_sunk /= format_.channels;
+
+ // Samples must be aligned to 16 bits. The number of actual bytes per
+ // sample is therefore the bps divided by 16, rounded up (align to word),
+ // times two (convert to bytes).
+ uint8_t bytes_per_sample = ((format_.bits_per_sample + 16 - 1) / 16) * 2;
+ samples_sunk /= bytes_per_sample;
+
+ current_seconds_ += samples_sunk / format_.sample_rate;
+
+ uint32_t rounded = std::round(current_seconds_);
+ if (rounded != last_seconds_) {
+ last_seconds_ = rounded;
+ events::Dispatch<PlaybackUpdate, AudioState, ui::UiState>(PlaybackUpdate{
+ .seconds_elapsed = rounded,
+ .seconds_total =
+ total_duration_seconds_ == 0 ? rounded : total_duration_seconds_});
}
}
-auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void {
- ESP_LOGI(kTag, "starting audio pipeline task");
- tasks::StartPersistent<tasks::Type::kAudio>(
- [=]() { AudioTaskMain(std::unique_ptr<Pipeline>(pipeline), sink); });
+auto AudioTask::Start(IAudioSource* source, IAudioSink* sink) -> AudioTask* {
+ AudioTask* task = new AudioTask(source, sink);
+ tasks::StartPersistent<tasks::Type::kAudio>([=]() { task->Main(); });
+ return task;
}
-} // namespace task
+AudioTask::AudioTask(IAudioSource* source, IAudioSink* sink)
+ : source_(source),
+ sink_(sink),
+ codec_(),
+ timer_(),
+ is_new_stream_(false),
+ current_input_format_(),
+ current_output_format_(),
+ sample_buffer_(reinterpret_cast<std::byte*>(
+ heap_caps_malloc(kSampleBufferSize,
+ MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT))),
+ sample_buffer_len_(kSampleBufferSize) {}
+
+void AudioTask::Main() {
+ for (;;) {
+ source_->Read(
+ [this](StreamInfo::Format format) -> bool {
+ if (current_input_format_ && format == *current_input_format_) {
+ // This is the continuation of previous data. We can handle it if
+ // we are able to decode it, or if it doesn't need decoding.
+ return current_output_format_ == format || codec_ != nullptr;
+ }
+ // This must be a new stream of data. Reset everything to prepare to
+ // handle it.
+ current_input_format_ = format;
+ is_new_stream_ = true;
+ codec_.reset();
+ timer_.reset();
+
+ // What kind of data does this new stream contain?
+ if (std::holds_alternative<StreamInfo::Pcm>(format)) {
+ // It's already decoded! We can handle this immediately if it
+ // matches what we're currently sending to the sink. Otherwise, we
+ // will need to wait for the sink to drain before we can reconfigure
+ // it.
+ if (current_output_format_ && format == *current_output_format_) {
+ return true;
+ } else if (xStreamBufferIsEmpty(sink_->stream())) {
+ return true;
+ } else {
+ return false;
+ }
+ } else if (std::holds_alternative<StreamInfo::Encoded>(format)) {
+ // The stream has some kind of encoding. Whether or not we can
+ // handle it is entirely down to whether or not we have a codec for
+ // it.
+ auto encoding = std::get<StreamInfo::Encoded>(format);
+ auto codec = codecs::CreateCodecForType(encoding.type);
+ if (codec) {
+ ESP_LOGI(kTag, "successfully created codec for stream");
+ codec_.reset(*codec);
+ return true;
+ } else {
+ ESP_LOGE(kTag, "stream has unknown encoding");
+ return false;
+ }
+ } else {
+ // programmer error / skill issue :(
+ ESP_LOGE(kTag, "stream has unknown format");
+ current_input_format_ = format;
+ return false;
+ }
+ },
+ [this](cpp::span<const std::byte> bytes) -> size_t {
+ // PCM streams are simple, so handle them first.
+ if (std::holds_alternative<StreamInfo::Pcm>(*current_input_format_)) {
+ // First we need to reconfigure the sink for this sample format.
+ // TODO(jacqueline): We should verify whether or not the sink can
+ // actually deal with this format first.
+ if (current_input_format_ != current_output_format_) {
+ current_output_format_ = current_input_format_;
+ sink_->Configure(*current_output_format_);
+ timer_.reset(new Timer(
+ std::get<StreamInfo::Pcm>(*current_output_format_)));
+ }
+ // Stream the raw samples directly to the sink.
+ xStreamBufferSend(sink_->stream(), bytes.data(), bytes.size_bytes(),
+ portMAX_DELAY);
+ timer_->AddBytes(bytes.size_bytes());
+ return bytes.size_bytes();
+ }
+ // Else, assume it's an encoded stream.
+
+ size_t bytes_used = 0;
+ if (is_new_stream_) {
+ // This is a new stream! First order of business is verifying that
+ // we can indeed decode it.
+ auto res = codec_->BeginStream(bytes);
+ bytes_used += res.first;
+
+ if (res.second.has_error()) {
+ if (res.second.error() != codecs::ICodec::Error::kOutOfInput) {
+ // Decoding the header failed, so we can't actually deal with
+ // this stream after all. It could be malformed.
+ ESP_LOGE(kTag, "error beginning stream");
+ codec_.reset();
+ }
+ return bytes_used;
+ }
+ is_new_stream_ = false;
+
+ codecs::ICodec::OutputFormat format = res.second.value();
+ StreamInfo::Pcm pcm{
+ .channels = format.num_channels,
+ .bits_per_sample = format.bits_per_sample,
+ .sample_rate = format.sample_rate_hz,
+ };
+ StreamInfo::Format new_format{pcm};
+ timer_.reset(new Timer{pcm});
+ if (format.duration_seconds) {
+ timer_->SetLengthSeconds(*format.duration_seconds);
+ }
+
+ // Now that we have the output format for decoded samples from this
+ // stream, we need to see if they are compatible with what's already
+ // in the sink stream.
+ if (new_format != current_output_format_) {
+ // The new format is different to the old one. Wait for the sink
+ // to drain before continuing.
+ while (!xStreamBufferIsEmpty(sink_->stream())) {
+ ESP_LOGI(kTag, "waiting for sink stream to drain...");
+ // TODO(jacqueline): Get the sink drain ISR to notify us of this
+ // via semaphore instead of busy-ish waiting.
+ vTaskDelay(pdMS_TO_TICKS(100));
+ }
+ }
+
+ ESP_LOGI(kTag, "configuring sink");
+ current_output_format_ = new_format;
+ sink_->Configure(new_format);
+ timer_.reset(
+ new Timer(std::get<StreamInfo::Pcm>(*current_output_format_)));
+ }
+
+ // At this point the decoder has been initialised, and the sink has
+ // been correctly configured. All that remains is to throw samples
+ // into the sink as fast as possible.
+ while (bytes_used < bytes.size_bytes()) {
+ auto res =
+ codec_->ContinueStream(bytes.subspan(bytes_used),
+ {sample_buffer_, sample_buffer_len_});
+
+ bytes_used += res.first;
+
+ if (res.second.has_error()) {
+ return bytes_used;
+ } else {
+ xStreamBufferSend(sink_->stream(), sample_buffer_,
+ res.second->bytes_written, portMAX_DELAY);
+ timer_->AddBytes(res.second->bytes_written);
+ }
+ }
+
+ return bytes_used;
+ },
+ portMAX_DELAY);
+ }
+}
} // namespace audio
diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp
index da605a40..6a320a5a 100644
--- a/src/audio/fatfs_audio_input.cpp
+++ b/src/audio/fatfs_audio_input.cpp
@@ -5,96 +5,276 @@
*/
#include "fatfs_audio_input.hpp"
+
#include <stdint.h>
#include <algorithm>
-#include <chrono>
+#include <climits>
#include <cstddef>
#include <cstdint>
+#include <functional>
#include <future>
#include <memory>
+#include <mutex>
#include <string>
#include <variant>
-#include "arena.hpp"
-#include "audio_events.hpp"
-#include "audio_fsm.hpp"
#include "esp_heap_caps.h"
#include "esp_log.h"
-#include "event_queue.hpp"
#include "ff.h"
-#include "freertos/portmacro.h"
-#include "audio_element.hpp"
-#include "chunk.hpp"
-#include "stream_buffer.hpp"
-#include "stream_event.hpp"
+#include "audio_events.hpp"
+#include "audio_fsm.hpp"
+#include "audio_source.hpp"
+#include "event_queue.hpp"
+#include "freertos/portmacro.h"
+#include "freertos/projdefs.h"
+#include "future_fetcher.hpp"
+#include "span.hpp"
#include "stream_info.hpp"
-#include "stream_message.hpp"
#include "tag_parser.hpp"
-#include "track.hpp"
+#include "tasks.hpp"
#include "types.hpp"
static const char* kTag = "SRC";
namespace audio {
-FatfsAudioInput::FatfsAudioInput()
- : IAudioElement(),
+static constexpr UINT kFileBufferSize = 4096 * 2;
+static constexpr UINT kStreamerBufferSize = 1024;
+
+static StreamBufferHandle_t sForwardDest = nullptr;
+
+auto forward_cb(const BYTE* buf, UINT buf_length) -> UINT {
+ if (buf_length == 0) {
+ return !xStreamBufferIsFull(sForwardDest);
+ } else {
+ return xStreamBufferSend(sForwardDest, buf, buf_length, 0);
+ }
+}
+
+FileStreamer::FileStreamer(StreamBufferHandle_t dest,
+ SemaphoreHandle_t data_was_read)
+ : control_(xQueueCreate(1, sizeof(Command))),
+ destination_(dest),
+ data_was_read_(data_was_read),
+ has_data_(false),
+ file_(),
+ next_file_() {
+ assert(sForwardDest == nullptr);
+ sForwardDest = dest;
+ tasks::StartPersistent<tasks::Type::kFileStreamer>([this]() { Main(); });
+}
+
+FileStreamer::~FileStreamer() {
+ sForwardDest = nullptr;
+ Command quit = kQuit;
+ xQueueSend(control_, &quit, portMAX_DELAY);
+ vQueueDelete(control_);
+}
+
+auto FileStreamer::Main() -> void {
+ for (;;) {
+ Command cmd;
+ xQueueReceive(control_, &cmd, portMAX_DELAY);
+
+ if (cmd == kQuit) {
+ break;
+ } else if (cmd == kRestart) {
+ CloseFile();
+ xStreamBufferReset(destination_);
+ file_ = std::move(next_file_);
+ has_data_ = file_ != nullptr;
+ } else if (cmd == kRefillBuffer && file_) {
+ UINT bytes_sent = 0; // Unused.
+ // Use f_forward to push bytes directly from FATFS internal buffers into
+ // the destination. This has the nice side effect of letting FATFS decide
+ // the most efficient way to pull in data from disk; usually one whole
+ // sector at a time. Consult the FATFS lib application notes if changing
+ // this to use f_read.
+ FRESULT res = f_forward(file_.get(), forward_cb, UINT_MAX, &bytes_sent);
+ if (res != FR_OK || f_eof(file_.get())) {
+ CloseFile();
+ has_data_ = false;
+ }
+ if (bytes_sent > 0) {
+ xSemaphoreGive(data_was_read_);
+ }
+ }
+ }
+
+ ESP_LOGW(kTag, "quit file streamer");
+ CloseFile();
+ vTaskDelete(NULL);
+}
+
+auto FileStreamer::Fetch() -> void {
+ if (!has_data_.load()) {
+ return;
+ }
+ Command refill = kRefillBuffer;
+ xQueueSend(control_, &refill, portMAX_DELAY);
+}
+
+auto FileStreamer::HasFinished() -> bool {
+ return !has_data_.load();
+}
+
+auto FileStreamer::Restart(std::unique_ptr<FIL> new_file) -> void {
+ next_file_ = std::move(new_file);
+ Command restart = kRestart;
+ xQueueSend(control_, &restart, portMAX_DELAY);
+ Command fill = kRefillBuffer;
+ xQueueSend(control_, &fill, portMAX_DELAY);
+}
+
+auto FileStreamer::CloseFile() -> void {
+ if (!file_) {
+ return;
+ }
+ ESP_LOGI(kTag, "closing file");
+ f_close(file_.get());
+ file_ = {};
+ events::Dispatch<internal::InputFileClosed, AudioState>({});
+}
+
+FatfsAudioInput::FatfsAudioInput(
+ std::shared_ptr<database::ITagParser> tag_parser)
+ : IAudioSource(),
+ tag_parser_(tag_parser),
+ has_data_(xSemaphoreCreateBinary()),
+ streamer_buffer_(xStreamBufferCreate(kStreamerBufferSize, 1)),
+ streamer_(new FileStreamer(streamer_buffer_, has_data_)),
+ file_buffer_info_(),
+ file_buffer_len_(kFileBufferSize),
+ file_buffer_(reinterpret_cast<std::byte*>(
+ heap_caps_malloc(file_buffer_len_,
+ MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL))),
+ file_buffer_stream_(&file_buffer_info_, {file_buffer_, file_buffer_len_}),
+ source_mutex_(),
pending_path_(),
- current_file_(),
- is_file_open_(false),
- has_prepared_output_(false),
- current_container_(),
current_format_() {}
-FatfsAudioInput::~FatfsAudioInput() {}
+FatfsAudioInput::~FatfsAudioInput() {
+ streamer_.reset();
+ vStreamBufferDelete(streamer_buffer_);
+ vSemaphoreDelete(has_data_);
+ free(file_buffer_);
+}
-auto FatfsAudioInput::OpenFile(std::future<std::optional<std::string>>&& path)
+auto FatfsAudioInput::SetPath(std::future<std::optional<std::string>> fut)
-> void {
- pending_path_ = std::move(path);
+ std::lock_guard<std::mutex> lock{source_mutex_};
+
+ CloseCurrentFile();
+ pending_path_.reset(
+ new database::FutureFetcher<std::optional<std::string>>(std::move(fut)));
+
+ xSemaphoreGive(has_data_);
}
-auto FatfsAudioInput::OpenFile(const std::string& path) -> bool {
- current_path_.reset();
- if (is_file_open_) {
- f_close(&current_file_);
- is_file_open_ = false;
- has_prepared_output_ = false;
- }
+auto FatfsAudioInput::SetPath(const std::string& path) -> void {
+ std::lock_guard<std::mutex> lock{source_mutex_};
+
+ CloseCurrentFile();
+ OpenFile(path);
+}
+
+auto FatfsAudioInput::SetPath() -> void {
+ std::lock_guard<std::mutex> lock{source_mutex_};
+ CloseCurrentFile();
+}
+auto FatfsAudioInput::Read(
+ std::function<bool(StreamInfo::Format)> can_read,
+ std::function<size_t(cpp::span<const std::byte>)> read,
+ TickType_t max_wait) -> void {
+ // Wait until we have data to return.
+ xSemaphoreTake(has_data_, portMAX_DELAY);
+
+ // Ensure the file doesn't change whilst we're trying to get data about it.
+ std::lock_guard<std::mutex> source_lock{source_mutex_};
+
+ // If the path is a future, then wait for it to complete.
+ // TODO(jacqueline): We should really make some kind of FreeRTOS-integrated
+ // way to block a task whilst awaiting a future.
if (pending_path_) {
- pending_path_ = {};
+ while (!pending_path_->Finished()) {
+ vTaskDelay(pdMS_TO_TICKS(100));
+ }
+ auto res = pending_path_->Result();
+ pending_path_.reset();
+
+ if (res || *res) {
+ OpenFile(**res);
+ }
+
+ // Bail out now that we've resolved the future. If we end up successfully
+ // readinig from the path, then has_data will be flagged again.
+ return;
+ }
+
+ // Move data from the file streamer's buffer into our file buffer. We need our
+ // own buffer so that we can handle concatenating smaller file chunks into
+ // complete frames for the decoder.
+ OutputStream writer{&file_buffer_stream_};
+ std::size_t bytes_added =
+ xStreamBufferReceive(streamer_buffer_, writer.data().data(),
+ writer.data().size_bytes(), pdMS_TO_TICKS(0));
+ writer.add(bytes_added);
+
+ // HACK: libmad needs at least MAD_HEADER_GUARD (= 8) extra bytes following a
+ // frame, or else it refuses to decode it.
+ if (IsCurrentFormatMp3() && !HasDataRemaining()) {
+ ESP_LOGI(kTag, "applying MAD_HEADER_GUARD fix");
+ cpp::span<std::byte> buf = writer.data();
+ size_t pad_amount = std::min<size_t>(buf.size_bytes(), 8);
+ std::fill_n(buf.begin(), pad_amount, static_cast<std::byte>(0));
}
+ InputStream reader{&file_buffer_stream_};
+ auto data_for_cb = reader.data();
+ if (!data_for_cb.empty() && std::invoke(can_read, *current_format_)) {
+ reader.consume(std::invoke(read, reader.data()));
+ }
+
+ if (!HasDataRemaining()) {
+ // Out of data. We're finished. Note we don't care about anything left in
+ // the file buffer at this point; the callback as seen it, so if it didn't
+ // consume it then presumably whatever is left isn't enough to form a
+ // complete frame.
+ ESP_LOGI(kTag, "finished streaming file");
+ CloseCurrentFile();
+ } else {
+ // There is still data to be read, or sitting in the buffer.
+ streamer_->Fetch();
+ xSemaphoreGive(has_data_);
+ }
+}
+
+auto FatfsAudioInput::OpenFile(const std::string& path) -> void {
ESP_LOGI(kTag, "opening file %s", path.c_str());
FILINFO info;
if (f_stat(path.c_str(), &info) != FR_OK) {
ESP_LOGE(kTag, "failed to stat file");
+ return;
}
- database::TagParserImpl tag_parser;
database::TrackTags tags;
- if (!tag_parser.ReadAndParseTags(path, &tags)) {
+ if (!tag_parser_->ReadAndParseTags(path, &tags)) {
ESP_LOGE(kTag, "failed to read tags");
- return false;
+ return;
}
auto stream_type = ContainerToStreamType(tags.encoding());
if (!stream_type.has_value()) {
ESP_LOGE(kTag, "couldn't match container to stream");
- return false;
+ return;
}
- current_container_ = tags.encoding();
-
if (*stream_type == codecs::StreamType::kPcm && tags.channels &&
tags.bits_per_sample && tags.channels) {
- // WAV files are a special case bc they contain raw PCM streams. These don't
- // need decoding, but we *do* need to parse the PCM format from the header.
- // TODO(jacqueline): Maybe we should have a decoder for this just to deal
- // with endianness differences?
current_format_ = StreamInfo::Pcm{
.channels = static_cast<uint8_t>(*tags.channels),
.bits_per_sample = static_cast<uint8_t>(*tags.bits_per_sample),
@@ -107,89 +287,26 @@ auto FatfsAudioInput::OpenFile(const std::string& path) -> bool {
};
}
- FRESULT res = f_open(&current_file_, path.c_str(), FA_READ);
+ std::unique_ptr<FIL> file = std::make_unique<FIL>();
+ FRESULT res = f_open(file.get(), path.c_str(), FA_READ);
if (res != FR_OK) {
ESP_LOGE(kTag, "failed to open file! res: %i", res);
- return false;
+ return;
}
+ streamer_->Restart(std::move(file));
+
events::Dispatch<internal::InputFileOpened, AudioState>({});
- current_path_ = path;
- is_file_open_ = true;
- return true;
}
-auto FatfsAudioInput::NeedsToProcess() const -> bool {
- return is_file_open_ || pending_path_;
+auto FatfsAudioInput::CloseCurrentFile() -> void {
+ streamer_->Restart({});
+ xStreamBufferReset(streamer_buffer_);
+ current_format_ = {};
}
-auto FatfsAudioInput::Process(const std::vector<InputStream>& inputs,
- OutputStream* output) -> void {
- // If the next path is being given to us asynchronously, then we need to check
- // in regularly to see if it's available yet.
- if (pending_path_) {
- if (!pending_path_->valid()) {
- pending_path_ = {};
- } else {
- if (pending_path_->wait_for(std::chrono::seconds(0)) ==
- std::future_status::ready) {
- auto result = pending_path_->get();
- if (result && result != current_path_) {
- OpenFile(*result);
- }
- pending_path_ = {};
- }
- }
- }
-
- if (!is_file_open_) {
- return;
- }
-
- // If the output buffer isn't ready for a new stream, then we need to wait.
- if (!has_prepared_output_ && !output->prepare(*current_format_)) {
- return;
- }
- has_prepared_output_ = true;
-
- // Performing many small reads is inefficient; it's better to do fewer, larger
- // reads. Try to achieve this by only reading in new bytes if the output
- // buffer has been mostly drained.
- std::size_t max_size = output->data().size_bytes();
- if (max_size < output->data().size_bytes() / 2) {
- return;
- }
-
- std::size_t size = 0;
- FRESULT result =
- f_read(&current_file_, output->data().data(), max_size, &size);
- if (result != FR_OK) {
- ESP_LOGE(kTag, "file I/O error %d", result);
- output->mark_producer_finished();
- // TODO(jacqueline): Handle errors.
- return;
- }
-
- output->add(size);
-
- if (size < max_size || f_eof(&current_file_)) {
- // HACK: In order to decode the last frame of a file, libmad requires 8
- // 0-bytes ( == MAD_GUARD_BYTES) to be appended to the end of the stream.
- // It would be better to do this within mad.cpp, but so far it's the only
- // decoder that has such a requirement.
- if (current_container_ == database::Encoding::kMp3) {
- std::fill_n(output->data().begin(), 8, std::byte(0));
- output->add(8);
- }
-
- f_close(&current_file_);
- is_file_open_ = false;
- current_path_.reset();
- has_prepared_output_ = false;
- output->mark_producer_finished();
-
- events::Dispatch<internal::InputFileClosed, AudioState>({});
- }
+auto FatfsAudioInput::HasDataRemaining() -> bool {
+ return !xStreamBufferIsEmpty(streamer_buffer_) || !streamer_->HasFinished();
}
auto FatfsAudioInput::ContainerToStreamType(database::Encoding enc)
@@ -209,4 +326,15 @@ auto FatfsAudioInput::ContainerToStreamType(database::Encoding enc)
}
}
+auto FatfsAudioInput::IsCurrentFormatMp3() -> bool {
+ if (!current_format_) {
+ return false;
+ }
+ if (!std::holds_alternative<StreamInfo::Encoded>(*current_format_)) {
+ return false;
+ }
+ return std::get<StreamInfo::Encoded>(*current_format_).type ==
+ codecs::StreamType::kMp3;
+}
+
} // namespace audio
diff --git a/src/audio/i2s_audio_output.cpp b/src/audio/i2s_audio_output.cpp
index 5f705dd1..57b5e071 100644
--- a/src/audio/i2s_audio_output.cpp
+++ b/src/audio/i2s_audio_output.cpp
@@ -6,6 +6,7 @@
#include "i2s_audio_output.hpp"
#include <stdint.h>
+#include <sys/_stdint.h>
#include <algorithm>
#include <cstddef>
@@ -18,6 +19,7 @@
#include "audio_element.hpp"
#include "freertos/projdefs.h"
#include "gpios.hpp"
+#include "i2c.hpp"
#include "i2s_dac.hpp"
#include "result.hpp"
#include "stream_info.hpp"
@@ -34,7 +36,7 @@ I2SAudioOutput::I2SAudioOutput(drivers::IGpios* expander,
left_difference_(0),
attenuation_() {
SetVolume(25); // For testing
- dac_->SetSource(buffer());
+ dac_->SetSource(stream());
}
I2SAudioOutput::~I2SAudioOutput() {
@@ -68,13 +70,47 @@ auto I2SAudioOutput::GetAdjustedMaxAttenuation() -> int_fast8_t {
return 0;
}
+static uint8_t vol = 0xFF;
+
auto I2SAudioOutput::AdjustVolumeUp() -> bool {
- // TODO
+ vol += 0xF;
+ {
+ drivers::I2CTransaction transaction;
+ transaction.start()
+ .write_addr(0b0011010, I2C_MASTER_WRITE)
+ .write_ack(6, 0b01, vol)
+ .stop();
+ transaction.Execute();
+ }
+ {
+ drivers::I2CTransaction transaction;
+ transaction.start()
+ .write_addr(0b0011010, I2C_MASTER_WRITE)
+ .write_ack(7, 0b11, vol)
+ .stop();
+ transaction.Execute();
+ }
return true;
}
auto I2SAudioOutput::AdjustVolumeDown() -> bool {
- // TODO
+ vol -= 0xF;
+ {
+ drivers::I2CTransaction transaction;
+ transaction.start()
+ .write_addr(0b0011010, I2C_MASTER_WRITE)
+ .write_ack(6, 0b01, vol)
+ .stop();
+ transaction.Execute();
+ }
+ {
+ drivers::I2CTransaction transaction;
+ transaction.start()
+ .write_addr(0b0011010, I2C_MASTER_WRITE)
+ .write_ack(7, 0b11, vol)
+ .stop();
+ transaction.Execute();
+ }
return true;
}
diff --git a/src/audio/include/audio_decoder.hpp b/src/audio/include/audio_decoder.hpp
index a6b4754a..e8da415e 100644
--- a/src/audio/include/audio_decoder.hpp
+++ b/src/audio/include/audio_decoder.hpp
@@ -25,15 +25,12 @@ namespace audio {
* An audio element that accepts various kinds of encoded audio streams as
* input, and converts them to uncompressed PCM output.
*/
-class AudioDecoder : public IAudioElement {
+class AudioDecoder {
public:
AudioDecoder();
~AudioDecoder();
- auto NeedsToProcess() const -> bool override;
-
- auto Process(const std::vector<InputStream>& inputs, OutputStream* output)
- -> void override;
+ auto Process(const InputStream& input, OutputStream* output) -> void;
AudioDecoder(const AudioDecoder&) = delete;
AudioDecoder& operator=(const AudioDecoder&) = delete;
diff --git a/src/audio/include/audio_events.hpp b/src/audio/include/audio_events.hpp
index 8af3703a..933eb7a2 100644
--- a/src/audio/include/audio_events.hpp
+++ b/src/audio/include/audio_events.hpp
@@ -26,7 +26,9 @@ struct PlaybackUpdate : tinyfsm::Event {
uint32_t seconds_total;
};
-struct QueueUpdate : tinyfsm::Event {};
+struct QueueUpdate : tinyfsm::Event {
+ bool current_changed;
+};
struct VolumeChanged : tinyfsm::Event {};
diff --git a/src/audio/include/audio_fsm.hpp b/src/audio/include/audio_fsm.hpp
index 7910f4e2..3a598902 100644
--- a/src/audio/include/audio_fsm.hpp
+++ b/src/audio/include/audio_fsm.hpp
@@ -11,6 +11,7 @@
#include <vector>
#include "audio_events.hpp"
+#include "audio_task.hpp"
#include "database.hpp"
#include "display.hpp"
#include "fatfs_audio_input.hpp"
@@ -18,6 +19,7 @@
#include "i2s_audio_output.hpp"
#include "i2s_dac.hpp"
#include "storage.hpp"
+#include "tag_parser.hpp"
#include "tinyfsm.hpp"
#include "track.hpp"
@@ -30,6 +32,7 @@ class AudioState : public tinyfsm::Fsm<AudioState> {
public:
static auto Init(drivers::IGpios* gpio_expander,
std::weak_ptr<database::Database>,
+ std::shared_ptr<database::ITagParser>,
TrackQueue* queue) -> bool;
virtual ~AudioState() {}
@@ -61,11 +64,12 @@ class AudioState : public tinyfsm::Fsm<AudioState> {
static std::shared_ptr<drivers::I2SDac> sDac;
static std::weak_ptr<database::Database> sDatabase;
+ static std::unique_ptr<AudioTask> sTask;
static std::unique_ptr<FatfsAudioInput> sFileSource;
static std::unique_ptr<I2SAudioOutput> sI2SOutput;
- static std::vector<std::unique_ptr<IAudioElement>> sPipeline;
static TrackQueue* sTrackQueue;
+ static std::optional<database::TrackId> sCurrentTrack;
};
namespace states {
diff --git a/src/audio/include/audio_sink.hpp b/src/audio/include/audio_sink.hpp
index ac007bf8..c9124688 100644
--- a/src/audio/include/audio_sink.hpp
+++ b/src/audio/include/audio_sink.hpp
@@ -10,35 +10,25 @@
#include "audio_element.hpp"
#include "esp_heap_caps.h"
#include "freertos/FreeRTOS.h"
+#include "idf_additions.h"
#include "stream_info.hpp"
+
namespace audio {
class IAudioSink {
private:
// TODO: tune. at least about 12KiB seems right for mp3
- static const std::size_t kDrainBufferSize = 48 * 1024;
- uint8_t* buffer_;
- StaticStreamBuffer_t* metadata_;
- StreamBufferHandle_t handle_;
+ static const std::size_t kDrainBufferSize = 24 * 1024;
+ StreamBufferHandle_t stream_;
public:
IAudioSink()
- : buffer_(reinterpret_cast<uint8_t*>(
- heap_caps_malloc(kDrainBufferSize,
- MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT))),
- metadata_(reinterpret_cast<StaticStreamBuffer_t*>(
- heap_caps_malloc(sizeof(StaticStreamBuffer_t),
- MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT))),
- handle_(xStreamBufferCreateStatic(kDrainBufferSize,
- 1,
- buffer_,
- metadata_)) {}
-
- virtual ~IAudioSink() {
- vStreamBufferDelete(handle_);
- free(buffer_);
- free(metadata_);
- }
+ : stream_(xStreamBufferCreateWithCaps(
+ kDrainBufferSize,
+ 1,
+ MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT)) {}
+
+ virtual ~IAudioSink() { vStreamBufferDeleteWithCaps(stream_); }
virtual auto SetInUse(bool) -> void {}
@@ -51,7 +41,7 @@ class IAudioSink {
virtual auto Configure(const StreamInfo::Format& format) -> bool = 0;
virtual auto Send(const cpp::span<std::byte>& data) -> void = 0;
- auto buffer() -> StreamBufferHandle_t { return handle_; }
+ auto stream() -> StreamBufferHandle_t { return stream_; }
};
} // namespace audio
diff --git a/src/audio/include/audio_source.hpp b/src/audio/include/audio_source.hpp
new file mode 100644
index 00000000..e062fd1a
--- /dev/null
+++ b/src/audio/include/audio_source.hpp
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2023 jacqueline <me@jacqueline.id.au>
+ *
+ * SPDX-License-Identifier: GPL-3.0-only
+ */
+
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+
+#include "freertos/FreeRTOS.h"
+#include "freertos/portmacro.h"
+#include "freertos/semphr.h"
+
+#include "stream_info.hpp"
+
+namespace audio {
+
+class IAudioSource {
+ public:
+ virtual ~IAudioSource() {}
+
+ /*
+ * Synchronously fetches data from this source.
+ */
+ virtual auto Read(std::function<bool(StreamInfo::Format)>,
+ std::function<size_t(cpp::span<const std::byte>)>,
+ TickType_t) -> void = 0;
+};
+
+} // namespace audio
diff --git a/src/audio/include/audio_task.hpp b/src/audio/include/audio_task.hpp
index f997caee..e316f17a 100644
--- a/src/audio/include/audio_task.hpp
+++ b/src/audio/include/audio_task.hpp
@@ -6,15 +6,54 @@
#pragma once
+#include <sys/_stdint.h>
+#include <cstdint>
+#include <memory>
+#include "audio_decoder.hpp"
#include "audio_sink.hpp"
+#include "audio_source.hpp"
+#include "codec.hpp"
#include "pipeline.hpp"
namespace audio {
-namespace task {
+class Timer {
+ public:
+ explicit Timer(StreamInfo::Pcm);
-auto StartPipeline(Pipeline* pipeline, IAudioSink* sink) -> void;
+ auto SetLengthSeconds(uint32_t) -> void;
+ auto SetLengthBytes(uint32_t) -> void;
-} // namespace task
+ auto AddBytes(std::size_t) -> void;
+
+ private:
+ StreamInfo::Pcm format_;
+
+ uint32_t last_seconds_;
+ uint32_t total_duration_seconds_;
+ float current_seconds_;
+};
+
+class AudioTask {
+ public:
+ static auto Start(IAudioSource* source, IAudioSink* sink) -> AudioTask*;
+
+ auto Main() -> void;
+
+ private:
+ AudioTask(IAudioSource* source, IAudioSink* sink);
+
+ IAudioSource* source_;
+ IAudioSink* sink_;
+ std::unique_ptr<codecs::ICodec> codec_;
+ std::unique_ptr<Timer> timer_;
+
+ bool is_new_stream_;
+ std::optional<StreamInfo::Format> current_input_format_;
+ std::optional<StreamInfo::Format> current_output_format_;
+
+ std::byte* sample_buffer_;
+ std::size_t sample_buffer_len_;
+};
} // namespace audio
diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp
index 56f92fcf..a1b9689b 100644
--- a/src/audio/include/fatfs_audio_input.hpp
+++ b/src/audio/include/fatfs_audio_input.hpp
@@ -6,57 +6,130 @@
#pragma once
+#include <cstddef>
#include <cstdint>
#include <future>
#include <memory>
#include <string>
-#include <vector>
-
-#include "arena.hpp"
-#include "chunk.hpp"
-#include "freertos/FreeRTOS.h"
#include "ff.h"
-#include "freertos/message_buffer.h"
-#include "freertos/queue.h"
-#include "span.hpp"
-#include "track.hpp"
-#include "audio_element.hpp"
-#include "stream_buffer.hpp"
+#include "audio_source.hpp"
+#include "freertos/portmacro.h"
+#include "future_fetcher.hpp"
#include "stream_info.hpp"
+#include "tag_parser.hpp"
#include "types.hpp"
namespace audio {
-class FatfsAudioInput : public IAudioElement {
+/*
+ * Handles coordination with a persistent background task to asynchronously
+ * read files from disk into a StreamBuffer.
+ */
+class FileStreamer {
public:
- FatfsAudioInput();
- ~FatfsAudioInput();
+ FileStreamer(StreamBufferHandle_t dest, SemaphoreHandle_t first_read);
+ ~FileStreamer();
+
+ /*
+ * Continues reading data into the destination buffer until the destination
+ * is full.
+ */
+ auto Fetch() -> void;
+
+ /* Returns true if the streamer has run out of data from the current file. */
+ auto HasFinished() -> bool;
+
+ /*
+ * Clears any remaining buffered data, and begins reading again from the
+ * given file. This function respects any seeking/reading that has already
+ * been done on the new source file.
+ */
+ auto Restart(std::unique_ptr<FIL>) -> void;
+
+ FileStreamer(const FileStreamer&) = delete;
+ FileStreamer& operator=(const FileStreamer&) = delete;
+
+ private:
+ // Note: private methods here should only be called from the streamer's task.
+
+ auto Main() -> void;
+ auto CloseFile() -> void;
+
+ enum Command {
+ kRestart,
+ kRefillBuffer,
+ kQuit,
+ };
+ QueueHandle_t control_;
+ StreamBufferHandle_t destination_;
+ SemaphoreHandle_t data_was_read_;
+
+ std::atomic<bool> has_data_;
+ std::unique_ptr<FIL> file_;
+ std::unique_ptr<FIL> next_file_;
+};
- auto CurrentFile() -> std::optional<std::string> { return current_path_; }
- auto OpenFile(std::future<std::optional<std::string>>&& path) -> void;
- auto OpenFile(const std::string& path) -> bool;
+/*
+ * Audio source that fetches data from a FatFs (or exfat i guess) filesystem.
+ *
+ * All public methods are safe to call from any task.
+ */
+class FatfsAudioInput : public IAudioSource {
+ public:
+ explicit FatfsAudioInput(std::shared_ptr<database::ITagParser> tag_parser);
+ ~FatfsAudioInput();
- auto NeedsToProcess() const -> bool override;
+ /*
+ * Immediately cease reading any current source, and begin reading from the
+ * given file path.
+ */
+ auto SetPath(std::future<std::optional<std::string>>) -> void;
+ auto SetPath(const std::string&) -> void;
+ auto SetPath() -> void;
- auto Process(const std::vector<InputStream>& inputs, OutputStream* output)
- -> void override;
+ auto Read(std::function<bool(StreamInfo::Format)>,
+ std::function<size_t(cpp::span<const std::byte>)>,
+ TickType_t) -> void override;
FatfsAudioInput(const FatfsAudioInput&) = delete;
FatfsAudioInput& operator=(const FatfsAudioInput&) = delete;
private:
+ // Note: private methods assume that the appropriate locks have already been
+ // acquired.
+
+ auto OpenFile(const std::string& path) -> void;
+ auto CloseCurrentFile() -> void;
+ auto HasDataRemaining() -> bool;
+
auto ContainerToStreamType(database::Encoding)
-> std::optional<codecs::StreamType>;
+ auto IsCurrentFormatMp3() -> bool;
+
+ std::shared_ptr<database::ITagParser> tag_parser_;
+
+ // Semaphore used to block when this source is out of data. This should be
+ // acquired before attempting to read data, and returned after each incomplete
+ // read.
+ SemaphoreHandle_t has_data_;
+
+ StreamBufferHandle_t streamer_buffer_;
+ std::unique_ptr<FileStreamer> streamer_;
+
+ StreamInfo file_buffer_info_;
+ std::size_t file_buffer_len_;
+ std::byte* file_buffer_;
+
+ RawStream file_buffer_stream_;
- std::optional<std::future<std::optional<std::string>>> pending_path_;
- std::optional<std::string> current_path_;
- FIL current_file_;
- bool is_file_open_;
- bool has_prepared_output_;
+ // Mutex guarding the current file/stream associated with this source. Must be
+ // held during readings, and before altering the current file.
+ std::mutex source_mutex_;
- std::optional<database::Encoding> current_container_;
+ std::unique_ptr<database::FutureFetcher<std::optional<std::string>>>
+ pending_path_;
std::optional<StreamInfo::Format> current_format_;
};
diff --git a/src/audio/include/stream_info.hpp b/src/audio/include/stream_info.hpp
index 69bf3c4b..00aa1110 100644
--- a/src/audio/include/stream_info.hpp
+++ b/src/audio/include/stream_info.hpp
@@ -15,6 +15,10 @@
#include <utility>
#include <variant>
+#include "freertos/FreeRTOS.h"
+#include "freertos/ringbuf.h"
+#include "freertos/stream_buffer.h"
+
#include "result.hpp"
#include "span.hpp"
#include "types.hpp"
diff --git a/src/audio/track_queue.cpp b/src/audio/track_queue.cpp
index 0709056f..721329f9 100644
--- a/src/audio/track_queue.cpp
+++ b/src/audio/track_queue.cpp
@@ -81,39 +81,45 @@ auto TrackQueue::GetUpcoming(std::size_t limit) const
auto TrackQueue::AddNext(database::TrackId t) -> void {
const std::lock_guard<std::mutex> lock(mutex_);
enqueued_.push_front(t);
- events::Dispatch<QueueUpdate, AudioState, ui::UiState>({});
+ events::Dispatch<QueueUpdate, AudioState, ui::UiState>(
+ QueueUpdate{.current_changed = enqueued_.size() < 2});
}
auto TrackQueue::AddNext(std::shared_ptr<playlist::ISource> src) -> void {
const std::lock_guard<std::mutex> lock(mutex_);
enqueued_.push_front(src);
- events::Dispatch<QueueUpdate, AudioState, ui::UiState>({});
+ events::Dispatch<QueueUpdate, AudioState, ui::UiState>(
+ QueueUpdate{.current_changed = enqueued_.size() < 2});
}
auto TrackQueue::IncludeNext(std::shared_ptr<playlist::IResetableSource> src)
-> void {
const std::lock_guard<std::mutex> lock(mutex_);
enqueued_.push_front(src);
- events::Dispatch<QueueUpdate, AudioState, ui::UiState>({});
+ events::Dispatch<QueueUpdate, AudioState, ui::UiState>(
+ QueueUpdate{.current_changed = enqueued_.size() < 2});
}
auto TrackQueue::AddLast(database::TrackId t) -> void {
const std::lock_guard<std::mutex> lock(mutex_);
enqueued_.push_back(t);
- events::Dispatch<QueueUpdate, AudioState, ui::UiState>({});
+ events::Dispatch<QueueUpdate, AudioState, ui::UiState>(
+ QueueUpdate{.current_changed = enqueued_.size() < 2});
}
auto TrackQueue::AddLast(std::shared_ptr<playlist::ISource> src) -> void {
const std::lock_guard<std::mutex> lock(mutex_);
enqueued_.push_back(src);
- events::Dispatch<QueueUpdate, AudioState, ui::UiState>({});
+ events::Dispatch<QueueUpdate, AudioState, ui::UiState>(
+ QueueUpdate{.current_changed = enqueued_.size() < 2});
}
auto TrackQueue::IncludeLast(std::shared_ptr<playlist::IResetableSource> src)
-> void {
const std::lock_guard<std::mutex> lock(mutex_);
enqueued_.push_back(src);
- events::Dispatch<QueueUpdate, AudioState, ui::UiState>({});
+ events::Dispatch<QueueUpdate, AudioState, ui::UiState>(
+ QueueUpdate{.current_changed = enqueued_.size() < 2});
}
auto TrackQueue::Next() -> void {
@@ -143,7 +149,8 @@ auto TrackQueue::Next() -> void {
}
}
- events::Dispatch<QueueUpdate, AudioState, ui::UiState>({});
+ events::Dispatch<QueueUpdate, AudioState, ui::UiState>(
+ QueueUpdate{.current_changed = true});
}
auto TrackQueue::Previous() -> void {
@@ -173,14 +180,16 @@ auto TrackQueue::Previous() -> void {
}
played_.pop_front();
- events::Dispatch<QueueUpdate, AudioState, ui::UiState>({});
+ events::Dispatch<QueueUpdate, AudioState, ui::UiState>(
+ QueueUpdate{.current_changed = true});
}
auto TrackQueue::Clear() -> void {
const std::lock_guard<std::mutex> lock(mutex_);
played_.clear();
enqueued_.clear();
- events::Dispatch<QueueUpdate, AudioState, ui::UiState>({});
+ events::Dispatch<QueueUpdate, AudioState, ui::UiState>(
+ QueueUpdate{.current_changed = true});
}
} // namespace audio