Skip to content

Commit

Permalink
Fix vector summarize partials-in handling (#5447)
Browse files Browse the repository at this point in the history
vam/op/summarize.Summarize doesn't know how to extract partials values
from an input vector when operating in partials-in mode.  Add missing
wiring for that.  (The new vam/op/summarize.Summarize.aggExprs field is
analogous to the existing sam/op/groupby.Aggregator.aggRefs field.)
  • Loading branch information
nwt authored Nov 7, 2024
1 parent 5ef24a4 commit 71dd82e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
8 changes: 7 additions & 1 deletion compiler/kernel/vop.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,15 @@ func (b *Builder) compileVamSeq(seq dag.Seq, parents []vector.Puller) ([]vector.
func (b *Builder) compileVamSummarize(s *dag.Summarize, parent vector.Puller) (vector.Puller, error) {
// compile aggs
var aggNames []field.Path
var aggExprs []vamexpr.Evaluator
var aggs []*vamexpr.Aggregator
for _, assignment := range s.Aggs {
aggNames = append(aggNames, assignment.LHS.(*dag.This).Path)
lhs, err := b.compileVamExpr(assignment.LHS)
if err != nil {
return nil, err
}
aggExprs = append(aggExprs, lhs)
agg, err := b.compileVamAgg(assignment.RHS.(*dag.Agg))
if err != nil {
return nil, err
Expand All @@ -265,7 +271,7 @@ func (b *Builder) compileVamSummarize(s *dag.Summarize, parent vector.Puller) (v
keyNames = append(keyNames, lhs.Path)
keyExprs = append(keyExprs, rhs)
}
return summarize.New(parent, b.zctx(), aggNames, aggs, keyNames, keyExprs, s.PartialsIn, s.PartialsOut)
return summarize.New(parent, b.zctx(), aggNames, aggExprs, aggs, keyNames, keyExprs, s.PartialsIn, s.PartialsOut)
}

func (b *Builder) compileVamAgg(agg *dag.Agg) (*vamexpr.Aggregator, error) {
Expand Down
18 changes: 12 additions & 6 deletions runtime/vam/op/summarize/summarize.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ type Summarize struct {
// XX Abstract this runtime into a generic table computation.
// Then the generic interface can execute fast paths for simple scenarios.
aggs []*expr.Aggregator
aggNames field.List
aggExprs []expr.Evaluator
keyExprs []expr.Evaluator
keyNames field.List
typeTable *super.TypeVectorTable
builder *vector.RecordBuilder
partialsIn bool
Expand All @@ -26,14 +25,15 @@ type Summarize struct {
results []aggTable
}

func New(parent vector.Puller, zctx *super.Context, aggPaths field.List, aggs []*expr.Aggregator, keyNames []field.Path, keyExprs []expr.Evaluator, partialsIn, partialsOut bool) (*Summarize, error) {
builder, err := vector.NewRecordBuilder(zctx, append(keyNames, aggPaths...))
func New(parent vector.Puller, zctx *super.Context, aggNames []field.Path, aggExprs []expr.Evaluator, aggs []*expr.Aggregator, keyNames []field.Path, keyExprs []expr.Evaluator, partialsIn, partialsOut bool) (*Summarize, error) {
builder, err := vector.NewRecordBuilder(zctx, append(keyNames, aggNames...))
if err != nil {
return nil, err
}
return &Summarize{
parent: parent,
aggs: aggs,
aggExprs: aggExprs,
keyExprs: keyExprs,
tables: make(map[int]aggTable),
typeTable: super.NewTypeVectorTable(),
Expand Down Expand Up @@ -69,8 +69,14 @@ func (s *Summarize) Pull(done bool) (vector.Any, error) {
for _, e := range s.keyExprs {
keys = append(keys, e.Eval(vec))
}
for _, e := range s.aggs {
vals = append(vals, e.Eval(vec))
if s.partialsIn {
for _, e := range s.aggExprs {
vals = append(vals, e.Eval(vec))
}
} else {
for _, e := range s.aggs {
vals = append(vals, e.Eval(vec))
}
}
vector.Apply(false, func(args ...vector.Any) vector.Any {
s.consume(args[:len(keys)], args[len(keys):])
Expand Down

0 comments on commit 71dd82e

Please sign in to comment.