Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions dali/python/backend_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,100 @@ std::unique_ptr<Tensor<Backend> > TensorListGetItemImpl(TensorList<Backend> &t,
return ptr;
}

// Builds a GPU TensorList from a Python list of objects implementing the DLPack
// protocol (e.g. PyTorch CUDA tensors), importing each sample via __dlpack__.
//
// list_of_objects - Python list; every element must expose __dlpack__.
// layout          - optional DALI layout string applied to the result.
// stream          - optional Python stream wrapper; used for the DLPack export
//                   handshake and, if given, as the copy ordering.
// contiguous      - when true, samples are copied into a single contiguous GPU
//                   buffer; when false, the result keeps per-sample buffers
//                   imported from the producers (the copy is avoided).
//
// Throws py::type_error when an element lacks __dlpack__ or dtypes differ, and
// py::value_error when samples reside on different GPU devices.
std::shared_ptr<TensorList<GPUBackend>> TensorListFromListOfDLPackObjects(
    py::list &list_of_objects,
    const std::optional<std::string> &layout,
    py::object stream,
    bool contiguous) {
  DomainTimeRange range("TensorListFromListOfDLPackObjects", kGPUTensorColor);

  // Empty input: return an empty batch. The layout (if any) still fixes the
  // sample dimensionality so the batch metadata is consistent with it.
  if (list_of_objects.empty()) {
    auto ptr = std::make_shared<TensorList<GPUBackend>>();
    if (layout.has_value()) {
      ptr->set_sample_dim(layout->length());
      ptr->SetLayout(*layout);
    }
    return ptr;
  }

  // Default to host order; replaced below either by the user-supplied stream
  // or (first iteration) by the stream DALI associates with the first tensor.
  AccessOrder copy_order = AccessOrder::host();
  if (!stream.is_none())
    copy_order = AccessOrderFromPythonStreamObj(stream);

  // __dlpack__ expects an integer stream handle; extract it from the stream wrapper object
  py::object stream_handle = py::none();
  if (!stream.is_none()) {
    auto h = getattr(stream, "handle", py::none());
    // If the wrapper has no `.handle` attribute, assume the object passed in
    // is already a raw handle and forward it as-is.
    stream_handle = h.is_none() ? stream : h;
  }

  // When a contiguous result is requested, the per-sample batch is only a
  // local staging area (optional-held, on the stack); otherwise it is the
  // shared_ptr that will be returned to the caller.
  std::optional<TensorList<GPUBackend>> non_contiguous_tmp;
  std::shared_ptr<TensorList<GPUBackend>> non_contiguous_out;

  if (contiguous)
    non_contiguous_tmp = TensorList<GPUBackend>(list_of_objects.size());
  else
    non_contiguous_out = std::make_shared<TensorList<GPUBackend>>(list_of_objects.size());

  // Reference to whichever of the two storages above is active.
  TensorList<GPUBackend> &non_contiguous = contiguous
      ? non_contiguous_tmp.value()
      : *non_contiguous_out;

  // -2 is a "no dtype seen yet" sentinel (presumably -1 is taken by
  // DALI_NO_TYPE — TODO confirm against DALIDataType's definition).
  int expected_type = -2;
  int expected_device_id = -1;

  {
    DomainTimeRange build_range("Build initial list", kGPUTensorColor);
    for (size_t i = 0; i < list_of_objects.size(); ++i) {
      py::object obj = list_of_objects[i];
      if (!py::hasattr(obj, "__dlpack__"))
        throw py::type_error(make_string(
            "Object at position ", i, " does not support the DLPack protocol."));

      // Per the DLPack protocol, passing our stream lets the producer make the
      // exported data safe to consume on that stream.
      py::capsule capsule = obj.attr("__dlpack__")("stream"_a = stream_handle);
      Tensor<GPUBackend> tensor;
      // The layout is forwarded only for the first sample; presumably
      // FillTensorFromDlPack validates it against the tensor's dimensionality
      // and the remaining samples inherit it via SetupLike — TODO confirm.
      FillTensorFromDlPack(capsule, &tensor, i == 0 ? layout : std::optional<std::string>{});

      if (i == 0) {
        // The first sample fixes the batch-wide metadata.
        non_contiguous.SetupLike(tensor);
        // No user stream was given (copy_order still host order): use the
        // stream DALI associates with this tensor instead.
        if (copy_order == AccessOrder::host())
          copy_order = AccessOrder(UserStream::Get()->GetStream(tensor));
        expected_device_id = tensor.device_id();
      } else if (tensor.device_id() != expected_device_id) {
        throw py::value_error(make_string(
            "All tensors must reside on the same GPU device. "
            "Tensor at position ", i, " is on GPU ", tensor.device_id(),
            " but expected GPU ", expected_device_id, "."));
      }

      DALIDataType cur_type = tensor.type();
      if (expected_type == -2) {
        expected_type = cur_type;
      } else if (expected_type != static_cast<int>(cur_type)) {
        throw py::type_error(make_string(
            "Tensors cannot have different data types. Tensor at position ", i, " has type '",
            cur_type, "' expected to have type '", DALIDataType(expected_type), "'."));
      }
      // Attach the imported sample to the batch; no data copy happens here.
      non_contiguous.SetSample(i, tensor);
    }
  }

  // Non-contiguous result: return the per-sample batch directly.
  if (!contiguous) {
    SetLayout(non_contiguous, layout, false);
    return non_contiguous_out;
  }

  // Contiguous result: gather all samples into one allocation, ordered on
  // copy_order (the user stream or the stream picked for the first tensor).
  {
    DomainTimeRange copy_range("Copy to contiguous", kGPUTensorColor);
    auto contiguous_out = std::make_shared<TensorList<GPUBackend>>();
    contiguous_out->SetContiguity(BatchContiguity::Contiguous);
    contiguous_out->set_pinned(non_contiguous.is_pinned());
    contiguous_out->Copy(non_contiguous, copy_order);
    SetLayout(*contiguous_out, layout, false);
    return contiguous_out;
  }
}

template <typename Backend>
std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
py::list &list_of_tensors,
Expand Down Expand Up @@ -1163,6 +1257,7 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
: *non_contiguous_out;

int expected_type = -2;
int expected_device_id = -1;

AccessOrder wait_order = AccessOrder::host();
AccessOrder copy_order = AccessOrder::host();
Expand All @@ -1177,6 +1272,7 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
non_contiguous.SetupLike(t);
if constexpr (std::is_same_v<Backend, GPUBackend>) {
copy_order = AccessOrder(UserStream::Get()->GetStream(t));
expected_device_id = t.device_id();
}
}
DALIDataType cur_type = t.type();
Expand All @@ -1188,6 +1284,14 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
"Tensors cannot have different data types. Tensor at position ", i, " has type '",
cur_type, "' expected to have type '", DALIDataType(expected_type), "'."));
}
if constexpr (std::is_same_v<Backend, GPUBackend>) {
if (t.device_id() != expected_device_id) {
throw py::value_error(make_string(
"All tensors must reside on the same GPU device. "
"Tensor at position ", i, " is on GPU ", t.device_id(),
" but expected GPU ", expected_device_id, "."));
}
}
non_contiguous.SetSample(i, t);
} catch (const py::type_error &) {
throw;
Expand Down Expand Up @@ -1595,6 +1699,30 @@ void ExposeTesorListGPU(py::module &m) {
If True, the list of tensors is converted to a contiguous TensorListGPU, necessarily
creating a copy. Otherwise, the copy may be avoided.
)code")
.def(py::init([](
py::list &list_of_objects,
std::optional<std::string> layout = {},
py::object stream = py::none(),
bool contiguous = false) {
DomainTimeRange range("TensorListGPU::init from a list of DLPack objects", kGPUTensorColor);
return TensorListFromListOfDLPackObjects(list_of_objects, layout, stream, contiguous);
}),
"list_of_objects"_a,
"layout"_a = py::none(),
"stream"_a = py::none(),
"contiguous"_a = false,
R"code(
List of tensors residing in the GPU memory, constructed from a Python list of DLPack objects.

list_of_objects : list
Python list of objects supporting the DLPack protocol (e.g. PyTorch GPU tensors)
layout : str
Layout of the data
stream : stream, optional
CUDA stream used for the DLPack export handshake
contiguous : bool, default False
If True, samples are copied into a single contiguous GPU buffer
)code")
.def(py::init([](const py::object &object,
const std::optional<std::string> &layout = {},
int device_id = -1) {
Expand Down
165 changes: 133 additions & 32 deletions dali/python/nvidia/dali/experimental/dynamic/_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import importlib.util
from typing import Any
from typing import TYPE_CHECKING, Any
from collections.abc import Iterator

import nvidia.dali.backend as _backend
Expand All @@ -22,7 +22,10 @@
from ._nvtx import NVTXRange
from nvidia.dali._typing import BatchLike, TensorLike

from . import _eval_mode, _invocation
if TYPE_CHECKING:
from . import _invocation
from . import _eval_mode, _stream as _stream_module
from ._eval_context import EvalContext as _EvalContext
from ._arithmetic import _arithm_op
from ._device import Device, DeviceLike
from ._device import device as _device
Expand Down Expand Up @@ -172,7 +175,7 @@
dtype: DTypeLike | None = None,
device: DeviceLike | None = None,
layout: str | None = None,
invocation_result: _invocation.InvocationResult | None = None,
invocation_result: "_invocation.InvocationResult | None" = None,
copy: bool = False,
):
"""Constructs a :class:`Batch` object.
Expand Down Expand Up @@ -282,38 +285,136 @@
self._dtype = dtype

else:
self._tensors = []
for i, t in enumerate(tensors):
if t is None:
raise TypeError(
f"Tensors must be array-like types or numbers. Got `None` at index {i}"
)
sample = Tensor(t, dtype=dtype, device=device, layout=layout)
# Materialise first so len() and indexing work for any iterable.
_tensors_list = tensors if isinstance(tensors, list) else list(tensors)
_fast_path_used = False

# Native DALI fast path: list of evaluated ndd.Tensor objects.
# Build TensorList directly from backend storage objects, preserving all
# metadata (layout, enum types, etc.) without going through DLPack.
if (
dtype is None
and len(_tensors_list) > 0
and isinstance(_tensors_list[0], Tensor)
and _tensors_list[0]._storage is not None
):
_first_storage = _tensors_list[0]._storage
_storages = []
_native_valid = True
for _t in _tensors_list:
if (
not isinstance(_t, Tensor)
or _t._storage is None
or type(_t._storage) is not type(_first_storage)
):
_native_valid = False
break
_storages.append(_t._storage)
if _native_valid:
if isinstance(_first_storage, _backend.TensorGPU):
_dev_id = _first_storage.device_id()
_dev_matches = device is None or (
device.device_type == "gpu" and device.device_id == _dev_id
)
_backend_type = _backend.TensorListGPU
_dev = Device("gpu", _dev_id)
else:
_dev_matches = device is None or device.device_type == "cpu"
_backend_type = _backend.TensorListCPU
_dev = Device("cpu")
if _dev_matches:
try:
_storage = _backend_type(
_storages, layout=layout or None, contiguous=False
)
except (TypeError, RuntimeError):
pass # fall through to slow path
else:
self._storage = _storage
self._device = _dev
self._dtype = DType.from_type_id(self._storage.dtype)
self._layout = self._storage.layout() or ""
self._wraps_external_data = any(
t._wraps_external_data for t in _tensors_list
)
device = self._device
dtype = self._dtype
layout = self._layout
_fast_path_used = True

# DLPack fast path: list of external GPU tensors (e.g. PyTorch GPU tensors).
# Build TensorListGPU directly in C++, skipping per-sample Python Tensor wrappers.
if not _fast_path_used and (
dtype is None
and len(_tensors_list) > 0
and not isinstance(_tensors_list[0], Tensor)
and hasattr(_tensors_list[0], "__dlpack_device__")
):
_dl_dev_type, _dl_dev_id = _tensors_list[0].__dlpack_device__()
if int(_dl_dev_type) == 2: # GPU
if device is None or (
device.device_type == "gpu" and device.device_id == _dl_dev_id
):
ctx = _EvalContext.current()
_stream = (
ctx.cuda_stream
if ctx.device_id == _dl_dev_id
else _stream_module.stream(device_id=_dl_dev_id)
)
try:
_storage = _backend.TensorListGPU(
_tensors_list,
layout=layout or None,
stream=_stream,
contiguous=False,
)
except TypeError:
pass # fall through to slow path
else:
self._storage = _storage
self._device = Device("gpu", _dl_dev_id)
self._dtype = DType.from_type_id(self._storage.dtype)
self._layout = self._storage.layout() or ""
self._wraps_external_data = True
device = self._device
dtype = self._dtype
layout = self._layout
_fast_path_used = True

if not _fast_path_used:
self._tensors = []
for i, t in enumerate(_tensors_list):
if t is None:
raise TypeError(
f"Tensors must be array-like types or numbers. "
f"Got `None` at index {i}"
)
sample = Tensor(t, dtype=dtype, device=device, layout=layout)
if dtype is None:
dtype = sample.dtype
if device is None:
device = sample.device
if layout is None:
layout = sample.layout
self._tensors.append(sample)
if sample._wraps_external_data:
self._wraps_external_data = True
else:
if not isinstance(t, Tensor) or t._storage is not sample._storage:
copied = True
if dtype is None:
dtype = sample.dtype
# We would have set dtype in the 1st iteration, so the only way it can
# be None is if the `_tensors` are empty.
assert len(self._tensors) == 0
raise ValueError("Element type must be specified if the list is empty")
if device is None:
device = sample.device
device = Device("cpu")
if layout is None:
layout = sample.layout
self._tensors.append(sample)
if sample._wraps_external_data:
self._wraps_external_data = True
else:
if not isinstance(t, Tensor) or t._storage is not sample._storage:
copied = True
if dtype is None:
# We would have set dtype in the 1st iteration, so the only way it can
# be None is if the `_tensors` are empty.
assert len(self._tensors) == 0
raise ValueError("Element type must be specified if the list is empty")
if device is None:
device = Device("cpu")
if layout is None:
layout = ""
self._device = device
self._layout = layout
self._dtype = dtype
if len(self._tensors) == 0:
layout = ""
self._device = device
self._layout = layout
self._dtype = dtype
if self._tensors is not None and len(self._tensors) == 0:
with device:
t = Tensor([], dtype=dtype, device=device).evaluate()
if self._device.device_type == "cpu":
Expand Down
16 changes: 13 additions & 3 deletions dali/python/nvidia/dali/experimental/dynamic/_tensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,16 @@ def __init__(
elif hasattr(data, "__dlpack_device__"):
dl_device_type, device_id = data.__dlpack_device__()
if int(dl_device_type) == 1 or int(dl_device_type) == 3: # CPU
self._storage = _backend.TensorCPU(data.__dlpack__(), layout)
try:
self._storage = _backend.TensorCPU(data.__dlpack__(), layout)
except (BufferError, TypeError):
# DLPack may fail for read-only buffers or unsupported dtypes;
# fall back to __array__ if available.
a = _get_array_interface(data)
if a is not None:
self._storage = _backend.TensorCPU(a, layout)
else:
raise
elif int(dl_device_type) == 2: # GPU
# If the current context is on the same device, use the same stream.
ctx = _EvalContext.current()
Expand All @@ -212,8 +221,9 @@ def __init__(
else:
stream = _stream.stream(device_id=device_id)
args = {"stream": stream.handle}
dlpack_capsule = data.__dlpack__(**args)
self._storage = _backend.TensorGPU(
data.__dlpack__(**args),
dlpack_capsule,
layout=layout,
stream=stream,
)
Expand Down Expand Up @@ -278,7 +288,7 @@ def __init__(
self._dtype = DType.from_type_id(self._storage.dtype)
self._layout = self._storage.layout()

if self._storage is not None and device != _backend_device(self._storage):
if self._storage is not None and device != self._device:
self._assign(self.to_device(device).evaluate())
copied = True
elif invocation_result is not None:
Expand Down
Loading
Loading