-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy path_add_header_client_interceptor.py
More file actions
79 lines (61 loc) · 3.05 KB
/
_add_header_client_interceptor.py
File metadata and controls
79 lines (61 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from __future__ import annotations
from typing import Callable
import grpc
from momento.internal.aio._utilities import sanitize_client_call_details
class Header:
once_only_headers = ["agent", "runtime-version"]
def __init__(self, name: str, value: str):
self.name = name
self.value = value
class AddHeaderStreamingClientInterceptor(grpc.aio.UnaryStreamClientInterceptor):
def __init__(self, headers: list[Header]):
self.are_only_once_headers_sent = False
self._headers_to_add_once: list[Header] = list(
filter(lambda header: header.name in header.once_only_headers, headers)
)
self.headers_to_add_every_time = list(
filter(lambda header: header.name not in header.once_only_headers, headers)
)
async def intercept_unary_stream(
self,
continuation: Callable[
[grpc.aio._interceptor.ClientCallDetails, grpc.aio._typing.RequestType],
grpc.aio._call.UnaryStreamCall,
],
client_call_details: grpc.aio._interceptor.ClientCallDetails,
request: grpc.aio._typing.RequestType,
) -> grpc.aio._call.UnaryStreamCall | grpc.aio._typing.ResponseType:
new_client_call_details = sanitize_client_call_details(client_call_details)
for header in self.headers_to_add_every_time:
new_client_call_details.metadata.add(header.name, header.value)
if not self.are_only_once_headers_sent:
for header in self._headers_to_add_once:
new_client_call_details.metadata.add(header.name, header.value)
self.are_only_once_headers_sent = True
return await continuation(new_client_call_details, request)
class AddHeaderClientInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
def __init__(self, headers: list[Header]):
self.are_only_once_headers_sent = False
self._headers_to_add_once: list[Header] = list(
filter(lambda header: header.name in header.once_only_headers, headers)
)
self.headers_to_add_every_time = list(
filter(lambda header: header.name not in header.once_only_headers, headers)
)
async def intercept_unary_unary(
self,
continuation: Callable[
[grpc.aio._interceptor.ClientCallDetails, grpc.aio._typing.RequestType],
grpc.aio._call.UnaryUnaryCall,
],
client_call_details: grpc.aio._interceptor.ClientCallDetails,
request: grpc.aio._typing.RequestType,
) -> grpc.aio._call.UnaryUnaryCall | grpc.aio._typing.ResponseType:
new_client_call_details = sanitize_client_call_details(client_call_details)
for header in self.headers_to_add_every_time:
new_client_call_details.metadata.add(header.name, header.value)
if not self.are_only_once_headers_sent:
for header in self._headers_to_add_once:
new_client_call_details.metadata.add(header.name, header.value)
self.are_only_once_headers_sent = True
return await continuation(new_client_call_details, request)