Skip to content

Commit 24e593c

Browse files
refactor: always set --log-format and always capture output, make capture_output a no-op
- Always pass --log-format to dbt CLI (previously gated on capture_output) - Always capture subprocess output (for transient-error detection) - Always parse output when log_format is json (previously gated on capture_output) - Remove capture_output from internal methods (_run_command, _execute_inner_command, _inner_run_command) - Keep capture_output on public API methods (run, test, deps, run_operation) as a deprecated no-op for backward compatibility - Remove sys.stdout/stderr.write hack (no longer needed since output is always parsed/logged) Co-Authored-By: Itamar Hartstein <haritamar@gmail.com>
1 parent 0c5f1e5 commit 24e593c

3 files changed

Lines changed: 31 additions & 58 deletions

File tree

elementary/clients/dbt/api_dbt_runner.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ class APIDbtRunner(CommandLineDbtRunner):
2727
def _inner_run_command(
2828
self,
2929
dbt_command_args: List[str],
30-
capture_output: bool,
3130
quiet: bool,
3231
log_output: bool,
3332
log_format: str,

elementary/clients/dbt/command_line_dbt_runner.py

Lines changed: 29 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import json
22
import os
33
import re
4-
import sys
54
from dataclasses import dataclass
65
from typing import Any, Dict, List, Optional
76

@@ -83,7 +82,6 @@ def __init__(
8382
def _inner_run_command(
8483
self,
8584
dbt_command_args: List[str],
86-
capture_output: bool,
8785
quiet: bool,
8886
log_output: bool,
8987
log_format: str,
@@ -98,15 +96,13 @@ def _parse_ls_command_result(
9896
def _run_command(
9997
self,
10098
command_args: List[str],
101-
capture_output: bool = False,
10299
log_format: str = "json",
103100
vars: Optional[dict] = None,
104101
quiet: bool = False,
105102
log_output: bool = True,
106103
) -> DbtCommandResult:
107104
dbt_command_args = []
108-
if capture_output:
109-
dbt_command_args.extend(["--log-format", log_format])
105+
dbt_command_args.extend(["--log-format", log_format])
110106
dbt_command_args.extend(command_args)
111107
dbt_command_args.extend(["--project-dir", os.path.abspath(self.project_dir)])
112108
if self.profiles_dir:
@@ -138,7 +134,6 @@ def _run_command(
138134
result = self._execute_inner_command(
139135
dbt_command_args=dbt_command_args,
140136
log_command_args=log_command_args,
141-
capture_output=capture_output,
142137
quiet=quiet,
143138
log_output=log_output,
144139
log_format=log_format,
@@ -150,7 +145,6 @@ def _execute_inner_command(
150145
self,
151146
dbt_command_args: List[str],
152147
log_command_args: List[str],
153-
capture_output: bool,
154148
quiet: bool,
155149
log_output: bool,
156150
log_format: str,
@@ -183,15 +177,9 @@ def _before_retry(retry_state: RetryCallState) -> None:
183177
reraise=True,
184178
)
185179
def _attempt() -> DbtCommandResult:
186-
# Always capture output so transient-error detection can inspect
187-
# stdout/stderr. When the caller set capture_output=False
188-
# (expecting streaming output), we print the captured output
189-
# to stdout/stderr after the command completes so it still
190-
# appears in the terminal.
191180
try:
192181
result = self._inner_run_command(
193182
dbt_command_args,
194-
capture_output=True,
195183
quiet=quiet,
196184
log_output=log_output,
197185
log_format=log_format,
@@ -229,22 +217,13 @@ def _attempt() -> DbtCommandResult:
229217
) from exc
230218
raise
231219

232-
if not capture_output and not quiet:
233-
# The caller expected output to stream to the terminal.
234-
# Since we captured it for transient-error detection,
235-
# print it now so the user still sees it.
236-
if isinstance(result.output, str) and result.output:
237-
sys.stdout.write(result.output)
238-
if isinstance(result.stderr, str) and result.stderr:
239-
sys.stderr.write(result.stderr)
240-
241-
if capture_output and result.output:
220+
if result.output:
242221
logger.debug(
243222
"Result bytes size for command '%s' is %d",
244223
log_command_args,
245224
len(result.output),
246225
)
247-
if log_output or is_debug():
226+
if log_format == "json" and (log_output or is_debug()):
248227
for log in parse_dbt_output(result.output, log_format):
249228
logger.info(log.msg)
250229

@@ -281,10 +260,12 @@ def _attempt() -> DbtCommandResult:
281260
# failed result so callers that check result.success work.
282261
return exc.result
283262

284-
def deps(self, quiet: bool = False, capture_output: bool = True) -> bool:
285-
result = self._run_command(
286-
command_args=["deps"], quiet=quiet, capture_output=capture_output
287-
)
263+
def deps(
264+
self,
265+
quiet: bool = False,
266+
capture_output: bool = True, # Deprecated: no-op, kept for backward compatibility.
267+
) -> bool:
268+
result = self._run_command(command_args=["deps"], quiet=quiet)
288269
return result.success
289270

290271
def seed(self, select: Optional[str] = None, full_refresh: bool = False) -> bool:
@@ -303,7 +284,7 @@ def snapshot(self) -> bool:
303284
def run_operation(
304285
self,
305286
macro_name: str,
306-
capture_output: bool = True,
287+
capture_output: bool = True, # Deprecated: no-op, kept for backward compatibility.
307288
macro_args: Optional[dict] = None,
308289
log_errors: bool = True,
309290
vars: Optional[dict] = None,
@@ -328,7 +309,6 @@ def run_operation(
328309
command_args.extend(["--args", json_args])
329310
result = self._run_command(
330311
command_args=command_args,
331-
capture_output=capture_output,
332312
vars=vars,
333313
quiet=quiet,
334314
log_output=log_output,
@@ -342,23 +322,22 @@ def run_operation(
342322
log_pattern = (
343323
RAW_EDR_LOGS_PATTERN if return_raw_edr_logs else MACRO_RESULT_PATTERN
344324
)
345-
if capture_output:
346-
if result.output is not None:
347-
for log in parse_dbt_output(result.output):
348-
if log_errors and log.level == "error":
349-
logger.error(log.msg)
350-
continue
351-
352-
if log.msg:
353-
match = log_pattern.match(log.msg)
354-
if match:
355-
run_operation_results.append(match.group(1))
356-
357-
if result.stderr is not None and log_errors:
358-
for log in parse_dbt_output(result.stderr):
359-
if log.level == "error":
360-
logger.error(log.msg)
361-
continue
325+
if result.output is not None:
326+
for log in parse_dbt_output(result.output):
327+
if log_errors and log.level == "error":
328+
logger.error(log.msg)
329+
continue
330+
331+
if log.msg:
332+
match = log_pattern.match(log.msg)
333+
if match:
334+
run_operation_results.append(match.group(1))
335+
336+
if result.stderr is not None and log_errors:
337+
for log in parse_dbt_output(result.stderr):
338+
if log.level == "error":
339+
logger.error(log.msg)
340+
continue
362341

363342
return run_operation_results
364343

@@ -369,7 +348,7 @@ def run(
369348
full_refresh: bool = False,
370349
vars: Optional[dict] = None,
371350
quiet: bool = False,
372-
capture_output: bool = False,
351+
capture_output: bool = False, # Deprecated: no-op, kept for backward compatibility.
373352
) -> bool:
374353
command_args = ["run"]
375354
if full_refresh:
@@ -382,7 +361,6 @@ def run(
382361
command_args=command_args,
383362
vars=vars,
384363
quiet=quiet,
385-
capture_output=capture_output,
386364
)
387365
return result.success
388366

@@ -391,7 +369,7 @@ def test(
391369
select: Optional[str] = None,
392370
vars: Optional[dict] = None,
393371
quiet: bool = False,
394-
capture_output: bool = False,
372+
capture_output: bool = False, # Deprecated: no-op, kept for backward compatibility.
395373
) -> bool:
396374
command_args = ["test"]
397375
if select:
@@ -400,7 +378,6 @@ def test(
400378
command_args=command_args,
401379
vars=vars,
402380
quiet=quiet,
403-
capture_output=capture_output,
404381
)
405382
return result.success
406383

@@ -417,9 +394,7 @@ def ls(self, select: Optional[str] = None) -> list:
417394
if select:
418395
command_args.extend(["-s", select])
419396
try:
420-
result = self._run_command(
421-
command_args=command_args, capture_output=True, log_format="text"
422-
)
397+
result = self._run_command(command_args=command_args, log_format="text")
423398
return self._parse_ls_command_result(select, result)
424399
except DbtCommandError:
425400
raise DbtLsCommandError(select)

elementary/clients/dbt/subprocess_dbt_runner.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ class SubprocessDbtRunner(CommandLineDbtRunner):
1919
def _inner_run_command(
2020
self,
2121
dbt_command_args: List[str],
22-
capture_output: bool,
2322
quiet: bool,
2423
log_output: bool,
2524
log_format: str,
@@ -28,7 +27,7 @@ def _inner_run_command(
2827
result = subprocess.run(
2928
[self._get_dbt_command_name()] + dbt_command_args,
3029
check=self.raise_on_failure,
31-
capture_output=capture_output or quiet,
30+
capture_output=True,
3231
env=self._get_command_env(),
3332
cwd=self.project_dir,
3433
)
@@ -43,7 +42,7 @@ def _inner_run_command(
4342
if err.output
4443
else []
4544
)
46-
if capture_output and (log_output or is_debug()):
45+
if log_output or is_debug():
4746
for log in logs:
4847
logger.info(log.msg)
4948
raise DbtCommandError(

0 commit comments

Comments
 (0)