diff --git a/metagraph/integration_tests/test_build.py b/metagraph/integration_tests/test_build.py index 216603f757..0cdab0adc1 100644 --- a/metagraph/integration_tests/test_build.py +++ b/metagraph/integration_tests/test_build.py @@ -3,7 +3,6 @@ import subprocess from subprocess import PIPE from tempfile import TemporaryDirectory -import glob import os @@ -44,7 +43,7 @@ def test_simple_all_graphs(self, build): construct_command = '{exe} build --mask-dummy --graph {repr} --disk-swap {tmp_dir} -k 20 -o {outfile} {input}'.format( exe=METAGRAPH, repr=representation, - tmp_dir=tmp_dir, + tmp_dir='""' if tmp_dir == '""' else self.tempdir.name, outfile=self.tempdir.name + '/graph', input=TEST_DATA_DIR + '/transcripts_1000.fa' ) @@ -66,7 +65,7 @@ def test_simple_bloom_graph(self, build): construct_command = '{exe} build --mask-dummy --graph {repr} --disk-swap {tmp_dir} -k 20 -o {outfile} {input}'.format( exe=METAGRAPH, repr=representation, - tmp_dir=tmp_dir, + tmp_dir='""' if tmp_dir == '""' else self.tempdir.name, outfile=self.tempdir.name + '/graph', input=TEST_DATA_DIR + '/transcripts_1000.fa' ) @@ -111,7 +110,7 @@ def test_simple_all_graphs_canonical(self, build): --graph {repr} --canonical -k 20 -o {outfile} {input}'.format( exe=METAGRAPH, repr=representation, - tmp_dir=tmp_dir, + tmp_dir='""' if tmp_dir == '""' else self.tempdir.name, outfile=self.tempdir.name + '/graph', input=TEST_DATA_DIR + '/transcripts_1000.fa' ) @@ -176,7 +175,7 @@ def test_build_from_kmc(self, build): construct_command = '{exe} build --mask-dummy --graph {repr} --disk-swap {tmp_dir} -k 11 -o {outfile} {input}'.format( exe=METAGRAPH, repr=representation, - tmp_dir=tmp_dir, + tmp_dir='""' if tmp_dir == '""' else self.tempdir.name, outfile=self.tempdir.name + '/graph', input=TEST_DATA_DIR + '/transcripts_1000_kmc_counters.kmc_suf' ) @@ -198,7 +197,7 @@ def test_build_from_kmc_both(self, build): construct_command = '{exe} build --mask-dummy --graph {repr} --disk-swap {tmp_dir} -k 11 -o {outfile} {input}'.format( exe=METAGRAPH, repr=representation, - tmp_dir=tmp_dir, + tmp_dir='""' if tmp_dir == '""' else self.tempdir.name, outfile=self.tempdir.name + '/graph', input=TEST_DATA_DIR + '/transcripts_1000_kmc_counters_both_strands.kmc_suf' ) @@ -221,7 +220,7 @@ def test_build_from_kmc_canonical(self, build): --graph {repr} --disk-swap {tmp_dir} --canonical -k 11 -o {outfile} {input}'.format( exe=METAGRAPH, repr=representation, - tmp_dir=tmp_dir, + tmp_dir='""' if tmp_dir == '""' else self.tempdir.name, outfile=self.tempdir.name + '/graph', input=TEST_DATA_DIR + '/transcripts_1000_kmc_counters.kmc_suf' ) @@ -244,7 +243,7 @@ def test_build_from_kmc_both_canonical(self, build): --graph {repr} --disk-swap {tmp_dir} --canonical -k 11 -o {outfile} {input}'.format( exe=METAGRAPH, repr=representation, - tmp_dir=tmp_dir, + tmp_dir='""' if tmp_dir == '""' else self.tempdir.name, outfile=self.tempdir.name + '/graph', input=TEST_DATA_DIR + '/transcripts_1000_kmc_counters_both_strands.kmc_suf' ) @@ -269,7 +268,7 @@ def test_build_chunks_from_kmc(self, build): --graph {repr} -k 11 --suffix {suffix} -o {outfile} {input}'.format( exe=METAGRAPH, repr=representation, - tmp_dir=tmp_dir, + tmp_dir='""' if tmp_dir == '""' else self.tempdir.name, outfile=self.tempdir.name + '/graph', input=TEST_DATA_DIR + '/transcripts_1000_kmc_counters.kmc_suf', suffix=suffix @@ -308,7 +307,7 @@ def test_build_chunks_from_kmc_canonical(self, build): --suffix {suffix} -o {outfile} {input}'.format( exe=METAGRAPH, repr=representation, - tmp_dir=tmp_dir, + tmp_dir='""' if tmp_dir == '""' else self.tempdir.name, outfile=self.tempdir.name + '/graph', input=TEST_DATA_DIR + '/transcripts_1000_kmc_counters.kmc_suf', suffix=suffix @@ -337,6 +336,77 @@ def test_build_chunks_from_kmc_canonical(self, build): self.assertEqual('nodes (k): 802920', params_str[1]) self.assertEqual('canonical mode: yes', params_str[2]) + @parameterized.expand(['succinct_disk']) + def test_build_phase(self, build): + representation, tmp_dir = build_params[build] + #TODO: remove -v + construct_command = '{exe} build -v --phase 1 --mask-dummy --graph {repr} --canonical -k 20 ' \ + '--disk-swap {tmp_dir} -o {outfile} {input}'.format( + exe=METAGRAPH, + repr=representation, + tmp_dir=self.tempdir.name, + outfile=self.tempdir.name + '/graph', + input=TEST_DATA_DIR + '/transcripts_1000.fa' + ) + print(f'Executing phase1: {construct_command}') + res = subprocess.run([construct_command], shell=True) + self.assertEqual(res.returncode, 0) + self.assertTrue(os.path.isfile(self.tempdir.name + '/graph.checkpoint')) + + construct_command = construct_command.replace('--phase 1', '--phase 2') + print(f'Executing phase2: {construct_command}') + res = subprocess.run([construct_command], shell=True) + self.assertEqual(res.returncode, 0) + self.assertTrue(os.path.isfile(self.tempdir.name + '/graph.checkpoint')) + + construct_command = construct_command.replace('--phase 2', '') + res = subprocess.run([construct_command], shell=True) + self.assertEqual(res.returncode, 0) + + res = self.__get_stats(self.tempdir.name + '/graph' + graph_file_extension[representation]) + self.assertEqual(res.returncode, 0) + params_str = res.stdout.decode().split('\n')[2:] + self.assertEqual('k: 20', params_str[0]) + self.assertEqual('nodes (k): 1159851', params_str[1]) + self.assertEqual('canonical mode: yes', params_str[2]) + self.assertFalse(os.path.isfile(self.tempdir.name + '/graph.checkpoint')) + + # tests that we can build and resume 2 separate graphs on the same machine + @parameterized.expand(['succinct_disk']) + def test_build_phase_parallel(self, build): + representation, tmp_dir = build_params[build] + for name in ('graph1', 'graph2'): + construct_command = '{exe} build --phase 2 --mask-dummy --graph {repr} --canonical -k 20 ' \ + '--disk-swap {tmp_dir} -o {outfile} {input}'.format( + exe=METAGRAPH, + repr=representation, + tmp_dir=self.tempdir.name, + outfile=self.tempdir.name + '/' + name, + input=TEST_DATA_DIR + ('/transcripts_1000.fa' if name == 'graph1' else '/transcripts_100.fa') + ) + res = subprocess.run([construct_command], shell=True) + self.assertEqual(res.returncode, 0) + self.assertTrue(os.path.isfile(self.tempdir.name + '/' + name + '.checkpoint')) + + for name in ('graph1', 'graph2'): + construct_command = '{exe} build --mask-dummy --graph {repr} --canonical -k 20 ' \ + '--disk-swap {tmp_dir} -o {outfile} {input}'.format( + exe=METAGRAPH, + repr=representation, + tmp_dir=self.tempdir.name, + outfile=self.tempdir.name + '/' + name, + input=TEST_DATA_DIR + ('/transcripts_1000.fa' if name == 'graph1' else '/transcripts_100.fa') + ) + res = subprocess.run([construct_command], shell=True) + self.assertEqual(res.returncode, 0) + + res = self.__get_stats(self.tempdir.name + '/' + name + graph_file_extension[representation]) + self.assertEqual(res.returncode, 0) + params_str = res.stdout.decode().split('\n')[2:] + self.assertEqual('k: 20', params_str[0]) + self.assertEqual('nodes (k): ' + ('1159851' if name == 'graph1' else '91584'), params_str[1]) + self.assertEqual('canonical mode: yes', params_str[2]) + self.assertFalse(os.path.isfile(self.tempdir.name + '/' + name + '.checkpoint')) if __name__ == '__main__': unittest.main() diff --git a/metagraph/integration_tests/test_build_weighted.py b/metagraph/integration_tests/test_build_weighted.py index d90a79d2fc..53d6525505 100644 --- a/metagraph/integration_tests/test_build_weighted.py +++ b/metagraph/integration_tests/test_build_weighted.py @@ -373,5 +373,41 @@ def test_kmer_count_width_large(self, build, k_width_result): self.assertEqual('avg weight: {}'.format(avg_count_expected), params_str[4]) + def test_build_phase(self): + construct_command = '{exe} build --phase 1 --mask-dummy -k 20 --count-kmers --disk-swap {tmp_dir} ' \ + '--count-width 16 -o {outfile} {input}'.format( + exe=METAGRAPH, + tmp_dir=self.tempdir.name, + outfile=self.tempdir.name + '/graph', + input=TEST_DATA_DIR + '/transcripts_1000.fa' + ) + res = subprocess.run([construct_command], shell=True) + self.assertEqual(res.returncode, 0) + self.assertTrue(os.path.isfile(self.tempdir.name + '/graph.checkpoint')) + + construct_command = '{exe} build --mask-dummy -k 20 --count-kmers --disk-swap {tmp_dir} --count-width 16 ' \ + '-o {outfile} {input}'.format( + exe=METAGRAPH, + tmp_dir=self.tempdir.name, + outfile=self.tempdir.name + '/graph', + input=TEST_DATA_DIR + '/transcripts_1000.fa' + ) + res = subprocess.run([construct_command], shell=True) + self.assertEqual(res.returncode, 0) + + stats_command = '{exe} stats {graph}'.format( + exe=METAGRAPH, + graph=self.tempdir.name + '/graph.dbg', + ) + res = subprocess.run(stats_command.split(), stdout=PIPE) + self.assertEqual(res.returncode, 0) + params_str = res.stdout.decode().split('\n')[2:] + self.assertEqual('k: 20', params_str[0]) + self.assertEqual('nodes (k): 591997', params_str[1]) + self.assertEqual('canonical mode: no', params_str[2]) + self.assertEqual('nnz weights: 591997', params_str[3]) + self.assertEqual('avg weight: 2.48587', params_str[4]) + + if __name__ == '__main__': unittest.main() diff --git a/metagraph/src/cli/build.cpp b/metagraph/src/cli/build.cpp index f501d6fb29..b9e3eff00e 100644 --- a/metagraph/src/cli/build.cpp +++ b/metagraph/src/cli/build.cpp @@ -11,6 +11,7 @@ #include "graph/representation/bitmap/dbg_bitmap_construct.hpp" #include "graph/representation/succinct/dbg_succinct.hpp" #include "graph/representation/succinct/boss_construct.hpp" +#include "graph/representation/succinct/build_checkpoint.hpp" #include "graph/graph_extensions/node_weights.hpp" #include "config/config.hpp" #include "parse_sequences.hpp" @@ -105,6 +106,9 @@ int build_graph(Config *config) { logger->info("k-mer suffix: '{}'", suffix); } + bool checkpoint_enabled = !config->tmp_dir.empty() && suffixes.size() == 1; + boss::BuildCheckpoint checkpoint(checkpoint_enabled ? config->outfbase : "", + config->phase); auto constructor = boss::IBOSSChunkConstructor::initialize( boss_graph->get_k(), config->canonical, @@ -116,12 +120,28 @@ int build_graph(Config *config) { : kmer::ContainerType::VECTOR_DISK, config->tmp_dir.empty() ? std::filesystem::path(config->outfbase).remove_filename() : config->tmp_dir, - config->disk_cap_bytes + config->disk_cap_bytes, + checkpoint ); - push_sequences(files, *config, timer, constructor.get()); + if (checkpoint.checkpoint() == 0) { + push_sequences(files, *config, timer, constructor.get()); + } else { + logger->info("Skipping parsing sequences from input file(s)"); + } + // need to call build_chunk() even if checkpoint.phase()==1, because the + // SortedSetDisk needs to be flushed boss::BOSS::Chunk *next_chunk = constructor->build_chunk(); + + if (checkpoint.phase() <= 2) { // phase 2 stops after generating dummy k-mers + assert(next_chunk == nullptr); + logger->info("Phase {} (checkpoint {}) successfully finished.", + checkpoint.phase(), + checkpoint.checkpoint_for_phase(checkpoint.phase())); + return 0; + } + logger->trace("Graph chunk with {} k-mers was built in {} sec", next_chunk->size() - 1, timer.elapsed()); @@ -139,6 +159,7 @@ int build_graph(Config *config) { } else { graph_data.reset(next_chunk); } + checkpoint.remove_checkpoint(); } assert(graph_data); diff --git a/metagraph/src/cli/config/config.cpp b/metagraph/src/cli/config/config.cpp index 19641cdd58..4374fdeb23 100644 --- a/metagraph/src/cli/config/config.cpp +++ b/metagraph/src/cli/config/config.cpp @@ -346,6 +346,8 @@ Config::Config(int argc, char *argv[]) { tmp_dir = get_value(i++); } else if (!strcmp(argv[i], "--disk-cap-gb")) { disk_cap_bytes = atoi(get_value(i++)) * 1e9; + } else if (!strcmp(argv[i], "--phase")) { + phase = atoi(get_value(i++)); } else if (!strcmp(argv[i], "--anchors-file")) { anchors = get_value(i++); } else if (argv[i][0] == '-') { @@ -541,6 +543,30 @@ Config::Config(int argc, char *argv[]) { if (identity == COMPARE && fnames.size() != 2) print_usage_and_exit = true; + if (identity != BUILD && phase != 3) { + std::cerr << "Error: Phases are only supported for building. Remove --phase" + << std::endl; + print_usage_and_exit = true; + } + + if (identity != BUILD && phase > 3) { + std::cerr << "Error: Invalid phase value. Can be either 1, 2 or 3." << std::endl; + print_usage_and_exit = true; + } + + if (phase != 3 && tmp_dir.empty()) { + std::cerr << "Error: Phases are only supported for disk-based building. " + "Please set --disk-swap." << std::endl; + print_usage_and_exit = true; + } + + if (phase != 3 && suffix_len > 0) { + std::cerr << "Error: Phases are not supported for multiple suffixes. Remove " + "--phase or specify each suffix separately using --suffix" + << std::endl; + print_usage_and_exit = true; + } + if (discovery_fraction < 0 || discovery_fraction > 1) print_usage_and_exit = true; @@ -787,6 +813,7 @@ void Config::print_usage(const std::string &prog_name, IdentityType identity) { fprintf(stderr, "\t-p --parallel [INT] \tuse multiple threads for computation [1]\n"); fprintf(stderr, "\t --disk-swap [STR] \tdirectory to use for temporary files [off]\n"); fprintf(stderr, "\t --disk-cap-gb [INT] \tmax temp disk space to use before forcing a merge, in GB [20]\n"); + fprintf(stderr, "\t --phase [INT] \tmax where to stop the computation (1=collect kmers, 2=generate kmers, 3=build all) [3]\n"); } break; case CLEAN: { fprintf(stderr, "Usage: %s clean -o [options] GRAPH\n\n", prog_name.c_str()); diff --git a/metagraph/src/cli/config/config.hpp b/metagraph/src/cli/config/config.hpp index 764f8403a5..acdc2a5b65 100644 --- a/metagraph/src/cli/config/config.hpp +++ b/metagraph/src/cli/config/config.hpp @@ -148,6 +148,8 @@ class Config { size_t disk_cap_bytes = 20e9; // 20GB default + uint32_t phase = 3; // 1 = collect kmers, 2 = generate kmers, 3 = complete build + enum IdentityType { NO_IDENTITY = -1, BUILD = 1, diff --git a/metagraph/src/common/elias_fano.cpp b/metagraph/src/common/elias_fano.cpp index 010119ad29..7bfa89d505 100644 --- a/metagraph/src/common/elias_fano.cpp +++ b/metagraph/src/common/elias_fano.cpp @@ -14,9 +14,11 @@ namespace mtg { namespace common { -void concat(const std::vector &files, const std::string &result) { +std::vector concat(const std::vector &files, const std::string &result) { if (files.empty()) - return; + return {}; + + std::vector original_files; std::vector suffixes = { "", ".up" }; if (std::filesystem::exists(files[0] + ".count")) @@ -24,19 +26,18 @@ void concat(const std::vector &files, const std::string &result) { for (const std::string &suffix : suffixes) { std::string concat_command = "cat "; - for (uint32_t i = 1; i < files.size(); ++i) { + for (uint32_t i = 0; i < files.size(); ++i) { concat_command += files[i] + suffix + " "; } - concat_command += " >> " + files[0] + suffix; - + concat_command += " > " + result + suffix; if (std::system(concat_command.c_str())) throw std::runtime_error("Error while cat-ing files: " + concat_command); - std::filesystem::rename(files[0] + suffix, result + suffix); for (const std::string &f : files) { - std::filesystem::remove(f + suffix); + original_files.push_back(f + suffix); } } + return original_files; } template @@ -212,6 +213,10 @@ EliasFanoEncoder::~EliasFanoEncoder() { template void EliasFanoEncoder::add(T value) { #ifndef NDEBUG + if (value < last_value_) { + logger->trace("Ooops {} {}", last_value_ , value); + logger->info("Ooops {} {}", last_value_ , value); + } assert(value >= last_value_); #endif value -= offset_; diff --git a/metagraph/src/common/elias_fano.hpp b/metagraph/src/common/elias_fano.hpp index 7f31246ab8..5ff319c2a2 100644 --- a/metagraph/src/common/elias_fano.hpp +++ b/metagraph/src/common/elias_fano.hpp @@ -21,7 +21,7 @@ namespace common { * The files store data that is ordered and the values in a file are smaller than the * values in the next file. */ -void concat(const std::vector &files, const std::string &result); +std::vector concat(const std::vector &files, const std::string &result); /** * Elias-Fano encoder that streams the encoded result into a file. diff --git a/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp b/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp index e3cd3114f0..343aa0cc69 100644 --- a/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp +++ b/metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp @@ -54,6 +54,17 @@ ChunkedWaitQueue& SortedSetDiskBase::data(bool free_buffer) { return merge_queue_; } +template +void SortedSetDiskBase::flush() { + std::unique_lock exclusive_lock(mutex_); + std::unique_lock multi_insert_lock(multi_insert_mutex_); + if (!data_.empty()) { + sort_and_dedupe(); + dump_to_file(true /* is_done */); + } + async_merge_l1_.join(); +} + template std::vector SortedSetDiskBase::files_to_merge() { std::unique_lock exclusive_lock(mutex_); @@ -63,17 +74,20 @@ std::vector SortedSetDiskBase::files_to_merge() { sort_and_dedupe(); dump_to_file(true /* is_done */); } + async_merge_l1_.join(); return get_file_names(); } template -void SortedSetDiskBase::clear(const std::filesystem::path &tmp_path) { +void SortedSetDiskBase::clear(const std::filesystem::path &tmp_path, bool remove_files) { std::unique_lock exclusive_lock(mutex_); std::unique_lock multi_insert_lock(multi_insert_mutex_); is_merging_ = false; - // remove the files that have not been requested to merge - for (const auto &chunk_file : get_file_names()) { - std::filesystem::remove(chunk_file); + if (remove_files) { + // remove the files that have not been requested to merge + for (const auto &chunk_file : get_file_names()) { + std::filesystem::remove(chunk_file); + } } chunk_count_ = 0; l1_chunk_count_ = 0; @@ -94,7 +108,7 @@ void SortedSetDiskBase::start_merging_async() { async_worker_.enqueue([file_names, this]() { std::function on_new_item = [this](const T &v) { merge_queue_.push(v); }; - merge_files(file_names, on_new_item); + merge_files(file_names, on_new_item, false); merge_queue_.shutdown(); }); } diff --git a/metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp b/metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp index 627cc22eae..e44b7b475b 100644 --- a/metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp +++ b/metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp @@ -50,10 +50,9 @@ class SortedSetDiskBase { size_t merge_count); virtual ~SortedSetDiskBase() { - // remove the files that have not been requested to merge - for (const auto &chunk_file : get_file_names()) { - std::filesystem::remove(chunk_file); - } + // not cleaning up unmerged chunk_*** files so that the computation can be resumed + // if building in phases or in case of a crash + async_merge_l1_.join(); // don't leave half-merged chunks behind async_worker_.join(); // make sure the data was processed } @@ -65,6 +64,9 @@ class SortedSetDiskBase { */ ChunkedWaitQueue& data(bool free_buffer = true); + /** Flushes the unwritten buffers to disk. */ + void flush(); + /** * Returns the files to be merged - useful if the caller prefers to do the merging. */ @@ -75,7 +77,7 @@ class SortedSetDiskBase { * sorted set may be expensive when #data_ is large. In these cases, prefer calling * #clear and re-using the buffer. */ - void clear(const std::filesystem::path &tmp_path = "/tmp/"); + void clear(const std::filesystem::path &tmp_path = "/tmp/", bool remove_files = true); /** * Insert already sorted data into the set. This data is written directly to a diff --git a/metagraph/src/common/utils/file_utils.cpp b/metagraph/src/common/utils/file_utils.cpp index 9d0e7f646c..23b003d526 100644 --- a/metagraph/src/common/utils/file_utils.cpp +++ b/metagraph/src/common/utils/file_utils.cpp @@ -38,7 +38,8 @@ void cleanup_tmp_dir_on_exit() { } std::filesystem::path create_temp_dir(std::filesystem::path path, - const std::string &name) { + const std::string &name, + bool clean_on_exit) { if (path.empty()) path = "./"; @@ -48,6 +49,12 @@ std::filesystem::path create_temp_dir(std::filesystem::path path, exit(1); } + logger->trace("Created temporary directory {}", tmp_dir_str); + + if (!clean_on_exit) { + return tmp_dir_str; + } + if (TMP_DIRS.empty()) { if (std::signal(SIGINT, cleanup_tmp_dir_on_signal) == SIG_ERR) logger->error("Couldn't reset the signal handler for SIGINT"); @@ -57,8 +64,6 @@ std::filesystem::path create_temp_dir(std::filesystem::path path, logger->error("Couldn't reset the atexit handler"); } - logger->trace("Registered temporary directory {}", tmp_dir_str); - static std::mutex mu; std::lock_guard lock(mu); diff --git a/metagraph/src/common/utils/file_utils.hpp b/metagraph/src/common/utils/file_utils.hpp index 96718069cd..02dfc3539d 100644 --- a/metagraph/src/common/utils/file_utils.hpp +++ b/metagraph/src/common/utils/file_utils.hpp @@ -14,7 +14,8 @@ namespace utils { std::filesystem::path create_temp_dir(std::filesystem::path path, - const std::string &name = ""); + const std::string &name = "", + bool clean_on_exit = true); bool check_if_writable(const std::string &filename); diff --git a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp index 7cba5408e9..f2491902fb 100644 --- a/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp +++ b/metagraph/src/graph/representation/succinct/boss_chunk_construct.cpp @@ -243,7 +243,8 @@ template void recover_dummy_nodes(const KmerCollector &kmer_collector, Vector &kmers, ChunkedWaitQueue *kmers_out, - ThreadPool &async_worker) { + ThreadPool &async_worker, + BuildCheckpoint* ) { using KMER = get_first_type_t; using KMER = get_first_type_t; // 64/128/256-bit KmerBOSS with sentinel $ using KMER_INT = typename KMER::WordType; // 64/128/256-bit integer @@ -347,22 +348,31 @@ using Decoder = common::EliasFanoDecoder; template std::vector split(size_t k, const std::filesystem::path &dir, - const ChunkedWaitQueue &kmers) { + const ChunkedWaitQueue &kmers, + BuildCheckpoint *checkpoint) { using T_INT_REAL = get_int_t; const uint8_t alphabet_size = KmerExtractor2Bit().alphabet.size(); size_t chunk_count = std::pow(alphabet_size, 2); - logger->trace("Splitting k-mers into {} chunks...", chunk_count); - std::vector> sinks; std::vector names(chunk_count); for (size_t i = 0; i < names.size(); ++i) { names[i] = dir/("real_F_W_" + std::to_string(i)); + } + + assert(checkpoint->checkpoint() >= 2); + if (checkpoint->checkpoint() > 2) { + logger->trace("Skipping splitting k-mers into chunks"); + return names; + } + + for (size_t i = 0; i < names.size(); ++i) { sinks.emplace_back(names[i], ENCODER_BUFFER_SIZE); } + logger->trace("Splitting k-mers into {} chunks...", chunk_count); size_t num_kmers = 0; for (auto &it = kmers.begin(); it != kmers.end(); ++it) { const T_REAL &kmer = *it; @@ -374,6 +384,9 @@ std::vector split(size_t k, } std::for_each(sinks.begin(), sinks.end(), [](auto &f) { f.finish(); }); logger->trace("Total number of real k-mers: {}", num_kmers); + + checkpoint->set_checkpoint(3); + return names; } @@ -388,17 +401,67 @@ void skip_same_suffix(const KMER &el, Decoder &decoder, size_t suf) { } } +std::pair, std::string> +concatenate_chunks(const std::filesystem::path &dir, + const std::vector &dummy_sink_names, + const std::vector &real_F_W, + BuildCheckpoint *checkpoint) { + const uint8_t alphabet_size = KmerExtractor2Bit().alphabet.size(); + + std::vector real_split_by_W(alphabet_size); + std::string dummy_sink_name = dir / "dummy_sink"; + for (TAlphabet W = 0; W < alphabet_size; ++W) { + real_split_by_W[W] = dir/("real_split_by_W_" + std::to_string(W)); + } + + assert(checkpoint->checkpoint() >= 4); + if (checkpoint->checkpoint() > 4) { + logger->trace("Skipping concatenating chunks..."); + return { real_split_by_W, dummy_sink_name }; + } + + // dummy sink k-mers are partitioned into blocks by F (kmer[1]), so simply + // concatenating the blocks will result in a single ordered block + logger->trace("Concatenating blocks of dummy sink k-mers ({} -> 1)...", + dummy_sink_names.size()); + std::vector to_delete + = common::concat(dummy_sink_names, dummy_sink_name); + + // similarly, the 16 blocks of the original k-mers can be concatenated in + // groups of 4 without destroying the order + logger->trace("Concatenating blocks of original real k-mers ({} -> {})...", + real_F_W.size(), alphabet_size); + for (TAlphabet W = 0; W < alphabet_size; ++W) { + std::vector blocks; + for (TAlphabet F = 0; F < alphabet_size; ++F) { + blocks.push_back(real_F_W[F * alphabet_size + W]); + } + std::vector original + = common::concat(blocks, real_split_by_W[W]); + to_delete.insert(to_delete.end(), original.begin(), original.end()); + } + + checkpoint->set_checkpoint(5); + + for (const auto &name : to_delete) { + std::filesystem::remove(name); + } + + return { real_split_by_W, dummy_sink_name }; +} + /** * Generates non-redundant dummy-1 source k-mers and dummy sink kmers from #kmers. * @return a triplet containing the names of the original k-mer blocks, the dummy-1 source * k-mer blocks and the dummy sink k-mers */ template -std::tuple, std::vector, std::string> +std::pair, std::vector> generate_dummy_1_kmers(size_t k, size_t num_threads, const std::filesystem::path &dir, - ChunkedWaitQueue &kmers) { + ChunkedWaitQueue &kmers, + BuildCheckpoint *checkpoint) { using KMER = get_first_type_t; // 64/128/256-bit KmerExtractorBOSS::KmerBOSS using KMER_INT = typename KMER::WordType; // KmerExtractorBOSS::KmerBOSS::WordType @@ -406,7 +469,7 @@ generate_dummy_1_kmers(size_t k, using KMER_INT_REAL = typename KMER_REAL::WordType; // KmerExtractorT::KmerBOSS::WordType // for a DNA alphabet, this will contain 16 chunks, split by kmer[0] and kmer[1] - std::vector real_F_W = split(k, dir, kmers); + std::vector real_F_W = split(k, dir, kmers, checkpoint); const uint8_t alphabet_size = KmerExtractor2Bit().alphabet.size(); @@ -417,6 +480,15 @@ generate_dummy_1_kmers(size_t k, for (TAlphabet i = 0; i < alphabet_size; ++i) { dummy_l1_names[i] = dir/("dummy_source_1_" + std::to_string(i)); dummy_sink_names[i] = dir/("dummy_sink_" + std::to_string(i)); + } + + assert(checkpoint->checkpoint() >= 3); + if (checkpoint->checkpoint() > 3) { + logger->trace("Skipping generating dummy-1 source k-mers and dummy sink kmers"); + return { dummy_sink_names, real_F_W }; + } + + for (TAlphabet i = 0; i < alphabet_size; ++i) { dummy_l1_chunks.emplace_back(dummy_l1_names[i], ENCODER_BUFFER_SIZE); dummy_sink_chunks.emplace_back(dummy_sink_names[i], ENCODER_BUFFER_SIZE); } @@ -430,13 +502,12 @@ generate_dummy_1_kmers(size_t k, #pragma omp parallel for num_threads(num_threads) schedule(dynamic, 1) for (TAlphabet F = 0; F < alphabet_size; ++F) { - // stream k-mers of pattern ***F* std::vector F_chunks(real_F_W.begin() + F * alphabet_size, real_F_W.begin() + (F + 1) * alphabet_size); common::MergeDecoder it(F_chunks, false); - std::vector W_chunks; // chunks with k-mers of the form ****F + std::vector W_chunks; // chunks with k-mers of the form ****F for (TAlphabet c = 0; c < alphabet_size; ++c) { W_chunks.push_back(real_F_W[c * alphabet_size + F]); } @@ -448,11 +519,10 @@ generate_dummy_1_kmers(size_t k, skip_same_suffix(dummy_source, it, 0); dummy_source.to_prev(k + 1, 0); // generate dummy sink k-mers from all non-dummy kmers smaller than |dummy_source| - while (!sink_gen_it.empty() - && sink_gen_it.top() < dummy_source.data()) { + while (!sink_gen_it.empty() && sink_gen_it.top() < dummy_source.data()) { KMER_REAL v(sink_gen_it.pop()); - // skip k-mers with the same suffix as v, as they generate identical dummy - // sink k-mers + // skip k-mers with the same suffix as v, as they generate identical + // dummy sink k-mers skip_same_suffix(v, sink_gen_it, 1); dummy_sink_chunks[F].add(kmer::get_sink_and_lift(v, k + 1)); } @@ -488,58 +558,11 @@ generate_dummy_1_kmers(size_t k, num_source += dummy_l1_chunks[i].size(); } - logger->trace("Generated {} dummy sink and {} dummy source k-mers", - num_sink, num_source); - - // dummy sink k-mers are partitioned into blocks by F (kmer[1]), so simply - // concatenating the blocks will result in a single ordered block - logger->trace("Concatenating blocks of dummy sink k-mers ({} -> 1)...", - dummy_sink_names.size()); - std::string dummy_sink_name = dir/"dummy_sink"; - common::concat(dummy_sink_names, dummy_sink_name); + logger->trace("Generated {} dummy sink and {} dummy source k-mers", num_sink, + num_source); + checkpoint->set_checkpoint(4); - // similarly, the 16 blocks of the original k-mers can be concatenated in groups of - // 4 without destroying the order - logger->trace("Concatenating blocks of original real k-mers ({} -> {})...", - real_F_W.size(), alphabet_size); - std::vector real_split_by_W; - for (TAlphabet W = 0; W < alphabet_size; ++W) { - std::vector blocks; - for (TAlphabet F = 0; F < alphabet_size; ++F) { - blocks.push_back(real_F_W[F * alphabet_size + W]); - } - real_split_by_W.push_back(dir/("real_split_by_W_" + std::to_string(W))); - common::concat(blocks, real_split_by_W.back()); - } - return { real_split_by_W, dummy_l1_names, dummy_sink_name }; -} - -/** Merges #original_kmers with #reverse_complements and places the result into #kmers */ -template -static void merge(common::EliasFanoDecoder &original_kmers, - ChunkedWaitQueue &reverse_complements, - ChunkedWaitQueue *kmers) { - auto &kmers_int = reinterpret_cast &>(*kmers); - auto &it = reverse_complements.begin(); - std::optional orig = original_kmers.next(); - while (it != reverse_complements.end() && orig.has_value()) { - if (get_first(orig.value()) < get_first(*it)) { - kmers_int.push(orig.value()); - orig = original_kmers.next(); - } else { - kmers_int.push(*it); - ++it; - } - } - while (it != reverse_complements.end()) { - kmers_int.push(*it); - ++it; - } - while (orig.has_value()) { - kmers_int.push(orig.value()); - orig = original_kmers.next(); - } - kmers->shutdown(); + return { dummy_sink_names, real_F_W }; } /** @@ -551,48 +574,102 @@ void add_reverse_complements(size_t k, size_t buffer_size, const std::filesystem::path &dir, ThreadPool& async_worker, - ChunkedWaitQueue *kmers) { + ChunkedWaitQueue *kmers, + BuildCheckpoint *checkpoint) { + assert(checkpoint->checkpoint() >= 1); + if (checkpoint->checkpoint() > 2) { + logger->trace("Skipping generating reverse complements"); + return; + } using T_INT_REAL = get_int_t; // either KMER_INT or - std::string rc_dir = dir/"rc"; - std::filesystem::create_directory(rc_dir); - auto rc_set = std::make_unique>( - num_threads, buffer_size, rc_dir, std::numeric_limits::max()); - logger->trace("Adding reverse complements..."); - common::EliasFanoEncoderBuffered original(dir/"original", ENCODER_BUFFER_SIZE); - Vector buffer; - buffer.reserve(10'000); - for (auto &it = kmers->begin(); it != kmers->end(); ++it) { - const T_REAL &kmer = *it; - const T_REAL &reverse = rev_comp(k + 1, *it, KmerExtractor2Bit().complement_code()); - if (get_first(kmer) != get_first(reverse)) { - buffer.push_back(reinterpret_cast(reverse)); - if (buffer.size() == buffer.capacity()) { - rc_set->insert(buffer.begin(), buffer.end()); - buffer.resize(0); + std::unique_ptr> rc_set; + std::vector to_merge = { dir/"original" }; + if (checkpoint->checkpoint() == 2) { + logger->trace( + "Continuing from checkpoint 2. Looking for 'original' and " + "'rc/chunk_*' in {}", + checkpoint->kmer_dir()); + if (!std::filesystem::exists(checkpoint->kmer_dir()/"original")) { + logger->error( + "Could not find {}. Recovery not possible. Remove tmp dir to " + "restart the computation.", + checkpoint->kmer_dir()/"original"); + std::exit(1); + } + for (const auto &path : std::filesystem::directory_iterator(checkpoint->kmer_dir()/"rc")) { + if (path.is_regular_file() + && path.path().filename().string().find("chunk_", 0) == 0 + && path.path().filename().extension() == "") { + logger->trace("Found chunk: {}", path.path().string()); + to_merge.push_back(path.path().string()); } - original.add(reinterpret_cast(kmer)); - } else { - if constexpr (utils::is_pair_v) { - using C = typename T_REAL::second_type; - if (kmer.second >> (sizeof(C) * 8 - 1)) { - original.add({ kmer.first.data(), std::numeric_limits::max() }); - } else { - original.add({ kmer.first.data(), 2 * kmer.second }); + } + if (to_merge.size() == 1) { + logger->error( + "Could not find chunk_* files in {}. Recovery not possible. " + "Remove temp dir to restart the computation from scratch.", + checkpoint->kmer_dir()); + std::exit(1); + } + } else { // checkpoint->checkpoint() == 1 + std::string rc_dir = dir/"rc"; + std::filesystem::create_directory(rc_dir); + rc_set = std::make_unique>( + num_threads, buffer_size, rc_dir, std::numeric_limits::max()); + + common::EliasFanoEncoderBuffered original(dir/"original", ENCODER_BUFFER_SIZE); + Vector buffer; + buffer.reserve(10'000); + logger->trace("Adding reverse complements..."); + uint64_t orig_count = 0, rc_count = 0; + for (auto &it = kmers->begin(); it != kmers->end(); ++it) { + orig_count++; + const T_REAL &kmer = *it; + const T_REAL &reverse + = rev_comp(k + 1, *it, KmerExtractor2Bit().complement_code()); + if (get_first(kmer) != get_first(reverse)) { + rc_count++; + buffer.push_back(reinterpret_cast(reverse)); + if (buffer.size() == buffer.capacity()) { + rc_set->insert(buffer.begin(), buffer.end()); + buffer.resize(0); } - } else { original.add(reinterpret_cast(kmer)); + } else { + if constexpr (utils::is_pair_v) { + using C = typename T_REAL::second_type; + if (kmer.second >> (sizeof(C) * 8 - 1)) { + original.add({ kmer.first.data(), std::numeric_limits::max() }); + } else { + original.add({ kmer.first.data(), 2 * kmer.second }); + } + } else { + original.add(reinterpret_cast(kmer)); + } } } + logger->trace("Added {} orig and {} rc", orig_count, rc_count); + rc_set->insert(buffer.begin(), buffer.end()); + std::vector to_insert = rc_set->files_to_merge(); + to_merge.insert(to_merge.end(), to_insert.begin(), to_insert.end()); + rc_set->clear(dir, false /* don't delete chunk files! */); + original.finish(); + checkpoint->set_checkpoint(2); } - rc_set->insert(buffer.begin(), buffer.end()); - original.finish(); + // start merging #original with #reverse_complements into #kmers kmers->reset(); - async_worker.enqueue([rc_set = std::move(rc_set), &dir, kmers]() { - ChunkedWaitQueue &reverse_complements = rc_set->data(true); - common::EliasFanoDecoder original_kmers(dir / "original"); - merge(original_kmers, reverse_complements, kmers); + async_worker.enqueue([to_merge = std::move(to_merge), kmers]() { + uint64_t total_kmers = 0; + common::MergeDecoder chunked_kmers(to_merge, false); + auto &kmers_int = reinterpret_cast &>(*kmers); + while (!chunked_kmers.empty()) { + kmers_int.push(chunked_kmers.pop()); + total_kmers++; + } + kmers->shutdown(); + logger->trace("Merge {} files with {} k-mers", to_merge.size(), total_kmers); }); } @@ -611,75 +688,153 @@ template void recover_dummy_nodes(const KmerCollector &kmer_collector, ChunkedWaitQueue &kmers, ChunkedWaitQueue *kmers_out, - ThreadPool &async_worker) { - using KMER_REAL = get_first_type_t; // 64/128/256-bit KmerBOSS - using T_INT_REAL = get_int_t; // either KMER_INT or - - using KMER = get_first_type_t; // 64/128/256-bit KmerBOSS with sentinel $ - using KMER_INT = typename KMER::WordType; // 64/128/256-bit integer + ThreadPool &async_worker, + BuildCheckpoint *checkpoint) { + using KMER_REAL = get_first_type_t; // 64/128/256-bit KmerBOSS on 2 bits + using T_INT_REAL = get_int_t; // either KMER_REAL or + + using KMER = get_first_type_t; // 64/128/256-bit KmerBOSS with sentinel $ (on 3 bits) + using KMER_INT = typename KMER::WordType; // the 64/128/256-bit integer in KMER + + // if we reached this code, we are obviously at checkpoint 1, but we delay setting the + // checkpoint until here, so that we can differentiate between re-starting a build + // at checkpoint 1, and the continuation of a build from checkpoint 0 to 1 + bool stopped_at_phase_one = checkpoint->checkpoint() == 1; + if (checkpoint->checkpoint() == 0) { + checkpoint->set_kmer_dir(kmer_collector.tmp_dir()); + checkpoint->set_checkpoint(1); + } size_t k = kmer_collector.get_k() - 1; - const std::filesystem::path dir = kmer_collector.tmp_dir(); + const std::filesystem::path dir = checkpoint->kmer_dir(); size_t num_threads = kmer_collector.num_threads(); + std::vector file_names; + if (stopped_at_phase_one) { + logger->trace( + "Continuing from checkpoint 1. Looking for chunk_* files in {}", + checkpoint->kmer_dir()); + namespace fs = std::filesystem; + std::vector entries; + if (fs::canonical(checkpoint->kmer_dir()).filename().string().rfind("temp_kmers") == 0) { + entries.push_back(checkpoint->kmer_dir()); + } else { + for (const auto & entry : fs::directory_iterator(checkpoint->kmer_dir())) { + if (entry.is_directory() + && fs::canonical(entry.path()).filename().string().rfind("temp_kmers") + == 0) { + entries.push_back(entry.path()); + continue; + } + } + } + for (const fs::path & entry : entries) { + logger->trace("Adding chunks in {}", fs::canonical(entry)); + for (const auto &path : fs::directory_iterator(entry)) { + if (path.is_regular_file() + && path.path().filename().string().find("chunk_", 0) == 0 + && path.path().filename().extension() == "") { + logger->trace("Found chunk: {}, size: {}", path.path().string(), + std::filesystem::file_size(path)); + file_names.push_back(path.path().string()); + } + } + } + if (file_names.empty()) { + logger->error( + "Could not find chunk_* files in {}. Recovery not possible. " + "Remove temp dir to restart the computation from scratch.", + checkpoint->kmer_dir()); + std::exit(1); + } + kmers.reset(); + async_worker.enqueue([&kmers, file_names]() { + auto &kmers_int = reinterpret_cast &>(kmers); + std::function on_new_item + = [&kmers_int](const T_INT_REAL &v) { kmers_int.push(v); }; + common::merge_files(file_names, on_new_item, false); + kmers.shutdown(); + std::for_each(file_names.begin(), file_names.end(), + [](const auto &f) { std::filesystem::remove(f); }); + }); + } if (kmer_collector.is_both_strands_mode()) { // compute the reverse complements of #kmers, then merge back into #kmers add_reverse_complements(k, num_threads, kmer_collector.buffer_size(), dir, - async_worker, &kmers); + async_worker, &kmers, checkpoint); + } else if (checkpoint->checkpoint() < 2) { + checkpoint->set_checkpoint(2); } - std::string dummy_sink_name; + auto [dummy_sink_names, real_F_W] + = generate_dummy_1_kmers(k, num_threads, dir, kmers, checkpoint); + std::vector real_split_by_W; - std::vector dummy_names; - std::tie(real_split_by_W, dummy_names, dummy_sink_name) - = generate_dummy_1_kmers(k, num_threads, dir, kmers); - - // stores the sorted original kmers and dummy-1 k-mers - std::vector dummy_chunks = { dummy_sink_name }; - // generate dummy k-mers of prefix length 1..k - logger->trace("Starting generating dummy-1..k source k-mers..."); + std::string dummy_sink_name; + std::tie(real_split_by_W, dummy_sink_name) + = concatenate_chunks(dir, dummy_sink_names, real_F_W, checkpoint); + + // file names for the dummy_sink and dummy_source_1..k_0..3 kmers + std::vector dummy_chunk_names; + const uint8_t alphabet_size = KmerExtractor2Bit().alphabet.size(); for (size_t dummy_pref_len = 1; dummy_pref_len <= k; ++dummy_pref_len) { - // this will compress all sorted dummy k-mers of given prefix length - for (const std::string &f : dummy_names) { - dummy_chunks.push_back(f); + for (TAlphabet i = 0; i < alphabet_size; ++i) { + std::string suffix = std::to_string(dummy_pref_len) + "_" + std::to_string(i); + dummy_chunk_names.push_back(dir/("dummy_source_" + suffix)); } + } + dummy_chunk_names.push_back(dummy_sink_name); + + if (checkpoint->checkpoint() < 6) { + // generate dummy k-mers of prefix length 1..k + logger->trace("Starting generating dummy-1..{} source k-mers...", k); + for (size_t dummy_pref_len = 1; dummy_pref_len < k; ++dummy_pref_len) { + + std::vector> next_chunks; + for (TAlphabet i = 0; i < alphabet_size; ++i) { + next_chunks.emplace_back( + dummy_chunk_names[dummy_pref_len * alphabet_size + i], + ENCODER_BUFFER_SIZE); + } - const uint8_t alphabet_size = KmerExtractorBOSS::alphabet.size(); - std::vector dummy_next_names(alphabet_size); - std::vector> dummy_next_chunks; - for (TAlphabet i = 0; i < alphabet_size; ++i) { - dummy_next_names[i] = dir/("dummy_source_" - + std::to_string(dummy_pref_len + 1) + "_" + std::to_string(i)); - dummy_next_chunks.emplace_back(dummy_next_names[i], ENCODER_BUFFER_SIZE); + // chunks containing dummy k-mers of prefix length dummy_pref_len + auto begin = dummy_chunk_names.begin() + (dummy_pref_len - 1) * alphabet_size; + std::vector current_names(begin, begin + alphabet_size); + + KMER prev_kmer(0); + uint64_t num_kmers = 0; + const std::function &write_dummy + = [&](const KMER_INT &v) { + KMER kmer(v); + assert(kmer[0]); + kmer.to_prev(k + 1, BOSS::kSentinelCode); + if (prev_kmer != kmer) { + next_chunks[kmer[0] - 1].add(kmer.data()); + prev_kmer = std::move(kmer); + } + num_kmers++; + }; + common::merge_files(current_names, write_dummy, false); + + std::for_each(next_chunks.begin(), next_chunks.end(), + [](auto &v) { v.finish(); }); + logger->trace("Number of dummy k-mers with dummy prefix of length {}: {}", + dummy_pref_len, num_kmers); } - KMER prev_kmer(0); - uint64_t num_kmers = 0; - const std::function &write_dummy = [&](const KMER_INT &v) { - KMER kmer(v); - kmer.to_prev(k + 1, BOSS::kSentinelCode); - if (prev_kmer != kmer) { - dummy_next_chunks[kmer[0]].add(kmer.data()); - prev_kmer = std::move(kmer); - } - num_kmers++; - }; - common::merge_files(dummy_names, write_dummy, false); - - std::for_each(dummy_next_chunks.begin(), dummy_next_chunks.end(), - [](auto &v) { v.finish(); }); - dummy_names = std::move(dummy_next_names); - logger->trace("Number of dummy k-mers with dummy prefix of length {}: {}", - dummy_pref_len, num_kmers); - } - // remove the last chunks with .up and .count - const std::function on_merge = [](const KMER_INT &) {}; - common::merge_files(dummy_names, on_merge); - - // at this point, we have the original k-mers and dummy-1 k-mers in original_and_dummy_l1, - // the dummy-x k-mers in dummy_source_{x}, and we merge them all into a single stream + checkpoint->set_checkpoint(6); + } else { + logger->trace("Skipping generating dummy-1..{} source k-mers", k); + } + + // at this point, we have the original k-mers in real_split_by_W, the dummy-x k-mers + // in dummy_chunks, and we merge them all into a single stream kmers_out->reset(); + if (checkpoint->phase() < 3) { + return; + } + // add the main dummy source k-mer if constexpr (utils::is_pair_v) { kmers_out->push({KMER(0), 0}); @@ -691,7 +846,7 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector, = kmer::get_sentinel_delta(KMER::kBitsPerChar, k + 1); // push all other dummy and non-dummy k-mers to |kmers_out| - async_worker.enqueue([k, kmer_delta, kmers_out, real_split_by_W, dummy_chunks]() { + async_worker.enqueue([k, kmer_delta, kmers_out, real_split_by_W, dummy_chunk_names]() { common::Transformed, T> decoder( [&](const T_INT_REAL &v) { if constexpr (utils::is_pair_v) { @@ -702,7 +857,7 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector, return kmer::transform(reinterpret_cast(v), k + 1) + kmer_delta; } }, - real_split_by_W, true + real_split_by_W, false /* remove sources */ ); common::Transformed, T> decoder_dummy( @@ -713,7 +868,7 @@ void recover_dummy_nodes(const KmerCollector &kmer_collector, return reinterpret_cast(v); } }, - dummy_chunks, true + dummy_chunk_names, false /* remove sources */ ); while (!decoder.empty() && !decoder_dummy.empty()) { @@ -769,7 +924,8 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor { size_t num_threads, double memory_preallocated, const std::filesystem::path &swap_dir, - size_t max_disk_space) + size_t max_disk_space, + const BuildCheckpoint &checkpoint) : swap_dir_(swap_dir), kmer_collector_(k + 1, both_strands_mode, @@ -779,26 +935,54 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor { swap_dir, max_disk_space, both_strands_mode && filter_suffix.empty() /* keep only canonical k-mers */), - bits_per_count_(bits_per_count) { + bits_per_count_(bits_per_count), + checkpoint_(checkpoint) { if (filter_suffix.size() && filter_suffix == std::string(filter_suffix.size(), BOSS::kSentinel)) { kmer_collector_.add_kmer(std::vector(k + 1, BOSS::kSentinelCode)); } } - void add_sequence(std::string_view sequence, uint64_t count) { + void add_sequence(std::string_view sequence, uint64_t count) override { kmer_collector_.add_sequence(sequence, count); } - void add_sequences(std::vector&& sequences) { + void add_sequences(std::vector&& sequences) override { kmer_collector_.add_sequences(std::move(sequences)); } - void add_sequences(std::vector>&& sequences) { + void add_sequences(std::vector>&& sequences) override { kmer_collector_.add_sequences(std::move(sequences)); } - BOSS::Chunk* build_chunk() { + template + BOSS::Chunk *build_chunk_2bit(Container &kmers) { + Timer timer; + ChunkedWaitQueue> queue(ENCODER_BUFFER_SIZE); + + logger->trace("Reconstructing all required dummy source k-mers..."); + recover_dummy_nodes(kmer_collector_, kmers, &queue, async_worker_, &checkpoint_); + logger->trace("Dummy source k-mers were reconstructed in {} sec", timer.elapsed()); + + if (checkpoint_.phase() == 2) { + return nullptr; + } + return new BOSS::Chunk(KmerExtractorBOSS().alphabet.size(), + kmer_collector_.get_k() - 1, + kmer_collector_.is_both_strands_mode(), queue, + bits_per_count_, swap_dir_); + } + + BOSS::Chunk* build_chunk() override { + if constexpr(utils::is_instance_v) { + if (checkpoint_.phase() == 1) { + kmer_collector_.kmers().flush(); + checkpoint_.set_kmer_dir(kmer_collector_.tmp_dir()); + checkpoint_.set_checkpoint(1); + return nullptr; + } + } + BOSS::Chunk *result; typename KmerCollector::Data &kmer_ints = kmer_collector_.data(); @@ -821,26 +1005,12 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor { KmerExtractor2Bit>); assert(!kmer_collector_.suffix_length()); - logger->trace("Reconstructing all required dummy source k-mers..."); - Timer timer; - -#define INIT_CHUNK(KMER) \ - ChunkedWaitQueue> queue(ENCODER_BUFFER_SIZE); \ - recover_dummy_nodes(kmer_collector_, kmers, &queue, async_worker_); \ - logger->trace("Dummy source k-mers were reconstructed in {} sec", timer.elapsed()); \ - result = new BOSS::Chunk(KmerExtractorBOSS().alphabet.size(), \ - kmer_collector_.get_k() - 1, \ - kmer_collector_.is_both_strands_mode(), \ - queue, \ - bits_per_count_, \ - swap_dir_) - if (kmer_collector_.get_k() * KmerExtractorBOSS::bits_per_char <= 64) { - INIT_CHUNK(KmerExtractorBOSS::Kmer64); + result = build_chunk_2bit(kmers); } else if (kmer_collector_.get_k() * KmerExtractorBOSS::bits_per_char <= 128) { - INIT_CHUNK(KmerExtractorBOSS::Kmer128); + result = build_chunk_2bit(kmers); } else { - INIT_CHUNK(KmerExtractorBOSS::Kmer256); + result = build_chunk_2bit(kmers); } } @@ -849,14 +1019,15 @@ class BOSSChunkConstructor : public IBOSSChunkConstructor { return result; } - uint64_t get_k() const { return kmer_collector_.get_k() - 1; } + uint64_t get_k() const override { return kmer_collector_.get_k() - 1; } private: std::filesystem::path swap_dir_; KmerCollector kmer_collector_; uint8_t bits_per_count_; - /** Used as an async executor for merging chunks from disk */ + /** Async executor for merging chunks, generating reverse complements, etc. */ ThreadPool async_worker_ = ThreadPool(1, 1); + BuildCheckpoint checkpoint_; }; template