diff --git a/pkg/cache/tas_flavor.go b/pkg/cache/tas_flavor.go
index 3bd939cd17..e4104e8b72 100644
--- a/pkg/cache/tas_flavor.go
+++ b/pkg/cache/tas_flavor.go
@@ -54,9 +54,9 @@ type TASFlavorCache struct {
 	client client.Client
 
-	// topologyName indicates the name of the topology specified in the
+	// TopologyName indicates the name of the topology specified in the
 	// ResourceFlavor spec.topologyName field.
-	topologyName kueue.TopologyReference
+	TopologyName kueue.TopologyReference
 	// nodeLabels is a map of nodeLabels defined in the ResourceFlavor object.
 	NodeLabels map[string]string
 	// levels is a list of levels defined in the Topology object referenced
@@ -67,11 +67,11 @@ type TASFlavorCache struct {
 	usage map[utiltas.TopologyDomainID]resources.Requests
 }
 
-func (t *TASCache) NewTASFlavorCache(topologyName kueue.TopologyReference, labels []string, nodeLabels map[string]string) *TASFlavorCache {
+func (t *TASCache) NewTASFlavorCache(topologyName kueue.TopologyReference, levels []string, nodeLabels map[string]string) *TASFlavorCache {
 	return &TASFlavorCache{
 		client:       t.client,
-		topologyName: topologyName,
-		Levels:       slices.Clone(labels),
+		TopologyName: topologyName,
+		Levels:       slices.Clone(levels),
 		NodeLabels:   maps.Clone(nodeLabels),
 		usage:        make(map[utiltas.TopologyDomainID]resources.Requests),
 	}
@@ -111,7 +111,7 @@ func (c *TASFlavorCache) snapshotForNodes(log logr.Logger, nodes []corev1.Node,
 	log.V(3).Info("Constructing TAS snapshot", "nodeLabels", c.NodeLabels,
 		"levels", c.Levels, "nodeCount", len(nodes), "podCount", len(pods))
 
-	snapshot := newTASFlavorSnapshot(log, c.topologyName, c.Levels)
+	snapshot := newTASFlavorSnapshot(log, c.TopologyName, c.Levels)
 	nodeToDomain := make(map[string]utiltas.TopologyDomainID)
 	for _, node := range nodes {
 		levelValues := utiltas.LevelValues(c.Levels, node.Labels)
diff --git a/pkg/controller/tas/indexer/indexer.go b/pkg/controller/tas/indexer/indexer.go
index 1efbd5bafa..f7ac417c3d 100644
--- a/pkg/controller/tas/indexer/indexer.go
+++ b/pkg/controller/tas/indexer/indexer.go
@@ -24,12 +24,14 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	utiltas "sigs.k8s.io/kueue/pkg/util/tas"
 )
 
 const (
-	WorkloadNameKey = "metadata.workload"
-	ReadyNode       = "metadata.ready"
+	WorkloadNameKey               = "metadata.workload"
+	ReadyNode                     = "metadata.ready"
+	ResourceFlavorTopologyNameKey = "spec.topologyName"
 )
 
 func indexPodWorkload(o client.Object) []string {
@@ -57,6 +59,14 @@ func indexReadyNode(o client.Object) []string {
 	return []string{"true"}
 }
 
+func indexResourceFlavorTopologyName(o client.Object) []string {
+	flavor, ok := o.(*kueue.ResourceFlavor)
+	if !ok || flavor.Spec.TopologyName == nil {
+		return nil
+	}
+	return []string{string(*flavor.Spec.TopologyName)}
+}
+
 func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
 	if err := indexer.IndexField(ctx, &corev1.Pod{}, WorkloadNameKey, indexPodWorkload); err != nil {
 		return fmt.Errorf("setting index pod workload: %w", err)
 	}
@@ -66,5 +76,9 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
 		return fmt.Errorf("setting index node ready: %w", err)
 	}
 
+	if err := indexer.IndexField(ctx, &kueue.ResourceFlavor{}, ResourceFlavorTopologyNameKey, indexResourceFlavorTopologyName); err != nil {
+		return fmt.Errorf("setting index resource flavor topology name: %w", err)
+	}
+
 	return nil
 }
diff --git a/pkg/controller/tas/resource_flavor.go b/pkg/controller/tas/resource_flavor.go
index cd80a185c9..06137d984f 100644
--- a/pkg/controller/tas/resource_flavor.go
+++ b/pkg/controller/tas/resource_flavor.go
@@ -24,6 +24,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -38,6 +39,7 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/cache"
 	"sigs.k8s.io/kueue/pkg/controller/core"
+	"sigs.k8s.io/kueue/pkg/controller/tas/indexer"
 	"sigs.k8s.io/kueue/pkg/queue"
 	utiltas "sigs.k8s.io/kueue/pkg/util/tas"
 )
@@ -79,6 +81,7 @@ func (r *rfReconciler) setupWithManager(mgr ctrl.Manager, cache *cache.Cache, cf
 		Named(TASResourceFlavorController).
 		For(&kueue.ResourceFlavor{}).
 		Watches(&corev1.Node{}, &nodeHandler).
+		Watches(&kueuealpha.Topology{}, &topologyHandler{client: r.client, tasCache: r.tasCache}).
 		WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
 		WithEventFilter(r).
 		Complete(core.WithLeadingManager(mgr, r, &kueue.ResourceFlavor{}, cfg))
@@ -134,6 +137,56 @@ func (h *nodeHandler) queueReconcileForNode(node *corev1.Node, q workqueue.Typed
 func (h *nodeHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
 }
 
+var _ handler.EventHandler = (*topologyHandler)(nil)
+
+// topologyHandler handles Topology create and delete events.
+type topologyHandler struct {
+	client   client.Client
+	tasCache *cache.TASCache
+}
+
+func (h *topologyHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
+	topology, isTopology := e.Object.(*kueuealpha.Topology)
+	if !isTopology || topology == nil {
+		return
+	}
+
+	flavors := &kueue.ResourceFlavorList{}
+	if err := h.client.List(ctx, flavors, client.MatchingFields{indexer.ResourceFlavorTopologyNameKey: topology.Name}); err != nil {
+		log := ctrl.LoggerFrom(ctx).WithValues("topology", klog.KObj(topology))
+		log.Error(err, "Could not list resource flavors")
+		return
+	}
+
+	// trigger reconcile for TAS flavors referencing the newly created topology
+	for _, flavor := range flavors.Items {
+		if flavor.Spec.TopologyName == nil {
+			continue
+		}
+		if *flavor.Spec.TopologyName == kueue.TopologyReference(topology.Name) {
+			q.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{Name: flavor.Name}}, nodeBatchPeriod)
+		}
+	}
+}
+
+func (h *topologyHandler) Update(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
+}
+
+func (h *topologyHandler) Delete(_ context.Context, e event.DeleteEvent, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
+	topology, isTopology := e.Object.(*kueuealpha.Topology)
+	if !isTopology || topology == nil {
+		return
+	}
+	for name, flavor := range h.tasCache.Clone() {
+		if flavor.TopologyName == kueue.TopologyReference(topology.Name) {
+			h.tasCache.Delete(name)
+		}
+	}
+}
+
+func (h *topologyHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
+}
+
 func (r *rfReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
 	log := ctrl.LoggerFrom(ctx).WithValues("name", req.NamespacedName.Name)
 	log.V(2).Info("Reconcile TAS Resource Flavor")
@@ -150,7 +203,7 @@ func (r *rfReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
 	if r.tasCache.Get(flavorReference) == nil {
 		topology := kueuealpha.Topology{}
 		if err := r.client.Get(ctx, types.NamespacedName{Name: string(*flv.Spec.TopologyName)}, &topology); err != nil {
-			return reconcile.Result{}, err
+			return reconcile.Result{}, client.IgnoreNotFound(err)
 		}
 		levels := utiltas.Levels(&topology)
 		tasInfo := r.tasCache.NewTASFlavorCache(kueue.TopologyReference(topology.Name), levels, flv.Spec.NodeLabels)
diff --git a/test/integration/tas/tas_test.go b/test/integration/tas/tas_test.go
index e4fa7b61c1..9c45a8e758 100644
--- a/test/integration/tas/tas_test.go
+++ b/test/integration/tas/tas_test.go
@@ -17,6 +17,8 @@ limitations under the License.
 package core
 
 import (
+	"time"
+
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
@@ -514,6 +516,54 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
 				))
 			})
 		})
+
+		ginkgo.It("should not admit the workload after the topology is deleted but should admit it after the topology is recreated", func() {
+			ginkgo.By("deleting the topology", func() {
+				util.ExpectObjectToBeDeleted(ctx, k8sClient, topology, true)
+			})
+
+			// TODO(#3645): replace the sleep with waiting for CQ deactivation.
+			// The sleep is a temporary solution to minimize the chance of the test flaking in case
+			// the workload is created and admitted before the event is handled and the topology
+			// is removed from the cache.
+			time.Sleep(time.Second)
+
+			var wl *kueue.Workload
+			ginkgo.By("creating a workload which requires block and can fit", func() {
+				wl = testing.MakeWorkload("wl", ns.Name).
+					Queue(localQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
+				wl.Spec.PodSets[0].Count = 2
+				wl.Spec.PodSets[0].TopologyRequest = &kueue.PodSetTopologyRequest{
+					Required: ptr.To(tasBlockLabel),
+				}
+				gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("verify the workload is inadmissible", func() {
+				util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 1)
+			})
+
+			topology = testing.MakeTopology("default").Levels([]string{tasBlockLabel, tasRackLabel}).Obj()
+			gomega.Expect(k8sClient.Create(ctx, topology)).Should(gomega.Succeed())
+
+			ginkgo.By("verify the workload is admitted", func() {
+				util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl)
+				util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
+			})
+
+			ginkgo.By("verify admission for the workload", func() {
+				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), wl)).To(gomega.Succeed())
+				gomega.Expect(wl.Status.Admission.PodSetAssignments[0].TopologyAssignment).Should(gomega.BeComparableTo(
+					&kueue.TopologyAssignment{
+						Levels: []string{tasBlockLabel, tasRackLabel},
+						Domains: []kueue.TopologyDomainAssignment{
+							{Count: 1, Values: []string{"b1", "r1"}},
+							{Count: 1, Values: []string{"b1", "r2"}},
+						},
+					},
+				))
+			})
+		})
 	})
 
 	ginkgo.When("Node structure is mutated during test cases", func() {