From 82baf66e3e372a072e31043b27c81309528bb9d2 Mon Sep 17 00:00:00 2001 From: TSUNG-WEI HUANG Date: Thu, 19 Feb 2026 13:08:59 -0600 Subject: [PATCH 1/5] created a new graph_opt branch --- 3rd-party/tbb/cmake/TBBConfig.cmake | 4 ++-- taskflow/core/graph.hpp | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/3rd-party/tbb/cmake/TBBConfig.cmake b/3rd-party/tbb/cmake/TBBConfig.cmake index 1296f1543..3a5b3a26f 100644 --- a/3rd-party/tbb/cmake/TBBConfig.cmake +++ b/3rd-party/tbb/cmake/TBBConfig.cmake @@ -48,10 +48,10 @@ get_filename_component(_tbb_root "${_tbb_root}" PATH) foreach (_tbb_component ${TBB_FIND_COMPONENTS}) set(TBB_${_tbb_component}_FOUND 0) - set(_tbb_release_lib "/home/thuang295/Code/taskflow/build/benchmarks/tbb_cmake_build/tbb_cmake_build_subdir_release/lib${_tbb_component}.so.2") + set(_tbb_release_lib "/home/thuang295/taskflow/build/benchmarks/tbb_cmake_build/tbb_cmake_build_subdir_release/lib${_tbb_component}.so.2") if (NOT TBB_FIND_RELEASE_ONLY) - set(_tbb_debug_lib "/home/thuang295/Code/taskflow/build/benchmarks/tbb_cmake_build/tbb_cmake_build_subdir_debug/lib${_tbb_component}_debug.so.2") + set(_tbb_debug_lib "/home/thuang295/taskflow/build/benchmarks/tbb_cmake_build/tbb_cmake_build_subdir_debug/lib${_tbb_component}_debug.so.2") endif() if (EXISTS "${_tbb_release_lib}" OR EXISTS "${_tbb_debug_lib}") diff --git a/taskflow/core/graph.hpp b/taskflow/core/graph.hpp index 11ca36bc0..bc9f5b4be 100644 --- a/taskflow/core/graph.hpp +++ b/taskflow/core/graph.hpp @@ -13,6 +13,7 @@ #include "../utility/small_vector.hpp" #include "../utility/serializer.hpp" #include "../utility/lazy_string.hpp" +#include "../utility/list.hpp" #include "error.hpp" #include "declarations.hpp" #include "semaphore.hpp" From 5192fb5f678b535b2dccaf536594d563e6792fa5 Mon Sep 17 00:00:00 2001 From: TSUNG-WEI HUANG Date: Thu, 19 Feb 2026 13:10:44 -0600 Subject: [PATCH 2/5] create a new graph_opt branch --- taskflow/utility/list.hpp | 85 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 taskflow/utility/list.hpp diff --git a/taskflow/utility/list.hpp b/taskflow/utility/list.hpp new file mode 100644 index 000000000..14ed85b63 --- /dev/null +++ b/taskflow/utility/list.hpp @@ -0,0 +1,85 @@ +#pragma once + +namespace tf { + +template +class IntrusiveForwardList { + + public: + + // --- Iterator Subclass --- + class Iterator { + + public: + using iterator_category = std::forward_iterator_tag; + using value_type = T; + using pointer = T*; + using reference = T&; + + explicit Iterator(T* ptr) : _ptr(ptr) {} + + reference operator*() const { return *_ptr; } + pointer operator->() { return _ptr; } + + Iterator& operator++() { + if (_ptr) _ptr = _ptr->next; + return *this; + } + + Iterator operator++(int) { + Iterator tmp = *this; + ++(*this); + return tmp; + } + + bool operator==(const Iterator& other) const { return _ptr == other._ptr; } + bool operator!=(const Iterator& other) const { return _ptr != other._ptr; } + + private: + T* _ptr; + }; + + IntrusiveForwardList(); + + // Element Access + T& front() { return *_head; } + const T& front() const { return *_head; } + + // Iterators + Iterator begin() { return Iterator(_head); } + Iterator end() { return Iterator(nullptr); } + + // Capacity + bool empty() const { return _head == nullptr; } + + // Modifiers + void push_front(T* node) { + node->next = _head; + _head = node; + } + + void pop_front() { + if (_head) { + T* old_head = _head; + _head = _head->next; + old_head->next = nullptr; + } + } + + void clear() { + while (!empty()) pop_front(); + } + + // std::forward_list style insertion after a specific node + void insert_after(T* prev_node, T* new_node) { + if (!prev_node || !new_node) return; + new_node->next = prev_node->next; + prev_node->next = new_node; + } + + private: + + T* _head {nullptr}; +}; + +} // end of namespace tf ------------------------------------------------------------------------- From 45cad41cb673abc08cf813f6d38ae0190a51fcc3 Mon Sep 17 00:00:00 2001 From: TSUNG-WEI HUANG Date: Thu, 19 Feb 2026 15:15:11 -0600 Subject: [PATCH 3/5] implemented singly linked list for graph storage --- examples/simple.cpp | 2 + taskflow/algorithm/module.hpp | 2 +- taskflow/core/declarations.hpp | 1 + taskflow/core/executor.hpp | 63 +++-------- taskflow/core/flow_builder.hpp | 16 +-- taskflow/core/graph.hpp | 195 +++++++++++++++++++++++++++------ taskflow/core/runtime.hpp | 2 +- taskflow/core/taskflow.hpp | 9 +- taskflow/core/wsq.hpp | 12 +- taskflow/utility/list.hpp | 21 +++- unittests/test_wsq.cpp | 44 +++++--- 11 files changed, 252 insertions(+), 115 deletions(-) diff --git a/examples/simple.cpp b/examples/simple.cpp index d2b9aeb70..4ed1d3a30 100644 --- a/examples/simple.cpp +++ b/examples/simple.cpp @@ -13,6 +13,8 @@ #include // the only include you need int main(){ + + std::cout << sizeof(tf::Node) << '\n'; tf::Executor executor; tf::Taskflow taskflow("simple"); diff --git a/taskflow/algorithm/module.hpp b/taskflow/algorithm/module.hpp index 2b257bb96..31bd8cd3b 100644 --- a/taskflow/algorithm/module.hpp +++ b/taskflow/algorithm/module.hpp @@ -17,7 +17,7 @@ auto Algorithm::make_module_task(T&& target) { return; } rt._executor._schedule_graph_with_parent( - rt._worker, graph.begin(), graph.end(), rt._node->_topology, rt._node + rt._worker, graph, rt._node->_topology, rt._node ); }; } diff --git a/taskflow/core/declarations.hpp b/taskflow/core/declarations.hpp index ab52f68fa..791a33906 100644 --- a/taskflow/core/declarations.hpp +++ b/taskflow/core/declarations.hpp @@ -7,6 +7,7 @@ namespace tf { // ---------------------------------------------------------------------------- class Algorithm; +class NodeBase; class Node; class Graph; class FlowBuilder; diff --git a/taskflow/core/executor.hpp b/taskflow/core/executor.hpp index dd731fc13..e3f12bce1 100644 --- a/taskflow/core/executor.hpp +++ b/taskflow/core/executor.hpp @@ -1161,14 +1161,10 @@ class Executor { bool _invoke_runtime_task_impl(Worker&, Node*, std::function&); bool _invoke_runtime_task_impl(Worker&, Node*, std::function&); - template - I _set_up_graph(I, I, Topology*, NodeBase*); - template void _corun_until(Worker&, P&&); - template - void _corun_graph(Worker&, Topology*, NodeBase*, I, I); + void _corun_graph(Worker&, Graph&, Topology*, NodeBase*); template void _bulk_schedule(Worker&, I, size_t); @@ -1182,8 +1178,7 @@ class Executor { template void _bulk_update_cache(Worker&, Node*&, Node*, std::array&, size_t&); - template - void _schedule_graph_with_parent(Worker&, I, I, Topology*, NodeBase*); + void _schedule_graph_with_parent(Worker&, Graph&, Topology*, NodeBase*); template auto _async(P&&, F&&, Topology*, NodeBase*); @@ -1568,7 +1563,7 @@ void Executor::_bulk_schedule(Worker& worker, I first, size_t num_nodes) { // which cause the last ++first to fail. This problem is specific to MSVC which has a stricter // iterator implementation in std::vector than GCC/Clang. if(auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) { - _bulk_spill(first + n, num_nodes - n); + _bulk_spill(first, num_nodes - n); } _notifier.notify_n(num_nodes); @@ -1922,13 +1917,13 @@ inline bool Executor::_invoke_subflow_task(Worker& worker, Node* node) { // spawn the subflow if it is joinable and its graph is non-empty // implicit join is faster than Subflow::join as it does not involve corun - if(sf.joinable() && g.size()) { + if(sf.joinable() && !g.empty()) { // signal the executor to preempt this node node->_nstate |= NSTATE::PREEMPTED; // set up and schedule the graph - _schedule_graph_with_parent(worker, g.begin(), g.end(), node->_topology, node); + _schedule_graph_with_parent(worker, g, node->_topology, node); return true; } } @@ -1990,7 +1985,7 @@ inline bool Executor::_invoke_module_task_impl(Worker& w, Node* node, Graph& gra // signal the executor to preempt this node node->_nstate |= NSTATE::PREEMPTED; - _schedule_graph_with_parent(w, graph.begin(), graph.end(), node->_topology, node); + _schedule_graph_with_parent(w, graph, node->_topology, node); return true; } @@ -2229,22 +2224,21 @@ void Executor::corun(T& target) { } NodeBase anchor; - _corun_graph(*w, nullptr, &anchor, target.graph().begin(), target.graph().end()); + _corun_graph(*w, target.graph(), nullptr, &anchor); } // Procedure: _corun_graph -template -void Executor::_corun_graph(Worker& w, Topology* tpg, NodeBase* p, I first, I last) { +inline void Executor::_corun_graph(Worker& w, Graph& g, Topology* tpg, NodeBase* p) { // empty graph - if(first == last) { + if(g.empty()) { return; } // anchor this parent as the blocking point { ExplicitAnchorGuard anchor(p); - _schedule_graph_with_parent(w, first, last, tpg, p); + _schedule_graph_with_parent(w, g, tpg, p); _corun_until(w, [p] () -> bool { return p->_join_counter.load(std::memory_order_acquire) == 0; } ); @@ -2276,48 +2270,23 @@ inline void Executor::wait_for_all() { } // Function: _schedule_graph_with_parent -template -void Executor::_schedule_graph_with_parent( - Worker& worker, I beg, I end, Topology* tpg, NodeBase* parent +inline void Executor::_schedule_graph_with_parent( + Worker& worker, Graph& graph, Topology* tpg, NodeBase* parent ) { - size_t num_srcs = (_set_up_graph(beg, end, tpg, parent) - beg); + size_t num_srcs = graph._set_up(tpg, parent); parent->_join_counter.fetch_add(num_srcs, std::memory_order_relaxed); - _bulk_schedule(worker, beg, num_srcs); + _bulk_schedule(worker, graph.begin(), num_srcs); } // Function: _set_up_topology inline void Executor::_set_up_topology(Worker* w, Topology* tpg) { // ---- under taskflow lock ---- auto& g = tpg->_taskflow._graph; - size_t num_srcs = (_set_up_graph(g.begin(), g.end(), tpg, tpg) - g.begin()); + size_t num_srcs = g._set_up(tpg, tpg); tpg->_join_counter.store(num_srcs, std::memory_order_relaxed); w ? _bulk_schedule(*w, g.begin(), num_srcs) : _bulk_schedule(g.begin(), num_srcs); } -// Function: _set_up_graph -template -I Executor::_set_up_graph(I first, I last, Topology* tpg, NodeBase* parent) { - - auto send = first; - for(; first != last; ++first) { - - auto node = *first; - node->_topology = tpg; - node->_parent = parent; - node->_nstate = NSTATE::NONE; - node->_estate.store(ESTATE::NONE, std::memory_order_relaxed); - node->_set_up_join_counter(); - node->_exception_ptr = nullptr; - - // move source to the first partition - // root, root, root, v1, v2, v3, v4, ... - if(node->num_predecessors() == 0) { - std::iter_swap(send++, first); - } - } - return send; -} - // Function: _tear_down_topology inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*& cache) { @@ -2398,7 +2367,7 @@ inline void Subflow::join() { TF_THROW("subflow already joined"); } - _executor._corun_graph(_worker, _node->_topology, _node, _graph.begin(), _graph.end()); + _executor._corun_graph(_worker, _graph, _node->_topology, _node); // join here since corun graph may throw exception _node->_nstate |= NSTATE::JOINED_SUBFLOW; diff --git a/taskflow/core/flow_builder.hpp b/taskflow/core/flow_builder.hpp index b4effda69..d9270419a 100644 --- a/taskflow/core/flow_builder.hpp +++ b/taskflow/core/flow_builder.hpp @@ -1282,7 +1282,7 @@ inline FlowBuilder::FlowBuilder(Graph& graph) : // Function: emplace template , void>*> Task FlowBuilder::emplace(C&& c) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1291,12 +1291,12 @@ Task FlowBuilder::emplace(C&& c) { template , void>*> Task FlowBuilder::emplace(C&& c) { if constexpr (std::is_invocable_v) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } else if constexpr (std::is_invocable_v) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1308,7 +1308,7 @@ Task FlowBuilder::emplace(C&& c) { // Function: emplace template , void>*> Task FlowBuilder::emplace(C&& c) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1316,7 +1316,7 @@ Task FlowBuilder::emplace(C&& c) { // Function: emplace template , void>*> Task FlowBuilder::emplace(C&& c) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1324,7 +1324,7 @@ Task FlowBuilder::emplace(C&& c) { // Function: emplace template , void>*> Task FlowBuilder::emplace(C&& c) { - return Task(_graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + return Task(_graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, std::forward(c) )); } @@ -1332,7 +1332,7 @@ Task FlowBuilder::emplace(C&& c) { // Function: composed_of template Task FlowBuilder::composed_of(T& object) { - auto node = _graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + auto node = _graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{}, object ); return Task(node); @@ -1340,7 +1340,7 @@ Task FlowBuilder::composed_of(T& object) { // Function: placeholder inline Task FlowBuilder::placeholder() { - auto node = _graph._emplace_back(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, + auto node = _graph._emplace(NSTATE::NONE, ESTATE::NONE, DefaultTaskParams{}, nullptr, nullptr, 0, std::in_place_type_t{} ); return Task(node); diff --git a/taskflow/core/graph.hpp b/taskflow/core/graph.hpp index bc9f5b4be..a92444236 100644 --- a/taskflow/core/graph.hpp +++ b/taskflow/core/graph.hpp @@ -54,6 +54,28 @@ class Graph { friend class Executor; public: + + // --- Iterator Subclass --- + class Iterator { + + public: + using iterator_category = std::forward_iterator_tag; + + explicit Iterator(Node* ptr) : _ptr(ptr) {} + + auto operator*() const { return _ptr; } + auto operator*() { return _ptr; } + //pointer operator->() { return _ptr; } + + Iterator& operator++(); + Iterator operator++(int); + + bool operator==(const Iterator& other) const { return _ptr == other._ptr; } + bool operator!=(const Iterator& other) const { return _ptr != other._ptr; } + + private: + Node* _ptr; + }; /** @brief constructs the graph object @@ -93,7 +115,7 @@ class Graph { /** @brief returns the number of nodes in the graph */ - size_t size() const; + //size_t size() const; /** @brief queries the emptiness of the graph @@ -120,17 +142,23 @@ class Graph { */ auto end() const; + + private: - std::vector _nodes; + //std::vector _nodes; + + Node* _head {nullptr}; void _erase(Node*); + + size_t _set_up(Topology*, NodeBase*); /** @private */ template - Node* _emplace_back(ArgsT&&...); + Node* _emplace(ArgsT&&...); }; // ---------------------------------------------------------------------------- @@ -472,13 +500,15 @@ class Node : public NodeBase { Topology* _topology {nullptr}; + handle_t _handle; + size_t _num_successors {0}; SmallVector _edges; - - handle_t _handle; std::unique_ptr _semaphores; + Node* next {nullptr}; + bool _is_parent_cancelled() const; bool _is_conditioner() const; bool _acquire_all(SmallVector&); @@ -841,6 +871,17 @@ TF_FORCE_INLINE void recycle(Node* ptr) { // ---------------------------------------------------------------------------- // Graph definition // ---------------------------------------------------------------------------- + +inline Graph::Iterator& Graph::Iterator::operator++() { + if (_ptr) _ptr = _ptr->next; + return *this; +} + +inline Graph::Iterator Graph::Iterator::operator++(int) { + Iterator tmp = *this; + ++(*this); + return tmp; +} // Destructor inline Graph::~Graph() { @@ -849,79 +890,163 @@ inline Graph::~Graph() { // Move constructor inline Graph::Graph(Graph&& other) : - _nodes {std::move(other._nodes)} { + _head {other._head} { + other._head = nullptr; } // Move assignment inline Graph& Graph::operator = (Graph&& other) { clear(); - _nodes = std::move(other._nodes); + _head = other._head; + other._head = nullptr; return *this; } // Procedure: clear inline void Graph::clear() { - for(auto node : _nodes) { - recycle(node); + while(_head) { + auto old_head = _head; + _head = _head->next; + recycle(old_head); } - _nodes.clear(); } // Function: size -inline size_t Graph::size() const { - return _nodes.size(); -} +//inline size_t Graph::size() const { +// size_t count = 0; +// auto curr = _head; +// while (curr) { +// count++; +// curr = curr->next; +// } +// return count; +//} // Function: empty inline bool Graph::empty() const { - return _nodes.empty(); + return _head == nullptr; } // Function: begin inline auto Graph::begin() { - return _nodes.begin(); + return Iterator(_head); } // Function: end inline auto Graph::end() { - return _nodes.end(); + return Iterator(nullptr); } // Function: begin inline auto Graph::begin() const { - return _nodes.begin(); + return Iterator(_head); } // Function: end inline auto Graph::end() const { - return _nodes.end(); + return Iterator(nullptr); } // Function: erase -inline void Graph::_erase(Node* node) { - //erase( - // std::remove_if(begin(), end(), [&](auto& p){ return p.get() == node; }), - // end() - //); - _nodes.erase( - std::remove_if(_nodes.begin(), _nodes.end(), [&](auto& p){ - if(p == node) { - recycle(p); - return true; +inline void Graph::_erase(Node* tgt) { + + if (!_head || !tgt) { + return; + } + + // Case: Target is the head + if (_head == tgt) { + _head = _head->next; + //tgt->next = nullptr; + recycle(tgt); + return; + } + + // Case: Search for predecessor + auto current = _head; + while (current->next && current->next != tgt) { + current = current->next; + } + + // Bridge the gap + if (current->next == tgt) { + current->next = tgt->next; + //tgt->next = nullptr; + recycle(tgt); + } +} + +// Function: _set_up +inline size_t Graph::_set_up(Topology* tpg, NodeBase* parent) { + + if (!_head) { + return 0; + } + + size_t num_srcs = 0; + + Node* zeroHead = nullptr; // Head of the "ready" list + Node* zeroTail = nullptr; // Tail of the "ready" list + Node* restHead = nullptr; // Head of the "busy" list + Node* restTail = nullptr; // Tail of the "busy" list + + Node* current = _head; + + while (current) { + + current->_topology = tpg; + current->_parent = parent; + current->_nstate = NSTATE::NONE; + current->_estate.store(ESTATE::NONE, std::memory_order_relaxed); + current->_set_up_join_counter(); + current->_exception_ptr = nullptr; + + auto nextNode = current->next; // Save next pointer before re-linking + current->next = nullptr; // Disconnect the current node + + if (current->num_predecessors() == 0) { + ++num_srcs; + // Append to the "zero" chain + if (!zeroHead) { + zeroHead = zeroTail = current; + } else { + zeroTail->next = current; + zeroTail = current; } - return false; - }), - _nodes.end() - ); + } else { + // Append to the "rest" chain + if (!restHead) { + restHead = restTail = current; + } else { + restTail->next = current; + restTail = current; + } + } + current = nextNode; + } + + // Stitch the two chains together + if (zeroHead) { + _head = zeroHead; + zeroTail->next = restHead; // Connect end of zeros to start of rest + } else { + _head = restHead; // No zeros found, head is just the rest + } + + return num_srcs; } /** @private */ template -Node* Graph::_emplace_back(ArgsT&&... args) { - _nodes.push_back(animate(std::forward(args)...)); - return _nodes.back(); +Node* Graph::_emplace(ArgsT&&... args) { + //_nodes.push_back(animate(std::forward(args)...)); + //return _nodes.back(); + auto node = animate(std::forward(args)...); + node->next = _head; + _head = node; + return _head; } // ---------------------------------------------------------------------------- diff --git a/taskflow/core/runtime.hpp b/taskflow/core/runtime.hpp index 73bf8ceee..ce931e2c4 100644 --- a/taskflow/core/runtime.hpp +++ b/taskflow/core/runtime.hpp @@ -993,7 +993,7 @@ tf::Future Executor::run_until(Taskflow&& f, P&& p, C&& c) { t->_parent->_join_counter.fetch_add(1, std::memory_order_release); if(g.object->_fetch_enqueue(t) == 0) { rt._executor._schedule_graph_with_parent( - rt._worker, g.object->_graph.begin(), g.object->_graph.end(), t.get(), t.get() + rt._worker, g.object->_graph, t.get(), t.get() ); } }); diff --git a/taskflow/core/taskflow.hpp b/taskflow/core/taskflow.hpp index 533465c1b..e6b871a6d 100644 --- a/taskflow/core/taskflow.hpp +++ b/taskflow/core/taskflow.hpp @@ -374,7 +374,14 @@ inline void Taskflow::clear() { // Function: num_tasks inline size_t Taskflow::num_tasks() const { - return _graph.size(); + //return _graph.size(); + size_t count = 0; + auto curr = _graph._head; + while (curr) { + count++; + curr = curr->next; + } + return count; } // Function: empty diff --git a/taskflow/core/wsq.hpp b/taskflow/core/wsq.hpp index d02e3a16c..2da7c6593 100644 --- a/taskflow/core/wsq.hpp +++ b/taskflow/core/wsq.hpp @@ -238,7 +238,7 @@ class UnboundedWSQ { Only the owner thread can insert an item to the queue. */ template - void bulk_push(I first, size_t N); + void bulk_push(I& first, size_t N); /** @brief pops out an item from the queue @@ -397,7 +397,7 @@ void UnboundedWSQ::push(T o) { // Function: bulk_push template template -void UnboundedWSQ::bulk_push(I first, size_t N) { +void UnboundedWSQ::bulk_push(I& first, size_t N) { if(N == 0) return; @@ -411,7 +411,7 @@ void UnboundedWSQ::bulk_push(I first, size_t N) { } for(size_t i=0; ipush(b++, first[i]); + a->push(b++, *first++); } std::atomic_thread_fence(std::memory_order_release); @@ -694,7 +694,7 @@ class BoundedWSQ { Only the owner thread can insert items into the queue. */ template - size_t try_bulk_push(I first, size_t N); + size_t try_bulk_push(I& first, size_t N); /** @brief pops out an item from the queue @@ -832,7 +832,7 @@ bool BoundedWSQ::try_push(O&& o) { // Function: try_bulk_push template template -size_t BoundedWSQ::try_bulk_push(I first, size_t N) { +size_t BoundedWSQ::try_bulk_push(I& first, size_t N) { if(N == 0) return 0; @@ -845,7 +845,7 @@ size_t BoundedWSQ::try_bulk_push(I first, size_t N) { if(n > 0) { // push n elements into the queue for(size_t i=0; i data(2048); // insert an element - queue.bulk_push(data.data(), 1); + auto first = data.data(); + queue.bulk_push(first, 1); REQUIRE(queue.size() == 1); REQUIRE(queue.capacity() == 2); // insert 2 elements - queue.bulk_push(data.data(), 2); + first = data.data(); + queue.bulk_push(first, 2); REQUIRE(queue.size() == 3); REQUIRE(queue.capacity() == 4); // insert 10 elements - queue.bulk_push(data.data(), 10); + first = data.data(); + queue.bulk_push(first, 10); REQUIRE(queue.size() == 13); REQUIRE(queue.capacity() == 16); // insert 1200 elements - queue.bulk_push(data.data(), 1200); + first = data.data(); + queue.bulk_push(first, 1200); REQUIRE(queue.size() == 1213); REQUIRE(queue.capacity() == 2048); @@ -156,22 +160,26 @@ TEST_CASE("UnboundedWSQ.Resize") { REQUIRE(queue.empty() == true); // insert an element - queue.bulk_push(data.data(), 1); + first = data.data(); + queue.bulk_push(first, 1); REQUIRE(queue.size() == 1); REQUIRE(queue.capacity() == 2048); // insert 2 elements - queue.bulk_push(data.data(), 2); + first = data.data(); + queue.bulk_push(first, 2); REQUIRE(queue.size() == 3); REQUIRE(queue.capacity() == 2048); // insert 10 elements - queue.bulk_push(data.data(), 10); + first = data.data(); + queue.bulk_push(first, 10); REQUIRE(queue.size() == 13); REQUIRE(queue.capacity() == 2048); // insert 1200 elements - queue.bulk_push(data.data(), 1200); + first = data.data(); + queue.bulk_push(first, 1200); REQUIRE(queue.size() == 1213); REQUIRE(queue.capacity() == 2048); } @@ -349,8 +357,9 @@ void bounded_tsq_n_consumers_bulk_push(size_t M) { size_t capacity = queue.capacity(); REQUIRE((size == 0 && capacity > 0)); const size_t num_pushable_elements = std::min(capacity, N); - - REQUIRE(num_pushable_elements == queue.try_bulk_push(gold.data(), N)); + + auto first = gold.data(); + REQUIRE(num_pushable_elements == queue.try_bulk_push(first, N)); REQUIRE(queue.size() == num_pushable_elements); for(size_t i=0; i items; @@ -648,7 +659,8 @@ void unbounded_tsq_n_consumers_bulk_push(size_t M) { size_t capacity = queue.capacity(); REQUIRE((size == 0 && capacity > 0)); - queue.bulk_push(gold.data(), N); + auto first = gold.data(); + queue.bulk_push(first, N); REQUIRE(queue.size() == N); for(size_t i=0; i items; while(consumed != N) { From 2c4ab123683807720f5d4aef5428f244f75962e4 Mon Sep 17 00:00:00 2001 From: TSUNG-WEI HUANG Date: Thu, 19 Feb 2026 23:22:19 -0600 Subject: [PATCH 4/5] optimized set_up_graph with branchless implementation --- taskflow/core/graph.hpp | 73 +++++++++++++++++++++++++++++--------- taskflow/core/taskflow.hpp | 9 +---- 2 files changed, 58 insertions(+), 24 deletions(-) diff --git a/taskflow/core/graph.hpp b/taskflow/core/graph.hpp index a92444236..9adb44dfd 100644 --- a/taskflow/core/graph.hpp +++ b/taskflow/core/graph.hpp @@ -115,7 +115,7 @@ class Graph { /** @brief returns the number of nodes in the graph */ - //size_t size() const; + size_t size() const; /** @brief queries the emptiness of the graph @@ -142,13 +142,12 @@ class Graph { */ auto end() const; - - private: //std::vector _nodes; - Node* _head {nullptr}; + Node* _head {nullptr}; + size_t _size {0}; void _erase(Node*); @@ -890,15 +889,19 @@ inline Graph::~Graph() { // Move constructor inline Graph::Graph(Graph&& other) : - _head {other._head} { + _head {other._head}, + _size {other._size} { other._head = nullptr; + other._size = 0; } // Move assignment inline Graph& Graph::operator = (Graph&& other) { clear(); _head = other._head; + _size = other._size; other._head = nullptr; + other._size = 0; return *this; } @@ -909,22 +912,18 @@ inline void Graph::clear() { _head = _head->next; recycle(old_head); } + _size = 0; } // Function: size -//inline size_t Graph::size() const { -// size_t count = 0; -// auto curr = _head; -// while (curr) { -// count++; -// curr = curr->next; -// } -// return count; -//} +inline size_t Graph::size() const { + return _size; +} // Function: empty inline bool Graph::empty() const { - return _head == nullptr; + // here we don't do _head == nullptr as it can cause data race with set_up + return _size == 0; } // Function: begin @@ -959,6 +958,7 @@ inline void Graph::_erase(Node* tgt) { _head = _head->next; //tgt->next = nullptr; recycle(tgt); + --_size; return; } @@ -973,6 +973,7 @@ inline void Graph::_erase(Node* tgt) { current->next = tgt->next; //tgt->next = nullptr; recycle(tgt); + --_size; } } @@ -983,6 +984,45 @@ inline size_t Graph::_set_up(Topology* tpg, NodeBase* parent) { return 0; } + size_t num_srcs = 0; + + Node* head[2] = {nullptr, nullptr}; // 0 = rest, 1 = zero + Node** tail[2] = {&head[0], &head[1]}; + + Node* current = _head; + + while (current) { + + current->_topology = tpg; + current->_parent = parent; + current->_nstate = NSTATE::NONE; + current->_estate.store(ESTATE::NONE, std::memory_order_relaxed); + current->_set_up_join_counter(); + current->_exception_ptr = nullptr; + + Node* next = current->next; + current->next = nullptr; // detach immediately (important) + + const size_t is_zero = (current->num_predecessors() == 0); + num_srcs += is_zero; + + *tail[is_zero] = current; + tail [is_zero] = ¤t->next; + + current = next; + } + + // stitch zero list before rest list + *tail[1] = head[0]; + _head = head[1] ? head[1] : head[0]; + + return num_srcs; + //////// +/* + if (!_head) { + return 0; + } + size_t num_srcs = 0; Node* zeroHead = nullptr; // Head of the "ready" list @@ -1033,7 +1073,7 @@ inline size_t Graph::_set_up(Topology* tpg, NodeBase* parent) { _head = restHead; // No zeros found, head is just the rest } - return num_srcs; + return num_srcs;*/ } /** @@ -1046,6 +1086,7 @@ Node* Graph::_emplace(ArgsT&&... args) { auto node = animate(std::forward(args)...); node->next = _head; _head = node; + ++_size; return _head; } diff --git a/taskflow/core/taskflow.hpp b/taskflow/core/taskflow.hpp index e6b871a6d..533465c1b 100644 --- a/taskflow/core/taskflow.hpp +++ b/taskflow/core/taskflow.hpp @@ -374,14 +374,7 @@ inline void Taskflow::clear() { // Function: num_tasks inline size_t Taskflow::num_tasks() const { - //return _graph.size(); - size_t count = 0; - auto curr = _graph._head; - while (curr) { - count++; - curr = curr->next; - } - return count; + return _graph.size(); } // Function: empty From f7ea4fd90d4379e075bc1bb85229868b5ce0ce6d Mon Sep 17 00:00:00 2001 From: TSUNG-WEI HUANG Date: Fri, 20 Feb 2026 12:43:19 -0600 Subject: [PATCH 5/5] refactored graph --- taskflow/core/graph.hpp | 61 ++--------------------------------------- 1 file changed, 2 insertions(+), 59 deletions(-) diff --git a/taskflow/core/graph.hpp b/taskflow/core/graph.hpp index 9adb44dfd..77f425c0e 100644 --- a/taskflow/core/graph.hpp +++ b/taskflow/core/graph.hpp @@ -986,7 +986,7 @@ inline size_t Graph::_set_up(Topology* tpg, NodeBase* parent) { size_t num_srcs = 0; - Node* head[2] = {nullptr, nullptr}; // 0 = rest, 1 = zero + Node* head[2] = {nullptr, nullptr}; // 0 = rest, 1 = zero Node** tail[2] = {&head[0], &head[1]}; Node* current = _head; @@ -1003,7 +1003,7 @@ inline size_t Graph::_set_up(Topology* tpg, NodeBase* parent) { Node* next = current->next; current->next = nullptr; // detach immediately (important) - const size_t is_zero = (current->num_predecessors() == 0); + size_t is_zero = (current->num_predecessors() == 0); num_srcs += is_zero; *tail[is_zero] = current; @@ -1017,63 +1017,6 @@ inline size_t Graph::_set_up(Topology* tpg, NodeBase* parent) { _head = head[1] ? head[1] : head[0]; return num_srcs; - //////// -/* - if (!_head) { - return 0; - } - - size_t num_srcs = 0; - - Node* zeroHead = nullptr; // Head of the "ready" list - Node* zeroTail = nullptr; // Tail of the "ready" list - Node* restHead = nullptr; // Head of the "busy" list - Node* restTail = nullptr; // Tail of the "busy" list - - Node* current = _head; - - while (current) { - - current->_topology = tpg; - current->_parent = parent; - current->_nstate = NSTATE::NONE; - current->_estate.store(ESTATE::NONE, std::memory_order_relaxed); - current->_set_up_join_counter(); - current->_exception_ptr = nullptr; - - auto nextNode = current->next; // Save next pointer before re-linking - current->next = nullptr; // Disconnect the current node - - if (current->num_predecessors() == 0) { - ++num_srcs; - // Append to the "zero" chain - if (!zeroHead) { - zeroHead = zeroTail = current; - } else { - zeroTail->next = current; - zeroTail = current; - } - } else { - // Append to the "rest" chain - if (!restHead) { - restHead = restTail = current; - } else { - restTail->next = current; - restTail = current; - } - } - current = nextNode; - } - - // Stitch the two chains together - if (zeroHead) { - _head = zeroHead; - zeroTail->next = restHead; // Connect end of zeros to start of rest - } else { - _head = restHead; // No zeros found, head is just the rest - } - - return num_srcs;*/ } /**