Skip to content

Commit a7b4d07

Browse files
test: add multiprocess helper test
1 parent 1dc7495 commit a7b4d07

4 files changed

Lines changed: 64 additions & 49 deletions

File tree

influxdb_client_3/write_client/_sync/rest_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ def request(self, method, url, query_params=None, headers=None,
167167
r.data = ''
168168

169169
if not 200 <= r.status <= 299:
170+
print('hahaha')
171+
print(r.data)
170172
raise ApiException(http_resp=r)
171173

172174
return r

influxdb_client_3/write_client/client/util/multiprocessing_helper.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import logging
88
import multiprocessing
99

10+
from influxdb_client_3 import InfluxDBClient3, write_client_options
1011
from influxdb_client_3.write_client import WriteOptions
1112
from influxdb_client_3.exceptions import InfluxDBError
1213

@@ -148,11 +149,22 @@ def write(self, **kwargs) -> None:
148149
def run(self):
149150
"""Initialize ``InfluxDBClient`` and waits for data to writes into InfluxDB."""
150151
# Initialize Client and Write API
151-
self.client = InfluxDBClient(**self.kwargs)
152-
self.write_api = self.client.write_api(write_options=self.kwargs.get('write_options', WriteOptions()),
153-
success_callback=self.kwargs.get('success_callback', _success_callback),
154-
error_callback=self.kwargs.get('error_callback', _error_callback),
155-
retry_callback=self.kwargs.get('retry_callback', _retry_callback))
152+
wco = write_client_options(write_options=self.kwargs.get('write_options', WriteOptions()),
153+
success_callback=self.kwargs.get('success_callback', _success_callback),
154+
error_callback=self.kwargs.get('error_callback', _error_callback),
155+
retry_callback=self.kwargs.get('retry_callback', _retry_callback)
156+
)
157+
158+
# Still need to create the InfluxDBClient3 because the init logics of InfluxDBClient3 will create the WriteApi.
159+
# it will make WriteApi class created properly.
160+
self.client = InfluxDBClient3(write_client_options=wco, **self.kwargs)
161+
162+
# Close and set _query_api to None because query_api is not needed in this process.
163+
# We only need write_api.
164+
self.client._query_api.close()
165+
self.client._query_api = None
166+
167+
self.write_api = self.client._write_api
156168
# Infinite loop - until poison pill
157169
while True:
158170
next_record = self.queue_.get()
@@ -181,7 +193,7 @@ def terminate(self) -> None:
181193
self.write_api.__del__()
182194
self.write_api = None
183195
if self.client:
184-
self.client.__del__()
196+
self.client.close()
185197
self.client = None
186198
logger.info("closed")
187199

influxdb_client_3/write_client/client/write_api.py

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -965,6 +965,49 @@ def _should_gzip(self, payload: str, enable_gzip: bool = False, gzip_threshold:
965965

966966
return False
967967

968+
@staticmethod
969+
def _on_error(ex):
970+
logger.error("unexpected error during batching: %s", ex)
971+
972+
def _to_response(self, data: _BatchItem, delay: datetime.timedelta):
973+
return rx.of(data).pipe(
974+
ops.subscribe_on(self._write_options.write_scheduler),
975+
# use delay if its specified
976+
ops.delay(duetime=delay, scheduler=self._write_options.write_scheduler),
977+
# invoke http call
978+
ops.map(lambda x: self._http(x, **x.key.kwargs)),
979+
# catch exception to fail batch response
980+
ops.catch(handler=lambda exception, source: rx.just(_BatchResponse(exception=exception, data=data))),
981+
)
982+
983+
def _on_next(self, response: _BatchResponse):
984+
if response.exception:
985+
logger.error("The batch item wasn't processed successfully because: %s", response.exception)
986+
if self._error_callback:
987+
try:
988+
self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception)
989+
except Exception as e:
990+
"""
991+
Unfortunately, because callbacks are user-provided generic code, exceptions can be entirely
992+
arbitrary
993+
994+
We trap it, log that it occurred and then proceed - there's not much more that we can
995+
really do.
996+
"""
997+
logger.error("The configured error callback threw an exception: %s", e)
998+
999+
else:
1000+
logger.debug("The batch item: %s was processed successfully.", response)
1001+
if self._success_callback:
1002+
try:
1003+
self._success_callback(response.data.to_key_tuple(), response.data.data)
1004+
except Exception as e:
1005+
logger.error("The configured success callback threw an exception: %s", e)
1006+
1007+
def _on_complete(self):
1008+
self._disposable.dispose()
1009+
logger.debug("the batching processor was disposed")
1010+
9681011
def _append_default_tag(self, key, val, record):
9691012
from influxdb_client_3.write_client import Point
9701013
if isinstance(record, bytes) or isinstance(record, str):
@@ -1049,49 +1092,6 @@ def __del__(self):
10491092
"""Close WriteApi."""
10501093
self.close()
10511094

1052-
@staticmethod
1053-
def _on_error(ex):
1054-
logger.error("unexpected error during batching: %s", ex)
1055-
1056-
def _on_complete(self):
1057-
self._disposable.dispose()
1058-
logger.debug("the batching processor was disposed")
1059-
1060-
def _to_response(self, data: _BatchItem, delay: datetime.timedelta):
1061-
return rx.of(data).pipe(
1062-
ops.subscribe_on(self._write_options.write_scheduler),
1063-
# use delay if its specified
1064-
ops.delay(duetime=delay, scheduler=self._write_options.write_scheduler),
1065-
# invoke http call
1066-
ops.map(lambda x: self._http(x, **x.key.kwargs)),
1067-
# catch exception to fail batch response
1068-
ops.catch(handler=lambda exception, source: rx.just(_BatchResponse(exception=exception, data=data))),
1069-
)
1070-
1071-
def _on_next(self, response: _BatchResponse):
1072-
if response.exception:
1073-
logger.error("The batch item wasn't processed successfully because: %s", response.exception)
1074-
if self._error_callback:
1075-
try:
1076-
self._error_callback(response.data.to_key_tuple(), response.data.data, response.exception)
1077-
except Exception as e:
1078-
"""
1079-
Unfortunately, because callbacks are user-provided generic code, exceptions can be entirely
1080-
arbitrary
1081-
1082-
We trap it, log that it occurred and then proceed - there's not much more that we can
1083-
really do.
1084-
"""
1085-
logger.error("The configured error callback threw an exception: %s", e)
1086-
1087-
else:
1088-
logger.debug("The batch item: %s was processed successfully.", response)
1089-
if self._success_callback:
1090-
try:
1091-
self._success_callback(response.data.to_key_tuple(), response.data.data)
1092-
except Exception as e:
1093-
logger.error("The configured success callback threw an exception: %s", e)
1094-
10951095
def __setstate__(self, state):
10961096
"""Set your object with the provided dict."""
10971097
self.__dict__.update(state)

tests/test_influxdb_client_3_integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \
1515
WriteType, InfluxDB3ClientQueryError
1616
from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError
17+
from influxdb_client_3.write_client.client.util.multiprocessing_helper import MultiprocessingWriter
1718
from influxdb_client_3.write_client.rest import ApiException
1819
from tests.util import asyncio_run, lp_to_py_object
1920

0 commit comments

Comments
 (0)