Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,8 @@ public PipelineServiceClientResponse runPipeline(
String pipelineName = sanitizeName(ingestionPipeline.getName());
String runId = UUID.randomUUID().toString();
String correlationId = runId.substring(0, 8);
String jobName = JOB_PREFIX + pipelineName + "-" + correlationId;
String jobSuffix = "-" + correlationId;
String jobName = buildKubernetesName(JOB_PREFIX, ingestionPipeline.getName(), jobSuffix);

MDC.put(MDC_CORRELATION_ID, correlationId);
MDC.put(MDC_PIPELINE_NAME, pipelineName);
Expand Down Expand Up @@ -1318,8 +1319,7 @@ CronOMJob buildCronOMJob(IngestionPipeline pipeline) {
}

private V1JobSpec buildJobSpecForCronJob(IngestionPipeline pipeline) {
String pipelineName = sanitizeName(pipeline.getName());
String configMapName = CONFIG_MAP_PREFIX + pipelineName;
String configMapName = buildKubernetesName(CONFIG_MAP_PREFIX, pipeline.getName());

return new V1JobSpec()
.backoffLimit(k8sConfig.getBackoffLimit())
Expand Down Expand Up @@ -1995,7 +1995,7 @@ private Map<String, String> buildLabels(IngestionPipeline pipeline, String runId
labels.put(LABEL_APP, LABEL_VALUE_OPENMETADATA);
labels.put(LABEL_COMPONENT, LABEL_VALUE_INGESTION);
labels.put(LABEL_MANAGED_BY, LABEL_VALUE_OPENMETADATA);
labels.put(LABEL_PIPELINE, sanitizeName(pipeline.getName()));
labels.put(LABEL_PIPELINE, buildKubernetesName("", pipeline.getName()));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 Bug: Label selector mismatch: query uses raw sanitizeName but labels use buildKubernetesName

The buildLabels() method (line 1998) now uses buildKubernetesName("", pipeline.getName()) for the LABEL_PIPELINE value, which truncates long names to 63 chars. However, several methods that query by label selector still use sanitizeName() directly (which no longer truncates):

  • killIngestion (line 796): LABEL_PIPELINE + "=" + pipelineName
  • getQueuedPipelineStatusInternal (line 898): LABEL_PIPELINE + "=" + pipelineName
  • getLastIngestionLogs (line 1083): labelSelectorMap.put(LABEL_PIPELINE, pipelineName)
  • deleteJobsForPipeline (called from deletePipeline): LABEL_PIPELINE + "=" + pipelineName

For long pipeline names, the label value stored on the resource is truncated but the label selector used in queries is not, so queries will return zero results. This means: kill won't find running jobs, delete won't clean up jobs, queued status returns empty, and logs can't be retrieved.

Suggested fix:

Use buildKubernetesName("", pipeline.getName()) consistently
for label values and selectors, or extract a shared helper:

private String buildPipelineLabel(String name) {
  return buildKubernetesName("", name);
}

Then use it in both buildLabels() and all query methods.

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

labels.put(LABEL_PIPELINE_TYPE, pipeline.getPipelineType().toString().toLowerCase());

if (runId != null && !SCHEDULED_RUN_ID.equals(runId)) {
Expand Down Expand Up @@ -2214,17 +2214,13 @@ private String convertToCronSchedule(String airflowSchedule) {

String sanitizeName(String name) {
// K8s names must be lowercase, alphanumeric, or '-'
// Must start with alphanumeric, max 63 chars
// Must start with alphanumeric. Truncation is handled by buildKubernetesName.
String sanitized =
name.toLowerCase()
.replaceAll("[^a-z0-9-]", "-")
.replaceAll("-+", "-")
.replaceAll("^-+|-+$", "");

if (sanitized.length() > 53) { // Leave room for prefixes
sanitized = sanitized.substring(0, 53).replaceAll("-+$", "");
}

if (!sanitized.matches("^[a-z0-9].*")) {
sanitized = "p-" + sanitized;
}
Expand All @@ -2238,12 +2234,17 @@ private String buildKubernetesName(String prefix, String name) {

private String buildKubernetesName(String prefix, String name, String suffix) {
String sanitized = sanitizeName(name);
// Kubernetes names and label values have a 63-character limit.
int maxBaseLength = KUBERNETES_NAME_MAX_LENGTH - prefix.length() - suffix.length();
if (maxBaseLength < 1) {
throw new IllegalArgumentException("Kubernetes name prefix/suffix leaves no room for base");
throw new IllegalArgumentException(
String.format(
"Kubernetes name prefix '%s' and suffix '%s' leave no room for base name",
prefix, suffix));
}

if (sanitized.length() > maxBaseLength) {
// Truncate and remove trailing dashes to keep it a valid K8s name
sanitized = sanitized.substring(0, maxBaseLength).replaceAll("-+$", "");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1705,6 +1705,44 @@ private static V1Pod pod(
.spec(new V1PodSpec().containers(containers));
}

@Test
void testBuildLabelsTruncatesLongPipelineNameToKubernetesLimit() {
// Tests P0: Label values must be <= 63 characters
String longName = "a".repeat(95);
IngestionPipeline pipeline = createTestPipeline(longName, null);

Map<String, String> labels =
assertDoesNotThrow(() -> invokePrivate(client, "buildLabels",
new Class<?>[] {IngestionPipeline.class, String.class}, pipeline, "run-123"));

String pipelineLabel = labels.get("app.kubernetes.io/pipeline");
assertNotNull(pipelineLabel);
assertTrue(
pipelineLabel.length() <= 63,
String.format("Label length %d exceeds Kubernetes limit of 63", pipelineLabel.length()));
assertTrue(pipelineLabel.startsWith("aaaaaaaa"));
}

@Test
void testJobNameTruncationWithLongPipelineName() {
// Tests P0: Job names (prefix + name + suffix) must be <= 63 characters
String longName = "very-long-pipeline-name-that-exceeds-the-normal-limits-of-kubernetes-resource-naming";
IngestionPipeline pipeline = createTestPipeline(longName, null);
String runId = "12345678-1234-1234-1234-1234567890ab";

// JOB_PREFIX (7) + name + suffix (9)
String jobName =
assertDoesNotThrow(() -> invokePrivate(client, "buildKubernetesName",
new Class<?>[] {String.class, String.class, String.class}, "om-job-", pipeline.getName(), "-" + runId.substring(0, 8)));

assertNotNull(jobName);
assertTrue(
jobName.length() <= 63,
String.format("Job name length %d exceeds Kubernetes limit of 63: %s", jobName.length(), jobName));
assertTrue(jobName.startsWith("om-job-"));
assertTrue(jobName.endsWith("-12345678"));
}

private static Workflow createTestWorkflow(String name) {
Workflow workflow = new Workflow();
workflow.setId(UUID.randomUUID());
Expand Down
Loading