Skip to content

Commit 30381bb

Browse files
committed
Add missing test_integration.py
1 parent 3ffdc0b commit 30381bb

File tree

3 files changed: 363 additions and 1 deletion.

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,3 @@ htmlcov/
4747
# Local files
4848
*.log
4949
.DS_Store
50-
test_integration.py

test_integration.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
"""Integration test with a minimal DuckDB project.

Manual smoke-test script. It installs the OpenLineage console wrapper, writes
a throw-away two-model SQLMesh project that uses an in-memory DuckDB gateway,
runs a plan against it, and (when the active console is an
``OpenLineageConsole``) prints a summary of every event passed to the
emitter's client. The temporary project is removed even when a step fails.
"""
import os
import sys
import tempfile
import shutil

# Add sqlmesh to path. The checkout location defaults to the original
# developer path and can be overridden with SQLMESH_SOURCE_DIR; it is only
# added when it exists so the script falls back to an installed sqlmesh.
sqlmesh_checkout = os.environ.get("SQLMESH_SOURCE_DIR", "/Users/nico/Code/sqlmesh")
if os.path.isdir(sqlmesh_checkout):
    sys.path.insert(0, sqlmesh_checkout)

# Set up console transport for testing (prints events instead of sending to server).
# These are set before the integration is imported and installed.
os.environ["OPENLINEAGE_URL"] = "console://localhost"
os.environ["OPENLINEAGE_NAMESPACE"] = "test_namespace"

import sqlmesh_openlineage

# Install the OpenLineage console wrapper
sqlmesh_openlineage.install()

print("OpenLineage integration installed:", sqlmesh_openlineage.is_installed())

# Create a minimal temp project
tmpdir = tempfile.mkdtemp(prefix="sqlmesh_ol_test_")
print(f"\nCreating test project in: {tmpdir}")

try:
    # Create config.yaml
    config_content = """
gateways:
  local:
    connection:
      type: duckdb
      database: ':memory:'

default_gateway: local

model_defaults:
  dialect: duckdb
"""
    with open(os.path.join(tmpdir, "config.yaml"), "w") as f:
        f.write(config_content)

    # Create models directory
    models_dir = os.path.join(tmpdir, "models")
    os.makedirs(models_dir)

    # Create a simple model
    model_content = """
MODEL (
  name test_db.source_data,
  kind FULL
);

SELECT 1 as id, 'foo' as name
UNION ALL
SELECT 2 as id, 'bar' as name
"""
    with open(os.path.join(models_dir, "source_data.sql"), "w") as f:
        f.write(model_content)

    # Create a downstream model to show lineage
    downstream_content = """
MODEL (
  name test_db.processed_data,
  kind FULL
);

SELECT
  id,
  name,
  UPPER(name) as name_upper
FROM test_db.source_data
"""
    with open(os.path.join(models_dir, "processed_data.sql"), "w") as f:
        f.write(downstream_content)

    # Now import and run SQLMesh (imported late, after the path/env setup above)
    from sqlmesh import Context
    from sqlmesh.core.console import get_console
    from sqlmesh_openlineage.console import OpenLineageConsole

    # Create context for our test project
    ctx = Context(paths=[tmpdir])

    print(f"\nLoaded {len(ctx.models)} models")
    print("Models:", list(ctx.models.keys()))

    # Check that our console is installed (looked up once and reused below)
    console = get_console()
    is_ol_console = isinstance(console, OpenLineageConsole)
    print(f"\nConsole type: {type(console).__name__}")
    print(f"Is OpenLineageConsole: {is_ol_console}")

    # Patch emitter to print events before delegating to the real emit
    if is_ol_console:
        original_emit = console._emitter.client.emit

        def debug_emit(event):
            """Print a one-line summary of ``event``, then forward it unchanged."""
            print(f"\n[OpenLineage Event] {event.eventType.name}: {event.job.name}")
            if event.inputs:
                print(f"  Inputs: {[i.name for i in event.inputs]}")
            if event.outputs:
                print(f"  Outputs: {[o.name for o in event.outputs]}")
            if hasattr(event, 'run') and event.run and event.run.facets:
                print(f"  Facets: {list(event.run.facets.keys())}")
            return original_emit(event)

        console._emitter.client.emit = debug_emit

    # Run a plan to trigger model evaluation
    print("\n--- Running SQLMesh plan (this should emit OpenLineage events) ---\n")
    try:
        ctx.plan(auto_apply=True, no_prompts=True)
        print("\n--- Plan completed successfully ---")
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and carry on to cleanup.
        print(f"Error during plan: {e}")
        import traceback
        traceback.print_exc()
finally:
    # Cleanup runs even if project creation or context loading raised.
    shutil.rmtree(tmpdir)
    print(f"\nCleaned up temp directory: {tmpdir}")

print("\nIntegration test complete!")

tests/test_integration.py

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
"""Integration test that runs SQLMesh and verifies OpenLineage events."""
2+
from __future__ import annotations
3+
4+
import os
5+
import sys
6+
import tempfile
7+
import shutil
8+
import pytest
9+
from unittest.mock import MagicMock, patch
10+
# Add sqlmesh to path for tests. The checkout location defaults to the
# original developer path and can be overridden with SQLMESH_SOURCE_DIR; it is
# only added when it exists, so other machines use the installed sqlmesh.
_SQLMESH_CHECKOUT = os.environ.get("SQLMESH_SOURCE_DIR", "/Users/nico/Code/sqlmesh")
if os.path.isdir(_SQLMESH_CHECKOUT):
    sys.path.insert(0, _SQLMESH_CHECKOUT)
13+
14+
class TestOpenLineageIntegration:
    """Integration tests for OpenLineage event emission.

    Each test builds a throw-away SQLMesh project, installs the OpenLineage
    console with a ``console://`` URL, records every event passed to the
    emitter client's ``emit``, runs a plan, and asserts on the recorded events.
    """

    @pytest.fixture
    def temp_project(self):
        """Create a temporary SQLMesh project.

        Yields:
            Path of a directory holding ``config.yaml`` (in-memory DuckDB
            gateway) and two FULL models, ``test_db.source_data`` and
            ``test_db.processed_data`` (which selects from the former).

        The directory is removed on teardown, and also if setup itself fails.
        """
        config_content = """
gateways:
  local:
    connection:
      type: duckdb
      database: ':memory:'

default_gateway: local

model_defaults:
  dialect: duckdb
"""
        source_content = """
MODEL (
  name test_db.source_data,
  kind FULL,
  columns (
    id INT,
    name VARCHAR
  )
);

SELECT 1 as id, 'foo' as name
UNION ALL
SELECT 2 as id, 'bar' as name
"""
        downstream_content = """
MODEL (
  name test_db.processed_data,
  kind FULL,
  columns (
    id INT,
    name VARCHAR,
    name_upper VARCHAR
  )
);

SELECT
  id,
  name,
  UPPER(name) as name_upper
FROM test_db.source_data
"""
        # The context manager guarantees cleanup on every exit path.
        with tempfile.TemporaryDirectory(prefix="sqlmesh_ol_test_") as tmpdir:
            # Create config.yaml
            with open(os.path.join(tmpdir, "config.yaml"), "w") as f:
                f.write(config_content)

            # Create models directory
            models_dir = os.path.join(tmpdir, "models")
            os.makedirs(models_dir)

            # Create source model
            with open(os.path.join(models_dir, "source_data.sql"), "w") as f:
                f.write(source_content)

            # Create downstream model with dependency
            with open(os.path.join(models_dir, "processed_data.sql"), "w") as f:
                f.write(downstream_content)

            yield tmpdir

    @pytest.fixture
    def captured_events(self):
        """Fixture to capture emitted OpenLineage events (a fresh list per test)."""
        return []

    @staticmethod
    def _install_capturing_console(captured_events):
        """Reinstall the OpenLineage console and record every emitted event.

        Resets the integration's installed flag and SQLMesh's cached console,
        installs the integration with explicit config, then wraps the emitter
        client's ``emit`` so each event is appended to ``captured_events``
        before being forwarded to the original ``emit``.

        Args:
            captured_events: List that receives each emitted event, in order.

        Returns:
            The active console, asserted to be an ``OpenLineageConsole``.
        """
        # Imported lazily so collecting this module does not import sqlmesh.
        import sqlmesh_openlineage
        from sqlmesh.core import console as sqlmesh_console

        # Reset console state completely
        sqlmesh_openlineage._installed = False
        sqlmesh_console._console = None

        # Install with explicit config
        sqlmesh_openlineage.install(
            url="console://localhost",
            namespace="test_namespace",
        )

        from sqlmesh.core.console import get_console
        from sqlmesh_openlineage.console import OpenLineageConsole

        # Get the console and patch emit to capture events
        console = get_console()
        assert isinstance(console, OpenLineageConsole)

        original_emit = console._emitter.client.emit

        def capture_emit(event):
            captured_events.append(event)
            return original_emit(event)

        console._emitter.client.emit = capture_emit
        return console

    def test_full_run_emits_correct_events(self, temp_project, captured_events):
        """Test that a full SQLMesh run emits correct OpenLineage events."""
        self._install_capturing_console(captured_events)

        from sqlmesh import Context

        # Create context and run plan
        ctx = Context(paths=[temp_project])
        ctx.plan(auto_apply=True, no_prompts=True)

        # Verify we got the expected events (START + COMPLETE for each of 2 models)
        assert len(captured_events) == 4, f"Expected 4 events, got {len(captured_events)}"

        from openlineage.client.event_v2 import RunState

        # Split by event type so lookups below do not depend on emission order
        start_events = [e for e in captured_events if e.eventType == RunState.START]
        complete_events = [e for e in captured_events if e.eventType == RunState.COMPLETE]

        assert len(start_events) == 2, "Expected 2 START events"
        assert len(complete_events) == 2, "Expected 2 COMPLETE events"

        # Find source_data events
        source_start = next(e for e in start_events if "source_data" in e.job.name)
        source_complete = next(e for e in complete_events if "source_data" in e.job.name)

        # Find processed_data events
        processed_start = next(e for e in start_events if "processed_data" in e.job.name)
        processed_complete = next(e for e in complete_events if "processed_data" in e.job.name)

        # Verify source_data START event
        assert source_start.job.namespace == "test_namespace"
        assert len(source_start.inputs) == 0, "source_data should have no inputs"
        assert len(source_start.outputs) == 1
        assert "source_data" in source_start.outputs[0].name

        # Verify source_data COMPLETE event
        assert source_complete.run.facets is not None
        assert "sqlmesh_execution" in source_complete.run.facets

        # Verify processed_data START event has source_data as input (LINEAGE!)
        assert len(processed_start.inputs) == 1, "processed_data should have 1 input"
        assert "source_data" in processed_start.inputs[0].name
        assert len(processed_start.outputs) == 1
        assert "processed_data" in processed_start.outputs[0].name

        # Verify schema is captured in output
        output_facets = processed_start.outputs[0].facets
        assert output_facets is not None
        assert "schema" in output_facets
        schema_facet = output_facets["schema"]
        assert len(schema_facet.fields) == 3  # id, name, name_upper

        # Verify processed_data COMPLETE event
        assert processed_complete.run.facets is not None
        assert "sqlmesh_execution" in processed_complete.run.facets

    def test_audit_failure_emits_fail_event(self, temp_project, captured_events):
        """Test that audit failures emit FAIL events.

        Skips, rather than silently passing, when the plan produces no FAIL
        event, so a run that verified nothing is visible in the test report.
        """
        # Create a model with a failing audit
        failing_model = """
MODEL (
  name test_db.failing_model,
  kind FULL,
  audits (
    assert_positive_id
  )
);

SELECT -1 as id, 'negative' as name;

AUDIT (
  name assert_positive_id
);

SELECT * FROM @this_model WHERE id < 0;
"""
        models_dir = os.path.join(temp_project, "models")
        with open(os.path.join(models_dir, "failing_model.sql"), "w") as f:
            f.write(failing_model)

        self._install_capturing_console(captured_events)

        from sqlmesh import Context

        ctx = Context(paths=[temp_project])

        # Run plan - this should produce FAIL events for audit failures.
        # The plan is expected to raise; the error is kept for the skip message.
        plan_error = None
        try:
            ctx.plan(auto_apply=True, no_prompts=True)
        except Exception as exc:
            plan_error = exc

        from openlineage.client.event_v2 import RunState

        fail_events = [e for e in captured_events if e.eventType == RunState.FAIL]

        # Whether a FAIL event appears depends on whether the audit actually runs.
        if not fail_events:
            pytest.skip(
                "No FAIL event was emitted, so there is nothing to verify "
                f"(plan error: {plan_error!r})"
            )

        fail_event = fail_events[0]
        assert fail_event.run.facets is not None
        assert "errorMessage" in fail_event.run.facets

0 commit comments

Comments
 (0)