-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
140 lines (112 loc) · 4.97 KB
/
utils.py
File metadata and controls
140 lines (112 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from codetide.core.defaults import DEFAULT_ENCODING
from typing import List, Union
import aiofiles.os
import aiofiles
import os
import re
async def trim_to_patch_section(filename):
    """Trim a file in place so that only its patch section(s) remain.

    Lines are kept from any line containing ``*** Begin Patch`` through the
    next line containing ``*** End Patch``, marker lines included. Everything
    outside those spans is dropped. Scanning continues after an end marker,
    so several patch sections in one file are all preserved, and a section
    with no end marker is kept through the end of the file.

    If no line is kept, the file is deleted rather than left empty. A file
    that does not exist is ignored.

    Args:
        filename: Path of the file to trim.

    Raises:
        OSError: For I/O failures other than the file being missing
            (for example, insufficient permissions).
    """
    lines_to_keep = []
    capturing = False

    # Open directly and handle the missing-file case instead of a blocking
    # os.path.exists() pre-check, which stalls the event loop and can race
    # with a concurrent delete between the check and the open.
    try:
        async with aiofiles.open(filename, 'r', encoding=DEFAULT_ENCODING) as f:
            async for line in f:
                if '*** Begin Patch' in line:
                    capturing = True
                    lines_to_keep.append(line)  # Include the begin marker
                elif '*** End Patch' in line:
                    # The end marker is kept even if no begin marker preceded it.
                    lines_to_keep.append(line)
                    capturing = False  # Stop capturing but keep scanning
                elif capturing:
                    lines_to_keep.append(line)
    except FileNotFoundError:
        return

    if lines_to_keep:
        # Write back only the lines we want to keep.
        async with aiofiles.open(filename, 'w', encoding=DEFAULT_ENCODING) as f:
            await f.writelines(lines_to_keep)
    else:
        # Nothing worth keeping: remove the file without blocking the loop.
        try:
            await aiofiles.os.remove(filename)
        except FileNotFoundError:
            # Already gone (e.g. removed concurrently) - nothing left to do.
            pass
def parse_blocks(text: str, block_word: str = "Commit", multiple: bool = True) -> Union[str, List[str], None]:
    """
    Return the text enclosed by ``*** Begin <block_word>`` / ``*** End <block_word>`` markers.

    Both markers must start at column zero of their line; trailing whitespace
    after the block word is tolerated. The marker lines themselves are not part
    of the result, and each extracted body is stripped of surrounding whitespace.

    Args:
        text: Input text that may contain one or more marked blocks.
        block_word: Word used in the markers (e.g., "Commit", "Section", "Code").
            It is matched literally, so regex metacharacters are safe.
        multiple: When True, return every block as a list; when False, return
            only the first block as a string.

    Returns:
        A list of block bodies (``multiple=True``), a single block body
        (``multiple=False``), or None when no block is present.
    """
    # The block word is escaped so it is matched literally inside the pattern.
    escaped_word = re.escape(block_word)
    pattern = rf"(?m)^\*\*\* Begin {escaped_word}\s*\n([\s\S]*?)^\*\*\* End {escaped_word}\s*$"
    block_re = re.compile(pattern)

    if not multiple:
        first = block_re.search(text)
        return first.group(1).strip() if first else None

    bodies = [found.group(1).strip() for found in block_re.finditer(text)]
    # An empty list means no block matched, which callers see as None.
    return bodies or None
def parse_steps_markdown(md: str) -> List[dict]:
    """
    Parse the steps listed between ``*** Begin Steps`` and ``*** End Steps``.

    Steps are separated by ``---``. Each step must begin with a header of the
    form ``<number>. **<description>**`` and may then contain the sections
    ``**instructions**:``, ``**context_identifiers**:`` and
    ``**modify_identifiers**:``. Identifier sections are read as ``- item``
    bullet entries. A chunk without a valid header is skipped.

    A section ends where the next known section header starts, or at the end
    of the step. A missing trailing section therefore no longer causes the
    preceding section to be dropped.

    Args:
        md: Markdown text containing a steps block.

    Returns:
        A list of dicts with the keys ``step`` (int), ``description`` (str),
        ``instructions`` (str), ``context_identifiers`` (list of str) and
        ``modify_identifiers`` (list of str). The list is empty when no steps
        block is found.
    """
    # Extract only content between *** Begin Steps and *** End Steps
    match = re.search(r"\*\*\* Begin Steps(.*?)\*\*\* End Steps", md, re.DOTALL)
    if not match:
        return []

    steps = []
    steps_block = match.group(1).strip()

    # Split steps by '---', ignoring empty chunks
    raw_steps = [s.strip() for s in steps_block.split('---') if s.strip()]

    for raw_step in raw_steps:
        # Match step number and description
        step_header = re.match(r"(\d+)\.\s+\*\*(.*?)\*\*", raw_step)
        if not step_header:
            continue

        step_num = int(step_header.group(1))
        description = step_header.group(2).strip()

        # Instructions run until the next section header, or to the end of the
        # step when no further section is present.
        instructions_match = re.search(
            r"\*\*instructions\*\*:\s*(.*?)"
            r"(?=\*\*context_identifiers\*\*:|\*\*modify_identifiers\*\*:|\Z)",
            raw_step,
            re.DOTALL,
        )
        instructions = instructions_match.group(1).strip() if instructions_match else ""

        # Context identifiers run until the modify section, or to the end of
        # the step when that section is missing.
        context_match = re.search(
            r"\*\*context_identifiers\*\*:\s*(.*?)(?=\*\*modify_identifiers\*\*:|\Z)",
            raw_step,
            re.DOTALL,
        )
        context_block = context_match.group(1).strip() if context_match else ""
        context_identifiers = re.findall(r"- (.+)", context_block)

        # Modify identifiers take everything after their header.
        modify_match = re.search(r"\*\*modify_identifiers\*\*:\s*(.*)", raw_step, re.DOTALL)
        modify_block = modify_match.group(1).strip() if modify_match else ""
        modify_identifiers = re.findall(r"- (.+)", modify_block)

        steps.append({
            "step": step_num,
            "description": description,
            "instructions": instructions,
            "context_identifiers": [identifier.strip() for identifier in context_identifiers],
            "modify_identifiers": [identifier.strip() for identifier in modify_identifiers],
        })

    return steps
async def delete_file(file_path: str) -> bool:
    """
    Remove a file asynchronously, tolerating the case where it is already absent.

    Args:
        file_path: Path to the file to delete

    Returns:
        bool: True if the file was removed, False if it did not exist

    Raises:
        PermissionError: If lacking permissions to delete
        OSError: For other OS-related errors
    """
    try:
        await aiofiles.os.remove(file_path)
    except FileNotFoundError:
        # A missing file is not an error; report that nothing was deleted.
        return False
    else:
        # Any other exception (PermissionError, OSError, ...) propagates to
        # the caller untouched, so reaching here means the removal succeeded.
        return True