enhance: refine the datacoord meta related interfaces #37957

Open · wants to merge 1 commit into base: master
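Reviewer note: the thrust of this PR is to pass an explicit context.Context into the datacoord meta methods (and on to the catalog) instead of relying on a context stored on the meta struct; call sites that have no request-scoped context yet pass context.TODO(). A minimal sketch of the pattern, using hypothetical stand-in types rather than the real milvus ones:

```go
// Sketch of the interface change in this PR, on a simplified meta type.
// "catalog" and DropTask are stand-ins for the real datacoord catalog;
// only the context-threading pattern mirrors the diff.
package main

import (
	"context"
	"fmt"
	"sync"
)

type catalog struct{}

func (c *catalog) DropTask(ctx context.Context, taskID int64) error {
	// A real catalog would issue a metastore write bounded by ctx.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

type analyzeMetaSketch struct {
	sync.Mutex
	catalog *catalog
}

// Before: the method used a context stored on the struct (m.ctx).
// After (this PR's pattern): the caller passes ctx explicitly.
func (m *analyzeMetaSketch) DropAnalyzeTask(ctx context.Context, taskID int64) error {
	m.Lock()
	defer m.Unlock()
	return m.catalog.DropTask(ctx, taskID)
}

func main() {
	m := &analyzeMetaSketch{catalog: &catalog{}}
	// Callers with a request context pass it through; internal paths
	// that have no context yet use context.TODO(), as in the diff.
	fmt.Println(m.DropAnalyzeTask(context.TODO(), 7))
}
```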
4 changes: 2 additions & 2 deletions internal/datacoord/analyze_meta.go
@@ -96,12 +96,12 @@ func (m *analyzeMeta) AddAnalyzeTask(task *indexpb.AnalyzeTask) error {
return m.saveTask(task)
}

func (m *analyzeMeta) DropAnalyzeTask(taskID int64) error {
func (m *analyzeMeta) DropAnalyzeTask(ctx context.Context, taskID int64) error {
m.Lock()
defer m.Unlock()

log.Info("drop analyze task", zap.Int64("taskID", taskID))
if err := m.catalog.DropAnalyzeTask(m.ctx, taskID); err != nil {
if err := m.catalog.DropAnalyzeTask(ctx, taskID); err != nil {
log.Warn("drop analyze task by catalog failed", zap.Int64("taskID", taskID),
zap.Error(err))
return err
4 changes: 2 additions & 2 deletions internal/datacoord/analyze_meta_test.go
@@ -136,7 +136,7 @@ func (s *AnalyzeMetaSuite) Test_AnalyzeMeta() {
})

s.Run("DropAnalyzeTask", func() {
err := am.DropAnalyzeTask(7)
err := am.DropAnalyzeTask(ctx, 7)
s.NoError(err)
s.Equal(6, len(am.GetAllTasks()))
})
@@ -212,7 +212,7 @@ func (s *AnalyzeMetaSuite) Test_failCase() {
})

s.Run("DropAnalyzeTask", func() {
err := am.DropAnalyzeTask(1)
err := am.DropAnalyzeTask(ctx, 1)
s.Error(err)
s.NotNil(am.GetTask(1))
})
22 changes: 11 additions & 11 deletions internal/datacoord/compaction.go
@@ -58,7 +58,7 @@
isFull() bool
// get compaction tasks by signal id
getCompactionTasksNumBySignalID(signalID int64) int
getCompactionInfo(signalID int64) *compactionInfo
getCompactionInfo(ctx context.Context, signalID int64) *compactionInfo
removeTasksByChannel(channel string)
}

@@ -96,8 +96,8 @@
stopWg sync.WaitGroup
}

func (c *compactionPlanHandler) getCompactionInfo(triggerID int64) *compactionInfo {
tasks := c.meta.GetCompactionTasksByTriggerID(triggerID)
func (c *compactionPlanHandler) getCompactionInfo(ctx context.Context, triggerID int64) *compactionInfo {
tasks := c.meta.GetCompactionTasksByTriggerID(ctx, triggerID)
return summaryCompactionState(tasks)
}

@@ -323,7 +323,7 @@

func (c *compactionPlanHandler) loadMeta() {
// TODO: make it compatible to all types of compaction with persist meta
triggers := c.meta.GetCompactionTasks()
triggers := c.meta.GetCompactionTasks(context.TODO())
for _, tasks := range triggers {
for _, task := range tasks {
state := task.GetState()
Expand All @@ -346,7 +346,7 @@
zap.Error(err),
)
// ignore the drop error
c.meta.DropCompactionTask(task)
c.meta.DropCompactionTask(context.TODO(), task)

Codecov check: added line internal/datacoord/compaction.go#L349 was not covered by tests.
continue
}
if t.NeedReAssignNodeID() {
@@ -434,14 +434,14 @@

func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
// gc clustering compaction tasks
triggers := c.meta.GetCompactionTasks()
triggers := c.meta.GetCompactionTasks(context.TODO())
for _, tasks := range triggers {
for _, task := range tasks {
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
// try best to delete meta
err := c.meta.DropCompactionTask(task)
err := c.meta.DropCompactionTask(context.TODO(), task)
log.Debug("drop compaction task meta", zap.Int64("planID", task.PlanID))
if err != nil {
log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err))
Expand Down Expand Up @@ -478,7 +478,7 @@
for _, info := range unusedPartStats {
log.Debug("collection has been dropped, remove partition stats",
zap.Int64("collID", info.GetCollectionID()))
if err := c.meta.CleanPartitionStatsInfo(info); err != nil {
if err := c.meta.CleanPartitionStatsInfo(context.TODO(), info); err != nil {

Codecov check: added line internal/datacoord/compaction.go#L481 was not covered by tests.
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return err
}
@@ -492,7 +492,7 @@
if len(infos) > 2 {
for i := 2; i < len(infos); i++ {
info := infos[i]
if err := c.meta.CleanPartitionStatsInfo(info); err != nil {
if err := c.meta.CleanPartitionStatsInfo(context.TODO(), info); err != nil {

Codecov check: added line internal/datacoord/compaction.go#L495 was not covered by tests.
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return err
}
@@ -592,7 +592,7 @@
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err = t.SaveTaskMeta()
if err != nil {
c.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
c.meta.SetSegmentsCompacting(context.TODO(), t.GetTaskProto().GetInputSegments(), false)

Codecov check: added line internal/datacoord/compaction.go#L595 was not covered by tests.
log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err))
return err
}
@@ -614,7 +614,7 @@
default:
return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
}
exist, succeed := c.meta.CheckAndSetSegmentsCompacting(t.GetInputSegments())
exist, succeed := c.meta.CheckAndSetSegmentsCompacting(context.TODO(), t.GetInputSegments())
if !exist {
return nil, merr.WrapErrIllegalCompactionPlan("segment not exist")
}
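Reviewer note: in compaction.go the new ctx argument is filled with context.TODO() on background paths (loadMeta, cleanCompactionTaskMeta, partition-stats GC) because those loops have no caller context. A rough sketch of that situation, with metaStore and cleanLoop as illustrative stand-ins, not the real compactionPlanHandler:

```go
package main

import (
	"context"
	"fmt"
)

// metaStore and DropCompactionTask are illustrative stand-ins; only the
// context-passing pattern mirrors the diff.
type metaStore struct{}

func (m *metaStore) DropCompactionTask(ctx context.Context, planID int64) error {
	if err := ctx.Err(); err != nil {
		return err // honor cancellation once a real context is wired in
	}
	fmt.Println("dropped compaction task meta, planID:", planID)
	return nil
}

// cleanLoop stands in for background paths such as cleanCompactionTaskMeta:
// there is no request-scoped context here, so context.TODO() marks the gap
// until the handler threads its own lifecycle context through.
func cleanLoop(meta *metaStore, planIDs []int64) {
	for _, id := range planIDs {
		if err := meta.DropCompactionTask(context.TODO(), id); err != nil {
			fmt.Println("fail to drop task:", err)
		}
	}
}

func main() {
	cleanLoop(&metaStore{}, []int64{10, 11})
}
```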
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_policy_clustering.go
@@ -78,7 +78,7 @@ func (policy *clusteringCompactionPolicy) checkAllL2SegmentsContains(ctx context
segment.GetLevel() == datapb.SegmentLevel_L2 &&
segment.isCompacting
}
segments := policy.meta.SelectSegments(SegmentFilterFunc(getCompactingL2Segment))
segments := policy.meta.SelectSegments(ctx, SegmentFilterFunc(getCompactingL2Segment))
if len(segments) > 0 {
log.Ctx(ctx).Info("there are some segments are compacting",
zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
9 changes: 6 additions & 3 deletions internal/datacoord/compaction_policy_clustering_test.go
@@ -210,13 +210,14 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionAbnormal() {
}

func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNoClusteringKeySchema() {
ctx := context.Background()
coll := &collectionInfo{
ID: 100,
Schema: newTestSchema(),
}
s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(coll, nil)

s.meta.compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
s.meta.compactionTaskMeta.SaveCompactionTask(ctx, &datapb.CompactionTask{
TriggerID: 1,
PlanID: 10,
CollectionID: 100,
@@ -230,13 +231,14 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNoClusteringKe
}

func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionCompacting() {
ctx := context.Background()
coll := &collectionInfo{
ID: 100,
Schema: newTestScalarClusteringKeySchema(),
}
s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(coll, nil)

s.meta.compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
s.meta.compactionTaskMeta.SaveCompactionTask(ctx, &datapb.CompactionTask{
TriggerID: 1,
PlanID: 10,
CollectionID: 100,
@@ -250,6 +252,7 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionCompacting() {
}

func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting() {
ctx := context.Background()
s.Run("no collection is compacting", func() {
compacting, triggerID := s.clusteringCompactionPolicy.collectionIsClusteringCompacting(collID)
s.False(compacting)
@@ -280,7 +283,7 @@ func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting()
s.clusteringCompactionPolicy.meta = &meta{
compactionTaskMeta: compactionTaskMeta,
}
compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
compactionTaskMeta.SaveCompactionTask(ctx, &datapb.CompactionTask{
TriggerID: 1,
PlanID: 10,
CollectionID: collID,
3 changes: 2 additions & 1 deletion internal/datacoord/compaction_policy_single_test.go
@@ -111,6 +111,7 @@ func (s *SingleCompactionPolicySuite) TestIsDeleteRowsTooManySegment() {
}

func (s *SingleCompactionPolicySuite) TestL2SingleCompaction() {
ctx := context.Background()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.IndexBasedCompaction.Key, "false")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.IndexBasedCompaction.Key)

@@ -134,7 +135,7 @@ func (s *SingleCompactionPolicySuite) TestL2SingleCompaction() {
compactionTaskMeta: compactionTaskMeta,
segments: segmentsInfo,
}
compactionTaskMeta.SaveCompactionTask(&datapb.CompactionTask{
compactionTaskMeta.SaveCompactionTask(ctx, &datapb.CompactionTask{
TriggerID: 1,
PlanID: 10,
CollectionID: collID,
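Reviewer note: on the test side the suites now build a context (context.Background()) and pass it into meta calls such as SaveCompactionTask(ctx, task). A small self-contained sketch of that call shape, using simplified stand-in types rather than datapb.CompactionTask or the real compactionTaskMeta:

```go
package main

import (
	"context"
	"fmt"
)

// compactionTask and compactionTaskMetaSketch are simplified stand-ins.
type compactionTask struct {
	TriggerID    int64
	PlanID       int64
	CollectionID int64
}

type compactionTaskMetaSketch struct {
	tasks map[int64]*compactionTask
}

// SaveCompactionTask now takes the caller's context as its first argument.
func (m *compactionTaskMetaSketch) SaveCompactionTask(ctx context.Context, t *compactionTask) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.tasks[t.PlanID] = t
	return nil
}

func main() {
	ctx := context.Background() // what the updated test suites construct
	m := &compactionTaskMetaSketch{tasks: make(map[int64]*compactionTask)}
	err := m.SaveCompactionTask(ctx, &compactionTask{TriggerID: 1, PlanID: 10, CollectionID: 100})
	fmt.Println(err, len(m.tasks)) // <nil> 1
}
```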
20 changes: 10 additions & 10 deletions internal/datacoord/compaction_task_clustering.go
@@ -198,7 +198,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
log := log.With(zap.Int64("taskID", taskProto.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

for _, segID := range taskProto.GetInputSegments() {
segInfo := t.meta.GetHealthySegment(segID)
segInfo := t.meta.GetHealthySegment(context.TODO(), segID)
if segInfo == nil {
return nil, merr.WrapErrSegmentNotFound(segID)
}
@@ -268,7 +268,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return segment.GetSegmentID()
})

_, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetTaskProto(), t.result)
_, metricMutation, err := t.meta.CompleteCompactionMutation(context.TODO(), t.GetTaskProto(), t.result)
if err != nil {
return err
}
@@ -435,7 +435,7 @@ func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
}

err := t.meta.UpdateSegmentsInfo(operators...)
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
if err != nil {
log.Warn("markResultSegmentVisible UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("markResultSegmentVisible UpdateSegmentsInfo", err)
@@ -449,7 +449,7 @@ func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
for _, segID := range t.GetTaskProto().GetInputSegments() {
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
if err != nil {
log.Warn("markInputSegmentsDropped UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("markInputSegmentsDropped UpdateSegmentsInfo", err)
@@ -519,7 +519,7 @@ func (t *clusteringCompactionTask) processAnalyzing() error {
}

func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
t.meta.SetSegmentsCompacting(context.TODO(), t.GetTaskProto().GetInputSegments(), false)
}

func (t *clusteringCompactionTask) processFailedOrTimeout() error {
@@ -532,7 +532,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
}
isInputDropped := false
for _, segID := range t.GetTaskProto().GetInputSegments() {
if t.meta.GetHealthySegment(segID) == nil {
if t.meta.GetHealthySegment(context.TODO(), segID) == nil {
isInputDropped = true
break
}
@@ -559,7 +559,7 @@
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
err := t.meta.UpdateSegmentsInfo(operators...)
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
@@ -576,7 +576,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
// tmpSegment is always invisible
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
@@ -593,7 +593,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
Version: t.GetTaskProto().GetPlanID(),
SegmentIDs: t.GetTaskProto().GetResultSegments(),
}
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
err := t.meta.CleanPartitionStatsInfo(context.TODO(), partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
}
@@ -703,7 +703,7 @@ func (t *clusteringCompactionTask) checkTimeout() bool {
}

func (t *clusteringCompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
return t.meta.SaveCompactionTask(task)
return t.meta.SaveCompactionTask(context.TODO(), task)
}

func (t *clusteringCompactionTask) SaveTaskMeta() error {
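Reviewer note: the clustering task code keeps the existing operator-based update style, with the context now leading the variadic operator list, e.g. UpdateSegmentsInfo(context.TODO(), operators...). A sketch of that calling convention with stand-in types (not the real UpdateOperator definitions):

```go
package main

import (
	"context"
	"fmt"
)

// segment and updateOperator are simplified stand-ins for the datacoord
// segment info and UpdateOperator types; only the calling convention
// (ctx first, then variadic operators) mirrors the diff.
type segment struct {
	ID    int64
	State string
}

type updateOperator func(segments map[int64]*segment)

// UpdateStatusOperator mimics the operator constructors used in the diff.
func UpdateStatusOperator(id int64, state string) updateOperator {
	return func(segments map[int64]*segment) {
		if s, ok := segments[id]; ok {
			s.State = state
		}
	}
}

type metaSketch struct {
	segments map[int64]*segment
}

// In the real meta the operators are applied and persisted together; here
// we just loop, showing where ctx would bound the catalog write.
func (m *metaSketch) UpdateSegmentsInfo(ctx context.Context, ops ...updateOperator) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	for _, op := range ops {
		op(m.segments)
	}
	return nil
}

func main() {
	m := &metaSketch{segments: map[int64]*segment{101: {ID: 101, State: "Flushed"}}}
	_ = m.UpdateSegmentsInfo(context.TODO(), UpdateStatusOperator(101, "Dropped"))
	fmt.Println(m.segments[101].State) // Dropped
}
```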
28 changes: 14 additions & 14 deletions internal/datacoord/compaction_task_clustering_test.go
@@ -111,9 +111,9 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang

task.processPipelining()

seg11 := s.meta.GetSegment(101)
seg11 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg11.Level)
seg21 := s.meta.GetSegment(102)
seg21 := s.meta.GetSegment(context.TODO(), 102)
s.Equal(datapb.SegmentLevel_L2, seg21.Level)
s.Equal(int64(10000), seg21.PartitionStatsVersion)

@@ -165,21 +165,21 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang

task.processFailedOrTimeout()

seg12 := s.meta.GetSegment(101)
seg12 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
s.Equal(commonpb.SegmentState_Dropped, seg12.State)

seg22 := s.meta.GetSegment(102)
seg22 := s.meta.GetSegment(context.TODO(), 102)
s.Equal(datapb.SegmentLevel_L2, seg22.Level)
s.Equal(int64(10000), seg22.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg22.State)

seg32 := s.meta.GetSegment(103)
seg32 := s.meta.GetSegment(context.TODO(), 103)
s.Equal(datapb.SegmentLevel_L1, seg32.Level)
s.Equal(int64(0), seg32.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Flushed, seg32.State)

seg42 := s.meta.GetSegment(104)
seg42 := s.meta.GetSegment(context.TODO(), 104)
s.Equal(datapb.SegmentLevel_L1, seg42.Level)
s.Equal(int64(0), seg42.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Flushed, seg42.State)
@@ -254,29 +254,29 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang

task.processFailedOrTimeout()

seg12 := s.meta.GetSegment(101)
seg12 := s.meta.GetSegment(context.TODO(), 101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
seg22 := s.meta.GetSegment(102)
seg22 := s.meta.GetSegment(context.TODO(), 102)
s.Equal(datapb.SegmentLevel_L2, seg22.Level)
s.Equal(int64(10000), seg22.PartitionStatsVersion)

seg32 := s.meta.GetSegment(103)
seg32 := s.meta.GetSegment(context.TODO(), 103)
s.Equal(datapb.SegmentLevel_L2, seg32.Level)
s.Equal(commonpb.SegmentState_Dropped, seg32.State)
s.True(seg32.IsInvisible)

seg42 := s.meta.GetSegment(104)
seg42 := s.meta.GetSegment(context.TODO(), 104)
s.Equal(datapb.SegmentLevel_L2, seg42.Level)
s.Equal(commonpb.SegmentState_Dropped, seg42.State)
s.True(seg42.IsInvisible)

seg52 := s.meta.GetSegment(105)
seg52 := s.meta.GetSegment(context.TODO(), 105)
s.Equal(datapb.SegmentLevel_L2, seg52.Level)
s.Equal(int64(10001), seg52.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg52.State)
s.True(seg52.IsInvisible)

seg62 := s.meta.GetSegment(106)
seg62 := s.meta.GetSegment(context.TODO(), 106)
s.Equal(datapb.SegmentLevel_L2, seg62.Level)
s.Equal(int64(10001), seg62.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg62.State)
@@ -636,7 +636,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
}

task.updateAndSaveTaskMeta(setResultSegments([]int64{10, 11}))
err := s.meta.indexMeta.CreateIndex(index)
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
s.NoError(err)

s.False(task.Process())
@@ -650,7 +650,7 @@
CollectionID: 1,
IndexID: 3,
}
err := s.meta.indexMeta.CreateIndex(index)
err := s.meta.indexMeta.CreateIndex(context.TODO(), index)
s.NoError(err)

s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{