Skip to content

Commit

Permalink
fixed task scheduling issue
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jul 26, 2021
1 parent edca4e6 commit dab9b44
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 23 deletions.
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,25 @@ require (
github.com/crawlab-team/crawlab-log v0.1.0
github.com/crawlab-team/crawlab-vcs v0.1.0
github.com/crawlab-team/go-trace v0.1.0
github.com/crawlab-team/goseaweedfs v0.2.0
github.com/crawlab-team/goseaweedfs v0.6.0-beta.20210725.1917
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/emirpasic/gods v1.12.0
github.com/fsnotify/fsnotify v1.4.9
github.com/gamexg/proxyclient v0.0.0-20210207161252-499908056324 // indirect
github.com/gavv/httpexpect/v2 v2.2.0
github.com/gin-gonic/gin v1.6.3
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/go-playground/validator/v10 v10.3.0
github.com/go-playground/validator/v10 v10.3.0 // indirect
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/hashicorp/go-sockaddr v1.0.0
github.com/hashicorp/go-sockaddr v1.0.0 // indirect
github.com/imroc/req v0.3.0
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901
github.com/matcornic/hermes v1.2.0
github.com/matcornic/hermes v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/olivere/elastic/v7 v7.0.15
github.com/pkg/errors v0.9.1
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/common v0.4.0
github.com/robfig/cron v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.0
Expand All @@ -43,10 +43,10 @@ require (
github.com/stretchr/testify v1.6.1
github.com/ztrue/tracerr v0.3.0
go.mongodb.org/mongo-driver v1.4.5
go.uber.org/atomic v1.6.0
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/dig v1.10.0
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
google.golang.org/grpc v1.34.0
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ github.com/crawlab-team/goseaweedfs v0.1.6 h1:4Qmz75e65h4gbj0/sTM7pPVFwVJl8MihXf
github.com/crawlab-team/goseaweedfs v0.1.6/go.mod h1:u+rwfqb0rnYllTLjCctE/z1Yp+TC8L+CbbWH8E2NstA=
github.com/crawlab-team/goseaweedfs v0.2.0 h1:9acRgmJVss+ghIqm/XMCNU0G2RUBAvvDaOOASvWNInc=
github.com/crawlab-team/goseaweedfs v0.2.0/go.mod h1:u+rwfqb0rnYllTLjCctE/z1Yp+TC8L+CbbWH8E2NstA=
github.com/crawlab-team/goseaweedfs v0.6.0-beta.20210725.1917 h1:Kb8AErE3357UO0jPf8Q2wqG/qcmL0hKDwPMaOZ/JjcY=
github.com/crawlab-team/goseaweedfs v0.6.0-beta.20210725.1917/go.mod h1:u+rwfqb0rnYllTLjCctE/z1Yp+TC8L+CbbWH8E2NstA=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
Expand Down
55 changes: 39 additions & 16 deletions task/scheduler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
mongo2 "go.mongodb.org/mongo-driver/mongo"
"go.uber.org/dig"
"math/rand"
"sync"
"time"
)
Expand Down Expand Up @@ -267,9 +268,9 @@ func (svc *Service) getTaskQueueItems() (tqList []models.TaskQueueItem, err erro
return tqList, nil
}

func (svc *Service) getResourcesAndNodesMap() (resources map[string]interfaces.Node, nodesMap map[primitive.ObjectID]interfaces.Node, err error) {
nodesMap = map[primitive.ObjectID]interfaces.Node{}
resources = map[string]interfaces.Node{}
func (svc *Service) getResourcesAndNodesMap() (resources map[string]models.Node, nodesMap map[primitive.ObjectID]models.Node, err error) {
nodesMap = map[primitive.ObjectID]models.Node{}
resources = map[string]models.Node{}
query := bson.M{
// enabled: true
"en": true,
Expand All @@ -288,43 +289,65 @@ func (svc *Service) getResourcesAndNodesMap() (resources map[string]interfaces.N
return nil, nil, err
}
for _, n := range nodes {
nodesMap[n.Id] = &n
nodesMap[n.Id] = n
for i := 0; i < n.AvailableRunners; i++ {
key := fmt.Sprintf("%s:%d", n.Id.Hex(), i)
resources[key] = &n
resources[key] = n
}
}
return resources, nodesMap, nil
}

func (svc *Service) matchResources(tqList []models.TaskQueueItem) (tasks []interfaces.Task, nodesMap map[primitive.ObjectID]interfaces.Node, err error) {
func (svc *Service) matchResources(tqList []models.TaskQueueItem) (tasks []interfaces.Task, nodesMap map[primitive.ObjectID]models.Node, err error) {
// get resources and nodes map
resources, nodesMap, err := svc.getResourcesAndNodesMap()
if err != nil {
return nil, nil, err
}
if resources == nil || len(resources) == 0 {
return nil, nil, nil
}
// TODO: shuffle resources
//var resourcesShuffled []models.Node
//rand.Seed(time.Now().Unix())
//rand.Shuffle(len(resources), func(i, j int) {
// resources[i], resources[j] = resources[j], resources[i]
//})

// resources list
var resourcesList []models.Node
for _, r := range resources {
resourcesList = append(resourcesList, r)
}

// shuffle resources list
rand.Seed(time.Now().Unix())
rand.Shuffle(len(resourcesList), func(i, j int) {
resourcesList[i], resourcesList[j] = resourcesList[j], resourcesList[i]
})

// iterate task queue items
for _, tq := range tqList {
// task
t, err := svc.modelSvc.GetTaskById(tq.GetId())
if err != nil {
return nil, nil, err
}
for key, r := range resources {

// iterate shuffled resources to match a resource
for i, r := range resourcesList {
// If node id is unset or node id of task matches with resource id (node id),
// assign corresponding resource id to the task
if t.GetNodeId().IsZero() ||
t.GetNodeId() == r.GetId() {
// assign resource id
t.NodeId = r.GetId()

// append to tasks
tasks = append(tasks, t)
delete(resources, key)

// delete from resources list
resourcesList = append(resourcesList[:i], resourcesList[(i+1):]...)

// decrement available runners
n := nodesMap[r.GetId()]
n.DecrementAvailableRunners()

// break loop
break
}
}
Expand All @@ -333,9 +356,9 @@ func (svc *Service) matchResources(tqList []models.TaskQueueItem) (tasks []inter
return tasks, nodesMap, nil
}

func (svc *Service) updateResources(nodesMap map[primitive.ObjectID]interfaces.Node) (err error) {
func (svc *Service) updateResources(nodesMap map[primitive.ObjectID]models.Node) (err error) {
for _, n := range nodesMap {
if err := delegate.NewModelNodeDelegate(n).Save(); err != nil {
if err := delegate.NewModelNodeDelegate(&n).Save(); err != nil {
return err
}
}
Expand Down
14 changes: 14 additions & 0 deletions utils/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package utils

import (
"errors"
"math/rand"
"reflect"
"time"
)

func StringArrayContains(arr []string, str string) bool {
Expand Down Expand Up @@ -30,3 +32,15 @@ func GetArrayItems(array interface{}) (res []interface{}, err error) {
}
return res, nil
}

func ShuffleArray(slice []interface{}) (err error) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for len(slice) > 0 {
n := len(slice)
randIndex := r.Intn(n)
slice[n-1], slice[randIndex] = slice[randIndex], slice[n-1]
slice = slice[:n-1]
}

return nil
}

0 comments on commit dab9b44

Please sign in to comment.