Skip to content

Commit d34cc67

Browse files
feat: scale up connection worker pool based on latency
1 parent ff45dd2 commit d34cc67

3 files changed

Lines changed: 195 additions & 22 deletions

File tree

java-bigquerystorage/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1881,12 +1881,25 @@ void setRequestSendQueueTime() {
18811881

18821882
/** Returns the current workload of this worker. */
18831883
public Load getLoad() {
1884-
return Load.create(
1885-
inflightBytes,
1886-
inflightRequests,
1887-
destinationSet.size(),
1888-
maxInflightBytes,
1889-
maxInflightRequests);
1884+
this.lock.lock();
1885+
try {
1886+
Duration timeSinceLastCallback = Duration.ZERO;
1887+
if (!inflightRequestQueue.isEmpty()) {
1888+
AppendRequestAndResponse head = inflightRequestQueue.peekFirst();
1889+
if (head != null && head.requestSendTimeStamp != null) {
1890+
timeSinceLastCallback = Duration.between(head.requestSendTimeStamp, Instant.now());
1891+
}
1892+
}
1893+
return Load.create(
1894+
timeSinceLastCallback,
1895+
inflightBytes,
1896+
inflightRequests,
1897+
destinationSet.size(),
1898+
maxInflightBytes,
1899+
maxInflightRequests);
1900+
} finally {
1901+
this.lock.unlock();
1902+
}
18901903
}
18911904

18921905
/**
@@ -1896,11 +1909,15 @@ public Load getLoad() {
18961909
@AutoValue
18971910
public abstract static class Load {
18981911

1899-
// Consider the load on this worker to be overwhelmed when above some percentage of
1900-
// in-flight bytes or in-flight requests count.
1912+
// Consider the load on this worker to be overwhelmed when above some inflight latency or
1913+
// percentage of in-flight bytes or in-flight requests count.
1914+
private static Duration overwhelmedTimeSinceLastCallback = Duration.ofSeconds(3);
19011915
private static double overwhelmedInflightCount = 0.2;
19021916
private static double overwhelmedInflightBytes = 0.2;
19031917

1918+
// Time elapsed since the request at the head of the in-flight queue was sent; zero when no
// request is in flight.
1919+
abstract Duration timeSinceLastCallback();
1920+
19041921
// Number of in-flight requests bytes in the worker.
19051922
abstract long inFlightRequestsBytes();
19061923

@@ -1917,12 +1934,14 @@ public abstract static class Load {
19171934
abstract long maxInflightCount();
19181935

19191936
static Load create(
1937+
Duration timeSinceLastCallback,
19201938
long inFlightRequestsBytes,
19211939
long inFlightRequestsCount,
19221940
long destinationCount,
19231941
long maxInflightBytes,
19241942
long maxInflightCount) {
19251943
return new AutoValue_ConnectionWorker_Load(
1944+
timeSinceLastCallback,
19261945
inFlightRequestsBytes,
19271946
inFlightRequestsCount,
19281947
destinationCount,
@@ -1934,20 +1953,29 @@ boolean isOverwhelmed() {
19341953
// Consider in flight bytes, count and time since the last callback; by experiment the first two
19351954
// are the most efficient and have great simplicity.
19361955
return inFlightRequestsCount() > overwhelmedInflightCount * maxInflightCount()
1937-
|| inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes();
1956+
|| inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes()
1957+
|| timeSinceLastCallback().compareTo(overwhelmedTimeSinceLastCallback) > 0;
19381958
}
19391959

1940-
// Compares two different load. First compare in flight request bytes split by size 1024 bucket.
1960+
// Compares two different loads. First compare the timeSinceLastCallback bucketed into 1 second
1961+
// intervals.
1962+
// Then compare in flight request bytes split by size 1024 bucket.
19411963
// Then compare the inflight requests count.
19421964
// Then compare destination count of the two connections.
19431965
public static final Comparator<Load> LOAD_COMPARATOR =
1944-
Comparator.comparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024))
1966+
Comparator.comparing((Load key) -> (int) (key.timeSinceLastCallback().toMillis() / 1000))
1967+
.thenComparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024))
19451968
.thenComparing((Load key) -> (int) (key.inFlightRequestsCount() / 100))
19461969
.thenComparing(Load::destinationCount);
19471970

19481971
// Compares two different loads without bucketing, used in smaller scale unit testing.
1972+
// First compare the timeSinceLastCallback.
1973+
// Then compare in flight request bytes.
1974+
// Then compare the inflight requests count.
1975+
// Then compare destination count of the two connections.
19491976
public static final Comparator<Load> TEST_LOAD_COMPARATOR =
1950-
Comparator.comparing((Load key) -> (int) key.inFlightRequestsBytes())
1977+
Comparator.comparing(Load::timeSinceLastCallback)
1978+
.thenComparing((Load key) -> (int) key.inFlightRequestsBytes())
19511979
.thenComparing((Load key) -> (int) key.inFlightRequestsCount())
19521980
.thenComparing(Load::destinationCount);
19531981

@@ -1960,6 +1988,11 @@ public static void setOverwhelmedBytesThreshold(double newThreshold) {
19601988
public static void setOverwhelmedCountsThreshold(double newThreshold) {
19611989
overwhelmedInflightCount = newThreshold;
19621990
}
1991+
1992+
@VisibleForTesting
1993+
public static void setOverwhelmedTimeSinceLastCallbackThreshold(Duration newThreshold) {
1994+
overwhelmedTimeSinceLastCallback = newThreshold;
1995+
}
19631996
}
19641997

19651998
@VisibleForTesting

java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ void setUp() throws Exception {
8989
.build();
9090
ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.5);
9191
ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.6);
92+
ConnectionWorker.Load.setOverwhelmedTimeSinceLastCallbackThreshold(Duration.ofSeconds(3));
9293
}
9394

9495
@Test
@@ -555,6 +556,55 @@ private ProtoRows createProtoRows(String[] messages) {
555556
return rowsBuilder.build();
556557
}
557558

559+
@Test
560+
void testSingleTableConnections_overwhelmed_timeSinceLastCallback() throws Exception {
561+
// Set count/bytes thresholds to be very high so they don't trigger.
562+
ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.9);
563+
ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.9);
564+
// Set time threshold to 100ms.
565+
ConnectionWorker.Load.setOverwhelmedTimeSinceLastCallbackThreshold(Duration.ofMillis(100));
566+
567+
// We use a pool with max 8 connections.
568+
ConnectionWorkerPool.setOptions(
569+
Settings.builder()
570+
.setMinConnectionsPerRegion(1) // Start with 1 connection to make scaling obvious.
571+
.setMaxConnectionsPerRegion(8)
572+
.build());
573+
574+
// We set maxRequests to a large value (100) so it's not overwhelmed by count (threshold 90).
575+
ConnectionWorkerPool connectionWorkerPool =
576+
createConnectionWorkerPool(
577+
/* maxRequests= */ 100, /* maxBytes= */ 1000000, java.time.Duration.ofSeconds(5));
578+
579+
// Delay each response by 500ms (longer than the 100ms threshold) so requests stay in flight.
580+
testBigQueryWrite.setResponseSleep(Duration.ofMillis(500));
581+
582+
// Send 1 request. It will go to Connection 1.
583+
testBigQueryWrite.addResponse(createAppendResponse(0));
584+
StreamWriter writer = getTestStreamWriter(TEST_STREAM_1);
585+
586+
ApiFuture<AppendRowsResponse> future1 =
587+
sendFooStringTestMessage(writer, connectionWorkerPool, new String[] {"0"}, 0);
588+
589+
// Wait 200ms. Request 1 is still in flight (needs 500ms).
590+
// Connection 1 timeSinceLastCallback should be ~200ms > 100ms.
591+
// So Connection 1 is now overwhelmed.
592+
Thread.sleep(200);
593+
594+
// Send Request 2. Since Connection 1 is overwhelmed, it should scale up and create Connection
595+
// 2.
596+
testBigQueryWrite.addResponse(createAppendResponse(1));
597+
ApiFuture<AppendRowsResponse> future2 =
598+
sendFooStringTestMessage(writer, connectionWorkerPool, new String[] {"1"}, 1);
599+
600+
// Wait for both to finish.
601+
future1.get();
602+
future2.get();
603+
604+
// Verify that we created 2 connections.
605+
assertThat(connectionWorkerPool.getCreateConnectionCount()).isEqualTo(2);
606+
}
607+
558608
ConnectionWorkerPool createConnectionWorkerPool(
559609
long maxRequests, long maxBytes, java.time.Duration maxRetryDuration) {
560610
ConnectionWorkerPool.enableTestingLogic();

java-bigquerystorage/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 100 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ void setUp() throws Exception {
9494
testBigQueryWrite = new FakeBigQueryWrite();
9595
ConnectionWorker.setMaxInflightQueueWaitTime(300000);
9696
ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofMinutes(10));
97+
ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.2);
98+
ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.2);
99+
ConnectionWorker.Load.setOverwhelmedTimeSinceLastCallbackThreshold(Duration.ofSeconds(3));
97100
serviceHelper =
98101
new MockServiceHelper(
99102
UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite));
@@ -865,29 +868,116 @@ void testLoadCompare_compareLoad() {
865868
// In flight bytes are bucketed at 1024 bytes per bucket.
866869
// When in flight bytes is in a lower bucket, even if destination count is higher and request
867870
// count is higher, the load is still smaller.
868-
Load load1 = ConnectionWorker.Load.create(1000, 2000, 100, 1000, 10);
869-
Load load2 = ConnectionWorker.Load.create(2000, 1000, 10, 1000, 10);
871+
Load load1 = ConnectionWorker.Load.create(Duration.ZERO, 1000, 2000, 100, 1000, 10);
872+
Load load2 = ConnectionWorker.Load.create(Duration.ZERO, 2000, 1000, 10, 1000, 10);
870873
assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0);
871874

872875
// Loads whose in flight bytes fall in the same bucket are compared by request count.
873-
Load load3 = ConnectionWorker.Load.create(1, 300, 10, 0, 10);
874-
Load load4 = ConnectionWorker.Load.create(10, 1, 10, 0, 10);
876+
Load load3 = ConnectionWorker.Load.create(Duration.ZERO, 1, 300, 10, 0, 10);
877+
Load load4 = ConnectionWorker.Load.create(Duration.ZERO, 10, 1, 10, 0, 10);
875878
assertThat(Load.LOAD_COMPARATOR.compare(load3, load4)).isGreaterThan(0);
876879

877880
// In flight request and bytes in the same bucket will compare the destination count.
878-
Load load5 = ConnectionWorker.Load.create(200, 1, 10, 1000, 10);
879-
Load load6 = ConnectionWorker.Load.create(100, 10, 10, 1000, 10);
881+
Load load5 = ConnectionWorker.Load.create(Duration.ZERO, 200, 1, 10, 1000, 10);
882+
Load load6 = ConnectionWorker.Load.create(Duration.ZERO, 100, 10, 10, 1000, 10);
880883
assertThat(Load.LOAD_COMPARATOR.compare(load5, load6) == 0).isTrue();
884+
885+
// timeSinceLastCallback has the highest priority.
886+
// load7 has higher timeSinceLastCallback (2s -> bucket 2) but lower other parameters.
887+
// load8 has lower timeSinceLastCallback (0s -> bucket 0) but higher other parameters.
888+
Load load7 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 10, 10);
889+
Load load8 = ConnectionWorker.Load.create(Duration.ZERO, 10000, 10000, 100, 10, 10);
890+
assertThat(Load.LOAD_COMPARATOR.compare(load7, load8)).isGreaterThan(0);
881891
}
882892

883893
@Test
884894
void testLoadIsOverWhelmed() {
885-
// Only in flight request is considered in current overwhelmed calculation.
886-
Load load1 = ConnectionWorker.Load.create(60, 10, 100, 90, 100);
895+
// In-flight requests, bytes, and timeSinceLastCallback are considered in overwhelmed
896+
// calculation.
897+
898+
// Overwhelmed by request count
899+
Load load1 = ConnectionWorker.Load.create(Duration.ZERO, 60, 10, 100, 90, 100);
887900
assertThat(load1.isOverwhelmed()).isTrue();
888901

889-
Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100);
890-
assertThat(load2.isOverwhelmed()).isFalse();
902+
// Not overwhelmed
903+
Load load2 = ConnectionWorker.Load.create(Duration.ZERO, 1, 1, 100, 100, 100);
904+
assertFalse(load2.isOverwhelmed());
905+
906+
// Under threshold (3s) for timeSinceLastCallback
907+
Load load3 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 100, 100);
908+
assertFalse(load3.isOverwhelmed());
909+
910+
// Over threshold (3s) for timeSinceLastCallback
911+
Load load4 = ConnectionWorker.Load.create(Duration.ofSeconds(4), 0, 0, 0, 100, 100);
912+
assertTrue(load4.isOverwhelmed());
913+
}
914+
915+
@Test
916+
void testGetLoad_timeSinceLastCallback() throws Exception {
917+
ProtoSchema schema1 = createProtoSchema("foo");
918+
StreamWriter sw1 =
919+
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
920+
try (ConnectionWorker connectionWorker =
921+
new ConnectionWorker(
922+
TEST_STREAM_1,
923+
null,
924+
createProtoSchema("foo"),
925+
10,
926+
100000,
927+
Duration.ofSeconds(100),
928+
FlowController.LimitExceededBehavior.Block,
929+
TEST_TRACE_ID,
930+
null,
931+
client.getSettings(),
932+
retrySettings,
933+
/* enableRequestProfiler= */ false,
934+
/* enableOpenTelemetry= */ false,
935+
/*isMultiplexing*/ false)) {
936+
937+
// Initially empty, should be zero.
938+
assertThat(connectionWorker.getLoad().timeSinceLastCallback()).isEqualTo(Duration.ZERO);
939+
940+
// Keep response in flight
941+
testBigQueryWrite.setResponseSleep(java.time.Duration.ofSeconds(5));
942+
943+
// Send a message
944+
ApiFuture<AppendRowsResponse> future =
945+
sendTestMessage(connectionWorker, sw1, createFooProtoRows(new String[] {"hello"}), 0);
946+
947+
// Wait a bit to ensure it is sent and in flight queue
948+
Thread.sleep(500);
949+
950+
Load load = connectionWorker.getLoad();
951+
assertThat(load.timeSinceLastCallback()).isGreaterThan(Duration.ZERO);
952+
assertThat(load.timeSinceLastCallback())
953+
.isLessThan(Duration.ofSeconds(2)); // Should be around 500ms
954+
}
955+
}
956+
957+
@Test
958+
void testLoadCompare_timeSinceLastCallback() {
959+
// Same bytes, same count, same destination, different timeSinceLastCallback
960+
// Bucketed by 1 second (1000ms).
961+
962+
// 100ms and 200ms are in the same bucket (0).
963+
Load load1 = ConnectionWorker.Load.create(Duration.ofMillis(100), 0, 0, 0, 0, 0);
964+
Load load2 = ConnectionWorker.Load.create(Duration.ofMillis(200), 0, 0, 0, 0, 0);
965+
assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isEqualTo(0);
966+
967+
// 100ms and 1200ms are in different buckets (0 vs 1).
968+
Load load3 = ConnectionWorker.Load.create(Duration.ofMillis(1200), 0, 0, 0, 0, 0);
969+
assertThat(Load.LOAD_COMPARATOR.compare(load1, load3)).isLessThan(0);
970+
assertThat(Load.LOAD_COMPARATOR.compare(load3, load1)).isGreaterThan(0);
971+
}
972+
973+
@Test
974+
void testTestLoadCompare_timeSinceLastCallback() {
975+
// TEST_LOAD_COMPARATOR compares timeSinceLastCallback unbucketed.
976+
// 1s and 2s should be different.
977+
Load load1 = ConnectionWorker.Load.create(Duration.ofSeconds(1), 0, 0, 0, 0, 0);
978+
Load load2 = ConnectionWorker.Load.create(Duration.ofSeconds(2), 0, 0, 0, 0, 0);
979+
assertThat(Load.TEST_LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0);
980+
assertThat(Load.TEST_LOAD_COMPARATOR.compare(load2, load1)).isGreaterThan(0);
891981
}
892982

893983
@Test

0 commit comments

Comments
 (0)