Skip to content

Commit 7d74438

Browse files
refactor: write rest client
1 parent 9556a0c commit 7d74438

18 files changed

Lines changed: 1362 additions & 2864 deletions

influxdb_client_3/__init__.py

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import importlib.util
2+
import json
23
import os
34
import urllib.parse
45
from typing import Any, List, Literal, Optional, TYPE_CHECKING
56

67
import pyarrow as pa
78

9+
from influxdb_client_3.version import USER_AGENT
10+
from influxdb_client_3.write_client._sync import rest_client as rest
11+
812
if TYPE_CHECKING:
913
import pandas as pd
1014
import polars as pl
@@ -14,7 +18,7 @@
1418
from influxdb_client_3.exceptions import InfluxDBError
1519
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
1620
from influxdb_client_3.read_file import UploadFile
17-
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
21+
from influxdb_client_3.write_client import WriteOptions, Point
1822
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
1923
PointSettings, DefaultWriteOptions, WriteType
2024
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
@@ -189,6 +193,9 @@ def __init__(
189193
org=None,
190194
database=None,
191195
token=None,
196+
auth_scheme=None,
197+
enable_gzip=False,
198+
gzip_threshold=None,
192199
write_client_options=None,
193200
flight_client_options=None,
194201
write_port_overwrite=None,
@@ -229,6 +236,8 @@ def __init__(
229236
:key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests
230237
except batching writes. As a default there is no one retry strategy.
231238
:key str query_timeout: int value used to set the client query API timeout in milliseconds.
239+
:key str enable_gzip: TODO ???
240+
:key str gzip_threshold: TODO ???
232241
:key str write_timeout: int value used to set the client write API timeout in milliseconds.
233242
:key bool write_accept_partial: allow partial writes when some lines fail.
234243
:key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint.
@@ -293,14 +302,35 @@ def __init__(
293302
if write_port_overwrite is not None:
294303
port = write_port_overwrite
295304

296-
self._client = _InfluxDBClient(
297-
url=f"{scheme}://{hostname}:{port}",
305+
# TODO fix retries
306+
retries = None
307+
308+
# TODO refactor
309+
auth_schema = 'Token' if auth_scheme is None else auth_scheme
310+
default_header = {
311+
'Authorization': f'{auth_schema} {token}',
312+
'User-Agent': USER_AGENT
313+
}
314+
315+
self.rest_client = rest.RestClient(
316+
base_url=f"{scheme}://{hostname}:{port}",
317+
default_header=default_header,
318+
retries=retries)
319+
320+
# TODO point_settings??
321+
# TODO enable_gzip and gzip_threshold be in WriteOptions
322+
# self.base_url = f"{scheme}://{hostname}:{port}"
323+
self._write_api = _WriteApi(
298324
token=self._token,
325+
bucket=self._database,
299326
org=self._org,
327+
gzip_threshold=gzip_threshold,
328+
enable_gzip=enable_gzip,
329+
auth_scheme=auth_scheme,
300330
timeout=write_timeout,
301-
**kwargs)
302-
303-
self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
331+
rest_client=self.rest_client,
332+
**self._write_client_options
333+
)
304334

305335
if query_port_overwrite is not None:
306336
port = query_port_overwrite
@@ -658,32 +688,26 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all"
658688
except ArrowException as e:
659689
raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")
660690

661-
def get_server_version(self) -> str:
691+
def get_server_version(self) -> Any | None:
662692
"""
663-
Get the version of the connected InfluxDB server.
693+
Get the influxdb_version of the connected InfluxDB server.
664694
665-
This method makes a ping request to the server and extracts the version information
695+
This method makes a ping request to the server and extracts the influxdb_version information
666696
from either the response headers or response body.
667697
668-
:return: The version string of the InfluxDB server.
698+
:return: The influxdb_version string of the InfluxDB server.
669699
:rtype: str
670700
"""
671-
version = None
672-
(resp_body, _, header) = self._client.api_client.call_api(
673-
resource_path="/ping",
674-
method="GET",
675-
response_type=object
676-
)
677-
678-
for key, value in header.items():
701+
resp = self.rest_client.request(url='/ping', method="GET")
702+
for key, value in resp.getheaders().items():
679703
if key.lower() == "x-influxdb-version":
680-
version = value
681-
break
682-
683-
if version is None and isinstance(resp_body, dict):
684-
version = resp_body['version']
704+
return value
685705

686-
return version
706+
# TODO refactor this
707+
string = resp.get_string_body()
708+
if string is not None:
709+
return json.loads(string)['version']
710+
return None
687711

688712
def flush(self):
689713
"""
@@ -702,7 +726,6 @@ def close(self):
702726
"""Close the client and clean up resources."""
703727
self._write_api.close()
704728
self._query_api.close()
705-
self._client.close()
706729

707730
def __enter__(self):
708731
return self

influxdb_client_3/write_client/__init__.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,9 @@
44

55
from __future__ import absolute_import
66

7-
from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
8-
from influxdb_client_3.write_client.client.influxdb_client import InfluxDBClient
9-
from influxdb_client_3.write_client.client.logging_handler import InfluxLoggingHandler
7+
from influxdb_client_3.version import VERSION
108
from influxdb_client_3.write_client.client.write.point import Point
11-
12-
from influxdb_client_3.write_client.service.write_service import WriteService
13-
9+
from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
1410
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
1511

16-
from influxdb_client_3.write_client.configuration import Configuration
17-
from influxdb_client_3.version import VERSION
1812
__version__ = VERSION

0 commit comments

Comments
 (0)