Skip to content
Snippets Groups Projects
Commit dfeee5fb authored by Recolic Keghart's avatar Recolic Keghart
Browse files

finished. extremely slow. not sure that's because vector_on_hash or randomized...

finished. extremely slow. not sure that's because vector_on_hash or randomized algorithm(extremely success rate)
parent 29dfce26
No related branches found
No related tags found
No related merge requests found
Pipeline #25 failed with stages
in 2 minutes and 12 seconds
......@@ -55,7 +55,11 @@ public:
std::pair<bool, value_type> get(const key_type &k) const {
auto target_rank = get_rank_for_hash(get_hash_for_ele(k));
if(my_rank == target_rank) {
return std::make_pair(true, *do_get(k));
auto res = do_get(k, true);
if(res == nullptr)
return std::make_pair(false, value_type{});
else
return std::make_pair(true, *res);
}
else {
auto res = upcxx::rpc(target_rank, std::bind(&this_type::do_rpc_get, this, k)).wait();
......@@ -74,8 +78,13 @@ public:
}
// Look up `k` only when this rank is the one the key hashes to; this
// function itself never issues an RPC.
//
// Returns {true, value} when the key is owned by this rank and do_get()
// yields a non-null pointer. Returns {false, value_type{}} when the key
// maps to another rank, or when do_get() returns nullptr.
//
// The previous shape returned `*do_get(k)` before the null check, which
// dereferenced a possibly-null result and made the checked path
// unreachable.
std::pair<bool, value_type> get_if_is_mine(const key_type &k) {
    auto target_rank = get_rank_for_hash(get_hash_for_ele(k));
    if(my_rank != target_rank)
        return std::make_pair(false, value_type{});

    // NOTE(review): the meaning of do_get's second argument is not visible
    // here; `true` matches the call made by get() for the local case.
    auto res = do_get(k, true);
    if(res == nullptr)
        return std::make_pair(false, value_type{});
    return std::make_pair(true, *res);
}
......
#ifndef R267_GLOBAL_ALLOC_
#define R267_GLOBAL_ALLOC_
// See StackOverflow replies to this answer for important commentary about inheriting from std::allocator before replicating this code.
//
// Standard-library-compatible allocator whose storage comes from
// create_shared_memory() and is released with munmap().
//
// Layout of every mapping:
//     [ header: total mapped bytes (size_t), padded to alignof(T) ][ n * T ]
// The pointer handed to the container points just past the header, so
// deallocate() can recover the mapping base and its exact length.
//
// NOTE(review): assumes create_shared_memory(bytes) returns a writable
// mapping of at least `bytes` bytes whose base is page-aligned, and reports
// failure on its own -- confirm against its definition.
template <typename T>
class mmap_allocator: public std::allocator<T>
{
public:
    typedef size_t size_type;
    typedef T* pointer;
    typedef const T* const_pointer;

    // Rebinding must yield another mmap_allocator so that a container's
    // internal node/array types are allocated through this class as well.
    template<typename _Tp1>
    struct rebind
    {
        typedef mmap_allocator<_Tp1> other;
    };

    // Allocate storage for `size` objects of type T (not `size` bytes).
    // `hint` is accepted for interface compatibility and ignored.
    // Throws std::runtime_error if the byte count would overflow size_t.
    pointer allocate(size_type size, const void *hint=0)
    {
        (void)hint;
        const size_type header = header_bytes();
        const size_type max_bytes = static_cast<size_type>(-1);
        if(size > (max_bytes - header) / sizeof(T))
            throw std::runtime_error("mmap_allocator: allocation size overflows size_t");

        const size_type total_bytes = header + size * sizeof(T);
        void *ptr = create_shared_memory(total_bytes);
        // Record the full mapping length so deallocate() unmaps all of it.
        *reinterpret_cast<size_type *>(ptr) = total_bytes;
        return reinterpret_cast<pointer>(static_cast<char *>(ptr) + header);
    }

    // Release storage previously returned by allocate(). `n` is ignored; the
    // mapping length is read back from the header in front of `ptr`.
    // Throws std::runtime_error if munmap() fails.
    void deallocate(pointer ptr, size_type n)
    {
        (void)n;
        char *base = reinterpret_cast<char *>(ptr) - header_bytes();
        const size_type total_bytes = *reinterpret_cast<size_type *>(base);
        auto res = munmap(base, total_bytes);
        if(res == -1)
            throw std::runtime_error("munmap failed. system error: {}"_format(strerror(errno)));
    }

    mmap_allocator() noexcept: std::allocator<T>() {}
    mmap_allocator(const mmap_allocator &a) noexcept: std::allocator<T>(a) { }
    template <class U>
    mmap_allocator(const mmap_allocator<U> &a) noexcept: std::allocator<T>(a) { }
    ~mmap_allocator() noexcept { }

private:
    // Bytes reserved in front of the user region: large enough for the
    // size_t length record and a multiple of alignof(T), so the returned
    // pointer stays correctly aligned for T.
    static constexpr size_type header_bytes()
    {
        return alignof(T) > sizeof(size_type) ? alignof(T) : sizeof(size_type);
    }
};
#endif
......@@ -11,6 +11,7 @@
#include "kmer_t.hpp"
#include "read_kmers.hpp"
#include "hash_map.hpp"
#include "upcxx_dist_vector.hpp"
#include "butil.hpp"
......@@ -87,30 +88,38 @@ int main(int argc, char **argv) {
auto start_read = std::chrono::high_resolution_clock::now();
std::list <std::list <kmer_pair>> contigs;//assume it is global addressed
if (upcxx::rank_me()==0){
for (const auto &start_kmer : start_nodes) {
std::list <kmer_pair> contig;
contig.push_back(start_kmer);
contigs.push_back(contig);
//std::list <std::vector <kmer_pair>> contigs;//assume it is global addressed
upcxx_matrix<kmer_pair> contigs(hash_table_size);
size_t contigs_size = start_nodes.size();
if (upcxx::rank_me()==0) {
//contigs.set_rows(start_nodes.size());
for(auto cter = 0; cter < start_nodes.size(); ++cter) {
contigs.push_to_row(cter, start_nodes[cter]);
}
//for (auto &&start_kmer : start_nodes) {
//contigs.push_back(std::vector<kmer_pair>{start_kmer});
//}
//now broadcast contigs, and broadcast the number of contig inside contigs to every processor
}
// the following will be done by every processor
bool all_done = false;
while (all_done==false){
for (int num=0;num<num_contig;num++){
check_if_this_contig_ends_with_F//update all_done
fetch_last_kmer_if_needed(contigs[i],upcxx::rank_me()).wait();
while (all_done == false) {
all_done = true;
for (auto row = 0; row < contigs_size; ++row){
kmer_pair this_contig_end = contigs.back_of_row(row);
if(this_contig_end.forwardExt() == 'F')
continue; // already done.
else
all_done = false;
kmer_pair next;
bool isMine = hashmap.find(this_contig_end.next_kmer(), next);
// check_if_this_contig_ends_with_F; //update all_done
//fetch_last_kmer_if_needed(contigs[i],upcxx::rank_me()).wait();
//return success,and contigs[i].back().next_kmer()
if (fetch_last_kmer_if_needed.success==true){
kmer_pair kmer;
bool success = hashmap.find(fetch_last_kmer_if_needed.next_kmer, kmer);
if (!success) {
//panic!
}
remotely_push_kmer_into_contig(contigs[i],kmer)
if (isMine){
contigs.push_to_row(row, next);
//remotely_push_kmer_into_contig(contigs[i],kmer);
//contig.push_back(kmer);
}
}
......@@ -125,10 +134,14 @@ int main(int argc, char **argv) {
std::chrono::duration <double> insert = end_insert - start;
std::chrono::duration <double> total = end - start;
int numKmers = std::accumulate(contigs.begin(), contigs.end(), 0,
[] (int sum, const std::list <kmer_pair> &contig) {
return sum + contig.size();
});
int numKmers = 0;
for(auto cter = 0; cter < contigs_size; ++cter) {
numKmers += contigs.get_cols_of_row(cter);
}
//int numKmers = std::accumulate(contigs.begin(), contigs.end(), 0,
// [] (int sum, const std::list <kmer_pair> &contig) {
// return sum + contig.size();
// });
if (run_type != "test") {
BUtil::print("Assembled in %lf total\n", total.count());
......@@ -136,14 +149,19 @@ int main(int argc, char **argv) {
if (run_type == "verbose") {
printf("Rank %d reconstructed %d contigs with %d nodes from %d start nodes."
" (%lf read, %lf insert, %lf total)\n", upcxx::rank_me(), contigs.size(),
" (%lf read, %lf insert, %lf total)\n", upcxx::rank_me(), contigs_size,
numKmers, start_nodes.size(), read.count(), insert.count(), total.count());
}
if (run_type == "test") {
std::ofstream fout("test_" + std::to_string(upcxx::rank_me()) + ".dat");
for (const auto &contig : contigs) {
fout << extract_contig(contig) << std::endl;
//for (const auto &contig : contigs) {
for(auto cter = 0; cter < contigs_size; ++cter) {
std::list<kmer_pair> real_contig;
auto len = contigs.get_cols_of_row(cter);
for(auto c = 0; c < len; ++c)
real_contig.emplace_back(contigs.get(cter, c));
fout << extract_contig(real_contig) << std::endl;
}
fout.close();
}
......
......@@ -38,6 +38,7 @@ struct kmer_pair {
bool operator==(const kmer_pair &kmer) const noexcept;
bool operator!=(const kmer_pair &kmer) const noexcept;
uint64_t padding; // recolic: make sizeof(T) >= sizeof(uint64_t)
};
char kmer_pair::forwardExt() const noexcept {
......
#ifndef UPCXX_DIST_VECTOR_HPP
#define UPCXX_DIST_VECTOR_HPP
#include <upcxx/upcxx.hpp>
namespace fucking {
    /// Two-word key used by upcxx_matrix to address its backing key/value
    /// store: upcxx_matrix::index2key(x, y) produces key_type{x, y}, i.e.
    /// d1 carries the row index and d2 the column index.
    struct key_type {
        uint64_t d1, d2;

        /// Keys are equal only when both words match.
        bool operator==(const key_type &another) const noexcept {
            return d1 == another.d1 && d2 == another.d2;
        }
    };
}

namespace std {
    /// Hash for fucking::key_type.
    ///
    /// operator() is const so the hasher satisfies the standard Hash
    /// requirements and can be invoked through a const object, as the
    /// standard unordered containers do.
    template <>
    struct hash <fucking::key_type> {
        size_t operator()(const fucking::key_type &p) const noexcept {
            // This function is REALLY REALLY IMPORTANT to minimize the communication cost
            // in vector operations of `contigs`.
            //
            // d2 == 0xffffffffffffffff is the value upcxx_matrix uses as its
            // per-row size slot; every such key hashes to 0.
            if(p.d2 == static_cast<uint64_t>(-1))
                return 0;
            // Dropping the low 4 bits of d2 gives 16 consecutive columns of
            // the same row an identical hash.
            // NOTE(review): presumably this keeps them on one rank; the
            // hash-to-rank mapping is not visible here -- confirm.
            return std::hash<uint64_t>{}(p.d2 >> 4) ^ p.d1;
        }
    };
}
#include "dist_kv_store.hpp"
#include "kmer_t.hpp"
//template <typename T>
//class dist_vector {
//public:
// dist_vector() : cap(0), size(0) {
//
// }
//
//private:
// void grow() {
// cap += 1;
//
// }
//
// size_t cap, size;
// upcxx::global_ptr<T> data;
//};
#include <cstdint>
#include <cstring>
#include <numeric>
#include <stdexcept>
using std::uint64_t;
/// Ragged two-dimensional array stored in a distributed key/value store.
///
/// Each element lives under the key {row, col}. The number of elements in a
/// row is kept in the same store under the reserved column SIZE_SLOT, with
/// the count written into the first sizeof(uint64_t) bytes of a T.
///
/// NOTE(review): the count is copied in and out of T with memcpy, which
/// assumes T is trivially copyable and default-constructible -- confirm for
/// every T this is instantiated with.
///
/// WARNING: push_to_row() is a non-atomic read-modify-write. Concurrent
/// pushes to the same row from different ranks or threads can lose elements.
template <typename T>
class upcxx_matrix {
    using key_type = fucking::key_type;
public:
    /// Build the backing store for this rank; `hash_table_size` is forwarded
    /// to kv_store unchanged.
    upcxx_matrix(size_t hash_table_size) : buf(upcxx::rank_me(), upcxx::rank_n(), hash_table_size) {}

    // The row length is stored inside a T, so T must be able to hold it.
    static_assert(sizeof(T) >= sizeof(uint64_t), "T should larger than 64b");

    /// Store `val` under the reserved key {SIZE_SLOT, 0}.
    /// Nothing in this class reads that key back.
    void set_rows(uint64_t val) {
        buf.set(index2key(SIZE_SLOT, 0), pack_count(val));
    }

    /// Append `data` to the end of row `row_index`.
    void push_to_row(uint64_t row_index, const T &data) {
        // WARNING WARNING WARNING!!! NOT PROCESS SAFE!!! NOT THREAD SAFE!!!
        const uint64_t curr_size = get_cols_of_row(row_index);
        buf.set(index2key(row_index, curr_size), data);
        // Warning: possible corruption here -- the element is already
        // stored, but the row length has not been updated yet.
        set_cols_of_row(row_index, curr_size + 1);
    }

    /// Number of elements in row `row_index`; 0 if the row has no size slot.
    uint64_t get_cols_of_row(uint64_t row_index) const {
        const auto curr_size_res = buf.get(index2key(row_index, SIZE_SLOT));
        if(curr_size_res.first == false) // not found
            return 0;
        return unpack_count(curr_size_res.second);
    }

    /// Overwrite the recorded length of row `row_index` with `cols`.
    /// Does not create or remove any elements.
    void set_cols_of_row(uint64_t row_index, uint64_t cols) {
        buf.set(index2key(row_index, SIZE_SLOT), pack_count(cols));
    }

    /// Element at (`row`, `col`).
    /// Throws std::out_of_range when no value is stored under that key, or
    /// when `col` is the reserved SIZE_SLOT column (which holds a length,
    /// not an element).
    auto get(uint64_t row, uint64_t col) const {
        if(col == SIZE_SLOT)
            throw std::out_of_range("matrix::get index out of range");
        auto res = buf.get(index2key(row, col));
        if(res.first == false)
            throw std::out_of_range("matrix::get index out of range");
        return res.second;
    }

    /// Last element of row `row_index`.
    /// Throws std::out_of_range when the row is empty; without this check
    /// `size - 1` would wrap around to SIZE_SLOT and address the size slot.
    auto back_of_row(uint64_t row_index) const {
        const uint64_t cols = get_cols_of_row(row_index);
        if(cols == 0)
            throw std::out_of_range("matrix::back_of_row called on an empty row");
        return get(row_index, cols - 1);
    }

private:
    kv_store<key_type, T> buf;

    // Reserved column index under which a row's length is stored.
    static constexpr uint64_t SIZE_SLOT = (uint64_t)0xffffffffffffffff;

    __attribute__((const)) key_type index2key(uint64_t x, uint64_t y) const {
        return key_type{x, y};
    }

    // Encode a count into the leading bytes of a value-initialized T.
    // memcpy avoids reading past the end of the 8-byte source object.
    static T pack_count(uint64_t count) {
        T slot{};
        std::memcpy(&slot, &count, sizeof(count));
        return slot;
    }

    // Decode a count previously written by pack_count().
    static uint64_t unpack_count(const T &slot) {
        uint64_t count = 0;
        std::memcpy(&count, &slot, sizeof(count));
        return count;
    }
};
#endif
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment