Skip to content

[FIX] local backend trial cleanup#13

Open
VincentG1234 wants to merge 1 commit intomainfrom
fix/local-backend-cleanup
Open

[FIX] local backend trial cleanup#13
VincentG1234 wants to merge 1 commit intomainfrom
fix/local-backend-cleanup

Conversation

@VincentG1234
Copy link
Copy Markdown
Collaborator

Summary

  • improve local backend cleanup by tracking trial controllers alongside futures
  • request cooperative cancellation before forcing resource cleanup
  • wait briefly for running local trials to stop before clearing backend state

Why

The previous local cleanup path only cancelled futures, which is not enough when trial work is already running. This change makes shutdown and cleanup more reliable by propagating cancellation to the trial controller and attempting resource cleanup for still-running trials.

Signed-off-by: Vincent Gimenes <vincent.gimenes@gmail.com>
@VincentG1234
Copy link
Copy Markdown
Collaborator Author

@hgsmn tell me If that fix the bug you met

@VincentG1234 VincentG1234 changed the title Fix local backend trial cleanup [FIX] local backend trial cleanup Apr 27, 2026
@hgsmn
Copy link
Copy Markdown
Collaborator

hgsmn commented Apr 27, 2026

Yes it works for me. Stopping the process is pretty long because vllm needs to finish to process requests before stopping.

@hgsmn
Copy link
Copy Markdown
Collaborator

hgsmn commented Apr 27, 2026

#2

@hgsmn
Copy link
Copy Markdown
Collaborator

hgsmn commented Apr 29, 2026

In the cleanup_all_trials function, we can maybe add more comments and logs during the cleaning process to check the process that need to be forced stop.

Here is a proposal for the full function :

def cleanup_all_trials(self):
    """Force cleanup of all active local trials."""
    if not self.active_futures:
        logger.debug("No active local trials to cleanup")
        return

    logger.info(f"Cleaning up {len(self.active_futures)} active local trial(s)")

    running_jobs = [
        (job_id, future)
        for job_id, future in self.active_futures.items()
        if not future.done()
    ]

    # Phase 1 — Graceful cancellation request
    for job_id, _future in running_jobs:
        controller = self.active_controllers.get(job_id)
        if controller is None:
            logger.warning(
                f"Inconsistent state: future exists for trial {job_id} "
                "but no controller found — skipping cancellation"
            )
            continue

        try:
            controller.request_cancellation()
            logger.debug(f"Requested cancellation for local trial {job_id}")
        except Exception as e:
            logger.warning(
                f"Failed to request cancellation for local trial {job_id}: {e}"
            )

    # Phase 2 — Wait and force cleanup
    if running_jobs:
        futures_list = [future for _, future in running_jobs]

        logger.info(
            f"Waiting {self.CANCELLATION_DETECTION_WAIT}s for local trials "
            "to detect cancellation..."
        )
        done, not_done = concurrent.futures.wait(
            futures_list,
            timeout=self.CANCELLATION_DETECTION_WAIT,
        )
        logger.info(
            f"{len(done)} local trial(s) stopped after cancellation request, "
            f"{len(not_done)} still running"
        )

        for job_id, future in running_jobs:
            if future.done():
                continue

            controller = self.active_controllers.get(job_id)
            if controller is None:
                logger.warning(
                    f"Inconsistent state: future exists for trial {job_id} "
                    "but no controller found — skipping forced cleanup"
                )
                continue

            try:
                controller.cleanup_resources()
                logger.debug(f"Forced cleanup for local trial {job_id}")
            except Exception as e:
                logger.warning(
                    f"Failed forced cleanup for local trial {job_id}: {e}"
                )

        # Phase 3 — Last chance
        if any(not future.done() for _, future in running_jobs):
            concurrent.futures.wait(
                futures_list,
                timeout=self.GRACEFUL_CLEANUP_TIMEOUT,
            )

    # Finalization
    orphaned = [job_id for job_id, future in running_jobs if not future.done()]
    if orphaned:
        logger.warning(
            f"{len(orphaned)} trial(s) could not be stopped and were abandoned: "
            f"{orphaned}"
        )

    self.active_futures.clear()
    self.active_controllers.clear()
    logger.info("Completed cleanup of all active local trials")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants