Skip to content

Commit 2f03dfb

Browse files
authored
fix: git integration accumulated objects (#3394)
1 parent aca98ee commit 2f03dfb

3 files changed

Lines changed: 129 additions & 35 deletions

File tree

services/apps/git_integration/src/crowdgit/enums.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class ErrorCode(str, Enum):
1616
VALIDATION = "validation"
1717
NO_MAINTAINER_FILE = "no-maintainer-file"
1818
MAINTAINER_ANALYSIS_FAILED = "maintainer_analysis_failed"
19+
CLEANUP_FAILED = "cleanup-failed"
1920

2021

2122
class RepositoryState(str, Enum):

services/apps/git_integration/src/crowdgit/services/clone/clone_service.py

Lines changed: 126 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from decimal import Decimal
1717

1818
DEFAULT_CLONE_BATCH_DEPTH = 10
19+
DEFAULT_STORAGE_OPTIMIZATION_THRESHOLD_MB = 2000
1920

2021

2122
class CloneService(BaseService):
@@ -60,6 +61,56 @@ async def _init_minimal_clone(self, path: str, remote: str) -> None:
6061
)
6162
self.logger.info("Minimal clone initialized successfully")
6263

64+
async def _get_repo_size_mb(self, repo_path: str) -> float:
    """Return the on-disk size of *repo_path* in megabytes.

    Shells out to ``du -sm``; any failure (command error, unparsable
    output) is logged as a warning and ``0.0`` is returned, so callers
    never need to handle an exception from this helper.
    """
    try:
        du_output = await run_shell_command(["du", "-sm", repo_path], cwd=repo_path)
        # `du -sm` prints "<size-in-MB>\t<path>"; the first field is the size.
        return float(du_output.strip().split()[0])
    except Exception as e:
        self.logger.warning(f"Failed to get repo size: {e}")
        return 0.0
72+
73+
async def _optimize_repository_storage(
    self, repo_path: str, threshold_mb: float = DEFAULT_STORAGE_OPTIMIZATION_THRESHOLD_MB
) -> None:
    """Run ``git gc`` on the repository when it exceeds *threshold_mb*.

    Garbage collection is CPU/IO intensive and time-consuming, so it is
    performed only for repositories that have grown past the threshold
    (2 GB by default) — typically because incremental fetching keeps
    creating inefficient pack files. Logs the gc timing plus the
    repository size before and after the operation.

    Failures are logged but never raised: a gc problem must not stop
    repository processing.
    """
    try:
        size_before = await self._get_repo_size_mb(repo_path)

        if size_before > threshold_mb:
            self.logger.info(
                f"Repository size {size_before:.1f}MB > {threshold_mb:.0f}MB threshold, running git gc"
            )

            # monotonic() is immune to wall-clock adjustments (NTP steps,
            # DST), so the reported duration is always valid; time.time()
            # could yield a negative or wildly wrong delta.
            start_time = time.monotonic()
            await run_shell_command(
                ["git", "gc", "--keep-largest-pack", "--quiet"], cwd=repo_path
            )
            gc_duration = time.monotonic() - start_time

            size_after = await self._get_repo_size_mb(repo_path)
            # Guard against division by zero if the size probe failed (returns 0.0).
            reduction_pct = (
                ((size_before - size_after) / size_before) * 100 if size_before > 0 else 0
            )

            self.logger.info(
                f"Git gc completed in {gc_duration:.1f}s: {size_before:.1f}MB → {size_after:.1f}MB ({reduction_pct:.1f}% reduction)"
            )
        else:
            self.logger.debug(
                f"Repository size {size_before:.1f}MB is below threshold {threshold_mb:.0f}MB, skipping storage optimization (normal - most repositories stay small)"
            )

    except Exception as e:
        self.logger.error(f"Failed to perform git gc: {repr(e)}")
        # Don't raise - gc failure shouldn't stop processing
113+
63114
@retry(
64115
stop=stop_after_attempt(5),
65116
wait=wait_fixed(1),
@@ -73,6 +124,8 @@ async def _clone_next_batch(self, repo_path: str, batch_depth: int):
73124
await run_shell_command(
74125
["git", "fetch", "origin", default_branch, f"--deepen={batch_depth}"], cwd=repo_path
75126
)
127+
# Optimize repository storage using git garbage collection
128+
await self._optimize_repository_storage(repo_path)
76129

77130
async def _get_batch_commit_info(
78131
self,
@@ -169,41 +222,81 @@ async def _get_edge_commit(self, repo_path: str):
169222
except FileNotFoundError:
170223
return None
171224

225+
async def _cleanup_temp_directory(self, temp_repo_path: str, repo_id: str) -> None:
    """Remove the temporary clone directory, recording a failure if it cannot be deleted.

    The actual removal (with retries) is delegated to
    ``_cleanup_temp_directory_with_retries``; only once every retry has
    been exhausted is a CLEANUP_FAILED service execution persisted, so a
    single transient failure never pollutes the execution history.
    """
    try:
        await self._cleanup_temp_directory_with_retries(temp_repo_path)
        self.logger.info(f"successfully cleaned temp dir {temp_repo_path}")
    except Exception as e:
        error_message = (
            f"Failed to cleanup temp directory {temp_repo_path} after retries: {repr(e)}"
        )
        self.logger.error(error_message)

        # Persist the cleanup failure so it is visible in the
        # service-execution history (only after all retries failed).
        try:
            await save_service_execution(
                ServiceExecution(
                    repo_id=repo_id,
                    operation_type=OperationType.CLONE,
                    status=ExecutionStatus.FAILURE,
                    error_code=ErrorCode.CLEANUP_FAILED.value,
                    error_message=error_message,
                    execution_time_sec=Decimal("0.0"),
                )
            )
        except Exception as save_error:
            self.logger.error(f"Failed to save cleanup failure: {repr(save_error)}")
252+
253+
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), reraise=True)
async def _cleanup_temp_directory_with_retries(self, temp_repo_path: str) -> None:
    """Recursively delete *temp_repo_path*.

    Retried up to 3 times with a 2-second pause between attempts;
    ``reraise=True`` lets the final failure propagate to the caller,
    which decides how to record it.
    """
    self.logger.info(f"cleaning temp dir {temp_repo_path}")
    shutil.rmtree(temp_repo_path)
265+
266+
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), reraise=False)
async def _cleanup_working_directory(self, repo_path: str) -> None:
    """
    Remove all files and directories from the repository except the .git directory.
    This helps reduce disk usage while preserving git history for commit processing.

    Retried up to 3 times with a 2-second pause; ``reraise=False`` means a
    final failure is swallowed, because a cleanup problem should not stop
    the cloning process.
    """
    self.logger.info(f"Cleaning working directory: {repo_path}")

    # Delete every top-level entry except .git in one `find ... -exec rm -rf +` pass.
    prune_argv = [
        "find", ".",
        "-mindepth", "1",
        "-maxdepth", "1",
        "!", "-name", ".git",
        "-exec", "rm", "-rf", "{}", "+",
    ]
    await run_shell_command(prune_argv, cwd=repo_path)

    self.logger.info("Working directory cleanup completed")
207300

208301
async def _calculate_batch_depth(self, repo_path: str, remote: str) -> int:
209302
calculated_depth = None
@@ -213,16 +306,16 @@ async def _calculate_batch_depth(self, repo_path: str, remote: str) -> int:
213306
total_branches_tags = len(total_branches_tags.splitlines())
214307
if total_branches_tags <= 200:
215308
# Small repo, get a decent amount of history
216-
calculated_depth = 250
309+
calculated_depth = 500
217310
elif total_branches_tags <= 1000:
218311
# Medium repo, get a moderate amount of history
219-
calculated_depth = 150
312+
calculated_depth = 300
220313
elif total_branches_tags <= 5000:
221314
# Large repo, get less history
222-
calculated_depth = 10
315+
calculated_depth = 20
223316
else:
224317
# Very large repo, get a minimal history
225-
calculated_depth = 5
318+
calculated_depth = 10
226319
self.logger.info(
227320
f"total_branches_tags={total_branches_tags}, calculated_depth={calculated_depth}"
228321
)
@@ -289,8 +382,7 @@ async def clone_batches_generator(
289382
raise
290383
finally:
291384
if temp_repo_path and os.path.exists(temp_repo_path):
292-
self.logger.info(f"cleaning temp dir {temp_repo_path}")
293-
shutil.rmtree(temp_repo_path)
385+
await self._cleanup_temp_directory(temp_repo_path, repo_id)
294386

295387
# Save metrics
296388
service_execution = ServiceExecution(

services/apps/git_integration/src/crowdgit/services/queue/queue_service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ def _build_kafka_config(self):
2727
"client_id": self._CLIENT_ID,
2828
"acks": "all",
2929
}
30-
30+
if not CROWD_KAFKA_EXTRA:
31+
return config
3132
# Parse extra configuration from kafkajs config
3233
extra_config = json.loads(CROWD_KAFKA_EXTRA)
3334

0 commit comments

Comments
 (0)