Commit cd0261bf authored by Mo Sha

init

parent dc7947d4
Makefile 0 → 100644
NVCC = /usr/local/cuda/bin/nvcc

gpma_bfs_demo:
	$(NVCC) -I./ -O3 -std=c++11 -w -gencode arch=compute_61,code=sm_61 -odir "." -M -o "gpma_bfs_demo.d" "./gpma_bfs_demo.cu"
	$(NVCC) -I./ -O3 -std=c++11 -w --compile --relocatable-device-code=true -gencode arch=compute_61,code=compute_61 -gencode arch=compute_61,code=sm_61 -x cu -o "gpma_bfs_demo.o" "gpma_bfs_demo.cu"
	$(NVCC) --cudart static --relocatable-device-code=true -gencode arch=compute_61,code=compute_61 -gencode arch=compute_61,code=sm_61 -link -o "gpma_bfs_demo" ./gpma_bfs_demo.o

clean:
	rm ./gpma_bfs_demo.o ./gpma_bfs_demo.d gpma_bfs_demo
README.md 0 → 100644
# GPMA BFS Demo
Source code for the paper:
[Accelerating Dynamic Graph Analytics on GPUs](http://www.vldb.org/pvldb/vol11/p107-sha.pdf)
GPMA is a data structure for maintaining dynamic graphs on GPUs. This repository provides a demo that performs BFS on a dynamic graph maintained by GPMA on the GPU.
## Environment and Dependencies
This code was developed and tested on:
* Ubuntu 16.04
* GeForce GTX 1080 Ti with NVIDIA driver 384.111
* CUDA 9.0
* [CUB](https://nvlabs.github.io/cub/) v1.8.0
## Preparation
```preparation.py``` downloads, reformats, and shuffles a graph dataset, [__pokec__](https://snap.stanford.edu/data/soc-pokec.html), which is used in this demo. It also downloads CUB v1.8.0 and places the ```cub``` directory in the repository root. Run it with ```python preparation.py``` (it invokes wget, gunzip, and unzip).
## Build
To build this demo, run ```make```.
You may need to adjust the ```Makefile``` for your environment, e.g., the nvcc path, include paths, and the GPU architecture (the default ```-gencode arch=compute_61,code=sm_61``` targets Pascal; a Volta GPU, for instance, would use ```compute_70```/```sm_70```).
## Demo
```./gpma_bfs_demo [graph_path] [bfs_start_node]```
In this demo, the first half of the edges of the given graph (the initial sliding window) is loaded into GPMA, and a BFS is run from the given start node. The sliding window is then advanced 100 times over the second half of the edges, so the final window no longer overlaps the original one. Finally, BFS is run again on the updated graph.
The graph file must start with one line containing node_size and edge_size; each of the following edge_size lines gives one directed edge as a pair of 0-based node IDs.
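For illustration, a tiny (hypothetical) graph file with 4 nodes and 3 directed edges would look like:
```
4 3
0 1
1 2
0 3
```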
If you have already run ```preparation.py``` to generate a well-formatted pokec graph dataset and want to start the BFS from node 0:
```./gpma_bfs_demo pokec.txt 0```
The output should look similar to the following:
```
node_num: 1632803, edge_num: 30622564
Graph file is loaded.
start from node 0, number of reachable nodes: 1334862
Graph is updated.
start from node 0, number of reachable nodes: 1334356
```
The numbers of reachable nodes may differ from those shown above since the graph is shuffled during preparation.
## Reference
```
@article{sha2017gpma,
  title={Accelerating Dynamic Graph Analytics on GPUs},
  author={Sha, Mo and Li, Yuchen and He, Bingsheng and Tan, Kian-Lee},
  journal={Proceedings of the VLDB Endowment},
  volume={11},
  number={1},
  year={2017}
}
```
## License
MIT
gpma.cuh 0 → 100644
This diff is collapsed (the contents of gpma.cuh are not shown here).
gpma_bfs.cuh 0 → 100644
#pragma once
#include "cub/cub.cuh"
#define FULL_MASK 0xffffffff
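// BFS frontier-expansion ("gather") kernel.
// For every node in the current node frontier, walk the node's segment of the
// GPMA (keys/values between row_offsets[node] and row_offsets[node + 1]), skip
// empty or deleted slots (COL_IDX_NONE / VALUE_NONE), and append the remaining
// neighbours to the edge frontier (edge_queue). Long adjacency lists are
// expanded by a whole CTA, medium ones by a warp, and the rest by a block-wide
// prefix-sum pass.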
template<SIZE_TYPE THREADS_NUM>
__global__
void gpma_bfs_gather_kernel(SIZE_TYPE *node_queue, SIZE_TYPE *node_queue_offset,
SIZE_TYPE *edge_queue, SIZE_TYPE *edge_queue_offset,
KEY_TYPE *keys, VALUE_TYPE *values, SIZE_TYPE *row_offsets) {
typedef cub::BlockScan<SIZE_TYPE, THREADS_NUM> BlockScan;
__shared__ typename BlockScan::TempStorage block_temp_storage;
volatile __shared__ SIZE_TYPE comm[THREADS_NUM / 32][3];
volatile __shared__ SIZE_TYPE comm2[THREADS_NUM];
volatile __shared__ SIZE_TYPE output_cta_offset;
volatile __shared__ SIZE_TYPE output_warp_offset[THREADS_NUM / 32];
typedef cub::WarpScan<SIZE_TYPE> WarpScan;
__shared__ typename WarpScan::TempStorage temp_storage[THREADS_NUM / 32];
SIZE_TYPE thread_id = threadIdx.x;
SIZE_TYPE lane_id = thread_id % 32;
SIZE_TYPE warp_id = thread_id / 32;
SIZE_TYPE cta_offset = blockDim.x * blockIdx.x;
while (cta_offset < node_queue_offset[0]) {
SIZE_TYPE node, row_begin, row_end;
if (cta_offset + thread_id < node_queue_offset[0]) {
node = node_queue[cta_offset + thread_id];
row_begin = row_offsets[node];
row_end = row_offsets[node + 1];
} else
row_begin = row_end = 0;
// CTA-based coarse-grained gather
while (__syncthreads_or(row_end - row_begin >= THREADS_NUM)) {
// vie for control of block
if (row_end - row_begin >= THREADS_NUM)
comm[0][0] = thread_id;
__syncthreads();
// winner describes adjlist
if (comm[0][0] == thread_id) {
comm[0][1] = row_begin;
comm[0][2] = row_end;
row_begin = row_end;
}
__syncthreads();
SIZE_TYPE gather = comm[0][1] + thread_id;
SIZE_TYPE gather_end = comm[0][2];
SIZE_TYPE neighbour;
SIZE_TYPE thread_data_in;
SIZE_TYPE thread_data_out;
SIZE_TYPE block_aggregate;
while (__syncthreads_or(gather < gather_end)) {
if (gather < gather_end) {
KEY_TYPE cur_key = keys[gather];
VALUE_TYPE cur_value = values[gather];
neighbour = (SIZE_TYPE) (cur_key & COL_IDX_NONE);
thread_data_in = (neighbour == COL_IDX_NONE || cur_value == VALUE_NONE) ? 0 : 1;
} else
thread_data_in = 0;
__syncthreads();
BlockScan(block_temp_storage).ExclusiveSum(thread_data_in, thread_data_out, block_aggregate);
__syncthreads();
if (0 == thread_id) {
output_cta_offset = atomicAdd(edge_queue_offset, block_aggregate);
}
__syncthreads();
if (thread_data_in)
edge_queue[output_cta_offset + thread_data_out] = neighbour;
gather += THREADS_NUM;
}
}
// warp-based coarse-grained gather
while (__any_sync(FULL_MASK, row_end - row_begin >= 32)) {
// vie for control of warp
if (row_end - row_begin >= 32)
comm[warp_id][0] = lane_id;
// winner describes adjlist
if (comm[warp_id][0] == lane_id) {
comm[warp_id][1] = row_begin;
comm[warp_id][2] = row_end;
row_begin = row_end;
}
SIZE_TYPE gather = comm[warp_id][1] + lane_id;
SIZE_TYPE gather_end = comm[warp_id][2];
SIZE_TYPE neighbour;
SIZE_TYPE thread_data_in;
SIZE_TYPE thread_data_out;
SIZE_TYPE warp_aggregate;
while (__any_sync(FULL_MASK, gather < gather_end)) {
if (gather < gather_end) {
KEY_TYPE cur_key = keys[gather];
VALUE_TYPE cur_value = values[gather];
neighbour = (SIZE_TYPE) (cur_key & COL_IDX_NONE);
thread_data_in = (neighbour == COL_IDX_NONE || cur_value == VALUE_NONE) ? 0 : 1;
} else
thread_data_in = 0;
WarpScan(temp_storage[warp_id]).ExclusiveSum(thread_data_in, thread_data_out, warp_aggregate);
if (0 == lane_id) {
output_warp_offset[warp_id] = atomicAdd(edge_queue_offset, warp_aggregate);
}
if (thread_data_in)
edge_queue[output_warp_offset[warp_id] + thread_data_out] = neighbour;
gather += 32;
}
}
// scan-based fine-grained gather
SIZE_TYPE thread_data = row_end - row_begin;
SIZE_TYPE rsv_rank;
SIZE_TYPE total;
SIZE_TYPE remain;
__syncthreads();
BlockScan(block_temp_storage).ExclusiveSum(thread_data, rsv_rank, total);
__syncthreads();
SIZE_TYPE cta_progress = 0;
while (cta_progress < total) {
remain = total - cta_progress;
// share batch of gather offsets
while ((rsv_rank < cta_progress + THREADS_NUM) && (row_begin < row_end)) {
comm2[rsv_rank - cta_progress] = row_begin;
rsv_rank++;
row_begin++;
}
__syncthreads();
SIZE_TYPE neighbour;
// gather batch of adjlist
if (thread_id < min(remain, THREADS_NUM)) {
KEY_TYPE cur_key = keys[comm2[thread_id]];
VALUE_TYPE cur_value = values[comm2[thread_id]];
neighbour = (SIZE_TYPE) (cur_key & COL_IDX_NONE);
thread_data = (neighbour == COL_IDX_NONE || cur_value == VALUE_NONE) ? 0 : 1;
} else
thread_data = 0;
__syncthreads();
SIZE_TYPE scatter;
SIZE_TYPE block_aggregate;
BlockScan(block_temp_storage).ExclusiveSum(thread_data, scatter, block_aggregate);
__syncthreads();
if (0 == thread_id) {
output_cta_offset = atomicAdd(edge_queue_offset, block_aggregate);
}
__syncthreads();
if (thread_data)
edge_queue[output_cta_offset + scatter] = neighbour;
cta_progress += THREADS_NUM;
__syncthreads();
}
cta_offset += blockDim.x * gridDim.x;
}
}
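// BFS frontier-contraction ("contract") kernel.
// Turns the edge frontier produced by the gather kernel into the next node
// frontier. Duplicates are first filtered heuristically with a per-warp cache
// and two CTA-level hash tables ("history cull"), then exactly with the global
// visited bitmap and an atomicCAS on the label array; a thread that wins the
// CAS labels its node with the current BFS level and enqueues it.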
template<SIZE_TYPE THREADS_NUM>
__global__
void gpma_bfs_contract_kernel(SIZE_TYPE *edge_queue, SIZE_TYPE *edge_queue_offset,
SIZE_TYPE *node_queue, SIZE_TYPE *node_queue_offset,
SIZE_TYPE level, SIZE_TYPE *label, SIZE_TYPE *bitmap) {
typedef cub::BlockScan<SIZE_TYPE, THREADS_NUM> BlockScan;
__shared__ typename BlockScan::TempStorage temp_storage;
volatile __shared__ SIZE_TYPE output_cta_offset;
volatile __shared__ SIZE_TYPE warp_cache[THREADS_NUM / 32][128];
const SIZE_TYPE HASH_KEY1 = 1097;
const SIZE_TYPE HASH_KEY2 = 1103;
volatile __shared__ SIZE_TYPE cta1_cache[HASH_KEY1];
volatile __shared__ SIZE_TYPE cta2_cache[HASH_KEY2];
// init cta-level cache
for (int i = threadIdx.x; i < HASH_KEY1; i += blockDim.x)
cta1_cache[i] = SIZE_NONE;
for (int i = threadIdx.x; i < HASH_KEY2; i += blockDim.x)
cta2_cache[i] = SIZE_NONE;
__syncthreads();
SIZE_TYPE thread_id = threadIdx.x;
SIZE_TYPE warp_id = thread_id / 32;
SIZE_TYPE cta_offset = blockDim.x * blockIdx.x;
while (cta_offset < edge_queue_offset[0]) {
SIZE_TYPE neighbour;
SIZE_TYPE valid = 0;
do {
if (cta_offset + thread_id >= edge_queue_offset[0]) break;
neighbour = edge_queue[cta_offset + thread_id];
// warp cull
SIZE_TYPE hash = neighbour & 127;
warp_cache[warp_id][hash] = neighbour;
SIZE_TYPE retrieved = warp_cache[warp_id][hash];
if (retrieved == neighbour) {
warp_cache[warp_id][hash] = thread_id;
if (warp_cache[warp_id][hash] != thread_id)
break;
}
// history cull
if (cta1_cache[neighbour % HASH_KEY1] == neighbour) break;
if (cta2_cache[neighbour % HASH_KEY2] == neighbour) break;
cta1_cache[neighbour % HASH_KEY1] = neighbour;
cta2_cache[neighbour % HASH_KEY2] = neighbour;
// bitmap check
SIZE_TYPE bit_loc = 1 << (neighbour % 32);
SIZE_TYPE bit_chunk = bitmap[neighbour / 32];
if (bit_chunk & bit_loc) break;
bitmap[neighbour / 32] = bit_chunk + bit_loc;
SIZE_TYPE ret = atomicCAS(label + neighbour, 0, level);
valid = ret ? 0 : 1;
} while (false);
__syncthreads();
SIZE_TYPE scatter;
SIZE_TYPE total;
BlockScan(temp_storage).ExclusiveSum(valid, scatter, total);
__syncthreads();
if (0 == thread_id) {
output_cta_offset = atomicAdd(node_queue_offset, total);
}
__syncthreads();
if (valid)
node_queue[output_cta_offset + scatter] = neighbour;
cta_offset += blockDim.x * gridDim.x;
}
}
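// Host-side BFS driver over the GPMA-resident graph.
// Starting from start_node, it alternates the gather and contract kernels,
// ping-ponging between the node queue and the edge queue, until the node
// frontier becomes empty. On return, results[v] holds the BFS level of each
// reached node v (the start node is labelled 1) and 0 for unreached nodes.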
__host__
void gpma_bfs(KEY_TYPE *keys, VALUE_TYPE *values, SIZE_TYPE *row_offsets,
SIZE_TYPE node_size, SIZE_TYPE edge_size, SIZE_TYPE start_node, SIZE_TYPE *results) {
cudaMemset(results, 0, sizeof(SIZE_TYPE) * node_size);
SIZE_TYPE *bitmap;
cudaMalloc(&bitmap, sizeof(SIZE_TYPE) * ((node_size - 1) / 32 + 1));
cudaMemset(bitmap, 0, sizeof(SIZE_TYPE) * ((node_size - 1) / 32 + 1));
SIZE_TYPE *node_queue;
cudaMalloc(&node_queue, sizeof(SIZE_TYPE) * node_size);
SIZE_TYPE *node_queue_offset;
cudaMalloc(&node_queue_offset, sizeof(SIZE_TYPE));
SIZE_TYPE *edge_queue;
cudaMalloc(&edge_queue, sizeof(SIZE_TYPE) * edge_size);
SIZE_TYPE *edge_queue_offset;
cudaMalloc(&edge_queue_offset, sizeof(SIZE_TYPE));
// init
SIZE_TYPE host_num[1];
host_num[0] = start_node;
cudaMemcpy(node_queue, host_num, sizeof(SIZE_TYPE), cudaMemcpyHostToDevice);
host_num[0] = 1 << (start_node % 32);
cudaMemcpy(&bitmap[start_node / 32], host_num, sizeof(SIZE_TYPE), cudaMemcpyHostToDevice);
host_num[0] = 1;
cudaMemcpy(node_queue_offset, host_num, sizeof(SIZE_TYPE), cudaMemcpyHostToDevice);
cudaMemcpy(&results[start_node], host_num, sizeof(SIZE_TYPE), cudaMemcpyHostToDevice);
SIZE_TYPE level = 1;
const SIZE_TYPE THREADS_NUM = 256;
while (true) {
// gather
SIZE_TYPE BLOCKS_NUM = CALC_BLOCKS_NUM(THREADS_NUM, host_num[0]);
host_num[0] = 0;
cudaMemcpy(edge_queue_offset, host_num, sizeof(SIZE_TYPE), cudaMemcpyHostToDevice);
gpma_bfs_gather_kernel<THREADS_NUM> <<<BLOCKS_NUM, THREADS_NUM>>>(node_queue, node_queue_offset,
edge_queue, edge_queue_offset, keys, values, row_offsets);
// contract
level++;
cudaMemcpy(node_queue_offset, host_num, sizeof(SIZE_TYPE), cudaMemcpyHostToDevice);
cudaMemcpy(host_num, edge_queue_offset, sizeof(SIZE_TYPE), cudaMemcpyDeviceToHost);
BLOCKS_NUM = CALC_BLOCKS_NUM(THREADS_NUM, host_num[0]);
gpma_bfs_contract_kernel<THREADS_NUM> <<<BLOCKS_NUM, THREADS_NUM>>>(edge_queue, edge_queue_offset,
node_queue, node_queue_offset, level, results, bitmap);
cudaMemcpy(host_num, node_queue_offset, sizeof(SIZE_TYPE), cudaMemcpyDeviceToHost);
if (0 == host_num[0]) break;
}
cudaFree(bitmap);
cudaFree(node_queue);
cudaFree(node_queue_offset);
cudaFree(edge_queue);
cudaFree(edge_queue_offset);
}
gpma_bfs_demo.cu 0 → 100644
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include "gpma.cuh"
#include "gpma_bfs.cuh"
void load_data(const char *file_path, thrust::host_vector<int> &host_x, thrust::host_vector<int> &host_y,
int &node_size, int &edge_size) {
FILE *fp;
fp = fopen(file_path, "r");
if (not fp) {
printf("Open graph file failed.\n");
exit(0);
}
fscanf(fp, "%d %d", &node_size, &edge_size);
printf("node_num: %d, edge_num: %d\n", node_size, edge_size);
host_x.resize(edge_size);
host_y.resize(edge_size);
for (int i = 0; i < edge_size; i++) {
int x, y;
(void) fscanf(fp, "%d %d", &x, &y);
host_x[i] = x;
host_y[i] = y;
}
printf("Graph file is loaded.\n");
fclose(fp);
}
int main(int argc, char **argv) {
if (argc != 3) {
printf("Invalid arguments.\n");
return -1;
}
char* data_path = argv[1];
int bfs_start_node = std::atoi(argv[2]);
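// Enlarge the device heap and the device-runtime synchronization depth;
// GPMA's update path (in the collapsed gpma.cuh) presumably relies on
// device-side allocation and nested (dynamic-parallelism) kernel launches.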
cudaDeviceSetLimit(cudaLimitMallocHeapSize, 1024ll * 1024 * 1024);
cudaDeviceSetLimit(cudaLimitDevRuntimeSyncDepth, 5);
thrust::host_vector<int> host_x;
thrust::host_vector<int> host_y;
int node_size;
int edge_size;
load_data(data_path, host_x, host_y, node_size, edge_size);
int half = edge_size / 2;
thrust::host_vector<KEY_TYPE> h_base_keys(half);
for (int i = 0; i < half; i++) {
h_base_keys[i] = ((KEY_TYPE) host_x[i] << 32) + host_y[i];
}
DEV_VEC_KEY base_keys = h_base_keys;
DEV_VEC_VALUE base_values(half, 1);
cudaDeviceSynchronize();
int num_slide = 100;
int step = half / num_slide;
GPMA gpma;
init_csr_gpma(gpma, node_size);
cudaDeviceSynchronize();
update_gpma(gpma, base_keys, base_values);
thrust::device_vector<SIZE_TYPE> bfs_result(node_size);
cudaDeviceSynchronize();
gpma_bfs(RAW_PTR(gpma.keys), RAW_PTR(gpma.values), RAW_PTR(gpma.row_offset), node_size,
edge_size, bfs_start_node, RAW_PTR(bfs_result));
int reach_nodes = node_size - thrust::count(bfs_result.begin(), bfs_result.end(), 0);
printf("start from node %d, number of reachable nodes: %d\n", bfs_start_node, reach_nodes);
for (int i = 0; i < num_slide; i++) {
thrust::host_vector<KEY_TYPE> hk(step * 2);
for (int j = 0; j < step; j++) {
int idx = half + i * step + j;
hk[j] = ((KEY_TYPE) host_x[idx] << 32) + host_y[idx];
}
for (int j = 0; j < step; j++) {
int idx = i * step + j;
hk[j + step] = ((KEY_TYPE) host_x[idx] << 32) + host_y[idx];
}
DEV_VEC_VALUE update_values(step * 2);
thrust::fill(update_values.begin(), update_values.begin() + step, 1);
thrust::fill(update_values.begin() + step, update_values.end(), VALUE_NONE);
DEV_VEC_KEY update_keys = hk;
cudaDeviceSynchronize();
update_gpma(gpma, update_keys, update_values);
cudaDeviceSynchronize();
}
printf("Graph is updated.\n");
gpma_bfs(RAW_PTR(gpma.keys), RAW_PTR(gpma.values), RAW_PTR(gpma.row_offset), node_size,
edge_size, bfs_start_node, RAW_PTR(bfs_result));
reach_nodes = node_size - thrust::count(bfs_result.begin(), bfs_result.end(), 0);
printf("start from node %d, number of reachable nodes: %d\n", bfs_start_node, reach_nodes);
return 0;
}
preparation.py 0 → 100644
#!/usr/bin/env python
import os
import random
# download dataset
print('downloading graph dataset...')
os.system('wget https://snap.stanford.edu/data/soc-pokec-relationships.txt.gz')
os.system('gunzip soc-pokec-relationships.txt.gz')
# reformat and shuffle
print('re-formatting and shuffling graph dataset...')
node_size = 0
edges = []
with open('soc-pokec-relationships.txt', 'r') as f:
    for line in f.readlines():
        a, b = line.strip().split('\t')
        a, b = int(a), int(b)
        edges.append((a, b))
        node_size = max(node_size, a)
        node_size = max(node_size, b)
edge_size = len(edges)
random.shuffle(edges)
with open('pokec.txt', 'w') as f:
    f.write('{} {}\n'.format(node_size, edge_size))
    for a, b in edges:
        f.write('{} {}\n'.format(a - 1, b - 1))
os.system('rm soc-pokec-relationships.txt')
# download CUB v1.8.0
print('downloading CUB...')
os.system('wget https://github.com/NVlabs/cub/archive/1.8.0.zip')
os.system('unzip 1.8.0.zip')
os.system('cp -rf cub-1.8.0/cub .')
os.system('rm -rf cub-1.8.0')