feat: enable detection pipeline for async engine (#119)
Conversation
…peline

Give each side-effect column a single producer so the pipeline is compatible with DataDesigner's async engine (which now raises ConfigCompilationError on duplicate producers).
- Remove dead writes from parse_detected_entities (COL_INITIAL_TAGGED_TEXT and COL_SEED_ENTITIES_JSON were always overwritten before being read).
- Rename prepare_validation_inputs outputs to COL_SEED_TAGGED_TEXT and COL_SEED_VALIDATION_CANDIDATES so they don't collide with merge_and_build_candidates.
- Update downstream consumers (build_validation_skeleton, enrich_validation_decisions, validation prompt) to use the new names.
Bump data-designer to 0.5.6 which includes the async engine with side-effect column support. Add a benchmark script that compares sync vs async engine performance on an Anonymizer-shaped detection pipeline, supporting both mock LLMs and real NIM endpoints.
- pyproject.toml: keep data-designer==0.5.6
- detection_workflow.py: adopt main's substitute_placeholders() refactor with our COL_SEED_TAGGED_TEXT fix
- uv.lock: take main's lockfile, update DD packages to 0.5.6
Custom column generators in the evaluate and repair workflows call model.generate() directly, which fails under the async engine (the HTTP client is async-only). Add model_compat.model_generate() helper that tries sync generate() first and falls back to scheduling agenerate() on DD's persistent async engine loop via run_coroutine_threadsafe.
The detection pipeline works with the async engine after this PR + NVIDIA-NeMo/DataDesigner#509. The rewrite pipeline has an additional issue: the evaluate and repair workflows call model.generate() directly, which fails under the async engine. We have a working Anonymizer-side fix (c634b8d) that adds a model_compat.model_generate() helper. Taking a look on DD to see if we can fix it there already.
Btw, rewrite benchmark results using the temporary Anonymizer-side fix (3 records, 3 iterations after warmup, GLiNER NIM + GPT-5.4):
Same pattern as detection - async is faster and more consistent. The high sync variance (one run hit 358s) suggests API throttling that async handles better with concurrent dispatch.
I imagine the speedup will be more pronounced on datasets with high variance in input tokens (e.g. some examples are 1k tokens, some are 100k). Does that sound right?
Update: removed the model_compat workaround.

@lipikaramaswamy yes, that's right. The async engine dispatches requests concurrently and processes results as they arrive, so high variance in input length means more slack to exploit - short examples complete and free up slots while long ones are still in flight. The sync engine is bottlenecked by the slowest request in each batch. Real Anonymizer datasets tend to have that kind of variance, so the speedup should land closer to the 1.36x we saw on the rewrite pipeline than the 1.20x from the detection benchmark (which used similar-length inputs).
DataDesigner#545 moves the sync-to-async model bridge into DD itself, so custom column generators that call model.generate() work transparently under the async engine. Remove the Anonymizer-side model_compat.py workaround and revert to direct model.generate() calls.
DD#509 (side-effect column propagation) and DD#545 (async custom column bridge) are both required by this PR and only shipped in v0.5.7.
In async execution, merge_and_build_candidates was receiving the raw GLiNER-parsed seeds from parse_detected_entities instead of the validated seeds from apply_validation_to_seed_entities. The latter rewrote COL_SEED_ENTITIES in-place as an undeclared side effect, which the async engine silently strips. Introduce COL_VALIDATED_SEED_ENTITIES as an explicit column produced by apply_validation_to_seed_entities (declared in side_effect_columns) and consumed by merge_and_build_candidates. The async DAG now resolves the dependency correctly without duplicate-producer conflicts.
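The single-producer rule the async engine enforces can be illustrated with a small standalone check. This is not DD's actual ExecutionGraph compiler - just a sketch of the invariant that made the in-place rewrite of COL_SEED_ENTITIES a bug: every side-effect column must map to exactly one producing stage.

```python
from collections import defaultdict


def check_single_producer(stages):
    """Verify each declared side-effect column has exactly one producer.

    `stages` maps stage name -> iterable of side-effect columns it declares.
    Raises ValueError listing the conflicting stages, mirroring the spirit
    of DD's ConfigCompilationError on duplicate producers.
    """
    producers = defaultdict(list)
    for stage, columns in stages.items():
        for col in columns:
            producers[col].append(stage)
    conflicts = {col: names for col, names in producers.items() if len(names) > 1}
    if conflicts:
        raise ValueError(f"duplicate producers: {conflicts}")
```

Under this rule, giving the validated output its own name (COL_VALIDATED_SEED_ENTITIES) is exactly what turns the hidden in-place rewrite into an explicit, schedulable edge in the DAG.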
lipikaramaswamy
left a comment
Re-ran the async repro against the latest head (71c49ba) and confirmed the fix: merge_and_build_candidates() now consumes the validated seed entities via COL_VALIDATED_SEED_ENTITIES under DATA_DESIGNER_ASYNC_ENGINE=1, so the validate-before-merge flow is preserved in the async DAG.
One follow-up we need: Anonymizer should probably expose its own async toggle (for example ANONYMIZER_ASYNC_ENGINE=1) instead of asking users to know about DATA_DESIGNER_ASYNC_ENGINE. Requiring Anonymizer users to set a DataDesigner-specific env var feels like a leaky abstraction. I wouldn't block this PR on it since the core async-safety fix is in place, but I do think we should add this before expecting users to benefit from the speedup. And perhaps in the same PR we can add user-facing docs covering the env var and noting that the change applies to the detection workflow specifically.
Chatted with @andreatgretel -- async will become default on Monday's release of Data Designer, so we can just wait for that to happen and upgrade instead of doing envvar and docs work here.
* feat(detect): accept validator pool + chunked-validation config knobs
Allows users to declare `entity_validator` as either a single alias
(current behavior) or a list of aliases that form a validator pool.
Scalars normalize to single-element lists so every consumer sees
`list[str]`. Adds `validation_max_entities_per_call` (default 100) and
`validation_excerpt_window_chars` (default 500) to Detect, which
commit 2 will consume when it replaces the single validation LLM call
with a chunked custom column that rotates across the pool. Behavior
is unchanged in this commit: the workflow still issues one validation
call per row using pool[0].
Adds `resolve_model_aliases()` helper for list-valued roles and
updates `validate_model_alias_references` to flag unknown aliases in
a pool by index.
Made-with: Cursor
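The scalar-or-list normalization described in this commit can be sketched as a plain function (in the real code it sits behind a Pydantic field validator on Detect; the function name matches `normalize_entity_validator` from the commit, but the body here is an illustrative reconstruction):

```python
def normalize_entity_validator(value):
    """Accept a single alias or a sequence of aliases; always return list[str].

    Scalars become one-element pools, sequences (list or tuple) are coerced
    to lists, entries are whitespace-stripped, and an empty pool is rejected,
    so every downstream consumer can assume a non-empty list[str].
    """
    if isinstance(value, str):
        value = [value]
    pool = [alias.strip() for alias in value if alias and alias.strip()]
    if not pool:
        raise ValueError("entity_validator pool must contain at least one alias")
    return pool
```

With this in place, `pool[0]` is always well-defined, which is what lets commit 1 keep the single-call behavior unchanged.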
* feat(detect): chunked validation across a validator pool
Replaces the single per-row validation LLM call with a chunked custom
column that partitions each row's validation candidates, dispatches
each chunk to one alias from the configured `entity_validator` pool
(round-robin by chunk index), and merges the decisions back into the
shape `apply_validation_decisions` already expects. Single-alias pools
behave identically to the prior path; multi-alias pools distribute
calls to bypass per-alias TPM/RPM caps.
Chunk size and the per-chunk excerpt window are driven by the Detect
config knobs added in commit 1 (`validation_max_entities_per_call`,
`validation_excerpt_window_chars`) and threaded through
`Anonymizer -> EntityDetectionWorkflow.run`. Per-row tag notation is
resolved once against the full text and forced onto every chunk so
excerpts stay internally consistent; `build_tagged_text` gains an
optional `notation=` override for that purpose.
Implementation uses data_designer's `CustomColumnConfig` with
`model_aliases=pool`, letting DD inject a `{alias: ModelFacade}` dict
and giving us its retry/throttle behavior for free. A terminal LLM
error in any chunk fails the whole row (surfaced as a `FailedRecord`);
decisions for ids outside the candidate set are dropped to mirror
the single-call path's `enrich_validation_decisions` filter.
Emits a warning when the pool has more than one alias noting that
`max_parallel_requests` is enforced per alias, so the pool multiplies
total in-flight validator calls.
Made-with: Cursor
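The partition-and-rotate scheme described above reduces to two small pure functions. A minimal sketch (the real `chunk_candidates` lives in chunked_validation.py; `assign_aliases` is a hypothetical name for the round-robin step, which in the real code is just indexing by chunk position):

```python
def chunk_candidates(candidates, max_entities_per_call):
    """Partition a row's validation candidates into fixed-size chunks.

    Precondition (enforced upstream by config validation): the limit is > 0.
    """
    return [
        list(candidates[i : i + max_entities_per_call])
        for i in range(0, len(candidates), max_entities_per_call)
    ]


def assign_aliases(num_chunks, pool):
    """Round-robin each chunk onto one alias from the validator pool."""
    return [pool[i % len(pool)] for i in range(num_chunks)]
```

A pool of one degenerates to every chunk using the same alias, which is why single-alias pools behave identically to the prior path.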
* docs(detect): document chunked validation + validator pools
Adds a "Chunked validation" section to the detection concept doc
covering the two new Detect knobs (`validation_max_entities_per_call`,
`validation_excerpt_window_chars`), when chunking kicks in, and how to
tune in either direction. Adds a "Validator pools" subsection to the
models doc demonstrating list-form `entity_validator:` YAML, with a
warning admonition calling out that `max_parallel_requests` is
enforced per alias (so a pool multiplies total in-flight calls) and
that pool aliases should target equivalent models. Cross-links the
two sections.
Rate-limit jargon (TPM/RPM) is expanded inline on first use in each
page since a reader can enter from either; the operator-facing log
warning in detection_workflow still uses the acronyms.
Also adds a behavioral regression test pinning single-chunk vs
multi-chunk parity: same deterministic per-id decisions, pool-of-one,
three chunks vs one chunk must produce byte-identical merged
`COL_VALIDATION_DECISIONS` and byte-identical post-`apply_validation_decisions`
entities, with the expected keep/reclass/drop/untouched tallies and
the final entity list pinned.
Made-with: Cursor
* fix: format
* feat(detect): cross-alias failover + DD engine compatibility
* docs: update wording
* refactor(detect): harden chunked-validation guards per review feedback
Follow-ups from code review on #126. No user-facing behaviour change; the
happy path is unchanged and all tests in the affected modules pass.
Source hardening (chunked_validation.py):
- Replace `assert last_exc is not None` in `_dispatch_chunk` with an explicit
`if`+`RuntimeError` guard. `assert` is stripped under `python -O`, which
would turn a caller-contract violation (empty `facades`) into a confusing
`TypeError: exceptions must derive from BaseException` from `raise None`.
The new guard survives `-O` and names the precondition being violated.
- Drop the redundant `max_entities_per_call <= 0` check in `chunk_candidates`.
Positivity is already enforced at two upstream layers
(`AnonymizerDetectConfig.validation_max_entities_per_call` and
`ChunkedValidationParams.max_entities_per_call`, both `Field(gt=0)`); the
inner check was defence-in-depth at a layer below the validated boundary.
- Widen `chunk_candidates` input from `list[...]` to `Sequence[...]` and
normalize the return to `list(...)`, so the declared `list[list[tuple[...]]]`
return type is honest regardless of whether a list, tuple, or other
sequence is passed. Previously tuple input silently produced
`list[tuple[tuple[...]]]`.
Documentation (models.py):
- Expand `normalize_entity_validator`'s docstring to explicitly document
tuple acceptance (previously undocumented behaviour that matched Pydantic
v2's default `list[str]` coercion but wasn't called out).
Test coverage:
- `test_tuple_input_returns_list_of_lists` replaces the now-obsolete
`test_non_positive_limit_raises`; pins the sequence-input / list-output
contract.
- `test_tuple_coerced_to_list` on `DetectionModelSelection` pins the
`entity_validator=(...)` path.
- `test_system_prompt_is_forwarded_to_facade` and
`test_system_prompt_default_none_is_forwarded_untouched` close the
untested-forwarding gap on `ChunkedValidationParams.system_prompt`. The
field is plumbed but not populated by any production caller today; tests
pin the contract for when it's wired up (tracked in #127).
Made-with: Cursor
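The assert-to-guard change above can be illustrated in isolation. This is a simplified stand-in for `_dispatch_chunk` (the function and parameter names here are hypothetical); the point is that the explicit `if` names the violated precondition and, unlike `assert`, survives `python -O`:

```python
def dispatch_with_failover(facades, send):
    """Try each facade in order; re-raise the last failure if all fail.

    Explicit guard instead of `assert`: with asserts stripped under -O,
    an empty pool would fall through to `raise last_exc` with last_exc
    still None, producing a confusing TypeError instead of this error.
    """
    if not facades:
        raise RuntimeError("dispatch called with an empty facade pool")
    last_exc = None
    for facade in facades:
        try:
            return send(facade)
        except Exception as exc:  # terminal per-alias failure: try next alias
            last_exc = exc
    raise last_exc
```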
* docs(detect): tidy review-history residue in chunked-validation prose
Four inline prose touches in the chunked-validation module. No code or
test behaviour changes; only doc/comment rewording so the source reads
as a standalone spec rather than a narrated diff against prior versions.
- `chunk_candidates` docstring: state the ``max_entities_per_call > 0``
precondition positively, point at the two upstream enforcement layers,
drop the "so we do not re-check it here" coda.
- `_dispatch_chunk` defensive-guard comment: keep the substantive reasons
(empty-facades precondition, loud named error vs. ``raise None``, guard
survives ``python -O``) and drop the assert-vs-if comparison that only
makes sense in the context of the review exchange that introduced it.
- `_dispatch_chunk` docstring: replace "matching the pre-failover
contract" with a direct statement of the single-alias-pool behaviour
("no alternate alias to try").
- `test_system_prompt_default_none_is_forwarded_untouched` comment:
tighten the "accidentally inject a placeholder" framing to just state
the current recipe contract and what the test pins.
Made-with: Cursor
* refactor(config): dedupe validator pool and preserve native types from YAML
Addresses two review findings on the validator-pool config path.
normalize_entity_validator now collapses duplicate aliases to the first
occurrence (order preserved) and logs a warning. A duplicate would burn
a failover attempt on an already-exhausted endpoint -- almost certainly
not what the user intended. Empty pools still raise.
load_workflow_selections preserves list-valued roles as list[str]
instead of stringifying them. Stringifying would silently collapse a
pool YAML block into a single garbled alias repr, which Pydantic would
then treat as one alias. get_model_alias now raises TypeError when
called on a list-valued role with a pointer to resolve_model_aliases
for the pool shape.
Tests cover dedup, the no-dup-no-warning path, list-value preservation,
and the get_model_alias type error.
Made-with: Cursor
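The order-preserving dedup described above is a standard first-occurrence filter; a sketch (illustrative body, not the actual normalize_entity_validator source):

```python
import logging

logger = logging.getLogger(__name__)


def dedupe_pool(pool):
    """Collapse duplicate aliases to their first occurrence, keeping order.

    A duplicate alias would burn a failover attempt on an endpoint that
    just failed, so it is dropped with a warning rather than kept.
    """
    seen, result = set(), []
    for alias in pool:
        if alias in seen:
            logger.warning("duplicate validator alias %r dropped", alias)
            continue
        seen.add(alias)
        result.append(alias)
    if not result:
        raise ValueError("validator pool must not be empty")
    return result
```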
* chore(detect): drop unused build_validation_skeleton helper
build_validation_skeleton has been dead since chunked validation moved
skeleton construction inline into chunked_validate_row. No internal
callers remain; its tests go with it.
Made-with: Cursor
* refactor(detect): replace asyncio fan-out with ThreadPoolExecutor; cache Jinja template
Drops asyncio.run / asyncio.to_thread / asyncio.gather from the chunked
validation path. _dispatch_chunk and chunked_validate_row become sync
defs; chunked_validate_row dispatches chunks via a ThreadPoolExecutor
and calls facade.generate() directly. Per-alias concurrency is still
enforced downstream by each facade's ThrottledModelClient, so the pool
exists purely to overlap this row's chunks. Under DataDesigner's async
engine the sync calls are transparently bridged to agenerate by the DD
runtime (DD#545), so the code path stays engine-agnostic. Test call
sites lose their asyncio.run(...) wrappers. A TODO(async-native)
comment in the module docstring flags the follow-up migration once the
async engine becomes the DD default.
Wraps _compile_template in functools.lru_cache(maxsize=4) so a row with
N chunks parses the Jinja source once instead of N times.
Folds in the post-#119 column rename for this module
(COL_MERGED_TAGGED_TEXT -> COL_SEED_TAGGED_TEXT,
COL_VALIDATION_CANDIDATES -> COL_SEED_VALIDATION_CANDIDATES, prompt
placeholder _merged_tagged_text -> _seed_tagged_text) that the rebase
resolved in neighbouring modules but never applied here.
Module docstring gains two follow-up-deferred paragraphs explaining
(a) why per-instance validation is intentional -- dedup by (value,
label) was considered and rejected because it conflates surface form
with meaning -- and (b) how peak prompt memory scales with chunk count,
with pointers to the orthogonal cost levers to pull if pressure shows
up in a real workload. Both are tracked as separate follow-up issues.
Made-with: Cursor
* docs(detect): fix misleading C5 examples in chunked-validation docstring
The rejected-dedup rationale previously used "Apple"/"May" examples,
which are actually *different* (value, label) keys -- so dedup would
not affect them and the examples didn't demonstrate the concern. The
real failure case is same (value, label) carrying different correctness
across a document, which happens for short names that collide with
common English words: "Ira" / "irate" (substring false positive),
"Mark" / "mark", "Bill" / "bill", "Hope" / "hope". Validator would
keep one and drop the other; dedup broadcasts one decision across both.
Made-with: Cursor
* docs: clean up docstrings
* fix(config): re-validate selected_models overrides instead of model_copy
model_copy(update=...) in Pydantic v2 silently skips field validators,
so user overrides to selected_models bypassed
DetectionModelSelection.normalize_entity_validator entirely:
- entity_validator: [] -> accepted as [] (should raise)
- entity_validator: [a, a, b] -> not deduped, no warning
- entity_validator: [" a ", ""] -> whitespace not stripped
Fix routes overrides through type(section).model_validate(merged) in
_merge_selections so every field validator re-runs at parse time.
Applied uniformly to detection/replace/rewrite to avoid leaving the
same landmine for any validators we add to Replace or Rewrite later.
Regression tests pin all three failure modes against parse_model_configs
itself, which is the entry point the bug was reproduced against.
Made-with: Cursor
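The fix pattern above - round-trip merged overrides through `model_validate` instead of `model_copy(update=...)` - can be shown with a duck-typed sketch (`merge_selections` is an illustrative stand-in for `_merge_selections`; any object exposing Pydantic v2's `model_dump`/`model_validate` works):

```python
def merge_selections(section, overrides):
    """Merge user overrides into a config section and re-validate.

    Pydantic v2's model_copy(update=...) skips field validators, so an
    override like entity_validator=[] would slip through. Rebuilding via
    type(section).model_validate(...) re-runs every validator at parse time.
    """
    merged = {**section.model_dump(), **overrides}
    return type(section).model_validate(merged)
```

Applying the same path to detection, replace, and rewrite means any validator added to those sections later is enforced on overrides for free.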
* style(tests): flatten TestParseModelConfigsRevalidatesOverrides to module-level functions
Matches the majority style of test_model_loader.py. Class-grouped
shared-rationale docstring becomes a short block comment above the
three functions.
Made-with: Cursor
* make workflow yaml validation helper work with list valued pools
* display: remove prompt template from logging of validation step
* update logging
* apply fast path for single chunk
* treat none decisions as not present when chunking validation
Summary
Enable the detection pipeline to run on DataDesigner's async engine (AsyncTaskScheduler). The async engine compiles the pipeline into an ExecutionGraph where each side-effect column must have exactly one producer. The detection pipeline previously had stages that wrote to the same side-effect columns (e.g. _initial_tagged_text, _seed_entities_json), which worked in the sync engine (sequential overwrites) but fails in the async DAG.
- Remove dead writes from parse_detected_entities (columns that were always overwritten before being read)
- Introduce new column names (_seed_tagged_text, _seed_validation_candidates) to replace shared ones
- Bump data-designer to >=0.5.7 (includes async engine with side-effect column support, DD#509 and DD#545)
Benchmark Results
Full Anonymizer detection pipeline (EntityDetectionWorkflow.detect_and_validate_entities), 10 records, 5 iterations after warmup. Models: GLiNER NIM (self-hosted) + GPT-5.4 on inference-api.nvidia.com.

The async engine is ~20% faster and significantly more consistent (stdev 1.5s vs 6.3s). The speedup is modest because the detection DAG has sequential dependencies between columns - the async engine can't parallelize across those, only within each LLM step's concurrent request dispatch.