Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f76b523
Support constructing readers with tensor args
rostan-t Mar 9, 2026
0878b6c
Detect when default values are passed when invoking a reader
rostan-t Mar 9, 2026
553d01c
Fix call stack depth for reader invocations when reconstructing stack…
rostan-t Mar 9, 2026
17bb0b5
Add tests passing tensor arguments
rostan-t Mar 9, 2026
a3172ff
Disallow constructing a reader with batch kwargs
rostan-t Mar 10, 2026
a8aa74a
Update signature of reader constructors to allow tensor arguments
rostan-t Mar 10, 2026
a1fccb3
Update NumpyReader example to pass ROI in the reader constructor
rostan-t Mar 11, 2026
cdaefaf
Apply suggestions from review
rostan-t Mar 11, 2026
a3a59ea
Prevent processing again tensor args when not necessary in batch proc…
rostan-t Mar 11, 2026
ab75701
Fix test_video_resize_tensor_args_partial
rostan-t Mar 11, 2026
4ba75c3
Cache processed tensor args passed in the constructor
rostan-t Mar 13, 2026
aee5ab5
Fix caller_depth handling. Remove special case for readers
rostan-t Mar 13, 2026
4785f39
Pass scalar arguments directly to reader constructors and copy extern…
rostan-t Mar 13, 2026
a5e6c5e
Set _raw_tensor_args instead of _tensor_args in reader constructor
rostan-t Mar 13, 2026
d1f0de9
Fix tensor arg tracking in reader op constructor
rostan-t Mar 13, 2026
09b7dd7
Properly use reader tensor args in TorchData integration
rostan-t Mar 16, 2026
e3de140
Fix signature of reader constructors
rostan-t Mar 16, 2026
2137a04
Fix tensor arg handling in reader op constructor
rostan-t Mar 16, 2026
ccbe827
Fix typos
rostan-t Mar 16, 2026
f2382ac
Copy all tensors passed to constructors and perform only broadcast in…
rostan-t Mar 18, 2026
59aa5c6
Revert the change to the caller depth.
rostan-t Mar 18, 2026
0a640ea
Perform dtype conversion of reader constructor arguments
rostan-t Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from nvidia.dali import backend as _b

if TYPE_CHECKING:
from .ops import Operator
from ._ops import Operator


class Invocation:
Expand Down
37 changes: 35 additions & 2 deletions dali/python/nvidia/dali/experimental/dynamic/_op_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from . import _device, _invocation, _op_filter, _ops, _type
from ._batch import Batch
from ._tensor import Tensor
from ._tensor import Tensor, tensor as to_tensor


def is_external(x):
Expand Down Expand Up @@ -161,20 +161,25 @@ def build_constructor(schema, op_class):
Operator._get() can be used instead of the constructor to utilize the instance caching.
"""
stateful = op_class._is_stateful
is_reader = op_class._is_reader
function_name = "__init__"

init_args = []
used_kwargs = set()
tensor_arg_names = set()
for arg in schema.GetArgumentNames():
if arg in _unsupported_args:
continue
if schema.IsTensorArgument(arg):
is_tensor = schema.IsTensorArgument(arg)
if is_tensor and not is_reader:
continue
if schema.IsArgumentOptional(arg):
init_args.append(f"{arg}=None")
else:
init_args.append(arg)
used_kwargs.add(arg)
if is_tensor:
tensor_arg_names.add(arg)

if init_args:
init_args = ["*"] + init_args
Expand All @@ -193,8 +198,25 @@ def build_constructor(schema, op_class):

# Note: Base __init__ will keep the **kwargs
def init(self, max_batch_size, name, **kwargs):
    """Generated constructor body for an operator class.

    Scalar keyword arguments are decayed and forwarded to the base
    constructor. For readers, tensor-valued keyword arguments are
    extracted from ``kwargs`` first, converted to tensors of the dtype
    the schema expects, and cached on the instance for later calls.
    """
    if is_reader:
        # Names of tensor arguments actually provided (non-None), recorded
        # so __call__ can reject passing the same argument a second time.
        actual_tensor_arg_names = {
            arg_name for arg_name in tensor_arg_names if kwargs.get(arg_name) is not None
        }
        tensor_args = {}
        for arg_name in tensor_arg_names:
            arg = kwargs.get(arg_name)
            # Plain Python scalars/sequences are left in kwargs and handled
            # by the regular scalar-argument path below.
            if arg is None or isinstance(arg, (int, float, bool, str, tuple, list)):
                continue
            del kwargs[arg_name]
            if isinstance(arg, Batch):
                raise ValueError("Readers cannot be constructed with batch keyword arguments")
            # Convert to a Tensor using the dtype the schema declares for this argument.
            dtype = op_class._argument_conversion_map[arg_name]
            tensor_args[arg_name] = to_tensor(arg, dtype=dtype)
    kwargs = {k: _scalar_decay(v) for k, v in kwargs.items()}
    op_class.__base__.__init__(self, max_batch_size, name, **kwargs)
    if is_reader:  # Must be assigned after the base constructor so it does not override them
        self._tensor_arg_names = actual_tensor_arg_names
        self._raw_tensor_args = tensor_args
    if stateful:
        self._call_id = 0

Expand Down Expand Up @@ -272,6 +294,17 @@ def build_call_function(schema, op_class):
@NVTXRange(f"__call__: {op_class._op_name}", category="op_builder")
def call(self, *raw_args, batch_size=None, _process_params=True, **raw_kwargs):
self._pre_call(*raw_args, **raw_kwargs)

if op_class._is_reader and self._tensor_arg_names:
actual_kwargs = {name for name, value in raw_kwargs.items() if value is not None}
overlap = actual_kwargs & self._tensor_arg_names
if overlap:
raise ValueError(
f"Keyword argument{'s'[:len(overlap)^1]} {sorted(overlap)}"
f" cannot be passed both in the constructor and __call__."
)
raw_kwargs = {**raw_kwargs, **self._raw_tensor_args}

batch_size = _ops._infer_batch_size(batch_size, *raw_args, **raw_kwargs)
is_batch = batch_size is not None

Expand Down
41 changes: 35 additions & 6 deletions dali/python/nvidia/dali/experimental/dynamic/_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,14 @@ def __init__(
if stick_to_shard is not None
else _READER_SHARD_DEFAULTS["stick_to_shard"]
)

self._raw_tensor_args = {}
self._tensor_args = {}
# Used to know when to recompute _tensor_args for _raw_tensor_args
self._previous_batch_size: int | None = None
# Used to make sure that args passed to the constructor are not repeated in __call__
self._tensor_arg_names: set[str] = set()

if self._num_shards < 1:
raise ValueError(
f"The number of shards must be a positive integer. Got {self._num_shards}."
Expand Down Expand Up @@ -667,10 +675,26 @@ def next_epoch(self, batch_size=None, ctx: _eval_context.EvalContext | None = No
else:
return self._samples(ctx)

def get_metadata(self) -> ReaderMeta:
def _process_tensor_args(self, batch_size: int | None):
    """Return the constructor tensor args in the form matching ``batch_size``.

    With ``batch_size=None`` the stored per-sample tensors are returned
    unchanged; otherwise each one is broadcast to a Batch of that size.
    The broadcast result is cached and reused while the requested batch
    size stays the same.
    """
    raw = self._raw_tensor_args
    if not raw:
        return {}

    if batch_size is None:
        # Sample mode: hand the stored tensors through as-is.
        self._tensor_args = raw
    elif batch_size != self._previous_batch_size:
        # Batch size changed (or first batch-mode call): rebuild the broadcasts.
        broadcast_args = {}
        for name, sample in raw.items():
            broadcast_args[name] = Batch.broadcast(sample, batch_size)
        self._tensor_args = broadcast_args

    self._previous_batch_size = batch_size
    return self._tensor_args

def get_metadata(self, batch_size: int) -> ReaderMeta:
    """Returns the metadata of the underlying reader operator.

    Args:
        batch_size: Batch size used to process the tensor arguments passed
            to the constructor before the backend is initialized.
    """

    self._init_backend(None, (), self._process_tensor_args(batch_size))
    return self._op_backend.GetReaderMeta()

def _samples(self, ctx: _eval_context.EvalContext | None = None):
Expand All @@ -689,14 +713,16 @@ def _samples(self, ctx: _eval_context.EvalContext | None = None):
self._actual_batch_size = 1
if self._max_batch_size is None:
self._max_batch_size = self._actual_batch_size
self._init_backend(ctx, (), {})
self._init_backend(ctx, (), self._process_tensor_args(self._actual_batch_size))

tensor_args = self._process_tensor_args(self._actual_batch_size)
meta = self._op_backend.GetReaderMeta()
idx = 0
padded_size = meta["epoch_size_padded"]
shards_beg = math.floor(self._shard_id * padded_size / self._num_shards)
shards_end = math.floor((self._shard_id + 1) * padded_size / self._num_shards)
while idx < shards_end - shards_beg:
outputs = super()._run(ctx, batch_size=self._actual_batch_size)
outputs = super()._run(ctx, batch_size=self._actual_batch_size, **tensor_args)
batch_size = len(
outputs[0] if isinstance(outputs, tuple) else next(iter(outputs.values()))
)
Expand Down Expand Up @@ -734,20 +760,23 @@ def _batches(self, batch_size=None, ctx: _eval_context.EvalContext | None = None
f"{self._max_batch_size} specified when the operator was created"
)
self._max_batch_size = batch_size
self._init_backend(ctx, (), {})
tensor_args = self._process_tensor_args(batch_size)
self._init_backend(ctx, (), tensor_args)
else:
if self._max_batch_size and self._max_batch_size != batch_size:
raise ValueError(
f"`batch_size` {batch_size} is different than the `max_batch_size` "
f"{self._max_batch_size} used in the previous call"
)
tensor_args = None
meta = self._op_backend.GetReaderMeta()
idx = 0
padded_size = meta["epoch_size_padded"]
shards_beg = math.floor(self._shard_id * padded_size / self._num_shards)
shards_end = math.floor((self._shard_id + 1) * padded_size / self._num_shards)
while idx < shards_end - shards_beg:
outputs = super()._run(ctx, batch_size=batch_size)
tensor_args = self._process_tensor_args(batch_size)
outputs = super()._run(ctx, batch_size=batch_size, **tensor_args)
batch_size_returned = batch_size = len(
outputs[0] if isinstance(outputs, tuple) else next(iter(outputs.values()))
)
Expand Down
5 changes: 3 additions & 2 deletions dali/python/nvidia/dali/experimental/dynamic/pytorch/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def get_state(self):

def get_metadata(self) -> _ops.ReaderMeta:
    """Query the wrapped reader operator for its metadata.

    The node's configured batch size is forwarded to the reader's
    ``get_metadata`` call.
    """
    reader_op = self._reader
    return reader_op.get_metadata(self._batch_size)


class DictMapper(tn.BaseNode[dict[str, T]], metaclass=_CUDANodeMeta):
Expand Down Expand Up @@ -202,7 +202,8 @@ def __init__(
elif isinstance(output_stream, torch.cuda.Stream | None):
self._stream_context = torch.cuda.stream(output_stream) # no-op if None
else:
self._dali_stream = _stream(output_stream) # keep it alive in case the caller doesn't
# Keep it alive in case the caller doesn't
self._dali_stream = _stream(stream=output_stream)
self._stream_context = torch.cuda.stream(
torch.cuda.ExternalStream(self._dali_stream.handle, self._dali_stream.device_id)
)
Expand Down
5 changes: 3 additions & 2 deletions dali/python/nvidia/dali/ops/_signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ def __call__$signature:
schema,
include_self=True,
include_only_inputs=True,
input_annotation_gen=lambda _: _TensorLike,
)
)

Expand All @@ -796,12 +797,12 @@ class {op_name}:
def __init__{_call_signature(
schema,
"dynamic",
include_inputs=False,
include_kwarg_inputs=False,
include_self=True,
return_annotation=False,
include_kwargs=True,
include_init_header=True,
allow_data_node_kwargs=False,
allow_batch_kwargs=False,
)}:
...

Expand Down
59 changes: 57 additions & 2 deletions dali/test/python/experimental_mode/test_reader_decoder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -184,6 +184,61 @@ def test_reader_shards_error():
num_shards=99999,
)
with assert_raises(
RuntimeError, glob='Assert on "num_shards_ <= Size()" failed: The number of input samples:'
RuntimeError, glob='Assert on "num_shards_ <= Size()" failed: The number of input samples:*'
):
tuple(reader.next_epoch())


@params(None, 4)
def test_video_resize_tensor_args(batch_size):
    """VideoResize accepts a tensor-valued resize argument in the constructor."""
    n_frames = 60
    out_w, out_h = 108, 192
    video_dir = os.path.join(dali_extra_path, "db", "video", "sintel", "video_files")
    reader = ndd.readers.VideoResize(
        filenames=[os.path.join(video_dir, "sintel_trailer-720p_3.mp4")],
        sequence_length=n_frames,
        device="gpu",
        resize_x=ndd.tensor(out_w),
        resize_y=out_h,
    )
    sample_mode = batch_size is None
    expected_type = ndd.Tensor if sample_mode else ndd.Batch
    per_sample_shape = (n_frames, out_h, out_w, 3)
    expected_shape = per_sample_shape if sample_mode else [per_sample_shape] * batch_size
    for (out,) in reader.next_epoch(batch_size=batch_size):
        assert isinstance(out, expected_type)
        assert out.layout == "FHWC"
        assert out.dtype == ndd.uint8
        assert out.device == ndd.Device("gpu")
        assert out.shape == expected_shape


def test_video_resize_tensor_args_partial():
    """Tensor arguments may be split between the constructor and __call__."""
    n_frames, out_w, out_h = 60, 144, 192
    video_dir = os.path.join(dali_extra_path, "db", "video", "sintel", "video_files")
    reader = ndd.readers.VideoResize(
        filenames=[os.path.join(video_dir, "sintel_trailer-720p_3.mp4")],
        sequence_length=n_frames,
        device="gpu",
        resize_x=out_w,
    )
    out = reader(resize_y=out_h)
    assert isinstance(out, ndd.Tensor)
    assert out.layout == "FHWC"
    assert out.dtype == ndd.uint8
    assert out.device == ndd.Device("gpu")
    assert out.shape == (n_frames, out_h, out_w, 3)


def test_video_resize_tensor_repeated_args():
    """An argument given to the constructor may not be repeated in __call__."""
    video_dir = os.path.join(dali_extra_path, "db", "video", "sintel", "video_files")
    reader_kwargs = dict(
        filenames=[os.path.join(video_dir, "sintel_trailer-720p_3.mp4")],
        sequence_length=60,
        device="gpu",
        resize_x=108,
        resize_y=192,
    )
    reader = ndd.readers.VideoResize(**reader_kwargs)
    with assert_raises(ValueError, glob="*resize_x*"):
        reader(resize_x=144)
14 changes: 13 additions & 1 deletion dali/test/python/type_annotations/test_typing_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import numpy as np
import nvidia.dali.experimental.dynamic as ndd
from nose_utils import attr # type: ignore
from nose_utils import attr
from nvidia.dali import types
from test_utils import get_dali_extra_path

Expand Down Expand Up @@ -102,3 +102,15 @@ def test_copy_tensor_constant():
assert np.array_equal(const_tuple, [4, 5])
assert np.array_equal(const_torch, np.full((2, 2), 6))
assert np.array_equal(const_np, np.full((2, 2), 7))


def test_numpy_reader_roi():
    """The Numpy reader applies the ROI passed in the constructor."""
    data_dir = _test_root / "db" / "3D" / "MRI" / "Knee" / "npy_2d_slices" / "STU00001"
    reader = ndd.readers.Numpy(
        file_root=str(data_dir),
        roi_start=(30, 30),
        roi_end=(230, 230),
    )
    # Only the first sample of the epoch is inspected.
    (data,) = next(iter(reader.next_epoch()))
    expect_tensor(data)
    assert data.shape == (200, 200)
79 changes: 65 additions & 14 deletions docs/examples/general/data_loading/numpy_reader/dynamic_mode.ipynb

Large diffs are not rendered by default.

Loading