diff --git a/3rd-party/tbb/cmake/TBBConfig.cmake b/3rd-party/tbb/cmake/TBBConfig.cmake index 1296f1543..3a5b3a26f 100644 --- a/3rd-party/tbb/cmake/TBBConfig.cmake +++ b/3rd-party/tbb/cmake/TBBConfig.cmake @@ -48,10 +48,10 @@ get_filename_component(_tbb_root "${_tbb_root}" PATH) foreach (_tbb_component ${TBB_FIND_COMPONENTS}) set(TBB_${_tbb_component}_FOUND 0) - set(_tbb_release_lib "/home/thuang295/Code/taskflow/build/benchmarks/tbb_cmake_build/tbb_cmake_build_subdir_release/lib${_tbb_component}.so.2") + set(_tbb_release_lib "/home/thuang295/taskflow/build/benchmarks/tbb_cmake_build/tbb_cmake_build_subdir_release/lib${_tbb_component}.so.2") if (NOT TBB_FIND_RELEASE_ONLY) - set(_tbb_debug_lib "/home/thuang295/Code/taskflow/build/benchmarks/tbb_cmake_build/tbb_cmake_build_subdir_debug/lib${_tbb_component}_debug.so.2") + set(_tbb_debug_lib "/home/thuang295/taskflow/build/benchmarks/tbb_cmake_build/tbb_cmake_build_subdir_debug/lib${_tbb_component}_debug.so.2") endif() if (EXISTS "${_tbb_release_lib}" OR EXISTS "${_tbb_debug_lib}") diff --git a/examples/simple.cpp b/examples/simple.cpp index d2b9aeb70..4ed1d3a30 100644 --- a/examples/simple.cpp +++ b/examples/simple.cpp @@ -13,6 +13,8 @@ #include // the only include you need int main(){ + + std::cout << sizeof(tf::Node) << '\n'; tf::Executor executor; tf::Taskflow taskflow("simple"); diff --git a/taskflow/algorithm/module.hpp b/taskflow/algorithm/module.hpp index 2b257bb96..31bd8cd3b 100644 --- a/taskflow/algorithm/module.hpp +++ b/taskflow/algorithm/module.hpp @@ -17,7 +17,7 @@ auto Algorithm::make_module_task(T&& target) { return; } rt._executor._schedule_graph_with_parent( - rt._worker, graph.begin(), graph.end(), rt._node->_topology, rt._node + rt._worker, graph, rt._node->_topology, rt._node ); }; } diff --git a/taskflow/core/declarations.hpp b/taskflow/core/declarations.hpp index ab52f68fa..791a33906 100644 --- a/taskflow/core/declarations.hpp +++ b/taskflow/core/declarations.hpp @@ -7,6 +7,7 @@ namespace tf { // ---------------------------------------------------------------------------- class Algorithm; +class NodeBase; class Node; class Graph; class FlowBuilder; diff --git a/taskflow/core/executor.hpp b/taskflow/core/executor.hpp index dd731fc13..e3f12bce1 100644 --- a/taskflow/core/executor.hpp +++ b/taskflow/core/executor.hpp @@ -1161,14 +1161,10 @@ class Executor { bool _invoke_runtime_task_impl(Worker&, Node*, std::function&); bool _invoke_runtime_task_impl(Worker&, Node*, std::function&); - template - I _set_up_graph(I, I, Topology*, NodeBase*); - template void _corun_until(Worker&, P&&); - template - void _corun_graph(Worker&, Topology*, NodeBase*, I, I); + void _corun_graph(Worker&, Graph&, Topology*, NodeBase*); template void _bulk_schedule(Worker&, I, size_t); @@ -1182,8 +1178,7 @@ class Executor { template void _bulk_update_cache(Worker&, Node*&, Node*, std::array&, size_t&); - template - void _schedule_graph_with_parent(Worker&, I, I, Topology*, NodeBase*); + void _schedule_graph_with_parent(Worker&, Graph&, Topology*, NodeBase*); template auto _async(P&&, F&&, Topology*, NodeBase*); @@ -1568,7 +1563,7 @@ void Executor::_bulk_schedule(Worker& worker, I first, size_t num_nodes) { // which cause the last ++first to fail. This problem is specific to MSVC which has a stricter // iterator implementation in std::vector than GCC/Clang. if(auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) { - _bulk_spill(first + n, num_nodes - n); + _bulk_spill(first, num_nodes - n); } _notifier.notify_n(num_nodes); @@ -1922,13 +1917,13 @@ inline bool Executor::_invoke_subflow_task(Worker& worker, Node* node) { // spawn the subflow if it is joinable and its graph is non-empty // implicit join is faster than Subflow::join as it does not involve corun - if(sf.joinable() && g.size()) { + if(sf.joinable() && !g.empty()) { // signal the executor to preempt this node node->_nstate |= NSTATE::PREEMPTED; // set up and schedule the graph - _schedule_graph_with_parent(worker, g.begin(), g.end(), node->_topology, node); + _schedule_graph_with_parent(worker, g, node->_topology, node); return true; } } @@ -1990,7 +1985,7 @@ inline bool Executor::_invoke_module_task_impl(Worker& w, Node* node, Graph& gra // signal the executor to preempt this node node->_nstate |= NSTATE::PREEMPTED; - _schedule_graph_with_parent(w, graph.begin(), graph.end(), node->_topology, node); + _schedule_graph_with_parent(w, graph, node->_topology, node); return true; } @@ -2229,22 +2224,21 @@ void Executor::corun(T& target) { } NodeBase anchor; - _corun_graph(*w, nullptr, &anchor, target.graph().begin(), target.graph().end()); + _corun_graph(*w, target.graph(), nullptr, &anchor); } // Procedure: _corun_graph -template -void Executor::_corun_graph(Worker& w, Topology* tpg, NodeBase* p, I first, I last) { +inline void Executor::_corun_graph(Worker& w, Graph& g, Topology* tpg, NodeBase* p) { // empty graph - if(first == last) { + if(g.empty()) { return; } // anchor this parent as the blocking point { ExplicitAnchorGuard anchor(p); - _schedule_graph_with_parent(w, first, last, tpg, p); + _schedule_graph_with_parent(w, g, tpg, p); _corun_until(w, [p] () -> bool { return p->_join_counter.load(std::memory_order_acquire) == 0; } ); @@ -2276,48 +2270,23 @@ inline void Executor::wait_for_all() { } // Function: _schedule_graph_with_parent -template -void Executor::_schedule_graph_with_parent( - Worker& worker, I beg, I end, Topology* tpg, NodeBase* parent +inline void Executor::_schedule_graph_with_parent( + Worker& worker, Graph& graph, Topology* tpg, NodeBase* parent ) { - size_t num_srcs = (_set_up_graph(beg, end, tpg, parent) - beg); + size_t num_srcs = graph._set_up(tpg, parent); parent->_join_counter.fetch_add(num_srcs, std::memory_order_relaxed); - _bulk_schedule(worker, beg, num_srcs); + _bulk_schedule(worker, graph.begin(), num_srcs); } // Function: _set_up_topology inline void Executor::_set_up_topology(Worker* w, Topology* tpg) { // ---- under taskflow lock ---- auto& g = tpg->_taskflow._graph; - size_t num_srcs = (_set_up_graph(g.begin(), g.end(), tpg, tpg) - g.begin()); + size_t num_srcs = g._set_up(tpg, tpg); tpg->_join_counter.store(num_srcs, std::memory_order_relaxed); w ? _bulk_schedule(*w, g.begin(), num_srcs) : _bulk_schedule(g.begin(), num_srcs); } -// Function: _set_up_graph -template -I Executor::_set_up_graph(I first, I last, Topology* tpg, NodeBase* parent) { - - auto send = first; - for(; first != last; ++first) { - - auto node = *first; - node->_topology = tpg; - node->_parent = parent; - node->_nstate = NSTATE::NONE; - node->_estate.store(ESTATE::NONE, std::memory_order_relaxed); - node->_set_up_join_counter(); - node->_exception_ptr = nullptr; - - // move source to the first partition - // root, root, root, v1, v2, v3, v4, ... - if(node->num_predecessors() == 0) { - std::iter_swap(send++, first); - } - } - return send; -} - // Function: _tear_down_topology inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*& cache) { @@ -2398,7 +2367,7 @@ inline void Subflow::join() { TF_THROW("subflow already joined"); } - _executor._corun_graph(_worker, _node->_topology, _node, _graph.begin(), _graph.end()); + _executor._corun_graph(_worker, _graph, _node->_topology, _node); // join here since corun graph may throw exception _node->_nstate |= NSTATE::JOINED_SUBFLOW; diff --git a/taskflow/core/flow_builder.hpp b/taskflow/core/flow_builder.hpp index b4effda69..d9270419a 100644 --- a/taskflow/core/flow_builder.hpp +++ b/taskflow/core/flow_builder.hpp @@ -1282,7 +1282,7 @@ inline FlowBuilder::FlowBuilder(Graph& graph) : // Function: emplace template , void>*> Task FlowBuilder::emplace(C&& c) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1291,12 +1291,12 @@ Task FlowBuilder::emplace(C&& c) { template , void>*> Task FlowBuilder::emplace(C&& c) { if constexpr (std::is_invocable_v) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } else if constexpr (std::is_invocable_v) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1308,7 +1308,7 @@ Task FlowBuilder::emplace(C&& c) { // Function: emplace template , void>*> Task FlowBuilder::emplace(C&& c) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1316,7 +1316,7 @@ Task FlowBuilder::emplace(C&& c) { // Function: emplace template , void>*> Task FlowBuilder::emplace(C&& c) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1324,7 +1324,7 @@ Task FlowBuilder::emplace(C&& c) { // Function: emplace template , void>*> Task FlowBuilder::emplace(C&& c) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1332,7 +1332,7 @@ Task FlowBuilder::emplace(C&& c) { // Function: composed_of template Task FlowBuilder::composed_of(T& object) { - auto node = _graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + auto node = _graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, object ); return Task(node); @@ -1340,7 +1340,7 @@ Task FlowBuilder::composed_of(T& object) { // Function: placeholder inline Task FlowBuilder::placeholder() { - auto node = _graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + auto node = _graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{} ); return Task(node); diff --git a/taskflow/core/graph.hpp b/taskflow/core/graph.hpp index 11ca36bc0..77f425c0e 100644 --- a/taskflow/core/graph.hpp +++ b/taskflow/core/graph.hpp @@ -13,6 +13,7 @@ #include "../utility/small_vector.hpp" #include "../utility/serializer.hpp" #include "../utility/lazy_string.hpp" +#include "../utility/list.hpp" #include "error.hpp" #include "declarations.hpp" #include "semaphore.hpp" @@ -53,6 +54,28 @@ class Graph { friend class Executor; public: + + // --- Iterator Subclass --- + class Iterator { + + public: + using iterator_category = std::forward_iterator_tag; + + explicit Iterator(Node* ptr) : _ptr(ptr) {} + + auto operator*() const { return _ptr; } + auto operator*() { return _ptr; } + //pointer operator->() { return _ptr; } + + Iterator& operator++(); + Iterator operator++(int); + + bool operator==(const Iterator& other) const { return _ptr == other._ptr; } + bool operator!=(const Iterator& other) const { return _ptr != other._ptr; } + + private: + Node* _ptr; + }; /** @brief constructs the graph object @@ -121,15 +144,20 @@ class Graph { private: - std::vector _nodes; + //std::vector _nodes; + + Node* _head {nullptr}; + size_t _size {0}; void _erase(Node*); + + size_t _set_up(Topology*, NodeBase*); /** @private */ template - Node* _emplace_back(ArgsT&&...); + Node* _emplace(ArgsT&&...); }; // ---------------------------------------------------------------------------- @@ -471,13 +499,15 @@ class Node : public NodeBase { Topology* _topology {nullptr}; + handle_t _handle; + size_t _num_successors {0}; SmallVector _edges; - - handle_t _handle; std::unique_ptr _semaphores; + Node* next {nullptr}; + bool _is_parent_cancelled() const; bool _is_conditioner() const; bool _acquire_all(SmallVector&); @@ -840,6 +870,17 @@ TF_FORCE_INLINE void recycle(Node* ptr) { // ---------------------------------------------------------------------------- // Graph definition // ---------------------------------------------------------------------------- + +inline Graph::Iterator& Graph::Iterator::operator++() { + if (_ptr) _ptr = _ptr->next; + return *this; +} + +inline Graph::Iterator Graph::Iterator::operator++(int) { + Iterator tmp = *this; + ++(*this); + return tmp; +} // Destructor inline Graph::~Graph() { @@ -848,79 +889,148 @@ inline Graph::~Graph() { // Move constructor inline Graph::Graph(Graph&& other) : - _nodes {std::move(other._nodes)} { + _head {other._head}, + _size {other._size} { + other._head = nullptr; + other._size = 0; } // Move assignment inline Graph& Graph::operator = (Graph&& other) { clear(); - _nodes = std::move(other._nodes); + _head = other._head; + _size = other._size; + other._head = nullptr; + other._size = 0; return *this; } // Procedure: clear inline void Graph::clear() { - for(auto node : _nodes) { - recycle(node); + while(_head) { + auto old_head = _head; + _head = _head->next; + recycle(old_head); } - _nodes.clear(); + _size = 0; } // Function: size inline size_t Graph::size() const { - return _nodes.size(); + return _size; } // Function: empty inline bool Graph::empty() const { - return _nodes.empty(); + // here we don't do _head == nullptr as it can cause data race with set_up + return _size == 0; } // Function: begin inline auto Graph::begin() { - return _nodes.begin(); + return Iterator(_head); } // Function: end inline auto Graph::end() { - return _nodes.end(); + return Iterator(nullptr); } // Function: begin inline auto Graph::begin() const { - return _nodes.begin(); + return Iterator(_head); } // Function: end inline auto Graph::end() const { - return _nodes.end(); + return Iterator(nullptr); } // Function: erase -inline void Graph::_erase(Node* node) { - //erase( - // std::remove_if(begin(), end(), [&](auto& p){ return p.get() == node; }), - // end() - //); - _nodes.erase( - std::remove_if(_nodes.begin(), _nodes.end(), [&](auto& p){ - if(p == node) { - recycle(p); - return true; - } - return false; - }), - _nodes.end() - ); +inline void Graph::_erase(Node* tgt) { + + if (!_head || !tgt) { + return; + } + + // Case: Target is the head + if (_head == tgt) { + _head = _head->next; + //tgt->next = nullptr; + recycle(tgt); + --_size; + return; + } + + // Case: Search for predecessor + auto current = _head; + while (current->next && current->next != tgt) { + current = current->next; + } + + // Bridge the gap + if (current->next == tgt) { + current->next = tgt->next; + //tgt->next = nullptr; + recycle(tgt); + --_size; + } +} + +// Function: _set_up +inline size_t Graph::_set_up(Topology* tpg, NodeBase* parent) { + + if (!_head) { + return 0; + } + + size_t num_srcs = 0; + + Node* head[2] = {nullptr, nullptr}; // 0 = rest, 1 = zero + Node** tail[2] = {&head[0], &head[1]}; + + Node* current = _head; + + while (current) { + + current->_topology = tpg; + current->_parent = parent; + current->_nstate = NSTATE::NONE; + current->_estate.store(ESTATE::NONE, std::memory_order_relaxed); + current->_set_up_join_counter(); + current->_exception_ptr = nullptr; + + Node* next = current->next; + current->next = nullptr; // detach immediately (important) + + size_t is_zero = (current->num_predecessors() == 0); + num_srcs += is_zero; + + *tail[is_zero] = current; + tail [is_zero] = ¤t->next; + + current = next; + } + + // stitch zero list before rest list + *tail[1] = head[0]; + _head = head[1] ? head[1] : head[0]; + + return num_srcs; } /** @private */ template -Node* Graph::_emplace_back(ArgsT&&... args) { - _nodes.push_back(animate(std::forward(args)...)); - return _nodes.back(); +Node* Graph::_emplace(ArgsT&&... args) { + //_nodes.push_back(animate(std::forward(args)...)); + //return _nodes.back(); + auto node = animate(std::forward(args)...); + node->next = _head; + _head = node; + ++_size; + return _head; } // ---------------------------------------------------------------------------- diff --git a/taskflow/core/runtime.hpp b/taskflow/core/runtime.hpp index 73bf8ceee..ce931e2c4 100644 --- a/taskflow/core/runtime.hpp +++ b/taskflow/core/runtime.hpp @@ -993,7 +993,7 @@ tf::Future Executor::run_until(Taskflow&& f, P&& p, C&& c) { t->_parent->_join_counter.fetch_add(1, std::memory_order_release); if(g.object->_fetch_enqueue(t) == 0) { rt._executor._schedule_graph_with_parent( - rt._worker, g.object->_graph.begin(), g.object->_graph.end(), t.get(), t.get() + rt._worker, g.object->_graph, t.get(), t.get() ); } }); diff --git a/taskflow/core/wsq.hpp b/taskflow/core/wsq.hpp index d02e3a16c..2da7c6593 100644 --- a/taskflow/core/wsq.hpp +++ b/taskflow/core/wsq.hpp @@ -238,7 +238,7 @@ class UnboundedWSQ { Only the owner thread can insert an item to the queue. */ template - void bulk_push(I first, size_t N); + void bulk_push(I& first, size_t N); /** @brief pops out an item from the queue @@ -397,7 +397,7 @@ void UnboundedWSQ::push(T o) { // Function: bulk_push template template -void UnboundedWSQ::bulk_push(I first, size_t N) { +void UnboundedWSQ::bulk_push(I& first, size_t N) { if(N == 0) return; @@ -411,7 +411,7 @@ void UnboundedWSQ::bulk_push(I first, size_t N) { } for(size_t i=0; ipush(b++, first[i]); + a->push(b++, *first++); } std::atomic_thread_fence(std::memory_order_release); @@ -694,7 +694,7 @@ class BoundedWSQ { Only the owner thread can insert items into the queue. */ template - size_t try_bulk_push(I first, size_t N); + size_t try_bulk_push(I& first, size_t N); /** @brief pops out an item from the queue @@ -832,7 +832,7 @@ bool BoundedWSQ::try_push(O&& o) { // Function: try_bulk_push template template -size_t BoundedWSQ::try_bulk_push(I first, size_t N) { +size_t BoundedWSQ::try_bulk_push(I& first, size_t N) { if(N == 0) return 0; @@ -845,7 +845,7 @@ size_t BoundedWSQ::try_bulk_push(I first, size_t N) { if(n > 0) { // push n elements into the queue for(size_t i=0; i +class IntrusiveForwardList { + + public: + + // --- Iterator Subclass --- + class Iterator { + + public: + using iterator_category = std::forward_iterator_tag; + using value_type = T; + using pointer = T*; + using reference = T&; + + explicit Iterator(T* ptr) : _ptr(ptr) {} + + reference operator*() const { return *_ptr; } + pointer operator->() { return _ptr; } + + Iterator& operator++() { + if (_ptr) _ptr = _ptr->next; + return *this; + } + + Iterator operator++(int) { + Iterator tmp = *this; + ++(*this); + return tmp; + } + + bool operator==(const Iterator& other) const { return _ptr == other._ptr; } + bool operator!=(const Iterator& other) const { return _ptr != other._ptr; } + + private: + T* _ptr; + }; + + IntrusiveForwardList() = default; + + IntrusiveForwardList(const IntrusiveForwardList&) = delete; + IntrusiveForwardList& operator=(const IntrusiveForwardList&) = delete; + + IntrusiveForwardList(IntrusiveForwardList&& other) noexcept + : _head(other._head) { + other._head = nullptr; + } + + // 4. Move Assignment + IntrusiveForwardList& operator=(IntrusiveForwardList&& other) noexcept { + if (this != &other) { + // In a real std::forward_list, we might clear() here, + // but since we don't own the nodes, we just overwrite. + _head = other._head; + other._head = nullptr; + } + return *this; + } + + // Element Access + T& front() { return *_head; } + const T& front() const { return *_head; } + + // Iterators + Iterator begin() { return Iterator(_head); } + Iterator end() { return Iterator(nullptr); } + + // Capacity + bool empty() const { return _head == nullptr; } + + // Modifiers + void push_front(T* node) { + node->next = _head; + _head = node; + } + + void pop_front() { + if (_head) { + T* old_head = _head; + _head = _head->next; + old_head->next = nullptr; + } + } + + void clear() { + while (!empty()) pop_front(); + } + + // std::forward_list style insertion after a specific node + void insert_after(T* prev_node, T* new_node) { + if (!prev_node || !new_node) return; + new_node->next = prev_node->next; + prev_node->next = new_node; + } + + private: + + T* _head {nullptr}; +}; + +} // end of namespace tf ------------------------------------------------------------------------- diff --git a/unittests/test_wsq.cpp b/unittests/test_wsq.cpp index b2eb60b30..121b980f4 100644 --- a/unittests/test_wsq.cpp +++ b/unittests/test_wsq.cpp @@ -129,22 +129,26 @@ TEST_CASE("UnboundedWSQ.Resize") { std::vector data(2048); // insert an element - queue.bulk_push(data.data(), 1); + auto first = data.data(); + queue.bulk_push(first, 1); REQUIRE(queue.size() == 1); REQUIRE(queue.capacity() == 2); // insert 2 elements - queue.bulk_push(data.data(), 2); + first = data.data(); + queue.bulk_push(first, 2); REQUIRE(queue.size() == 3); REQUIRE(queue.capacity() == 4); // insert 10 elements - queue.bulk_push(data.data(), 10); + first = data.data(); + queue.bulk_push(first, 10); REQUIRE(queue.size() == 13); REQUIRE(queue.capacity() == 16); // insert 1200 elements - queue.bulk_push(data.data(), 1200); + first = data.data(); + queue.bulk_push(first, 1200); REQUIRE(queue.size() == 1213); REQUIRE(queue.capacity() == 2048); @@ -156,22 +160,26 @@ TEST_CASE("UnboundedWSQ.Resize") { REQUIRE(queue.empty() == true); // insert an element - queue.bulk_push(data.data(), 1); + first = data.data(); + queue.bulk_push(first, 1); REQUIRE(queue.size() == 1); REQUIRE(queue.capacity() == 2048); // insert 2 elements - queue.bulk_push(data.data(), 2); + first = data.data(); + queue.bulk_push(first, 2); REQUIRE(queue.size() == 3); REQUIRE(queue.capacity() == 2048); // insert 10 elements - queue.bulk_push(data.data(), 10); + first = data.data(); + queue.bulk_push(first, 10); REQUIRE(queue.size() == 13); REQUIRE(queue.capacity() == 2048); // insert 1200 elements - queue.bulk_push(data.data(), 1200); + first = data.data(); + queue.bulk_push(first, 1200); REQUIRE(queue.size() == 1213); REQUIRE(queue.capacity() == 2048); } @@ -349,8 +357,9 @@ void bounded_tsq_n_consumers_bulk_push(size_t M) { size_t capacity = queue.capacity(); REQUIRE((size == 0 && capacity > 0)); const size_t num_pushable_elements = std::min(capacity, N); - - REQUIRE(num_pushable_elements == queue.try_bulk_push(gold.data(), N)); + + auto first = gold.data(); + REQUIRE(num_pushable_elements == queue.try_bulk_push(first, N)); REQUIRE(queue.size() == num_pushable_elements); for(size_t i=0; i items; @@ -648,7 +659,8 @@ void unbounded_tsq_n_consumers_bulk_push(size_t M) { size_t capacity = queue.capacity(); REQUIRE((size == 0 && capacity > 0)); - queue.bulk_push(gold.data(), N); + auto first = gold.data(); + queue.bulk_push(first, N); REQUIRE(queue.size() == N); for(size_t i=0; i items; while(consumed != N) {