Skip to content

Commit 281270c

Browse files
authored
Refactor StorageAPI BigQuery sink (#38264)
* Refactor StorageApi BigQuery sink to simplify cache management and management of pins. * fixes * fixes * fixes * typo
1 parent 4c8d852 commit 281270c

7 files changed

Lines changed: 676 additions & 626 deletions

File tree

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.bigquery;
19+
20+
import java.util.concurrent.Callable;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.TimeUnit;
24+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
25+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
26+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
27+
import org.checkerframework.checker.nullness.qual.NonNull;
28+
import org.checkerframework.checker.nullness.qual.Nullable;
29+
import org.joda.time.Duration;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
/**
34+
* Encapsulates the cache of {@link AppendClientInfo} objects and the synchronization protocol
35+
* required to use them safely. The Guava cache object is thread-safe. However our protocol requires
36+
* that client pin the StreamAppendClient after looking up the cache, and we must ensure that the
37+
* cache is not accessed in between the lookup and the pin (any access of the cache could trigger
38+
* element expiration).
39+
*/
40+
class AppendClientCache<KeyT extends @NonNull Object> {
41+
private static final Logger LOG = LoggerFactory.getLogger(AppendClientCache.class);
42+
private final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
43+
44+
private final Cache<KeyT, AppendClientInfo> appendCache;
45+
46+
@SuppressWarnings({"FutureReturnValueIgnored"})
47+
AppendClientCache(Duration expireAfterAccess) {
48+
this.appendCache =
49+
CacheBuilder.newBuilder()
50+
.expireAfterAccess(expireAfterAccess.getMillis(), TimeUnit.MILLISECONDS)
51+
.removalListener(
52+
(RemovalNotification<KeyT, AppendClientInfo> removal) -> {
53+
LOG.info("Expiring append client for {}", removal.getKey());
54+
final @Nullable AppendClientInfo appendClientInfo = removal.getValue();
55+
if (appendClientInfo != null) {
56+
// Remove the pin owned by the cache itself. Since the client has not been
57+
// marked as closed, we
58+
// can call unpin in this thread without worrying about blocking the thread.
59+
appendClientInfo.unpinAppendClient(null);
60+
// Close the client in another thread to avoid blocking the main thread.
61+
closeWriterExecutor.submit(appendClientInfo::close);
62+
}
63+
})
64+
.build();
65+
}
66+
67+
// The cache itself always own one pin on the object. This Callable is always used to ensure that
68+
// the cache
69+
// adds a pin before loading a value.
70+
private static Callable<AppendClientInfo> wrapWithPin(Callable<AppendClientInfo> loader) {
71+
return () -> {
72+
AppendClientInfo client = loader.call();
73+
client.pinAppendClient();
74+
return client;
75+
};
76+
}
77+
78+
/**
79+
* Atomically get an append client from the cache and add a pin. This pin is owned by the client,
80+
* which has the responsibility of removing it. If the client is not in the cache, loader will be
81+
* used to load the client; in this case an additional pin will be added owned by the cache,
82+
* removed when the item is evicted.
83+
*/
84+
public AppendClientInfo getAndPin(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
85+
synchronized (this) {
86+
AppendClientInfo info = appendCache.get(key, wrapWithPin(loader));
87+
info.pinAppendClient();
88+
return info;
89+
}
90+
}
91+
92+
/** "Refresh" an object by invalidating the old cache entry. */
93+
public AppendClientInfo refreshObjectAndPin(KeyT key, Callable<AppendClientInfo> loader)
94+
throws Exception {
95+
synchronized (this) {
96+
appendCache.invalidate(key);
97+
return getAndPin(key, loader);
98+
}
99+
}
100+
101+
public void invalidate(KeyT key, AppendClientInfo expectedClient) {
102+
// The default stream is cached across multiple different DoFns. If they all try
103+
// and
104+
// invalidate, then we can get races between threads invalidating and recreating
105+
// streams. For this reason,
106+
// we check to see that the cache still contains the object we created before
107+
// invalidating (in case another
108+
// thread has already invalidated and recreated the stream).
109+
synchronized (this) {
110+
AppendClientInfo cachedAppendClient = appendCache.getIfPresent(key);
111+
if (cachedAppendClient != null
112+
&& System.identityHashCode(cachedAppendClient)
113+
== System.identityHashCode(expectedClient)) {
114+
appendCache.invalidate(key);
115+
}
116+
}
117+
}
118+
119+
public void invalidate(KeyT key) {
120+
synchronized (this) {
121+
appendCache.invalidate(key);
122+
}
123+
}
124+
125+
public void tickle(KeyT key) {
126+
synchronized (this) {
127+
appendCache.getIfPresent(key);
128+
}
129+
}
130+
131+
public void clear() {
132+
synchronized (this) {
133+
appendCache.invalidateAll();
134+
}
135+
}
136+
137+
public void unpinAsync(AppendClientInfo appendClientInfo) {
138+
appendClientInfo.unpinAppendClient(closeWriterExecutor);
139+
}
140+
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@
2727
import com.google.protobuf.Descriptors;
2828
import com.google.protobuf.DynamicMessage;
2929
import com.google.protobuf.Message;
30-
import java.util.function.Consumer;
30+
import java.util.concurrent.ExecutorService;
3131
import java.util.function.Predicate;
3232
import java.util.function.Supplier;
3333
import javax.annotation.Nullable;
34+
import org.apache.beam.sdk.function.ThrowingConsumer;
35+
import org.apache.beam.sdk.function.ThrowingRunnable;
3436
import org.apache.beam.sdk.metrics.Counter;
3537
import org.apache.beam.sdk.metrics.Metrics;
3638
import org.apache.beam.sdk.util.Preconditions;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
3741

3842
/**
3943
* Container class used by {@link StorageApiWritesShardedRecords} and {@link
@@ -42,14 +46,16 @@
4246
*/
4347
@AutoValue
4448
abstract class AppendClientInfo {
49+
private static final Logger LOG = LoggerFactory.getLogger(AppendClientInfo.class);
50+
4551
private final Counter activeStreamAppendClients =
4652
Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");
4753

4854
abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();
4955

5056
abstract TableSchema getTableSchema();
5157

52-
abstract Consumer<BigQueryServices.StreamAppendClient> getCloseAppendClient();
58+
abstract ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> getCloseAppendClient();
5359

5460
abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema();
5561

@@ -65,7 +71,8 @@ abstract static class Builder {
6571

6672
abstract Builder setTableSchema(TableSchema value);
6773

68-
abstract Builder setCloseAppendClient(Consumer<BigQueryServices.StreamAppendClient> value);
74+
abstract Builder setCloseAppendClient(
75+
ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> value);
6976

7077
abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value);
7178

@@ -83,7 +90,7 @@ abstract static class Builder {
8390
static AppendClientInfo of(
8491
TableSchema tableSchema,
8592
DescriptorProtos.DescriptorProto descriptor,
86-
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
93+
ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> closeAppendClient)
8794
throws Exception {
8895
return new AutoValue_AppendClientInfo.Builder()
8996
.setTableSchema(tableSchema)
@@ -97,7 +104,7 @@ static AppendClientInfo of(
97104

98105
static AppendClientInfo of(
99106
TableSchema tableSchema,
100-
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
107+
ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> closeAppendClient,
101108
boolean includeCdcColumns)
102109
throws Exception {
103110
return of(
@@ -134,7 +141,12 @@ public AppendClientInfo withAppendClient(
134141
public void close() {
135142
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
136143
if (client != null) {
137-
getCloseAppendClient().accept(client);
144+
try {
145+
getCloseAppendClient().accept(client);
146+
} catch (Exception e) {
147+
// We ignore errors when closing clients.
148+
LOG.warn("Caught exception whilw trying to close append client. Ignoring", e);
149+
}
138150
activeStreamAppendClients.dec();
139151
}
140152
}
@@ -199,4 +211,32 @@ public TableRow toTableRow(ByteString protoBytes, Predicate<String> includeField
199211
throw new RuntimeException(e);
200212
}
201213
}
214+
215+
public void pinAppendClient() {
216+
BigQueryServices.StreamAppendClient client =
217+
Preconditions.checkStateNotNull(getStreamAppendClient());
218+
client.pin();
219+
}
220+
221+
public void unpinAppendClient(@Nullable ExecutorService executor) {
222+
BigQueryServices.StreamAppendClient client =
223+
Preconditions.checkStateNotNull(getStreamAppendClient());
224+
if (executor != null) {
225+
runAsyncIgnoreFailure(executor, client::unpin);
226+
} else {
227+
client.unpin();
228+
}
229+
}
230+
231+
@SuppressWarnings({"FutureReturnValueIgnored"})
232+
private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) {
233+
executor.submit(
234+
() -> {
235+
try {
236+
task.run();
237+
} catch (Throwable e) {
238+
LOG.info("Exception happened while executing async task. Ignoring: ", e);
239+
}
240+
});
241+
}
202242
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ default long getInflightWaitSeconds() {
270270
/**
271271
* Unpin this object. If the object has been closed, this will release any underlying resources.
272272
*/
273-
void unpin() throws Exception;
273+
void unpin();
274274
}
275275

276276
/**

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1617,7 +1617,7 @@ public void pin() {
16171617
}
16181618

16191619
@Override
1620-
public void unpin() throws Exception {
1620+
public void unpin() {
16211621
boolean closeWriter;
16221622
synchronized (this) {
16231623
Preconditions.checkState(pins > 0, "Tried to unpin when pins==0");

0 commit comments

Comments
 (0)