Skip to content

Commit

Permalink
fix: whitespace, line lengths, unused imports
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamCorrao committed Apr 11, 2024
1 parent f221195 commit e0a7c77
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 89 deletions.
189 changes: 101 additions & 88 deletions pdf_agents/peakfit.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import logging
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, List, Union
from typing import Dict, List

import numpy as np
from scipy.optimize import curve_fit
Expand All @@ -12,40 +10,42 @@

logger = logging.getLogger(__name__)


class PeakFitAgent(PDFReporterMixin, PDFBaseAgent):
"""_summary_
Parameters
----------
xrois : List[tuple]
Regions of interest in x (lower limit, upper limit)
fit_func : str
Fitting function to use - can be gaussian, lorentzian, or voigt
pos_percent_lim : float
Percent of peak position for min / max bounds (e.g., upper limit = position + (position * pos_percent_lim / 100))
maxcycles : int
Maximum number of curve_fit cycles - may need to increase for convergence
Attributes
----------
xrois : List[tuple]
Regions of interest in x (lower limit, upper limit)
fit_func : str
Fitting function to use - can be gaussian, lorentzian, or voigt
pos_percent_lim : float
Percent of peak position for min / max bounds (e.g., upper limit = position + (position * pos_percent_lim / 100))
maxcycles : int
Maximum number of curve_fit cycles - may need to increase for convergence
Examples
--------
This agent is designed to be used in offline mode. It can be used as follows:
>>> import tiled.client.node # Workaround for API issue
>>> from pdf_agents.peakfit import PeakFitAgent
>>> offline_obj = PeakFitAgent.get_offline_objects()
>>> agent = PeakFitAgent(xrois= [(2.7,3.0)], fit_func='gaussian', pos_percent_lim=2, maxcycles=1000,
report_producer=offline_obj["kafka_producer"], offline=True)
"""
Parameters
----------
xrois : List[tuple]
Regions of interest in x (lower limit, upper limit)
fit_func : str
Fitting function to use - can be gaussian, lorentzian, or voigt
pos_percent_lim : float
Percent of peak position for bounds (e.g., lim = position +/- (position * pos_percent_lim / 100))
maxcycles : int
Maximum number of curve_fit cycles - may need to increase for convergence
Attributes
----------
xrois : List[tuple]
Regions of interest in x (lower limit, upper limit)
fit_func : str
Fitting function to use - can be gaussian, lorentzian, or voigt
pos_percent_lim : float
Percent of peak position for bounds (e.g., lim = position +/- (position * pos_percent_lim / 100))
maxcycles : int
Maximum number of curve_fit cycles - may need to increase for convergence
Examples
--------
This agent is designed to be used in offline mode. It can be used as follows:
>>> import tiled.client.node # Workaround for API issue
>>> from pdf_agents.peakfit import PeakFitAgent
>>> offline_obj = PeakFitAgent.get_offline_objects()
>>> agent = PeakFitAgent(xrois= [(2.7,3.0)], fit_func='gaussian', pos_percent_lim=2, maxcycles=1000,
report_producer=offline_obj["kafka_producer"], offline=True)
"""

def __init__(
self,
*,
Expand All @@ -69,35 +69,35 @@ def __init__(
def xrois(self):
return self._xrois

@xrois.setter
def xrois(self, xrois):
@xrois.setter
def xrois(self, xrois):
self._xrois = xrois
self.close_and_restart()

@property
def fit_func(self):
return self._fit_func

@fit_func.setter
def fit_func(self, fit_func):
@fit_func.setter
def fit_func(self, fit_func):
self._fit_func = fit_func
self.close_and_restart()

@property
def pos_percent_lim(self):
return self._pos_percent_lim

@pos_percent_lim.setter
def pos_percent_lim(self, pos_percent_lim):
@pos_percent_lim.setter
def pos_percent_lim(self, pos_percent_lim):
self._pos_percent_lim = pos_percent_lim
self.close_and_restart()

@property
def maxcycles(self):
return self._maxcycles

@maxcycles.setter
def maxcycles(self, maxcycles):
@maxcycles.setter
def maxcycles(self, maxcycles):
self._maxcycles = maxcycles
self.close_and_restart()

Expand All @@ -114,13 +114,13 @@ def tell(self, x, y) -> Dict[str, ArrayLike]:
return dict(independent_variable=x, observable=y, ordinate=self._ordinate)

def gaussian(self, x, amp, center, width):
return (amp / width * np.sqrt(2*np.pi)) * np.exp((-(x - center)**2.0) / ((2.0 * width)**2.0))
return (amp / width * np.sqrt(2 * np.pi)) * np.exp((-((x - center)**2)) / ((2.0 * width)**2))

def gaussian_fwhm(self, width):
return 2 * np.sqrt(2*np.log(2)) * width
return 2 * np.sqrt(2 * np.log(2)) * width

def lorentzian(self, x, amp, center, width):
return amp / np.pi * (width / ((x - center)**2 + width**2))
return amp / np.pi * (width / ((x - center) ** 2 + width**2))

def lorentzian_fwhm(self, width):
return 2 * width
Expand All @@ -142,11 +142,10 @@ def voigt(self, x, amp, center, sigma, gamma):

def voigt_fwhm(self, sigma, gamma):
return 2 * sigma * np.sqrt(2 * np.log(2)) + 2 * gamma

def fit_roi(self, xroi: tuple, fit_func: str, pos_percent_lim: float, maxcycles: int):

def fit_roi(self, xroi: tuple, fit_func: str, pos_percent_lim: float, maxcycles: int):
"""Slice data for region of interest, find maxima, fit with Voigt function, return fit results
Returns
-------
peak_amplitude: float
Expand Down Expand Up @@ -178,21 +177,24 @@ def fit_roi(self, xroi: tuple, fit_func: str, pos_percent_lim: float, maxcycles:
peak_maxima_x = x_roi[peak_maxima_index]
peak_maxima_y = y_roi[peak_maxima_index]

if fit_func == 'voigt':
if fit_func == "voigt":

if sum(y_roi) < 0: # in case there is data on background, off-sample, or a peak is entirely missing
p0 = [0.1, peak_maxima_x, 0.01, 0.01]
if sum(y_roi) < 0: # in case there is data on background, off-sample, a peak is missing
p0 = [0.1, peak_maxima_x, 0.01, 0.01]
else:
p0 = [peak_maxima_y, peak_maxima_x, 0.01, 0.01]

try:
pos_llim = peak_maxima_x - (peak_maxima_x * pos_percent_lim/100) # pos lower limit
pos_ulim = peak_maxima_x + (peak_maxima_x * pos_percent_lim/100) # pos upper limit
bounds = ([0, pos_llim, 0.00001, 0.00001], [1000000, pos_ulim, 0.5, 0.5]) # (lower, upper) bounds for amplitude, center, sigma, gamma
pos_llim = peak_maxima_x - (peak_maxima_x * pos_percent_lim / 100) # pos lower limit
pos_ulim = peak_maxima_x + (peak_maxima_x * pos_percent_lim / 100) # pos upper limit
bounds = (
[0, pos_llim, 0.00001, 0.00001],
[1000000, pos_ulim, 0.5, 0.5],
) # (lower, upper) bounds for amplitude, center, sigma, gamma

popt, pcov = curve_fit(self.voigt, x_roi, y_roi, p0=p0, bounds=bounds, max_nfev=maxcycles)

except RuntimeError: # if fitting fails, return None for parameters
except RuntimeError: # if fitting fails, return None for parameters
popt = None

if popt is not None:
Expand All @@ -213,19 +215,22 @@ def fit_roi(self, xroi: tuple, fit_func: str, pos_percent_lim: float, maxcycles:
x_roi = None

return peak_amplitude, peak_position, peak_sigma, peak_gamma, peak_fwhm, ycalc, ydiff, x_roi
elif fit_func == 'gaussian':
if sum(y_roi) < 0: # in case there is data on background, off-sample

elif fit_func == "gaussian":
if sum(y_roi) < 0: # in case there is data on background, off-sample
p0 = [0.01, peak_maxima_x, 0.01]
else:
p0 = [peak_maxima_y, peak_maxima_x, 0.01] # Initial guess for parameters: amplitude, center, width
p0 = [peak_maxima_y, peak_maxima_x, 0.01] # guess for parameters: amplitude, center, width

try:
pos_llim = peak_maxima_x - (peak_maxima_x * pos_percent_lim/100) # pos lower limit
pos_ulim = peak_maxima_x + (peak_maxima_x * pos_percent_lim/100) # pos upper limit
bounds = ([0, pos_llim, 0.00001], [1000000, pos_ulim, 1]) # (lower, upper) bounds for amplitude, center, sigma, gamma

#print(f"p0={p0} | Bounds: {bounds}")
pos_llim = peak_maxima_x - (peak_maxima_x * pos_percent_lim / 100) # pos lower limit
pos_ulim = peak_maxima_x + (peak_maxima_x * pos_percent_lim / 100) # pos upper limit
bounds = (
[0, pos_llim, 0.00001],
[1000000, pos_ulim, 1],
) # (lower, upper) bounds for amplitude, center, sigma, gamma

# print(f"p0={p0} | Bounds: {bounds}")
popt, pcov = curve_fit(self.gaussian, x_roi, y_roi, p0=p0, bounds=bounds, max_nfev=maxcycles)

except RuntimeError:
Expand All @@ -249,18 +254,21 @@ def fit_roi(self, xroi: tuple, fit_func: str, pos_percent_lim: float, maxcycles:

return peak_amplitude, peak_position, peak_fwhm, ycalc, ydiff, x_roi

elif fit_func == 'lorentzian':
if sum(y_roi) < 0: # in case there is data on background, off-sample
elif fit_func == "lorentzian":
if sum(y_roi) < 0: # in case there is data on background, off-sample
p0 = [0.01, peak_maxima_x, 0.01]
else:
p0 = [peak_maxima_y, peak_maxima_x, 0.01] # Initial guess for parameters: amplitude, center, width
p0 = [peak_maxima_y, peak_maxima_x, 0.01] # guess for parameters: amplitude, center, width

try:
pos_llim = peak_maxima_x - (peak_maxima_x * pos_percent_lim/100) # pos lower limit
pos_ulim = peak_maxima_x + (peak_maxima_x * pos_percent_lim/100) # pos upper limit
bounds = ([0, pos_llim, 0.00001], [1000000, pos_ulim, 1]) # (lower, upper) bounds for amplitude, center, sigma, gamma

#print(f"p0={p0} | Bounds: {bounds}")
pos_llim = peak_maxima_x - (peak_maxima_x * pos_percent_lim / 100) # pos lower limit
pos_ulim = peak_maxima_x + (peak_maxima_x * pos_percent_lim / 100) # pos upper limit
bounds = (
[0, pos_llim, 0.00001],
[1000000, pos_ulim, 1],
) # (lower, upper) bounds for amplitude, center, sigma, gamma

# print(f"p0={p0} | Bounds: {bounds}")
popt, pcov = curve_fit(self.lorentzian, x_roi, y_roi, p0=p0, bounds=bounds, max_nfev=1000)

except RuntimeError:
Expand All @@ -285,26 +293,31 @@ def fit_roi(self, xroi: tuple, fit_func: str, pos_percent_lim: float, maxcycles:

return peak_amplitude, peak_position, peak_fwhm, ycalc, ydiff, x_roi
else:
raise NameError(f"fit_func not recognized - must be 'voigt', 'gaussian', or 'lorentzian'. Input is {fit_func}")
raise NameError(
f"fit_func not recognized - must be 'voigt', 'gaussian', or 'lorentzian'. Input is {fit_func}"
)

def report(self) -> Dict[str, ArrayLike]:

peak_amps_poss_fwhms = [] # this is what kmeans will likely consume - possibly better to handle this upstream?
peak_amps_poss_fwhms = (
[]
) # this is what kmeans will likely consume - possibly better to handle this upstream?
peak_amplitudes = []
peak_positions = []
peak_sigmas = [] # not used for gaussian / lorentzian case, None is a placeholder
peak_gammas = [] # not used for gaussian / lorentzian case, None is a placeholder
peak_sigmas = [] # not used for gaussian / lorentzian case, None is a placeholder
peak_gammas = [] # not used for gaussian / lorentzian case, None is a placeholder
peak_fwhms = []
ycalcs = []
ydiffs = []
x_rois = []


if self.fit_func is 'voigt':
if self.fit_func == "voigt":
for xroi in self.xrois:
peak_amplitude, peak_position, peak_sigma, peak_gamma, peak_fwhm, ycalc, ydiff, x_roi = self.fit_roi(xroi, self.fit_func, self.pos_percent_lim, self.maxcycles)

peak_amps_poss_fwhms.append([peak_amplitude,peak_position,peak_fwhm])
peak_amplitude, peak_position, peak_sigma, peak_gamma, peak_fwhm, ycalc, ydiff, x_roi = (
self.fit_roi(xroi, self.fit_func, self.pos_percent_lim, self.maxcycles)
)

peak_amps_poss_fwhms.append([peak_amplitude, peak_position, peak_fwhm])
peak_amplitudes.append(peak_amplitude)
peak_positions.append(peak_position)
peak_sigmas.append(peak_sigma)
Expand All @@ -314,11 +327,13 @@ def report(self) -> Dict[str, ArrayLike]:
ydiffs.append(ydiff)
x_rois.append(x_roi)

elif self.fit_func is 'gaussian' or self.fit_func is 'lorentzian':
elif self.fit_func == "gaussian" or self.fit_func == "lorentzian":
for xroi in self.xrois:
peak_amplitude, peak_position, peak_fwhm, ycalc, ydiff, x_roi = self.fit_roi(xroi, self.fit_func, self.pos_percent_lim, self.maxcycles)

peak_amps_poss_fwhms.append([peak_amplitude,peak_position,peak_fwhm])
peak_amplitude, peak_position, peak_fwhm, ycalc, ydiff, x_roi = self.fit_roi(
xroi, self.fit_func, self.pos_percent_lim, self.maxcycles
)

peak_amps_poss_fwhms.append([peak_amplitude, peak_position, peak_fwhm])
peak_amplitudes.append(peak_amplitude)
peak_positions.append(peak_position)
peak_sigmas.append(None)
Expand All @@ -328,7 +343,6 @@ def report(self) -> Dict[str, ArrayLike]:
ydiffs.append(ydiff)
x_rois.append(x_roi)

# alternatively, can return dict for voigt and gaussian/lorentzian within conditional - not optimal since we want a static report shape
return dict(
data_key=self.data_key,
roi_key=self.roi,
Expand All @@ -349,10 +363,9 @@ def report(self) -> Dict[str, ArrayLike]:
peak_fwhms=np.array(peak_fwhms),
ycalc=np.array(ycalcs),
ydiffs=np.array(ydiffs),
x_rois=np.array(x_rois)
x_rois=np.array(x_rois),
)

def ask(self, batch_size):
"""This is a passive agent, that does not request next experiments. It does analysis."""
raise NotImplementedError

2 changes: 1 addition & 1 deletion pdf_agents/startup_scripts/mmm5-tax-day/peakfit.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

xrois = [(2.75, 2.95), (3.6, 3.8), (4.6, 5.0)] # IMPORTANT: these bounds are for x = Q, not x = tth
fit_func = 'gaussian'
pos_percent_lim = 2
pos_percent_lim = 2
maxcycles = 1000

report_producer = PeakFitAgent.get_default_producer()
Expand Down

0 comments on commit e0a7c77

Please sign in to comment.