|
16 | 16 | from infrahub.core.manager import NodeManager |
17 | 17 | from infrahub.core.node import Node |
18 | 18 | from infrahub.core.schema.schema_branch import SchemaBranch |
19 | | -from infrahub.core.timestamp import Timestamp |
20 | 19 | from infrahub.database import InfrahubDatabase |
21 | 20 | from infrahub.graphql.initialization import prepare_graphql_params |
22 | | -from infrahub.message_bus.types import KVTTL |
23 | 21 | from infrahub.permissions import AssignedPermissions, PermissionBackend |
24 | 22 | from infrahub.proposed_change.constants import ProposedChangeState |
25 | 23 | from infrahub.proposed_change.models import RequestProposedChangePipeline |
26 | 24 | from infrahub.services import InfrahubServices |
27 | 25 | from infrahub.services.adapters.workflow.local import WorkflowLocalExecution |
28 | 26 | from infrahub.services.component import InfrahubComponent |
29 | | -from infrahub.worker import WORKER_IDENTITY |
30 | 27 | from infrahub.workers.dependencies import build_client |
31 | 28 | from infrahub.workflows.catalogue import REQUEST_PROPOSED_CHANGE_PIPELINE |
32 | 29 | from infrahub.workflows.initialization import setup_deployments, setup_worker_pools |
@@ -617,18 +614,7 @@ async def test_merge_proposed_change_permission_failure( |
617 | 614 | await setup_deployments(prefect_client) |
618 | 615 |
|
619 | 616 | branch_name = "merge-proposed-change-perm" |
620 | | - branch = await create_branch(branch_name=branch_name, db=db) |
621 | | - await service.cache.set( |
622 | | - key=f"workers:schema_hash:branch:{str(branch.get_uuid)}:{service.component_type.value}:worker:{WORKER_IDENTITY}", |
623 | | - value=branch.active_schema_hash.main, |
624 | | - expires=KVTTL.TWO_HOURS, |
625 | | - ) |
626 | | - await service.cache.set( |
627 | | - key=f"workers:active:{service.component_type.value}:worker:{WORKER_IDENTITY}", |
628 | | - value=Timestamp().to_string(), |
629 | | - expires=KVTTL.FIFTEEN, |
630 | | - ) |
631 | | - await service.component.refresh_heartbeat() |
| 617 | + await create_branch(branch_name=branch_name, db=db) |
632 | 618 |
|
633 | 619 | proposed_change = await Node.init(db=db, schema=InfrahubKind.PROPOSEDCHANGE) |
634 | 620 | await proposed_change.new( |
@@ -658,6 +644,66 @@ async def test_merge_proposed_change_permission_failure( |
658 | 644 | assert not update_status.errors |
659 | 645 |
|
660 | 646 |
|
| 647 | +class TestMergeProposedChangeUnexpectedFailure(TestInfrahubApp): |
| 648 | + async def test_merge_proposed_change_unexpected_failure_resets_state( |
| 649 | + self, |
| 650 | + db: InfrahubDatabase, |
| 651 | + default_permission_backend: None, |
| 652 | + register_core_models_schema: None, |
| 653 | + session_admin: AccountSession, |
| 654 | + client: InfrahubClient, |
| 655 | + dependency_provider: Provider, |
| 656 | + ) -> None: |
| 657 | + with dependency_provider.scope(build_client, lambda: client): |
| 658 | + cache = MemoryCache() |
| 659 | + message_bus = BusRecorder() |
| 660 | + service = await InfrahubServices.new( |
| 661 | + database=db, |
| 662 | + message_bus=message_bus, |
| 663 | + workflow=WorkflowLocalExecution(), |
| 664 | + cache=cache, |
| 665 | + client=client, |
| 666 | + component=InfrahubComponent( |
| 667 | + cache=cache, db=db, message_bus=message_bus, component_type=ComponentType.NONE |
| 668 | + ), |
| 669 | + ) |
| 670 | + |
| 671 | + async with get_client(sync_client=False) as prefect_client: |
| 672 | + await setup_worker_pools(client=prefect_client) |
| 673 | + await setup_deployments(prefect_client) |
| 674 | + |
| 675 | + branch_name = "merge-proposed-change-fail" |
| 676 | + await create_branch(branch_name=branch_name, db=db) |
| 677 | + |
| 678 | + proposed_change = await Node.init(db=db, schema=InfrahubKind.PROPOSEDCHANGE) |
| 679 | + await proposed_change.new( |
| 680 | + db=db, name="pc-merge-fail-1234", destination_branch="main", source_branch=branch_name, state="open" |
| 681 | + ) |
| 682 | + await proposed_change.save(db=db) |
| 683 | + |
| 684 | + with patch( |
| 685 | + "infrahub.proposed_change.tasks.merge_branch", |
| 686 | + side_effect=RuntimeError("simulated post-merge failure"), |
| 687 | + ): |
| 688 | + update_status = await graphql_mutation( |
| 689 | + query=UPDATE_PROPOSED_CHANGE, |
| 690 | + db=db, |
| 691 | + variables={"proposed_change": proposed_change.id, "state": "merged"}, |
| 692 | + account_session=session_admin, |
| 693 | + service=service, |
| 694 | + ) |
| 695 | + |
| 696 | + assert update_status.errors |
| 697 | + assert update_status.errors[0].message == ( |
| 698 | + f"Merge failure when trying to merge {branch_name}: simulated post-merge failure" |
| 699 | + ) |
| 700 | + |
| 701 | + refreshed_pc = await NodeManager.get_one(db=db, id=proposed_change.id, raise_on_error=True) |
| 702 | + assert refreshed_pc.get_attribute("state").value.value == ProposedChangeState.OPEN.value |
| 703 | + branch = await Branch.get_by_name(db=db, name=branch_name) |
| 704 | + assert branch.status == BranchStatus.OPEN |
| 705 | + |
| 706 | + |
661 | 707 | async def test_create_thread( |
662 | 708 | db: InfrahubDatabase, |
663 | 709 | register_core_models_schema: None, |
|
0 commit comments