Skip to content

Commit 0bb6d5e

Browse files
committed
feat(bigtable): add columnTransforms for generic cell value decoding
Add a `columnTransforms` parameter that maps `column_family:column` pairs to value transformers.

Supported types:
- `BIG_ENDIAN_UINT64_TIMESTAMP_MS`: 8-byte big-endian uint64 interpreted as a Unix-epoch-millisecond timestamp.

Example: `columnTransforms=event_data:timestamp:BIG_ENDIAN_UINT64_TIMESTAMP_MS`

This complements the proto-decode feature: proto decode handles protobuf columns, while `columnTransforms` handles other binary encodings. Transform priority: proto decode first, then `columnTransforms`.

Also renames `PROTO_DECODED_VALUE` to `TRANSFORMED_VALUE` to reflect the generalized transform pipeline.
1 parent 2ff5cb1 commit 0bb6d5e

8 files changed

Lines changed: 298 additions & 20 deletions

File tree

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/BigtableChangeStreamToBigQueryOptions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,19 @@ public interface BigtableChangeStreamToBigQueryOptions
215215
Boolean getPreserveProtoFieldNames();
216216

217217
void setPreserveProtoFieldNames(Boolean value);
218+
219+
@TemplateParameter.Text(
220+
order = 16,
221+
optional = true,
222+
description = "Column value transforms",
223+
helpText =
224+
"A comma-separated list of column value transformations in the format "
225+
+ "column_family:column:TRANSFORM_TYPE. Matched cell values are transformed "
226+
+ "before writing to BigQuery. Supported types: BIG_ENDIAN_UINT64_TIMESTAMP_MS "
227+
+ "(8-byte big-endian unsigned 64-bit integer as Unix epoch milliseconds, "
228+
+ "converted to a timestamp string).")
229+
@Default.String("")
230+
String getColumnTransforms();
231+
232+
void setColumnTransforms(String value);
218233
}

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigquery/BigtableChangeStreamsToBigQuery.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.ModType;
3838
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.BigQueryUtils;
3939
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ProtoDecoder;
40+
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ValueTransformerRegistry;
4041
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
4142
import com.google.cloud.teleport.v2.utils.BigtableSource;
4243
import java.util.ArrayList;
@@ -210,6 +211,13 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options)
210211
options.getFullProtoMessageName());
211212
}
212213

214+
// Column value transforms (optional)
215+
ValueTransformerRegistry transformerRegistry =
216+
ValueTransformerRegistry.parse(options.getColumnTransforms());
217+
if (transformerRegistry != null) {
218+
LOG.info("Column transforms enabled: {}", options.getColumnTransforms());
219+
}
220+
213221
Pipeline pipeline = Pipeline.create(options);
214222
DeadLetterQueueManager dlqManager = buildDlqManager(options);
215223

@@ -241,7 +249,9 @@ public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options)
241249
PCollection<TableRow> changeStreamMutationToTableRow =
242250
dataChangeRecord.apply(
243251
"ChangeStreamMutation To TableRow",
244-
ParDo.of(new ChangeStreamMutationToTableRowFn(sourceInfo, bigQuery, protoDecoder)));
252+
ParDo.of(
253+
new ChangeStreamMutationToTableRowFn(
254+
sourceInfo, bigQuery, protoDecoder, transformerRegistry)));
245255

246256
Write<TableRow> bigQueryWrite =
247257
BigQueryIO.<TableRow>write()
@@ -353,12 +363,17 @@ static class ChangeStreamMutationToTableRowFn extends DoFn<ChangeStreamMutation,
353363
private final BigtableSource sourceInfo;
354364
private final BigQueryUtils bigQuery;
355365
private final ProtoDecoder protoDecoder;
366+
private final ValueTransformerRegistry transformerRegistry;
356367

357368
ChangeStreamMutationToTableRowFn(
358-
BigtableSource source, BigQueryUtils bigQuery, ProtoDecoder protoDecoder) {
369+
BigtableSource source,
370+
BigQueryUtils bigQuery,
371+
ProtoDecoder protoDecoder,
372+
ValueTransformerRegistry transformerRegistry) {
359373
this.sourceInfo = source;
360374
this.bigQuery = bigQuery;
361375
this.protoDecoder = protoDecoder;
376+
this.transformerRegistry = transformerRegistry;
362377
}
363378

364379
@ProcessElement
@@ -371,8 +386,8 @@ public void process(@Element ChangeStreamMutation input, OutputReceiver<TableRow
371386
switch (modType) {
372387
case SET_CELL:
373388
mod =
374-
protoDecoder != null
375-
? new Mod(sourceInfo, input, (SetCell) entry, protoDecoder)
389+
(protoDecoder != null || transformerRegistry != null)
390+
? new Mod(sourceInfo, input, (SetCell) entry, protoDecoder, transformerRegistry)
376391
: new Mod(sourceInfo, input, (SetCell) entry);
377392
break;
378393
case DELETE_CELLS:

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigquery/model/Mod.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.bigtable.data.v2.models.Range.BoundType;
2525
import com.google.cloud.bigtable.data.v2.models.SetCell;
2626
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ProtoDecoder;
27+
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.ValueTransformerRegistry;
2728
import com.google.cloud.teleport.v2.utils.BigtableSource;
2829
import com.google.common.collect.Maps;
2930
import com.google.protobuf.ByteString;
@@ -50,7 +51,7 @@ public final class Mod implements Serializable {
5051

5152
private static final String PATTERN_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";
5253

53-
public static final String PROTO_DECODED_VALUE = "PROTO_DECODED_VALUE";
54+
public static final String TRANSFORMED_VALUE = "TRANSFORMED_VALUE";
5455

5556
private static final ThreadLocal<ObjectMapper> OBJECT_MAPPER =
5657
ThreadLocal.withInitial(ObjectMapper::new);
@@ -87,21 +88,38 @@ public Mod(
8788
ChangeStreamMutation mutation,
8889
SetCell setCell,
8990
ProtoDecoder protoDecoder) {
91+
this(source, mutation, setCell, protoDecoder, null);
92+
}
93+
94+
public Mod(
95+
BigtableSource source,
96+
ChangeStreamMutation mutation,
97+
SetCell setCell,
98+
ProtoDecoder protoDecoder,
99+
ValueTransformerRegistry transformerRegistry) {
90100
this(mutation.getCommitTimestamp(), ModType.SET_CELL);
91101

92102
Map<String, Object> propertiesMap = Maps.newHashMap();
93103
setCommonProperties(propertiesMap, source, mutation);
94104
setSpecificProperties(propertiesMap, setCell);
95105

96-
// Attempt proto decode if the column matches
97-
if (protoDecoder != null) {
98-
String qualifierStr = setCell.getQualifier().toStringUtf8();
99-
if (protoDecoder.matches(setCell.getFamilyName(), qualifierStr)) {
100-
String decodedJson = protoDecoder.decode(setCell.getValue().toByteArray());
101-
if (decodedJson != null) {
102-
propertiesMap.put(PROTO_DECODED_VALUE, decodedJson);
103-
}
104-
}
106+
String qualifierStr = setCell.getQualifier().toStringUtf8();
107+
String transformed = null;
108+
109+
// Try proto decode first
110+
if (protoDecoder != null && protoDecoder.matches(setCell.getFamilyName(), qualifierStr)) {
111+
transformed = protoDecoder.decode(setCell.getValue().toByteArray());
112+
}
113+
114+
// Then try column transforms
115+
if (transformed == null && transformerRegistry != null) {
116+
transformed =
117+
transformerRegistry.transform(
118+
setCell.getFamilyName(), qualifierStr, setCell.getValue().toByteArray());
119+
}
120+
121+
if (transformed != null) {
122+
propertiesMap.put(TRANSFORMED_VALUE, transformed);
105123
}
106124

107125
this.changeJson = convertPropertiesToJson(propertiesMap);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (C) 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils;
17+
18+
import java.nio.ByteBuffer;
19+
import java.time.Instant;
20+
import java.time.ZoneOffset;
21+
import java.time.format.DateTimeFormatter;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
/**
26+
* Transforms an 8-byte big-endian unsigned 64-bit integer representing Unix epoch milliseconds into
27+
* a BigQuery-compatible timestamp string.
28+
*/
29+
public class BigEndianTimestampTransformer implements ValueTransformer {
30+
31+
private static final long serialVersionUID = 1L;
32+
private static final Logger LOG = LoggerFactory.getLogger(BigEndianTimestampTransformer.class);
33+
private static final DateTimeFormatter FORMATTER =
34+
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC);
35+
36+
@Override
37+
public String transform(byte[] bytes) {
38+
if (bytes == null || bytes.length != 8) {
39+
LOG.warn(
40+
"Expected 8 bytes for big-endian uint64 timestamp, got {}",
41+
bytes == null ? "null" : bytes.length);
42+
return null;
43+
}
44+
try {
45+
long millis = ByteBuffer.wrap(bytes).getLong();
46+
return FORMATTER.format(Instant.ofEpochMilli(millis));
47+
} catch (Exception e) {
48+
LOG.warn("Failed to decode big-endian timestamp: {}", e.getMessage());
49+
return null;
50+
}
51+
}
52+
}

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigquery/schemautils/BigQueryUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,9 @@ public class BigQueryUtils implements Serializable {
9494
FORMATTERS.put(
9595
ChangelogColumn.VALUE_STRING,
9696
(bq, chg) -> {
97-
// Use proto-decoded JSON if available
98-
if (chg.has(Mod.PROTO_DECODED_VALUE)) {
99-
return chg.getString(Mod.PROTO_DECODED_VALUE);
97+
// Use transformed value (proto decode or column transform) if available
98+
if (chg.has(Mod.TRANSFORMED_VALUE)) {
99+
return chg.getString(Mod.TRANSFORMED_VALUE);
100100
}
101101
if (!chg.has(ChangelogColumn.VALUE_BYTES.name())) {
102102
return null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (C) 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils;
17+
18+
import java.io.Serializable;
19+
20+
/**
 * Transforms a Bigtable cell value from raw bytes to a string representation for BigQuery.
 *
 * <p>Extends {@link Serializable} — presumably because instances travel with the pipeline graph to
 * workers; implementations should be stateless and thread-safe (TODO confirm against runner).
 */
@FunctionalInterface
public interface ValueTransformer extends Serializable {

  /**
   * Transforms raw cell value bytes to a string for BigQuery.
   *
   * @param bytes the raw cell value bytes
   * @return the transformed string, or null if transformation fails
   */
  String transform(byte[] bytes);
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright (C) 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils;
17+
18+
import java.io.Serializable;
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
/**
23+
* Registry that maps column family:column qualifier pairs to {@link ValueTransformer} instances.
24+
*
25+
* <p>Parses a comma-separated configuration string with format: {@code
26+
* column_family:column:TRANSFORM_TYPE}
27+
*
28+
* <p>Supported transform types:
29+
*
30+
* <ul>
31+
* <li>{@code BIG_ENDIAN_UINT64_TIMESTAMP_MS} - 8-byte big-endian uint64 Unix milliseconds
32+
* </ul>
33+
*/
34+
public class ValueTransformerRegistry implements Serializable {
35+
36+
private static final long serialVersionUID = 1L;
37+
38+
private final Map<String, ValueTransformer> transformers;
39+
40+
private ValueTransformerRegistry(Map<String, ValueTransformer> transformers) {
41+
this.transformers = transformers;
42+
}
43+
44+
/**
45+
* Parses a comma-separated transform configuration string.
46+
*
47+
* @param config format: "family:column:TYPE,family2:column2:TYPE2"
48+
* @return a new registry, or null if config is null or empty
49+
* @throws IllegalArgumentException if the config format is invalid or a transform type is unknown
50+
*/
51+
public static ValueTransformerRegistry parse(String config) {
52+
if (config == null || config.isEmpty()) {
53+
return null;
54+
}
55+
56+
Map<String, ValueTransformer> transformers = new HashMap<>();
57+
for (String entry : config.split(",")) {
58+
String trimmed = entry.trim();
59+
if (trimmed.isEmpty()) {
60+
continue;
61+
}
62+
String[] parts = trimmed.split(":");
63+
if (parts.length != 3) {
64+
throw new IllegalArgumentException(
65+
"Invalid columnTransforms entry '"
66+
+ trimmed
67+
+ "'. Expected format: column_family:column:TRANSFORM_TYPE");
68+
}
69+
String family = parts[0];
70+
String column = parts[1];
71+
String type = parts[2];
72+
String key = family + ":" + column;
73+
74+
ValueTransformer transformer = createTransformer(type);
75+
transformers.put(key, transformer);
76+
}
77+
return new ValueTransformerRegistry(transformers);
78+
}
79+
80+
/**
81+
* Transforms a cell value if a transformer is registered for the given column.
82+
*
83+
* @return the transformed string, or null if no transformer matches or transformation fails
84+
*/
85+
public String transform(String family, String column, byte[] bytes) {
86+
ValueTransformer transformer = transformers.get(family + ":" + column);
87+
if (transformer == null) {
88+
return null;
89+
}
90+
return transformer.transform(bytes);
91+
}
92+
93+
/** Returns true if a transformer is registered for the given column. */
94+
public boolean hasTransformer(String family, String column) {
95+
return transformers.containsKey(family + ":" + column);
96+
}
97+
98+
private static ValueTransformer createTransformer(String type) {
99+
switch (type) {
100+
case "BIG_ENDIAN_UINT64_TIMESTAMP_MS":
101+
return new BigEndianTimestampTransformer();
102+
default:
103+
throw new IllegalArgumentException(
104+
"Unknown transform type '"
105+
+ type
106+
+ "'. Supported types: BIG_ENDIAN_UINT64_TIMESTAMP_MS");
107+
}
108+
}
109+
}

0 commit comments

Comments
 (0)