Skip to content
Draft
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
8567bff
Intermediate
danieldanciu Sep 4, 2020
97294cd
First attempt
danieldanciu Sep 5, 2020
1f731ff
Running'
danieldanciu Sep 6, 2020
7e981ba
Clear checkpoint
danieldanciu Sep 6, 2020
b9fbc53
Small
danieldanciu Sep 7, 2020
d83368d
Add --phase
danieldanciu Sep 7, 2020
0b27107
Working checkpointing
danieldanciu Sep 8, 2020
7d53b52
Added functional tests
danieldanciu Sep 8, 2020
7476e7a
Small changes self review
danieldanciu Sep 8, 2020
fa07b12
Add integration tests for parallel building
danieldanciu Sep 9, 2020
8301c22
Remove forgotten optnone
danieldanciu Sep 10, 2020
71ac989
Minor rename
danieldanciu Sep 10, 2020
12e1ada
Support filesystem
danieldanciu Sep 10, 2020
da1aea6
Mor elogging
danieldanciu Sep 15, 2020
bc63e50
small
danieldanciu Sep 15, 2020
0756a42
small
danieldanciu Sep 15, 2020
a2e6c0f
small
danieldanciu Sep 15, 2020
15b42da
Don't clean up unmerged files
danieldanciu Sep 15, 2020
8a9121b
Don't clean up unmerged files in SortedSetDisk, so that computation c…
danieldanciu Sep 15, 2020
9fabdb5
10 chunks
danieldanciu Sep 15, 2020
a4fb98c
Wait for merging before stopping
danieldanciu Sep 17, 2020
be4d767
Small fix
danieldanciu Sep 17, 2020
89d1588
Actually wait for merge to happen
danieldanciu Sep 17, 2020
4c73921
Write checkpoint after phase1
danieldanciu Sep 18, 2020
6b0b9d9
Address review comments
danieldanciu Sep 19, 2020
caaf272
Address missed comments
danieldanciu Sep 19, 2020
5dfcee8
Merged phase
danieldanciu Sep 19, 2020
62ec669
First checkpoint, then delete
danieldanciu Sep 20, 2020
9cf55a9
First checkpoint, then delete
danieldanciu Sep 20, 2020
88748b9
Merge branch 'phase' into phase2
danieldanciu Oct 12, 2020
2792ab0
Added some logging
danieldanciu Oct 12, 2020
a40dea5
Merge branch 'phase2' into phase
danieldanciu Oct 12, 2020
7600830
Addressed review comments
danieldanciu Oct 12, 2020
c4c43ec
minor
danieldanciu Oct 12, 2020
007845f
merged with dev
danieldanciu Oct 18, 2020
866e786
Remove double declaration
danieldanciu Oct 19, 2020
d52ca9e
Merged with dev
danieldanciu Oct 22, 2020
5a3a8a3
Default to phase 3
danieldanciu Oct 22, 2020
ad6d965
Skip phase 2 if no rc's are generated
danieldanciu Oct 22, 2020
6493adb
Flush sorted set at end of phase
danieldanciu Oct 23, 2020
57c111c
Better temp dir
danieldanciu Oct 24, 2020
e6c0d95
Don't push kmers into queue if phase is < 3
danieldanciu Oct 24, 2020
7cd5b63
Merged with dev
danieldanciu Nov 23, 2020
80f1e48
Set checkpoint to 2 when RC's are not being generated
danieldanciu Nov 23, 2020
de1173c
Clean up temp files in SSD
danieldanciu Nov 23, 2020
fb0ad63
Merge remote-tracking branch 'origin/dev' into phase
danieldanciu Nov 24, 2020
5e42d10
Remove trace logs
danieldanciu Nov 24, 2020
d728edd
s/remove/remove_all
danieldanciu Nov 24, 2020
79d97e4
Merged with dev
danieldanciu Dec 14, 2020
03d5b2d
Simplify test_build_phase
danieldanciu Dec 14, 2020
e4aed78
Small fix in checkpoint continuation
danieldanciu Dec 14, 2020
82a9e8e
Verbose mode for test_build_phase
danieldanciu Dec 16, 2020
76c0d1b
A bit more debugging
danieldanciu Dec 16, 2020
2a5b52f
All trace logs
danieldanciu Dec 18, 2020
dc278db
Reset kmers when continuing from cp 1
danieldanciu Dec 20, 2020
98ad5da
Skip phase 2
danieldanciu Dec 20, 2020
8c953a6
Copy file names
danieldanciu Dec 20, 2020
c238b20
Acquire lock when flushing
danieldanciu Dec 20, 2020
0d399b7
Count orig/rc
danieldanciu Dec 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions metagraph/integration_tests/test_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,78 @@ def test_build_chunks_from_kmc_canonical(self, build):
self.assertEqual('nodes (k): 802920', params_str[1])
self.assertEqual('canonical mode: yes', params_str[2])

@parameterized.expand(['succinct_disk'])
def test_build_phase(self, build):
representation, tmp_dir = build_params[build]

construct_command = '{exe} build --phase 1 --mask-dummy --graph {repr} --canonical -k 20 ' \
'--disk-swap {tmp_dir} -o {outfile} {input}'.format(
exe=METAGRAPH,
repr=representation,
tmp_dir=tmp_dir,
outfile=self.tempdir.name + '/graph',
input=TEST_DATA_DIR + '/transcripts_1000.fa'
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)
self.assertTrue(os.path.isfile(self.tempdir.name + '/graph.checkpoint'))

construct_command = '{exe} build --mask-dummy --graph {repr} --canonical -k 20 ' \
'--disk-swap {tmp_dir} -o {outfile} {input}'.format(
exe=METAGRAPH,
repr=representation,
tmp_dir=tmp_dir,
outfile=self.tempdir.name + '/graph',
input=TEST_DATA_DIR + '/transcripts_1000.fa'
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)

res = self.__get_stats(self.tempdir.name + '/graph' + graph_file_extension[representation])
self.assertEqual(res.returncode, 0)
params_str = res.stdout.decode().split('\n')[2:]
self.assertEqual('k: 20', params_str[0])
self.assertEqual('nodes (k): 1159851', params_str[1])
self.assertEqual('canonical mode: yes', params_str[2])
self.assertFalse(os.path.isfile(self.tempdir.name + '/graph.checkpoint'))

# tests that we can build and resume 2 separate graphs on the same machine
@parameterized.expand(['succinct_disk'])
def test_build_phase_parallel(self, build):
representation, tmp_dir = build_params[build]

for name in ('graph1', 'graph2'):
construct_command = '{exe} build --phase 1 --mask-dummy --graph {repr} --canonical -k 20 ' \
'--disk-swap {tmp_dir} -o {outfile} {input}'.format(
exe=METAGRAPH,
repr=representation,
tmp_dir=tmp_dir,
outfile=self.tempdir.name + '/' + name,
input=TEST_DATA_DIR + ('/transcripts_1000.fa' if name == 'graph1' else '/transcripts_100.fa')
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)
self.assertTrue(os.path.isfile(self.tempdir.name + '/' + name + '.checkpoint'))
Comment thread
danieldanciu marked this conversation as resolved.

for name in ('graph', 'graph2'):
construct_command = '{exe} build --mask-dummy --graph {repr} --canonical -k 20 ' \
'--disk-swap {tmp_dir} -o {outfile} {input}'.format(
exe=METAGRAPH,
repr=representation,
tmp_dir=tmp_dir,
outfile=self.tempdir.name + '/' + name,
input=TEST_DATA_DIR + ('/transcripts_1000.fa' if name == 'graph1' else '/transcripts_100.fa')
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)

res = self.__get_stats(self.tempdir.name + '/graph' + graph_file_extension[representation])
self.assertEqual(res.returncode, 0)
params_str = res.stdout.decode().split('\n')[2:]
self.assertEqual('k: 20', params_str[0])
self.assertEqual('nodes (k): ' + ('1159851' if name == 'graph1' else '91584'), params_str[1])
self.assertEqual('canonical mode: yes', params_str[2])
self.assertFalse(os.path.isfile(self.tempdir.name + '/' + name + '.checkpoint'))

if __name__ == '__main__':
unittest.main()
36 changes: 36 additions & 0 deletions metagraph/integration_tests/test_build_weighted.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,5 +373,41 @@ def test_kmer_count_width_large(self, build, k_width_result):
self.assertEqual('avg weight: {}'.format(avg_count_expected), params_str[4])


def test_build_phase(self):
construct_command = '{exe} build --phase 1 --mask-dummy -k 20 --count-kmers --disk-swap {tmp_dir} ' \
'--count-width 16 -o {outfile} {input}'.format(
exe=METAGRAPH,
tmp_dir=self.tempdir.name,
outfile=self.tempdir.name + '/graph',
input=TEST_DATA_DIR + '/transcripts_1000.fa'
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)
self.assertTrue(os.path.isfile(self.tempdir.name + '/graph.checkpoint'))

construct_command = '{exe} build --mask-dummy -k 20 --count-kmers --disk-swap {tmp_dir} --count-width 16 ' \
'-o {outfile} {input}'.format(
exe=METAGRAPH,
tmp_dir=self.tempdir.name,
outfile=self.tempdir.name + '/graph',
input=TEST_DATA_DIR + '/transcripts_1000.fa'
)
res = subprocess.run([construct_command], shell=True)
self.assertEqual(res.returncode, 0)

stats_command = '{exe} stats {graph}'.format(
exe=METAGRAPH,
graph=self.tempdir.name + '/graph.dbg',
)
res = subprocess.run(stats_command.split(), stdout=PIPE)
self.assertEqual(res.returncode, 0)
params_str = res.stdout.decode().split('\n')[2:]
self.assertEqual('k: 20', params_str[0])
self.assertEqual('nodes (k): 591997', params_str[1])
self.assertEqual('canonical mode: no', params_str[2])
self.assertEqual('nnz weights: 591997', params_str[3])
self.assertEqual('avg weight: 2.48587', params_str[4])


if __name__ == '__main__':
unittest.main()
21 changes: 18 additions & 3 deletions metagraph/src/cli/build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "graph/representation/bitmap/dbg_bitmap_construct.hpp"
#include "graph/representation/succinct/dbg_succinct.hpp"
#include "graph/representation/succinct/boss_construct.hpp"
#include "graph/representation/succinct/build_checkpoint.hpp"
#include "graph/graph_extensions/node_weights.hpp"
#include "config/config.hpp"
#include "parse_sequences.hpp"
Expand Down Expand Up @@ -105,6 +106,9 @@ int build_graph(Config *config) {
logger->info("k-mer suffix: '{}'", suffix);
}

bool checkpoint_enabled = !config->tmp_dir.empty() && suffixes.size() == 1;
boss::BuildCheckpoint checkpoint(checkpoint_enabled ? config->outfbase : "",
config->phase);
auto constructor = boss::IBOSSChunkConstructor::initialize(
boss_graph->get_k(),
config->canonical,
Expand All @@ -115,15 +119,25 @@ int build_graph(Config *config) {
config->tmp_dir.empty() ? kmer::ContainerType::VECTOR
: kmer::ContainerType::VECTOR_DISK,
config->tmp_dir,
config->disk_cap_bytes
config->disk_cap_bytes,
checkpoint
);

push_sequences(files, *config, timer, constructor.get());
if (checkpoint.checkpoint() == 0) {
push_sequences(files, *config, timer, constructor.get());
} else {
logger->info("Skipping parsing sequences from input file(s)");
}

boss::BOSS::Chunk *next_chunk = constructor->build_chunk();

if (checkpoint.phase() < 2) { // phase 1 stops after generating dummy k-mers
assert(next_chunk == nullptr);
Comment thread
danieldanciu marked this conversation as resolved.
return 0;
}
Comment thread
danieldanciu marked this conversation as resolved.

logger->trace("Graph chunk with {} k-mers was built in {} sec",
next_chunk->size() - 1, timer.elapsed());

Comment thread
danieldanciu marked this conversation as resolved.
if (config->suffix.size()) {
logger->info("Serialize the graph chunk for suffix '{}'...", suffix);
timer.reset();
Expand All @@ -140,6 +154,7 @@ int build_graph(Config *config) {
} else {
graph_data.reset(next_chunk);
}
checkpoint.remove_checkpoint();
}

assert(graph_data);
Expand Down
27 changes: 27 additions & 0 deletions metagraph/src/cli/config/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ Config::Config(int argc, char *argv[]) {
tmp_dir = get_value(i++);
} else if (!strcmp(argv[i], "--disk-cap-gb")) {
disk_cap_bytes = atoi(get_value(i++)) * 1e9;
} else if (!strcmp(argv[i], "--phase")) {
phase = atoi(get_value(i++));
} else if (argv[i][0] == '-') {
fprintf(stderr, "\nERROR: Unknown option %s\n\n", argv[i]);
print_usage(argv[0], identity);
Expand Down Expand Up @@ -521,6 +523,30 @@ Config::Config(int argc, char *argv[]) {
if (identity == COMPARE && fnames.size() != 2)
print_usage_and_exit = true;

if (identity != BUILD && phase != 2) {
std::cerr << "Error: Phases are only supported for building. Remove --phase"
<< std::endl;
print_usage_and_exit = true;
}
Comment thread
danieldanciu marked this conversation as resolved.
Outdated

if (identity != BUILD && phase > 2) {
std::cerr << "Error: Invalid phase value. Can be either 1 or 2." << std::endl;
print_usage_and_exit = true;
}

if (phase != 2 && tmp_dir.empty()) {
std::cerr << "Error: Phases are only supported for disk-based building. "
"Please set --disk-swap." << std::endl;
print_usage_and_exit = true;
}

if (phase != 2 && suffix_len > 0) {
std::cerr << "Error: Phases are not supported for multiple suffixes. Remove "
"--phase or specify each suffix separately using --suffix"
<< std::endl;
print_usage_and_exit = true;
}

if (discovery_fraction < 0 || discovery_fraction > 1)
print_usage_and_exit = true;

Expand Down Expand Up @@ -751,6 +777,7 @@ void Config::print_usage(const std::string &prog_name, IdentityType identity) {
fprintf(stderr, "\t-p --parallel [INT] \tuse multiple threads for computation [1]\n");
fprintf(stderr, "\t --disk-swap [STR] \tdirectory to use for temporary files [off]\n");
fprintf(stderr, "\t --disk-cap-gb [INT] \tmax temp disk space to use before forcing a merge, in GB [20]\n");
fprintf(stderr, "\t --phase [INT] \tmax where to stop the computation (1=generate kmers, 2=build all) [2]\n");
} break;
case CLEAN: {
fprintf(stderr, "Usage: %s clean -o <outfile-base> [options] GRAPH\n\n", prog_name.c_str());
Expand Down
2 changes: 2 additions & 0 deletions metagraph/src/cli/config/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ class Config {

size_t disk_cap_bytes = 20e9; // 20GB default

uint32_t phase = 2; // build phase; 1 = generate kmers, 2 = complete build

enum IdentityType {
NO_IDENTITY = -1,
BUILD = 1,
Expand Down
15 changes: 8 additions & 7 deletions metagraph/src/common/elias_fano.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,30 @@
namespace mtg {
namespace common {

void concat(const std::vector<std::string> &files, const std::string &result) {
std::vector<std::string> concat(const std::vector<std::string> &files, const std::string &result) {
if (files.empty())
return;
return {};

std::vector<std::string> original_files;
Comment thread
danieldanciu marked this conversation as resolved.

std::vector<std::string> suffixes = { "", ".up" };
if (std::filesystem::exists(files[0] + ".count"))
suffixes.push_back(".count");

for (const std::string &suffix : suffixes) {
std::string concat_command = "cat ";
for (uint32_t i = 1; i < files.size(); ++i) {
for (uint32_t i = 0; i < files.size(); ++i) {
concat_command += files[i] + suffix + " ";
}
concat_command += " >> " + files[0] + suffix;

concat_command += " > " + result + suffix;
if (std::system(concat_command.c_str()))
throw std::runtime_error("Error while cat-ing files: " + concat_command);

std::filesystem::rename(files[0] + suffix, result + suffix);
for (const std::string &f : files) {
std::filesystem::remove(f + suffix);
original_files.push_back(f + suffix);
}
}
return original_files;
}

template <class T, class Enable = void>
Expand Down
2 changes: 1 addition & 1 deletion metagraph/src/common/elias_fano.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace common {
* The files store data that is ordered and the values in a file are smaller than the
* values in the next file.
*/
void concat(const std::vector<std::string> &files, const std::string &result);
std::vector<std::string> concat(const std::vector<std::string> &files, const std::string &result);

/**
* Elias-Fano encoder that streams the encoded result into a file.
Expand Down
12 changes: 7 additions & 5 deletions metagraph/src/common/sorted_sets/sorted_set_disk_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ std::vector<std::string> SortedSetDiskBase<T>::files_to_merge() {
}

template <typename T>
void SortedSetDiskBase<T>::clear(const std::filesystem::path &tmp_path) {
void SortedSetDiskBase<T>::clear(const std::filesystem::path &tmp_path, bool remove_files) {
std::unique_lock<std::mutex> exclusive_lock(mutex_);
std::unique_lock<std::shared_timed_mutex> multi_insert_lock(multi_insert_mutex_);
is_merging_ = false;
// remove the files that have not been requested to merge
for (const auto &chunk_file : get_file_names()) {
std::filesystem::remove(chunk_file);
if (remove_files) {
// remove the files that have not been requested to merge
for (const auto &chunk_file : get_file_names()) {
std::filesystem::remove(chunk_file);
}
}
chunk_count_ = 0;
l1_chunk_count_ = 0;
Expand All @@ -91,7 +93,7 @@ void SortedSetDiskBase<T>::start_merging_async() {
async_worker_.enqueue([file_names, this]() {
std::function<void(const T &)> on_new_item
= [this](const T &v) { merge_queue_.push(v); };
merge_files(file_names, on_new_item);
merge_files(file_names, on_new_item, false);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who is responsible for cleaning these files up? Does it mean that it never removes the old temp files until all the k-mers are sorted, and thus, the disk usage has grown a lot?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only files affected by this change are the original collected chunks. These will be indeed deleted after checkpoint 5 instead of after checkpoint 2.
Why not delete them now? Because if the program crashes between merge_queue_.shutdown() (so the merge queue still has elements to merge, but no more new elements will be added) and the queue being emptied, then we lose all data. The old chunks have been deleted, but the new merge is not yet ready.

The timespan between the files being deleted and the merge queue being emptied is quite short, so the probability of this happening is low, but at the same time I cannot leave this flaw in the code with a clear conscience.

Since recover_dummy_kmers in boss_chunk_construct doesn't receive a reference to this object, but only to the ChunkedWaitQueue it creates, it cannot even manually call clear() on it when the merge is done.

merge_queue_.shutdown();
});
}
Expand Down
8 changes: 3 additions & 5 deletions metagraph/src/common/sorted_sets/sorted_set_disk_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ class SortedSetDiskBase {
size_t max_disk_space_bytes);

virtual ~SortedSetDiskBase() {
// remove the files that have not been requested to merge
for (const auto &chunk_file : get_file_names()) {
std::filesystem::remove(chunk_file);
}
// not cleaning up unmerged chunk_*** files so that the computation can be resumed
// if building in phases or in case of a crash
async_worker_.join(); // make sure the data was processed
}

Expand All @@ -74,7 +72,7 @@ class SortedSetDiskBase {
* sorted set may be expensive when #data_ is large. In these cases, prefer calling
* #clear and re-using the buffer.
*/
void clear(const std::filesystem::path &tmp_path = "/tmp/");
void clear(const std::filesystem::path &tmp_path = "/tmp/", bool remove_files = true);

protected:
/** Advances #it by step or points to #end, whichever comes first. */
Expand Down
11 changes: 8 additions & 3 deletions metagraph/src/common/utils/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ void cleanup_tmp_dir_on_exit() {
}

std::filesystem::path create_temp_dir(std::filesystem::path path,
const std::string &name) {
const std::string &name,
bool clean_on_exit) {
if (path.empty())
path = "./";

Expand All @@ -48,6 +49,12 @@ std::filesystem::path create_temp_dir(std::filesystem::path path,
exit(1);
}

logger->trace("Created temporary directory {}", tmp_dir_str);

if (!clean_on_exit) {
return tmp_dir_str;
}

if (TMP_DIRS.empty()) {
if (std::signal(SIGINT, cleanup_tmp_dir_on_signal) == SIG_ERR)
logger->error("Couldn't reset the signal handler for SIGINT");
Expand All @@ -57,8 +64,6 @@ std::filesystem::path create_temp_dir(std::filesystem::path path,
logger->error("Couldn't reset the atexit handler");
}

logger->trace("Registered temporary directory {}", tmp_dir_str);

static std::mutex mu;
std::lock_guard<std::mutex> lock(mu);

Expand Down
3 changes: 2 additions & 1 deletion metagraph/src/common/utils/file_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
namespace utils {

std::filesystem::path create_temp_dir(std::filesystem::path path,
const std::string &name = "");
const std::string &name = "",
bool clean_on_exit = true);


bool check_if_writable(const std::string &filename);
Expand Down
Loading