Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ default boolean isEnableDirectAccess() {
}

default boolean isEnableGcpFallback() {
return false;
return true;
}

default boolean isEnableBuiltInMetrics() {
Expand Down Expand Up @@ -1136,7 +1136,8 @@ public boolean isEnableDirectAccess() {

@Override
public boolean isEnableGcpFallback() {
return Boolean.parseBoolean(System.getenv(GOOGLE_SPANNER_ENABLE_GCP_FALLBACK));
String enableGcpFallback = System.getenv(GOOGLE_SPANNER_ENABLE_GCP_FALLBACK);
return enableGcpFallback == null ? true : Boolean.parseBoolean(enableGcpFallback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@
import java.util.concurrent.Future;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The Duration class is used in this file (e.g., at line 566) but it is not imported. Please add the missing import for java.time.Duration.

Suggested change
import java.util.concurrent.Future;
import java.time.Duration;
import java.util.concurrent.Future;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -564,6 +563,7 @@ GcpFallbackChannelOptions createFallbackChannelOptions(
.setPrimaryChannelName("directpath")
.setFallbackChannelName("cloudpath")
.setMinFailedCalls(minFailedCalls)
.setPeriod(Duration.ofMinutes(3))
.setGcpFallbackOpenTelemetry(fallbackTelemetry)
.build();
}
Expand Down Expand Up @@ -614,27 +614,13 @@ private void setupGcpFallback(
createChannelProviderBuilder(
options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false);

final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> existingCloudPathConfigurator =
cloudPathProviderBuilder.getChannelConfigurator();
final AtomicReference<ManagedChannelBuilder> cloudPathBuilderRef = new AtomicReference<>();
cloudPathProviderBuilder.setChannelConfigurator(
builder -> {
ManagedChannelBuilder effectiveBuilder = builder;
if (existingCloudPathConfigurator != null) {
effectiveBuilder = existingCloudPathConfigurator.apply(effectiveBuilder);
}
cloudPathBuilderRef.set(effectiveBuilder);
return effectiveBuilder;
});

// Build the cloudPathProvider to extract the builder which will be provided to
// FallbackChannelBuilder.
try (TransportChannel ignored = cloudPathProviderBuilder.build().getTransportChannel()) {
} catch (Exception e) {
InstantiatingGrpcChannelProvider cloudPathProvider = cloudPathProviderBuilder.build();
ManagedChannelBuilder cloudPathBuilder;
try {
cloudPathBuilder = cloudPathProvider.createDecoratedChannelBuilder();
} catch (IOException e) {
throw asSpannerException(e);
}

ManagedChannelBuilder cloudPathBuilder = cloudPathBuilderRef.get();
if (cloudPathBuilder == null) {
throw new IllegalStateException("CloudPath builder was not captured.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1164,61 +1164,49 @@ public void testFallbackIntegration_doesNotSwitchWhenThresholdNotMet() throws Ex
OpenTelemetrySdk openTelemetry =
OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();

SpannerOptions.useEnvironment(
new SpannerOptions.SpannerEnvironment() {
@Override
public boolean isEnableGcpFallback() {
return true;
}
});
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setEnableDirectAccess(true)
.setHost("http://localhost:1") // Closed port
.setCredentials(NoCredentials.getInstance())
.setOpenTelemetry(openTelemetry);
// Make sure the ExecuteBatchDml RPC fails quickly to keep the test fast.
// Note that the timeout is actually not used. It is the fact that it does not retry that
// makes it fail fast.
builder
.getSpannerStubSettingsBuilder()
.executeBatchDmlSettings()
.setSimpleTimeoutNoRetriesDuration(Duration.ofSeconds(10));
// Setup Options with invalid host to force error
SpannerOptions options = builder.build();

TestableGapicSpannerRpc rpc = new TestableGapicSpannerRpc(options);
try {
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setEnableDirectAccess(true)
.setHost("http://localhost:1") // Closed port
.setCredentials(NoCredentials.getInstance())
.setOpenTelemetry(openTelemetry);
// Make sure the ExecuteBatchDml RPC fails quickly to keep the test fast.
// Note that the timeout is actually not used. It is the fact that it does not retry that
// makes it fail fast.
builder
.getSpannerStubSettingsBuilder()
.executeBatchDmlSettings()
.setSimpleTimeoutNoRetriesDuration(Duration.ofSeconds(10));
// Setup Options with invalid host to force error
SpannerOptions options = builder.build();

TestableGapicSpannerRpc rpc = new TestableGapicSpannerRpc(options);
try {
// Make a call that is expected to fail
SpannerException exception =
assertThrows(
SpannerException.class,
() ->
rpc.executeBatchDml(
com.google.spanner.v1.ExecuteBatchDmlRequest.newBuilder()
.setSession("projects/p/instances/i/databases/d/sessions/s")
.build(),
null));
assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode());

// Wait briefly for the 10ms period to trigger the fallback check
Thread.sleep(10);

// Verify Fallback via Metrics
Collection<MetricData> metrics = metricReader.collectAllMetrics();
boolean fallbackOccurred =
metrics.stream()
.anyMatch(md -> md.getName().contains("fallback_count") && hasValue(md));

assertFalse("Fallback metric should not be present", fallbackOccurred);

} finally {
rpc.shutdown();
}
// Make a call that is expected to fail
SpannerException exception =
assertThrows(
SpannerException.class,
() ->
rpc.executeBatchDml(
com.google.spanner.v1.ExecuteBatchDmlRequest.newBuilder()
.setSession("projects/p/instances/i/databases/d/sessions/s")
.build(),
null));
assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode());

// Wait briefly for the 10ms period to trigger the fallback check
Thread.sleep(10);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The test waits for 10ms, but the production code now sets the fallback period to 3 minutes (Duration.ofMinutes(3)). This will likely cause the test to fail as the fallback check won't trigger within the sleep duration. Consider overriding the period in the test to a smaller value or adjusting the test expectations. Additionally, please remove the stale code comment '10ms period' instead of updating it, as it no longer reflects the implementation.

References
  1. Remove comments that refer to stale implementations instead of updating them.


// Verify Fallback via Metrics
Collection<MetricData> metrics = metricReader.collectAllMetrics();
boolean fallbackOccurred =
metrics.stream().anyMatch(md -> md.getName().contains("fallback_count") && hasValue(md));

assertFalse("Fallback metric should not be present", fallbackOccurred);

} finally {
SpannerOptions.useDefaultEnvironment();
rpc.shutdown();
}
}

Expand Down Expand Up @@ -1255,64 +1243,52 @@ public void testFallbackIntegration_switchesToFallbackOnFailure() throws Excepti
OpenTelemetrySdk openTelemetry =
OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();

SpannerOptions.useEnvironment(
new SpannerOptions.SpannerEnvironment() {
@Override
public boolean isEnableGcpFallback() {
return true;
}
});
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setEnableDirectAccess(true)
.setHost("http://localhost:1") // Closed port
.setCredentials(NoCredentials.getInstance())
.setOpenTelemetry(openTelemetry);
// Make sure the ExecuteBatchDml RPC fails quickly to keep the test fast.
// Note that the timeout is actually not used. It is the fact that it does not retry that
// makes it fail fast.
builder
.getSpannerStubSettingsBuilder()
.executeBatchDmlSettings()
.setSimpleTimeoutNoRetriesDuration(Duration.ofSeconds(10));
// Setup Options with invalid host to force error
SpannerOptions options = builder.build();

TestableGapicSpannerRpcWithLowerMinFailedCalls rpc =
new TestableGapicSpannerRpcWithLowerMinFailedCalls(options);
try {
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setEnableDirectAccess(true)
.setHost("http://localhost:1") // Closed port
.setCredentials(NoCredentials.getInstance())
.setOpenTelemetry(openTelemetry);
// Make sure the ExecuteBatchDml RPC fails quickly to keep the test fast.
// Note that the timeout is actually not used. It is the fact that it does not retry that
// makes it fail fast.
builder
.getSpannerStubSettingsBuilder()
.executeBatchDmlSettings()
.setSimpleTimeoutNoRetriesDuration(Duration.ofSeconds(10));
// Setup Options with invalid host to force error
SpannerOptions options = builder.build();

TestableGapicSpannerRpcWithLowerMinFailedCalls rpc =
new TestableGapicSpannerRpcWithLowerMinFailedCalls(options);
try {
// Make a call that is expected to fail
SpannerException exception =
assertThrows(
SpannerException.class,
() ->
rpc.executeBatchDml(
com.google.spanner.v1.ExecuteBatchDmlRequest.newBuilder()
.setSession("projects/p/instances/i/databases/d/sessions/s")
.build(),
null));
assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode());

// Wait briefly for the 10ms period to trigger the fallback check
Thread.sleep(10);

// Verify Fallback via Metrics
Collection<MetricData> metrics = metricReader.collectAllMetrics();
boolean fallbackOccurred =
metrics.stream()
.anyMatch(md -> md.getName().contains("fallback_count") && hasValue(md));

assertTrue(
"Fallback metric should be present, indicating GcpFallbackChannel is active",
fallbackOccurred);

} finally {
rpc.shutdown();
}
// Make a call that is expected to fail
SpannerException exception =
assertThrows(
SpannerException.class,
() ->
rpc.executeBatchDml(
com.google.spanner.v1.ExecuteBatchDmlRequest.newBuilder()
.setSession("projects/p/instances/i/databases/d/sessions/s")
.build(),
null));
assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode());

// Wait briefly for the 10ms period to trigger the fallback check
Thread.sleep(10);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the previous test case, the 10ms sleep is likely insufficient given the new 3-minute fallback period set in GapicSpannerRpc. The test should be updated to ensure the fallback check is triggered.


// Verify Fallback via Metrics
Collection<MetricData> metrics = metricReader.collectAllMetrics();
boolean fallbackOccurred =
metrics.stream().anyMatch(md -> md.getName().contains("fallback_count") && hasValue(md));

assertTrue(
"Fallback metric should be present, indicating GcpFallbackChannel is active",
fallbackOccurred);

} finally {
SpannerOptions.useDefaultEnvironment();
rpc.shutdown();
}
}

Expand Down
Loading