Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fleet: init fleet webhook and ensure single fleet per namespace #409

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions pkg/webhooks/fleet_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
Copyright Kurator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhooks

import (
"context"
"fmt"
"sync"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"kurator.dev/kurator/pkg/apis/fleet/v1alpha1"
)

var _ webhook.CustomValidator = &FleetWebhook{}

// Define a map to store mutex for each namespace
var nsLocks = make(map[string]*sync.Mutex)

// Valid cluster kinds
var validClusterKinds = []string{"Cluster", "AttachedCluster", "CustomCluster"}

type FleetWebhook struct {
Client client.Reader
}

func (wh *FleetWebhook) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(&v1alpha1.Fleet{}).
WithValidator(wh).
Complete()
}

func (wh *FleetWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) error {
in, ok := obj.(*v1alpha1.Fleet)
if !ok {
return apierrors.NewBadRequest(fmt.Sprintf("expected a Fleet but got a %T", obj))
}

// Ensure only one Fleet instance in a namespace
mutex := getOrCreateMutexForNamespace(in.Namespace)
mutex.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock can only prevent inner-process creating, it cannot help for multi replicate scenarios.

One way as I said in the original issue, could make use of distributed lock.

defer mutex.Unlock()

// Check if Fleet instance already exists in the namespace
existing := &v1alpha1.Fleet{}
if err := wh.Client.Get(ctx, client.ObjectKey{Namespace: in.Namespace, Name: in.Name}, existing); err == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure i understand

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

k8s has prevented creating same fleet

return apierrors.NewBadRequest(fmt.Sprintf("a Fleet instance already exists in namespace %s: %s", existing.Namespace, existing.Name))
}

return nil
}

// Utility function to get or create a mutex for a namespace
func getOrCreateMutexForNamespace(ns string) *sync.Mutex {
if _, exists := nsLocks[ns]; !exists {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data race r/w nsLocks

nsLocks[ns] = &sync.Mutex{}
}
return nsLocks[ns]
}

func (wh *FleetWebhook) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Object) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For update, we should prebent some fields update we do not support, this need to be reviewed one by one

_, ok := oldObj.(*v1alpha1.Fleet)
if !ok {
return apierrors.NewBadRequest(fmt.Sprintf("expected a Fleet but got a %T", oldObj))
}

newFleet, ok := newObj.(*v1alpha1.Fleet)
if !ok {
return apierrors.NewBadRequest(fmt.Sprintf("expected a Fleet but got a %T", newObj))
}

return wh.validate(newFleet)
}

func (wh *FleetWebhook) ValidateDelete(_ context.Context, obj runtime.Object) error {
return nil
}

func (wh *FleetWebhook) validate(in *v1alpha1.Fleet) error {
var allErrs field.ErrorList

allErrs = append(allErrs, validateCluster(in)...)

allErrs = append(allErrs, validatePlugin(in)...)

if len(allErrs) > 0 {
return apierrors.NewInvalid(v1alpha1.SchemeGroupVersion.WithKind("Fleet").GroupKind(), in.Name, allErrs)
}

return nil
}

// validateCluster checks the Clusters field of FleetSpec
func validateCluster(in *v1alpha1.Fleet) field.ErrorList {
var allErrs field.ErrorList
clusterPath := field.NewPath("spec", "clusters")

// Check each cluster reference
for i, clusterRef := range in.Spec.Clusters {
if clusterRef == nil {
allErrs = append(allErrs, field.Required(clusterPath.Index(i), "cluster reference cannot be nil"))
continue
}
if clusterRef.Name == "" {
allErrs = append(allErrs, field.Required(clusterPath.Index(i).Child("name"), "name is required"))
}
if clusterRef.Kind == "" {
allErrs = append(allErrs, field.Required(clusterPath.Index(i).Child("kind"), "kind is required"))
} else if !isValidClusterKind(clusterRef.Kind) {
allErrs = append(allErrs, field.Invalid(clusterPath.Index(i).Child("kind"), clusterRef.Kind, "unsupported cluster kind; please use AttachedCluster to manage your own cluster"))
}
}

return allErrs
}

// isValidClusterKind checks if the given kind is a valid cluster kind
func isValidClusterKind(kind string) bool {
for _, validKind := range validClusterKinds {
if kind == validKind {
return true
}
}
return false
}

// validatePlugin checks the Plugin field of FleetSpec
func validatePlugin(in *v1alpha1.Fleet) field.ErrorList {
var allErrs field.ErrorList

// TODO: add plugin validation here

return allErrs
}
94 changes: 94 additions & 0 deletions pkg/webhooks/fleet_webhook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright Kurator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhooks

import (
"io/fs"
"os"
"path"
"path/filepath"
"testing"

. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"sigs.k8s.io/yaml"

"kurator.dev/kurator/pkg/apis/fleet/v1alpha1"
)

func TestValidFleetValidation(t *testing.T) {
r := path.Join("../../examples", "fleet")
caseNames := getCaseNames(t, r)

wh := &FleetWebhook{}
for _, tt := range caseNames {
t.Run(tt, func(t *testing.T) {
g := NewWithT(t)
c, err := readFleet(tt)
g.Expect(err).NotTo(HaveOccurred())

err = wh.validate(c)
g.Expect(err).NotTo(HaveOccurred())
})
}
}

func TestInvalidFleetValidation(t *testing.T) {
r := path.Join("testdata", "fleet")
caseNames := getCaseNames(t, r)

wh := &FleetWebhook{}
for _, tt := range caseNames {
t.Run(tt, func(t *testing.T) {
g := NewWithT(t)
c, err := readFleet(tt)
g.Expect(err).NotTo(HaveOccurred())

err = wh.validate(c)
g.Expect(err).To(HaveOccurred())
t.Logf("%v", err)
})
}
}

func getCaseNames(t *testing.T, rootPath string) []string {
caseNames := make([]string, 0)
err := filepath.WalkDir(rootPath, func(path string, d fs.DirEntry, err error) error {
if d.IsDir() {
return nil
}

caseNames = append(caseNames, path)
return nil
})
assert.NoError(t, err)
return caseNames
}

func readFleet(filename string) (*v1alpha1.Fleet, error) {
b, err := os.ReadFile(filename)
if err != nil {
return nil, err
}

c := &v1alpha1.Fleet{}
if err := yaml.Unmarshal(b, c); err != nil {
return nil, err
}

return c, nil
}
11 changes: 11 additions & 0 deletions pkg/webhooks/testdata/fleet/invalid-cluster-kind.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: fleet.kurator.dev/v1alpha1
kind: Fleet
metadata:
name: quickstart
namespace: default
spec:
clusters:
- name: kurator-member1
kind: AttachedCluster
- name: kurator-member2
kind: invalid-cluster-kind
11 changes: 11 additions & 0 deletions pkg/webhooks/testdata/fleet/miss-cluster-name-kind.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: fleet.kurator.dev/v1alpha1
kind: Fleet
metadata:
name: quickstart
namespace: default
spec:
clusters:
- name: ""
kind: AttachedCluster
- name: kurator-member2
kind: ""
Loading