Skip to content

Commit 2b4e4c1

Browse files
authored
feat(terminal): add [REMOTE] tag to dispatch log lines (#308)
* feat(AE-2646): add [REMOTE] tag to LB stub dispatch logs * feat(AE-2646): add [REMOTE] tag to QB dispatch log ServerlessResource.run() now prefixes its dispatch log line with [REMOTE] so users can clearly identify that execution is happening on Runpod cloud infrastructure, not locally. * feat(AE-2646): add [REMOTE] tag to LB stub dispatch logs * fix(terminal): add [REMOTE] tag to runsync dispatch log - Add [REMOTE] prefix to runsync _fetch_job log line for consistency with run() dispatch log - Add test_runsync_logs_remote_prefix_on_dispatch test
1 parent 5928dcb commit 2b4e4c1

6 files changed

Lines changed: 91 additions & 10 deletions

File tree

src/runpod_flash/cli/commands/_run_server_helpers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,13 @@ async def lb_execute(resource_config, func, body: dict):
123123
if routing and routing.get("method")
124124
else func.__name__
125125
)
126-
log.info(f"{resource_config} | {route_label}")
126+
log.info(f"[REMOTE] {resource_config} | {route_label}")
127127

128128
try:
129129
result = await stub(
130130
func, dependencies, system_dependencies, accelerate_downloads, **kwargs
131131
)
132-
log.info(f"{resource_config} | Execution complete")
132+
log.info(f"[REMOTE] {resource_config} | Execution complete")
133133
return result
134134
except TimeoutError as e:
135135
raise HTTPException(status_code=504, detail=str(e))

src/runpod_flash/core/resources/serverless.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,7 +1264,7 @@ async def runsync(self, payload: Dict[str, Any]) -> "JobOutput":
12641264
)
12651265

12661266
def _fetch_job():
1267-
log.info(f"{self} | API /runsync")
1267+
log.info(f"[REMOTE] {self} | API /runsync")
12681268
return self.endpoint.rp_client.post(
12691269
f"{self.id}/runsync", payload, timeout=timeout_s
12701270
)
@@ -1294,7 +1294,7 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput":
12941294

12951295
try:
12961296
# Create a job using the endpoint
1297-
log.info(f"{self} | API /run")
1297+
log.info(f"[REMOTE] {self} | API /run")
12981298
job = await asyncio.to_thread(self.endpoint.run, request_input=payload)
12991299

13001300
log_subgroup = f"Job:{job.job_id}"

src/runpod_flash/stubs/load_balancer_sls.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ async def _execute_via_user_route(
321321

322322
# Construct full URL
323323
url = f"{self.server.endpoint_url}{path}"
324-
log.info(f"{self.server} | {method} {path}")
324+
log.info(f"[REMOTE] {self.server} | {method} {path}")
325325

326326
try:
327327
async with get_authenticated_httpx_client(
@@ -330,7 +330,7 @@ async def _execute_via_user_route(
330330
response = await client.request(method, url, json=body)
331331
response.raise_for_status()
332332
result = response.json()
333-
log.info(f"{self.server} | Execution complete")
333+
log.info(f"[REMOTE] {self.server} | Execution complete")
334334
log.debug(
335335
f"User route execution successful (type={type(result).__name__})"
336336
)

tests/unit/cli/commands/test_run_server_helpers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ async def fake_func(x: int):
255255

256256
assert result == 42
257257
info_messages = [r.message for r in caplog.records if r.levelno == logging.INFO]
258-
assert any("GET /images/{filename}" in m for m in info_messages)
259-
assert any("Execution complete" in m for m in info_messages)
258+
assert any("[REMOTE]" in m and "GET /images/{filename}" in m for m in info_messages)
259+
assert any("[REMOTE]" in m and "Execution complete" in m for m in info_messages)
260260

261261

262262
@pytest.mark.asyncio

tests/unit/resources/test_serverless.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,6 +1213,87 @@ async def test_run_async_success(self):
12131213
assert result.id == "job-123"
12141214
assert result.status == "COMPLETED"
12151215

1216+
@pytest.mark.asyncio
1217+
async def test_run_logs_remote_prefix_on_dispatch(self):
1218+
"""run() must emit [REMOTE] prefix so users know execution is on Runpod cloud."""
1219+
serverless = ServerlessResource(name="test")
1220+
serverless.id = "endpoint-123"
1221+
1222+
mock_job = MagicMock()
1223+
mock_job.job_id = "job-123"
1224+
mock_job.status.return_value = "COMPLETED"
1225+
mock_job._fetch_job.return_value = {
1226+
"id": "job-123",
1227+
"workerId": "worker-456",
1228+
"status": "COMPLETED",
1229+
"delayTime": 1000,
1230+
"executionTime": 2000,
1231+
"output": {"result": "success"},
1232+
}
1233+
1234+
mock_endpoint = MagicMock()
1235+
mock_endpoint.run.return_value = mock_job
1236+
1237+
with patch.object(
1238+
type(serverless),
1239+
"endpoint",
1240+
new_callable=lambda: property(lambda self: mock_endpoint),
1241+
):
1242+
with patch("asyncio.sleep"):
1243+
with patch("runpod_flash.core.resources.serverless.log") as mock_log:
1244+
await serverless.run({"input": "test data"})
1245+
1246+
dispatch_calls = [
1247+
call for call in mock_log.info.call_args_list if "API /run" in str(call)
1248+
]
1249+
assert len(dispatch_calls) == 1, "Expected exactly one 'API /run' log call"
1250+
log_message = dispatch_calls[0].args[0]
1251+
assert "[REMOTE]" in log_message, (
1252+
f"Expected [REMOTE] in log message, got: {log_message!r}"
1253+
)
1254+
assert "API /run" in log_message, (
1255+
f"Expected 'API /run' in log message, got: {log_message!r}"
1256+
)
1257+
1258+
@pytest.mark.asyncio
1259+
async def test_runsync_logs_remote_prefix_on_dispatch(self):
1260+
"""runsync() must emit [REMOTE] prefix so users know execution is on Runpod cloud."""
1261+
serverless = ServerlessResource(name="test")
1262+
serverless.id = "endpoint-123"
1263+
1264+
mock_rp_client = MagicMock()
1265+
mock_rp_client.post.return_value = {
1266+
"id": "job-123",
1267+
"workerId": "worker-456",
1268+
"status": "COMPLETED",
1269+
"delayTime": 1000,
1270+
"executionTime": 2000,
1271+
"output": {"result": "success"},
1272+
}
1273+
1274+
mock_endpoint = MagicMock()
1275+
mock_endpoint.rp_client = mock_rp_client
1276+
1277+
with patch.object(
1278+
type(serverless),
1279+
"endpoint",
1280+
new_callable=lambda: property(lambda self: mock_endpoint),
1281+
):
1282+
with patch("runpod_flash.core.resources.serverless.log") as mock_log:
1283+
await serverless.runsync({"input": "test data"})
1284+
1285+
dispatch_calls = [
1286+
call for call in mock_log.info.call_args_list if "API /runsync" in str(call)
1287+
]
1288+
assert len(dispatch_calls) == 1, "Expected exactly one 'API /runsync' log call"
1289+
log_message = dispatch_calls[0].args[0]
1290+
assert "[REMOTE]" in log_message, (
1291+
f"Expected [REMOTE] in log message, got: {log_message!r}"
1292+
)
1293+
assert "API /runsync" in log_message, (
1294+
f"Expected 'API /runsync' in log message, got: {log_message!r}"
1295+
)
1296+
12161297
@pytest.mark.asyncio
12171298
async def test_run_async_dedupes_stdout_against_streamed_pod_logs(self):
12181299
serverless = ServerlessResource(name="test")

tests/unit/test_load_balancer_sls_stub.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,5 +482,5 @@ def add(x, y):
482482
await stub._execute_via_user_route(add, "POST", "/api/add", 5, 3)
483483

484484
info_messages = [r.message for r in caplog.records if r.levelno == logging.INFO]
485-
assert any("POST /api/add" in m for m in info_messages)
486-
assert any("Execution complete" in m for m in info_messages)
485+
assert any("[REMOTE]" in m and "POST /api/add" in m for m in info_messages)
486+
assert any("[REMOTE]" in m and "Execution complete" in m for m in info_messages)

0 commit comments

Comments
 (0)