-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathtest_traced_exception.py
More file actions
257 lines (208 loc) · 9.88 KB
/
test_traced_exception.py
File metadata and controls
257 lines (208 loc) · 9.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import orjson
import pytest
from airbyte_cdk.models import (
AirbyteErrorTraceMessage,
AirbyteMessage,
AirbyteMessageSerializer,
AirbyteTraceMessage,
FailureType,
Status,
StreamDescriptor,
TraceType,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
# Plain exception instance wrapped by the ``from_exception`` stream-descriptor tests below.
_AN_EXCEPTION = ValueError("An exception")
# Descriptor supplied when the traced exception is created.
_A_STREAM_DESCRIPTOR = StreamDescriptor(name="a_stream")
# Different descriptor supplied later, when the exception is converted to a message.
_ANOTHER_STREAM_DESCRIPTOR = StreamDescriptor(name="another_stream")
@pytest.fixture
def raised_exception():
    """Provide a ``RuntimeError`` that was actually raised and caught.

    Raising the error (instead of only instantiating it) gives it a traceback,
    which the tests inspecting the formatted stack trace rely on.
    """
    caught = None
    try:
        raise RuntimeError("an error has occurred")
    except RuntimeError as error:
        # ``error`` is unbound once the handler exits, so keep a second reference.
        caught = error
    return caught
def test_build_from_existing_exception(raised_exception):
    """``from_exception`` keeps the original error and applies the supplied user message."""
    user_message = "my user-friendly message"

    wrapped = AirbyteTracedException.from_exception(raised_exception, message=user_message)

    assert wrapped.message == user_message
    assert wrapped.internal_message == "an error has occurred"
    assert wrapped.failure_type == FailureType.system_error
    assert wrapped._exception == raised_exception
def test_exception_as_airbyte_message():
    """An exception built from only an internal message becomes an ERROR trace message."""
    message = AirbyteTracedException("an internal message").as_airbyte_message()

    # Envelope checks.
    assert isinstance(message, AirbyteMessage)
    assert message.type == MessageType.TRACE

    # Trace-level checks.
    trace = message.trace
    assert trace.type == TraceType.ERROR
    assert trace.emitted_at > 0

    # Error payload checks.
    error = trace.error
    expected_user_message = (
        "Something went wrong in the connector. See the logs for more details."
    )
    expected_stack_trace = (
        "airbyte_cdk.utils.traced_exception.AirbyteTracedException: an internal message\n"
    )
    assert error.failure_type == FailureType.system_error
    assert error.message == expected_user_message
    assert error.internal_message == "an internal message"
    assert error.stack_trace == expected_stack_trace
def test_existing_exception_as_airbyte_message(raised_exception):
    """A wrapped, previously raised exception exposes its full traceback in the trace message."""
    message = AirbyteTracedException.from_exception(raised_exception).as_airbyte_message()

    assert isinstance(message, AirbyteMessage)
    assert message.type == MessageType.TRACE
    assert message.trace.type == TraceType.ERROR

    error = message.trace.error
    expected_user_message = (
        "Something went wrong in the connector. See the logs for more details."
    )
    assert error.message == expected_user_message
    assert error.internal_message == "an error has occurred"

    # The stack trace must open with the standard header and close with the
    # raising source line followed by the exception summary.
    expected_tail = (
        'raise RuntimeError("an error has occurred")\nRuntimeError: an error has occurred\n'
    )
    assert error.stack_trace.startswith("Traceback (most recent call last):")
    assert error.stack_trace.endswith(expected_tail)
def test_config_error_as_connection_status_message():
    """A config error converts to a FAILED connection status carrying the user message."""
    config_error = AirbyteTracedException(
        "an internal message",
        message="Config validation error",
        failure_type=FailureType.config_error,
    )

    status_message = config_error.as_connection_status_message()

    assert isinstance(status_message, AirbyteMessage)
    assert status_message.type == MessageType.CONNECTION_STATUS

    connection_status = status_message.connectionStatus
    assert connection_status.status == Status.FAILED
    assert connection_status.message == "Config validation error"
def test_other_error_as_connection_status_message():
    """A system error yields no connection status message."""
    system_error = AirbyteTracedException(
        "an internal message",
        failure_type=FailureType.system_error,
    )

    result = system_error.as_connection_status_message()

    assert result is None
def test_emit_message(capsys):
    """``emit_message`` prints the serialized trace message to stdout."""
    AirbyteTracedException(
        internal_message="internal message",
        message="user-friendly message",
        exception=RuntimeError("oh no"),
    ).emit_message()

    captured_stdout = capsys.readouterr().out
    actual = AirbyteMessageSerializer.load(orjson.loads(captured_stdout))
    # The emission timestamp varies per run; pin it so the comparison is stable.
    actual.trace.emitted_at = 0.0

    expected_error = AirbyteErrorTraceMessage(
        failure_type=FailureType.system_error,
        message="user-friendly message",
        internal_message="internal message",
        stack_trace="RuntimeError: oh no\n",
    )
    expected_trace = AirbyteTraceMessage(
        type=TraceType.ERROR,
        emitted_at=0.0,
        error=expected_error,
    )
    expected = AirbyteMessage(type=MessageType.TRACE, trace=expected_trace)

    assert actual == expected
def test_given_both_init_and_as_message_with_stream_descriptor_when_as_airbyte_message_use_init_stream_descriptor() -> (
    None
):
    """The descriptor given to the constructor wins over the one passed to ``as_airbyte_message``."""
    error = AirbyteTracedException(
        stream_descriptor=_A_STREAM_DESCRIPTOR
    ).as_airbyte_message(stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR).trace.error

    assert error.stream_descriptor == _A_STREAM_DESCRIPTOR
def test_given_both_init_and_as_sanitized_airbyte_message_with_stream_descriptor_when_as_airbyte_message_use_init_stream_descriptor() -> (
    None
):
    """The descriptor given to the constructor wins over the one passed to ``as_sanitized_airbyte_message``."""
    error = AirbyteTracedException(
        stream_descriptor=_A_STREAM_DESCRIPTOR
    ).as_sanitized_airbyte_message(stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR).trace.error

    assert error.stream_descriptor == _A_STREAM_DESCRIPTOR
def test_given_both_from_exception_and_as_message_with_stream_descriptor_when_as_airbyte_message_use_init_stream_descriptor() -> (
    None
):
    """The descriptor given to ``from_exception`` wins over the one passed to ``as_airbyte_message``."""
    wrapped = AirbyteTracedException.from_exception(
        _AN_EXCEPTION,
        stream_descriptor=_A_STREAM_DESCRIPTOR,
    )

    error = wrapped.as_airbyte_message(stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR).trace.error

    assert error.stream_descriptor == _A_STREAM_DESCRIPTOR
def test_given_both_from_exception_and_as_sanitized_airbyte_message_with_stream_descriptor_when_as_airbyte_message_use_init_stream_descriptor() -> (
    None
):
    """The descriptor given to ``from_exception`` wins over the one passed to ``as_sanitized_airbyte_message``."""
    wrapped = AirbyteTracedException.from_exception(
        _AN_EXCEPTION,
        stream_descriptor=_A_STREAM_DESCRIPTOR,
    )

    error = wrapped.as_sanitized_airbyte_message(
        stream_descriptor=_ANOTHER_STREAM_DESCRIPTOR
    ).trace.error

    assert error.stream_descriptor == _A_STREAM_DESCRIPTOR
class TestAirbyteTracedExceptionStr:
    """Checks that ``str()`` of a traced exception prefers the user-facing message."""

    def test_str_returns_user_facing_message_when_both_set(self) -> None:
        """With both messages set, ``str()`` yields the user-facing one."""
        user_message = "Authentication credentials are invalid."
        error = AirbyteTracedException(
            internal_message="raw API error: 401 Unauthorized",
            message=user_message,
        )

        assert str(error) == user_message

    def test_str_falls_back_to_internal_message_when_message_is_none(self) -> None:
        """Without a user-facing message, ``str()`` yields the internal message."""
        error = AirbyteTracedException(internal_message="an internal error")

        assert str(error) == "an internal error"

    def test_str_returns_empty_string_when_both_none(self) -> None:
        """With neither message set, ``str()`` yields an empty string."""
        assert str(AirbyteTracedException()) == ""

    def test_str_returns_message_when_internal_message_is_none(self) -> None:
        """With only a user-facing message set, ``str()`` yields it."""
        user_message = "A user-friendly error occurred."
        error = AirbyteTracedException(message=user_message)

        assert str(error) == user_message

    def test_str_used_in_fstring_returns_user_facing_message(self) -> None:
        """f-string interpolation renders the user-facing message."""
        error = AirbyteTracedException(
            internal_message="internal detail",
            message="Connection timed out.",
        )

        rendered = f"Error: {error}"

        assert rendered == "Error: Connection timed out."

    def test_str_used_in_logging_format_returns_user_facing_message(self) -> None:
        """``%s`` formatting renders the user-facing message."""
        error = AirbyteTracedException(
            internal_message="socket.timeout: read timed out",
            message="Request timed out.",
        )

        rendered = "Error: %s" % error

        assert rendered == "Error: Request timed out."

    def test_args_still_contains_internal_message(self) -> None:
        """``args[0]`` remains the internal message even when a user message is set."""
        error = AirbyteTracedException(
            internal_message="internal detail",
            message="user-facing message",
        )

        first_arg = error.args[0]

        assert first_arg == "internal detail"

    def test_str_on_subclass_inherits_behavior(self) -> None:
        """A subclass that defines nothing of its own gets the same ``str()`` result."""

        class CustomTracedException(AirbyteTracedException):
            pass

        error = CustomTracedException(
            internal_message="raw error",
            message="User-friendly error.",
        )

        assert str(error) == "User-friendly error."

    def test_str_with_from_exception_factory(self) -> None:
        """``from_exception`` with a message: ``str()`` is that message, internal is the original text."""
        source_error = ValueError("original error")

        wrapped = AirbyteTracedException.from_exception(
            source_error,
            message="A validation error occurred.",
        )

        assert str(wrapped) == "A validation error occurred."
        assert wrapped.internal_message == "original error"

    def test_str_with_from_exception_without_message(self) -> None:
        """``from_exception`` without a message: ``str()`` is the original error text."""
        source_error = RuntimeError("runtime failure")

        wrapped = AirbyteTracedException.from_exception(source_error)

        assert str(wrapped) == "runtime failure"

    def test_stack_trace_uses_str_representation(self) -> None:
        """The emitted stack trace contains the user-facing message."""
        error = AirbyteTracedException(
            internal_message="internal detail for traceback",
            message="User sees this.",
        )

        stack_trace = error.as_airbyte_message().trace.error.stack_trace

        assert "User sees this." in stack_trace

    def test_internal_message_preserved_in_trace_error(self) -> None:
        """The trace error carries both the internal and the user-facing message."""
        error = AirbyteTracedException(
            internal_message="raw API error: 401",
            message="Authentication failed.",
        )

        trace_error = error.as_airbyte_message().trace.error

        assert trace_error.internal_message == "raw API error: 401"
        assert trace_error.message == "Authentication failed."