From 2ad83cb2108dc55c9eb0573b0645513a1e8a61f5 Mon Sep 17 00:00:00 2001 From: jacqueline Date: Fri, 9 Aug 2024 11:43:48 +1000 Subject: Shard searching for new tracks across multiple tasks This also has the effect of breaking up the enormous 'updateIndexes' method into one call per file, which means database updates also no longer monopolise a single background task for their entire duration. avg. time per new file is now <140ms for a completely fresh database, which is pretty good i think! --- src/tangara/database/track_finder.cpp | 44 +++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 5 deletions(-) (limited to 'src/tangara/database/track_finder.cpp') diff --git a/src/tangara/database/track_finder.cpp b/src/tangara/database/track_finder.cpp index 86948e70..21a44339 100644 --- a/src/tangara/database/track_finder.cpp +++ b/src/tangara/database/track_finder.cpp @@ -24,12 +24,12 @@ namespace database { static_assert(sizeof(TCHAR) == sizeof(char), "TCHAR must be CHAR"); -TrackFinder::TrackFinder(std::string_view root) +CandidateIterator::CandidateIterator(std::string_view root) : to_explore_(&memory::kSpiRamResource) { to_explore_.push_back({root.data(), root.size()}); } -auto TrackFinder::next(FILINFO& out_info) -> std::optional { +auto CandidateIterator::next(FILINFO& info) -> std::optional { std::scoped_lock lock{mut_}; while (!to_explore_.empty() || current_) { if (!current_) { @@ -49,7 +49,6 @@ auto TrackFinder::next(FILINFO& out_info) -> std::optional { } } - FILINFO info; FRESULT res = f_readdir(&current_->second, &info); if (res != FR_OK || info.fname[0] == 0) { // No more files in the directory. @@ -71,14 +70,49 @@ auto TrackFinder::next(FILINFO& out_info) -> std::optional { to_explore_.push_back(full_path); } else { // This is a file! We can return now. - out_info = info; return {{full_path.data(), full_path.size()}}; } } } - // Out of things to explore. + // Out of paths to explore. 
return {}; } +TrackFinder::TrackFinder( + tasks::WorkerPool& pool, + size_t parallelism, + std::function processor, + std::function complete_cb) + : pool_{pool}, + parallelism_(parallelism), + processor_(processor), + complete_cb_(complete_cb) {} + +auto TrackFinder::launch(std::string_view root) -> void { + iterator_ = std::make_unique<CandidateIterator>(root); + num_workers_ = parallelism_; + for (size_t i = 0; i < parallelism_; i++) { + schedule(); + } +} + +auto TrackFinder::schedule() -> void { + pool_.Dispatch([&]() { + FILINFO info; + auto next = iterator_->next(info); + if (next) { + std::invoke(processor_, info, *next); + schedule(); + } else { + std::scoped_lock lock{workers_mutex_}; + num_workers_ -= 1; + if (num_workers_ == 0) { + iterator_.reset(); + std::invoke(complete_cb_); + } + } + }); +} + } // namespace database -- cgit v1.2.3