From 47154c27b0afa90b99fdc43ab1e21089eba2c1ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Thu, 10 Jul 2025 10:24:16 +0200 Subject: [PATCH 1/3] Use semaphore in tasking::Scheduler. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Zientkiewicz --- dali/core/exec/tasking/scheduler.cc | 22 +++++++++--------- include/dali/core/exec/tasking/executor.h | 4 ++++ include/dali/core/exec/tasking/scheduler.h | 26 +++++++++++++--------- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/dali/core/exec/tasking/scheduler.cc b/dali/core/exec/tasking/scheduler.cc index 2c225d0e226..de12a3adaea 100644 --- a/dali/core/exec/tasking/scheduler.cc +++ b/dali/core/exec/tasking/scheduler.cc @@ -38,7 +38,11 @@ bool Scheduler::AcquireAllAndMoveToReady(SharedTask &task) noexcept { task->preconditions_.clear(); task->state_ = TaskState::Ready; pending_.Remove(task); - ready_.push(std::move(task)); + { + std::lock_guard lock(queue_lock_); + ready_.push(std::move(task)); + } + queue_sem_.release(); return true; } @@ -46,7 +50,6 @@ void Scheduler::Notify(Waitable *w) { bool is_completion_event = dynamic_cast(w) != nullptr; bool is_task = is_completion_event && dynamic_cast(w); - int new_ready = 0; { std::lock_guard g(mtx_); if (is_task) @@ -88,22 +91,19 @@ void Scheduler::Notify(Waitable *w) { if (task->Ready()) { pending_.Remove(task); task->state_ = TaskState::Ready; - ready_.push(std::move(task)); - new_ready++; + { + std::lock_guard lock(queue_lock_); + ready_.push(std::move(task)); + } + queue_sem_.release(); // OK, the task is ready, we're done with it continue; } } - if (AcquireAllAndMoveToReady(task)) - new_ready++; + AcquireAllAndMoveToReady(task); } } - - if (new_ready == 1) - this->task_ready_.notify_one(); - else if (new_ready > 1) - this->task_ready_.notify_all(); } } // namespace dali::tasking diff --git a/include/dali/core/exec/tasking/executor.h b/include/dali/core/exec/tasking/executor.h index 4079840a49a..d113dea760d 100644 --- a/include/dali/core/exec/tasking/executor.h +++ b/include/dali/core/exec/tasking/executor.h @@ -70,6 +70,10 @@ class Executor : public Scheduler { } private: + int NumThreads() const override { + return num_threads_; + } + bool started_ = false; void RunWorker() { diff --git a/include/dali/core/exec/tasking/scheduler.h b/include/dali/core/exec/tasking/scheduler.h index c81d7f9b1d6..eb8709425f8 100644 --- a/include/dali/core/exec/tasking/scheduler.h +++ b/include/dali/core/exec/tasking/scheduler.h @@ -23,6 +23,8 @@ #include "dali/core/api_helper.h" #include "dali/core/exec/tasking/task.h" #include "dali/core/exec/tasking/sync.h" +#include "dali/core/semaphore.h" +#include "dali/core/spinlock.h" namespace dali::tasking { @@ -182,8 +184,8 @@ class Scheduler { * for a shutdown notification. */ SharedTask Pop() { - std::unique_lock lock(mtx_); - task_ready_.wait(lock, [&]() { return !ready_.empty() || shutdown_requested_; }); + queue_sem_.acquire(); + std::lock_guard lock(queue_lock_); if (ready_.empty()) { assert(shutdown_requested_); return nullptr; @@ -234,9 +236,10 @@ class Scheduler { /** Makes all Pop functions return with an error value. */ void Shutdown() { std::lock_guard g(mtx_); + std::lock_guard lock(queue_lock_); shutdown_requested_ = true; - task_ready_.notify_all(); task_done_.notify_all(); + queue_sem_.release(NumThreads()); } /** Checks whether a shutdown was requested. */ @@ -245,10 +248,12 @@ class Scheduler { } private: + virtual int NumThreads() const { return 1; } + /** Moves the task to the ready queue if all of its preconditions can be acquired. * * This function atomically checks that all preconditions can be met and if so, acquires them. - * If the preconditions where met, the task is moved from the pending list to the ready queue. + * If the preconditions were met, the task is moved from the pending list to the ready queue. */ bool DLL_PUBLIC AcquireAllAndMoveToReady(SharedTask &task) noexcept; @@ -258,11 +263,11 @@ class Scheduler { if (task->Ready()) { // if the task has no preconditions... { // ...then we add it directly to the ready queue. - std::lock_guard lock(mtx_); + std::lock_guard lock(queue_lock_); task->state_ = TaskState::Ready; ready_.push(task); } - task_ready_.notify_one(); + queue_sem_.release(); } else { // Otherwise, the task is added to the pending list bool ready = false; @@ -276,17 +281,18 @@ class Scheduler { } pending_.PushFront(task); // ...and we check whether its preconditions are, in fact, met. - ready = AcquireAllAndMoveToReady(task); + AcquireAllAndMoveToReady(task); } - if (ready) - task_ready_.notify_one(); + } } friend class Task; + counting_semaphore queue_sem_{0}; + spinlock queue_lock_; std::mutex mtx_; - std::condition_variable task_ready_, task_done_; + std::condition_variable task_done_; detail::TaskList pending_; std::priority_queue, TaskPriorityLess> ready_; From bf8e73b4f38b3dccdebc1cd1f086c343bf7b3b56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Fri, 11 Jul 2025 12:34:31 +0200 Subject: [PATCH 2/3] Lint. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Zientkiewicz --- include/dali/core/exec/tasking/scheduler.h | 1 - 1 file changed, 1 deletion(-) diff --git a/include/dali/core/exec/tasking/scheduler.h b/include/dali/core/exec/tasking/scheduler.h index eb8709425f8..e52e8167e5c 100644 --- a/include/dali/core/exec/tasking/scheduler.h +++ b/include/dali/core/exec/tasking/scheduler.h @@ -283,7 +283,6 @@ class Scheduler { // ...and we check whether its preconditions are, in fact, met. AcquireAllAndMoveToReady(task); } - } } From 85d579eefeb0acdf2661a7cb28734037ed75bddc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Fri, 11 Jul 2025 13:12:13 +0200 Subject: [PATCH 3/3] Add virtual dtor. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Zientkiewicz --- include/dali/core/exec/tasking/scheduler.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/dali/core/exec/tasking/scheduler.h b/include/dali/core/exec/tasking/scheduler.h index e52e8167e5c..cbe6b46f94f 100644 --- a/include/dali/core/exec/tasking/scheduler.h +++ b/include/dali/core/exec/tasking/scheduler.h @@ -180,6 +180,8 @@ class Scheduler { }; public: + virtual ~Scheduler() = default; + /** Removes a ready task with the highest priorty or waits for one to appear or * for a shutdown notification. */