diff options
Diffstat (limited to 'src/tangara/database/database.cpp')
| -rw-r--r-- | src/tangara/database/database.cpp | 413 |
1 files changed, 211 insertions, 202 deletions
diff --git a/src/tangara/database/database.cpp b/src/tangara/database/database.cpp index 85700431..64451f48 100644 --- a/src/tangara/database/database.cpp +++ b/src/tangara/database/database.cpp @@ -6,9 +6,6 @@ #include "database/database.hpp" -#include <stdint.h> -#include <sys/_stdint.h> - #include <algorithm> #include <cstdint> #include <functional> @@ -20,10 +17,8 @@ #include <string> #include <variant> -#include "collation.hpp" #include "cppbor.h" #include "cppbor_parse.h" -#include "database/index.hpp" #include "esp_log.h" #include "esp_timer.h" #include "ff.h" @@ -37,13 +32,14 @@ #include "leveldb/status.h" #include "leveldb/write_batch.h" +#include "collation.hpp" #include "database/db_events.hpp" #include "database/env_esp.hpp" -#include "database/file_gatherer.hpp" +#include "database/index.hpp" #include "database/records.hpp" #include "database/tag_parser.hpp" #include "database/track.hpp" -#include "drivers/spi.hpp" +#include "database/track_finder.hpp" #include "events/event_queue.hpp" #include "memory_resource.hpp" #include "result.hpp" @@ -55,15 +51,19 @@ static SingletonEnv<leveldb::EspEnv> sEnv; [[maybe_unused]] static const char* kTag = "DB"; static const char kDbPath[] = "/.tangara-db"; +static const char kMusicPath[] = "Music"; static const char kKeyDbVersion[] = "schema_version"; - static const char kKeyCustom[] = "U\0"; static const char kKeyCollator[] = "collator"; -static const char kKeyTrackId[] = "next_track_id"; + +static constexpr size_t kMaxParallelism = 2; static std::atomic<bool> sIsDbOpen(false); +using std::placeholders::_1; +using std::placeholders::_2; + static auto CreateNewDatabase(leveldb::Options& options, locale::ICollator& col) -> leveldb::DB* { Database::Destroy(); @@ -122,8 +122,7 @@ static auto CheckDatabase(leveldb::DB& db, locale::ICollator& col) -> bool { return true; } -auto Database::Open(IFileGatherer& gatherer, - ITagParser& parser, +auto Database::Open(ITagParser& parser, locale::ICollator& collator, tasks::WorkerPool& bg_worker) -> cpp::result<Database*, DatabaseError> { @@ -144,10 +143,10 @@ auto Database::Open(IFileGatherer& gatherer, leveldb::Options options; options.env = sEnv.env(); - options.write_buffer_size = 4 * 1024; - options.max_file_size = 16 * 1024; + // Match the write buffer size to the MMU page size in order to + // make most efficient use of PSRAM mapping. + options.write_buffer_size = CONFIG_MMU_PAGE_SIZE; options.block_cache = cache.get(); - options.block_size = 2048; auto status = leveldb::DB::Open(options, kDbPath, &db); if (!status.ok()) { @@ -168,7 +167,7 @@ auto Database::Open(IFileGatherer& gatherer, } ESP_LOGI(kTag, "Database opened successfully"); - return new Database(db, cache.release(), gatherer, parser, + return new Database(db, cache.release(), bg_worker, parser, collator); }) .get(); @@ -182,15 +181,21 @@ auto Database::Destroy() -> void { Database::Database(leveldb::DB* db, leveldb::Cache* cache, - IFileGatherer& file_gatherer, + tasks::WorkerPool& pool, ITagParser& tag_parser, locale::ICollator& collator) : db_(db), cache_(cache), - file_gatherer_(file_gatherer), + track_finder_( + pool, + kMaxParallelism, + std::bind(&Database::processCandidateCallback, this, _1, _2), + std::bind(&Database::indexingCompleteCallback, this)), tag_parser_(tag_parser), collator_(collator), - is_updating_(false) {} + is_updating_(false) { + dbCalculateNextTrackId(); +} Database::~Database() { // Delete db_ first so that any outstanding background work finishes before @@ -244,7 +249,7 @@ auto Database::get(const std::string& key) -> std::optional<std::string> { } auto Database::getTrackPath(TrackId id) -> std::optional<std::string> { - auto track_data = dbGetTrackData(id); + auto track_data = dbGetTrackData(leveldb::ReadOptions(), id); if (!track_data) { return {}; } @@ -252,7 +257,7 @@ auto Database::getTrackPath(TrackId id) -> std::optional<std::string> { } auto Database::getTrack(TrackId id) -> std::shared_ptr<Track> { - std::shared_ptr<TrackData> data = dbGetTrackData(id); + std::shared_ptr<TrackData> data = dbGetTrackData(leveldb::ReadOptions(), id); if (!data || data->is_tombstoned) { return {}; } @@ -275,34 +280,61 @@ auto Database::getIndexes() -> std::vector<IndexInfo> { }; } -class UpdateNotifier { - public: - UpdateNotifier(std::atomic<bool>& is_updating) : is_updating_(is_updating) { - events::Ui().Dispatch(event::UpdateStarted{}); - events::System().Dispatch(event::UpdateStarted{}); +Database::UpdateTracker::UpdateTracker() + : num_old_tracks_(0), + num_new_tracks_(0), + start_time_(esp_timer_get_time()) { + events::Ui().Dispatch(event::UpdateStarted{}); + events::System().Dispatch(event::UpdateStarted{}); +} + +Database::UpdateTracker::~UpdateTracker() { + uint64_t end_time = esp_timer_get_time(); + + uint64_t time_per_old = 0; + if (num_old_tracks_) { + time_per_old = (verification_finish_time_ - start_time_) / num_old_tracks_; } - ~UpdateNotifier() { - is_updating_ = false; - events::Ui().Dispatch(event::UpdateFinished{}); - events::System().Dispatch(event::UpdateFinished{}); + uint64_t time_per_new = 0; + if (num_new_tracks_) { + time_per_new = (end_time - verification_finish_time_) / num_new_tracks_; } - private: - std::atomic<bool>& is_updating_; -}; + ESP_LOGI( + kTag, + "processed %lu old tracks and %lu new tracks in %llu seconds (%llums " + "per old, %llums per new)", + num_old_tracks_, num_new_tracks_, (end_time - start_time_) / 1000000, + time_per_old / 1000, time_per_new / 1000); + + events::Ui().Dispatch(event::UpdateFinished{}); + events::System().Dispatch(event::UpdateFinished{}); +} + +auto Database::UpdateTracker::onTrackVerified() -> void { + events::Ui().Dispatch(event::UpdateProgress{ + .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks, + .val = ++num_old_tracks_, + }); +} + +auto Database::UpdateTracker::onVerificationFinished() -> void { + verification_finish_time_ = esp_timer_get_time(); +} + +auto Database::UpdateTracker::onTrackAdded() -> void { + num_new_tracks_++; +} auto Database::updateIndexes() -> void { if (is_updating_.exchange(true)) { return; } - UpdateNotifier notifier{is_updating_}; - - uint32_t num_old_tracks = 0; - uint32_t num_new_tracks = 0; - uint64_t start_time = esp_timer_get_time(); + update_tracker_ = std::make_unique<UpdateTracker>(); leveldb::ReadOptions read_options; - read_options.fill_cache = true; + read_options.fill_cache = false; + read_options.verify_checksums = true; // Stage 1: verify all existing tracks are still valid. ESP_LOGI(kTag, "verifying existing tracks"); @@ -311,11 +343,7 @@ auto Database::updateIndexes() -> void { std::string prefix = EncodeDataPrefix(); for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix); it->Next()) { - num_old_tracks++; - events::Ui().Dispatch(event::UpdateProgress{ - .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks, - .val = num_old_tracks, - }); + update_tracker_->onTrackVerified(); std::shared_ptr<TrackData> track = ParseDataValue(it->value()); if (!track) { @@ -326,7 +354,6 @@ auto Database::updateIndexes() -> void { } if (track->is_tombstoned) { - ESP_LOGW(kTag, "skipping tombstoned %lx", track->id); continue; } @@ -349,11 +376,19 @@ auto Database::updateIndexes() -> void { // We couldn't read the tags for this track. Either they were // malformed, or perhaps the file is missing. Either way, tombstone // this record. - ESP_LOGW(kTag, "entombing missing #%lx", track->id); + ESP_LOGI(kTag, "entombing missing #%lx", track->id); + + // Remove the indexes first, so that interrupted operations don't leave + // dangling index records. dbRemoveIndexes(track); + + // Do the rest of the tombstoning as one atomic write. + leveldb::WriteBatch batch; track->is_tombstoned = true; - dbPutTrackData(*track); - db_->Delete(leveldb::WriteOptions{}, EncodePathKey(track->filepath)); + batch.Put(EncodeDataKey(track->id), EncodeDataValue(*track)); + batch.Delete(EncodePathKey(track->filepath)); + + db_->Write(leveldb::WriteOptions(), &batch); continue; } @@ -367,204 +402,181 @@ auto Database::updateIndexes() -> void { // database. ESP_LOGI(kTag, "updating hash (%llx -> %llx)", track->tags_hash, new_hash); + + // Again, we remove the old index records first so has to avoid + // dangling references. dbRemoveIndexes(track); + // Atomically correct the hash + create the new index records. + leveldb::WriteBatch batch; track->tags_hash = new_hash; - dbIngestTagHashes(*tags, track->individual_tag_hashes); - dbPutTrackData(*track); - dbPutHash(new_hash, track->id); + dbIngestTagHashes(*tags, track->individual_tag_hashes, batch); + + dbCreateIndexesForTrack(*track, *tags, batch); + batch.Put(EncodeDataKey(track->id), EncodeDataValue(*track)); + batch.Put(EncodeHashKey(new_hash), EncodeHashValue(track->id)); + db_->Write(leveldb::WriteOptions(), &batch); } } } - uint64_t verify_end_time = esp_timer_get_time(); + update_tracker_->onVerificationFinished(); // Stage 2: search for newly added files. - ESP_LOGI(kTag, "scanning for new tracks"); - uint64_t num_files = 0; - file_gatherer_.FindFiles("", [&](std::string_view path, const FILINFO& info) { - num_files++; - events::Ui().Dispatch(event::UpdateProgress{ - .stage = event::UpdateProgress::Stage::kScanningForNewTracks, - .val = num_files, - }); - - std::string unused; - if (db_->Get(read_options, EncodePathKey(path), &unused).ok()) { - // This file is already in the database; skip it. - return; - } - - std::shared_ptr<TrackTags> tags = tag_parser_.ReadAndParseTags(path); - if (!tags || tags->encoding() == Container::kUnsupported) { - // No parseable tags; skip this fiile. - return; - } + std::string root; + FF_DIR dir; + if (f_opendir(&dir, kMusicPath) == FR_OK) { + f_closedir(&dir); + root = kMusicPath; + } + ESP_LOGI(kTag, "scanning for new tracks in '%s'", root.c_str()); + track_finder_.launch(root); +}; - // Check for any existing record with the same hash. - uint64_t hash = tags->Hash(); - std::string key = EncodeHashKey(hash); - std::optional<TrackId> existing_hash; - std::string raw_entry; - if (db_->Get(leveldb::ReadOptions(), key, &raw_entry).ok()) { - existing_hash = ParseHashValue(raw_entry); - } +auto Database::processCandidateCallback(FILINFO& info, std::string_view path) + -> void { + leveldb::ReadOptions read_options; + read_options.fill_cache = true; + read_options.verify_checksums = false; - std::pair<uint16_t, uint16_t> modified{info.fdate, info.ftime}; - if (!existing_hash) { - // We've never met this track before! Or we have, but the entry is - // malformed. Either way, record this as a new track. - TrackId id = dbMintNewTrackId(); - ESP_LOGD(kTag, "recording new 0x%lx", id); - num_new_tracks++; - - auto data = std::make_shared<TrackData>(); - data->id = id; - data->filepath = path; - data->tags_hash = hash; - data->modified_at = modified; - dbIngestTagHashes(*tags, data->individual_tag_hashes); - - dbPutTrackData(*data); - dbPutHash(hash, id); - auto t = std::make_shared<Track>(data, tags); - dbCreateIndexesForTrack(*t); - db_->Put(leveldb::WriteOptions{}, EncodePathKey(path), - TrackIdToBytes(id)); - return; - } + std::string unused; + if (db_->Get(read_options, EncodePathKey(path), &unused).ok()) { + // This file is already in the database; skip it. + return; + } - std::shared_ptr<TrackData> existing_data = dbGetTrackData(*existing_hash); - if (!existing_data) { - // We found a hash that matches, but there's no data record? Weird. - auto new_data = std::make_shared<TrackData>(); - new_data->id = dbMintNewTrackId(); - new_data->filepath = path; - new_data->tags_hash = hash; - new_data->modified_at = modified; - dbIngestTagHashes(*tags, new_data->individual_tag_hashes); - dbPutTrackData(*new_data); - auto t = std::make_shared<Track>(new_data, tags); - dbCreateIndexesForTrack(*t); - db_->Put(leveldb::WriteOptions{}, EncodePathKey(path), - TrackIdToBytes(new_data->id)); - return; - } + std::shared_ptr<TrackTags> tags = tag_parser_.ReadAndParseTags(path); + if (!tags || tags->encoding() == Container::kUnsupported) { + // No parseable tags; skip this fiile. + return; + } - if (existing_data->is_tombstoned) { - ESP_LOGI(kTag, "exhuming track %lu", existing_data->id); - existing_data->is_tombstoned = false; - existing_data->modified_at = modified; - dbPutTrackData(*existing_data); - auto t = std::make_shared<Track>(existing_data, tags); - dbCreateIndexesForTrack(*t); - db_->Put(leveldb::WriteOptions{}, EncodePathKey(path), - TrackIdToBytes(existing_data->id)); - } else if (existing_data->filepath != - std::pmr::string{path.data(), path.size()}) { + // Check for any existing track with the same hash. + uint64_t hash = tags->Hash(); + std::optional<TrackId> existing_id; + std::string raw_entry; + if (db_->Get(read_options, EncodeHashKey(hash), &raw_entry).ok()) { + existing_id = ParseHashValue(raw_entry); + } + + std::shared_ptr<TrackData> data; + if (existing_id) { + // Do we have any existing data for this track? This could be the case if + // this is a tombstoned entry. In such as case, we want to reuse the + // previous TrackData so that any extra metadata is preserved. + data = dbGetTrackData(read_options, *existing_id); + if (!data) { + data = std::make_shared<TrackData>(); + data->id = *existing_id; + } else if (data->filepath != path && !data->is_tombstoned) { ESP_LOGW(kTag, "hash collision: %s, %s, %s", tags->title().value_or("no title").c_str(), tags->artist().value_or("no artist").c_str(), tags->album().value_or("no album").c_str()); + // Don't commit anything if there's a hash collision, since we're + // likely to make a big mess. + return; } - }); + } else { + update_tracker_->onTrackAdded(); + data = std::make_shared<TrackData>(); + data->id = dbMintNewTrackId(); + } - uint64_t end_time = esp_timer_get_time(); + // Make sure the file-based metadata on the TrackData is up to date. + data->filepath = path; + data->tags_hash = hash; + data->modified_at = {info.fdate, info.ftime}; + data->is_tombstoned = false; - uint64_t time_per_old = 0; - if (num_old_tracks) { - time_per_old = (verify_end_time - start_time) / num_old_tracks; - } - uint64_t time_per_new = 0; - if (num_new_tracks) { - time_per_new = (end_time - verify_end_time) / num_new_tracks; - } + // Apply all the actual database changes as one atomic batch. This makes + // the whole 'new track' operation atomic, and also reduces the amount of + // lock contention when adding many tracks at once. + leveldb::WriteBatch batch; + dbIngestTagHashes(*tags, data->individual_tag_hashes, batch); - ESP_LOGI( - kTag, - "processed %lu old tracks and %lu new tracks in %llu seconds (%llums " - "per old, %llums per new)", - num_old_tracks, num_new_tracks, (end_time - start_time) / 1000000, - time_per_old / 1000, time_per_new / 1000); + dbCreateIndexesForTrack(*data, *tags, batch); + batch.Put(EncodeDataKey(data->id), EncodeDataValue(*data)); + batch.Put(EncodeHashKey(data->tags_hash), EncodeHashValue(data->id)); + batch.Put(EncodePathKey(path), TrackIdToBytes(data->id)); + + db_->Write(leveldb::WriteOptions(), &batch); +} + +auto Database::indexingCompleteCallback() -> void { + update_tracker_.reset(); + is_updating_ = false; } auto Database::isUpdating() -> bool { return is_updating_; } -auto Database::dbMintNewTrackId() -> TrackId { - TrackId next_id = 1; - std::string val; - auto status = db_->Get(leveldb::ReadOptions(), kKeyTrackId, &val); - if (status.ok()) { - next_id = BytesToTrackId(val).value_or(next_id); - } else if (!status.IsNotFound()) { - // TODO(jacqueline): Handle this more. - ESP_LOGE(kTag, "failed to get next track id"); +auto Database::dbCalculateNextTrackId() -> void { + std::unique_ptr<leveldb::Iterator> it{ + db_->NewIterator(leveldb::ReadOptions())}; + + // Track data entries are of the format 'D/trackid', where track ids are + // encoded as big-endian cbor types. They can therefore be compared through + // byte ordering, which means we can determine what the next id should be by + // looking at the larged track data record in the database. + std::string prefix = EncodeDataPrefix(); + std::string prefixPlusOne = prefix; + prefixPlusOne[prefixPlusOne.size() - 1]++; + + // Seek to just past the track data section. + it->Seek(prefixPlusOne); + if (!it->Valid()) { + next_track_id_ = 1; + return; } - if (!db_->Put(leveldb::WriteOptions(), kKeyTrackId, - TrackIdToBytes(next_id + 1)) - .ok()) { - ESP_LOGE(kTag, "failed to write next track id"); + // Go back to the last track data record. + it->Prev(); + if (!it->Valid() || !it->key().starts_with(prefix)) { + next_track_id_ = 1; + return; } - return next_id; -} - -auto Database::dbEntomb(TrackId id, uint64_t hash) -> void { - std::string key = EncodeHashKey(hash); - std::string val = EncodeHashValue(id); - if (!db_->Put(leveldb::WriteOptions(), key, val).ok()) { - ESP_LOGE(kTag, "failed to entomb #%llx (id #%lx)", hash, id); + // Parse the track id back out of the key. + std::span<const char> key{it->key().data(), it->key().size()}; + auto id_part = key.subspan(prefix.size()); + if (id_part.empty()) { + next_track_id_ = 1; + return; } + + next_track_id_ = BytesToTrackId(id_part).value_or(0) + 1; } -auto Database::dbPutTrackData(const TrackData& s) -> void { - std::string key = EncodeDataKey(s.id); - std::string val = EncodeDataValue(s); - if (!db_->Put(leveldb::WriteOptions(), key, val).ok()) { - ESP_LOGE(kTag, "failed to write data for #%lx", s.id); - } +auto Database::dbMintNewTrackId() -> TrackId { + return next_track_id_++; } -auto Database::dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData> { +auto Database::dbGetTrackData(leveldb::ReadOptions options, TrackId id) + -> std::shared_ptr<TrackData> { std::string key = EncodeDataKey(id); std::string raw_val; - if (!db_->Get(leveldb::ReadOptions(), key, &raw_val).ok()) { + if (!db_->Get(options, key, &raw_val).ok()) { ESP_LOGW(kTag, "no key found for #%lx", id); return {}; } return ParseDataValue(raw_val); } -auto Database::dbPutHash(const uint64_t& hash, TrackId i) -> void { - std::string key = EncodeHashKey(hash); - std::string val = EncodeHashValue(i); - if (!db_->Put(leveldb::WriteOptions(), key, val).ok()) { - ESP_LOGE(kTag, "failed to write hash for #%lx", i); - } +auto Database::dbCreateIndexesForTrack(const Track& track, + leveldb::WriteBatch& batch) -> void { + dbCreateIndexesForTrack(track.data(), track.tags(), batch); } -auto Database::dbGetHash(const uint64_t& hash) -> std::optional<TrackId> { - std::string key = EncodeHashKey(hash); - std::string raw_val; - if (!db_->Get(leveldb::ReadOptions(), key, &raw_val).ok()) { - ESP_LOGW(kTag, "no key found for hash #%llx", hash); - return {}; - } - return ParseHashValue(raw_val); -} - -auto Database::dbCreateIndexesForTrack(const Track& track) -> void { +auto Database::dbCreateIndexesForTrack(const TrackData& data, + const TrackTags& tags, + leveldb::WriteBatch& batch) -> void { for (const IndexInfo& index : getIndexes()) { - leveldb::WriteBatch writes; - auto entries = Index(collator_, index, track); + auto entries = Index(collator_, index, data, tags); for (const auto& it : entries) { - writes.Put(EncodeIndexKey(it.first), - {it.second.data(), it.second.size()}); + batch.Put(EncodeIndexKey(it.first), {it.second.data(), it.second.size()}); } - db_->Write(leveldb::WriteOptions(), &writes); } } @@ -573,9 +585,8 @@ auto Database::dbRemoveIndexes(std::shared_ptr<TrackData> data) -> void { if (!tags) { return; } - Track track{data, tags}; for (const IndexInfo& index : getIndexes()) { - auto entries = Index(collator_, index, track); + auto entries = Index(collator_, index, *data, *tags); for (auto it = entries.rbegin(); it != entries.rend(); it++) { auto key = EncodeIndexKey(it->first); auto status = db_->Delete(leveldb::WriteOptions{}, key); @@ -602,16 +613,14 @@ auto Database::dbRemoveIndexes(std::shared_ptr<TrackData> data) -> void { } auto Database::dbIngestTagHashes(const TrackTags& tags, - std::pmr::unordered_map<Tag, uint64_t>& out) - -> void { - leveldb::WriteBatch batch{}; + std::pmr::unordered_map<Tag, uint64_t>& out, + leveldb::WriteBatch& batch) -> void { for (const auto& tag : tags.allPresent()) { auto val = tags.get(tag); auto hash = tagHash(val); batch.Put(EncodeTagHashKey(hash), tagToString(val)); out[tag] = hash; } - db_->Write(leveldb::WriteOptions{}, &batch); } auto Database::dbRecoverTagsFromHashes( |
