Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vam: Fix summarize where #5503

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions runtime/vam/expr/agg/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,20 @@ type count struct {
}

func (a *count) Consume(vec vector.Any) {
if c, ok := vec.(*vector.Const); ok {
val := c.Value()
if !val.IsNull() && !val.IsError() {
a.count += uint64(vec.Len())
}
if c, ok := vec.(*vector.Const); ok && c.Value().IsNull() {
return
}
if _, ok := vector.Under(vec).Type().(*super.TypeError); ok {
return
}
nulls := vector.NullsOf(vec)
if nulls == nil {
a.count += uint64(vec.Len())
return
}
for i := range vec.Len() {
if !nulls.Value(i) {
a.count++
if nulls := vector.NullsOf(vec); nulls != nil {
for i := range vec.Len() {
if !nulls.Value(i) {
a.count++
}
}
} else {
a.count += uint64(vec.Len())
}
}

Expand Down
22 changes: 13 additions & 9 deletions runtime/vam/expr/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ func (a *Aggregator) Eval(this vector.Any) vector.Any {

func (a *Aggregator) apply(args ...vector.Any) vector.Any {
vec, where := args[0], args[1]
var tags []uint32
// If type is not bool then we want to filter everything.
if where.Type().ID() == super.IDBool {
for slot := uint32(0); slot < where.Len(); slot++ {
// XXX Feels like we should have a optimzed version of this.
if vector.BoolValue(where, slot) {
tags = append(tags, slot)
}
bools, _ := BoolMask(where)
if bools.IsEmpty() {
// everything is filtered.
return vector.NewConst(super.NewValue(vec.Type(), nil), vec.Len(), nil)
}
bools.Flip(0, uint64(vec.Len()))
if !bools.IsEmpty() {
nulls := vector.NewBoolEmpty(vec.Len(), nil)
bools.WriteDenseTo(nulls.Bits)
if origNulls := vector.NullsOf(vec); origNulls != nil {
nulls = vector.Or(nulls, origNulls)
}
vector.SetNulls(vec, nulls)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modifying a vector that might be shared (as I think vec is since it arrived here via args) is unsafe since other branches of the flowgraph (and thus other goroutines) may also have a reference to it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm good point no sure what to do here

}
return vector.NewView(vec, tags)
return vec
}
17 changes: 17 additions & 0 deletions runtime/ztests/op/summarize/count-where.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
zed: |
summarize
num_requests := count()
where log_time >= 2012-10-01T00:00:00Z
by client_ip
| sort client_ip
vector: true

input: |
{log_time:2012-01-01T00:00:44Z,client_ip:249.92.17.134}
{log_time:2012-10-01T00:24:30Z,client_ip:249.92.17.134}
{log_time:2012-05-12T10:23:22Z,client_ip:251.58.48.137}
output: |
{client_ip:249.92.17.134,num_requests:1(uint64)}
{client_ip:251.58.48.137,num_requests:0(uint64)}