TAS: Update cache on delete Topology. #3615

Merged
12 changes: 6 additions & 6 deletions pkg/cache/tas_flavor.go
@@ -54,9 +54,9 @@ type TASFlavorCache struct {

client client.Client

- // topologyName indicates the name of the topology specified in the
+ // TopologyName indicates the name of the topology specified in the
// ResourceFlavor spec.topologyName field.
- topologyName kueue.TopologyReference
+ TopologyName kueue.TopologyReference
// nodeLabels is a map of nodeLabels defined in the ResourceFlavor object.
NodeLabels map[string]string
// levels is a list of levels defined in the Topology object referenced
@@ -67,11 +67,11 @@ type TASFlavorCache struct {
usage map[utiltas.TopologyDomainID]resources.Requests
}

- func (t *TASCache) NewTASFlavorCache(topologyName kueue.TopologyReference, labels []string, nodeLabels map[string]string) *TASFlavorCache {
+ func (t *TASCache) NewTASFlavorCache(topologyName kueue.TopologyReference, levels []string, nodeLabels map[string]string) *TASFlavorCache {
return &TASFlavorCache{
client: t.client,
- topologyName: topologyName,
- Levels: slices.Clone(labels),
+ TopologyName: topologyName,
+ Levels: slices.Clone(levels),
NodeLabels: maps.Clone(nodeLabels),
usage: make(map[utiltas.TopologyDomainID]resources.Requests),
}
@@ -111,7 +111,7 @@ func (c *TASFlavorCache) snapshotForNodes(log logr.Logger, nodes []corev1.Node,

log.V(3).Info("Constructing TAS snapshot", "nodeLabels", c.NodeLabels,
"levels", c.Levels, "nodeCount", len(nodes), "podCount", len(pods))
- snapshot := newTASFlavorSnapshot(log, c.topologyName, c.Levels)
+ snapshot := newTASFlavorSnapshot(log, c.TopologyName, c.Levels)
nodeToDomain := make(map[string]utiltas.TopologyDomainID)
for _, node := range nodes {
levelValues := utiltas.LevelValues(c.Levels, node.Labels)
18 changes: 16 additions & 2 deletions pkg/controller/tas/indexer/indexer.go
@@ -24,12 +24,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
)

const (
- WorkloadNameKey = "metadata.workload"
- ReadyNode = "metadata.ready"
+ WorkloadNameKey               = "metadata.workload"
+ ReadyNode                     = "metadata.ready"
+ ResourceFlavorTopologyNameKey = "spec.topologyName"
)

func indexPodWorkload(o client.Object) []string {
@@ -57,6 +59,14 @@ func indexReadyNode(o client.Object) []string {
return []string{"true"}
}

func indexResourceFlavorTopologyName(o client.Object) []string {
flavor, ok := o.(*kueue.ResourceFlavor)
if !ok || flavor.Spec.TopologyName == nil {
return nil
}
return []string{string(*flavor.Spec.TopologyName)}
}

func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
if err := indexer.IndexField(ctx, &corev1.Pod{}, WorkloadNameKey, indexPodWorkload); err != nil {
return fmt.Errorf("setting index pod workload: %w", err)
@@ -66,5 +76,9 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
return fmt.Errorf("setting index node ready: %w", err)
}

if err := indexer.IndexField(ctx, &kueue.ResourceFlavor{}, ResourceFlavorTopologyNameKey, indexResourceFlavorTopologyName); err != nil {
return fmt.Errorf("setting index resource flavor topology name: %w", err)
}

return nil
}
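
For reference, a minimal usage sketch of the new field index (not part of this diff): the index is registered via SetupIndexes at manager setup, after which ResourceFlavors can be listed by the Topology they reference instead of listing and filtering every flavor. The package and helper name below are hypothetical; the equivalent in-tree lookup lives in the topology Create handler in resource_flavor.go.

package example // hypothetical package, for illustration only

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	"sigs.k8s.io/kueue/pkg/controller/tas/indexer"
)

// flavorsForTopology lists only the ResourceFlavors whose spec.topologyName
// matches the given Topology name, using the ResourceFlavorTopologyNameKey
// index. The index must already be registered, e.g. via
// indexer.SetupIndexes(ctx, mgr.GetFieldIndexer()).
func flavorsForTopology(ctx context.Context, c client.Client, topologyName string) (*kueue.ResourceFlavorList, error) {
	flavors := &kueue.ResourceFlavorList{}
	if err := c.List(ctx, flavors,
		client.MatchingFields{indexer.ResourceFlavorTopologyNameKey: topologyName}); err != nil {
		return nil, err
	}
	return flavors, nil
}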
55 changes: 54 additions & 1 deletion pkg/controller/tas/resource_flavor.go
@@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -38,6 +39,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/tas/indexer"
"sigs.k8s.io/kueue/pkg/queue"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
)
@@ -79,6 +81,7 @@ func (r *rfReconciler) setupWithManager(mgr ctrl.Manager, cache *cache.Cache, cf
Named(TASResourceFlavorController).
For(&kueue.ResourceFlavor{}).
Watches(&corev1.Node{}, &nodeHandler).
Watches(&kueuealpha.Topology{}, &topologyHandler{client: r.client, tasCache: r.tasCache}).
WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}).
WithEventFilter(r).
Complete(core.WithLeadingManager(mgr, r, &kueue.ResourceFlavor{}, cfg))
@@ -134,6 +137,56 @@ func (h *nodeHandler) queueReconcileForNode(node *corev1.Node, q workqueue.Typed
func (h *nodeHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}

var _ handler.EventHandler = (*topologyHandler)(nil)

// topologyHandler handles Topology create and delete events.
type topologyHandler struct {
client client.Client
tasCache *cache.TASCache
}

func (h *topologyHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
topology, isTopology := e.Object.(*kueuealpha.Topology)
if !isTopology || topology == nil {
return
}

flavors := &kueue.ResourceFlavorList{}
if err := h.client.List(ctx, flavors, client.MatchingFields{indexer.ResourceFlavorTopologyNameKey: topology.Name}); err != nil {
log := ctrl.LoggerFrom(ctx).WithValues("topology", klog.KObj(topology))
log.Error(err, "Could not list resource flavors")
return
}

// trigger reconcile for TAS flavors referencing the topology being created
for _, flavor := range flavors.Items {
if flavor.Spec.TopologyName == nil {
continue
}
if *flavor.Spec.TopologyName == kueue.TopologyReference(topology.Name) {
q.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{Name: flavor.Name}}, nodeBatchPeriod)
}
}
}

func (h *topologyHandler) Update(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}

func (h *topologyHandler) Delete(_ context.Context, e event.DeleteEvent, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
topology, isTopology := e.Object.(*kueuealpha.Topology)
if !isTopology || topology == nil {
return
}
for name, flavor := range h.tasCache.Clone() {
if flavor.TopologyName == kueue.TopologyReference(topology.Name) {
h.tasCache.Delete(name)
}
}
}

func (h *topologyHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}

func (r *rfReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := ctrl.LoggerFrom(ctx).WithValues("name", req.NamespacedName.Name)
log.V(2).Info("Reconcile TAS Resource Flavor")
@@ -150,7 +203,7 @@ func (r *rfReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
if r.tasCache.Get(flavorReference) == nil {
topology := kueuealpha.Topology{}
if err := r.client.Get(ctx, types.NamespacedName{Name: string(*flv.Spec.TopologyName)}, &topology); err != nil {
- return reconcile.Result{}, err
+ return reconcile.Result{}, client.IgnoreNotFound(err)
}
levels := utiltas.Levels(&topology)
tasInfo := r.tasCache.NewTASFlavorCache(kueue.TopologyReference(topology.Name), levels, flv.Spec.NodeLabels)
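
For reference, a sketch of why the NotFound error is now ignored: client.IgnoreNotFound returns nil exactly when the referenced Topology no longer exists, so the ResourceFlavor reconcile ends quietly instead of requeueing with backoff, and removal of the cached flavor is left to the topology Delete handler above. The helper below only mirrors that behavior for illustration; its name and package are hypothetical.

package example // hypothetical package, for illustration only

import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// ignoreNotFound mirrors client.IgnoreNotFound: a NotFound error (the
// Topology is simply gone) is swallowed, while any other error is returned
// and triggers a requeue with backoff.
func ignoreNotFound(err error) error {
	if apierrors.IsNotFound(err) {
		return nil
	}
	return err
}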
50 changes: 50 additions & 0 deletions test/integration/tas/tas_test.go
@@ -17,6 +17,8 @@ limitations under the License.
package core

import (
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -514,6 +516,54 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
))
})
})

ginkgo.It("should not admit the workload after the topology is deleted but should admit it after the topology is created", func() {
ginkgo.By("delete topology", func() {
util.ExpectObjectToBeDeleted(ctx, k8sClient, topology, true)
})

// TODO(#3645): replace the sleep with waiting for CQ deactivation.
// The sleep is a temporary solution to minimize the chance for the test flaking in case
// the workload is created and admitted before the event is handled and the topology
// is removed from the cache.
time.Sleep(time.Second)

var wl *kueue.Workload
ginkgo.By("creating a workload which requires block and can fit", func() {
wl = testing.MakeWorkload("wl", ns.Name).
Queue(localQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
wl.Spec.PodSets[0].Count = 2
wl.Spec.PodSets[0].TopologyRequest = &kueue.PodSetTopologyRequest{
Required: ptr.To(tasBlockLabel),
}
gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed())
})

ginkgo.By("verify the workload is inadmissible", func() {
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 1)
})

topology = testing.MakeTopology("default").Levels([]string{tasBlockLabel, tasRackLabel}).Obj()
gomega.Expect(k8sClient.Create(ctx, topology)).Should(gomega.Succeed())

ginkgo.By("verify the workload is admitted", func() {
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl)
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
})

ginkgo.By("verify admission for the workload", func() {
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), wl)).To(gomega.Succeed())
gomega.Expect(wl.Status.Admission.PodSetAssignments[0].TopologyAssignment).Should(gomega.BeComparableTo(
&kueue.TopologyAssignment{
Levels: []string{tasBlockLabel, tasRackLabel},
Domains: []kueue.TopologyDomainAssignment{
{Count: 1, Values: []string{"b1", "r1"}},
{Count: 1, Values: []string{"b1", "r2"}},
},
},
))
})
})
})

ginkgo.When("Node structure is mutated during test cases", func() {