Skip to content

Commit d63b9f4

Browse files
committed
migrate: Add a replace_file_atomically() function
This utility function replaces a whole file. We also update the function `replace_file_contents_atomically()` to use the new function and avoid duplication. Signed-off-by: Leandro Lucarella <luca-frequenz@llucax.com>
1 parent 326cba9 commit d63b9f4

2 files changed

Lines changed: 90 additions & 62 deletions

File tree

.github/cookiecutter-migrate.template.py

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,50 @@ def apply_patch(patch_content: str) -> None:
8585
subprocess.run(["patch", "-p1"], input=patch_content.encode(), check=True)
8686

8787

88+
def replace_file_atomically( # noqa; DOC501, DOC503
89+
filepath: str | Path, new_content: str
90+
) -> None:
91+
"""Replace a file atomically with the given content.
92+
93+
The replacement is done atomically by writing to a temporary file in the
94+
same directory and then moving it to the target location.
95+
96+
Args:
97+
filepath: The path to the file to replace.
98+
new_content: The content to write to the file.
99+
"""
100+
if isinstance(filepath, str):
101+
filepath = Path(filepath)
102+
103+
tmp_dir = filepath.parent
104+
tmp_dir.mkdir(parents=True, exist_ok=True)
105+
106+
# pylint: disable-next=consider-using-with
107+
tmp = tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False)
108+
109+
try:
110+
st = None
111+
try:
112+
st = os.stat(filepath)
113+
except FileNotFoundError:
114+
st = None
115+
116+
tmp.write(new_content)
117+
tmp.flush()
118+
os.fsync(tmp.fileno())
119+
tmp.close()
120+
121+
if st is not None:
122+
os.chmod(tmp.name, st.st_mode)
123+
124+
os.replace(tmp.name, filepath)
125+
126+
except BaseException:
127+
tmp.close()
128+
os.unlink(tmp.name)
129+
raise
130+
131+
88132
def replace_file_contents_atomically( # noqa; DOC501
89133
filepath: str | Path,
90134
old: str,
@@ -112,37 +156,7 @@ def replace_file_contents_atomically( # noqa; DOC501
112156
if content is None:
113157
content = filepath.read_text(encoding="utf-8")
114158

115-
content = content.replace(old, new, count)
116-
117-
# Create temporary file in the same directory to ensure atomic move
118-
tmp_dir = filepath.parent
119-
120-
# pylint: disable-next=consider-using-with
121-
tmp = tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False)
122-
123-
try:
124-
# Copy original file permissions
125-
st = os.stat(filepath)
126-
127-
# Write the new content
128-
tmp.write(content)
129-
130-
# Ensure all data is written to disk
131-
tmp.flush()
132-
os.fsync(tmp.fileno())
133-
tmp.close()
134-
135-
# Copy original file permissions to the new file
136-
os.chmod(tmp.name, st.st_mode)
137-
138-
# Perform atomic replace
139-
os.rename(tmp.name, filepath)
140-
141-
except BaseException:
142-
# Clean up the temporary file in case of errors
143-
tmp.close()
144-
os.unlink(tmp.name)
145-
raise
159+
replace_file_atomically(filepath, content.replace(old, new, count))
146160

147161

148162
def calculate_file_sha256_skip_lines(filepath: Path, skip_lines: int) -> str | None:

cookiecutter/migrate.py

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,50 @@ def apply_patch(patch_content: str) -> None:
366366
subprocess.run(["patch", "-p1"], input=patch_content.encode(), check=True)
367367

368368

369+
def replace_file_atomically( # noqa; DOC501, DOC503
370+
filepath: str | Path, new_content: str
371+
) -> None:
372+
"""Replace a file atomically with the given content.
373+
374+
The replacement is done atomically by writing to a temporary file in the
375+
same directory and then moving it to the target location.
376+
377+
Args:
378+
filepath: The path to the file to replace.
379+
new_content: The content to write to the file.
380+
"""
381+
if isinstance(filepath, str):
382+
filepath = Path(filepath)
383+
384+
tmp_dir = filepath.parent
385+
tmp_dir.mkdir(parents=True, exist_ok=True)
386+
387+
# pylint: disable-next=consider-using-with
388+
tmp = tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False)
389+
390+
try:
391+
st = None
392+
try:
393+
st = os.stat(filepath)
394+
except FileNotFoundError:
395+
st = None
396+
397+
tmp.write(new_content)
398+
tmp.flush()
399+
os.fsync(tmp.fileno())
400+
tmp.close()
401+
402+
if st is not None:
403+
os.chmod(tmp.name, st.st_mode)
404+
405+
os.replace(tmp.name, filepath)
406+
407+
except BaseException:
408+
tmp.close()
409+
os.unlink(tmp.name)
410+
raise
411+
412+
369413
def replace_file_contents_atomically( # noqa; DOC501
370414
filepath: str | Path,
371415
old: str,
@@ -393,37 +437,7 @@ def replace_file_contents_atomically( # noqa; DOC501
393437
if content is None:
394438
content = filepath.read_text(encoding="utf-8")
395439

396-
content = content.replace(old, new, count)
397-
398-
# Create temporary file in the same directory to ensure atomic move
399-
tmp_dir = filepath.parent
400-
401-
# pylint: disable-next=consider-using-with
402-
tmp = tempfile.NamedTemporaryFile(mode="w", dir=tmp_dir, delete=False)
403-
404-
try:
405-
# Copy original file permissions
406-
st = os.stat(filepath)
407-
408-
# Write the new content
409-
tmp.write(content)
410-
411-
# Ensure all data is written to disk
412-
tmp.flush()
413-
os.fsync(tmp.fileno())
414-
tmp.close()
415-
416-
# Copy original file permissions to the new file
417-
os.chmod(tmp.name, st.st_mode)
418-
419-
# Perform atomic replace
420-
os.rename(tmp.name, filepath)
421-
422-
except BaseException:
423-
# Clean up the temporary file in case of errors
424-
tmp.close()
425-
os.unlink(tmp.name)
426-
raise
440+
replace_file_atomically(filepath, content.replace(old, new, count))
427441

428442

429443
def calculate_file_sha256_skip_lines(filepath: Path, skip_lines: int) -> str | None:

0 commit comments

Comments
 (0)