Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ all: build
check: verify lint test test-e2e test-e2e-contribs
.PHONY: check

GOMODS := $(shell find . -name 'go.mod' -exec dirname {} \; | grep -v hack/tools)
Comment thread
mjudeikis marked this conversation as resolved.
GOMODS := $(shell find . -name 'go.mod' -exec dirname {} \; | grep -v hack/tools | grep -v ./dex)

ldflags:
@echo $(LDFLAGS)
Expand Down Expand Up @@ -245,8 +245,12 @@ GO_TEST = $(GOTESTSUM) $(GOTESTSUM_ARGS) --
endif

COUNT ?= 1
NPROC ?= $$(( $(shell nproc) / 2 ))
E2E_PARALLELISM ?= $$(( $(NPROC) > 1 ? $(NPROC) : 1))
# Only set parallelism if user specified E2E_PARALLELISM
ifdef E2E_PARALLELISM
E2E_PARALLELISM_FLAG := -p $(E2E_PARALLELISM) -parallel $(E2E_PARALLELISM)
else
E2E_PARALLELISM_FLAG :=
endif

$(DEX):
mkdir -p $(TOOLS_DIR)
Expand Down Expand Up @@ -280,15 +284,15 @@ test-e2e: $(KCP) $(DEX) build ## Run e2e tests
$(MAKE) run-kcp &>.kcp/kcp.log & KCP_PID=$$!; \
trap 'kill -TERM $$KCP_PID; rm -rf .kcp' TERM INT EXIT && \
echo "Waiting for kcp to be ready (check .kcp/kcp.log)." && while ! KUBECONFIG=.kcp/admin.kubeconfig kubectl get --raw /readyz &>/dev/null; do sleep 1; echo -n "."; done && echo && \
KUBECONFIG=$$PWD/.kcp/admin.kubeconfig GOOS=$(OS) GOARCH=$(ARCH) $(GO_TEST) -race -count $(COUNT) -p $(E2E_PARALLELISM) -parallel $(E2E_PARALLELISM) $(WHAT) $(TEST_ARGS)
KUBECONFIG=$$PWD/.kcp/admin.kubeconfig GOOS=$(OS) GOARCH=$(ARCH) $(GO_TEST) -race -count $(COUNT) $(E2E_PARALLELISM_FLAG) $(WHAT) $(TEST_ARGS)

CONTRIBS_E2E := $(patsubst %,test-e2e-contrib-%,$(CONTRIBS))

.PHONY: test-e2e-contribs $(CONTRIBS_E2E)
test-e2e-contribs: $(CONTRIBS_E2E) ## Run e2e tests for external integrations
test-e2e-contrib-kcp: $(DEX) $(KCP)
$(CONTRIBS_E2E):
cd contrib/$(patsubst test-e2e-contrib-%,%,$@) && $(GO_TEST) -race -count $(COUNT) -p $(E2E_PARALLELISM) -parallel $(E2E_PARALLELISM) ./test/e2e/...
cd contrib/$(patsubst test-e2e-contrib-%,%,$@) && $(GO_TEST) -race -count $(COUNT) $(E2E_PARALLELISM_FLAG) ./test/e2e/...

.PHONY: test
ifdef USE_GOTESTSUM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func (r *reconciler) ensureRBACClusterRole(ctx context.Context, client client.Cl
Resources: []string{"boundschemas"},
Verbs: []string{"get", "list", "watch"},
},
{
APIGroups: []string{kubebindv1alpha2.GroupName},
Resources: []string{"boundschemas/status"},
Verbs: []string{"get", "update", "patch"},
},
}}
for _, export := range exports {
// Collect unique GroupResources and sort for stable rule ordering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewAPIServiceExportReconciler(
opts controller.TypedOptions[mcreconcile.Request],
) (*APIServiceExportReconciler, error) {
if err := mgr.GetFieldIndexer().IndexField(ctx, &kubebindv1alpha2.APIServiceExport{}, indexers.ServiceExportByBoundSchema,
indexers.IndexServiceExportByBoundSchema); err != nil {
indexers.IndexServiceExportByBoundSchemaControllerRuntime); err != nil {
return nil, fmt.Errorf("failed to setup ServiceExportByBoundSchema indexer: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ func NewAPIServiceExportRequestReconciler(
informerScope: scope,
clusterScopedIsolation: isolation,
schemaSource: schemaSource,
getBoundSchema: func(ctx context.Context, cache cache.Cache, namespace, name string) (*kubebindv1alpha2.BoundSchema, error) {
getBoundSchema: func(ctx context.Context, cl client.Client, namespace, name string) (*kubebindv1alpha2.BoundSchema, error) {
var schema kubebindv1alpha2.BoundSchema
key := types.NamespacedName{Namespace: namespace, Name: name}
if err := cache.Get(ctx, key, &schema); err != nil {
if err := cl.Get(ctx, key, &schema); err != nil {
return nil, err
}
return &schema, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type reconciler struct {
clusterScopedIsolation kubebindv1alpha2.Isolation
schemaSource string

getBoundSchema func(ctx context.Context, cache cache.Cache, namespace, name string) (*kubebindv1alpha2.BoundSchema, error)
getBoundSchema func(ctx context.Context, cl client.Client, namespace, name string) (*kubebindv1alpha2.BoundSchema, error)
createBoundSchema func(ctx context.Context, cl client.Client, schema *kubebindv1alpha2.BoundSchema) error

getServiceExport func(ctx context.Context, cache cache.Cache, ns, name string) (*kubebindv1alpha2.APIServiceExport, error)
Expand All @@ -53,14 +53,21 @@ type reconciler struct {
}

func (r *reconciler) reconcile(ctx context.Context, cl client.Client, cache cache.Cache, req *kubebindv1alpha2.APIServiceExportRequest) error {
// We must ensure schemas are created in form of boundSchemas first for the validation.
// Worst case scenario if validation fails, we will reuse schemas for same consumer once issues are fixed.
if err := r.ensureBoundSchemas(ctx, cl, cache, req); err != nil {
conditions.SetSummary(req)
return err
return fmt.Errorf("failed to ensure bound schemas: %w", err)
}

if err := r.validate(ctx, cl, req); err != nil {
conditions.SetSummary(req)
return fmt.Errorf("failed to validate APIServiceExportRequest: %w", err)
}

if err := r.ensureExports(ctx, cl, cache, req); err != nil {
conditions.SetSummary(req)
return err
return fmt.Errorf("failed to ensure exports: %w", err)
}

// TODO(mjudeikis): we could potentially add finallizer to APIServiceExport above or "adopt" boundschemas
Expand All @@ -72,104 +79,77 @@ func (r *reconciler) reconcile(ctx context.Context, cl client.Client, cache cach
return nil
}

func (r *reconciler) ensureBoundSchemas(ctx context.Context, cl client.Client, cache cache.Cache, req *kubebindv1alpha2.APIServiceExportRequest) error {
// Ensure all bound schemas exist
for _, res := range req.Spec.Resources {
parts := strings.SplitN(r.schemaSource, ".", 3)
if len(parts) != 3 { // We check this in validation, but just in case.
return fmt.Errorf("malformed schema source: %q", r.schemaSource)
}
// getExportedSchemas will list all schemas, exported by current backend.
// Important: getExportedSchemas is using client.Client to list resources, not cache.
// This is due to fact we use dynamic client and unstructured.Unstructured to get schemas and it
// does not quite work with dynamic cache informers:
// failed to get informer for *unstructured.UnstructuredList apis.kcp.io/v1alpha1, Kind=APIResourceSchemaList: failed to find newly started informer for apis.kcp.io/v1alpha1, Kind=APIResourceSchema"}.
func (r *reconciler) getExportedSchemas(ctx context.Context, cl client.Client) (kubebindv1alpha2.ExportedSchemas, error) {
parts := strings.SplitN(r.schemaSource, ".", 3)
if len(parts) != 3 { // We check this in validation, but just in case.
return nil, fmt.Errorf("malformed schema source: %q", r.schemaSource)
}

gvk := schema.GroupVersionKind{
Kind: parts[0],
Version: parts[1],
Group: parts[2],
}
gvk := schema.GroupVersionKind{
Kind: parts[0],
Version: parts[1],
Group: parts[2],
}

// Ensure we have the List kind
listGVK := gvk
if !strings.HasSuffix(listGVK.Kind, "List") {
listGVK.Kind += "List"
}
// Ensure we have the List kind
listGVK := gvk
if !strings.HasSuffix(listGVK.Kind, "List") {
listGVK.Kind += "List"
}

list := &unstructured.UnstructuredList{}
list.SetGroupVersionKind(listGVK)
list := &unstructured.UnstructuredList{}
list.SetGroupVersionKind(listGVK)

// TODO(mjudeikis): This is hardcoded here and in handlers.go for now.
labelSelector := labels.Set{
resources.ExportedCRDsLabel: "true",
}
// TODO(mjudeikis): This is hardcoded here and in handlers.go for now.
labelSelector := labels.Set{
resources.ExportedCRDsLabel: "true",
}

listOpts := []client.ListOption{}
listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector.AsSelector()})
listOpts := []client.ListOption{}
listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector.AsSelector()})

if err := cl.List(ctx, list, listOpts...); err != nil {
return err
}
if err := cl.List(ctx, list, listOpts...); err != nil {
return nil, err
}

for _, item := range list.Items {
var schemaFailed bool
obj := item.UnstructuredContent()
group, ok, err := unstructured.NestedString(obj, "spec", "group")
if !ok || err != nil || group == "" {
klog.FromContext(ctx).Error(err, "Skipping invalid schema: missing group", "ns", item.GetNamespace(), "name", item.GetName())
schemaFailed = true
}
plural, ok, err := unstructured.NestedString(obj, "spec", "names", "plural")
if !ok || err != nil || plural == "" {
klog.FromContext(ctx).Error(err, "Skipping invalid schema: missing names.plural", "ns", item.GetNamespace(), "name", item.GetName())
schemaFailed = true
}
boundSchemas := make(kubebindv1alpha2.ExportedSchemas, len(list.Items))
for _, item := range list.Items {
boundSchema, err := helpers.UnstructuredToBoundSchema(item)
if err != nil {
return nil, err
}
boundSchemas[boundSchema.ResourceGroupName()] = boundSchema
}

scope, ok, err := unstructured.NestedString(obj, "spec", "scope")
if !ok || err != nil || scope == "" {
klog.FromContext(ctx).Error(err, "Skipping invalid schema: missing scope", "ns", item.GetNamespace(), "name", item.GetName())
schemaFailed = true
}
return boundSchemas, nil
}

if schemaFailed {
conditions.MarkFalse(
req,
kubebindv1alpha2.APIServiceExportRequestConditionExportsReady,
"APIServiceExportRequestInvalid",
conditionsapi.ConditionSeverityError,
"APIServiceExportRequest %s is invalid: resource %s/%s has invalid schema",
req.Name, group, plural,
)
req.Status.Phase = kubebindv1alpha2.APIServiceExportRequestPhaseFailed
return fmt.Errorf("resource %s/%s is invalid", group, plural)
}
func (r *reconciler) ensureBoundSchemas(ctx context.Context, cl client.Client, cache cache.Cache, req *kubebindv1alpha2.APIServiceExportRequest) error {
exportedSchemas, err := r.getExportedSchemas(ctx, cl)
if err != nil {
return err
}

if group == res.Group && plural == res.Resource {
// Important: This checks if the resource are correctly scoped. If consumer is namespaced, we can't allow this.
// We terminate early to prevent triggering other controllers.
if r.informerScope.String() != scope && r.informerScope != kubebindv1alpha2.ClusterScope {
conditions.MarkFalse(
req,
kubebindv1alpha2.APIServiceExportRequestConditionExportsReady,
"APIServiceExportRequestInvalid",
conditionsapi.ConditionSeverityError,
"APIServiceExportRequest %s is invalid: resource %s/%s has scope %q which is incompatible with backend informer scope %q",
req.Name, group, plural, scope, r.informerScope,
)
req.Status.Phase = kubebindv1alpha2.APIServiceExportRequestPhaseFailed
req.Status.TerminalMessage = conditions.GetMessage(req, kubebindv1alpha2.APIServiceExportRequestConditionExportsReady)
// We can't proceed with this request.
return fmt.Errorf("resource %s/%s has scope %q which is incompatible with backend informer scope %q", group, plural, scope, r.informerScope)
}
// Ensure all bound schemas exist
for _, res := range req.Spec.Resources {
if len(res.Versions) == 0 {
Comment thread
mjudeikis marked this conversation as resolved.
continue
}

// https://github.com/kube-bind/kube-bind/issues/297 to fix.
boundSchema, err := helpers.UnstructuredToBoundSchema(item)
if err != nil {
return err
}
for _, boundSchema := range exportedSchemas {
if boundSchema.Spec.Group == res.Group && boundSchema.Spec.Names.Plural == res.Resource {
boundSchema.Name = res.ResourceGroupName()
Comment thread
mjudeikis marked this conversation as resolved.
boundSchema.Namespace = req.Namespace
boundSchema.Spec.InformerScope = r.informerScope
boundSchema.ResourceVersion = ""

obj, err := r.getBoundSchema(ctx, cache, boundSchema.Namespace, boundSchema.Name)
if err != nil && !apierrors.IsNotFound(err) {
obj, err := r.getBoundSchema(ctx, cl, boundSchema.Namespace, boundSchema.Name)
if err != nil && !apierrors.IsNotFound(err) && !strings.Contains(err.Error(), "no matches for kind") {
return err
}

Expand All @@ -196,7 +176,7 @@ func (r *reconciler) ensureExports(ctx context.Context, cl client.Client, cache
if req.Status.Phase == kubebindv1alpha2.APIServiceExportRequestPhasePending {
for _, res := range req.Spec.Resources {
name := res.ResourceGroupName()
boundSchema, err := r.getBoundSchema(ctx, cache, req.Namespace, name)
boundSchema, err := r.getBoundSchema(ctx, cl, req.Namespace, name)
if err != nil {
if apierrors.IsNotFound(err) {
conditions.MarkFalse(
Expand Down Expand Up @@ -258,6 +238,7 @@ func (r *reconciler) ensureExports(ctx context.Context, cl client.Client, cache
Versions: res.Versions,
})
}
export.Spec.PermissionClaims = req.Spec.PermissionClaims

logger.V(1).Info("Creating APIServiceExport", "name", export.Name, "namespace", export.Namespace)
if err := r.createServiceExport(ctx, cl, export); err != nil {
Expand All @@ -283,3 +264,106 @@ func (r *reconciler) ensureExports(ctx context.Context, cl client.Client, cache

return nil
}

// Validate validates if the APIServiceExportRequest is in a valid state.
// Currently it validates if all requested schemas are of the same scope and
// if claimable apis are allowed and valid.
//
// TODO: Move this to validatingAdmissionWebhook as this is not really part of reconciliation.
// https://github.com/kube-bind/kube-bind/issues/325
func (r *reconciler) validate(ctx context.Context, cl client.Client, req *kubebindv1alpha2.APIServiceExportRequest) error {
exportedSchemas, err := r.getExportedSchemas(ctx, cl)
if err != nil {
return err
}

if len(exportedSchemas) == 0 {
conditions.MarkFalse(
req,
kubebindv1alpha2.APIServiceExportRequestConditionExportsReady,
"SchemaNotFound",
conditionsapi.ConditionSeverityError,
"Schema not found",
)
return fmt.Errorf("no exported schemas found")
}

first := apiextensionsv1.ResourceScope("")

@cnvergence cnvergence Oct 7, 2025

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could just use var first apiextensionsv1.ResourceScope

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will address in follow-up

for _, res := range req.Spec.Resources {
boundSchema, ok := exportedSchemas[res.ResourceGroupName()]
if !ok {
conditions.MarkFalse(
req,
kubebindv1alpha2.APIServiceExportRequestConditionExportsReady,
"SchemaNotFound",
conditionsapi.ConditionSeverityError,
"Schema %s not found",
res.ResourceGroupName(),
)
return fmt.Errorf("schema %s not found", res.ResourceGroupName())
}
if first == apiextensionsv1.ResourceScope("") {
first = boundSchema.Spec.Scope
continue
}
if boundSchema.Spec.Scope != first {
conditions.MarkFalse(req,
kubebindv1alpha2.APIServiceExportRequestConditionExportsReady,
"DifferentScopes",
conditionsapi.ConditionSeverityError,
"Different scopes found: %v",
boundSchema.Spec.Scope,
)
return fmt.Errorf("different scopes found for claimed resources: %v", boundSchema.Name)
}
}

// Add validation if claimable apis are valid here
for _, claim := range req.Spec.PermissionClaims {
if !isClaimableAPI(claim) {
conditions.MarkFalse(
req,
kubebindv1alpha2.APIServiceExportConditionPermissionClaim,
"InvalidPermissionClaim",
conditionsapi.ConditionSeverityError,
"Resource %s is not a valid claimable API",
claim.GroupResource.String(),
)
req.Status.Phase = kubebindv1alpha2.APIServiceExportRequestPhaseFailed
req.Status.TerminalMessage = conditions.GetMessage(req, kubebindv1alpha2.APIServiceExportConditionPermissionClaim)
return fmt.Errorf("resource %s is not a valid claimable API", claim.GroupResource.String())
}
}

// Add validation for duplicate group/resource combinations
seenGroupResources := make(map[string]bool)
for _, claim := range req.Spec.PermissionClaims {
key := claim.Group + "/" + claim.Resource
if seenGroupResources[key] {
conditions.MarkFalse(
req,
kubebindv1alpha2.APIServiceExportConditionPermissionClaim,
"DuplicatePermissionClaim",
conditionsapi.ConditionSeverityError,
"Duplicate permission claim found for group/resource %s",
claim.GroupResource.String(),
)
req.Status.Phase = kubebindv1alpha2.APIServiceExportRequestPhaseFailed
req.Status.TerminalMessage = conditions.GetMessage(req, kubebindv1alpha2.APIServiceExportConditionPermissionClaim)
return fmt.Errorf("duplicate permission claim found for group/resource %s", claim.GroupResource.String())
}
seenGroupResources[key] = true
}

return nil
}

// isClaimableAPI checks if a permission claim is for a claimable API.
func isClaimableAPI(claim kubebindv1alpha2.PermissionClaim) bool {
for _, api := range kubebindv1alpha2.ClaimableAPIs {
if claim.Group == api.GroupVersionResource.Group && claim.Resource == api.Names.Plural {
return true
}
}
return false
}
Loading