|
| 1 | +import dataclasses |
| 2 | +import importlib |
| 3 | +import json |
| 4 | +import logging |
| 5 | +import unittest |
| 6 | +import warnings |
| 7 | +from typing import Any, cast |
| 8 | +from unittest.mock import MagicMock, patch |
| 9 | + |
| 10 | +from pythonjsonlogger.json import JsonFormatter |
| 11 | + |
| 12 | +import pyoaev.utils as module |
| 13 | + |
| 14 | + |
| 15 | +@dataclasses.dataclass |
| 16 | +class _SampleData: |
| 17 | + value: int |
| 18 | + |
| 19 | + |
| 20 | +class TestUtils(unittest.TestCase): |
| 21 | + def test_custom_json_formatter_inherits_non_deprecated_formatter(self): |
| 22 | + self.assertTrue(issubclass(module.CustomJsonFormatter, JsonFormatter)) |
| 23 | + |
| 24 | + def test_reloading_utils_does_not_raise_jsonlogger_deprecation_warning(self): |
| 25 | + with warnings.catch_warnings(record=True) as caught: |
| 26 | + warnings.simplefilter("always") |
| 27 | + importlib.reload(module) |
| 28 | + |
| 29 | + deprecation_messages = [ |
| 30 | + str(warning.message) |
| 31 | + for warning in caught |
| 32 | + if issubclass(warning.category, DeprecationWarning) |
| 33 | + ] |
| 34 | + self.assertFalse( |
| 35 | + any( |
| 36 | + "pythonjsonlogger.jsonlogger has been moved" in message |
| 37 | + for message in deprecation_messages |
| 38 | + ) |
| 39 | + ) |
| 40 | + |
| 41 | + def test_get_content_type(self): |
| 42 | + self.assertEqual( |
| 43 | + module.get_content_type("application/json; charset=utf-8"), |
| 44 | + "application/json", |
| 45 | + ) |
| 46 | + |
| 47 | + def test_copy_dict_flattens_nested_dict(self): |
| 48 | + destination = {} |
| 49 | + module.copy_dict( |
| 50 | + src={"a": 1, "meta": {"x": "y", "n": 2}}, |
| 51 | + dest=destination, |
| 52 | + ) |
| 53 | + self.assertEqual(destination, {"a": 1, "meta[x]": "y", "meta[n]": 2}) |
| 54 | + |
| 55 | + def test_remove_none_from_dict(self): |
| 56 | + self.assertEqual(module.remove_none_from_dict({"a": 1, "b": None}), {"a": 1}) |
| 57 | + |
| 58 | + def test_encoded_id_from_existing_instance_returns_same_object(self): |
| 59 | + encoded_id = module.EncodedId("with space") |
| 60 | + self.assertIs(module.EncodedId(encoded_id), encoded_id) |
| 61 | + |
| 62 | + def test_encoded_id_encodes_string_and_keeps_int(self): |
| 63 | + self.assertEqual(module.EncodedId("a/b c"), "a%2Fb%20c") |
| 64 | + self.assertEqual(module.EncodedId(42), "42") |
| 65 | + |
| 66 | + def test_encoded_id_rejects_unsupported_type(self): |
| 67 | + with self.assertRaises(TypeError): |
| 68 | + module.EncodedId(cast(Any, ["bad"])) |
| 69 | + |
| 70 | + def test_enhanced_json_encoder_serializes_dataclasses(self): |
| 71 | + self.assertEqual( |
| 72 | + json.dumps(_SampleData(value=3), cls=module.EnhancedJSONEncoder), |
| 73 | + '{"value": 3}', |
| 74 | + ) |
| 75 | + |
| 76 | + def test_required_optional_required_attribute_missing(self): |
| 77 | + rules = module.RequiredOptional(required=("name",)) |
| 78 | + with self.assertRaises(AttributeError): |
| 79 | + rules.validate_attrs(data={}) |
| 80 | + |
| 81 | + def test_required_optional_excludes_required_attribute(self): |
| 82 | + rules = module.RequiredOptional(required=("name",)) |
| 83 | + rules.validate_attrs(data={}, excludes=["name"]) |
| 84 | + |
| 85 | + def test_required_optional_exclusive_allows_only_one_key(self): |
| 86 | + rules = module.RequiredOptional(exclusive=("a", "b")) |
| 87 | + with self.assertRaises(AttributeError): |
| 88 | + rules.validate_attrs(data={"a": 1, "b": 2}) |
| 89 | + |
| 90 | + def test_required_optional_exclusive_requires_one_key(self): |
| 91 | + rules = module.RequiredOptional(exclusive=("a", "b")) |
| 92 | + with self.assertRaises(AttributeError): |
| 93 | + rules.validate_attrs(data={}) |
| 94 | + |
| 95 | + def test_required_optional_exclusive_with_single_key_is_valid(self): |
| 96 | + rules = module.RequiredOptional(exclusive=("a", "b")) |
| 97 | + rules.validate_attrs(data={"a": 1}) |
| 98 | + |
| 99 | + def test_response_content_returns_iterator_when_requested(self): |
| 100 | + response = MagicMock() |
| 101 | + response.iter_content.return_value = iter([b"a", b"b"]) |
| 102 | + |
| 103 | + iterator = module.response_content( |
| 104 | + response, |
| 105 | + streamed=False, |
| 106 | + action=None, |
| 107 | + chunk_size=10, |
| 108 | + iterator=True, |
| 109 | + ) |
| 110 | + |
| 111 | + self.assertEqual(list(iterator), [b"a", b"b"]) |
| 112 | + |
| 113 | + def test_response_content_returns_raw_content_when_not_streamed(self): |
| 114 | + response = MagicMock() |
| 115 | + response.content = b"payload" |
| 116 | + |
| 117 | + data = module.response_content( |
| 118 | + response, |
| 119 | + streamed=False, |
| 120 | + action=None, |
| 121 | + chunk_size=10, |
| 122 | + iterator=False, |
| 123 | + ) |
| 124 | + |
| 125 | + self.assertEqual(data, b"payload") |
| 126 | + |
| 127 | + def test_response_content_streamed_uses_action_for_non_empty_chunks(self): |
| 128 | + response = MagicMock() |
| 129 | + response.iter_content.return_value = [b"one", b"", b"two"] |
| 130 | + action = MagicMock() |
| 131 | + |
| 132 | + returned = module.response_content( |
| 133 | + response, |
| 134 | + streamed=True, |
| 135 | + action=action, |
| 136 | + chunk_size=10, |
| 137 | + iterator=False, |
| 138 | + ) |
| 139 | + |
| 140 | + self.assertIsNone(returned) |
| 141 | + action.assert_any_call(b"one") |
| 142 | + action.assert_any_call(b"two") |
| 143 | + self.assertEqual(action.call_count, 2) |
| 144 | + |
| 145 | + def test_response_content_streamed_defaults_to_stdout_stream(self): |
| 146 | + response = MagicMock() |
| 147 | + response.iter_content.return_value = [b"visible"] |
| 148 | + |
| 149 | + with patch("builtins.print") as mock_print: |
| 150 | + module.response_content( |
| 151 | + response, |
| 152 | + streamed=True, |
| 153 | + action=None, |
| 154 | + chunk_size=10, |
| 155 | + iterator=False, |
| 156 | + ) |
| 157 | + |
| 158 | + mock_print.assert_called_once_with(b"visible") |
| 159 | + |
| 160 | + def test_custom_json_formatter_add_fields_sets_timestamp_and_level(self): |
| 161 | + formatter = module.CustomJsonFormatter("%(message)s") |
| 162 | + record = logging.LogRecord( |
| 163 | + name="test", |
| 164 | + level=logging.INFO, |
| 165 | + pathname=__file__, |
| 166 | + lineno=1, |
| 167 | + msg="hello", |
| 168 | + args=(), |
| 169 | + exc_info=None, |
| 170 | + ) |
| 171 | + log_record = {} |
| 172 | + |
| 173 | + formatter.add_fields(log_record, record, {}) |
| 174 | + |
| 175 | + self.assertIn("timestamp", log_record) |
| 176 | + self.assertEqual(log_record["level"], "INFO") |
| 177 | + |
| 178 | + def test_setup_logging_config_json_logging_true_uses_custom_formatter(self): |
| 179 | + with patch("pyoaev.utils.logging.basicConfig") as mock_basic_config: |
| 180 | + module.setup_logging_config(logging.INFO, json_logging=True) |
| 181 | + |
| 182 | + kwargs = mock_basic_config.call_args.kwargs |
| 183 | + self.assertEqual(kwargs["level"], logging.INFO) |
| 184 | + self.assertIn("handlers", kwargs) |
| 185 | + self.assertIsInstance( |
| 186 | + kwargs["handlers"][0].formatter, module.CustomJsonFormatter |
| 187 | + ) |
| 188 | + |
| 189 | + def test_setup_logging_config_json_logging_false_calls_basic_config(self): |
| 190 | + with patch("pyoaev.utils.logging.basicConfig") as mock_basic_config: |
| 191 | + module.setup_logging_config(logging.WARNING, json_logging=False) |
| 192 | + |
| 193 | + mock_basic_config.assert_called_once_with(level=logging.WARNING) |
| 194 | + |
| 195 | + def test_app_logger_methods_delegate_to_local_logger(self): |
| 196 | + with patch("pyoaev.utils.setup_logging_config"): |
| 197 | + app_logger = module.AppLogger(logging.INFO) |
| 198 | + app_logger.local_logger = MagicMock() |
| 199 | + |
| 200 | + app_logger.debug("d", {"x": 1}) |
| 201 | + app_logger.info("i") |
| 202 | + app_logger.warning("w") |
| 203 | + app_logger.error("e") |
| 204 | + |
| 205 | + self.assertTrue(app_logger.local_logger.debug.called) |
| 206 | + self.assertTrue(app_logger.local_logger.info.called) |
| 207 | + self.assertTrue(app_logger.local_logger.warning.called) |
| 208 | + self.assertTrue(app_logger.local_logger.error.called) |
| 209 | + |
| 210 | + def test_logger_helper_returns_app_logger(self): |
| 211 | + with patch("pyoaev.utils.setup_logging_config"): |
| 212 | + helper = module.logger(logging.INFO) |
| 213 | + self.assertIsInstance(helper, module.AppLogger) |
| 214 | + |
| 215 | + def test_pingalive_ping_uses_injector_branch(self): |
| 216 | + api = MagicMock() |
| 217 | + logger = MagicMock() |
| 218 | + ping_alive = module.PingAlive( |
| 219 | + api=api, config={"id": 1}, logger=logger, ping_type="injector" |
| 220 | + ) |
| 221 | + ping_alive.exit_event.is_set = MagicMock(side_effect=[False, True]) |
| 222 | + ping_alive.exit_event.wait = MagicMock() |
| 223 | + |
| 224 | + ping_alive.ping() |
| 225 | + |
| 226 | + api.injector.create.assert_called_once_with({"id": 1}, False) |
| 227 | + ping_alive.exit_event.wait.assert_called_once_with(40) |
| 228 | + |
| 229 | + def test_pingalive_ping_uses_collector_branch_and_logs_errors(self): |
| 230 | + api = MagicMock() |
| 231 | + api.collector.create.side_effect = Exception("boom") |
| 232 | + logger = MagicMock() |
| 233 | + ping_alive = module.PingAlive( |
| 234 | + api=api, config={}, logger=logger, ping_type="collector" |
| 235 | + ) |
| 236 | + ping_alive.exit_event.is_set = MagicMock(side_effect=[False, True]) |
| 237 | + ping_alive.exit_event.wait = MagicMock() |
| 238 | + |
| 239 | + ping_alive.ping() |
| 240 | + |
| 241 | + logger.error.assert_called_once() |
| 242 | + ping_alive.exit_event.wait.assert_called_once_with(40) |
| 243 | + |
| 244 | + def test_pingalive_run_and_stop(self): |
| 245 | + ping_alive = module.PingAlive( |
| 246 | + api=MagicMock(), config={}, logger=MagicMock(), ping_type="collector" |
| 247 | + ) |
| 248 | + ping_alive.ping = MagicMock() |
| 249 | + |
| 250 | + ping_alive.run() |
| 251 | + ping_alive.stop() |
| 252 | + |
| 253 | + ping_alive.logger.info.assert_any_call("Starting PingAlive thread") |
| 254 | + ping_alive.logger.info.assert_any_call("Preparing PingAlive for clean shutdown") |
| 255 | + self.assertTrue(ping_alive.exit_event.is_set()) |
| 256 | + |
| 257 | + |
| 258 | +if __name__ == "__main__": |
| 259 | + unittest.main() |
0 commit comments