Skip to content

Commit 5548816

Browse files
refactor: write rest client
1 parent 6a568ae commit 5548816

4 files changed

Lines changed: 27 additions & 18 deletions

File tree

influxdb_client_3/__init__.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import multiprocessing
2+
13
import importlib.util
24
import json
35
import os
@@ -326,7 +328,8 @@ def __init__(
326328
ssl_context=kwargs.get('ssl_context', None),
327329
proxy=kwargs.get('proxy', None),
328330
proxy_headers=kwargs.get('proxy_headers', None),
329-
retries=kwargs.get('retries', False)
331+
retries=kwargs.get('retries', False),
332+
connection_pool_maxsize=kwargs.get('connection_pool_maxsize', multiprocessing.cpu_count() * 5,)
330333
)
331334

332335
if point_settings is None:
@@ -721,9 +724,13 @@ def get_server_version(self) -> Optional[str]:
721724
return value
722725

723726
string_body = resp.get_string_body()
724-
if string_body is not None:
725-
return json.loads(string_body)['version']
726-
return None
727+
if not string_body:
728+
return None
729+
try:
730+
data = json.loads(string_body)
731+
except (ValueError, TypeError):
732+
return None
733+
return data.get('version')
727734

728735
def flush(self):
729736
"""
@@ -744,6 +751,8 @@ def close(self):
744751
self._write_api.close()
745752
if self._query_api is not None:
746753
self._query_api.close()
754+
if self.rest_client is not None:
755+
self.rest_client.close()
747756

748757
def __enter__(self):
749758
return self

influxdb_client_3/write_client/_sync/rest_client.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,19 @@ def request(self, method, url, query_params=None, headers=None,
155155
**urlopen_kw)
156156

157157
r = RESTResponse(r)
158-
r.data = r.data.decode('utf8')
158+
if isinstance(r.data, (bytes, bytearray)):
159+
r.data = r.data.decode('utf-8')
160+
elif r.data is None:
161+
r.data = ''
159162

160163
if not 200 <= r.status <= 299:
161-
print('hahaha')
162-
print(r.data)
163164
raise ApiException(http_resp=r)
164165

165166
return r
166167

168+
def close(self):
169+
self.pool_manager.close()
170+
167171
def __getstate__(self):
168172
"""Return a dict of attributes that you want to pickle."""
169173
state = self.__dict__.copy()

influxdb_client_3/write_client/client/write_api.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ def __init__(self,
319319
:param enable_gzip: Boolean indicating whether GZIP compression is enabled.
320320
:param auth_scheme: The authentication scheme to use for the connection
321321
(e.g., Basic, Bearer).
322-
:param timeout: Timeout duration (in seconds) for HTTP requests.
322+
:param timeout: Timeout duration (in milliseconds) for HTTP requests.
323323
:param pool_threads: Number of threads used for connection pools.
324324
:param default_header: Default HTTP headers to include in every request.
325325
:param rest_client: An instance of a RestClient for internal HTTP communication.
@@ -536,6 +536,10 @@ def flush(self):
536536

537537
def close(self):
538538
"""Flush data and dispose a batching buffer."""
539+
if self._pool is not None:
540+
self._pool.close()
541+
self._pool.join()
542+
self._pool = None
539543
if self._subject is None:
540544
return # Already closed
541545

@@ -1088,14 +1092,6 @@ def _on_next(self, response: _BatchResponse):
10881092
except Exception as e:
10891093
logger.error("The configured success callback threw an exception: %s", e)
10901094

1091-
def __getstate__(self):
1092-
"""Return a dict of attributes that you want to pickle."""
1093-
state = self.__dict__.copy()
1094-
# Remove rx
1095-
del state['_subject']
1096-
del state['_disposable']
1097-
return state
1098-
10991095
def __setstate__(self, state):
11001096
"""Set your object with the provided dict."""
11011097
self.__dict__.update(state)

tests/test_write_api.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ def test_api_error_headers(self):
249249
)
250250
)
251251
with self.assertRaises(InfluxDBError) as err:
252-
client._write_api.write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14")
252+
client._write_api.write("TEST_BUCKET", "TEST_ORG", "data,foo=bar val=3.14")
253253
self.assertEqual(body_dic['error'], err.exception.message)
254254
headers = err.exception.getheaders()
255255
self.assertEqual(4, len(headers))
@@ -290,7 +290,7 @@ def test_request_arg_timeout(self, mock_request):
290290
)
291291

292292
with pytest.raises(ConnectTimeoutError):
293-
client._write_api.write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14",
293+
client._write_api.write("TEST_BUCKET", "TEST_ORG", "data,foo=bar val=3.14",
294294
_request_timeout=100)
295295
self.assertEqual(0.1, self.received_timeout_total)
296296
self.received_timeout_total = None

0 commit comments

Comments
 (0)