diff options
Diffstat (limited to 'src/audio/fatfs_audio_input.cpp')
| -rw-r--r-- | src/audio/fatfs_audio_input.cpp | 354 |
1 files changed, 241 insertions, 113 deletions
diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp index da605a40..6a320a5a 100644 --- a/src/audio/fatfs_audio_input.cpp +++ b/src/audio/fatfs_audio_input.cpp @@ -5,96 +5,276 @@ */ #include "fatfs_audio_input.hpp" + #include <stdint.h> #include <algorithm> -#include <chrono> +#include <climits> #include <cstddef> #include <cstdint> +#include <functional> #include <future> #include <memory> +#include <mutex> #include <string> #include <variant> -#include "arena.hpp" -#include "audio_events.hpp" -#include "audio_fsm.hpp" #include "esp_heap_caps.h" #include "esp_log.h" -#include "event_queue.hpp" #include "ff.h" -#include "freertos/portmacro.h" -#include "audio_element.hpp" -#include "chunk.hpp" -#include "stream_buffer.hpp" -#include "stream_event.hpp" +#include "audio_events.hpp" +#include "audio_fsm.hpp" +#include "audio_source.hpp" +#include "event_queue.hpp" +#include "freertos/portmacro.h" +#include "freertos/projdefs.h" +#include "future_fetcher.hpp" +#include "span.hpp" #include "stream_info.hpp" -#include "stream_message.hpp" #include "tag_parser.hpp" -#include "track.hpp" +#include "tasks.hpp" #include "types.hpp" static const char* kTag = "SRC"; namespace audio { -FatfsAudioInput::FatfsAudioInput() - : IAudioElement(), +static constexpr UINT kFileBufferSize = 4096 * 2; +static constexpr UINT kStreamerBufferSize = 1024; + +static StreamBufferHandle_t sForwardDest = nullptr; + +auto forward_cb(const BYTE* buf, UINT buf_length) -> UINT { + if (buf_length == 0) { + return !xStreamBufferIsFull(sForwardDest); + } else { + return xStreamBufferSend(sForwardDest, buf, buf_length, 0); + } +} + +FileStreamer::FileStreamer(StreamBufferHandle_t dest, + SemaphoreHandle_t data_was_read) + : control_(xQueueCreate(1, sizeof(Command))), + destination_(dest), + data_was_read_(data_was_read), + has_data_(false), + file_(), + next_file_() { + assert(sForwardDest == nullptr); + sForwardDest = dest; + tasks::StartPersistent<tasks::Type::kFileStreamer>([this]() { Main(); }); +} + +FileStreamer::~FileStreamer() { + sForwardDest = nullptr; + Command quit = kQuit; + xQueueSend(control_, &quit, portMAX_DELAY); + vQueueDelete(control_); +} + +auto FileStreamer::Main() -> void { + for (;;) { + Command cmd; + xQueueReceive(control_, &cmd, portMAX_DELAY); + + if (cmd == kQuit) { + break; + } else if (cmd == kRestart) { + CloseFile(); + xStreamBufferReset(destination_); + file_ = std::move(next_file_); + has_data_ = file_ != nullptr; + } else if (cmd == kRefillBuffer && file_) { + UINT bytes_sent = 0; // Unused. + // Use f_forward to push bytes directly from FATFS internal buffers into + // the destination. This has the nice side effect of letting FATFS decide + // the most efficient way to pull in data from disk; usually one whole + // sector at a time. Consult the FATFS lib application notes if changing + // this to use f_read. + FRESULT res = f_forward(file_.get(), forward_cb, UINT_MAX, &bytes_sent); + if (res != FR_OK || f_eof(file_.get())) { + CloseFile(); + has_data_ = false; + } + if (bytes_sent > 0) { + xSemaphoreGive(data_was_read_); + } + } + } + + ESP_LOGW(kTag, "quit file streamer"); + CloseFile(); + vTaskDelete(NULL); +} + +auto FileStreamer::Fetch() -> void { + if (!has_data_.load()) { + return; + } + Command refill = kRefillBuffer; + xQueueSend(control_, &refill, portMAX_DELAY); +} + +auto FileStreamer::HasFinished() -> bool { + return !has_data_.load(); +} + +auto FileStreamer::Restart(std::unique_ptr<FIL> new_file) -> void { + next_file_ = std::move(new_file); + Command restart = kRestart; + xQueueSend(control_, &restart, portMAX_DELAY); + Command fill = kRefillBuffer; + xQueueSend(control_, &fill, portMAX_DELAY); +} + +auto FileStreamer::CloseFile() -> void { + if (!file_) { + return; + } + ESP_LOGI(kTag, "closing file"); + f_close(file_.get()); + file_ = {}; + events::Dispatch<internal::InputFileClosed, AudioState>({}); +} + +FatfsAudioInput::FatfsAudioInput( + std::shared_ptr<database::ITagParser> tag_parser) + : IAudioSource(), + tag_parser_(tag_parser), + has_data_(xSemaphoreCreateBinary()), + streamer_buffer_(xStreamBufferCreate(kStreamerBufferSize, 1)), + streamer_(new FileStreamer(streamer_buffer_, has_data_)), + file_buffer_info_(), + file_buffer_len_(kFileBufferSize), + file_buffer_(reinterpret_cast<std::byte*>( + heap_caps_malloc(file_buffer_len_, + MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL))), + file_buffer_stream_(&file_buffer_info_, {file_buffer_, file_buffer_len_}), + source_mutex_(), pending_path_(), - current_file_(), - is_file_open_(false), - has_prepared_output_(false), - current_container_(), current_format_() {} -FatfsAudioInput::~FatfsAudioInput() {} +FatfsAudioInput::~FatfsAudioInput() { + streamer_.reset(); + vStreamBufferDelete(streamer_buffer_); + vSemaphoreDelete(has_data_); + free(file_buffer_); +} -auto FatfsAudioInput::OpenFile(std::future<std::optional<std::string>>&& path) +auto FatfsAudioInput::SetPath(std::future<std::optional<std::string>> fut) -> void { - pending_path_ = std::move(path); + std::lock_guard<std::mutex> lock{source_mutex_}; + + CloseCurrentFile(); + pending_path_.reset( + new database::FutureFetcher<std::optional<std::string>>(std::move(fut))); + + xSemaphoreGive(has_data_); } -auto FatfsAudioInput::OpenFile(const std::string& path) -> bool { - current_path_.reset(); - if (is_file_open_) { - f_close(¤t_file_); - is_file_open_ = false; - has_prepared_output_ = false; - } +auto FatfsAudioInput::SetPath(const std::string& path) -> void { + std::lock_guard<std::mutex> lock{source_mutex_}; + + CloseCurrentFile(); + OpenFile(path); +} + +auto FatfsAudioInput::SetPath() -> void { + std::lock_guard<std::mutex> lock{source_mutex_}; + CloseCurrentFile(); +} +auto FatfsAudioInput::Read( + std::function<bool(StreamInfo::Format)> can_read, + std::function<size_t(cpp::span<const std::byte>)> read, + TickType_t max_wait) -> void { + // Wait until we have data to return. + xSemaphoreTake(has_data_, portMAX_DELAY); + + // Ensure the file doesn't change whilst we're trying to get data about it. + std::lock_guard<std::mutex> source_lock{source_mutex_}; + + // If the path is a future, then wait for it to complete. + // TODO(jacqueline): We should really make some kind of FreeRTOS-integrated + // way to block a task whilst awaiting a future. if (pending_path_) { - pending_path_ = {}; + while (!pending_path_->Finished()) { + vTaskDelay(pdMS_TO_TICKS(100)); + } + auto res = pending_path_->Result(); + pending_path_.reset(); + + if (res || *res) { + OpenFile(**res); + } + + // Bail out now that we've resolved the future. If we end up successfully + // readinig from the path, then has_data will be flagged again. + return; + } + + // Move data from the file streamer's buffer into our file buffer. We need our + // own buffer so that we can handle concatenating smaller file chunks into + // complete frames for the decoder. + OutputStream writer{&file_buffer_stream_}; + std::size_t bytes_added = + xStreamBufferReceive(streamer_buffer_, writer.data().data(), + writer.data().size_bytes(), pdMS_TO_TICKS(0)); + writer.add(bytes_added); + + // HACK: libmad needs at least MAD_HEADER_GUARD (= 8) extra bytes following a + // frame, or else it refuses to decode it. + if (IsCurrentFormatMp3() && !HasDataRemaining()) { + ESP_LOGI(kTag, "applying MAD_HEADER_GUARD fix"); + cpp::span<std::byte> buf = writer.data(); + size_t pad_amount = std::min<size_t>(buf.size_bytes(), 8); + std::fill_n(buf.begin(), pad_amount, static_cast<std::byte>(0)); } + InputStream reader{&file_buffer_stream_}; + auto data_for_cb = reader.data(); + if (!data_for_cb.empty() && std::invoke(can_read, *current_format_)) { + reader.consume(std::invoke(read, reader.data())); + } + + if (!HasDataRemaining()) { + // Out of data. We're finished. Note we don't care about anything left in + // the file buffer at this point; the callback as seen it, so if it didn't + // consume it then presumably whatever is left isn't enough to form a + // complete frame. + ESP_LOGI(kTag, "finished streaming file"); + CloseCurrentFile(); + } else { + // There is still data to be read, or sitting in the buffer. + streamer_->Fetch(); + xSemaphoreGive(has_data_); + } +} + +auto FatfsAudioInput::OpenFile(const std::string& path) -> void { ESP_LOGI(kTag, "opening file %s", path.c_str()); FILINFO info; if (f_stat(path.c_str(), &info) != FR_OK) { ESP_LOGE(kTag, "failed to stat file"); + return; } - database::TagParserImpl tag_parser; database::TrackTags tags; - if (!tag_parser.ReadAndParseTags(path, &tags)) { + if (!tag_parser_->ReadAndParseTags(path, &tags)) { ESP_LOGE(kTag, "failed to read tags"); - return false; + return; } auto stream_type = ContainerToStreamType(tags.encoding()); if (!stream_type.has_value()) { ESP_LOGE(kTag, "couldn't match container to stream"); - return false; + return; } - current_container_ = tags.encoding(); - if (*stream_type == codecs::StreamType::kPcm && tags.channels && tags.bits_per_sample && tags.channels) { - // WAV files are a special case bc they contain raw PCM streams. These don't - // need decoding, but we *do* need to parse the PCM format from the header. - // TODO(jacqueline): Maybe we should have a decoder for this just to deal - // with endianness differences? current_format_ = StreamInfo::Pcm{ .channels = static_cast<uint8_t>(*tags.channels), .bits_per_sample = static_cast<uint8_t>(*tags.bits_per_sample), @@ -107,89 +287,26 @@ auto FatfsAudioInput::OpenFile(const std::string& path) -> bool { }; } - FRESULT res = f_open(¤t_file_, path.c_str(), FA_READ); + std::unique_ptr<FIL> file = std::make_unique<FIL>(); + FRESULT res = f_open(file.get(), path.c_str(), FA_READ); if (res != FR_OK) { ESP_LOGE(kTag, "failed to open file! res: %i", res); - return false; + return; } + streamer_->Restart(std::move(file)); + events::Dispatch<internal::InputFileOpened, AudioState>({}); - current_path_ = path; - is_file_open_ = true; - return true; } -auto FatfsAudioInput::NeedsToProcess() const -> bool { - return is_file_open_ || pending_path_; +auto FatfsAudioInput::CloseCurrentFile() -> void { + streamer_->Restart({}); + xStreamBufferReset(streamer_buffer_); + current_format_ = {}; } -auto FatfsAudioInput::Process(const std::vector<InputStream>& inputs, - OutputStream* output) -> void { - // If the next path is being given to us asynchronously, then we need to check - // in regularly to see if it's available yet. - if (pending_path_) { - if (!pending_path_->valid()) { - pending_path_ = {}; - } else { - if (pending_path_->wait_for(std::chrono::seconds(0)) == - std::future_status::ready) { - auto result = pending_path_->get(); - if (result && result != current_path_) { - OpenFile(*result); - } - pending_path_ = {}; - } - } - } - - if (!is_file_open_) { - return; - } - - // If the output buffer isn't ready for a new stream, then we need to wait. - if (!has_prepared_output_ && !output->prepare(*current_format_)) { - return; - } - has_prepared_output_ = true; - - // Performing many small reads is inefficient; it's better to do fewer, larger - // reads. Try to achieve this by only reading in new bytes if the output - // buffer has been mostly drained. - std::size_t max_size = output->data().size_bytes(); - if (max_size < output->data().size_bytes() / 2) { - return; - } - - std::size_t size = 0; - FRESULT result = - f_read(¤t_file_, output->data().data(), max_size, &size); - if (result != FR_OK) { - ESP_LOGE(kTag, "file I/O error %d", result); - output->mark_producer_finished(); - // TODO(jacqueline): Handle errors. - return; - } - - output->add(size); - - if (size < max_size || f_eof(¤t_file_)) { - // HACK: In order to decode the last frame of a file, libmad requires 8 - // 0-bytes ( == MAD_GUARD_BYTES) to be appended to the end of the stream. - // It would be better to do this within mad.cpp, but so far it's the only - // decoder that has such a requirement. - if (current_container_ == database::Encoding::kMp3) { - std::fill_n(output->data().begin(), 8, std::byte(0)); - output->add(8); - } - - f_close(¤t_file_); - is_file_open_ = false; - current_path_.reset(); - has_prepared_output_ = false; - output->mark_producer_finished(); - - events::Dispatch<internal::InputFileClosed, AudioState>({}); - } +auto FatfsAudioInput::HasDataRemaining() -> bool { + return !xStreamBufferIsEmpty(streamer_buffer_) || !streamer_->HasFinished(); } auto FatfsAudioInput::ContainerToStreamType(database::Encoding enc) @@ -209,4 +326,15 @@ auto FatfsAudioInput::ContainerToStreamType(database::Encoding enc) } } +auto FatfsAudioInput::IsCurrentFormatMp3() -> bool { + if (!current_format_) { + return false; + } + if (!std::holds_alternative<StreamInfo::Encoded>(*current_format_)) { + return false; + } + return std::get<StreamInfo::Encoded>(*current_format_).type == + codecs::StreamType::kMp3; +} + } // namespace audio |
