Skip to content

Commit a0755d2

Browse files
committed
Align separated prefetch feed count
Make async separated executor prefetch and InputFeedCount use the same maximum queue-depth contract. Keep the Python separated prefetch path for drainable queue shapes, but use interleaved prefetch when the CPU queue is longer than the GPU queue so end-of-epoch Python sources do not leave CPU-only work without scheduled Mixed/GPU stages. Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
1 parent 8e20760 commit a0755d2

2 files changed

Lines changed: 50 additions & 13 deletions

File tree

dali/pipeline/executor/async_separated_pipelined_executor.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
#include "dali/pipeline/executor/async_separated_pipelined_executor.h"
1616

17+
#include <algorithm>
18+
1719
namespace dali {
1820

1921
void AsyncSeparatedPipelinedExecutor::RunCPU() {
@@ -39,14 +41,16 @@ void AsyncSeparatedPipelinedExecutor::Prefetch() {
3941
RunGPU();
4042
}
4143

42-
for (int i = 0; i < queue_sizes_.cpu_size; i++) {
44+
int cpu_only_prefetch_count =
45+
std::max(0, queue_sizes_.cpu_size - queue_sizes_.gpu_size);
46+
for (int i = 0; i < cpu_only_prefetch_count; i++) {
4347
RunCPU();
4448
}
4549
}
4650

4751
int AsyncSeparatedPipelinedExecutor::InputFeedCount(std::string_view op_name) {
4852
(void)graph_->Node(op_name);
49-
return queue_sizes_.cpu_size + queue_sizes_.gpu_size;
53+
return std::max(queue_sizes_.cpu_size, queue_sizes_.gpu_size);
5054
}
5155

5256
} // namespace dali

dali/python/nvidia/dali/pipeline.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,22 +1591,34 @@ def _prefetch(self):
15911591
raise RuntimeError("The pipeline was destroyed.")
15921592
self._schedule_py_workers()
15931593

1594-
# Keep input feeding interleaved with backend runs. Feeding all inputs
1595-
# first can leave separated execution with CPU-prefetched batches that
1596-
# have no scheduled Mixed/GPU work when an external source reaches end
1597-
# of epoch.
1598-
self._legacy_interleaved_prefetch()
1594+
# A larger separated CPU queue leaves CPU-only iterations after backend
1595+
# Prefetch. If a Python source reaches end of epoch, those iterations
1596+
# cannot be advanced through Mixed/GPU without feeding more CPU work.
1597+
if (
1598+
not self._exec_separated
1599+
or self._cpu_queue_size > self._gpu_queue_size
1600+
):
1601+
self._legacy_interleaved_prefetch()
1602+
return
1603+
1604+
# The new way: try to run the inputs and then feed them, finally call
1605+
# _pipe.Prefetch(). If this fails, we just run `_pipe.Run()` a bunch of
1606+
# times. This will likely blow up for separated queues, which are not
1607+
# properly supported anyway.
1608+
iters_fed = 0
1609+
self._first_iter = False
1610+
iters_fed, success = self._prefetch_inputs()
1611+
if success:
1612+
self._pipe.Prefetch()
1613+
else:
1614+
self._last_iter = True
1615+
for _ in range(iters_fed):
1616+
self._pipe.Run()
15991617

16001618
# This is the old way of prefetching - the feeding and running steps are interleaved.
16011619
# Running all callbacks at once, then feeding, then running - may affect the performance
16021620
# of the 1st iteration.
16031621
def _legacy_interleaved_prefetch(self):
1604-
# Separated execution has independent CPU and GPU queue depths, but an
1605-
# interleaved Run schedules one whole pipeline iteration through all
1606-
# stages. After max(cpu, gpu) runs each stage has seen enough iterations
1607-
# to fill its own queue. Using cpu + gpu would schedule extra full
1608-
# iterations, not just fill the GPU queue, and could over-read external
1609-
# inputs at an epoch boundary.
16101622
prefetch_count = (
16111623
max(self._cpu_queue_size, self._gpu_queue_size)
16121624
if self._exec_separated
@@ -1624,6 +1636,27 @@ def _legacy_interleaved_prefetch(self):
16241636
self._last_iter = True
16251637
break
16261638

1639+
def _prefetch_inputs(self):
1640+
prefetched, success = self._run_input_callbacks(True)
1641+
self._batches_to_consume += prefetched
1642+
1643+
if success:
1644+
if self._exec_separated:
1645+
prefetch_count = max(self._cpu_queue_size, self._gpu_queue_size)
1646+
else:
1647+
prefetch_count = self._cpu_queue_size
1648+
1649+
for i in range(prefetched, prefetch_count):
1650+
try:
1651+
self.iter_setup()
1652+
prefetched = i + 1
1653+
self._batches_to_consume += 1
1654+
except StopIteration:
1655+
success = False
1656+
break
1657+
1658+
return prefetched, success
1659+
16271660
def _run_once(self):
16281661
"""Start running the whole pipeline once without waiting for its results.
16291662

0 commit comments

Comments
 (0)