Skip to content
Snippets Groups Projects
Commit 8a157a2c authored by Recolic K's avatar Recolic K
Browse files

update thread safe queue

parent 02da50be
No related branches found
No related tags found
No related merge requests found
...@@ -5,6 +5,8 @@ ...@@ -5,6 +5,8 @@
#include <cstdlib> #include <cstdlib>
#include <cassert> #include <cassert>
#include <atomic> #include <atomic>
#include <memory>
#include <stdexcept>
using std::size_t; using std::size_t;
namespace rlib { namespace rlib {
...@@ -13,45 +15,57 @@ namespace rlib { ...@@ -13,45 +15,57 @@ namespace rlib {
class thread_safe_queue { class thread_safe_queue {
struct node { struct node {
std::atomic<node *> next; std::atomic<node *> next;
ElementType payload; ElementType *payload;
template <typename ... PayloadArgsT> explicit node(ElementType *payload) : payload(payload), next(nullptr) {}
explicit node(PayloadArgsT && ... payloadArgs)
: payload(std::forward<PayloadArgsT>(payloadArgs) ...), next(nullptr)
{}
}; };
using apnode_t = std::atomic<node *>; using apnode_t = std::atomic<node *>;
// push to back (last), pop from front (first). // push to back (tail), pop from front (head).
apnode_t first, last; // To make it simple, the head node always exists. So pop will not hurt tail, and push will not change head.
apnode_t head, tail;
static constexpr node *NULLPTR = nullptr; static constexpr node *NULLPTR = nullptr;
void do_push_back(node *new_ele) {
// NO! Use the plan before coding...
while(true) {
auto last_node_ptr = last.load();
if(last_node_ptr == nullptr) {
last.store(new_ele);
}
// last_node_ptr->next = new_ele;
bool succeed = std::atomic_compare_exchange_weak(&last_node_ptr->next, &NULLPTR, new_ele);
if(succeed)
break;
}
// while(std::atomic_compare_exchange_weak(&last, last, )); // List structure is the primary concern, we must keep the list structure consistent before taking care of head/tail ptr.
if(first.load() == nullptr) { void do_push_back(node *new_ele) {
first.store(new_ele); auto tail_node_ptr = tail.load();
while(!std::atomic_compare_exchange_weak(&tail_node_ptr->next, &NULLPTR, new_ele)) {
// If failed to link the node to the tail, we catch up and try again.
tail_node_ptr = tail.load();
} }
// Move the tail ptr. If we succeed, that's good. (I don't think we might fail here)
// TODO: prove that we will not fail here.
std::atomic_compare_exchange_strong(&tail, tail_node_ptr, new_ele);
}
node *do_pop_front() {
node *head_node_ptr;
do {
head_node_ptr = head.load();
if(head_node_ptr->next == nullptr)
return nullptr;
// We move the head pointer forward to pop. (It will never invalidate tail node)
// If failed, just try again.
} while(!std::atomic_compare_exchange_weak(&head, head_node_ptr, head_node_ptr->next));
// I love this idea: value stored in head->next->data, instead of head->data.
std::swap(head_node_ptr->payload = head_node_ptr->next->payload);
return head_node_ptr;
// TODO: Warning: the returned pointer is not safe to delete!
} }
public: public:
thread_safe_queue() thread_safe_queue()
: first(nullptr), last(nullptr) {} : head(new node(nullptr)), tail(head) {}
void push(ElementType &&payload) { void push(ElementType &&payload) {
do_push_back(new node(std::forward<ElementType>(payload))); do_push_back(new node(new ElementType(std::forward<ElementType>(payload))));
}
std::shared_ptr<ElementType> pop() {
auto ret = do_pop_front();
if(ret)
return std::make_shared(ret->payload);
else
throw std::out_of_range("pop empty queue");
} }
}; };
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment