Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hanukkah experiment #6

Merged
merged 22 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 45 additions & 33 deletions pdf_agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,42 @@ class PDFBaseAgent(Agent, ABC):
def __init__(
self,
*args,
motor_name: str = "Grid_X",
motor_resolution: float = 0.0002,
motor_names: List[str] = ["xstage", "ystage"],
motor_origins: List[float] = [0.0, 0.0],
motor_resolution: float = 0.2, # mm
data_key: str = "chi_I",
roi_key: str = "chi_Q",
roi: Optional[Tuple] = None,
offline=False,
**kwargs,
):
self._rkvs = redis.Redis(host="info.pdf.nsls2.bnl.gov", port=6379, db=0) # redis key value store
self._motor_name = motor_name
self._motor_names = motor_names
self._motor_resolution = motor_resolution
self._motor_origins = np.array(motor_origins)
self._data_key = data_key
self._roi_key = roi_key
self._roi = roi
# Attributes pulled in from Redis
self._exposure = float(self._rkvs.get("PDF:desired_exposure_time").decode("utf-8"))
self._sample_number = int(self._rkvs.get("PDF:xpdacq:sample_number").decode("utf-8"))
self._background = np.array(
[
ast.literal_eval(self._rkvs.get("PDF:bgd:x").decode("utf-8")),
ast.literal_eval(self._rkvs.get("PDF:bgd:y").decode("utf-8")),
]
)
try:
self._background = np.array(
[
ast.literal_eval(self._rkvs.get("PDF:bgd:x").decode("utf-8")),
ast.literal_eval(self._rkvs.get("PDF:bgd:y").decode("utf-8")),
]
)
except AttributeError:
# None available in redis
self._background = np.zeros((2,))
if offline:
_default_kwargs = self.get_offline_objects()
else:
_default_kwargs = self.get_beamline_objects()
_default_kwargs.update(kwargs)
md = dict(
motor_name=self.motor_name,
motor_names=self.motor_names,
motor_resolution=self.motor_resolution,
data_key=self.data_key,
roi_key=self.roi_key,
Expand All @@ -77,25 +83,32 @@ def measurement_plan(self, point: ArrayLike) -> Tuple[str, List, Dict]:
plan_kwargs : dict
Dictionary of keyword arguments to pass the plan, from a point to measure.
"""
return "agent_redisAware_XRDcount", [point], {}
return "agent_move_and_measure_hanukkah23", [], {"x": point[0], "y": point[1], "exposure": 5}

def unpack_run(self, run) -> Tuple[Union[float, ArrayLike], Union[float, ArrayLike]]:
"""Subtracts background and returns motor positions and data"""
y = run.primary.data[self.data_key].read().flatten()
y = y - self.background[1]
if self.background is not None:
y = y - self.background[1]
if self.roi is not None:
ordinate = np.array(run.primary.data[self.roi_key]).flatten()
idx_min = np.where(ordinate < self.roi[0])[0][-1] if len(np.where(ordinate < self.roi[0])[0]) else None
idx_max = np.where(ordinate > self.roi[1])[0][-1] if len(np.where(ordinate > self.roi[1])[0]) else None
y = y[idx_min:idx_max]
try:
x = run.start["more_info"][self.motor_name][self.motor_name]["value"]
x = np.array(
[
run.start["more_info"][motor_name][f"OT_stage_2_{motor_name[0].upper()}"]["value"]
for motor_name in self.motor_names
]
)
except KeyError:
x = run.start[self.motor_name][self.motor_name]["value"]
x = np.array([run.start[motor_name][motor_name]["value"] for motor_name in self.motor_names])
return x, y

def server_registrations(self) -> None:
self._register_property("motor_resolution")
self._register_property("motor_name")
self._register_property("motor_names")
self._register_property("exposure_time")
self._register_property("sample_number")
self._register_property("data_key")
Expand All @@ -105,13 +118,13 @@ def server_registrations(self) -> None:
return super().server_registrations()

@property
def motor_name(self):
def motor_names(self):
"""Name of motor to be used as the independent variable in the experiment"""
return self._motor_name
return self._motor_names

@motor_name.setter
def motor_name(self, value: str):
self._motor_name = value
@motor_names.setter
def motor_names(self, value: str):
self._motor_names = value

@property
def motor_resolution(self):
Expand Down Expand Up @@ -158,12 +171,15 @@ def motor_resolution(self, value: float):

@property
def background(self):
self._background = np.array(
[
ast.literal_eval(self._rkvs.get("PDF:bgd:x").decode("utf-8")),
ast.literal_eval(self._rkvs.get("PDF:bgd:y").decode("utf-8")),
]
)
try:
self._background = np.array(
[
ast.literal_eval(self._rkvs.get("PDF:bgd:x").decode("utf-8")),
ast.literal_eval(self._rkvs.get("PDF:bgd:y").decode("utf-8")),
]
)
except AttributeError:
self._background = np.zeros((2,))
return self._background

# @background.setter
Expand Down Expand Up @@ -230,10 +246,10 @@ def get_beamline_objects() -> dict:
kafka_consumer=kafka_consumer,
kafka_producer=kafka_producer,
tiled_data_node=tiled.client.from_uri(
"https://tiled.nsls2.bnl.gov/api/v1/node/metadata/pdf/bluesky_sandbox"
"https://tiled.nsls2.bnl.gov/api/v1/metadata/pdf/bluesky_sandbox"
),
tiled_agent_node=tiled.client.from_uri(
"https://tiled.nsls2.bnl.gov/api/v1/node/metadata/pdf/bluesky_sandbox"
"https://tiled.nsls2.bnl.gov/api/v1/metadata/pdf/bluesky_sandbox"
),
qserver=qs,
)
Expand All @@ -257,11 +273,7 @@ def get_offline_objects() -> dict:
)

def trigger_condition(self, uid) -> bool:
try:
det_pos = self.exp_catalog[uid].start["more_info"]["Det_1_Z"]["Det_1_Z"]["value"]
except KeyError:
det_pos = self.exp_catalog[uid].start["Det_1_Z"]["Det_1_Z"]["value"]
return det_pos > 4_000.0
return True


class PDFSequentialAgent(PDFBaseAgent, SequentialAgentBase):
Expand Down
27 changes: 6 additions & 21 deletions pdf_agents/monarch_bmm_subject.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,15 @@ def subject_ask(self, batch_size=1) -> Tuple[Sequence[dict[str, ArrayLike]], Seq
suggestions = [suggestions]
# Keep non redundant suggestions and add to knowledge cache
for suggestion in suggestions:
hashable_suggestion = make_hashable(discretize(suggestion, self.motor_resolution))
if suggestion in self.subject_knowledge_cache:
logger.info(f"Suggestion {suggestion} is ignored as already in the subject knowledge cache")
logger.info(
f"Suggestion {suggestion} is ignored as already in the subject knowledge cache: "
f"{hashable_suggestion}"
)
continue
else:
self.subject_knowledge_cache.add(make_hashable(discretize(suggestion, self.motor_resolution)))
self.subject_knowledge_cache.add(hashable_suggestion)
kept_suggestions.append(suggestion)
_default_doc = dict(
elements=self.elements,
Expand All @@ -236,22 +240,3 @@ def subject_ask(self, batch_size=1) -> Tuple[Sequence[dict[str, ArrayLike]], Seq
)
docs = [dict(suggestion=suggestion, **_default_doc) for suggestion in kept_suggestions]
return docs, kept_suggestions

def tell(self, x, y):
"""Update tell using relative info"""
x = x - self.pdf_origin[0]
doc = super().tell(x, y)
doc["absolute_position_offset"] = self.pdf_origin[0]
return doc

def ask(self, batch_size=1) -> Tuple[Sequence[dict[str, ArrayLike]], Sequence[ArrayLike]]:
"""Update ask with relative info"""
docs, suggestions = super().ask(batch_size=batch_size)
for doc in docs:
doc["absolute_position_offset"] = self.pdf_origin[0]
return docs, suggestions

def measurement_plan(self, relative_point: ArrayLike) -> Tuple[str, List, dict]:
"""Send measurement plan absolute point from reltive position"""
absolute_point = relative_point + self.pdf_origin[0]
return super().measurement_plan(absolute_point)
90 changes: 73 additions & 17 deletions pdf_agents/sklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
from numpy.typing import ArrayLike
from scipy.stats import rv_discrete
from sklearn.cluster import KMeans
from sklearn.linear_model import LogisticRegression

from .agents import PDFBaseAgent
from .utils import discretize, make_hashable
from .utils import discretize, make_hashable, make_wafer_grid_list

logger = logging.getLogger(__name__)


class PassiveKmeansAgent(PDFBaseAgent, ClusterAgentBase):
def __init__(self, k_clusters, *args, **kwargs):
estimator = KMeans(k_clusters)
estimator = KMeans(k_clusters, n_init="auto")
_default_kwargs = self.get_beamline_objects()
_default_kwargs.update(kwargs)
super().__init__(*args, estimator=estimator, **kwargs)
Expand All @@ -36,6 +37,22 @@ def server_registrations(self) -> None:
self._register_method("clear_caches")
return super().server_registrations()

def tell(self, x, y):
"""Update tell using relative info"""
x = x - self._motor_origins
doc = super().tell(x, y)
doc["absolute_position_offset"] = self._motor_origins
return doc

def report(self, **kwargs):
arr = np.array(self.observable_cache)
self.model.fit(arr)
return dict(
cluster_centers=self.model.cluster_centers_,
cache_len=len(self.independent_cache),
latest_data=self.tell_cache[-1],
)

@classmethod
def hud_from_report(
cls,
Expand Down Expand Up @@ -125,6 +142,10 @@ def __init__(self, *args, bounds: ArrayLike, **kwargs):
self._bounds = bounds
self.knowledge_cache = set() # Discretized knowledge cache of previously asked/told points

@property
def name(self):
return "PDFActiveKMeans"

@property
def bounds(self):
return self._bounds
Expand All @@ -139,8 +160,8 @@ def server_registrations(self) -> None:

def tell(self, x, y):
"""A tell that adds to the local discrete knowledge cache, as well as the standard caches"""
self.knowledge_cache.add(make_hashable(discretize(x, self.motor_resolution)))
doc = super().tell(x, y)
self.knowledge_cache.add(make_hashable(discretize(doc["independent_variable"], self.motor_resolution)))
doc["background"] = self.background
return doc

Expand All @@ -159,35 +180,64 @@ def _sample_uncertainty_proxy(self, batch_size=1):
"""
# Borrowing from Dan's jupyter fun
# from measurements, perform k-means
sorted_independents, sorted_observables = zip(*sorted(zip(self.independent_cache, self.observable_cache)))
try:
sorted_independents, sorted_observables = zip(
*sorted(zip(self.independent_cache, self.observable_cache))
)
except ValueError:
# Multidimensional case
sorted_independents, sorted_observables = zip(
*sorted(zip(self.independent_cache, self.observable_cache), key=lambda x: (x[0][0], x[0][1]))
)

sorted_independents = np.array(sorted_independents)
sorted_observables = np.array(sorted_observables)
self.model.fit(sorted_observables)
# retreive centers
centers = self.model.cluster_centers_
# calculate distances of all measurements from the centers
distances = self.model.transform(sorted_observables)
# determine golf-score of each point (minimum value)
min_landscape = distances.min(axis=1)
# generate 'uncertainty weights' - as a polynomial fit of the golf-score for each point
_x = np.arange(*self.bounds, self.motor_resolution)
uwx = polyval(_x, polyfit(sorted_independents, min_landscape, deg=5))
# Chose from the polynomial fit
return pick_from_distribution(_x, uwx, num_picks=batch_size), centers

if self.bounds.size == 2:
# One dimensional case, Use the Dan Olds approach
# calculate distances of all measurements from the centers
distances = self.model.transform(sorted_observables)
# determine golf-score of each point (minimum value)
min_landscape = distances.min(axis=1)
# Assume a 1d scan
# generate 'uncertainty weights' - as a polynomial fit of the golf-score for each point
_x = np.arange(*self.bounds, self.motor_resolution)
if batch_size is None:
batch_size = len(_x)
uwx = polyval(_x, polyfit(sorted_independents, min_landscape, deg=5))
# Chose from the polynomial fit
return pick_from_distribution(_x, uwx, num_picks=batch_size), centers
else:
# assume a 2d scan, use a linear model to predict the uncertainty
grid = make_wafer_grid_list(*self.bounds.ravel(), step=self.motor_resolution)
labels = self.model.predict(sorted_observables)
proby_preds = LogisticRegression().fit(sorted_independents, labels).predict_proba(grid)
shannon = -np.sum(proby_preds * np.log(1 / proby_preds), axis=-1)
top_indicies = np.argsort(shannon) if batch_size is None else np.argsort(shannon)[-batch_size:]
return grid[top_indicies], centers

def ask(self, batch_size=1):
suggestions, centers = self._sample_uncertainty_proxy(batch_size)
"""Get's a relative position from the agent. Returns a document and hashes the suggestion for redundancy"""
suggestions, centers = self._sample_uncertainty_proxy(None)
kept_suggestions = []
if not isinstance(suggestions, Iterable):
suggestions = [suggestions]
# Keep non redundant suggestions and add to knowledge cache
for suggestion in suggestions:
if suggestion in self.knowledge_cache:
logger.info(f"Suggestion {suggestion} is ignored as already in the knowledge cache")
hashable_suggestion = make_hashable(discretize(suggestion, self.motor_resolution))
if hashable_suggestion in self.knowledge_cache:
logger.warn(
f"Suggestion {suggestion} is ignored as already in the knowledge cache: {hashable_suggestion}"
)
continue
else:
self.knowledge_cache.add(make_hashable(discretize(suggestion, self.motor_resolution)))
self.knowledge_cache.add(hashable_suggestion)
kept_suggestions.append(suggestion)
if len(kept_suggestions) >= batch_size:
break

base_doc = dict(
cluster_centers=centers,
Expand All @@ -199,11 +249,17 @@ def ask(self, batch_size=1):
latest_data=self.tell_cache[-1],
requested_batch_size=batch_size,
redundant_points_discarded=batch_size - len(kept_suggestions),
absolute_position_offset=self._motor_origins,
)
docs = [dict(suggestion=suggestion, **base_doc) for suggestion in kept_suggestions]

return docs, kept_suggestions

def measurement_plan(self, relative_point: ArrayLike):
"""Send measurement plan absolute point from reltive position"""
absolute_point = relative_point + self._motor_origins
return super().measurement_plan(absolute_point)


def current_dist_gen(x, px):
"""from distribution defined by p(x), produce a discrete generator.
Expand Down
31 changes: 31 additions & 0 deletions pdf_agents/startup_scripts/mmm4-kmeans-local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import numpy as np
import tiled.client.node # noqa: F401
from bluesky_adaptive.server import register_variable, shutdown_decorator, startup_decorator
from bluesky_queueserver_api.zmq import REManagerAPI

from pdf_agents.sklearn import ActiveKmeansAgent

# Control handle to the bluesky queueserver RunEngine manager at the PDF
# beamline (xf28id1-srv1); the agent submits measurement plans through it.
qserver = REManagerAPI(zmq_control_addr="tcp://xf28id1-srv1:60615", zmq_info_addr="tcp://xf28id1-srv1:60625")
# Active k-means agent exploring a 64 mm x 64 mm region in relative
# (xstage, ystage) coordinates. It reports after every tell but only asks
# when explicitly triggered (ask_on_tell=False).
agent = ActiveKmeansAgent(
    bounds=np.array([(-32, 32), (-32, 32)]),  # relative motor limits, mm — TODO confirm units
    ask_on_tell=False,
    report_on_tell=True,
    k_clusters=4,
    motor_names=["xstage", "ystage"],
    # Absolute stage position treated as the (0, 0) origin of the relative
    # coordinate system; presumably the wafer center — verify at the beamline.
    motor_origins=[-154.7682, 48.9615],
    qserver=qserver,
)


@startup_decorator
def startup() -> None:
    """Start the agent when the adaptive server boots."""
    agent.start()


@shutdown_decorator
def shutdown_agent():
    """Stop the agent on server shutdown; returns whatever ``stop`` yields."""
    return agent.stop()


# Expose read-only agent state over the adaptive server's HTTP/variable API
# so operators can inspect the tell cache and identify the running agent.
register_variable("Tell Cache", agent, "tell_cache")
register_variable("Agent Name", agent, "instance_name")
Loading
Loading