fix: Use correct policy merging growing&l0 and add unit tests (#37950)
Related to #37574

---------

Signed-off-by: Congqi Xia <[email protected]>
congqixia authored Nov 24, 2024
1 parent 8c9dab5 commit 1e76d2b
Showing 2 changed files with 235 additions and 4 deletions.
8 changes: 4 additions & 4 deletions internal/querynodev2/delegator/delta_forward.go
@@ -86,14 +86,14 @@ func (sd *shardDelegator) forwardStreamingDeletion(ctx context.Context, deleteDa
 }
 
 func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments.Segment) error {
-	switch policy := paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.GetValue(); policy {
-	case ForwardPolicyDefault, StreamingForwardPolicyBF:
+	switch sd.l0ForwardPolicy {
+	case ForwardPolicyDefault, L0ForwardPolicyBF:
 		return sd.addL0GrowingBF(ctx, segment)
-	case StreamingForwardPolicyDirect:
+	case L0ForwardPolicyRemoteLoad:
 		// forward streaming deletion without bf filtering
 		return sd.addL0ForGrowingLoad(ctx, segment)
 	default:
-		log.Fatal("unsupported streaming forward policy", zap.String("policy", policy))
+		log.Fatal("unsupported l0 forward policy", zap.String("policy", sd.l0ForwardPolicy))
 	}
 	return nil
 }
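
The change above makes addL0ForGrowing dispatch on the delegator's own l0ForwardPolicy field instead of re-reading StreamingDeltaForwardPolicy from the paramtable on every call. A minimal, self-contained sketch of the same dispatch pattern (hypothetical names and values, not the actual Milvus declarations) looks like this:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the policy constants referenced in the diff;
// the real values live in the Milvus delegator package.
const (
	policyDefault    = ""
	policyBF         = "FilterByBF"
	policyRemoteLoad = "RemoteLoad"
)

type delegator struct {
	l0ForwardPolicy string
}

// addL0ForGrowing mirrors the shape of the patched function: the decision
// comes from the field set on the delegator, not from a global config
// lookup at call time.
func (d *delegator) addL0ForGrowing() error {
	switch d.l0ForwardPolicy {
	case policyDefault, policyBF:
		return nil // would apply L0 deletes via bloom-filter matching
	case policyRemoteLoad:
		return nil // would forward delta logs for remote loading
	default:
		return errors.New("unsupported l0 forward policy: " + d.l0ForwardPolicy)
	}
}

func main() {
	d := &delegator{l0ForwardPolicy: policyBF}
	fmt.Println(d.addL0ForGrowing()) // <nil>
}
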
231 changes: 231 additions & 0 deletions internal/querynodev2/delegator/delta_forward_test.go
@@ -18,14 +18,17 @@ package delegator

import (
	"context"
	"math/rand"
	"testing"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/suite"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/proto/segcorepb"
	"github.com/milvus-io/milvus/internal/querynodev2/cluster"
@@ -265,3 +268,231 @@ func (s *StreamingForwardSuite) TestDirectStreamingForward() {
func TestStreamingForward(t *testing.T) {
	suite.Run(t, new(StreamingForwardSuite))
}

type GrowingMergeL0Suite struct {
	suite.Suite

	collectionID  int64
	partitionIDs  []int64
	replicaID     int64
	vchannelName  string
	version       int64
	schema        *schemapb.CollectionSchema
	workerManager *cluster.MockManager
	manager       *segments.Manager
	tsafeManager  tsafe.Manager
	loader        *segments.MockLoader
	mq            *msgstream.MockMsgStream

	delegator    *shardDelegator
	chunkManager storage.ChunkManager
	rootPath     string
}

func (s *GrowingMergeL0Suite) SetupSuite() {
	paramtable.Init()
	paramtable.SetNodeID(1)
}

func (s *GrowingMergeL0Suite) SetupTest() {
	s.collectionID = 1000
	s.partitionIDs = []int64{500, 501}
	s.replicaID = 65535
	s.vchannelName = "rootcoord-dml_1000v0"
	s.version = 2000
	s.workerManager = &cluster.MockManager{}
	s.manager = segments.NewManager()
	s.tsafeManager = tsafe.NewTSafeReplica()
	s.loader = &segments.MockLoader{}
	s.loader.EXPECT().
		Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
		Call.Return(func(ctx context.Context, collectionID int64, segmentType segments.SegmentType, version int64, infos ...*querypb.SegmentLoadInfo) []segments.Segment {
			return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) segments.Segment {
				ms := &segments.MockSegment{}
				ms.EXPECT().ID().Return(info.GetSegmentID())
				ms.EXPECT().Type().Return(segments.SegmentTypeGrowing)
				ms.EXPECT().Partition().Return(info.GetPartitionID())
				ms.EXPECT().Collection().Return(info.GetCollectionID())
				ms.EXPECT().Indexes().Return(nil)
				ms.EXPECT().RowNum().Return(info.GetNumOfRows())
				ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
				return ms
			})
		}, nil)

	// init schema
	s.schema = &schemapb.CollectionSchema{
		Name: "TestCollection",
		Fields: []*schemapb.FieldSchema{
			{
				Name:         "id",
				FieldID:      100,
				IsPrimaryKey: true,
				DataType:     schemapb.DataType_Int64,
				AutoID:       true,
			},
			{
				Name:         "vector",
				FieldID:      101,
				IsPrimaryKey: false,
				DataType:     schemapb.DataType_BinaryVector,
				TypeParams: []*commonpb.KeyValuePair{
					{
						Key:   common.DimKey,
						Value: "128",
					},
				},
			},
		},
	}
	s.manager.Collection.PutOrRef(s.collectionID, s.schema, &segcorepb.CollectionIndexMeta{
		MaxIndexRowCount: 100,
		IndexMetas: []*segcorepb.FieldIndexMeta{
			{
				FieldID:      101,
				CollectionID: s.collectionID,
				IndexName:    "binary_index",
				TypeParams: []*commonpb.KeyValuePair{
					{
						Key:   common.DimKey,
						Value: "128",
					},
				},
				IndexParams: []*commonpb.KeyValuePair{
					{
						Key:   common.IndexTypeKey,
						Value: "BIN_IVF_FLAT",
					},
					{
						Key:   common.MetricTypeKey,
						Value: metric.JACCARD,
					},
				},
			},
		},
	}, &querypb.LoadMetaInfo{
		PartitionIDs: s.partitionIDs,
	})

	s.mq = &msgstream.MockMsgStream{}
	s.rootPath = "delegator_test"

	// init chunkManager
	chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
	s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())

	delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
		NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
			return s.mq, nil
		},
	}, 10000, nil, s.chunkManager)
	s.Require().NoError(err)

	sd, ok := delegator.(*shardDelegator)
	s.Require().True(ok)
	s.delegator = sd
}

func (s *GrowingMergeL0Suite) TestAddL0ForGrowingBF() {
	sd := s.delegator
	sd.l0ForwardPolicy = L0ForwardPolicyBF

	seg := segments.NewMockSegment(s.T())
	coll := s.manager.Collection.Get(s.collectionID)
	l0Segment, err := segments.NewL0Segment(coll, segments.SegmentTypeSealed, s.version, &querypb.SegmentLoadInfo{
		SegmentID:     10001,
		CollectionID:  s.collectionID,
		PartitionID:   common.AllPartitionsID,
		InsertChannel: s.vchannelName,
	})
	s.Require().NoError(err)

	n := 10
	deltaData := storage.NewDeltaData(int64(n))

	for i := 0; i < n; i++ {
		deltaData.Append(storage.NewInt64PrimaryKey(rand.Int63()), 0)
	}
	err = l0Segment.LoadDeltaData(context.Background(), deltaData)
	s.Require().NoError(err)
	s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment)

	seg.EXPECT().ID().Return(10000)
	seg.EXPECT().Partition().Return(100)
	seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error {
		s.Equal(deltaData.DeletePks(), pk)
		s.Equal(deltaData.DeleteTimestamps(), u)
		return nil
	}).Once()

	err = sd.addL0ForGrowing(context.Background(), seg)
	s.NoError(err)

	seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error {
		return errors.New("mocked")
	}).Once()
	err = sd.addL0ForGrowing(context.Background(), seg)
	s.Error(err)
}

func (s *GrowingMergeL0Suite) TestAddL0ForGrowingLoad() {
	sd := s.delegator
	sd.l0ForwardPolicy = L0ForwardPolicyRemoteLoad

	seg := segments.NewMockSegment(s.T())
	coll := s.manager.Collection.Get(s.collectionID)
	l0Segment, err := segments.NewL0Segment(coll, segments.SegmentTypeSealed, s.version, &querypb.SegmentLoadInfo{
		SegmentID:     10001,
		CollectionID:  s.collectionID,
		PartitionID:   common.AllPartitionsID,
		InsertChannel: s.vchannelName,
		Deltalogs: []*datapb.FieldBinlog{
			{Binlogs: []*datapb.Binlog{
				{LogPath: "mocked_log_path"},
			}},
		},
	})
	s.Require().NoError(err)

	n := 10
	deltaData := storage.NewDeltaData(int64(n))

	for i := 0; i < n; i++ {
		deltaData.Append(storage.NewInt64PrimaryKey(rand.Int63()), 0)
	}
	err = l0Segment.LoadDeltaData(context.Background(), deltaData)
	s.Require().NoError(err)
	s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment)

	seg.EXPECT().ID().Return(10000)
	seg.EXPECT().Partition().Return(100)
	s.loader.EXPECT().LoadDeltaLogs(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, seg segments.Segment, fb []*datapb.FieldBinlog) error {
		s.ElementsMatch([]string{"mocked_log_path"}, lo.Flatten(lo.Map(fb, func(fbl *datapb.FieldBinlog, _ int) []string {
			return lo.Map(fbl.Binlogs, func(bl *datapb.Binlog, _ int) string { return bl.LogPath })
		})))
		return nil
	}).Once()

	err = sd.addL0ForGrowing(context.Background(), seg)
	s.NoError(err)

	s.loader.EXPECT().LoadDeltaLogs(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, seg segments.Segment, fb []*datapb.FieldBinlog) error {
		return errors.New("mocked")
	}).Once()
	err = sd.addL0ForGrowing(context.Background(), seg)
	s.Error(err)
}

func (s *GrowingMergeL0Suite) TestAddL0ForGrowingInvalid() {
	sd := s.delegator
	sd.l0ForwardPolicy = "invalid_policy"

	seg := segments.NewMockSegment(s.T())
	s.Panics(func() {
		sd.addL0ForGrowing(context.Background(), seg)
	})
}

func TestGrowingMergeL0(t *testing.T) {
	suite.Run(t, new(GrowingMergeL0Suite))
}
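
The new suite can be exercised on its own with a standard go test filter; a sketch of the invocation, assuming a Milvus checkout with its native build prerequisites (including the compiled segcore libraries) already in place:

	go test ./internal/querynodev2/delegator/... -run TestGrowingMergeL0 -v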
