
EPIC: scan function #4671

Open
rockstar opened this issue Apr 13, 2022 · 20 comments

@rockstar
Contributor

After talking about the potential for a new/improved stateChanges function, the working group came up with a proposal for a scan function that we'd like to add. scan would have the following signature:

builtin scan : (<-tables: stream[A], fn: (accumulator: B, element: A) => B, init: B) => stream[B]

This function was heavily inspired by the scanl (and family) functions from Haskell. Here's how it could be used:

import "experimental/array"

testdata = array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: 100, state: "crit"},
  {_time: 2021-01-01T00:01:00Z, _value: 100, state: "crit"},
  {_time: 2021-01-01T00:02:00Z, _value: 80, state: "warn"},
  {_time: 2021-01-01T00:03:00Z, _value: 82, state: "warn"},
  {_time: 2021-01-01T00:04:00Z, _value: 80, state: "warn"},
  {_time: 2021-01-01T00:05:00Z, _value: 52, state: "ok"},
  {_time: 2021-01-01T00:06:00Z, _value: 50, state: "ok"},
])

testdata
  |> scan(fn: (acc, record) => ({record with stateChanged: acc.state != record.state}))

This would emit a table with a new boolean column stateChanged that is true whenever the state changes. Using scan, a stateChanges function could then look something like this:

stateChanges = (<-tables, value) => {
    data = if exists value then tables |> prepend(value: value) else tables

    return data
        |> scan(fn: (acc, ele) => ({ele with stateChanged: acc.state != ele.state}))
}
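For intuition, here is a sketch of the semantics the proposal implies, in Python rather than Flux. The `scan` helper below is hypothetical and simply models the rule that fn's output becomes the accumulator for the next row:

```python
# Hypothetical model of the proposed scan: fn's output row becomes the
# accumulator for the next row, so each row can see its predecessor.
def scan(rows, fn, init):
    acc = init
    out = []
    for row in rows:
        acc = fn(acc, row)
        out.append(acc)
    return out

rows = [{"state": "crit"}, {"state": "crit"}, {"state": "warn"}]
result = scan(
    rows,
    lambda acc, r: {**r, "stateChanged": acc["state"] != r["state"]},
    init={"state": "crit"},
)
# stateChanged per row: [False, False, True]
```

The first row is compared against init, which is exactly why the init question below matters.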

Questions:

  • The init parameter's use is still unclear. To specify it would mean knowing a lot more about data shape than one may know. What does the init look like? Does it have to be explicit? Could/should we create two functions, one that takes an init and one that doesn't?
  • How does scan affect the group key of a table? Does it work like reduce, where it carries the group key with it?
@samhld

samhld commented Jun 7, 2022

+1

@sanderson
Contributor

sanderson commented Oct 26, 2022

Another use case here would be detecting phases in data. For example, if a counter resets, increment the phase:

import "array"

testdata = array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: 0},
  {_time: 2021-01-01T00:01:00Z, _value: 23},
  {_time: 2021-01-01T00:02:00Z, _value: 50},
  {_time: 2021-01-01T00:03:00Z, _value: 0},
  {_time: 2021-01-01T00:04:00Z, _value: 18},
  {_time: 2021-01-01T00:05:00Z, _value: 32},
  {_time: 2021-01-01T00:06:00Z, _value: 0},
])

testdata
  |> scan(fn: (accumulator, r) => ({r with phase: if accumulator._value <= r._value then accumulator.phase else accumulator.phase + 1 }))

The expected output would look something like:

| _time                | _value | phase |
| -------------------- | ------ | ----- |
| 2021-01-01T00:00:00Z | 0      | 0     |
| 2021-01-01T00:01:00Z | 23     | 0     |
| 2021-01-01T00:02:00Z | 50     | 0     |
| 2021-01-01T00:03:00Z | 0      | 1     |
| 2021-01-01T00:04:00Z | 18     | 1     |
| 2021-01-01T00:05:00Z | 32     | 1     |
| 2021-01-01T00:06:00Z | 0      | 2     |
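In plain Python (illustrative only, not Flux), the same phase logic can be modeled with `itertools.accumulate`, which carries the previous output row as the accumulator:

```python
from itertools import accumulate

# Model of the phase example: bump the phase whenever the counter decreases
# (i.e., whenever the counter resets).
rows = [{"_value": v} for v in [0, 23, 50, 0, 18, 32, 0]]
result = list(
    accumulate(
        rows,
        lambda acc, r: {
            **r,
            "phase": acc["phase"] if acc["_value"] <= r["_value"] else acc["phase"] + 1,
        },
        initial={"_value": 0, "phase": 0},
    )
)[1:]  # accumulate yields the initial accumulator first, so drop it
# phases per row: [0, 0, 0, 1, 1, 1, 2]
```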

> The init parameter's use is still unclear. To specify it would mean knowing a lot more about data shape than one may know. What does the init look like? Does it have to be explicit? Could/should we create two functions, one that takes an init and one that doesn't?

The init's purpose is to act as the accumulator for the first row of each table. Without an init, there is nothing to compare the first row to. I'd love to have some way for the init to be implicit, but with the current type implementation, I don't know how it would work.

What if there were a concept of a "table record": a default record with all group key columns populated and all non-group-key columns set to null. You could then extend the table record for the init. Something like:

init: {tableRecord with _value: 0, phase: 0}

With that in place, the function call could look like this:

testdata
    |> scan(
        init: {tableRecord with _value: 0, phase: 0},
        fn: (accumulator, r) => ({
            r with
            phase: if accumulator._value <= r._value then accumulator.phase else accumulator.phase + 1,
            _value: r._value,
        })
    )

> How does scan affect the group key of a table? Does it work like reduce, where it carries the group key with it?

I would say yes, it carries the group key with it. It would be up to the user to group by any columns added by the transformation.

@sanderson
Contributor

To add to this, I think there should also be an array.scan function that provides this functionality for arrays. For example:

import "array"

a = [1, 2, 3, 4]
b = 20

c = array.scan(
    arr: a,
    init: b,
    fn: (acc, x) => acc + x
)

// c = [21,23,26,30]

The signature of array.scan would be something like:

builtin scan : (<-arr: [A], fn: (acc: B, x: A) => B, init: B) => [B]
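For comparison, Python's `itertools.accumulate` already provides this shape for arrays, modulo the initial value appearing in the output:

```python
from itertools import accumulate

a = [1, 2, 3, 4]
b = 20

# accumulate yields the initial value itself first; drop it to match the
# proposed array.scan, whose output starts at the first combined element.
c = list(accumulate(a, lambda acc, x: acc + x, initial=b))[1:]
# c == [21, 23, 26, 30]
```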

@UlrichThiess

This proposal is urgently needed.

I have described this requirement here: https://community.influxdata.com/t/number-taxi-rides/28939

@UlrichThiess

Maybe there is no need for a scan function?

I asked ChatGPT for help, and this is our solution (ChatGPT pushed me in the right direction):

// Helper function to return an integer from a table
getFieldValue = (tables=<-, field) => {
  extract = tables
    |> last() // shrink table to one row
    |> findColumn(fn: (key) => key._field == field, column: "_value")

  return if length(arr: extract) == 0 then 0 else extract[0] // return 0 if the table is empty, otherwise the last value
}

// We need the last tripId as integer
lastId = from(bucket: "obd2")
  |> range(start: 2023-02-24T21:48:57.8Z, stop: 2023-02-24T22:54:46.955Z)
  |> filter(fn: (r) => r._measurement == "Taxi" and r._field == "tripId" and r._value != 0) // search for tripId
  |> last() // shrink table to one row
  |> getFieldValue(field: "tripId") // the helper matches on key._field == field, so pass the field name

rpm_data = from(bucket: "obd2")
  |> range(start: 2023-02-24T21:48:57.8Z, stop: 2023-02-24T22:54:46.955Z)
  |> filter(fn: (r) => r._measurement == "Taxi" and r._field == "EngineRPM")
  |> map(fn: (r) => ({ r with tmp: if r._value > 0 then 1 else 0 })) // create helperfield tmp
  |> derivative(unit: 1s, nonNegative: true, columns: ["tmp"]) // only the changes from 0 to not 0
  |> map(fn: (r) => ({ r with tripId: int(v: r.tmp) })) // create tripIp with integer of tmp
  |> drop(columns: ["tmp"]) // remove helperfield tmp
  |> cumulativeSum(columns: ["tripId"])
  |> map(fn: (r) => ({ r with tripId: if r._value > 0 then r.tripId + lastId else 0 }))
  |> yield(name: "rpm")

I will give this a quick try and run more tests.
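To make the indicator/derivative/cumulativeSum trick above concrete, here is a rough Python model of it (illustrative only, not Flux):

```python
# Model of the workaround: an indicator for "engine running", its non-negative
# difference (1 exactly at each 0 -> nonzero transition), then a running sum
# that numbers the trips; rows where the engine is off get trip id 0.
values = [0, 23, 50, 0, 18, 32, 0]
running = [1 if v > 0 else 0 for v in values]
starts = [0] + [max(running[i] - running[i - 1], 0) for i in range(1, len(running))]

trip, trip_ids = 0, []
for v, s in zip(values, starts):
    trip += s
    trip_ids.append(trip if v > 0 else 0)
# trip_ids == [0, 1, 1, 0, 2, 2, 0]
```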

@github-actions

This issue has had no recent activity and will be closed soon.

@paulwer

paulwer commented Jun 27, 2023

any updates on this?

@UlrichThiess

This can be closed, as I have found a solution.

@paulwer

paulwer commented Jun 27, 2023

I cannot reproduce this for our stateChanged use case. Could you explain your example further?
If it works, there should be documentation for this use case (from my point of view).

@UlrichThiess

Paste this into the InfluxDB 2 DataExplorer in the Script Editor Window.

import "array"

testdata = array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: 0},
  {_time: 2021-01-01T00:01:00Z, _value: 23},
  {_time: 2021-01-01T00:02:00Z, _value: 50},
  {_time: 2021-01-01T00:03:00Z, _value: 0},
  {_time: 2021-01-01T00:04:00Z, _value: 18},
  {_time: 2021-01-01T00:05:00Z, _value: 32},
  {_time: 2021-01-01T00:06:00Z, _value: 0}
])

testdata

  // TripId
  |> map(fn: (r) => ({r with TripId: if r._value > 0 then 1 else 0}))
  |> derivative(unit: 1m, nonNegative: true, columns: ["TripId"], initialZero: true)
  |> cumulativeSum(columns: ["TripId"])
  |> map(fn: (r) => ({r with TripId: if r._value > 0 then r.TripId else 0.0}))
//  |> filter(fn: (r) => r.TripId != 0)

  |> yield()

@paulwer

paulwer commented Jun 27, 2023

I guess this won't cover cases where you only want values and their timestamps at the moments the value changes.
For example: statuses saved as booleans => keep only the value changes => determine which warnings were active within a timeframe.
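For what it's worth, the "keep only the rows where the value changed" use case can be modeled like this in Python (illustrative sketch with made-up data):

```python
# Keep the first row plus every row whose value differs from its predecessor.
rows = [
    {"t": 0, "active": False},
    {"t": 1, "active": True},
    {"t": 2, "active": True},
    {"t": 3, "active": False},
]
changes = [
    r
    for prev, r in zip([None] + rows[:-1], rows)
    if prev is None or prev["active"] != r["active"]
]
# kept timestamps: [0, 1, 3]
```

This is exactly the accumulator-sees-previous-row pattern the proposed scan would express directly.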

@UlrichThiess

That sounds easier. However, I don't know the exact requirement. That discussion should be elsewhere, not here. Paul, if I can help, speak to me directly.


@jgladch

jgladch commented Oct 3, 2024

> Paste this into the InfluxDB 2 DataExplorer in the Script Editor Window. […]

Thanks very much for this @UlrichThiess It unblocked me 🍻


github-actions bot commented Dec 2, 2024

This issue has had no recent activity and will be closed soon.
