Commit 30fd25a8 authored by Recolic Keghart

Squashed commit: multidev support done

parent 52152bf9
@@ -9,14 +9,14 @@ TEST_DEV ?= GPU
OMP_FLAGS = -Xcompiler -fopenmp
NVFLAGS = -I. -O3 -std=c++14 -DCUDA_SM=$(SM) -arch sm_$(SM) --relocatable-device-code=true --extended-lambda $(OMP_FLAGS) -g -DTEST_DEV=$(TEST_DEV)
NVFLAGS = -I. -O3 -std=c++14 -DCUDA_SM=$(SM) -arch sm_$(SM) --relocatable-device-code=true --extended-lambda $(OMP_FLAGS) -g
default:
$(NVCC) $(NVFLAGS) gpma_bfs_demo.cu -o gpma_bfs_demo -lgomp
mini:
$(NVCC) $(NVFLAGS) mini.cu -o mini -lgomp -DDEBUG
$(NVCC) $(NVFLAGS) mini.cu -o mini -lgomp -DDEBUG -DTEST_DEV=$(TEST_DEV)
format:
clang-format --style=file -i *.cuh *.cu *.hpp
@@ -108,7 +108,7 @@ class GPMA {
* 0x50000001 31
* 0x00000101 9
*/
__host__ __device__ [[gnu::always_inline]] SIZE_TYPE fls(SIZE_TYPE x) {
__host__ __device__ [[gnu::always_inline]] inline SIZE_TYPE fls(SIZE_TYPE x) {
SIZE_TYPE r = 32;
if (!x)
return 0;
@@ -968,6 +968,7 @@ template <dev_type_t DEV>
__host__ void update_gpma(GPMA<DEV> &gpma, NATIVE_VEC_KEY<DEV> &update_keys, NATIVE_VEC_VALUE<DEV> &update_values) {
DEBUG_PRINTFLN("DBG: (ENTER UPDATE)update_gpma args, update_keys={}, values={}", rlib::printable_iter(update_keys), rlib::printable_iter(values_for_print(update_values)));
gpma.print_status("ENTER update_gpma");
rlib::printfln("DBG: tree_height={}, update_size={}", gpma.tree_height, update_keys.size());
//LOG_TIME("enter_update_gpma")
@@ -1008,7 +1009,6 @@ __host__ void update_gpma(GPMA<DEV> &gpma, NATIVE_VEC_KEY<DEV> &update_keys, NATIVE_VEC_VALUE<DEV> &update_values) {
// step5: rebalance each tree level
for (SIZE_TYPE level = 0; level <= gpma.tree_height && update_size; level++) {
rlib::printfln("debug: rebalance tree level {}, tree_height={} update_size={}", level, gpma.tree_height, update_size);
SIZE_TYPE lower_bound = gpma.lower_element[level];
SIZE_TYPE upper_bound = gpma.upper_element[level];
This diff is collapsed.
@@ -56,30 +56,24 @@ int main(int argc, char **argv) {
h_base_keys[i] = ((KEY_TYPE)host_x[i] << 32) + host_y[i];
}
NATIVE_VEC_KEY<TEST_DEV> base_keys = h_base_keys;
NATIVE_VEC_VALUE<TEST_DEV> base_values(half, 1);
NATIVE_VEC_KEY<CPU> base_keys = h_base_keys;
NATIVE_VEC_VALUE<CPU> base_values(half, 1);
cudaDeviceSynchronize();
int num_slide = 100;
int step = half / num_slide;
LOG_TIME("before init_csr_gpma")
GPMA<TEST_DEV> gpma(node_size);
GPMA_multidev<1, 1> gpma(node_size);
cudaDeviceSynchronize();
LOG_TIME("before update_gpma 1")
update_gpma(gpma, base_keys, base_values);
thrust::device_vector<SIZE_TYPE> bfs_result(node_size);
gpma.update_batch(base_keys, base_values);
native_vector<CPU, SIZE_TYPE> bfs_result(node_size);
cudaDeviceSynchronize();
LOG_TIME("before first bfs") {
auto gpma_mirror = gpma
#if TEST_DEV == CPU
.mirror()
#endif
;
gpma_bfs<GPU>(RAW_PTR(gpma_mirror.keys), RAW_PTR(gpma_mirror.values), RAW_PTR(gpma_mirror.row_offset), node_size, edge_size, bfs_start_node, RAW_PTR(bfs_result));
}
LOG_TIME("before first bfs")
gpma_bfs(gpma, node_size, edge_size, bfs_start_node, RAW_PTR(bfs_result));
int reach_nodes = node_size - thrust::count(bfs_result.begin(), bfs_result.end(), 0);
printf("start from node %d, number of reachable nodes: %d\n", bfs_start_node, reach_nodes);
@@ -95,26 +89,19 @@ int main(int argc, char **argv) {
hk[j + step] = ((KEY_TYPE)host_x[idx] << 32) + host_y[idx];
}
NATIVE_VEC_VALUE<TEST_DEV> update_values(step * 2);
NATIVE_VEC_VALUE<CPU> update_values(step * 2);
thrust::fill(update_values.begin(), update_values.begin() + step, 1);
thrust::fill(update_values.begin() + step, update_values.end(), VALUE_NONE);
NATIVE_VEC_KEY<TEST_DEV> update_keys = hk;
NATIVE_VEC_KEY<CPU> update_keys = hk;
cudaDeviceSynchronize();
update_gpma(gpma, update_keys, update_values);
gpma.update_batch(update_keys, update_values);
cudaDeviceSynchronize();
}
printf("Graph is updated.\n");
LOG_TIME("before second bfs")
{
auto gpma_mirror = gpma
#if TEST_DEV == CPU
.mirror()
#endif
;
gpma_bfs<GPU>(RAW_PTR(gpma_mirror.keys), RAW_PTR(gpma_mirror.values), RAW_PTR(gpma_mirror.row_offset), node_size, edge_size, bfs_start_node, RAW_PTR(bfs_result));
}
gpma_bfs(gpma, node_size, edge_size, bfs_start_node, RAW_PTR(bfs_result));
reach_nodes = node_size - thrust::count(bfs_result.begin(), bfs_result.end(), 0);
printf("start from node %d, number of reachable nodes: %d\n", bfs_start_node, reach_nodes);
LOG_TIME("after second bfs")
// Author: Kirk Saunders (ks825016@ohio.edu)
// Description: Simple implementation of a thread barrier
// using C++ condition variables.
// Date: 2/17/2020
#ifndef BARRIER_HPP
#define BARRIER_HPP
#include <condition_variable>
#include <mutex>
#include <stdexcept>
#include <thread>
class Barrier {
public:
// Construct barrier for use with num threads.
Barrier(std::size_t num)
: num_threads(num),
wait_count(0),
instance(0),
mut(),
cv()
{
if (num == 0) {
throw std::invalid_argument("Barrier thread count cannot be 0");
}
}
// disable copying of barrier
Barrier(const Barrier&) = delete;
Barrier& operator =(const Barrier&) = delete;
// This function blocks the calling thread until
// all threads (specified by num_threads) have
// called it. Blocking is achieved using a
// call to condition_variable.wait().
void wait() {
std::unique_lock<std::mutex> lock(mut); // acquire lock
std::size_t inst = instance; // store current instance for comparison
// in predicate
if (++wait_count == num_threads) { // all threads reached barrier
wait_count = 0; // reset wait_count
instance++; // increment instance for next use of barrier and to
// pass condition variable predicate
cv.notify_all();
} else { // not all threads have reached barrier
cv.wait(lock, [this, &inst]() { return instance != inst; });
// NOTE: The predicate lambda here protects against spurious
// wakeups of the thread. As long as this->instance is
// equal to inst, the thread will not wake.
// this->instance will only increment when all threads
// have reached the barrier and are ready to be unblocked.
}
}
private:
std::size_t num_threads; // number of threads using barrier
std::size_t wait_count; // counter to keep track of waiting threads
std::size_t instance; // counter to keep track of barrier use count
std::mutex mut; // mutex used to protect resources
std::condition_variable cv; // condition variable used to block threads
};
#endif
\ No newline at end of file
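For orientation, a minimal sketch of how a barrier like this is typically used, assuming four worker threads that synchronize between two phases:

#include "barrier.hpp"
#include <cstdio>
#include <thread>
#include <vector>

int main() {
    constexpr std::size_t n = 4;
    Barrier barrier(n);
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < n; ++i) {
        workers.emplace_back([&barrier, i] {
            std::printf("thread %zu finished phase 1\n", i);
            barrier.wait(); // blocks until all n threads have called wait()
            std::printf("thread %zu entered phase 2\n", i);
        });
    }
    for (auto &t : workers) t.join();
}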
#ifndef RLIB_GPMA_MULTIDEV_CUH_
#define RLIB_GPMA_MULTIDEV_CUH_ 1
#include <vector>
#include <array>
#include <atomic>
#include <algorithm>
#include <stdexcept>
#include <type_traits>
#include "gpma.cuh"
#include <rlib/meta.hpp>
namespace gpma_impl {
template <typename K, typename V>
struct quick_1way_hash_table {
static_assert(std::is_integral<K>::value, "only allows integral key");
std::vector<V> buf;
quick_1way_hash_table(K max_key, const V &def_value)
: buf(max_key, def_value)
{}
const V &get(const K &k) const {return buf[k];}
const V &set(const K &k, const V &v) {
#if DEBUG
return buf.at(k) = v;
#else
return buf[k] = v;
#endif
}
};
template <size_t cpu_instances, size_t gpu_instances>
struct dispatcher {
using CpuArrT = std::array<GPMA<CPU> *, cpu_instances>;
using GpuArrT = std::array<GPMA<GPU> *, gpu_instances>;
static constexpr KEY_TYPE hashSize = 1024;
quick_1way_hash_table<KEY_TYPE, size_t> mapKeyToSlot;
dispatcher()
: mapKeyToSlot(hashSize, (size_t)(-1)) {}
// void init(const CpuArrT &ptrs_cpu, const GpuArrT &ptrs_gpu) {}
// Given a key, returns the ID (offset from zero) of the device responsible for that key.
[[gnu::always_inline]] size_t select_device(const KEY_TYPE &k) {
auto hashKey = k % hashSize;
auto dev_id = mapKeyToSlot.get(hashKey);
if(dev_id == (size_t)(-1)) {
// Assign a device to this new hash bucket.
dev_id = hashKey % (cpu_instances + gpu_instances);
// Add link: hashKey => dev_id
return mapKeyToSlot.set(hashKey, dev_id);
}
else
return dev_id;
}
};
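// Worked example (illustrative numbers, assuming cpu_instances = 1 and
// gpu_instances = 1): a key k is first reduced to bucket = k % 1024. On the
// bucket's first appearance it is pinned to device bucket % 2, e.g. bucket 8
// maps to device 0 (the CPU instance) and bucket 5 maps to device 1 (the GPU
// instance); every later key that hashes to the same bucket is routed to the
// same device, so routing stays consistent across update_batch() calls.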
template <dev_type_t DEV, size_t instances>
struct thread_safe_kv_buf {
std::array<NATIVE_VEC_KEY<DEV>, instances> k_buffers;
std::array<NATIVE_VEC_VALUE<DEV>, instances> v_buffers;
std::array<std::atomic<size_t>, instances> sizes;
thread_safe_kv_buf(size_t max_size) {
for(auto &buf : k_buffers) buf.resize(max_size);
for(auto &buf : v_buffers) buf.resize(max_size);
std::fill(sizes.begin(), sizes.end(), 0);
}
void push_back(const size_t &dev_id, const KEY_TYPE &k, const VALUE_TYPE &v) {
// Thread-safe: the per-device size counter is atomic, so concurrent calls get distinct slots.
const auto pos = sizes[dev_id]++;
k_buffers[dev_id][pos] = k;
v_buffers[dev_id][pos] = v;
}
};
}
template <size_t cpu_instances, size_t gpu_instances>
struct GPMA_multidev {
static constexpr size_t instances = cpu_instances + gpu_instances;
static_assert(instances > 0, "Need at least one DEV instance.");
// TODO: support multi-GPU.
std::array<GPMA<CPU> *, cpu_instances> ptrs_cpu;
std::array<GPMA<GPU> *, gpu_instances> ptrs_gpu;
GPMA_multidev(size_t row_num) {
// Construct these actual instances.
for(auto dev_id = 0; dev_id < instances; ++dev_id) {
if(dev_id < cpu_instances)
ptrs_cpu.at(dev_id) = new GPMA<CPU>(row_num);
else
ptrs_gpu.at(dev_id - cpu_instances) = new GPMA<GPU>(row_num);
}
}
GPMA_multidev() : ptrs_cpu{nullptr}, ptrs_gpu{nullptr} {}
~GPMA_multidev() {
for(auto *ptr : ptrs_cpu) delete ptr;
for(auto *ptr : ptrs_gpu) delete ptr;
}
gpma_impl::dispatcher<cpu_instances, gpu_instances> dispatcher;
void update_batch(NATIVE_VEC_KEY<CPU> &update_keys, NATIVE_VEC_VALUE<CPU> &update_values) {
// Keys MUST be transferred from CPU to GPU.
// Values MAY NOT necessarily need to be transferred. TODO: test whether `device_vec<>(SIZE, VAL)` occupies PCIe bandwidth.
// Currently, everything is transferred from the CPU.
if(update_keys.size() != update_values.size())
throw std::invalid_argument("Inconsistent kv size.");
gpma_impl::thread_safe_kv_buf<CPU, cpu_instances> cpu_buffers(update_keys.size());
gpma_impl::thread_safe_kv_buf<GPU, gpu_instances> gpu_buffers(update_keys.size());
// #pragma omp parallel for schedule(static) // not supporting parallel push back
for(auto i = 0; i < update_keys.size(); ++i) {
auto dev_id = dispatcher.select_device(update_keys[i]);
//rlib::printfln("PUSH, dev_id={}, i={}, k={}, v={}.", dev_id, i, update_keys[i], update_values[i]);
if(dev_id < cpu_instances) {
cpu_buffers.push_back(dev_id, update_keys[i], update_values[i]);
}
else {
gpu_buffers.push_back(dev_id - cpu_instances, update_keys[i], update_values[i]);
}
}
// Maybe this barrier could be removed in the future.
// Push GPU data and run the GPU update (in the background).
// Push CPU data and run the CPU update, then check whether the GPU has finished.
anySync<GPU>();
#pragma omp parallel for schedule(dynamic)
for(auto dev_id = 0; dev_id < instances; ++dev_id) {
if(dev_id < cpu_instances)
::update_gpma(*(ptrs_cpu[dev_id]), cpu_buffers.k_buffers[dev_id], cpu_buffers.v_buffers[dev_id]);
else {
auto gpu_dev_id = dev_id - cpu_instances;
::update_gpma(*(ptrs_gpu[gpu_dev_id]), gpu_buffers.k_buffers[gpu_dev_id], gpu_buffers.v_buffers[gpu_dev_id]);
}
}
// barrier.
anySync<GPU>();
}
};
#endif
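A minimal end-to-end sketch of this header, assuming the same vector types and constructors the demo above uses (graph size and keys are made up):

#include "gpma_multidev.cuh"
#include <thrust/host_vector.h>

int main() {
    // One CPU instance plus one GPU instance, each backed by a GPMA sized for the graph.
    SIZE_TYPE node_size = 1 << 20;
    GPMA_multidev<1, 1> gpma(node_size);

    // Host-side batch of two directed edges, encoded as (src << 32) + dst like the demo.
    thrust::host_vector<KEY_TYPE> h_keys(2);
    h_keys[0] = ((KEY_TYPE)3 << 32) + 7; // edge 3 -> 7
    h_keys[1] = ((KEY_TYPE)7 << 32) + 3; // edge 7 -> 3
    NATIVE_VEC_KEY<CPU> keys = h_keys;
    NATIVE_VEC_VALUE<CPU> values(2, 1); // value 1 marks an insertion, as in the demo
    // update_batch() hashes each key and forwards it to the CPU or GPU instance.
    gpma.update_batch(keys, values);
}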
@@ -9,6 +9,7 @@
#include <type_traits>
#define MAKE_64b(high, low) (((uint64_t)(high) << 32) + low)
using namespace rlib::literals;
template <typename T>
constexpr bool IsPowerOfTwo(T x) {
@@ -137,3 +138,24 @@ void anyExclusiveSum(const SIZE_TYPE *inputVec, SIZE_TYPE *outputVec, SIZE_TYPE len) {
} else
rlib::exclusiveSumParallel(inputVec, outputVec, len);
}
template <typename T>
const T &cpuGetZero() {
static const T zero = 0;
return zero;
}
template <typename T>
const T &cpuGetOne() {
static const T one = 1;
return one;
}
template <dev_type_t DEV, typename T>
void anySetVal(T *devPtr, const T &val) {
anyMemcpy<CPU, DEV>(devPtr, &val, sizeof(T));
}
template <dev_type_t DEV, typename T>
T anyGetVal(T *devPtr) {
T cpuVal;
anyMemcpy<DEV, CPU>(&cpuVal, devPtr, sizeof(T));
return cpuVal;
}
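A small sketch of these two helpers, assuming DEV = CPU so a plain heap pointer suffices (with DEV = GPU the pointer would instead come from cudaMalloc):

#include <cassert>

void set_and_get_example() {
    SIZE_TYPE *ptr = new SIZE_TYPE;
    anySetVal<CPU>(ptr, cpuGetOne<SIZE_TYPE>()); // copy the constant 1 into *ptr
    SIZE_TYPE v = anyGetVal<CPU>(ptr);           // copy it back out
    assert(v == 1);
    delete ptr;
}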