Skip to content

Commit 200c024

Browse files
refactor: write rest client
1 parent 588e4da commit 200c024

19 files changed

Lines changed: 923 additions & 2555 deletions

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ commands:
8181
- run:
8282
name: Collecting coverage reports
8383
command: |
84-
curl -k https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import
8584
curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov
8685
curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov.SHA256SUM
8786
curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov.SHA256SUM.sig
87+
curl -fsSL https://uploader.codecov.io/verification.gpg | gpg --no-default-keyring --keyring trustedkeys.gpg --import
8888
gpgv codecov.SHA256SUM.sig codecov.SHA256SUM
8989
shasum -a 256 -c codecov.SHA256SUM
9090
sudo chmod +x codecov

influxdb_client_3/__init__.py

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import importlib.util
2+
import json
23
import os
34
import urllib.parse
45
from typing import Any, List, Literal, Optional, TYPE_CHECKING
56

67
import pyarrow as pa
78

9+
from influxdb_client_3.version import USER_AGENT
10+
from influxdb_client_3.write_client._sync import rest_client as rest
11+
812
if TYPE_CHECKING:
913
import pandas as pd
1014
import polars as pl
@@ -14,7 +18,7 @@
1418
from influxdb_client_3.exceptions import InfluxDBError
1519
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
1620
from influxdb_client_3.read_file import UploadFile
17-
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
21+
from influxdb_client_3.write_client import WriteOptions, Point
1822
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
1923
PointSettings, DefaultWriteOptions, WriteType
2024
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
@@ -185,10 +189,13 @@ def _parse_timeout(to: str) -> int:
185189
class InfluxDBClient3:
186190
def __init__(
187191
self,
188-
host=None,
192+
host='localhost',
189193
org=None,
190194
database=None,
191195
token=None,
196+
auth_scheme=None,
197+
enable_gzip=False,
198+
gzip_threshold=None,
192199
write_client_options=None,
193200
flight_client_options=None,
194201
write_port_overwrite=None,
@@ -212,6 +219,10 @@ def __init__(
212219
:type flight_client_options: dict[str, any]
213220
:param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
214221
:type disable_grpc_compression: bool
222+
:param enable_gzip: Enable GZIP compression for write requests.
223+
:type enable_gzip: bool
224+
:param gzip_threshold: Minimum payload size (bytes) to trigger GZIP when enable_gzip is True.
225+
:type gzip_threshold: int
215226
:key auth_scheme: token authentication scheme. Set to "Bearer" for Edge.
216227
:key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
217228
:key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
@@ -293,14 +304,45 @@ def __init__(
293304
if write_port_overwrite is not None:
294305
port = write_port_overwrite
295306

296-
self._client = _InfluxDBClient(
297-
url=f"{scheme}://{hostname}:{port}",
307+
# TODO fix retries
308+
retries = None
309+
310+
auth_schema = 'Token' if auth_scheme is None else auth_scheme
311+
default_header = {
312+
'User-Agent': USER_AGENT
313+
}
314+
if self._token is not None:
315+
default_header['Authorization'] = f'{auth_schema} {self._token}'
316+
self.base_url = f"{scheme}://{hostname}:{port}"
317+
self.default_header = default_header
318+
self.rest_client = rest.RestClient(
319+
base_url=self.base_url,
320+
default_header=default_header,
321+
verify_ssl=kwargs.get('verify_ssl', True),
322+
ssl_ca_cert=kwargs.get('ssl_ca_cert', None),
323+
cert_file=kwargs.get('cert_file', None),
324+
cert_key_file=kwargs.get('cert_key_file', None),
325+
cert_key_password=kwargs.get('cert_key_password', None),
326+
ssl_context=kwargs.get('ssl_context', None),
327+
proxy=kwargs.get('proxy', None),
328+
proxy_headers=kwargs.get('proxy_headers', None),
329+
retries=retries,
330+
)
331+
332+
# TODO point_settings??
333+
334+
self._write_api = _WriteApi(
298335
token=self._token,
336+
bucket=self._database,
299337
org=self._org,
338+
gzip_threshold=gzip_threshold,
339+
enable_gzip=enable_gzip,
340+
auth_scheme=auth_scheme,
300341
timeout=write_timeout,
301-
**kwargs)
302-
303-
self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
342+
default_header=default_header,
343+
rest_client=self.rest_client,
344+
**self._write_client_options
345+
)
304346

305347
if query_port_overwrite is not None:
306348
port = query_port_overwrite
@@ -658,32 +700,25 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all"
658700
except ArrowException as e:
659701
raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")
660702

661-
def get_server_version(self) -> str:
703+
def get_server_version(self) -> Optional[str]:
662704
"""
663-
Get the version of the connected InfluxDB server.
705+
Get the influxdb_version of the connected InfluxDB server.
664706
665-
This method makes a ping request to the server and extracts the version information
707+
This method makes a ping request to the server and extracts the influxdb_version information
666708
from either the response headers or response body.
667709
668-
:return: The version string of the InfluxDB server.
710+
:return: The influxdb_version string of the InfluxDB server.
669711
:rtype: str
670712
"""
671-
version = None
672-
(resp_body, _, header) = self._client.api_client.call_api(
673-
resource_path="/ping",
674-
method="GET",
675-
response_type=object
676-
)
677-
678-
for key, value in header.items():
713+
resp = self.rest_client.request(url='/ping', method="GET", headers=self.default_header)
714+
for key, value in resp.getheaders().items():
679715
if key.lower() == "x-influxdb-version":
680-
version = value
681-
break
682-
683-
if version is None and isinstance(resp_body, dict):
684-
version = resp_body['version']
716+
return value
685717

686-
return version
718+
string_body = resp.get_string_body()
719+
if string_body is not None:
720+
return json.loads(string_body)['version']
721+
return None
687722

688723
def flush(self):
689724
"""
@@ -702,7 +737,6 @@ def close(self):
702737
"""Close the client and clean up resources."""
703738
self._write_api.close()
704739
self._query_api.close()
705-
self._client.close()
706740

707741
def __enter__(self):
708742
return self

influxdb_client_3/write_client/__init__.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,9 @@
44

55
from __future__ import absolute_import
66

7-
from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
8-
from influxdb_client_3.write_client.client.influxdb_client import InfluxDBClient
9-
from influxdb_client_3.write_client.client.logging_handler import InfluxLoggingHandler
7+
from influxdb_client_3.version import VERSION
108
from influxdb_client_3.write_client.client.write.point import Point
11-
12-
from influxdb_client_3.write_client.service.write_service import WriteService
13-
9+
from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
1410
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
1511

16-
from influxdb_client_3.write_client.configuration import Configuration
17-
from influxdb_client_3.version import VERSION
1812
__version__ = VERSION

0 commit comments

Comments
 (0)