# helpers.py -- forked from dapr/python-sdk.
# NOTE: GitHub page chrome and the 1-191 line-number gutter that were captured
# with this file during extraction have been dropped; they were not source code.

from urllib.parse import ParseResult, parse_qs, urlparse
from warnings import warn
class URIParseConfig:
    """Defaults and accepted values used by GrpcEndpoint when parsing a URI."""

    DEFAULT_SCHEME = 'dns'
    DEFAULT_HOSTNAME = 'localhost'
    DEFAULT_PORT = 443
    DEFAULT_AUTHORITY = ''
    ACCEPTED_SCHEMES = ['dns', 'unix', 'unix-abstract', 'vsock', 'http', 'https']


class GrpcEndpoint:
    """Parse a gRPC endpoint URI into scheme, hostname, port, TLS flag and target.

    The URI is normalised so that ``urllib.parse.urlparse`` can read it, then
    validated, and finally re-assembled into the string exposed by ``endpoint``.

    Examples of accepted inputs: ``myhost:5000``, ``myhost?tls=true``,
    ``dns:myhost:5000``, ``dns://authority:53/myhost:5000``, ``unix:my.sock``,
    ``vsock:mycid:5000``, ``[::1]:5000``.
    """

    _scheme: str
    _hostname: str
    _port: int
    _tls: bool
    _authority: str
    _url: str
    _parsed_url: ParseResult  # from urllib.parse
    _endpoint: str

    def __init__(self, url: str):
        """Parse and validate ``url``.

        Args:
            url: The endpoint URI as supplied by the caller.

        Raises:
            ValueError: If the scheme is not accepted, the dns authority form is
                malformed, a path is present, or an unsupported query
                parameter is present.
        """
        # Must be set before _preprocess_uri, which may overwrite it.
        self._authority = URIParseConfig.DEFAULT_AUTHORITY
        self._url = url

        self._parsed_url = urlparse(self._preprocess_uri(url))
        self._validate_path_and_query()

        self._set_tls()
        self._set_hostname()
        self._set_scheme()
        self._set_port()
        self._set_endpoint()

    def _set_scheme(self):
        """Resolve the effective scheme; http/https are mapped to the default."""
        parsed_scheme = self._parsed_url.scheme

        if not parsed_scheme:
            self._scheme = URIParseConfig.DEFAULT_SCHEME
            return

        if parsed_scheme in ['http', 'https']:
            self._scheme = URIParseConfig.DEFAULT_SCHEME
            warn(
                'http and https schemes are deprecated for grpc, use myhost?tls=false or myhost?tls=true instead'
            )
            return

        if parsed_scheme not in URIParseConfig.ACCEPTED_SCHEMES:
            raise ValueError(f"invalid scheme '{parsed_scheme}' in URL '{self._url}'")

        self._scheme = parsed_scheme

    @property
    def scheme(self) -> str:
        """The effective scheme (never 'http' or 'https')."""
        return self._scheme

    def _set_hostname(self):
        """Resolve the hostname, re-adding brackets around IPv6 literals."""
        parsed_hostname = self._parsed_url.hostname

        if parsed_hostname is None:
            self._hostname = URIParseConfig.DEFAULT_HOSTNAME
            return

        if ':' in parsed_hostname:
            # urlparse strips the square brackets from an IP literal and never
            # leaves a ':' in any other kind of hostname, so a colon here means
            # an IPv6 address in any notation (full or '::'-compressed). The
            # brackets are restored so the port separator stays unambiguous.
            self._hostname = f'[{parsed_hostname}]'
            return

        self._hostname = parsed_hostname

    @property
    def hostname(self) -> str:
        """The hostname; IPv6 literals are enclosed in square brackets."""
        return self._hostname

    def _set_port(self):
        """Resolve the port: 0 for unix sockets, else the parsed or default port."""
        if self._parsed_url.scheme in ['unix', 'unix-abstract']:
            # Internally 0 stands for "no port".
            self._port = 0
            return

        if self._parsed_url.port is None:
            self._port = URIParseConfig.DEFAULT_PORT
            return

        self._port = self._parsed_url.port

    @property
    def port(self) -> str:
        """The port as a string, or '' when no port applies (stored as 0)."""
        if self._port == 0:
            return ''

        return str(self._port)

    @property
    def port_as_int(self) -> int:
        """The port as an integer; 0 means no port applies."""
        return self._port

    def _set_endpoint(self):
        """Assemble the target string exposed through ``endpoint``."""
        port = '' if not self._port else f':{self.port}'

        if self._scheme == 'unix':
            # Keep whichever separator form the caller originally used.
            separator = '://' if self._url.startswith('unix://') else ':'
            self._endpoint = f'{self._scheme}{separator}{self._hostname}'
            return

        if self._scheme == 'vsock':
            self._endpoint = f'{self._scheme}:{self._hostname}:{self.port}'
            return

        if self._scheme == 'unix-abstract':
            self._endpoint = f'{self._scheme}:{self._hostname}{port}'
            return

        if self._scheme == 'dns':
            authority = f'//{self._authority}/' if self._authority else ''
            self._endpoint = f'{self._scheme}:{authority}{self._hostname}{port}'
            return

        self._endpoint = f'{self._scheme}:{self._hostname}{port}'

    @property
    def endpoint(self) -> str:
        """The assembled target string, e.g. 'dns:myhost:5000'."""
        return self._endpoint

    def _preprocess_uri(self, url: str) -> str:
        """Rewrite ``url`` into a ``scheme://netloc`` form that urlparse understands.

        Side effect: for ``dns://authority/host`` inputs the authority is stored
        in ``self._authority`` and removed from the returned URL.

        Raises:
            ValueError: If an explicit ``scheme://`` is not an accepted scheme,
                or a ``dns://`` URL has no ``/host`` part after the authority.
        """
        url_list = url.split(':')
        has_double_slash = '://' in url

        # "scheme:rest" forms without '//':
        #   - exactly three ':'-separated parts, e.g. dns:mydomain:5000 or
        #     vsock:mycid:5000
        #   - an accepted scheme followed by anything, e.g. dns:mydomain or
        #     dns:[2001:db8:1f70::999:de8:7648:6e8]:5000
        # Insert the '//' so urlparse sees a netloc instead of a path.
        if not has_double_slash and (
            len(url_list) == 3
            or (len(url_list) >= 2 and url_list[0] in URIParseConfig.ACCEPTED_SCHEMES)
        ):
            return url.replace(':', '://', 1)

        url_list = url.split('://')
        if len(url_list) == 1:
            # If a scheme was not explicitly specified in the URL
            # we need to add a default scheme,
            # because of how urlparse works
            return f'{URIParseConfig.DEFAULT_SCHEME}://{url}'

        # If a scheme was explicitly specified in the URL
        # we need to make sure it is a valid scheme
        scheme = url_list[0]
        if scheme not in URIParseConfig.ACCEPTED_SCHEMES:
            raise ValueError(f"invalid scheme '{scheme}' in URL '{url}'")

        # We should do a special check if the scheme is dns, and it uses
        # an authority in the format of dns:[//authority/]host[:port]
        if scheme.lower() == 'dns':
            # A URI like dns://authority/mydomain was used
            url_list = url.split('/')
            if len(url_list) < 4:
                raise ValueError(f"invalid dns authority '{url_list[2]}' in URL '{url}'")
            self._authority = url_list[2]
            url = f'dns://{url_list[3]}'

        return url

    def _set_tls(self):
        """Set the TLS flag from the ``tls`` query parameter or an https scheme."""
        query_dict = parse_qs(self._parsed_url.query)
        tls_str = query_dict.get('tls', [''])[0]
        tls = tls_str.lower() == 'true'
        if self._parsed_url.scheme == 'https':
            tls = True

        self._tls = tls

    @property
    def tls(self) -> bool:
        """True when TLS was requested via ``?tls=true`` or an https scheme."""
        return self._tls

    def _validate_path_and_query(self) -> None:
        """Reject URLs that carry a path or any query parameter other than ``tls``.

        Raises:
            ValueError: If a path is present, if ``tls`` is combined with an
                http(s) scheme, or if any other query parameter is present.
        """
        if self._parsed_url.path:
            raise ValueError(
                f"paths are not supported for gRPC endpoints: '{self._parsed_url.path}'"
            )

        if self._parsed_url.query:
            query_dict = parse_qs(self._parsed_url.query)
            if 'tls' in query_dict and self._parsed_url.scheme in ['http', 'https']:
                raise ValueError(
                    'the tls query parameter is not supported for http(s) endpoints: '
                    f"'{self._parsed_url.query}'"
                )
            query_dict.pop('tls', None)
            if query_dict:
                raise ValueError(
                    'query parameters are not supported for gRPC endpoints:'
                    f" '{self._parsed_url.query}'"
                )