Skip to content

Commit c45a34e

Browse files
10ne1gitster
authored andcommitted
run-command: poll child input in addition to output
Child input feeding might hit the 100ms output poll timeout as a side-effect of the ungroup=0 design when feeding multiple children in parallel and buffering their outputs. This throttles the write throughput as reported by Kristoffer. Peff also noted that the parent might block if the write pipe is full and cause a deadlock if both parent + child wait for one another. Thus we refactor the run-command I/O loop so it polls on both child input and output fds to eliminate the risk of artificial 100ms latencies and unnecessarily blocking the main process. This ensures that parallel hooks are fed data ASAP while maintaining responsiveness for (sideband) output. It's worth noting that in our current design, sequential execution is not affected by this because it still uses the ungroup=1 behavior, so there are no run-command induced buffering delays since the child sequentially outputs directly to the parent-inherited fds. Reported-by: Kristoffer Haugsbakk <kristofferhaugsbakk@fastmail.com> Suggested-by: Jeff King <peff@peff.net> Signed-off-by: Adrian Ratiu <adrian.ratiu@collabora.com> Signed-off-by: Junio C Hamano <gitster@pobox.com>
1 parent c549a40 commit c45a34e

1 file changed

Lines changed: 62 additions & 18 deletions

File tree

run-command.c

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1499,6 +1499,14 @@ static int child_is_receiving_input(const struct parallel_child *pp_child)
14991499
{
15001500
return child_is_working(pp_child) && pp_child->process.in > 0;
15011501
}
1502+
static int child_is_sending_output(const struct parallel_child *pp_child)
1503+
{
1504+
/*
1505+
* all pp children which buffer output through run_command via ungroup=0
1506+
* redirect stdout to stderr, so we just need to check process.err.
1507+
*/
1508+
return child_is_working(pp_child) && pp_child->process.err > 0;
1509+
}
15021510

15031511
struct parallel_processes {
15041512
size_t nr_processes;
@@ -1562,7 +1570,7 @@ static void pp_init(struct parallel_processes *pp,
15621570

15631571
CALLOC_ARRAY(pp->children, n);
15641572
if (!opts->ungroup)
1565-
CALLOC_ARRAY(pp->pfd, n);
1573+
CALLOC_ARRAY(pp->pfd, n * 2);
15661574

15671575
for (size_t i = 0; i < n; i++) {
15681576
strbuf_init(&pp->children[i].err, 0);
@@ -1707,21 +1715,63 @@ static void pp_buffer_stdin(struct parallel_processes *pp,
17071715
}
17081716
}
17091717

1710-
static void pp_buffer_stderr(struct parallel_processes *pp,
1711-
const struct run_process_parallel_opts *opts,
1712-
int output_timeout)
1718+
static void pp_buffer_io(struct parallel_processes *pp,
1719+
const struct run_process_parallel_opts *opts,
1720+
int timeout)
17131721
{
1714-
while (poll(pp->pfd, opts->processes, output_timeout) < 0) {
1722+
/* for each potential child slot, prepare two pollfd entries */
1723+
for (size_t i = 0; i < opts->processes; i++) {
1724+
if (child_is_sending_output(&pp->children[i])) {
1725+
pp->pfd[2*i].fd = pp->children[i].process.err;
1726+
pp->pfd[2*i].events = POLLIN | POLLHUP;
1727+
} else {
1728+
pp->pfd[2*i].fd = -1;
1729+
}
1730+
1731+
if (child_is_receiving_input(&pp->children[i])) {
1732+
pp->pfd[2*i+1].fd = pp->children[i].process.in;
1733+
pp->pfd[2*i+1].events = POLLOUT;
1734+
} else {
1735+
pp->pfd[2*i+1].fd = -1;
1736+
}
1737+
}
1738+
1739+
while (poll(pp->pfd, opts->processes * 2, timeout) < 0) {
17151740
if (errno == EINTR)
17161741
continue;
17171742
pp_cleanup(pp, opts);
17181743
die_errno("poll");
17191744
}
17201745

1721-
/* Buffer output from all pipes. */
17221746
for (size_t i = 0; i < opts->processes; i++) {
1747+
/* Handle input feeding (stdin) */
1748+
if (pp->pfd[2*i+1].revents & (POLLOUT | POLLHUP | POLLERR)) {
1749+
if (opts->feed_pipe) {
1750+
int ret = opts->feed_pipe(pp->children[i].process.in,
1751+
opts->data,
1752+
pp->children[i].data);
1753+
if (ret < 0)
1754+
die_errno("feed_pipe");
1755+
if (ret) {
1756+
/* done feeding */
1757+
close(pp->children[i].process.in);
1758+
pp->children[i].process.in = 0;
1759+
}
1760+
} else {
1761+
/*
1762+
* No feed_pipe means there is nothing to do, so
1763+
* close the fd. Child input can be fed by other
1764+
* methods, such as opts->path_to_stdin which
1765+
* slurps a file via dup2, so clean up here.
1766+
*/
1767+
close(pp->children[i].process.in);
1768+
pp->children[i].process.in = 0;
1769+
}
1770+
}
1771+
1772+
/* Handle output reading (stderr) */
17231773
if (child_is_working(&pp->children[i]) &&
1724-
pp->pfd[i].revents & (POLLIN | POLLHUP)) {
1774+
pp->pfd[2*i].revents & (POLLIN | POLLHUP)) {
17251775
int n = strbuf_read_once(&pp->children[i].err,
17261776
pp->children[i].process.err, 0);
17271777
if (n == 0) {
@@ -1814,29 +1864,23 @@ static int pp_collect_finished(struct parallel_processes *pp,
18141864

18151865
static void pp_handle_child_IO(struct parallel_processes *pp,
18161866
const struct run_process_parallel_opts *opts,
1817-
int output_timeout)
1867+
int timeout)
18181868
{
1819-
/*
1820-
* First push input, if any (it might no-op), to child tasks to avoid them blocking
1821-
* after input. This also prevents deadlocks when ungrouping below, if a child blocks
1822-
* while the parent also waits for them to finish.
1823-
*/
1824-
pp_buffer_stdin(pp, opts);
1825-
18261869
if (opts->ungroup) {
1870+
pp_buffer_stdin(pp, opts);
18271871
for (size_t i = 0; i < opts->processes; i++)
18281872
if (child_is_ready_for_cleanup(&pp->children[i]))
18291873
pp->children[i].state = GIT_CP_WAIT_CLEANUP;
18301874
} else {
1831-
pp_buffer_stderr(pp, opts, output_timeout);
1875+
pp_buffer_io(pp, opts, timeout);
18321876
pp_output(pp);
18331877
}
18341878
}
18351879

18361880
void run_processes_parallel(const struct run_process_parallel_opts *opts)
18371881
{
18381882
int i, code;
1839-
int output_timeout = 100;
1883+
int timeout = 100;
18401884
int spawn_cap = 4;
18411885
struct parallel_processes_for_signal pp_sig;
18421886
struct parallel_processes pp = {
@@ -1876,7 +1920,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
18761920
}
18771921
if (!pp.nr_processes)
18781922
break;
1879-
pp_handle_child_IO(&pp, opts, output_timeout);
1923+
pp_handle_child_IO(&pp, opts, timeout);
18801924
code = pp_collect_finished(&pp, opts);
18811925
if (code) {
18821926
pp.shutdown = 1;

0 commit comments

Comments
 (0)