diff --git a/metagraph/benchmarks/benchmark_query.cpp b/metagraph/benchmarks/benchmark_query.cpp index a85a1bc7bb..84fd8aee5e 100644 --- a/metagraph/benchmarks/benchmark_query.cpp +++ b/metagraph/benchmarks/benchmark_query.cpp @@ -18,10 +18,11 @@ namespace mtg::cli { // Forward-declared because construct_query_graph is no longer in the public -// header; the benchmark links against it directly. +// header; the benchmark links against it directly. Takes a getter so the +// annotation can be loaded asynchronously while the graph is being mapped. std::unique_ptr construct_query_graph(const graph::DeBruijnGraph &full_dbg, - const graph::AnnotatedDBG::Annotator &full_annotation, + const std::function &get_annotation, StringGenerator call_sequences, size_t num_threads, const Config &config); @@ -75,7 +76,7 @@ std::unique_ptr build_query_graph(const AnnotatedDBG &anno_graph, mtg::cli::Config config(sizeof(argv) / sizeof(argv[0]), const_cast(argv)); return cli::construct_query_graph( anno_graph.get_graph(), - anno_graph.get_annotator(), + [&anno_graph]() { return &anno_graph.get_annotator(); }, [&](std::function call_sequence) { seq_io::read_fasta_file_critical( query_filename, diff --git a/metagraph/src/cli/config/config.cpp b/metagraph/src/cli/config/config.cpp index 4bdc3cb9b8..59289d1f9e 100644 --- a/metagraph/src/cli/config/config.cpp +++ b/metagraph/src/cli/config/config.cpp @@ -182,6 +182,8 @@ Config::Config(int argc, char *argv[]) { num_columns_cached = atoi(get_value(i++)); } else if (!strcmp(argv[i], "--batch-size")) { query_batch_size = atoll(get_value(i++)); + } else if (!strcmp(argv[i], "--batch-min-matches")) { + batch_min_matches = std::stod(get_value(i++)); } else if (!strcmp(argv[i], "-p") || !strcmp(argv[i], "--parallel")) { set_num_threads(atoi(get_value(i++))); } else if (!strcmp(argv[i], "--parallel-nodes")) { @@ -1371,6 +1373,7 @@ if (advanced) { fprintf(stderr, "\t-p --parallel [INT] \tuse multiple threads for computation [1]\n"); // fprintf(stderr, "\t --cache-size [INT] \tnumber of uncompressed rows to store in the cache [0]\n"); fprintf(stderr, "\t --batch-size [INT] \tquery batch size in bp (0 to disable batch query) [100'000'000]\n"); + fprintf(stderr, "\t --batch-min-matches [FLOAT] \taggregate batches unless this they have this ratio of k-mer matches [0.0]\n"); if (advanced) { fprintf(stderr, "\t --threads-each [INT]\tnumber of parallel batches [1]\n"); fprintf(stderr, "\t --RA-ivbuff-size [INT] \tsize (in bytes) of int_vector_buffer used in random access mode (e.g. by row disk annotator) [16384]\n"); diff --git a/metagraph/src/cli/config/config.hpp b/metagraph/src/cli/config/config.hpp index 9228e6cb63..92b598b3fa 100644 --- a/metagraph/src/cli/config/config.hpp +++ b/metagraph/src/cli/config/config.hpp @@ -146,6 +146,7 @@ class Config { double min_fraction = 0.0; double max_fraction = 1.0; double cleaning_threshold_percentile = 0.001; + double batch_min_matches = 0.0; std::vector count_slice_quantiles; std::vector count_quantiles; diff --git a/metagraph/src/cli/query.cpp b/metagraph/src/cli/query.cpp index db4da5f32d..daba2396fa 100644 --- a/metagraph/src/cli/query.cpp +++ b/metagraph/src/cli/query.cpp @@ -14,6 +14,7 @@ #include "common/vectors/vector_algorithm.hpp" #include "annotation/representation/annotation_matrix/static_annotators_def.hpp" #include "graph/alignment/dbg_aligner.hpp" +#include "graph/representation/canonical_dbg.hpp" #include "graph/representation/hash/dbg_hash_ordered.hpp" #include "graph/representation/succinct/dbg_succinct.hpp" #include "graph/representation/succinct/boss_construct.hpp" @@ -21,6 +22,7 @@ #include "config/config.hpp" #include "load/load_graph.hpp" #include "load/load_annotated_graph.hpp" +#include "load/load_annotation.hpp" #include "graph/alignment/score_kmer_presence_mask.hpp" #include "cli/align.hpp" @@ -864,21 +866,21 @@ void split_contigs_for_rebalancing(size_t k, * (b, canonical) If the full graph is canonical, rebuild the query graph * in the canonical mode storing all k-mers found in the full * graph. - * - * 3. Extract annotation for the nodes of the query graph and return. */ -std::unique_ptr -construct_query_graph(const DeBruijnGraph &full_dbg, - const AnnotatedDBG::Annotator &full_annotation, - StringGenerator call_sequences, - size_t num_threads, - const Config &config) { +std::pair>>, size_t> +construct_contigs(const DeBruijnGraph &full_dbg, + StringGenerator call_sequences, + size_t num_threads, + const Config &config, + size_t *sub_k_ptr) { const auto *dbg_succ = dynamic_cast(&full_dbg); assert(full_dbg.get_mode() != DeBruijnGraph::PRIMARY && "primary graphs must be wrapped into canonical"); - size_t sub_k = full_dbg.get_k(); + assert(sub_k_ptr); + auto &sub_k = *sub_k_ptr; + sub_k = full_dbg.get_k(); size_t max_hull_forks = 0; size_t max_hull_depth = 0; size_t max_num_nodes_per_suffix = 1; @@ -910,29 +912,21 @@ construct_query_graph(const DeBruijnGraph &full_dbg, auto graph_init = std::make_shared(full_dbg.get_k()); size_t max_input_sequence_length = 0; + const mtg::kmer::KmerBloomFilter<> *bf = nullptr; if (kPrefilterWithBloom && dbg_succ && sub_k == full_dbg.get_k()) { - if (dbg_succ->get_bloom_filter()) + if ((bf = dbg_succ->get_bloom_filter())) logger->trace( "[Query graph construction] Started indexing k-mers pre-filtered " "with Bloom filter"); - - call_sequences([&](const std::string &sequence) { - // TODO: implement add_sequence with filter for all graph representations - graph_init->add_sequence( - sequence, - get_missing_kmer_skipper(dbg_succ->get_bloom_filter(), sequence) - ); - if (max_input_sequence_length < sequence.length()) - max_input_sequence_length = sequence.length(); - }); - } else { - call_sequences([&](const std::string &sequence) { - graph_init->add_sequence(sequence); - if (max_input_sequence_length < sequence.length()) - max_input_sequence_length = sequence.length(); - }); } + call_sequences([&](const std::string &seq) { + // TODO: implement add_sequence with filter for all graph representations + graph_init->add_sequence(seq, get_missing_kmer_skipper(bf, seq)); + if (max_input_sequence_length < seq.length()) + max_input_sequence_length = seq.length(); + }); + max_hull_depth = std::min( max_hull_depth, static_cast(max_hull_depth_per_seq_char * max_input_sequence_length) @@ -1020,55 +1014,72 @@ construct_query_graph(const DeBruijnGraph &full_dbg, contigs.size() - hull_contigs_begin, timer.elapsed()); } - graph_init.reset(); - timer.reset(); + return std::make_pair(std::move(contigs), (size_t)num_found_kmers); +} + +auto construct_query_graph(std::vector>>&& contigs, + size_t num_threads, + size_t k, + DeBruijnGraph::Mode mode, + size_t sub_k) { + assert(mode != DeBruijnGraph::PRIMARY && "primary graphs must be wrapped into canonical"); + + Timer timer; std::shared_ptr graph; // restrict nodes to those in the full graph - if (sub_k < full_dbg.get_k()) { - BOSSConstructor constructor(full_dbg.get_k() - 1, - full_dbg.get_mode() == DeBruijnGraph::CANONICAL, - 0, "", 0, num_threads); - add_to_graph(constructor, contigs, full_dbg.get_k()); + if (sub_k < k) { + BOSSConstructor constructor(k - 1, mode == DeBruijnGraph::CANONICAL, 0, "", 0, num_threads); + add_to_graph(constructor, contigs, k); - graph = std::make_shared(new BOSS(&constructor), full_dbg.get_mode()); + graph = std::make_shared(new BOSS(&constructor), mode); } else { - graph = std::make_shared(full_dbg.get_k(), full_dbg.get_mode()); - add_to_graph(*graph, contigs, full_dbg.get_k()); + graph = std::make_shared(k, mode); + add_to_graph(*graph, contigs, k); } logger->trace("[Query graph construction] Query graph contains {} k-mers" " and took {} sec to construct", graph->num_nodes(), timer.elapsed()); - std::vector> from_full_to_small; + VectorMap from_full_to_small_map; + from_full_to_small_map.reserve(graph->num_nodes()); - #pragma omp parallel for num_threads(num_threads) for (size_t i = 0; i < contigs.size(); ++i) { const std::string &contig = contigs[i].first; const std::vector &nodes_in_full = contigs[i].second; std::vector path(nodes_in_full.size()); - size_t j = 0; - // nodes in the query graph hull may overlap - graph->map_to_nodes(contig, [&](node_index node) { - path[j++] = node; - }); - assert(j == nodes_in_full.size()); + size_t begin = 0; + size_t end; + do { + end = std::find(nodes_in_full.begin() + begin, + nodes_in_full.end(), + DeBruijnGraph::npos) + - nodes_in_full.begin(); - #pragma omp critical - { - for (size_t j = 0; j < path.size(); ++j) { - if (nodes_in_full[j]) { - assert(path[j]); - from_full_to_small.emplace_back(nodes_in_full[j], path[j]); - } + if (begin != end) { + std::string_view segment(contig.data() + begin, end - begin + k - 1); + // nodes in the query graph hull may overlap + graph->map_to_nodes(segment, [&](node_index node) { + path[begin++] = node; + }); + } + begin = end + 1; + } while (end < nodes_in_full.size()); + + for (size_t j = 0; j < path.size(); ++j) { + if (nodes_in_full[j]) { + assert(path[j]); + from_full_to_small_map.try_emplace(nodes_in_full[j], path[j]); } } } - contigs = decltype(contigs)(); + std::vector>>().swap(contigs); + + auto from_full_to_small = to_vector(std::move(from_full_to_small_map)); for (auto &[first, second] : from_full_to_small) { assert(first && second); @@ -1076,8 +1087,16 @@ construct_query_graph(const DeBruijnGraph &full_dbg, second = AnnotatedDBG::graph_to_anno_index(second); } - timer.reset(); + return std::make_pair(std::move(graph), std::move(from_full_to_small)); +} +std::unique_ptr +construct_query_graph(const AnnotatedDBG::Annotator &full_annotation, + std::vector>&& from_full_to_small, + std::shared_ptr graph, + const Config &config, + size_t num_threads) { + Timer timer; // initialize fast query annotation // copy annotations from the full graph to the query graph auto annotation = slice_annotation(full_annotation, @@ -1094,6 +1113,29 @@ construct_query_graph(const DeBruijnGraph &full_dbg, return std::make_unique(graph, std::move(annotation)); } +std::unique_ptr +construct_query_graph(const graph::DeBruijnGraph &full_dbg, + const std::function &get_annotation, + StringGenerator call_sequences, + size_t num_threads, + const Config &config) { + size_t sub_k; + auto contigs = construct_contigs(full_dbg, call_sequences, num_threads, config, &sub_k).first; + auto [query_graph, from_full_to_small] = construct_query_graph( + std::move(contigs), num_threads, full_dbg.get_k(), full_dbg.get_mode(), sub_k + ); + return construct_query_graph(*get_annotation(), std::move(from_full_to_small), + std::move(query_graph), config, num_threads); +} + +size_t batched_query_fasta(const std::string &file, + const std::function &callback, + const Config &config, + const graph::align::DBGAlignerConfig *aligner_config, + const DeBruijnGraph &graph, + const std::function &get_annotation); + +void check_annotation(const Config &config, const annot::matrix::BinaryMatrix &anno_matrix); int query_graph(Config *config) { assert(config); @@ -1102,11 +1144,9 @@ int query_graph(Config *config) { assert(config->infbase_annotators.size() == 1); - // Load graph and annotation in parallel. auto loaded = load_graph_with_async_annotation(*config); - auto graph = loaded.first.get(); - auto anno_graph = loaded.second.get(); - assert(anno_graph); // guaranteed by the assertion above + auto graph_future = std::move(loaded.first); + auto anno_dbg_future = std::move(loaded.second); std::unique_ptr aligner_config; if (config->align_sequences) { @@ -1114,33 +1154,78 @@ int query_graph(Config *config) { && "only the best alignment is used in query"); aligner_config.reset(new align::DBGAlignerConfig( - initialize_aligner_config(*config, *graph) + initialize_aligner_config(*config, *graph_future.get()) )); } - graph.reset(); - size_t k = anno_graph->get_graph().get_k(); + // Callback, which captures the config pointer and the k-mer length. + size_t k = 0; + auto query_callback = [config, &k](const SeqSearchResult &result) { + if (config->output_json) { + std::ostringstream ss; + ss << result.to_json(config->verbose_output + || !(config->query_mode == COUNTS || config->query_mode == COORDS), + k) << "\n"; + std::cout << ss.str(); + } else { + std::cout << result.to_string(config->anno_labels_delimiter, + config->suppress_unlabeled, + config->verbose_output + || !(config->query_mode == COUNTS || config->query_mode == COORDS), + k) + "\n"; + } + }; + + // Type info from a stub annotation — lets us decide batched mode and + // validate the annotation type without waiting for the full data to load. + auto temp_anno = initialize_annotation(config->infbase_annotators.at(0), *config, 0); + check_annotation(*config, temp_anno->get_matrix()); + bool coord_to_header_exists = false; + if (dynamic_cast(&temp_anno->get_matrix()) + && !config->no_coord_mapping) { + coord_to_header_exists = std::filesystem::exists( + utils::remove_suffix(config->infbase_annotators.at(0), temp_anno->file_extension()) + + annot::CoordToHeader::kExtension + ); + } + // In batched mode the annotation is fetched lazily inside batched_query_fasta; + // queries can begin while it's still loading. + const bool use_batched = files.size() == 1 && config->query_batch_size + && config->query_mode != COORDS && !coord_to_header_exists; + + std::shared_ptr graph; + std::unique_ptr anno_graph; + std::shared_future> anno_dbg_shared; + + if (use_batched) { + graph = graph_future.get(); + if (graph->get_mode() == DeBruijnGraph::PRIMARY) { + graph = std::make_shared(graph); + logger->trace("Primary graph wrapped into canonical"); + } + anno_dbg_shared = anno_dbg_future.share(); + } else { + anno_graph = anno_dbg_future.get(); + graph = graph_future.get(); + } + k = graph->get_k(); // iterate over input files for (const auto &file : files) { + logger->trace("Parsing sequences from file '{}'", file); Timer curr_timer; + size_t num_bp = 0; + + if (use_batched) { + num_bp = batched_query_fasta(file, query_callback, *config, aligner_config.get(), + *graph, + [&anno_dbg_shared]() { + return &anno_dbg_shared.get()->get_annotator(); + }); + } else { + num_bp = query_fasta(file, query_callback, *config, *anno_graph, aligner_config.get()); + } - auto query_callback = [config, k](const SeqSearchResult &result) { - if (config->output_json) { - std::ostringstream ss; - ss << result.to_json(config->verbose_output - || !(config->query_mode == COUNTS || config->query_mode == COORDS), - k) << "\n"; - std::cout << ss.str(); - } else { - std::cout << result.to_string(config->anno_labels_delimiter, - config->suppress_unlabeled, - config->verbose_output - || !(config->query_mode == COUNTS || config->query_mode == COORDS), - k) + "\n"; - } - }; - size_t num_bp = query_fasta(file, query_callback, *config, *anno_graph, aligner_config.get()); auto time = curr_timer.elapsed(); logger->trace("File '{}' with {} base pairs was processed in {:.3f} sec, throughput: {:.1f} bp/s", file, num_bp, time, (double)num_bp / time); @@ -1210,25 +1295,18 @@ SeqSearchResult query_sequence(QuerySequence&& sequence, } -size_t batched_query_fasta(seq_io::FastaParser &fasta_parser, +size_t batched_query_fasta(const std::string &file, const std::function &callback, const Config &config, - const graph::AnnotatedDBG &anno_graph, - const graph::align::DBGAlignerConfig *aligner_config); - -size_t query_fasta(const std::string &file, - const std::function &callback, - const Config &config, - const graph::AnnotatedDBG &anno_graph, - const graph::align::DBGAlignerConfig *aligner_config) { - logger->trace("Parsing sequences from file '{}'", file); - - seq_io::FastaParser fasta_parser(file, config.forward_and_reverse); + const graph::align::DBGAlignerConfig *aligner_config, + const DeBruijnGraph &graph, + const std::function &get_annotation); +// Check the the annotation matrix stores the data required for the query mode +void check_annotation(const Config &config, const annot::matrix::BinaryMatrix &anno_matrix) { // Only query_coords/count_kmers if using coord/count aware index. if (config.query_mode == COORDS - && !dynamic_cast( - &anno_graph.get_annotator().get_matrix())) { + && !dynamic_cast(&anno_matrix)) { logger->error("Annotation does not support k-mer coordinate queries. " "First transform this annotation to include coordinate data " "(e.g., {}, {}, {}, {}, {}).", @@ -1241,8 +1319,7 @@ size_t query_fasta(const std::string &file, } if ((config.query_mode == COUNTS || config.query_mode == COUNTS_SUM) - && !dynamic_cast( - &anno_graph.get_annotator().get_matrix())) { + && !dynamic_cast(&anno_matrix)) { logger->error("Annotation does not support k-mer count queries. " "First transform this annotation to include count data " "(e.g., {}, {}, {}).", @@ -1251,17 +1328,30 @@ size_t query_fasta(const std::string &file, Config::annotype_to_string(Config::IntRowDiffBRWT)); exit(1); } +} + +size_t query_fasta(const std::string &file, + const std::function &callback, + const Config &config, + const graph::AnnotatedDBG &anno_graph, + const graph::align::DBGAlignerConfig *aligner_config) { + check_annotation(config, anno_graph.get_annotator().get_matrix()); if (config.query_batch_size) { if (config.query_mode != COORDS && !anno_graph.get_coord_to_header()) { // Construct a query graph and query against it - return batched_query_fasta(fasta_parser, callback, config, anno_graph, aligner_config); + return batched_query_fasta(file, callback, config, aligner_config, + anno_graph.get_graph(), + [&]() { return &anno_graph.get_annotator(); }); } else { // TODO: Implement batch mode for query_coords queries logger->warn("Querying coordinates in batch mode is currently not supported. Querying sequentially..."); } } + seq_io::FastaParser fasta_parser(file, config.forward_and_reverse); + logger->trace("Parsing sequences from file '{}'", file); + // Query sequences independently size_t seq_count = 0; size_t num_bp = 0; @@ -1270,8 +1360,7 @@ size_t query_fasta(const std::string &file, for (const seq_io::kseq_t &kseq : fasta_parser) { thread_pool.enqueue([&](QuerySequence &sequence) { // Callback with the SeqSearchResult - callback(query_sequence(std::move(sequence), anno_graph, - config, aligner_config)); + callback(query_sequence(std::move(sequence), anno_graph, config, aligner_config)); }, QuerySequence { seq_count++, std::string(kseq.name.s), std::string(kseq.seq.s) }); num_bp += kseq.seq.l; } @@ -1281,11 +1370,18 @@ size_t query_fasta(const std::string &file, return num_bp; } -size_t batched_query_fasta(seq_io::FastaParser &fasta_parser, +size_t batched_query_fasta(const std::string &file, const std::function &callback, const Config &config, - const graph::AnnotatedDBG &anno_graph, - const graph::align::DBGAlignerConfig *aligner_config) { + const graph::align::DBGAlignerConfig *aligner_config, + const DeBruijnGraph &graph, + const std::function &get_annotation) { + assert(graph.get_mode() != DeBruijnGraph::PRIMARY + && "Primary graphs must be wrapped into canonical"); + + seq_io::FastaParser fasta_parser(file, config.forward_and_reverse); + logger->trace("Parsing sequences from file '{}'", file); + auto it = fasta_parser.begin(); auto end = fasta_parser.end(); @@ -1297,6 +1393,15 @@ size_t batched_query_fasta(seq_io::FastaParser &fasta_parser, size_t parallel_each = std::max(1, config.parallel_each); size_t threads_per_batch = std::max(1, get_num_threads() / parallel_each); omp_set_max_active_levels(2); + + std::mutex mu; + std::vector seq_chunk; + std::vector>> contigs_chunk; + size_t num_kmer_matches_chunk = 0; + uint64_t num_bytes_read_chunk = 0; + + std::mutex query_mu; + #pragma omp parallel num_threads(parallel_each) #pragma omp single while (it != end) { @@ -1311,11 +1416,11 @@ size_t batched_query_fasta(seq_io::FastaParser &fasta_parser, num_bytes_read += it->seq.l; } - auto *seq_batch_p = seq_batch.release(); + std::vector *seq_batch_p = seq_batch.release(); - #pragma omp task firstprivate(seq_batch_p, num_bytes_read) shared(callback) - { - std::unique_ptr> seq_batch(seq_batch_p); + auto query_batch = [&](std::unique_ptr> seq_batch, + uint64_t num_bytes_read, + const auto &callback) { Timer batch_timer; std::vector alignments_batch; // Align sequences ahead of time on full graph if we don't have batch_align @@ -1324,33 +1429,76 @@ size_t batched_query_fasta(seq_io::FastaParser &fasta_parser, logger->trace("Aligning sequences from batch against the full graph..."); batch_timer.reset(); - #pragma omp parallel for num_threads(threads_per_batch) schedule(dynamic) + #pragma omp parallel for num_threads(threads_per_batch) schedule(dynamic, 10) for (size_t i = 0; i < seq_batch->size(); ++i) { // Set alignment for this seq_batch alignments_batch[i] = align_sequence(&(*seq_batch)[i].sequence, - anno_graph.get_graph(), *aligner_config); + graph, *aligner_config); } logger->trace("Sequences alignment took {} sec", batch_timer.elapsed()); batch_timer.reset(); } // Construct the query graph for this batch - auto query_graph = construct_query_graph( - anno_graph.get_graph(), - anno_graph.get_annotator(), - [&seq_batch](auto callback) { - for (const auto &seq : *seq_batch) { - callback(seq.sequence); - } - }, - threads_per_batch, - config + auto call_seqs = [&seq_batch](auto callback) { + for (const QuerySequence &seq : *seq_batch) { + callback(seq.sequence); + } + }; + size_t sub_k; + auto [contigs, num_kmer_matches] = construct_contigs( + graph, call_seqs, threads_per_batch, config, &sub_k ); + // we accumulate batches until a certain number of k-mer matches to make query graph large enough + { + std::lock_guard lock(mu); + contigs_chunk.insert(contigs_chunk.end(), + std::make_move_iterator(contigs.begin()), + std::make_move_iterator(contigs.end())); + contigs.clear(); + seq_chunk.insert(seq_chunk.end(), + std::make_move_iterator(seq_batch->begin()), + std::make_move_iterator(seq_batch->end())); + seq_batch->clear(); + num_kmer_matches_chunk += num_kmer_matches; + num_kmer_matches = 0; + num_bytes_read_chunk += num_bytes_read; + num_bytes_read = 0; + // if the current batch is too small and there are more batches in the queue, go to next + if (num_kmer_matches_chunk < batch_size * config.batch_min_matches) + return; + std::swap(contigs, contigs_chunk); + std::swap(*seq_batch, seq_chunk); + std::swap(num_kmer_matches, num_kmer_matches_chunk); + std::swap(num_bytes_read, num_bytes_read_chunk); + // seq_chunk, contigs_chunk, and num_kmer_matches_chunk are now reset + } + + size_t contigs_total_bp = 0; + size_t contigs_total_kmers = 0; + for (size_t i = 0; i < contigs.size(); ++i) { + contigs_total_bp += contigs[i].first.length(); + contigs_total_kmers += contigs[i].second.size(); + } + + // work with seq_batch, contigs + auto [small_graph, from_full_to_small] = construct_query_graph( + std::move(contigs), threads_per_batch, graph.get_k(), graph.get_mode(), sub_k + ); + + std::lock_guard query_lock(query_mu); + + auto query_graph = construct_query_graph(*get_annotation(), + std::move(from_full_to_small), + std::move(small_graph), + config, + threads_per_batch); + auto query_graph_construction = batch_timer.elapsed(); batch_timer.reset(); - #pragma omp parallel for num_threads(threads_per_batch) schedule(dynamic) + #pragma omp parallel for num_threads(threads_per_batch) schedule(dynamic, 10) for (size_t i = 0; i < seq_batch->size(); ++i) { SeqSearchResult search_result = query_sequence(std::move((*seq_batch)[i]), *query_graph, config, @@ -1364,15 +1512,23 @@ size_t batched_query_fasta(seq_io::FastaParser &fasta_parser, auto query_time = batch_timer.elapsed(); logger->trace("Batch of {} bp from '{}': Query graph constructed in {:.5f} sec," - " redundancy: {:.2f} bp/kmer," + " redundancy: {:.2f} bp/kmer, total bp/k-mers in contigs: {}/{}," " queried with {} threads in {:.5f} sec. Batch query time: {:.5f} sec, {:.1f} bp/s", num_bytes_read, fasta_parser.get_filename(), query_graph_construction, (double)num_bytes_read / query_graph->get_graph().num_nodes(), + contigs_total_bp, contigs_total_kmers, threads_per_batch, query_time, query_graph_construction + query_time, num_bytes_read / (query_graph_construction + query_time)); + }; + + #pragma omp task firstprivate(seq_batch_p, num_bytes_read) shared(callback) + { + query_batch(std::unique_ptr>(seq_batch_p), + num_bytes_read, callback); } + num_bp += num_bytes_read; }