Skip to content

Commit dbebd03

Browse files
authored
Merge branch 'v2' into #7209
Signed-off-by: Barry Wu <a0987818905@gmail.com>
2 parents 95e38b3 + 96af897 commit dbebd03

29 files changed

Lines changed: 643 additions & 422 deletions

File tree

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ COPY flyteplugins flyteplugins
1717
COPY flytestdlib flytestdlib
1818
COPY gen/go gen/go
1919
COPY actions actions
20+
COPY app app
2021
COPY events events
2122
COPY runs runs
2223
COPY cache_service cache_service

Makefile

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,37 +31,22 @@ build: verify ## Build all Go service binaries
3131
$(MAKE) -C runs build
3232
$(MAKE) -C executor build
3333

34-
# =============================================================================
35-
# Sandbox Commands
36-
# =============================================================================
37-
38-
.PHONY: sandbox-build
39-
sandbox-build: ## Build and start the flyte sandbox (docker/devbox-bundled)
40-
$(MAKE) -C docker/devbox-bundled build
41-
42-
# Run in dev mode with extra arg FLYTE_DEV=True
43-
.PHONY: sandbox-run
44-
sandbox-run: ## Start the flyte sandbox without rebuilding the image
45-
$(MAKE) -C docker/devbox-bundled start
46-
47-
.PHONY: sandbox-stop
48-
sandbox-stop: ## Stop the flyte sandbox
49-
$(MAKE) -C docker/devbox-bundled stop
50-
5134
# =============================================================================
5235
# Devbox Commands
5336
# =============================================================================
5437

5538
.PHONY: devbox-build
56-
devbox-build: ## Build and start the flyte devbox cluster (docker/devbox-bundled)
39+
devbox-build: ## Build the flyte devbox image (docker/devbox-bundled)
5740
$(MAKE) -C docker/devbox-bundled build
5841

42+
# Run in dev mode with extra arg FLYTE_DEV=True
5943
.PHONY: devbox-run
60-
devbox-run: ## Start the flyte devbox cluster without rebuilding the image
61-
$(MAKE) -C docker/devbox-bundled start
44+
devbox-run: ## Start the flyte devbox and install Knative with app routing config
45+
$(MAKE) -C docker/devbox-bundled start FLYTE_DEV=$(FLYTE_DEV)
46+
$(MAKE) -C docker/devbox-bundled setup-knative
6247

6348
.PHONY: devbox-stop
64-
devbox-stop: ## Stop the flyte devbox cluster
49+
devbox-stop: ## Stop the flyte devbox
6550
$(MAKE) -C docker/devbox-bundled stop
6651

6752
.PHONY: help
@@ -83,10 +68,9 @@ sep:
8368
# Helper to time a step: $(call timed,step_name,command)
8469
define timed
8570
@start=$$(date +%s); \
86-
$(2); rc=$$?; \
71+
$(2); \
8772
elapsed=$$(( $$(date +%s) - $$start )); \
88-
echo "⏱ $(1) completed in $${elapsed}s"; \
89-
exit $$rc
73+
echo "⏱ $(1) completed in $${elapsed}s"
9074
endef
9175

9276
.PHONY: buf-dep

app/config/config.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,33 @@ func DefaultAppConfig() *AppConfig {
2020
CacheTTL: 30 * time.Second,
2121
}
2222
}
23+
24+
// InternalAppConfig holds configuration for the data plane InternalAppService.
25+
type InternalAppConfig struct {
26+
// Enabled controls whether the InternalAppService is started.
27+
Enabled bool `json:"enabled" pflag:",Enable app deployment controller"`
28+
29+
// BaseDomain is the base domain used to generate public URLs for apps.
30+
// Apps are exposed at "{name}-{project}-{domain}.{base_domain}".
31+
BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"`
32+
33+
// Scheme is the URL scheme used for public app URLs ("http" or "https").
34+
// Defaults to "https" if unset.
35+
Scheme string `json:"scheme" pflag:",URL scheme for app public URLs (http or https)"`
36+
37+
// DefaultRequestTimeout is the request timeout applied to apps that don't specify one.
38+
DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"`
39+
40+
// MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s).
41+
MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"`
42+
43+
// IngressAppsPort is the port appended to the public app URL (e.g. 30081).
44+
// Set to 0 to omit the port when behind a standard 80/443 proxy.
45+
IngressAppsPort int `json:"ingressAppsPort" pflag:",Port for app subdomain URLs (0 = omit)"`
46+
47+
// DefaultEnvVars is a map of environment variables injected into every KService
48+
// pod at deploy time, in addition to any env vars specified in the app spec.
49+
// Use this to inject cluster-internal endpoints (e.g. _U_EP_OVERRIDE) that app
50+
// processes need to connect back to the Flyte manager.
51+
DefaultEnvVars map[string]string `json:"defaultEnvVars" pflag:"-,Default env vars injected into every app pod"`
52+
}

app/internal/config/config.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,7 @@
11
package config
22

3-
import "time"
3+
import appconfig "github.com/flyteorg/flyte/v2/app/config"
44

5-
// InternalAppConfig holds configuration for the data plane app deployment controller.
6-
type InternalAppConfig struct {
7-
// Enabled controls whether the app deployment controller is started.
8-
Enabled bool `json:"enabled" pflag:",Enable app deployment controller"`
9-
10-
// BaseDomain is the base domain used to generate public URLs for apps.
11-
// Apps are exposed at "{name}-{project}-{domain}.{base_domain}".
12-
BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"`
13-
14-
// DefaultRequestTimeout is the request timeout applied to apps that don't specify one.
15-
DefaultRequestTimeout time.Duration `json:"defaultRequestTimeout" pflag:",Default request timeout for apps"`
16-
17-
// MaxRequestTimeout is the hard cap on request timeout (Knative max is 3600s).
18-
MaxRequestTimeout time.Duration `json:"maxRequestTimeout" pflag:",Maximum allowed request timeout for apps"`
19-
}
5+
// InternalAppConfig is an alias of the public config type so internal packages
6+
// can import it without depending on the public app/config path directly.
7+
type InternalAppConfig = appconfig.InternalAppConfig

app/internal/k8s/app_client.go

Lines changed: 118 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
1414
corev1 "k8s.io/api/core/v1"
1515
k8serrors "k8s.io/apimachinery/pkg/api/errors"
16+
k8sresource "k8s.io/apimachinery/pkg/api/resource"
1617
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1718
"k8s.io/apimachinery/pkg/types"
1819
k8swatch "k8s.io/apimachinery/pkg/watch"
@@ -21,8 +22,10 @@ import (
2122
"sigs.k8s.io/controller-runtime/pkg/client"
2223

2324
"github.com/flyteorg/flyte/v2/app/internal/config"
25+
"github.com/flyteorg/flyte/v2/flytestdlib/k8s"
2426
"github.com/flyteorg/flyte/v2/flytestdlib/logger"
2527
flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app"
28+
flytecore "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
2629
)
2730

2831
const (
@@ -55,10 +58,9 @@ type AppK8sClientInterface interface {
5558
GetStatus(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.Status, error)
5659

5760
// List returns apps for the given project/domain scope with optional pagination.
58-
// If appName is non-empty, only the app with that name is returned.
5961
// limit=0 means no limit. token is the K8s continue token from a previous call.
6062
// Returns the apps, the continue token for the next page (empty if last page), and any error.
61-
List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error)
63+
List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error)
6264

6365
// Delete removes the KService CRD entirely. The app must be re-created from scratch.
6466
// Use Stop to scale to zero while preserving the KService.
@@ -104,6 +106,10 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error {
104106
ns := appNamespace(appID.GetProject(), appID.GetDomain())
105107
name := kserviceName(appID)
106108

109+
if err := k8s.EnsureNamespaceExists(ctx, c.k8sClient, ns); err != nil {
110+
return fmt.Errorf("failed to ensure namespace %s: %w", ns, err)
111+
}
112+
107113
ksvc, err := c.buildKService(app)
108114
if err != nil {
109115
return fmt.Errorf("failed to build KService for app %s: %w", name, err)
@@ -122,15 +128,27 @@ func (c *AppK8sClient) Deploy(ctx context.Context, app *flyteapp.App) error {
122128
return fmt.Errorf("failed to get KService %s: %w", name, err)
123129
}
124130

125-
// Skip update if spec has not changed.
126131
if existing.Annotations[annotationSpecSHA] == ksvc.Annotations[annotationSpecSHA] {
127132
logger.Debugf(ctx, "KService %s/%s spec unchanged, skipping update", ns, name)
128133
return nil
129134
}
130135

131136
existing.Spec = ksvc.Spec
132-
existing.Labels = ksvc.Labels
133-
existing.Annotations = ksvc.Annotations
137+
// Merge labels and annotations rather than replacing them wholesale.
138+
// Knative sets immutable annotations (e.g. serving.knative.dev/creator)
139+
// on creation; overwriting them causes the admission webhook to reject the update.
140+
if existing.Labels == nil {
141+
existing.Labels = make(map[string]string)
142+
}
143+
for k, v := range ksvc.Labels {
144+
existing.Labels[k] = v
145+
}
146+
if existing.Annotations == nil {
147+
existing.Annotations = make(map[string]string)
148+
}
149+
for k, v := range ksvc.Annotations {
150+
existing.Annotations[k] = v
151+
}
134152
if err := c.k8sClient.Update(ctx, existing); err != nil {
135153
return fmt.Errorf("failed to update KService %s: %w", name, err)
136154
}
@@ -416,16 +434,12 @@ func (c *AppK8sClient) GetStatus(ctx context.Context, appID *flyteapp.Identifier
416434
}
417435

418436
// List returns apps for the given project/domain scope with optional pagination.
419-
func (c *AppK8sClient) List(ctx context.Context, project, domain, appName string, limit uint32, token string) ([]*flyteapp.App, string, error) {
437+
func (c *AppK8sClient) List(ctx context.Context, project, domain string, limit uint32, token string) ([]*flyteapp.App, string, error) {
420438
ns := appNamespace(project, domain)
421439

422-
matchLabels := client.MatchingLabels{labelAppManaged: "true"}
423-
if appName != "" {
424-
matchLabels[labelAppName] = strings.ToLower(appName)
425-
}
426440
listOpts := []client.ListOption{
427441
client.InNamespace(ns),
428-
matchLabels,
442+
client.MatchingLabels{labelAppManaged: "true"},
429443
}
430444
if limit > 0 {
431445
listOpts = append(listOpts, client.Limit(int64(limit)))
@@ -451,6 +465,25 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain, appName string
451465
return apps, list.Continue, nil
452466
}
453467

468+
// publicIngress returns the deterministic public URL for an app using the same
469+
// logic as the service layer so GetStatus/List/Watch are consistent with Create.
470+
func (c *AppK8sClient) publicIngress(id *flyteapp.Identifier) *flyteapp.Ingress {
471+
if c.cfg.BaseDomain == "" {
472+
return nil
473+
}
474+
scheme := c.cfg.Scheme
475+
if scheme == "" {
476+
scheme = "https"
477+
}
478+
host := strings.ToLower(fmt.Sprintf("%s-%s-%s.%s",
479+
id.GetName(), id.GetProject(), id.GetDomain(), c.cfg.BaseDomain))
480+
url := scheme + "://" + host
481+
if c.cfg.IngressAppsPort != 0 {
482+
url += fmt.Sprintf(":%d", c.cfg.IngressAppsPort)
483+
}
484+
return &flyteapp.Ingress{PublicUrl: url}
485+
}
486+
454487
// --- Helpers ---
455488

456489
// kserviceName returns the KService name for an app. Since each app is deployed
@@ -495,6 +528,15 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err
495528
if err != nil {
496529
return nil, err
497530
}
531+
// Inject cluster-level default env vars (e.g. _U_EP_OVERRIDE) before user vars
532+
// so they can be overridden by app-specific env vars if needed.
533+
if len(c.cfg.DefaultEnvVars) > 0 && len(podSpec.Containers) > 0 {
534+
defaults := make([]corev1.EnvVar, 0, len(c.cfg.DefaultEnvVars))
535+
for k, v := range c.cfg.DefaultEnvVars {
536+
defaults = append(defaults, corev1.EnvVar{Name: k, Value: v})
537+
}
538+
podSpec.Containers[0].Env = append(defaults, podSpec.Containers[0].Env...)
539+
}
498540

499541
templateAnnotations := buildAutoscalingAnnotations(spec, c.cfg)
500542

@@ -546,17 +588,28 @@ func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) {
546588
case *flyteapp.Spec_Container:
547589
c := p.Container
548590
container := corev1.Container{
549-
Name: "app",
550-
Image: c.GetImage(),
551-
Args: c.GetArgs(),
591+
Name: "app",
592+
Image: c.GetImage(),
593+
Command: c.GetCommand(),
594+
Args: c.GetArgs(),
552595
}
553596
for _, e := range c.GetEnv() {
554597
container.Env = append(container.Env, corev1.EnvVar{
555598
Name: e.GetKey(),
556599
Value: e.GetValue(),
557600
})
558601
}
559-
return corev1.PodSpec{Containers: []corev1.Container{container}}, nil
602+
for _, p := range c.GetPorts() {
603+
container.Ports = append(container.Ports, corev1.ContainerPort{
604+
ContainerPort: int32(p.GetContainerPort()),
605+
Name: p.GetName(),
606+
})
607+
}
608+
container.Resources = buildResourceRequirements(c.GetResources())
609+
return corev1.PodSpec{
610+
Containers: []corev1.Container{container},
611+
EnableServiceLinks: boolPtr(false),
612+
}, nil
560613

561614
case *flyteapp.Spec_Pod:
562615
// K8sPod payloads are not yet supported — the pod spec serialization
@@ -568,6 +621,49 @@ func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) {
568621
}
569622
}
570623

624+
// buildResourceRequirements maps flyteidl2 Resources to corev1.ResourceRequirements.
625+
func buildResourceRequirements(res *flytecore.Resources) corev1.ResourceRequirements {
626+
if res == nil {
627+
return corev1.ResourceRequirements{}
628+
}
629+
reqs := corev1.ResourceRequirements{}
630+
if len(res.GetRequests()) > 0 {
631+
reqs.Requests = make(corev1.ResourceList)
632+
for _, e := range res.GetRequests() {
633+
if name, ok := protoResourceName(e.GetName()); ok {
634+
reqs.Requests[name] = k8sresource.MustParse(e.GetValue())
635+
}
636+
}
637+
}
638+
if len(res.GetLimits()) > 0 {
639+
reqs.Limits = make(corev1.ResourceList)
640+
for _, e := range res.GetLimits() {
641+
if name, ok := protoResourceName(e.GetName()); ok {
642+
reqs.Limits[name] = k8sresource.MustParse(e.GetValue())
643+
}
644+
}
645+
}
646+
return reqs
647+
}
648+
649+
// protoResourceName maps a flyteidl2 ResourceName to the equivalent corev1.ResourceName.
650+
func protoResourceName(name flytecore.Resources_ResourceName) (corev1.ResourceName, bool) {
651+
switch name {
652+
case flytecore.Resources_CPU:
653+
return corev1.ResourceCPU, true
654+
case flytecore.Resources_MEMORY:
655+
return corev1.ResourceMemory, true
656+
case flytecore.Resources_STORAGE:
657+
return corev1.ResourceStorage, true
658+
case flytecore.Resources_EPHEMERAL_STORAGE:
659+
return corev1.ResourceEphemeralStorage, true
660+
default:
661+
return "", false
662+
}
663+
}
664+
665+
func boolPtr(b bool) *bool { return &b }
666+
571667
// buildAutoscalingAnnotations returns the Knative autoscaling annotations for the revision template.
572668
func buildAutoscalingAnnotations(spec *flyteapp.Spec, cfg *config.InternalAppConfig) map[string]string {
573669
annotations := map[string]string{}
@@ -644,10 +740,13 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser
644740

645741
status := statusWithPhase(phase, message)
646742

647-
// Populate ingress URL from KService route status.
648-
if url := ksvc.Status.URL; url != nil {
649-
status.Ingress = &flyteapp.Ingress{
650-
PublicUrl: url.String(),
743+
// Populate ingress URL from the app annotation so the URL is consistent
744+
// with the Create response regardless of Knative route readiness.
745+
if appIDStr := ksvc.Annotations[annotationAppID]; appIDStr != "" {
746+
parts := strings.SplitN(appIDStr, "/", 3)
747+
if len(parts) == 3 {
748+
appID := &flyteapp.Identifier{Project: parts[0], Domain: parts[1], Name: parts[2]}
749+
status.Ingress = c.publicIngress(appID)
651750
}
652751
}
653752

app/internal/k8s/app_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func TestList(t *testing.T) {
285285
},
286286
}
287287

288-
apps, nextToken, err := c.List(context.Background(), "proj", "dev", "", 0, "")
288+
apps, nextToken, err := c.List(context.Background(), "proj", "dev", 0, "")
289289
require.NoError(t, err)
290290
assert.Empty(t, nextToken)
291291
require.Len(t, apps, 1)

0 commit comments

Comments
 (0)