Add support for async hidden states connector #424
Walkthrough
This PR implements OS-level file locking coordination for hidden-state file generation and consumption. Support is extended from
Changes
Sequence Diagram(s)
sequenceDiagram
participant Worker as Worker Process
participant FileSystem as File System
participant Consumer as Consumer Process
participant Loader as Data Loader
Worker->>FileSystem: Generate hidden states & create *.lock
Worker->>FileSystem: Write hs_{i}.safetensors/pt
Consumer->>FileSystem: Check for *.lock file
Consumer->>Consumer: Detected lock, await release
Consumer->>Consumer: Call wait_for_lock_async(path.lock)
Consumer->>FileSystem: Poll fcntl.flock(lock_fd) with retries
Note over Consumer: Sleep between polls
Worker->>FileSystem: Remove *.lock after completion
FileSystem->>Consumer: Lock acquired (file available)
Consumer->>Loader: Load hs_{i}.safetensors/pt
Loader->>FileSystem: Read file via safe_open/torch.load
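The consumer side of the diagram above can be sketched roughly as follows. This is a minimal illustration and not the code from this PR: only the name `wait_for_lock_async` and the use of `fcntl.flock` come from the diagram, while the `timeout` and `poll_interval` parameters and the handling of a missing lock file are assumptions. It relies on `fcntl`, so it is Unix-only.

```python
import asyncio
import fcntl
import os
import time


async def wait_for_lock_async(
    lock_path: str,
    timeout: float = 30.0,       # assumed parameter, not taken from the PR
    poll_interval: float = 0.05,  # assumed parameter, not taken from the PR
) -> None:
    """Return once an exclusive flock on `lock_path` can be taken.

    Taking the lock succeeds only after the writer has released it,
    which is the signal that the hidden-state file is complete.
    """
    try:
        fd = os.open(lock_path, os.O_RDONLY)
    except FileNotFoundError:
        # Assumption: a vanished lock file means the writer already finished.
        return

    deadline = time.monotonic() + timeout
    try:
        while True:
            try:
                # Non-blocking attempt so the event loop is never stalled.
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except BlockingIOError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"timed out waiting for {lock_path}")
                await asyncio.sleep(poll_interval)
            else:
                fcntl.flock(fd, fcntl.LOCK_UN)
                return
    finally:
        # Only the descriptor is closed here; this sketch never unlinks
        # the lock file, so a timeout cannot remove a lock that is in use.
        os.close(fd)
```

Polling with `LOCK_NB` keeps other coroutines running while one sample is still being written; a blocking `flock` call would need to be moved to a thread instead.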
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
scripts/data_generation_offline2.py (1)
126-144: ⚠️ Potential issue | 🟠 Major
Deduplicate indices when both formats exist.
Now that both `.safetensors` and `.pt` are valid, the same sample can appear twice in this directory. `get_indices_to_process()` uses `len(existing)`, so duplicates here can make the script think more samples are complete than actually are and skip generation.
💡 Proposed fix
 def get_existing_hidden_state_indices(output_path: Path) -> list[int]:
     """Find existing `hs_i.safetensors` or `hs_i.pt` files (where i is file index)"""
-    existing_file_indices = []
+    existing_file_indices: set[int] = set()
@@
             file_index = int(index_str)
-            existing_file_indices.append(file_index)
+            existing_file_indices.add(file_index)
         except ValueError:
             continue
     return sorted(existing_file_indices)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/data_generation_offline2.py` around lines 126 - 144, get_existing_hidden_state_indices currently appends indices for both .safetensors and .pt files, causing duplicate indices when a sample has both formats; change it to deduplicate by collecting file_index values into a set (e.g., use a set like existing_file_indices_set) when iterating over files, then convert to a sorted list before returning so get_indices_to_process() receives unique indices; update references inside the function (file_path, index_str, file_index) accordingly and ensure the return type remains list[int].
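For illustration, a self-contained version of the deduplicating function might look like the sketch below. Only the function name, signature, and the `hs_i.safetensors` / `hs_i.pt` naming come from the review comment; the regular expression and the handling of a missing directory are assumptions, since the body of the real function is elided in the diff.

```python
import re
from pathlib import Path

# Assumed filename pattern: hs_<index>.safetensors or hs_<index>.pt
_HS_FILE_RE = re.compile(r"^hs_(\d+)\.(safetensors|pt)$")


def get_existing_hidden_state_indices(output_path: Path) -> list[int]:
    """Return the sorted, unique indices of existing hidden-state files."""
    if not output_path.is_dir():
        return []

    indices: set[int] = set()  # a set collapses hs_3.pt and hs_3.safetensors
    for file_path in output_path.iterdir():
        match = _HS_FILE_RE.match(file_path.name)
        if match:
            indices.add(int(match.group(1)))
    return sorted(indices)
```

With a set, `len()` of the result counts samples rather than files, which is the property `get_indices_to_process()` depends on according to the comment.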
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/speculators/data_generation/vllm_client.py`:
- Around line 39-45: The finally block in wait_for_lock_async currently unlinks
the lock file regardless of outcome, which can remove the file after a
timeout/cancellation or race with other waiters; change it to only close the
file descriptor (os.close(fd)) in the finally and remove any
os.remove(lock_path) there so cleanup of the lock file is left to the lock
owner; also apply the same change to the other similar wait function that
removes the lock in its finally block (the synchronous wait_for_lock
counterpart) so no finally block unlinks the lock file (handle any explicit
removals only where ownership is guaranteed).
In `@src/speculators/train/data.py`:
- Around line 312-321: The new hs file handling (hs_path, loaded_hs =
self._load_hs_file(hs_path) and the match self.on_generate block which calls
self._map_to_file_idx, shutil.move and hs_path.unlink) must be moved inside the
existing "skip-on-error" guard used when generating samples so any exceptions
from _load_hs_file, shutil.move or Path.unlink are caught, logged as warnings,
and cause the sample to be skipped rather than raising; update the code around
the sample-generation flow to perform loading/caching/deleting only after the
RPC success but within the try/except (or existing skip-on-error) that calls the
skip path on exceptions and references self.hidden_states_path,
_map_to_file_idx, _load_hs_file, shutil.move and Path.unlink accordingly.
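The second inline comment asks that loading, caching, and deleting all sit inside the skip-on-error guard. A rough sketch of that shape is given below. The function `load_and_release`, its `cache_path` and `load_fn` parameters, and the two-way cache/delete choice are hypothetical stand-ins for `self._load_hs_file`, `self._map_to_file_idx`, and the `match self.on_generate` block, whose real code is not shown here.

```python
import logging
import shutil
from pathlib import Path
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def load_and_release(
    hs_path: Path,
    cache_path: Optional[Path],        # hypothetical: None means "delete"
    load_fn: Callable[[Path], object],  # hypothetical stand-in for the loader
) -> Optional[object]:
    """Load a hidden-state file, then cache or delete it.

    Any failure is logged as a warning and reported as None so the
    caller can skip the sample instead of crashing.
    """
    try:
        loaded = load_fn(hs_path)
        if cache_path is not None:
            shutil.move(str(hs_path), str(cache_path))
        else:
            hs_path.unlink()
    except Exception as exc:  # broad on purpose: any failure skips the sample
        logger.warning("Skipping sample, could not process %s: %s", hs_path, exc)
        return None
    return loaded
```

Returning `None` gives the caller a single skip signal, regardless of whether the load, the move, or the unlink failed.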
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 66384a79-7d9c-4d47-b54f-1d64324da25c
📒 Files selected for processing (3)
scripts/data_generation_offline2.py
src/speculators/data_generation/vllm_client.py
src/speculators/train/data.py
The quality checks have failed. Please run |
1d0d030 to 393d870
393d870 to a3be804
Note: for internal purposes only. External contributors are safe to ignore this message. Tracked in JIRA: https://issues.redhat.com/browse/INFERENG-6833 JIRA Details:
Signed-off-by: Fynn Schmitt-Ulms <fschmitt@redhat.com>
a3be804 to 7f6d59e
The quality checks have failed. Please run |
Signed-off-by: Fynn Schmitt-Ulms <fschmitt@redhat.com>
7f6d59e to d352315
Purpose
vllm-project/vllm#37374 updates the ExampleHiddenStatesConnector to write the hidden states to disk asynchronously (and in `.pt` files instead of `.safetensors`). This PR adds support for the new behavior while maintaining support for the previous implementation.
Description
The new system creates a lock file for each hidden state file before it starts writing the hidden states. On the client side, we block until we obtain the lock; once we hold it, the hidden states are fully written to disk and can be read.
We only employ this behavior if the lock file exists; otherwise we assume the hidden states are fully written. This supports the existing system and also works for offline or cached data generation results.
We clean up (delete) lock files after obtaining them.
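The behavior described above can be sketched as a small blocking helper. This is an illustration under assumptions rather than the PR's implementation: the function name `wait_until_written` is hypothetical, and the lock file is assumed to sit next to the data file with `.lock` appended to its name. It uses `fcntl`, so it is Unix-only.

```python
import fcntl
import os
from pathlib import Path


def wait_until_written(hs_path: Path) -> Path:
    """Block until `hs_path` is safe to read, then return it."""
    # Assumed naming: hs_0.pt -> hs_0.pt.lock
    lock_path = hs_path.with_name(hs_path.name + ".lock")

    try:
        fd = os.open(lock_path, os.O_RDONLY)
    except FileNotFoundError:
        # No lock file: assume the hidden states are already fully written
        # (previous connector behavior, offline or cached results).
        return hs_path

    try:
        # Blocks until the writer releases its exclusive lock.
        fcntl.flock(fd, fcntl.LOCK_EX)
        # The lock is held here, so it is safe to clean up the lock file.
        try:
            os.unlink(lock_path)
        except FileNotFoundError:
            pass  # another party already removed it
    finally:
        # Closing the descriptor also releases the flock.
        os.close(fd)
    return hs_path
```

The unlink happens only on the path where the lock was actually obtained, so an exception while waiting leaves the lock file in place for its owner.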
Related Issue
Tests
Will run smoke / regression tests on this PR to ensure compatibility with the existing system. I have also tested this locally with the upstream PR.