From fca946dee181542e13379fbad8291e11d3426de8 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 11 Nov 2024 11:10:27 +0800 Subject: [PATCH] enhance: move the search utilities from querynode into new module (#37531) issue: #33285 - The search utilities will be shared between query node and streaming node. Signed-off-by: chyezh --- internal/querynodev2/delegator/delegator.go | 2 +- internal/querynodev2/server.go | 8 ++++---- internal/querynodev2/server_test.go | 2 +- internal/querynodev2/services.go | 3 ++- internal/querynodev2/tasks/query_stream_task.go | 3 ++- internal/querynodev2/tasks/query_task.go | 3 ++- internal/querynodev2/tasks/search_task.go | 9 +++++---- .../searchutil}/optimizers/mock_query_hook.go | 0 .../searchutil}/optimizers/query_hook.go | 0 .../searchutil}/optimizers/query_hook_test.go | 0 .../searchutil/scheduler}/concurrent_safe_scheduler.go | 2 +- .../scheduler}/concurrent_safe_scheduler_test.go | 2 +- .../tasks => util/searchutil/scheduler}/fifo_policy.go | 2 +- .../searchutil/scheduler}/mock_task_test.go | 2 +- .../tasks => util/searchutil/scheduler}/policy_test.go | 2 +- .../tasks => util/searchutil/scheduler}/queues.go | 2 +- .../tasks => util/searchutil/scheduler}/queues_test.go | 2 +- .../tasks => util/searchutil/scheduler}/tasks.go | 2 +- .../searchutil/scheduler}/user_task_polling_policy.go | 2 +- 19 files changed, 26 insertions(+), 22 deletions(-) rename internal/{querynodev2 => util/searchutil}/optimizers/mock_query_hook.go (100%) rename internal/{querynodev2 => util/searchutil}/optimizers/query_hook.go (100%) rename internal/{querynodev2 => util/searchutil}/optimizers/query_hook_test.go (100%) rename internal/{querynodev2/tasks => util/searchutil/scheduler}/concurrent_safe_scheduler.go (99%) rename internal/{querynodev2/tasks => util/searchutil/scheduler}/concurrent_safe_scheduler_test.go (99%) rename internal/{querynodev2/tasks => util/searchutil/scheduler}/fifo_policy.go (98%) rename internal/{querynodev2/tasks => util/searchutil/scheduler}/mock_task_test.go (99%) rename internal/{querynodev2/tasks => util/searchutil/scheduler}/policy_test.go (99%) rename internal/{querynodev2/tasks => util/searchutil/scheduler}/queues.go (99%) rename internal/{querynodev2/tasks => util/searchutil/scheduler}/queues_test.go (99%) rename internal/{querynodev2/tasks => util/searchutil/scheduler}/tasks.go (99%) rename internal/{querynodev2/tasks => util/searchutil/scheduler}/user_task_polling_policy.go (99%) diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 4898524e2d792..7a75a14679b0c 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -39,13 +39,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/delegator/deletebuffer" - "github.com/milvus-io/milvus/internal/querynodev2/optimizers" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/function" "github.com/milvus-io/milvus/internal/util/reduce" + "github.com/milvus-io/milvus/internal/util/searchutil/optimizers" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 6e6407df4e534..f7ff0da91f840 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -50,16 +50,16 @@ import ( grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client" "github.com/milvus-io/milvus/internal/querynodev2/cluster" "github.com/milvus-io/milvus/internal/querynodev2/delegator" - "github.com/milvus-io/milvus/internal/querynodev2/optimizers" "github.com/milvus-io/milvus/internal/querynodev2/pipeline" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/registry" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/initcore" + "github.com/milvus-io/milvus/internal/util/searchutil/optimizers" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" @@ -113,7 +113,7 @@ type QueryNode struct { loader segments.Loader // Search/Query - scheduler tasks.Scheduler + scheduler scheduler.Scheduler // etcd client etcdCli *clientv3.Client @@ -339,7 +339,7 @@ func (node *QueryNode) Init() error { } schedulePolicy := paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue() - node.scheduler = tasks.NewScheduler( + node.scheduler = scheduler.NewScheduler( schedulePolicy, ) diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index 30164e0b8bbd2..37070bbae9786 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -34,10 +34,10 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/querynodev2/optimizers" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/searchutil/optimizers" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" ) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 085154f6968f0..e3ac0fb7f7ebd 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -689,7 +690,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe return resp, nil } - var task tasks.Task + var task scheduler.Task if paramtable.Get().QueryNodeCfg.UseStreamComputing.GetAsBool() { task = tasks.NewStreamingSearchTask(searchCtx, collection, node.manager, req, node.serverID) } else { diff --git a/internal/querynodev2/tasks/query_stream_task.go b/internal/querynodev2/tasks/query_stream_task.go index 1626eb76bb3a1..e24c755373b11 100644 --- a/internal/querynodev2/tasks/query_stream_task.go +++ b/internal/querynodev2/tasks/query_stream_task.go @@ -6,10 +6,11 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/streamrpc" ) -var _ Task = &QueryStreamTask{} +var _ scheduler.Task = &QueryStreamTask{} func NewQueryStreamTask(ctx context.Context, collection *segments.Collection, diff --git a/internal/querynodev2/tasks/query_task.go b/internal/querynodev2/tasks/query_task.go index 2a655460a8aa2..da4f18f72001e 100644 --- a/internal/querynodev2/tasks/query_task.go +++ b/internal/querynodev2/tasks/query_task.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -22,7 +23,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var _ Task = &QueryTask{} +var _ scheduler.Task = &QueryTask{} func NewQueryTask(ctx context.Context, collection *segments.Collection, diff --git a/internal/querynodev2/tasks/search_task.go b/internal/querynodev2/tasks/search_task.go index 0a2118a787290..39f25f542f0d6 100644 --- a/internal/querynodev2/tasks/search_task.go +++ b/internal/querynodev2/tasks/search_task.go @@ -20,6 +20,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -30,8 +31,8 @@ import ( ) var ( - _ Task = &SearchTask{} - _ MergeTask = &SearchTask{} + _ scheduler.Task = &SearchTask{} + _ scheduler.MergeTask = &SearchTask{} ) type SearchTask struct { @@ -346,7 +347,7 @@ func (t *SearchTask) NQ() int64 { return t.nq } -func (t *SearchTask) MergeWith(other Task) bool { +func (t *SearchTask) MergeWith(other scheduler.Task) bool { switch other := other.(type) { case *SearchTask: return t.Merge(other) @@ -416,7 +417,7 @@ func NewStreamingSearchTask(ctx context.Context, } } -func (t *StreamingSearchTask) MergeWith(other Task) bool { +func (t *StreamingSearchTask) MergeWith(other scheduler.Task) bool { return false } diff --git a/internal/querynodev2/optimizers/mock_query_hook.go b/internal/util/searchutil/optimizers/mock_query_hook.go similarity index 100% rename from internal/querynodev2/optimizers/mock_query_hook.go rename to internal/util/searchutil/optimizers/mock_query_hook.go diff --git a/internal/querynodev2/optimizers/query_hook.go b/internal/util/searchutil/optimizers/query_hook.go similarity index 100% rename from internal/querynodev2/optimizers/query_hook.go rename to internal/util/searchutil/optimizers/query_hook.go diff --git a/internal/querynodev2/optimizers/query_hook_test.go b/internal/util/searchutil/optimizers/query_hook_test.go similarity index 100% rename from internal/querynodev2/optimizers/query_hook_test.go rename to internal/util/searchutil/optimizers/query_hook_test.go diff --git a/internal/querynodev2/tasks/concurrent_safe_scheduler.go b/internal/util/searchutil/scheduler/concurrent_safe_scheduler.go similarity index 99% rename from internal/querynodev2/tasks/concurrent_safe_scheduler.go rename to internal/util/searchutil/scheduler/concurrent_safe_scheduler.go index 045420736146d..7d04fe7c18d13 100644 --- a/internal/querynodev2/tasks/concurrent_safe_scheduler.go +++ b/internal/util/searchutil/scheduler/concurrent_safe_scheduler.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "fmt" diff --git a/internal/querynodev2/tasks/concurrent_safe_scheduler_test.go b/internal/util/searchutil/scheduler/concurrent_safe_scheduler_test.go similarity index 99% rename from internal/querynodev2/tasks/concurrent_safe_scheduler_test.go rename to internal/util/searchutil/scheduler/concurrent_safe_scheduler_test.go index 69064f18fa8b6..4017ff88f8cc5 100644 --- a/internal/querynodev2/tasks/concurrent_safe_scheduler_test.go +++ b/internal/util/searchutil/scheduler/concurrent_safe_scheduler_test.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "context" diff --git a/internal/querynodev2/tasks/fifo_policy.go b/internal/util/searchutil/scheduler/fifo_policy.go similarity index 98% rename from internal/querynodev2/tasks/fifo_policy.go rename to internal/util/searchutil/scheduler/fifo_policy.go index 168f30b6c0929..e23b04eaf18ff 100644 --- a/internal/querynodev2/tasks/fifo_policy.go +++ b/internal/util/searchutil/scheduler/fifo_policy.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "github.com/milvus-io/milvus/pkg/util/paramtable" diff --git a/internal/querynodev2/tasks/mock_task_test.go b/internal/util/searchutil/scheduler/mock_task_test.go similarity index 99% rename from internal/querynodev2/tasks/mock_task_test.go rename to internal/util/searchutil/scheduler/mock_task_test.go index 4705e84ba4e64..9f334cb2426c9 100644 --- a/internal/querynodev2/tasks/mock_task_test.go +++ b/internal/util/searchutil/scheduler/mock_task_test.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "context" diff --git a/internal/querynodev2/tasks/policy_test.go b/internal/util/searchutil/scheduler/policy_test.go similarity index 99% rename from internal/querynodev2/tasks/policy_test.go rename to internal/util/searchutil/scheduler/policy_test.go index 03ce1a811f042..c88b9972b8631 100644 --- a/internal/querynodev2/tasks/policy_test.go +++ b/internal/util/searchutil/scheduler/policy_test.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "fmt" diff --git a/internal/querynodev2/tasks/queues.go b/internal/util/searchutil/scheduler/queues.go similarity index 99% rename from internal/querynodev2/tasks/queues.go rename to internal/util/searchutil/scheduler/queues.go index 3582d6ce1a519..7d0828c96a675 100644 --- a/internal/querynodev2/tasks/queues.go +++ b/internal/util/searchutil/scheduler/queues.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "container/ring" diff --git a/internal/querynodev2/tasks/queues_test.go b/internal/util/searchutil/scheduler/queues_test.go similarity index 99% rename from internal/querynodev2/tasks/queues_test.go rename to internal/util/searchutil/scheduler/queues_test.go index ead133a2a1a44..79441d1f61525 100644 --- a/internal/querynodev2/tasks/queues_test.go +++ b/internal/util/searchutil/scheduler/queues_test.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "fmt" diff --git a/internal/querynodev2/tasks/tasks.go b/internal/util/searchutil/scheduler/tasks.go similarity index 99% rename from internal/querynodev2/tasks/tasks.go rename to internal/util/searchutil/scheduler/tasks.go index 7606642d12012..cadb78472965e 100644 --- a/internal/querynodev2/tasks/tasks.go +++ b/internal/util/searchutil/scheduler/tasks.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import "github.com/milvus-io/milvus/internal/proto/internalpb" diff --git a/internal/querynodev2/tasks/user_task_polling_policy.go b/internal/util/searchutil/scheduler/user_task_polling_policy.go similarity index 99% rename from internal/querynodev2/tasks/user_task_polling_policy.go rename to internal/util/searchutil/scheduler/user_task_polling_policy.go index 8ea88659a458d..e09d6c0c58586 100644 --- a/internal/querynodev2/tasks/user_task_polling_policy.go +++ b/internal/util/searchutil/scheduler/user_task_polling_policy.go @@ -1,4 +1,4 @@ -package tasks +package scheduler import ( "fmt"