Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -19789,6 +19789,10 @@
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.AtLeastOnce": {
"description": "AtLeastOnce is the at-least-once delivery settings (default behavior).",
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.Authorization": {
"properties": {
"token": {
Expand Down Expand Up @@ -20102,6 +20106,20 @@
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.Delivery": {
"description": "Delivery is the delivery semantics for the pipeline.",
"properties": {
"atLeastOnce": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.AtLeastOnce",
"description": "AtLeastOnce enables at-least-once processing semantics (default behavior)."
},
"exactlyOnce": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ExactlyOnce",
"description": "ExactlyOnce enables exactly-once processing semantics."
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.Edge": {
"properties": {
"conditions": {
Expand All @@ -20125,6 +20143,20 @@
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.ExactlyOnce": {
"description": "ExactlyOnce is the exactly-once delivery settings.",
"properties": {
"consistentAck": {
"description": "ConsistentAck enables consistent acknowledgement of offsets to ISB throughout the pipeline.",
"type": "boolean"
},
"dedupWindow": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "DedupWindow is the duration for which the deduplication will be enabled."
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.FixedWindow": {
"description": "FixedWindow describes a fixed window",
"properties": {
Expand Down Expand Up @@ -21530,6 +21562,10 @@
},
"io.numaproj.numaflow.v1alpha1.PipelineSpec": {
"properties": {
"delivery": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Delivery",
"description": "Delivery is the delivery semantics for the pipeline."
},
"edges": {
"description": "Edges define the relationships between vertices",
"items": {
Expand Down Expand Up @@ -22910,6 +22946,10 @@
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate",
"description": "Container template for the main numa container."
},
"delivery": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Delivery",
"description": "Delivery indicates the delivery semantics for the vertex, it's populated from the pipeline delivery settings."
},
"dnsConfig": {
"$ref": "#/definitions/io.k8s.api.core.v1.PodDNSConfig",
"description": "Specifies the DNS parameters of a pod. Parameters specified here will be merged to the generated DNS configuration based on DNSPolicy."
Expand Down
40 changes: 40 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -19793,6 +19793,10 @@
}
}
},
"io.numaproj.numaflow.v1alpha1.AtLeastOnce": {
"description": "AtLeastOnce is the at-least-once delivery settings (default behavior).",
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.Authorization": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -20106,6 +20110,20 @@
}
}
},
"io.numaproj.numaflow.v1alpha1.Delivery": {
"description": "Delivery is the delivery semantics for the pipeline.",
"type": "object",
"properties": {
"atLeastOnce": {
"description": "AtLeastOnce enables at-least-once processing semantics (default behavior).",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.AtLeastOnce"
},
"exactlyOnce": {
"description": "ExactlyOnce enables exactly-once processing semantics.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ExactlyOnce"
}
}
},
"io.numaproj.numaflow.v1alpha1.Edge": {
"type": "object",
"required": [
Expand All @@ -20129,6 +20147,20 @@
}
}
},
"io.numaproj.numaflow.v1alpha1.ExactlyOnce": {
"description": "ExactlyOnce is the exactly-once delivery settings.",
"type": "object",
"properties": {
"consistentAck": {
"description": "ConsistentAck enables consistent acknowledgement of offsets to ISB throughout the pipeline.",
"type": "boolean"
},
"dedupWindow": {
"description": "DedupWindow is the duration for which the deduplication will be enabled.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
}
}
},
"io.numaproj.numaflow.v1alpha1.FixedWindow": {
"description": "FixedWindow describes a fixed window",
"type": "object",
Expand Down Expand Up @@ -21517,6 +21549,10 @@
"io.numaproj.numaflow.v1alpha1.PipelineSpec": {
"type": "object",
"properties": {
"delivery": {
"description": "Delivery is the delivery semantics for the pipeline.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Delivery"
},
"edges": {
"description": "Edges define the relationships between vertices",
"type": "array",
Expand Down Expand Up @@ -22892,6 +22928,10 @@
"description": "Container template for the main numa container.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ContainerTemplate"
},
"delivery": {
"description": "Delivery indicates the delivery semantics for the vertex, it's populated from the pipeline delivery settings.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Delivery"
},
"dnsConfig": {
"description": "Specifies the DNS parameters of a pod. Parameters specified here will be merged to the generated DNS configuration based on DNSPolicy.",
"$ref": "#/definitions/io.k8s.api.core.v1.PodDNSConfig"
Expand Down
10 changes: 10 additions & 0 deletions cmd/commands/isbsvc_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"os"
"time"

"github.com/spf13/cobra"
"go.uber.org/zap"
Expand All @@ -40,6 +41,7 @@ func NewISBSvcCreateCommand() *cobra.Command {
buckets []string
sideInputsStore string
servingSourceStore string
dedupWindow string
)

command := &cobra.Command{
Expand Down Expand Up @@ -79,6 +81,13 @@ func NewISBSvcCreateCommand() *cobra.Command {
return err
}
opts = append(opts, isbsvc.WithConfig(isbSvcConfig.JetStream.StreamConfig))
if dedupWindow != "" {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if ppl update the delivery semantics to an existing pipeline?

d, err := time.ParseDuration(dedupWindow)
if err != nil {
return fmt.Errorf("failed to parse dedup window duration %q, %w", dedupWindow, err)
}
opts = append(opts, isbsvc.WithDedupWindow(d))
}
default:
cmd.HelpFunc()(cmd, args)
return fmt.Errorf("unsupported isb service type %q", isbSvcType)
Expand All @@ -98,5 +107,6 @@ func NewISBSvcCreateCommand() *cobra.Command {
command.Flags().StringSliceVar(&buckets, "buckets", []string{}, "Buckets to create") // --buckets=xxa,xxb --buckets=xxc
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringVar(&servingSourceStore, "serving-store", "", "Serving source streams to create") // --serving-store=a
command.Flags().StringVar(&dedupWindow, "dedup-window", "", "Deduplication window duration for exactly-once processing, e.g. 2m")
return command
}
14 changes: 14 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ spec:
type: object
spec:
properties:
delivery:
properties:
atLeastOnce:
type: object
exactlyOnce:
properties:
consistentAck:
default: false
type: boolean
dedupWindow:
default: 2m
type: string
type: object
type: object
edges:
items:
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ spec:
properties:
pipeline:
properties:
delivery:
properties:
atLeastOnce:
type: object
exactlyOnce:
properties:
consistentAck:
default: false
type: boolean
dedupWindow:
default: 2m
type: string
type: object
type: object
edges:
items:
properties:
Expand Down
14 changes: 14 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,20 @@ spec:
type: object
type: object
type: object
delivery:
properties:
atLeastOnce:
type: object
exactlyOnce:
properties:
consistentAck:
default: false
type: boolean
dedupWindow:
default: 2m
type: string
type: object
type: object
dnsConfig:
properties:
nameservers:
Expand Down
42 changes: 42 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9769,6 +9769,20 @@ spec:
type: object
spec:
properties:
delivery:
properties:
atLeastOnce:
type: object
exactlyOnce:
properties:
consistentAck:
default: false
type: boolean
dedupWindow:
default: 2m
type: string
type: object
type: object
edges:
items:
properties:
Expand Down Expand Up @@ -22874,6 +22888,20 @@ spec:
properties:
pipeline:
properties:
delivery:
properties:
atLeastOnce:
type: object
exactlyOnce:
properties:
consistentAck:
default: false
type: boolean
dedupWindow:
default: 2m
type: string
type: object
type: object
edges:
items:
properties:
Expand Down Expand Up @@ -37828,6 +37856,20 @@ spec:
type: object
type: object
type: object
delivery:
properties:
atLeastOnce:
type: object
exactlyOnce:
properties:
consistentAck:
default: false
type: boolean
dedupWindow:
default: 2m
type: string
type: object
type: object
dnsConfig:
properties:
nameservers:
Expand Down
42 changes: 42 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9769,6 +9769,20 @@ spec:
type: object
spec:
properties:
delivery:
properties:
atLeastOnce:
type: object
exactlyOnce:
properties:
consistentAck:
default: false
type: boolean
dedupWindow:
default: 2m
type: string
type: object
type: object
edges:
items:
properties:
Expand Down Expand Up @@ -22874,6 +22888,20 @@ spec:
properties:
pipeline:
properties:
delivery:
properties:
atLeastOnce:
type: object
exactlyOnce:
properties:
consistentAck:
default: false
type: boolean
dedupWindow:
default: 2m
type: string
type: object
type: object
edges:
items:
properties:
Expand Down Expand Up @@ -37828,6 +37856,20 @@ spec:
type: object
type: object
type: object
delivery:
properties:
atLeastOnce:
type: object
exactlyOnce:
properties:
consistentAck:
default: false
type: boolean
dedupWindow:
default: 2m
type: string
type: object
type: object
dnsConfig:
properties:
nameservers:
Expand Down
Loading
Loading