Skip to content

Commit 61ce27c

Browse files
committed
[Fix-18074][Task-Plugin] Fix SQL task parameter type error when passing from upstream to downstream
- Inject VarPool parameters that don't exist in downstream task's local params into prepareParamsMap for placeholder resolution - Use Matcher.quoteReplacement() in replaceOriginalValue() to prevent $ being treated as regex group reference - Add null safety check for parameter value before replacement - Add unit tests for both fixes
1 parent f5accbc commit 61ce27c

4 files changed

Lines changed: 154 additions & 3 deletions

File tree

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskI
199199
safePutAll(prepareParamsMap, commandParamsMap);
200200
}
201201

202-
// 6. VarPool: override values only for existing IN-direction parameters
202+
// 6. VarPool: override values for existing IN-direction parameters,
203+
// and inject new parameters that only exist in VarPool for placeholder resolution
203204
List<Property> varPools = parseVarPool(taskInstance);
204205
if (CollectionUtils.isNotEmpty(varPools)) {
205206
for (Property varPool : varPools) {
@@ -208,7 +209,11 @@ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskI
208209
}
209210
Property targetParam = prepareParamsMap.get(varPool.getProp());
210211
if (targetParam != null && Direct.IN.equals(targetParam.getDirect())) {
212+
// Existing IN parameter: override its value
211213
targetParam.setValue(varPool.getValue());
214+
} else if (targetParam == null) {
215+
// Parameter not in map: inject it for placeholder resolution
216+
prepareParamsMap.put(varPool.getProp(), varPool);
212217
}
213218
}
214219
}

dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,4 +538,81 @@ public void testResolvePlaceholders() throws Exception {
538538
// Ensure no unintended side effects
539539
Assertions.assertEquals(4, paramsMap.size());
540540
}
541+
542+
@Test
543+
public void testParamParsingPreparation_varPoolInjectionWhenParamNameNotInLocalParams() {
544+
// Test scenario: upstream task outputs p1=111, downstream task has p2=${p1} but no local p1
545+
TaskInstance taskInstance = new TaskInstance();
546+
taskInstance.setId(1);
547+
taskInstance.setTaskCode(1000001L);
548+
taskInstance.setTaskDefinitionVersion(1);
549+
taskInstance.setExecutePath("home/path/execute");
550+
551+
// VarPool from upstream: p1=111 (OUT direction)
552+
taskInstance.setVarPool("[{\"prop\":\"p1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"111\"}]");
553+
554+
TaskDefinition taskDefinition = new TaskDefinition();
555+
taskDefinition.setName("TaskName-1");
556+
taskDefinition.setCode(1000001L);
557+
taskDefinition.setVersion(1);
558+
559+
WorkflowInstance workflowInstance = new WorkflowInstance();
560+
workflowInstance.setId(2);
561+
final BackfillWorkflowCommandParam backfillWorkflowCommandParam = BackfillWorkflowCommandParam.builder()
562+
.timeZone("Asia/Shanghai")
563+
.build();
564+
workflowInstance.setCommandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam));
565+
workflowInstance.setHistoryCmd(CommandType.COMPLEMENT_DATA.toString());
566+
workflowInstance.setGlobalParams("[]");
567+
workflowInstance.setScheduleTime(DateUtils.stringToDate("2024-01-01 00:00:00"));
568+
569+
WorkflowDefinition workflowDefinition = new WorkflowDefinition();
570+
workflowDefinition.setName("ProcessName-1");
571+
workflowDefinition.setProjectName("ProjectName");
572+
workflowDefinition.setProjectCode(3000001L);
573+
workflowDefinition.setCode(200001L);
574+
575+
Project project = new Project();
576+
project.setName("ProjectName");
577+
project.setCode(3000001L);
578+
579+
workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode());
580+
workflowInstance.setProjectCode(workflowDefinition.getProjectCode());
581+
taskInstance.setTaskCode(taskDefinition.getCode());
582+
taskInstance.setTaskDefinitionVersion(taskDefinition.getVersion());
583+
taskInstance.setProjectCode(workflowDefinition.getProjectCode());
584+
taskInstance.setWorkflowInstanceId(workflowInstance.getId());
585+
586+
// Local params: p2=${p1} (references p1 which only exists in varPool, not in local params)
587+
org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters sqlParameters =
588+
new org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters();
589+
sqlParameters.setType("MYSQL");
590+
sqlParameters.setSql("select 1");
591+
592+
Property p2 = new Property();
593+
p2.setProp("p2");
594+
p2.setDirect(Direct.IN);
595+
p2.setType(DataType.VARCHAR);
596+
p2.setValue("${p1}");
597+
sqlParameters.setLocalParams(Collections.singletonList(p2));
598+
599+
taskInstance.setTaskParams(JSONUtils.toJsonString(sqlParameters));
600+
601+
Mockito.when(projectParameterMapper.queryByProjectCode(Mockito.anyLong())).thenReturn(Collections.emptyList());
602+
603+
Map<String, Property> propertyMap =
604+
curingParamsServiceImpl.paramParsingPreparation(taskInstance, sqlParameters, workflowInstance,
605+
project.getName(), workflowDefinition.getName());
606+
607+
Assertions.assertNotNull(propertyMap);
608+
609+
// Assert: p1 should be injected from varPool
610+
Assertions.assertTrue(propertyMap.containsKey("p1"), "p1 should be injected from varPool");
611+
Assertions.assertEquals("111", propertyMap.get("p1").getValue());
612+
613+
// Assert: p2 should exist and its value should be resolved from ${p1} to 111
614+
Assertions.assertTrue(propertyMap.containsKey("p2"));
615+
Assertions.assertEquals("111", propertyMap.get("p2").getValue(),
616+
"p2's value ${p1} should be resolved to 111 using injected varPool param");
617+
}
541618
}

dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,8 +492,12 @@ private String replaceOriginalValue(String content, String rgex, Map<String, Pro
492492
break;
493493
}
494494
String paramName = m.group(1);
495-
String paramValue = sqlParamsMap.get(paramName).getValue();
496-
content = m.replaceFirst(paramValue);
495+
Property prop = sqlParamsMap.get(paramName);
496+
if (prop == null || prop.getValue() == null) {
497+
log.warn("Cannot find parameter: {} for !{{}} replacement, skipping", paramName, paramName);
498+
break;
499+
}
500+
content = m.replaceFirst(Matcher.quoteReplacement(prop.getValue()));
497501
}
498502
return content;
499503
}

dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,4 +489,69 @@ private ResourceParametersHelper getResourceParametersHelperWithDatasourceType(D
489489
return resourceParametersHelper;
490490
}
491491

492+
@Test
493+
void testReplaceOriginalValue_withSpecialCharacters() throws Exception {
494+
Map<String, Property> paramsMap = new HashMap<>();
495+
496+
Property p1 = new Property();
497+
p1.setProp("price");
498+
p1.setValue("$100");
499+
paramsMap.put("price", p1);
500+
501+
Property p2 = new Property();
502+
p2.setProp("path");
503+
p2.setValue("C:\\Users\\test");
504+
paramsMap.put("path", p2);
505+
506+
Property p3 = new Property();
507+
p3.setProp("empty");
508+
p3.setValue("");
509+
paramsMap.put("empty", p3);
510+
511+
Method method = SqlTask.class.getDeclaredMethod("replaceOriginalValue", String.class, String.class, Map.class);
512+
method.setAccessible(true);
513+
514+
// The regex pattern matches optional quotes around !{...}, so they are replaced too
515+
String sql1 = "select * from items where price = '!{price}'";
516+
String result1 = (String) method.invoke(sqlTask, sql1, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
517+
Assertions.assertEquals("select * from items where price = $100", result1);
518+
519+
String sql2 = "load data local inpath \"!{path}\" into table t";
520+
String result2 = (String) method.invoke(sqlTask, sql2, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
521+
Assertions.assertEquals("load data local inpath C:\\Users\\test into table t", result2);
522+
523+
String sql3 = "select !{empty} as val";
524+
String result3 = (String) method.invoke(sqlTask, sql3, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
525+
Assertions.assertEquals("select as val", result3);
526+
}
527+
528+
@Test
529+
void testReplaceOriginalValue_paramNotFound_keepsOriginalText() throws Exception {
530+
Map<String, Property> paramsMap = new HashMap<>();
531+
532+
Method method = SqlTask.class.getDeclaredMethod("replaceOriginalValue", String.class, String.class, Map.class);
533+
method.setAccessible(true);
534+
535+
String sql = "select * from t where name = '!{unknownParam}'";
536+
String result = (String) method.invoke(sqlTask, sql, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
537+
Assertions.assertEquals(sql, result);
538+
}
539+
540+
@Test
541+
void testReplaceOriginalValue_paramValueNull_keepsOriginalText() throws Exception {
542+
Map<String, Property> paramsMap = new HashMap<>();
543+
544+
Property p = new Property();
545+
p.setProp("name");
546+
p.setValue(null);
547+
paramsMap.put("name", p);
548+
549+
Method method = SqlTask.class.getDeclaredMethod("replaceOriginalValue", String.class, String.class, Map.class);
550+
method.setAccessible(true);
551+
552+
String sql = "select * from t where name = '!{name}'";
553+
String result = (String) method.invoke(sqlTask, sql, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
554+
Assertions.assertEquals(sql, result);
555+
}
556+
492557
}

0 commit comments

Comments
 (0)