# coding: utf-8
"""
Batch reduction of reflectometry data based on a spreadsheet
"""
import collections
import numpy as np
import os
import os.path
import pandas as pd
import pickle
import re
import sys
import warnings
try:
import IPython.display
_have_ipython = True
except ImportError:
_have_ipython = False
from refnx.reduce import reduce_stitch, ReductionOptions
ReductionEntryTuple = collections.namedtuple(
"ReductionEntry", ["row", "ds", "name", "fname", "entry"]
)
class ReductionEntry(ReductionEntryTuple):
    def rescale(self, scale_factor, write=True):
        """Rescale the reduced data and, if `write`, rewrite the saved file"""
        self.ds.scale(scale_factor)
        if write:
            with open(self.fname, "w") as w:
                self.ds.save(w)
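# A minimal usage sketch for ReductionEntry.rescale; the sample name and
# scale factor below are illustrative only, assuming `data` is a populated
# ReductionCache:
#
# >>> entry = data.name('W1234')
# >>> entry.rescale(1.2)                # rescale and rewrite entry.fname
# >>> entry.rescale(1.2, write=False)   # rescale in memory only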
class ReductionCache(list):
"""
Cache for the reduced data to enable look-up by name, run number or row.
Entries in the cache are ReductionEntry objects.
Examples
--------
    >>> reducer = BatchReducer('reduction.xls', data_folder=data_folder)
>>> data = reducer()
Find the filename of a run in the cache by sample name
>>> data.name('W1234').fname
    Find a run in the cache by run number and plot it
    >>> entry = data.run(24623)
    >>> plt.plot(entry.ds.x, entry.ds.y)
    Search for data by sample name (starting substring or regular expression)
    >>> data.name_startswith('W')
    >>> plot_data_sets(data.name_search('^W'))
"""
_default_persistent_cache = "_reduction_cache.pickle"
def __init__(self, persistent=True):
"""
Create a new reduction cache
Parameters
----------
        persistent : bool or str, optional
            If true, the reduction cache is stored on disk so that the
            reducer can be restarted without re-reducing the data. If a
            str is given, it is used as the filename for the persistent
            cache; otherwise a default filename is used.
"""
super().__init__()
self.name_cache = {}
self.run_cache = {}
self.row_cache = {}
self.persistent = persistent
if self.persistent:
self.load_cache()
def add(self, row, ds, name, fname, entry, update=True):
"""
Add (or update) a data set in the reduction cache
Parameters
----------
row : int
row number in the batch reduction spreadsheet
ds : ReflectDataset
reduced data from the reduce_stitch (or similar) function
        name : str
            name of the sample, as specified in the spreadsheet
fname : str
filename that was used to save the data during reduction
entry : pandas.Series
the row of the spreadsheet (as a row from the pandas table)
from which the reduction was controlled
        update : bool, optional
            if the spreadsheet row (as identified by the `row` argument)
            has already been added to the cache, then replace the existing
            entry in the cache
"""
data = ReductionEntry(row, ds, name, fname, entry)
# if the data is already in the cache, update it
if update and row in self.row_cache:
idx = self.row_cache[row]
self[idx] = data
else:
idx = len(self)
self.append(data)
self.name_cache[name] = idx
self.row_cache[row] = idx
# also cache the runs that made up the reduction, which may be
# several since they can be stitched together
runs = run_list(entry)
for run in runs:
self.run_cache[run] = idx
if self.persistent:
self.write_cache()
return data
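    # `add` is normally called by BatchReducer.reduce; a hedged sketch of a
    # manual call, assuming `ds` and `fname` came from reduce_stitch and
    # `entry` is the corresponding spreadsheet row as a pandas Series:
    #
    # >>> cache.add(row=2, ds=ds, name="Sample 1", fname=fname, entry=entry)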
    def delete_rows(self, row_numbers):
        """Delete rows from the reduction cache
        Parameters
        ----------
        row_numbers : list of int
row numbers (from the reduction spreadsheet) that should be
deleted from the cache
"""
for row in row_numbers:
if row not in self.row_cache:
print("Not deleting unknown row %s" % row)
continue
self[self.row_cache[row]] = None
del self.row_cache[row]
print("Deleted row %s" % row)
if self.persistent:
self.write_cache()
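    # For example, to forget rows 5 and 6 so that they are re-reduced on
    # the next pass (row numbers are spreadsheet rows, starting at 2):
    #
    # >>> data.delete_rows([5, 6])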
def run(self, run_number):
"""select a single data set by run number
Parameters
----------
run_number : int
run number to find
"""
return self[self.run_cache[run_number]]
def runs(self, run_numbers):
"""select several data sets by run number
Parameters
----------
run_numbers : iterable
run numbers to find
"""
return [self[self.run_cache[r]] for r in run_numbers]
def row(self, row_number):
"""select a single data set by spreadsheet row number
Parameters
----------
        row_number : int
            row number to find
"""
return self[self.row_cache[row_number]]
def rows(self, row_numbers):
"""select several data sets by spreadsheet row number
Parameters
----------
row_numbers : iterable
row numbers to find
"""
return [
entry
for entry in self
if entry is not None and entry.row in row_numbers
]
def name(self, name):
"""select a single data set by sample name
Parameters
----------
name : str
sample name to find
"""
return self[self.name_cache[name]]
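    # The lookup methods above are parallel accessors onto the same cache;
    # the run numbers, row numbers and sample name below are illustrative
    # only:
    #
    # >>> data.run(24623)            # one entry, by run number
    # >>> data.runs([24623, 24624])  # several entries, by run number
    # >>> data.row(2)                # one entry, by spreadsheet row
    # >>> data.rows([2, 3])          # several entries, by spreadsheet row
    # >>> data.name('Sample 1')      # one entry, by sample name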
def name_startswith(self, name):
"""select data sets by start of sample name
Parameters
----------
name : str
fragment that must be at the start of the sample name
"""
matches = [
entry
for entry in self
if entry is not None and entry.name.startswith(name)
]
return matches
def name_search(self, search):
r"""select data sets by a regular expression on sample name
        The search pattern is a `regular expression`_ that is matched
        against the sample name using `re.search`.
Parameters
----------
        search : str or re.Pattern
string or compiled regular expression (from `re.compile(pattern)`)
that will be checked against the sample name.
Examples
--------
Select all data where the name starts with `Sample 1`:
>>> data.name_search("^Sample 1")
Select all data where the name contains `pH 4.0`:
>>> data.name_search(r"pH 4\.0")
.. _`regular expression`:
https://docs.python.org/3/howto/regex.html
"""
if isinstance(search, str):
name_re = re.compile(search)
else:
name_re = search
matches = [
entry
for entry in self
if entry is not None and name_re.search(entry.name)
]
return matches
def summary(self):
"""pretty print a list of all data sets
If available, the pandas pretty printer is used with IPython HTML
display.
"""
if _have_ipython:
IPython.display.display(IPython.display.HTML(self._repr_html_()))
else:
print(self)
    def _summary_dataframe(self):
        """construct a summary table of the data in the cache"""
        # use the first surviving entry to establish the column labels
        first = next((e for e in self if e is not None), None)
        if first is None:
            return pd.DataFrame()
        df = pd.DataFrame(columns=first.entry.index)
        for i, entry in enumerate(self):
            if entry is not None:
                df.loc[i] = list(entry.entry)
        return df
def write_cache(self, filename=None):
"""write a persistent cache of reduced data to disk
Parameters
----------
filename : str, optional
filename to which the cache should be written; if not specified
or `None`, the default filename is used.
"""
with open(self._cache_filename(filename), "wb") as fh:
pickle.dump(self, fh)
def drop_cache(self, filename=None):
"""delete the persistent cache of reduced data from disk
Parameters
----------
filename : str, optional
filename of the cache to be deleted; if not specified or `None`,
the default filename is used.
"""
os.remove(self._cache_filename(filename))
def load_cache(self, filename=None):
"""load a persistent cache of reduced data from disk
Parameters
----------
filename : str, optional
filename from which the cache should be loaded; if not specified
or `None`, the default filename is used.
"""
try:
if not os.path.getsize(self._cache_filename(filename)):
print("On-disk cache empty")
return
with open(self._cache_filename(filename), "rb") as fh:
cached = pickle.load(fh)
self.name_cache = cached.name_cache
self.run_cache = cached.run_cache
self.row_cache = cached.row_cache
self.extend(cached)
print("On-disk cache loaded")
        except FileNotFoundError:
print("On-disk cache not found")
def _cache_filename(self, filename=None):
"""return the filename for the persistent cache if it is in use"""
if not self.persistent:
return None
if filename is not None:
return filename
if self.persistent is not True:
return self.persistent
return self._default_persistent_cache
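    # How the `persistent` argument maps onto a cache file, following the
    # logic above:
    #
    # >>> ReductionCache(persistent=False)          # no on-disk cache
    # >>> ReductionCache(persistent=True)           # _reduction_cache.pickle
    # >>> ReductionCache(persistent="mycache.pkl")  # custom filename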
def _repr_html_(self):
df = self._summary_dataframe()
return "<b>Summary of reduced data</b>" + df.fillna("")._repr_html_()
def __str__(self):
df = self._summary_dataframe()
return "Summary of reduced data\n\n" + str(df)
class BatchReducer:
r"""
Batch reduction of reflectometry data based on spreadsheet metadata.
Example
-------
>>> from refnx.reduce import BatchReducer
>>> data_folder = r'V:\data\current'
>>> b = BatchReducer('reduction.xls', data_folder=data_folder)
>>> b.reduce()
    The spreadsheet must have the columns:

        reduce name scale refl1 refl2 refl3 dir1 dir2 dir3
Only rows where the value of the `reduce` column is 1 and where the sample
name is set will be processed.
"""
def __init__(
self,
filename,
data_folder=None,
verbose=True,
persistent=True,
trim_trailing=True,
reduction_options=None,
prefix="PLP",
):
"""
Create a batch reducer using metadata from a spreadsheet
Parameters
----------
filename : {str, Path}
The filename of the spreadsheet to be used. Must be readable by
`pandas.read_excel` (`.xls` and `.xlsx` files).
data_folder : {str, Path, None}
Filesystem path for the raw data files. If `data_folder is None`
then the current working directory is used.
verbose : bool, optional
Prints status information during batch reduction.
        persistent : bool, optional
            If True, the reduction cache is stored on disk so that the
            reducer can be restarted without re-reducing the data.
trim_trailing : bool, optional
When datasets are spliced together do you want to remove points in
the overlap region from the preceding dataset?
reduction_options : dict, or sequence of dict, optional
Options passed directly to `refnx.reduce.reduce_stitch`. Look at
that docstring for complete specification of options.
prefix : {"PLP", "SPZ"}
Whether you expect to be doing Platypus or Spatz reduction.
"""
self.cache = ReductionCache(persistent)
self.filename = filename
self.data_folder = os.getcwd()
if data_folder is not None:
self.data_folder = data_folder
self.trim_trailing = trim_trailing
self.reduction_options = reduction_options or ReductionOptions()
self.verbose = verbose
self.prefix = prefix
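    # A hedged construction sketch: reduction options are passed through to
    # reduce_stitch. `rebin_percent` is assumed to be a ReductionOptions
    # key; check the ReductionOptions docstring for the full set.
    #
    # >>> opts = ReductionOptions()
    # >>> opts["rebin_percent"] = 3.0
    # >>> b = BatchReducer("reduction.xls", data_folder=data_folder,
    # ...                  reduction_options=opts)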
def _reduce_row(self, entry):
"""Process a single row using reduce_stitch
Parameters
----------
entry : pandas.Series
Spreadsheet row for this data set
"""
# Identify the runs to be used for reduction
runs = run_list(entry, "refl")
directs = run_list(entry, "directs")
if self.verbose:
fmt = "Reducing %s [%s]/[%s]"
print(
fmt
% (
entry["name"],
", ".join("%d" % r for r in runs),
", ".join("%d" % r for r in directs),
)
)
sys.stdout.flush() # keep progress updated
if not runs:
warnings.warn(
"Row %d (%s) has no reflection runs. Skipped."
% (entry["source"], entry["name"])
)
return None, None
if not directs:
warnings.warn(
"Row %d (%s) has no direct beam runs. Skipped."
% (entry["source"], entry["name"])
)
return None, None
        if len(runs) != len(directs):
warnings.warn(
"Row %d (%s) has differing numbers of"
" direct & reflection runs. Skipped."
% (entry["source"], entry["name"])
)
return None, None
ds, fname = reduce_stitch(
runs,
directs,
trim_trailing=self.trim_trailing,
data_folder=self.data_folder,
reduction_options=self.reduction_options,
prefix=self.prefix,
)
return ds, fname
    def load_runs(self):
        """Read the reduction spreadsheet into a pandas DataFrame"""
        cols = range(9)
all_runs = pd.read_excel(
self.filename,
usecols=cols,
converters={
"refl1": int,
"refl2": int,
"refl3": int,
"dir1": int,
"dir2": int,
"dir3": int,
},
)
# Add the row number in the spreadsheet as an extra column
# row numbers for the runs will start at 2 not 0
all_runs.insert(0, "source", all_runs.index + 2)
# add in some extra columns to indicate successful reduction
all_runs = all_runs.assign(reduced=False)
all_runs = all_runs.assign(filename="")
return all_runs
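    # The expected spreadsheet layout (header in row 1, data from row 2,
    # hence the +2 offset for the `source` column above); the run numbers
    # shown are illustrative:
    #
    #   reduce  name      scale  refl1  refl2  refl3  dir1  dir2  dir3
    #   1       Sample 1  1.0    708    709    710    711   712   713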
    def select_runs(self, all_runs):
        """Return a boolean mask of the rows that should be reduced"""
        # skip samples not marked for reduction or with no sample name
        mask = (all_runs["reduce"] == 1) & all_runs["name"].notnull()
        return mask
    def reduce(self, show=True):
"""
Batch reduce data based on metadata from a spreadsheet
Parameters
----------
show : bool (optional, default=True)
display a summary table of the rows that were reduced
"""
all_runs = self.load_runs()
mask = self.select_runs(all_runs)
rows = all_runs[mask].index
# iterate through the rows that were marked for reduction
for idx in rows:
name = str(all_runs.loc[idx, "name"])
try:
ds, fname = self._reduce_row(all_runs.loc[idx])
except IOError as e:
# data file not found (normally)
reduction_ok = str(e)
warnings.warn("Run %s: %s" % (name, str(e)))
ds = None
fname = None
else:
reduction_ok = ds is not None
            if ds is not None:
                # store this away to make plotting easier later
                ds.name = name
# record outcomes of reduction in the table
all_runs.loc[idx, "filename"] = fname
all_runs.loc[idx, "reduced"] = reduction_ok
cached = self.cache.add(
all_runs.loc[idx, "source"], ds, name, fname, all_runs.loc[idx]
)
            if ds is not None:
scale = all_runs.loc[idx, "scale"]
if not np.isnan(scale) and scale != 1:
print("Applying scale factor %f" % scale)
sys.stdout.flush() # keep progress updated
cached.rescale(scale)
if show:
if _have_ipython:
IPython.display.display(all_runs[mask].fillna(""))
else:
print(all_runs[mask])
return self.cache
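    # `reduce` returns the ReductionCache, so results can be inspected
    # immediately afterwards (the run number below is hypothetical):
    #
    # >>> cache = b.reduce(show=False)
    # >>> cache.run(24623).fname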
def __call__(self):
"""run the reducer as the default action for the BatchReducer"""
return self.reduce()
def run_list(entry, mode="refl"):
"""
Generates a list of run numbers from a reduction spreadsheet entry
Parameters
----------
    entry : pandas.Series
        A row from the reduction spreadsheet, expressed as a pandas Series
mode : 'refl' or 'directs'
Fetch either the run numbers from the reflectometry experiment
or from the direct beams.
"""
if mode not in ("refl", "directs"):
# FIXME: crap API
raise ValueError("Unknown mode %s" % mode)
if mode == "refl":
listed = [entry["refl1"], entry["refl2"], entry["refl3"]]
else:
listed = [entry["dir1"], entry["dir2"], entry["dir3"]]
valid = []
for item in listed:
if isinstance(item, str) and "," in item:
runs = [int(i) for i in item.split(",")]
else:
runs = [item]
for run in runs:
try:
if not np.isnan(run):
valid.append(run)
except TypeError:
raise ValueError(
"Value '%s' could not be interpreted as a run"
" number" % run
)
return [int(v) for v in valid]
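if __name__ == "__main__":
    # A small self-contained demonstration of how run_list interprets a
    # spreadsheet row. The run numbers are made up; comma-separated cells
    # are split into multiple runs and empty (NaN) cells are skipped.
    demo = pd.Series(
        {
            "refl1": 708,
            "refl2": "709,710",  # comma-separated runs in one cell
            "refl3": np.nan,  # empty spreadsheet cell
            "dir1": 711,
            "dir2": np.nan,
            "dir3": np.nan,
        }
    )
    print(run_list(demo, "refl"))  # [708, 709, 710]
    print(run_list(demo, "directs"))  # [711]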