Skip to content

Commit fbb02c6

Browse files
committed
Improve DLPack support for external tensor consumption
- Adds C++ bulk DLPack constructor for TensorListGPU: accepts a Python list of DLPack-compatible objects (e.g. PyTorch GPU tensors) and builds the TensorList in a single pass, recording a CUDA event on the provided stream. Passes `stream` as a keyword argument to `__dlpack__()` for compatibility with NumPy ≥ 1.22 and JAX which define `def __dlpack__(self, *, stream=None)`. - Adds native DALI fast path in Batch.__init__: when given a list of already-evaluated ndd.Tensor objects, pass their _storage objects directly to TensorListGPU/CPU constructors, preserving all DALI metadata (layout, enum types) without going through DLPack. - Adds GPU DLPack fast path in Batch.__init__: when given a list of external GPU tensors (e.g. PyTorch) that support DLPack, use the new C++ bulk constructor to avoid per-tensor Python overhead. - Adds DLPack fallback for CPU read-only arrays in Tensor.__init__: catch BufferError from __dlpack__() and fall back to __array__ interface (fixes DALI-4580). Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
1 parent 85a85b6 commit fbb02c6

File tree

5 files changed

+381
-35
lines changed

5 files changed

+381
-35
lines changed

dali/python/backend_impl.cc

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,6 +1133,100 @@ std::unique_ptr<Tensor<Backend> > TensorListGetItemImpl(TensorList<Backend> &t,
11331133
return ptr;
11341134
}
11351135

1136+
std::shared_ptr<TensorList<GPUBackend>> TensorListFromListOfDLPackObjects(
    py::list &list_of_objects,
    const std::optional<std::string> &layout,
    py::object stream,
    bool contiguous) {
  // Builds a TensorList<GPUBackend> from a Python list of objects implementing
  // the DLPack protocol (e.g. PyTorch GPU tensors), importing all samples in a
  // single pass. All samples must share one data type and one GPU device.
  DomainTimeRange range("TensorListFromListOfDLPackObjects", kGPUTensorColor);

  const size_t num_samples = list_of_objects.size();

  // An empty list produces an empty TensorList; a layout (if given) still
  // fixes the sample dimensionality.
  if (num_samples == 0) {
    auto empty_tl = std::make_shared<TensorList<GPUBackend>>();
    if (layout.has_value()) {
      empty_tl->set_sample_dim(layout->length());
      empty_tl->SetLayout(*layout);
    }
    return empty_tl;
  }

  // Order on which the final contiguous copy (if requested) is scheduled.
  AccessOrder copy_order =
      stream.is_none() ? AccessOrder::host() : AccessOrderFromPythonStreamObj(stream);

  // __dlpack__ expects an integer stream handle; unwrap the wrapper object's
  // `handle` attribute when present, otherwise forward the object as given.
  py::object stream_handle = py::none();
  if (!stream.is_none()) {
    py::object handle_attr = getattr(stream, "handle", py::none());
    stream_handle = handle_attr.is_none() ? stream : handle_attr;
  }

  // When a contiguous result is requested, the per-sample list is only a
  // temporary staging area; otherwise it is the returned object itself.
  std::optional<TensorList<GPUBackend>> staging;
  std::shared_ptr<TensorList<GPUBackend>> result;
  if (contiguous)
    staging = TensorList<GPUBackend>(num_samples);
  else
    result = std::make_shared<TensorList<GPUBackend>>(num_samples);
  TensorList<GPUBackend> &samples = contiguous ? staging.value() : *result;

  int expected_type = -2;       // -2 means "not yet determined"
  int expected_device_id = -1;  // device id of the first sample

  {
    DomainTimeRange build_range("Build initial list", kGPUTensorColor);
    for (size_t i = 0; i < num_samples; ++i) {
      py::object obj = list_of_objects[i];
      if (!py::hasattr(obj, "__dlpack__")) {
        throw py::type_error(make_string(
            "Object at position ", i, " does not support the DLPack protocol."));
      }

      // Export the sample through DLPack; the stream is passed by keyword for
      // compatibility with `def __dlpack__(self, *, stream=None)` signatures.
      py::capsule capsule = obj.attr("__dlpack__")("stream"_a = stream_handle);
      Tensor<GPUBackend> sample;
      // Only the first sample carries the layout; it is propagated to the
      // whole list via SetupLike / SetLayout below.
      FillTensorFromDlPack(capsule, &sample, i == 0 ? layout : std::optional<std::string>{});

      if (i == 0) {
        samples.SetupLike(sample);
        // No explicit stream was given — fall back to a per-tensor user stream.
        if (copy_order == AccessOrder::host())
          copy_order = AccessOrder(UserStream::Get()->GetStream(sample));
        expected_device_id = sample.device_id();
      } else if (sample.device_id() != expected_device_id) {
        throw py::value_error(make_string(
            "All tensors must reside on the same GPU device. "
            "Tensor at position ", i, " is on GPU ", sample.device_id(),
            " but expected GPU ", expected_device_id, "."));
      }

      DALIDataType cur_type = sample.type();
      if (expected_type == -2) {
        expected_type = cur_type;
      } else if (expected_type != static_cast<int>(cur_type)) {
        throw py::type_error(make_string(
            "Tensors cannot have different data types. Tensor at position ", i, " has type '",
            cur_type, "' expected to have type '", DALIDataType(expected_type), "'."));
      }
      samples.SetSample(i, sample);
    }
  }

  // Non-contiguous path: return the per-sample list directly.
  if (!contiguous) {
    SetLayout(samples, layout, false);
    return result;
  }

  // Contiguous path: coalesce the staged samples into one GPU buffer on the
  // chosen order (user stream or the stream derived from the first sample).
  {
    DomainTimeRange copy_range("Copy to contiguous", kGPUTensorColor);
    auto contiguous_out = std::make_shared<TensorList<GPUBackend>>();
    contiguous_out->SetContiguity(BatchContiguity::Contiguous);
    contiguous_out->set_pinned(samples.is_pinned());
    contiguous_out->Copy(samples, copy_order);
    SetLayout(*contiguous_out, layout, false);
    return contiguous_out;
  }
}
1229+
11361230
template <typename Backend>
11371231
std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
11381232
py::list &list_of_tensors,
@@ -1163,6 +1257,7 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
11631257
: *non_contiguous_out;
11641258

11651259
int expected_type = -2;
1260+
int expected_device_id = -1;
11661261

11671262
AccessOrder wait_order = AccessOrder::host();
11681263
AccessOrder copy_order = AccessOrder::host();
@@ -1177,6 +1272,7 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
11771272
non_contiguous.SetupLike(t);
11781273
if constexpr (std::is_same_v<Backend, GPUBackend>) {
11791274
copy_order = AccessOrder(UserStream::Get()->GetStream(t));
1275+
expected_device_id = t.device_id();
11801276
}
11811277
}
11821278
DALIDataType cur_type = t.type();
@@ -1188,6 +1284,14 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
11881284
"Tensors cannot have different data types. Tensor at position ", i, " has type '",
11891285
cur_type, "' expected to have type '", DALIDataType(expected_type), "'."));
11901286
}
1287+
if constexpr (std::is_same_v<Backend, GPUBackend>) {
1288+
if (t.device_id() != expected_device_id) {
1289+
throw py::value_error(make_string(
1290+
"All tensors must reside on the same GPU device. "
1291+
"Tensor at position ", i, " is on GPU ", t.device_id(),
1292+
" but expected GPU ", expected_device_id, "."));
1293+
}
1294+
}
11911295
non_contiguous.SetSample(i, t);
11921296
} catch (const py::type_error &) {
11931297
throw;
@@ -1595,6 +1699,30 @@ void ExposeTesorListGPU(py::module &m) {
15951699
If True, the list of tensors is converted to a contiguous TensorListGPU, necessarily
15961700
creating a copy. Otherwise, the copy may be avoided.
15971701
)code")
1702+
.def(py::init([](
1703+
py::list &list_of_objects,
1704+
std::optional<std::string> layout = {},
1705+
py::object stream = py::none(),
1706+
bool contiguous = false) {
1707+
DomainTimeRange range("TensorListGPU::init from a list of DLPack objects", kGPUTensorColor);
1708+
return TensorListFromListOfDLPackObjects(list_of_objects, layout, stream, contiguous);
1709+
}),
1710+
"list_of_objects"_a,
1711+
"layout"_a = py::none(),
1712+
"stream"_a = py::none(),
1713+
"contiguous"_a = false,
1714+
R"code(
1715+
List of tensors residing in the GPU memory, constructed from a Python list of DLPack objects.
1716+
1717+
list_of_objects : list
1718+
Python list of objects supporting the DLPack protocol (e.g. PyTorch GPU tensors)
1719+
layout : str
1720+
Layout of the data
1721+
stream : stream, optional
1722+
CUDA stream used for the DLPack export handshake
1723+
contiguous : bool, default False
1724+
If True, samples are copied into a single contiguous GPU buffer
1725+
)code")
15981726
.def(py::init([](const py::object &object,
15991727
const std::optional<std::string> &layout = {},
16001728
int device_id = -1) {

dali/python/nvidia/dali/experimental/dynamic/_batch.py

Lines changed: 133 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
import importlib.util
16-
from typing import Any
16+
from typing import TYPE_CHECKING, Any
1717
from collections.abc import Iterator
1818

1919
import nvidia.dali.backend as _backend
@@ -22,7 +22,10 @@
2222
from ._nvtx import NVTXRange
2323
from nvidia.dali._typing import BatchLike, TensorLike
2424

25-
from . import _eval_mode, _invocation
25+
if TYPE_CHECKING:
26+
from . import _invocation
27+
from . import _eval_mode, _stream as _stream_module
28+
from ._eval_context import EvalContext as _EvalContext
2629
from ._arithmetic import _arithm_op
2730
from ._device import Device, DeviceLike
2831
from ._device import device as _device
@@ -172,7 +175,7 @@ def __init__(
172175
dtype: DTypeLike | None = None,
173176
device: DeviceLike | None = None,
174177
layout: str | None = None,
175-
invocation_result: _invocation.InvocationResult | None = None,
178+
invocation_result: "_invocation.InvocationResult | None" = None,
176179
copy: bool = False,
177180
):
178181
"""Constructs a :class:`Batch` object.
@@ -282,38 +285,136 @@ def __init__(
282285
self._dtype = dtype
283286

284287
else:
285-
self._tensors = []
286-
for i, t in enumerate(tensors):
287-
if t is None:
288-
raise TypeError(
289-
f"Tensors must be array-like types or numbers. Got `None` at index {i}"
290-
)
291-
sample = Tensor(t, dtype=dtype, device=device, layout=layout)
288+
# Materialise first so len() and indexing work for any iterable.
289+
_tensors_list = tensors if isinstance(tensors, list) else list(tensors)
290+
_fast_path_used = False
291+
292+
# Native DALI fast path: list of evaluated ndd.Tensor objects.
293+
# Build TensorList directly from backend storage objects, preserving all
294+
# metadata (layout, enum types, etc.) without going through DLPack.
295+
if (
296+
dtype is None
297+
and len(_tensors_list) > 0
298+
and isinstance(_tensors_list[0], Tensor)
299+
and _tensors_list[0]._storage is not None
300+
):
301+
_first_storage = _tensors_list[0]._storage
302+
_storages = []
303+
_native_valid = True
304+
for _t in _tensors_list:
305+
if (
306+
not isinstance(_t, Tensor)
307+
or _t._storage is None
308+
or type(_t._storage) is not type(_first_storage)
309+
):
310+
_native_valid = False
311+
break
312+
_storages.append(_t._storage)
313+
if _native_valid:
314+
if isinstance(_first_storage, _backend.TensorGPU):
315+
_dev_id = _first_storage.device_id()
316+
_dev_matches = device is None or (
317+
device.device_type == "gpu" and device.device_id == _dev_id
318+
)
319+
_backend_type = _backend.TensorListGPU
320+
_dev = Device("gpu", _dev_id)
321+
else:
322+
_dev_matches = device is None or device.device_type == "cpu"
323+
_backend_type = _backend.TensorListCPU
324+
_dev = Device("cpu")
325+
if _dev_matches:
326+
try:
327+
_storage = _backend_type(
328+
_storages, layout=layout or None, contiguous=False
329+
)
330+
except (TypeError, RuntimeError):
331+
pass # fall through to slow path
332+
else:
333+
self._storage = _storage
334+
self._device = _dev
335+
self._dtype = DType.from_type_id(self._storage.dtype)
336+
self._layout = self._storage.layout() or ""
337+
self._wraps_external_data = any(
338+
t._wraps_external_data for t in _tensors_list
339+
)
340+
device = self._device
341+
dtype = self._dtype
342+
layout = self._layout
343+
_fast_path_used = True
344+
345+
# DLPack fast path: list of external GPU tensors (e.g. PyTorch GPU tensors).
346+
# Build TensorListGPU directly in C++, skipping per-sample Python Tensor wrappers.
347+
if not _fast_path_used and (
348+
dtype is None
349+
and len(_tensors_list) > 0
350+
and not isinstance(_tensors_list[0], Tensor)
351+
and hasattr(_tensors_list[0], "__dlpack_device__")
352+
):
353+
_dl_dev_type, _dl_dev_id = _tensors_list[0].__dlpack_device__()
354+
if int(_dl_dev_type) == 2: # GPU
355+
if device is None or (
356+
device.device_type == "gpu" and device.device_id == _dl_dev_id
357+
):
358+
ctx = _EvalContext.current()
359+
_stream = (
360+
ctx.cuda_stream
361+
if ctx.device_id == _dl_dev_id
362+
else _stream_module.stream(device_id=_dl_dev_id)
363+
)
364+
try:
365+
_storage = _backend.TensorListGPU(
366+
_tensors_list,
367+
layout=layout or None,
368+
stream=_stream,
369+
contiguous=False,
370+
)
371+
except TypeError:
372+
pass # fall through to slow path
373+
else:
374+
self._storage = _storage
375+
self._device = Device("gpu", _dl_dev_id)
376+
self._dtype = DType.from_type_id(self._storage.dtype)
377+
self._layout = self._storage.layout() or ""
378+
self._wraps_external_data = True
379+
device = self._device
380+
dtype = self._dtype
381+
layout = self._layout
382+
_fast_path_used = True
383+
384+
if not _fast_path_used:
385+
self._tensors = []
386+
for i, t in enumerate(_tensors_list):
387+
if t is None:
388+
raise TypeError(
389+
f"Tensors must be array-like types or numbers. "
390+
f"Got `None` at index {i}"
391+
)
392+
sample = Tensor(t, dtype=dtype, device=device, layout=layout)
393+
if dtype is None:
394+
dtype = sample.dtype
395+
if device is None:
396+
device = sample.device
397+
if layout is None:
398+
layout = sample.layout
399+
self._tensors.append(sample)
400+
if sample._wraps_external_data:
401+
self._wraps_external_data = True
402+
else:
403+
if not isinstance(t, Tensor) or t._storage is not sample._storage:
404+
copied = True
292405
if dtype is None:
293-
dtype = sample.dtype
406+
# We would have set dtype in the 1st iteration, so the only way it can
407+
# be None is if the `_tensors` are empty.
408+
assert len(self._tensors) == 0
409+
raise ValueError("Element type must be specified if the list is empty")
294410
if device is None:
295-
device = sample.device
411+
device = Device("cpu")
296412
if layout is None:
297-
layout = sample.layout
298-
self._tensors.append(sample)
299-
if sample._wraps_external_data:
300-
self._wraps_external_data = True
301-
else:
302-
if not isinstance(t, Tensor) or t._storage is not sample._storage:
303-
copied = True
304-
if dtype is None:
305-
# We would have set dtype in the 1st iteration, so the only way it can
306-
# be None is if the `_tensors` are empty.
307-
assert len(self._tensors) == 0
308-
raise ValueError("Element type must be specified if the list is empty")
309-
if device is None:
310-
device = Device("cpu")
311-
if layout is None:
312-
layout = ""
313-
self._device = device
314-
self._layout = layout
315-
self._dtype = dtype
316-
if len(self._tensors) == 0:
413+
layout = ""
414+
self._device = device
415+
self._layout = layout
416+
self._dtype = dtype
417+
if self._tensors is not None and len(self._tensors) == 0:
317418
with device:
318419
t = Tensor([], dtype=dtype, device=device).evaluate()
319420
if self._device.device_type == "cpu":

dali/python/nvidia/dali/experimental/dynamic/_tensor.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,16 @@ def __init__(
203203
elif hasattr(data, "__dlpack_device__"):
204204
dl_device_type, device_id = data.__dlpack_device__()
205205
if int(dl_device_type) == 1 or int(dl_device_type) == 3: # CPU
206-
self._storage = _backend.TensorCPU(data.__dlpack__(), layout)
206+
try:
207+
self._storage = _backend.TensorCPU(data.__dlpack__(), layout)
208+
except (BufferError, TypeError):
209+
# DLPack may fail for read-only buffers or unsupported dtypes;
210+
# fall back to __array__ if available.
211+
a = _get_array_interface(data)
212+
if a is not None:
213+
self._storage = _backend.TensorCPU(a, layout)
214+
else:
215+
raise
207216
elif int(dl_device_type) == 2: # GPU
208217
# If the current context is on the same device, use the same stream.
209218
ctx = _EvalContext.current()
@@ -212,8 +221,9 @@ def __init__(
212221
else:
213222
stream = _stream.stream(device_id=device_id)
214223
args = {"stream": stream.handle}
224+
dlpack_capsule = data.__dlpack__(**args)
215225
self._storage = _backend.TensorGPU(
216-
data.__dlpack__(**args),
226+
dlpack_capsule,
217227
layout=layout,
218228
stream=stream,
219229
)
@@ -278,7 +288,7 @@ def __init__(
278288
self._dtype = DType.from_type_id(self._storage.dtype)
279289
self._layout = self._storage.layout()
280290

281-
if self._storage is not None and device != _backend_device(self._storage):
291+
if self._storage is not None and device != self._device:
282292
self._assign(self.to_device(device).evaluate())
283293
copied = True
284294
elif invocation_result is not None:

0 commit comments

Comments
 (0)