|
7 | 7 | """ |
8 | 8 | import json |
9 | 9 | import socket |
| 10 | +import struct |
10 | 11 | import threading |
11 | 12 | import unittest |
12 | 13 | from unittest.mock import MagicMock |
13 | 14 | from unittest.mock import patch |
14 | 15 |
|
| 16 | +from singlestoredb.functions.ext.plugin.connection import _handle_connection_inner |
| 17 | +from singlestoredb.functions.ext.plugin.connection import _MAX_FUNCTION_NAME_LEN |
15 | 18 | from singlestoredb.functions.ext.plugin.connection import _recv_exact_py |
| 19 | +from singlestoredb.functions.ext.plugin.connection import PROTOCOL_VERSION |
16 | 20 | from singlestoredb.functions.ext.plugin.control import dispatch_control_signal |
17 | 21 | from singlestoredb.utils._lazy_import import get_numpy |
18 | 22 | from singlestoredb.utils._lazy_import import get_pandas |
@@ -215,5 +219,77 @@ def test_caching(self): |
215 | 219 | assert result1 is result2 |
216 | 220 |
|
217 | 221 |
|
| 222 | +class TestHandshakeProtocol(unittest.TestCase): |
| 223 | + """Tests for the binary handshake protocol in _handle_connection_inner.""" |
| 224 | + |
| 225 | + def _make_shared_registry(self): |
| 226 | + mock_reg = MagicMock() |
| 227 | + mock_reg.functions = {} |
| 228 | + mock_shared = MagicMock() |
| 229 | + mock_shared.get_thread_local_registry.return_value = mock_reg |
| 230 | + return mock_shared |
| 231 | + |
| 232 | + def test_eof_on_header(self): |
| 233 | + a, b = socket.socketpair() |
| 234 | + try: |
| 235 | + b.close() |
| 236 | + _handle_connection_inner( |
| 237 | + a, self._make_shared_registry(), threading.Event(), |
| 238 | + ) |
| 239 | + finally: |
| 240 | + a.close() |
| 241 | + |
| 242 | + def test_bad_protocol_version(self): |
| 243 | + a, b = socket.socketpair() |
| 244 | + try: |
| 245 | + header = struct.pack('<QQ', 999, 5) |
| 246 | + b.sendall(header) |
| 247 | + b.close() |
| 248 | + _handle_connection_inner( |
| 249 | + a, self._make_shared_registry(), threading.Event(), |
| 250 | + ) |
| 251 | + finally: |
| 252 | + a.close() |
| 253 | + |
| 254 | + def test_namelen_too_large(self): |
| 255 | + a, b = socket.socketpair() |
| 256 | + try: |
| 257 | + header = struct.pack('<QQ', PROTOCOL_VERSION, _MAX_FUNCTION_NAME_LEN + 1) |
| 258 | + b.sendall(header) |
| 259 | + b.close() |
| 260 | + _handle_connection_inner( |
| 261 | + a, self._make_shared_registry(), threading.Event(), |
| 262 | + ) |
| 263 | + finally: |
| 264 | + a.close() |
| 265 | + |
| 266 | + def test_namelen_at_limit_accepted(self): |
| 267 | + a, b = socket.socketpair() |
| 268 | + try: |
| 269 | + header = struct.pack('<QQ', PROTOCOL_VERSION, _MAX_FUNCTION_NAME_LEN) |
| 270 | + b.sendall(header) |
| 271 | + b.close() |
| 272 | + # Will fail at recvmsg (no ancdata) but header was accepted |
| 273 | + try: |
| 274 | + _handle_connection_inner( |
| 275 | + a, self._make_shared_registry(), threading.Event(), |
| 276 | + ) |
| 277 | + except (OSError, ValueError): |
| 278 | + pass |
| 279 | + finally: |
| 280 | + a.close() |
| 281 | + |
| 282 | + def test_short_header(self): |
| 283 | + a, b = socket.socketpair() |
| 284 | + try: |
| 285 | + b.sendall(b'\x00' * 8) |
| 286 | + b.close() |
| 287 | + _handle_connection_inner( |
| 288 | + a, self._make_shared_registry(), threading.Event(), |
| 289 | + ) |
| 290 | + finally: |
| 291 | + a.close() |
| 292 | + |
| 293 | + |
218 | 294 | if __name__ == '__main__': |
219 | 295 | unittest.main() |
0 commit comments