5252import java .util .concurrent .Executors ;
5353import java .util .concurrent .TimeUnit ;
5454import java .util .concurrent .atomic .AtomicLong ;
55+ import java .util .concurrent .atomic .AtomicReference ;
5556import java .util .concurrent .locks .Condition ;
5657import java .util .concurrent .locks .Lock ;
5758import java .util .concurrent .locks .ReentrantLock ;
@@ -140,13 +141,16 @@ class ConnectionWorker implements AutoCloseable {
140141 * Tracks current inflight requests in the stream.
141142 */
142143 @ GuardedBy ("lock" )
143- private long inflightRequests = 0 ;
144+ private final AtomicLong inflightRequests = new AtomicLong ( 0 ) ;
144145
145146 /*
146147 * Tracks current inflight bytes in the stream.
147148 */
148149 @ GuardedBy ("lock" )
149- private long inflightBytes = 0 ;
150+ private final AtomicLong inflightBytes = new AtomicLong (0 );
151+
152+ private final TrackRequestQueueEarliestSendTime trackRequestQueueEarliestSendTime =
153+ new TrackRequestQueueEarliestSendTime ();
150154
151155 /*
152156 * Tracks how often the stream was closed due to a retriable error. Streaming will stop when the
@@ -395,7 +399,7 @@ private void gatherHealthCheckMetrics(HealthCheckFields healthCheckFields) {
395399 healthCheckFields .queuedRequestCountMax = windowedQueuedRequestsMax ;
396400 healthCheckFields .queuedRetryCountMax = windowedQueuedRetriesMax ;
397401 healthCheckFields .msecLongestResponseWaitTime = windowedMilliResponseWaitTimeMax ;
398- healthCheckFields .inflightBytes = inflightBytes ;
402+ healthCheckFields .inflightBytes = inflightBytes . get () ;
399403 healthCheckFields .requestsSentCount = windowedRequestsSent ;
400404 healthCheckFields .responseCount = windowedResponsesAcked ;
401405 if (HEALTH_CHECK_INTERVAL .toMillis () > 0 ) {
@@ -779,8 +783,8 @@ private void addMessageToFrontOfWaitingQueue(AppendRequestAndResponse requestWra
779783 @ GuardedBy ("lock" )
780784 private void addMessageToWaitingQueue (
781785 AppendRequestAndResponse requestWrapper , boolean addToFront ) {
782- ++ this .inflightRequests ;
783- this .inflightBytes += requestWrapper .messageSize ;
786+ this .inflightRequests . incrementAndGet () ;
787+ this .inflightBytes . addAndGet ( requestWrapper .messageSize ) ;
784788 hasMessageInWaitingQueue .signal ();
785789 requestProfilerHook .startOperation (
786790 RequestProfiler .OperationName .WAIT_QUEUE , requestWrapper .requestUniqueId );
@@ -896,11 +900,11 @@ private ApiFuture<AppendRowsResponse> appendInternal(
896900 }
897901 // Check if queue is going to be full before adding the request.
898902 if (this .limitExceededBehavior == FlowController .LimitExceededBehavior .ThrowException ) {
899- if (this .inflightRequests + 1 >= this .maxInflightRequests ) {
903+ if (this .inflightRequests . get () + 1 >= this .maxInflightRequests ) {
900904 throw new Exceptions .InflightRequestsLimitExceededException (
901905 writerId , this .maxInflightRequests );
902906 }
903- if (this .inflightBytes + requestWrapper .messageSize >= this .maxInflightBytes ) {
907+ if (this .inflightBytes . get () + requestWrapper .messageSize >= this .maxInflightBytes ) {
904908 throw new Exceptions .InflightBytesLimitExceededException (writerId , this .maxInflightBytes );
905909 }
906910 }
@@ -926,8 +930,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(
926930 return requestWrapper .appendResult ;
927931 }
928932 requestProfilerHook .startOperation (RequestProfiler .OperationName .WAIT_QUEUE , requestUniqueId );
929- ++ this .inflightRequests ;
930- this .inflightBytes += requestWrapper .messageSize ;
933+ this .inflightRequests . incrementAndGet () ;
934+ this .inflightBytes . addAndGet ( requestWrapper .messageSize ) ;
931935 requestWrapper .placedInWaitingQueueTime = Instant .now ();
932936 waitingRequestQueue .addLast (requestWrapper );
933937 healthCheckMetrics .updateWindowedQueuedRequestsMax (
@@ -938,9 +942,9 @@ private ApiFuture<AppendRowsResponse> appendInternal(
938942 try {
939943 maybeWaitForInflightQuota ();
940944 } catch (StatusRuntimeException ex ) {
941- -- this .inflightRequests ;
945+ this .inflightRequests . decrementAndGet () ;
942946 waitingRequestQueue .pollLast ();
943- this .inflightBytes -= requestWrapper .messageSize ;
947+ this .inflightBytes . addAndGet (- requestWrapper .messageSize ) ;
944948 throw ex ;
945949 }
946950 requestProfilerHook .endOperation (
@@ -954,8 +958,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(
954958 @ GuardedBy ("lock" )
955959 private void maybeWaitForInflightQuota () {
956960 long start_time = System .currentTimeMillis ();
957- while (this .inflightRequests >= this .maxInflightRequests
958- || this .inflightBytes >= this .maxInflightBytes ) {
961+ while (this .inflightRequests . get () >= this .maxInflightRequests
962+ || this .inflightBytes . get () >= this .maxInflightBytes ) {
959963 try {
960964 inflightReduced .await (100 , TimeUnit .MILLISECONDS );
961965 } catch (InterruptedException e ) {
@@ -998,6 +1002,11 @@ void setTestOnlyRunTimeExceptionInAppendLoop(
9981002 this .testOnlyRunTimeExceptionInAppendLoop = testOnlyRunTimeExceptionInAppendLoop ;
9991003 }
10001004
1005+ @ VisibleForTesting
1006+ Instant getEarliestSendTime () {
1007+ return trackRequestQueueEarliestSendTime .getEarliestSendTime ();
1008+ }
1009+
10011010 @ VisibleForTesting ()
10021011 HealthCheckMetrics .HealthCheckFields gatherTestOnlyHealthCheckMetrics () {
10031012 this .lock .lock ();
@@ -1229,7 +1238,9 @@ private void appendLoop() {
12291238 firstRequestForTableOrSchemaSwitch = true ;
12301239 }
12311240 while (!localQueue .isEmpty ()) {
1232- localQueue .peekFirst ().setRequestSendQueueTime ();
1241+ AppendRequestAndResponse head = localQueue .peekFirst ();
1242+ head .setRequestSendQueueTime ();
1243+ trackRequestQueueEarliestSendTime .captureEarliest (head .requestSendTimeStamp );
12331244 AppendRequestAndResponse wrapper = localQueue .pollFirst ();
12341245 AppendRowsRequest originalRequest = wrapper .message ;
12351246 String requestUniqueId = wrapper .requestUniqueId ;
@@ -1642,6 +1653,9 @@ private void requestCallback(AppendRowsResponse response) {
16421653 if (response .hasError ()) {
16431654 if (retryOnRetryableError (Code .values ()[response .getError ().getCode ()], requestWrapper )) {
16441655 log .info ("Attempting to retry on error: " + response .getError ().toString ());
1656+ // Note that if we are retrying a request it is still in the system so we don't refresh the
1657+ // earliest send time. That way we can keep track of the earliest send time based on the
1658+ // first time the request was sent, which gives us a better idea of load on this worker.
16451659 return ;
16461660 }
16471661 }
@@ -1653,6 +1667,10 @@ private void requestCallback(AppendRowsResponse response) {
16531667 this .lock .unlock ();
16541668 }
16551669 }
1670+ // Since we have processed a response and have now removed that request from the system, go
1671+ // ahead and refresh the earliest send time, based on the remaining requests that are
1672+ // outstanding.
1673+ trackRequestQueueEarliestSendTime .discardAndRefresh ();
16561674
16571675 // We need a separate thread pool to unblock the next request callback.
16581676 // Otherwise user may call append inside request callback, which may be blocked on waiting
@@ -1788,8 +1806,8 @@ private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) {
17881806 AppendRequestAndResponse requestWrapper =
17891807 pollLast ? inflightRequestQueue .pollLast () : inflightRequestQueue .poll ();
17901808 requestWrapper .requestSendTimeStamp = null ;
1791- -- this .inflightRequests ;
1792- this .inflightBytes -= requestWrapper .messageSize ;
1809+ this .inflightRequests . decrementAndGet () ;
1810+ this .inflightBytes . addAndGet (- requestWrapper .messageSize ) ;
17931811 this .inflightReduced .signal ();
17941812 return requestWrapper ;
17951813 }
@@ -1881,9 +1899,15 @@ void setRequestSendQueueTime() {
18811899
18821900 /** Returns the current workload of this worker. */
18831901 public Load getLoad () {
1902+ Duration timeSinceLastCallback = Duration .ZERO ;
1903+ Instant earliestSendTime = trackRequestQueueEarliestSendTime .getEarliestSendTime ();
1904+ if (earliestSendTime != null ) {
1905+ timeSinceLastCallback = Duration .between (earliestSendTime , Instant .now ());
1906+ }
18841907 return Load .create (
1885- inflightBytes ,
1886- inflightRequests ,
1908+ timeSinceLastCallback ,
1909+ inflightBytes .get (),
1910+ inflightRequests .get (),
18871911 destinationSet .size (),
18881912 maxInflightBytes ,
18891913 maxInflightRequests );
@@ -1896,11 +1920,15 @@ public Load getLoad() {
18961920 @ AutoValue
18971921 public abstract static class Load {
18981922
1899- // Consider the load on this worker to be overwhelmed when above some percentage of
1900- // in-flight bytes or in-flight requests count.
1923+ // Consider the load on this worker to be overwhelmed when above some inflight latency or
1924+ // percentage of in-flight bytes or in-flight requests count.
1925+ private static Duration overwhelmedTimeSinceLastCallback = Duration .ofSeconds (3 );
19011926 private static double overwhelmedInflightCount = 0.2 ;
19021927 private static double overwhelmedInflightBytes = 0.2 ;
19031928
1929+ // Time we have spent waiting for a response in the worker.
1930+ abstract Duration timeSinceLastCallback ();
1931+
19041932 // Number of in-flight requests bytes in the worker.
19051933 abstract long inFlightRequestsBytes ();
19061934
@@ -1917,12 +1945,14 @@ public abstract static class Load {
19171945 abstract long maxInflightCount ();
19181946
19191947 static Load create (
1948+ Duration timeSinceLastCallback ,
19201949 long inFlightRequestsBytes ,
19211950 long inFlightRequestsCount ,
19221951 long destinationCount ,
19231952 long maxInflightBytes ,
19241953 long maxInflightCount ) {
19251954 return new AutoValue_ConnectionWorker_Load (
1955+ timeSinceLastCallback ,
19261956 inFlightRequestsBytes ,
19271957 inFlightRequestsCount ,
19281958 destinationCount ,
@@ -1934,20 +1964,29 @@ boolean isOverwhelmed() {
19341964 // Consider only in flight bytes and count for now, as by experiment those two are the most
19351965 // efficient and has great simplity.
19361966 return inFlightRequestsCount () > overwhelmedInflightCount * maxInflightCount ()
1937- || inFlightRequestsBytes () > overwhelmedInflightBytes * maxInflightBytes ();
1967+ || inFlightRequestsBytes () > overwhelmedInflightBytes * maxInflightBytes ()
1968+ || timeSinceLastCallback ().compareTo (overwhelmedTimeSinceLastCallback ) > 0 ;
19381969 }
19391970
1940- // Compares two different load. First compare in flight request bytes split by size 1024 bucket.
1971+ // Compares two different load. First compare the timeSinceLastCallback bucketed into 1 second
1972+ // intervals.
1973+ // Then compare in flight request bytes split by size 1024 bucket.
19411974 // Then compare the inflight requests count.
19421975 // Then compare destination count of the two connections.
19431976 public static final Comparator <Load > LOAD_COMPARATOR =
1944- Comparator .comparing ((Load key ) -> (int ) (key .inFlightRequestsBytes () / 1024 ))
1977+ Comparator .comparing ((Load key ) -> (int ) key .timeSinceLastCallback ().getSeconds ())
1978+ .thenComparing ((Load key ) -> (int ) (key .inFlightRequestsBytes () / 1024 ))
19451979 .thenComparing ((Load key ) -> (int ) (key .inFlightRequestsCount () / 100 ))
19461980 .thenComparing (Load ::destinationCount );
19471981
19481982 // Compares two different load without bucket, used in smaller scale unit testing.
1983+ // First compare the timeSinceLastCallback.
1984+ // Then compare in flight request bytes.
1985+ // Then compare the inflight requests count.
1986+ // Then compare destination count of the two connections.
19491987 public static final Comparator <Load > TEST_LOAD_COMPARATOR =
1950- Comparator .comparing ((Load key ) -> (int ) key .inFlightRequestsBytes ())
1988+ Comparator .comparing (Load ::timeSinceLastCallback )
1989+ .thenComparing ((Load key ) -> (int ) key .inFlightRequestsBytes ())
19511990 .thenComparing ((Load key ) -> (int ) key .inFlightRequestsCount ())
19521991 .thenComparing (Load ::destinationCount );
19531992
@@ -1960,6 +1999,11 @@ public static void setOverwhelmedBytesThreshold(double newThreshold) {
19601999 public static void setOverwhelmedCountsThreshold (double newThreshold ) {
19612000 overwhelmedInflightCount = newThreshold ;
19622001 }
2002+
2003+ @ VisibleForTesting
2004+ public static void setOverwhelmedTimeSinceLastCallbackThreshold (Duration newThreshold ) {
2005+ overwhelmedTimeSinceLastCallback = newThreshold ;
2006+ }
19632007 }
19642008
19652009 @ VisibleForTesting
@@ -1985,4 +2029,36 @@ static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedS
19852029 return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp (updateTimeStamp , updatedSchema );
19862030 }
19872031 }
2032+
2033+ class TrackRequestQueueEarliestSendTime {
2034+ private final AtomicReference <Instant > earliestSendTime = new AtomicReference <>(null );
2035+
2036+ public void captureEarliest (Instant sendTime ) {
2037+ // This method records the given sendTime only if earliestSendTime is currently NULL.
2038+ if (sendTime == null ) {
2039+ return ;
2040+ }
2041+ earliestSendTime .compareAndSet (null , sendTime );
2042+ }
2043+
2044+ public void discardAndRefresh () {
2045+ Instant newEarliestSendTime = null ;
2046+ lock .lock ();
2047+ try {
2048+ if (!inflightRequestQueue .isEmpty ()) {
2049+ AppendRequestAndResponse head = inflightRequestQueue .peekFirst ();
2050+ if (head != null ) {
2051+ newEarliestSendTime = head .requestSendTimeStamp ;
2052+ }
2053+ }
2054+ } finally {
2055+ lock .unlock ();
2056+ }
2057+ earliestSendTime .set (newEarliestSendTime );
2058+ }
2059+
2060+ public Instant getEarliestSendTime () {
2061+ return earliestSendTime .get ();
2062+ }
2063+ }
19882064}
0 commit comments