Require sourceConfig.config.type for ingestion pipelines#29566
Require sourceConfig.config.type for ingestion pipelines#29566ayush-shah wants to merge 4 commits into
Conversation
|
The Java checkstyle failed. Please run You can install the pre-commit hooks with |
❌ PR checklist incompleteThis PR cannot be merged until the following are addressed on its linked issue:
The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically. Maintainers can bypass this check by adding the |
| : "UPDATE ingestion_pipeline_entity i " | ||
| + "SET json = jsonb_set(i.json, '{sourceConfig,config,type}', " | ||
| + "to_jsonb((CASE er.fromentity " | ||
| + "WHEN 'apiService' THEN 'ApiMetadata' " | ||
| + "WHEN 'dashboardService' THEN 'DashboardMetadata' " | ||
| + "WHEN 'databaseService' THEN 'DatabaseMetadata' " | ||
| + "WHEN 'driveService' THEN 'DriveMetadata' " | ||
| + "WHEN 'mcpService' THEN 'McpMetadata' " | ||
| + "WHEN 'messagingService' THEN 'MessagingMetadata' " | ||
| + "WHEN 'mlmodelService' THEN 'MlModelMetadata' " | ||
| + "WHEN 'pipelineService' THEN 'PipelineMetadata' " | ||
| + "WHEN 'searchService' THEN 'SearchMetadata' " | ||
| + "WHEN 'securityService' THEN 'SecurityMetadata' " | ||
| + "WHEN 'storageService' THEN 'StorageMetadata' " | ||
| + "END)::text), true) " | ||
| + "FROM entity_relationship er " | ||
| + "WHERE er.toid = i.id " | ||
| + "AND er.toentity = 'ingestionPipeline' " | ||
| + "AND er.relation = 0 " | ||
| + "AND er.deleted = false " | ||
| + "AND i.json ->> 'pipelineType' = 'metadata' " | ||
| + "AND i.json #>> '{sourceConfig,config,type}' IS NULL " | ||
| + "AND jsonb_typeof(i.json #> '{sourceConfig,config}') = 'object' " | ||
| + "AND er.fromentity IN ('apiService', 'dashboardService', 'databaseService', " | ||
| + "'driveService', 'mcpService', 'messagingService', 'mlmodelService', " | ||
| + "'pipelineService', 'searchService', 'securityService', 'storageService')"; |
There was a problem hiding this comment.
Postgres CASE also has no ELSE, same silent-null risk
The Postgres variant has the same structural gap: if the CASE expression in to_jsonb((CASE ...)::text) reaches the implicit ELSE NULL, then to_jsonb(NULL::text) produces JSON null and jsonb_set writes it into the row. Same mitigation applies — add ELSE er.fromentity or a matching guard so a future list mismatch surfaces as a data error rather than a silent corrupt write.
|
Pushed a fix for the current migration review thread in |
Code Review ✅ Approved 3 resolved / 3 findingsEnforces a ✅ 3 resolved✅ Bug: Backfill added to already-released v1131 migration may never run
✅ Edge Case: Backfill no-ops on scalar sourceConfig.config rows, leaving them typeless
✅ Edge Case: Backfill only types databaseService metadata pipelines
OptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
Supersedes open-metadata/openmetadata-collate#4727.
Summary
Reject ingestion pipeline create/update payloads whose
sourceConfig.configobject is missing thetypediscriminator, and backfill existing metadata ingestion pipeline rows whose object config is missing that discriminator.What Changed
sourceConfig.configvalues.type.pipelineType = metadata; scalar configs remain rejected instead of guessed.Validation
rtk mvn -pl openmetadata-service -Dtest=IngestionPipelineRepositoryTest testrtk mvn -pl openmetadata-service -Dtest=org.openmetadata.service.migration.utils.v200.MigrationUtilTest testrtk mvn -pl openmetadata-service spotless:checkGreptile Summary
This PR adds repository-level validation that rejects ingestion pipeline create/update requests missing the
sourceConfig.config.typediscriminator, and ships an idempotent v200 backfill migration that inserts the correcttypevalue for existing metadata ingestion pipelines whose stored JSON is missing it.IngestionPipelineRepository):validateSourceConfigHasTypeis called fromprepare()on every create/update; it handles rawMap, typed POJOs (via JacksonconvertValue), and explicit enum values, returning HTTP 400 for null, scalar, empty-map, and untyped-object configs.MigrationUtil): Two dialect-specific SQL paths (MySQLJSON_SETvia inner JOIN, Postgresjsonb_setviaUPDATE FROM) write the service-type-to-config-type mapping; the WHERE clause (IS NULL+JSON_TYPE = 'OBJECT') makes the migration fully idempotent and limits it topipelineType = 'metadata'rows.Confidence Score: 5/5
Safe to merge — the validation is a straightforward null/type guard on every create/update, and the backfill migration is idempotent and scoped to object-typed metadata pipelines with a missing discriminator.
The repository validation correctly handles all config shapes (null, scalar, empty map, typed POJO, raw map with type), the backfill migration uses a JOIN-based approach that avoids the silent-null CASE risk from a prior design, and both dialect variants are guarded by idempotency conditions. Tests cover the key paths. No data-loss or correctness issues found.
No files require special attention.
Important Files Changed
validateSourceConfigHasTypestatic helper called fromprepare(); rejects null/scalar/untyped sourceConfig.config with a 400 — logic is correct and covers both raw Map and typed POJO configs.backfillMetadataSourceConfigTypeswith separate MySQL (JOIN + JSON_SET) and Postgres (UPDATE FROM + jsonb_set) paths; idempotent by design; the JOIN-based approach avoids the CASE-without-ELSE silent-null risk from the prior iteration.backfillMetadataSourceConfigTypes(handle)into the MySQL v200 migration runBeforeMigration sequence.backfillMetadataSourceConfigTypes(handle)into the Postgres v200 migration runBeforeMigration sequence.Sequence Diagram
%%{init: {'theme': 'neutral'}}%% sequenceDiagram participant Client participant IngestionPipelineResource participant IngestionPipelineRepository participant DB Client->>IngestionPipelineResource: POST/PUT /ingestionPipelines (sourceConfig.config) IngestionPipelineResource->>IngestionPipelineRepository: create / update IngestionPipelineRepository->>IngestionPipelineRepository: prepare() IngestionPipelineRepository->>IngestionPipelineRepository: validateSourceConfigHasType() alt config is null or sourceConfig is null IngestionPipelineRepository-->>Client: 400 sourceConfig.config.type is required else config is a scalar (String/Number) IngestionPipelineRepository-->>Client: 400 sourceConfig.config must be an object with type else config is Map or POJO with no type IngestionPipelineRepository-->>Client: 400 sourceConfig.config.type is required else config has non-blank String or Enum type IngestionPipelineRepository->>DB: persist entity DB-->>Client: 201 / 200 end Note over DB: v200 migration (backfill)<br/>UPDATE ingestion_pipeline_entity<br/>JOIN entity_relationship<br/>SET json.sourceConfig.config.type<br/>WHERE pipelineType=metadata<br/>AND type IS NULL<br/>AND config IS OBJECT%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%% sequenceDiagram participant Client participant IngestionPipelineResource participant IngestionPipelineRepository participant DB Client->>IngestionPipelineResource: POST/PUT /ingestionPipelines (sourceConfig.config) IngestionPipelineResource->>IngestionPipelineRepository: create / update IngestionPipelineRepository->>IngestionPipelineRepository: prepare() IngestionPipelineRepository->>IngestionPipelineRepository: validateSourceConfigHasType() alt config is null or sourceConfig is null IngestionPipelineRepository-->>Client: 400 sourceConfig.config.type is required else config is a scalar (String/Number) IngestionPipelineRepository-->>Client: 400 sourceConfig.config must be an object with type else config is Map or POJO with no type IngestionPipelineRepository-->>Client: 400 sourceConfig.config.type is required else config has non-blank String or Enum type IngestionPipelineRepository->>DB: persist entity DB-->>Client: 201 / 200 end Note over DB: v200 migration (backfill)<br/>UPDATE ingestion_pipeline_entity<br/>JOIN entity_relationship<br/>SET json.sourceConfig.config.type<br/>WHERE pipelineType=metadata<br/>AND type IS NULL<br/>AND config IS OBJECTReviews (2): Last reviewed commit: "migration: avoid null source config back..." | Re-trigger Greptile