|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +""" |
| 4 | +Behavior-driven tests for the backend_safe_file_io.py file. |
| 5 | +
|
| 6 | +Tests for crash-safe atomic file writing utilities. |
| 7 | +
|
| 8 | +This file is part of ArduPilot Methodic Configurator. https://github.com/ArduPilot/MethodicConfigurator |
| 9 | +
|
| 10 | +SPDX-FileCopyrightText: 2024-2026 Amilcar do Carmo Lucas <amilcar.lucas@iav.de> |
| 11 | +
|
| 12 | +SPDX-License-Identifier: GPL-3.0-or-later |
| 13 | +""" |
| 14 | + |
| 15 | +import io |
| 16 | +import json |
| 17 | +import os |
| 18 | +from pathlib import Path |
| 19 | +from unittest.mock import patch |
| 20 | + |
| 21 | +import pytest |
| 22 | + |
| 23 | +import ardupilot_methodic_configurator.backend_safe_file_io as sio |
| 24 | +from ardupilot_methodic_configurator.backend_safe_file_io import make_capture_safe_write, safe_write |
| 25 | + |
| 26 | +# pylint: disable=redefined-outer-name |
| 27 | + |
| 28 | + |
| 29 | +@pytest.fixture |
| 30 | +def tmp_file(tmp_path: Path) -> Path: |
| 31 | + """Fixture providing a temporary file path.""" |
| 32 | + return tmp_path / "test_output.json" |
| 33 | + |
| 34 | + |
| 35 | +class TestSafeWriteBasicBehavior: |
| 36 | + """Tests for basic safe_write behavior.""" |
| 37 | + |
| 38 | + def test_user_can_write_content_to_new_file(self, tmp_file: Path) -> None: |
| 39 | + """ |
| 40 | + User can write content to a new file atomically. |
| 41 | +
|
| 42 | + GIVEN: A target file path that does not yet exist |
| 43 | + WHEN: User calls safe_write with a write function |
| 44 | + THEN: File should be created with the correct content |
| 45 | + AND: No temporary files should remain |
| 46 | + """ |
| 47 | + content = "Hello, World!\n" |
| 48 | + |
| 49 | + safe_write(str(tmp_file), lambda f: f.write(content)) |
| 50 | + |
| 51 | + assert tmp_file.exists() |
| 52 | + assert tmp_file.read_text(encoding="utf-8") == content |
| 53 | + |
| 54 | + def test_user_can_overwrite_existing_file(self, tmp_file: Path) -> None: |
| 55 | + """ |
| 56 | + User can atomically overwrite an existing file. |
| 57 | +
|
| 58 | + GIVEN: A target file already containing some content |
| 59 | + WHEN: User calls safe_write with new content |
| 60 | + THEN: File should contain only the new content |
| 61 | + AND: Original content should be replaced |
| 62 | + """ |
| 63 | + tmp_file.write_text("old content\n", encoding="utf-8") |
| 64 | + |
| 65 | + safe_write(str(tmp_file), lambda f: f.write("new content\n")) |
| 66 | + |
| 67 | + assert tmp_file.read_text(encoding="utf-8") == "new content\n" |
| 68 | + |
| 69 | + def test_file_is_fully_written_before_target_is_replaced(self, tmp_file: Path) -> None: |
| 70 | + """ |
| 71 | + File is fully written to a temp location before replacing the target. |
| 72 | +
|
| 73 | + GIVEN: A target file with existing content |
| 74 | + WHEN: safe_write is called with large content |
| 75 | + THEN: Target should contain the complete new content |
| 76 | + AND: File should be readable immediately after write |
| 77 | + """ |
| 78 | + large_content = "x" * 100000 + "\n" |
| 79 | + |
| 80 | + safe_write(str(tmp_file), lambda f: f.write(large_content)) |
| 81 | + |
| 82 | + result = tmp_file.read_text(encoding="utf-8") |
| 83 | + assert result == large_content |
| 84 | + |
| 85 | + def test_user_can_write_json_content(self, tmp_file: Path) -> None: |
| 86 | + """ |
| 87 | + User can write JSON content to a file using safe_write. |
| 88 | +
|
| 89 | + GIVEN: A dictionary of data to persist |
| 90 | + WHEN: User writes JSON data via safe_write |
| 91 | + THEN: File should contain valid JSON with the expected data |
| 92 | + """ |
| 93 | + data = {"key": "value", "number": 42, "nested": {"list": [1, 2, 3]}} |
| 94 | + |
| 95 | + safe_write(str(tmp_file), lambda f: json.dump(data, f, indent=2)) |
| 96 | + |
| 97 | + written = json.loads(tmp_file.read_text(encoding="utf-8")) |
| 98 | + assert written == data |
| 99 | + |
| 100 | + def test_user_can_write_unicode_content(self, tmp_file: Path) -> None: |
| 101 | + """ |
| 102 | + User can write Unicode content to a file using safe_write. |
| 103 | +
|
| 104 | + GIVEN: Text content containing non-ASCII Unicode characters |
| 105 | + WHEN: User writes the content via safe_write |
| 106 | + THEN: File should be readable with the correct Unicode content preserved |
| 107 | + """ |
| 108 | + unicode_content = "Héllo Wörld — 日本語テスト\n" |
| 109 | + |
| 110 | + safe_write(str(tmp_file), lambda f: f.write(unicode_content)) |
| 111 | + |
| 112 | + assert tmp_file.read_text(encoding="utf-8") == unicode_content |
| 113 | + |
| 114 | + def test_no_temporary_files_remain_after_successful_write(self, tmp_path: Path) -> None: |
| 115 | + """ |
| 116 | + No temporary files remain after a successful write operation. |
| 117 | +
|
| 118 | + GIVEN: A target file path |
| 119 | + WHEN: safe_write completes successfully |
| 120 | + THEN: No .tmp files should remain in the directory |
| 121 | + """ |
| 122 | + tmp_file = tmp_path / "output.json" |
| 123 | + |
| 124 | + safe_write(str(tmp_file), lambda f: f.write("content")) |
| 125 | + |
| 126 | + tmp_files = list(tmp_path.glob("*.tmp")) |
| 127 | + assert not tmp_files, f"Found leftover temp files: {tmp_files}" |
| 128 | + |
| 129 | + |
| 130 | +class TestSafeWriteFilePermissions: |
| 131 | + """Tests for safe_write file permission handling.""" |
| 132 | + |
| 133 | + def test_new_file_gets_default_permissions(self, tmp_file: Path) -> None: |
| 134 | + """ |
| 135 | + New file created by safe_write gets reasonable default permissions. |
| 136 | +
|
| 137 | + GIVEN: A target file path that does not exist |
| 138 | + WHEN: safe_write creates the file |
| 139 | + THEN: File should have readable permissions |
| 140 | + """ |
| 141 | + safe_write(str(tmp_file), lambda f: f.write("content")) |
| 142 | + |
| 143 | + assert os.access(str(tmp_file), os.R_OK) |
| 144 | + assert os.access(str(tmp_file), os.W_OK) |
| 145 | + |
| 146 | + def test_file_permissions_are_preserved_on_overwrite(self, tmp_file: Path) -> None: |
| 147 | + """ |
| 148 | + Existing file permissions are preserved when safe_write overwrites the file. |
| 149 | +
|
| 150 | + GIVEN: An existing file with specific permissions |
| 151 | + WHEN: safe_write overwrites it |
| 152 | + THEN: The file permissions should be preserved (or close to original) |
| 153 | + """ |
| 154 | + tmp_file.write_text("original content", encoding="utf-8") |
| 155 | + original_mode = os.stat(str(tmp_file)).st_mode |
| 156 | + |
| 157 | + safe_write(str(tmp_file), lambda f: f.write("new content")) |
| 158 | + |
| 159 | + new_mode = os.stat(str(tmp_file)).st_mode |
| 160 | + # Permissions should be preserved (same mode bits for owner/group/other read+write) |
| 161 | + assert (new_mode & 0o666) == (original_mode & 0o666) |
| 162 | + |
| 163 | + def test_safe_write_handles_file_not_found_for_stat(self, tmp_path: Path) -> None: |
| 164 | + """ |
| 165 | + safe_write handles the case where the target doesn't exist (for stat). |
| 166 | +
|
| 167 | + GIVEN: A target file path that does not exist |
| 168 | + WHEN: safe_write tries to stat the target to preserve permissions |
| 169 | + THEN: FileNotFoundError should be handled gracefully |
| 170 | + AND: The write should still succeed with default permissions |
| 171 | + """ |
| 172 | + new_file = tmp_path / "nonexistent_parent" / "output.txt" |
| 173 | + new_file.parent.mkdir(parents=True) |
| 174 | + |
| 175 | + safe_write(str(new_file), lambda f: f.write("content")) |
| 176 | + |
| 177 | + assert new_file.exists() |
| 178 | + assert new_file.read_text(encoding="utf-8") == "content" |
| 179 | + |
| 180 | + |
| 181 | +class TestSafeWriteErrorHandling: |
| 182 | + """Tests for safe_write error handling behavior.""" |
| 183 | + |
| 184 | + def test_cleanup_happens_when_write_fails(self, tmp_path: Path) -> None: |
| 185 | + """ |
| 186 | + Temporary file is cleaned up when write function raises an exception. |
| 187 | +
|
| 188 | + GIVEN: A write function that raises an exception |
| 189 | + WHEN: safe_write is called and the write fails |
| 190 | + THEN: No temporary files should remain in the directory |
| 191 | + AND: An exception should be raised to the caller |
| 192 | + """ |
| 193 | + |
| 194 | + def failing_write(_f: io.TextIOWrapper) -> None: |
| 195 | + msg = "Simulated disk full" |
| 196 | + raise OSError(msg) |
| 197 | + |
| 198 | + tmp_file = tmp_path / "output.txt" |
| 199 | + |
| 200 | + with pytest.raises(OSError, match="Simulated disk full"): |
| 201 | + safe_write(str(tmp_file), failing_write) |
| 202 | + |
| 203 | + tmp_files = list(tmp_path.glob("*.tmp")) |
| 204 | + assert not tmp_files, f"Temp files not cleaned up: {tmp_files}" |
| 205 | + |
| 206 | + # Target should not have been created |
| 207 | + assert not tmp_file.exists() |
| 208 | + |
| 209 | + def test_directory_fsync_failure_does_not_break_write(self, tmp_file: Path) -> None: |
| 210 | + """ |
| 211 | + Directory fsync failure does not break the write operation. |
| 212 | +
|
| 213 | + GIVEN: A system where directory fsync fails |
| 214 | + WHEN: safe_write is called |
| 215 | + THEN: The file write should still succeed |
| 216 | + AND: The error from fsync should be silently ignored |
| 217 | + """ |
| 218 | + with patch("os.fsync", side_effect=[None, OSError("fsync failed")]): |
| 219 | + safe_write(str(tmp_file), lambda f: f.write("test content")) |
| 220 | + |
| 221 | + assert tmp_file.exists() |
| 222 | + assert tmp_file.read_text(encoding="utf-8") == "test content" |
| 223 | + |
| 224 | + |
| 225 | +class TestMakeCapturesSafeWrite: |
| 226 | + """Tests for the make_capture_safe_write helper function.""" |
| 227 | + |
| 228 | + def test_capture_helper_records_written_json_data(self) -> None: |
| 229 | + """ |
| 230 | + make_capture_safe_write helper records JSON data written via safe_write. |
| 231 | +
|
| 232 | + GIVEN: A mock that uses make_capture_safe_write |
| 233 | + WHEN: A function writes JSON data through the mock |
| 234 | + THEN: The captured_data should contain the written JSON |
| 235 | + AND: called[0] should be True |
| 236 | + """ |
| 237 | + captured_data, called, side_effect = make_capture_safe_write() |
| 238 | + |
| 239 | + assert called[0] is False |
| 240 | + assert not captured_data |
| 241 | + |
| 242 | + # Call the side_effect directly with a write function |
| 243 | + test_data = {"key": "value", "number": 42} |
| 244 | + |
| 245 | + def write_json(f: io.TextIOWrapper) -> None: |
| 246 | + json.dump(test_data, f) |
| 247 | + |
| 248 | + side_effect("/fake/path", write_json) |
| 249 | + |
| 250 | + assert called[0] is True |
| 251 | + assert captured_data == test_data |
| 252 | + |
| 253 | + def test_capture_helper_can_be_used_as_mock_side_effect(self) -> None: |
| 254 | + """ |
| 255 | + make_capture_safe_write helper integrates correctly as a mock side_effect. |
| 256 | +
|
| 257 | + GIVEN: A function that calls safe_write internally |
| 258 | + WHEN: safe_write is mocked with the capture side_effect |
| 259 | + THEN: The written data should be captured for verification |
| 260 | + """ |
| 261 | + captured_data, called, side_effect = make_capture_safe_write() |
| 262 | + |
| 263 | + test_data = {"component": "Flight Controller", "version": "4.5.x"} |
| 264 | + |
| 265 | + with patch("ardupilot_methodic_configurator.backend_safe_file_io.safe_write") as mock_safe_write: |
| 266 | + mock_safe_write.side_effect = side_effect |
| 267 | + |
| 268 | + # Call something that uses safe_write internally |
| 269 | + sio.safe_write("/fake/path", lambda f: json.dump(test_data, f)) |
| 270 | + |
| 271 | + assert called[0] is True |
| 272 | + assert captured_data == test_data |
| 273 | + |
| 274 | + def test_capture_helper_returns_three_elements(self) -> None: |
| 275 | + """ |
| 276 | + make_capture_safe_write returns exactly three elements. |
| 277 | +
|
| 278 | + GIVEN: The make_capture_safe_write function |
| 279 | + WHEN: Called with no arguments |
| 280 | + THEN: Should return (captured_data, called, side_effect) tuple |
| 281 | + AND: captured_data should be empty dict initially |
| 282 | + AND: called should be [False] initially |
| 283 | + AND: side_effect should be callable |
| 284 | + """ |
| 285 | + result = make_capture_safe_write() |
| 286 | + |
| 287 | + assert len(result) == 3 |
| 288 | + captured_data, called, side_effect = result |
| 289 | + assert isinstance(captured_data, dict) |
| 290 | + assert len(captured_data) == 0 |
| 291 | + assert isinstance(called, list) |
| 292 | + assert called == [False] |
| 293 | + assert callable(side_effect) |
0 commit comments