-
Notifications
You must be signed in to change notification settings - Fork 661
Expand file tree
/
Copy pathscheduler.h
More file actions
373 lines (334 loc) · 11.7 KB
/
scheduler.h
File metadata and controls
373 lines (334 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
// Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef DALI_CORE_EXEC_TASKING_SCHEDULER_H_
#define DALI_CORE_EXEC_TASKING_SCHEDULER_H_
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <vector>
#include "dali/core/api_helper.h"
#include "dali/core/exec/tasking/task.h"
#include "dali/core/exec/tasking/sync.h"
#include "dali/core/semaphore.h"
#include "dali/core/spinlock.h"
namespace dali::tasking {
/** Represents a future result of a task.
*
* Like with std::future, this object can be used to wait for and obtain a result of a task.
* The difference is that it needs a Scheduler object which can be used to determine task's
* readiness.
* TaskFuture prolongs the life ot a task object and its result.
*/
class TaskFuture {
public:
TaskFuture(SharedTask task, TaskResults results)
: task_(std::move(task)), results_(std::move(results)) {}
/** Waits for a result of type T
*
* If the task throws, the exception rethrown here.
* If the task returns a value of a different type, std::bad_any_cast is thrown.
*/
template <typename T>
T Value() & {
Wait();
return results_.Value<T>();
}
/** Waits for a result of type T
*
* If the task throws, the exception rethrown here.
* If the task returns a value of a different type, std::bad_any_cast is thrown.
*/
template <typename T>
T Value() && {
static_assert(!std::is_reference_v<T>, "Returning a reference to a temporary");
Wait();
return results_.Value<T>();
}
/** Waits for a result and returns it as `std::any`
*
* If the task throws, the exception rethrown here.
* If the task returns `void`, the returned `any` is empty.
*/
decltype(auto) Value() & {
Wait();
return results_.Value();
}
/** Waits for a result and returns it as `std::any`
*
* If the task throws, the exception rethrown here.
* If the task returns `void`, the returned `any` is empty.
*/
auto Value() && {
Wait();
return results_.Value();
}
/** Waits for a result of type T
*
* If the task throws, the exception rethrown here.
* If the task returns a value of a different type, std::bad_any_cast is thrown.
*
* @param index The index of the return value of the task. See `num_results` in `Task::Create`
*/
template <typename T>
T Value(int index) & {
Wait();
return results_.Value<T>(index);
}
/** Waits for a result of type T
*
* If the task throws, the exception rethrown here.
* If the task returns a value of a different type, std::bad_any_cast is thrown.
*
* @param index The index of the return value of the task. See `num_results` in `Task::Create`
*/
template <typename T>
T Value(int index) && {
static_assert(!std::is_reference_v<T>, "Returning a reference to a temporary");
Wait();
return results_.Value<T>(index);
}
/** Waits for a result and returns it as `std::any`
*
* If the task throws, the exception rethrown here.
* If the task returns `void`, the returned `any` is empty.
*
* @param index The index of the return value of the task. See `num_results` in `Task::Create`
*/
decltype(auto) Value(int index) & {
Wait();
return results_.Value(index);
}
/** Waits for a result and returns it as `std::any`
*
* If the task throws, the exception rethrown here.
* If the task returns `void`, the returned `any` is empty.
*
* @param index The index of the return value of the task. See `num_results` in `Task::Create`
*/
auto Value(int index) && {
Wait();
return results_.Value(index);
}
private:
void Wait();
SharedTask task_;
TaskResults results_;
bool complete_ = false; // optimize multiple calls to Wait / Value
};
/** Determines the readiness and execution order of tasks.
 *
 * The scheduler manages tasks by identifying ready tasks and moving them from Pending to Ready
 * state. The ready tasks can be Popped (at which point they transition to Running) and executed
 * by an external entity (e.g. Executor).
 *
 * The scheduler is also the central entity that participates in notifications about the change
 * of state of the objects. Whenever an object is signalled and may affect the readiness of
 * tasks, the scheduler is involved.
 * Finally, the scheduler is also used in determining task completion.
 *
 * In a typical scenario, it's used in the following calls:
 * `AddTask`
 * `AddSilentTask`
 * `Wait`
 * and in `Releasable::Release`.
 *
 * The scheduler implements deadlock mitigation by ensuring that objects are acquired only
 * when all preconditions of a task can be met.
 *
 * Once a task is submitted to the scheduler, its shared pointer reference is increased and the
 * caller doesn't need to maintain a copy of the task pointer.
 *
 * AddTask vs AddSilentTask
 * AddTask produces a TaskFuture object. This object can be used to wait for the task and get its
 * result. In case of tasks that do not produce final results, this future is not needed. In that
 * case the overhead of creating a future object can be avoided by using AddSilentTask.
 * The user can still Wait for tasks without a future object. The only difference is that
 * the results of silent tasks are only accessible by the registered consumers of their outputs.
 */
class Scheduler {
  /** Ordering of the ready queue: the task with the highest Priority() ends up on top. */
  struct TaskPriorityLess {
    bool operator()(const SharedTask &a, const SharedTask &b) const {
      return a->Priority() < b->Priority();
    }
  };

 public:
  virtual ~Scheduler() = default;

  /** Removes a ready task with the highest priority or waits for one to appear or
   * for a shutdown notification.
   *
   * @return The popped task, already transitioned to the Running state, or nullptr if the call
   *         was woken up by Shutdown and the ready queue was empty.
   */
  SharedTask Pop() {
    queue_sem_.acquire();
    std::lock_guard lock(queue_lock_);
    if (ready_.empty()) {
      assert(shutdown_requested_);
      return nullptr;
    }
    // priority_queue::top() yields a const reference - std::move on it would still copy,
    // so make the copy explicit.
    SharedTask ret = ready_.top();
    assert(ret->state_ == TaskState::Ready);
    ready_.pop();
    ret->state_ = TaskState::Running;
    return ret;
  }

  /** Submits a task for execution.
   *
   * @throws std::logic_error if the task has already been submitted.
   */
  void AddSilentTask(SharedTask task) {
    if (task->state_ != TaskState::New)
      throw std::logic_error("A task can be submitted only once.");
    AddTaskImpl(std::move(task));
  }

  /** Submits a task for execution and gets a Future which can be used to get the output value.
   *
   * @throws std::logic_error if the task has already been submitted.
   */
#if __cplusplus >= 201907L
  [[nodiscard("Use AddSilentTask if the result is not needed")]]
#else
  [[nodiscard]]
#endif
  TaskFuture AddTask(SharedTask task) {
    if (task->state_ != TaskState::New)
      throw std::logic_error("A task can be submitted only once.");
    // Take the results handle before submission - Task::Run clears the task's own results_
    // once the task has run.
    auto res = task->results_;
    AddTaskImpl(task);
    return {std::move(task), std::move(res)};
  }

  /** Notifies the scheduler that a Waitable's state has changed and tasks waiting for it
   * may become ready.
   */
  void DLL_PUBLIC Notify(Waitable *w);

  /** Waits for a task to complete. */
  void Wait(const Task *task);

  /** Waits for a task to complete. */
  void Wait(const SharedTask &task) {
    Wait(task.get());
  }

  /** Requests a shutdown of the scheduler.
   *
   * Wakes up all threads blocked in `Wait` (which then throw if their task is not complete)
   * and releases `NumThreads()` permits of the ready-queue semaphore, so that up to that many
   * threads blocked in `Pop` wake up. A `Pop` that finds the ready queue empty returns nullptr.
   */
  void Shutdown() {
    std::lock_guard g(mtx_);
    std::lock_guard lock(queue_lock_);
    shutdown_requested_ = true;
    task_done_.notify_all();
    queue_sem_.release(NumThreads());
  }

  /** Checks whether a shutdown was requested.
   *
   * This is called without taking any lock, which is why the flag is atomic.
   */
  bool ShutdownRequested() const {
    return shutdown_requested_.load();
  }

 private:
  /** The number of semaphore permits released by Shutdown to wake up threads blocked in Pop.
   * Presumably overridden by schedulers serving multiple worker threads.
   */
  virtual int NumThreads() const { return 1; }

  /** Moves the task to the ready queue if all of its preconditions can be acquired.
   *
   * This function atomically checks that all preconditions can be met and if so, acquires them.
   * If the preconditions were met, the task is moved from the pending list to the ready queue.
   */
  bool DLL_PUBLIC AcquireAllAndMoveToReady(SharedTask &task) noexcept;

  /** Common implementation of AddTask and AddSilentTask.
   *
   * A task without preconditions is pushed directly to the ready queue. Any other task is
   * registered as waiting for each of its preconditions, placed on the pending list and
   * immediately checked for readiness.
   */
  void AddTaskImpl(SharedTask task) {
    assert(task->state_ == TaskState::New);
    task->Submit(*this);
    if (task->Ready()) {  // if the task has no preconditions...
      {
        // ...then we add it directly to the ready queue.
        std::lock_guard lock(queue_lock_);
        task->state_ = TaskState::Ready;
        ready_.push(task);
      }
      queue_sem_.release();
    } else {
      // Otherwise, the task is added to the pending list...
      std::lock_guard lock(mtx_);
      task->state_ = TaskState::Pending;
      for (auto &pre : task->preconditions_) {
        [[maybe_unused]] bool added = pre->AddToWaiting(task);
        assert(added);
      }
      pending_.PushFront(task);
      // ...and we check whether its preconditions are, in fact, met. If they aren't, the task
      // remains pending until a Notify about one of its preconditions makes it ready.
      AcquireAllAndMoveToReady(task);
    }
  }

  friend class Task;
  counting_semaphore queue_sem_{0};
  spinlock queue_lock_;
  std::mutex mtx_;
  std::condition_variable task_done_;
  detail::TaskList pending_;
  std::priority_queue<SharedTask, std::vector<SharedTask>, TaskPriorityLess> ready_;
  // Written under mtx_ and queue_lock_ in Shutdown, but read lock-free in ShutdownRequested.
  std::atomic_bool shutdown_requested_{false};
};
/** Forwards a state-change notification about this Waitable to the given scheduler. */
inline void Waitable::Notify(Scheduler &sched) {
  Waitable *const self = this;
  sched.Notify(self);
}
/** Blocks until the task is complete.
 *
 * Returns immediately if the task is already complete. Otherwise it delegates to
 * Scheduler::Wait on the scheduler the task was submitted to. That call may throw, e.g. when
 * the scheduler is shut down before the task completes.
 *
 * @throws std::logic_error if the task is not complete and has no associated scheduler.
 */
inline void Task::Wait() const {
  // Load the value of sched_ first....
  Scheduler *sched = sched_;
  // ...prevent the read of sched_ from being reordered
  // past the completeness check below. Task::Run clears sched_ only after marking the task as
  // complete, so a null value read here combined with an incomplete task means the task was
  // never submitted.
  // NOTE(review): if sched_ is declared as a plain (non-atomic) pointer in task.h, this read
  // formally races with `sched_ = nullptr` in Task::Run - confirm the declaration.
  std::atomic_thread_fence(std::memory_order_acquire);
  if (IsAcquirable())
    return;  // If the task is complete, the wait can exit immediately
  // If the task is not complete, then the only way we may land with null sched is if the task was
  // never submitted for execution.
  if (sched == nullptr)
    throw std::logic_error("The task is not associated with any scheduler.");
  sched->Wait(this);
}
/** Executes the task's payload and brings the task to the Complete state.
 *
 * Expects (asserted) that the task is Running and is associated with a scheduler.
 * The steps are, in order:
 * - run the payload, then drop it;
 * - clear the task's own results_ and inputs_;
 * - mark the task as complete and notify the scheduler;
 * - release everything registered in release_;
 * - clear the scheduler pointer.
 */
inline void Task::Run() {
  // This function runs the payload, notifies the scheduler and clears the task object.
  // After Run, the task can be used for only two things: it can be waited for and it can be
  // succeeded.
  assert(state_ == TaskState::Running);
  assert(sched_ != nullptr);
  // Run the payload
  wrapped_(this);
  // ... and get rid of it - it may be heavy
  wrapped_ = {};
  // Clear the results - all interested parties must have a shared pointer to the results.
  results_.clear();
  // We're done, the inputs are no longer necessary.
  inputs_.clear();
  // The task is complete - mark it as such and notify the scheduler
  state_ = TaskState::Complete;
  MarkAsComplete();
  Notify(*sched_);
  // If we have any waitables to release after run - now is the time
  for (auto &r : release_) {
    r->Release(*sched_);
  }
  release_.clear();
  // Prevent clearing sched_ before everything above happens
  // (Task::Wait relies on sched_ being cleared only after the task is marked complete).
  std::atomic_thread_fence(std::memory_order_acq_rel);
  // Finally, we should clear the scheduler pointer - the task object, after it's complete,
  // may outlive its scheduler.
  sched_ = nullptr;
}
/** Waits for the underlying task.
 *
 * After the first wait succeeds, later calls return at once without consulting the task.
 * If the task's Wait throws, the future is not marked as complete.
 */
inline void TaskFuture::Wait() {
  if (complete_)
    return;  // already waited successfully
  task_->Wait();
  complete_ = true;
}
inline void Scheduler::Wait(const Task *task) {
std::unique_lock lock(mtx_);
if (task->state_ < TaskState::Pending)
throw std::logic_error("Cannot wait for a task that has not been submitted");
task_done_.wait(lock, [&]() { return task->IsAcquirable() || shutdown_requested_; });
if (!task->IsAcquirable()) {
assert(shutdown_requested_);
throw std::runtime_error("The scheduler was shut down before the task was completed.");
}
}
} // namespace dali::tasking
#endif // DALI_CORE_EXEC_TASKING_SCHEDULER_H_