
feat: enable detection pipeline for async engine #119

Merged
lipikaramaswamy merged 7 commits into main from andreatgretel/bugfix/side-effect-column-collisions
Apr 22, 2026

Conversation

@andreatgretel
Contributor

@andreatgretel andreatgretel commented Apr 14, 2026

Summary

Enable the detection pipeline to run on DataDesigner's async engine (AsyncTaskScheduler).

  • The async engine builds an ExecutionGraph where each side-effect column must have exactly one producer. The detection pipeline previously had stages that wrote to the same side-effect columns (e.g. _initial_tagged_text, _seed_entities_json) - fine in the sync engine, where later stages simply overwrite earlier writes, but a compilation error in the async DAG
  • Removes dead writes from parse_detected_entities (columns that were always overwritten before being read)
  • Introduces stage-specific column names (_seed_tagged_text, _seed_validation_candidates) to replace shared ones
  • Bumps data-designer to >=0.5.7 (includes async engine with side-effect column support, DD#509 and DD#545)
  • Adds a benchmark script comparing sync vs async engine performance
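The single-producer rule the async ExecutionGraph enforces can be illustrated with a minimal check. This is a sketch only: the stage and column names come from this PR, but the check itself is illustrative - DataDesigner's real graph compiler raises ConfigCompilationError instead of returning a dict.

```python
from collections import defaultdict


def check_single_producer(stages: dict[str, list[str]]) -> dict[str, list[str]]:
    """Map each side-effect column to the stages that write it and return
    the columns with more than one producer (illustrative stand-in for the
    async engine's compile-time check)."""
    producers: dict[str, list[str]] = defaultdict(list)
    for stage, columns in stages.items():
        for col in columns:
            producers[col].append(stage)
    return {col: who for col, who in producers.items() if len(who) > 1}


# Pre-PR shape: two stages wrote _initial_tagged_text, which the sync
# engine tolerated (sequential overwrites) but the async DAG rejects.
stages = {
    "parse_detected_entities": ["_initial_tagged_text", "_seed_entities_json"],
    "prepare_validation_inputs": ["_initial_tagged_text"],
}
print(check_single_producer(stages))
# {'_initial_tagged_text': ['parse_detected_entities', 'prepare_validation_inputs']}
```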

Type of Change

  • Enhancement

Benchmark Results

Full Anonymizer detection pipeline (EntityDetectionWorkflow.detect_and_validate_entities), 10 records, 5 iterations after warmup. Models: GLiNER NIM (self-hosted) + GPT-5.4 on inference-api.nvidia.com.

| Engine  | Mean  | Stdev |
| ------- | ----- | ----- |
| Sync    | 30.9s | 6.3s  |
| Async   | 25.8s | 1.5s  |
| Speedup | 1.20x |       |

The async engine is ~20% faster and significantly more consistent (stdev 1.5s vs 6.3s). The speedup is modest because the detection DAG has sequential dependencies between columns: the async engine can't parallelize across those stages, only within each LLM step's concurrent request dispatch.

…peline

Give each side-effect column a single producer so the pipeline is
compatible with DataDesigner's async engine (which now raises
ConfigCompilationError on duplicate producers).

- Remove dead writes from parse_detected_entities (COL_INITIAL_TAGGED_TEXT
  and COL_SEED_ENTITIES_JSON were always overwritten before being read).
- Rename prepare_validation_inputs outputs to COL_SEED_TAGGED_TEXT and
  COL_SEED_VALIDATION_CANDIDATES so they don't collide with
  merge_and_build_candidates.
- Update downstream consumers (build_validation_skeleton,
  enrich_validation_decisions, validation prompt) to use the new names.
Bump data-designer to 0.5.6 which includes the async engine with
side-effect column support. Add a benchmark script that compares
sync vs async engine performance on an Anonymizer-shaped detection
pipeline, supporting both mock LLMs and real NIM endpoints.
- pyproject.toml: keep data-designer==0.5.6
- detection_workflow.py: adopt main's substitute_placeholders() refactor
  with our COL_SEED_TAGGED_TEXT fix
- uv.lock: take main's lockfile, update DD packages to 0.5.6
@andreatgretel andreatgretel requested review from a team as code owners April 14, 2026 12:33
@andreatgretel andreatgretel changed the title from "fix: eliminate duplicate side-effect column producers in detection pipeline" to "feat: enable detection pipeline for async engine" Apr 14, 2026
Custom column generators in the evaluate and repair workflows call
model.generate() directly, which fails under the async engine (the
HTTP client is async-only).

Add model_compat.model_generate() helper that tries sync generate()
first and falls back to scheduling agenerate() on DD's persistent
async engine loop via run_coroutine_threadsafe.
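The bridging idea in this commit can be sketched roughly as follows. Everything here is an assumption about the real model_compat.model_generate() - the exception type caught, the method names, and how the persistent loop is owned are illustrative; only the try-sync-then-schedule-async shape via run_coroutine_threadsafe comes from the commit message.

```python
import asyncio
import threading

# A persistent event loop running in a background thread, standing in for
# DD's async engine loop mentioned above.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


def model_generate(model, prompt: str) -> str:
    """Try the sync generate() first; if the client is async-only, bridge
    agenerate() onto the persistent loop and block on the result."""
    try:
        return model.generate(prompt)  # sync engine path
    except NotImplementedError:  # assumed failure mode of an async-only client
        future = asyncio.run_coroutine_threadsafe(model.agenerate(prompt), _loop)
        return future.result()


class AsyncOnlyModel:
    """Toy model whose HTTP client only works inside an event loop."""

    def generate(self, prompt: str) -> str:
        raise NotImplementedError("async-only HTTP client")

    async def agenerate(self, prompt: str) -> str:
        return f"echo: {prompt}"


print(model_generate(AsyncOnlyModel(), "hi"))  # echo: hi
```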
@andreatgretel andreatgretel marked this pull request as draft April 14, 2026 16:29
@andreatgretel
Contributor Author

The detection pipeline works with the async engine after this PR + NVIDIA-NeMo/DataDesigner#509.

The rewrite pipeline has an additional issue: the evaluate and repair workflows use @custom_column_generator functions that call models[alias].generate() directly. In async engine mode the HTTP client is async-only, so these sync calls fail. DD's built-in LLM generators avoid this by having separate generate()/agenerate() methods on the ColumnGenerator class, but the @custom_column_generator decorator only supports a single (sync or async) function.

We have a working Anonymizer-side fix (c634b8d) that adds a model_generate() helper which tries sync first and falls back to scheduling agenerate() on DD's engine loop. But the proper long-term fix likely belongs in DD - either by exposing a dual-mode helper in the custom column API, or by having the async scheduler handle sync model-calling generators natively.

Taking a look on DD to see if we can fix it there already.

@andreatgretel
Contributor Author

Btw, rewrite benchmark results using the temporary Anonymizer-side fix (3 records, 3 iterations after warmup, GLiNER NIM + GPT-5.4):

| Engine  | Mean   | Stdev |
| ------- | ------ | ----- |
| Sync    | 244.9s | 98.7s |
| Async   | 180.6s | 50.4s |
| Speedup | 1.36x  |       |

Same pattern as detection: async is faster and more consistent. The high sync variance (one run hit 358s) suggests API throttling, which the async engine absorbs better with concurrent dispatch.

@lipikaramaswamy
Collaborator

I imagine the speedup will be more remarkable on datasets with high variance in input tokens (eg some examples are 1k tokens, some are 100k). Does that sound right?

@andreatgretel
Contributor Author

Update: removed the model_compat.py workaround. DataDesigner#545 (merged today) moves the sync-to-async bridge into DD itself, so custom column generators that call model.generate() get auto-wrapped facades that transparently bridge to agenerate() under the async engine. All 530 tests pass - just waiting on a DD release that includes #509 + #545, then we pin to that version and merge.

@lipikaramaswamy yes, that's right. The async engine dispatches requests concurrently and processes results as they arrive, so high variance in input length means more slack to exploit - short examples complete and free up slots while long ones are still in flight. The sync engine is bottlenecked by the slowest request in each batch. Real Anonymizer datasets tend to have that kind of variance, so the speedup should land closer to the 1.36x we saw on the rewrite pipeline than the 1.20x from the detection benchmark (which used similar-length inputs).
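The slack argument above can be demonstrated with a toy asyncio simulation. The latencies are made up; the point is that concurrent wall time tracks the slowest request, while a sequential loop pays the sum.

```python
import asyncio
import time


async def request(latency: float) -> float:
    # Stand-in for one LLM call; latency models input-length variance.
    await asyncio.sleep(latency)
    return latency


async def run_concurrent(latencies: list[float]) -> list[float]:
    # Dispatch everything at once; short requests finish early while the
    # long outlier is still in flight.
    return list(await asyncio.gather(*(request(t) for t in latencies)))


# Made-up per-request latencies with one long outlier, scaled down so the
# demo runs in well under a second.
lat = [0.05, 0.05, 0.05, 0.5]
t0 = time.perf_counter()
asyncio.run(run_concurrent(lat))
elapsed = time.perf_counter() - t0
# Concurrent wall time tracks max(lat) ~= 0.5s; sequential would take
# sum(lat) ~= 0.65s, and the gap widens as variance grows.
print(f"concurrent: {elapsed:.2f}s vs sequential bound: {sum(lat):.2f}s")
```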

DataDesigner#545 moves the sync-to-async model bridge into DD itself,
so custom column generators that call model.generate() work
transparently under the async engine. Remove the Anonymizer-side
model_compat.py workaround and revert to direct model.generate() calls.
DD#509 (side-effect column propagation) and DD#545 (async custom column
bridge) are both required by this PR and only shipped in v0.5.7.
@andreatgretel andreatgretel marked this pull request as ready for review April 21, 2026 16:13
Comment thread src/anonymizer/engine/detection/custom_columns.py Outdated
Comment thread src/anonymizer/engine/detection/detection_workflow.py
In async execution, merge_and_build_candidates was receiving the raw
GLiNER-parsed seeds from parse_detected_entities instead of the
validated seeds from apply_validation_to_seed_entities. The latter
rewrote COL_SEED_ENTITIES in-place as an undeclared side effect, which
the async engine silently strips.

Introduce COL_VALIDATED_SEED_ENTITIES as an explicit column produced by
apply_validation_to_seed_entities (declared in side_effect_columns) and
consumed by merge_and_build_candidates. The async DAG now resolves the
dependency correctly without duplicate-producer conflicts.
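A toy model of the silent stripping described above. The propagation rule and stage shapes are illustrative assumptions; the real async engine is more involved, but the failure mode is the same: a write to an undeclared column never reaches downstream stages.

```python
def run_stage(row: dict, fn, declared: set[str]) -> dict:
    """Run a stage and propagate only the columns it declared
    (stand-in for the async engine's side_effect_columns handling)."""
    out = dict(row)
    produced = fn(dict(row))
    for col, val in produced.items():
        if col in declared:  # undeclared side effects are stripped
            out[col] = val
    return out


def apply_validation(row: dict) -> dict:
    # Writes the validated seeds; only survives if the column is declared.
    return {
        "_validated_seed_entities": [e for e in row["_seed_entities"] if e != "bad"]
    }


row = {"_seed_entities": ["ok", "bad"]}
lost = run_stage(row, apply_validation, declared=set())
kept = run_stage(row, apply_validation, declared={"_validated_seed_entities"})
print("_validated_seed_entities" in lost, kept["_validated_seed_entities"])
# False ['ok']
```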
Collaborator

@lipikaramaswamy lipikaramaswamy left a comment


Re-ran the async repro against the latest head (71c49ba) and confirmed the fix: merge_and_build_candidates() now consumes the validated seed entities via COL_VALIDATED_SEED_ENTITIES under DATA_DESIGNER_ASYNC_ENGINE=1, so the validate-before-merge flow is preserved in the async DAG.

One follow-up we need: Anonymizer should probably expose its own async toggle (for example ANONYMIZER_ASYNC_ENGINE=1) instead of asking users to know about DATA_DESIGNER_ASYNC_ENGINE. Requiring Anonymizer users to set a DataDesigner-specific env var feels like a leaky abstraction. I wouldn’t block this PR on it since the core async-safety fix is in place, but I do think we should add this before expecting users to benefit from the speedup. And perhaps in the same PR we can add user-facing docs to add the envvar and to note that the change applies to the detection workflow specifically.

@lipikaramaswamy
Collaborator

Chatted with @andreatgretel -- async will become default on Monday's release of Data Designer, so we can just wait for that to happen and upgrade instead of doing envvar and docs work here.

@lipikaramaswamy lipikaramaswamy merged commit 110f5c0 into main Apr 22, 2026
11 checks passed
@lipikaramaswamy lipikaramaswamy deleted the andreatgretel/bugfix/side-effect-column-collisions branch April 22, 2026 19:10
lipikaramaswamy added a commit that referenced this pull request Apr 22, 2026
…che Jinja template

Drops asyncio.run / asyncio.to_thread / asyncio.gather from the chunked
validation path. _dispatch_chunk and chunked_validate_row become sync
defs; chunked_validate_row dispatches chunks via a ThreadPoolExecutor
and calls facade.generate() directly. Per-alias concurrency is still
enforced downstream by each facade's ThrottledModelClient, so the pool
exists purely to overlap this row's chunks. Under DataDesigner's async
engine the sync calls are transparently bridged to agenerate by the DD
runtime (DD#545), so the code path stays engine-agnostic. Test call
sites lose their asyncio.run(...) wrappers. A TODO(async-native)
comment in the module docstring flags the follow-up migration once the
async engine becomes the DD default.

Wraps _compile_template in functools.lru_cache(maxsize=4) so a row with
N chunks parses the Jinja source once instead of N times.

Folds in the post-#119 column rename for this module
(COL_MERGED_TAGGED_TEXT -> COL_SEED_TAGGED_TEXT,
COL_VALIDATION_CANDIDATES -> COL_SEED_VALIDATION_CANDIDATES, prompt
placeholder _merged_tagged_text -> _seed_tagged_text) that the rebase
resolved in neighbouring modules but never applied here.

Module docstring gains two follow-up-deferred paragraphs explaining
(a) why per-instance validation is intentional -- dedup by (value,
label) was considered and rejected because it conflates surface form
with meaning -- and (b) how peak prompt memory scales with chunk count,
with pointers to the orthogonal cost levers to pull if pressure shows
up in a real workload. Both are tracked as separate follow-up issues.

Made-with: Cursor
lipikaramaswamy added a commit that referenced this pull request Apr 23, 2026
* feat(detect): accept validator pool + chunked-validation config knobs

Allows users to declare `entity_validator` as either a single alias
(current behavior) or a list of aliases that form a validator pool.
Scalars normalize to single-element lists so every consumer sees
`list[str]`. Adds `validation_max_entities_per_call` (default 100) and
`validation_excerpt_window_chars` (default 500) to Detect, which
commit 2 will consume when it replaces the single validation LLM call
with a chunked custom column that rotates across the pool. Behavior
is unchanged in this commit: the workflow still issues one validation
call per row using pool[0].

Adds `resolve_model_aliases()` helper for list-valued roles and
updates `validate_model_alias_references` to flag unknown aliases in
a pool by index.
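A dependency-free sketch of the scalar-to-list normalization described above. The real validator is a Pydantic field validator on the Detect config; the function name matches the commit but the body here is an assumption.

```python
def normalize_entity_validator(value) -> list[str]:
    """Normalize `entity_validator` so every consumer sees list[str]:
    a bare alias becomes a one-element pool, and an empty pool raises."""
    if isinstance(value, str):
        return [value.strip()]  # single alias -> single-element pool
    pool = [v.strip() for v in value]  # list (or tuple) of aliases
    if not pool:
        raise ValueError("entity_validator pool must not be empty")
    return pool


print(normalize_entity_validator("gpt-validator"))  # ['gpt-validator']
print(normalize_entity_validator(["a", "b"]))       # ['a', 'b']
```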

Made-with: Cursor

* feat(detect): chunked validation across a validator pool

Replaces the single per-row validation LLM call with a chunked custom
column that partitions each row's validation candidates, dispatches
each chunk to one alias from the configured `entity_validator` pool
(round-robin by chunk index), and merges the decisions back into the
shape `apply_validation_decisions` already expects. Single-alias pools
behave identically to the prior path; multi-alias pools distribute
calls to bypass per-alias TPM/RPM caps.
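The round-robin rotation amounts to indexing the pool by chunk position. A minimal sketch (alias and chunk names are made up; the real code dispatches each pair to a ModelFacade rather than returning them):

```python
def assign_chunks(chunks: list[str], pool: list[str]) -> list[tuple[str, str]]:
    """Pair chunk i with pool[i % len(pool)] - a single-alias pool sends
    every chunk to the same validator, reproducing the prior behavior."""
    return [(chunk, pool[i % len(pool)]) for i, chunk in enumerate(chunks)]


pool = ["validator-a", "validator-b"]
chunks = ["chunk0", "chunk1", "chunk2"]
print(assign_chunks(chunks, pool))
# [('chunk0', 'validator-a'), ('chunk1', 'validator-b'), ('chunk2', 'validator-a')]
```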

Chunk size and the per-chunk excerpt window are driven by the Detect
config knobs added in commit 1 (`validation_max_entities_per_call`,
`validation_excerpt_window_chars`) and threaded through
`Anonymizer -> EntityDetectionWorkflow.run`. Per-row tag notation is
resolved once against the full text and forced onto every chunk so
excerpts stay internally consistent; `build_tagged_text` gains an
optional `notation=` override for that purpose.

Implementation uses data_designer's `CustomColumnConfig` with
`model_aliases=pool`, letting DD inject a `{alias: ModelFacade}` dict
and giving us its retry/throttle behavior for free. A terminal LLM
error in any chunk fails the whole row (surfaced as a `FailedRecord`);
decisions for ids outside the candidate set are dropped to mirror
the single-call path's `enrich_validation_decisions` filter.

Emits a warning when the pool has more than one alias noting that
`max_parallel_requests` is enforced per alias, so the pool multiplies
total in-flight validator calls.

Made-with: Cursor

* docs(detect): document chunked validation + validator pools

Adds a "Chunked validation" section to the detection concept doc
covering the two new Detect knobs (`validation_max_entities_per_call`,
`validation_excerpt_window_chars`), when chunking kicks in, and how to
tune in either direction. Adds a "Validator pools" subsection to the
models doc demonstrating list-form `entity_validator:` YAML, with a
warning admonition calling out that `max_parallel_requests` is
enforced per alias (so a pool multiplies total in-flight calls) and
that pool aliases should target equivalent models. Cross-links the
two sections.

Rate-limit jargon (TPM/RPM) is expanded inline on first use in each
page since a reader can enter from either; the operator-facing log
warning in detection_workflow still uses the acronyms.

Also adds a behavioral regression test pinning single-chunk vs
multi-chunk parity: same deterministic per-id decisions, pool-of-one,
three chunks vs one chunk must produce byte-identical merged
`COL_VALIDATION_DECISIONS` and byte-identical post-`apply_validation_decisions`
entities, with the expected keep/reclass/drop/untouched tallies and
the final entity list pinned.

Made-with: Cursor

* fix: format

* feat(detect): cross-alias failover + DD engine compatibility

* docs: update wording

* refactor(detect): harden chunked-validation guards per review feedback

Follow-ups from code review on #126. No user-facing behaviour change; the
happy path is unchanged and all tests in the affected modules pass.

Source hardening (chunked_validation.py):
- Replace `assert last_exc is not None` in `_dispatch_chunk` with an explicit
  `if`+`RuntimeError` guard. `assert` is stripped under `python -O`, which
  would turn a caller-contract violation (empty `facades`) into a confusing
  `TypeError: exceptions must derive from BaseException` from `raise None`.
  The new guard survives `-O` and names the precondition being violated.
- Drop the redundant `max_entities_per_call <= 0` check in `chunk_candidates`.
  Positivity is already enforced at two upstream layers
  (`AnonymizerDetectConfig.validation_max_entities_per_call` and
  `ChunkedValidationParams.max_entities_per_call`, both `Field(gt=0)`); the
  inner check was defence-in-depth at a layer below the validated boundary.
- Widen `chunk_candidates` input from `list[...]` to `Sequence[...]` and
  normalize the return to `list(...)`, so the declared `list[list[tuple[...]]]`
  return type is honest regardless of whether a list, tuple, or other
  sequence is passed. Previously tuple input silently produced
  `list[tuple[tuple[...]]]`.
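A sketch of the widened signature under these assumptions: the real chunk_candidates takes richer candidate tuples, but the Sequence-in, list-of-lists-out contract is the point.

```python
from collections.abc import Sequence


def chunk_candidates(
    candidates: Sequence[tuple[str, str]], max_entities_per_call: int
) -> list[list[tuple[str, str]]]:
    """Split candidates into chunks of at most max_entities_per_call.
    Accepts any Sequence and normalizes each chunk with list(...), so
    tuple input no longer leaks tuples into the declared return type."""
    return [
        list(candidates[i : i + max_entities_per_call])
        for i in range(0, len(candidates), max_entities_per_call)
    ]


# Tuple input, honest list-of-lists output:
cands = (("Mark", "NAME"), ("Paris", "CITY"), ("Bill", "NAME"))
print(chunk_candidates(cands, 2))
# [[('Mark', 'NAME'), ('Paris', 'CITY')], [('Bill', 'NAME')]]
```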

Documentation (models.py):
- Expand `normalize_entity_validator`'s docstring to explicitly document
  tuple acceptance (previously undocumented behaviour that matched Pydantic
  v2's default `list[str]` coercion but wasn't called out).

Test coverage:
- `test_tuple_input_returns_list_of_lists` replaces the now-obsolete
  `test_non_positive_limit_raises`; pins the sequence-input / list-output
  contract.
- `test_tuple_coerced_to_list` on `DetectionModelSelection` pins the
  `entity_validator=(...)` path.
- `test_system_prompt_is_forwarded_to_facade` and
  `test_system_prompt_default_none_is_forwarded_untouched` close the
  untested-forwarding gap on `ChunkedValidationParams.system_prompt`. The
  field is plumbed but not populated by any production caller today; tests
  pin the contract for when it's wired up (tracked in #127).

Made-with: Cursor

* docs(detect): tidy review-history residue in chunked-validation prose

Four inline prose touches in the chunked-validation module. No code or
test behaviour changes; only doc/comment rewording so the source reads
as a standalone spec rather than a narrated diff against prior versions.

- `chunk_candidates` docstring: state the ``max_entities_per_call > 0``
  precondition positively, point at the two upstream enforcement layers,
  drop the "so we do not re-check it here" coda.
- `_dispatch_chunk` defensive-guard comment: keep the substantive reasons
  (empty-facades precondition, loud named error vs. ``raise None``, guard
  survives ``python -O``) and drop the assert-vs-if comparison that only
  makes sense in the context of the review exchange that introduced it.
- `_dispatch_chunk` docstring: replace "matching the pre-failover
  contract" with a direct statement of the single-alias-pool behaviour
  ("no alternate alias to try").
- `test_system_prompt_default_none_is_forwarded_untouched` comment:
  tighten the "accidentally inject a placeholder" framing to just state
  the current recipe contract and what the test pins.

Made-with: Cursor

* refactor(config): dedupe validator pool and preserve native types from YAML

Addresses two review findings on the validator-pool config path.

normalize_entity_validator now collapses duplicate aliases to the first
occurrence (order preserved) and logs a warning. A duplicate would burn
a failover attempt on an already-exhausted endpoint -- almost certainly
not what the user intended. Empty pools still raise.
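The first-occurrence dedup can be sketched with dict.fromkeys, which preserves insertion order. This is illustrative only - the real code is a Pydantic validator, and the logger name is made up.

```python
import logging

logger = logging.getLogger("anonymizer.config")  # hypothetical logger name


def dedupe_pool(pool: list[str]) -> list[str]:
    """Collapse duplicate aliases to the first occurrence, warn when any
    were dropped, and keep rejecting empty pools."""
    if not pool:
        raise ValueError("entity_validator pool must not be empty")
    deduped = list(dict.fromkeys(pool))  # dicts preserve insertion order
    if len(deduped) < len(pool):
        logger.warning("duplicate validator aliases collapsed: %s -> %s", pool, deduped)
    return deduped


print(dedupe_pool(["a", "a", "b"]))  # ['a', 'b']
```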

load_workflow_selections preserves list-valued roles as list[str]
instead of stringifying them. Stringifying would silently collapse a
pool YAML block into a single garbled alias repr, which Pydantic would
then treat as one alias. get_model_alias now raises TypeError when
called on a list-valued role with a pointer to resolve_model_aliases
for the pool shape.

Tests cover dedup, the no-dup-no-warning path, list-value preservation,
and the get_model_alias type error.

Made-with: Cursor

* chore(detect): drop unused build_validation_skeleton helper

build_validation_skeleton has been dead since chunked validation moved
skeleton construction inline into chunked_validate_row. No internal
callers remain; its tests go with it.

Made-with: Cursor

* refactor(detect): replace asyncio fan-out with ThreadPoolExecutor; cache Jinja template

Drops asyncio.run / asyncio.to_thread / asyncio.gather from the chunked
validation path. _dispatch_chunk and chunked_validate_row become sync
defs; chunked_validate_row dispatches chunks via a ThreadPoolExecutor
and calls facade.generate() directly. Per-alias concurrency is still
enforced downstream by each facade's ThrottledModelClient, so the pool
exists purely to overlap this row's chunks. Under DataDesigner's async
engine the sync calls are transparently bridged to agenerate by the DD
runtime (DD#545), so the code path stays engine-agnostic. Test call
sites lose their asyncio.run(...) wrappers. A TODO(async-native)
comment in the module docstring flags the follow-up migration once the
async engine becomes the DD default.

Wraps _compile_template in functools.lru_cache(maxsize=4) so a row with
N chunks parses the Jinja source once instead of N times.

Folds in the post-#119 column rename for this module
(COL_MERGED_TAGGED_TEXT -> COL_SEED_TAGGED_TEXT,
COL_VALIDATION_CANDIDATES -> COL_SEED_VALIDATION_CANDIDATES, prompt
placeholder _merged_tagged_text -> _seed_tagged_text) that the rebase
resolved in neighbouring modules but never applied here.

Module docstring gains two follow-up-deferred paragraphs explaining
(a) why per-instance validation is intentional -- dedup by (value,
label) was considered and rejected because it conflates surface form
with meaning -- and (b) how peak prompt memory scales with chunk count,
with pointers to the orthogonal cost levers to pull if pressure shows
up in a real workload. Both are tracked as separate follow-up issues.

Made-with: Cursor

* docs(detect): fix misleading C5 examples in chunked-validation docstring

The rejected-dedup rationale previously used "Apple"/"May" examples,
which are actually *different* (value, label) keys -- so dedup would
not affect them and the examples didn't demonstrate the concern. The
real failure case is same (value, label) carrying different correctness
across a document, which happens for short names that collide with
common English words: "Ira" / "irate" (substring false positive),
"Mark" / "mark", "Bill" / "bill", "Hope" / "hope". Validator would
keep one and drop the other; dedup broadcasts one decision across both.

Made-with: Cursor

* docs: clean up docstrings

* fix(config): re-validate selected_models overrides instead of model_copy

model_copy(update=...) in Pydantic v2 silently skips field validators,
so user overrides to selected_models bypassed
DetectionModelSelection.normalize_entity_validator entirely:

- entity_validator: []            -> accepted as [] (should raise)
- entity_validator: [a, a, b]     -> not deduped, no warning
- entity_validator: ["  a  ", ""] -> whitespace not stripped

Fix routes overrides through type(section).model_validate(merged) in
_merge_selections so every field validator re-runs at parse time.
Applied uniformly to detection/replace/rewrite to avoid leaving the
same landmine for any validators we add to Replace or Rewrite later.

Regression tests pin all three failure modes against parse_model_configs
itself, which is the entry point the bug was reproduced against.

Made-with: Cursor
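The shape of the fix above, in a dependency-free sketch: a plain class stands in for the Pydantic model, and validate() plays the role of model_validate(). The point is structural - merge raw data and re-run the full parse so every field validator fires, instead of patching fields onto an existing instance.

```python
class Section:
    """Stand-in for a Pydantic config section with a pool validator."""

    def __init__(self, entity_validator: list[str]):
        self.entity_validator = entity_validator

    @classmethod
    def validate(cls, data: dict) -> "Section":
        # Plays the role of model_validate(): every rule re-runs here.
        pool = data["entity_validator"]
        if isinstance(pool, str):
            pool = [pool]
        if not pool:
            raise ValueError("entity_validator pool must not be empty")
        return cls(list(dict.fromkeys(pool)))  # dedup, first occurrence wins


def merge_selections(section: Section, overrides: dict) -> Section:
    # Merge raw values, then re-validate - the model_copy(update=...)
    # shortcut would skip Section.validate entirely.
    merged = {"entity_validator": section.entity_validator, **overrides}
    return Section.validate(merged)


base = Section.validate({"entity_validator": "a"})
patched = merge_selections(base, {"entity_validator": ["a", "a", "b"]})
print(patched.entity_validator)  # ['a', 'b']
```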

* style(tests): flatten TestParseModelConfigsRevalidatesOverrides to module-level functions

Matches the majority style of test_model_loader.py. Class-grouped
shared-rationale docstring becomes a short block comment above the
three functions.

Made-with: Cursor

* make workflow yaml validation helper work with list valued pools

* display: remove prompt template from logging of validation step

* update logging

* apply fast path for single chunk

* treat none decisions as not present when chunking validation
