Skip to content

Commit fe2b252

Browse files
committed
Fix separated external source prefetch drain
Keep pipeline prefetching interleaved with backend runs so separated execution does not leave CPU-prefetched external source batches without scheduled Mixed/GPU work at end of epoch. Prime separated execution for the maximum of CPU and GPU queue depths to avoid underfilling asymmetric queue configurations. Add a regression that drains a batch external source through mixed image decoding with symmetric and asymmetric separated CPU/GPU prefetch queues. Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
1 parent 87a78c0 commit fe2b252

2 files changed

Lines changed: 59 additions & 18 deletions

File tree

dali/python/nvidia/dali/pipeline.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,29 +1591,22 @@ def _prefetch(self):
15911591
raise RuntimeError("The pipeline was destroyed.")
15921592
self._schedule_py_workers()
15931593

1594-
# We probably need some benchmarking before we remove this code path
1595-
if not self._exec_separated:
1596-
self._legacy_interleaved_prefetch()
1597-
return
1598-
1599-
# The new way: try to run the inputs and then feed them, finally call _pipe.Prefetch()
1600-
# If this fails, we just run `_pipe.Run()` a bunch of times. This will likely blow up for
1601-
# separated queues, which are not properly supported anyway.
1602-
iters_fed = 0
1603-
self._first_iter = False
1604-
iters_fed, success = self._prefetch_inputs()
1605-
if success:
1606-
self._pipe.Prefetch()
1607-
else:
1608-
self._last_iter = True
1609-
for _ in range(iters_fed):
1610-
self._pipe.Run()
1594+
# Keep input feeding interleaved with backend runs. Feeding all inputs
1595+
# first can leave separated execution with CPU-prefetched batches that
1596+
# have no scheduled Mixed/GPU work when an external source reaches end
1597+
# of epoch.
1598+
self._legacy_interleaved_prefetch()
16111599

16121600
# This is the old way of prefetching - the feeding and running steps are interleaved.
16131601
# Running all callbacks at once, then feeding, then running - may affect the performance
16141602
# of the 1st iteration.
16151603
def _legacy_interleaved_prefetch(self):
1616-
for _ in range(self._cpu_queue_size):
1604+
prefetch_count = (
1605+
max(self._cpu_queue_size, self._gpu_queue_size)
1606+
if self._exec_separated
1607+
else self._cpu_queue_size
1608+
)
1609+
for _ in range(prefetch_count):
16171610
try:
16181611
self._first_iter = False
16191612
self._iter_setup()

dali/test/python/test_pipeline.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1893,6 +1893,54 @@ def my_pipe():
18931893
my_pipe(device_id=0, seed=1234, num_threads=3, set_affinity=True, py_num_workers=3)
18941894

18951895

1896+
def test_separated_queue_external_source_drains_prefetched_batches():
1897+
batch_size = 4
1898+
num_batches = 10
1899+
image_pattern = os.path.join(jpeg_folder, "*", "*.jpg")
1900+
paths = sorted(glob.glob(image_pattern))[: batch_size * num_batches]
1901+
assert len(paths) == batch_size * num_batches
1902+
1903+
def batches():
1904+
for i in range(num_batches):
1905+
batch_paths = paths[i * batch_size : (i + 1) * batch_size]
1906+
yield [np.fromfile(path, dtype=np.uint8) for path in batch_paths]
1907+
1908+
for cpu_size, gpu_size in [(2, 2), (3, 2), (2, 3)]:
1909+
1910+
@dali.pipeline_def(
1911+
batch_size=batch_size,
1912+
num_threads=4,
1913+
device_id=0,
1914+
prefetch_queue_depth={"cpu_size": cpu_size, "gpu_size": gpu_size},
1915+
)
1916+
def pipe():
1917+
encoded = fn.external_source(
1918+
source=batches,
1919+
batch=True,
1920+
cycle="raise",
1921+
)
1922+
decoded = fn.decoders.image(
1923+
encoded,
1924+
device="mixed",
1925+
output_type=types.RGB,
1926+
)
1927+
return decoded
1928+
1929+
p = pipe()
1930+
p.build()
1931+
for _ in range(num_batches):
1932+
out = p.run()[0]
1933+
assert len(out) == batch_size
1934+
decoded = out.as_cpu()
1935+
for sample_idx in range(batch_size):
1936+
sample = decoded.at(sample_idx)
1937+
assert sample.ndim == 3
1938+
assert sample.shape[-1] == 3
1939+
assert np.any(sample)
1940+
with assert_raises(StopIteration):
1941+
p.run()
1942+
1943+
18961944
def test_not_iterable():
18971945
import nvidia.dali._utils.hacks as hacks
18981946
import collections.abc

0 commit comments

Comments
 (0)