From 5b7d8f82c6ff8492089d68a9ef06d7308d29b6d5 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Thu, 29 Dec 2022 21:54:32 +0800 Subject: [PATCH] feat(data): added de-duplication AB#49 --- constants/results.go | 10 +++++++ controllers/data_collection.go | 40 ++++++++++++++++++++++++-- controllers/result.go | 2 +- controllers/spider.go | 7 +++-- controllers/task.go | 20 ++----------- entity/ttl_map.go | 9 ++++-- go.mod | 3 +- go.sum | 2 ++ interfaces/result_service.go | 1 + models/models/data_collection.go | 5 ++++ models/models/result.go | 5 ++-- models/models/spider.go | 37 ++---------------------- result/service.go | 29 +++++++++++++------ result/service_mongo.go | 49 ++++++++++++++++++++++++++++++-- result/test/service_test.go | 33 +++++++++++---------- task/stats/service.go | 14 ++------- utils/result.go | 20 ++++++++++++- 17 files changed, 181 insertions(+), 105 deletions(-) create mode 100644 constants/results.go diff --git a/constants/results.go b/constants/results.go new file mode 100644 index 0000000..7053111 --- /dev/null +++ b/constants/results.go @@ -0,0 +1,10 @@ +package constants + +const ( + HashKey = "_h" +) + +const ( + DedupTypeIgnore = "ignore" + DedupTypeOverwrite = "overwrite" +) diff --git a/controllers/data_collection.go b/controllers/data_collection.go index 8bdd034..f9143b2 100644 --- a/controllers/data_collection.go +++ b/controllers/data_collection.go @@ -3,14 +3,25 @@ package controllers import ( "github.com/crawlab-team/crawlab-core/interfaces" "github.com/crawlab-team/crawlab-core/models/service" + "github.com/crawlab-team/crawlab-db/mongo" + "github.com/gin-gonic/gin" + "go.mongodb.org/mongo-driver/bson/primitive" + mongo2 "go.mongodb.org/mongo-driver/mongo" "go.uber.org/dig" + "net/http" ) var DataCollectionController *dataCollectionController func getDataCollectionActions() []Action { - //ctx := newDataCollectionContext() - return []Action{} + ctx := newDataCollectionContext() + return []Action{ + { + Method: http.MethodPost, + Path: "/:id/indexes", + HandlerFunc: ctx.postIndexes, + }, + } } type dataCollectionController struct { @@ -24,6 +35,31 @@ type dataCollectionContext struct { resultSvc interfaces.ResultService } +func (ctx *dataCollectionContext) postIndexes(c *gin.Context) { + id, err := primitive.ObjectIDFromHex(c.Param("id")) + if err != nil { + HandleErrorBadRequest(c, err) + return + } + + dc, err := ctx.modelSvc.GetDataCollectionById(id) + if err != nil { + HandleErrorInternalServerError(c, err) + return + } + + for _, f := range dc.Fields { + if err := mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{ + Keys: f.Key, + }); err != nil { + HandleErrorInternalServerError(c, err) + return + } + } + + HandleSuccess(c) +} + var _dataCollectionCtx *dataCollectionContext func newDataCollectionContext() *dataCollectionContext { diff --git a/controllers/result.go b/controllers/result.go index c8cd997..1cf8530 100644 --- a/controllers/result.go +++ b/controllers/result.go @@ -79,7 +79,7 @@ func (ctx *resultContext) getList(c *gin.Context) { } // service - svc, err := result.GetResultService(s) + svc, err := result.GetResultService(s.Id) if err != nil { HandleErrorInternalServerError(c, err) return diff --git a/controllers/spider.go b/controllers/spider.go index 8fa572e..a0861d8 100644 --- a/controllers/spider.go +++ b/controllers/spider.go @@ -246,9 +246,9 @@ func (ctx *spiderContext) saveDir(c *gin.Context) { } data := []byte("") - path := fmt.Sprintf("%s/%s", payload.Path, constants.FsKeepFileName) + filePath := fmt.Sprintf("%s/%s", payload.Path, constants.FsKeepFileName) - if err := fsSvc.Save(path, data); err != nil { + if err := fsSvc.Save(filePath, data); err != nil { HandleErrorInternalServerError(c, err) return } @@ -1042,7 +1042,8 @@ func (ctx *spiderContext) _upsertDataCollection(c *gin.Context, s *models.Spider s.ColId = dc.Id // create index - _ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{"_tid": 1}}) + _ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{constants.TaskKey: 1}}) + _ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{constants.HashKey: 1}}) } else { // with id dc, err := ctx.modelSvc.GetDataCollectionById(s.ColId) diff --git a/controllers/task.go b/controllers/task.go index 1c1c0f5..4ac22c8 100644 --- a/controllers/task.go +++ b/controllers/task.go @@ -3,7 +3,6 @@ package controllers import ( "github.com/crawlab-team/crawlab-core/config" "github.com/crawlab-team/crawlab-core/constants" - "github.com/crawlab-team/crawlab-core/entity" "github.com/crawlab-team/crawlab-core/errors" "github.com/crawlab-team/crawlab-core/interfaces" "github.com/crawlab-team/crawlab-core/models/models" @@ -22,7 +21,6 @@ import ( "go.uber.org/dig" "net/http" "strings" - "time" ) var TaskController *taskController @@ -83,9 +81,6 @@ type taskContext struct { adminSvc interfaces.SpiderAdminService schedulerSvc interfaces.TaskSchedulerService l log.Driver - - // internals - drivers entity.TTLMap } func (ctx *taskContext) run(c *gin.Context) { @@ -363,15 +358,8 @@ func (ctx *taskContext) getData(c *gin.Context) { return } - // spider - s, err := ctx.modelSvc.GetSpiderById(t.SpiderId) - if err != nil { - HandleErrorInternalServerError(c, err) - return - } - // result service - resultSvc, err := result.GetResultService(s) + resultSvc, err := result.GetResultService(t.SpiderId) if err != nil { HandleErrorInternalServerError(c, err) return @@ -380,7 +368,7 @@ func (ctx *taskContext) getData(c *gin.Context) { // query query := generic.ListQuery{ generic.ListQueryCondition{ - Key: "_tid", + Key: constants.TaskKey, Op: generic.OpEqual, Value: t.Id, }, @@ -409,9 +397,7 @@ func (ctx *taskContext) getData(c *gin.Context) { func newTaskContext() *taskContext { // context - ctx := &taskContext{ - drivers: entity.NewTTLMap(15 * time.Minute), - } + ctx := &taskContext{} // dependency injection c := dig.New() diff --git a/entity/ttl_map.go b/entity/ttl_map.go index 9a89feb..57f7fd6 100644 --- a/entity/ttl_map.go +++ b/entity/ttl_map.go @@ -37,13 +37,16 @@ func (t *TTLMap) Load(key string) (val interface{}) { return expireEntry.Value } -func NewTTLMap(ttl time.Duration) (m TTLMap) { - m.TTL = ttl +func NewTTLMap(ttl time.Duration) (m *TTLMap) { + m = &TTLMap{ + TTL: ttl, + } go func() { for now := range time.Tick(time.Second) { m.data.Range(func(k, v interface{}) bool { - if v.(expireEntry).ExpiresAt.After(now) { + expiresAt := v.(expireEntry).ExpiresAt + if expiresAt.Before(now) { m.data.Delete(k) } return true diff --git a/go.mod b/go.mod index 2e2680d..8d09637 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/apex/log v1.9.0 github.com/blang/semver/v4 v4.0.0 github.com/cenkalti/backoff/v4 v4.1.0 - github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221020034454-40c7a5c6cbe2 + github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221226064900-5a357ee73484 github.com/crawlab-team/crawlab-fs v0.6.0-beta.20211101.1940.0.20221218100256-a28d12756f73 github.com/crawlab-team/crawlab-grpc v0.6.0-beta.20211219.1930.0.20221020032435-afa1c691f73c github.com/crawlab-team/crawlab-vcs v0.6.0-beta.20211113.2048.0.20221024150201-467fe22630f8 @@ -31,7 +31,6 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/matcornic/hermes/v2 v2.1.0 github.com/mitchellh/go-homedir v1.1.0 - github.com/nleeper/goment v1.4.4 // indirect github.com/olivere/elastic/v7 v7.0.15 github.com/pelletier/go-toml/v2 v2.0.2 // indirect github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index e344b97..1a1c26f 100644 --- a/go.sum +++ b/go.sum @@ -132,6 +132,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/crawlab-team/crawlab-db v0.6.0-1/go.mod h1:gfeF0nAnFuup6iYvgHkY0in/HpO/+JktXqVNMdhoxhU= github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221020034454-40c7a5c6cbe2 h1:dmC9D/BSHv6yh8khtQ7cDYFh8Yzr8NmhbAMBS36h9dA= github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221020034454-40c7a5c6cbe2/go.mod h1:gfeF0nAnFuup6iYvgHkY0in/HpO/+JktXqVNMdhoxhU= +github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221226064900-5a357ee73484 h1:1CXWC3lYcVWcgPRc3PNKzZ3fcfX5WZ/V8xwzHEMUFHQ= +github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221226064900-5a357ee73484/go.mod h1:gfeF0nAnFuup6iYvgHkY0in/HpO/+JktXqVNMdhoxhU= github.com/crawlab-team/crawlab-fs v0.6.0-beta.20211101.1940/go.mod h1:dA1G6xeiClbTMkjRuoagGrcKfQ97jJZRAhZUSwrKdoI= github.com/crawlab-team/crawlab-fs v0.6.0-beta.20211101.1940.0.20220912130912-a47819e0c7c9 h1:h1SCAinaakI8rtL2kh3ciTSfiHaC7RyD/HK1d3Mil7U= github.com/crawlab-team/crawlab-fs v0.6.0-beta.20211101.1940.0.20220912130912-a47819e0c7c9/go.mod h1:y9YhLLR3GuPrDuPKe7ZuiHCITK9K2IcI8nlznF8YIEc= diff --git a/interfaces/result_service.go b/interfaces/result_service.go index 77f251a..f30b2e0 100644 --- a/interfaces/result_service.go +++ b/interfaces/result_service.go @@ -8,4 +8,5 @@ type ResultService interface { Insert(records ...interface{}) (err error) List(query generic.ListQuery, opts *generic.ListOptions) (results []interface{}, err error) Count(query generic.ListQuery) (n int, err error) + Index(fields []string) } diff --git a/models/models/data_collection.go b/models/models/data_collection.go index 3a03d7d..9e3b12a 100644 --- a/models/models/data_collection.go +++ b/models/models/data_collection.go @@ -10,6 +10,11 @@ type DataCollection struct { Id primitive.ObjectID `json:"_id" bson:"_id"` Name string `json:"name" bson:"name"` Fields []entity.DataField `json:"fields" bson:"fields"` + Dedup struct { + Enabled bool `json:"enabled" bson:"enabled"` + Keys []string `json:"keys" bson:"keys"` + Type string `json:"type" bson:"type"` + } `json:"dedup" bson:"dedup"` } func (dc *DataCollection) GetId() (id primitive.ObjectID) { diff --git a/models/models/result.go b/models/models/result.go index efe97d7..1c9bdcc 100644 --- a/models/models/result.go +++ b/models/models/result.go @@ -1,6 +1,7 @@ package models import ( + "github.com/crawlab-team/crawlab-core/constants" "github.com/crawlab-team/crawlab-core/interfaces" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -36,7 +37,7 @@ func (r *Result) GetValue(key string) (value interface{}) { } func (r *Result) GetTaskId() (id primitive.ObjectID) { - res := r.GetValue("_tid") + res := r.GetValue(constants.TaskKey) if res == nil { return id } @@ -45,7 +46,7 @@ func (r *Result) GetTaskId() (id primitive.ObjectID) { } func (r *Result) SetTaskId(id primitive.ObjectID) { - r.SetValue("_tid", id) + r.SetValue(constants.TaskKey, id) } type ResultList []Result diff --git a/models/models/spider.go b/models/models/spider.go index ad3526c..e737def 100644 --- a/models/models/spider.go +++ b/models/models/spider.go @@ -25,43 +25,10 @@ type Spider struct { Stat *SpiderStat `json:"stat,omitempty" bson:"-"` GitId primitive.ObjectID `json:"git_id" bson:"git_id"` - IsPublic bool `json:"is_public" bson:"is_public"` // 是否公开 - Envs []Env `json:"envs" bson:"envs"` // 环境变量 - - // 自定义爬虫 - Cmd string `json:"cmd" bson:"cmd"` // 执行命令 + // execution + Cmd string `json:"cmd" bson:"cmd"` // execute command Param string `json:"param" bson:"param"` // default task param Priority int `json:"priority" bson:"priority"` - - // Scrapy 爬虫(属于自定义爬虫) - IsScrapy bool `json:"is_scrapy" bson:"is_scrapy"` // 是否为 Scrapy 爬虫 - SpiderNames []string `json:"spider_names" bson:"spider_names"` // 爬虫名称列表 - - // 可配置爬虫 - Template string `json:"template" bson:"template"` // Spiderfile模版 - - // Git 设置 - IsGit bool `json:"is_git" bson:"is_git"` // 是否为 Git - GitUrl string `json:"git_url" bson:"git_url"` // Git URL - GitBranch string `json:"git_branch" bson:"git_branch"` // Git 分支 - GitHasCredential bool `json:"git_has_credential" bson:"git_has_credential"` // Git 是否加密 - GitUsername string `json:"git_username" bson:"git_username"` // Git 用户名 - GitPassword string `json:"git_password" bson:"git_password"` // Git 密码 - GitAutoSync bool `json:"git_auto_sync" bson:"git_auto_sync"` // Git 是否自动同步 - GitSyncFrequency string `json:"git_sync_frequency" bson:"git_sync_frequency"` // Git 同步频率 - GitSyncError string `json:"git_sync_error" bson:"git_sync_error"` // Git 同步错误 - - // 长任务 - IsLongTask bool `json:"is_long_task" bson:"is_long_task"` // 是否为长任务 - - // 去重 - IsDedup bool `json:"is_dedup" bson:"is_dedup"` // 是否去重 - DedupField string `json:"dedup_field" bson:"dedup_field"` // 去重字段 - DedupMethod string `json:"dedup_method" bson:"dedup_method"` // 去重方式 - - // Web Hook - IsWebHook bool `json:"is_web_hook" bson:"is_web_hook"` // 是否开启 Web Hook - WebHookUrl string `json:"web_hook_url" bson:"web_hook_url"` // Web Hook URL } func (s *Spider) GetId() (id primitive.ObjectID) { diff --git a/result/service.go b/result/service.go index 4200fd7..1efa14d 100644 --- a/result/service.go +++ b/result/service.go @@ -2,12 +2,14 @@ package result import ( "fmt" + "github.com/crawlab-team/crawlab-core/entity" "github.com/crawlab-team/crawlab-core/errors" "github.com/crawlab-team/crawlab-core/interfaces" "github.com/crawlab-team/crawlab-core/models/models" "github.com/crawlab-team/crawlab-core/models/service" "github.com/crawlab-team/go-trace" - "sync" + "go.mongodb.org/mongo-driver/bson/primitive" + "time" ) func NewResultService(registryKey string, s *models.Spider) (svc2 interfaces.ResultService, err error) { @@ -35,9 +37,21 @@ func NewResultService(registryKey string, s *models.Spider) (svc2 interfaces.Res return svc, nil } -var store = sync.Map{} +var store = entity.NewTTLMap(5 * time.Second) + +func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfaces.ResultService, err error) { + // model service + modelSvc, err := service.GetService() + if err != nil { + return nil, trace.TraceError(err) + } + + // spider + s, err := modelSvc.GetSpiderById(spiderId) + if err != nil { + return nil, trace.TraceError(err) + } -func GetResultService(s *models.Spider, opts ...Option) (svc interfaces.ResultService, err error) { // apply options _opts := &Options{} for _, opt := range opts { @@ -48,9 +62,9 @@ func GetResultService(s *models.Spider, opts ...Option) (svc interfaces.ResultSe storeKey := s.ColId.Hex() + ":" + s.DataSourceId.Hex() // attempt to load result service from store - res, ok := store.Load(storeKey) - if ok { - svc, ok = res.(interfaces.ResultService) + res := store.Load(storeKey) + if res != nil { + svc, ok := res.(interfaces.ResultService) if ok { return svc, nil } @@ -58,14 +72,13 @@ func GetResultService(s *models.Spider, opts ...Option) (svc interfaces.ResultSe // registry key var registryKey string - modelSvc, _ := service.NewService() ds, _ := modelSvc.GetDataSourceById(s.DataSourceId) if ds != nil { registryKey = ds.Type } // create a new result service if not exists - svc, err = NewResultService(registryKey, s) + svc, err := NewResultService(registryKey, s) if err != nil { return nil, err } diff --git a/result/service_mongo.go b/result/service_mongo.go index 5ebc8dd..cbb399f 100644 --- a/result/service_mongo.go +++ b/result/service_mongo.go @@ -1,6 +1,7 @@ package result import ( + "github.com/crawlab-team/crawlab-core/constants" "github.com/crawlab-team/crawlab-core/interfaces" "github.com/crawlab-team/crawlab-core/models/models" "github.com/crawlab-team/crawlab-core/models/service" @@ -10,6 +11,8 @@ import ( "github.com/crawlab-team/go-trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" + mongo2 "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) type ServiceMongo struct { @@ -34,13 +37,53 @@ func (svc *ServiceMongo) Count(query generic.ListQuery) (n int, err error) { } func (svc *ServiceMongo) Insert(docs ...interface{}) (err error) { - _, err = mongo.GetMongoCol(svc.dc.Name).InsertMany(docs) - if err != nil { - return trace.TraceError(err) + if svc.dc.Dedup.Enabled { + for _, doc := range docs { + hash, err := utils.GetResultHash(doc, svc.dc.Dedup.Keys) + if err != nil { + return err + } + doc.(interfaces.Result).SetValue(constants.HashKey, hash) + query := bson.M{constants.HashKey: hash} + switch svc.dc.Dedup.Type { + case constants.DedupTypeIgnore: + var o bson.M + err := mongo.GetMongoCol(svc.dc.Name).Find(query, &mongo.FindOptions{Limit: 1}).One(&o) + if err == nil { + // exists, ignore + continue + } + if err != mongo2.ErrNoDocuments { + // error + return trace.TraceError(err) + } + // not exists, insert + _, err = mongo.GetMongoCol(svc.dc.Name).Insert(doc) + if err != nil { + return trace.TraceError(err) + } + case constants.DedupTypeOverwrite: + err = mongo.GetMongoCol(svc.dc.Name).ReplaceWithOptions(query, doc, &options.ReplaceOptions{Upsert: &[]bool{true}[0]}) + if err != nil { + return trace.TraceError(err) + } + } + } + } else { + _, err = mongo.GetMongoCol(svc.dc.Name).InsertMany(docs) + if err != nil { + return trace.TraceError(err) + } } return nil } +func (svc *ServiceMongo) Index(fields []string) { + for _, field := range fields { + _ = mongo.GetMongoCol(svc.dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{field: 1}}) + } +} + func (svc *ServiceMongo) getList(query bson.M, opts *mongo.FindOptions) (results []interface{}, err error) { list, err := svc.modelColSvc.GetList(query, opts) if err != nil { diff --git a/result/test/service_test.go b/result/test/service_test.go index 114e36e..fe4a2c7 100644 --- a/result/test/service_test.go +++ b/result/test/service_test.go @@ -3,7 +3,6 @@ package test import ( "github.com/crawlab-team/crawlab-core/models/models" "github.com/stretchr/testify/require" - "go.mongodb.org/mongo-driver/bson" "testing" ) @@ -27,14 +26,14 @@ func TestResultService_GetList(t *testing.T) { require.Nil(t, err) require.Equal(t, n, len(results)) - query := bson.M{ - "i": bson.M{ - "$lt": n / 2, - }, - } - results, err = T.resultSvc.List(query, nil) - require.Nil(t, err) - require.Equal(t, n/2, len(results)) + //query := bson.M{ + // "i": bson.M{ + // "$lt": n / 2, + // }, + //} + //results, err = T.resultSvc.List(query, nil) + //require.Nil(t, err) + //require.Equal(t, n/2, len(results)) } func TestResultService_Count(t *testing.T) { @@ -57,12 +56,12 @@ func TestResultService_Count(t *testing.T) { require.Nil(t, err) require.Equal(t, n, total) - query := bson.M{ - "i": bson.M{ - "$lt": n / 2, - }, - } - total, err = T.resultSvc.Count(query) - require.Nil(t, err) - require.Equal(t, n/2, total) + //query := bson.M{ + // "i": bson.M{ + // "$lt": n / 2, + // }, + //} + //total, err = T.resultSvc.Count(query) + //require.Nil(t, err) + //require.Equal(t, n/2, total) } diff --git a/task/stats/service.go b/task/stats/service.go index 31e7f0a..1264dbd 100644 --- a/task/stats/service.go +++ b/task/stats/service.go @@ -26,8 +26,7 @@ type Service struct { // internals mu sync.Mutex - logDrivers entity.TTLMap - resultServices entity.TTLMap + resultServices *entity.TTLMap logDriver log.Driver } @@ -64,14 +63,8 @@ func (svc *Service) getResultService(id primitive.ObjectID) (resultSvc interface return nil, err } - // spider - s, err := svc.modelSvc.GetSpiderById(t.SpiderId) - if err != nil { - return nil, err - } - // result service - resultSvc, err = result.GetResultService(s) + resultSvc, err = result.GetResultService(t.SpiderId) if err != nil { return nil, err } @@ -100,8 +93,7 @@ func NewTaskStatsService(opts ...Option) (svc2 interfaces.TaskStatsService, err // service svc := &Service{ TaskBaseService: baseSvc, - logDrivers: entity.NewTTLMap(15 * time.Minute), - resultServices: entity.NewTTLMap(15 * time.Minute), + resultServices: entity.NewTTLMap(5 * time.Second), } // apply options diff --git a/utils/result.go b/utils/result.go index 1546422..7eee0bc 100644 --- a/utils/result.go +++ b/utils/result.go @@ -1,5 +1,23 @@ package utils -func ResultToJson() { +import ( + "encoding/json" + "github.com/crawlab-team/crawlab-core/interfaces" +) +func GetResultHash(value interface{}, keys []string) (res string, err error) { + m := make(map[string]interface{}) + for _, k := range keys { + _value, ok := value.(interfaces.Result) + if !ok { + continue + } + v := _value.GetValue(k) + m[k] = v + } + data, err := json.Marshal(m) + if err != nil { + return "", err + } + return EncryptMd5(string(data)), nil }