Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] PoC for performance improvements using dedicated conditions for resource and scope #36497

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions processor/transformprocessor/internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func (c *ContextID) UnmarshalText(text []byte) error {
}

type ContextStatements struct {
Context ContextID `mapstructure:"context"`
Conditions []string `mapstructure:"conditions"`
Statements []string `mapstructure:"statements"`
Context ContextID `mapstructure:"context"`
Conditions []string `mapstructure:"conditions"`
ResourceConditions []string `mapstructure:"resource_conditions"`
ScopeConditions []string `mapstructure:"scope_conditions"`
Statements []string `mapstructure:"statements"`
}
44 changes: 40 additions & 4 deletions processor/transformprocessor/internal/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ var _ consumer.Logs = &logStatements{}

type logStatements struct {
ottl.StatementSequence[ottllog.TransformContext]
expr.BoolExpr[ottllog.TransformContext]
LogRecordGlobalExpr expr.BoolExpr[ottllog.TransformContext]
baseGlobalExpressions
}

func (l logStatements) Capabilities() consumer.Capabilities {
Expand All @@ -34,12 +35,32 @@ func (l logStatements) Capabilities() consumer.Capabilities {
func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
for i := 0; i < ld.ResourceLogs().Len(); i++ {
rlogs := ld.ResourceLogs().At(i)

resourceCtx := ottlresource.NewTransformContext(rlogs.Resource(), plog.NewResourceLogs())
rCondition, err := l.ResourceGlobalExpr.Eval(ctx, resourceCtx)
if err != nil {
return err
}
if !rCondition {
continue
}

for j := 0; j < rlogs.ScopeLogs().Len(); j++ {
slogs := rlogs.ScopeLogs().At(j)

scopeCtx := ottlscope.NewTransformContext(slogs.Scope(), rlogs.Resource(), plog.NewScopeLogs())
sCondition, err := l.ScopeGlobalExpr.Eval(ctx, scopeCtx)
if err != nil {
return err
}
if !sCondition {
continue
}

logs := slogs.LogRecords()
for k := 0; k < logs.Len(); k++ {
tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource(), slogs, rlogs)
condition, err := l.BoolExpr.Eval(ctx, tCtx)
condition, err := l.LogRecordGlobalExpr.Eval(ctx, tCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -114,12 +135,27 @@ func (pc LogParserCollection) ParseContextStatements(contextStatements ContextSt
if err != nil {
return nil, err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.parserCollection, filterottl.StandardLogFuncs())
globalExpr, errGlobalBoolExpr := parseGlobalExpr(
filterottl.NewBoolExprForLog,
contextStatements.Conditions,
pc.parserCollection,
filterottl.StandardLogFuncs(),
)
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}

bge, err := pc.parseCommonGlobalExpressions(contextStatements)
if err != nil {
return nil, err
}

lStatements := ottllog.NewStatementSequence(parsedStatements, pc.settings, ottllog.WithStatementSequenceErrorMode(pc.errorMode))
return logStatements{lStatements, globalExpr}, nil
return logStatements{
StatementSequence: lStatements,
LogRecordGlobalExpr: globalExpr,
baseGlobalExpressions: *bge,
}, nil
default:
statements, err := pc.parseCommonContextStatements(contextStatements)
if err != nil {
Expand Down
107 changes: 107 additions & 0 deletions processor/transformprocessor/internal/common/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package common

import (
"context"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/plog"
"testing"
)

func Benchmark_ConsumeLogs(b *testing.B) {
statementsMatchingResourceCondition := ContextStatements{
Context: Log,
ResourceConditions: []string{
`attributes["foo"] == "bar"`,
},
Statements: []string{
`set(attributes["namespace"], "test") where body == "something"`,
},
}

statementsMatchingResourceConditionInWhereClause := ContextStatements{
Context: Log,
Statements: []string{
`set(attributes["namespace"], "test") where resource.attributes["foo"] == "bar" and body == "something"`,
},
}

statementsNonMatchingResourceCondition := ContextStatements{
Context: Log,
ResourceConditions: []string{
`attributes["foo"] == "baz"`,
},
Statements: []string{
`set(attributes["namespace"], "test") where body == "something"`,
},
}

statementsNonMatchingResourceConditionInWhereClause := ContextStatements{
Context: Log,
Statements: []string{
`set(attributes["namespace"], "test") where resource.attributes["foo"] == "baz" and body == "something"`,
},
}

nLogRecords := 1000

b.Run(
"matching condition in resource condition",
func(b *testing.B) {
benchmarkConsumeLogs(b, statementsMatchingResourceCondition, nLogRecords)
},
)
b.Run(
"matching resource condition in where clause",
func(b *testing.B) {
benchmarkConsumeLogs(b, statementsMatchingResourceConditionInWhereClause, nLogRecords)
},
)
b.Run(
"non matching resource condition",
func(b *testing.B) {
benchmarkConsumeLogs(b, statementsNonMatchingResourceCondition, nLogRecords)
},
)
b.Run(
"non matching resource condition in where clause",
func(b *testing.B) {
benchmarkConsumeLogs(b, statementsNonMatchingResourceConditionInWhereClause, nLogRecords)
},
)
}

func benchmarkConsumeLogs(b *testing.B, statements ContextStatements, nLogRecords int) {
pc, err := NewLogParserCollection(
componenttest.NewNopTelemetrySettings(),
WithLogParser(ottlfuncs.StandardFuncs[ottllog.TransformContext]()),
)
require.NoError(b, err)

contextStatements, err := pc.ParseContextStatements(statements)
require.NoError(b, err)

b.ResetTimer()

for i := 0; i < b.N; i++ {
err = contextStatements.ConsumeLogs(context.Background(), generateLogs(nLogRecords))
}
require.NoError(b, err)
}

func generateLogs(n int) plog.Logs {
logs := plog.NewLogs()

rLogs := logs.ResourceLogs().AppendEmpty()
rLogs.Resource().Attributes().PutStr("foo", "bar")
sLogs := rLogs.ScopeLogs().AppendEmpty()

for i := 0; i < n; i++ {
logRecord := sLogs.LogRecords().AppendEmpty()
logRecord.Body().SetStr("something")
logRecord.Attributes().PutStr("foo", "bar")
}
return logs
}
87 changes: 77 additions & 10 deletions processor/transformprocessor/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ var _ consumer.Metrics = &metricStatements{}

type metricStatements struct {
ottl.StatementSequence[ottlmetric.TransformContext]
expr.BoolExpr[ottlmetric.TransformContext]
MetricGlobalExpr expr.BoolExpr[ottlmetric.TransformContext]
baseGlobalExpressions
}

func (m metricStatements) Capabilities() consumer.Capabilities {
Expand All @@ -36,12 +37,31 @@ func (m metricStatements) Capabilities() consumer.Capabilities {
func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
for i := 0; i < md.ResourceMetrics().Len(); i++ {
rmetrics := md.ResourceMetrics().At(i)

resourceCtx := ottlresource.NewTransformContext(rmetrics.Resource(), pmetric.NewResourceMetrics())
rCondition, err := m.ResourceGlobalExpr.Eval(ctx, resourceCtx)
if err != nil {
return err
}
if !rCondition {
continue
}

for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ {
smetrics := rmetrics.ScopeMetrics().At(j)

scopeCtx := ottlscope.NewTransformContext(smetrics.Scope(), rmetrics.Resource(), pmetric.NewScopeMetrics())
sCondition, err := m.ScopeGlobalExpr.Eval(ctx, scopeCtx)
if err != nil {
return err
}
if !sCondition {
continue
}
metrics := smetrics.Metrics()
for k := 0; k < metrics.Len(); k++ {
tCtx := ottlmetric.NewTransformContext(metrics.At(k), smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics)
condition, err := m.BoolExpr.Eval(ctx, tCtx)
condition, err := m.MetricGlobalExpr.Eval(ctx, tCtx)
if err != nil {
return err
}
Expand All @@ -61,7 +81,8 @@ var _ consumer.Metrics = &dataPointStatements{}

type dataPointStatements struct {
ottl.StatementSequence[ottldatapoint.TransformContext]
expr.BoolExpr[ottldatapoint.TransformContext]
MetricGlobalExpr expr.BoolExpr[ottldatapoint.TransformContext]
baseGlobalExpressions
}

func (d dataPointStatements) Capabilities() consumer.Capabilities {
Expand All @@ -73,8 +94,28 @@ func (d dataPointStatements) Capabilities() consumer.Capabilities {
func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
for i := 0; i < md.ResourceMetrics().Len(); i++ {
rmetrics := md.ResourceMetrics().At(i)

resourceCtx := ottlresource.NewTransformContext(rmetrics.Resource(), pmetric.NewResourceMetrics())
rCondition, err := d.ResourceGlobalExpr.Eval(ctx, resourceCtx)
if err != nil {
return err
}
if !rCondition {
continue
}

for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ {
smetrics := rmetrics.ScopeMetrics().At(j)

scopeCtx := ottlscope.NewTransformContext(smetrics.Scope(), rmetrics.Resource(), pmetric.NewScopeMetrics())
sCondition, err := d.ScopeGlobalExpr.Eval(ctx, scopeCtx)
if err != nil {
return err
}
if !sCondition {
continue
}

metrics := smetrics.Metrics()
for k := 0; k < metrics.Len(); k++ {
metric := metrics.At(k)
Expand Down Expand Up @@ -104,7 +145,7 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr
func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error {
for i := 0; i < dps.Len(); i++ {
tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics)
condition, err := d.BoolExpr.Eval(ctx, tCtx)
condition, err := d.MetricGlobalExpr.Eval(ctx, tCtx)
if err != nil {
return err
}
Expand All @@ -121,7 +162,7 @@ func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pme
func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error {
for i := 0; i < dps.Len(); i++ {
tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics)
condition, err := d.BoolExpr.Eval(ctx, tCtx)
condition, err := d.MetricGlobalExpr.Eval(ctx, tCtx)
if err != nil {
return err
}
Expand All @@ -138,7 +179,7 @@ func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps
func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error {
for i := 0; i < dps.Len(); i++ {
tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics)
condition, err := d.BoolExpr.Eval(ctx, tCtx)
condition, err := d.MetricGlobalExpr.Eval(ctx, tCtx)
if err != nil {
return err
}
Expand All @@ -155,7 +196,7 @@ func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Con
func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error {
for i := 0; i < dps.Len(); i++ {
tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics)
condition, err := d.BoolExpr.Eval(ctx, tCtx)
condition, err := d.MetricGlobalExpr.Eval(ctx, tCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -240,12 +281,28 @@ func (pc MetricParserCollection) ParseContextStatements(contextStatements Contex
if err != nil {
return nil, err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetric, contextStatements.Conditions, pc.parserCollection, filterottl.StandardMetricFuncs())

globalExpr, errGlobalBoolExpr := parseGlobalExpr(
filterottl.NewBoolExprForMetric,
contextStatements.Conditions,
pc.parserCollection,
filterottl.StandardMetricFuncs(),
)
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}

bge, err := pc.parseCommonGlobalExpressions(contextStatements)
if err != nil {
return nil, err
}

mStatements := ottlmetric.NewStatementSequence(parseStatements, pc.settings, ottlmetric.WithStatementSequenceErrorMode(pc.errorMode))
return metricStatements{mStatements, globalExpr}, nil
return metricStatements{
StatementSequence: mStatements,
MetricGlobalExpr: globalExpr,
baseGlobalExpressions: *bge,
}, nil
case DataPoint:
parsedStatements, err := pc.dataPointParser.ParseStatements(contextStatements.Statements)
if err != nil {
Expand All @@ -255,8 +312,18 @@ func (pc MetricParserCollection) ParseContextStatements(contextStatements Contex
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}

bge, err := pc.parseCommonGlobalExpressions(contextStatements)
if err != nil {
return nil, err
}

dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.settings, ottldatapoint.WithStatementSequenceErrorMode(pc.errorMode))
return dataPointStatements{dpStatements, globalExpr}, nil
return dataPointStatements{
StatementSequence: dpStatements,
MetricGlobalExpr: globalExpr,
baseGlobalExpressions: *bge,
}, nil
default:
statements, err := pc.parseCommonContextStatements(contextStatements)
if err != nil {
Expand Down
Loading
Loading