Skip to content

Require sourceConfig.config.type for ingestion pipelines#29566

Open
ayush-shah wants to merge 4 commits into
mainfrom
ayush-shah/ingestion-source-config-type
Open

Require sourceConfig.config.type for ingestion pipelines#29566
ayush-shah wants to merge 4 commits into
mainfrom
ayush-shah/ingestion-source-config-type

Conversation

@ayush-shah

@ayush-shah ayush-shah commented Jun 29, 2026

Copy link
Copy Markdown
Member

Supersedes open-metadata/openmetadata-collate#4727.

Summary

Reject ingestion pipeline create/update payloads whose sourceConfig.config object is missing the type discriminator, and backfill existing metadata ingestion pipeline rows whose object config is missing that discriminator.

What Changed

  • Add repository-level validation for ingestion pipeline source config discriminators.
  • Return 400 for missing, scalar, empty, or untyped sourceConfig.config values.
  • Preserve generated/typed config objects and raw config maps that include type.
  • Add an idempotent v200 MySQL/Postgres backfill for metadata ingestion pipelines across service types.
  • Keep the backfill limited to object configs and pipelineType = metadata; scalar configs remain rejected instead of guessed.

Validation

  • rtk mvn -pl openmetadata-service -Dtest=IngestionPipelineRepositoryTest test
  • rtk mvn -pl openmetadata-service -Dtest=org.openmetadata.service.migration.utils.v200.MigrationUtilTest test
  • rtk mvn -pl openmetadata-service spotless:check

Greptile Summary

This PR adds repository-level validation that rejects ingestion pipeline create/update requests missing the sourceConfig.config.type discriminator, and ships an idempotent v200 backfill migration that inserts the correct type value for existing metadata ingestion pipelines whose stored JSON is missing it.

  • Validation (IngestionPipelineRepository): validateSourceConfigHasType is called from prepare() on every create/update; it handles raw Map, typed POJOs (via Jackson convertValue), and explicit enum values, returning HTTP 400 for null, scalar, empty-map, and untyped-object configs.
  • Backfill migration (MigrationUtil): Two dialect-specific SQL paths (MySQL JSON_SET via inner JOIN, Postgres jsonb_set via UPDATE FROM) write the service-type-to-config-type mapping; the WHERE clause (IS NULL + JSON_TYPE = 'OBJECT') makes the migration fully idempotent and limits it to pipelineType = 'metadata' rows.
  • Tests: Unit tests cover all rejection paths and both acceptance paths for validation, and SQL-shape assertions verify both dialect branches of the backfill.

Confidence Score: 5/5

Safe to merge — the validation is a straightforward null/type guard on every create/update, and the backfill migration is idempotent and scoped to object-typed metadata pipelines with a missing discriminator.

The repository validation correctly handles all config shapes (null, scalar, empty map, typed POJO, raw map with type), the backfill migration uses a JOIN-based approach that avoids the silent-null CASE risk from a prior design, and both dialect variants are guarded by idempotency conditions. Tests cover the key paths. No data-loss or correctness issues found.

No files require special attention.

Important Files Changed

Filename Overview
openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java Adds validateSourceConfigHasType static helper called from prepare(); rejects null/scalar/untyped sourceConfig.config with a 400 — logic is correct and covers both raw Map and typed POJO configs.
openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java Adds backfillMetadataSourceConfigTypes with separate MySQL (JOIN + JSON_SET) and Postgres (UPDATE FROM + jsonb_set) paths; idempotent by design; the JOIN-based approach avoids the CASE-without-ELSE silent-null risk from the prior iteration.
openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java Wires backfillMetadataSourceConfigTypes(handle) into the MySQL v200 migration runBeforeMigration sequence.
openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java Mirrors the MySQL migration — wires backfillMetadataSourceConfigTypes(handle) into the Postgres v200 migration runBeforeMigration sequence.
openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/IngestionPipelineRepositoryTest.java Adds unit tests for all five validation paths: null config, empty map, scalar config, raw map with type, and typed POJO with default type; good coverage of the happy and sad paths.
openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java Adds tests that verify the correct SQL dialect is chosen and that the SQL contains the expected JOIN, path operators, and WHERE clauses for both MySQL and Postgres variants.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Client
    participant IngestionPipelineResource
    participant IngestionPipelineRepository
    participant DB

    Client->>IngestionPipelineResource: POST/PUT /ingestionPipelines (sourceConfig.config)
    IngestionPipelineResource->>IngestionPipelineRepository: create / update
    IngestionPipelineRepository->>IngestionPipelineRepository: prepare()
    IngestionPipelineRepository->>IngestionPipelineRepository: validateSourceConfigHasType()

    alt config is null or sourceConfig is null
        IngestionPipelineRepository-->>Client: 400 sourceConfig.config.type is required
    else config is a scalar (String/Number)
        IngestionPipelineRepository-->>Client: 400 sourceConfig.config must be an object with type
    else config is Map or POJO with no type
        IngestionPipelineRepository-->>Client: 400 sourceConfig.config.type is required
    else config has non-blank String or Enum type
        IngestionPipelineRepository->>DB: persist entity
        DB-->>Client: 201 / 200
    end

    Note over DB: v200 migration (backfill)<br/>UPDATE ingestion_pipeline_entity<br/>JOIN entity_relationship<br/>SET json.sourceConfig.config.type<br/>WHERE pipelineType=metadata<br/>AND type IS NULL<br/>AND config IS OBJECT
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Client
    participant IngestionPipelineResource
    participant IngestionPipelineRepository
    participant DB

    Client->>IngestionPipelineResource: POST/PUT /ingestionPipelines (sourceConfig.config)
    IngestionPipelineResource->>IngestionPipelineRepository: create / update
    IngestionPipelineRepository->>IngestionPipelineRepository: prepare()
    IngestionPipelineRepository->>IngestionPipelineRepository: validateSourceConfigHasType()

    alt config is null or sourceConfig is null
        IngestionPipelineRepository-->>Client: 400 sourceConfig.config.type is required
    else config is a scalar (String/Number)
        IngestionPipelineRepository-->>Client: 400 sourceConfig.config must be an object with type
    else config is Map or POJO with no type
        IngestionPipelineRepository-->>Client: 400 sourceConfig.config.type is required
    else config has non-blank String or Enum type
        IngestionPipelineRepository->>DB: persist entity
        DB-->>Client: 201 / 200
    end

    Note over DB: v200 migration (backfill)<br/>UPDATE ingestion_pipeline_entity<br/>JOIN entity_relationship<br/>SET json.sourceConfig.config.type<br/>WHERE pipelineType=metadata<br/>AND type IS NULL<br/>AND config IS OBJECT
Loading

Reviews (2): Last reviewed commit: "migration: avoid null source config back..." | Re-trigger Greptile

@github-actions github-actions Bot added Ingestion safe to test Add this label to run secure Github workflows on PRs labels Jun 29, 2026
@github-actions

Copy link
Copy Markdown
Contributor

The Java checkstyle failed.

Please run mvn spotless:apply in the root of your repository and commit the changes to this PR.
You can also use pre-commit to automate the Java code formatting.

You can install the pre-commit hooks with make install_test precommit_install.

@ayush-shah ayush-shah marked this pull request as ready for review June 29, 2026 13:33
@github-actions

github-actions Bot commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

❌ PR checklist incomplete

This PR cannot be merged until the following are addressed on its linked issue:

The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically.

Maintainers can bypass this check by adding the skip-pr-checks label.

Comment on lines +132 to +157
: "UPDATE ingestion_pipeline_entity i "
+ "SET json = jsonb_set(i.json, '{sourceConfig,config,type}', "
+ "to_jsonb((CASE er.fromentity "
+ "WHEN 'apiService' THEN 'ApiMetadata' "
+ "WHEN 'dashboardService' THEN 'DashboardMetadata' "
+ "WHEN 'databaseService' THEN 'DatabaseMetadata' "
+ "WHEN 'driveService' THEN 'DriveMetadata' "
+ "WHEN 'mcpService' THEN 'McpMetadata' "
+ "WHEN 'messagingService' THEN 'MessagingMetadata' "
+ "WHEN 'mlmodelService' THEN 'MlModelMetadata' "
+ "WHEN 'pipelineService' THEN 'PipelineMetadata' "
+ "WHEN 'searchService' THEN 'SearchMetadata' "
+ "WHEN 'securityService' THEN 'SecurityMetadata' "
+ "WHEN 'storageService' THEN 'StorageMetadata' "
+ "END)::text), true) "
+ "FROM entity_relationship er "
+ "WHERE er.toid = i.id "
+ "AND er.toentity = 'ingestionPipeline' "
+ "AND er.relation = 0 "
+ "AND er.deleted = false "
+ "AND i.json ->> 'pipelineType' = 'metadata' "
+ "AND i.json #>> '{sourceConfig,config,type}' IS NULL "
+ "AND jsonb_typeof(i.json #> '{sourceConfig,config}') = 'object' "
+ "AND er.fromentity IN ('apiService', 'dashboardService', 'databaseService', "
+ "'driveService', 'mcpService', 'messagingService', 'mlmodelService', "
+ "'pipelineService', 'searchService', 'securityService', 'storageService')";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Postgres CASE also has no ELSE, same silent-null risk

The Postgres variant has the same structural gap: if the CASE expression in to_jsonb((CASE ...)::text) reaches the implicit ELSE NULL, then to_jsonb(NULL::text) produces JSON null and jsonb_set writes it into the row. Same mitigation applies — add ELSE er.fromentity or a matching guard so a future list mismatch surfaces as a data error rather than a silent corrupt write.

@ayush-shah

Copy link
Copy Markdown
Member Author

Pushed a fix for the current migration review thread in 080ec95044: the source-config backfill now uses an explicit service-to-config-type mapping join instead of CASE expressions, so unmatched rows are not written with JSON null.\n\nValidation run locally:\n- git diff --check\n- rtk mvn -pl openmetadata-service -DskipTests=false -Dskip.npm -Dskip.yarn -DskipDocker -Dlicense.skip=true -Dcheckstyle.skip=true -Dspotless.check.skip=true -Dtest=org.openmetadata.service.migration.utils.v200.MigrationUtilTest#backfillsMetadataSourceConfigTypesWithMySqlJsonSet,org.openmetadata.service.migration.utils.v200.MigrationUtilTest#backfillsMetadataSourceConfigTypesWithPostgresJsonbSet test\n- rtk mvn -pl openmetadata-service -DskipTests -Dskip.npm -Dskip.yarn -DskipDocker -Dlicense.skip=true spotless:check

@gitar-bot

gitar-bot Bot commented Jun 29, 2026

Copy link
Copy Markdown
Code Review ✅ Approved 3 resolved / 3 findings

Enforces a sourceConfig.config.type discriminator for ingestion pipelines and adds an idempotent v200 migration to backfill existing records. No open issues were found.

✅ 3 resolved
Bug: Backfill added to already-released v1131 migration may never run

📄 openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1131/Migration.java:26 📄 openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1131/Migration.java:26 📄 openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1131/MigrationUtil.java:111-125
backfillDatabaseMetadataSourceConfigType is wired into mysql/v1131/Migration and postgres/v1131/Migration. On main the migration tree already contains much newer versions (up through v200 / 2.0.0). OpenMetadata's MigrationWorkflow/MigrationProcessImpl records applied versions in SERVER_CHANGE_LOG and skips runDataMigration() for any version already processed. Any deployment that has already upgraded past 1.13.1 (i.e. nearly every target user of this fix) will never re-execute v1131's runDataMigration(), so the backfill of the 'known persisted bad database metadata pipeline shape' will silently not run for them. Only brand-new installs upgrading through v1131 would get it. The backfill should be placed in the current in-development/unreleased migration version (e.g. v200 / 2.0.0) so it actually executes for existing deployments. Please confirm which version is the active upgrade target before merging.

Edge Case: Backfill no-ops on scalar sourceConfig.config rows, leaving them typeless

📄 openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1131/MigrationUtil.java:121-124 📄 openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v1131/MigrationUtil.java:126-135 📄 openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java:298-312
The backfill WHERE clause only guards sourceConfig.config IS NOT NULL (MySQL JSON_EXTRACT(... '$.sourceConfig.config') IS NOT NULL, Postgres i.json #> '{sourceConfig,config}' IS NOT NULL). If a persisted row stored sourceConfig.config as a scalar (e.g. the string "DatabaseMetadata") rather than an object, the row still matches the predicate, but JSON_SET(..., '$.sourceConfig.config.type', ...) / jsonb_set(..., '{sourceConfig,config,type}', ...) cannot add a member to a scalar and effectively no-ops (MySQL returns the document unchanged; Postgres jsonb_set on a non-object path also yields no member). Those rows therefore remain without a usable type, yet the new validateSourceConfigHasType will reject any subsequent create/update of them with HTTP 400. If the scalar shape is among the 'known bad' shapes this PR intends to fix, it is not covered. Consider detecting/handling the scalar-config case (or asserting it never occurs) and add a test for it.

Edge Case: Backfill only types databaseService metadata pipelines

📄 openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java:103-117
backfillDatabaseMetadataSourceConfigType only sets sourceConfig.config.type = 'DatabaseMetadata' for pipelines joined via er.fromEntity = 'databaseService' with pipelineType = 'metadata'. Metadata pipelines for other service types (dashboardService, messagingService, etc.) that were also persisted without a sourceConfig.config.type discriminator are not backfilled. If the new repository-level validation (described in the PR summary but not part of this delta) rejects any untyped sourceConfig.config across all service types, those existing pipelines would start failing on their next create/update without a corresponding backfill. If the intent is genuinely database-only, this is fine; otherwise consider broadening the backfill (or adding parallel statements) to cover the other service-type metadata config discriminators. Flagging as minor since the validation code is not in this diff and cannot be verified here.

Options

Display: compact → Showing less information.

Comment with these commands to change:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Ingestion safe to test Add this label to run secure Github workflows on PRs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Bug: Ingestion workflow redeployment fails due to missing sourceConfig.config.type in automated pipeline creation

1 participant