Skip to content

Commit

Permalink
fix: Fix subscription leak (#37382)
Browse files Browse the repository at this point in the history
Close (unsubscribe) the msg stream after completing the
`PreCreatedTopic` check to prevent backlog issue.

issue: #36021

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Nov 8, 2024
1 parent ae227e3 commit ebc3c82
Showing 1 changed file with 24 additions and 11 deletions.
35 changes: 24 additions & 11 deletions internal/rootcoord/dml_channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
}

if params.PreCreatedTopicEnabled.GetAsBool() {
subName := fmt.Sprintf("pre-created-topic-check-%s", name)
ms.AsConsumer(ctx, []string{name}, subName, common.SubscriptionPositionUnknown)
// check if topic is existed
// kafka and rmq will err if the topic does not yet exist, pulsar will not
// allow topics is not empty, for the reason that when restart or upgrade, the topic is not empty
// if there are any message that not belong to milvus, will skip it
err := ms.CheckTopicValid(name)
if err != nil {
log.Error("created topic is invaild", zap.String("name", name), zap.Error(err))
panic("created topic is invaild")
}
d.checkPreCreatedTopic(ctx, factory, name)
}

ms.AsProducer([]string{name})
Expand All @@ -220,6 +210,29 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
return d
}

func (d *dmlChannels) checkPreCreatedTopic(ctx context.Context, factory msgstream.Factory, name string) {
tmpMs, err := factory.NewMsgStream(ctx)
if err != nil {
panic(fmt.Sprintf("failed to add msgstream, name:%s, err:%v", name, err))
}
defer tmpMs.Close()

subName := fmt.Sprintf("pre-created-topic-check-%s", name)
err = tmpMs.AsConsumer(ctx, []string{name}, subName, common.SubscriptionPositionUnknown)
if err != nil {
panic(fmt.Sprintf("failed to add consumer, name:%s, err:%v", name, err))
}

// check if topic is existed
// kafka and rmq will err if the topic does not yet exist, pulsar will not
// allow topics is not empty, for the reason that when restart or upgrade, the topic is not empty
// if there are any message that not belong to milvus, will skip it
err = tmpMs.CheckTopicValid(name)
if err != nil {
panic(fmt.Sprintf("created topic is invalid, name:%s, err:%v", name, err))
}
}

func (d *dmlChannels) getChannelNames(count int) []string {
d.mut.Lock()
defer d.mut.Unlock()
Expand Down

0 comments on commit ebc3c82

Please sign in to comment.