Skip to content

Commit f946fdb

Browse files
committed
feat: default to v3 write endpoint and add UseV2Api compatibility mode
1 parent 7f4aa1f commit f946fdb

11 files changed

Lines changed: 347 additions & 139 deletions

File tree

README.md

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,9 @@ client.writePoint(
115115
);
116116

117117
//
118-
// Write with partial acceptance
118+
// Write with partial acceptance (default behavior)
119119
//
120-
WriteOptions partialWrite = new WriteOptions.Builder()
121-
.acceptPartial(true)
122-
.build();
120+
WriteOptions partialWrite = new WriteOptions.Builder().build();
123121
try {
124122
client.writeRecords(List.of(
125123
"temperature,region=west value=20.0",
@@ -131,6 +129,14 @@ try {
131129
System.out.printf("line=%s msg=%s lp=%s%n", line.lineNumber(), line.errorMessage(), line.originalLine()));
132130
}
133131

132+
//
133+
// Write via v2 compatibility endpoint (InfluxDB Clustered)
134+
//
135+
WriteOptions useV2 = new WriteOptions.Builder()
136+
.useV2Api(true)
137+
.build();
138+
client.writeRecord("temperature,location=north value=60.0", useV2);
139+
134140
//
135141
// Write by LineProtocol
136142
//

src/main/java/com/influxdb/v3/client/InfluxDBClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,7 @@ static InfluxDBClient getInstance(@Nonnull final ClientConfig config) {
558558
* <li>gzipThreshold - payload size size for gzipping data</li>
559559
* <li>writeNoSync - skip waiting for WAL persistence on write</li>
560560
* <li>writeAcceptPartial - accept partial writes</li>
561+
* <li>writeUseV2Api - use v2 compatibility write endpoint</li>
561562
* </ul>
562563
*
563564
* @param connectionString connection string
@@ -592,6 +593,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
592593
* <li>INFLUX_GZIP_THRESHOLD - payload size size for gzipping data</li>
593594
* <li>INFLUX_WRITE_NO_SYNC - skip waiting for WAL persistence on write</li>
594595
* <li>INFLUX_WRITE_ACCEPT_PARTIAL - accept partial writes</li>
596+
* <li>INFLUX_WRITE_USE_V2_API - use v2 compatibility write endpoint</li>
595597
* </ul>
596598
* Supported system properties:
597599
* <ul>
@@ -604,6 +606,7 @@ static InfluxDBClient getInstance(@Nonnull final String connectionString) {
604606
* <li>influx.gzipThreshold - payload size size for gzipping data</li>
605607
* <li>influx.writeNoSync - skip waiting for WAL persistence on write</li>
606608
* <li>influx.writeAcceptPartial - accept partial writes</li>
609+
* <li>influx.writeUseV2Api - use v2 compatibility write endpoint</li>
607610
* </ul>
608611
*
609612
* @return instance of {@link InfluxDBClient}

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
* <li><code>gzipThreshold</code> - threshold when gzip compression is used for writing points to InfluxDB</li>
5959
* <li><code>writeNoSync</code> - skip waiting for WAL persistence on write</li>
6060
* <li><code>writeAcceptPartial</code> - accept partial writes</li>
61+
* <li><code>writeUseV2Api</code> - use v2 compatibility write endpoint</li>
6162
* <li><code>timeout</code> - <i>deprecated in 1.4.0</i> timeout when connecting to InfluxDB,
6263
* please use more informative properties <code>writeTimeout</code> and <code>queryTimeout</code></li>
6364
* <li><code>writeTimeout</code> - timeout when writing data to InfluxDB</li>
@@ -109,6 +110,7 @@ public final class ClientConfig {
109110
private final Integer gzipThreshold;
110111
private final Boolean writeNoSync;
111112
private final Boolean writeAcceptPartial;
113+
private final Boolean writeUseV2Api;
112114
private final Map<String, String> defaultTags;
113115
@Deprecated
114116
private final Duration timeout;
@@ -220,6 +222,16 @@ public Boolean getWriteAcceptPartial() {
220222
return writeAcceptPartial;
221223
}
222224

225+
/**
226+
* Use v2 compatibility write endpoint?
227+
*
228+
* @return use v2 compatibility write endpoint
229+
*/
230+
@Nonnull
231+
public Boolean getWriteUseV2Api() {
232+
return writeUseV2Api;
233+
}
234+
223235
/**
224236
* Gets default tags used when writing points.
225237
* @return default tags
@@ -383,6 +395,7 @@ public boolean equals(final Object o) {
383395
&& Objects.equals(gzipThreshold, that.gzipThreshold)
384396
&& Objects.equals(writeNoSync, that.writeNoSync)
385397
&& Objects.equals(writeAcceptPartial, that.writeAcceptPartial)
398+
&& Objects.equals(writeUseV2Api, that.writeUseV2Api)
386399
&& Objects.equals(defaultTags, that.defaultTags)
387400
&& Objects.equals(timeout, that.timeout)
388401
&& Objects.equals(writeTimeout, that.writeTimeout)
@@ -401,7 +414,7 @@ public boolean equals(final Object o) {
401414
@Override
402415
public int hashCode() {
403416
return Objects.hash(host, Arrays.hashCode(token), authScheme, organization,
404-
database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial,
417+
database, writePrecision, gzipThreshold, writeNoSync, writeAcceptPartial, writeUseV2Api,
405418
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
406419
proxy, proxyUrl, authenticator, headers,
407420
defaultTags, sslRootsFilePath, disableGRPCCompression, interceptors);
@@ -417,6 +430,7 @@ public String toString() {
417430
.add("gzipThreshold=" + gzipThreshold)
418431
.add("writeNoSync=" + writeNoSync)
419432
.add("writeAcceptPartial=" + writeAcceptPartial)
433+
.add("writeUseV2Api=" + writeUseV2Api)
420434
.add("timeout=" + timeout)
421435
.add("writeTimeout=" + writeTimeout)
422436
.add("queryTimeout=" + queryTimeout)
@@ -447,6 +461,7 @@ public static final class Builder {
447461
private Integer gzipThreshold;
448462
private Boolean writeNoSync;
449463
private Boolean writeAcceptPartial;
464+
private Boolean writeUseV2Api;
450465
private Map<String, String> defaultTags;
451466
@Deprecated
452467
private Duration timeout;
@@ -582,6 +597,19 @@ public Builder writeAcceptPartial(@Nullable final Boolean writeAcceptPartial) {
582597
return this;
583598
}
584599

600+
/**
601+
* Sets whether to use v2 compatibility write endpoint.
602+
*
603+
* @param writeUseV2Api use v2 compatibility write endpoint
604+
* @return this
605+
*/
606+
@Nonnull
607+
public Builder writeUseV2Api(@Nullable final Boolean writeUseV2Api) {
608+
609+
this.writeUseV2Api = writeUseV2Api;
610+
return this;
611+
}
612+
585613
/**
586614
* Sets default tags to be written with points.
587615
*
@@ -831,6 +859,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
831859
if (parameters.containsKey("writeAcceptPartial")) {
832860
this.writeAcceptPartial(Boolean.parseBoolean(parameters.get("writeAcceptPartial")));
833861
}
862+
if (parameters.containsKey("writeUseV2Api")) {
863+
this.writeUseV2Api(Boolean.parseBoolean(parameters.get("writeUseV2Api")));
864+
}
834865
if (parameters.containsKey("disableGRPCCompression")) {
835866
this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression")));
836867
}
@@ -890,6 +921,10 @@ public ClientConfig build(@Nonnull final Map<String, String> env, final Properti
890921
if (writeAcceptPartial != null) {
891922
this.writeAcceptPartial(Boolean.parseBoolean(writeAcceptPartial));
892923
}
924+
final String writeUseV2Api = get.apply("INFLUX_WRITE_USE_V2_API", "influx.writeUseV2Api");
925+
if (writeUseV2Api != null) {
926+
this.writeUseV2Api(Boolean.parseBoolean(writeUseV2Api));
927+
}
893928
final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout");
894929
if (writeTimeout != null) {
895930
long to = Long.parseLong(writeTimeout);
@@ -949,6 +984,9 @@ private ClientConfig(@Nonnull final Builder builder) {
949984
writeAcceptPartial = builder.writeAcceptPartial != null
950985
? builder.writeAcceptPartial
951986
: WriteOptions.DEFAULT_ACCEPT_PARTIAL;
987+
writeUseV2Api = builder.writeUseV2Api != null
988+
? builder.writeUseV2Api
989+
: WriteOptions.DEFAULT_USE_V2_API;
952990
defaultTags = builder.defaultTags;
953991
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
954992
writeTimeout = builder.writeTimeout != null

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -305,14 +305,21 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
305305
}
306306

307307
WritePrecision precision = options.precisionSafe(config);
308+
options.validate(config);
308309

309310
String path;
310311
Map<String, String> queryParams;
311312
boolean noSync = options.noSyncSafe(config);
312313
boolean acceptPartial = options.acceptPartialSafe(config);
313-
boolean useV3Write = noSync || acceptPartial;
314-
if (useV3Write) {
315-
// no_sync=true and accept_partial=true are supported only in the v3 API.
314+
boolean useV2Api = options.useV2ApiSafe(config);
315+
if (useV2Api) {
316+
path = "api/v2/write";
317+
queryParams = new HashMap<>() {{
318+
put("org", config.getOrganization());
319+
put("bucket", database);
320+
put("precision", WritePrecisionConverter.toV2ApiString(precision));
321+
}};
322+
} else {
316323
path = "api/v3/write_lp";
317324
queryParams = new HashMap<>() {{
318325
put("org", config.getOrganization());
@@ -322,17 +329,9 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
322329
if (noSync) {
323330
queryParams.put("no_sync", "true");
324331
}
325-
if (acceptPartial) {
326-
queryParams.put("accept_partial", "true");
332+
if (!acceptPartial) {
333+
queryParams.put("accept_partial", "false");
327334
}
328-
} else {
329-
// By default, use the v2 API.
330-
path = "api/v2/write";
331-
queryParams = new HashMap<>() {{
332-
put("org", config.getOrganization());
333-
put("bucket", database);
334-
put("precision", WritePrecisionConverter.toV2ApiString(precision));
335-
}};
336335
}
337336

338337
Map<String, String> defaultTags = options.defaultTagsSafe(config);
@@ -380,10 +379,9 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
380379
try {
381380
restClient.request(path, HttpMethod.POST, body, queryParams, headers);
382381
} catch (InfluxDBApiHttpException e) {
383-
if (useV3Write && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) {
384-
// Server does not support the v3 write API, can't use v3-only write options.
385-
throw new InfluxDBApiHttpException("Server doesn't support write with NoSync=true "
386-
+ "or AcceptPartial=true (supported by InfluxDB 3 Core/Enterprise servers only).",
382+
if (!useV2Api && e.statusCode() == HttpResponseStatus.METHOD_NOT_ALLOWED.code()) {
383+
throw new InfluxDBApiHttpException("Server doesn't support v3 write API. "
384+
+ "Use WriteOptions.Builder.useV2Api(true) for v2 compatibility endpoint.",
387385
e.headers(),
388386
e.statusCode());
389387
}

src/main/java/com/influxdb/v3/client/internal/RestClient.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.security.cert.X509Certificate;
3737
import java.util.ArrayList;
3838
import java.util.List;
39+
import java.util.Locale;
3940
import java.util.Map;
4041
import java.util.Optional;
4142
import java.util.stream.Stream;
@@ -245,7 +246,7 @@ HttpResponse<String> request(@Nonnull final String path,
245246

246247
String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason);
247248
List<InfluxDBPartialWriteException.LineError> lineErrors =
248-
parsePartialWriteLineErrors(path, body, contentType);
249+
parsePartialWriteLineErrors(body, contentType);
249250
if (!lineErrors.isEmpty()) {
250251
throw new InfluxDBPartialWriteException(message, response.headers(), response.statusCode(), lineErrors);
251252
}
@@ -312,13 +313,8 @@ private String formatErrorMessage(@Nonnull final String body, @Nullable final St
312313

313314
@Nonnull
314315
private List<InfluxDBPartialWriteException.LineError> parsePartialWriteLineErrors(
315-
@Nonnull final String path,
316316
@Nonnull final String body,
317317
@Nullable final String contentType) {
318-
if (!isWriteEndpoint(path)) {
319-
return List.of();
320-
}
321-
322318
if (body.isEmpty()) {
323319
return List.of();
324320
}
@@ -337,7 +333,7 @@ private List<InfluxDBPartialWriteException.LineError> parsePartialWriteLineError
337333

338334
final String error = errNonEmptyField(root, "error");
339335
final JsonNode dataNode = root.get("data");
340-
if (error == null || dataNode == null) {
336+
if (!isV3PartialWriteError(error) || dataNode == null) {
341337
return List.of();
342338
}
343339

@@ -374,8 +370,13 @@ private List<InfluxDBPartialWriteException.LineError> parsePartialWriteLineError
374370
}
375371
}
376372

377-
private boolean isWriteEndpoint(@Nonnull final String path) {
378-
return "api/v2/write".equals(path) || "api/v3/write_lp".equals(path);
373+
private boolean isV3PartialWriteError(@Nullable final String errorMessage) {
374+
if (errorMessage == null || errorMessage.isEmpty()) {
375+
return false;
376+
}
377+
String normalized = errorMessage.toLowerCase(Locale.ROOT);
378+
return normalized.contains("partial write of line protocol occurred")
379+
|| normalized.contains("parsing failed for write_lp endpoint");
379380
}
380381

381382
@Nullable

0 commit comments

Comments
 (0)