Skip to content

Commit 7f165fa

Browse files
airhornsclaude
andcommitted
Fix style check and mypy errors from CI
- Apply ruff formatting to new/modified lines - Fix mypy error in test_audit_only_no_nested_concurrency: use fully mocked evaluator instead of real evaluator with replaced method, avoiding type mismatch on call_count/call_args_list Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0f081da commit 7f165fa

File tree

3 files changed

+17
-18
lines changed

3 files changed

+17
-18
lines changed

sqlmesh/core/scheduler.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@
3838
)
3939
from sqlmesh.core.state_sync import StateSync
4040
from sqlmesh.utils import CompletionStatus
41-
from sqlmesh.utils.concurrency import concurrent_apply_to_dag, concurrent_apply_to_values, NodeExecutionFailedError
41+
from sqlmesh.utils.concurrency import (
42+
concurrent_apply_to_dag,
43+
concurrent_apply_to_values,
44+
NodeExecutionFailedError,
45+
)
4246
from sqlmesh.utils.dag import DAG
4347
from sqlmesh.utils.date import (
4448
TimeLike,
@@ -527,7 +531,9 @@ def run_merged_intervals(
527531
}
528532

529533
dag = self._dag(
530-
batched_intervals, snapshot_dag=snapshot_dag, snapshots_to_create=snapshots_to_create
534+
batched_intervals,
535+
snapshot_dag=snapshot_dag,
536+
snapshots_to_create=snapshots_to_create,
531537
)
532538

533539
def run_node(node: SchedulingUnit) -> None:
@@ -545,7 +551,8 @@ def run_node(node: SchedulingUnit) -> None:
545551

546552
# If batch_index > 0, then the target table must exist since the first batch would have created it
547553
target_table_exists = (
548-
snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0
554+
snapshot.snapshot_id not in snapshots_to_create
555+
or node.batch_index > 0
549556
)
550557

551558
def _do_evaluate() -> t.List[AuditResult]:
@@ -972,9 +979,7 @@ def _run_node_with_progress(
972979
num_audits - num_audits_failed,
973980
num_audits_failed,
974981
execution_stats=execution_stats,
975-
auto_restatement_triggers=auto_restatement_triggers.get(
976-
snapshot.snapshot_id
977-
),
982+
auto_restatement_triggers=auto_restatement_triggers.get(snapshot.snapshot_id),
978983
)
979984

980985
def _run_audits_concurrently(

sqlmesh/core/snapshot/evaluator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,9 @@ def _run_audit(
633633
**kwargs,
634634
)
635635

636-
tasks_num = audit_concurrent_tasks if audit_concurrent_tasks is not None else self.concurrent_tasks
636+
tasks_num = (
637+
audit_concurrent_tasks if audit_concurrent_tasks is not None else self.concurrent_tasks
638+
)
637639
results = concurrent_apply_to_values(
638640
prepared_audits,
639641
_run_audit,

tests/core/test_scheduler.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,10 +1619,6 @@ def test_audit_only_no_nested_concurrency(mocker: MockerFixture, make_snapshot):
16191619
This prevents nested thread pool multiplication: max_workers * concurrent_tasks threads hitting
16201620
the DB at the same time.
16211621
"""
1622-
import sqlmesh.core.snapshot.evaluator as evaluator_module
1623-
1624-
spy = mocker.spy(evaluator_module, "concurrent_apply_to_values")
1625-
16261622
snapshot_a = make_snapshot(SqlModel(name="a", query=parse_one("SELECT 1 as id")))
16271623
snapshot_b = make_snapshot(SqlModel(name="b", query=parse_one("SELECT 2 as id")))
16281624
snapshot_a.categorize_as(SnapshotChangeCategory.BREAKING)
@@ -1634,13 +1630,9 @@ def test_audit_only_no_nested_concurrency(mocker: MockerFixture, make_snapshot):
16341630
mock_evaluator.concurrent_context.return_value.__enter__ = mocker.Mock(return_value=None)
16351631
mock_evaluator.concurrent_context.return_value.__exit__ = mocker.Mock(return_value=False)
16361632

1637-
# Use the real SnapshotEvaluator to test the audit_concurrent_tasks parameter flows through
1638-
real_evaluator = SnapshotEvaluator(adapters=mocker.MagicMock(), concurrent_tasks=4)
1639-
real_evaluator.audit = mocker.MagicMock(return_value=[]) # type: ignore
1640-
16411633
scheduler = Scheduler(
16421634
snapshots=[snapshot_a, snapshot_b],
1643-
snapshot_evaluator=real_evaluator,
1635+
snapshot_evaluator=mock_evaluator,
16441636
state_sync=mocker.MagicMock(),
16451637
default_catalog=None,
16461638
max_workers=2,
@@ -1661,10 +1653,10 @@ def test_audit_only_no_nested_concurrency(mocker: MockerFixture, make_snapshot):
16611653

16621654
assert errors == []
16631655
assert skipped == []
1664-
assert real_evaluator.audit.call_count == 2
1656+
assert mock_evaluator.audit.call_count == 2
16651657

16661658
# Verify that audit_concurrent_tasks=1 was passed to each audit call to prevent nested pools
1667-
for call in real_evaluator.audit.call_args_list:
1659+
for call in mock_evaluator.audit.call_args_list:
16681660
assert call.kwargs.get("audit_concurrent_tasks") == 1, (
16691661
"audit_concurrent_tasks=1 must be passed to prevent nested thread pool multiplication"
16701662
)

0 commit comments

Comments
 (0)