Skip to content

Commit e20717d

Browse files
authored
[typedb] forwarding state machine (#115)
* Bump minimum Python version to more closely match localstack core and allow the `match` statement * Use an enum to more explicitly model the ternary forwarding logic * Bump typedb extension version to 0.1.3
1 parent a21e460 commit e20717d

3 files changed

Lines changed: 35 additions & 34 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ You can install the respective extension by calling `localstack extensions insta
7575
| [Miniflare](https://github.com/localstack/localstack-extensions/tree/main/miniflare) | localstack-extension-miniflare | 0.1.0 | Experimental |
7676
| [Stripe](https://github.com/localstack/localstack-extensions/tree/main/stripe) | localstack-extension-stripe | 0.2.0 | Stable |
7777
| [Terraform Init](https://github.com/localstack/localstack-extensions/tree/main/terraform-init) | localstack-extension-terraform-init | 0.2.0 | Experimental |
78-
| [TypeDB](https://github.com/localstack/localstack-extensions/tree/main/typedb) | localstack-extension-typedb | 0.1.2 | Experimental |
78+
| [TypeDB](https://github.com/localstack/localstack-extensions/tree/main/typedb) | localstack-extension-typedb | 0.1.3 | Experimental |
7979

8080

8181
## Developing Extensions

typedb/localstack_typedb/utils/h2_proxy.py

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import socket
3+
from enum import Enum
34
from typing import Iterable, Callable
45

56
from h2.frame_buffer import FrameBuffer
@@ -63,6 +64,11 @@ def apply_http2_patches_for_grpc_support(
6364
)
6465
patched_connection = True
6566

67+
class ForwardingState(Enum):
68+
UNDECIDED = "undecided"
69+
FORWARDING = "forwarding"
70+
PASSTHROUGH = "passthrough"
71+
6672
class ForwardingBuffer:
6773
"""
6874
A buffer atop the HTTP2 client connection, that will hold
@@ -72,7 +78,7 @@ class ForwardingBuffer:
7278

7379
backend: TcpForwarder
7480
buffer: list
75-
proxying: bool | None
81+
state: ForwardingState
7682

7783
def __init__(self, http_response_stream):
7884
self.http_response_stream = http_response_stream
@@ -81,7 +87,7 @@ def __init__(self, http_response_stream):
8187
)
8288
self.backend = TcpForwarder(target_port, host=target_host)
8389
self.buffer = []
84-
self.proxying = None
90+
self.state = ForwardingState.UNDECIDED
8591
reactor.getThreadPool().callInThread(
8692
self.backend.receive_loop, self.received_from_backend
8793
)
@@ -91,35 +97,30 @@ def received_from_backend(self, data):
9197
self.http_response_stream.write(data)
9298

9399
def received_from_http2_client(self, data, default_handler: Callable):
94-
if self.proxying is False:
95-
# Note: Return here only if `proxying` is `False` (a value of `None` indicates
96-
# that the headers have not fully been received yet)
97-
return default_handler(data)
98-
99-
if self.proxying:
100-
assert not self.buffer
101-
# Keep sending data to the backend for the lifetime of this connection
102-
self.backend.send(data)
103-
return
104-
105-
self.buffer.append(data)
106-
107-
if not (headers := get_headers_from_data_stream(self.buffer)):
108-
# If no headers received yet, then return (method will be called again for next chunk of data)
109-
return
110-
111-
self.proxying = should_proxy_request(headers)
112-
113-
buffered_data = b"".join(self.buffer)
114-
self.buffer = []
115-
116-
if not self.proxying:
117-
# if this is not a target request, then call the default handler
118-
default_handler(buffered_data)
119-
return
120-
121-
LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend")
122-
self.backend.send(buffered_data)
100+
match self.state:
101+
case ForwardingState.PASSTHROUGH:
102+
default_handler(data)
103+
case ForwardingState.FORWARDING:
104+
assert not self.buffer
105+
# Keep sending data to the backend for the lifetime of this connection
106+
self.backend.send(data)
107+
case ForwardingState.UNDECIDED:
108+
self.buffer.append(data)
109+
110+
if headers := get_headers_from_data_stream(self.buffer):
111+
buffered_data = b"".join(self.buffer)
112+
self.buffer = []
113+
114+
if should_proxy_request(headers):
115+
self.state = ForwardingState.FORWARDING
116+
LOG.debug(
117+
f"Forwarding {len(buffered_data)} bytes to backend"
118+
)
119+
self.backend.send(buffered_data)
120+
else:
121+
self.state = ForwardingState.PASSTHROUGH
122+
# if this is not a target request, then call the default handler
123+
default_handler(buffered_data)
123124

124125
def close(self):
125126
self.backend.close()

typedb/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "localstack-extension-typedb"
7-
version = "0.1.2"
7+
version = "0.1.3"
88
description = "LocalStack Extension: TypeDB on LocalStack"
99
readme = {file = "README.md", content-type = "text/markdown; charset=UTF-8"}
10-
requires-python = ">=3.9"
10+
requires-python = ">=3.10"
1111
authors = [
1212
{ name = "LocalStack + TypeDB team"}
1313
]

0 commit comments

Comments
 (0)