Add support for async hidden states connector#424

Open
fynnsu wants to merge 2 commits into main from support_async_datagen

@fynnsu
Collaborator

@fynnsu fynnsu commented Apr 14, 2026

Purpose

vllm-project/vllm#37374 updates the ExampleHiddenStatesConnector to write the hidden states to disk asynchronously (and as .pt files instead of .safetensors). This PR adds support for this while maintaining support for the previous implementation.

Description

The new system creates a lock file for each hidden state file before it starts writing the hidden states. On the client side, we block until we obtain the lock; once we hold it, the hidden states are fully written to disk and can be read.

We only employ this behavior if the lock file exists; otherwise we assume the hidden states are fully written. This supports the existing system, and also works for offline or cached data gen results.

We clean up (delete) lock files after obtaining them.
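
A minimal sketch of the client-side behavior described above, assuming the writer holds an exclusive `fcntl.flock` lock on the lock file for as long as it is writing. The signature, default timeout, and polling interval are illustrative assumptions, not the code in this PR:

```python
import fcntl
import os
import time
from pathlib import Path


def wait_for_lock(lock_path: Path, timeout: float = 60.0, interval: float = 0.1) -> None:
    """Block until the writer releases `lock_path`, then delete the lock file.

    Returns immediately when no lock file exists, in which case the hidden
    states are assumed to be fully written (old connector, cached results).
    """
    lock_path = Path(lock_path)
    try:
        fd = os.open(lock_path, os.O_RDWR)
    except FileNotFoundError:
        return  # no lock file: nothing to wait for

    deadline = time.monotonic() + timeout
    try:
        while True:
            try:
                # Non-blocking attempt; raises BlockingIOError while the
                # writer still holds the lock.
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except BlockingIOError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"Timed out waiting for {lock_path}")
                time.sleep(interval)
        # Only reached after the lock was obtained, so the lock file is
        # never deleted on a timeout.
        lock_path.unlink(missing_ok=True)
    finally:
        os.close(fd)  # closing the descriptor also releases the flock
```

In this sketch the lock file survives a timeout, so a later reader can still wait on it.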

Related Issue

Tests

I will run smoke / regression tests on this PR to ensure compatibility with the existing system. I have also tested this locally with the upstream PR.

I have filled in:

  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan/results, such as providing test command and pasting the results.
  • (Optional) The necessary documentation update.
  • I (a human) have written or reviewed the code in this pr to the best of my ability.

@coderabbitai
Contributor

coderabbitai Bot commented Apr 14, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: edcbfb26-fdb1-4c6b-b329-39dd00285182

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This PR implements OS-level file locking coordination for hidden-state file generation and consumption. Support is extended from .safetensors-only to both .safetensors and .pt formats. Lock file mechanisms synchronize concurrent file access between generation and loading processes across three interconnected modules.
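
As a rough illustration of probing both formats, assuming the `hs_{i}` file naming that appears later in this review. The helper name `find_hidden_states_file` and the probe order are hypothetical and not taken from the PR:

```python
from pathlib import Path
from typing import Optional

# Assumed probe order: the older .safetensors format first, then .pt.
HS_SUFFIXES = (".safetensors", ".pt")


def find_hidden_states_file(directory: Path, index: int) -> Optional[Path]:
    """Return the hidden-states file for `index`, or None if neither format exists."""
    for suffix in HS_SUFFIXES:
        candidate = Path(directory) / f"hs_{index}{suffix}"
        if candidate.exists():
            return candidate
    return None
```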

Changes

| Cohort / File(s) | Summary |
|---|---|
| Lock File Utilities: src/speculators/data_generation/vllm_client.py | Added async (wait_for_lock_async) and sync (wait_for_lock) functions for OS-level file locking using fcntl.flock. Both variants poll exclusive lock acquisition with configurable timeouts and intervals; sync variant includes TimeoutError handling. |
| Offline Data Generation: scripts/data_generation_offline2.py | Updated hidden-state file detection to accept both .safetensors and .pt formats via suffix-based checks. Replaced format-specific validation with check_hidden_states_file supporting both formats. Added per-file lock handling in worker save flow using wait_for_lock_async before output finalization. |
| Training Data Loading: src/speculators/train/data.py | Added _load_hs_file(path) helper that waits for lock files and loads both .safetensors and .pt formats transparently. Updated discovery logic to probe both file formats and delegated actual loading to the new helper. File handling now uses Path objects consistently for existence checks and operations. |
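
The async polling loop summarized above could look roughly like this. It is a sketch under assumptions: the function name matches the summary, but the signature, defaults, and boolean return value are invented for illustration, and the lock file is deliberately left in place for the caller to clean up:

```python
import asyncio
import fcntl
import os
from pathlib import Path


async def wait_for_lock_async(
    lock_path: Path, timeout: float = 60.0, interval: float = 0.1
) -> bool:
    """Wait for an exclusive flock on `lock_path` without blocking the event loop.

    Returns False if no lock file exists and True once the lock was obtained.
    """
    try:
        fd = os.open(lock_path, os.O_RDWR)
    except FileNotFoundError:
        return False

    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    try:
        while True:
            try:
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                return True
            except BlockingIOError:
                if loop.time() >= deadline:
                    raise TimeoutError(f"Timed out waiting for {lock_path}")
                # Yield to other coroutines between polls.
                await asyncio.sleep(interval)
    finally:
        os.close(fd)  # releases the lock; does not unlink the lock file
```

Using `asyncio.sleep` between attempts lets other requests make progress while one file is still being written.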

Sequence Diagram(s)

sequenceDiagram
    participant Worker as Worker Process
    participant FileSystem as File System
    participant Consumer as Consumer Process
    participant Loader as Data Loader

    Worker->>FileSystem: Generate hidden states & create *.lock
    Worker->>FileSystem: Write hs_{i}.safetensors/pt
    Consumer->>FileSystem: Check for *.lock file
    Consumer->>Consumer: Detected lock, await release
    Consumer->>Consumer: Call wait_for_lock_async(path.lock)
    Consumer->>FileSystem: Poll fcntl.flock(lock_fd) with retries
    Note over Consumer: Sleep between polls
    Worker->>FileSystem: Remove *.lock after completion
    FileSystem->>Consumer: Lock acquired (file available)
    Consumer->>Loader: Load hs_{i}.safetensors/pt
    Loader->>FileSystem: Read file via safe_open/torch.load

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 38.46% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Title check | ❓ Inconclusive | The title mentions 'async hidden states connector' but the changeset primarily addresses async file locking mechanisms and support for both .safetensors and .pt file formats, not connector implementation. | Consider a more precise title like 'Support async hidden states file locking and .pt format' to better reflect the actual changes in the PR. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description check | ✅ Passed | The description clearly explains the purpose, implementation details (lock file handling, backward compatibility), and testing approach, all related to the changeset. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
scripts/data_generation_offline2.py (1)

126-144: ⚠️ Potential issue | 🟠 Major

Deduplicate indices when both formats exist.

Now that both .safetensors and .pt are valid, the same sample can appear twice in this directory. get_indices_to_process() uses len(existing), so duplicates here can make the script think more samples are complete than actually are and skip generation.

💡 Proposed fix
 def get_existing_hidden_state_indices(output_path: Path) -> list[int]:
     """Find existing `hs_i.safetensors` or `hs_i.pt` files (where i is file index)"""
 
-    existing_file_indices = []
+    existing_file_indices: set[int] = set()
@@
                 file_index = int(index_str)
-                existing_file_indices.append(file_index)
+                existing_file_indices.add(file_index)
             except ValueError:
                 continue
 
     return sorted(existing_file_indices)
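
Written out in full, the deduplicated helper might look like the following. The diff above elides the middle of the function, so the file-name parsing here is an assumption based on the `hs_i.safetensors` / `hs_i.pt` pattern in the docstring:

```python
from pathlib import Path

HS_SUFFIXES = {".safetensors", ".pt"}


def get_existing_hidden_state_indices(output_path: Path) -> list[int]:
    """Find existing `hs_i.safetensors` or `hs_i.pt` files (where i is file index)"""
    existing_file_indices: set[int] = set()
    if not output_path.is_dir():
        return []

    for file_path in output_path.iterdir():
        # Skip lock files and anything that is not a hidden-states file.
        if file_path.suffix not in HS_SUFFIXES:
            continue
        if not file_path.stem.startswith("hs_"):
            continue
        index_str = file_path.stem[len("hs_"):]
        try:
            # A set keeps one entry even if both formats exist for a sample.
            existing_file_indices.add(int(index_str))
        except ValueError:
            continue

    return sorted(existing_file_indices)
```

With a set, `len(existing)` in `get_indices_to_process()` counts each sample once regardless of how many formats are on disk.
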
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/data_generation_offline2.py` around lines 126 - 144,
get_existing_hidden_state_indices currently appends indices for both
.safetensors and .pt files, causing duplicate indices when a sample has both
formats; change it to deduplicate by collecting file_index values into a set
(e.g., use a set like existing_file_indices_set) when iterating over files, then
convert to a sorted list before returning so get_indices_to_process() receives
unique indices; update references inside the function (file_path, index_str,
file_index) accordingly and ensure the return type remains list[int].
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/speculators/data_generation/vllm_client.py`:
- Around line 39-45: The finally block in wait_for_lock_async currently unlinks
the lock file regardless of outcome, which can remove the file after a
timeout/cancellation or race with other waiters; change it to only close the
file descriptor (os.close(fd)) in the finally and remove any
os.remove(lock_path) there so cleanup of the lock file is left to the lock
owner; also apply the same change to the other similar wait function that
removes the lock in its finally block (the synchronous wait_for_lock
counterpart) so no finally block unlinks the lock file (handle any explicit
removals only where ownership is guaranteed).

In `@src/speculators/train/data.py`:
- Around line 312-321: The new hs file handling (hs_path, loaded_hs =
self._load_hs_file(hs_path) and the match self.on_generate block which calls
self._map_to_file_idx, shutil.move and hs_path.unlink) must be moved inside the
existing "skip-on-error" guard used when generating samples so any exceptions
from _load_hs_file, shutil.move or Path.unlink are caught, logged as warnings,
and cause the sample to be skipped rather than raising; update the code around
the sample-generation flow to perform loading/caching/deleting only after the
RPC success but within the try/except (or existing skip-on-error) that calls the
skip path on exceptions and references self.hidden_states_path,
_map_to_file_idx, _load_hs_file, shutil.move and Path.unlink accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 66384a79-7d9c-4d47-b54f-1d64324da25c

📥 Commits

Reviewing files that changed from the base of the PR and between 2d4f54c and b804380.

📒 Files selected for processing (3)
  • scripts/data_generation_offline2.py
  • src/speculators/data_generation/vllm_client.py
  • src/speculators/train/data.py

Comment thread src/speculators/data_generation/vllm_client.py Outdated
Comment thread src/speculators/train/data.py Outdated
@mergify

mergify Bot commented Apr 14, 2026

The quality checks have failed. Please run make style and make quality under
the root directory to address the lint failures. You will need to install the
dev optional install to get the required linting packages:
https://github.com/vllm-project/speculators/blob/main/CONTRIBUTING.md

@fynnsu fynnsu force-pushed the support_async_datagen branch from 1d0d030 to 393d870 Compare April 14, 2026 20:10
@mergify mergify Bot removed the quality-failed label Apr 14, 2026
@fynnsu fynnsu force-pushed the support_async_datagen branch from 393d870 to a3be804 Compare April 15, 2026 20:01
@dsikka
Collaborator

dsikka commented May 8, 2026

Note: for internal purposes only. External contributors are safe to ignore this message.

Tracked in JIRA: https://issues.redhat.com/browse/INFERENG-6833

JIRA Details:

  • Issue Type: Task
  • Priority: Undefined
  • Component: Speculators
  • Activity Type: Tech Debt & Quality
  • Team: INFERENG Model Optimizations
  • Status: Backlog
  • Assignee: Fynn Schmitt-Ulms

Signed-off-by: Fynn Schmitt-Ulms <fschmitt@redhat.com>
@fynnsu fynnsu force-pushed the support_async_datagen branch from a3be804 to 7f6d59e Compare May 19, 2026 20:14
@mergify

mergify Bot commented May 19, 2026

The quality checks have failed. Please run make style and make quality under
the root directory to address the lint failures. You will need to install the
dev optional install to get the required linting packages:
https://github.com/vllm-project/speculators/blob/main/CONTRIBUTING.md

Signed-off-by: Fynn Schmitt-Ulms <fschmitt@redhat.com>
@fynnsu fynnsu force-pushed the support_async_datagen branch from 7f6d59e to d352315 Compare May 19, 2026 20:17
@mergify mergify Bot removed the quality-failed label May 19, 2026