Skip to content

Commit 70ba6e7

Browse files
enable metrics for harness worker and bugfix for client losing its connection (#397)
1 parent 387d6db commit 70ba6e7

11 files changed

Lines changed: 715 additions & 59 deletions

File tree

.github/workflows/harness-image.yml

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ on:
1313
release:
1414
types: [published]
1515
workflow_dispatch:
16+
inputs:
17+
deploy:
18+
description: "Dispatch downstream deploy after the image is built"
19+
type: boolean
20+
default: true
1621

1722
concurrency:
1823
group: ${{ github.workflow }}-${{ github.ref }}
@@ -41,13 +46,21 @@ jobs:
4146
username: ${{ github.actor }}
4247
password: ${{ secrets.GITHUB_TOKEN }}
4348

49+
- name: Calculate branch tag
50+
id: vars
51+
shell: bash
52+
run: |
53+
BRANCH="${{ github.ref_name }}"
54+
CLEANED_BRANCH_NAME=$(echo "$BRANCH" | tr '/' '-' | tr '[:upper:]' '[:lower:]')
55+
echo "cleaned-branch-name=$CLEANED_BRANCH_NAME" >> "$GITHUB_OUTPUT"
56+
4457
- name: Docker metadata
4558
id: meta
4659
uses: docker/metadata-action@v5
4760
with:
4861
images: ghcr.io/conductor-oss/python-sdk/harness-worker
4962
tags: |
50-
type=raw,value=latest
63+
type=raw,value=${{ steps.vars.outputs.cleaned-branch-name }}-latest,enable=${{ github.event_name != 'release' }}
5164
type=raw,value=${{ github.event.release.tag_name }},enable=${{ github.event_name == 'release' }}
5265
5366
- name: Build and push
@@ -59,9 +72,17 @@ jobs:
5972
platforms: linux/amd64,linux/arm64
6073
push: true
6174
tags: ${{ steps.meta.outputs.tags }}
75+
# Registry-backed BuildKit cache. Unchanged layers are reused across
76+
# runs so rebuilding the same commit (or one with only minor diffs)
77+
# is near-instant. The `:buildcache` tag lives alongside the image
78+
# but only stores layer blobs, not a runnable image.
79+
cache-from: type=registry,ref=ghcr.io/conductor-oss/python-sdk/harness-worker:buildcache
80+
cache-to: type=registry,ref=ghcr.io/conductor-oss/python-sdk/harness-worker:buildcache,mode=max
6281

6382
dispatch-deploy:
64-
if: github.event_name == 'release'
83+
if: |
84+
github.event_name == 'release' ||
85+
(github.event_name == 'workflow_dispatch' && inputs.deploy)
6586
needs: build-and-push
6687
runs-on: ubuntu-latest
6788
permissions:

.github/workflows/pull_request.yml

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ jobs:
1010
unit-test:
1111
runs-on: ubuntu-latest
1212
env:
13-
COVERAGE_FILE: coverage.xml
1413
COVERAGE_DIR: .coverage-reports
1514
steps:
1615
- name: Checkout code
@@ -67,31 +66,29 @@ jobs:
6766
6867
- name: Generate coverage report
6968
id: coverage_report
70-
continue-on-error: true
7169
run: |
7270
coverage combine ${{ env.COVERAGE_DIR }}/.coverage.*
7371
coverage report
74-
coverage xml
72+
coverage xml -o coverage.xml
7573
7674
- name: Verify coverage file
7775
id: verify_coverage
7876
if: always()
79-
continue-on-error: true
8077
run: |
81-
if [ ! -s "${{ env.COVERAGE_FILE }}" ]; then
82-
echo "Coverage file is empty or does not exist"
83-
ls -la ${{ env.COVERAGE_FILE }} ${{ env.COVERAGE_DIR }}
78+
if [ ! -s coverage.xml ]; then
79+
echo "coverage.xml is empty or does not exist"
80+
ls -la coverage.xml ${{ env.COVERAGE_DIR }} || true
8481
exit 1
8582
fi
86-
echo "Coverage file exists and is not empty"
83+
echo "coverage.xml exists and is not empty"
8784
8885
- name: Upload coverage to Codecov
8986
if: always() && steps.verify_coverage.outcome == 'success'
9087
continue-on-error: true
9188
uses: codecov/codecov-action@v3
9289
with:
9390
token: ${{ secrets.CODECOV_TOKEN }}
94-
file: ${{ env.COVERAGE_FILE }}
91+
file: coverage.xml
9592

9693
- name: Check test results
9794
if: steps.unit_tests.outcome == 'failure' || steps.bc_tests.outcome == 'failure' || steps.serdeser_tests.outcome == 'failure'

harness/README.md

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,32 @@ docker run -d \
4848
python-sdk-harness
4949
```
5050
51-
You can also run the harness locally without Docker (from the repo root):
51+
You can also run the harness locally without Docker (from the repo root).
52+
A virtual environment keeps dependencies isolated from your system Python:
5253

5354
```bash
54-
# Install the SDK in development mode (one-time)
55+
# Check if a .venv already exists
56+
ls .venv/bin/activate 2>/dev/null && echo "venv exists" || echo "no venv found"
57+
58+
# Create one if needed (one-time)
59+
python3 -m venv .venv
60+
61+
# Activate it (required every time you open a new terminal)
62+
source .venv/bin/activate # Windows: .venv\Scripts\activate
63+
64+
# Verify you're in the venv (should print the .venv path)
65+
which python3
66+
67+
# Install the SDK in development mode (one-time, or after pulling new deps)
5568
pip3 install -e .
5669

70+
# When you're done, deactivate the venv to restore your normal shell
71+
deactivate
72+
```
73+
74+
Once the venv is active and the SDK is installed:
75+
76+
```bash
5777
export CONDUCTOR_SERVER_URL=https://your-cluster.example.com/api
5878
export CONDUCTOR_AUTH_KEY=$CONDUCTOR_AUTH_KEY
5979
export CONDUCTOR_AUTH_SECRET=$CONDUCTOR_AUTH_SECRET

harness/main.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from conductor.client.automator.task_handler import TaskHandler
88
from conductor.client.configuration.configuration import Configuration
9+
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
910
from conductor.client.http.models.task_def import TaskDef
1011
from conductor.client.orkes_clients import OrkesClients
1112
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
@@ -79,10 +80,15 @@ def main() -> None:
7980
worker = SimulatedTaskWorker(task_name, codename, sleep_seconds, batch_size, poll_interval_ms)
8081
workers.append(worker)
8182

83+
metrics_port = env_int_or_default("HARNESS_METRICS_PORT", 9991)
84+
metrics_settings = MetricsSettings(http_port=metrics_port)
85+
print(f"Prometheus metrics will be served on port {metrics_port}")
86+
8287
task_handler = TaskHandler(
8388
workers=workers,
8489
configuration=configuration,
8590
scan_for_annotated_workers=False,
91+
metrics_settings=metrics_settings,
8692
)
8793

8894
workflow_executor = clients.get_workflow_executor()

harness/manifests/configmap-gcp.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ metadata:
99
labels:
1010
app: python-sdk-harness-worker
1111
data:
12-
CONDUCTOR_SERVER_URL: "https://certification-gcp.orkesconductor.com/api"
13-
CONDUCTOR_AUTH_KEY: "e6c1ac61-286b-11f1-be01-c682b5750c3a"
12+
CONDUCTOR_SERVER_URL: "https://certification-gcp.orkesconductor.io/api"
13+
CONDUCTOR_AUTH_KEY: "25b681c1-34ec-11f1-b07a-9601c7a63373"

harness/manifests/deployment.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ spec:
1818
# note: imagePullSecrets is not needed for public images
1919
containers:
2020
- name: harness
21-
image: ghcr.io/conductor-oss/python-sdk/harness-worker:latest
21+
image: ghcr.io/conductor-oss/python-sdk/harness-worker:main-latest
2222
imagePullPolicy: Always
2323
env:
2424
# === CONDUCTOR CONNECTION (from per-cloud ConfigMap) ===
@@ -51,6 +51,11 @@ spec:
5151
- name: HARNESS_POLL_INTERVAL_MS
5252
value: "100"
5353

54+
ports:
55+
- name: metrics
56+
containerPort: 9991
57+
protocol: TCP
58+
5459
resources:
5560
requests:
5661
memory: "256Mi"

src/conductor/client/automator/task_runner.py

Lines changed: 119 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,25 @@ def __init__(
8282
)
8383
)
8484

85-
# Auth failure backoff tracking to prevent retry storms
85+
# Auth failure backoff tracking to prevent retry storms.
86+
# `_auth_failures` is capped at `_max_auth_failure_exp` so that
87+
# 2**N cannot overflow on a long-lived worker whose auth is broken.
88+
# The resulting sleep is further clamped to `_auth_backoff_cap_seconds`.
8689
self._auth_failures = 0
8790
self._last_auth_failure = 0
91+
self._auth_backoff_cap_seconds = 60
92+
self._max_auth_failure_exp = 6 # 2**6 = 64s, sleep clamped to cap
93+
94+
# Generic poll-failure backoff. This is distinct from the empty-poll
95+
# adaptive delay (`_consecutive_empty_polls`) and from the auth-error
96+
# backoff above. It kicks in when batch_poll raises an exception
97+
# (server 5xx, NGINX 502/504 under load, DNS hiccup, a closed httpx
98+
# client that couldn't heal, etc.) so we don't hot-loop the log with
99+
# stack traces while waiting for the server to recover.
100+
self._poll_failures = 0
101+
self._last_poll_failure = 0
102+
self._poll_backoff_cap_seconds = 120 # max 2 minutes between retries
103+
self._max_poll_failure_exp = 7 # 2**7 = 128s, sleep clamped to cap
88104

89105
# Thread pool for concurrent task execution
90106
# thread_count from worker configuration controls concurrency
@@ -567,15 +583,33 @@ def __batch_poll_tasks(self, count: int) -> list:
567583
logger.debug("Stop polling task for: %s", task_definition_name)
568584
return []
569585

570-
# Apply exponential backoff if we have recent auth failures
586+
# Apply exponential backoff if we have recent auth failures.
571587
if self._auth_failures > 0:
572588
now = time.time()
573-
backoff_seconds = min(2 ** self._auth_failures, 60)
589+
backoff_seconds = min(
590+
2 ** min(self._auth_failures, self._max_auth_failure_exp),
591+
self._auth_backoff_cap_seconds,
592+
)
574593
time_since_last_failure = now - self._last_auth_failure
575594
if time_since_last_failure < backoff_seconds:
576595
time.sleep(0.1)
577596
return []
578597

598+
# Apply exponential backoff for generic poll failures (5xx, network
599+
# errors, closed-client runtime errors that couldn't self-heal, etc.).
600+
# Bounded at `_poll_backoff_cap_seconds` (2 min) to avoid log floods
601+
# without giving up on recovery.
602+
if self._poll_failures > 0:
603+
now = time.time()
604+
backoff_seconds = min(
605+
2 ** min(self._poll_failures, self._max_poll_failure_exp),
606+
self._poll_backoff_cap_seconds,
607+
)
608+
time_since_last_failure = now - self._last_poll_failure
609+
if time_since_last_failure < backoff_seconds:
610+
time.sleep(0.1)
611+
return []
612+
579613
# Publish PollStarted event (metrics collector will handle via event)
580614
self.event_dispatcher.publish(PollStarted(
581615
task_type=task_definition_name,
@@ -607,15 +641,20 @@ def __batch_poll_tasks(self, count: int) -> list:
607641
tasks_received=len(tasks) if tasks else 0
608642
))
609643

610-
# Success - reset auth failure counter (any successful HTTP response means auth is working)
644+
# Success - reset both failure counters (any successful HTTP
645+
# response means auth and connectivity are working).
611646
self._auth_failures = 0
647+
self._poll_failures = 0
612648

613649
return tasks if tasks else []
614650

615651
except AuthorizationException as auth_exception:
616652
self._auth_failures += 1
617653
self._last_auth_failure = time.time()
618-
backoff_seconds = min(2 ** self._auth_failures, 60)
654+
backoff_seconds = min(
655+
2 ** min(self._auth_failures, self._max_auth_failure_exp),
656+
self._auth_backoff_cap_seconds,
657+
)
619658

620659
# Publish PollFailure event (metrics collector will handle via event)
621660
self.event_dispatcher.publish(PollFailure(
@@ -643,10 +682,55 @@ def __batch_poll_tasks(self, count: int) -> list:
643682
duration_ms=(time.time() - start_time) * 1000,
644683
cause=e
645684
))
646-
logger.error(
647-
"Failed to batch poll task for: %s, reason: %s",
685+
686+
# Bump the poll-failure counter so the next poll waits with
687+
# exponential backoff instead of hot-looping on a broken server
688+
# or connection.
689+
self._poll_failures += 1
690+
self._last_poll_failure = time.time()
691+
backoff_seconds = min(
692+
2 ** min(self._poll_failures, self._max_poll_failure_exp),
693+
self._poll_backoff_cap_seconds,
694+
)
695+
696+
# Belt-and-suspenders: if the underlying httpx client got closed
697+
# and rest.request() couldn't heal it (e.g. because the error
698+
# arrived as a non-RuntimeError), nudge it here. Pass the current
699+
# connection as `expected` so concurrent threads racing to heal
700+
# can't cause a reset storm: only the first caller per client
701+
# generation actually replaces it.
702+
try:
703+
rest_client = getattr(
704+
getattr(self.task_client, "api_client", None),
705+
"rest_client",
706+
None,
707+
)
708+
if rest_client is not None and getattr(rest_client, "_is_client_closed", lambda: False)():
709+
current_conn = getattr(rest_client, "connection", None)
710+
reset = rest_client._reset_connection(expected=current_conn)
711+
if reset:
712+
logger.warning(
713+
"rest_client was closed after poll failure; reset"
714+
)
715+
except Exception:
716+
# Healing is best-effort; never let it mask the original error.
717+
pass
718+
719+
# Log a single-line warning at a modest level to avoid drowning
720+
# ops in tracebacks when the server is flapping. Full traceback
721+
# goes to debug for when operators need it.
722+
logger.warning(
723+
"Failed to batch poll task for: %s (failure #%d). Will retry with exponential backoff (%ss). Reason: %s: %s",
648724
task_definition_name,
649-
traceback.format_exc()
725+
self._poll_failures,
726+
backoff_seconds,
727+
type(e).__name__,
728+
e,
729+
)
730+
logger.debug(
731+
"batch poll failure traceback for %s:\n%s",
732+
task_definition_name,
733+
traceback.format_exc(),
650734
)
651735
return []
652736

@@ -915,15 +999,33 @@ def __update_task(self, task_result: TaskResult):
915999
self.metrics_collector.increment_task_update_error(
9161000
task_definition_name, type(e)
9171001
)
918-
logger.error(
919-
"Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
920-
attempt + 1,
921-
retry_count,
922-
task_result.task_id,
923-
task_result.workflow_instance_id,
924-
task_definition_name,
925-
traceback.format_exc()
926-
)
1002+
is_last_attempt = (attempt + 1) >= retry_count
1003+
# Known recoverable transport hiccups (stale keep-alive,
1004+
# HTTP/2 GOAWAY race, client closed mid-request) are flagged
1005+
# `transient=True` by the REST layer after it self-heals. For
1006+
# those, skip the stack trace until the final attempt — the
1007+
# retry normally succeeds immediately and a full traceback per
1008+
# in-flight task just spams the log.
1009+
if getattr(e, "transient", False) and not is_last_attempt:
1010+
logger.warning(
1011+
"Transient transport error updating task; will retry (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
1012+
attempt + 1,
1013+
retry_count,
1014+
task_result.task_id,
1015+
task_result.workflow_instance_id,
1016+
task_definition_name,
1017+
getattr(e, "reason", None) or str(e),
1018+
)
1019+
else:
1020+
logger.error(
1021+
"Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s",
1022+
attempt + 1,
1023+
retry_count,
1024+
task_result.task_id,
1025+
task_result.workflow_instance_id,
1026+
task_definition_name,
1027+
traceback.format_exc()
1028+
)
9271029
continue
9281030
except Exception as e:
9291031
last_exception = e

src/conductor/client/http/async_rest.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ async def request(self, method, url, query_params=None, headers=None,
185185
await self._reset_connection()
186186
if method in idempotent_methods:
187187
continue
188-
msg = f"Protocol error: {e}"
189-
raise ApiException(status=0, reason=msg)
188+
msg = f"Protocol error ({type(e).__name__}): {e}"
189+
raise ApiException(status=0, reason=msg, transient=True) from e
190190
except httpx.TimeoutException as e:
191191
msg = f"Request timeout: {e}"
192192
raise ApiException(status=0, reason=msg)
@@ -276,7 +276,10 @@ async def PATCH(self, url, headers=None, query_params=None, post_params=None,
276276

277277
class ApiException(Exception):
278278

279-
def __init__(self, status=None, reason=None, http_resp=None, body=None):
279+
def __init__(self, status=None, reason=None, http_resp=None, body=None,
280+
transient=False):
281+
# See rest.ApiException for a description of `transient`.
282+
self.transient = transient
280283
if http_resp:
281284
self.status = http_resp.status
282285
self.code = http_resp.status

0 commit comments

Comments
 (0)