""""
A basic representation of a 1D dataset
"""
from pathlib import Path, PurePath
import re
import io
import numpy as np
from scipy._lib._util import check_random_state
from refnx.util.nsplice import get_scaling_in_overlap
from refnx._lib import possibly_open_file
[docs]class Data1D:
r"""
A basic representation of a 1D dataset.
Parameters
----------
data : {str, file-like, Path, tuple of np.ndarray}, optional
`data` can be a string, file-like, or Path object referring to a File
to load the dataset from. The file should be plain text and have
2 to 4 columns separated by space, comma or tab. The columns represent
`x, y [y_err [, x_err]]`.
Alternatively it is a tuple containing the data from which the dataset
will be constructed. The tuple should have between 2 and 4 members.
- data[0] - x
- data[1] - y
- data[2] - uncertainties on y, y_err
- data[3] - uncertainties on x, x_err
`data` must be at least two long, `x` and `y`.
If the tuple is at least 3 long then the third member is `y_err`.
If the tuple is 4 long then the fourth member is `x_err`.
All arrays must have the same shape.
mask : array-like
Specifies which data points are (un)masked. Must be broadcastable
to the y-data. `Data1D.mask = None` clears the mask. If a mask value
equates to `True`, then the point is included, if a mask value equates
to `False` it is excluded.
Attributes
----------
filename : {str, Path, None}
The file the data was read from
weighted : bool
Whether the y data has uncertainties
metadata : dict
Information that should be retained with the dataset.
"""
def __init__(self, data=None, mask=None, **kwds):
self.filename = None
self.name = None
self.metadata = kwds
self._x = np.zeros(0)
self._y = np.zeros(0)
self._y_err = None
self._x_err = None
self.weighted = False
# if it's a file then open and load the file.
if (
hasattr(data, "read")
or type(data) is str
or isinstance(data, PurePath)
):
self.load(data)
elif isinstance(data, Data1D):
# copy a dataset (but not it's file info)
self.name = data.name
self.filename = data.filename
self.metadata = data.metadata
self._x = data._x
self._y = data._y
self._y_err = data._y_err
self._x_err = data._x_err
self.weighted = data.weighted
self._mask = data._mask
elif data is not None:
self._x = np.array(data[0], dtype=float)
self._y = np.array(data[1], dtype=float)
if len(data) > 2:
self._y_err = np.array(data[2], dtype=float)
self.weighted = True
if len(data) > 3:
self._x_err = np.array(data[3], dtype=float)
self._mask = None
if mask is not None:
self._mask = np.broadcast_to(mask, self._y.shape)
def __len__(self):
"""
the number of unmasked points in the dataset.
"""
return self.y.size
def __str__(self):
return f"<{self.name}>, {len(self)} points"
def __repr__(self):
msk = self._mask
if np.all(self._mask):
msk = None
if self.filename is not None:
return f"Data1D(data={str(self.filename)!r}, mask={msk!r})"
else:
return f"Data1D(data={self.data!r}, mask={msk!r})"
@property
def x(self):
"""
np.ndarray : x data (possibly masked)
"""
if self._mask is not None and self._x.size:
return self._x[self.mask]
return self._x
@property
def y(self):
"""
np.ndarray : y data (possibly masked)
"""
if self._mask is not None and self._y.size:
return self._y[self.mask]
return self._y
@property
def x_err(self):
"""
np.ndarray : x uncertainty (possibly masked)
"""
if (
self._x_err is not None
and self._mask is not None
and self._x_err.size
):
return self._x_err[self.mask]
return self._x_err
@x_err.setter
def x_err(self, x_err):
"""
np.ndarray : y uncertainty (possibly masked)
"""
self._x_err = x_err
@property
def y_err(self):
"""
uncertainties on the y data (possibly masked)
"""
if (
self._y_err is not None
and self._mask is not None
and self._y_err.size
):
return self._y_err[self.mask]
return self._y_err
@property
def mask(self):
"""
mask
"""
if self._mask is None:
self._mask = np.full_like(self._y, True, dtype=bool)
return self._mask
@mask.setter
def mask(self, mask):
"""
mask
"""
if mask is None:
mask = True
self._mask = np.broadcast_to(mask, self._y.shape).astype(bool)
@property
def data(self):
"""
4-tuple containing the (`x`, `y`, `y_err`, `x_err`) data
"""
return self.x, self.y, self.y_err, self.x_err
@property
def unmasked_data(self):
"""
4-tuple containing unmasked (x, y, y_err, x_err) data
"""
return self._x, self._y, self._y_err, self._x_err
@property
def finite_data(self):
"""
4-tuple containing the (`x`, `y`, `y_err`, `x_err`) datapoints that are
finite.
"""
finite_loc = np.where(np.isfinite(self.y))
return (
self.x[finite_loc],
self.y[finite_loc],
self.y_err[finite_loc],
self.x_err[finite_loc],
)
@data.setter
def data(self, data_tuple):
"""
Set the data for this object from supplied data.
Parameters
----------
data_tuple : tuple
2 to 4 member tuple containing the (x, y, y_err, x_err) data to
specify the dataset. `y_err` and `x_err` are optional.
Notes
-----
Clears the mask for the dataset, it will need to be reapplied.
"""
self._x = np.array(data_tuple[0], dtype=float)
self._y = np.array(data_tuple[1], dtype=float)
self.weighted = False
self._y_err = None
self._x_err = None
if len(data_tuple) > 2 and data_tuple[2] is not None:
self._y_err = np.array(data_tuple[2], dtype=float)
self.weighted = True
if len(data_tuple) > 3 and data_tuple[3] is not None:
self._x_err = np.array(data_tuple[3], dtype=float)
self._mask = None
self.sort()
[docs] def scale(self, scalefactor=1.0):
"""
Scales the y and y_err data by dividing by `scalefactor`.
Parameters
----------
scalefactor : float
The scalefactor to divide by.
"""
self._y /= scalefactor
self._y_err /= scalefactor
[docs] def add_data(self, data_tuple, requires_splice=False, trim_trailing=True):
"""
Adds more data to the dataset.
Parameters
----------
data_tuple : tuple
2 to 4 member tuple containing the (x, y, y_err, x_err) data to add
to the dataset. `y_err` and `x_err` are optional.
requires_splice : bool, optional
When the new data is added to the dataset do you want to scale it
vertically so that it overlaps with the existing data? `y` and
`y_err` in `data_tuple` are both multiplied by the scaling factor.
trim_trailing : bool, optional
When the new data is concatenated do you want to remove points from
the existing data that are in the overlap region? This might be
done because the datapoints in the `data_tuple` you are adding have
have lower `y_err` than the preceding data.
Notes
-----
Raises `ValueError` if there are no points in the overlap region and
`requires_splice` was True. The added data is not masked.
"""
x, y, y_err, x_err = self.data
# dataset has no points, can just initialise with the tuple
if not len(self):
self.data = data_tuple
return
ax, ay = data_tuple[0:2]
# if ((len(data_tuple) > 2 and self.y_err is None) or
# (len(data_tuple) == 2 and self.y_err is not None)):
# raise ValueError("Both the existing Data1D and the data you're"
# " trying to add need to have y_err")
#
# if ((len(data_tuple) > 3 and self.x_err is None) or
# (len(data_tuple) == 3 and self.x_err is not None)):
# raise ValueError("Both the existing Data1D and the data you're"
# " trying to add need to have x_err")
ay_err = None
ax_err = None
if len(data_tuple) > 2:
ay_err = np.array(data_tuple[2], dtype=float)
if len(data_tuple) > 3:
ax_err = np.array(data_tuple[3], dtype=float)
mask2 = np.full_like(data_tuple[0], True, bool)
# which values in the first dataset overlap with the second
overlap_points = np.zeros_like(x, "bool")
# go through and stitch them together.
scale = 1.0
dscale = 0.0
if requires_splice and len(self) > 1:
scale, dscale, overlap_points = get_scaling_in_overlap(
x, y, y_err, ax, ay, ay_err
)
if (
(not np.isfinite(scale))
or (not np.isfinite(dscale))
or (not np.size(overlap_points, 0))
):
raise ValueError("No points in overlap region")
if not trim_trailing:
overlap_points[:] = False
qq = np.r_[x[~overlap_points], ax]
rr = np.r_[y[~overlap_points], ay * scale]
overall_mask = np.r_[self.mask[~overlap_points], mask2]
try:
dr = np.r_[y_err[~overlap_points], ay_err * scale]
except (TypeError, ValueError):
if (ay_err is not None) or (y_err is not None):
raise ValueError(
"Both the existing Data1D and the data you're"
" trying to add need to have y_err"
)
dr = None
try:
dq = np.r_[x_err[~overlap_points], ax_err]
except (TypeError, ValueError):
if (ax_err is not None) or (x_err is not None):
raise ValueError(
"Both the existing Data1D and the data you're"
" trying to add need to have x_err"
)
dq = None
self.data = (qq, rr, dr, dq)
self.mask = overall_mask
self.sort()
[docs] def sort(self):
"""
Sorts the data in ascending order
"""
sorted = np.argsort(self.x)
self._x = self.x[sorted]
self._y = self.y[sorted]
if self._mask is not None:
self._mask = self._mask[sorted]
if self.y_err is not None:
self._y_err = self.y_err[sorted]
if self.x_err is not None:
self._x_err = self.x_err[sorted]
[docs] def save(self, f, header=None):
"""
Saves the data to file. Saves the data as 4 column ASCII.
Parameters
----------
f : {file-like, str, Path}
File to save the dataset to.
"""
if header is not None:
_header = header
elif len(self.metadata):
_header = str(self.metadata)
else:
_header = ""
np.savetxt(
f,
np.column_stack((self._x, self._y, self._y_err, self._x_err)),
header=_header,
)
[docs] def load(self, f):
"""
Loads a dataset from file, and overwrites existing data.
Must be 2 to 4 column ASCII.
Parameters
----------
f : {file-like, string, Path}
File to load the dataset from.
"""
# it would be nicer to simply use np.loadtxt, but this is an
# attempt to auto ignore header lines.
with possibly_open_file(f, "r") as g:
lines = list(reversed(g.readlines()))
x = list()
y = list()
y_err = list()
x_err = list()
# a marker for how many columns in the data there will be
numcols = 0
for i, line in enumerate(lines):
try:
# parse a line for numerical tokens separated by whitespace
# or comma
nums = [
float(tok)
for tok in re.split(r"\s|,", line)
if len(tok)
]
if len(nums) in [0, 1]:
# might be trailing newlines at the end of the file,
# just ignore those
continue
if not numcols:
# figure out how many columns one has
numcols = len(nums)
elif len(nums) != numcols:
# if the number of columns changes there's an issue
break
x.append(nums[0])
y.append(nums[1])
if len(nums) > 2:
y_err.append(nums[2])
if len(nums) > 3:
x_err.append(nums[3])
except ValueError:
# you should drop into this if you can't parse tokens into
# a series of floats. But the text may be meta-data, so
# try to carry on.
continue
x.reverse()
y.reverse()
y_err.reverse()
x_err.reverse()
if len(x) == 0:
raise RuntimeError(
"Datafile didn't appear to contain any data (or"
" was the wrong format)"
)
if numcols < 3:
y_err = None
if numcols < 4:
x_err = None
self.data = (x, y, y_err, x_err)
if hasattr(f, "read") and hasattr(f, "write"):
if hasattr(f, "name"):
# file-like ?
fname = f.name
else:
# BytesIO/ StringIO?
fname = ""
else:
fname = f
if isinstance(fname, PurePath):
# use a PurePath, not a system specific path type
# because Posix systems can't deal with WindowsPath
# and vice versa. This becomes an issue when pickling.
fname = PurePath(fname)
self.filename = fname
self.name = Path(fname).stem
[docs] def refresh(self):
"""
Refreshes a previously loaded dataset.
"""
if self.filename is not None:
with open(self.filename) as f:
self.load(f)
[docs] def plot(self, fig=None):
"""
Plot the dataset.
Requires matplotlib be installed.
Parameters
----------
fig: Figure instance, optional
If `fig` is not supplied then a new figure is created. Otherwise
the graph is created on the current axes on the supplied figure.
Returns
-------
fig, ax : :class:`matplotlib.figure.Figure`, :class:`matplotlib.Axes`
`matplotlib` figure and axes objects.
"""
import matplotlib.pyplot as plt
if fig is None:
fig = plt.figure()
ax = fig.add_subplot(111)
else:
ax = fig.gca()
if self.y_err is not None:
ax.errorbar(self.x, self.y, self.y_err, label=self.name)
else:
ax.scatter(self.x, self.y, label=self.name)
return fig, ax
def __add__(self, other):
"""
Adds two datasets together. Splices the data and trims data in the
overlap region.
"""
ret = Data1D(self.data)
ret.add_data(other.data, requires_splice=True, trim_trailing=True)
return ret
def __radd__(self, other):
"""
radd of two datasets. Splices the data and trims data in the
overlap region.
"""
self.add_data(other.data, requires_splice=True, trim_trailing=True)
return self
def __iadd__(self, other):
"""
iadd of two datasets. Splices the data and trims data in the
overlap region.
"""
self.add_data(other.data, requires_splice=True, trim_trailing=True)
return self
[docs] def synthesise(self, random_state=None):
"""
Synthesise a new dataset by adding Gaussian noise onto each of the
datapoints of the existing data.
Returns
-------
dataset : :class:`Data1D`
A new synthesised dataset
random_state : {int, :class:`numpy.random.RandomState`, :class:`numpy.random.Generator`}
If `random_state` is not specified the
:class:`numpy.random.RandomState` singleton is used.
If `random_state` is an int, a new ``RandomState`` instance is
used, seeded with random_state.
If `random_state` is already a ``RandomState`` or a ``Generator``
instance, then that object is used.
Specify `random_state` for repeatable synthesising.
"""
if self._y_err is None:
raise RuntimeError(
"Can't synthesise new dataset without y_err uncertainties"
)
rng = check_random_state(random_state)
gnoise = rng.standard_normal(size=self._y_err.shape)
new_y = self._y + gnoise * self._y_err
data = list(self.data)
data.pop(1)
data.insert(1, new_y)
dataset = Data1D()
dataset.data = data
return dataset
def _data1D_to_hdf(fname, data):
"""
Writes a Data1D object to rudimentary HDF.
Parameters
----------
fname: str, BytesIO
Filename to write to
data: Data1D
dataset to write to file
"""
import h5py
with h5py.File(fname, "w") as f:
f.create_group("entry1/data")
for attr in ["x", "y", "y_err", "x_err"]:
v = getattr(data, attr)
if v is not None:
f.create_dataset(f"entry1/data/{attr}", data=v)
def _hdf_to_data1d(fname):
"""
Reads a rudimentary HDF file into a Data1D object.
Parameters
----------
fname: str, BytesIO
File to read from
Returns
-------
data: Data1D
dataset read from file
"""
import h5py
d = []
with h5py.File(fname, "r") as f:
for attr in ["x", "y", "y_err", "x_err"]:
try:
d.append(f[f"entry1/data/{attr}"][:])
except KeyError:
pass
data = Data1D(data=d)
data.filename = fname
return data