Skip to content

Commit 64e228b

Browse files
Add integration test for heartbeat
1 parent eaae748 commit 64e228b

3 files changed

Lines changed: 75 additions & 4 deletions

File tree

etc/config.properties

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,6 @@ protocol.spooling.retrieval-mode=coordinator_proxy
1616
# Enable dynamic catalog management
1717
catalog.management=dynamic
1818

19-
# Disable http request log
20-
http-server.log.enabled=false
19+
# Enable HTTP request log as it's grepped in integration tests for the heartbeat mechanism
20+
http-server.log.enabled=true
21+
http-server.log.immediate-flush=true

tests/development_server.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@
1616
TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest"
1717
TRINO_HOST = "localhost"
1818

19+
_trino_container: "DockerContainer | None" = None
20+
21+
22+
def get_trino_container() -> "DockerContainer | None":
23+
"""Return the Trino DockerContainer started by start_development_server(), or None
24+
if Trino was not started by this module (e.g. TRINO_RUNNING_HOST points to an
25+
existing server)."""
26+
return _trino_container
27+
1928

2029
def create_bucket(s3_client):
2130
bucket_name = "spooling"
@@ -99,7 +108,12 @@ def start_development_server(port=None, trino_version=TRINO_VERSION):
99108
# Otherwise some tests fail with No nodes available
100109
time.sleep(2)
101110

102-
yield localstack, trino, network
111+
global _trino_container
112+
_trino_container = trino
113+
try:
114+
yield localstack, trino, network
115+
finally:
116+
_trino_container = None
103117
finally:
104118
# Stop containers when exiting the context
105119
if trino:

tests/integration/test_dbapi_integration.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from tzlocal import get_localzone_name # type: ignore
2828

2929
import trino
30+
from tests.development_server import get_trino_container
3031
from tests.integration.conftest import trino_version
3132
from trino import constants
3233
from trino.client import InlineSegment
@@ -1896,7 +1897,7 @@ def test_segments_cursor(trino_connection):
18961897
assert isinstance(segment.segment.ack_uri, str), (
18971898
f"Expected string for ack_uri, got {type(segment.segment.ack_uri)}"
18981899
)
1899-
total += len(list(SegmentIterator(segment, row_mapper)))
1900+
total += len(list(SegmentIterator(segment, row_mapper, cur._query._request)))
19001901
assert total == 300875, f"Expected total rows 300875, got {total}"
19011902

19021903

@@ -2000,6 +2001,61 @@ def test_spooled_segments_lazy_description(trino_connection):
20002001
assert len(cur.fetchall()) == 60175
20012002

20022003

2004+
@pytest.mark.skipif(
2005+
trino_version() <= 466,
2006+
reason="spooling protocol was introduced in version 466"
2007+
)
2008+
def test_heartbeat_head_requests_during_spooled_download(run_trino):
2009+
"""Verify that heartbeat HEAD requests are sent to the coordinator while
2010+
downloading spooled segments from external storage."""
2011+
trino_container = get_trino_container()
2012+
if trino_container is None:
2013+
pytest.skip("cannot read Trino HTTP request log (Trino not started by this test session)")
2014+
2015+
host, port = run_trino
2016+
conn = trino.dbapi.Connection(
2017+
host=host, port=port, user="test", source="test",
2018+
max_attempts=1, encoding="json", heartbeat_interval=0.1,
2019+
)
2020+
2021+
container = trino_container.get_wrapped_container()
2022+
log_path = "/data/trino/var/log/http-request.log"
2023+
2024+
# Capture the current size of the HTTP request log
2025+
exit_code, output = container.exec_run(["wc", "-l", log_path])
2026+
if exit_code != 0:
2027+
pytest.skip("cannot read Trino HTTP request log (log disabled or path changed)")
2028+
logfile_lines = int(output.decode().split()[0])
2029+
2030+
cur = conn.cursor()
2031+
cur.execute("""SELECT l.*
2032+
FROM tpch.tiny.lineitem l, TABLE(sequence(
2033+
start => 1,
2034+
stop => 5,
2035+
step => 1)) n""")
2036+
cur.fetchall()
2037+
cur.close()
2038+
2039+
for try_ in range(10):
2040+
if try_:
2041+
# Sometimes trino needs time to flush the logs
2042+
t.sleep(1.0)
2043+
2044+
_, output = container.exec_run(["tail", "-n", f"+{logfile_lines}", log_path])
2045+
loglines = output.decode().splitlines()
2046+
head_requests = [
2047+
line for line in loglines
2048+
if "HEAD" in line and "/v1/statement/executing/" in line
2049+
]
2050+
if head_requests:
2051+
break
2052+
2053+
assert head_requests, (
2054+
"Expected heartbeat HEAD requests in http-request.log but found none.\n"
2055+
f"Log tail:\n{''.join(loglines)}"
2056+
)
2057+
2058+
20032059
def get_cursor(legacy_prepared_statements, run_trino):
20042060
host, port = run_trino
20052061

0 commit comments

Comments
 (0)