|
7 | 7 | import time |
8 | 8 | import uuid |
9 | 9 | from concurrent.futures import ProcessPoolExecutor |
| 10 | +from concurrent.futures.process import BrokenProcessPool |
10 | 11 | from decimal import Decimal |
11 | 12 | from typing import Any |
12 | 13 |
|
@@ -69,6 +70,7 @@ def _get_or_create_pool(self) -> ProcessPoolExecutor: |
def cleanup_process_pool(self) -> None:
    """Shut down the cached process pool and forget it.

    Does nothing when ``self.process_pool`` is unset/falsy. Otherwise the
    pool is detached from the instance first and then shut down with
    ``wait=False``, so this call does not wait for pending work to finish.

    Raises:
        Whatever ``shutdown`` raises. The exception is propagated unchanged,
        but ``self.process_pool`` has already been reset to ``None`` by then.
    """
    pool = self.process_pool
    if not pool:
        return

    self.logger.info("Cleaning up process pool")
    # Drop the reference *before* shutting down: if shutdown() fails, the
    # instance must not keep pointing at a pool that is being torn down.
    self.process_pool = None
    pool.shutdown(wait=False)
74 | 76 |
|
@@ -659,26 +661,44 @@ async def _process_activities_from_commits( |
659 | 661 |
|
660 | 662 | executor = self._get_or_create_pool() |
661 | 663 |
|
662 | | - futures = [ |
663 | | - loop.run_in_executor( |
664 | | - executor, |
665 | | - CommitService.process_commits_chunk, |
666 | | - chunk, |
667 | | - repo_path, |
668 | | - edge_commit_hash, |
669 | | - remote, |
670 | | - segment_id, |
671 | | - integration_id, |
672 | | - ) |
673 | | - for chunk in chunks |
674 | | - ] |
| 664 | + try: |
| 665 | + futures = [ |
| 666 | + loop.run_in_executor( |
| 667 | + executor, |
| 668 | + CommitService.process_commits_chunk, |
| 669 | + chunk, |
| 670 | + repo_path, |
| 671 | + edge_commit_hash, |
| 672 | + remote, |
| 673 | + segment_id, |
| 674 | + integration_id, |
| 675 | + ) |
| 676 | + for chunk in chunks |
| 677 | + ] |
| 678 | + self.logger.info(f"Submitted {len(futures)} tasks to process pool") |
| 679 | + except Exception as e: |
| 680 | + if isinstance(e, BrokenProcessPool): |
| 681 | + self.logger.warning("BrokenProcessPool during task submission, cleaning up") |
| 682 | + self.cleanup_process_pool() |
| 683 | + raise |
675 | 684 |
|
676 | 685 | # Save each chunk's activities as they complete |
| 686 | + completed_chunks = 0 |
677 | 687 | for future in asyncio.as_completed(futures): |
678 | | - chunk_activities_db, chunk_activities_queue = await future |
679 | | - if chunk_activities_db and chunk_activities_queue: |
680 | | - await batch_insert_activities(chunk_activities_db) |
681 | | - await self.queue_service.send_batch_activities(chunk_activities_queue) |
| 688 | + try: |
| 689 | + chunk_activities_db, chunk_activities_queue = await future |
| 690 | + completed_chunks += 1 |
| 691 | + self.logger.info(f"Chunk {completed_chunks}/{len(futures)} completed") |
| 692 | + if chunk_activities_db and chunk_activities_queue: |
| 693 | + await batch_insert_activities(chunk_activities_db) |
| 694 | + await self.queue_service.send_batch_activities(chunk_activities_queue) |
| 695 | + except Exception as e: |
| 696 | + if isinstance(e, BrokenProcessPool): |
| 697 | + self.logger.warning( |
| 698 | + f"BrokenProcessPool after {completed_chunks}/{len(futures)} chunks, cleaning up" |
| 699 | + ) |
| 700 | + self.cleanup_process_pool() |
| 701 | + raise |
682 | 702 |
|
683 | 703 | @staticmethod |
684 | 704 | def _validate_commit_structure(commit_lines: list[str]) -> bool: |
|
0 commit comments