Skip to content

Commit f61e264

Browse files
committed
Refactor StorageApi BigQuery sink to simplify cache and pin management.
1 parent b56d94f commit f61e264

15 files changed

Lines changed: 816 additions & 723 deletions
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.bigquery;
19+
20+
import java.util.concurrent.Callable;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.TimeUnit;
24+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
25+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
26+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
27+
import org.checkerframework.checker.nullness.qual.NonNull;
28+
import org.checkerframework.checker.nullness.qual.Nullable;
29+
import org.joda.time.Duration;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
/**
34+
* Encapsulates the cache of {@link AppendClientInfo} objects and the synchronization protocol
35+
* required to use them safely. The Guava cache object is thread-safe. However our protocol requires
36+
* that client pin the StreamAppendClient after looking up the cache, and we must ensure that the
37+
* cache is not accessed in between the lookup and the pin (any access of the cache could trigger
38+
* element expiration).
39+
*/
40+
class AppendClientCache<KeyT extends @NonNull Object> {
41+
private static final Logger LOG = LoggerFactory.getLogger(AppendClientCache.class);
42+
private final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
43+
44+
private final Cache<KeyT, AppendClientInfo> appendCache;
45+
46+
@SuppressWarnings({"FutureReturnValueIgnored"})
47+
AppendClientCache(Duration expireAfterAccess) {
48+
this.appendCache =
49+
CacheBuilder.newBuilder()
50+
.expireAfterAccess(expireAfterAccess.getMillis(), TimeUnit.MILLISECONDS)
51+
.removalListener(
52+
(RemovalNotification<KeyT, AppendClientInfo> removal) -> {
53+
LOG.info("Expiring append client for {}", removal.getKey());
54+
final @Nullable AppendClientInfo appendClientInfo = removal.getValue();
55+
if (appendClientInfo != null) {
56+
// Remove the pin owned by the cache itself. Since the client has not been
57+
// marked as closed, we
58+
// can call unpin in this thread without worrying about blocking the thread.
59+
appendClientInfo.unpinAppendClient(null);
60+
// Close the client in another thread to avoid blocking the main thread.
61+
closeWriterExecutor.submit(appendClientInfo::close);
62+
}
63+
})
64+
.build();
65+
}
66+
67+
// The cache itself always own one pin on the object. This Callable is always used to ensure that
68+
// the cache
69+
// adds a pin before loading a value.
70+
private static Callable<AppendClientInfo> wrapWithPin(Callable<AppendClientInfo> loader) {
71+
return () -> {
72+
AppendClientInfo client = loader.call();
73+
client.pinAppendClient();
74+
return client;
75+
};
76+
}
77+
78+
/**
79+
* Atomically get an append client from the cache and add a pin. This pin is owned by the client,
80+
* which has the responsibility of removing it. If the client is not in the cache, loader will be
81+
* used to load the client; in this case an additional pin will be added owned by the cache,
82+
* removed when the item is evicted.
83+
*/
84+
public AppendClientInfo getAndPin(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
85+
synchronized (this) {
86+
AppendClientInfo info = appendCache.get(key, wrapWithPin(loader));
87+
info.pinAppendClient();
88+
return info;
89+
}
90+
}
91+
92+
public AppendClientInfo get(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
93+
return appendCache.get(key, wrapWithPin(loader));
94+
}
95+
96+
public AppendClientInfo putAndPin(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
97+
synchronized (this) {
98+
AppendClientInfo info = wrapWithPin(loader).call();
99+
appendCache.put(key, info);
100+
info.pinAppendClient();
101+
return info;
102+
}
103+
}
104+
105+
public AppendClientInfo put(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
106+
AppendClientInfo info = wrapWithPin(loader).call();
107+
appendCache.put(key, info);
108+
return info;
109+
}
110+
111+
public void invalidate(KeyT key, AppendClientInfo expectedClient) {
112+
// The default stream is cached across multiple different DoFns. If they all try
113+
// and
114+
// invalidate, then we can get races between threads invalidating and recreating
115+
// streams. For this reason,
116+
// we check to see that the cache still contains the object we created before
117+
// invalidating (in case another
118+
// thread has already invalidated and recreated the stream).
119+
synchronized (this) {
120+
AppendClientInfo cachedAppendClient = appendCache.getIfPresent(key);
121+
if (cachedAppendClient != null
122+
&& System.identityHashCode(cachedAppendClient)
123+
== System.identityHashCode(expectedClient)) {
124+
appendCache.invalidate(key);
125+
}
126+
}
127+
}
128+
129+
public void invalidate(KeyT key) {
130+
synchronized (this) {
131+
appendCache.invalidate(key);
132+
}
133+
}
134+
135+
public void tickle(KeyT key) {
136+
appendCache.getIfPresent(key);
137+
}
138+
139+
public void clear() {
140+
synchronized (this) {
141+
appendCache.invalidateAll();
142+
}
143+
}
144+
145+
public void unpinAsync(AppendClientInfo appendClientInfo) {
146+
appendClientInfo.unpinAppendClient(closeWriterExecutor);
147+
}
148+
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,19 @@
2727
import com.google.protobuf.Descriptors;
2828
import com.google.protobuf.DynamicMessage;
2929
import com.google.protobuf.Message;
30-
import java.util.function.Consumer;
30+
import java.util.Arrays;
31+
import java.util.concurrent.ExecutorService;
3132
import java.util.function.Predicate;
3233
import java.util.function.Supplier;
34+
import java.util.stream.Collectors;
3335
import javax.annotation.Nullable;
36+
import org.apache.beam.sdk.function.ThrowingConsumer;
37+
import org.apache.beam.sdk.function.ThrowingRunnable;
3438
import org.apache.beam.sdk.metrics.Counter;
3539
import org.apache.beam.sdk.metrics.Metrics;
3640
import org.apache.beam.sdk.util.Preconditions;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
3743

3844
/**
3945
* Container class used by {@link StorageApiWritesShardedRecords} and {@link
@@ -42,14 +48,16 @@
4248
*/
4349
@AutoValue
4450
abstract class AppendClientInfo {
51+
private static final Logger LOG = LoggerFactory.getLogger(AppendClientInfo.class);
52+
4553
private final Counter activeStreamAppendClients =
4654
Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");
4755

4856
abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();
4957

5058
abstract TableSchema getTableSchema();
5159

52-
abstract Consumer<BigQueryServices.StreamAppendClient> getCloseAppendClient();
60+
abstract ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> getCloseAppendClient();
5361

5462
abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema();
5563

@@ -65,7 +73,8 @@ abstract static class Builder {
6573

6674
abstract Builder setTableSchema(TableSchema value);
6775

68-
abstract Builder setCloseAppendClient(Consumer<BigQueryServices.StreamAppendClient> value);
76+
abstract Builder setCloseAppendClient(
77+
ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> value);
6978

7079
abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value);
7180

@@ -83,7 +92,7 @@ abstract static class Builder {
8392
static AppendClientInfo of(
8493
TableSchema tableSchema,
8594
DescriptorProtos.DescriptorProto descriptor,
86-
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
95+
ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> closeAppendClient)
8796
throws Exception {
8897
return new AutoValue_AppendClientInfo.Builder()
8998
.setTableSchema(tableSchema)
@@ -97,7 +106,7 @@ static AppendClientInfo of(
97106

98107
static AppendClientInfo of(
99108
TableSchema tableSchema,
100-
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
109+
ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> closeAppendClient,
101110
boolean includeCdcColumns)
102111
throws Exception {
103112
return of(
@@ -134,7 +143,18 @@ public AppendClientInfo withAppendClient(
134143
public void close() {
135144
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
136145
if (client != null) {
137-
getCloseAppendClient().accept(client);
146+
try {
147+
getCloseAppendClient().accept(client);
148+
} catch (Exception e) {
149+
// We ignore errors when closing clients.
150+
String msg =
151+
e
152+
+ "\n"
153+
+ Arrays.stream(e.getStackTrace())
154+
.map(StackTraceElement::toString)
155+
.collect(Collectors.joining("\n"));
156+
LOG.warn("Caught exception while trying to close append client. Ignoring {}", msg);
157+
}
138158
activeStreamAppendClients.dec();
139159
}
140160
}
@@ -199,4 +219,38 @@ public TableRow toTableRow(ByteString protoBytes, Predicate<String> includeField
199219
throw new RuntimeException(e);
200220
}
201221
}
222+
223+
public void pinAppendClient() {
224+
BigQueryServices.StreamAppendClient client =
225+
Preconditions.checkStateNotNull(getStreamAppendClient());
226+
client.pin();
227+
}
228+
229+
public void unpinAppendClient(@Nullable ExecutorService executor) {
230+
BigQueryServices.StreamAppendClient client =
231+
Preconditions.checkStateNotNull(getStreamAppendClient());
232+
if (executor != null) {
233+
runAsyncIgnoreFailure(executor, client::unpin);
234+
} else {
235+
client.unpin();
236+
}
237+
}
238+
239+
@SuppressWarnings({"FutureReturnValueIgnored"})
240+
private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) {
241+
executor.submit(
242+
() -> {
243+
try {
244+
task.run();
245+
} catch (Throwable e) {
246+
String msg =
247+
e.toString()
248+
+ "\n"
249+
+ Arrays.stream(e.getStackTrace())
250+
.map(StackTraceElement::toString)
251+
.collect(Collectors.joining("\n"));
252+
System.err.println("Exception happened while executing async task. Ignoring: " + msg);
253+
}
254+
});
255+
}
202256
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4257,7 +4257,6 @@ private <DestinationT> WriteResult continueExpandTyped(
42574257
!getIgnoreUnknownValues(),
42584258
"ignoreUnknownValues not supported when using writeProtos."
42594259
+ " Try setting withDirectWriteProtos(false)");
4260-
42614260
storageApiDynamicDestinations =
42624261
(StorageApiDynamicDestinations<T, DestinationT>)
42634262
new StorageApiDynamicDestinationsProto(

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ default long getInflightWaitSeconds() {
270270
/**
271271
* Unpin this object. If the object has been closed, this will release any underlying resources.
272272
*/
273-
void unpin() throws Exception;
273+
void unpin();
274274
}
275275

276276
/**

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1617,7 +1617,7 @@ public void pin() {
16171617
}
16181618

16191619
@Override
1620-
public void unpin() throws Exception {
1620+
public void unpin() {
16211621
boolean closeWriter;
16221622
synchronized (this) {
16231623
Preconditions.checkState(pins > 0, "Tried to unpin when pins==0");

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PatchTableSchemaDoFn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void processElement(
152152
} while (BackOffUtils.next(Sleeper.DEFAULT, backoff));
153153
if (schemaOutOfDate) {
154154
// This could be due to an out-of-date schema.
155-
LOG.info("Schema out of date. Refreshing.");
155+
LOG.info("Schema out of date due to failure {}. Refreshing.", lastException);
156156
messageConverter.updateSchemaFromTable();
157157
} else {
158158
// We ran out of retries.

0 commit comments

Comments
 (0)