-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy path__init__.py
More file actions
775 lines (659 loc) · 33.4 KB
/
Copy path__init__.py
File metadata and controls
775 lines (659 loc) · 33.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
import multiprocessing
import importlib.util
import json
import os
import urllib.parse
from typing import Any, List, Literal, Optional, TYPE_CHECKING
import pyarrow as pa
from influxdb_client_3.version import USER_AGENT
from influxdb_client_3.write_client._sync import rest_client as rest
if TYPE_CHECKING:
import pandas as pd
import polars as pl
from pyarrow import ArrowException
from influxdb_client_3.exceptions import InfluxDB3ClientQueryError
from influxdb_client_3.exceptions import InfluxDBError
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
from influxdb_client_3.read_file import UploadFile
from influxdb_client_3.write_client import WriteOptions, Point
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
PointSettings, DefaultWriteOptions, WriteType
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
# Feature flag: True when the optional polars package is importable. The query
# methods check it before returning polars DataFrames.
polars = importlib.util.find_spec("polars") is not None

# Names of the environment variables read by InfluxDBClient3.from_env().
# Connection settings (HOST, TOKEN and DATABASE are required there).
INFLUX_HOST = "INFLUX_HOST"
INFLUX_TOKEN = "INFLUX_TOKEN"
INFLUX_DATABASE = "INFLUX_DATABASE"
INFLUX_ORG = "INFLUX_ORG"
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
# Write settings.
INFLUX_PRECISION = "INFLUX_PRECISION"
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
INFLUX_WRITE_ACCEPT_PARTIAL = "INFLUX_WRITE_ACCEPT_PARTIAL"
INFLUX_WRITE_USE_V2_API = "INFLUX_WRITE_USE_V2_API"
INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT"
# Query settings.
INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT"
INFLUX_DISABLE_GRPC_COMPRESSION = "INFLUX_DISABLE_GRPC_COMPRESSION"
def write_client_options(**kwargs):
    """
    Collect keyword arguments for the WriteApi client into a dict.

    The result is meant for the ``write_client_options`` argument of
    ``InfluxDBClient3``, which unpacks it into the WriteApi constructor.

    :param kwargs: Options for the WriteApi client, e.g. ``write_options``.
    :return: A dict holding exactly the given keyword arguments.
    """
    options = dict(kwargs)
    return options
def default_client_options(**kwargs):
    """
    Bundle arbitrary keyword arguments into a dict.

    :param kwargs: Any keyword arguments.
    :return: A dict holding exactly the given keyword arguments.
    """
    return {**kwargs}
def flight_client_options(**kwargs):
    """
    Collect keyword arguments for the Arrow FlightClient into a dict.

    The result is meant for the ``flight_client_options`` argument of
    ``InfluxDBClient3``, which hands it to the query API.

    :param kwargs: Options for the FlightClient.
    :return: A dict holding exactly the given keyword arguments.
    """
    return {name: value for name, value in kwargs.items()}
def file_parser_options(**kwargs):
    """
    Collect keyword arguments for the file parser into a dict.

    The result is meant for the ``file_parser_options`` argument of
    ``InfluxDBClient3.write_file``, which passes it to ``UploadFile``.

    :param kwargs: Options for the file parser.
    :return: A dict holding exactly the given keyword arguments.
    """
    parsed = {}
    parsed.update(kwargs)
    return parsed
def _deep_merge(target, source):
"""
Performs a deep merge of dictionaries or lists,
recursively merging the contents, handling nested structures, and concatenation of lists.
"""
if isinstance(target, dict) and isinstance(source, dict):
for key, value in source.items():
if key in target and isinstance(target[key], (dict, list)) and isinstance(value, (dict, list)):
# If both target and source values are dictionaries or lists, merge them recursively
target[key] = _deep_merge(target[key], value)
else:
# Otherwise, replace the target value with the source value
target[key] = value
elif isinstance(target, list) and isinstance(source, list):
# If both target and source are lists, concatenate them
target.extend(source)
elif source is not None:
# For other types, simply replace the target with the source
target = source
return target
def _merge_options(defaults, exclude_keys=None, custom=None):
"""
Merge default option arguments with custom (user-provided) arguments,
excluding specific keys defined in exclude_keys.
"""
if custom is None or len(custom) == 0:
return defaults
if exclude_keys is None:
exclude_keys = []
return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys})
def _parse_precision(precision: str) -> WritePrecision:
    """
    Normalize a write precision given either as a WritePrecision value or by its long name.

    Accepted inputs, tried in this order with ``==``:
    WritePrecision.NS or "nanosecond", WritePrecision.US or "microsecond",
    WritePrecision.MS or "millisecond", WritePrecision.S or "second".

    :param precision: The precision value to normalize.
    :return: The matching WritePrecision constant.
    :rtype: WritePrecision
    :raises ValueError: If ``precision`` matches none of the accepted inputs.
    """
    candidates = (
        (WritePrecision.NS, "nanosecond"),
        (WritePrecision.US, "microsecond"),
        (WritePrecision.MS, "millisecond"),
        (WritePrecision.S, "second"),
    )
    for canonical, long_name in candidates:
        if precision == canonical or precision == long_name:
            return canonical
    raise ValueError(f"Invalid precision value: {precision}")
def _parse_gzip_threshold(threshold: str) -> int:
"""
Parses and validates the provided threshold value.
This function ensures that the given threshold is a valid integer value,
and it raises an appropriate error if the threshold is not valid. It also
enforces that the threshold value is non-negative.
:param threshold: The input threshold value to be parsed and validated.
:type threshold: Any
:return: The validated threshold value as an integer.
:rtype: int
:raises ValueError: If the provided threshold is not an integer or if it is
negative.
"""
try:
threshold = int(threshold)
except (TypeError, ValueError):
raise ValueError(f"Invalid threshold value: {threshold}. Must be integer.")
if threshold < 0:
raise ValueError(f"Invalid threshold value: {threshold}. Must be non-negative.")
return threshold
def _parse_write_bool(value):
"""
Parses a truthy/falsy value for write options.
The input is normalized to string and matched against common truthy values.
Any non-truthy value is treated as False.
:param value: The input value to be parsed and validated.
:type value: Any
:return: Parsed boolean value.
:rtype: bool
"""
return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes']
def _parse_timeout(to: str) -> int:
try:
timeout = int(to)
except (TypeError, ValueError):
raise ValueError(f"Invalid timeout value: {to}. Must be a number.")
if timeout < 0:
raise ValueError(f"Invalid timeout value: {to}. Must be non-negative.")
return timeout
class InfluxDBClient3:
    """
    Client for InfluxDB 3.

    Writes go over HTTP(S) through the write API, backed by a shared REST client.
    Queries run over Arrow Flight (gRPC) through the query API. The client is a
    context manager; leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(
            self,
            host=None,
            org=None,
            database=None,
            token=None,
            auth_scheme=None,
            enable_gzip=False,
            gzip_threshold=None,
            write_client_options=None,
            flight_client_options=None,
            write_port_overwrite=None,
            query_port_overwrite=None,
            disable_grpc_compression=False,
            point_settings=None,
            **kwargs):
        """
        Initialize an InfluxDB client.

        :param host: The hostname or URL of the InfluxDB server. A missing scheme defaults to
                     "https" and a missing port to 443.
        :type host: str
        :param org: The InfluxDB organization name for operations. Defaults to "default".
        :type org: str
        :param database: The database for InfluxDB operations.
        :type database: str
        :param token: The authentication token for accessing the InfluxDB server.
        :type token: str
        :param write_client_options: dictionary for providing additional arguments for the WriteApi client.
        :type write_client_options: dict[str, any]
        :param flight_client_options: dictionary for providing additional arguments for the FlightClient.
        :type flight_client_options: dict[str, any]
        :param write_port_overwrite: Port to use for the write (HTTP) endpoint instead of the parsed one.
        :type write_port_overwrite: int
        :param query_port_overwrite: Port to use for the query (Flight) endpoint instead of the parsed one.
        :type query_port_overwrite: int
        :param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
        :type disable_grpc_compression: bool
        :param point_settings: The settings for Points. Defaults to ``PointSettings()``.
        :type point_settings: PointSettings
        :param enable_gzip: Enable GZIP compression for write requests.
        :type enable_gzip: bool
        :param gzip_threshold: Minimum payload size (bytes) to trigger GZIP when enable_gzip is True.
        :type gzip_threshold: int
        :key auth_scheme: token authentication scheme ("Token" when omitted). Set to "Bearer" for Edge.
        :key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
        :key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
        :key str cert_file: Path to the certificate that will be used for mTLS authentication.
        :key str cert_key_file: Path to the file contains private key for mTLS certificate.
        :key str cert_key_password: String or function which returns password for decrypting the mTLS private key.
        :key ssl.SSLContext ssl_context: Specify a custom Python SSL Context for the TLS/ mTLS handshake.
                                         Be aware that only delivered certificate/ key files or an SSL Context are
                                         possible.
        :key str proxy: Set this to configure the http proxy to be used (ex. http://localhost:3128)
        :key str proxy_headers: A dictionary containing headers that will be sent to the proxy. Could be used for proxy
                                authentication. (Applies to Write API only)
        :key int connection_pool_maxsize: Number of connections to save that can be reused by urllib3.
                                          Defaults to "multiprocessing.cpu_count() * 5".
        :key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests
                                               except batching writes. As a default there is no one retry strategy.
        :key int query_timeout: Query API timeout in milliseconds.
        :key int write_timeout: Write API timeout in milliseconds. Overrides the timeout of a
                                ``write_options`` entry in ``write_client_options``.
        :key bool write_accept_partial: allow partial writes when some lines fail.
        :key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint.
        :key bool write_no_sync: disable sync confirmation on V3 API endpoint writes.
        :key list[str] profilers: accepted for backward compatibility; not read by this constructor.
        """
        self._org = org if org is not None else "default"
        self._database = database
        self._token = token

        # Write settings are layered in this order: library defaults, then any WriteOptions
        # supplied via write_client_options, then the explicit write_* keyword arguments.
        write_type = DefaultWriteOptions.write_type.value
        write_precision = DefaultWriteOptions.write_precision.value
        write_no_sync = DefaultWriteOptions.no_sync.value
        write_accept_partial = DefaultWriteOptions.accept_partial.value
        write_use_v2_api = DefaultWriteOptions.use_v2_api.value
        write_timeout = DefaultWriteOptions.timeout.value
        if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
            write_opts = write_client_options['write_options']
            write_type = getattr(write_opts, 'write_type', write_type)
            write_precision = getattr(write_opts, 'write_precision', write_precision)
            write_no_sync = getattr(write_opts, 'no_sync', write_no_sync)
            write_accept_partial = getattr(write_opts, 'accept_partial', write_accept_partial)
            write_use_v2_api = getattr(write_opts, 'use_v2_api', write_use_v2_api)
            write_timeout = getattr(write_opts, 'timeout', write_timeout)
        if 'write_timeout' in kwargs:
            write_timeout = kwargs.get('write_timeout')
        if kwargs.get('write_accept_partial') is not None:
            write_accept_partial = _parse_write_bool(kwargs['write_accept_partial'])
        if kwargs.get('write_use_v2_api') is not None:
            write_use_v2_api = _parse_write_bool(kwargs['write_use_v2_api'])
        if kwargs.get('write_no_sync') is not None:
            write_no_sync = _parse_write_bool(kwargs['write_no_sync'])
        write_options = WriteOptions(
            write_type=write_type,
            write_precision=write_precision,
            no_sync=write_no_sync,
            accept_partial=write_accept_partial,
            use_v2_api=write_use_v2_api,
        )
        # NOTE(review): a 'write_options' entry in write_client_options is unpacked after the
        # WriteOptions built above and therefore replaces it. In that case the write_no_sync /
        # write_accept_partial / write_use_v2_api keyword arguments have no effect
        # (write_timeout still applies through the separate timeout argument below).
        self._write_client_options = {
            "write_options": write_options,
            **(write_client_options or {})
        }

        # Derive scheme, hostname and port from the host argument.
        # NOTE(review): urlparse reads a scheme-less "name:port" string as scheme "name";
        # confirm callers pass either a full URL or a bare hostname.
        parsed_url = urllib.parse.urlparse(host)
        scheme = parsed_url.scheme if parsed_url.scheme else "https"
        hostname = parsed_url.hostname if parsed_url.hostname else host
        port = parsed_url.port if parsed_url.port else 443

        if write_port_overwrite is not None:
            port = write_port_overwrite
        auth_schema = 'Token' if auth_scheme is None else auth_scheme
        default_header = {
            'User-Agent': USER_AGENT
        }
        if self._token is not None:
            default_header['Authorization'] = f'{auth_schema} {self._token}'
        self.base_url = f"{scheme}://{hostname}:{port}"
        self.default_header = default_header
        self.rest_client = rest.RestClient(
            base_url=self.base_url,
            default_header=default_header,
            verify_ssl=kwargs.get('verify_ssl', True),
            ssl_ca_cert=kwargs.get('ssl_ca_cert', None),
            cert_file=kwargs.get('cert_file', None),
            cert_key_file=kwargs.get('cert_key_file', None),
            cert_key_password=kwargs.get('cert_key_password', None),
            ssl_context=kwargs.get('ssl_context', None),
            proxy=kwargs.get('proxy', None),
            proxy_headers=kwargs.get('proxy_headers', None),
            retries=kwargs.get('retries', False),
            connection_pool_maxsize=kwargs.get('connection_pool_maxsize', multiprocessing.cpu_count() * 5)
        )

        if point_settings is None:
            point_settings = PointSettings()

        self._write_api = _WriteApi(
            token=self._token,
            bucket=self._database,
            org=self._org,
            gzip_threshold=gzip_threshold,
            enable_gzip=enable_gzip,
            auth_scheme=auth_scheme,
            timeout=write_timeout,
            default_header=default_header,
            rest_client=self.rest_client,
            point_settings=point_settings,
            **self._write_client_options
        )

        # NOTE(review): when only write_port_overwrite is set, `port` still holds that value here,
        # so the query endpoint also uses it. Confirm this sharing is intended.
        if query_port_overwrite is not None:
            port = query_port_overwrite
        # Flight uses TLS exactly when the (possibly defaulted) scheme is https.
        if scheme == 'https':
            connection_string = f"grpc+tls://{hostname}:{port}"
        else:
            connection_string = f"grpc+tcp://{hostname}:{port}"

        q_opts_builder = QueryApiOptionsBuilder()
        if disable_grpc_compression:
            q_opts_builder.disable_grpc_compression(True)
        if 'ssl_ca_cert' in kwargs:
            q_opts_builder.root_certs(kwargs.get('ssl_ca_cert', None))
        if 'verify_ssl' in kwargs:
            q_opts_builder.tls_verify(kwargs.get('verify_ssl', True))
        if 'proxy' in kwargs:
            q_opts_builder.proxy(kwargs.get('proxy', None))
        if 'query_timeout' in kwargs:
            # query_timeout is given in milliseconds and handed to the builder divided by 1000.
            query_timeout_float = float(kwargs.get('query_timeout'))
            q_opts_builder.timeout(query_timeout_float / 1000.0)
        self._query_api = _QueryApi(connection_string=connection_string, token=token,
                                    flight_client_options=flight_client_options,
                                    proxy=kwargs.get("proxy", None), options=q_opts_builder.build())

    @classmethod
    def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
        """
        Creates an instance of the client class using environment variables for configuration.

        The required variables INFLUX_HOST, INFLUX_TOKEN and INFLUX_DATABASE must be
        present and non-empty. Optional variables (precision, GZIP threshold, write
        flags, timeouts, auth scheme, org, gRPC compression) are applied when present.
        The instance is created through ``cls``, so subclasses get an instance of their own type.

        :param kwargs: Additional parameters that are passed to the client constructor.
        :type kwargs: Any
        :raises ValueError: If any required environment variables are missing or empty, or if an
                            optional variable holds an invalid value.
        :return: An initialized client object.
        :rtype: InfluxDBClient3
        """
        required_vars = {
            INFLUX_HOST: os.getenv(INFLUX_HOST),
            INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
            INFLUX_DATABASE: os.getenv(INFLUX_DATABASE)
        }
        missing_vars = [var for var, value in required_vars.items() if value is None or value == ""]
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

        write_options = WriteOptions(write_type=WriteType.synchronous)

        gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD)
        if gzip_threshold is not None:
            kwargs['gzip_threshold'] = _parse_gzip_threshold(gzip_threshold)
            kwargs['enable_gzip'] = True

        write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC)
        if write_no_sync is not None:
            write_options.no_sync = _parse_write_bool(write_no_sync)

        write_accept_partial = os.getenv(INFLUX_WRITE_ACCEPT_PARTIAL)
        if write_accept_partial is not None:
            write_options.accept_partial = _parse_write_bool(write_accept_partial)

        write_use_v2_api = os.getenv(INFLUX_WRITE_USE_V2_API)
        if write_use_v2_api is not None:
            write_options.use_v2_api = _parse_write_bool(write_use_v2_api)

        precision = os.getenv(INFLUX_PRECISION)
        if precision is not None:
            write_options.write_precision = _parse_precision(precision)

        write_timeout = os.getenv(INFLUX_WRITE_TIMEOUT)
        if write_timeout is not None:
            # N.B. an explicit write_timeout keyword argument, if also passed to this method, is
            # forwarded to __init__, where it takes precedence over this WriteOptions.timeout.
            write_options.timeout = _parse_timeout(write_timeout)

        query_timeout = os.getenv(INFLUX_QUERY_TIMEOUT)
        if query_timeout is not None:
            kwargs['query_timeout'] = _parse_timeout(query_timeout)

        env_write_client_options = {'write_options': write_options}
        if os.getenv(INFLUX_AUTH_SCHEME) is not None:
            kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)

        # Same truthy spellings as the write flags; unset means False.
        disable_grpc_compression = _parse_write_bool(os.getenv(INFLUX_DISABLE_GRPC_COMPRESSION, ''))

        org = os.getenv(INFLUX_ORG, "default")
        return cls(
            host=required_vars[INFLUX_HOST],
            token=required_vars[INFLUX_TOKEN],
            database=required_vars[INFLUX_DATABASE],
            write_client_options=env_write_client_options,
            org=org,
            disable_grpc_compression=disable_grpc_compression,
            **kwargs
        )

    def write(self, record=None, database=None, **kwargs):
        """
        Write data to InfluxDB.

        Warning: When you write with only one Point or one Dict, and if that Point or Dict
        contains fields with None value, those fields will not be written to InfluxDB.
        If such fields are later queried explicitly, for example,
        "SELECT field_with_value, field_with_null_value FROM my_table" an error will be thrown.

        :param record: The data point(s) to write.
        :type record: object or list of objects
        :param database: The database to write to. If not provided, uses the database provided during initialization.
        :type database: str
        :param kwargs: Additional arguments to pass to the write API.
        :return: Whatever the underlying write API returns.
        """
        if database is None:
            database = self._database
        return self._write_api.write(bucket=database, record=record, **kwargs)

    def write_dataframe(
        self,
        df: "pd.DataFrame | pl.DataFrame",
        measurement: str,
        timestamp_column: str,
        tags: Optional[List[str]] = None,
        timestamp_timezone: Optional[str] = None,
        database: Optional[str] = None,
        **kwargs
    ):
        """
        Write a DataFrame to InfluxDB.

        This method accepts both pandas and polars DataFrames. The type is checked by
        looking for "pandas" or "polars" in the string form of ``type(df)``; the DataFrame
        is then passed to the write API as the record.

        :param df: The DataFrame to write. Can be a pandas or polars DataFrame.
        :type df: pandas.DataFrame or polars.DataFrame
        :param measurement: The name of the measurement to write to.
        :type measurement: str
        :param timestamp_column: The name of the column containing timestamps.
                                 This parameter is required for consistency between pandas and polars.
        :type timestamp_column: str
        :param tags: List of column names to use as tags. Remaining columns will be fields.
        :type tags: list[str], optional
        :param timestamp_timezone: Timezone for the timestamp column (e.g., 'UTC', 'America/New_York').
        :type timestamp_timezone: str, optional
        :param database: The database to write to. If not provided, uses the database from initialization.
        :type database: str, optional
        :param kwargs: Additional arguments to pass to the write API.
        :raises TypeError: If df is not a pandas or polars DataFrame.
        :raises InfluxDBError: If there is an error writing to the database.

        Example:
            >>> import pandas as pd
            >>> df = pd.DataFrame({
            ...     'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
            ...     'city': ['London', 'Paris'],
            ...     'temperature': [15.0, 18.0]
            ... })
            >>> client.write_dataframe(
            ...     df,
            ...     measurement='weather',
            ...     timestamp_column='time',
            ...     tags=['city']
            ... )
        """
        if database is None:
            database = self._database

        df_type = str(type(df))
        if 'pandas' not in df_type and 'polars' not in df_type:
            raise TypeError(
                f"Expected a pandas or polars DataFrame, but got {type(df).__name__}. "
                "Please pass a valid DataFrame object."
            )

        # Write errors (e.g. InfluxDBError) propagate to the caller unchanged.
        return self._write_api.write(
            bucket=database,
            record=df,
            data_frame_measurement_name=measurement,
            data_frame_tag_columns=tags or [],
            data_frame_timestamp_column=timestamp_column,
            data_frame_timestamp_timezone=timestamp_timezone,
            **kwargs
        )

    def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None,
                   file_parser_options=None, **kwargs):
        """
        Write data from a file to InfluxDB.

        The file is loaded with ``UploadFile``. A pyarrow Table result is converted to a pandas
        DataFrame before being written. See :meth:`_process_dataframe` for how the measurement
        is chosen when ``measurement_name`` is omitted.

        :param file: The file to write.
        :type file: str
        :param measurement_name: The name of the measurement. When None, a 'measurement' or
                                 'iox::measurement' column of the file supplies it per row.
        :type measurement_name: str
        :param tag_columns: Tag columns.
        :type tag_columns: list
        :param timestamp_column: Timestamp column name. Defaults to 'time'.
        :type timestamp_column: str
        :param database: The database to write to. If not provided, uses the database provided during initialization.
        :type database: str
        :param file_parser_options: Additional arguments for the file parser (see ``file_parser_options()``).
        :type file_parser_options: dict
        :param kwargs: Additional arguments to pass to the write API.
        """
        if database is None:
            database = self._database
        table = UploadFile(file, file_parser_options).load_file()
        df = table.to_pandas() if isinstance(table, pa.Table) else table
        self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column, database=database,
                                **kwargs)

    def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column, database, **kwargs):
        """
        Write a DataFrame loaded by :meth:`write_file` to ``database``.

        With an explicit ``measurement_name``, any 'measurement' column is dropped and the whole
        frame is written under that name. Otherwise the first of the 'measurement' /
        'iox::measurement' columns present is used: the frame is split by its unique values and
        each slice is written, without that column, as its own measurement.

        :param df: The pandas DataFrame to write.
        :param measurement_name: Measurement name, or None to read it from the frame.
        :param tag_columns: Column names to write as tags.
        :param timestamp_column: Name of the timestamp column.
        :param database: Target database, already resolved by the caller.
        :param kwargs: Additional arguments forwarded to the write API on every write.
        """
        if measurement_name is not None:
            df = df.drop(columns=['measurement'], errors='ignore')
            self._write_api.write(bucket=database, record=df,
                                  data_frame_measurement_name=measurement_name,
                                  data_frame_tag_columns=tag_columns,
                                  data_frame_timestamp_column=timestamp_column, **kwargs)
            return

        measurement_column = next((col for col in ['measurement', 'iox::measurement'] if col in df.columns), None)
        if not measurement_column:
            # NOTE(review): nothing is written in this case; only a message is printed.
            print("'measurement' column not found in the dataframe.")
            return

        for measurement in df[measurement_column].unique():
            df_measurement = df[df[measurement_column] == measurement].drop(columns=[measurement_column])
            # Use the caller's database and write options, like the single-measurement path above.
            self._write_api.write(bucket=database, record=df_measurement,
                                  data_frame_measurement_name=measurement,
                                  data_frame_tag_columns=tag_columns,
                                  data_frame_timestamp_column=timestamp_column, **kwargs)

    def query(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
        """Query data from InfluxDB.

        If you want to use query parameters, you can pass them as kwargs:

        >>> client.query("select * from cpu where host=$host", query_parameters={"host": "server01"})

        :param query: The query to execute on the database.
        :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
        :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk",
                     "reader" or "schema". Defaults to "all".
        :param database: The database to query from. If not provided, uses the database provided during initialization.
        :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to
                       set up per request headers.
        :keyword query_parameters: The query parameters to use in the query.
                                   It should be a ``dictionary`` of key-value pairs.
        :return: The query result in the specified mode.
        :raises ImportError: If mode is "polars" and polars is not installed.
        :raises InfluxDB3ClientQueryError: If the query API raises an ArrowException.
        """
        if mode == "polars" and polars is False:
            raise ImportError("Polars is not installed. Please install it with `pip install polars`.")

        if database is None:
            database = self._database

        try:
            return self._query_api.query(query=query, language=language, mode=mode, database=database, **kwargs)
        except ArrowException as e:
            raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")

    def query_dataframe(
        self,
        query: str,
        language: str = "sql",
        database: Optional[str] = None,
        frame_type: Literal["pandas", "polars"] = "pandas",
        **kwargs
    ) -> "pd.DataFrame | pl.DataFrame":
        """
        Query data from InfluxDB and return as a DataFrame.

        This is a convenience method that calls :meth:`query` with ``mode=frame_type``.

        :param query: The query to execute on the database.
        :type query: str
        :param language: The query language to use. Should be "sql" or "influxql". Defaults to "sql".
        :type language: str
        :param database: The database to query from. If not provided, uses the database from initialization.
        :type database: str, optional
        :param frame_type: The type of DataFrame to return. Either "pandas" or "polars". Defaults to "pandas".
        :type frame_type: Literal["pandas", "polars"]
        :param kwargs: Additional arguments to pass to the query API.
        :keyword query_parameters: Query parameters as a dictionary of key-value pairs.
        :return: Query result as a pandas or polars DataFrame.
        :rtype: pandas.DataFrame or polars.DataFrame
        :raises ImportError: If polars is requested but not installed.

        Example:
            >>> # Query and get a pandas DataFrame
            >>> df = client.query_dataframe("SELECT * FROM weather WHERE city = 'London'")
            >>>
            >>> # Query and get a polars DataFrame
            >>> df = client.query_dataframe(
            ...     "SELECT * FROM weather",
            ...     frame_type="polars"
            ... )
        """
        if frame_type == "polars" and polars is False:
            raise ImportError(
                "Polars is not installed. Please install it with `pip install polars`."
            )

        return self.query(query=query, language=language, mode=frame_type, database=database, **kwargs)

    async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
        """Query data from InfluxDB asynchronously.

        If you want to use query parameters, you can pass them as kwargs:

        >>> await client.query_async("select * from cpu where host=$host", query_parameters={"host": "server01"})

        :param query: The query to execute on the database.
        :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
        :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk",
                     "reader" or "schema". Defaults to "all".
        :param database: The database to query from. If not provided, uses the database provided during initialization.
        :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to
                       set up per request headers.
        :keyword query_parameters: The query parameters to use in the query.
                                   It should be a ``dictionary`` of key-value pairs.
        :return: The query result in the specified mode.
        :raises ImportError: If mode is "polars" and polars is not installed.
        :raises InfluxDB3ClientQueryError: If the query API raises an ArrowException.
        """
        if mode == "polars" and polars is False:
            raise ImportError("Polars is not installed. Please install it with `pip install polars`.")

        if database is None:
            database = self._database

        try:
            return await self._query_api.query_async(query=query,
                                                     language=language,
                                                     mode=mode,
                                                     database=database,
                                                     **kwargs)
        except ArrowException as e:
            raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")

    def get_server_version(self) -> Optional[str]:
        """
        Retrieve the server version from the ``/ping`` endpoint.

        The ``X-InfluxDB-Version`` response header (matched case-insensitively) is preferred.
        Otherwise the body is parsed as JSON and its ``version`` field is returned.

        :return: The version string of the server if available; None when the header is absent
                 and the body is empty, not valid JSON, or not a JSON object.
        :rtype: Optional[str]
        """
        resp = self.rest_client.request(url='/ping', method="GET", headers=self.default_header)

        for key, value in resp.getheaders().items():
            if key.lower() == "x-influxdb-version":
                return value

        string_body = resp.get_string_body()
        if not string_body:
            return None
        try:
            data = json.loads(string_body)
        except (ValueError, TypeError):
            return None
        # Valid JSON that is not an object (list, string, number) has no 'version' field;
        # treat it like an unparsable body instead of raising AttributeError.
        if not isinstance(data, dict):
            return None
        return data.get('version')

    def flush(self):
        """
        Flush any buffered writes to InfluxDB without closing the client.

        Delegates to the write API's ``flush()``. In batching write mode this sends buffered
        data points immediately; in synchronous mode, data is already written, so there is
        nothing to flush. The client remains open for more writes afterwards.
        """
        self._write_api.flush()

    def close(self):
        """Close the write API, the query API and the REST client, skipping any that are None."""
        if self._write_api is not None:
            self._write_api.close()
        if self._query_api is not None:
            self._query_api.close()
        if self.rest_client is not None:
            self.rest_client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
# Names exported by ``import *`` from this module.
__all__ = [
    # Client entry point.
    "InfluxDBClient3",
    # Write data model and write configuration.
    "Point",
    "PointSettings",
    "SYNCHRONOUS",
    "ASYNCHRONOUS",
    "WritePrecision",
    "WriteOptions",
    # Helpers that build option dictionaries.
    "write_client_options",
    "flight_client_options",
    "file_parser_options",
]