Skip to content

Commit

Permalink
change preorder queue config traversal to postorder
Browse files Browse the repository at this point in the history
  • Loading branch information
doupache committed Oct 25, 2023
1 parent dac7360 commit 83af587
Showing 1 changed file with 14 additions and 38 deletions.
52 changes: 14 additions & 38 deletions pkg/scheduler/ugm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,21 @@ func (m *Manager) UpdateConfig(config configs.QueueConfig, queuePath string) err
m.userWildCardLimitsConfig = make(map[string]*LimitConfig)
m.groupWildCardLimitsConfig = make(map[string]*LimitConfig)
m.configuredGroups = make(map[string][]string)
return m.internalProcessConfig(config, queuePath)
}

func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath string) error {
// Holds user and group for which limits have been configured with specific queue path
userLimits := make(map[string]bool)
groupLimits := make(map[string]bool)
GroupConfigChanged := false
return m.internalProcessConfig(config, queuePath, userLimits, groupLimits)
}

func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath string, userLimits map[string]bool, groupLimits map[string]bool) error {
if len(cur.Queues) > 0 {
for _, child := range cur.Queues {
childQueuePath := queuePath + configs.DOT + child.Name
if err := m.internalProcessConfig(child, childQueuePath, userLimits, groupLimits); err != nil {
return err
}
}
}
// Traverse limits of specific queue path
for _, limit := range cur.Limits {
var maxResource *resources.Resource
Expand Down Expand Up @@ -366,9 +373,6 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
zap.String("queue path", queuePath),
zap.Uint64("max application", limit.MaxApplications),
zap.Any("max resources", limit.MaxResources))
if !GroupConfigChanged {
GroupConfigChanged = m.GroupConfighasChanged(group, limitConfig)
}
if err := m.processGroupConfig(group, limitConfig, queuePath, groupLimits); err != nil {
return err
}
Expand All @@ -380,20 +384,10 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
}
}

if GroupConfigChanged {
if err := m.clearEarlierSetLimits(userLimits, groupLimits, queuePath); err != nil {
return err
}
if err := m.clearEarlierSetLimits(userLimits, groupLimits, queuePath); err != nil {
return err
}

if len(cur.Queues) > 0 {
for _, child := range cur.Queues {
childQueuePath := queuePath + configs.DOT + child.Name
if err := m.internalProcessConfig(child, childQueuePath); err != nil {
return err
}
}
}
return nil
}

Expand All @@ -413,24 +407,6 @@ func (m *Manager) processGroupConfig(group string, limitConfig *LimitConfig, que
return nil
}

func (m *Manager) GroupConfighasChanged(group string, limitConfig *LimitConfig) bool {
groupTracker, ok := m.groupTrackers[group]

// new group limits
if !ok {
return true
}
// maxApplications limit changed
if groupTracker.queueTracker.maxRunningApps != limitConfig.maxApplications {
return true
}
// maxResources limit changed
if !resources.Equals(groupTracker.queueTracker.maxResources, limitConfig.maxResources) {
return true
}
return false
}

// clearEarlierSetLimits Clear already configured limits of users and groups for which limits have been configured before but not now
func (m *Manager) clearEarlierSetLimits(userLimits map[string]bool, groupLimits map[string]bool, queuePath string) error {
// Clear already configured limits of group for which limits have been configured before but not now
Expand Down

0 comments on commit 83af587

Please sign in to comment.