Skip to content

Commit 84d423f

Browse files
authored
added the job id for spanner IO (#35479)
* added the job id for spanner IO * use Strings.isNullOrEmpty
1 parent 2aa3351 commit 84d423f

2 files changed

Lines changed: 67 additions & 0 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.beam.sdk.options.ValueProvider;
4444
import org.apache.beam.sdk.util.ReleaseInfo;
4545
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
46+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
4647
import org.joda.time.Duration;
4748
import org.slf4j.Logger;
4849
import org.slf4j.LoggerFactory;
@@ -229,6 +230,10 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
229230
builder.setCredentials(NoCredentials.getInstance());
230231
}
231232
String userAgentString = USER_AGENT_PREFIX + "/" + ReleaseInfo.getReleaseInfo().getVersion();
233+
SpannerIOMetadata spannerIOMetadata = SpannerIOMetadata.create();
234+
if (!Strings.isNullOrEmpty(spannerIOMetadata.getBeamJobId())) {
235+
userAgentString = userAgentString + "/" + spannerIOMetadata.getBeamJobId();
236+
}
232237
builder.setHeaderProvider(FixedHeaderProvider.create("user-agent", userAgentString));
233238
ValueProvider<String> databaseRole = spannerConfig.getDatabaseRole();
234239
if (databaseRole != null && databaseRole.get() != null && !databaseRole.get().isEmpty()) {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner;
19+
20+
import java.util.concurrent.TimeUnit;
21+
import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
22+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
23+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
24+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
25+
import org.checkerframework.checker.nullness.qual.Nullable;
26+
27+
/** Metadata class for SpannerIO. */
28+
final class SpannerIOMetadata {
29+
30+
private final @Nullable String beamJobId;
31+
32+
static final Supplier<SpannerIOMetadata> INSTANCE =
33+
Suppliers.memoizeWithExpiration(() -> refreshInstance(), 5, TimeUnit.MINUTES);
34+
35+
private SpannerIOMetadata(@Nullable String beamJobId) {
36+
this.beamJobId = beamJobId;
37+
}
38+
39+
/**
40+
* Creates a SpannerIOMetadata. This will request metadata properly based on which runner is being
41+
* used.
42+
*/
43+
public static SpannerIOMetadata create() {
44+
return INSTANCE.get();
45+
}
46+
47+
private static SpannerIOMetadata refreshInstance() {
48+
String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
49+
if (Strings.isNullOrEmpty(dataflowJobId)) {
50+
return new SpannerIOMetadata(null);
51+
}
52+
53+
return new SpannerIOMetadata(dataflowJobId);
54+
}
55+
56+
/*
57+
* Returns the beam job id. Can be null if it is not running on Dataflow.
58+
*/
59+
public @Nullable String getBeamJobId() {
60+
return this.beamJobId;
61+
}
62+
}

0 commit comments

Comments
 (0)