
Commit 4d42937

toddbaert authored and leakonvalinka committed
todds changes
Signed-off-by: Lea Konvalinka <lea.konvalinka@dynatrace.com>
1 parent 16956f5 commit 4d42937

3 files changed: 94 additions & 34 deletions


providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 19 additions & 1 deletion
@@ -68,6 +68,7 @@ def __init__(
         self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
         self.deadline = config.deadline_ms * 0.001
         self.connected = False
+        self._is_fatal = False
         self.channel = self._generate_channel(config)
         self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
 
@@ -153,9 +154,14 @@ def connect(self) -> None:
         ## block until ready or deadline reached
         timeout = self.deadline + time.monotonic()
         while not self.connected and time.monotonic() < timeout:
+            if self._is_fatal:
+                raise ProviderFatalError("fatal gRPC status code")
             time.sleep(0.05)
         logger.debug("Finished blocking gRPC state initialization")
 
+        if self._is_fatal:
+            raise ProviderFatalError("fatal gRPC status code")
+
         if not self.connected:
             raise ProviderNotReadyError(
                 "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
@@ -166,6 +172,8 @@ def monitor(self) -> None:
 
     def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
         logger.debug(f"gRPC state change: {new_state}")
+        if self._is_fatal:
+            return
         if (
             new_state == grpc.ChannelConnectivity.READY
             or new_state == grpc.ChannelConnectivity.IDLE
@@ -197,6 +205,8 @@ def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
             self.connected = False
 
     def emit_error(self) -> None:
+        if self._is_fatal:
+            return
         logger.debug("gRPC error emitted")
         if self.cache:
             self.cache.clear()
@@ -238,7 +248,15 @@ def listen(self) -> None:
                 # although it seems like this error log is not interesting, without it, the retry is not working as expected
                 logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
                 if e.code().name in self.config.fatal_status_codes:
-                    raise ProviderFatalError("fatal error") from e
+                    self._is_fatal = True
+                    self.active = False
+                    self.emit_provider_error(
+                        ProviderEventDetails(
+                            message=f"Fatal gRPC status code: {e.code()}",
+                            error_code=ErrorCode.PROVIDER_FATAL,
+                        )
+                    )
+                    return
             except ParseError:
                 logger.exception(
                     f"Could not parse flag data using flagd syntax: {message=}"

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

Lines changed: 20 additions & 6 deletions
@@ -55,6 +55,7 @@ def __init__(
         self.emit_provider_stale = emit_provider_stale
 
         self.connected = False
+        self._is_fatal = False
         self.thread: typing.Optional[threading.Thread] = None
         self.timer: typing.Optional[threading.Timer] = None
 
@@ -137,9 +138,14 @@ def connect(self) -> None:
         ## block until ready or deadline reached
         timeout = self.deadline + time.monotonic()
         while not self.connected and time.monotonic() < timeout:
+            if self._is_fatal:
+                raise ProviderFatalError("fatal gRPC status code")
             time.sleep(0.05)
         logger.debug("Finished blocking gRPC state initialization")
 
+        if self._is_fatal:
+            raise ProviderFatalError("fatal gRPC status code")
+
         if not self.connected:
             raise ProviderNotReadyError(
                 "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
@@ -150,6 +156,8 @@ def monitor(self) -> None:
 
     def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
         logger.debug(f"gRPC state change: {new_state}")
+        if self._is_fatal:
+            return
         if (
             new_state == grpc.ChannelConnectivity.READY
             or new_state == grpc.ChannelConnectivity.IDLE
@@ -181,6 +189,8 @@ def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
             self.connected = False
 
     def emit_error(self) -> None:
+        if self._is_fatal:
+            return
         logger.debug("gRPC error emitted")
         self.emit_provider_error(
             ProviderEventDetails(
@@ -274,7 +284,16 @@ def listen(self) -> None:
                 return
             except grpc.RpcError as e:  # noqa: PERF203
                 logger.warning(f"SyncFlags stream error, {e.code()=} {e.details()=}")
-                self._raise_on_fatal_status_code(e)
+                if e.code().name in self.config.fatal_status_codes:
+                    self._is_fatal = True
+                    self.active = False
+                    self.emit_provider_error(
+                        ProviderEventDetails(
+                            message=f"Fatal gRPC status code: {e.code()}",
+                            error_code=ErrorCode.PROVIDER_FATAL,
+                        )
+                    )
+                    return
             except json.JSONDecodeError:
                 logger.exception(
                     f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}"
@@ -293,8 +312,3 @@ def generate_grpc_call_args(self) -> GrpcMultiCallableArgs:
         if metadata is not None:
             call_args["metadata"] = metadata
         return call_args
-
-    def _raise_on_fatal_status_code(self, e: grpc.RpcError) -> None:
-        if e.code().name in self.config.fatal_status_codes:
-            logger.error(f"Fatal gRPC status code received: {e.code()}")
-            raise ProviderFatalError(f"Fatal gRPC status code: {e.code()}") from e
Lines changed: 55 additions & 27 deletions
@@ -1,70 +1,98 @@
-import os.path
+import logging
+import os
 import time
 import typing
 from pathlib import Path
 
 import grpc
 from grpc_health.v1 import health_pb2, health_pb2_grpc
-from testcontainers.core.container import DockerContainer
-from testcontainers.core.wait_strategies import LogMessageWaitStrategy
+from testcontainers.compose import DockerCompose
 
 from openfeature.contrib.provider.flagd.config import ResolverType
 
+logger = logging.getLogger(__name__)
+
 HEALTH_CHECK = 8014
 LAUNCHPAD = 8080
 FORBIDDEN = 9212
 
 
-class FlagdContainer(DockerContainer):
+class FlagdContainer:
+    """Manages the docker-compose environment for flagd e2e tests.
+
+    Uses docker-compose to start both flagd and envoy containers,
+    so the envoy forbidden endpoint (port 9212) returns a proper HTTP 403.
+    """
+
     def __init__(
         self,
         feature: typing.Optional[str] = None,
         **kwargs,
     ) -> None:
+        self._test_harness_dir = (
+            Path(__file__).parents[2] / "openfeature" / "test-harness"
+        )
+        self._version = (self._test_harness_dir / "version.txt").read_text().rstrip()
+
         image: str = "ghcr.io/open-feature/flagd-testbed"
         if feature is not None:
             image = f"{image}-{feature}"
-        path = Path(__file__).parents[2] / "openfeature/test-harness/version.txt"
-        data = path.read_text().rstrip()
-        super().__init__(f"{image}:v{data}", **kwargs)
-        self.rpc = 8013
-        self.ipr = 8015
+
         self.flagDir = Path("./flags")
         self.flagDir.mkdir(parents=True, exist_ok=True)
-        self.with_exposed_ports(self.rpc, self.ipr, HEALTH_CHECK, LAUNCHPAD, FORBIDDEN)
-        self.with_volume_mapping(os.path.abspath(self.flagDir.name), "/flags", "rw")
-        self.waiting_for(LogMessageWaitStrategy("listening").with_startup_timeout(5))
 
-    def get_port(self, resolver_type: ResolverType):
+        # Set environment variables for docker-compose substitution
+        os.environ["IMAGE"] = image
+        os.environ["VERSION"] = f"v{self._version}"
+        os.environ["FLAGS_DIR"] = str(self.flagDir.absolute())
+
+        self._compose = DockerCompose(
+            context=str(self._test_harness_dir),
+            compose_file_name="docker-compose.yaml",
+            wait=True,
+        )
+
+    def get_port(self, resolver_type: ResolverType) -> int:
         if resolver_type == ResolverType.RPC:
-            return self.get_exposed_port(self.rpc)
+            return self._compose.get_service_port("flagd", 8013)
         else:
-            return self.get_exposed_port(self.ipr)
+            return self._compose.get_service_port("flagd", 8015)
 
-    def get_launchpad_url(self):
-        return f"http://localhost:{self.get_exposed_port(LAUNCHPAD)}"
+    def get_exposed_port(self, port: int) -> int:
+        """Get mapped port. For FORBIDDEN (9212) returns envoy port, otherwise flagd port."""
+        if port == FORBIDDEN:
+            return self._compose.get_service_port("envoy", FORBIDDEN)
+        return self._compose.get_service_port("flagd", port)
+
+    def get_launchpad_url(self) -> str:
+        port = self._compose.get_service_port("flagd", LAUNCHPAD)
+        return f"http://localhost:{port}"
 
     def start(self) -> "FlagdContainer":
-        super().start()
-        self._checker(self.get_container_host_ip(), self.get_exposed_port(HEALTH_CHECK))
+        self._compose.start()
+        host = self._compose.get_service_host("flagd", HEALTH_CHECK) or "localhost"
+        port = self._compose.get_service_port("flagd", HEALTH_CHECK)
+        self._checker(host, port)
         return self
 
+    def stop(self) -> None:
+        self._compose.stop()
+
     def _checker(self, host: str, port: int) -> None:
         # Give an extra second before continuing
         time.sleep(1)
-        # Second we use the GRPC health check endpoint
-        with grpc.insecure_channel(host + ":" + str(port)) as channel:
+        # Use the GRPC health check endpoint
+        with grpc.insecure_channel(f"{host}:{port}") as channel:
             health_stub = health_pb2_grpc.HealthStub(channel)
 
             def health_check_call(stub: health_pb2_grpc.HealthStub):
                 request = health_pb2.HealthCheckRequest()
-                resp = stub.Check(request)
-                if resp.status == health_pb2.HealthCheckResponse.SERVING:
-                    return True
-                elif resp.status == health_pb2.HealthCheckResponse.NOT_SERVING:
+                try:
+                    resp = stub.Check(request)
+                    return resp.status == health_pb2.HealthCheckResponse.SERVING
+                except Exception:
                     return False
 
-            # Should succeed
             # Check health status every 1 second for 30 seconds
             ok = False
             for _ in range(30):
@@ -74,4 +102,4 @@ def health_check_call(stub: health_pb2_grpc.HealthStub):
                 time.sleep(1)
 
             if not ok:
-                raise ConnectionError("flagD not ready in time")
+                raise ConnectionError("flagd not ready in time")
