
Monitor checks or custom task for state change detection #2969

Closed
invernizzie opened this issue Jul 7, 2020 · 8 comments
@invernizzie

invernizzie commented Jul 7, 2020

I am migrating fault detection scripts from Kapacitor and I am running into multiple issues.

My use case is to feed events from fault detection into a stream processing system that will create alerts that are presented in a web UI and trigger notifications to end users through different channels. This is an application feature, not a systems monitoring use case.

Using monitor checks

My main problem is splitting events into different buckets per environment. The built-in monitor checks only write to the _monitoring.statuses measurement, so I need to fan the records out. For example, for this simple CPU usage check:

import "influxdata/influxdb/monitor"

// Since monitor writes to the _monitoring.statuses measurement, this script can be used to create
// multiple tasks that will add tags by changing the outputTags variable
outputTags = {"env": "staging"}

last_cpu_usage = from(bucket: "input")
    |> range(start: -1d)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "total")
    |> group(columns: ["node_id"])
    |> monitor.check(
        crit: (r) => r._value > 85.0,
        ok: (r) => r._value <= 85.0,
        data: {
            _check_id: "cpu_too_high",
            tags: outputTags
        }
    )

I will need to create multiple checks, one per environment (which means I'll have to update them all whenever requirements change). And to land the records in the right measurement I also need one helper task per environment, like the one below:

env = "staging"
windowSize = 5m
option task = {
    name: env + "-fault_detection_fan_out",
    every: windowSize,
    offset: 1m,
    concurrency: 1,
    retry: 5
}

from(bucket: "_monitoring")
    |> range(start: -windowSize)
    |> filter(fn: (r) => r._measurement == "statuses" and r.env == env)
    |> to(bucket: env + "-fault_detection", orgID: "987654321", fieldFn: (r) => ({
        r with
        // can _check_id be removed from r?
        fault_type: r._check_id
    }))

(BTW this is an example check, and not related to my data or the conditions I'm trying to detect.)

Environment separation is important for access control, among other things, so we're not willing to compromise on it. Is there a better way around this problem using monitor checks?

Additionally, I would like to further customize the data that gets written to the destination measurement. As you can see in the task code I'd like to rename _check_id to fault_type, and probably remove other fields that are irrelevant. Is this possible?

Lastly, the way monitor.check allows for the data object to be defined seems rigid. For some faults, I need to keep columns from the input measurement itself (e.g. cpu.user and cpu.system) as snapshots of the record that triggered the check state change. Is there any way to achieve this?
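To illustrate the kind of customization I mean, here is a rough, untested sketch of post-processing the statuses before landing them. The column names _check_name and _source_measurement are my guesses at what monitor.check adds; the bucket and org ID are just the placeholders from the example above:

```flux
// Untested sketch: rename _check_id to fault_type and drop columns
// that are irrelevant before writing to the per-environment bucket.
from(bucket: "_monitoring")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "statuses" and r.env == "staging")
    |> rename(columns: {_check_id: "fault_type"})
    // _check_name and _source_measurement are assumed column names here
    |> drop(columns: ["_check_name", "_source_measurement"])
    |> to(bucket: "staging-fault_detection", orgID: "987654321")
```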

Using a custom task

Because of the problems outlined above, using a custom Flux task seems like a better approach in principle. However, I'm facing an issue that I have been unable to overcome: alert state initialization.

If the following Flux task is created, it will never write anything to the output measurement. This is because the join between the processed input and the output measurement is an inner join, and there is no initial state in the output measurement. Note that in this example, as in my actual use case, new nodes can be created in the system thus creating data with a new tag value for node_id—it's not possible to just initialize state upfront.

// Given an expression that can be applied to a record to obtain a state, and a current state for
// a group key, store state changes detected by comparing the two.

env = "staging"
windowSize = 30m
option task = {
    name: env + "-high_cpu_detector",
    every: 1m,
    offset: 1m,
    concurrency: 1,
    retry: 5
}

last_cpu_usage = from(bucket: "input")
    |> range(start: -windowSize)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "total")
    |> group(columns: ["node_id"])
    |> last()

detected_faults = from(bucket: "fault_detection")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "cpu_events" and r.fault_type == "cpu_too_high")
    |> group(columns: ["node_id"])
    |> last()

detectCpuCrossed = (tables=<-) =>
    tables
    |> map(fn: (r) => ({
        r with
            level: if r._value_cpu_usage > 85.0 then "error" else "ok"
    }))
    |> filter(fn: (r) => r.level != r._value_fault_state)

join(
    tables: {cpu_usage: last_cpu_usage, fault_state: detected_faults},
    on: ["fault_type", "node_id"])
    |> detectCpuCrossed()
    |> to(bucket: "fault_detection", orgID: "987654321", fieldFn: (r) => ({
        _measurement: "cpu_events",
        fault_type: r.fault_type,
        total_cpu: r._value_cpu_usage,
        level: r.level
    }))

Is there a known solution to this problem? There's a proposal for time interpolation (#2428) that may help by making the join an outer join, which would allow detecting the lack of an initial state, but there's been no response to the proposal.

Other issues

As you can see, in both approaches it would be useful to have parameterized tasks to avoid creating multiple tasks with the same code but different arguments, such as env in this case. This could be achieved with a field for custom/extra values in the task options.

@rickspencer3
Contributor

@nathanielc @aanthony1243 could one of you take a look and see if you can help with this query?

@aanthony1243
Contributor

Hi! There are a few tricks you can use. First, you'll want to use csv.from() to create a default table from a string literal. The easiest way to do this is to query a non-empty table with the same or similar schema, copy the CSV from the query result, and then modify the values you need to change.

so you can do something like:

import "csv"

defaultTable = csv.from(csv: "
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double
#group,false,false,false,false,false,false,false,false
#default,,,,,,,,
,result,table,_start,_stop,_time,region,host,_value
,mean,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:00Z,east,A,15.43
,mean,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:20Z,east,B,59.25
,mean,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:40Z,east,C,52.62
")

Next, you want to detect whether last_cpu_usage has any rows, and switch to the default if not.

countRow = last_cpu_usage
    |> count()
    |> findColumn(fn: (key) => true, column: "_value")

// Guard the index: findColumn returns an empty array if there are no rows.
hasRows = length(arr: countRow) > 0 and countRow[0] > 0

Finally, based on hasRows, assign either the table from the DB or the default:

last_cpu_table = if hasRows then last_cpu_usage else defaultTable

@invernizzie
Author

invernizzie commented Jul 8, 2020

Hi @aanthony1243, thanks for your tips.

My problem with the CSV approach, however, is that I don't know what nodes will come around in the future as they are provisioned. node_id is a tag in my case, and it will get new values as nodes come online.

To clarify why I need this initial state: the measurement I need records in for the join to work is detected_faults, not last_cpu_usage. It also seems to me that countRow[0] in your example evaluates only the first record of the array? I need this to work on all the resulting groups (note I'm grouping on node_id), and it's unclear to me whether it will.

My running thought is that I could still use csv.from without a node_id tag and then use a variable where I extract the node_id tag values from last_cpu_usage to enrich it. But I'm unsure how to do it; this is my take (look out for the XXX comment):

import "csv"

last_cpu_usage = from(bucket: "input")
    |> range(start: -windowSize)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "total")
    |> group(columns: ["node_id"])
    |> last()

detected_faults = from(bucket: "fault_detection")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "cpu_events" and r.fault_type == "cpu_too_high")
    |> group(columns: ["node_id"])
    |> last()

row_count = detected_faults
    |> count()
    |> findColumn(fn: (key) => true, column: "_value")

default_faults = csv.from(csv: "
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,double,string
#group,false,false,false,false,false,false,false,false
#default,,,,,,,,
,result,table,_start,_stop,_time,fault_type,total_cpu,_value
,mean,0,2018-05-08T20:50:00Z,2018-05-08T20:51:00Z,2018-05-08T20:50:00Z,cpu_too_high,0.0,ok
")
// XXX here I'd like to set the node_id tag based on values in last_cpu_usage but I don't know how to do a "cartesian product" with the tags values from that table stream
// |> setTag("node_id", last_cpu_usage)

faults_table = if length(arr: row_count) > 0 and row_count[0] > 0 then detected_faults else default_faults

// ... logic continues and uses faults_table for join

To exemplify, if a given run of this task finds the values C, D and E for node_id in the last_cpu_usage query, the resulting records in default_faults I expect are (in line protocol):

default_faults,node_id=C,fault_type=cpu_too_high level="ok",total_cpu=0.0 1525812600000000000
default_faults,node_id=D,fault_type=cpu_too_high level="ok",total_cpu=0.0 1525812600000000000
default_faults,node_id=E,fault_type=cpu_too_high level="ok",total_cpu=0.0 1525812600000000000

Let me know if my question needs clarification.

@wolffcm

wolffcm commented Jul 10, 2020

Hi @invernizzie thanks so much for describing your use case in detail. It's super helpful to us. We are thinking over the best way to solve your problem, and will respond as soon as we have a concrete recommendation.

@wolffcm

wolffcm commented Jul 10, 2020

@invernizzie
For the amount of customization you want to get from your monitoring, I agree that a custom task is the way to go.

I wonder if the stateChangesOnly function would get you what you want. I don't see it documented in our official docs, but it's part of the Flux monitor package. It's a function that expects tables with a _level column and filters out the points where _level does not change. Its definition is here:
https://github.com/influxdata/flux/blob/master/stdlib/influxdata/influxdb/monitor/monitor.flux#L51-L69

Here's an example of its use:

[Screenshot: graph of the usage_idle field with a 92% threshold line (orange) and the stateChangesOnly output (blue)]

This example is over the usage_idle field. Anything over 92% idle (the orange line in the graph) is considered OK, while anything below is CRIT. Applying stateChangesOnly yields the blue line, which only has points where the CPU data crosses the 92% threshold. The output of this is something like what you would write to your cpu_events measurement.

This approach should be robust to new nodes coming online. One caveat, however, is that you would need to query back over at least two check intervals each time your task runs in order to detect a state change.
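As a rough, untested sketch of that task (the bucket names, org ID, and 85% threshold are taken from the examples earlier in this thread, and the task interval is assumed to be 1m):

```flux
import "influxdata/influxdb/monitor"

// Untested sketch: assign a _level to each point, then let
// stateChangesOnly() keep only the points where _level changes.
from(bucket: "input")
    // Query back over at least two task intervals so the previous
    // level is available for comparison.
    |> range(start: -2m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "total")
    |> group(columns: ["node_id"])
    |> map(fn: (r) => ({r with _level: if r._value > 85.0 then "crit" else "ok"}))
    |> monitor.stateChangesOnly()
    |> to(bucket: "staging-fault_detection", orgID: "987654321")
```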

Would something like this work for you?

@rickspencer3
Contributor

@Anaisdg can you please take a look and see if you should consider documenting this specific use case (aside from specific documentation on stateChangesOnly() which the docs team will add)?

@Anaisdg
Contributor

Anaisdg commented Jul 22, 2020

Hello @invernizzie here is documentation on stateChangesOnly. Does that help?


This issue has had no recent activity and will be closed soon.
