Skip to content

Commit 8466d1c

Browse files
committed
Tests: Improve resource teardown by closing/disposing connections etc.
1 parent 7fd21e8 commit 8466d1c

1 file changed

Lines changed: 64 additions & 9 deletions

File tree

ingestr/main_test.py

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ def test_create_replace_csv_to_duckdb():
249249
for i, row in enumerate(actual_rows):
250250
assert res[i] == tuple(row)
251251

252+
# Clean up
253+
conn.close()
252254
try:
253255
os.remove(abs_db_path)
254256
except Exception:
@@ -272,7 +274,10 @@ def test_merge_with_primary_key_csv_to_duckdb():
272274
rel_db_path_to_command = f"ingestr/testdata/{dbname}"
273275
uri = f"duckdb:///{rel_db_path_to_command}"
274276

275-
conn = duckdb.connect(abs_db_path)
277+
# DuckDB is sensitive about multiple connections to the same database file.
278+
# Connection Error: Can't open a connection to same database file with a
279+
# different configuration than existing connections
280+
# conn = duckdb.connect(abs_db_path)
276281

277282
def run(source: str):
278283
res = invoke_ingest_command(
@@ -288,10 +293,13 @@ def run(source: str):
288293
return res
289294

290295
def get_output_rows():
296+
conn = duckdb.connect(abs_db_path)
291297
conn.execute("CHECKPOINT")
292-
return conn.sql(
298+
results = conn.sql(
293299
"select symbol, date, is_enabled, name from testschema_merge.output order by symbol asc"
294300
).fetchall()
301+
conn.close()
302+
return results
295303

296304
def assert_output_equals_to_csv(path: str):
297305
res = get_output_rows()
@@ -309,19 +317,23 @@ def assert_output_equals_to_csv(path: str):
309317
run("csv://ingestr/testdata/merge_part1.csv")
310318
assert_output_equals_to_csv("./testdata/merge_part1.csv")
311319

320+
conn = duckdb.connect(abs_db_path)
312321
first_run_id = conn.sql(
313322
"select _dlt_load_id from testschema_merge.output limit 1"
314323
).fetchall()[0][0]
324+
conn.close()
315325

316326
##############################
317327
# we'll run again, we don't expect any changes since the data hasn't changed
318328
run("csv://ingestr/testdata/merge_part1.csv")
319329
assert_output_equals_to_csv("./testdata/merge_part1.csv")
320330

321331
# we also ensure that the other rows were not touched
332+
conn = duckdb.connect(abs_db_path)
322333
count_by_run_id = conn.sql(
323334
"select _dlt_load_id, count(*) from testschema_merge.output group by 1"
324335
).fetchall()
336+
conn.close()
325337
assert len(count_by_run_id) == 1
326338
assert count_by_run_id[0][1] == 3
327339
assert count_by_run_id[0][0] == first_run_id
@@ -334,9 +346,11 @@ def assert_output_equals_to_csv(path: str):
334346
assert_output_equals_to_csv("./testdata/merge_expected.csv")
335347

336348
# let's check the runs
349+
conn = duckdb.connect(abs_db_path)
337350
count_by_run_id = conn.sql(
338351
"select _dlt_load_id, count(*) from testschema_merge.output group by 1 order by 1 asc"
339352
).fetchall()
353+
conn.close()
340354

341355
# we expect that there's a new load ID now
342356
assert len(count_by_run_id) == 2
@@ -349,6 +363,8 @@ def assert_output_equals_to_csv(path: str):
349363
assert count_by_run_id[1][1] == 3
350364
##############################
351365

366+
# Clean up
367+
# conn.close()
352368
try:
353369
os.remove(abs_db_path)
354370
except Exception:
@@ -446,6 +462,8 @@ def assert_output_equals_to_csv(path: str):
446462
assert count_by_run_id[1][1] == 4
447463
##############################
448464

465+
# Clean up
466+
conn.close()
449467
try:
450468
os.remove(abs_db_path)
451469
except Exception:
@@ -786,6 +804,7 @@ def db_to_db_create_replace(
786804
f"select count(*) from {schema_rand_prefix}.input"
787805
).fetchall()
788806
assert res[0][0] == 2
807+
source_engine.dispose()
789808

790809
result = invoke_ingest_command(
791810
source_connection_url,
@@ -803,6 +822,7 @@ def db_to_db_create_replace(
803822
res = dest_engine.execute(
804823
f"select id, val, updated_at from {schema_rand_prefix}.output"
805824
).fetchall()
825+
dest_engine.dispose()
806826

807827
assert len(res) == 2
808828

@@ -845,6 +865,7 @@ def db_to_db_append(
845865
f"select count(*) from {schema_rand_prefix}.input"
846866
).fetchall()
847867
assert res[0][0] == 2
868+
source_engine.dispose()
848869

849870
def run():
850871
res = invoke_ingest_command(
@@ -858,15 +879,16 @@ def run():
858879
)
859880
assert res.exit_code == 0
860881

861-
dest_engine = sqlalchemy.create_engine(dest_connection_url_read)
862-
863882
def get_output_table():
883+
dest_engine = sqlalchemy.create_engine(dest_connection_url_read)
864884
# CrateDB needs an explicit flush to make data available for reads immediately.
865885
if dest_engine.dialect.name == "crate":
866886
dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output")
867-
return dest_engine.execute(
887+
results = dest_engine.execute(
868888
f"select id, val, updated_at from {schema_rand_prefix}.output order by id asc"
869889
).fetchall()
890+
dest_engine.dispose()
891+
return results
870892

871893
run()
872894

@@ -1011,6 +1033,7 @@ def assert_output_equals(expected):
10111033
source_engine.execute(
10121034
f"UPDATE {schema_rand_prefix}.input SET val = 'val1_modified' WHERE id = 2"
10131035
)
1036+
source_engine.dispose()
10141037

10151038
run()
10161039
if dest_connection_url.startswith("cratedb://"):
@@ -1038,6 +1061,7 @@ def assert_output_equals(expected):
10381061
source_engine.execute(
10391062
f"INSERT INTO {schema_rand_prefix}.input VALUES (3, 'val3', '2022-01-01')"
10401063
)
1064+
source_engine.dispose()
10411065

10421066
run()
10431067
if dest_connection_url.startswith("cratedb://"):
@@ -1065,6 +1089,7 @@ def assert_output_equals(expected):
10651089
source_engine.execute(
10661090
f"INSERT INTO {schema_rand_prefix}.input VALUES (3, 'val3', '2022-02-02')"
10671091
)
1092+
source_engine.dispose()
10681093

10691094
run()
10701095
if dest_connection_url.startswith("cratedb://"):
@@ -1101,6 +1126,7 @@ def assert_output_equals(expected):
11011126
source_engine.execute(
11021127
f"UPDATE {schema_rand_prefix}.input SET val='val2_modified', updated_at = '2022-02-03' WHERE id = 2"
11031128
)
1129+
source_engine.dispose()
11041130

11051131
run()
11061132
if dest_connection_url.startswith("cratedb://"):
@@ -1130,6 +1156,7 @@ def assert_output_equals(expected):
11301156
# we don't care about the rest of the run IDs
11311157
assert count_by_run_id[1][1] == 1
11321158
assert count_by_run_id[2][1] == 1
1159+
dest_engine.dispose()
11331160
##############################
11341161

11351162

@@ -1167,6 +1194,7 @@ def db_to_db_delete_insert_without_primary_key(
11671194
f"select count(*) from {schema_rand_prefix}.input"
11681195
).fetchall()
11691196
assert res[0][0] == 2
1197+
source_engine.dispose()
11701198

11711199
def run():
11721200
res = invoke_ingest_command(
@@ -1189,9 +1217,11 @@ def get_output_rows():
11891217
# CrateDB needs an explicit flush to make data available for reads immediately.
11901218
if dest_engine.dialect.name == "crate":
11911219
dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output")
1192-
return dest_engine.execute(
1220+
results = dest_engine.execute(
11931221
f"select id, val, updated_at from {schema_rand_prefix}.output order by id asc"
11941222
).fetchall()
1223+
dest_engine.dispose()
1224+
return results
11951225

11961226
def assert_output_equals(expected):
11971227
res = get_output_rows()
@@ -1245,6 +1275,7 @@ def assert_output_equals(expected):
12451275
source_engine.execute(
12461276
f"INSERT INTO {schema_rand_prefix}.input VALUES (3, 'val3', '2022-02-01'), (4, 'val4', '2022-02-01')"
12471277
)
1278+
source_engine.dispose()
12481279

12491280
run()
12501281
if dest_connection_url.startswith("cratedb://"):
@@ -1314,6 +1345,7 @@ def db_to_db_delete_insert_with_timerange(
13141345
f"select count(*) from {schema_rand_prefix}.input"
13151346
).fetchall()
13161347
assert res[0][0] == 6
1348+
source_engine.dispose()
13171349

13181350
def run(start_date: str, end_date: str):
13191351
res = invoke_ingest_command(
@@ -1451,6 +1483,7 @@ def assert_output_equals(expected):
14511483
source_engine.execute(
14521484
f"UPDATE {schema_rand_prefix}.input SET val = 'val1_modified' WHERE id = 1"
14531485
)
1486+
source_engine.dispose()
14541487

14551488
run("2022-01-01", "2022-01-02")
14561489
assert_output_equals(
@@ -1671,6 +1704,7 @@ def run_command(
16711704
).fetchall()
16721705
assert res[0][0] == "some value"
16731706
assert res[0][1] == row_count
1707+
dest_engine.dispose()
16741708

16751709

16761710
@pytest.mark.parametrize(
@@ -1805,6 +1839,7 @@ def build_datetime(ds: str):
18051839
assert res[0][1] == row_count
18061840
assert res[1][0] == build_datetime("2024-11-06")
18071841
assert res[1][1] == 1000
1842+
dest_engine.dispose()
18081843

18091844

18101845
@pytest.mark.parametrize(
@@ -1915,6 +1950,7 @@ def run_command(df: pd.DataFrame):
19151950
).fetchall()
19161951
assert res[0][0] == "a"
19171952
assert res[0][1] == row_count + 1000
1953+
dest_engine.dispose()
19181954

19191955

19201956
@pytest.mark.parametrize(
@@ -1952,6 +1988,7 @@ def test_db_to_db_exclude_columns(source, dest):
19521988
f"select count(*) from {schema_rand_prefix}.input"
19531989
).fetchall()
19541990
assert res[0][0] == 2
1991+
source_engine.dispose()
19551992
result = invoke_ingest_command(
19561993
source_uri,
19571994
f"{schema_rand_prefix}.input",
@@ -1984,6 +2021,9 @@ def test_db_to_db_exclude_columns(source, dest):
19842021
f"SELECT column_name FROM information_schema.columns WHERE table_schema = '{schema_rand_prefix}' AND table_name = 'output'"
19852022
).fetchall()
19862023
assert columns == [("id",), ("val",), ("updated_at",)]
2024+
2025+
# Clean up
2026+
dest_engine.dispose()
19872027
source.stop()
19882028
dest.stop()
19892029

@@ -2404,7 +2444,7 @@ def get_query_result(uri: str, query: str):
24042444
engine = sqlalchemy.create_engine(uri, poolclass=NullPool)
24052445
with engine.connect() as conn:
24062446
res = conn.execute(query).fetchall()
2407-
2447+
engine.dispose()
24082448
return res
24092449

24102450

@@ -2657,9 +2697,15 @@ def run():
26572697
@pytest.mark.parametrize("source", list(SOURCES.values()), ids=list(SOURCES.keys()))
26582698
@pytest.mark.parametrize("testcase", custom_query_tests())
26592699
def test_custom_query(testcase, source, dest):
2660-
dest_uri = dest.container.get_connection_url()
2700+
with ThreadPoolExecutor() as executor:
2701+
source_future = executor.submit(source.start)
2702+
dest_future = executor.submit(dest.start)
2703+
source_uri = source_future.result()
2704+
dest_uri = dest_future.result()
26612705
dest_uri_read = get_uri_read(dest_uri, dest)
2662-
testcase(source.start(), dest.start(), dest_uri_read)
2706+
testcase(source_uri, dest_uri, dest_uri_read)
2707+
source.stop()
2708+
dest.stop()
26632709

26642710

26652711
# Integration testing when the access token is not provided, and it is only for the resource "repo_events
@@ -2682,6 +2728,7 @@ def test_github_to_duckdb(dest):
26822728
if dest_engine.dialect.name == "crate":
26832729
dest_engine.execute(f"REFRESH TABLE {dest_table}")
26842730
res = dest_engine.execute(f"select count(*) from {dest_table}").fetchall()
2731+
dest_engine.dispose()
26852732
assert len(res) > 0
26862733

26872734

@@ -2967,6 +3014,7 @@ def test_successful_ingestion(dest_uri, dest_uri_read):
29673014
if dest_engine.dialect.name == "crate":
29683015
dest_engine.execute(f"REFRESH TABLE {dest_table}")
29693016
count = dest_engine.execute(f"select count(*) from {dest_table}").fetchone()[0]
3017+
dest_engine.dispose()
29703018
assert count == 3
29713019

29723020
def test_incremental_ingestion(dest_uri, dest_uri_read):
@@ -3115,6 +3163,7 @@ def test_incremental_ingestion(dest_uri, dest_uri_read):
31153163
)
31163164
== 2
31173165
)
3166+
dest_engine.dispose()
31183167

31193168
return [
31203169
test_no_report_instances_found,
@@ -3209,6 +3258,7 @@ def assert_rows(dest_uri, dest_table, n):
32093258
rows = conn.execute(f"select count(*) from {dest_table}").fetchall()
32103259
assert len(rows) == 1
32113260
assert rows[0] == (n,)
3261+
engine.dispose()
32123262

32133263
def test_empty_source_uri(dest_uri, dest_uri_read):
32143264
"""
@@ -3628,6 +3678,7 @@ def exchange_rate_on_specific_date(dest_uri, dest_uri_read):
36283678
if dest_engine.dialect.name == "crate":
36293679
conn.execute(f"REFRESH TABLE {dest_table}")
36303680
rows = conn.execute(query).fetchall()
3681+
dest_engine.dispose()
36313682

36323683
# Assert that the rate for GBP is 0.82993
36333684
assert len(rows) > 0, "No data found for GBP"
@@ -3710,6 +3761,7 @@ def test_mysql_zero_dates(source, dest):
37103761
f"select count(*) from {schema_rand_prefix}.input"
37113762
).fetchall()
37123763
assert res[0][0] == 5
3764+
source_engine.dispose()
37133765

37143766
result = invoke_ingest_command(
37153767
source_uri,
@@ -3728,6 +3780,7 @@ def test_mysql_zero_dates(source, dest):
37283780
if dest_engine.dialect.name == "crate":
37293781
dest_engine.execute(f"REFRESH TABLE {schema_rand_prefix}.output")
37303782
res = dest_engine.execute(f"select * from {schema_rand_prefix}.output").fetchall()
3783+
dest_engine.dispose()
37313784

37323785
# assert there are no new rows, since DBs like DuckDB accept NULL and dlt adds a separate string column for the value `0000-00-00 00:00:00`
37333786
# we want 4 columns: name, created_at, _dlt_load_id, _dlt_id
@@ -4197,6 +4250,7 @@ def test_stripe_source_full_refresh(stripe_table):
41974250
assert res[0] > 0, f"No {stripe_table} records found"
41984251

41994252
# Clean up
4253+
conn.close()
42004254
try:
42014255
os.remove(abs_db_path)
42024256
except Exception:
@@ -4237,6 +4291,7 @@ def test_stripe_source_incremental(stripe_table):
42374291
assert res[0] > 0, f"No {stripe_table} records found"
42384292

42394293
# Clean up
4294+
conn.close()
42404295
try:
42414296
os.remove(abs_db_path)
42424297
except Exception:

0 commit comments

Comments
 (0)