Skip to content
Draft
119 changes: 118 additions & 1 deletion api/stream/design.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ var Sinks = Type("Sinks", ArrayOf(Sink), func() {

var SinkType = Type("SinkType", String, func() {
Meta("struct:field:type", "= definition.SinkType", "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition")
Enum(definition.SinkTypeTable.String())
Enum(definition.SinkTypeTable.String(), definition.SinkTypeJobTrigger.String(), definition.SinkTypeKaiAgent.String())
Example(definition.SinkTypeTable.String())
})

Expand Down Expand Up @@ -1324,6 +1324,30 @@ var SinkFields = func(op OperationType) {
panic(errors.Errorf(`unexpected operation type "%v"`, op))
}

// Job trigger sub-definition
switch op {
case OpRead:
Attribute("jobTrigger", JobTriggerSink)
case OpCreate:
Attribute("jobTrigger", JobTriggerSinkCreateRequest)
case OpUpdate:
Attribute("jobTrigger", JobTriggerSinkUpdateRequest)
default:
Comment thread
Matovidlo marked this conversation as resolved.
panic(errors.Errorf(`unexpected operation type "%v"`, op))
}

// Kai-agent sub-definition
switch op {
case OpRead:
Attribute("kaiAgent", KaiAgentSink)
case OpCreate:
Attribute("kaiAgent", KaiAgentSinkCreateRequest)
case OpUpdate:
Attribute("kaiAgent", KaiAgentSinkUpdateRequest)
default:
panic(errors.Errorf(`unexpected operation type "%v"`, op))
}

// Required fields
switch op {
case OpRead:
Expand Down Expand Up @@ -1364,6 +1388,99 @@ var TableType = Type("TableType", String, func() {
Example(definition.TableTypeKeboola.String())
})

// Job Trigger Sink ----------------------------------------------------------------------------------------------------

var JobTriggerSink = Type("JobTriggerSink", func() {
JobTriggerSinkFields()
Required("componentId", "configId", "branchId")
})

var JobTriggerSinkCreateRequest = Type("JobTriggerSinkCreate", func() {
JobTriggerSinkFields()
Required("componentId", "configId", "branchId")
})

var JobTriggerSinkUpdateRequest = Type("JobTriggerSinkUpdate", func() {
JobTriggerSinkFields()
})

var JobTriggerSinkFields = func() {
Description(fmt.Sprintf(`Job trigger sink configuration for "type" = "%s". Each received record triggers a Keboola Queue job.`, definition.SinkTypeJobTrigger))
Attribute("componentId", String, func() {
Description("ID of the component to run.")
Example("keboola.ex-http")
})
Attribute("configId", String, func() {
Description("ID of the component configuration to run.")
Example("123456")
})
Attribute("branchId", Int, func() {
Description("ID of the branch on which the job runs. Use 0 for the default branch.")
Example(0)
})
Attribute("configDataTemplate", String, func() {
Description(`Optional Jsonnet template evaluated against the incoming HTTP request.
The template output must be a JSON object; it is passed as "configData" to the triggered job.
This allows webhook payload fields to override runtime job parameters.
Available functions: Body(), Header(), Ip(), Now() — same as in table column templates.
If empty, the job runs with the component's default saved configuration.`)
Example(`{ parameters: { url: Body("url") } }`)
})
}

// Kai-Agent Sink ------------------------------------------------------------------------------------------------------

var KaiAgentSink = Type("KaiAgentSink", func() {
KaiAgentSinkFields()
Required("mode")
})

var KaiAgentSinkCreateRequest = Type("KaiAgentSinkCreate", func() {
KaiAgentSinkFields()
Required("mode")
})

var KaiAgentSinkUpdateRequest = Type("KaiAgentSinkUpdate", func() {
KaiAgentSinkFields()
})

var KaiAgentSinkFields = func() {
Description(fmt.Sprintf(`Kai-agent sink configuration for "type" = "%s". Each received record is forwarded to kai-agent.keboola.com.`, definition.SinkTypeKaiAgent))
Attribute("mode", String, func() {
Description(`Selects the kai-agent endpoint: "chat" (POST /api/chat) or "suggestions" (POST /api/suggestions).`)
Enum(string(definition.KaiAgentModeChat), string(definition.KaiAgentModeSuggestions))
Example(string(definition.KaiAgentModeChat))
})
Attribute("chatId", String, func() {
Description(`Optional fixed UUID used as the chat ID for "chat" mode. When empty a new UUID is generated per record.`)
Example("550e8400-e29b-41d4-a716-446655440000")
})
Attribute("messageTemplate", String, func() {
Description(`Optional Jsonnet template for "chat" mode. Its output (a plain string) becomes the message text.
Available functions: Body(), Header(), Ip(), Now() — same as in table column templates.
When empty the raw request body is sent as the message text.`)
Example(`Body("message")`)
})
Attribute("branchId", Int, func() {
Description(`Optional Keboola branch ID forwarded to the chat for context ("chat" mode only). 0 means no branch.`)
Example(0)
})
Attribute("suggestionsContext", String, func() {
Description(`UI context for "suggestions" mode. Required when mode = "suggestions".`)
Enum(
string(definition.KaiAgentSuggestionsContextDashboard),
string(definition.KaiAgentSuggestionsContextJobDetail),
string(definition.KaiAgentSuggestionsContextConfigDetail),
)
Example(string(definition.KaiAgentSuggestionsContextJobDetail))
})
Attribute("dataTemplate", String, func() {
Description(`Optional Jsonnet template for "suggestions" mode. Its output (a JSON object) is passed as the "data" field.
When empty the full parsed JSON body is forwarded as-is.`)
Example(`{ jobId: Body("jobId"), status: Body("status") }`)
})
}

var TableID = Type("TableID", String, func() {
Example("in.c-bucket.table")
})
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading