Migrate Data Catalog to Dataplex Knowledge Catalog for CDC templates#3927
Migrate Data Catalog to Dataplex Knowledge Catalog for CDC templates#3927stankiewicz wants to merge 23 commits into
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a critical failure in the CDC pipeline caused by the deprecation of the legacy Google Cloud Data Catalog API. By migrating the schema publishing and retrieval logic to the Dataplex Knowledge Catalog API, the pipeline maintains compatibility with current Google Cloud infrastructure standards. The changes ensure that metadata, including system types and labels, is correctly preserved and mapped to the new Dataplex entry structures. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request migrates the CDC parent and connector modules from Google Cloud Data Catalog to Google Cloud Dataplex Catalog, updating dependencies, client initializations, and schema mapping utilities to use Dataplex's aspect-based schema model. The review feedback identifies several critical issues, including resource leaks from unclosed CatalogServiceClient instances, an N+1 query performance bottleneck when retrieving entry schemas, improper handling of InterruptedException, and potential NullPointerException risks when parsing protobuf Struct fields without defensive checks.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request migrates the CDC common library from Google Cloud Data Catalog to Google Cloud Dataplex Catalog, updating dependencies, clients, and schema mapping utilities to use Dataplex aspects. Feedback on these changes suggests propagating exceptions during asynchronous entry group creation to avoid invalid states, filtering search results in lookupPubSubEntry to verify they belong to the @pubsub entry group, and normalizing data types to lowercase in SchemaUtils to ensure robust, case-insensitive type mapping.
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| LOG.error("Interrupted while creating EntryGroup", e); | ||
| } catch (ExecutionException e) { | ||
| if (e.getCause() instanceof AlreadyExistsException) { | ||
| this.entryGroupCreated = true; | ||
| } else { | ||
| LOG.error("Failed to create EntryGroup", e); | ||
| } | ||
| } |
There was a problem hiding this comment.
If createEntryGroupAsync fails with an exception other than AlreadyExistsException (or if the thread is interrupted), the error is logged but the exception is swallowed. This allows the constructor of SingleTopicSchemaManager to complete successfully in an invalid state, which will cause subsequent schema updates to fail.
To ensure fail-fast behavior and make debugging easier, we should propagate these failures by throwing a RuntimeException.
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while creating EntryGroup", e);
throw new RuntimeException("Interrupted while creating EntryGroup", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof AlreadyExistsException) {
this.entryGroupCreated = true;
} else {
LOG.error("Failed to create EntryGroup", e);
throw new RuntimeException("Failed to create EntryGroup", e);
}
}References
- When throwing exceptions in environments where the caller or global exception handler might not log them (such as certain Dataflow templates), log the error message before throwing the exception to ensure the failure is recorded.
Codecov Report❌ Patch coverage is ❌ Your patch check has failed because the patch coverage (0.00%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #3927 +/- ##
============================================
- Coverage 55.59% 55.58% -0.01%
+ Complexity 7052 6731 -321
============================================
Files 1103 1114 +11
Lines 67681 68377 +696
Branches 7603 7718 +115
============================================
+ Hits 37625 38006 +381
- Misses 27636 27906 +270
- Partials 2420 2465 +45
🚀 New features to boost your workflow:
|
|
Fixes: #3921 |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request migrates the CDC application from Google Cloud Data Catalog to Google Cloud Dataplex Catalog, updating dependencies, client implementations, and schema conversion utilities to use Dataplex aspects. The review feedback highlights several critical issues: a resource leak due to an unclosed CatalogServiceClient in DataCatalogSchemaManager, swallowed exceptions in getSchemasForEntryGroup and createEntryGroup that hinder debugging, a potential thread hang from calling .get() without a timeout, misleading error handling during JSON parsing, and a lack of string trimming when parsing comma-separated topics or subscriptions.
| public abstract static class DataCatalogSchemaManager { | ||
| final String gcpProject; | ||
| final String location; | ||
| DataCatalogClient client; | ||
| CatalogServiceClient client; | ||
| private final java.util.Set<String> createdEntryGroups = new java.util.HashSet<>(); |
There was a problem hiding this comment.
The CatalogServiceClient is instantiated as an instance variable in DataCatalogSchemaManager but is never closed, which will leak the client and its underlying gRPC channels/connections. To prevent resource leaks, DataCatalogSchemaManager should implement AutoCloseable and close the client in its close() method.
| public abstract static class DataCatalogSchemaManager { | |
| final String gcpProject; | |
| final String location; | |
| DataCatalogClient client; | |
| CatalogServiceClient client; | |
| private final java.util.Set<String> createdEntryGroups = new java.util.HashSet<>(); | |
| public abstract static class DataCatalogSchemaManager implements AutoCloseable { | |
| final String gcpProject; | |
| final String location; | |
| CatalogServiceClient client; | |
| private final java.util.Set<String> createdEntryGroups = new java.util.HashSet<>(); | |
| @Override | |
| public void close() { | |
| if (client != null) { | |
| client.close(); | |
| client = null; | |
| } | |
| } |
| } catch (Exception e) { | ||
| LOG.error("Failed to list entries: ", e); | ||
| } |
There was a problem hiding this comment.
Catching and silently swallowing all exceptions in getSchemasForEntryGroup is a regression from the original code. If listing entries fails (e.g., due to permission or network issues), returning an empty map silently makes debugging extremely difficult. The exception should be logged and rethrown (wrapped in a RuntimeException).
| } catch (Exception e) { | |
| LOG.error("Failed to list entries: ", e); | |
| } | |
| } catch (Exception e) { | |
| LOG.error("Failed to list entries: ", e); | |
| throw new RuntimeException("Failed to list entries for entry group " + entryGroupId, e); | |
| } |
| client.createEntryGroupAsync(entryGroupRequest).get(); | ||
| LOG.info("Created EntryGroup: {}", entryGroupId); | ||
|
|
||
| this.entryGroupCreated = true; | ||
| this.createdEntryGroups.add(entryGroupId); | ||
| } catch (AlreadyExistsException e) { | ||
| // EntryGroup already exists. There is no further action needed. | ||
| // EntryGroup already exists. | ||
| this.createdEntryGroups.add(entryGroupId); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| LOG.error("Interrupted while creating EntryGroup", e); | ||
| } catch (ExecutionException e) { | ||
| if (e.getCause() instanceof AlreadyExistsException) { | ||
| this.createdEntryGroups.add(entryGroupId); | ||
| } else { | ||
| LOG.error("Failed to create EntryGroup", e); | ||
| } | ||
| } |
There was a problem hiding this comment.
In createEntryGroup, InterruptedException and ExecutionException (for non-AlreadyExists causes) are logged but swallowed, allowing the code to proceed and fail later with a less clear error. Additionally, calling .get() without a timeout can cause the thread to hang indefinitely if the service is unresponsive. We should use a timeout and propagate these exceptions wrapped in a RuntimeException to fail fast and preserve the root cause.
client.createEntryGroupAsync(entryGroupRequest).get(1, java.util.concurrent.TimeUnit.MINUTES);
LOG.info("Created EntryGroup: {}", entryGroupId);
this.createdEntryGroups.add(entryGroupId);
} catch (AlreadyExistsException e) {
// EntryGroup already exists.
this.createdEntryGroups.add(entryGroupId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while creating EntryGroup", e);
throw new RuntimeException("Interrupted while creating EntryGroup " + entryGroupId, e);
} catch (ExecutionException e) {
if (e.getCause() instanceof AlreadyExistsException) {
this.createdEntryGroups.add(entryGroupId);
} else {
LOG.error("Failed to create EntryGroup", e);
throw new RuntimeException("Failed to create EntryGroup " + entryGroupId, e.getCause());
}
} catch (java.util.concurrent.TimeoutException e) {
LOG.error("Timeout while creating EntryGroup", e);
throw new RuntimeException("Timeout while creating EntryGroup " + entryGroupId, e);
}| JsonFormat.parser().merge(schemaData.getFieldsOrThrow("type").getStringValue(), builder); | ||
| return SchemaUtils.toBeamSchema(builder.build()); | ||
| } |
There was a problem hiding this comment.
JsonFormat.parser().merge throws an IOException (specifically InvalidProtocolBufferException) if the JSON is malformed. Currently, this is caught by the outer catch block, which logs a misleading error message: "Unable to create a CatalogServiceClient". This exception should be caught separately with an accurate error message.
Struct.Builder builder = Struct.newBuilder();
try {
JsonFormat.parser().merge(schemaData.getFieldsOrThrow("type").getStringValue(), builder);
} catch (IOException e) {
LOG.error("Failed to parse schema JSON from aspect", e);
throw new RuntimeException("Failed to parse schema JSON from aspect", e);
}
return SchemaUtils.toBeamSchema(builder.build());| if (options.getInputSubscriptions() != null && !options.getInputSubscriptions().isEmpty()) { | ||
| subscriptionList = Arrays.asList(options.getInputSubscriptions().split(",")); | ||
| topicList = | ||
| subscriptionList.stream() | ||
| .map( | ||
| s -> { | ||
| try { | ||
| return PubsubUtils.getPubSubTopicFromSubscription(project, s).getTopic(); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| }) | ||
| .collect(Collectors.toList()); | ||
| } else { | ||
| Preconditions.checkArgument( | ||
| options.getInputTopics() != null && !options.getInputTopics().isEmpty(), | ||
| "Must provide an inputSubscriptions or inputTopics parameter."); | ||
| topicList = Arrays.asList(options.getInputTopics().split(",")); | ||
| subscriptionList = topicList.stream().map(t -> (String) null).collect(Collectors.toList()); | ||
| } |
There was a problem hiding this comment.
Splitting comma-separated options (subscriptions or topics) without trimming whitespace can cause runtime failures if users include spaces (e.g., "sub1, sub2"). It is safer to trim each element after splitting.
if (options.getInputSubscriptions() != null && !options.getInputSubscriptions().isEmpty()) {
subscriptionList = Arrays.stream(options.getInputSubscriptions().split(","))
.map(String::trim)
.collect(Collectors.toList());
topicList =
subscriptionList.stream()
.map(
s -> {
try {
return PubsubUtils.getPubSubTopicFromSubscription(project, s).getTopic();
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
} else {
Preconditions.checkArgument(
options.getInputTopics() != null && !options.getInputTopics().isEmpty(),
"Must provide an inputSubscriptions or inputTopics parameter.");
topicList = Arrays.stream(options.getInputTopics().split(","))
.map(String::trim)
.collect(Collectors.toList());
subscriptionList = topicList.stream().map(t -> (String) null).collect(Collectors.toList());
}
This PR resolves the INVALID_ARGUMENT write operation failure (Project is not allowed to perform write operations due to Data Catalog deprecation) caused by the deprecation of the legacy Google Cloud Data Catalog API.
It migrates the Debezium-to-PubSub CDC pipeline's schema publishing and schema retrieval logic to use the new Dataplex Knowledge Catalog API (com.google.cloud.dataplex.v1.CatalogServiceClient).
Key Changes