Priority Level
High (Major improvement)
Problem
Public async DataDesigner.create(...) runs do not expose durable capacity diagnostics that explain low throughput, delayed first checkpoints, zero final rows, or apparent endpoint idle time. For large fire-and-forget jobs, users can observe slow or missing artifact progress while model endpoints are completing healthy requests, but the public artifacts do not show enough scheduler, request-admission, row-group, or checkpoint state to diagnose the cause.
This makes it hard to distinguish between cases such as:
- no row group has completed yet because admitted row groups are still dependency-incomplete,
- scheduler task admission is saturated,
- request admission is saturated or cooling down after rate limits,
- queued model-task demand is accumulating,
- row-group admission is blocked by the active horizon or guardrails,
- capacity metadata is unavailable from the runtime.
Current workarounds require private scheduler instrumentation or external endpoint telemetry. Those are not available to typical public API users and do not fit fire-and-forget workflows.
Product Decisions
- Collect lightweight async capacity diagnostics automatically for async-engine runs. Users should not need async_trace=True or a new opt-in flag.
- Treat the diagnostics event schema as a stable, versioned public v1 contract.
- Make the durable event artifact the primary v1 user-facing surface. Do not block v1 on a new DatasetCreationResults helper.
- Persist machine-readable facts and explanation codes only. Do not emit prescriptive tuning advice such as “lower buffer_size”.
- Use an append-only event-shaped artifact, not a periodically overwritten latest-state snapshot. Users and tools must be able to reconstruct the time sequence and state deltas by replaying events.
- Include verbatim provider/model/resource/column labels in v1 diagnostics, but never prompts, completions, row values, credentials, request payloads, or response payloads.
- Implement this at the async-engine diagnostics layer rather than designing separate public products for create, acreate, and preview.
Goals
- Write a lightweight durable diagnostics event before the first row group checkpoints.
- Keep the event stream useful after interruption, including interruption before metadata.json exists.
- Explain current and observed async capacity state with stable machine-readable event payloads.
- Preserve ordering, timing, and deltas so users can reconstruct what changed during a run.
- Make the artifact sufficient to answer:
  - why has no row group checkpointed yet?
  - when did queue pressure first appear or clear?
  - when did request admission become saturated or recover?
  - are row groups admitted but incomplete?
  - which capacity limits were configured vs. observed over time?
- Keep async_trace=True as a separate heavy per-task tracing feature. Capacity diagnostics must not require it.
Non-goals
- No automatic tuning decisions in v1.
- No natural-language diagnosis or tuning advice in the artifact.
- No prompt, response, row-value, credential, or request-payload persistence.
- No transport-pool utilization unless an adapter already exposes it cleanly.
- No per-task trace dump by default. The event stream should be coarse capacity diagnostics, not a replacement for async_trace=True.
- No requirement to expose a new DatasetCreationResults API in v1, though helpers for reading/replaying the event log can be added later.
Artifact Contract
Write an append-only JSON Lines artifact at the dataset root, beside metadata.json:
<artifact_root>/<resolved_dataset_name>/async_diagnostics.jsonl
Each complete line is one JSON event. The artifact is the source of truth for diagnostics history; a current snapshot can be reconstructed by replaying events. Implementations may keep an in-memory latest-state summary for emission logic, but v1 must not expose only an overwritten latest-state artifact.
Events must be appended durably enough for fire-and-forget diagnosis:
- append each event as one newline-terminated JSON object,
- flush/fsync after writes that mark run lifecycle, checkpoint, pressure-state change, handled interruption, early shutdown, failure, or completion,
- tolerate a final partial/truncated line after hard process termination by requiring readers to ignore an incomplete trailing line while preserving all prior complete events.
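The append and read rules above can be sketched in a few lines of Python. This is a minimal illustration, not the proposed ArtifactStorage API: the function names append_event and read_complete_events and the durable flag are hypothetical, and the caller is assumed to decide which events count as lifecycle, checkpoint, pressure-change, or terminal writes.

```python
import json
import os
from pathlib import Path
from typing import Any, Iterator


def append_event(path: Path, event: dict[str, Any], durable: bool = False) -> None:
    """Append one event as a single newline-terminated JSON object."""
    # json.dumps never emits a raw newline, so one event is always one line.
    line = json.dumps(event, separators=(",", ":")) + "\n"
    with open(path, "a", encoding="utf-8") as handle:
        handle.write(line)
        handle.flush()
        if durable:
            # Hypothetical policy: fsync only for lifecycle, checkpoint,
            # pressure-change, interruption, failure, and completion events.
            os.fsync(handle.fileno())


def read_complete_events(path: Path) -> Iterator[dict[str, Any]]:
    """Yield complete events and ignore an incomplete trailing line."""
    with open(path, "rb") as handle:
        for raw in handle:
            if not raw.endswith(b"\n"):
                break  # truncated final line after hard process termination
            yield json.loads(raw)
```

Opening the file per append keeps the sketch simple; a real sink would more likely hold one handle open for the run and fsync selectively.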
Emit events at minimum on:
- scheduler/job start,
- each async progress interval as a capacity_sample event,
- row-group admission,
- row-group checkpoint,
- material scheduler pressure state changes,
- material request-admission pressure state changes,
- explanation-code appearance or clearance,
- handled cancellation or interruption when possible,
- early shutdown,
- final success or handled failure.
Event Envelope v1
The implementation may use dataclasses or pydantic models, but every event should preserve this conceptual envelope:
{
  "schema_version": 1,
  "engine": "async",
  "run_id": "...",
  "sequence": 0,
  "event_id": "<run_id>:<sequence>",
  "event_type": "run_started|capacity_sample|row_group_admitted|row_group_checkpointed|scheduler_pressure_changed|request_admission_pressure_changed|explanation_changed|run_completed|run_failed|run_cancelled|early_shutdown",
  "emitted_at": "ISO-8601 timestamp",
  "elapsed_seconds": 0.0,
  "scope": {
    "row_group": null,
    "resource": null,
    "provider_model": null,
    "column": null
  },
  "payload": {}
}
sequence must be monotonically increasing within a run and is the primary ordering key. emitted_at is for humans and cross-system correlation; replay should use sequence.
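As one possible shape for the dataclass option, the envelope could be modeled roughly as follows. The field names come from the envelope above; the EventScope and EventFactory helpers, the use of a monotonic clock for elapsed_seconds, and the untyped row_group id are assumptions made for illustration only.

```python
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass(frozen=True)
class EventScope:
    row_group: Any = None  # id type is not specified in this issue
    resource: Optional[str] = None
    provider_model: Optional[str] = None
    column: Optional[str] = None


@dataclass(frozen=True)
class AsyncDiagnosticsEvent:
    run_id: str
    sequence: int
    event_type: str
    emitted_at: str
    elapsed_seconds: float
    scope: EventScope = field(default_factory=EventScope)
    payload: dict[str, Any] = field(default_factory=dict)
    schema_version: int = 1
    engine: str = "async"

    def to_json_dict(self) -> dict[str, Any]:
        data = asdict(self)  # also converts the nested EventScope
        data["event_id"] = f"{self.run_id}:{self.sequence}"
        return data


class EventFactory:
    """Hypothetical helper: assigns increasing sequence numbers for one run.

    Not thread-safe; a real sink would need to serialize calls.
    """

    def __init__(self, run_id: str) -> None:
        self._run_id = run_id
        self._next_sequence = 0
        self._started = time.monotonic()

    def make(
        self,
        event_type: str,
        payload: Optional[dict[str, Any]] = None,
        scope: Optional[EventScope] = None,
    ) -> AsyncDiagnosticsEvent:
        event = AsyncDiagnosticsEvent(
            run_id=self._run_id,
            sequence=self._next_sequence,
            event_type=event_type,
            emitted_at=datetime.now(timezone.utc).isoformat(),
            elapsed_seconds=time.monotonic() - self._started,
            scope=scope or EventScope(),
            payload=payload or {},
        )
        self._next_sequence += 1
        return event
```

Deriving event_id at serialization time keeps it from drifting out of sync with run_id and sequence.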
Required Event Payloads
run_started
Must include configured run facts and static capacity facts known at startup:
{
  "dataset_name": "...",
  "target_num_records": 0,
  "buffer_size": 0,
  "row_group_count": 0,
  "graph_column_count": 0,
  "graph_depth": 0,
  "configured_capacity": {},
  "column_scheduling": []
}
capacity_sample
Must include point-in-time scheduler, request-admission, checkpoint, and observed-maximum facts. This is an event in the timeline, not an overwritten snapshot.
{
  "dataset": {
    "actual_num_records": 0,
    "completed_row_groups": 0,
    "active_row_groups": 0,
    "oldest_open_row_group_age_seconds": null
  },
  "scheduler": {
    "queued_total": 0,
    "queued_by_group": {},
    "queued_demand_by_resource": {},
    "leased_resources": {},
    "resource_limits": {},
    "resources_available": {},
    "in_flight_tasks": 0,
    "active_workers": 0,
    "row_group_admission_blocked_reasons": {}
  },
  "request_admission": {
    "resources": {},
    "provider_models": {}
  },
  "capacity": {
    "configured": {},
    "runtime_snapshot": {},
    "observed_maxima": {}
  },
  "checkpointing": {
    "last_checkpointed_row_group": null,
    "last_checkpoint_age_seconds": null,
    "row_groups_checkpointed": 0,
    "records_checkpointed": 0
  },
  "active_explanations": []
}
The capacity section should be derived from the existing scheduler capacity-plan concepts where possible: configured capacity, runtime request-admission snapshot, and observed maxima.
row_group_admitted
Must include row-group id, row-group size, active row-group counts, and the admission capacity context at the time of admission.
row_group_checkpointed
Must include row-group id, row-group size, dropped rows, surviving rows, result, records checkpointed so far, and row groups checkpointed so far.
scheduler_pressure_changed
Must fire when coarse scheduler pressure state appears or clears. Payload should include the current pressure code(s), previous pressure code(s), queue facts, lease facts, and resource limits.
request_admission_pressure_changed
Must fire when coarse request-admission pressure state appears or clears for a resource/provider-model. Payload should include current/effective limits, in-flight counts, waiters, cooldown/blocking state, and previous/current pressure code(s).
explanation_changed
Must fire when a stable explanation code appears or clears. Payload should include:
{
  "code": "no_checkpoint_yet",
  "state": "active|cleared",
  "evidence": {}
}
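To emit explanation_changed only on transitions, the emitter can diff the set of currently active codes against the previous set. The sketch below is illustrative: ExplanationTracker is a hypothetical name, and clearing with empty evidence is an assumption (a real implementation might attach the facts that caused the clearance).

```python
from typing import Any


class ExplanationTracker:
    """Hypothetical helper: turns 'codes active now' into change payloads."""

    def __init__(self) -> None:
        self._active: set[str] = set()

    @property
    def active(self) -> list[str]:
        return sorted(self._active)

    def update(self, current: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
        """`current` maps each active code to its evidence dict.

        Returns one explanation_changed payload per code that appeared or cleared.
        """
        now = set(current)
        payloads: list[dict[str, Any]] = []
        for code in sorted(now - self._active):
            payloads.append({"code": code, "state": "active", "evidence": current[code]})
        for code in sorted(self._active - now):
            # Assumption: cleared events carry no evidence in this sketch.
            payloads.append({"code": code, "state": "cleared", "evidence": {}})
        self._active = now
        return payloads
```

Calling update on every progress interval with an unchanged set returns an empty list, so the log only grows when state actually changes.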
Terminal events
run_completed, run_failed, run_cancelled, and early_shutdown must include final counts and active explanation codes at termination.
Required Explanation Codes
V1 should expose a small stable enum of machine-readable codes:
no_checkpoint_yet: no row group has checkpointed in this invocation.
row_group_incomplete: at least one admitted row group is still waiting for required cells or columns.
row_group_admission_blocked: new row groups are blocked by row-group admission limits or guardrails.
scheduler_submission_saturated: scheduler task admission is at configured submission capacity.
scheduler_queue_backlog: scheduler ready queue has accumulated demand.
request_admission_saturated: request admission has reached current or effective request limits.
request_admission_waiters: model requests are waiting for request-admission leases.
request_admission_cooldown: request admission is temporarily blocked by AIMD/rate-limit cooldown.
checkpoint_progress: at least one row group has checkpointed in this invocation.
capacity_metadata_unavailable: provider/model capacity metadata was not available from the runtime.
Each explanation_changed event must carry enough evidence values for automated tooling to classify the state without parsing logs.
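One way to keep the code list stable in Python is a string-valued enum, so the values serialize directly into JSON and unknown values fail loudly when parsed. The class name ExplanationCode is an assumption; the values are the ten codes listed above.

```python
import json
from enum import Enum


class ExplanationCode(str, Enum):
    NO_CHECKPOINT_YET = "no_checkpoint_yet"
    ROW_GROUP_INCOMPLETE = "row_group_incomplete"
    ROW_GROUP_ADMISSION_BLOCKED = "row_group_admission_blocked"
    SCHEDULER_SUBMISSION_SATURATED = "scheduler_submission_saturated"
    SCHEDULER_QUEUE_BACKLOG = "scheduler_queue_backlog"
    REQUEST_ADMISSION_SATURATED = "request_admission_saturated"
    REQUEST_ADMISSION_WAITERS = "request_admission_waiters"
    REQUEST_ADMISSION_COOLDOWN = "request_admission_cooldown"
    CHECKPOINT_PROGRESS = "checkpoint_progress"
    CAPACITY_METADATA_UNAVAILABLE = "capacity_metadata_unavailable"


# The str mixin lets json.dumps write the plain value without a custom encoder.
encoded = json.dumps({"code": ExplanationCode.NO_CHECKPOINT_YET})

# Parsing a value from the log back into the enum raises ValueError if unknown.
decoded = ExplanationCode(json.loads(encoded)["code"])
```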
Reconstruction Requirements
A reader that replays complete events in sequence order must be able to reconstruct:
- configured async capacity at run start,
- the timeline of row-group admission and checkpoint progress,
- the timeline of scheduler pressure appearing and clearing,
- the timeline of request-admission pressure appearing and clearing,
- active explanation codes at any event sequence,
- final run status and durable record/checkpoint counts when a terminal event exists,
- last known state when a terminal event is absent because the process exited abruptly.
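A replay helper along these lines could back both tests and later public readers. It is a sketch under assumptions: ReplayState and replay are hypothetical names, the input is assumed to hold complete events from a single run, and only fields defined in this issue (event_type, sequence, configured_capacity, code, state) are read, because the other payload field names are not fixed yet.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable, Optional

TERMINAL_EVENT_TYPES = {"run_completed", "run_failed", "run_cancelled", "early_shutdown"}


@dataclass
class ReplayState:
    last_sequence: Optional[int] = None
    configured_capacity: Optional[dict[str, Any]] = None
    row_groups_admitted: int = 0
    row_groups_checkpointed: int = 0
    active_explanations: set[str] = field(default_factory=set)
    last_capacity_sample: Optional[dict[str, Any]] = None
    terminal_event_type: Optional[str] = None  # stays None after an abrupt exit


def replay(
    events: Iterable[dict[str, Any]], up_to_sequence: Optional[int] = None
) -> ReplayState:
    """Fold events in sequence order into the state as of `up_to_sequence`."""
    state = ReplayState()
    for event in sorted(events, key=lambda e: e["sequence"]):
        if up_to_sequence is not None and event["sequence"] > up_to_sequence:
            break
        kind = event["event_type"]
        payload = event.get("payload", {})
        state.last_sequence = event["sequence"]
        if kind == "run_started":
            state.configured_capacity = payload.get("configured_capacity")
        elif kind == "row_group_admitted":
            state.row_groups_admitted += 1
        elif kind == "row_group_checkpointed":
            state.row_groups_checkpointed += 1
        elif kind == "capacity_sample":
            state.last_capacity_sample = payload
        elif kind == "explanation_changed":
            if payload["state"] == "active":
                state.active_explanations.add(payload["code"])
            else:
                state.active_explanations.discard(payload["code"])
        elif kind in TERMINAL_EVENT_TYPES:
            state.terminal_event_type = kind
    return state
```

When no terminal event exists, the returned state is simply the last known state, which covers the abrupt-exit case.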
Implementation Notes
Relevant existing code paths:
AsyncTaskScheduler._scheduler_health_diagnostics(...) already exposes queue, lease, resource, row-group, and request-pressure fields.
AsyncTaskScheduler.capacity_plan() already produces a structured async capacity explanation.
SchedulerAdmissionEvent and request-admission events already provide an internal event-shaped vocabulary, but v1 diagnostics should define a stable public coarse event schema rather than leaking every internal event as-is.
ArtifactStorage.write_metadata(...) contains the existing durability pattern for filesystem writes.
RowGroupBufferManager.write_metadata(...) already writes incremental progress after row-group checkpoints.
DatasetCreationResults.task_traces is in-memory and current-invocation scoped; do not rely on it for this durable diagnostics feature.
A likely implementation shape is:
- add engine-owned AsyncDiagnosticsEvent DTO/schema with schema_version=1,
- add ArtifactStorage.async_diagnostics_file_path plus append/read helpers,
- have the async builder/scheduler publish coarse public diagnostics events through a lightweight sink/callback,
- emit capacity_sample events on the progress interval and targeted change events on checkpoint/pressure transitions,
- track active explanation codes to emit explanation_changed only when state changes,
- keep serialization JSON-safe for dataclasses/enums/resource keys,
- provide tests or utilities that replay the JSONL log into a latest-state object for assertions.
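For the JSON-safety item, a small recursive converter is usually enough. The sketch below is an assumption-heavy illustration: Pressure and ResourceKey are hypothetical stand-ins for whatever enums and resource-key types the scheduler actually uses, and rendering a dataclass key as "provider/model" is an invented convention, not an existing one.

```python
import dataclasses
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any


class Pressure(Enum):  # hypothetical stand-in for an internal enum
    SATURATED = "saturated"


@dataclass(frozen=True)
class ResourceKey:  # hypothetical stand-in for an internal resource key
    provider: str
    model: str


def key_to_str(key: Any) -> str:
    """JSON object keys must be strings; flatten enums and dataclass keys."""
    if isinstance(key, Enum):
        return str(key.value)
    if dataclasses.is_dataclass(key) and not isinstance(key, type):
        return "/".join(str(getattr(key, f.name)) for f in dataclasses.fields(key))
    return str(key)


def to_json_safe(value: Any) -> Any:
    """Recursively convert enums, dataclasses, sets, and odd keys to JSON types."""
    if isinstance(value, Enum):
        return to_json_safe(value.value)
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return {f.name: to_json_safe(getattr(value, f.name)) for f in dataclasses.fields(value)}
    if isinstance(value, dict):
        return {key_to_str(k): to_json_safe(v) for k, v in value.items()}
    if isinstance(value, (set, frozenset)):
        # Sort so repeated runs serialize sets in a stable order.
        return sorted((to_json_safe(v) for v in value), key=repr)
    if isinstance(value, (list, tuple)):
        return [to_json_safe(v) for v in value]
    return value


sample = {
    "resource_limits": {ResourceKey("p", "m"): 4},
    "state": Pressure.SATURATED,
    "active_explanations": frozenset({"b", "a"}),
}
safe = to_json_safe(sample)
line = json.dumps(safe)
```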
Acceptance Criteria
- Async engine runs append async_diagnostics.jsonl at job start, before any parquet checkpoint or metadata.json is required.
- Every complete line in the artifact is valid JSON matching the v1 event envelope.
- Event sequence values are monotonically increasing and sufficient to reconstruct run order.
- Readers tolerate an incomplete trailing line after abrupt process termination while preserving all prior complete events.
- Interrupting a run before the first row-group checkpoint leaves complete events showing completed_row_groups=0, current scheduler/request-admission facts, and no_checkpoint_yet plus any applicable bottleneck codes.
- A run with request-admission waiters records request resources, current/effective limits, waiters, in-flight counts, and applicable request-admission pressure/explanation events.
- A run with scheduler queue pressure records queued totals, grouped demand, resource demand, and applicable scheduler pressure/explanation events.
- Successful async runs end with a terminal run_completed event whose checkpoint/record counts match durable parquet metadata.
- The artifact never contains prompts, completions, row data, credentials, request payloads, or response payloads.
- Existing async_trace=True behavior remains unchanged and is not required for diagnostics.
- Tests cover fresh runs, resumed runs, pre-first-checkpoint interruption, replay reconstruction, and at least one request-admission-pressure scenario.
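The envelope and ordering criteria could be checked by a small validator used in tests. This is a sketch, not a proposed API: validate_events is a hypothetical name, and it assumes sequence is strictly increasing per run_id (equal values would make event_id collide), which is a slightly stronger reading of "monotonically increasing" than the text above states.

```python
from typing import Any

ENVELOPE_KEYS = {
    "schema_version", "engine", "run_id", "sequence", "event_id",
    "event_type", "emitted_at", "elapsed_seconds", "scope", "payload",
}
SCOPE_KEYS = {"row_group", "resource", "provider_model", "column"}


def validate_events(events: list[dict[str, Any]]) -> list[str]:
    """Return human-readable problems; an empty list means the checks passed."""
    problems: list[str] = []
    last_sequence_by_run: dict[str, int] = {}
    for index, event in enumerate(events):
        missing = ENVELOPE_KEYS - set(event)
        if missing:
            problems.append(f"event {index}: missing keys {sorted(missing)}")
            continue
        if event["schema_version"] != 1:
            problems.append(f"event {index}: unexpected schema_version")
        if set(event["scope"]) != SCOPE_KEYS:
            problems.append(f"event {index}: unexpected scope keys")
        run_id, sequence = event["run_id"], event["sequence"]
        if event["event_id"] != f"{run_id}:{sequence}":
            problems.append(f"event {index}: event_id does not match run_id:sequence")
        last = last_sequence_by_run.get(run_id)
        if last is not None and sequence <= last:
            problems.append(f"event {index}: sequence {sequence} does not increase after {last}")
        last_sequence_by_run[run_id] = sequence
    return problems
```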
Agent Investigation
Large public-API-only probes showed the diagnostics gap:
| Scenario shape | Public settings | Outcome | What was missing |
| --- | --- | --- | --- |
| 50k-100k wide LLM columns | larger buffer_size, high max_in_flight_tasks, high provider cap | no callback/final rows in practical timebox while endpoint completed requests | no durable queue/request/checkpoint diagnostics timeline |
| 1M wide run interrupted before first callback | async_trace=True | artifact directory had no durable progress beyond config metadata | no persisted capacity event stream |
| 1M chained run interrupted after checkpoints | async_trace=True | resume worked for completed row groups | no persisted capacity event stream |
The investigation did not find a distinct resume correctness bug in the checkpointed chained case. The issue is observability: interrupted and slow public runs do not explain how endpoint work, scheduler pressure, request pressure, and checkpoint progress evolved over time.
Checklist