11import importlib .util
2+ import json
23import os
34import urllib .parse
45from typing import Any , List , Literal , Optional , TYPE_CHECKING
56
67import pyarrow as pa
78
9+ from influxdb_client_3 .version import USER_AGENT
10+ from influxdb_client_3 .write_client ._sync import rest_client as rest
11+
812if TYPE_CHECKING :
913 import pandas as pd
1014 import polars as pl
1418from influxdb_client_3 .exceptions import InfluxDBError
1519from influxdb_client_3 .query .query_api import QueryApi as _QueryApi , QueryApiOptionsBuilder
1620from influxdb_client_3 .read_file import UploadFile
17- from influxdb_client_3 .write_client import InfluxDBClient as _InfluxDBClient , WriteOptions , Point
21+ from influxdb_client_3 .write_client import WriteOptions , Point
1822from influxdb_client_3 .write_client .client .write_api import WriteApi as _WriteApi , SYNCHRONOUS , ASYNCHRONOUS , \
1923 PointSettings , DefaultWriteOptions , WriteType
2024from influxdb_client_3 .write_client .domain .write_precision import WritePrecision
@@ -189,6 +193,9 @@ def __init__(
189193 org = None ,
190194 database = None ,
191195 token = None ,
196+ auth_scheme = None ,
197+ enable_gzip = False ,
198+ gzip_threshold = None ,
192199 write_client_options = None ,
193200 flight_client_options = None ,
194201 write_port_overwrite = None ,
@@ -229,6 +236,8 @@ def __init__(
229236 :key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests
230237 except batching writes. As a default there is no one retry strategy.
231238 :key str query_timeout: int value used to set the client query API timeout in milliseconds.
239+ :key str enable_gzip: TODO ???
240+ :key str gzip_threshold: TODO ???
232241 :key str write_timeout: int value used to set the client write API timeout in milliseconds.
233242 :key bool write_accept_partial: allow partial writes when some lines fail.
234243 :key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint.
@@ -293,14 +302,36 @@ def __init__(
293302 if write_port_overwrite is not None :
294303 port = write_port_overwrite
295304
296- self ._client = _InfluxDBClient (
297- url = f"{ scheme } ://{ hostname } :{ port } " ,
305+ # TODO fix retries
306+ retries = None
307+
308+ auth_schema = 'Token' if auth_scheme is None else auth_scheme
309+ default_header = {
310+ 'Authorization' : f'{ auth_schema } { self ._token } ' ,
311+ 'User-Agent' : USER_AGENT
312+ }
313+ self .base_url = f"{ scheme } ://{ hostname } :{ port } "
314+ self .default_header = default_header
315+ self .rest_client = rest .RestClient (
316+ base_url = self .base_url ,
317+ default_header = default_header ,
318+ retries = retries )
319+
320+ # TODO point_settings??
321+ # TODO enable_gzip and gzip_threshold be in WriteOptions
322+
323+ self ._write_api = _WriteApi (
298324 token = self ._token ,
325+ bucket = self ._database ,
299326 org = self ._org ,
327+ gzip_threshold = gzip_threshold ,
328+ enable_gzip = enable_gzip ,
329+ auth_scheme = auth_scheme ,
300330 timeout = write_timeout ,
301- ** kwargs )
302-
303- self ._write_api = _WriteApi (influxdb_client = self ._client , ** self ._write_client_options )
331+ default_header = default_header ,
332+ rest_client = self .rest_client ,
333+ ** self ._write_client_options
334+ )
304335
305336 if query_port_overwrite is not None :
306337 port = query_port_overwrite
@@ -658,32 +689,25 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all"
658689 except ArrowException as e :
659690 raise InfluxDB3ClientQueryError (f"Error while executing query: { e } " )
660691
661- def get_server_version (self ) -> str :
692+ def get_server_version (self ) -> Any | None :
662693 """
663- Get the version of the connected InfluxDB server.
694+ Get the influxdb_version of the connected InfluxDB server.
664695
665- This method makes a ping request to the server and extracts the version information
696+ This method makes a ping request to the server and extracts the influxdb_version information
666697 from either the response headers or response body.
667698
668- :return: The version string of the InfluxDB server.
699+ :return: The influxdb_version string of the InfluxDB server.
669700 :rtype: str
670701 """
671- version = None
672- (resp_body , _ , header ) = self ._client .api_client .call_api (
673- resource_path = "/ping" ,
674- method = "GET" ,
675- response_type = object
676- )
677-
678- for key , value in header .items ():
702+ resp = self .rest_client .request (url = '/ping' , method = "GET" , headers = self .default_header )
703+ for key , value in resp .getheaders ().items ():
679704 if key .lower () == "x-influxdb-version" :
680- version = value
681- break
682-
683- if version is None and isinstance (resp_body , dict ):
684- version = resp_body ['version' ]
705+ return value
685706
686- return version
707+ string_body = resp .get_string_body ()
708+ if string_body is not None :
709+ return json .loads (string_body )['version' ]
710+ return None
687711
688712 def flush (self ):
689713 """
@@ -702,7 +726,6 @@ def close(self):
702726 """Close the client and clean up resources."""
703727 self ._write_api .close ()
704728 self ._query_api .close ()
705- self ._client .close ()
706729
707730 def __enter__ (self ):
708731 return self
0 commit comments