Skip to content

Commit

Permalink
feat(data): added de-duplication
Browse files Browse the repository at this point in the history
AB#49
  • Loading branch information
tikazyq committed Dec 29, 2022
1 parent 5b0a170 commit 5b7d8f8
Show file tree
Hide file tree
Showing 17 changed files with 181 additions and 105 deletions.
10 changes: 10 additions & 0 deletions constants/results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package constants

const (
HashKey = "_h"
)

const (
DedupTypeIgnore = "ignore"
DedupTypeOverwrite = "overwrite"
)
40 changes: 38 additions & 2 deletions controllers/data_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,25 @@ package controllers
import (
"github.com/crawlab-team/crawlab-core/interfaces"
"github.com/crawlab-team/crawlab-core/models/service"
"github.com/crawlab-team/crawlab-db/mongo"
"github.com/gin-gonic/gin"
"go.mongodb.org/mongo-driver/bson/primitive"
mongo2 "go.mongodb.org/mongo-driver/mongo"
"go.uber.org/dig"
"net/http"
)

var DataCollectionController *dataCollectionController

func getDataCollectionActions() []Action {
//ctx := newDataCollectionContext()
return []Action{}
ctx := newDataCollectionContext()
return []Action{
{
Method: http.MethodPost,
Path: "/:id/indexes",
HandlerFunc: ctx.postIndexes,
},
}
}

type dataCollectionController struct {
Expand All @@ -24,6 +35,31 @@ type dataCollectionContext struct {
resultSvc interfaces.ResultService
}

func (ctx *dataCollectionContext) postIndexes(c *gin.Context) {
id, err := primitive.ObjectIDFromHex(c.Param("id"))
if err != nil {
HandleErrorBadRequest(c, err)
return
}

dc, err := ctx.modelSvc.GetDataCollectionById(id)
if err != nil {
HandleErrorInternalServerError(c, err)
return
}

for _, f := range dc.Fields {
if err := mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{
Keys: f.Key,
}); err != nil {
HandleErrorInternalServerError(c, err)
return
}
}

HandleSuccess(c)
}

var _dataCollectionCtx *dataCollectionContext

func newDataCollectionContext() *dataCollectionContext {
Expand Down
2 changes: 1 addition & 1 deletion controllers/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (ctx *resultContext) getList(c *gin.Context) {
}

// service
svc, err := result.GetResultService(s)
svc, err := result.GetResultService(s.Id)
if err != nil {
HandleErrorInternalServerError(c, err)
return
Expand Down
7 changes: 4 additions & 3 deletions controllers/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,9 @@ func (ctx *spiderContext) saveDir(c *gin.Context) {
}

data := []byte("")
path := fmt.Sprintf("%s/%s", payload.Path, constants.FsKeepFileName)
filePath := fmt.Sprintf("%s/%s", payload.Path, constants.FsKeepFileName)

if err := fsSvc.Save(path, data); err != nil {
if err := fsSvc.Save(filePath, data); err != nil {
HandleErrorInternalServerError(c, err)
return
}
Expand Down Expand Up @@ -1042,7 +1042,8 @@ func (ctx *spiderContext) _upsertDataCollection(c *gin.Context, s *models.Spider
s.ColId = dc.Id

// create index
_ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{"_tid": 1}})
_ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{constants.TaskKey: 1}})
_ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{constants.HashKey: 1}})
} else {
// with id
dc, err := ctx.modelSvc.GetDataCollectionById(s.ColId)
Expand Down
20 changes: 3 additions & 17 deletions controllers/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controllers
import (
"github.com/crawlab-team/crawlab-core/config"
"github.com/crawlab-team/crawlab-core/constants"
"github.com/crawlab-team/crawlab-core/entity"
"github.com/crawlab-team/crawlab-core/errors"
"github.com/crawlab-team/crawlab-core/interfaces"
"github.com/crawlab-team/crawlab-core/models/models"
Expand All @@ -22,7 +21,6 @@ import (
"go.uber.org/dig"
"net/http"
"strings"
"time"
)

var TaskController *taskController
Expand Down Expand Up @@ -83,9 +81,6 @@ type taskContext struct {
adminSvc interfaces.SpiderAdminService
schedulerSvc interfaces.TaskSchedulerService
l log.Driver

// internals
drivers entity.TTLMap
}

func (ctx *taskContext) run(c *gin.Context) {
Expand Down Expand Up @@ -363,15 +358,8 @@ func (ctx *taskContext) getData(c *gin.Context) {
return
}

// spider
s, err := ctx.modelSvc.GetSpiderById(t.SpiderId)
if err != nil {
HandleErrorInternalServerError(c, err)
return
}

// result service
resultSvc, err := result.GetResultService(s)
resultSvc, err := result.GetResultService(t.SpiderId)
if err != nil {
HandleErrorInternalServerError(c, err)
return
Expand All @@ -380,7 +368,7 @@ func (ctx *taskContext) getData(c *gin.Context) {
// query
query := generic.ListQuery{
generic.ListQueryCondition{
Key: "_tid",
Key: constants.TaskKey,
Op: generic.OpEqual,
Value: t.Id,
},
Expand Down Expand Up @@ -409,9 +397,7 @@ func (ctx *taskContext) getData(c *gin.Context) {

func newTaskContext() *taskContext {
// context
ctx := &taskContext{
drivers: entity.NewTTLMap(15 * time.Minute),
}
ctx := &taskContext{}

// dependency injection
c := dig.New()
Expand Down
9 changes: 6 additions & 3 deletions entity/ttl_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@ func (t *TTLMap) Load(key string) (val interface{}) {
return expireEntry.Value
}

func NewTTLMap(ttl time.Duration) (m TTLMap) {
m.TTL = ttl
func NewTTLMap(ttl time.Duration) (m *TTLMap) {
m = &TTLMap{
TTL: ttl,
}

go func() {
for now := range time.Tick(time.Second) {
m.data.Range(func(k, v interface{}) bool {
if v.(expireEntry).ExpiresAt.After(now) {
expiresAt := v.(expireEntry).ExpiresAt
if expiresAt.Before(now) {
m.data.Delete(k)
}
return true
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/apex/log v1.9.0
github.com/blang/semver/v4 v4.0.0
github.com/cenkalti/backoff/v4 v4.1.0
github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221020034454-40c7a5c6cbe2
github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221226064900-5a357ee73484
github.com/crawlab-team/crawlab-fs v0.6.0-beta.20211101.1940.0.20221218100256-a28d12756f73
github.com/crawlab-team/crawlab-grpc v0.6.0-beta.20211219.1930.0.20221020032435-afa1c691f73c
github.com/crawlab-team/crawlab-vcs v0.6.0-beta.20211113.2048.0.20221024150201-467fe22630f8
Expand All @@ -31,7 +31,6 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matcornic/hermes/v2 v2.1.0
github.com/mitchellh/go-homedir v1.1.0
github.com/nleeper/goment v1.4.4 // indirect
github.com/olivere/elastic/v7 v7.0.15
github.com/pelletier/go-toml/v2 v2.0.2 // indirect
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/crawlab-team/crawlab-db v0.6.0-1/go.mod h1:gfeF0nAnFuup6iYvgHkY0in/HpO/+JktXqVNMdhoxhU=
github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221020034454-40c7a5c6cbe2 h1:dmC9D/BSHv6yh8khtQ7cDYFh8Yzr8NmhbAMBS36h9dA=
github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221020034454-40c7a5c6cbe2/go.mod h1:gfeF0nAnFuup6iYvgHkY0in/HpO/+JktXqVNMdhoxhU=
github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221226064900-5a357ee73484 h1:1CXWC3lYcVWcgPRc3PNKzZ3fcfX5WZ/V8xwzHEMUFHQ=
github.com/crawlab-team/crawlab-db v0.6.0-beta.20220417.1300.0.20221226064900-5a357ee73484/go.mod h1:gfeF0nAnFuup6iYvgHkY0in/HpO/+JktXqVNMdhoxhU=
github.com/crawlab-team/crawlab-fs v0.6.0-beta.20211101.1940/go.mod h1:dA1G6xeiClbTMkjRuoagGrcKfQ97jJZRAhZUSwrKdoI=
github.com/crawlab-team/crawlab-fs v0.6.0-beta.20211101.1940.0.20220912130912-a47819e0c7c9 h1:h1SCAinaakI8rtL2kh3ciTSfiHaC7RyD/HK1d3Mil7U=
github.com/crawlab-team/crawlab-fs v0.6.0-beta.20211101.1940.0.20220912130912-a47819e0c7c9/go.mod h1:y9YhLLR3GuPrDuPKe7ZuiHCITK9K2IcI8nlznF8YIEc=
Expand Down
1 change: 1 addition & 0 deletions interfaces/result_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ type ResultService interface {
Insert(records ...interface{}) (err error)
List(query generic.ListQuery, opts *generic.ListOptions) (results []interface{}, err error)
Count(query generic.ListQuery) (n int, err error)
Index(fields []string)
}
5 changes: 5 additions & 0 deletions models/models/data_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ type DataCollection struct {
Id primitive.ObjectID `json:"_id" bson:"_id"`
Name string `json:"name" bson:"name"`
Fields []entity.DataField `json:"fields" bson:"fields"`
Dedup struct {
Enabled bool `json:"enabled" bson:"enabled"`
Keys []string `json:"keys" bson:"keys"`
Type string `json:"type" bson:"type"`
} `json:"dedup" bson:"dedup"`
}

func (dc *DataCollection) GetId() (id primitive.ObjectID) {
Expand Down
5 changes: 3 additions & 2 deletions models/models/result.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models

import (
"github.com/crawlab-team/crawlab-core/constants"
"github.com/crawlab-team/crawlab-core/interfaces"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
Expand Down Expand Up @@ -36,7 +37,7 @@ func (r *Result) GetValue(key string) (value interface{}) {
}

func (r *Result) GetTaskId() (id primitive.ObjectID) {
res := r.GetValue("_tid")
res := r.GetValue(constants.TaskKey)
if res == nil {
return id
}
Expand All @@ -45,7 +46,7 @@ func (r *Result) GetTaskId() (id primitive.ObjectID) {
}

func (r *Result) SetTaskId(id primitive.ObjectID) {
r.SetValue("_tid", id)
r.SetValue(constants.TaskKey, id)
}

type ResultList []Result
Expand Down
37 changes: 2 additions & 35 deletions models/models/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,10 @@ type Spider struct {
Stat *SpiderStat `json:"stat,omitempty" bson:"-"`
GitId primitive.ObjectID `json:"git_id" bson:"git_id"`

IsPublic bool `json:"is_public" bson:"is_public"` // 是否公开
Envs []Env `json:"envs" bson:"envs"` // 环境变量

// 自定义爬虫
Cmd string `json:"cmd" bson:"cmd"` // 执行命令
// execution
Cmd string `json:"cmd" bson:"cmd"` // execute command
Param string `json:"param" bson:"param"` // default task param
Priority int `json:"priority" bson:"priority"`

// Scrapy 爬虫(属于自定义爬虫)
IsScrapy bool `json:"is_scrapy" bson:"is_scrapy"` // 是否为 Scrapy 爬虫
SpiderNames []string `json:"spider_names" bson:"spider_names"` // 爬虫名称列表

// 可配置爬虫
Template string `json:"template" bson:"template"` // Spiderfile模版

// Git 设置
IsGit bool `json:"is_git" bson:"is_git"` // 是否为 Git
GitUrl string `json:"git_url" bson:"git_url"` // Git URL
GitBranch string `json:"git_branch" bson:"git_branch"` // Git 分支
GitHasCredential bool `json:"git_has_credential" bson:"git_has_credential"` // Git 是否加密
GitUsername string `json:"git_username" bson:"git_username"` // Git 用户名
GitPassword string `json:"git_password" bson:"git_password"` // Git 密码
GitAutoSync bool `json:"git_auto_sync" bson:"git_auto_sync"` // Git 是否自动同步
GitSyncFrequency string `json:"git_sync_frequency" bson:"git_sync_frequency"` // Git 同步频率
GitSyncError string `json:"git_sync_error" bson:"git_sync_error"` // Git 同步错误

// 长任务
IsLongTask bool `json:"is_long_task" bson:"is_long_task"` // 是否为长任务

// 去重
IsDedup bool `json:"is_dedup" bson:"is_dedup"` // 是否去重
DedupField string `json:"dedup_field" bson:"dedup_field"` // 去重字段
DedupMethod string `json:"dedup_method" bson:"dedup_method"` // 去重方式

// Web Hook
IsWebHook bool `json:"is_web_hook" bson:"is_web_hook"` // 是否开启 Web Hook
WebHookUrl string `json:"web_hook_url" bson:"web_hook_url"` // Web Hook URL
}

func (s *Spider) GetId() (id primitive.ObjectID) {
Expand Down
29 changes: 21 additions & 8 deletions result/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package result

import (
"fmt"
"github.com/crawlab-team/crawlab-core/entity"
"github.com/crawlab-team/crawlab-core/errors"
"github.com/crawlab-team/crawlab-core/interfaces"
"github.com/crawlab-team/crawlab-core/models/models"
"github.com/crawlab-team/crawlab-core/models/service"
"github.com/crawlab-team/go-trace"
"sync"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)

func NewResultService(registryKey string, s *models.Spider) (svc2 interfaces.ResultService, err error) {
Expand Down Expand Up @@ -35,9 +37,21 @@ func NewResultService(registryKey string, s *models.Spider) (svc2 interfaces.Res
return svc, nil
}

var store = sync.Map{}
var store = entity.NewTTLMap(5 * time.Second)

func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfaces.ResultService, err error) {
// model service
modelSvc, err := service.GetService()
if err != nil {
return nil, trace.TraceError(err)
}

// spider
s, err := modelSvc.GetSpiderById(spiderId)
if err != nil {
return nil, trace.TraceError(err)
}

func GetResultService(s *models.Spider, opts ...Option) (svc interfaces.ResultService, err error) {
// apply options
_opts := &Options{}
for _, opt := range opts {
Expand All @@ -48,24 +62,23 @@ func GetResultService(s *models.Spider, opts ...Option) (svc interfaces.ResultSe
storeKey := s.ColId.Hex() + ":" + s.DataSourceId.Hex()

// attempt to load result service from store
res, ok := store.Load(storeKey)
if ok {
svc, ok = res.(interfaces.ResultService)
res := store.Load(storeKey)
if res != nil {
svc, ok := res.(interfaces.ResultService)
if ok {
return svc, nil
}
}

// registry key
var registryKey string
modelSvc, _ := service.NewService()
ds, _ := modelSvc.GetDataSourceById(s.DataSourceId)
if ds != nil {
registryKey = ds.Type
}

// create a new result service if not exists
svc, err = NewResultService(registryKey, s)
svc, err := NewResultService(registryKey, s)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 5b7d8f8

Please sign in to comment.