diff options
| author | jacqueline <me@jacqueline.id.au> | 2024-08-09 11:43:48 +1000 |
|---|---|---|
| committer | jacqueline <me@jacqueline.id.au> | 2024-08-12 13:20:08 +1000 |
| commit | 2ad83cb2108dc55c9eb0573b0645513a1e8a61f5 (patch) | |
| tree | 5a4c588763bb36497c90d3f51660ca6400d5c364 /src/tangara/database/database.cpp | |
| parent | 28cf749951a8f811606bb233efecfd36738c3c89 (diff) | |
| download | tangara-fw-2ad83cb2108dc55c9eb0573b0645513a1e8a61f5.tar.gz | |
Shard searching for new tracks across multiple tasks
This also has the effect of breaking up the enormous 'updateIndexes'
method into one call per file, which means database updates also no
longer monopolise a single background task for their entire duration.
avg. time per new file is now <140ms for a completely fresh database,
which is pretty good I think!
Diffstat (limited to 'src/tangara/database/database.cpp')
| -rw-r--r-- | src/tangara/database/database.cpp | 253 |
1 file changed, 132 insertions, 121 deletions
diff --git a/src/tangara/database/database.cpp b/src/tangara/database/database.cpp index 2d72fe95..491ad8b7 100644 --- a/src/tangara/database/database.cpp +++ b/src/tangara/database/database.cpp @@ -6,9 +6,6 @@ #include "database/database.hpp" -#include <stdint.h> -#include <sys/_stdint.h> - #include <algorithm> #include <cstdint> #include <functional> @@ -20,12 +17,8 @@ #include <string> #include <variant> -#include "collation.hpp" #include "cppbor.h" #include "cppbor_parse.h" -#include "database/index.hpp" -#include "database/track_finder.hpp" -#include "debug.hpp" #include "esp_log.h" #include "esp_timer.h" #include "ff.h" @@ -39,12 +32,14 @@ #include "leveldb/status.h" #include "leveldb/write_batch.h" +#include "collation.hpp" #include "database/db_events.hpp" #include "database/env_esp.hpp" +#include "database/index.hpp" #include "database/records.hpp" #include "database/tag_parser.hpp" #include "database/track.hpp" -#include "drivers/spi.hpp" +#include "database/track_finder.hpp" #include "events/event_queue.hpp" #include "memory_resource.hpp" #include "result.hpp" @@ -58,12 +53,16 @@ static SingletonEnv<leveldb::EspEnv> sEnv; static const char kDbPath[] = "/.tangara-db"; static const char kKeyDbVersion[] = "schema_version"; - static const char kKeyCustom[] = "U\0"; static const char kKeyCollator[] = "collator"; +static constexpr size_t kMaxParallelism = 2; + static std::atomic<bool> sIsDbOpen(false); +using std::placeholders::_1; +using std::placeholders::_2; + static auto CreateNewDatabase(leveldb::Options& options, locale::ICollator& col) -> leveldb::DB* { Database::Destroy(); @@ -167,7 +166,8 @@ auto Database::Open(ITagParser& parser, } ESP_LOGI(kTag, "Database opened successfully"); - return new Database(db, cache.release(), parser, collator); + return new Database(db, cache.release(), bg_worker, parser, + collator); }) .get(); } @@ -180,15 +180,20 @@ auto Database::Destroy() -> void { Database::Database(leveldb::DB* db, leveldb::Cache* cache, + 
tasks::WorkerPool& pool, ITagParser& tag_parser, locale::ICollator& collator) : db_(db), cache_(cache), + track_finder_( + pool, + kMaxParallelism, + std::bind(&Database::processCandidateCallback, this, _1, _2), + std::bind(&Database::indexingCompleteCallback, this)), tag_parser_(tag_parser), collator_(collator), is_updating_(false) { dbCalculateNextTrackId(); - ESP_LOGI(kTag, "next track id is %lu", next_track_id_.load()); } Database::~Database() { @@ -243,7 +248,7 @@ auto Database::get(const std::string& key) -> std::optional<std::string> { } auto Database::getTrackPath(TrackId id) -> std::optional<std::string> { - auto track_data = dbGetTrackData(id); + auto track_data = dbGetTrackData(leveldb::ReadOptions(), id); if (!track_data) { return {}; } @@ -251,7 +256,7 @@ auto Database::getTrackPath(TrackId id) -> std::optional<std::string> { } auto Database::getTrack(TrackId id) -> std::shared_ptr<Track> { - std::shared_ptr<TrackData> data = dbGetTrackData(id); + std::shared_ptr<TrackData> data = dbGetTrackData(leveldb::ReadOptions(), id); if (!data || data->is_tombstoned) { return {}; } @@ -274,34 +279,61 @@ auto Database::getIndexes() -> std::vector<IndexInfo> { }; } -class UpdateNotifier { - public: - UpdateNotifier(std::atomic<bool>& is_updating) : is_updating_(is_updating) { - events::Ui().Dispatch(event::UpdateStarted{}); - events::System().Dispatch(event::UpdateStarted{}); +Database::UpdateTracker::UpdateTracker() + : num_old_tracks_(0), + num_new_tracks_(0), + start_time_(esp_timer_get_time()) { + events::Ui().Dispatch(event::UpdateStarted{}); + events::System().Dispatch(event::UpdateStarted{}); +} + +Database::UpdateTracker::~UpdateTracker() { + uint64_t end_time = esp_timer_get_time(); + + uint64_t time_per_old = 0; + if (num_old_tracks_) { + time_per_old = (verification_finish_time_ - start_time_) / num_old_tracks_; } - ~UpdateNotifier() { - is_updating_ = false; - events::Ui().Dispatch(event::UpdateFinished{}); - 
events::System().Dispatch(event::UpdateFinished{}); + uint64_t time_per_new = 0; + if (num_new_tracks_) { + time_per_new = (end_time - verification_finish_time_) / num_new_tracks_; } - private: - std::atomic<bool>& is_updating_; -}; + ESP_LOGI( + kTag, + "processed %lu old tracks and %lu new tracks in %llu seconds (%llums " + "per old, %llums per new)", + num_old_tracks_, num_new_tracks_, (end_time - start_time_) / 1000000, + time_per_old / 1000, time_per_new / 1000); + + events::Ui().Dispatch(event::UpdateFinished{}); + events::System().Dispatch(event::UpdateFinished{}); +} + +auto Database::UpdateTracker::onTrackVerified() -> void { + events::Ui().Dispatch(event::UpdateProgress{ + .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks, + .val = ++num_old_tracks_, + }); +} + +auto Database::UpdateTracker::onVerificationFinished() -> void { + verification_finish_time_ = esp_timer_get_time(); +} + +auto Database::UpdateTracker::onTrackAdded() -> void { + num_new_tracks_++; +} auto Database::updateIndexes() -> void { if (is_updating_.exchange(true)) { return; } - UpdateNotifier notifier{is_updating_}; - - uint32_t num_old_tracks = 0; - uint32_t num_new_tracks = 0; - uint64_t start_time = esp_timer_get_time(); + update_tracker_ = std::make_unique<UpdateTracker>(); leveldb::ReadOptions read_options; - read_options.fill_cache = true; + read_options.fill_cache = false; + read_options.verify_checksums = true; // Stage 1: verify all existing tracks are still valid. 
ESP_LOGI(kTag, "verifying existing tracks"); @@ -310,11 +342,7 @@ auto Database::updateIndexes() -> void { std::string prefix = EncodeDataPrefix(); for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix); it->Next()) { - num_old_tracks++; - events::Ui().Dispatch(event::UpdateProgress{ - .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks, - .val = num_old_tracks, - }); + update_tracker_->onTrackVerified(); std::shared_ptr<TrackData> track = ParseDataValue(it->value()); if (!track) { @@ -325,7 +353,6 @@ auto Database::updateIndexes() -> void { } if (track->is_tombstoned) { - ESP_LOGW(kTag, "skipping tombstoned %lx", track->id); continue; } @@ -392,103 +419,86 @@ auto Database::updateIndexes() -> void { } } - uint64_t verify_end_time = esp_timer_get_time(); + update_tracker_->onVerificationFinished(); // Stage 2: search for newly added files. ESP_LOGI(kTag, "scanning for new tracks"); - uint64_t num_files = 0; - - auto track_finder = std::make_shared<TrackFinder>(""); + track_finder_.launch(""); +}; - FILINFO info; - while (auto path = track_finder->next(info)) { - num_files++; - events::Ui().Dispatch(event::UpdateProgress{ - .stage = event::UpdateProgress::Stage::kScanningForNewTracks, - .val = num_files, - }); +auto Database::processCandidateCallback(FILINFO& info, std::string_view path) + -> void { + leveldb::ReadOptions read_options; + read_options.fill_cache = true; + read_options.verify_checksums = false; - std::string unused; - if (db_->Get(read_options, EncodePathKey(*path), &unused).ok()) { - // This file is already in the database; skip it. - continue; - } + std::string unused; + if (db_->Get(read_options, EncodePathKey(path), &unused).ok()) { + // This file is already in the database; skip it. + return; + } - std::shared_ptr<TrackTags> tags = tag_parser_.ReadAndParseTags(*path); - if (!tags || tags->encoding() == Container::kUnsupported) { - // No parseable tags; skip this fiile. 
- continue; - } + std::shared_ptr<TrackTags> tags = tag_parser_.ReadAndParseTags(path); + if (!tags || tags->encoding() == Container::kUnsupported) { + // No parseable tags; skip this fiile. + return; + } - // Check for any existing track with the same hash. - uint64_t hash = tags->Hash(); - std::optional<TrackId> existing_id; - std::string raw_entry; - if (db_->Get(leveldb::ReadOptions(), EncodeHashKey(hash), &raw_entry) - .ok()) { - existing_id = ParseHashValue(raw_entry); - } + // Check for any existing track with the same hash. + uint64_t hash = tags->Hash(); + std::optional<TrackId> existing_id; + std::string raw_entry; + if (db_->Get(read_options, EncodeHashKey(hash), &raw_entry).ok()) { + existing_id = ParseHashValue(raw_entry); + } - std::shared_ptr<TrackData> data; - if (existing_id) { - // Do we have any existing data for this track? This could be the case if - // this is a tombstoned entry. In such as case, we want to reuse the - // previous TrackData so that any extra metadata is preserved. - data = dbGetTrackData(*existing_id); - if (!data) { - data = std::make_shared<TrackData>(); - data->id = *existing_id; - } else if (std::string_view{data->filepath} != *path) { - ESP_LOGW(kTag, "hash collision: %s, %s, %s", - tags->title().value_or("no title").c_str(), - tags->artist().value_or("no artist").c_str(), - tags->album().value_or("no album").c_str()); - // Don't commit anything if there's a hash collision, since we're - // likely to make a big mess. - continue; - } - } else { - num_new_tracks++; + std::shared_ptr<TrackData> data; + if (existing_id) { + // Do we have any existing data for this track? This could be the case if + // this is a tombstoned entry. In such as case, we want to reuse the + // previous TrackData so that any extra metadata is preserved. 
+ data = dbGetTrackData(read_options, *existing_id); + if (!data) { data = std::make_shared<TrackData>(); - data->id = dbMintNewTrackId(); + data->id = *existing_id; + } else if (data->filepath != path && !data->is_tombstoned) { + ESP_LOGW(kTag, "hash collision: %s, %s, %s", + tags->title().value_or("no title").c_str(), + tags->artist().value_or("no artist").c_str(), + tags->album().value_or("no album").c_str()); + // Don't commit anything if there's a hash collision, since we're + // likely to make a big mess. + return; } + } else { + update_tracker_->onTrackAdded(); + data = std::make_shared<TrackData>(); + data->id = dbMintNewTrackId(); + } - // Make sure the file-based metadata on the TrackData is up to date. - data->filepath = *path; - data->tags_hash = hash; - data->modified_at = {info.fdate, info.ftime}; - - // Apply all the actual database changes as one atomic batch. This makes - // the whole 'new track' operation atomic, and also reduces the amount of - // lock contention when adding many tracks at once. - leveldb::WriteBatch batch; - dbIngestTagHashes(*tags, data->individual_tag_hashes, batch); - - dbCreateIndexesForTrack(*data, *tags, batch); - batch.Put(EncodeDataKey(data->id), EncodeDataValue(*data)); - batch.Put(EncodeHashKey(data->tags_hash), EncodeHashValue(data->id)); - batch.Put(EncodePathKey(*path), TrackIdToBytes(data->id)); + // Make sure the file-based metadata on the TrackData is up to date. + data->filepath = path; + data->tags_hash = hash; + data->modified_at = {info.fdate, info.ftime}; + data->is_tombstoned = false; - db_->Write(leveldb::WriteOptions(), &batch); - }; + // Apply all the actual database changes as one atomic batch. This makes + // the whole 'new track' operation atomic, and also reduces the amount of + // lock contention when adding many tracks at once. 
+ leveldb::WriteBatch batch; + dbIngestTagHashes(*tags, data->individual_tag_hashes, batch); - uint64_t end_time = esp_timer_get_time(); + dbCreateIndexesForTrack(*data, *tags, batch); + batch.Put(EncodeDataKey(data->id), EncodeDataValue(*data)); + batch.Put(EncodeHashKey(data->tags_hash), EncodeHashValue(data->id)); + batch.Put(EncodePathKey(path), TrackIdToBytes(data->id)); - uint64_t time_per_old = 0; - if (num_old_tracks) { - time_per_old = (verify_end_time - start_time) / num_old_tracks; - } - uint64_t time_per_new = 0; - if (num_new_tracks) { - time_per_new = (end_time - verify_end_time) / num_new_tracks; - } + db_->Write(leveldb::WriteOptions(), &batch); +} - ESP_LOGI( - kTag, - "processed %lu old tracks and %lu new tracks in %llu seconds (%llums " - "per old, %llums per new)", - num_old_tracks, num_new_tracks, (end_time - start_time) / 1000000, - time_per_old / 1000, time_per_new / 1000); +auto Database::indexingCompleteCallback() -> void { + update_tracker_.reset(); + is_updating_ = false; } auto Database::isUpdating() -> bool { @@ -536,10 +546,11 @@ auto Database::dbMintNewTrackId() -> TrackId { return next_track_id_++; } -auto Database::dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData> { +auto Database::dbGetTrackData(leveldb::ReadOptions options, TrackId id) + -> std::shared_ptr<TrackData> { std::string key = EncodeDataKey(id); std::string raw_val; - if (!db_->Get(leveldb::ReadOptions(), key, &raw_val).ok()) { + if (!db_->Get(options, key, &raw_val).ok()) { ESP_LOGW(kTag, "no key found for #%lx", id); return {}; } |
