import io
import os
from pathlib import Path
import argparse
import re
import shutil
from time import gmtime, strftime
import string
import warnings
from contextlib import contextmanager
from enum import Enum
from scipy.optimize import leastsq, curve_fit
from scipy.stats import t
import pandas as pd
import numpy as np
import h5py
from refnx.reduce.peak_utils import peak_finder, centroid
import refnx.util.general as general
from refnx.util.general import resolution_double_chopper, _dict_compare
import refnx.util.ErrorProp as EP
from refnx.reduce.parabolic_motion import (
find_trajectory,
y_deflection,
parabola,
)
from refnx.reduce.event import (
events,
process_event_stream,
framebins_to_frames,
)
from refnx.reduce.rebin import rebin, rebin_along_axis
from refnx._lib import possibly_open_file
from refnx.reflect import SpinChannel
EXTENT_MULT = 2
PIXEL_OFFSET = 4
spectrum_template = """<?xml version="1.0"?>
<REFroot xmlns="">
<REFentry time="$time">
<Title>$title</Title>
<REFdata axes="lambda" rank="1" type="POINT"\
spin="UNPOLARISED" dim="$n_spectra">
<Run filename="$runnumber"/>
<R uncertainty="dR">$r</R>
<lambda uncertainty="dlambda" units="1/A">$lmda</lambda>
<dR type="SD">$dr</dR>
<dlambda type="_FWHM" units="1/A">$dlmda</dlambda>
</REFdata>
</REFentry>
</REFroot>"""
[docs]def catalogue(start, stop, data_folder=None, prefix="PLP", keys=None):
"""
Extract interesting information from Platypus NeXUS files.
Parameters
----------
start : int
start cataloguing from this run number
stop : int
stop cataloguing at this run number
data_folder : {str, Path}, optional
path specifying location of NeXUS files
prefix : {'PLP', 'SPZ'}, optional
str specifying whether you want to catalogue Platypus or Spatz files
keys : sequence of str, optional
specifies the keys to be extracted from each catalogue. Overrides the
default set of keys.
Returns
-------
catalog : pd.DataFrame
Dataframe containing interesting parameters from Platypus Nexus files
Notes
-----
The default set of keys is:
["filename", "end_time", "sample_name", "ss1vg", "ss2vg", "ss3vg",
"ss4vg", "omega", "twotheta", "total_counts", "bm1_counts", "time",
"daq_dirname", "start_time"]
"""
info = ["filename", "end_time", "sample_name"]
if prefix == "PLP":
info += ["ss1vg", "ss2vg", "ss3vg", "ss4vg"]
elif prefix == "SPZ":
info += ["ss2hg", "ss3hg", "ss4hg"]
info += [
"omega",
"twotheta",
"total_counts",
"bm1_counts",
"time",
"daq_dirname",
"start_time",
]
if keys is not None:
info = list(keys)
run_number = []
d = {key: [] for key in info}
if data_folder is None:
data_folder = "."
files = Path(data_folder).glob(f"{prefix}*.nx.hdf")
files = list(files)
files.sort()
files = [
file
for file in files
if datafile_number(file, prefix=prefix) in range(start, stop + 1)
]
for idx, file in enumerate(files):
if prefix == "PLP":
pn = PlatypusNexus(file)
elif prefix == "SPZ":
pn = SpatzNexus(file)
else:
raise RuntimeError("prefix not known yet")
cat = pn.cat.cat
run_number.append(idx)
for key, val in d.items():
data = cat[key]
if np.size(data) > 1 or type(data) is np.ndarray:
data = data[0]
if type(data) is bytes:
data = data.decode()
d[key].append(data)
df = pd.DataFrame(d, index=run_number, columns=info)
return df
[docs]class Catalogue:
"""
Extract relevant parts of a NeXus file for reflectometry reduction
"""
def __init__(self, h5d):
"""
Extract relevant parts of a NeXus file for reflectometry reduction
Access information via dict access, e.g. cat['detector'].
Parameters
----------
h5d - HDF5 file handle
"""
self.prefix = None
d = {}
file_path = Path(h5d.filename).resolve()
d["path"] = Path(file_path).parent
d["filename"] = h5d.filename
try:
d["end_time"] = h5d["entry1/end_time"][0]
except KeyError:
# Autoreduce field tests show that this key may not be present in
# some files before final write.
d["end_time"] = ""
d["detector"] = h5d["entry1/data/hmm"][:]
d["t_bins"] = h5d["entry1/data/time_of_flight"][:].astype("float64")
d["x_bins"] = h5d["entry1/data/x_bin"][:]
d["y_bins"] = h5d["entry1/data/y_bin"][:]
d["bm1_counts"] = h5d["entry1/monitor/bm1_counts"][:]
d["total_counts"] = h5d["entry1/instrument/detector/total_counts"][:]
d["time"] = h5d["entry1/instrument/detector/time"][:]
try:
event_directory_name = h5d[
"entry1/instrument/detector/daq_dirname"
][0]
d["daq_dirname"] = event_directory_name.decode()
except KeyError:
# daq_dirname doesn't exist in this file
d["daq_dirname"] = None
d["ss2vg"] = h5d["entry1/instrument/slits/second/vertical/gap"][:]
d["ss3vg"] = h5d["entry1/instrument/slits/third/vertical/gap"][:]
d["ss4vg"] = h5d["entry1/instrument/slits/fourth/vertical/gap"][:]
d["ss2hg"] = h5d["entry1/instrument/slits/second/horizontal/gap"][:]
d["ss3hg"] = h5d["entry1/instrument/slits/third/horizontal/gap"][:]
d["ss4hg"] = h5d["entry1/instrument/slits/fourth/horizontal/gap"][:]
d["sample_distance"] = h5d[
"entry1/instrument/parameters/sample_distance"
][:]
d["slit2_distance"] = h5d[
"entry1/instrument/parameters/slit2_distance"
][:]
d["slit3_distance"] = h5d[
"entry1/instrument/parameters/slit3_distance"
][:]
d["collimation_distance"] = d["slit3_distance"] - d["slit2_distance"]
try:
san = (
h5d["entry1/data/hmm"]
.attrs["axes"]
.decode("utf8")
.split(":")[0]
)
except AttributeError:
# the attribute could be a string already
san = str(h5d["entry1/data/hmm"].attrs["axes"]).split(":")[0]
finally:
d["scan_axis_name"] = san
d["scan_axis"] = h5d[f"entry1/data/{d['scan_axis_name']}"][:]
try:
d["start_time"] = h5d["entry1/instrument/detector/start_time"][:]
except KeyError:
# start times don't exist in this file
d["start_time"] = None
d["original_file_name"] = h5d["entry1/experiment/file_name"][:]
d["sample_name"] = h5d["entry1/sample/name"][:]
self.cat = d
def __getattr__(self, item):
try:
return self.cat[item]
except KeyError:
raise AttributeError
def __getstate__(self):
return self.__dict__
def __setstate__(self, state):
self.cat = state["cat"]
self.prefix = state["prefix"]
@property
def datafile_number(self):
return datafile_number(self.filename, prefix=self.prefix)
class SpatzCatalogue(Catalogue):
"""
Extract relevant parts of a NeXus file for reflectometry reduction
"""
def __init__(self, h5d):
"""
Extract relevant parts of a NeXus file for reflectometry reduction
Access information via dict access, e.g. cat['detector'].
Parameters
----------
h5d - HDF5 file handle
"""
super().__init__(h5d)
self.prefix = "SPZ"
d = self.cat
# grab chopper settings
master, slave, frequency, phase = self._chopper_values(h5d)
d["master"] = master
# slave == 2 --> chopper 2
# slave == 3 --> chopper 2B
# slave == 4 --> chopper 3
d["slave"] = slave
d["frequency"] = frequency
d["phase"] = phase
d["t_offset"] = None
if "t_offset" in h5d:
d["t_offset"] = h5d["entry1/instrument/parameters/t_offset"][:]
d["chopper2_distance"] = h5d["entry1/instrument/ch02_distance/pos"][:]
d["chopper2B_distance"] = h5d[
"entry1/instrument/parameters/ch02b_distance"
][:]
d["chopper3_distance"] = h5d[
"entry1/instrument/parameters/ch03_distance"
][:]
# collimation parameters
# first and second collimation slits
d["ss_coll1"] = h5d["entry1/instrument/slits/second/horizontal/gap"][:]
d["ss_coll2"] = h5d["entry1/instrument/slits/third/horizontal/gap"][:]
# sample omega, the nominal angle of incidence
d["omega"] = h5d["entry1/sample/som"][:]
d["som"] = d["omega"]
# two theta value for detector arm.
d["twotheta"] = h5d["entry1/instrument/detector/detrot"][:]
d["detrot"] = d["twotheta"]
d["dz"] = d["twotheta"]
###############################################
# detector longitudinal translation from sample
###############################################
d["dy"] = None
try:
d["dy"] = (
h5d["entry1/instrument/detector/detector_distance/pos"][:]
- d["sample_distance"]
)
except KeyError:
# entry was renamed after detector translation was added in 12268
pass
try:
d["dy"] = h5d[
"entry1/instrument/detector/longitudinal_translation"
][:]
except KeyError:
pass
if d["dy"] is None:
raise ValueError("SPZ: dy is not present in the dataset")
# logical size (mm) of 1 pixel in the scattering plane
try:
d["qz_pixel_size"] = h5d[
"entry1/instrument/parameters/qz_pixel_size"
][:]
except KeyError:
# older SPZ files didn't have qz_pixel_size
d["qz_pixel_size"] = np.array([0.326])
def _chopper_values(self, h5data):
"""
Obtains chopper settings from NeXUS file
Parameters
----------
h5data : HDF5 NeXUS file
datafile,
Returns
-------
master, slave, frequency, phase : float, float, float, float
"""
master = 1
slave = 2
d = self.cat
chopper1_speed = h5data["entry1/instrument/chopper/c01/spee"][:]
chopper2_speed = h5data["entry1/instrument/chopper/c02/spee"][:]
chopper2B_speed = h5data["entry1/instrument/chopper/c2b/spee"][:]
# chopper3_speed = h5data['entry1/instrument/chopper/c03/spee']
ch1phase = h5data["entry1/instrument/chopper/c01/spha"]
ch2phase = h5data["entry1/instrument/chopper/c02/spha"][:]
ch2Bphase = h5data["entry1/instrument/chopper/c2b/spha"][:]
# ch3phase = h5data['entry1/instrument/chopper/c03/spha']
if chopper1_speed[0] > 2:
master = 1
d["master_phase_offset"] = h5data[
"entry1/instrument/parameters/poff_c1_master"
][:]
if chopper2_speed[0] > 2:
slave = 2
else:
# chopper2B is slave
slave = 3
freq = chopper1_speed
phase = ch2phase - ch1phase
d["poff_c2_slave_1_master"] = h5data[
"entry1/instrument/parameters/poff_c2_slave_1_master"
][:]
d["poff_c2b_slave_1_master"] = h5data[
"entry1/instrument/parameters/poff_c2b_slave_1_master"
][:]
else:
master = 2
d["master_phase_offset"] = h5data[
"entry1/instrument/parameters/poff_c2_master"
][:]
d["poff_c2b_slave_2_master"] = h5data[
"entry1/instrument/parameters/poff_c2b_slave_2_master"
][:]
freq = chopper2_speed
# if slave == 3 refers to chopper 2B
assert (chopper2B_speed > 1).all()
slave = 3
phase = ch2Bphase - ch2phase
# SPZ offsets measured on 20200116
# with master = 1, slave = 2
# master_phase_offset = -25.90
# chopper2_phase_offset -0.22 degrees
return master, slave, freq, phase
class PlatypusCatalogue(Catalogue):
"""
Extract relevant parts of a NeXus file for reflectometry reduction
"""
def __init__(self, h5d):
"""
Extract relevant parts of a NeXus file for reflectometry reduction
Access information via dict access, e.g. cat['detector'].
Parameters
----------
h5d - HDF5 file handle
"""
super().__init__(h5d)
self.prefix = "PLP"
d = self.cat
d["ss1vg"] = h5d["entry1/instrument/slits/first/vertical/gap"][:]
d["ss1hg"] = h5d["entry1/instrument/slits/first/horizontal/gap"][:]
d["omega"] = h5d["entry1/instrument/parameters/omega"][:]
d["twotheta"] = h5d["entry1/instrument/parameters/twotheta"][:]
d["sth"] = h5d["entry1/sample/sth"][:]
d["mode"] = h5d["entry1/instrument/parameters/mode"][0].decode()
master, slave, frequency, phase = self._chopper_values(h5d)
d["master"] = master
d["slave"] = slave
d["frequency"] = frequency
d["phase"] = phase
d["chopper2_distance"] = h5d[
"entry1/instrument/parameters/chopper2_distance"
][:]
d["chopper3_distance"] = h5d[
"entry1/instrument/parameters/chopper3_distance"
][:]
d["chopper4_distance"] = h5d[
"entry1/instrument/parameters/chopper4_distance"
][:]
d["master_phase_offset"] = h5d[
"entry1/instrument/parameters/chopper1_phase_offset"
][:]
d["chopper2_phase_offset"] = h5d[
"entry1/instrument/parameters/chopper2_phase_offset"
][:]
d["chopper3_phase_offset"] = h5d[
"entry1/instrument/parameters/chopper3_phase_offset"
][:]
d["chopper4_phase_offset"] = h5d[
"entry1/instrument/parameters/chopper4_phase_offset"
][:]
# time offset for choppers if you're using a signal generator to
# delay T0
d["t_offset"] = None
if "t_offset" in h5d:
d["t_offset"] = h5d["entry1/instrument/parameters/t_offset"][:]
d["guide1_distance"] = h5d[
"entry1/instrument/parameters/guide1_distance"
][:]
d["guide2_distance"] = h5d[
"entry1/instrument/parameters/guide2_distance"
][:]
# collimation parameters
# first and second collimation slits
d["ss_coll1"] = h5d["entry1/instrument/slits/second/vertical/gap"][:]
d["ss_coll2"] = h5d["entry1/instrument/slits/third/vertical/gap"][:]
d["dy"] = h5d["entry1/instrument/detector/longitudinal_translation"][:]
d["dz"] = h5d["entry1/instrument/detector/vertical_translation"][:]
# pixel size (mm) in scattering plane. y_pixels_per_mm is incorrect,
# it should really be mm_per_y_pixel, but let's stick with the
# historical error
try:
d["qz_pixel_size"] = h5d[
"entry1/instrument/parameters/y_pixels_per_mm"
][:]
except KeyError:
# older PLP files didn't have y_pixels_per_mm, so use built in
# value
# warnings.warn(
# "Setting default pixel size to 1.177", RuntimeWarning
# )
d["qz_pixel_size"] = np.array([1.177])
def _chopper_values(self, h5data):
"""
Obtains chopper settings from NeXUS file
Parameters
----------
h5data : HDF5 NeXUS file
datafile,
Returns
-------
master, slave, frequency, phase : float, float, float, float
"""
chopper1_speed = h5data["entry1/instrument/disk_chopper/ch1speed"]
chopper2_speed = h5data["entry1/instrument/disk_chopper/ch2speed"]
chopper3_speed = h5data["entry1/instrument/disk_chopper/ch3speed"]
chopper4_speed = h5data["entry1/instrument/disk_chopper/ch4speed"]
ch2phase = h5data["entry1/instrument/disk_chopper/ch2phase"]
ch3phase = h5data["entry1/instrument/disk_chopper/ch3phase"]
ch4phase = h5data["entry1/instrument/disk_chopper/ch4phase"]
m = "entry1/instrument/parameters/master"
s = "entry1/instrument/parameters/slave"
if (
s in h5data
and m in h5data
and h5data[m][0] in [1, 2, 3, 4]
and h5data[s][0] in [1, 2, 3, 4]
):
# master and slave parameters have to be set correctly in order
# to use them.
master = h5data["entry1/instrument/parameters/master"][0]
slave = h5data["entry1/instrument/parameters/slave"][0]
else:
master = 1
if abs(chopper2_speed[0]) > 10:
slave = 2
elif abs(chopper3_speed[0]) > 10:
slave = 3
else:
slave = 4
speeds = np.array(
[chopper1_speed, chopper2_speed, chopper3_speed, chopper4_speed]
)
phases = np.array(
[np.zeros_like(ch2phase), ch2phase, ch3phase, ch4phase]
)
return master, slave, speeds[0] / 60.0, phases[slave - 1]
class PolarisedCatalogue(PlatypusCatalogue):
"""
Extract relevant parts of a polarised PLATYPUS
NeXus file for reflectometry reduction.
Access information via dict access, e.g. cat['pol_flip_freq'].
Parameters
----------
h5d - HDF5 file handle
"""
def __init__(self, h5d):
super().__init__(h5d)
# Is there a magnet?
self.is_magnet = False
# Is there a cryocooler?
self.is_cryo = False
# Is there a power supply?
self.is_power_supply = False
d = self.cat
self._polariser_flippers(d, h5d)
self._analyser_flippers(d, h5d)
self._check_sample_environments(d, h5d)
def _polariser_flippers(self, d, h5d):
d["pol_flip_freq"] = h5d[
"entry1/instrument/polarizer_flipper/flip_frequency"
][0]
d["pol_flip_current"] = h5d[
"entry1/instrument/polarizer_flipper/flip_current"
][0]
d["pol_flip_voltage"] = h5d[
"entry1/instrument/polarizer_flipper/flip_voltage"
][0]
d["pol_flip_status"] = h5d[
"entry1/instrument/polarizer_flipper/flip_on"
][0]
d["pol_guide_current"] = h5d[
"entry1/instrument/polarizer_flipper/guide_current"
][0]
def _analyser_flippers(self, d, h5d):
d["anal_flip_freq"] = h5d[
"entry1/instrument/analyzer_flipper/flip_frequency"
][0]
d["anal_flip_current"] = h5d[
"entry1/instrument/analyzer_flipper/flip_current"
][0]
d["anal_flip_voltage"] = h5d[
"entry1/instrument/analyzer_flipper/flip_voltage"
][0]
d["anal_flip_status"] = h5d[
"entry1/instrument/analyzer_flipper/flip_on"
][0]
d["anal_guide_current"] = h5d[
"entry1/instrument/analyzer_flipper/guide_current"
][0]
d["anal_base"] = h5d["entry1/instrument/polarizer/anal_base"][0]
d["anal_dist"] = h5d["entry1/instrument/polarizer/anal_distance"][0]
d["rotation"] = h5d["entry1/instrument/polarizer/rotation"][0]
d["z_trans"] = h5d["entry1/instrument/polarizer/z_translation"][0]
def _check_sample_environments(self, d, h5d):
try:
# Try adding temperature sensor values to dict
d["temp_sensorA"] = h5d["entry1/sample/tc1/sensor/sensorValueA"][0]
d["temp_sensorB"] = h5d["entry1/sample/tc1/sensor/sensorValueB"][0]
d["temp_sensorC"] = h5d["entry1/sample/tc1/sensor/sensorValueC"][0]
d["temp_sensorD"] = h5d["entry1/sample/tc1/sensor/sensorValueD"][0]
d["temp_setpt1"] = h5d["entry1/sample/tc1/sensor/setpoint1"][0]
d["temp_setpt2"] = h5d["entry1/sample/tc1/sensor/setpoint2"][0]
self.is_cryo = True
except KeyError:
# Temperature sensor not used in measurement - set to None
d["temp_sensorA"] = None
d["temp_sensorB"] = None
d["temp_sensorC"] = None
d["temp_sensorD"] = None
d["temp_setpt1"] = None
d["temp_setpt2"] = None
self.is_cryo = False
try:
# Try adding voltage supply to dict
d["pow_supply_volts"] = h5d["entry1/sample/power_supply/voltage"][
0
]
d["pow_supply_current"] = h5d["entry1/sample/power_supply/amps"][0]
d["pow_supply_relay"] = h5d["entry1/sample/power_supply/relay"][0]
self.is_power_supply = True
except KeyError:
# Voltage supply not used in measurement
d["pow_supply_volts"] = None
d["pow_supply_current"] = None
d["pow_supply_relay"] = None
self.is_power_supply = False
try:
# Try adding magnetic field sensor to dict
d["magnet_current_set"] = h5d[
"entry1/sample/ma1/sensor/desired_current"
][0]
d["magnet_set_field"] = h5d[
"entry1/sample/ma1/sensor/desired_field"
][0]
d["magnet_measured_field"] = h5d[
"entry1/sample/ma1/sensor/measured_field"
][0]
d["magnet_output_current"] = h5d[
"entry1/sample/ma1/sensor/nominal_outp_current"
][0]
self.is_magnet = True
except KeyError:
# Magnetic field sensor not used in measurement - set to None
d["magnet_current_set"] = None
d["magnet_set_field"] = None
d["magnet_measured_field"] = None
d["magnet_output_current"] = None
self.is_magnet = False
[docs]class SpinSet(object):
"""
Describes a set of spin-channels at a given angle of incidence,
and can process beams with individual reduction options.
Parameters
----------
down_down : str or refnx.reduce.PlatypusNexus
Input filename or PlatypusNexus object for the R-- spin
channel.
up_up : str or refnx.reduce.PlatypusNexus
Input filename or PlatypusNexus object for the R++ spin
channel.
down_up : str or refnx.reduce.PlatypusNexus, optional
Input filename or PlatypusNexus object for the R-+ spin
channel.
up_down : str or refnx.reduce.PlatypusNexus, optional
Input filename or PlatypusNexus object for the R+- spin
channel.
Attributes
----------
channels : dict
Dictionary of each measured spin channel
"dd" : refnx.reduce.PlatypusNexus (R--)
"du" : refnx.reduce.PlatypusNexus or None (R-+)
"ud" : refnx.reduce.PlatypusNexus or None (R+-)
"uu" : refnx.reduce.PlatypusNexus (R++)
sc_opts : dict of refnx.reduce.ReductionOptions
Reduction options for each spin channel ("dd", "du", "ud", "uu)
dd : refnx.reduce.PlatypusNexus
R-- spin channel
uu : refnx.reduce.PlatypusNexus
R++ spin channel
du : refnx.reduce.PlatypusNexus or None
R-+ spin channel
ud : refnx.reduce.PlatypusNexus or None
R+- spin channel
Notes
-----
Each of the `ReductionOptions` specified in `dd_opts,` etc, is used
to specify the options used to reduce each spin channel. The following
reduction options must be consistent and identical across all
spin channels so as to maintain the same wavelength axis across
the datasets:
lo_wavelength : key in refnx.reduce.ReductionOptions
hi_wavelength : key in refnx.reduce.ReductionOptions
rebin_percent : key in refnx.reduce.ReductionOptions
wavelength_bins : key in refnx.reduce.ReductionOptions
"""
def __init__(self, down_down, up_up, down_up=None, up_down=None):
# Currently only Platypus has polarisation elements
self.reflect_klass = PlatypusNexus
# initialise spin channels
self.channels = {
"dd": None,
"du": None,
"ud": None,
"uu": None,
}
self.sc_opts = {
"dd": {},
"du": {},
"ud": {},
"uu": {},
}
# initialise reduction options for each spin channel
reduction_options = ReductionOptions(
lo_wavelength=2.5,
hi_wavelength=12.5,
rebin_percent=3,
)
# Put inputs into a dictionary to iterate over
input_params = {
"dd": down_down,
"du": down_up,
"ud": up_down,
"uu": up_up,
}
_spin_channels = {
"dd": SpinChannel.DOWN_DOWN,
"du": SpinChannel.DOWN_UP,
"ud": SpinChannel.UP_DOWN,
"uu": SpinChannel.UP_UP,
}
# Load the files and check spin channels and flipper config
for sc, input_param in input_params.items():
if input_param is None:
# Spin channel not measured
continue
elif isinstance(input_param, self.reflect_klass):
# Spin channel inputted as PlatypusNexus object
channel = input_param
else:
# Spin channel inputted as file string
channel = self.reflect_klass(input_param)
# print(sc, input_param, channel.spin_state, _spin_channels[sc])
if channel.spin_state is _spin_channels[sc]:
self.channels[sc] = channel
self.sc_opts[sc] = reduction_options.copy()
else:
RuntimeError(
f"Supplied spin channel {_spin_channels[sc]} does not match flipper status"
)
@property
def dd(self):
return self.channels["dd"]
@property
def du(self):
return self.channels["du"]
@property
def ud(self):
return self.channels["ud"]
@property
def uu(self):
return self.channels["uu"]
@property
def spin_channels(self):
"""
Gives a quick indication of what spin channels were measured and
are present in this SpinSet.
Returns
-------
list of refnx.reduce.SpinChannel Enum values or None, depending on
if the spin channel was measured.
"""
return [
(
self.channels[sc].spin_state.value
if self.channels[sc] is not None
else None
)
for sc in self.channels
]
[docs] def process(self, **reduction_options):
"""
Process beams in SpinSet.
If reduction_options is None, the reduction options for each spin
channel are specified by the dictionary of spin channel reduction
options `SpinSet.sc_opts` which are initialised to the
standard options when constructing the object.
If you wish to have unique reduction options for each spin channel,
you need to ensure that the wavelength bins between each spin channel
remain identical, otherwise a ValueError will be raised.
If `reduction_options`
is not None, then SpinSet.process() will use these options for all
spin channels.
Parameters
----------
reduction_options : dict, optional
A single dict of options used to process all spectra.
"""
if reduction_options is not None:
for sc in self.sc_opts:
self.sc_opts[sc].update(reduction_options)
# Check specific reduction options are the same across all
# spin channels to ensure the same wavelength axis
_wavelength_keys = [
"lo_wavelength",
"hi_wavelength",
"rebin_percent",
"wavelength_bins",
]
# For each spin channel, if it is not empty (i.e. channel not
# measured) then check that its reduction options are the same as down_down
# reduction options for the keys in _wavelength_keys.
for sc in self.sc_opts:
if self.sc_opts[sc]:
if not general._dict_compare_keys(
self.sc_opts["dd"], self.sc_opts[sc], *_wavelength_keys
):
raise ValueError(
"Reduction options `lo_wavelength`, `hi_wavelength`,"
" `rebin_percent`, and `wavelength_bins` must be"
"identical across spin channels to preserve a common"
"wavelength axis."
)
for sc, channel in self.channels.items():
if channel is None:
continue
else:
channel.process(**self.sc_opts[sc])
[docs] def plot_spectra(self, **kwargs):
"""
Plots the processed spectrums for each spin state in the SpinSet
Requires matplotlib to be installed
"""
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.set(xlabel="Wavelength ($\\AA$)", ylabel="Intensity (a.u.)")
for spinch in [self.dd, self.du, self.ud, self.uu]:
if spinch is None:
continue
x = spinch.processed_spectrum["m_lambda"][0]
y = spinch.processed_spectrum["m_spec"][0]
yerr = spinch.processed_spectrum["m_spec_sd"][0]
ax.errorbar(x, y, yerr, label=spinch.cat.sample_name)
return fig, ax
[docs]def basename_datafile(pth):
"""
Given a NeXUS path return the basename minus the file extension.
Parameters
----------
pth : str
Returns
-------
basename : str
Examples
--------
>>> basename_datafile('a/b/c.nx.hdf')
'c'
"""
basename = Path(pth).name
return basename.split(".nx.hdf")[0]
[docs]def number_datafile(run_number, prefix="PLP"):
"""
Given a run number figure out what the file name is.
Given a file name, return the filename with the .nx.hdf extension
Parameters
----------
run_number : int or str
prefix : str, optional
The instrument prefix. Only used if `run_number` is an int
Returns
-------
file_name : str
Examples
--------
>>> number_datafile(708)
'PLP0000708.nx.hdf'
>>> number_datafile(708, prefix='QKK')
'QKK0000708.nx.hdf'
>>> number_datafile('PLP0000708.nx.hdf')
'PLP0000708.nx.hdf'
"""
try:
num = abs(int(run_number))
# you got given a run number
return f"{prefix}{num:07d}.nx.hdf"
except ValueError:
# you may have been given full filename
if run_number.endswith(".nx.hdf"):
return run_number
else:
return run_number + ".nx.hdf"
[docs]def datafile_number(fname, prefix="PLP"):
"""
From a filename figure out what the run number was
Parameters
----------
fname : str
The filename to be processed
Returns
-------
run_number : int
The run number
Examples
--------
>>> datafile_number('PLP0000708.nx.hdf')
708
"""
rstr = ".*" + prefix + "([0-9]{7}).nx.hdf"
regex = re.compile(rstr)
_fname = Path(fname).name
r = regex.search(_fname)
if r:
return int(r.groups()[0])
return None
[docs]class ReductionOptions(dict):
"""
dict specifying the options for processing a Reflectometry dataset.
Parameters
----------
h5norm : str or HDF5 NeXus file
If a str then `h5norm` is a path to the floodfield data, otherwise
it is a hdf5 file handle containing the floodfield data.
lo_wavelength : float
The low wavelength cutoff for the rebinned data (A).
hi_wavelength : float
The high wavelength cutoff for the rebinned data (A).
background : bool
Should a background subtraction be carried out?
direct : bool
Is it a direct beam you measured? This is so a gravity correction
can be applied (if the instrument needs one).
omega : float
Expected angle of incidence of beam. If this is None, then the
rough angle of incidence is obtained from the NeXus file.
twotheta : float
Expected two theta value of specular beam. If this is None then
the rough angle of incidence is obtained from the NeXus file.
rebin_percent : float
Specifies the rebinning percentage for the spectrum. If
`rebin_percent is None`, then no rebinning is done.
wavelength_bins : array_like
The wavelength bins for rebinning. If `wavelength_bins is not
None` then the `rebin_percent` parameter is ignored.
normalise : bool
Normalise by the monitor counts.
integrate : int
Specifies which scanpoints to use.
- integrate == -1
the spectrum is integrated over all the scanpoints.
- integrate >= 0
the individual spectra are calculated individually.
If `eventmode is not None`, or `event_filter is not None` then
integrate specifies which scanpoint to examine.
eventmode : None or array_like
If eventmode is `None` then the integrated detector image is used.
If eventmode is an array then the array specifies the integration
times (in seconds) for the detector image, e.g. [0, 20, 30] would
result in two spectra. The first would contain data for 0 s to 20s,
the second would contain data for 20 s to 30 s. This option can
only be used when `integrate >= -1`.
If eventmode has zero length (e.g. []), then a single time interval
for the entire acquisition is used, [0, acquisition_time]. This
would source the image from the eventmode file, rather than the
NeXUS file. The two approaches will probably not give
identical results, because the eventmode method adjusts the total
acquisition time and beam monitor counts to the frame number of the
last event detected (which may be quite different if the count rate
is very low). This parameter is disregarded if `event_filter` is
provided.
event_folder : {None, str, Path}
Specifies the path for the eventmode data. If
`event_folder is None` then the eventmode data is assumed to reside
in the same directory as the NeXUS file. If event_folder is a
string or Path, then the string specifies the path to the eventmode
data.
peak_pos : -1, None, or (float, float)
Options for finding specular peak position and peak standard
deviation.
- -1
use `manual_beam_find`.
- None
use the automatic beam finder, falling back to
`manual_beam_find` if it's provided.
- (float, float)
specify the peak and peak standard deviation. The peak standard
deviation is used to calculate the width of the foreground region,
unless `lopx_hipx` is specified.
peak_pos_tol : None or (float, float)
Convergence tolerance for the beam position and width to be
accepted from successive beam-finder calculations; see the
`tol` parameter in the `find_specular_ridge` function.
background_mask : array_like
An array of bool that specifies which y-pixels to use for
background subtraction. Should be the same length as the number of
y pixels in the detector image. Otherwise an automatic mask is
applied (if background is True).
normalise_bins : bool
Divides the intensity in each wavelength bin by the width of the
bin. This allows one to compare spectra even if they were processed
with different rebin percentages.
manual_beam_find : callable, optional
A function which allows the location of the specular ridge to be
determined. Has the signature `f(detector, detector_err, name)`
where `detector` and `detector_err` is the detector image and its
uncertainty, and name is a `str` specifying the name of
the dataset.
`detector` and `detector_err` have shape (n, t, {x, y}) where `n`
is the number of detector images, `t` is the number of
time-of-flight bins and `x` or `y` is the number of x or y pixels.
The function should return a tuple,
`(centre, centre_sd, lopx, hipx, background_pixels)`. `centre`,
`centre_sd`, `lopx`, `hipx` should be arrays of shape `(n, )`,
specifying the beam centre, beam width (standard deviation), lowest
pixel of foreground region, highest pixel of foreground region.
`background_pixels` is a list of length `n`. Each of the entries
should contain arrays of pixel numbers that specify the background
region for each of the detector images.
event_filter : callable, optional
A function, that processes the event stream, returning a `detector`
array, and a `frame_count` array. `detector` has shape
`(N, T, Y, X)`, where `N` is the number of detector images, `T` is
the number of time bins (`len(t_bins)`), etc. `frame_count` has
shape `(N,)` and contains the number of frames for each of the
detector images. The frame_count is used to determine what fraction
of the overall monitor counts should be ascribed to each detector
image (by dividing by the total number of frames). The function has
signature:
detector, frame_count = event_filter(loaded_events,
t_bins=None,
y_bins=None,
x_bins=None)
`loaded_events` is a 4-tuple of numpy arrays:
`(f_events, t_events, y_events, x_events)`, where `f_events`
contains the frame number for each neutron, landing at position
`x_events, y_events` on the detector, with time-of-flight
`t_events`.
detailed_kernel : bool
whether a detailed resolution kernel is calculating during reduction.
lopx_hipx : tuple
A two-tuple specifying the foreground region. For example, use of
(120, 123) would use pixels 120, 121, 122, 123.
"""
def __init__(
self,
h5norm=None,
lo_wavelength=2.5,
hi_wavelength=19.0,
background=True,
direct=False,
omega=None,
twotheta=None,
rebin_percent=1.0,
wavelength_bins=None,
normalise=True,
integrate=-1,
eventmode=None,
event_folder=None,
peak_pos=None,
peak_pos_tol=None,
background_mask=None,
normalise_bins=True,
manual_beam_find=None,
event_filter=None,
detailed_kernel=False,
lopx_hipx=None,
):
super().__init__()
self["h5norm"] = h5norm
self["lo_wavelength"] = lo_wavelength
self["hi_wavelength"] = hi_wavelength
self["background"] = background
self["direct"] = direct
self["omega"] = omega
self["twotheta"] = twotheta
self["rebin_percent"] = rebin_percent
self["wavelength_bins"] = wavelength_bins
self["normalise"] = normalise
self["integrate"] = integrate
self["eventmode"] = eventmode
self["event_folder"] = event_folder
self["peak_pos"] = peak_pos
self["peak_pos_tol"] = peak_pos_tol
self["background_mask"] = background_mask
self["normalise_bins"] = normalise_bins
self["manual_beam_find"] = manual_beam_find
self["event_filter"] = event_filter
self["detailed_kernel"] = detailed_kernel
self["lopx_hipx"] = lopx_hipx
class ReflectNexus:
def __init__(self):
self.cat = None
self.processed_spectrum = dict()
# _arguments is a dict that contains all the parameters used to call
# `process`. If the arguments don't change then you shouldn't need to
# call process again, thereby saving time.
self._arguments = {}
self.prefix = None
def __getattr__(self, item):
if item in self.__dict__:
return self.__dict__[item]
elif item in self.processed_spectrum:
return self.processed_spectrum[item]
else:
raise AttributeError
def __getstate__(self):
dct = self.__dict__
dct["_arguments"].pop("manual_beam_find")
return dct
def __setstate__(self, state):
self.__dict__.update(state)
def _short_circuit_process(self, _arguments):
"""
Returns the truth that two sets of arguments from successive calls to
the `process` method are the same.
Parameters
----------
_arguments : dict
arguments passed to the `process` method
Returns
-------
val : bool
Truth that __arguments is the same as self.__arguments
"""
return _dict_compare(_arguments, self._arguments)
def write_spectrum_dat(self, f, scanpoint=0):
"""
This method writes a dat representation of the corrected spectrum to
file.
Parameters
----------
f : file-like or str
The file to write the spectrum to, or a str that specifies the file
name
scanpoint : int
Which scanpoint to write
Returns
-------
processed : bool
If the file hasn't been processed then the `processed is False` and
vice versa
"""
if self.processed_spectrum is None:
return False
m_lambda = self.processed_spectrum["m_lambda"][scanpoint]
m_spec = self.processed_spectrum["m_spec"][scanpoint]
m_spec_sd = self.processed_spectrum["m_spec_sd"][scanpoint]
m_lambda_fwhm = self.processed_spectrum["m_lambda_fwhm"][scanpoint]
stacked_data = np.c_[m_lambda, m_spec, m_spec_sd, m_lambda_fwhm]
np.savetxt(f, stacked_data, delimiter="\t")
return True
def write_spectrum_xml(self, f, scanpoint=0):
"""
This method writes an XML representation of the corrected spectrum to
file.
Parameters
----------
f : file-like or str
The file to write the spectrum to, or a str that specifies the file
name
scanpoint : int
Which scanpoint to write
"""
if self.processed_spectrum is None:
return
s = string.Template(spectrum_template)
d = dict()
d["title"] = self.cat.sample_name
d["time"] = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
m_lambda = self.processed_spectrum["m_lambda"]
m_spec = self.processed_spectrum["m_spec"]
m_spec_sd = self.processed_spectrum["m_spec_sd"]
m_lambda_fwhm = self.processed_spectrum["m_lambda_fwhm"]
# sort the data
sorted = np.argsort(self.m_lambda[0])
r = m_spec[:, sorted]
lmda = m_lambda[:, sorted]
dlmda = m_lambda_fwhm[:, sorted]
dr = m_spec_sd[:, sorted]
d["n_spectra"] = self.processed_spectrum["n_spectra"]
d["runnumber"] = f"{self.prefix}{self.cat.datafile_number:07d}"
d["r"] = repr(r[scanpoint].tolist()).strip(",[]")
d["dr"] = repr(dr[scanpoint].tolist()).strip(",[]")
d["lmda"] = repr(lmda[scanpoint].tolist()).strip(",[]")
d["dlmda"] = repr(dlmda[scanpoint].tolist()).strip(",[]")
thefile = s.safe_substitute(d)
with possibly_open_file(f, "wb") as g:
if "b" in g.mode:
thefile = thefile.encode("utf-8")
g.write(thefile)
g.truncate()
return True
def plot(self, point=0, fig=None):
"""
Plot a processed spectrum.
Requires matplotlib be installed.
Parameters
----------
point: int or sequence, optional
The spectrum number to be plotted. By default the first spectrum
will be plotted. Pass `-1` to plot all spectra at once.
fig: Figure instance, optional
If `fig` is not supplied then a new figure is created. Otherwise
the graph is created on the current axes on the supplied figure.
Returns
-------
fig, ax : :class:`matplotlib.Figure`, :class:`matplotlib.Axes`
`matplotlib` figure and axes objects.
"""
lam, spec, spec_sd, _ = self.spectrum
import matplotlib.pyplot as plt
if fig is None:
fig = plt.figure()
ax = fig.add_subplot(111)
else:
ax = fig.gca()
if hasattr(point, "len"):
for p in point:
ax.plot(lam[p], spec[p])
elif point == -1:
for p in range(len(lam)):
ax.plot(lam[p], spec[p])
else:
ax.plot(lam[point], spec[point])
return fig, ax
@property
def spectrum(self):
return (
self.processed_spectrum["m_lambda"],
self.processed_spectrum["m_spec"],
self.processed_spectrum["m_spec_sd"],
self.processed_spectrum["m_lambda_fwhm"],
)
def detector_average_unwanted_direction(self, detector):
"""
Averages over non-collimated beam direction
"""
raise NotImplementedError()
def create_detector_norm(self, h5norm):
raise NotImplementedError()
def beam_divergence(self, scanpoint):
# works out the beam divergence for a given scan point
cat = self.cat
return general.div(
cat.ss_coll1[scanpoint],
cat.ss_coll2[scanpoint],
cat.collimation_distance[0],
)[0]
def estimated_beam_width_at_detector(self, scanpoint):
raise NotImplementedError()
def phase_angle(self, scanpoint):
"""
Calculates the phase angle for a given scanpoint
Parameters
----------
scanpoint : int
The scanpoint you're interested in
Returns
-------
phase_angle, master_opening : float
The phase angle and angular opening of the master chopper in
degrees.
"""
raise NotImplementedError()
def time_offset(
self,
master_phase_offset,
master_opening,
freq,
phase_angle,
z0,
flight_distance,
tof_hist,
t_offset=None,
):
raise NotImplementedError()
def correct_for_gravity(
self, detector, detector_sd, m_lambda, lo_wavelength, hi_wavelength
):
# default implementation is no gravity correction
return detector, detector_sd, None
def process(self, **reduction_options):
r"""
Processes the ReflectNexus object to produce a time of flight spectrum.
The processed spectrum is stored in the `processed_spectrum` attribute.
The specular spectrum is also returned from this function.
Parameters
----------
h5norm : str or HDF5 NeXus file
If a str then `h5norm` is a path to the floodfield data, otherwise
it is a hdf5 file handle containing the floodfield data.
lo_wavelength : float
The low wavelength cutoff for the rebinned data (A).
hi_wavelength : float
The high wavelength cutoff for the rebinned data (A).
background : bool
Should a background subtraction be carried out?
direct : bool
Is it a direct beam you measured? This is so a gravity correction
can be applied (if the instrument needs one).
omega : float
Expected angle of incidence of beam. If this is None, then the
rough angle of incidence is obtained from the NeXus file.
twotheta : float
Expected two theta value of specular beam. If this is None then
the rough angle of incidence is obtained from the NeXus file.
rebin_percent : float
Specifies the rebinning percentage for the spectrum. If
`rebin_percent is None`, then no rebinning is done.
wavelength_bins : array_like
The wavelength bins for rebinning. If `wavelength_bins is not
None` then the `rebin_percent` parameter is ignored.
normalise : bool
Normalise by the monitor counts.
integrate : int
Specifies which scanpoints to use.
- integrate == -1
the spectrum is integrated over all the scanpoints.
- integrate >= 0
the individual spectra are calculated individually.
If `eventmode is not None`, or `event_filter is not None` then
integrate specifies which scanpoint to examine.
eventmode : None or array_like
If eventmode is `None` then the integrated detector image is used.
If eventmode is an array then the array specifies the integration
times (in seconds) for the detector image, e.g. [0, 20, 30] would
result in two spectra. The first would contain data for 0 s to 20s,
the second would contain data for 20 s to 30 s. This option can
only be used when `integrate >= -1`.
If eventmode has zero length (e.g. []), then a single time interval
for the entire acquisition is used, [0, acquisition_time]. This
would source the image from the eventmode file, rather than the
NeXUS file. The two approaches will probably not give
identical results, because the eventmode method adjusts the total
acquisition time and beam monitor counts to the frame number of the
last event detected (which may be quite different if the count rate
is very low). This parameter is disregarded if `event_filter` is
provided.
event_folder : None or str
Specifies the path for the eventmode data. If
`event_folder is None` then the eventmode data is assumed to reside
in the same directory as the NeXUS file. If event_folder is a
string, then the string specifies the path to the eventmode data.
peak_pos : -1, None, or (float, float)
Options for finding specular peak position and peak standard
deviation.
- -1
use `manual_beam_find`.
- None
use the automatic beam finder, falling back to
`manual_beam_find` if it's provided.
- (float, float)
specify the peak and peak standard deviation. The peak standard
deviation is used to calculate the width of the foreground
region, unless `lopx_hipx` is specified.
peak_pos_tol : (float, float) or None
Convergence tolerance for the beam position and width to be
accepted from successive beam-finder calculations; see the
`tol` parameter in the `find_specular_ridge` function.
background_mask : array_like
An array of bool that specifies which y-pixels to use for
background subtraction. Should be the same length as the number of
y pixels in the detector image. Otherwise an automatic mask is
applied (if background is True).
normalise_bins : bool
Divides the intensity in each wavelength bin by the width of the
bin. This allows one to compare spectra even if they were processed
with different rebin percentages.
manual_beam_find : callable, optional
A function which allows the location of the specular ridge to be
determined. Has the signature `f(detector, detector_err, name)`
where `detector` and `detector_err` is the detector image and its
uncertainty, and name is a `str` specifying the name of
the dataset.
`detector` and `detector_err` have shape (n, t, {x, y}) where `n`
is the number of detector images, `t` is the number of
time-of-flight bins and `x` or `y` is the number of x or y pixels.
The function should return a tuple,
`(centre, centre_sd, lopx, hipx, background_pixels)`. `centre`,
`centre_sd`, `lopx`, `hipx` should be arrays of shape `(n, )`,
specifying the beam centre, beam width (standard deviation), lowest
pixel of foreground region, highest pixel of foreground region.
`background_pixels` is a list of length `n`. Each of the entries
should contain arrays of pixel numbers that specify the background
region for each of the detector images.
event_filter : callable, optional
A function, that processes the event stream, returning a `detector`
array, and a `frame_count` array. `detector` has shape
`(N, T, Y, X)`, where `N` is the number of detector images, `T` is
the number of time bins (`len(t_bins)`), etc. `frame_count` has
shape `(N,)` and contains the number of frames for each of the
detector images. The frame_count is used to determine what fraction
of the overall monitor counts should be ascribed to each detector
image (by dividing by the total number of frames). The function has
signature:
detector, frame_count = event_filter(loaded_events,
t_bins=None,
y_bins=None,
x_bins=None)
`loaded_events` is a 4-tuple of numpy arrays:
`(f_events, t_events, y_events, x_events)`, where `f_events`
contains the frame number for each neutron, landing at position
`x_events, y_events` on the detector, with time-of-flight
`t_events`.
detailed_kernel : bool
whether a detailed resolution kernel is calculating during reduction.
lopx_hipx : tuple
A two-tuple specifying the foreground region for integration of the
specular ridge. For example, use of (120, 123) would sum pixels
120, 121, 122, 123.
If specified this keyword overrides the specular width calculated
from the peak standard deviation calculated (or specified) in
`peak_pos`. The peak centre must lie between ``lopx`` and ``hipx``.
Notes
-----
After processing this object contains the following attributes:
- path - path to the data file
- datafilename - name of the datafile
- datafile_number - datafile number.
- m_topandtail - the corrected 2D detector image,
(n_spectra, TOF, {X, Y})
- m_topandtail_sd - corresponding standard deviations
- n_spectra - number of spectra in processed data
- bm1_counts - beam montor counts, (n_spectra,)
- m_spec - specular intensity, (n_spectra, TOF)
- m_spec_sd - corresponding standard deviations
- m_beampos - beam_centre for each spectrum, (n_spectra, )
- m_lambda - wavelengths for each spectrum, (n_spectra, TOF)
- m_lambda_fwhm - corresponding FWHM of wavelength distribution
- m_lambda_hist - wavelength bins for each spectrum,
(n_spectra, TOF + 1)
- m_spec_tof - TOF for each wavelength bin, (n_spectra, TOF)
- mode - the experimental mode, e.g. FOC/MT/POL/POLANAL/SB/DB
- detector_z - detector height or angle, (n_spectra, )
- detector_y - sample-detector distance, (n_spectra, )
- domega - collimation uncertainty
- lopx - lowest extent of specular beam (in pixels), (n_spectra, )
- hipx - highest extent of specular beam (in pixels), (n_spectra, )
- reduction_options - dict of options used to process the spectra
Returns
-------
m_lambda, m_spec, m_spec_sd: np.ndarray
Arrays containing the wavelength, specular intensity as a function
of wavelength, standard deviation of specular intensity
"""
options = ReductionOptions()
options.update(reduction_options)
h5norm = options["h5norm"]
lo_wavelength = options["lo_wavelength"]
hi_wavelength = options["hi_wavelength"]
background = options["background"]
direct = options["direct"]
omega = options["omega"]
twotheta = options["twotheta"]
rebin_percent = options["rebin_percent"]
wavelength_bins = options["wavelength_bins"]
normalise = options["normalise"]
integrate = options["integrate"]
eventmode = options["eventmode"]
event_folder = options["event_folder"]
peak_pos = options["peak_pos"]
peak_pos_tol = options["peak_pos_tol"]
background_mask = options["background_mask"]
normalise_bins = options["normalise_bins"]
manual_beam_find = options["manual_beam_find"]
event_filter = options["event_filter"]
lopx_hipx = options["lopx_hipx"]
# it can be advantageous to save processing time if the arguments
# haven't changed
# if you've already processed, then you may not need to process again
if self.processed_spectrum and self._short_circuit_process(options):
return (
self.processed_spectrum["m_lambda"],
self.processed_spectrum["m_spec"],
self.processed_spectrum["m_spec_sd"],
)
else:
self._arguments = options
cat = self.cat
scanpoint = 0
# beam monitor counts for normalising data
bm1_counts = cat.bm1_counts.astype("float64")
# TOF bins
TOF = cat.t_bins.astype("float64")
# This section controls how multiple detector images are handled.
# We want event streaming.
if eventmode is not None or event_filter is not None:
scanpoint = integrate
if integrate == -1:
scanpoint = 0
output = self.process_event_stream(
scanpoint=scanpoint,
frame_bins=eventmode,
event_folder=event_folder,
event_filter=event_filter,
)
detector, frame_count, bm1_counts = output
start_time = np.zeros(np.size(detector, 0))
if cat.start_time is not None:
start_time += cat.start_time[scanpoint]
start_time[1:] += (
np.cumsum(frame_count)[:-1] / cat.frequency[scanpoint]
)
else:
# we don't want detector streaming
detector = cat.detector
scanpoint = 0
# integrate over all spectra
if integrate == -1:
detector = np.sum(detector, 0)[np.newaxis,]
bm1_counts[:] = np.sum(bm1_counts)
start_time = np.zeros(np.size(detector, 0))
if cat.start_time is not None:
for idx in range(start_time.size):
start_time[idx] = cat.start_time[idx]
n_spectra = np.size(detector, 0)
# Up until this point detector.shape=(N, T, Y, X)
# average to (N, T, Y) - platypus or (N, T, X) - spatz
detector = self.detector_average_unwanted_direction(detector)
# calculate the counting uncertainties
detector_sd = np.sqrt(detector)
bm1_counts_sd = np.sqrt(bm1_counts)
# detector normalisation with a water file
if h5norm is not None:
with _possibly_open_hdf_file(h5norm, "r") as f:
# shape ({x, y},)
detector_norm, detector_norm_sd = self.create_detector_norm(f)
# detector has shape (N, T, Y), shape of detector_norm should
# broadcast to (1, 1, y)
# TODO: Correlated Uncertainties?
detector, detector_sd = EP.EPdiv(
detector, detector_sd, detector_norm, detector_norm_sd
)
# shape of these is (n_spectra, TOFbins)
m_spec_tof_hist = np.zeros(
(n_spectra, np.size(TOF, 0)), dtype="float64"
)
m_lambda_hist = np.zeros((n_spectra, np.size(TOF, 0)), dtype="float64")
m_spec_tof_hist[:] = TOF
"""
chopper to detector distances
note that if eventmode is specified the n_spectra is NOT
equal to the number of entries in e.g. /longitudinal_translation
this means you have to copy values in from the correct scanpoint
"""
flight_distance = np.zeros(n_spectra, dtype="float64")
d_cx = np.zeros(n_spectra, dtype="float64")
detpositions = np.zeros(n_spectra, dtype="float64")
# The angular divergence of the instrument
domega = np.zeros(n_spectra, dtype="float64")
estimated_beam_width = np.zeros(n_spectra, dtype="float64")
phase_angle = np.zeros(n_spectra, dtype="float64")
# process each of the spectra taken in the detector image
original_scanpoint = scanpoint
for idx in range(n_spectra):
freq = cat.frequency[scanpoint]
# calculate the angular divergence
domega[idx] = self.beam_divergence(scanpoint)
"""
estimated beam width in pixels at detector
"""
v = self.estimated_beam_width_at_detector(scanpoint)
estimated_beam_width[idx] = np.squeeze(v)
"""
work out the total flight length
IMPORTANT: this varies as a function of twotheta. This is
because the Platypus detector does not move on an arc.
At high angles chod can be ~ 0.75% different. This is will
visibly shift fringes.
"""
if omega is None:
omega = cat.omega[scanpoint]
if twotheta is None:
twotheta = cat.twotheta[scanpoint]
output = self.chod(omega, twotheta, scanpoint=scanpoint)
flight_distance[idx] = np.squeeze(output[0])
d_cx[idx] = output[1]
# calculate nominal phase openings
phase_angle[idx], master_opening = self.phase_angle(scanpoint)
"""
toffset - the time difference between the magnet pickup on the
choppers (TTL pulse), which is situated in the middle of the
chopper window, and the trailing edge of master chopper, which is
supposed to be time0. However, if there is a phase opening this
time offset has to be relocated slightly, as time0 is not at the
trailing edge.
"""
t_offset = self.time_offset(
cat.master_phase_offset[0],
master_opening,
freq,
phase_angle[idx],
d_cx[0],
flight_distance[idx],
m_spec_tof_hist[idx],
t_offset=cat.t_offset,
)
m_spec_tof_hist[idx] -= t_offset
detpositions[idx] = cat.dy[scanpoint]
if eventmode is not None or event_filter is not None:
m_spec_tof_hist[:] = TOF - t_offset
flight_distance[:] = flight_distance[0]
detpositions[:] = detpositions[0]
domega[:] = domega[0]
estimated_beam_width[:] = estimated_beam_width[0]
d_cx[:] = d_cx[0]
phase_angle[:] = phase_angle[0]
break
else:
scanpoint += 1
scanpoint = original_scanpoint
# convert TOF to lambda
# m_spec_tof_hist (n, t) and chod is (n,)
m_lambda_hist = general.velocity_wavelength(
1.0e3 * flight_distance[:, np.newaxis] / m_spec_tof_hist
)
m_lambda = 0.5 * (m_lambda_hist[:, 1:] + m_lambda_hist[:, :-1])
TOF -= t_offset
# gravity correction if direct beam, but only if you're on Platypus
if direct and isinstance(self, PlatypusNexus):
# TODO: Correlated Uncertainties?
detector, detector_sd, m_gravcorrcoefs = self.correct_for_gravity(
detector, detector_sd, m_lambda, lo_wavelength, hi_wavelength
)
# where is the specular ridge?
if peak_pos == -1:
# you always want to find the beam manually
ret = manual_beam_find(
detector, detector_sd, Path(cat.filename).name
)
beam_centre, beam_sd, lopx, hipx, bp = ret
full_backgnd_mask = np.zeros_like(detector, dtype=bool)
for i, v in enumerate(bp):
full_backgnd_mask[i, :, v] = True
elif peak_pos is None:
# absolute tolerance in beam pixel position for auto peak finding
# derived as a fraction of detector pixel size. 0.0142 mm at
# dy = 2512 corresponds to 0.0003 degrees.
try:
atol, rtol = peak_pos_tol
except (ValueError, TypeError):
# TypeError for unpacking None (currently the default option)
# ValueError for unpacking a single number (historical
# behaviour)
atol = 0.0142 / self.cat.qz_pixel_size[0]
rtol = 0.015
# use the auto finder, falling back to manual_beam_find
ret = find_specular_ridge(
detector,
detector_sd,
tol=(atol, rtol),
manual_beam_find=manual_beam_find,
name=Path(cat.filename).name,
)
beam_centre, beam_sd, lopx, hipx, full_backgnd_mask = ret
else:
# the specular ridge has been specified
beam_centre = np.ones(n_spectra) * peak_pos[0]
beam_sd = np.ones(n_spectra) * peak_pos[1]
lopx, hipx, bp = fore_back_region(beam_centre, beam_sd)
full_backgnd_mask = np.zeros_like(detector, dtype=bool)
for i, v in enumerate(bp):
full_backgnd_mask[i, :, v] = True
lopx = lopx.astype(int)
hipx = hipx.astype(int)
if lopx_hipx is not None:
print(lopx_hipx)
lopx = np.ones(n_spectra, dtype=int) * int(lopx_hipx[0])
hipx = np.ones(n_spectra, dtype=int) * int(lopx_hipx[1])
for i in range(n_spectra):
try:
assert lopx[i] <= beam_centre[i] <= hipx[i]
except AssertionError:
raise RuntimeError(
"One of the beam centres does not lie in the"
" manually specified lopx/hipx region."
)
# Warning if the beam appears to be much broader than the divergence
# would predict. The use of 30% tolerance is a guess. This might happen
# if the beam finder includes incoherent background region by mistake.
# if not np.allclose(estimated_beam_width, hipx - lopx + 1, rtol=0.3):
# warnings.warn(
# "The foreground width (%s) estimate"
# " does not match the divergence of the beam (%s)."
# " Consider checking with manual beam finder."
# % (str(hipx - lopx + 1), str(estimated_beam_width))
# )
if np.size(beam_centre) != n_spectra:
raise RuntimeError(
"The number of beam centres should be equal"
" to the number of detector images."
)
"""
Rebinning in lambda for all detector
Rebinning is the default option, but sometimes you don't want to.
detector shape input is (n, t, y)
"""
if wavelength_bins is not None:
rebinning = wavelength_bins
elif 0.0 < rebin_percent < 15.0:
rebinning = calculate_wavelength_bins(
lo_wavelength, hi_wavelength, rebin_percent
)
# rebin_percent percentage is zero. No rebinning, just cutoff
# wavelength
else:
rebinning = m_lambda_hist[0, :]
rebinning = rebinning[
np.searchsorted(rebinning, lo_wavelength) : np.searchsorted(
rebinning, hi_wavelength
)
]
"""
now do the rebinning for all the N detector images
rebin.rebinND could do all of these at once. However, m_lambda_hist
could vary across the range of spectra. If it was the same I could
eliminate the loop.
"""
output = []
output_sd = []
for idx in range(n_spectra):
# TODO: Correlated Uncertainties?
plane, plane_sd = rebin_along_axis(
detector[idx],
m_lambda_hist[idx],
rebinning,
y1_sd=detector_sd[idx],
)
output.append(plane)
output_sd.append(plane_sd)
detector = np.array(output)
detector_sd = np.array(output_sd)
if len(detector.shape) == 2:
detector = detector[np.newaxis,]
detector_sd = detector_sd[np.newaxis,]
# (1, T)
m_lambda_hist = np.atleast_2d(rebinning)
"""
Divide the detector intensities by the width of the wavelength bin.
This is so the intensities between different rebinning strategies can
be compared.
"""
if normalise_bins:
div = 1 / np.diff(m_lambda_hist[0])[:, np.newaxis]
detector, detector_sd = EP.EPmulk(detector, detector_sd, div)
# convert the wavelength base to a timebase
m_spec_tof_hist = (
0.001
* flight_distance[:, np.newaxis]
/ general.wavelength_velocity(m_lambda_hist)
)
m_lambda = 0.5 * (m_lambda_hist[:, 1:] + m_lambda_hist[:, :-1])
m_spec_tof = (
0.001
* flight_distance[:, np.newaxis]
/ general.wavelength_velocity(m_lambda)
)
m_spec = np.zeros((n_spectra, np.size(detector, 1)))
m_spec_sd = np.zeros_like(m_spec)
# background subtraction
if background:
if background_mask is not None:
# background_mask is (Y), need to make 3 dimensional (N, T, Y)
# first make into (T, Y)
backgnd_mask = np.repeat(
background_mask[np.newaxis, :], detector.shape[1], axis=0
)
# make into (N, T, Y)
full_backgnd_mask = np.repeat(
backgnd_mask[np.newaxis, :], n_spectra, axis=0
)
# TODO: Correlated Uncertainties?
detector, detector_sd = background_subtract(
detector, detector_sd, full_backgnd_mask
)
# assert that none of the lopx/hipx regions overlap with the
# background mask
msk = np.sum(full_backgnd_mask, axis=0) # (N, T, Y) -> (T, Y)
msk = np.sum(msk, axis=0) # (T, Y) -> (Y,)
msk = np.sum(msk[lopx[0] : hipx[0] + 1])
if msk > 0:
raise RuntimeError(
"At some point the background mask overlaps with the"
" foreground region"
)
"""
top and tail the specular beam with the known beam centres.
All this does is produce a specular intensity with shape (N, T),
i.e. integrate over specular beam
"""
for i in range(n_spectra):
m_spec[i] = np.sum(detector[i, :, lopx[i] : hipx[i] + 1], axis=1)
sd = np.sum(detector_sd[i, :, lopx[i] : hipx[i] + 1] ** 2, axis=1)
m_spec_sd[i] = np.sqrt(sd)
# assert np.isfinite(m_spec).all()
# assert np.isfinite(m_specSD).all()
# assert np.isfinite(detector).all()
# assert np.isfinite(detectorSD).all()
# normalise by beam monitor 1.
if normalise:
m_spec, m_spec_sd = EP.EPdiv(
m_spec,
m_spec_sd,
bm1_counts[:, np.newaxis],
bm1_counts_sd[:, np.newaxis],
)
output = EP.EPdiv(
detector,
detector_sd,
bm1_counts[:, np.newaxis, np.newaxis],
bm1_counts_sd[:, np.newaxis, np.newaxis],
)
detector, detector_sd = output
"""
now work out dlambda/lambda, the resolution contribution from
wavelength.
van Well, Physica B, 357(2005) pp204-207), eqn 4.
this is only an approximation for our instrument, as the 2nd and 3rd
discs have smaller openings compared to the master chopper.
Therefore the burst time needs to be looked at.
"""
tau_da = m_spec_tof_hist[:, 1:] - m_spec_tof_hist[:, :-1]
m_lambda_fwhm = resolution_double_chopper(
m_lambda,
z0=d_cx[:, np.newaxis] / 1000.0,
freq=cat.frequency[:, np.newaxis],
L=flight_distance[:, np.newaxis] / 1000.0,
H=cat.ss_coll2[original_scanpoint] / 1000.0,
xsi=phase_angle[:, np.newaxis],
tau_da=tau_da,
)
m_lambda_fwhm *= m_lambda
# put the detector positions and mode into the dictionary as well.
detector_z = cat.dz
detector_y = cat.dy
try:
mode = cat.mode
except AttributeError:
# no mode for SPZ
mode = None
d = dict()
d["path"] = cat.path
d["datafilename"] = cat.filename
d["datafile_number"] = cat.datafile_number
if h5norm is not None:
if isinstance(h5norm, h5py.File):
d["normfilename"] = h5norm.filename
else:
d["normfilename"] = h5norm
d["m_topandtail"] = detector
d["m_topandtail_sd"] = detector_sd
d["n_spectra"] = n_spectra
d["bm1_counts"] = bm1_counts
d["m_spec"] = m_spec
d["m_spec_sd"] = m_spec_sd
d["m_beampos"] = beam_centre
d["m_beampos_sd"] = beam_sd
d["m_lambda"] = m_lambda
d["m_lambda_fwhm"] = m_lambda_fwhm
d["m_lambda_hist"] = m_lambda_hist
d["m_spec_tof"] = m_spec_tof
d["mode"] = mode
d["detector_z"] = detector_z
d["detector_y"] = detector_y
d["domega"] = domega
d["lopx"] = lopx
d["hipx"] = hipx
d["start_time"] = start_time
d["reduction_options"] = options
self.processed_spectrum = d
return m_lambda, m_spec, m_spec_sd
def process_event_stream(
self,
t_bins=None,
x_bins=None,
y_bins=None,
frame_bins=None,
scanpoint=0,
event_folder=None,
event_filter=None,
):
"""
Processes the event mode dataset for the NeXUS file. Assumes that
there is a event mode directory in the same directory as the NeXUS
file, as specified by in 'entry1/instrument/detector/daq_dirname'
Parameters
----------
t_bins : array_like, optional
specifies the time bins required in the image
x_bins : array_like, optional
specifies the x bins required in the image
y_bins : array_like, optional
specifies the y bins required in the image
scanpoint : int, optional
Scanpoint you are interested in
event_folder : {None, str, Path}
Specifies the path for the eventmode data. If
`event_folder is None` then the eventmode data is assumed to reside
in the same directory as the NeXUS file. If event_folder is a
string, then the string specifies the path to the eventmode data.
frame_bins : array_like, optional
specifies the frame bins required in the image. If
frame_bins = [5, 10, 120] you will get 2 images. The first starts
at 5s and finishes at 10s. The second starts at 10s and finishes
at 120s. If frame_bins has zero length, e.g. [], then a single
interval consisting of the entire acquisition time is used:
[0, acquisition_time]. If `event_filter` is provided then this
parameter is ignored.
event_filter : callable, optional
A function, that processes the event stream, returning a `detector`
array, and a `frame_count` array. `detector` has shape
`(N, T, Y, X)`, where `N` is the number of detector images, `T` is
the number of time bins (`len(t_bins)`), etc. `frame_count` has
shape `(N,)` and contains the number of frames for each of the
detector images. The frame_count is used to determine what fraction
of the overall monitor counts should be ascribed to each detector
image. The function has signature:
detector, frame_count = event_filter(loaded_events,
t_bins=None,
y_bins=None,
x_bins=None)
`loaded_events` is a 4-tuple of numpy arrays:
`(f_events, t_events, y_events, x_events)`, where `f_events`
contains the frame number for each neutron, landing at position
`x_events, y_events` on the detector, with time-of-flight
`t_events`.
Returns
-------
detector, frame_count, bm1_counts : np.ndarray, np.ndarray, np.ndarray
Create a new detector image based on the t_bins, x_bins, y_bins and
frame_bins you supply to the method (these should all be lists/numpy
arrays specifying the edges of the required bins). If these are not
specified, then the default bins are taken from the nexus file. This
would essentially return the same detector image as the nexus file.
However, you can specify the frame_bins list to generate detector
images based on subdivided periods of the total acquisition.
For example if frame_bins = [5, 10, 120] you will get 2 images. The
first starts at 5s and finishes at 10s. The second starts at 10s
and finishes at 120s. The frame_bins are clipped to the total
acquisition time if necessary.
`frame_count` is how many frames went into making each detector image.
"""
cat = self.cat
if not t_bins:
t_bins = cat.t_bins
if not y_bins:
y_bins = cat.y_bins
if not x_bins:
x_bins = cat.x_bins
if frame_bins is None or np.size(frame_bins) == 0:
frame_bins = [0, cat.time[scanpoint]]
total_acquisition_time = cat.time[scanpoint]
frequency = cat.frequency[scanpoint]
bm1_counts_for_scanpoint = cat.bm1_counts[scanpoint]
event_directory_name = cat.daq_dirname
_eventpath = cat.path
if event_folder is not None:
_eventpath = event_folder
stream_filename = (
Path(_eventpath)
/ event_directory_name
/ f"DATASET_{scanpoint}"
/ "EOS.bin"
)
with io.open(stream_filename, "rb") as f:
last_frame = int(frame_bins[-1] * frequency)
loaded_events, end_events = events(f, max_frames=last_frame)
# convert frame_bins to list of filter frames
frames = framebins_to_frames(
np.asarray(frame_bins).astype(float, copy=False) * frequency
)
if event_filter is not None:
output = event_filter(loaded_events, t_bins, y_bins, x_bins)
else:
output = process_event_stream(
loaded_events, frames, t_bins, y_bins, x_bins
)
detector, frame_count = output
bm1_counts = (
frame_count
* bm1_counts_for_scanpoint
/ total_acquisition_time
/ frequency
)
return detector, frame_count, bm1_counts
[docs]def create_reflect_nexus(h5data):
"""
Creates a ReflectNexus object from an HDF file
Parameters
----------
h5data : {ReflectNexus, HDF5 NeXus file, str, Path}
A ReflectNexus object, an HDF5 NeXus file for Platypus/Spatz, or a
str/Path specifying the path to one
"""
if isinstance(h5data, ReflectNexus):
return h5data
with _possibly_open_hdf_file(h5data, "r") as f:
fname = _check_HDF_file(f)
if not fname:
raise ValueError(f"{h5data!r} is not a valid HDF5 file")
pth = Path(fname)
instrument = pth.name[:3]
if instrument == "PLP":
return PlatypusNexus(f)
elif instrument == "SPZ":
return SpatzNexus(f)
else:
raise ValueError(
f"{h5data!r} doesn't appear to be a Spatz or Platypus file"
)
[docs]class PlatypusNexus(ReflectNexus):
"""
Processes Platypus NeXus files to produce an intensity vs wavelength
spectrum
Parameters
----------
h5data : {HDF5 NeXus file, str, Path}
An HDF5 NeXus file for Platypus, or a str/Path specifying the path
to one
"""
def __init__(self, h5data):
"""
Initialises the PlatypusNexus object.
"""
super().__init__()
self.prefix = "PLP"
with _possibly_open_hdf_file(h5data, "r") as f:
self.cat = PlatypusCatalogue(f)
if self.cat.mode in ["POL", "POLANAL"]:
self.cat = PolarisedCatalogue(f)
# Set spin channels based of flipper statuses
if self.cat.mode == "POL":
if self.cat.pol_flip_current > 0.1:
self.spin_state = SpinChannel.UP_UP
else:
self.spin_state = SpinChannel.DOWN_DOWN
if self.cat.mode == "POLANAL":
if (
self.cat.pol_flip_current > 0.1
and self.cat.anal_flip_current > 0.1
):
self.spin_state = SpinChannel.UP_UP
elif (
self.cat.pol_flip_current > 0.1
and self.cat.anal_flip_current < 0.1
):
self.spin_state = SpinChannel.UP_DOWN
elif (
self.cat.pol_flip_current < 0.1
and self.cat.anal_flip_current > 0.1
):
self.spin_state = SpinChannel.DOWN_UP
elif (
self.cat.pol_flip_current < 0.1
and self.cat.anal_flip_current < 0.1
):
self.spin_state = SpinChannel.DOWN_DOWN
[docs] def detector_average_unwanted_direction(self, detector):
"""
Averages over non-collimated beam direction
"""
# Up until this point detector.shape=(N, T, Y, X)
# pre-average over x, leaving (n, t, y) also convert to dp
return np.sum(detector, axis=3, dtype="float64")
[docs] def create_detector_norm(self, h5norm):
"""
Produces a detector normalisation array for a neutron detector.
Here we average over N, T and X to provide a relative efficiency for
each y wire.
Parameters
----------
h5norm : hdf5 file
Containing a flood field run (water)
Returns
-------
norm, norm_sd : array_like
1D array containing the normalisation data for each y pixel
"""
x_bins = self.cat.x_bins
return create_detector_norm(h5norm, x_bins[0], x_bins[1], axis=3)
[docs] def estimated_beam_width_at_detector(self, scanpoint):
cat = self.cat
L23 = cat.cat["collimation_distance"]
L3det = (
cat.dy[scanpoint] + cat.sample_distance[0] - cat.slit3_distance[0]
)
ebw = general.height_of_beam_after_dx(
cat.ss2vg[scanpoint], cat.ss3vg[scanpoint], L23, L3det
)
umb, penumb = ebw
# convolve in detector resolution (~2.2 mm?)
# first convert to beam sd, convolve in detector, and expand sd
# back to total foreground width
# use average of umb and penumb, the calc assumes a rectangular
# distribution
penumb = (
np.sqrt((0.289 * 0.5 * (umb + penumb)) ** 2.0 + 2.2**2)
* EXTENT_MULT
* 2
)
# we need it in pixels
return penumb / cat.qz_pixel_size[0]
[docs] def correct_for_gravity(
self, detector, detector_sd, m_lambda, lo_wavelength, hi_wavelength
):
cat = self.cat
return correct_for_gravity(
detector,
detector_sd,
m_lambda,
cat.collimation_distance,
cat.dy,
lo_wavelength,
hi_wavelength,
qz_pixel_size=cat.qz_pixel_size[0],
)
[docs] def time_offset(
self,
master_phase_offset,
master_opening,
freq,
phase_angle,
z0,
flight_distance,
tof_hist,
t_offset=None,
):
"""
Timing offsets for Platypus chopper system, includes a gravity
correction for phase angle
"""
DISCRADIUS = 350.0
# calculate initial time offset from the pickup being slightly in
# the wrong place
m_offset = 1.0e6 * master_phase_offset / (2.0 * 360.0 * freq)
# make a correction for the phase angle
total_offset = m_offset - 1.0e6 * phase_angle / (360 * 2 * freq)
# assumes that the pickup/T_0 signal is issued from middle
# of chopper window. But you can override this by supplying a t_offset.
# This is for where a signal generator has been used to offset that t_0
if t_offset is not None:
total_offset += t_offset
else:
total_offset += 1.0e6 * master_opening / (2 * 360 * freq)
###########################################
# now make a gravity correction to total_offset
# work out velocities for each bin edge
velocities = 1.0e3 * flight_distance / (tof_hist - total_offset)
angles = find_trajectory(
self.cat.collimation_distance / 1000.0, 0, velocities
)
# work out distance from 1st coll slit to middle of chopper pair
# TODO ASSUMES CHOPPER 1 IS MASTER, FIX SO IT COULD BE ANY MASTER
d_c1 = -self.cat.slit2_distance
d_slave = d_c1 + z0
corr_t_offset = np.zeros_like(tof_hist)
# assumes that the pickups/T_0 signal is issued from middle
# of chopper window. `t_offset` is for where a signal generator
# has been used to offset that t_0.
if t_offset is not None:
corr_t_offset += t_offset / 1.0e6
else:
corr_t_offset += master_opening / (2 * 360 * freq)
for i, (velocity, angle) in enumerate(zip(velocities, angles)):
parab = parabola(angle, velocity)
h_c1 = parab(d_c1 / 1000.0) * 1000.0
h_slave = parab(d_slave / 1000.0) * 1000.0
# angle_corr comes about because the parabolic nature of long
# wavelength neutrons creates an apparent phase opening
angle_corr = np.degrees(np.arctan((h_slave - h_c1) / DISCRADIUS))
# master_corr comes about because the beam for long wavelength
# neutrons is lower than the optical axis, so it takes a little
# longer for the beam to start travelling through chopper system.
# as such you need to decrease their tof by increasing the
# t_offset
master_corr = -np.degrees(np.arctan(h_c1 / DISCRADIUS))
_offset = (master_phase_offset + master_corr) / (
2.0 * 360.0 * freq
)
_offset -= (phase_angle + angle_corr) / (360 * 2 * freq)
corr_t_offset[i] += np.squeeze(_offset)
corr_t_offset *= 1e6
return corr_t_offset
[docs] def phase_angle(self, scanpoint=0):
"""
Calculates the phase angle for a given scanpoint
Parameters
----------
scanpoint : int
The scanpoint you're interested in
Returns
-------
phase_angle, master_opening : float
The phase angle and angular opening of the master chopper in
degrees.
"""
disc_openings = (60.0, 10.0, 25.0, 60.0)
O_C1d, O_C2d, O_C3d, O_C4d = disc_openings
cat = self.cat
master = cat.master
slave = cat.slave
disc_phase = cat.phase[scanpoint]
phase_angle = 0
if master == 1:
phase_angle += 0.5 * O_C1d
master_opening = O_C1d
elif master == 2:
phase_angle += 0.5 * O_C2d
master_opening = O_C2d
elif master == 3:
phase_angle += 0.5 * O_C3d
master_opening = O_C3d
# the phase_offset is defined as the angle you have to add to the
# calibrated blind opening to get to the nominal optically blind
# chopper opening.
# e.g. Nominal opening for optically may be at 42.5 degrees
# but the calibrated optically blind position is 42.2 degrees
# the chopper_phase_offset would be 0.3 degrees.
if slave == 2:
phase_angle += 0.5 * O_C2d
phase_angle += -disc_phase - cat.chopper2_phase_offset[0]
elif slave == 3:
phase_angle += 0.5 * O_C3d
phase_angle += -disc_phase - cat.chopper3_phase_offset[0]
elif slave == 4:
phase_angle += 0.5 * O_C4d
phase_angle += disc_phase - cat.chopper4_phase_offset[0]
return phase_angle, master_opening
[docs] def chod(self, omega=0.0, twotheta=0.0, scanpoint=0):
"""
Calculates the flight length of the neutrons in the Platypus
instrument.
Parameters
----------
omega : float, optional
Rough angle of incidence
twotheta : float, optional
Rough 2 theta angle
scanpoint : int, optional
Which dataset is being considered
Returns
-------
chod, d_cx : float, float
Flight distance (mm), distance between chopper discs (mm)
"""
chod = 0
# guide 1 is the single deflection mirror (SB)
# its distance is from chopper 1 to the middle of the mirror (1m long)
# guide 2 is the double deflection mirror (DB)
# its distance is from chopper 1 to the middle of the second of the
# compound mirrors! (a bit weird, I know).
cat = self.cat
mode = cat.mode
# Find out chopper pairing
master = cat.master
slave = cat.slave
d_cx = 0
if master == 1:
chod = 0
elif master == 2:
chod -= cat.chopper2_distance[0]
d_cx -= chod
elif master == 3:
chod -= cat.chopper3_distance[0]
d_cx -= chod
else:
raise ValueError(
"Chopper pairing should be one of '12', '13',"
"'14', '23', '24', '34'"
)
if slave == 2:
chod -= cat.chopper2_distance[0]
d_cx += cat.chopper2_distance[0]
elif slave == 3:
chod -= cat.chopper3_distance[0]
d_cx += cat.chopper3_distance[0]
elif slave == 4:
chod -= cat.chopper4_distance[0]
d_cx += cat.chopper4_distance[0]
# start of flight length is midway between master and slave, but master
# may not necessarily be disk 1. However, all instrument lengths are
# measured from disk1
chod /= 2.0
if mode in ["FOC", "POL", "MT", "POLANAL"]:
chod += cat.sample_distance[0]
chod += cat.dy[scanpoint] / np.cos(np.radians(twotheta))
elif mode == "SB":
# assumes guide1_distance is in the MIDDLE OF THE MIRROR
chod += cat.guide1_distance[0]
chod += (cat.sample_distance[0] - cat.guide1_distance[0]) / np.cos(
np.radians(omega)
)
if twotheta > omega:
chod += cat.dy[scanpoint] / np.cos(
np.radians(twotheta - omega)
)
else:
chod += cat.dy[scanpoint] / np.cos(
np.radians(omega - twotheta)
)
elif mode == "DB":
# guide2_distance in in the middle of the 2nd compound mirror
# guide2_distance - longitudinal length from midpoint1 -> midpoint2
# + direct length from midpoint1->midpoint2
chod += cat.guide2_distance[0] + 600.0 * np.cos(
np.radians(1.2)
) * (1 - np.cos(np.radians(2.4)))
# add on distance from midpoint2 to sample
chod += (cat.sample_distance[0] - cat.guide2_distance[0]) / np.cos(
np.radians(4.8)
)
# add on sample -> detector
if twotheta > omega:
chod += cat.dy[scanpoint] / np.cos(np.radians(twotheta - 4.8))
else:
chod += cat.dy[scanpoint] / np.cos(np.radians(4.8 - twotheta))
return chod, d_cx
[docs]class SpatzNexus(ReflectNexus):
"""
Processes Spatz NeXus files to produce an intensity vs wavelength
spectrum
Parameters
----------
h5data : {HDF5 NeXus file, str, Path}
An HDF5 NeXus file for Spatz, or a `str` containing the path
to one
"""
def __init__(self, h5data):
"""
Initialises the SpatzNexus object.
"""
super().__init__()
self.prefix = "SPZ"
with _possibly_open_hdf_file(h5data, "r") as f:
self.cat = SpatzCatalogue(f)
[docs] def detector_average_unwanted_direction(self, detector):
"""
Averages over non-collimated beam direction
"""
# Up until this point detector.shape=(N, T, Y, X)
# pre-average over Y, leaving (n, t, x) also convert to dp
return np.sum(detector, axis=2, dtype="float64")
[docs] def create_detector_norm(self, h5norm):
"""
Produces a detector normalisation array for a neutron detector.
Here we average over N, T and Y to provide a relative efficiency for
each X wire.
Parameters
----------
h5norm : hdf5 file
Containing a flood field run (water)
Returns
-------
norm, norm_sd : array_like
1D array containing the normalisation data for each x pixel
"""
y_bins = self.cat.y_bins
return create_detector_norm(h5norm, y_bins[0], y_bins[1], axis=2)
[docs] def estimated_beam_width_at_detector(self, scanpoint):
cat = self.cat
L23 = cat.cat["collimation_distance"]
L3det = (
cat.dy[scanpoint] + cat.sample_distance[0] - cat.slit3_distance[0]
)
ebw = general.height_of_beam_after_dx(
cat.ss2hg[scanpoint], cat.ss3hg[scanpoint], L23, L3det
)
umb, penumb = ebw
# convolve in detector resolution (~2.2 mm?)
# first convert to beam sd, convolve in detector, and expand sd
# back to total foreground width
# use average of umb and penumb, the calc assumes a rectangular
# distribution
penumb = (
np.sqrt((0.289 * 0.5 * (umb + penumb)) ** 2.0 + 2.2**2)
* EXTENT_MULT
* 2
)
# we need it in pixels
return penumb / cat.qz_pixel_size[0]
[docs] def time_offset(
self,
master_phase_offset,
master_opening,
freq,
phase_angle,
z0,
flight_distance,
tof_hist,
t_offset=None,
):
"""
Timing offsets for Spatz chopper system
return total_offset
"""
# calculate initial time offset from the phase angle and master
# chopper offset.
m_offset = 1.0e6 * master_phase_offset / (2.0 * 360.0 * freq)
total_offset = m_offset + 1.0e6 * phase_angle / (360 * 2 * freq)
# assumes that the pickup is in the middle of the chopper disc. But
# you can override this by supplying a t_offset value.
if t_offset is not None:
total_offset += t_offset
else:
total_offset += 1.0e6 * master_opening / (2 * 360 * freq)
return total_offset
[docs] def phase_angle(self, scanpoint=0):
"""
Calculates the phase angle for a given scanpoint
Parameters
----------
scanpoint : int
The scanpoint you're interested in
Returns
-------
phase_angle, master_opening : float
The phase angle and angular opening of the master chopper in
degrees.
"""
disc_openings = (26.0, 42.0, 43.5, 126.0)
O_C1d, O_C2d, O_C2Bd, O_C3d = disc_openings
cat = self.cat
master = cat.master
slave = cat.slave
disc_phase = cat.phase[scanpoint]
phase_angle = 0
if master == 1:
phase_angle += 0.5 * O_C1d
master_opening = O_C1d
elif master == 2:
phase_angle += 0.5 * O_C2d
master_opening = O_C2d
# the phase_offset is defined as the angle you have to add to the
# calibrated blind opening to get to the nominal optically blind
# chopper opening.
# e.g. Nominal opening for optically blind may be at 34 degrees
# but the calibrated optically blind position is 34.22 degrees
# the chopper_phase_offset would be -0.22 degrees.
if slave == 2:
phase_angle += 0.5 * O_C2d
phase_angle += -disc_phase - cat.poff_c2_slave_1_master[0]
elif slave == 3:
# chopper 2B
phase_angle += 0.5 * O_C2Bd
if master == 1:
phase_angle += -disc_phase - cat.poff_c2b_slave_1_master[0]
elif master == 2:
phase_angle += -disc_phase - cat.poff_c2b_slave_2_master[0]
return phase_angle, master_opening
[docs] def chod(self, omega=0.0, twotheta=0.0, scanpoint=0):
"""
Calculates the flight length of the neutrons in the Spatz
instrument.
Parameters
----------
omega : float, optional
Rough angle of incidence
twotheta : float, optional
Rough 2 theta angle
scanpoint : int, optional
Which dataset is being considered
Returns
-------
chod, d_cx : float, float
Flight distance (mm), distance between chopper discs (mm)
"""
chod = 0
cat = self.cat
# Find out chopper pairing
master = cat.master
slave = cat.slave
if master == 1:
chod = cat.sample_distance
if slave == 2:
d_cx = cat.chopper2_distance[0]
elif slave == 3:
d_cx = cat.chopper2B_distance[0]
else:
raise RuntimeError("Couldn't figure out chopper spacing")
elif master == 2:
chod = cat.sample_distance - cat.chopper2_distance[0]
if slave == 3:
# chopper2B is the slave
d_cx = cat.chopper2B_distance[0] - cat.chopper2_distance[0]
else:
raise RuntimeError("Couldn't figure out chopper spacing")
chod += cat.dy[scanpoint] - 0.5 * d_cx
return chod, d_cx
def background_subtract(detector, detector_sd, background_mask):
"""
Background subtraction of Platypus detector image.
Shape of detector is (N, T, {X, Y}), do a linear background subn for each
(N, T) slice.
Parameters
----------
detector : np.ndarray
detector array with shape (N, T, {X, Y}).
detector_sd : np.ndarray
standard deviations for detector array
background_mask : array_like
array of bool with shape (N, T, {X, Y}) that specifies which X or Y
pixels to use for background subtraction.
Returns
-------
detector, detector_sd : np.ndarray, np.ndarray
Detector image with background subtracted
"""
ret = np.zeros_like(detector)
ret_sd = np.zeros_like(detector)
for idx in np.ndindex(detector.shape[0:2]):
ret[idx], ret_sd[idx] = background_subtract_line(
detector[idx], detector_sd[idx], background_mask[idx]
)
return ret, ret_sd
def background_subtract_line(profile, profile_sd, background_mask):
"""
Performs a linear background subtraction on a 1D peak profile
Parameters
----------
profile : np.ndarray
1D profile
profile_sd : np.ndarray
standard deviations for profile
background_mask : array_like
array of bool that specifies which Y pixels to use for background
subtraction.
Returns
-------
profile_subt, profile_subt_err : np.ndarray, np.ndarray
Background subtracted profile and its uncertainty
"""
# which values to use as a background region
mask = np.array(background_mask).astype("bool")
x_vals = np.where(mask)[0]
if np.size(x_vals) < 2:
# can't do a background subtraction if you have less than 2 points in
# the background
return profile, profile_sd
try:
y_vals = profile[x_vals]
except IndexError:
print(x_vals)
y_sdvals = profile_sd[x_vals]
x_vals = x_vals.astype("float")
# some SD values may have 0 SD, which will screw up curvefitting.
y_sdvals = np.where(y_sdvals == 0, 1, y_sdvals)
# equation for a straight line
def f(x, a, b):
return a + b * x
# estimate the linear fit
y_bar = np.mean(y_vals)
x_bar = np.mean(x_vals)
bhat = np.sum((x_vals - x_bar) * (y_vals - y_bar))
bhat /= np.sum((x_vals - x_bar) ** 2)
ahat = y_bar - bhat * x_bar
# get the weighted fit values
# we know the absolute sigma values
popt, pcov = curve_fit(
f,
x_vals,
y_vals,
sigma=y_sdvals,
p0=np.array([ahat, bhat]),
absolute_sigma=True,
)
def CI(xx, pcovmat):
return (
pcovmat[0, 0]
+ pcovmat[1, 0] * xx
+ pcovmat[0, 1] * xx
+ pcovmat[1, 1] * (xx**2)
)
bkgd = f(np.arange(np.size(profile, 0)), popt[0], popt[1])
# now work out confidence intervals
# TODO, should this be confidence interval or prediction interval?
# if you try to do a fit which has a singular matrix
if np.isfinite(pcov).all():
bkgd_sd = np.asarray(
[CI(x, pcov) for x in np.arange(len(profile))], dtype="float64"
)
else:
bkgd_sd = np.zeros_like(bkgd)
bkgd_sd = np.sqrt(bkgd_sd)
# get the t value for a two sided student t test at the 68.3 confidence
# level
bkgd_sd *= t.isf(0.1585, np.size(x_vals, 0) - 2)
return EP.EPsub(profile, profile_sd, bkgd, bkgd_sd)
def find_specular_ridge(
detector,
detector_sd,
search_increment=50,
tol=(0.05, 0.015),
manual_beam_find=None,
name=None,
):
"""
Find the specular ridges in a detector(n, t, {x, y}) plot.
Parameters
----------
detector : array_like
detector array
detector_sd : array_like
standard deviations of detector array
search_increment : int
specifies the search increment for the location process.
tol : (float, float) tuple
specifies tolerances for finding the specular beam.
tol[0] is the absolute change (in pixels) in beam centre location
below which peak finding stops.
tol[1] is the relative change in beam width below which peak finding
stops.
manual_beam_find : callable, optional
A function which allows the location of the specular ridge to be
determined. Has the signature `f(detector, detector_err, name)`
where `detector` and `detector_err` is the detector image and its
uncertainty, and name is a `str` specifying the name of
the dataset.
`detector` and `detector_err` have shape (n, t, {x, y}) where `n`
is the number of detector images, `t` is the number of
time-of-flight bins and `x` or `y` is the number of x or y pixels.
The function should return a tuple,
`(centre, centre_sd, lopx, hipx, background_pixels)`. `centre`,
`centre_sd`, `lopx`, `hipx` should be arrays of shape `(n, )`,
specifying the beam centre, beam width (standard deviation), lowest
pixel of foreground region, highest pixel of foreground region.
`background_pixels` is a list of length `n`. Each of the entries
should contain arrays of pixel numbers that specify the background
region for each of the detector images.
name: str
Name of the dataset
Returns
-------
centre, SD, lopx, hipx, background_mask : np.ndarrays
peak centre, standard deviation of peak width, lowest pixel to be
included from background region, highest pixel to be included from
background region, array specifying points to be used for background
subtraction
`np.size(centre) == n`.
Notes
-----
The search for the beam centre proceeds by taking the last
`search_increment` time bins, summing over the time axis and finding
the beam centre and width along the y-axis. It then repeats the process
with the last `2 * search_increment` time bins. This process is repeated
until the relative change in beam centre and width is lower than `tol`.
This process is designed to locate the specular ridge, even in the
presence of incoherent scattering.
"""
beam_centre = np.zeros(np.size(detector, 0))
beam_sd = np.zeros_like(beam_centre)
# unpack the tolerances
atol, rtol = tol
# lopx and hipx specify the foreground region to integrate over
lopx = np.zeros_like(beam_centre, dtype=int)
hipx = np.zeros_like(beam_centre, dtype=int)
# background mask specifies which pixels are background
background_mask = np.zeros_like(detector, dtype=bool)
search_increment = abs(search_increment)
n_increments = (
np.size(detector, 1) - search_increment
) // search_increment
# we want to integrate over the following pixel region
for j in range(np.size(detector, 0)):
last_centre = -1.0
last_sd = -1.0
converged = False
for i in range(n_increments):
how_many = -search_increment * (1 + i)
det_subset = detector[j, how_many:]
det_sd_subset = detector_sd[j, how_many:]
# Uncertainties code takes a while to run
# total_y = np.sum(det_subset, axis=0)
y_cross = np.sum(det_subset, axis=0)
y_cross_sd = np.sqrt(np.sum(det_sd_subset**2.0, axis=0))
# find the centroid and gauss peak in the last sections of the TOF
# plot
try:
centroid, gauss_peak = peak_finder(y_cross, sigma=y_cross_sd)
except RuntimeError:
continue
if np.allclose(
gauss_peak[0], last_centre, atol=atol
) and np.allclose(gauss_peak[1], last_sd, rtol=rtol, atol=0):
last_centre = gauss_peak[0]
last_sd = gauss_peak[1]
converged = True
break
last_centre = gauss_peak[0]
last_sd = gauss_peak[1]
if not converged:
warnings.warn(
"specular ridge search did not work properly"
" using last known centre",
RuntimeWarning,
)
if manual_beam_find is not None:
ret = manual_beam_find(detector[j], detector_sd[j], name)
beam_centre[j], beam_sd[j], lopx[j], hipx[j], bp = ret
background_mask[j, :, bp[0]] = True
# don't assign to beam_centre, etc, at the end of this loop
continue
beam_centre[j] = last_centre
beam_sd[j] = np.abs(last_sd)
lp, hp, bp = fore_back_region(beam_centre[j], beam_sd[j])
lopx[j] = lp
hipx[j] = hp
# bp are the background pixels. Clip to the range of the detector
bp = np.clip(bp[0], 0, np.size(detector, 2) - 1)
bp = np.unique(bp)
background_mask[j, :, bp] = True
# the foreground region needs to be totally contained within the
# detector
if (lopx < 0).any():
raise ValueError(
"The foreground region for one of the detector"
" images extends below pixel 0."
)
if (hipx > np.size(detector, 2) - 1).any():
raise ValueError(
"The foreground region for one of the detector"
" images extends above the largest detector"
" pixel."
)
return beam_centre, beam_sd, lopx, hipx, background_mask
def fore_back_region(beam_centre, beam_sd):
"""
Calculates the fore and background regions based on the beam centre and
width
Parameters
----------
beam_centre : float
beam_centre
beam_sd : float
beam width (standard deviation)
Returns
-------
lopx, hipx, background_pixels: float, float, list
Lowest pixel of foreground region
Highest pixel of foreground region
Pixels that are in the background region
Each of these should have `len(lopx) == len(beam_centre)`
"""
_b_centre = np.array(beam_centre)
_b_sd = np.array(beam_sd)
lopx = np.floor(_b_centre - _b_sd * EXTENT_MULT).astype("int")
hipx = np.ceil(_b_centre + _b_sd * EXTENT_MULT).astype("int")
background_pixels = []
# limit of background regions
# from refnx.reduce.platypusnexus
y1 = np.atleast_1d(np.round(lopx - PIXEL_OFFSET).astype("int"))
y0 = np.atleast_1d(
np.round(lopx - PIXEL_OFFSET - (EXTENT_MULT * _b_sd)).astype("int")
)
y2 = np.atleast_1d(np.round(hipx + PIXEL_OFFSET).astype("int"))
y3 = np.atleast_1d(
np.round(hipx + PIXEL_OFFSET + (EXTENT_MULT * _b_sd)).astype("int")
)
# now generate background pixels
for i in range(np.size(y0)):
background_pixels.append(
np.r_[np.arange(y0[i], y1[i] + 1), np.arange(y2[i], y3[i] + 1)]
)
return lopx, hipx, background_pixels
def correct_for_gravity(
detector,
detector_sd,
lamda,
coll_distance,
sample_det,
lo_wavelength,
hi_wavelength,
theta=0,
qz_pixel_size=0.294,
):
"""
Returns a gravity corrected yt plot, given the data, its associated errors,
the wavelength corresponding to each of the time bins, and the trajectory
of the neutrons. Low lambda and high Lambda are wavelength cutoffs to
ignore.
Parameters
----------
detector : np.ndarray
Detector image. Has shape (N, T, Y)
detector_sd : np.ndarray
Standard deviations of detector image
lamda : np.ndarray
Wavelengths corresponding to the detector image, has shape (N, T)
coll_distance : float
Collimation distance between slits, mm
sample_det : float
Sample - detector distance, mm
lo_wavelength : float
Low wavelength cut off, Angstrom
hi_wavelength : float
High wavelength cutoff, Angstrom
theta : float
Angle between second collimation slit, first collimation slit, and
horizontal
qz_pixel_size: float
size of one pixel on the detector
Returns
-------
corrected_data, corrected_data_sd, m_gravcorrcoefs :
np.ndarray, np.ndarray, np.ndarray
Corrected image. This is a theoretical prediction where the spectral
ridge is for each wavelength. This will be used to calculate the
actual angle of incidence in the reduction process.
"""
x_init = np.arange((np.size(detector, axis=2) + 1) * 1.0) - 0.5
m_gravcorrcoefs = np.zeros((np.size(detector, 0)), dtype="float64")
corrected_data = np.zeros_like(detector)
corrected_data_sd = np.zeros_like(detector)
for spec in range(np.size(detector, 0)):
neutron_speeds = general.wavelength_velocity(lamda[spec])
trajectories = find_trajectory(
coll_distance / 1000.0, theta, neutron_speeds
)
travel_distance = (coll_distance + sample_det[spec]) / 1000.0
# centres(t,)
# TODO, don't use centroids, use Gaussian peak
centroids = np.apply_along_axis(centroid, 1, detector[spec])
lopx = np.searchsorted(lamda[spec], lo_wavelength)
hipx = np.searchsorted(lamda[spec], hi_wavelength)
def f(tru_centre):
deflections = y_deflection(
trajectories[lopx:hipx],
neutron_speeds[lopx:hipx],
travel_distance,
)
model = 1000.0 * deflections / qz_pixel_size + tru_centre
diff = model - centroids[lopx:hipx, 0]
diff = diff[~np.isnan(diff)]
return diff
# find the beam centre for an infinitely fast neutron
x0 = np.array([np.nanmean(centroids[lopx:hipx, 0])])
res = leastsq(f, x0)
m_gravcorrcoefs[spec] = res[0][0]
total_deflection = 1000.0 * y_deflection(
trajectories, neutron_speeds, travel_distance
)
total_deflection /= qz_pixel_size
x_rebin = x_init.T + total_deflection[:, np.newaxis]
for wavelength in range(np.size(detector, axis=1)):
output = rebin(
x_init,
detector[spec, wavelength],
x_rebin[wavelength],
y1_sd=detector_sd[spec, wavelength],
)
corrected_data[spec, wavelength] = output[0]
corrected_data_sd[spec, wavelength] = output[1]
return corrected_data, corrected_data_sd, m_gravcorrcoefs
def create_detector_norm(h5norm, bin_min, bin_max, axis):
"""
Produces a detector normalisation array for a neutron detector
(N, T, Y, X).
The data in h5norm is averaged over N, T to start with, leaving
a (Y, X) array. The data is then average over Y (axis=2) or X (axis=3)
to provide a relative efficiency.
Parameters
----------
h5norm : hdf5 file
Containing a flood field run (water)
bin_min : float, int
Minimum bin location to use
bin_max : float, int
Maximum bin location to use
axis : int
If axis = 2 the efficiency array is produced in the X direction.
If axis = 3 the efficiency array is produced in the Y direction.
Returns
-------
norm, norm_sd : array_like
1D array containing the normalisation data for each y pixel
"""
if axis not in (2, 3):
raise RuntimeError("axis must be 2 or 3.")
# sum over N and T
detector = h5norm["entry1/data/hmm"]
if axis == 3:
norm_bins = h5norm["entry1/data/x_bin"]
elif axis == 2:
norm_bins = h5norm["entry1/data/y_bin"]
# find out what pixels to use
x_low = np.searchsorted(norm_bins, bin_min, sorter=np.argsort(norm_bins))
x_low = np.argsort(norm_bins)[x_low]
x_high = np.searchsorted(norm_bins, bin_max, sorter=np.argsort(norm_bins))
x_high = np.argsort(norm_bins)[x_high]
if x_low > x_high:
x_low, x_high = x_high, x_low
norm = np.sum(detector, axis=(0, 1), dtype="float64")
# By this point you have norm[y][x]
if axis == 3:
norm = norm[:, x_low:x_high]
norm = np.sum(norm, axis=1)
elif axis == 2:
norm = norm[x_low:x_high, :]
norm = np.sum(norm, axis=0)
mean = np.mean(norm)
return norm / mean, np.sqrt(norm) / mean
def calculate_wavelength_bins(lo_wavelength, hi_wavelength, rebin_percent):
"""
Calculates optimal logarithmically spaced wavelength histogram bins. The
bins are equal size in log10 space, but they may not be exactly be
`rebin_percent` in size. The limits would have to change slightly for that.
Parameters
----------
lo_wavelength : float
Low wavelength cutoff
hi_wavelength : float
High wavelength cutoff
rebin_percent : float
Rebinning percentage
Returns
-------
wavelength_bins : np.ndarray
"""
frac = (rebin_percent / 100.0) + 1
lowspac = rebin_percent / 100.0 * lo_wavelength
hispac = rebin_percent / 100.0 * hi_wavelength
lowl = lo_wavelength - lowspac / 2.0
hil = hi_wavelength + hispac / 2.0
num_steps = int(np.floor(np.log10(hil / lowl) / np.log10(frac)) + 1)
rebinning = np.logspace(np.log10(lowl), np.log10(hil), num=num_steps)
return rebinning
[docs]def accumulate_HDF_files(files):
r"""
Accumulates HDF files together, writing an accumulated file in the current
directory. The accumulated datafile is written in the current directory
(os.getcwd()) and has a filename based on the first file, prepended by
'ADD\_'. For example, if the first file is PLP0000708.nx.hdf then the
accumulated file is ADD_PLP0000708.nx.hdf.
Parameters
----------
files : list
Strings specifying NeXUS filenames to be added together.
"""
# don't do anything if no files were supplied.
if not len(files):
return None
# the first file is the "master file", lets copy it.
file = files[0]
pth = _check_HDF_file(file)
if not pth:
raise ValueError("All files must refer to an hdf5 file")
new_name = "ADD_" + Path(pth).name
shutil.copy(pth, Path(".") / new_name)
master_file = Path(".") / new_name
with h5py.File(master_file, "r+") as h5master:
# now go through each file and accumulate numbers:
for file in files[1:]:
pth = _check_HDF_file(file)
h5data = h5py.File(pth, "r")
h5master["entry1/data/hmm"][0] += h5data["entry1/data/hmm"][0]
h5master["entry1/monitor/bm1_counts"][0] += h5data[
"entry1/monitor/bm1_counts"
][0]
h5master["entry1/instrument/detector/total_counts"][0] += h5data[
"entry1/instrument/detector/total_counts"
][0]
h5master["entry1/instrument/detector/time"][0] += h5data[
"entry1/instrument/detector/time"
][0]
h5master.flush()
h5data.close()
def _check_HDF_file(h5data):
# If a file is an HDF5 file, then return the filename.
# otherwise return False
if isinstance(h5data, h5py.File):
return h5data.filename
else:
with h5py.File(h5data, "r") as h5data:
if isinstance(h5data, h5py.File):
return h5data.filename
return False
@contextmanager
def _possibly_open_hdf_file(f, mode="r"):
"""
Context manager for hdf5 files.
Parameters
----------
f : {file-like, str, Path}
If `f` is a file, then yield the file. If `f` is a str then open the
file and yield the newly opened file.
On leaving this context manager the file is closed, if it was opened
by this context manager (i.e. `f` was a string).
mode : str, optional
mode is an optional string that specifies the mode in which the file
is opened.
Yields
------
g : file-like
On leaving the context manager the file is closed, if it was opened by
this context manager.
"""
close_file = False
if isinstance(f, h5py.File):
g = f
else:
g = h5py.File(f, mode)
close_file = True
yield g
if close_file:
g.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Process some Platypus NeXUS"
"files to produce their TOF "
"spectra."
)
parser.add_argument(
"file_list",
metavar="N",
type=int,
nargs="+",
help="integer file numbers",
)
parser.add_argument(
"-b",
"--bdir",
type=str,
help="define the location to find the nexus files",
)
parser.add_argument(
"-d",
"--direct",
action="store_true",
default=False,
help="is the file a direct beam?",
)
parser.add_argument(
"-r",
"--rebin",
type=float,
help="rebin percentage for the wavelength -1<rebin<10",
default=1,
)
parser.add_argument(
"-ll",
"--lolambda",
type=float,
help="lo wavelength cutoff for the rebinning",
default=2.5,
)
parser.add_argument(
"-hl",
"--hilambda",
type=float,
help="lo wavelength cutoff for the rebinning",
default=19.0,
)
parser.add_argument(
"-i",
"--integrate",
type=int,
help="-1 to integrate all spectra, otherwise enter the"
" spectrum number.",
default=-1,
)
args = parser.parse_args()
for file in args.file_list:
fname = f"PLP{file:07d}.nx.hdf"
path = Path(args.bdir) / fname
try:
a = PlatypusNexus(path)
a.process(
lo_wavelength=args.lolambda,
hi_wavelength=args.hilambda,
direct=args.direct,
rebin_percent=args.rebin,
integrate=args.integrate,
)
fname = "PLP%07d.spectrum" % file
out_fname = path / fname
integrate = args.integrate
if args.integrate < 0:
integrate = 0
a.write_spectrum_dat(out_fname, scanpoint=integrate)
except IOError:
print("Couldn't find file: %d. Use --basedir option" % file)
def _plot_offspec(
f,
I_min=-6,
I_max=-2,
I_step=0.1,
Qx_interval=(-0.0004, 0.0004),
Qz_interval=(0.01, 0.05),
):
"""
Generates a plot of offspecular/reciprocal space map data and returns this as a fig, ax tuple
(allowing the user to customise the graph further).
Parameters
----------
f : {str, Path, file-like}
File containing the data. Expects a numpy binary array file (i.e those generated via `np.savez`).
I_min : float
Minimum log(intensity)
I_max : float
Max log (intensity)
I_step : float
The interval for contours (log units)
Qx_interval : (float, float)
The upper and lower x-axis limits for the plot
Qz_interval : (float, float)
The upper and lower y-axis limits for the plot
"""
from matplotlib.colors import LogNorm
import matplotlib.pyplot as plt
npz = np.load(f)
qz = npz["m_qz"]
qx = npz["m_qx"]
m_ref = npz["m_ref"]
# replace negative values (which won't plot on a log scale) with the smallest available positive value
m_ref[m_ref == 0] = np.min(m_ref[m_ref > 0])
levels = np.arange(I_min, I_max, I_step)
color_levels = np.power(10, levels)
fig = plt.figure()
ax = fig.add_subplot(111)
contour = ax.contourf(qx, qz, m_ref, levels=color_levels, norm=LogNorm())
ax.set_xlim(Qx_interval)
ax.set_ylim(Qz_interval)
ax.locator_params(axis="x", nbins=3)
ax.set_ylabel(r"$Q_z (\AA^{-1})$")
ax.set_xlabel(r"$Q_x (\AA^{-1})$")
cb = fig.colorbar(contour)
cb.ax.set_ylabel("Intensity")
return fig, ax