summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjacqueline <me@jacqueline.id.au>2023-11-07 10:32:07 +1100
committerjacqueline <me@jacqueline.id.au>2023-11-07 10:32:07 +1100
commit499d5a942fc2ad0149b0a16e978e090336dd8319 (patch)
tree1f671f0e3025b1350b25511e2442dfede19677df
parentd36fe9be6b522a3dade389213a0bb7e26a169627 (diff)
downloadtangara-fw-499d5a942fc2ad0149b0a16e978e090336dd8319.tar.gz
Add a wrapper codec source that does readahead
-rw-r--r--src/audio/CMakeLists.txt2
-rw-r--r--src/audio/audio_fsm.cpp3
-rw-r--r--src/audio/fatfs_audio_input.cpp9
-rw-r--r--src/audio/include/fatfs_audio_input.hpp4
-rw-r--r--src/audio/include/readahead_source.hpp53
-rw-r--r--src/audio/readahead_source.cpp124
-rw-r--r--src/codecs/source_buffer.cpp2
-rw-r--r--src/database/database.cpp7
-rw-r--r--src/database/env_esp.cpp2
-rw-r--r--src/database/include/database.hpp3
-rw-r--r--src/database/include/env_esp.hpp2
-rw-r--r--src/system_fsm/booting.cpp5
-rw-r--r--src/system_fsm/include/service_locator.hpp12
-rw-r--r--src/system_fsm/running.cpp5
-rw-r--r--src/tasks/tasks.cpp3
-rw-r--r--src/tasks/tasks.hpp4
16 files changed, 221 insertions, 19 deletions
diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt
index df5622f5..0cf8eacd 100644
--- a/src/audio/CMakeLists.txt
+++ b/src/audio/CMakeLists.txt
@@ -5,7 +5,7 @@
idf_component_register(
SRCS "audio_decoder.cpp" "fatfs_audio_input.cpp" "i2s_audio_output.cpp"
"track_queue.cpp" "audio_fsm.cpp" "audio_converter.cpp" "resample.cpp"
- "fatfs_source.cpp" "bt_audio_output.cpp"
+ "fatfs_source.cpp" "bt_audio_output.cpp" "readahead_source.cpp"
INCLUDE_DIRS "include"
REQUIRES "codecs" "drivers" "cbor" "result" "tasks" "span" "memory" "tinyfsm"
"database" "system_fsm" "playlist" "speexdsp")
diff --git a/src/audio/audio_fsm.cpp b/src/audio/audio_fsm.cpp
index 09a64db1..f43d0ce2 100644
--- a/src/audio/audio_fsm.cpp
+++ b/src/audio/audio_fsm.cpp
@@ -105,7 +105,8 @@ void Uninitialised::react(const system_fsm::BootComplete& ev) {
StreamBufferHandle_t stream = xStreamBufferCreateWithCaps(
kDrainBufferSize, sizeof(sample::Sample) * 2, MALLOC_CAP_DMA);
- sFileSource.reset(new FatfsAudioInput(sServices->tag_parser()));
+ sFileSource.reset(
+ new FatfsAudioInput(sServices->tag_parser(), sServices->bg_worker()));
sI2SOutput.reset(new I2SAudioOutput(stream, sServices->gpios()));
sBtOutput.reset(new BluetoothAudioOutput(stream, sServices->bluetooth()));
diff --git a/src/audio/fatfs_audio_input.cpp b/src/audio/fatfs_audio_input.cpp
index b919a3a8..e13ae793 100644
--- a/src/audio/fatfs_audio_input.cpp
+++ b/src/audio/fatfs_audio_input.cpp
@@ -23,6 +23,7 @@
#include "freertos/portmacro.h"
#include "freertos/projdefs.h"
#include "idf_additions.h"
+#include "readahead_source.hpp"
#include "span.hpp"
#include "audio_events.hpp"
@@ -42,9 +43,11 @@
namespace audio {
-FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser)
+FatfsAudioInput::FatfsAudioInput(database::ITagParser& tag_parser,
+ tasks::Worker& bg_worker)
: IAudioSource(),
tag_parser_(tag_parser),
+ bg_worker_(bg_worker),
new_stream_mutex_(),
new_stream_(),
has_new_stream_(false),
@@ -142,7 +145,9 @@ auto FatfsAudioInput::OpenFile(const std::pmr::string& path) -> bool {
return false;
}
- new_stream_.reset(new FatfsSource(stream_type.value(), std::move(file)));
+ auto source =
+ std::make_unique<FatfsSource>(stream_type.value(), std::move(file));
+ new_stream_.reset(new ReadaheadSource(bg_worker_, std::move(source)));
return true;
}
diff --git a/src/audio/include/fatfs_audio_input.hpp b/src/audio/include/fatfs_audio_input.hpp
index 08527350..9b516478 100644
--- a/src/audio/include/fatfs_audio_input.hpp
+++ b/src/audio/include/fatfs_audio_input.hpp
@@ -19,6 +19,7 @@
#include "codec.hpp"
#include "future_fetcher.hpp"
#include "tag_parser.hpp"
+#include "tasks.hpp"
#include "types.hpp"
namespace audio {
@@ -30,7 +31,7 @@ namespace audio {
*/
class FatfsAudioInput : public IAudioSource {
public:
- explicit FatfsAudioInput(database::ITagParser& tag_parser);
+ explicit FatfsAudioInput(database::ITagParser&, tasks::Worker&);
~FatfsAudioInput();
/*
@@ -54,6 +55,7 @@ class FatfsAudioInput : public IAudioSource {
-> std::optional<codecs::StreamType>;
database::ITagParser& tag_parser_;
+ tasks::Worker& bg_worker_;
std::mutex new_stream_mutex_;
std::shared_ptr<codecs::IStream> new_stream_;
diff --git a/src/audio/include/readahead_source.hpp b/src/audio/include/readahead_source.hpp
new file mode 100644
index 00000000..dea3ff3f
--- /dev/null
+++ b/src/audio/include/readahead_source.hpp
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2023 jacqueline <me@jacqueline.id.au>
+ *
+ * SPDX-License-Identifier: GPL-3.0-only
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+
+#include "freertos/FreeRTOS.h"
+
+#include "ff.h"
+#include "freertos/stream_buffer.h"
+
+#include "audio_source.hpp"
+#include "codec.hpp"
+#include "tasks.hpp"
+
+namespace audio {
+
+/*
+ * Wraps another stream, proactively buffering large chunks of it into memory
+ * at a time.
+ */
+class ReadaheadSource : public codecs::IStream {
+ public:
+ ReadaheadSource(tasks::Worker&, std::unique_ptr<codecs::IStream>);
+ ~ReadaheadSource();
+
+ auto Read(cpp::span<std::byte> dest) -> ssize_t override;
+
+ auto CanSeek() -> bool override;
+
+ auto SeekTo(int64_t destination, SeekFrom from) -> void override;
+
+ auto CurrentPosition() -> int64_t override;
+
+ ReadaheadSource(const ReadaheadSource&) = delete;
+ ReadaheadSource& operator=(const ReadaheadSource&) = delete;
+
+ private:
+ tasks::Worker& worker_;
+ std::unique_ptr<codecs::IStream> wrapped_;
+
+ std::atomic<bool> is_refilling_;
+ StreamBufferHandle_t buffer_;
+ int64_t tell_;
+};
+
+} // namespace audio \ No newline at end of file
diff --git a/src/audio/readahead_source.cpp b/src/audio/readahead_source.cpp
new file mode 100644
index 00000000..85922425
--- /dev/null
+++ b/src/audio/readahead_source.cpp
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2023 jacqueline <me@jacqueline.id.au>
+ *
+ * SPDX-License-Identifier: GPL-3.0-only
+ */
+
+#include "readahead_source.hpp"
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+
+#include "esp_heap_caps.h"
+#include "esp_log.h"
+#include "ff.h"
+
+#include "audio_source.hpp"
+#include "codec.hpp"
+#include "freertos/portmacro.h"
+#include "idf_additions.h"
+#include "spi.hpp"
+#include "tasks.hpp"
+#include "types.hpp"
+
+namespace audio {
+
+static constexpr char kTag[] = "readahead";
+static constexpr size_t kBufferSize = 1024 * 512;
+
+ReadaheadSource::ReadaheadSource(tasks::Worker& worker,
+ std::unique_ptr<codecs::IStream> wrapped)
+ : IStream(wrapped->type()),
+ worker_(worker),
+ wrapped_(std::move(wrapped)),
+ is_refilling_(false),
+ buffer_(xStreamBufferCreateWithCaps(kBufferSize, 1, MALLOC_CAP_SPIRAM)),
+ tell_(wrapped_->CurrentPosition()) {}
+
+ReadaheadSource::~ReadaheadSource() {
+ vStreamBufferDeleteWithCaps(buffer_);
+}
+
+auto ReadaheadSource::Read(cpp::span<std::byte> dest) -> ssize_t {
+ // Optismise for the most frequent case: the buffer already contains enough
+ // data for this call.
+ size_t bytes_read =
+ xStreamBufferReceive(buffer_, dest.data(), dest.size_bytes(), 0);
+
+ tell_ += bytes_read;
+ if (bytes_read == dest.size_bytes()) {
+ return bytes_read;
+ }
+
+ dest = dest.subspan(bytes_read);
+
+ // Are we currently fetching more bytes?
+ ssize_t extra_bytes = 0;
+ if (!is_refilling_) {
+ // No! Pass through directly to the wrapped source for the fastest
+ // response.
+ extra_bytes = wrapped_->Read(dest);
+ } else {
+ // Yes! Wait for the refill to catch up, then try again.
+ is_refilling_.wait(true);
+ extra_bytes =
+ xStreamBufferReceive(buffer_, dest.data(), dest.size_bytes(), 0);
+ }
+
+ // No need to check whether the dest buffer is actually filled, since at this
+ // point we've read as many bytes as were available.
+ tell_ += extra_bytes;
+ bytes_read += extra_bytes;
+
+ // Before returning, make sure the readahead task is kicked off again.
+ ESP_LOGI(kTag, "triggering readahead");
+ is_refilling_ = true;
+ std::function<void(void)> refill = [this]() {
+ // Try to keep larger than most reasonable FAT sector sizes for more
+ // efficient disk reads.
+ constexpr size_t kMaxSingleRead = 1024 * 64;
+ std::byte working_buf[kMaxSingleRead];
+ for (;;) {
+ size_t bytes_to_read = std::min<size_t>(
+ kMaxSingleRead, xStreamBufferSpacesAvailable(buffer_));
+ if (bytes_to_read == 0) {
+ break;
+ }
+ size_t read = wrapped_->Read({working_buf, bytes_to_read});
+ if (read > 0) {
+ xStreamBufferSend(buffer_, working_buf, read, 0);
+ }
+ if (read < bytes_to_read) {
+ break;
+ }
+ }
+ is_refilling_ = false;
+ is_refilling_.notify_all();
+ };
+ worker_.Dispatch(refill);
+
+ return bytes_read;
+}
+
+auto ReadaheadSource::CanSeek() -> bool {
+ return wrapped_->CanSeek();
+}
+
+auto ReadaheadSource::SeekTo(int64_t destination, SeekFrom from) -> void {
+ // Seeking blows away all of our prefetched data. To do this safely, we
+ // first need to wait for the refill task to finish.
+ is_refilling_.wait(true);
+ // It's now safe to clear out the buffer.
+ xStreamBufferReset(buffer_);
+
+ wrapped_->SeekTo(destination, from);
+
+ // Make sure our tell is up to date with the new location.
+ tell_ = wrapped_->CurrentPosition();
+}
+
+auto ReadaheadSource::CurrentPosition() -> int64_t {
+ return tell_;
+}
+} // namespace audio
diff --git a/src/codecs/source_buffer.cpp b/src/codecs/source_buffer.cpp
index 8e40ba42..bf8951f3 100644
--- a/src/codecs/source_buffer.cpp
+++ b/src/codecs/source_buffer.cpp
@@ -18,7 +18,7 @@
namespace codecs {
[[maybe_unused]] static constexpr char kTag[] = "dec_buf";
-static constexpr size_t kBufferSize = 1024 * 128;
+static constexpr size_t kBufferSize = 1024 * 16;
static constexpr size_t kReadThreshold = 1024 * 8;
SourceBuffer::SourceBuffer()
diff --git a/src/database/database.cpp b/src/database/database.cpp
index e826f576..e6cb85ed 100644
--- a/src/database/database.cpp
+++ b/src/database/database.cpp
@@ -117,16 +117,15 @@ static auto CheckDatabase(leveldb::DB& db, locale::ICollator& col) -> bool {
auto Database::Open(IFileGatherer& gatherer,
ITagParser& parser,
- locale::ICollator& collator)
+ locale::ICollator& collator,
+ tasks::Worker& bg_worker)
-> cpp::result<Database*, DatabaseError> {
- // TODO(jacqueline): Why isn't compare_and_exchange_* available?
if (sIsDbOpen.exchange(true)) {
return cpp::fail(DatabaseError::ALREADY_OPEN);
}
if (!leveldb::sBackgroundThread) {
- leveldb::sBackgroundThread.reset(
- tasks::Worker::Start<tasks::Type::kDatabaseBackground>());
+ leveldb::sBackgroundThread = &bg_worker;
}
std::shared_ptr<tasks::Worker> worker(
diff --git a/src/database/env_esp.cpp b/src/database/env_esp.cpp
index 8cddd09a..8cd11727 100644
--- a/src/database/env_esp.cpp
+++ b/src/database/env_esp.cpp
@@ -41,7 +41,7 @@
namespace leveldb {
-std::shared_ptr<tasks::Worker> sBackgroundThread;
+tasks::Worker *sBackgroundThread = nullptr;
std::string ErrToStr(FRESULT err) {
switch (err) {
diff --git a/src/database/include/database.hpp b/src/database/include/database.hpp
index 5eb3a8e9..fb58f3e7 100644
--- a/src/database/include/database.hpp
+++ b/src/database/include/database.hpp
@@ -95,7 +95,8 @@ class Database {
};
static auto Open(IFileGatherer& file_gatherer,
ITagParser& tag_parser,
- locale::ICollator& collator)
+ locale::ICollator& collator,
+ tasks::Worker& bg_worker)
-> cpp::result<Database*, DatabaseError>;
static auto Destroy() -> void;
diff --git a/src/database/include/env_esp.hpp b/src/database/include/env_esp.hpp
index eba6e8a9..dd0159b2 100644
--- a/src/database/include/env_esp.hpp
+++ b/src/database/include/env_esp.hpp
@@ -18,7 +18,7 @@
namespace leveldb {
-extern std::shared_ptr<tasks::Worker> sBackgroundThread;
+extern tasks::Worker* sBackgroundThread;
// Tracks the files locked by EspEnv::LockFile().
//
diff --git a/src/system_fsm/booting.cpp b/src/system_fsm/booting.cpp
index 893a4560..f509c52e 100644
--- a/src/system_fsm/booting.cpp
+++ b/src/system_fsm/booting.cpp
@@ -32,6 +32,7 @@
#include "spi.hpp"
#include "system_events.hpp"
#include "tag_parser.hpp"
+#include "tasks.hpp"
#include "touchwheel.hpp"
#include "track_queue.hpp"
#include "ui_fsm.hpp"
@@ -63,6 +64,10 @@ auto Booting::entry() -> void {
return;
}
+ ESP_LOGI(kTag, "starting bg worker");
+ sServices->bg_worker(std::unique_ptr<tasks::Worker>{
+ tasks::Worker::Start<tasks::Type::kDatabaseBackground>()});
+
ESP_LOGI(kTag, "installing remaining drivers");
sServices->samd(std::unique_ptr<drivers::Samd>(drivers::Samd::Create()));
vTaskDelay(pdMS_TO_TICKS(1000));
diff --git a/src/system_fsm/include/service_locator.hpp b/src/system_fsm/include/service_locator.hpp
index 24dc1eb9..327d0c50 100644
--- a/src/system_fsm/include/service_locator.hpp
+++ b/src/system_fsm/include/service_locator.hpp
@@ -17,6 +17,7 @@
#include "samd.hpp"
#include "storage.hpp"
#include "tag_parser.hpp"
+#include "tasks.hpp"
#include "touchwheel.hpp"
#include "track_queue.hpp"
@@ -111,6 +112,15 @@ class ServiceLocator {
collator_ = std::move(i);
}
+ auto bg_worker() -> tasks::Worker& {
+ assert(bg_worker_ != nullptr);
+ return *bg_worker_;
+ }
+
+ auto bg_worker(std::unique_ptr<tasks::Worker> w) -> void {
+ bg_worker_ = std::move(w);
+ }
+
// Not copyable or movable.
ServiceLocator(const ServiceLocator&) = delete;
ServiceLocator& operator=(const ServiceLocator&) = delete;
@@ -129,6 +139,8 @@ class ServiceLocator {
std::unique_ptr<database::ITagParser> tag_parser_;
std::unique_ptr<locale::ICollator> collator_;
+ std::unique_ptr<tasks::Worker> bg_worker_;
+
drivers::SdState sd_;
};
diff --git a/src/system_fsm/running.cpp b/src/system_fsm/running.cpp
index 91cd46af..9c556e0a 100644
--- a/src/system_fsm/running.cpp
+++ b/src/system_fsm/running.cpp
@@ -55,8 +55,9 @@ void Running::entry() {
ESP_LOGI(kTag, "opening database");
sFileGatherer = new database::FileGathererImpl();
- auto database_res = database::Database::Open(
- *sFileGatherer, sServices->tag_parser(), sServices->collator());
+ auto database_res =
+ database::Database::Open(*sFileGatherer, sServices->tag_parser(),
+ sServices->collator(), sServices->bg_worker());
if (database_res.has_error()) {
ESP_LOGW(kTag, "failed to open!");
events::System().Dispatch(StorageError{});
diff --git a/src/tasks/tasks.cpp b/src/tasks/tasks.cpp
index ca93336f..8ff57d4a 100644
--- a/src/tasks/tasks.cpp
+++ b/src/tasks/tasks.cpp
@@ -195,8 +195,7 @@ Worker::~Worker() {
}
template <>
-auto Worker::Dispatch(const std::function<void(void)>& fn)
- -> std::future<void> {
+auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void> {
std::shared_ptr<std::promise<void>> promise =
std::make_shared<std::promise<void>>();
WorkItem item{
diff --git a/src/tasks/tasks.hpp b/src/tasks/tasks.hpp
index 2f008120..06cbd26b 100644
--- a/src/tasks/tasks.hpp
+++ b/src/tasks/tasks.hpp
@@ -106,7 +106,7 @@ class Worker {
* asynchronously returns the result as a future.
*/
template <typename T>
- auto Dispatch(const std::function<T(void)>& fn) -> std::future<T> {
+ auto Dispatch(const std::function<T(void)> fn) -> std::future<T> {
std::shared_ptr<std::promise<T>> promise =
std::make_shared<std::promise<T>>();
WorkItem item{
@@ -125,6 +125,6 @@ class Worker {
/* Specialisation of Evaluate for functions that return nothing. */
template <>
-auto Worker::Dispatch(const std::function<void(void)>& fn) -> std::future<void>;
+auto Worker::Dispatch(const std::function<void(void)> fn) -> std::future<void>;
} // namespace tasks