Skip to content

Commit

Permalink
describe endpoint: source inferrence
Browse files Browse the repository at this point in the history
Update the describe endpoint to include whether or not the source of a
query was inferred.
  • Loading branch information
mattnibs committed Jun 10, 2024
1 parent 4b7dda7 commit 06375a7
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 62 deletions.
26 changes: 0 additions & 26 deletions compiler/describe.go

This file was deleted.

67 changes: 44 additions & 23 deletions compiler/describe/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/optimizer"
"github.com/brimdata/zed/compiler/parser"
"github.com/brimdata/zed/compiler/semantic"
"github.com/brimdata/zed/lake"
"github.com/brimdata/zed/lakeparse"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
"github.com/segmentio/ksuid"
Expand All @@ -28,13 +31,15 @@ type (
Meta string `json:"meta"`
}
Pool struct {
Kind string `json:"kind"`
Name string `json:"name"`
ID ksuid.KSUID `json:"id"`
Kind string `json:"kind"`
Name string `json:"name"`
ID ksuid.KSUID `json:"id"`
Inferred bool `json:"inferred"`
}
Path struct {
Kind string `json:"kind"`
URI string `json:"uri"`
Kind string `json:"kind"`
URI string `json:"uri"`
Inferred bool `json:"inferred"`
}
)

Expand All @@ -47,17 +52,32 @@ type Channel struct {
Sort *order.SortKey `json:"sort"`
}

func Analyze(ctx context.Context, source *data.Source, seq dag.Seq) (*Info, error) {
// func Describe(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) (*describe.Info, error) {
func Analyze(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) (*Info, error) {
seq, sset, err := parser.ParseZed(nil, query)
if err != nil {
return nil, err
}
entry, err := semantic.Analyze(ctx, seq, src, head)
if err != nil {
if list, ok := err.(parser.ErrorList); ok {
list.SetSourceSet(sset)
}
return nil, err
}
srcInferred := !semantic.HasSource(entry)
if err = semantic.AddDefaultSource(ctx, &entry, src, head); err != nil {
return nil, err
}
var info Info
var err error
if info.Sources, err = describeSources(ctx, source.Lake(), seq[0]); err != nil {
if info.Sources, err = describeSources(ctx, src.Lake(), entry[0], srcInferred); err != nil {
return nil, err
}
sortKeys, err := optimizer.New(ctx, source).SortKeys(seq)
sortKeys, err := optimizer.New(ctx, src).SortKeys(entry)
if err != nil {
return nil, err
}
aggKeys := describeAggs(seq, []field.List{nil})
aggKeys := describeAggs(entry, []field.List{nil})
for i := range sortKeys {
// Convert SortKey to a pointer so a nil sort is encoded as null for
// JSON/ZSON.
Expand All @@ -73,48 +93,49 @@ func Analyze(ctx context.Context, source *data.Source, seq dag.Seq) (*Info, erro
return &info, nil
}

func describeSources(ctx context.Context, lk *lake.Root, o dag.Op) ([]Source, error) {
func describeSources(ctx context.Context, lk *lake.Root, o dag.Op, inferred bool) ([]Source, error) {
switch o := o.(type) {
case *dag.Fork:
var s []Source
for _, p := range o.Paths {
out, err := describeSources(ctx, lk, p[0])
out, err := describeSources(ctx, lk, p[0], inferred)
if err != nil {
return nil, err
}
s = append(s, out...)
}
return s, nil
case *dag.DefaultScan:
return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil
return []Source{&Path{Kind: "Path", URI: "stdio://stdin", Inferred: inferred}}, nil
case *dag.FileScan:
return []Source{&Path{Kind: "Path", URI: o.Path}}, nil
return []Source{&Path{Kind: "Path", URI: o.Path, Inferred: inferred}}, nil
case *dag.HTTPScan:
return []Source{&Path{Kind: "Path", URI: o.URL}}, nil
return []Source{&Path{Kind: "Path", URI: o.URL, Inferred: inferred}}, nil
case *dag.PoolScan:
return sourceOfPool(ctx, lk, o.ID)
return sourceOfPool(ctx, lk, o.ID, inferred)
case *dag.Lister:
return sourceOfPool(ctx, lk, o.Pool)
return sourceOfPool(ctx, lk, o.Pool, inferred)
case *dag.SeqScan:
return sourceOfPool(ctx, lk, o.Pool)
return sourceOfPool(ctx, lk, o.Pool, inferred)
case *dag.CommitMetaScan:
return sourceOfPool(ctx, lk, o.Pool)
return sourceOfPool(ctx, lk, o.Pool, inferred)
case *dag.LakeMetaScan:
return []Source{&LakeMeta{Kind: "LakeMeta", Meta: o.Meta}}, nil
default:
return nil, fmt.Errorf("unsupported source type %T", o)
}
}

func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID) ([]Source, error) {
func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID, inferred bool) ([]Source, error) {
p, err := lk.OpenPool(ctx, id)
if err != nil {
return nil, err
}
return []Source{&Pool{
Kind: "Pool",
ID: id,
Name: p.Name,
Kind: "Pool",
ID: id,
Name: p.Name,
Inferred: inferred,
}}, nil
}

Expand Down
10 changes: 5 additions & 5 deletions compiler/semantic/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func AnalyzeAddSource(ctx context.Context, seq ast.Seq, source *data.Source, hea
return nil, a.errors
}
if !HasSource(s) {
if err := a.addDefaultSource(&s); err != nil {
if err := AddDefaultSource(ctx, &s, source, head); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -73,17 +73,17 @@ func HasSource(seq dag.Seq) bool {
return false
}

func (a *analyzer) addDefaultSource(seq *dag.Seq) error {
func AddDefaultSource(ctx context.Context, seq *dag.Seq, source *data.Source, head *lakeparse.Commitish) error {
if HasSource(*seq) {
return nil
}
// No from so add a source.
if a.head == nil {
if head == nil {
seq.Prepend(&dag.DefaultScan{Kind: "DefaultScan"})
return nil
}
// Verify pool exists for HEAD
if _, err := a.source.PoolID(a.ctx, a.head.Pool); err != nil {
if _, err := source.PoolID(ctx, head.Pool); err != nil {
return err
}
pool := &ast.Pool{
Expand All @@ -95,7 +95,7 @@ func (a *analyzer) addDefaultSource(seq *dag.Seq) error {
},
},
}
ops := a.semPool(pool)
ops := newAnalyzer(ctx, source, head).semPool(pool)
seq.Prepend(ops[0])
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/brimdata/zed/api/queryio"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/describe"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/compiler/parser"
"github.com/brimdata/zed/lake"
Expand Down Expand Up @@ -178,7 +179,7 @@ func handleQueryDescribe(c *Core, w *ResponseWriter, r *Request) {
return
}
src := data.NewSource(storage.NewRemoteEngine(), c.root)
info, err := compiler.Describe(r.Context(), req.Query, src, &req.Head)
info, err := describe.Analyze(r.Context(), req.Query, src, &req.Head)
if err != nil {
w.Error(srverr.ErrInvalid(err))
return
Expand Down
21 changes: 14 additions & 7 deletions service/ztests/query-describe.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ outputs:
{
"kind": "Pool",
"name": "test1",
"id": "XXX"
"id": "XXX",
"inferred": false
},
{
"kind": "Pool",
"name": "test2",
"id": "XXX"
"id": "XXX",
"inferred": false
}
],
"channels": [
Expand All @@ -70,7 +72,8 @@ outputs:
"sources": {
"kind": "Pool",
"name": "test1",
"id": "XXX"
"id": "XXX",
"inferred": true
},
"channels": [
{
Expand All @@ -91,7 +94,8 @@ outputs:
"sources": {
"kind": "Pool",
"name": "test1",
"id": "XXX"
"id": "XXX",
"inferred": true
},
"channels": [
{
Expand All @@ -106,12 +110,14 @@ outputs:
{
"kind": "Pool",
"name": "test1",
"id": "XXX"
"id": "XXX",
"inferred": false
},
{
"kind": "Pool",
"name": "test2",
"id": "XXX"
"id": "XXX",
"inferred": false
}
],
"channels": [
Expand Down Expand Up @@ -141,7 +147,8 @@ outputs:
"sources": {
"kind": "Pool",
"name": "test1",
"id": "XXX"
"id": "XXX",
"inferred": true
},
"channels": [
{
Expand Down

0 comments on commit 06375a7

Please sign in to comment.