Multiple anomaly detection models #377

Open
Knispel2 opened this issue Feb 15, 2024 · 2 comments
Comments

@Knispel2
Contributor

Knispel2 commented Feb 15, 2024

We want to add the option to use any number of models to compute anomaly_score. That is, we would like to have several models (model, model_1, model_2, model_3, ..., model_n) and, for each of them, get a separate anomaly_score value (anomaly_score, anomaly_score_1, anomaly_score_2, ..., anomaly_score_n). For now, the assumption is that there will be a separate .onnx file for each model.

What would be the best way to accomplish this? Right now I see two ways:

  1. Add an additional argument to the anomaly_score function that will define the name of the model:
...
model_name = ('model', 'model_1', 'model_2', ..., 'model_n')
ad_col_name = ('anomaly_score', 'anomaly_score_1', 'anomaly_score_2', ..., 'anomaly_score_n')
for model, col_name in zip(model_name, ad_col_name):
    df = df.withColumn(col_name, anomaly_score("lc_features", model=model))

This approach would probably put too much load on the server, right?
  2. Modify the function so that it iterates over all the models and returns the result as a single array [anomaly_score, anomaly_score_1, anomaly_score_2, ..., anomaly_score_n] (a rough sketch is given below).
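A minimal, purely hypothetical sketch of what option 2 could look like (the model paths, the ONNX input handling and the feature layout are assumptions, not the current fink_science code):

import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

MODEL_PATHS = ["model.onnx", "model_1.onnx", "model_2.onnx"]  # placeholder file names

@F.pandas_udf(ArrayType(DoubleType()))
def anomaly_score_multi(lc_features: pd.Series) -> pd.Series:
    import onnxruntime as ort  # assumed runtime for the .onnx files
    # Load every model once per batch, then run the shared features through each of them
    sessions = [ort.InferenceSession(p) for p in MODEL_PATHS]
    scores = []
    for feats in lc_features:
        x = np.asarray(feats, dtype=np.float32).reshape(1, -1)  # adapt to the real feature layout
        scores.append([
            float(s.run(None, {s.get_inputs()[0].name: x})[0].ravel()[0])  # assumed single-output model
            for s in sessions
        ])
    return pd.Series(scores)

# df = df.withColumn("anomaly_score", anomaly_score_multi("lc_features"))

The resulting array column could then be split into per-model columns afterwards, if separate anomaly_score_i columns are still wanted.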

@JulienPeloton
Member

Hi @Knispel2 -- thanks for the detailed proposal. I guess both methods would lead to more or less the same memory footprint, as one needs to load all the models in memory anyway. The second method would probably be faster, because it would reuse the same pre-processing code and only run the inference N times. Could you provide more detailed profiling (timing & memory) for 1 and N (N > 1) models? That would help us understand the best strategy. Let me know if you need help setting up the profiling.
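For reference, profiling can be switched on with PySpark's built-in profilers, roughly along these lines (a minimal sketch; the memory report additionally assumes the memory-profiler package is installed):

from pyspark.sql import SparkSession

# The profiling flags must be set before the SparkContext is created
spark = (
    SparkSession.builder
    .config("spark.python.profile", "true")          # time profile of the Python workers
    .config("spark.python.profile.memory", "true")   # per-line memory profile of the UDFs
    .getOrCreate()
)

# ... run the UDFs / actions to be measured ...

spark.sparkContext.show_profiles()  # dump the collected reports to stdout
spark.stop()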

@Knispel2
Contributor Author

Knispel2 commented Feb 18, 2024

I used the script you used to debug the AD module, together with the julienpeloton/fink-ci:dev image. In the script I added the option to set the number of models at startup by passing a command-line argument.

# profiler.py
import time
import sys
from fink_utils.spark.utils import concat_col
from fink_science.ad_features.processor import extract_features_ad

t0 = time.time()
from fink_science.anomaly_detection.processor import anomaly_score
print('Time to load the module: {:.2f} seconds'.format(time.time() - t0))

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import array
import numpy as np


num_test = int(sys.argv[1])
print(f'''==========================================================
{num_test} models test ^^^
=============================================================''')
sc = SparkContext()
spark = SparkSession(sc).builder.config("spark.python.profile", "true").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

df = spark.read.format('parquet').load('/fink_profile/fink-science/fink_science/data/alerts/datatest')

print(df.count())

# Retrieve time-series information
to_expand = [
    'jd', 'fid', 'magpsf', 'sigmapsf',
    'magnr', 'sigmagnr', 'isdiffpos', 'distnr',
    'diffmaglim'
]

# Append temp columns with historical + current measurements
prefix = 'c'
for colname in to_expand:
    df = concat_col(df, colname, prefix=prefix)
expanded = [prefix + i for i in to_expand]

models = tuple(['']*num_test)

col = [f'anomaly_score{num}' for num in range(num_test)]
ad_args = [
    'cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId',
    'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos'
]
# Perform feature extraction + classification
df = df.withColumn('lc_features', extract_features_ad(*ad_args))
for num, model in enumerate(models):
    df = df.withColumn(f'anomaly_score{num}', anomaly_score('lc_features', (model,)))

t0 = time.time()
df_small = df.select(col)
df_small.cache()
df_small.count()
result = time.time() - t0

# Show memory profiling
sc.show_profiles()
spark.stop()
print('Time to execute: {:.2f} seconds'.format(result))

All changes to processor.py of the AD module are in commit 73bdf85.

I ran the script 20 times, varying the number of models from 1 to 100 in increments of 5:

for i in {1..100..5}; do spark-submit --master local[2] --conf spark.python.profile.memory=true profiler.py $i > test_$i.log; done

I then combined the reports from each test into a single file:

cat *.log > first_profile.log

For the second case, I modified the profiler.py code as follows:

import time
import sys
from fink_utils.spark.utils import concat_col
from fink_science.ad_features.processor import extract_features_ad

t0 = time.time()
from fink_science.anomaly_detection.processor import anomaly_score
print('Time to load the module: {:.2f} seconds'.format(time.time() - t0))

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import array
import numpy as np


num_test = int(sys.argv[1])
print(f'''==========================================================
{num_test} models test ^^^
=============================================================''')
sc = SparkContext()
spark = SparkSession(sc).builder.config("spark.python.profile", "true").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

df = spark.read.format('parquet').load('/fink_profile/fink-science/fink_science/data/alerts/datatest')

print(df.count())

# Retrieve time-series information
to_expand = [
    'jd', 'fid', 'magpsf', 'sigmapsf',
    'magnr', 'sigmagnr', 'isdiffpos', 'distnr',
    'diffmaglim'
]

# Append temp columns with historical + current measurements
prefix = 'c'
for colname in to_expand:
    df = concat_col(df, colname, prefix=prefix)
expanded = [prefix + i for i in to_expand]

models = tuple(['']*num_test)

col = [f'anomaly_score{num}' for num in range(num_test)]
ad_args = [
    'cmagpsf', 'cjd', 'csigmapsf', 'cfid', 'objectId',
    'cdistnr', 'cmagnr', 'csigmagnr', 'cisdiffpos'
]
# Perform feature extraction + classification
df = df.withColumn('lc_features', extract_features_ad(*ad_args))
df = df.withColumn('anomaly_score', anomaly_score('lc_features', models)) #<------------------

t0 = time.time()
df_small = df.select(['anomaly_score'])
df_small.cache()
df_small.count()
result = time.time() - t0

# Show memory profiling
sc.show_profiles()
spark.stop()
print('Time to execute: {:.2f} seconds'.format(result))

Here are the final files:
first_profile.log
second_profile.log
The line "<number of models> models test ^^^" is used as a separator between the test results within the files.
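For completeness, a minimal sketch of how the combined log could be split on that separator to collect the execution times (the exact log layout is assumed from the script above):

import re

with open("first_profile.log") as f:
    text = f.read()

# Each test block starts with a line like "11 models test ^^^"
blocks = re.split(r"^(\d+) models test \^\^\^$", text, flags=re.MULTILINE)
times = {}
for n_models, block in zip(blocks[1::2], blocks[2::2]):
    m = re.search(r"Time to execute: ([\d.]+) seconds", block)
    if m:
        times[int(n_models)] = float(m.group(1))

print(times)  # e.g. {1: ..., 6: ..., 11: ...}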

I went through these files and plotted the graphs from them:
[two plots of the profiling results (timing and memory) as a function of the number of models]

I have some doubts about whether I did it right. I have two questions:

  1. Why does memory consumption in the second case grow rapidly only at the very beginning and then just fluctuate? Could it be because I didn't restart the SparkContext completely, but only called spark.stop()? Should I have restarted the entire Docker container for each test?
  2. In the first case, the graph shows the memory consumption of a single process. Do I need to multiply this number by the number of models?
