-
Notifications
You must be signed in to change notification settings - Fork 2
/
tracespec.py
executable file
·227 lines (167 loc) · 7 KB
/
tracespec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env python
import os, sys, numpy, scipy
from astropy.io import fits
import logging
import pysalt
import traceline
import prep_science
def compute_spectrum_trace(data, start_x, start_y, xbin=1,
debug=False):
# transpose - we can only trace up/down, not left/right
# data = data.T
# combined = traceline.trace_arc(
# data=data,
# start=start,
# direction=-1,
# )
# print combined
w = 30
_y = start_y
iy, ix = numpy.indices(data.shape)
positions = numpy.empty((data.shape[1]))
positions[:] = numpy.NaN
# forward and backwards
start_stop = [(start_x, data.shape[1], +1*xbin, start_y),
(start_x, -1, -1*xbin, start_y)]
for dir in start_stop:
_start, _end, _step, _start_y = dir
for x in range(_start, _end, _step):
# print x
slit = data[int(_y - w):int(_y + w),
int(x):int(x+xbin)]
pos_y = iy[int(_y - w):int(_y + w),
int(x):int(x+xbin)]
valid = numpy.isfinite(slit)
# print x, valid
if (valid.any()):
weighted_y = numpy.sum((slit * pos_y)[valid]) / numpy.sum(slit[valid])
# weighted_y = pos_y[valid][numpy.argmax(slit[valid])] #numpy.sum((slit * pos_y)[valid]) / numpy.sum(slit[valid])
else:
weighted_y = numpy.NaN
if (weighted_y < 0):
weighted_y = numpy.NaN
# print slit.shape
# print x, weighted_y
if (debug):
numpy.savetxt("slit_%04d" % (x),
numpy.append(pos_y.reshape((-1, 1)),
slit.reshape((-1, 1)), axis=1)
)
positions[x] = weighted_y
pos_x = numpy.arange(positions.shape[0])
#
# Now filter the profile to create a smoother trace profile
#
smoothtrace = prep_science.compute_smoothed_profile(
data_x=pos_x.copy(),
data_y=positions.copy(),
# n_max_neighbors=100,
avg_sample_width=100,
n_sigma=3,
)
combined = numpy.empty((positions.shape[0], 3))
combined[:,0] = pos_x
combined[:,1] = positions[:]
combined[:,2] = smoothtrace[:]
#numpy.savetxt("tracespec", positions)
#numpy.savetxt("tracespec.smooth", smoothtrace)
if (debug):
numpy.savetxt("tracespec", combined)
return combined
def compute_trace_slopes(tracedata, n_iter=3, polyorder=1):
logger = logging.getLogger("ComputeTraceSlopes")
npixels = tracedata.shape[0] #tracedata[-1,0]
detector_size = npixels / 3
poly_fits = [None]*3
for detector in range(3):
x_start = detector * detector_size
x_end = x_start + detector_size
in_detector_mask = (tracedata[:,0] >= x_start) & (tracedata[:,0] <= x_end) & (numpy.isfinite(tracedata[:,1]))
_detector_trace = tracedata[in_detector_mask]
# make space for the best-fit solutions of each iteration
detector_trace = numpy.empty((_detector_trace.shape[0], _detector_trace.shape[1]+n_iter))
detector_trace[:, :_detector_trace.shape[1]] = _detector_trace
# detector_trace = numpy.append(tracedata[in_detector_mask],
# tracedata[in_detector_mask][:,0:1], axis=1)
valid = numpy.isfinite(detector_trace[:,0]) & numpy.isfinite(detector_trace[:,1])
for iteration in range(n_iter):
try:
poly = numpy.polyfit(
x=detector_trace[:,0][valid],
y=detector_trace[:,1][valid],
deg=polyorder,
)
except Exception as e:
print e
break
fit = numpy.polyval(poly, detector_trace[:,0])
residual = detector_trace[:,1] - fit
_perc = numpy.nanpercentile(residual[valid], [16,84,50])
_med = _perc[2]
_sigma = 0.5*(_perc[1] - _perc[0])
#print _sigma, _perc
bad = (residual > 3*_sigma) | (residual < -3*_sigma)
valid[bad] = False
detector_trace[:,iteration+_detector_trace.shape[1]] = fit
poly_fits[detector] = poly
logger.debug("Detector %d: %s" % (detector+1, str(poly)))
numpy.savetxt("det_trace.%d" % (detector + 1), detector_trace)
# Now compensate all slopes to be offsets relative to the trace at the center of the detector
poly_fits = numpy.array(poly_fits)
# numpy.savetxt(sys.stdout, poly_fits)
mid_x = 0.5 * npixels
center_y = numpy.polyval(poly_fits[1], mid_x)
# print center_y
trace_offset = numpy.zeros((npixels))
for detector in range(3):
x_start = detector * detector_size
x_end = x_start + detector_size
tracepos = numpy.polyval(poly_fits[detector], numpy.arange(npixels))
trace_offset[x_start:x_end] = tracepos[x_start:x_end] - center_y
numpy.savetxt("tracespec.offset", trace_offset)
return poly_fits, trace_offset
def save_trace_offsets(trace_offset):
tbhdu = fits.ImageHDU(data=trace_offset, name="TRACEOFFSET")
return tbhdu
if __name__ == "__main__":
logger_setup = pysalt.mp_logging.setup_logging()
logger = logging.getLogger("MAIN")
hdulist = fits.open(sys.argv[1])
start_x = int(sys.argv[2])-1
start_y = int(sys.argv[3])-1
try:
xbin = int(sys.argv[4])
except:
xbin = 1
start = start_x,start_y
data = hdulist['SKYSUB.OPT'].data
fits.PrimaryHDU(data=data).writeto("tracespec.fits", clobber=True)
print "computing spectrum trace"
spectrace_data = compute_spectrum_trace(
data=data,
start_x=start_x, start_y=start_y,
xbin=xbin,
debug=True
)
print "finding trace slopes"
slopes, trace_offset = compute_trace_slopes(spectrace_data)
print slopes
numpy.savetxt("traceoffset", trace_offset)
# Now generate a ds9 region file illustrating the actual peak positions, the best-fit lines and
# 2 lines on either side to show the slit extent
print "writing ds9 region file"
with open("tracespec.reg", "w") as ds9:
# write header
print >>ds9, """# Region file format: DS9 version 4.1
global color=green dashlist=8 3 width=1 font="helvetica 10 normal roman" select=1 highlite=1 dash=0 fixed=0 edit=1 move=1 delete=1 include=1 source=1
physical"""
# write all points
print >>ds9, "\n".join(["point(%.2f,%.2f) # point=x" % (d[0]+1, d[1]+1) for d in spectrace_data[numpy.isfinite(spectrace_data[:,1])]])
slitwidth = 10
#line = trace_offset + start_x + 10
for slitwidth in [+10, -10, +30, -30]:
print >>ds9, "\n".join([
"line(%.2f,%.2f,%.2f,%.2f) # line=0 0" % (i+1, trace_offset[i]+start_y+slitwidth+1,
i+1+1, trace_offset[i+1]+start_y+slitwidth+1) for i in range(trace_offset.shape[0]-1)
])
pysalt.mp_logging.shutdown_logging(logger_setup)