Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c544fce
Support lazy loading of annotation columns in differential assembly
hmusta Aug 8, 2022
2f0073c
fix
hmusta Aug 8, 2022
6e62c3d
minor fix
hmusta Aug 8, 2022
a9571c4
patch
hmusta Aug 9, 2022
069daca
Workaround for bug in g++-8.2
hmusta Aug 9, 2022
d91fe84
Merge remote-tracking branch 'origin/hm/workaround' into hm/stream_co…
hmusta Aug 9, 2022
1d14f5e
Support streaming of annotation columns in differential assembly (#424)
hmusta Aug 9, 2022
cecde78
better parallelism with multicolumn files
hmusta Aug 10, 2022
4c8faaf
limit number of columns to read at once
hmusta Aug 10, 2022
be77d39
check if config exists before loading graph
hmusta Aug 10, 2022
4eb1a27
more log messages
hmusta Aug 10, 2022
44175dc
better parallel
hmusta Aug 10, 2022
5a49464
optimization
hmusta Aug 10, 2022
13713d4
only take reverse complements if needed
hmusta Aug 10, 2022
120f3ae
refactor
hmusta Aug 11, 2022
8acf605
minor
hmusta Aug 11, 2022
b8ab7c9
Merge remote-tracking branch 'origin/master' into hm/stream_columns_try1
karasikov Aug 30, 2022
ee19dc2
Merge remote-tracking branch 'origin/master' into hm/stream_columns_try1
Jan 25, 2023
44854ba
cleanup
Jan 30, 2023
975e7a6
Merge branch 'master' into hm/stream_columns_try1
Jan 30, 2023
f8cd051
cleanup
Jan 30, 2023
6c31312
cleanup
Jan 30, 2023
f2c3d4e
fixes
Jan 30, 2023
e34b905
fix
Jan 30, 2023
5c7a787
Merge remote-tracking branch 'origin/master' into hm/stream_columns_try1
Feb 1, 2023
9a41c07
Merge remote-tracking branch 'origin/master' into hm/stream_columns_try1
hmusta Feb 24, 2023
04fff02
refactor to allow for streaming columns
hmusta Feb 27, 2023
4229b1b
minor
hmusta Feb 27, 2023
eee5a10
cleanup
hmusta Feb 27, 2023
08c7ce5
fix other check
hmusta Feb 27, 2023
453c015
only load files that have in- or out-labels
hmusta Feb 27, 2023
0c175e9
handle other labels when loading separate columns
hmusta Feb 27, 2023
b300687
minor
hmusta Feb 27, 2023
14cb478
fix compile on clang
hmusta Feb 28, 2023
cf97623
cleanup
hmusta Feb 28, 2023
3f0df76
remove redundant tests
hmusta Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "binary_matrix.hpp"

#include <ips4o.hpp>
#include <progress_bar.hpp>

#include "common/vectors/bitmap.hpp"
#include "common/serialization.hpp"
Expand Down Expand Up @@ -38,11 +39,14 @@ BinaryMatrix::slice_rows(const std::vector<Row> &row_ids) const {
}

void BinaryMatrix::call_columns(const std::vector<Column> &column_ids,
const std::function<void(size_t, const bitmap&)> &callback,
const std::function<void(size_t, const bitmap_generator&)> &callback,
size_t num_threads) const {
ProgressBar progress_bar(column_ids.size(), "Parsing columns",
std::cerr, !common::get_verbose());
#pragma omp parallel for num_threads(num_threads) schedule(dynamic)
for (size_t k = 0; k < column_ids.size(); ++k) {
callback(k, bitmap_generator(get_column(column_ids[k]), num_rows()));
++progress_bar;
}
}

Expand Down
4 changes: 2 additions & 2 deletions metagraph/src/annotation/binary_matrix/base/binary_matrix.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "common/vector.hpp"


class bitmap;
class bitmap_generator;

namespace mtg {
namespace annot {
Expand Down Expand Up @@ -41,7 +41,7 @@ class BinaryMatrix {
// For each column id in columns, run callback on its respective index in columns
// and a bitmap represnting the column
virtual void call_columns(const std::vector<Column> &columns,
const std::function<void(size_t, const bitmap&)> &callback,
const std::function<void(size_t, const bitmap_generator&)> &callback,
size_t num_threads = 1) const;

virtual bool load(std::istream &in) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ ColumnMajor::slice_rows(const std::vector<Row> &row_ids) const {
}

void ColumnMajor::call_columns(const std::vector<Column> &columns,
const std::function<void(size_t, const bitmap&)> &callback,
const std::function<void(size_t, const bitmap_generator&)> &callback,
size_t num_threads) const {
#pragma omp parallel for num_threads(num_threads) schedule(dynamic)
for (size_t j = 0; j < columns.size(); ++j) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ColumnMajor : public BinaryMatrix {
std::vector<Column> slice_rows(const std::vector<Row> &rows) const override;

void call_columns(const std::vector<Column> &columns,
const std::function<void(size_t, const bitmap&)> &callback,
const std::function<void(size_t, const bitmap_generator&)> &callback,
size_t num_threads = 1) const override;

bool load(std::istream &in) override;
Expand Down
165 changes: 112 additions & 53 deletions metagraph/src/cli/assemble.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "seq_io/sequence_io.hpp"
#include "config/config.hpp"
#include "load/load_graph.hpp"
#include "load/load_annotation.hpp"
#include "load/load_annotated_graph.hpp"
#include "graph/annotated_graph_algorithm.hpp"
#include "graph/representation/masked_graph.hpp"
Expand Down Expand Up @@ -41,10 +42,8 @@ void check_labels(const tsl::hopscotch_set<std::string> &label_set,
exit(1);
}

DifferentialAssemblyConfig diff_assembly_config(const Json::Value &experiment,
const DeBruijnGraph &graph) {
DifferentialAssemblyConfig diff_assembly_config(const Json::Value &experiment) {
DifferentialAssemblyConfig diff_config;
diff_config.add_complement = graph.get_mode() == DeBruijnGraph::CANONICAL;
diff_config.label_mask_in_kmer_fraction = experiment.get("in_min_fraction", 1.0).asDouble();
diff_config.label_mask_in_unitig_fraction = experiment.get("unitig_in_min_fraction", 0.0).asDouble();
diff_config.label_mask_out_kmer_fraction = experiment.get("out_max_fraction", 0.0).asDouble();
Expand Down Expand Up @@ -81,16 +80,11 @@ void call_masked_graphs(const AnnotatedDBG &anno_graph,
Json::Value diff_json;
fin >> diff_json;

tsl::hopscotch_set<std::string> foreground_labels;
tsl::hopscotch_set<std::string> background_labels;
tsl::hopscotch_set<std::string> shared_foreground_labels;
tsl::hopscotch_set<std::string> shared_background_labels;

for (const Json::Value &group : diff_json["groups"]) {
if (group["shared_labels"]) {
shared_foreground_labels.clear();
shared_background_labels.clear();
tsl::hopscotch_set<std::string> shared_foreground_labels;
tsl::hopscotch_set<std::string> shared_background_labels;

if (group["shared_labels"]) {
for (const Json::Value &in_label : group["shared_labels"]["in"]) {
shared_foreground_labels.emplace(in_label.asString());
}
Expand All @@ -107,12 +101,13 @@ void call_masked_graphs(const AnnotatedDBG &anno_graph,
throw std::runtime_error("Missing experiments in group");

for (const Json::Value &experiment : group["experiments"]) {
DifferentialAssemblyConfig diff_config = diff_assembly_config(
experiment, anno_graph.get_graph()
);
tsl::hopscotch_set<std::string> foreground_labels;
tsl::hopscotch_set<std::string> background_labels;

foreground_labels.clear();
background_labels.clear();
DifferentialAssemblyConfig diff_config = diff_assembly_config(experiment);

std::string exp_name = experiment["name"].asString()
+ (config->enumerate_out_sequences ? "." : "");

for (const Json::Value &in_label : experiment["in"]) {
foreground_labels.emplace(in_label.asString());
Expand All @@ -125,27 +120,90 @@ void call_masked_graphs(const AnnotatedDBG &anno_graph,
check_labels(foreground_labels, anno_graph);
check_labels(background_labels, anno_graph);

std::string exp_name = experiment["name"].asString()
+ (config->enumerate_out_sequences ? "." : "");

logger->trace("Running assembly: {}", exp_name);

callback(*mask_nodes_by_label(anno_graph,
foreground_labels,
background_labels,
shared_foreground_labels,
shared_background_labels,
diff_config, num_threads), exp_name);
diff_config, num_threads,
config->parallel_nodes), exp_name);
}
}
}

void call_masked_graphs(std::shared_ptr<const DeBruijnGraph> graph_ptr,
Config *config,
const CallMaskedGraphHeader &callback) {
assert(!config->assembly_config_file.empty());

std::ifstream fin(config->assembly_config_file);
if (!fin.good())
throw std::iostream::failure("Failed to read assembly config JSON from " + config->assembly_config_file);

for (const auto &name : config->fnames) {
if (parse_annotation_type(name) != Config::ColumnCompressed) {
throw std::runtime_error("Multiple annotation files must be ColumnCompressed");
}
}

size_t num_threads = std::max(1u, get_num_threads());

Json::Value diff_json;
fin >> diff_json;

for (const Json::Value &group : diff_json["groups"]) {
if (group["shared_labels"]) {
throw std::runtime_error("Shared labels not supported with multiple annotation files");
}

if (!group["experiments"])
throw std::runtime_error("Missing experiments in group");

for (const Json::Value &experiment : group["experiments"]) {
tsl::hopscotch_set<std::string> foreground_labels;
tsl::hopscotch_set<std::string> background_labels;

DifferentialAssemblyConfig diff_config = diff_assembly_config(experiment);

std::string exp_name = experiment["name"].asString()
+ (config->enumerate_out_sequences ? "." : "");

for (const Json::Value &in_label : experiment["in"]) {
foreground_labels.emplace(in_label.asString());
}

for (const Json::Value &out_label : experiment["out"]) {
background_labels.emplace(out_label.asString());
}

logger->trace("Running assembly: {}", exp_name);

callback(*mask_nodes_by_label(graph_ptr,
config->fnames,
foreground_labels,
background_labels,
diff_config, num_threads,
config->parallel_nodes), exp_name);
}
}
}

int assemble(Config *config) {
assert(config);

if (config->infbase_annotators.size()) {
assert(config->assembly_config_file.size());
if (!std::filesystem::exists(config->assembly_config_file)) {
logger->error("Differential assembly config does not exist\n{}",
config->assembly_config_file);
exit(1);
}
}

const auto &files = config->fnames;

assert(files.size() == 1);
assert(config->outfbase.size());

Timer timer;
Expand All @@ -156,48 +214,49 @@ int assemble(Config *config) {
logger->trace("Graph loaded in {} sec", timer.elapsed());

if (config->infbase_annotators.size()) {
config->infbase = files.at(0);

assert(config->assembly_config_file.size());
auto anno_graph = initialize_annotated_dbg(graph, *config);

logger->trace("Generating masked graphs...");
std::mutex write_mutex;
size_t num_threads = std::max(1u, get_num_threads());

std::filesystem::remove(
utils::remove_suffix(config->outfbase, ".gz", ".fasta") + ".fasta.gz"
);

std::mutex write_mutex;

size_t num_threads = std::max(1u, get_num_threads());

call_masked_graphs(*anno_graph, config,
[&](const graph::MaskedDeBruijnGraph &graph, const std::string &header) {
seq_io::FastaWriter writer(config->outfbase, header,
config->enumerate_out_sequences,
num_threads > 1, /* async write */
"a" /* append mode */);

if (config->unitigs || config->min_tip_size > 1) {
graph.call_unitigs([&](const std::string &unitig, auto&&) {
std::lock_guard<std::mutex> lock(write_mutex);
writer.write(unitig);
},
num_threads, config->min_tip_size,
config->kmers_in_single_form);
} else {
graph.call_sequences([&](const std::string &seq, auto&&) {
std::lock_guard<std::mutex> lock(write_mutex);
writer.write(seq);
},
num_threads, config->kmers_in_single_form);
}
auto graph_callback = [&](const graph::MaskedDeBruijnGraph &graph, const std::string &header) {
seq_io::FastaWriter writer(config->outfbase, header,
config->enumerate_out_sequences,
num_threads > 1, /* async write */
"a" /* append mode */);

if (config->unitigs || config->min_tip_size > 1) {
graph.call_unitigs([&](const std::string &unitig, auto&&) {
std::lock_guard<std::mutex> lock(write_mutex);
writer.write(unitig);
},
num_threads, config->min_tip_size,
config->kmers_in_single_form);
} else {
graph.call_sequences([&](const std::string &seq, auto&&) {
std::lock_guard<std::mutex> lock(write_mutex);
writer.write(seq);
},
num_threads, config->kmers_in_single_form);
}
);

};

if (files.size() > 1) {
config->fnames.erase(config->fnames.begin(), config->fnames.begin() + 1);
call_masked_graphs(graph, config, graph_callback);
} else {
config->infbase = files.at(0);
auto anno_graph = initialize_annotated_dbg(graph, *config);
call_masked_graphs(*anno_graph, config, graph_callback);
}
return 0;
}

assert(files.size() == 1);

logger->trace("Extracting sequences from graph...");

timer.reset();
Expand Down
7 changes: 7 additions & 0 deletions metagraph/src/common/vectors/bitmap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ uint64_t bitmap_lazy::num_set_bits() const {
return num_set_bits_;
}

std::optional<uint64_t> bitmap_lazy::try_get_num_set_bits() const {
if (num_set_bits_ == static_cast<size_t>(-1))
return std::nullopt;

return num_set_bits_;
}

void bitmap_lazy::add_to(sdsl::bit_vector *other) const {
assert(other);
assert(other->size() == size());
Expand Down
11 changes: 9 additions & 2 deletions metagraph/src/common/vectors/bitmap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class bitmap_lazy : public bitmap {

uint64_t size() const { return size_; }
uint64_t num_set_bits() const;
std::optional<uint64_t> try_get_num_set_bits() const;

void add_to(sdsl::bit_vector *other) const;
void call_ones_in_range(uint64_t begin, uint64_t end,
Expand All @@ -197,12 +198,18 @@ class bitmap_generator : public bitmap {
std::for_each(s.begin(), s.end(), callback);
}) {}

bitmap_generator(const bitmap &bits) noexcept
: size_(bits.size()), num_set_bits_(bits.num_set_bits()),
generator_([&bits](const VoidCall<uint64_t> &callback) {
bits.call_ones(callback);
}) {}

bitmap_generator(std::function<void(const VoidCall<uint64_t>&)>&& generator,
size_t size,
size_t num_set_bits = -1) noexcept;

bool operator[](uint64_t) const { throw std::runtime_error("Not implemented"); }
uint64_t get_int(uint64_t, uint32_t) const { throw std::runtime_error("Not implemented"); }
bool operator[](uint64_t) const { throw std::runtime_error("operator[] not implemented for bitmap_generator"); }
uint64_t get_int(uint64_t, uint32_t) const { throw std::runtime_error("get_int not implemented for bitmap_generator"); }

uint64_t size() const { return size_; }
uint64_t num_set_bits() const { return num_set_bits_; }
Expand Down
Loading