Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failed the validation check for streaming features if accuracy is specified as snapshot #876

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/py/ai/chronon/group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ def validate_group_by(group_by: ttypes.GroupBy):
assert query.timeColumn is not None, (
"please specify query.timeColumn for non-snapshot accurate " "group by with event source"
)
else:
assert not utils.is_streaming(src), "SNAPSHOT accuracy should not be specified for streaming sources"
else:
if contains_windowed_aggregation(aggregations):
assert query.timeColumn, "Please specify timeColumn for entity source with windowed aggregations"
Expand Down
22 changes: 20 additions & 2 deletions api/py/test/test_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pytest, json

from ai.chronon import group_by, query
from ai.chronon.group_by import GroupBy, TimeUnit, Window, Aggregation
from ai.chronon.group_by import GroupBy, TimeUnit, Window, Aggregation, Accuracy
from ai.chronon.api import ttypes
from ai.chronon.api.ttypes import EventSource, EntitySource, Operation

Expand All @@ -41,12 +41,13 @@ def hours_unit():
return ttypes.TimeUnit.HOURS


def event_source(table):
def event_source(table, topic=None):
"""
Sample left join
"""
return ttypes.EventSource(
table=table,
topic=topic,
query=ttypes.Query(
startPartition="2020-04-09",
selects={
Expand Down Expand Up @@ -174,6 +175,23 @@ def test_validator_ok():
aggregations=None,
)

def test_validator_accuracy():
with pytest.raises(AssertionError, match="SNAPSHOT accuracy should not be specified for streaming sources"):
gb = group_by.GroupBy(
sources=event_source("table", "topic"),
keys=["subject"],
aggregations=group_by.Aggregations(
random=ttypes.Aggregation(inputColumn="event_id", operation=ttypes.Operation.SUM),
event_id=ttypes.Aggregation(operation=ttypes.Operation.LAST),
cnt=ttypes.Aggregation(operation=ttypes.Operation.COUNT),
percentile=group_by.Aggregation(
input_column="event_id", operation=group_by.Operation.APPROX_PERCENTILE([0.5, 0.75])
),
),
accuracy=Accuracy.SNAPSHOT,
)
assert all([agg.inputColumn for agg in gb.aggregations if agg.operation != ttypes.Operation.COUNT])
group_by.validate_group_by(gb)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add some new lines in between methods.


def test_generic_collector():
aggregation = group_by.Aggregation(
Expand Down