Skip to content
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c544fce
Support lazy loading of annotation columns in differential assembly
hmusta Aug 8, 2022
2f0073c
fix
hmusta Aug 8, 2022
6e62c3d
minor fix
hmusta Aug 8, 2022
a9571c4
patch
hmusta Aug 9, 2022
069daca
Workaround for bug in g++-8.2
hmusta Aug 9, 2022
d91fe84
Merge remote-tracking branch 'origin/hm/workaround' into hm/stream_co…
hmusta Aug 9, 2022
1d14f5e
Support streaming of annotation columns in differential assembly (#424)
hmusta Aug 9, 2022
cecde78
better parallelism with multicolumn files
hmusta Aug 10, 2022
4c8faaf
limit number of columns to read at once
hmusta Aug 10, 2022
be77d39
check if config exists before loading graph
hmusta Aug 10, 2022
4eb1a27
more log messages
hmusta Aug 10, 2022
44175dc
better parallel
hmusta Aug 10, 2022
5a49464
optimization
hmusta Aug 10, 2022
13713d4
only take reverse complements if needed
hmusta Aug 10, 2022
120f3ae
refactor
hmusta Aug 11, 2022
8acf605
minor
hmusta Aug 11, 2022
b8ab7c9
Merge remote-tracking branch 'origin/master' into hm/stream_columns_try1
karasikov Aug 30, 2022
ee19dc2
Merge remote-tracking branch 'origin/master' into hm/stream_columns_try1
Jan 25, 2023
44854ba
cleanup
Jan 30, 2023
975e7a6
Merge branch 'master' into hm/stream_columns_try1
Jan 30, 2023
f8cd051
cleanup
Jan 30, 2023
6c31312
cleanup
Jan 30, 2023
f2c3d4e
fixes
Jan 30, 2023
e34b905
fix
Jan 30, 2023
5c7a787
Merge remote-tracking branch 'origin/master' into hm/stream_columns_try1
Feb 1, 2023
9a41c07
Merge remote-tracking branch 'origin/master' into hm/stream_columns_try1
hmusta Feb 24, 2023
04fff02
refactor to allow for streaming columns
hmusta Feb 27, 2023
4229b1b
minor
hmusta Feb 27, 2023
eee5a10
cleanup
hmusta Feb 27, 2023
08c7ce5
fix other check
hmusta Feb 27, 2023
453c015
only load files that have in- or out-labels
hmusta Feb 27, 2023
0c175e9
handle other labels when loading separate columns
hmusta Feb 27, 2023
b300687
minor
hmusta Feb 27, 2023
14cb478
fix compile on clang
hmusta Feb 28, 2023
cf97623
cleanup
hmusta Feb 28, 2023
3f0df76
remove redundant tests
hmusta Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions metagraph/integration_tests/test_assemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
}

GFAs = [name for name, _ in gfa_tests.items()]
MASKED = ['','--mask-dummy']
LOAD_TYPES = ['load', 'stream']

NUM_THREADS = 4

Expand Down Expand Up @@ -268,11 +268,13 @@ def check_suffix(anno_repr, suffix):
no_anchor_opt
)

def test_diff_assembly(self):
@parameterized.expand(LOAD_TYPES)
def test_diff_assembly(self, load_type):
assemble_command = f'{METAGRAPH} assemble -p {NUM_THREADS} \
-a {self.tempdir.name}/annotation{anno_file_extension[self.anno_repr]} \
-o {self.tempdir.name}/diff_contigs \
--diff-assembly-rules {TEST_DATA_DIR}/example.diff.json \
{"--separately" if load_type == "stream" else ""} \
{self.tempdir.name}/graph{graph_file_extension[self.graph_repr]}'
res = subprocess.run([assemble_command], shell=True)
self.assertEqual(res.returncode, 0)
Expand All @@ -298,11 +300,13 @@ def test_diff_assembly(self):
self.assertEqual(results['>metasub_by_kmer'][0], 'CTTGGATCACACTCTTCTCAGAGCCCAGGCCAGGGGCCCCCAAGAAAGGCTCTGGTGGAGAACCTGTGCATGAAGGCTGTCAACCAGTCCATAGGCAGGGCCATCAGGCACCAAAGGGATTCTGCCAGCATAGTGCTCCTGGACCAGTGATACACCCGGCACCCTGTCCTGGACATGCTGTTGGCCTGGATCTGAGCCCTCGTGGAGGTCAAAGCCACCTTTGGTTCTGCCATTGCTGCTGTGTGGAAGTTCACTCAAGTAGGCCTCTTCCTG')
self.assertEqual(results['>metasub_sym_diff'][0], 'TGGAAGTTCACTCAAGTAGGCCTCTTCCTGACAGGCAGCTGCACCACTGCCTGGCGCTGTGCCCTTCCTTTGCTCTGCCCGCTGGAGACGGTGTTTGTCATGGGCCTGGTCTGCAGG')

def test_diff_assembly_simple(self):
@parameterized.expand(LOAD_TYPES)
def test_diff_assembly_simple(self, load_type):
assemble_command = f'{METAGRAPH} assemble -p {NUM_THREADS} \
-a {self.tempdir.name}/annotation{anno_file_extension[self.anno_repr]} \
-o {self.tempdir.name}/diff_contigs \
--diff-assembly-rules {TEST_DATA_DIR}/example_simple.diff.json \
{"--separately" if load_type == "stream" else ""} \
{self.tempdir.name}/graph{graph_file_extension[self.graph_repr]}'
res = subprocess.run([assemble_command], shell=True)
self.assertEqual(res.returncode, 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "binary_matrix.hpp"

#include <ips4o.hpp>
#include <progress_bar.hpp>

#include "common/vectors/bitmap.hpp"
#include "common/serialization.hpp"
Expand Down Expand Up @@ -38,11 +39,14 @@ BinaryMatrix::slice_rows(const std::vector<Row> &row_ids) const {
}

void BinaryMatrix::call_columns(const std::vector<Column> &column_ids,
const std::function<void(size_t, const bitmap&)> &callback,
const std::function<void(size_t, const bitmap_generator&)> &callback,
size_t num_threads) const {
ProgressBar progress_bar(column_ids.size(), "Parsing columns",
std::cerr, !common::get_verbose());
#pragma omp parallel for num_threads(num_threads) schedule(dynamic)
for (size_t k = 0; k < column_ids.size(); ++k) {
callback(k, bitmap_generator(get_column(column_ids[k]), num_rows()));
++progress_bar;
}
}

Expand Down
4 changes: 2 additions & 2 deletions metagraph/src/annotation/binary_matrix/base/binary_matrix.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "common/vector.hpp"


class bitmap;
class bitmap_generator;

namespace mtg {
namespace annot {
Expand Down Expand Up @@ -41,7 +41,7 @@ class BinaryMatrix {
// For each column id in columns, run callback on its respective index in columns
// and a bitmap represnting the column
virtual void call_columns(const std::vector<Column> &columns,
const std::function<void(size_t, const bitmap&)> &callback,
const std::function<void(size_t, const bitmap_generator&)> &callback,
size_t num_threads = 1) const;

virtual bool load(std::istream &in) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ ColumnMajor::slice_rows(const std::vector<Row> &row_ids) const {
}

void ColumnMajor::call_columns(const std::vector<Column> &columns,
const std::function<void(size_t, const bitmap&)> &callback,
const std::function<void(size_t, const bitmap_generator&)> &callback,
size_t num_threads) const {
#pragma omp parallel for num_threads(num_threads) schedule(dynamic)
for (size_t j = 0; j < columns.size(); ++j) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ColumnMajor : public BinaryMatrix {
std::vector<Column> slice_rows(const std::vector<Row> &rows) const override;

void call_columns(const std::vector<Column> &columns,
const std::function<void(size_t, const bitmap&)> &callback,
const std::function<void(size_t, const bitmap_generator&)> &callback,
size_t num_threads = 1) const override;

bool load(std::istream &in) override;
Expand Down
27 changes: 25 additions & 2 deletions metagraph/src/annotation/representation/base/annotation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,21 @@
namespace mtg {
namespace annot {

template <typename IndexType, typename LabelType>
class Annotation {
public:
typedef std::function<void(size_t, const bitmap_generator&)> LabelIndexCallback;
virtual ~Annotation() {}

virtual uint64_t num_objects() const = 0;

virtual void call_label_objects(const LabelType &labels,
const LabelIndexCallback &callback,
size_t num_threads = 1) const = 0;
};

template <typename IndexType, typename LabelType>
class AnnotationCategory {
class AnnotationCategory : public Annotation<IndexType, LabelType> {
public:
virtual ~AnnotationCategory() {}

Expand Down Expand Up @@ -71,7 +83,6 @@ class MultiLabelAnnotation

/************************* Properties *************************/

virtual uint64_t num_objects() const = 0;
virtual size_t num_labels() const = 0;
virtual uint64_t num_relations() const = 0;
virtual const std::vector<Label>& get_all_labels() const = 0;
Expand Down Expand Up @@ -134,6 +145,7 @@ class MultiLabelEncoded : public MultiLabelAnnotation<uint64_t, LabelType> {
using Index = typename MultiLabelAnnotation<uint64_t, LabelType>::Index;
using Label = typename MultiLabelAnnotation<uint64_t, LabelType>::Label;
using VLabels = typename MultiLabelAnnotation<uint64_t, LabelType>::VLabels;
using LabelIndexCallback = typename Annotation<uint64_t, VLabels>::LabelIndexCallback;

virtual ~MultiLabelEncoded() {}

Expand Down Expand Up @@ -170,6 +182,17 @@ class MultiLabelEncoded : public MultiLabelAnnotation<uint64_t, LabelType> {

virtual const binmat::BinaryMatrix& get_matrix() const = 0;

virtual void call_label_objects(const VLabels &labels,
const LabelIndexCallback &callback,
size_t num_threads = 1) const override final {
std::vector<binmat::BinaryMatrix::Column> columns;
columns.reserve(labels.size());
for (const Label &label : labels) {
columns.emplace_back(label_encoder_.encode(label));
}
get_matrix().call_columns(columns, callback, num_threads);
}

protected:
LabelEncoder<Label> label_encoder_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,14 @@ ColumnCompressed<Label>::load_label_encoder(const std::string &filename) {
}

template <typename Label>
bool ColumnCompressed<Label>::merge_load(const std::vector<std::string> &filenames,
const ColumnCallback &callback,
size_t num_threads) {
bool ColumnCompressed<Label>
::merge_load(const std::vector<std::string> &filenames,
const ColumnCallback &callback,
size_t num_threads,
const std::optional<std::function<void(size_t)>> &num_columns_callback) {
if (!num_threads)
num_threads = 1;

std::atomic<bool> error_occurred = false;

std::vector<uint64_t> offsets(filenames.size(), 0);
Expand All @@ -618,45 +623,70 @@ bool ColumnCompressed<Label>::merge_load(const std::vector<std::string> &filenam
// compute global offsets (partial sums)
std::partial_sum(offsets.begin(), offsets.end(), offsets.begin());

// load annotations
#pragma omp parallel for num_threads(num_threads) schedule(dynamic)
for (size_t i = 0; i < filenames.size(); ++i) {
const auto &filename = make_suffix(filenames[i], kExtension);
logger->trace("Loading annotations from file {}", filename);
if (num_columns_callback) {
size_t total_count = offsets.back();
auto fname = make_suffix(filenames.back(), kExtension);
try {
std::ifstream in(filename, std::ios::binary);
if (!in.good())
throw std::ifstream::failure("can't open file");

const auto num_rows = load_number(in);
total_count += read_num_labels(fname);
} catch (...) {
logger->error("Can't load label encoder from {}", fname);
error_occurred = true;
}

LabelEncoder<Label> label_encoder_load;
if (!label_encoder_load.load(in))
throw std::ifstream::failure("can't load label encoder");
if (error_occurred)
return false;

if (!label_encoder_load.size())
logger->warn("No columns in {}", filename);
(*num_columns_callback)(total_count);
}

// update the existing and add some new columns
for (size_t c = 0; c < label_encoder_load.size(); ++c) {
auto new_column = std::make_unique<bit_vector_smart>();
// load annotations
#pragma omp parallel num_threads(num_threads) shared(error_occurred, filenames, offsets, logger)
#pragma omp single
{
for (size_t i = 0; i < filenames.size(); ++i) {
const auto &filename = make_suffix(filenames[i], kExtension);
logger->trace("Loading annotations from file {}", filename);
try {
std::ifstream in(filename, std::ios::binary);
if (!in.good())
throw std::ifstream::failure("can't open file");

const auto num_rows = load_number(in);

LabelEncoder<Label> label_encoder_load;
if (!label_encoder_load.load(in))
throw std::ifstream::failure("can't load label encoder");

if (!label_encoder_load.size())
logger->warn("No columns in {}", filename);

// update the existing and add some new columns
for (size_t c = 0; c < label_encoder_load.size(); ++c) {
auto *new_column = new bit_vector_smart();

if (!new_column->load(in)) {
delete new_column;
throw std::ifstream::failure("can't load next column");
}

if (!new_column->load(in))
throw std::ifstream::failure("can't load next column");
if (new_column->size() != num_rows) {
delete new_column;
throw std::ifstream::failure("inconsistent column size");
}

if (new_column->size() != num_rows)
throw std::ifstream::failure("inconsistent column size");
uint64_t offset = offsets[i] + c;
Label label = label_encoder_load.decode(c);

callback(offsets[i] + c,
label_encoder_load.decode(c),
std::move(new_column));
#pragma omp task firstprivate(new_column, label, offset)
callback(offset, label, std::unique_ptr<bit_vector_smart>(new_column));
}
} catch (const std::exception &e) {
logger->error("Caught exception when loading columns from {}: {}", filename, e.what());
error_occurred = true;
} catch (...) {
logger->error("Unknown exception when loading columns from {}", filename);
error_occurred = true;
}
} catch (const std::exception &e) {
logger->error("Caught exception when loading columns from {}: {}", filename, e.what());
error_occurred = true;
} catch (...) {
logger->error("Unknown exception when loading columns from {}", filename);
error_occurred = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ class ColumnCompressed : public MultiLabelEncoded<Label> {
std::unique_ptr<bit_vector>&&)>;
static bool merge_load(const std::vector<std::string> &filenames,
const ColumnCallback &callback,
size_t num_threads = 1);
size_t num_threads = 1,
const std::optional<std::function<void(size_t)>> &num_columns_callback
= std::nullopt);
static size_t read_num_labels(const std::string &filename);
static LabelEncoder<Label> load_label_encoder(const std::string &filename);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#ifndef __COLUMN_COMPRESSED_LAZY_HPP__
#define __COLUMN_COMPRESSED_LAZY_HPP__

#include <tsl/hopscotch_map.h>
#include <progress_bar.hpp>

#include "annotation/representation/column_compressed/annotate_column_compressed.hpp"

namespace mtg {
namespace annot {

template <typename Label = std::string>
class ColumnCompressedLazy
: public Annotation<typename MultiLabelEncoded<Label>::Index,
typename MultiLabelEncoded<Label>::VLabels> {
public:
using Index = typename MultiLabelEncoded<Label>::Index;
using VLabels = typename MultiLabelEncoded<Label>::VLabels;
using LabelIndexCallback = typename MultiLabelEncoded<Label>::LabelIndexCallback;

ColumnCompressedLazy(size_t num_objects, const std::vector<std::string> &files)
: num_objects_(num_objects), files_(files) {}

uint64_t num_objects() const override final { return num_objects_; }

void call_label_objects(const VLabels &labels,
const LabelIndexCallback &callback,
size_t num_threads = 1) const override final {
tsl::hopscotch_map<Label, size_t> label_set;
for (size_t i = 0; i < labels.size(); ++i) {
label_set[labels[i]] = i;
}

std::unique_ptr<ProgressBar> progress_bar;
ColumnCompressed<Label>::merge_load(files_,
[&](size_t, const Label &label, auto&& bitmap) {
if (bitmap->size() != num_objects_) {
common::logger->error("Label {} has incorrect number of rows: {} != {}",
bitmap->size(), num_objects_);
exit(1);
}

auto find = label_set.find(label);
if (find != label_set.end())
callback(find->second, *bitmap);

++(*progress_bar);
},
num_threads,
[&progress_bar](size_t total_column_count) {
progress_bar = std::make_unique<ProgressBar>(
total_column_count, "Streaming columns",
std::cerr, !common::get_verbose()
);
}
);
}

const std::string& get_file(size_t i) const { return files_[i]; }

private:
size_t num_objects_;
const std::vector<std::string> &files_;
};

} // namespace annot
} // namespace mtg

#endif // __COLUMN_COMPRESSED_LAZY_HPP__
Loading