Skip to content

Commit cd15067

Browse files
committed
Use new thread pool in NvImgCodec
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
1 parent d5f32ed commit cd15067

File tree

3 files changed

+170
-26
lines changed

3 files changed

+170
-26
lines changed

dali/operators/imgcodec/image_decoder.h

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "dali/pipeline/operator/checkpointing/stateless_operator.h"
3232
#include "dali/pipeline/operator/common.h"
3333
#include "dali/pipeline/operator/operator.h"
34+
#include "dali/pipeline/util/new_thread_pool.h"
3435

3536
#if not(WITH_DYNAMIC_NVIMGCODEC_ENABLED)
3637
nvimgcodecStatus_t get_libjpeg_turbo_extension_desc(nvimgcodecExtensionDesc_t *ext_desc);
@@ -75,6 +76,17 @@ constexpr uint32_t verbosity_to_severity(int verbose) {
7576
static constexpr size_t kDevAlignment = 256; // warp alignment for 32x64-bit
7677
static constexpr size_t kHostAlignment = 64; // cache alignment
7778

79+
// Returns the process-wide mutex that guards synchronized console output.
inline std::mutex &print_mutex() {
  static std::mutex instance;
  return instance;
}
83+
84+
// Prints the arguments to std::cout while holding a global lock, so that
// messages emitted concurrently from multiple threads do not interleave.
// NOTE(review): this looks like debug scaffolding for the
// "Scheduling/Waiting in thread ..." traces below — consider removing it
// (or routing it through the regular logging facility) before merging.
template <typename... Args>
inline void sync_print(Args &&... args) {
  std::lock_guard g(print_mutex());
  print(std::cout, std::forward<Args>(args)...);
}
89+
7890
inline int static_dali_device_malloc(void *ctx, void **ptr, size_t size, cudaStream_t stream) {
7991
auto *mr = static_cast<mm::device_async_resource *>(ctx);
8092
try {
@@ -228,9 +240,9 @@ class ImageDecoder : public StatelessOperator<Backend> {
228240
num_threads_ = spec.GetArgument<int>("num_threads");
229241
GetDecoderSpecificArguments(spec);
230242

243+
thread_pool_ = std::make_unique<NewThreadPool>(num_threads_, device_id_,
244+
spec.GetArgument<bool>("affine"), "MixedDecoder");
231245
if (std::is_same<MixedBackend, Backend>::value) {
232-
thread_pool_ = std::make_unique<ThreadPool>(num_threads_, device_id_,
233-
spec.GetArgument<bool>("affine"), "MixedDecoder");
234246
if (spec_.HasArgument("cache_size"))
235247
cache_ = std::make_unique<CachedDecoderImpl>(spec_);
236248
}
@@ -410,23 +422,31 @@ class ImageDecoder : public StatelessOperator<Backend> {
410422
// nvimgcodec executor callback: records a single decode task.
// Tasks are only collected here (in nvimgcodec_scheduled_tasks_) and are
// submitted to the thread pool later, in run(). The worker-thread index is
// obtained from the pool at execution time via this_thread_idx() instead of
// being passed in by the scheduler.
nvimgcodecStatus_t schedule(int device_id, int sample_idx, void *task_context,
                            void (*task)(int thread_id, int sample_idx, void *task_context)) {
  assert(tp_);
  // Captures by value; task_context is a raw pointer, so the caller must keep
  // it alive until wait() completes — presumably guaranteed by nvimgcodec's
  // schedule/run/wait contract (TODO confirm).
  nvimgcodec_scheduled_tasks_.emplace_back([=]() {
    task(NewThreadPool::this_thread_idx(), sample_idx, task_context);
  });
  return NVIMGCODEC_STATUS_SUCCESS;
}
416430

417431
nvimgcodecStatus_t run(int device_id) {
418432
assert(tp_);
433+
sync_print("Scheduling from thread ", std::this_thread::get_id(), "\n");
434+
if (!job_)
435+
job_.emplace();
419436
for (int i = 0; i < static_cast<int>(nvimgcodec_scheduled_tasks_.size()); i++) {
420-
tp_->AddWork(std::move(nvimgcodec_scheduled_tasks_[i]), -i);
437+
job_->AddTask(std::move(nvimgcodec_scheduled_tasks_[i]));
421438
}
422439
nvimgcodec_scheduled_tasks_.clear();
423-
tp_->RunAll(false);
440+
job_->Run(*tp_, false);
424441
return NVIMGCODEC_STATUS_SUCCESS;
425442
}
426443

427444
// nvimgcodec executor callback: blocks until every task submitted via run()
// has finished, then releases the job. A no-op when nothing was submitted
// (job_ empty), which is why the old assert(tp_) was dropped.
nvimgcodecStatus_t wait(int device_id) {
  if (job_) {
    job_->Wait();
    job_.reset();
  }
  return NVIMGCODEC_STATUS_SUCCESS;
}
432452

@@ -525,8 +545,8 @@ class ImageDecoder : public StatelessOperator<Backend> {
525545
throw std::runtime_error(make_string("Invalid sample_type: ", sample_type));
526546
}
527547

528-
ThreadPool *GetThreadPool(const Workspace &ws) {
529-
return std::is_same<MixedBackend, Backend>::value ? thread_pool_.get() : &ws.GetThreadPool();
548+
// Returns the operator-owned thread pool.
// NOTE(review): `ws` is now unused — previously the CPU backend returned
// ws.GetThreadPool() and only the Mixed backend owned a pool. Confirm that
// always using the operator-owned pool is intentional for CPUBackend;
// otherwise consider dropping the parameter once callers are updated.
NewThreadPool *GetThreadPool(const Workspace &ws) {
  return thread_pool_.get();
}
531551

532552
bool SetupImpl(std::vector<OutputDesc> &output_desc, const Workspace &ws) override {
@@ -672,7 +692,7 @@ class ImageDecoder : public StatelessOperator<Backend> {
672692
TensorListShape<> out_shape(nsamples, 3);
673693

674694
const bool use_cache = cache_ && cache_->IsCacheEnabled() && dtype_ == DALI_UINT8;
675-
auto setup_block = [&](int block_idx, int nblocks, int tid) {
695+
auto setup_block = [&](int block_idx, int nblocks) {
676696
int i_start = nsamples * block_idx / nblocks;
677697
int i_end = nsamples * (block_idx + 1) / nblocks;
678698
DomainTimeRange tr("Setup #" + std::to_string(block_idx) + "/" + std::to_string(nblocks),
@@ -751,25 +771,26 @@ class ImageDecoder : public StatelessOperator<Backend> {
751771

752772
if (ntasks < 2) {
753773
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
754-
setup_block(0, 1, -1); // run all in current thread
774+
setup_block(0, 1); // run all in current thread
755775
} else {
776+
Job job;
756777
int block_idx = 0;
757778
atomic_idx_.store(0);
758-
auto setup_task = [&, nblocks](int tid) {
779+
auto setup_task = [&, nblocks]() {
759780
DomainTimeRange tr("Setup", DomainTimeRange::kOrange);
760781
int block_idx;
761782
while ((block_idx = atomic_idx_.fetch_add(1)) < nblocks) {
762-
setup_block(block_idx, nblocks, tid);
783+
setup_block(block_idx, nblocks);
763784
}
764785
};
765786

766787
for (int task_idx = 0; task_idx < ntasks - 1; task_idx++) {
767-
tp_->AddWork(setup_task, -task_idx);
788+
job.AddTask(setup_task, -task_idx);
768789
}
769790
assert(ntasks >= 2);
770-
tp_->RunAll(false); // start work but not wait
771-
setup_task(-1); // last task in current thread
772-
tp_->WaitForWork(); // wait for the other threads
791+
job.Run(*tp_, false); // start work but not wait
792+
setup_task(); // last task in current thread
793+
job.Wait(); // wait for the other threads
773794
}
774795

775796
// Allocate the memory for the outputs...
@@ -844,7 +865,11 @@ class ImageDecoder : public StatelessOperator<Backend> {
844865
// before it issues stream synchronization with the user stream. Even if we didn't have that
845866
// race, we probably want to wait for all threads to finish anyway because we can't
846867
// guarantee that the thread pool from the workspace outlives RunImplImpl call.
847-
tp_->WaitForWork();
868+
if (job_) {
869+
sync_print("Waiting in thread ", std::this_thread::get_id(), "\n");
870+
job_->Wait();
871+
job_.reset();
872+
}
848873
}
849874
if (decode_status_size != nsamples_decode)
850875
throw std::runtime_error("Failed to run decoder");
@@ -857,12 +882,13 @@ class ImageDecoder : public StatelessOperator<Backend> {
857882
}
858883
}
859884
if (any_need_processing) {
885+
Job job;
860886
for (size_t idx = 0; idx < nsamples_decode; idx++) {
861887
size_t orig_idx = decode_sample_idxs_[idx];
862888
auto st_ptr = state_[orig_idx].get();
863889
if (st_ptr->need_processing) {
864-
tp_->AddWork(
865-
[&, out = output[orig_idx], st_ptr, orig_idx](int tid) {
890+
job.AddTask(
891+
[&, out = output[orig_idx], st_ptr, orig_idx]() {
866892
DomainTimeRange tr(make_string("Convert #", orig_idx), DomainTimeRange::kOrange);
867893
auto &st = *st_ptr;
868894
if constexpr (std::is_same<MixedBackend, Backend>::value) {
@@ -876,11 +902,10 @@ class ImageDecoder : public StatelessOperator<Backend> {
876902
st.req_layout, st.orig_img_type, ROI{}, nvimgcodecOrientation_t{});
877903
st.host_buf.reset();
878904
}
879-
},
880-
-idx);
905+
}, -idx);
881906
}
882907
}
883-
tp_->RunAll(true);
908+
job.Run(*tp_, true);
884909
}
885910
}
886911

@@ -904,7 +929,7 @@ class ImageDecoder : public StatelessOperator<Backend> {
904929
}
905930
}
906931

907-
std::unique_ptr<ThreadPool> thread_pool_;
932+
std::unique_ptr<NewThreadPool> thread_pool_;
908933
std::unique_ptr<CachedDecoderImpl> cache_;
909934

910935
NvImageCodecInstance instance_ = {};
@@ -934,7 +959,8 @@ class ImageDecoder : public StatelessOperator<Backend> {
934959
bool use_orientation_ = true;
935960
int max_batch_size_ = 1;
936961
int num_threads_ = -1;
937-
ThreadPool *tp_ = nullptr;
962+
NewThreadPool *tp_ = nullptr;
963+
std::optional<IncrementalJob> job_;
938964
std::vector<std::unique_ptr<SampleState>> state_;
939965
std::vector<nvimgcodecCodeStream_t> batch_encoded_streams_;
940966
std::vector<nvimgcodecImage_t> batch_images_;
@@ -950,7 +976,7 @@ class ImageDecoder : public StatelessOperator<Backend> {
950976
std::vector<nvimgcodecExtensionDesc_t> extensions_descs_;
951977
std::vector<nvimgcodecExtension_t> extensions_;
952978

953-
std::vector<std::function<void(int)>> nvimgcodec_scheduled_tasks_;
979+
std::vector<std::function<void()>> nvimgcodec_scheduled_tasks_;
954980
};
955981

956982
} // namespace imgcodec
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <typeinfo>
16+
#include "dali/pipeline/util/new_thread_pool.h"
17+
#include "dali/core/device_guard.h"
18+
#include "dali/util/nvml.h"
19+
#include "dali/core/nvtx.h"
20+
21+
namespace dali {
22+
23+
// Constructs the pool and starts `num_threads` workers.
// device_id    - CUDA device the workers bind to; CPU_ONLY_DEVICE_ID is
//                normalized to "no device".
// set_affinity - if true, worker threads are pinned to CPU cores via NVML.
// name         - pool name, appended to worker thread names.
NewThreadPool::NewThreadPool(
    int num_threads,
    std::optional<int> device_id,
    bool set_affinity,
    std::string name)
: name_(std::move(name)) {
  // CPU-only pipelines get no device binding.
  if (device_id.has_value() && *device_id == CPU_ONLY_DEVICE_ID)
    device_id = std::nullopt;
  // BUG FIX: store the normalized device id — previously device_id_ was never
  // assigned, so OnThreadStart always saw an empty optional and never
  // installed a DeviceGuard in the worker threads.
  device_id_ = device_id;
#if NVML_ENABLED
  // We use NVML only for setting thread affinity
  if (device_id.has_value() && set_affinity) {
    nvml_handle_ = nvml::NvmlInstance::CreateNvmlInstance();
  }
#endif
  Init(num_threads, [=, this](int thread_idx) {
    return OnThreadStart(thread_idx, set_affinity);
  });
}
41+
42+
std::any NewThreadPool::OnThreadStart(int thread_idx, bool set_affinity) {
43+
std::string name = make_string("[DALI][NT", thread_idx, "]", name);
44+
SetThreadName(name.c_str());
45+
std::any dg;
46+
if (device_id_.has_value())
47+
dg.emplace<DeviceGuard>(*device_id_);
48+
#if NVML_ENABLED
49+
try {
50+
if (set_affinity) {
51+
const char *env_affinity = std::getenv("DALI_AFFINITY_MASK");
52+
int core = -1;
53+
if (env_affinity) {
54+
const auto &vec = string_split(env_affinity, ',');
55+
if ((size_t)thread_idx < vec.size()) {
56+
core = std::stoi(vec[thread_idx]);
57+
} else {
58+
DALI_WARN("DALI_AFFINITY_MASK environment variable is set, "
59+
"but does not have enough entries: thread_id (", thread_idx,
60+
") vs #entries (", vec.size(), "). Ignoring...");
61+
}
62+
}
63+
nvml::SetCPUAffinity(core);
64+
}
65+
} catch (const std::exception &e) {
66+
DALI_WARN("Couldn't set thread affinity in thread ", thread_idx, " of thread pool \"",
67+
name_, "\". Exception ", typeid(e).name(), ": ", e.what());
68+
} catch (...) {
69+
DALI_WARN("Couldn't set thread affinity in thread ", thread_idx, " of thread pool \"",
70+
name_, "\". Unknown error.");
71+
}
72+
#endif
73+
return dg;
74+
}
75+
76+
} // namespace dali
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <any>
#include <optional>
#include <string>
#include "dali/core/exec/thread_pool_base.h"
#if NVML_ENABLED
#include "dali/util/nvml.h"
#endif
21+
22+
#ifndef DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_
23+
#define DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_
24+
25+
namespace dali {
26+
27+
// Thread pool built on ThreadPoolBase whose worker threads are bound to a
// CUDA device (via DeviceGuard) and, optionally, pinned to CPU cores via NVML.
class DLL_PUBLIC NewThreadPool : public ThreadPoolBase {
 public:
  // num_threads  - number of worker threads to spawn
  // device_id    - CUDA device for the workers; CPU_ONLY_DEVICE_ID / nullopt
  //                means no device binding
  // set_affinity - if true, pin worker threads to CPU cores (NVML and the
  //                DALI_AFFINITY_MASK environment variable)
  // name         - pool name, used in worker thread names
  NewThreadPool(int num_threads, std::optional<int> device_id, bool set_affinity, std::string name);

 private:
  // Per-thread start hook: names the thread, installs a DeviceGuard and
  // optionally sets CPU affinity. The returned std::any keeps the guard
  // alive for the thread's lifetime.
  std::any OnThreadStart(int thread_idx, bool set_affinity);
  std::optional<int> device_id_;  // empty = no device binding
  std::string name_;              // pool name, appended to thread names
#if NVML_ENABLED
  nvml::NvmlInstance nvml_handle_;  // keeps NVML initialized for affinity calls
#endif
};
39+
40+
} // namespace dali
41+
42+
#endif // DALI_PIPELINE_UTIL_NEW_THREAD_POOL_H_

0 commit comments

Comments
 (0)