Skip to content

Commit 1dc7495

Browse files
refactor: write rest client
1 parent a022daa commit 1dc7495

20 files changed

Lines changed: 1007 additions & 2659 deletions

influxdb_client_3/__init__.py

Lines changed: 79 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import multiprocessing
2+
13
import importlib.util
4+
import json
25
import os
36
import urllib.parse
47
from typing import Any, List, Literal, Optional, TYPE_CHECKING
58

69
import pyarrow as pa
710

11+
from influxdb_client_3.version import USER_AGENT
12+
from influxdb_client_3.write_client._sync import rest_client as rest
13+
814
if TYPE_CHECKING:
915
import pandas as pd
1016
import polars as pl
@@ -14,7 +20,7 @@
1420
from influxdb_client_3.exceptions import InfluxDBError
1521
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
1622
from influxdb_client_3.read_file import UploadFile
17-
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
23+
from influxdb_client_3.write_client import WriteOptions, Point
1824
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
1925
PointSettings, DefaultWriteOptions, WriteType
2026
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
@@ -189,11 +195,15 @@ def __init__(
189195
org=None,
190196
database=None,
191197
token=None,
198+
auth_scheme=None,
199+
enable_gzip=False,
200+
gzip_threshold=None,
192201
write_client_options=None,
193202
flight_client_options=None,
194203
write_port_overwrite=None,
195204
query_port_overwrite=None,
196205
disable_grpc_compression=False,
206+
point_settings=None,
197207
**kwargs):
198208
"""
199209
Initialize an InfluxDB client.
@@ -212,6 +222,12 @@ def __init__(
212222
:type flight_client_options: dict[str, any]
213223
:param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
214224
:type disable_grpc_compression: bool
225+
:param point_settings The settings for Points
226+
:type point_settings: PointSettings
227+
:param enable_gzip: Enable GZIP compression for write requests.
228+
:type enable_gzip: bool
229+
:param gzip_threshold: Minimum payload size (bytes) to trigger GZIP when enable_gzip is True.
230+
:type gzip_threshold: int
215231
:key auth_scheme: token authentication scheme. Set to "Bearer" for Edge.
216232
:key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
217233
:key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
@@ -293,14 +309,45 @@ def __init__(
293309
if write_port_overwrite is not None:
294310
port = write_port_overwrite
295311

296-
self._client = _InfluxDBClient(
297-
url=f"{scheme}://{hostname}:{port}",
312+
auth_schema = 'Token' if auth_scheme is None else auth_scheme
313+
default_header = {
314+
'User-Agent': USER_AGENT
315+
}
316+
if self._token is not None:
317+
default_header['Authorization'] = f'{auth_schema} {self._token}'
318+
self.base_url = f"{scheme}://{hostname}:{port}"
319+
self.default_header = default_header
320+
self.rest_client = rest.RestClient(
321+
base_url=self.base_url,
322+
default_header=default_header,
323+
verify_ssl=kwargs.get('verify_ssl', True),
324+
ssl_ca_cert=kwargs.get('ssl_ca_cert', None),
325+
cert_file=kwargs.get('cert_file', None),
326+
cert_key_file=kwargs.get('cert_key_file', None),
327+
cert_key_password=kwargs.get('cert_key_password', None),
328+
ssl_context=kwargs.get('ssl_context', None),
329+
proxy=kwargs.get('proxy', None),
330+
proxy_headers=kwargs.get('proxy_headers', None),
331+
retries=kwargs.get('retries', False),
332+
connection_pool_maxsize=kwargs.get('connection_pool_maxsize', multiprocessing.cpu_count() * 5,)
333+
)
334+
335+
if point_settings is None:
336+
point_settings = PointSettings()
337+
338+
self._write_api = _WriteApi(
298339
token=self._token,
340+
bucket=self._database,
299341
org=self._org,
342+
gzip_threshold=gzip_threshold,
343+
enable_gzip=enable_gzip,
344+
auth_scheme=auth_scheme,
300345
timeout=write_timeout,
301-
**kwargs)
302-
303-
self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
346+
default_header=default_header,
347+
rest_client=self.rest_client,
348+
point_settings=point_settings,
349+
**self._write_client_options
350+
)
304351

305352
if query_port_overwrite is not None:
306353
port = query_port_overwrite
@@ -658,32 +705,32 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all"
658705
except ArrowException as e:
659706
raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")
660707

661-
def get_server_version(self) -> str:
708+
def get_server_version(self) -> Optional[str]:
662709
"""
663-
Get the version of the connected InfluxDB server.
710+
Retrieves the server version by querying the designated endpoint and
711+
extracting the version information from either response headers or
712+
the response body.
664713
665-
This method makes a ping request to the server and extracts the version information
666-
from either the response headers or response body.
714+
This method interacts with a REST API endpoint to fetch the server's
715+
version details, which might be stored in a specific HTTP header or
716+
available in the response body as part of a JSON structure.
667717
668-
:return: The version string of the InfluxDB server.
669-
:rtype: str
718+
:return: The version string of the server if available, otherwise None.
719+
:rtype: Optional[str]
670720
"""
671-
version = None
672-
(resp_body, _, header) = self._client.api_client.call_api(
673-
resource_path="/ping",
674-
method="GET",
675-
response_type=object
676-
)
677-
678-
for key, value in header.items():
721+
resp = self.rest_client.request(url='/ping', method="GET", headers=self.default_header)
722+
for key, value in resp.getheaders().items():
679723
if key.lower() == "x-influxdb-version":
680-
version = value
681-
break
682-
683-
if version is None and isinstance(resp_body, dict):
684-
version = resp_body['version']
724+
return value
685725

686-
return version
726+
string_body = resp.get_string_body()
727+
if not string_body:
728+
return None
729+
try:
730+
data = json.loads(string_body)
731+
except (ValueError, TypeError):
732+
return None
733+
return data.get('version')
687734

688735
def flush(self):
689736
"""
@@ -700,9 +747,12 @@ def flush(self):
700747

701748
def close(self):
702749
"""Close the client and clean up resources."""
703-
self._write_api.close()
704-
self._query_api.close()
705-
self._client.close()
750+
if self._write_api is not None:
751+
self._write_api.close()
752+
if self._query_api is not None:
753+
self._query_api.close()
754+
if self.rest_client is not None:
755+
self.rest_client.close()
706756

707757
def __enter__(self):
708758
return self

influxdb_client_3/write_client/__init__.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,9 @@
44

55
from __future__ import absolute_import
66

7-
from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
8-
from influxdb_client_3.write_client.client.influxdb_client import InfluxDBClient
9-
from influxdb_client_3.write_client.client.logging_handler import InfluxLoggingHandler
7+
from influxdb_client_3.version import VERSION
108
from influxdb_client_3.write_client.client.write.point import Point
11-
12-
from influxdb_client_3.write_client.service.write_service import WriteService
13-
9+
from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
1410
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
1511

16-
from influxdb_client_3.write_client.configuration import Configuration
17-
from influxdb_client_3.version import VERSION
1812
__version__ = VERSION

0 commit comments

Comments
 (0)