Skip to content

Commit 024da4e

Browse files
committed
feat(bigtable): add column-mapping value transformations
Add a generic `columnTransforms` parameter for mapping `column_family:column` pairs to value transformers. Proto decode and column transforms share a unified TRANSFORMED_VALUE output path in BigQuery.

Supported transforms:
- BIG_ENDIAN_UINT64_TIMESTAMP_MS: 8-byte big-endian int64 → timestamp

Architecture: a ValueTransformer interface plus a ValueTransformerRegistry backed by a two-level family→column map, giving O(1) lookup without per-element string concatenation. Transform priority: proto decode runs first, then columnTransforms.

Includes proto decode parameters, options, pipeline wiring, BigQueryUtils formatter integration, and tests for both features.
1 parent dc90c38 commit 024da4e

8 files changed

Lines changed: 627 additions & 5 deletions

File tree

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/BigtableChangeStreamToBigQueryOptions.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,85 @@ public interface BigtableChangeStreamToBigQueryOptions
149149
String getDlqDirectory();
150150

151151
void setDlqDirectory(String value);
152+
153+
@TemplateParameter.GcsReadFile(
154+
order = 11,
155+
optional = true,
156+
description = "Cloud Storage path to the proto schema file",
157+
helpText =
158+
"The Cloud Storage location of the self-contained proto schema file. "
159+
+ "For example, gs://path/to/my/file.pb. This file can be generated with the "
160+
+ "--descriptor_set_out flag of the protoc command. The --include_imports flag "
161+
+ "guarantees that the file is self-contained. When set along with "
162+
+ "fullProtoMessageName, the pipeline decodes matching cell values as protobuf "
163+
+ "messages and writes them as JSON to BigQuery.")
164+
@Default.String("")
165+
String getProtoSchemaPath();
166+
167+
void setProtoSchemaPath(String value);
168+
169+
@TemplateParameter.Text(
170+
order = 12,
171+
optional = true,
172+
regexes = {"^.+([a-zA-Z][a-zA-Z0-9_]+\\.?)+[a-zA-Z0-9_]$"},
173+
description = "Full proto message name",
174+
helpText =
175+
"The full proto message name. For example, package.name.MessageName, "
176+
+ "where package.name is the value provided for the package statement "
177+
+ "and not the java_package statement.")
178+
@Default.String("")
179+
String getFullProtoMessageName();
180+
181+
void setFullProtoMessageName(String value);
182+
183+
@TemplateParameter.Text(
184+
order = 13,
185+
optional = true,
186+
description = "Column family containing proto values",
187+
helpText =
188+
"The Bigtable column family containing protobuf-encoded values to decode. "
189+
+ "Required when protoSchemaPath is set.")
190+
@Default.String("")
191+
String getProtoColumnFamily();
192+
193+
void setProtoColumnFamily(String value);
194+
195+
@TemplateParameter.Text(
196+
order = 14,
197+
optional = true,
198+
description = "Column qualifier containing proto values",
199+
helpText =
200+
"The Bigtable column qualifier containing protobuf-encoded values to decode. "
201+
+ "Required when protoSchemaPath is set.")
202+
@Default.String("")
203+
String getProtoColumn();
204+
205+
void setProtoColumn(String value);
206+
207+
@TemplateParameter.Boolean(
208+
order = 15,
209+
optional = true,
210+
description = "Preserve proto field names in JSON output",
211+
helpText =
212+
"When set to true, preserves original proto field names (snake_case) in the "
213+
+ "JSON output. When set to false, uses lowerCamelCase. Defaults to false.")
214+
@Default.Boolean(false)
215+
Boolean getPreserveProtoFieldNames();
216+
217+
void setPreserveProtoFieldNames(Boolean value);
218+
219+
@TemplateParameter.Text(
220+
order = 16,
221+
optional = true,
222+
description = "Column value transforms",
223+
helpText =
224+
"A comma-separated list of column value transformations in the format "
225+
+ "column_family:column:TRANSFORM_TYPE. Matched cell values are transformed "
226+
+ "before writing to BigQuery. Supported types: BIG_ENDIAN_UINT64_TIMESTAMP_MS "
227+
+ "(8-byte big-endian unsigned 64-bit integer as Unix epoch milliseconds, "
228+
+ "converted to a timestamp string).")
229+
@Default.String("")
230+
String getColumnTransforms();
231+
232+
void setColumnTransforms(String value);
152233
}

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigquery/BigtableChangeStreamsToBigQuery.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.Mod;
3737
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.ModType;
3838
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.BigQueryUtils;
39+
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ProtoDecoder;
40+
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ValueTransformerRegistry;
3941
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
4042
import com.google.cloud.teleport.v2.utils.BigtableSource;
4143
import java.util.ArrayList;
@@ -182,6 +184,38 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options)
182184

183185
BigQueryUtils bigQuery = new BigQueryUtils(sourceInfo, destinationInfo);
184186

187+
ProtoDecoder protoDecoder = null;
188+
boolean hasProtoSchema = !StringUtils.isBlank(options.getProtoSchemaPath());
189+
boolean hasProtoMessage = !StringUtils.isBlank(options.getFullProtoMessageName());
190+
boolean hasProtoColumnFamily = !StringUtils.isBlank(options.getProtoColumnFamily());
191+
boolean hasProtoColumn = !StringUtils.isBlank(options.getProtoColumn());
192+
193+
if (hasProtoSchema || hasProtoMessage || hasProtoColumnFamily || hasProtoColumn) {
194+
if (!hasProtoSchema || !hasProtoMessage || !hasProtoColumnFamily || !hasProtoColumn) {
195+
throw new IllegalArgumentException(
196+
"When using protobuf decoding, all of protoSchemaPath, fullProtoMessageName, "
197+
+ "protoColumnFamily, and protoColumn must be specified.");
198+
}
199+
protoDecoder =
200+
new ProtoDecoder(
201+
options.getProtoSchemaPath(),
202+
options.getFullProtoMessageName(),
203+
options.getProtoColumnFamily(),
204+
options.getProtoColumn(),
205+
options.getPreserveProtoFieldNames());
206+
LOG.info(
207+
"Proto decoding enabled for {}.{} using message type {}",
208+
options.getProtoColumnFamily(),
209+
options.getProtoColumn(),
210+
options.getFullProtoMessageName());
211+
}
212+
213+
ValueTransformerRegistry transformerRegistry =
214+
ValueTransformerRegistry.parse(options.getColumnTransforms());
215+
if (transformerRegistry != null) {
216+
LOG.info("Column transforms enabled: {}", options.getColumnTransforms());
217+
}
218+
185219
Pipeline pipeline = Pipeline.create(options);
186220
DeadLetterQueueManager dlqManager = buildDlqManager(options);
187221

@@ -213,7 +247,9 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options)
213247
PCollection<TableRow> changeStreamMutationToTableRow =
214248
dataChangeRecord.apply(
215249
"ChangeStreamMutation To TableRow",
216-
ParDo.of(new ChangeStreamMutationToTableRowFn(sourceInfo, bigQuery)));
250+
ParDo.of(
251+
new ChangeStreamMutationToTableRowFn(
252+
sourceInfo, bigQuery, protoDecoder, transformerRegistry)));
217253

218254
Write<TableRow> bigQueryWrite =
219255
BigQueryIO.<TableRow>write()
@@ -324,10 +360,18 @@ private static String getBigQueryProjectId(BigtableChangeStreamToBigQueryOptions
324360
static class ChangeStreamMutationToTableRowFn extends DoFn<ChangeStreamMutation, TableRow> {
325361
private final BigtableSource sourceInfo;
326362
private final BigQueryUtils bigQuery;
327-
328-
ChangeStreamMutationToTableRowFn(BigtableSource source, BigQueryUtils bigQuery) {
363+
private final ProtoDecoder protoDecoder;
364+
private final ValueTransformerRegistry transformerRegistry;
365+
366+
ChangeStreamMutationToTableRowFn(
367+
BigtableSource source,
368+
BigQueryUtils bigQuery,
369+
ProtoDecoder protoDecoder,
370+
ValueTransformerRegistry transformerRegistry) {
329371
this.sourceInfo = source;
330372
this.bigQuery = bigQuery;
373+
this.protoDecoder = protoDecoder;
374+
this.transformerRegistry = transformerRegistry;
331375
}
332376

333377
@ProcessElement
@@ -339,7 +383,7 @@ public void process(@Element ChangeStreamMutation input, OutputReceiver<TableRow
339383
Mod mod = null;
340384
switch (modType) {
341385
case SET_CELL:
342-
mod = new Mod(sourceInfo, input, (SetCell) entry);
386+
mod = new Mod(sourceInfo, input, (SetCell) entry, protoDecoder, transformerRegistry);
343387
break;
344388
case DELETE_CELLS:
345389
mod = new Mod(sourceInfo, input, (DeleteCells) entry);

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigquery/model/Mod.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
2424
import com.google.cloud.bigtable.data.v2.models.Range.BoundType;
2525
import com.google.cloud.bigtable.data.v2.models.SetCell;
26+
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ProtoDecoder;
27+
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ValueTransformerRegistry;
2628
import com.google.cloud.teleport.v2.utils.BigtableSource;
2729
import com.google.common.collect.Maps;
2830
import com.google.protobuf.ByteString;
@@ -49,6 +51,8 @@ public final class Mod implements Serializable {
4951

5052
private static final String PATTERN_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";
5153

54+
public static final String TRANSFORMED_VALUE = "TRANSFORMED_VALUE";
55+
5256
private static final ThreadLocal<ObjectMapper> OBJECT_MAPPER =
5357
ThreadLocal.withInitial(ObjectMapper::new);
5458

@@ -79,6 +83,38 @@ public Mod(BigtableSource source, ChangeStreamMutation mutation, SetCell setCell
7983
this.changeJson = convertPropertiesToJson(propertiesMap);
8084
}
8185

86+
/**
 * Creates a SET_CELL Mod, optionally applying proto decoding and/or generic column value
 * transforms to the cell value.
 *
 * <p>Transform priority: proto decoding is attempted first; the column transform registry is
 * consulted only when proto decoding did not apply (no family/qualifier match, or decode
 * returned null). A successful result is stored under {@link #TRANSFORMED_VALUE} in the
 * serialized change JSON; downstream formatting reads that key in preference to the raw value.
 *
 * @param source the Bigtable source metadata
 * @param mutation the change stream mutation this entry belongs to
 * @param setCell the SetCell entry being converted
 * @param protoDecoder decoder for protobuf-encoded cell values, or null when proto decoding is
 *     disabled
 * @param transformerRegistry registry of column value transforms, or null when no transforms
 *     are configured
 */
public Mod(
    BigtableSource source,
    ChangeStreamMutation mutation,
    SetCell setCell,
    ProtoDecoder protoDecoder,
    ValueTransformerRegistry transformerRegistry) {
  this(mutation.getCommitTimestamp(), ModType.SET_CELL);

  Map<String, Object> propertiesMap = Maps.newHashMap();
  setCommonProperties(propertiesMap, source, mutation);
  setSpecificProperties(propertiesMap, setCell);

  // Only touch the cell value when at least one transform mechanism is configured.
  if (protoDecoder != null || transformerRegistry != null) {
    String qualifierStr = setCell.getQualifier().toStringUtf8();
    byte[] valueBytes = setCell.getValue().toByteArray();
    String transformed = null;

    // Proto decoding takes priority over generic column transforms.
    if (protoDecoder != null && protoDecoder.matches(setCell.getFamilyName(), qualifierStr)) {
      transformed = protoDecoder.decode(valueBytes);
    }
    // Fall through to column transforms when proto decoding did not produce a value.
    if (transformed == null && transformerRegistry != null) {
      transformed =
          transformerRegistry.transform(setCell.getFamilyName(), qualifierStr, valueBytes);
    }
    // A null result (no match or failed transform) leaves the raw value handling untouched.
    if (transformed != null) {
      propertiesMap.put(TRANSFORMED_VALUE, transformed);
    }
  }

  this.changeJson = convertPropertiesToJson(propertiesMap);
}
117+
82118
public Mod(BigtableSource source, ChangeStreamMutation mutation, DeleteCells deleteCells) {
83119
this.commitTimestampNanos = mutation.getCommitTimestamp().getNano();
84120
this.commitTimestampSeconds = mutation.getCommitTimestamp().getEpochSecond();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (C) 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils;
17+
18+
import java.nio.ByteBuffer;
19+
import java.time.Instant;
20+
import java.time.ZoneId;
21+
import java.time.format.DateTimeFormatter;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
/**
26+
* Transforms an 8-byte big-endian 64-bit integer representing Unix epoch milliseconds into a
27+
* BigQuery-compatible timestamp string.
28+
*/
29+
public class BigEndianTimestampTransformer implements ValueTransformer {
30+
31+
private static final long serialVersionUID = 1L;
32+
private static final Logger LOG = LoggerFactory.getLogger(BigEndianTimestampTransformer.class);
33+
private static final DateTimeFormatter FORMATTER =
34+
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS").withZone(ZoneId.of("UTC"));
35+
36+
@Override
37+
public String transform(byte[] bytes) {
38+
if (bytes == null || bytes.length != 8) {
39+
LOG.warn(
40+
"Expected 8 bytes for big-endian uint64 timestamp, got {}",
41+
bytes == null ? "null" : bytes.length);
42+
return null;
43+
}
44+
try {
45+
long millis = ByteBuffer.wrap(bytes).getLong();
46+
return FORMATTER.format(Instant.ofEpochMilli(millis));
47+
} catch (Exception e) {
48+
LOG.warn("Failed to decode big-endian timestamp: {}", e.getMessage());
49+
return null;
50+
}
51+
}
52+
}

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigquery/schemautils/BigQueryUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,12 @@ public class BigQueryUtils implements Serializable {
9494
FORMATTERS.put(
9595
ChangelogColumn.VALUE_STRING,
9696
(bq, chg) -> {
97+
if (chg.has(Mod.TRANSFORMED_VALUE)) {
98+
return chg.getString(Mod.TRANSFORMED_VALUE);
99+
}
97100
if (!chg.has(ChangelogColumn.VALUE_BYTES.name())) {
98101
return null;
99102
}
100-
101103
String valueEncoded = chg.getString(ChangelogColumn.VALUE_BYTES.name());
102104
return bq.convertBase64ToString(valueEncoded);
103105
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (C) 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils;
17+
18+
import java.io.Serializable;
19+
20+
/**
 * Transforms a Bigtable cell value from raw bytes to a string representation for BigQuery.
 *
 * <p>Extends {@link Serializable} so transformer instances can travel inside serialized pipeline
 * functions (they are held as fields of a {@code DoFn}).
 */
public interface ValueTransformer extends Serializable {

  /**
   * Transforms raw cell value bytes to a string for BigQuery.
   *
   * @param bytes the raw cell value bytes
   * @return the transformed string, or null if transformation fails
   */
  String transform(byte[] bytes);
}

0 commit comments

Comments
 (0)