Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions src/momento/internal/aio/_add_header_client_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import grpc

from momento.internal._utilities import ClientType
from momento.internal.aio._utilities import sanitize_client_call_details


Expand All @@ -16,15 +17,16 @@ def __init__(self, name: str, value: str):


class AddHeaderStreamingClientInterceptor(grpc.aio.UnaryStreamClientInterceptor):
are_only_once_headers_sent = False
client_types_that_sent_only_once_headers: set[ClientType] = set()

def __init__(self, headers: list[Header]):
def __init__(self, headers: list[Header], client_type: ClientType):
self._headers_to_add_once: list[Header] = list(
filter(lambda header: header.name in header.once_only_headers, headers)
)
self.headers_to_add_every_time = list(
filter(lambda header: header.name not in header.once_only_headers, headers)
)
self.client_type = client_type

async def intercept_unary_stream(
self,
Expand All @@ -40,24 +42,25 @@ async def intercept_unary_stream(
for header in self.headers_to_add_every_time:
new_client_call_details.metadata.add(header.name, header.value)

if not AddHeaderStreamingClientInterceptor.are_only_once_headers_sent:
if self.client_type not in AddHeaderStreamingClientInterceptor.client_types_that_sent_only_once_headers:
for header in self._headers_to_add_once:
new_client_call_details.metadata.add(header.name, header.value)
AddHeaderStreamingClientInterceptor.are_only_once_headers_sent = True
AddHeaderStreamingClientInterceptor.client_types_that_sent_only_once_headers.add(self.client_type)

return await continuation(new_client_call_details, request)


class AddHeaderClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
are_only_once_headers_sent = False
client_types_that_sent_only_once_headers: set[ClientType] = set()

def __init__(self, headers: list[Header]):
def __init__(self, headers: list[Header], client_type: ClientType):
self._headers_to_add_once: list[Header] = list(
filter(lambda header: header.name in header.once_only_headers, headers)
)
self.headers_to_add_every_time = list(
filter(lambda header: header.name not in header.once_only_headers, headers)
)
self.client_type = client_type

async def intercept_unary_unary(
self,
Expand All @@ -73,9 +76,9 @@ async def intercept_unary_unary(
for header in self.headers_to_add_every_time:
new_client_call_details.metadata.add(header.name, header.value)

if not AddHeaderClientInterceptor.are_only_once_headers_sent:
if self.client_type not in AddHeaderClientInterceptor.client_types_that_sent_only_once_headers:
for header in self._headers_to_add_once:
new_client_call_details.metadata.add(header.name, header.value)
AddHeaderClientInterceptor.are_only_once_headers_sent = True
AddHeaderClientInterceptor.client_types_that_sent_only_once_headers.add(self.client_type)

return await continuation(new_client_call_details, request)
4 changes: 2 additions & 2 deletions src/momento/internal/aio/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def _interceptors(
filter(
None,
[
AddHeaderClientInterceptor(headers),
AddHeaderClientInterceptor(headers, client_type),
RetryInterceptor(retry_strategy) if retry_strategy else None,
MiddlewareInterceptor(middleware, context) if middleware else None,
],
Expand All @@ -297,4 +297,4 @@ def _stream_interceptors(auth_token: str, client_type: ClientType) -> list[grpc.
Header("agent", f"python:{client_type.value}:{momento_version}"),
Header("runtime-version", f"python {PYTHON_RUNTIME_VERSION}"),
]
return [AddHeaderStreamingClientInterceptor(headers)]
return [AddHeaderStreamingClientInterceptor(headers, client_type)]
19 changes: 11 additions & 8 deletions src/momento/internal/synchronous/_add_header_client_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import grpc

from momento.internal._utilities import ClientType
from momento.internal.synchronous._utilities import sanitize_client_call_details

RequestType = TypeVar("RequestType")
Expand All @@ -19,15 +20,16 @@ def __init__(self, name: str, value: str):


class AddHeaderStreamingClientInterceptor(grpc.UnaryStreamClientInterceptor):
are_only_once_headers_sent = False
client_types_that_sent_only_once_headers: set[ClientType] = set()

def __init__(self, headers: list[Header]):
def __init__(self, headers: list[Header], client_type: ClientType):
self._headers_to_add_once: list[Header] = list(
filter(lambda header: header.name in header.once_only_headers, headers)
)
self.headers_to_add_every_time = list(
filter(lambda header: header.name not in header.once_only_headers, headers)
)
self.client_type = client_type

def intercept_unary_stream(
self,
Expand All @@ -43,16 +45,16 @@ def intercept_unary_stream(
for header in self.headers_to_add_every_time:
new_client_call_details.metadata.append((header.name, header.value))

if not AddHeaderStreamingClientInterceptor.are_only_once_headers_sent:
if self.client_type not in AddHeaderStreamingClientInterceptor.client_types_that_sent_only_once_headers:
for header in self._headers_to_add_once:
new_client_call_details.metadata.append((header.name, header.value))
AddHeaderStreamingClientInterceptor.are_only_once_headers_sent = True
AddHeaderStreamingClientInterceptor.client_types_that_sent_only_once_headers.add(self.client_type)

return continuation(new_client_call_details, request)


class AddHeaderClientInterceptor(grpc.UnaryUnaryClientInterceptor):
are_only_once_headers_sent = False
client_types_that_sent_only_once_headers: set[ClientType] = set()

@staticmethod
def is_only_once_header(header: Header) -> bool:
Expand All @@ -62,9 +64,10 @@ def is_only_once_header(header: Header) -> bool:
def is_not_only_once_header(header: Header) -> bool:
return header.name not in header.once_only_headers

def __init__(self, headers: list[Header]):
def __init__(self, headers: list[Header], client_type: ClientType):
self._headers_to_add_once: list[Header] = list(filter(AddHeaderClientInterceptor.is_only_once_header, headers))
self.headers_to_add_every_time = list(filter(AddHeaderClientInterceptor.is_not_only_once_header, headers))
self.client_type = client_type

def intercept_unary_unary(
self,
Expand All @@ -77,9 +80,9 @@ def intercept_unary_unary(
for header in self.headers_to_add_every_time:
new_client_call_details.metadata.append((header.name, header.value))

if not AddHeaderClientInterceptor.are_only_once_headers_sent:
if self.client_type not in AddHeaderClientInterceptor.client_types_that_sent_only_once_headers:
for header in self._headers_to_add_once:
new_client_call_details.metadata.append((header.name, header.value))
AddHeaderClientInterceptor.are_only_once_headers_sent = True
AddHeaderClientInterceptor.client_types_that_sent_only_once_headers.add(self.client_type)

return continuation(new_client_call_details, request)
4 changes: 2 additions & 2 deletions src/momento/internal/synchronous/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def _interceptors(
filter(
None,
[
AddHeaderClientInterceptor(headers),
AddHeaderClientInterceptor(headers, client_type),
RetryInterceptor(retry_strategy) if retry_strategy else None,
MiddlewareInterceptor(middleware, context) if middleware else None,
],
Expand All @@ -309,4 +309,4 @@ def _stream_interceptors(auth_token: str, client_type: ClientType) -> list[grpc.
Header("agent", f"python:{client_type.value}:{momento_version}"),
Header("runtime-version", f"python {PYTHON_RUNTIME_VERSION}"),
]
return [AddHeaderStreamingClientInterceptor(headers)]
return [AddHeaderStreamingClientInterceptor(headers, client_type)]
Loading