|
1 | 1 | import io |
2 | 2 | import zipfile |
3 | 3 | from enum import Enum |
4 | | -from typing import List |
| 4 | +from typing import List, Optional |
5 | 5 | from uuid import uuid4 |
6 | 6 |
|
| 7 | +import httpx |
7 | 8 | from fastapi import APIRouter, File, Form, HTTPException, Request, Response, UploadFile |
8 | 9 | from fastapi.responses import JSONResponse, StreamingResponse |
9 | 10 | from libs.base.typed_fastapi import TypedFastAPI |
@@ -37,6 +38,8 @@ class process_router_paths(str, Enum): |
37 | 38 | START_PROCESSING = "/start-processing" |
38 | 39 | DELETE_FILE = "/delete-file/{file_name}" |
39 | 40 | DELETE_PROCESS = "/delete-process/{process_id}" |
| 41 | + CANCEL_PROCESS = "/cancel/{process_id}" |
| 42 | + CANCEL_STATUS = "/cancel/{process_id}/status" |
40 | 43 | STATUS = "/status/{process_id}/" |
41 | 44 | RENDER_STATUS = "/status/{process_id}/render/" |
42 | 45 | PROCESS_AGENT_ACTIVITIES = "/status/{process_id}/activities" |
@@ -578,3 +581,188 @@ async def get_file_content( |
578 | 581 | raise HTTPException( |
579 | 582 | status_code=500, detail=f"Error retrieving file content: {str(e)}" |
580 | 583 | ) |
| 584 | + |
| 585 | + |
| 586 | +@router.post(process_router_paths.CANCEL_PROCESS, status_code=202) |
| 587 | +async def cancel_process( |
| 588 | + process_id: str, |
| 589 | + request: Request, |
| 590 | + reason: Optional[str] = None, |
| 591 | +): |
| 592 | + """ |
| 593 | + Request cancellation of a running process. |
| 594 | + This endpoint forwards the kill request to the Processor's Control API. |
| 595 | + The processor will observe this request and terminate the running process. |
| 596 | + """ |
| 597 | + app: TypedFastAPI = request.app |
| 598 | + logger_service: ILoggerService = app.app_context.get_service(ILoggerService) |
| 599 | + |
| 600 | + try: |
| 601 | + logger_service.log_info(f"Cancel process request for process_id: {process_id}") |
| 602 | + |
| 603 | + # Get authenticated user |
| 604 | + authenticated_user = get_authenticated_user(request) |
| 605 | + user_id = authenticated_user.user_principal_id |
| 606 | + |
| 607 | + if not user_id: |
| 608 | + raise HTTPException(status_code=401, detail="User not authenticated") |
| 609 | + |
| 610 | + # Get processor control URL from configuration |
| 611 | + config = app.app_context.configuration |
| 612 | + processor_url = config.processor_control_url or "http://processor:8080" |
| 613 | + processor_token = config.processor_control_token or "" |
| 614 | + |
| 615 | + # Prepare headers for processor control API |
| 616 | + headers = {} |
| 617 | + if processor_token: |
| 618 | + headers["Authorization"] = f"Bearer {processor_token}" |
| 619 | + |
| 620 | + # Build the full URL for the kill endpoint |
| 621 | + kill_url = f"{processor_url}/processes/{process_id}/kill" |
| 622 | + logger_service.log_info(f"Calling processor kill API at: {kill_url}") |
| 623 | + |
| 624 | + # Forward kill request to Processor Control API |
| 625 | + # Note: verify=False is needed for internal ACA communication (self-signed certs) |
| 626 | + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: |
| 627 | + response = await client.post( |
| 628 | + kill_url, |
| 629 | + json={"reason": reason or f"User {user_id} cancelled from UI"}, |
| 630 | + headers=headers, |
| 631 | + ) |
| 632 | + logger_service.log_info(f"Processor kill API response: {response.status_code}") |
| 633 | + |
| 634 | + if response.status_code == 401: |
| 635 | + logger_service.log_error("Unauthorized access to processor control API") |
| 636 | + raise HTTPException( |
| 637 | + status_code=502, |
| 638 | + detail="Failed to authenticate with processor control API", |
| 639 | + ) |
| 640 | + |
| 641 | + if response.status_code >= 400: |
| 642 | + logger_service.log_error( |
| 643 | + f"Processor control API error: {response.status_code} - {response.text}" |
| 644 | + ) |
| 645 | + raise HTTPException( |
| 646 | + status_code=502, |
| 647 | + detail=f"Processor control API error: {response.text}", |
| 648 | + ) |
| 649 | + |
| 650 | + result = response.json() |
| 651 | + |
| 652 | + logger_service.log_info( |
| 653 | + f"Cancel request sent for process {process_id}, state: {result.get('kill_state', 'unknown')}" |
| 654 | + ) |
| 655 | + |
| 656 | + return { |
| 657 | + "message": "Cancellation request submitted", |
| 658 | + "process_id": process_id, |
| 659 | + "kill_requested": result.get("kill_requested", True), |
| 660 | + "kill_state": result.get("kill_state", "pending"), |
| 661 | + "kill_requested_at": result.get("kill_requested_at", ""), |
| 662 | + } |
| 663 | + |
| 664 | + except httpx.TimeoutException: |
| 665 | + logger_service.log_error(f"Timeout connecting to processor control API") |
| 666 | + raise HTTPException( |
| 667 | + status_code=504, |
| 668 | + detail="Timeout connecting to processor control API", |
| 669 | + ) |
| 670 | + except httpx.ConnectError: |
| 671 | + logger_service.log_error(f"Failed to connect to processor control API") |
| 672 | + raise HTTPException( |
| 673 | + status_code=503, |
| 674 | + detail="Processor control API is unavailable", |
| 675 | + ) |
| 676 | + except HTTPException: |
| 677 | + raise |
| 678 | + except Exception as e: |
| 679 | + logger_service.log_error(f"Error in cancel_process: {str(e)}") |
| 680 | + raise HTTPException( |
| 681 | + status_code=500, detail=f"Error cancelling process: {str(e)}" |
| 682 | + ) |
| 683 | + |
| 684 | + |
| 685 | +@router.get(process_router_paths.CANCEL_STATUS, status_code=200) |
| 686 | +async def get_cancel_status( |
| 687 | + process_id: str, |
| 688 | + request: Request, |
| 689 | +): |
| 690 | + """ |
| 691 | + Get the cancellation status of a process. |
| 692 | + Returns the current kill state from the Processor's Control API. |
| 693 | + """ |
| 694 | + app: TypedFastAPI = request.app |
| 695 | + logger_service: ILoggerService = app.app_context.get_service(ILoggerService) |
| 696 | + |
| 697 | + try: |
| 698 | + logger_service.log_info(f"Get cancel status for process_id: {process_id}") |
| 699 | + |
| 700 | + # Get authenticated user |
| 701 | + authenticated_user = get_authenticated_user(request) |
| 702 | + user_id = authenticated_user.user_principal_id |
| 703 | + |
| 704 | + if not user_id: |
| 705 | + raise HTTPException(status_code=401, detail="User not authenticated") |
| 706 | + |
| 707 | + # Get processor control URL from configuration |
| 708 | + config = app.app_context.configuration |
| 709 | + processor_url = config.processor_control_url or "http://processor:8080" |
| 710 | + processor_token = config.processor_control_token or "" |
| 711 | + |
| 712 | + # Prepare headers for processor control API |
| 713 | + headers = {} |
| 714 | + if processor_token: |
| 715 | + headers["Authorization"] = f"Bearer {processor_token}" |
| 716 | + |
| 717 | + # Build the full URL for the control status endpoint |
| 718 | + control_url = f"{processor_url}/processes/{process_id}/control" |
| 719 | + logger_service.log_info(f"Calling processor control API at: {control_url}") |
| 720 | + |
| 721 | + # Get control status from Processor Control API |
| 722 | + # Note: verify=False is needed for internal ACA communication (self-signed certs) |
| 723 | + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: |
| 724 | + response = await client.get( |
| 725 | + control_url, |
| 726 | + headers=headers, |
| 727 | + ) |
| 728 | + logger_service.log_info(f"Processor control API response: {response.status_code}") |
| 729 | + |
| 730 | + if response.status_code == 401: |
| 731 | + logger_service.log_error("Unauthorized access to processor control API") |
| 732 | + raise HTTPException( |
| 733 | + status_code=502, |
| 734 | + detail="Failed to authenticate with processor control API", |
| 735 | + ) |
| 736 | + |
| 737 | + if response.status_code >= 400: |
| 738 | + logger_service.log_error( |
| 739 | + f"Processor control API error: {response.status_code} - {response.text}" |
| 740 | + ) |
| 741 | + raise HTTPException( |
| 742 | + status_code=502, |
| 743 | + detail=f"Processor control API error: {response.text}", |
| 744 | + ) |
| 745 | + |
| 746 | + result = response.json() |
| 747 | + |
| 748 | + return result |
| 749 | + |
| 750 | + except httpx.TimeoutException: |
| 751 | + logger_service.log_error(f"Timeout connecting to processor control API") |
| 752 | + raise HTTPException( |
| 753 | + status_code=504, |
| 754 | + detail="Timeout connecting to processor control API", |
| 755 | + ) |
| 756 | + except httpx.ConnectError: |
| 757 | + logger_service.log_error(f"Failed to connect to processor control API") |
| 758 | + raise HTTPException( |
| 759 | + status_code=503, |
| 760 | + detail="Processor control API is unavailable", |
| 761 | + ) |
| 762 | + except HTTPException: |
| 763 | + raise |
| 764 | + except Exception as e: |
| 765 | + logger_service.log_error(f"Error in get_cancel_status: {str(e)}") |
| 766 | + raise HTTPException( |
| 767 | + status_code=500, detail=f"Error getting cancel status: {str(e)}" |
| 768 | + ) |
0 commit comments