diff --git a/go.mod b/go.mod index 75a2d71..fd03d15 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/crawlab-team/crawlab-log v0.1.0 github.com/crawlab-team/crawlab-vcs v0.1.0 github.com/crawlab-team/go-trace v0.1.0 - github.com/crawlab-team/goseaweedfs v0.2.0 + github.com/crawlab-team/goseaweedfs v0.6.0-beta.20210725.1917 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/emirpasic/gods v1.12.0 github.com/fsnotify/fsnotify v1.4.9 @@ -21,17 +21,17 @@ require ( github.com/gavv/httpexpect/v2 v2.2.0 github.com/gin-gonic/gin v1.6.3 github.com/go-ole/go-ole v1.2.5 // indirect - github.com/go-playground/validator/v10 v10.3.0 + github.com/go-playground/validator/v10 v10.3.0 // indirect github.com/gomodule/redigo v2.0.0+incompatible github.com/google/uuid v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 - github.com/hashicorp/go-sockaddr v1.0.0 + github.com/hashicorp/go-sockaddr v1.0.0 // indirect github.com/imroc/req v0.3.0 github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 - github.com/matcornic/hermes v1.2.0 + github.com/matcornic/hermes v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 github.com/olivere/elastic/v7 v7.0.15 - github.com/pkg/errors v0.9.1 + github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/common v0.4.0 github.com/robfig/cron v1.2.0 // indirect github.com/robfig/cron/v3 v3.0.0 @@ -43,10 +43,10 @@ require ( github.com/stretchr/testify v1.6.1 github.com/ztrue/tracerr v0.3.0 go.mongodb.org/mongo-driver v1.4.5 - go.uber.org/atomic v1.6.0 + go.uber.org/atomic v1.6.0 // indirect go.uber.org/dig v1.10.0 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect google.golang.org/grpc v1.34.0 - gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df + gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df // indirect ) diff --git a/go.sum b/go.sum index 7ed4194..bc0d397 100644 --- a/go.sum +++ b/go.sum @@ -235,6 +235,8 @@ github.com/crawlab-team/goseaweedfs v0.1.6 h1:4Qmz75e65h4gbj0/sTM7pPVFwVJl8MihXf github.com/crawlab-team/goseaweedfs v0.1.6/go.mod h1:u+rwfqb0rnYllTLjCctE/z1Yp+TC8L+CbbWH8E2NstA= github.com/crawlab-team/goseaweedfs v0.2.0 h1:9acRgmJVss+ghIqm/XMCNU0G2RUBAvvDaOOASvWNInc= github.com/crawlab-team/goseaweedfs v0.2.0/go.mod h1:u+rwfqb0rnYllTLjCctE/z1Yp+TC8L+CbbWH8E2NstA= +github.com/crawlab-team/goseaweedfs v0.6.0-beta.20210725.1917 h1:Kb8AErE3357UO0jPf8Q2wqG/qcmL0hKDwPMaOZ/JjcY= +github.com/crawlab-team/goseaweedfs v0.6.0-beta.20210725.1917/go.mod h1:u+rwfqb0rnYllTLjCctE/z1Yp+TC8L+CbbWH8E2NstA= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= diff --git a/task/scheduler/service.go b/task/scheduler/service.go index 0d23747..9d65d83 100644 --- a/task/scheduler/service.go +++ b/task/scheduler/service.go @@ -23,6 +23,7 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" mongo2 "go.mongodb.org/mongo-driver/mongo" "go.uber.org/dig" + "math/rand" "sync" "time" ) @@ -267,9 +268,9 @@ func (svc *Service) getTaskQueueItems() (tqList []models.TaskQueueItem, err erro return tqList, nil } -func (svc *Service) getResourcesAndNodesMap() (resources map[string]interfaces.Node, nodesMap map[primitive.ObjectID]interfaces.Node, err error) { - nodesMap = map[primitive.ObjectID]interfaces.Node{} - resources = map[string]interfaces.Node{} +func (svc *Service) getResourcesAndNodesMap() (resources map[string]models.Node, nodesMap map[primitive.ObjectID]models.Node, err error) { + nodesMap = map[primitive.ObjectID]models.Node{} + resources = map[string]models.Node{} query := bson.M{ // enabled: true "en": true, @@ -288,16 +289,17 @@ func (svc *Service) getResourcesAndNodesMap() (resources map[string]interfaces.N return nil, nil, err } for _, n := range nodes { - nodesMap[n.Id] = &n + nodesMap[n.Id] = n for i := 0; i < n.AvailableRunners; i++ { key := fmt.Sprintf("%s:%d", n.Id.Hex(), i) - resources[key] = &n + resources[key] = n } } return resources, nodesMap, nil } -func (svc *Service) matchResources(tqList []models.TaskQueueItem) (tasks []interfaces.Task, nodesMap map[primitive.ObjectID]interfaces.Node, err error) { +func (svc *Service) matchResources(tqList []models.TaskQueueItem) (tasks []interfaces.Task, nodesMap map[primitive.ObjectID]models.Node, err error) { + // get resources and nodes map resources, nodesMap, err := svc.getResourcesAndNodesMap() if err != nil { return nil, nil, err @@ -305,26 +307,47 @@ func (svc *Service) matchResources(tqList []models.TaskQueueItem) (tasks []inter if resources == nil || len(resources) == 0 { return nil, nil, nil } - // TODO: shuffle resources - //var resourcesShuffled []models.Node - //rand.Seed(time.Now().Unix()) - //rand.Shuffle(len(resources), func(i, j int) { - // resources[i], resources[j] = resources[j], resources[i] - //}) + + // resources list + var resourcesList []models.Node + for _, r := range resources { + resourcesList = append(resourcesList, r) + } + + // shuffle resources list + rand.Seed(time.Now().Unix()) + rand.Shuffle(len(resourcesList), func(i, j int) { + resourcesList[i], resourcesList[j] = resourcesList[j], resourcesList[i] + }) + + // iterate task queue items for _, tq := range tqList { + // task t, err := svc.modelSvc.GetTaskById(tq.GetId()) if err != nil { return nil, nil, err } - for key, r := range resources { + + // iterate shuffled resources to match a resource + for i, r := range resourcesList { + // If node id is unset or node id of task matches with resource id (node id), + // assign corresponding resource id to the task if t.GetNodeId().IsZero() || t.GetNodeId() == r.GetId() { + // assign resource id t.NodeId = r.GetId() + + // append to tasks tasks = append(tasks, t) - delete(resources, key) + // delete from resources list + resourcesList = append(resourcesList[:i], resourcesList[(i+1):]...) + + // decrement available runners n := nodesMap[r.GetId()] n.DecrementAvailableRunners() + + // break loop break } } @@ -333,9 +356,9 @@ func (svc *Service) matchResources(tqList []models.TaskQueueItem) (tasks []inter return tasks, nodesMap, nil } -func (svc *Service) updateResources(nodesMap map[primitive.ObjectID]interfaces.Node) (err error) { +func (svc *Service) updateResources(nodesMap map[primitive.ObjectID]models.Node) (err error) { for _, n := range nodesMap { - if err := delegate.NewModelNodeDelegate(n).Save(); err != nil { + if err := delegate.NewModelNodeDelegate(&n).Save(); err != nil { return err } } diff --git a/utils/array.go b/utils/array.go index 8b51166..a5e958c 100644 --- a/utils/array.go +++ b/utils/array.go @@ -2,7 +2,9 @@ package utils import ( "errors" + "math/rand" "reflect" + "time" ) func StringArrayContains(arr []string, str string) bool { @@ -30,3 +32,15 @@ func GetArrayItems(array interface{}) (res []interface{}, err error) { } return res, nil } + +func ShuffleArray(slice []interface{}) (err error) { + r := rand.New(rand.NewSource(time.Now().Unix())) + for len(slice) > 0 { + n := len(slice) + randIndex := r.Intn(n) + slice[n-1], slice[randIndex] = slice[randIndex], slice[n-1] + slice = slice[:n-1] + } + + return nil +}