|
10 | 10 | # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF |
11 | 11 | # ANY KIND, either express or implied. See the License for the specific |
12 | 12 | # language governing permissions and limitations under the License. |
| 13 | +import os |
13 | 14 | import sqlite3 |
| 15 | +import stat |
| 16 | +from pathlib import Path |
14 | 17 | from unittest.mock import MagicMock, PropertyMock, patch |
15 | 18 |
|
16 | 19 | import pytest |
|
27 | 30 | CLISessionOrchestrator, |
28 | 31 | add_session_id_component_to_user_agent_extra, |
29 | 32 | ) |
| 33 | +from awscli.testutils import FileCreator |
30 | 34 | from tests.markers import skip_if_windows |
31 | 35 |
|
32 | 36 |
|
@@ -130,6 +134,71 @@ def execute(self, query, *parameters): |
130 | 134 | assert cursor.fetchall() == [] |
131 | 135 |
|
132 | 136 |
|
| 137 | +@skip_if_windows |
| 138 | +class TestCLISessionDatabaseConnectionPermissions: |
| 139 | + def setup_method(self): |
| 140 | + self.files = FileCreator() |
| 141 | + |
| 142 | + def teardown_method(self): |
| 143 | + self.files.remove_all() |
| 144 | + |
| 145 | + def test_create_directory_with_secure_permissions(self): |
| 146 | + cache_dir = self.files.full_path('cache') |
| 147 | + cache_path = Path(cache_dir) |
| 148 | + |
| 149 | + with patch('awscli.telemetry._CACHE_DIR', cache_path): |
| 150 | + with patch('awscli.telemetry._DATABASE_FILENAME', 'session.db'): |
| 151 | + conn = CLISessionDatabaseConnection() |
| 152 | + conn._connection.close() |
| 153 | + |
| 154 | + assert os.path.isdir(cache_dir) |
| 155 | + dir_mode = stat.S_IMODE(os.stat(cache_dir).st_mode) |
| 156 | + assert dir_mode == 0o700 |
| 157 | + |
| 158 | + def test_tighten_existing_directory_permissions(self): |
| 159 | + cache_dir = self.files.full_path('cache') |
| 160 | + os.makedirs(cache_dir, mode=0o755) |
| 161 | + cache_path = Path(cache_dir) |
| 162 | + |
| 163 | + with patch('awscli.telemetry._CACHE_DIR', cache_path): |
| 164 | + with patch('awscli.telemetry._DATABASE_FILENAME', 'session.db'): |
| 165 | + conn = CLISessionDatabaseConnection() |
| 166 | + conn._connection.close() |
| 167 | + |
| 168 | + dir_mode = stat.S_IMODE(os.stat(cache_dir).st_mode) |
| 169 | + assert dir_mode == 0o700 |
| 170 | + |
| 171 | + def test_create_database_file_with_secure_permissions(self): |
| 172 | + cache_dir = self.files.full_path('cache') |
| 173 | + db_file = os.path.join(cache_dir, 'session.db') |
| 174 | + cache_path = Path(cache_dir) |
| 175 | + |
| 176 | + with patch('awscli.telemetry._CACHE_DIR', cache_path): |
| 177 | + with patch('awscli.telemetry._DATABASE_FILENAME', 'session.db'): |
| 178 | + conn = CLISessionDatabaseConnection() |
| 179 | + conn._connection.close() |
| 180 | + |
| 181 | + assert os.path.isfile(db_file) |
| 182 | + file_mode = stat.S_IMODE(os.stat(db_file).st_mode) |
| 183 | + assert file_mode == 0o600 |
| 184 | + |
| 185 | + def test_tighten_existing_database_file_permissions(self): |
| 186 | + cache_dir = self.files.full_path('cache') |
| 187 | + os.makedirs(cache_dir, mode=0o700) |
| 188 | + db_file = os.path.join(cache_dir, 'session.db') |
| 189 | + open(db_file, 'a').close() |
| 190 | + os.chmod(db_file, 0o644) |
| 191 | + cache_path = Path(cache_dir) |
| 192 | + |
| 193 | + with patch('awscli.telemetry._CACHE_DIR', cache_path): |
| 194 | + with patch('awscli.telemetry._DATABASE_FILENAME', 'session.db'): |
| 195 | + conn = CLISessionDatabaseConnection() |
| 196 | + conn._connection.close() |
| 197 | + |
| 198 | + file_mode = stat.S_IMODE(os.stat(db_file).st_mode) |
| 199 | + assert file_mode == 0o600 |
| 200 | + |
| 201 | + |
133 | 202 | class TestCLISessionDatabaseWriter: |
134 | 203 | def test_write(self, session_writer, session_reader, session_sweeper): |
135 | 204 | session_writer.write( |
|
0 commit comments