|
3 | 3 | import tempfile |
4 | 4 | import os |
5 | 5 | import sys |
| 6 | +import shutil |
| 7 | +from unittest.mock import patch |
6 | 8 | from dotenv import load_dotenv |
7 | 9 |
|
| 10 | +# Load environment variables |
8 | 11 | load_dotenv() |
9 | 12 | sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/..") |
10 | 13 |
|
| 14 | +from src import file_store |
11 | 15 | from src.file_store import read, write |
12 | 16 |
|
13 | | -class TestExample(unittest.TestCase): |
| 17 | +class TestFileStore(unittest.TestCase): |
14 | 18 |
|
15 | 19 | def setUp(self): |
16 | | - self.test_user_id = 'test_user_id' |
17 | | - self.test_data = {'key': 'value'} |
| 20 | + # Create temporary directory for testing |
| 21 | + self.temp_dir = tempfile.mkdtemp(prefix='test_file_store_') |
18 | 22 |
|
19 | | - def test_write(self): |
20 | | - # Use temporary file for testing |
21 | | - with tempfile.NamedTemporaryFile() as f: |
| 23 | + # Set FILE_STORE_PATH to our temp directory |
| 24 | + self.env_patcher = patch.dict(os.environ, {'FILE_STORE_PATH': self.temp_dir}) |
| 25 | + self.env_patcher.start() |
22 | 26 |
|
23 | | - # Write to file |
24 | | - write_status, write_code = write(self.test_data, self.test_user_id) |
| 27 | + # Re-import file_store to pick up the new environment variable |
| 28 | + import importlib |
| 29 | + importlib.reload(file_store) |
| 30 | + from src.file_store import read, write |
| 31 | + globals()['read'] = read |
| 32 | + globals()['write'] = write |
25 | 33 |
|
26 | | - # Test status and code |
27 | | - self.assertEqual(write_status, 'ok') |
28 | | - self.assertEqual(write_code, 200) |
| 34 | + # Test data |
| 35 | + self.test_user_id = 'testuser1' |
| 36 | + self.test_data = {'key': 'value', 'data': [1, 2, 3]} |
| 37 | + self.empty_data = {} |
| 38 | + self.none_data = None |
29 | 39 |
|
30 | | - # Read from file |
31 | | - read_data, read_code = read(self.test_user_id) |
| 40 | + def tearDown(self): |
| 41 | + self.env_patcher.stop() |
| 42 | + # Clean up temp directory |
| 43 | + shutil.rmtree(self.temp_dir, ignore_errors=True) |
32 | 44 |
|
33 | | - # Test read data and code |
34 | | - self.assertEqual(read_data, self.test_data) |
35 | | - self.assertEqual(read_code, 200) |
| 45 | + def test_write_valid_data(self): |
| 46 | + """Test writing valid data""" |
| 47 | + status, code = write(self.test_data, self.test_user_id) |
| 48 | + self.assertEqual(status, 'ok') |
| 49 | + self.assertEqual(code, 200) |
36 | 50 |
|
37 | | - def test_read_not_found(self): |
38 | | - # Use temporary file for testing |
39 | | - with tempfile.TemporaryDirectory() as tempdir: |
40 | | - os.environ['FILE_STORE_PATH'] = tempdir |
| 51 | + # Verify file was created and contains correct data |
| 52 | + data, read_code = read(self.test_user_id) |
| 53 | + self.assertEqual(read_code, 200) |
| 54 | + self.assertEqual(data, self.test_data) |
41 | 55 |
|
42 | | - # Read from unexisting file |
43 | | - read_data, read_code = read(self.test_user_id + '0000') |
| 56 | + def test_write_empty_data(self): |
| 57 | + """Test writing empty dictionary""" |
| 58 | + status, code = write(self.empty_data, self.test_user_id) |
| 59 | + self.assertEqual(status, 'ok') |
| 60 | + self.assertEqual(code, 200) |
44 | 61 |
|
45 | | - # Test not found data and code |
46 | | - self.assertEqual(read_data, 'File not found') |
47 | | - self.assertEqual(read_code, 404) |
| 62 | + data, read_code = read(self.test_user_id) |
| 63 | + self.assertEqual(read_code, 200) |
| 64 | + self.assertEqual(data, {}) |
48 | 65 |
|
49 | | - def test_read(self): |
50 | | - # Use temporary file for testing |
51 | | - with tempfile.TemporaryDirectory() as tempdir: |
52 | | - # Read from unexisting file |
53 | | - write(self.test_data, self.test_user_id) |
54 | | - read_data, read_code = read(self.test_user_id) |
| 66 | + def test_write_none_data(self): |
| 67 | + """Test writing None data (should be converted to empty dict)""" |
| 68 | + status, code = write(self.none_data, self.test_user_id) |
| 69 | + self.assertEqual(status, 'ok') |
| 70 | + self.assertEqual(code, 200) |
55 | 71 |
|
56 | | - # Test not found data and code |
57 | | - self.assertEqual(read_data, self.test_data) |
58 | | - self.assertEqual(read_code, 200) |
| 72 | + data, read_code = read(self.test_user_id) |
| 73 | + self.assertEqual(read_code, 200) |
| 74 | + self.assertEqual(data, {}) |
| 75 | + |
| 76 | + def test_write_complex_data(self): |
| 77 | + """Test writing complex nested data""" |
| 78 | + complex_data = { |
| 79 | + 'users': [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}], |
| 80 | + 'settings': {'theme': 'dark', 'notifications': True}, |
| 81 | + 'timestamp': '2025-11-10T16:59:40' |
| 82 | + } |
| 83 | + status, code = write(complex_data, self.test_user_id) |
| 84 | + self.assertEqual(status, 'ok') |
| 85 | + self.assertEqual(code, 200) |
| 86 | + |
| 87 | + data, read_code = read(self.test_user_id) |
| 88 | + self.assertEqual(read_code, 200) |
| 89 | + self.assertEqual(data, complex_data) |
| 90 | + |
| 91 | + def test_read_existing_file(self): |
| 92 | + """Test reading existing file""" |
| 93 | + write(self.test_data, self.test_user_id) |
| 94 | + data, code = read(self.test_user_id) |
| 95 | + self.assertEqual(code, 200) |
| 96 | + self.assertEqual(data, self.test_data) |
| 97 | + |
| 98 | + def test_read_nonexistent_file(self): |
| 99 | + """Test reading non-existent file""" |
| 100 | + data, code = read('nonexistent_user') |
| 101 | + self.assertEqual(code, 404) |
| 102 | + self.assertEqual(data, 'File not found') |
| 103 | + |
| 104 | + def test_read_after_write_overwrite(self): |
| 105 | + """Test overwriting existing data""" |
| 106 | + # Write initial data |
| 107 | + write({'initial': 'data'}, self.test_user_id) |
| 108 | + |
| 109 | + # Overwrite with new data |
| 110 | + new_data = {'updated': 'data', 'version': 2} |
| 111 | + write(new_data, self.test_user_id) |
| 112 | + |
| 113 | + # Read should return new data |
| 114 | + data, code = read(self.test_user_id) |
| 115 | + self.assertEqual(code, 200) |
| 116 | + self.assertEqual(data, new_data) |
| 117 | + |
| 118 | + def test_multiple_users_isolation(self): |
| 119 | + """Test that different users have isolated data""" |
| 120 | + user1_data = {'user': 'user1', 'value': 100} |
| 121 | + user2_data = {'user': 'user2', 'value': 200} |
| 122 | + |
| 123 | + write(user1_data, 'user1') |
| 124 | + write(user2_data, 'user2') |
| 125 | + |
| 126 | + data1, code1 = read('user1') |
| 127 | + data2, code2 = read('user2') |
| 128 | + |
| 129 | + self.assertEqual(code1, 200) |
| 130 | + self.assertEqual(code2, 200) |
| 131 | + self.assertEqual(data1, user1_data) |
| 132 | + self.assertEqual(data2, user2_data) |
| 133 | + self.assertNotEqual(data1, data2) |
| 134 | + |
| 135 | + def test_file_path_construction(self): |
| 136 | + """Test that files are created in the correct location""" |
| 137 | + write(self.test_data, self.test_user_id) |
| 138 | + |
| 139 | + # Check if file exists in temp directory |
| 140 | + expected_path = os.path.join(self.temp_dir, f"{self.test_user_id}.json") |
| 141 | + self.assertTrue(os.path.exists(expected_path)) |
| 142 | + |
| 143 | + # Verify file contents |
| 144 | + with open(expected_path, 'r') as f: |
| 145 | + file_data = json.load(f) |
| 146 | + self.assertEqual(file_data, self.test_data) |
| 147 | + |
| 148 | + def test_write_creates_directory_if_needed(self): |
| 149 | + """Test that write creates necessary directories""" |
| 150 | + # This test assumes FILE_STORE_PATH is set to a temp directory |
| 151 | + # The write function should work even if subdirectories don't exist |
| 152 | + status, code = write(self.test_data, self.test_user_id) |
| 153 | + self.assertEqual(status, 'ok') |
| 154 | + self.assertEqual(code, 200) |
| 155 | + |
| 156 | + def test_read_file_corruption(self): |
| 157 | + """Test reading corrupted JSON file""" |
| 158 | + # Create a corrupted JSON file manually |
| 159 | + file_path = os.path.join(self.temp_dir, f"{self.test_user_id}.json") |
| 160 | + with open(file_path, 'w') as f: |
| 161 | + f.write('{"invalid": json syntax}') |
| 162 | + |
| 163 | + # This should raise a JSONDecodeError since the file_store doesn't handle corrupted JSON |
| 164 | + with self.assertRaises(json.JSONDecodeError): |
| 165 | + read(self.test_user_id) |
| 166 | + |
| 167 | + def test_environment_variable_file_store_path(self): |
| 168 | + """Test that FILE_STORE_PATH environment variable is respected""" |
| 169 | + # With our mocking, files should go to temp_dir |
| 170 | + write(self.test_data, self.test_user_id) |
| 171 | + expected_path = os.path.join(self.temp_dir, f"{self.test_user_id}.json") |
| 172 | + |
| 173 | + # The file should exist in the mocked temp directory |
| 174 | + self.assertTrue(os.path.exists(expected_path)) |
| 175 | + |
| 176 | + # Clean up |
| 177 | + if os.path.exists(expected_path): |
| 178 | + os.remove(expected_path) |
59 | 179 |
|
60 | 180 | if __name__ == '__main__': |
61 | 181 | unittest.main() |
0 commit comments