summaryrefslogtreecommitdiff
path: root/src/tangara/database/database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/tangara/database/database.cpp')
-rw-r--r--src/tangara/database/database.cpp253
1 files changed, 132 insertions, 121 deletions
diff --git a/src/tangara/database/database.cpp b/src/tangara/database/database.cpp
index 2d72fe95..491ad8b7 100644
--- a/src/tangara/database/database.cpp
+++ b/src/tangara/database/database.cpp
@@ -6,9 +6,6 @@
#include "database/database.hpp"
-#include <stdint.h>
-#include <sys/_stdint.h>
-
#include <algorithm>
#include <cstdint>
#include <functional>
@@ -20,12 +17,8 @@
#include <string>
#include <variant>
-#include "collation.hpp"
#include "cppbor.h"
#include "cppbor_parse.h"
-#include "database/index.hpp"
-#include "database/track_finder.hpp"
-#include "debug.hpp"
#include "esp_log.h"
#include "esp_timer.h"
#include "ff.h"
@@ -39,12 +32,14 @@
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
+#include "collation.hpp"
#include "database/db_events.hpp"
#include "database/env_esp.hpp"
+#include "database/index.hpp"
#include "database/records.hpp"
#include "database/tag_parser.hpp"
#include "database/track.hpp"
-#include "drivers/spi.hpp"
+#include "database/track_finder.hpp"
#include "events/event_queue.hpp"
#include "memory_resource.hpp"
#include "result.hpp"
@@ -58,12 +53,16 @@ static SingletonEnv<leveldb::EspEnv> sEnv;
static const char kDbPath[] = "/.tangara-db";
static const char kKeyDbVersion[] = "schema_version";
-
static const char kKeyCustom[] = "U\0";
static const char kKeyCollator[] = "collator";
+static constexpr size_t kMaxParallelism = 2;
+
static std::atomic<bool> sIsDbOpen(false);
+using std::placeholders::_1;
+using std::placeholders::_2;
+
static auto CreateNewDatabase(leveldb::Options& options, locale::ICollator& col)
-> leveldb::DB* {
Database::Destroy();
@@ -167,7 +166,8 @@ auto Database::Open(ITagParser& parser,
}
ESP_LOGI(kTag, "Database opened successfully");
- return new Database(db, cache.release(), parser, collator);
+ return new Database(db, cache.release(), bg_worker, parser,
+ collator);
})
.get();
}
@@ -180,15 +180,20 @@ auto Database::Destroy() -> void {
Database::Database(leveldb::DB* db,
leveldb::Cache* cache,
+ tasks::WorkerPool& pool,
ITagParser& tag_parser,
locale::ICollator& collator)
: db_(db),
cache_(cache),
+ track_finder_(
+ pool,
+ kMaxParallelism,
+ std::bind(&Database::processCandidateCallback, this, _1, _2),
+ std::bind(&Database::indexingCompleteCallback, this)),
tag_parser_(tag_parser),
collator_(collator),
is_updating_(false) {
dbCalculateNextTrackId();
- ESP_LOGI(kTag, "next track id is %lu", next_track_id_.load());
}
Database::~Database() {
@@ -243,7 +248,7 @@ auto Database::get(const std::string& key) -> std::optional<std::string> {
}
auto Database::getTrackPath(TrackId id) -> std::optional<std::string> {
- auto track_data = dbGetTrackData(id);
+ auto track_data = dbGetTrackData(leveldb::ReadOptions(), id);
if (!track_data) {
return {};
}
@@ -251,7 +256,7 @@ auto Database::getTrackPath(TrackId id) -> std::optional<std::string> {
}
auto Database::getTrack(TrackId id) -> std::shared_ptr<Track> {
- std::shared_ptr<TrackData> data = dbGetTrackData(id);
+ std::shared_ptr<TrackData> data = dbGetTrackData(leveldb::ReadOptions(), id);
if (!data || data->is_tombstoned) {
return {};
}
@@ -274,34 +279,61 @@ auto Database::getIndexes() -> std::vector<IndexInfo> {
};
}
-class UpdateNotifier {
- public:
- UpdateNotifier(std::atomic<bool>& is_updating) : is_updating_(is_updating) {
- events::Ui().Dispatch(event::UpdateStarted{});
- events::System().Dispatch(event::UpdateStarted{});
+Database::UpdateTracker::UpdateTracker()
+ : num_old_tracks_(0),
+ num_new_tracks_(0),
+ start_time_(esp_timer_get_time()) {
+ events::Ui().Dispatch(event::UpdateStarted{});
+ events::System().Dispatch(event::UpdateStarted{});
+}
+
+Database::UpdateTracker::~UpdateTracker() {
+ uint64_t end_time = esp_timer_get_time();
+
+ uint64_t time_per_old = 0;
+ if (num_old_tracks_) {
+ time_per_old = (verification_finish_time_ - start_time_) / num_old_tracks_;
}
- ~UpdateNotifier() {
- is_updating_ = false;
- events::Ui().Dispatch(event::UpdateFinished{});
- events::System().Dispatch(event::UpdateFinished{});
+ uint64_t time_per_new = 0;
+ if (num_new_tracks_) {
+ time_per_new = (end_time - verification_finish_time_) / num_new_tracks_;
}
- private:
- std::atomic<bool>& is_updating_;
-};
+ ESP_LOGI(
+ kTag,
+ "processed %lu old tracks and %lu new tracks in %llu seconds (%llums "
+ "per old, %llums per new)",
+ num_old_tracks_, num_new_tracks_, (end_time - start_time_) / 1000000,
+ time_per_old / 1000, time_per_new / 1000);
+
+ events::Ui().Dispatch(event::UpdateFinished{});
+ events::System().Dispatch(event::UpdateFinished{});
+}
+
+auto Database::UpdateTracker::onTrackVerified() -> void {
+ events::Ui().Dispatch(event::UpdateProgress{
+ .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks,
+ .val = ++num_old_tracks_,
+ });
+}
+
+auto Database::UpdateTracker::onVerificationFinished() -> void {
+ verification_finish_time_ = esp_timer_get_time();
+}
+
+auto Database::UpdateTracker::onTrackAdded() -> void {
+ num_new_tracks_++;
+}
auto Database::updateIndexes() -> void {
if (is_updating_.exchange(true)) {
return;
}
- UpdateNotifier notifier{is_updating_};
-
- uint32_t num_old_tracks = 0;
- uint32_t num_new_tracks = 0;
- uint64_t start_time = esp_timer_get_time();
+ update_tracker_ = std::make_unique<UpdateTracker>();
leveldb::ReadOptions read_options;
- read_options.fill_cache = true;
+ read_options.fill_cache = false;
+ read_options.verify_checksums = true;
// Stage 1: verify all existing tracks are still valid.
ESP_LOGI(kTag, "verifying existing tracks");
@@ -310,11 +342,7 @@ auto Database::updateIndexes() -> void {
std::string prefix = EncodeDataPrefix();
for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix);
it->Next()) {
- num_old_tracks++;
- events::Ui().Dispatch(event::UpdateProgress{
- .stage = event::UpdateProgress::Stage::kVerifyingExistingTracks,
- .val = num_old_tracks,
- });
+ update_tracker_->onTrackVerified();
std::shared_ptr<TrackData> track = ParseDataValue(it->value());
if (!track) {
@@ -325,7 +353,6 @@ auto Database::updateIndexes() -> void {
}
if (track->is_tombstoned) {
- ESP_LOGW(kTag, "skipping tombstoned %lx", track->id);
continue;
}
@@ -392,103 +419,86 @@ auto Database::updateIndexes() -> void {
}
}
- uint64_t verify_end_time = esp_timer_get_time();
+ update_tracker_->onVerificationFinished();
// Stage 2: search for newly added files.
ESP_LOGI(kTag, "scanning for new tracks");
- uint64_t num_files = 0;
-
- auto track_finder = std::make_shared<TrackFinder>("");
+ track_finder_.launch("");
+};
- FILINFO info;
- while (auto path = track_finder->next(info)) {
- num_files++;
- events::Ui().Dispatch(event::UpdateProgress{
- .stage = event::UpdateProgress::Stage::kScanningForNewTracks,
- .val = num_files,
- });
+auto Database::processCandidateCallback(FILINFO& info, std::string_view path)
+ -> void {
+ leveldb::ReadOptions read_options;
+ read_options.fill_cache = true;
+ read_options.verify_checksums = false;
- std::string unused;
- if (db_->Get(read_options, EncodePathKey(*path), &unused).ok()) {
- // This file is already in the database; skip it.
- continue;
- }
+ std::string unused;
+ if (db_->Get(read_options, EncodePathKey(path), &unused).ok()) {
+ // This file is already in the database; skip it.
+ return;
+ }
- std::shared_ptr<TrackTags> tags = tag_parser_.ReadAndParseTags(*path);
- if (!tags || tags->encoding() == Container::kUnsupported) {
- // No parseable tags; skip this fiile.
- continue;
- }
+ std::shared_ptr<TrackTags> tags = tag_parser_.ReadAndParseTags(path);
+ if (!tags || tags->encoding() == Container::kUnsupported) {
+ // No parseable tags; skip this fiile.
+ return;
+ }
- // Check for any existing track with the same hash.
- uint64_t hash = tags->Hash();
- std::optional<TrackId> existing_id;
- std::string raw_entry;
- if (db_->Get(leveldb::ReadOptions(), EncodeHashKey(hash), &raw_entry)
- .ok()) {
- existing_id = ParseHashValue(raw_entry);
- }
+ // Check for any existing track with the same hash.
+ uint64_t hash = tags->Hash();
+ std::optional<TrackId> existing_id;
+ std::string raw_entry;
+ if (db_->Get(read_options, EncodeHashKey(hash), &raw_entry).ok()) {
+ existing_id = ParseHashValue(raw_entry);
+ }
- std::shared_ptr<TrackData> data;
- if (existing_id) {
- // Do we have any existing data for this track? This could be the case if
- // this is a tombstoned entry. In such as case, we want to reuse the
- // previous TrackData so that any extra metadata is preserved.
- data = dbGetTrackData(*existing_id);
- if (!data) {
- data = std::make_shared<TrackData>();
- data->id = *existing_id;
- } else if (std::string_view{data->filepath} != *path) {
- ESP_LOGW(kTag, "hash collision: %s, %s, %s",
- tags->title().value_or("no title").c_str(),
- tags->artist().value_or("no artist").c_str(),
- tags->album().value_or("no album").c_str());
- // Don't commit anything if there's a hash collision, since we're
- // likely to make a big mess.
- continue;
- }
- } else {
- num_new_tracks++;
+ std::shared_ptr<TrackData> data;
+ if (existing_id) {
+ // Do we have any existing data for this track? This could be the case if
+ // this is a tombstoned entry. In such as case, we want to reuse the
+ // previous TrackData so that any extra metadata is preserved.
+ data = dbGetTrackData(read_options, *existing_id);
+ if (!data) {
data = std::make_shared<TrackData>();
- data->id = dbMintNewTrackId();
+ data->id = *existing_id;
+ } else if (data->filepath != path && !data->is_tombstoned) {
+ ESP_LOGW(kTag, "hash collision: %s, %s, %s",
+ tags->title().value_or("no title").c_str(),
+ tags->artist().value_or("no artist").c_str(),
+ tags->album().value_or("no album").c_str());
+ // Don't commit anything if there's a hash collision, since we're
+ // likely to make a big mess.
+ return;
}
+ } else {
+ update_tracker_->onTrackAdded();
+ data = std::make_shared<TrackData>();
+ data->id = dbMintNewTrackId();
+ }
- // Make sure the file-based metadata on the TrackData is up to date.
- data->filepath = *path;
- data->tags_hash = hash;
- data->modified_at = {info.fdate, info.ftime};
-
- // Apply all the actual database changes as one atomic batch. This makes
- // the whole 'new track' operation atomic, and also reduces the amount of
- // lock contention when adding many tracks at once.
- leveldb::WriteBatch batch;
- dbIngestTagHashes(*tags, data->individual_tag_hashes, batch);
-
- dbCreateIndexesForTrack(*data, *tags, batch);
- batch.Put(EncodeDataKey(data->id), EncodeDataValue(*data));
- batch.Put(EncodeHashKey(data->tags_hash), EncodeHashValue(data->id));
- batch.Put(EncodePathKey(*path), TrackIdToBytes(data->id));
+ // Make sure the file-based metadata on the TrackData is up to date.
+ data->filepath = path;
+ data->tags_hash = hash;
+ data->modified_at = {info.fdate, info.ftime};
+ data->is_tombstoned = false;
- db_->Write(leveldb::WriteOptions(), &batch);
- };
+ // Apply all the actual database changes as one atomic batch. This makes
+ // the whole 'new track' operation atomic, and also reduces the amount of
+ // lock contention when adding many tracks at once.
+ leveldb::WriteBatch batch;
+ dbIngestTagHashes(*tags, data->individual_tag_hashes, batch);
- uint64_t end_time = esp_timer_get_time();
+ dbCreateIndexesForTrack(*data, *tags, batch);
+ batch.Put(EncodeDataKey(data->id), EncodeDataValue(*data));
+ batch.Put(EncodeHashKey(data->tags_hash), EncodeHashValue(data->id));
+ batch.Put(EncodePathKey(path), TrackIdToBytes(data->id));
- uint64_t time_per_old = 0;
- if (num_old_tracks) {
- time_per_old = (verify_end_time - start_time) / num_old_tracks;
- }
- uint64_t time_per_new = 0;
- if (num_new_tracks) {
- time_per_new = (end_time - verify_end_time) / num_new_tracks;
- }
+ db_->Write(leveldb::WriteOptions(), &batch);
+}
- ESP_LOGI(
- kTag,
- "processed %lu old tracks and %lu new tracks in %llu seconds (%llums "
- "per old, %llums per new)",
- num_old_tracks, num_new_tracks, (end_time - start_time) / 1000000,
- time_per_old / 1000, time_per_new / 1000);
+auto Database::indexingCompleteCallback() -> void {
+ update_tracker_.reset();
+ is_updating_ = false;
}
auto Database::isUpdating() -> bool {
@@ -536,10 +546,11 @@ auto Database::dbMintNewTrackId() -> TrackId {
return next_track_id_++;
}
-auto Database::dbGetTrackData(TrackId id) -> std::shared_ptr<TrackData> {
+auto Database::dbGetTrackData(leveldb::ReadOptions options, TrackId id)
+ -> std::shared_ptr<TrackData> {
std::string key = EncodeDataKey(id);
std::string raw_val;
- if (!db_->Get(leveldb::ReadOptions(), key, &raw_val).ok()) {
+ if (!db_->Get(options, key, &raw_val).ok()) {
ESP_LOGW(kTag, "no key found for #%lx", id);
return {};
}