Commit c8260ac

Add query describe endpoint
This commit adds the query describe endpoint: a service endpoint that
returns information about a posted query. The endpoint is meant for
internal use from within ZUI and is not meant for public consumption.
mattnibs committed May 31, 2024
1 parent 8e69667 commit c8260ac
Showing 6 changed files with 367 additions and 1 deletion.
26 changes: 26 additions & 0 deletions compiler/describe.go
@@ -0,0 +1,26 @@
package compiler

import (
	"context"

	"github.com/brimdata/zed/compiler/data"
	"github.com/brimdata/zed/compiler/describe"
	"github.com/brimdata/zed/compiler/parser"
	"github.com/brimdata/zed/compiler/semantic"
	"github.com/brimdata/zed/lakeparse"
)

func Describe(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) (*describe.Info, error) {
	seq, sset, err := Parse(query)
	if err != nil {
		return nil, err
	}
	entry, err := semantic.AnalyzeAddSource(ctx, seq, src, head)
	if err != nil {
		if list, ok := err.(parser.ErrorList); ok {
			list.SetSourceSet(sset)
		}
		return nil, err
	}
	return describe.Analyze(ctx, src, entry)
}
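
For orientation, a minimal sketch (not part of the commit) of driving Describe directly. It assumes storage.NewLocalEngine(), a nil lake root, a nil commitish, and a source-less query, which the compiler resolves to a default stdin scan (see describeSources in analyze.go below); a real caller would supply a lake root and head for pool queries.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/brimdata/zed/compiler"
	"github.com/brimdata/zed/compiler/data"
	"github.com/brimdata/zed/pkg/storage"
)

func main() {
	// Sketch only: no lake root or commitish, so only non-lake sources work.
	src := data.NewSource(storage.NewLocalEngine(), nil)
	info, err := compiler.Describe(context.Background(), "count() by id", src, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d source(s), %d channel(s)\n", len(info.Sources), len(info.Channels))
}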
156 changes: 156 additions & 0 deletions compiler/describe/analyze.go
@@ -0,0 +1,156 @@
package describe

import (
	"context"
	"fmt"

	"github.com/brimdata/zed/compiler/ast/dag"
	"github.com/brimdata/zed/compiler/data"
	"github.com/brimdata/zed/compiler/optimizer"
	"github.com/brimdata/zed/lake"
	"github.com/brimdata/zed/order"
	"github.com/brimdata/zed/pkg/field"
	"github.com/segmentio/ksuid"
)

type Info struct {
	Sources  []Source  `json:"sources"`
	Channels []Channel `json:"channels"`
}

type Source interface {
	Source()
}

type (
	LakeMeta struct {
		Kind string `json:"kind"`
		Meta string `json:"meta"`
	}
	Pool struct {
		Kind string      `json:"kind"`
		Name string      `json:"name"`
		ID   ksuid.KSUID `json:"id"`
	}
	Path struct {
		Kind string `json:"kind"`
		URI  string `json:"uri"`
	}
)

func (*LakeMeta) Source() {}
func (*Pool) Source() {}
func (*Path) Source() {}

type Channel struct {
	AggregationKeys field.List     `json:"aggregation_keys"`
	Sort            *order.SortKey `json:"sort"`
}

func Analyze(ctx context.Context, source *data.Source, seq dag.Seq) (*Info, error) {
	var info Info
	var err error
	if info.Sources, err = describeSources(ctx, source.Lake(), seq[0]); err != nil {
		return nil, err
	}
	sortKeys, err := optimizer.New(ctx, source).SortKeys(seq)
	if err != nil {
		return nil, err
	}
	aggKeys := describeAggs(seq, []field.List{nil})
	for i := range sortKeys {
		// Convert SortKey to a pointer so a nil sort is encoded as null for
		// JSON/ZSON.
		var s *order.SortKey
		if !sortKeys[i].IsNil() {
			s = &sortKeys[i]
		}
		info.Channels = append(info.Channels, Channel{
			Sort:            s,
			AggregationKeys: aggKeys[i],
		})
	}
	return &info, nil
}

func describeSources(ctx context.Context, lk *lake.Root, o dag.Op) ([]Source, error) {
	switch o := o.(type) {
	case *dag.Fork:
		var s []Source
		for _, p := range o.Paths {
			out, err := describeSources(ctx, lk, p[0])
			if err != nil {
				return nil, err
			}
			s = append(s, out...)
		}
		return s, nil
	case *dag.DefaultScan:
		return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil
	case *dag.FileScan:
		return []Source{&Path{Kind: "Path", URI: o.Path}}, nil
	case *dag.HTTPScan:
		return []Source{&Path{Kind: "Path", URI: o.URL}}, nil
	case *dag.PoolScan:
		return sourceOfPool(ctx, lk, o.ID)
	case *dag.Lister:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.SeqScan:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.CommitMetaScan:
		return sourceOfPool(ctx, lk, o.Pool)
	case *dag.LakeMetaScan:
		return []Source{&LakeMeta{Kind: "LakeMeta", Meta: o.Meta}}, nil
	default:
		return nil, fmt.Errorf("unsupported source type %T", o)
	}
}

func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID) ([]Source, error) {
	p, err := lk.OpenPool(ctx, id)
	if err != nil {
		return nil, err
	}
	return []Source{&Pool{
		Kind: "Pool",
		ID:   id,
		Name: p.Name,
	}}, nil
}

func describeAggs(seq dag.Seq, parents []field.List) []field.List {
	for _, op := range seq {
		parents = describeOpAggs(op, parents)
	}
	return parents
}

func describeOpAggs(op dag.Op, parents []field.List) []field.List {
	switch op := op.(type) {
	case *dag.Fork:
		var aggs []field.List
		for _, p := range op.Paths {
			aggs = append(aggs, describeAggs(p, []field.List{nil})...)
		}
		return aggs
	case *dag.Scatter:
		var aggs []field.List
		for _, p := range op.Paths {
			aggs = append(aggs, describeAggs(p, []field.List{nil})...)
		}
		return aggs
	case *dag.Summarize:
		// The field list for aggregation with no keys is an empty slice and
		// not nil.
		keys := field.List{}
		for _, k := range op.Keys {
			keys = append(keys, k.LHS.(*dag.This).Path)
		}
		return []field.List{keys}
	}
	// If there is more than one parent, reset to a nil aggregation.
	if len(parents) > 1 {
		return []field.List{nil}
	}
	return parents
}
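
To make the JSON shape of Info concrete, a sketch (not part of the commit) that marshals a hand-built value; the pool name and aggregation key are invented for illustration, and the pool ID is left as the zero KSUID.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/brimdata/zed/compiler/describe"
	"github.com/brimdata/zed/pkg/field"
)

func main() {
	// Hypothetical result for a query like "from logs | count() by id.orig_h".
	info := describe.Info{
		Sources: []describe.Source{
			&describe.Pool{Kind: "Pool", Name: "logs"}, // ID omitted: zero KSUID
		},
		Channels: []describe.Channel{{
			AggregationKeys: field.List{field.Path{"id", "orig_h"}},
			Sort:            nil, // encoded as null, per the comment in Analyze
		}},
	}
	b, _ := json.MarshalIndent(&info, "", "  ")
	fmt.Println(string(b))
}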
6 changes: 5 additions & 1 deletion compiler/optimizer/optimizer.go
@@ -257,6 +257,10 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
	})
}

func (o *Optimizer) SortKeys(seq dag.Seq) ([]order.SortKey, error) {
	return o.propagateSortKey(copyOps(seq), []order.SortKey{order.Nil})
}

// propagateSortKey analyzes a Seq and attempts to push the scan order of the data source
// into the first downstream aggregation. (We could continue the analysis past that
// point but don't bother yet because we do not yet support any optimization
@@ -330,7 +334,7 @@ func (o *Optimizer) propagateSortKeyOp(op dag.Op, parents []order.SortKey) ([]order.SortKey, error) {
	// We'll leave this as unknown for now and not try to optimize
	// downstream of the first groupby unless an explicit sort is
	// encountered.
-	return nil, nil
+	return []order.SortKey{order.Nil}, nil
	case *dag.Fork:
		var keys []order.SortKey
		for _, seq := range op.Paths {
1 change: 1 addition & 0 deletions service/core.go
@@ -188,6 +188,7 @@ func (c *Core) addAPIServerRoutes() {
c.authhandle("/pool/{pool}/revision/{revision}/vector", handleVectorDelete).Methods("DELETE")
c.authhandle("/pool/{pool}/stats", handlePoolStats).Methods("GET")
c.authhandle("/query", handleQuery).Methods("OPTIONS", "POST")
c.authhandle("/query/describe", handleQueryDescribe).Methods("OPTIONS", "POST")
c.authhandle("/query/status/{requestID}", handleQueryStatus).Methods("GET")
}

16 changes: 16 additions & 0 deletions service/handlers.go
@@ -11,13 +11,15 @@ import (
"github.com/brimdata/zed/api"
"github.com/brimdata/zed/api/queryio"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/compiler/parser"
"github.com/brimdata/zed/lake"
lakeapi "github.com/brimdata/zed/lake/api"
"github.com/brimdata/zed/lake/commits"
"github.com/brimdata/zed/lake/journal"
"github.com/brimdata/zed/lakeparse"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime"
"github.com/brimdata/zed/runtime/exec"
"github.com/brimdata/zed/runtime/sam/op"
@@ -170,6 +172,20 @@ func handleCompile(c *Core, w *ResponseWriter, r *Request) {
	w.Respond(http.StatusOK, ast)
}

func handleQueryDescribe(c *Core, w *ResponseWriter, r *Request) {
	var req api.QueryRequest
	if !r.Unmarshal(w, &req) {
		return
	}
	src := data.NewSource(storage.NewRemoteEngine(), c.root)
	info, err := compiler.Describe(r.Context(), req.Query, src, &req.Head)
	if err != nil {
		w.Error(srverr.ErrInvalid(err))
		return
	}
	w.Respond(http.StatusOK, info)
}

func handleBranchGet(c *Core, w *ResponseWriter, r *Request) {
	branchName, ok := r.StringFromPath(w, "branch")
	if !ok {
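
For reference, a sketch (not part of the commit) of how a client such as ZUI might call the new endpoint; the localhost address, default port 9867, pool name, and query text are all assumptions.

package main

import (
	"bytes"
	"encoding/json"
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	// Hypothetical request body mirroring api.QueryRequest: a query plus the
	// pool/branch head against which it should be resolved.
	body, _ := json.Marshal(map[string]any{
		"query": "from logs | count() by id.orig_h",
		"head":  map[string]string{"pool": "logs", "branch": "main"},
	})
	resp, err := http.Post("http://localhost:9867/query/describe",
		"application/json", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	io.Copy(os.Stdout, resp.Body) // a describe.Info document as JSON
}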
(1 additional changed file not shown)
