Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export function ParameterExpressionField({ FieldWrapper, ...props }: ParameterEx
);

const fieldErrors = getValidationErrorsForField(errors, parameter.name);
const outputVarErrors = getValidationErrorsForField(errors, "outputVar");

const parameterDefinition = useMemo(
() => findParamDefinitionByName(parameterDefinitions, parameter.name),
Expand All @@ -86,6 +87,7 @@ export function ParameterExpressionField({ FieldWrapper, ...props }: ParameterEx
testResultsToShow={testResultsState.testResultsToShow}
variableTypes={variableTypes}
fieldErrors={fieldErrors}
outputVarErrors={outputVarErrors}
endAdornment={endAdornment}
inputAdornmentEnd={inputAdornmentEnd}
/>
Expand All @@ -94,6 +96,7 @@ export function ParameterExpressionField({ FieldWrapper, ...props }: ParameterEx
endAdornment,
inputAdornmentEnd,
fieldErrors,
outputVarErrors,
isEditMode,
listFieldPath,
node,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { UIParameter } from "../../../types/definition";
import type { NodeType } from "../../../types/node";
import type { NodeValidationError } from "../../../types/validation";
import { DescriptionField } from "./DescriptionField";
import { FLINK_SQL_PARAM } from "./editors/expression/ExpressionField";
import { FieldType } from "./editors/field/Field";
import { NameField } from "./NameField";
import { findParameters } from "./NodeDetailsContent/helpers";
Expand Down Expand Up @@ -42,11 +43,12 @@ export function CustomNode({
(): boolean => !!ProcessUtils.extractComponentDefinition(node, processDefinitionData.components)?.returnType || !!node.outputVar,
[node, processDefinitionData.components],
);
const isFlinkSqlNode = useMemo(() => findParameters(node).some((p) => p.name === FLINK_SQL_PARAM), [node]);

return (
<>
<NameField node={node} isEditMode={isEditMode} showValidation={showValidation} setProperty={setProperty} errors={errors} />
{hasOutputVar && (
{hasOutputVar && !isFlinkSqlNode && (
<NodeField
node={node}
isEditMode={isEditMode}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { ReactNode } from "react";
import React, { useCallback, useMemo } from "react";

import type { NodeResultsForContext } from "../../../../../common/TestResultUtils";
import { useUserSettings } from "../../../../../common/useUserSettings";
import type { UIParameter } from "../../../../../types/definition";
import type { NodeType } from "../../../../../types/node";
import type { VariableTypes } from "../../../../../types/validation";
Expand All @@ -11,9 +12,12 @@ import ExpressionTestResults from "../../tests/ExpressionTestResults";
import EditableEditor from "../EditableEditor";
import type { FieldError } from "../Validators";
import type { OnValueChange } from "./Editor";
import { FlinkSqlTemplateEditor } from "./FlinkSqlTemplateEditor";
import type { ExpressionObj } from "./types";
import { EditorType } from "./types";

export const FLINK_SQL_PARAM = "flinkSqlQuery";

export type ExpressionFieldProps = {
fieldName: string;
fieldLabel: string;
Expand All @@ -27,6 +31,7 @@ export type ExpressionFieldProps = {
testResultsToShow: NodeResultsForContext;
variableTypes: VariableTypes;
fieldErrors: FieldError[];
outputVarErrors?: FieldError[];
endAdornment?: ReactNode;
inputAdornmentEnd?: ReactNode;
};
Expand All @@ -45,6 +50,7 @@ function ExpressionField(props: ExpressionFieldProps): React.JSX.Element {
testResultsToShow,
variableTypes,
fieldErrors,
outputVarErrors,
endAdornment,
inputAdornmentEnd,
} = props;
Expand All @@ -53,6 +59,7 @@ function ExpressionField(props: ExpressionFieldProps): React.JSX.Element {
const exprTextPath = `${exprPath}.expression`;
const expressionObj = useMemo(() => get(editedNode, exprPath), [editedNode, exprPath]);
const editors = useMemo(() => parameterDefinition?.editors || [], [parameterDefinition?.editors]);
const [showFlinkSqlTemplateEditor] = useUserSettings("node.showFlinkSqlTemplateEditor");

const onValueChange: OnValueChange = useCallback(
(value: ExpressionObj) => {
Expand All @@ -61,7 +68,10 @@ function ExpressionField(props: ExpressionFieldProps): React.JSX.Element {
[exprPath, setNodeDataAt],
);

const editor = useMemo(
const isFlinkSqlQuery =
showFlinkSqlTemplateEditor && fieldName === FLINK_SQL_PARAM && editors.some((e) => e.type === EditorType.SQL_PARAMETER_EDITOR);

const sqlEditor = useMemo(
() => (
<EditableEditor
defaultValue={parameterDefinition?.defaultValue}
Expand Down Expand Up @@ -99,6 +109,38 @@ function ExpressionField(props: ExpressionFieldProps): React.JSX.Element {
],
);

const editor = useMemo(() => {
if (isFlinkSqlQuery) {
return (
<FlinkSqlTemplateEditor
expressionObj={expressionObj}
onValueChange={onValueChange}
readOnly={readOnly}
variableTypes={variableTypes}
SqlEditorComponent={sqlEditor}
outputVar={editedNode.outputVar}
onOutputVarChange={(name) => setNodeDataAt("outputVar", name)}
fieldErrors={fieldErrors}
outputVarErrors={outputVarErrors}
showValidation={showValidation}
/>
);
}
return sqlEditor;
}, [
isFlinkSqlQuery,
expressionObj,
onValueChange,
readOnly,
variableTypes,
sqlEditor,
editedNode.outputVar,
setNodeDataAt,
outputVarErrors,
fieldErrors,
showValidation,
]);

const isFixedValues = useMemo(
() =>
editors.some(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { Box, Chip, CircularProgress, FormControl, MenuItem, Select, Tooltip, Typography } from "@mui/material";
import React from "react";
import { useTranslation } from "react-i18next";

import type { VariableTypes } from "../../../../../../types/validation";
import { FormRow } from "./components/FormRow";
import { nuMenuProps, nuSelectSx } from "./components/nuInputSx";
import { getTypeColor } from "./components/typeColors";
import type { InputField } from "./types";
import { buildInputFieldsForVariable, EXCLUDED_INPUT_VARIABLES } from "./types";

interface Props {
fields: InputField[];
onChange: (fields: InputField[]) => void;
readOnly?: boolean;
variableTypes: VariableTypes;
}

function variableHasRecordTime(varName: string, variableTypes: VariableTypes): boolean {
const typingResult = variableTypes[varName];
if (!typingResult || !("fields" in typingResult) || !typingResult.fields) return false;
return "record_time" in (typingResult.fields as Record<string, unknown>);
}

function getAvailableVariables(variableTypes: VariableTypes): string[] {
return Object.entries(variableTypes)
.filter(
([name, v]) =>
!EXCLUDED_INPUT_VARIABLES.has(name) &&
"fields" in (v as object) &&
(v as { fields?: unknown }).fields &&
Object.keys((v as { fields: object }).fields).length > 0,
)
.map(([name]) => name);
}

export function InputFieldsSection({ fields, onChange, readOnly, variableTypes }: Props) {
const { t } = useTranslation();

const availableVars = getAvailableVariables(variableTypes);
const currentVar = fields[0]?.source.split(".")[0] ?? "";
const hasConflictingRecordTime = currentVar ? variableHasRecordTime(currentVar, variableTypes) : false;
const isLoadingTypes =
availableVars.length === 0 &&
Object.keys(variableTypes).some((k) => !EXCLUDED_INPUT_VARIABLES.has(k)) &&
Object.values(variableTypes).every((v) => !("fields" in (v as object)));

const handleVarChange = (varName: string) => {
if (varName) onChange(buildInputFieldsForVariable(varName, variableTypes));
};

return (
<FormRow
label={t("flinkSql.inputFields.variable", "Input variable:")}
tooltip={t(
"flinkSql.inputFields.tooltip",
"The stream variable whose fields are used as the input source. Select a variable to choose which fields are projected into the query.",
)}
>
<Box>
{isLoadingTypes ? (
<Box display="flex" alignItems="center" gap={1} height={35}>
<CircularProgress size={14} />
<Typography variant="body2" color="text.disabled" sx={{ fontSize: "0.85rem" }}>
{t("flinkSql.inputFields.loadingTypes", "Loading variable types\u2026")}
</Typography>
</Box>
) : (
<FormControl size="small" variant="outlined" fullWidth>
<Select
value={currentVar}
onChange={(e) => handleVarChange(e.target.value)}
disabled={readOnly || availableVars.length === 0}
displayEmpty
renderValue={(v) =>
v ? (
`#${v}`
) : (
<Typography variant="body2" color="text.disabled" sx={{ fontSize: "0.85rem" }}>
{t("flinkSql.inputFields.placeholder", "Select variable\u2026")}
</Typography>
)
}
MenuProps={nuMenuProps}
sx={nuSelectSx}
>
{availableVars.map((v) => (
<MenuItem key={v} value={v} sx={{ fontSize: "0.85rem" }}>
#{v}
</MenuItem>
))}
</Select>
</FormControl>
)}

{/* Field chips */}
{fields.length > 0 && (
<Box display="flex" flexWrap="wrap" gap={0.5} mt={0.75}>
{fields.map((f) => (
<Chip
key={f.name}
label={f.alias}
size="small"
sx={{
fontSize: "0.7rem",
fontWeight: 500,
height: 20,
backgroundColor: `${getTypeColor(f.type)}22`,
borderColor: getTypeColor(f.type),
border: `1px solid ${getTypeColor(f.type)}66`,
color: getTypeColor(f.type),
}}
/>
))}
{hasConflictingRecordTime ? (
<Tooltip
title={t(
"flinkSql.inputFields.recordTimeConflict",
"record_time is always added automatically and cannot be projected from the input variable.",
)}
placement="top"
>
<Chip
label="record_time"
size="small"
sx={{
fontSize: "0.7rem",
fontWeight: 500,
height: 20,
backgroundColor: "action.disabledBackground",
border: "1px solid",
borderColor: "divider",
color: "text.disabled",
textDecoration: "line-through",
cursor: "default",
}}
/>
</Tooltip>
) : (
<Typography
variant="caption"
sx={{ fontSize: "0.68rem", color: "text.disabled", alignSelf: "center", ml: 0.5 }}
>
{t("flinkSql.inputFields.recordTime", "+ record_time")}
</Typography>
)}
</Box>
)}
</Box>
</FormRow>
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import ContentCopyIcon from "@mui/icons-material/ContentCopy";
import ExpandMoreIcon from "@mui/icons-material/ExpandMore";
import { Accordion, AccordionDetails, AccordionSummary, Box, IconButton, Tooltip, Typography } from "@mui/material";
import React, { useState } from "react";
import { useTranslation } from "react-i18next";

interface Props {
sql: string;
}

export function SqlPreview({ sql }: Props) {
const { t } = useTranslation();
const [copied, setCopied] = useState(false);

const handleCopy = () => {
navigator.clipboard.writeText(sql).then(() => {
setCopied(true);
setTimeout(() => setCopied(false), 1500);
});
};

return (
<Accordion
disableGutters
elevation={0}
sx={(theme) => ({
backgroundColor: theme.palette.background.default,
border: `1px solid ${theme.palette.divider}`,
"&:before": { display: "none" },
borderRadius: "6px !important",
overflow: "hidden",
})}
>
<AccordionSummary
expandIcon={<ExpandMoreIcon sx={{ fontSize: "1.1rem" }} />}
sx={(theme) => ({
minHeight: 36,
py: 0,
px: 1.5,
"&.Mui-expanded": { minHeight: 36 },
"& .MuiAccordionSummary-content": { my: 0.75 },
})}
>
<Typography
variant="caption"
color="text.secondary"
sx={{ textTransform: "uppercase", letterSpacing: "0.08em", fontWeight: 600, fontSize: "0.7rem" }}
>
{t("flinkSql.preview.title", "Generated SQL")}
</Typography>
</AccordionSummary>
<AccordionDetails sx={{ p: 0, position: "relative" }}>
<Tooltip title={copied ? t("flinkSql.preview.copied", "Copied!") : t("flinkSql.preview.copy", "Copy")}>
<IconButton size="small" onClick={handleCopy} sx={{ position: "absolute", top: 4, right: 4, zIndex: 1 }}>
<ContentCopyIcon sx={{ fontSize: "0.9rem" }} />
</IconButton>
</Tooltip>
<Box
component="pre"
sx={(theme) => ({
m: 0,
p: 1.5,
pr: 5,
fontFamily: "'JetBrains Mono', 'Fira Code', monospace",
fontSize: "0.75rem",
color: theme.palette.text.primary,
overflowX: "auto",
whiteSpace: "pre",
lineHeight: 1.6,
backgroundColor: "rgba(0,0,0,0.15)",
})}
>
{sql}
</Box>
</AccordionDetails>
</Accordion>
);
}
Loading
Loading