from __future__ import annotations
import pathlib
import numpy as np
import pandas as pd
from ..common import constants
from ..common.validators import column_or_1d
from . import functional as F
# Names exported by ``from <this module> import *``;
# one entry per transform class defined in this module.
__all__ = [
"AddChannel",
"PadToWindow",
"StandardizeSpect",
"ToFloatTensor",
"ToLongTensor",
"ViewAsWindowBatch",
]
# adapted from:
# https://github.com/NickleDave/hybrid-vocal-classifier/blob/master/hvc/neuralnet/utils.py
[docs]
class StandardizeSpect:
"""transform that standardizes spectrograms so they are all
on the same scale, by subtracting off the mean and dividing by the
standard deviation from a 'fit' set of spectrograms.
Attributes
----------
mean_freqs : numpy.ndarray
mean values for each frequency bin across the fit set of spectrograms
std_freqs : numpy.ndarray
standard deviation for each frequency bin across the fit set of spectrograms
non_zero_std : numpy.ndarray
boolean, indicates where std_freqs has non-zero values. Used to avoid divide-by-zero errors.
"""
[docs]
def __init__(self, mean_freqs=None, std_freqs=None, non_zero_std=None):
"""initialize a new StandardizeSpect instance
Parameters
----------
mean_freqs : numpy.ndarray
vector of mean values for each frequency bin across the fit set of spectrograms
std_freqs : numpy.ndarray
vector of standard deviations for each frequency bin across the fit set of spectrograms
non_zero_std : numpy.ndarray
boolean, indicates where std_freqs has non-zero values. Used to avoid divide-by-zero errors.
"""
if any(
[arg is not None for arg in (mean_freqs, std_freqs, non_zero_std)]
):
mean_freqs, std_freqs, non_zero_std = (
column_or_1d(arr)
for arr in (mean_freqs, std_freqs, non_zero_std)
)
if (
len(
np.unique(
[
arg.shape[0]
for arg in (mean_freqs, std_freqs, non_zero_std)
]
)
)
!= 1
):
raise ValueError(
"`mean_freqs`, `std_freqs`, and `non_zero_std` must all have the same length.\n"
f"`mean_freqs.shape`: {mean_freqs.shape}, `std_freqs.shape`: {std_freqs.shape}, "
f"`non_zero_std.shape`: {non_zero_std.shape}"
)
self.mean_freqs = mean_freqs
self.std_freqs = std_freqs
self.non_zero_std = non_zero_std
[docs]
@classmethod
def fit_dataset_path(
cls, dataset_path, split="train", subset: str | None = None
):
"""Returns a :class:`StandardizeSpect` instance
that is fit to a split from a dataset,
given the path to that dataset and the
name of the split.
Parameters
----------
dataset_path : str or pathlib.Path
Path to a dataset.
split : str
Name of split from dataset to fit.
Returns
-------
standardize_spect : StandardizeSpect
Instance that has been fit to input data from split.
"""
from vak.datasets import frame_classification
from vak.datasets.frame_classification import Metadata
dataset_path = pathlib.Path(dataset_path)
metadata = Metadata.from_dataset_path(dataset_path)
dataset_csv_path = dataset_path / metadata.dataset_csv_filename
dataset_path = dataset_csv_path.parent
dataset_df = pd.read_csv(dataset_csv_path)
if subset:
dataset_df = dataset_df[dataset_df.split == split].copy()
else:
dataset_df = dataset_df[dataset_df.split == split].copy()
frames_paths = dataset_df[
frame_classification.constants.FRAMES_PATH_COL_NAME
].values
frames = np.load(dataset_path / frames_paths[0])[constants.SPECT_KEY]
# in files, spectrograms are in orientation (freq bins, time bins)
# so we take mean and std across columns, i.e. time bins, i.e. axis 1
mean_freqs = np.mean(frames, axis=1)
std_freqs = np.std(frames, axis=1)
for frames_path in frames_paths[1:]:
frames = np.load(dataset_path / frames_path)[constants.SPECT_KEY]
mean_freqs += np.mean(frames, axis=1)
std_freqs += np.std(frames, axis=1)
mean_freqs = mean_freqs / len(frames_paths)
std_freqs = std_freqs / len(frames_paths)
non_zero_std = np.argwhere(std_freqs != 0)
return cls(mean_freqs, std_freqs, non_zero_std)
[docs]
@classmethod
def fit(cls, spect):
"""Fit a StandardizeSpect instance.
Parameters
----------
spect : numpy.ndarray
with dimensions (frequency bins, time bins)
Notes
-----
Input should be spectrogram.
Fit function finds the mean and standard deviation of each frequency bin,
which are used by `transform` method to scale other spectrograms.
"""
# TODO: make this function accept list and/or ndarray with batch dimension
if spect.ndim != 2:
raise ValueError("input spectrogram should be a 2-d array")
mean_freqs = np.mean(spect, axis=1)
std_freqs = np.std(spect, axis=1)
non_zero_std = np.argwhere(std_freqs != 0)
return cls(mean_freqs, std_freqs, non_zero_std)
def __call__(self, spect):
"""normalizes input spectrogram with fit parameters.
Parameters
----------
spect : numpy.ndarray
2-d array with dimensions (frequency bins, time bins).
Returns
-------
z_norm_spect : numpy.ndarray
array standardized to same scale as set of spectrograms that
SpectScaler was fit with
"""
if any(
[not hasattr(self, attr) for attr in ["mean_freqs", "std_freqs"]]
):
raise AttributeError(
"SpectScaler properties are set to None,"
"must call fit method first to set the"
"value of these properties before calling"
"transform"
)
if not isinstance(spect, np.ndarray):
raise TypeError(
f"type of spect must be numpy.ndarray but was: {type(spect)}"
)
if spect.shape[0] != self.mean_freqs.shape[0]:
raise ValueError(
f"number of rows in spects, {spect.shape[0]}, "
f"does not match number of elements in self.mean_freqs, {self.mean_freqs.shape[0]},"
"i.e. the number of frequency bins from the spectrogram"
"to which the scaler was fit originally"
)
return F.standardize_spect(
spect, self.mean_freqs, self.std_freqs, self.non_zero_std
)
def __repr__(self):
args = f"(mean_freqs={self.mean_freqs}, std_freqs={self.std_freqs}, non_zero_std={self.non_zero_std})"
return self.__class__.__name__ + args
[docs]
class PadToWindow:
    """Pad a 1d or 2d array so that it can be reshaped
    into consecutive windows of specified size.

    Parameters
    ----------
    arr : numpy.ndarray
        with 1 or 2 dimensions, e.g. a vector of labeled timebins
        or a spectrogram. Passed when the instance is called.
    window_size : int
        width of window in number of elements.
        A whole number float is accepted and converted to int.
    padval : float
        value to pad with. Added to end of array, the
        "right side" if 2-dimensional.
    return_padding_mask : bool
        if True, return a boolean vector to use for cropping
        back down to size before padding. padding_mask has size
        equal to width of padded array, i.e. original size
        plus padding at the end, and has values of 1 where
        columns in padded are from the original array,
        and values of 0 where columns were added for padding.

    Returns
    -------
    padded : numpy.ndarray
        padded with padval
    padding_mask : numpy.ndarray
        boolean, has size equal to width of padded, i.e. original size
        plus padding at the end. Has values of 1 where
        columns in padded are from the original array,
        and values of 0 where columns were added for padding.
        Only returned if return_padding_mask is True.
    """

    def __init__(self, window_size, padval=0.0, return_padding_mask=True):
        """Validate and store the padding configuration.

        Raises
        ------
        ValueError
            If ``window_size`` is neither an int nor a whole number float.
        TypeError
            If ``padval`` is not an int or float,
            or if ``return_padding_mask`` is not a bool.
        """
        is_whole_float = (
            isinstance(window_size, float) and window_size.is_integer()
        )
        if not (isinstance(window_size, int) or is_whole_float):
            raise ValueError(
                f"window size must be an int or a whole number float;"
                f" type was {type(window_size)} and value was {window_size}"
            )
        if is_whole_float:
            # store as int so the value is safe to use as a size downstream
            window_size = int(window_size)

        # bool is a subclass of int but is not a meaningful pad value
        if isinstance(padval, bool) or not isinstance(padval, (int, float)):
            raise TypeError(
                f"type for padval must be int or float but was: {type(padval)}"
            )

        if not isinstance(return_padding_mask, bool):
            raise TypeError(
                "return_padding_mask must be boolean (True or False), "
                f"but was type {type(return_padding_mask)} with value {return_padding_mask}"
            )

        self.window_size = window_size
        self.padval = padval
        self.return_padding_mask = return_padding_mask

    def __call__(self, arr):
        return F.pad_to_window(
            arr, self.window_size, self.padval, self.return_padding_mask
        )

    def __repr__(self):
        args = f"(window_size={self.window_size}, padval={self.padval}, return_padding_mask={self.return_padding_mask})"
        return self.__class__.__name__ + args
[docs]
class ViewAsWindowBatch:
"""return view of a 1d or 2d array as a batch of non-overlapping windows
Parameters
----------
arr : numpy.ndarray
with 1 or 2 dimensions, e.g. a vector of labeled timebins
or a 2-d array representing a spectrogram.
If the array has 2-d dimensions, the returned array will
have dimensions (batch, height of array, window width)
window_width : int
width of window in number of elements.
Returns
-------
batch_windows : numpy.ndarray
with shape (batch size, window_width) if array is 1d,
or with shape (batch size, height, window_width) if array is 2d.
Batch size will be arr.shape[-1] // window_width.
Window width must divide arr.shape[-1] evenly.
To pad the array so it can be divided into windows of the specified
width, use the `pad_to_window` transform
Notes
-----
adapted from skimage.util.view_as_blocks
https://github.com/scikit-image/scikit-image/blob/f1b7cf60fb80822849129cb76269b75b8ef18db1/skimage/util/shape.py#L9
"""
[docs]
def __init__(self, window_width: int | float):
if not isinstance(window_width, int) or (
isinstance(window_width, float)
and window_width.is_integer() is False
):
raise ValueError(
f"window size must be an int or a whole number float;"
f" type was {type(window_width)} and value was {window_width}"
)
self.window_width = window_width
def __call__(self, arr):
return F.view_as_window_batch(arr, self.window_width)
def __repr__(self):
args = f"(window_width={self.window_width})"
return self.__class__.__name__ + args
[docs]
class ToFloatTensor:
    """Transform that converts a Numpy array to a torch.FloatTensor.

    Parameters
    ----------
    arr : numpy.ndarray
        Passed when the instance is called.

    Returns
    -------
    float_tensor
        with dtype 'float32'
    """

    def __init__(self):
        """Create the transform; it takes no configuration."""

    def __call__(self, arr):
        float_tensor = F.to_floattensor(arr)
        return float_tensor

    def __repr__(self):
        return type(self).__name__
[docs]
class ToLongTensor:
    """Convert Numpy array to torch.LongTensor.

    Parameters
    ----------
    arr : numpy.ndarray
        Passed when the instance is called.

    Returns
    -------
    long_tensor : torch.Tensor
        with dtype 'int64', the dtype of a torch.LongTensor
    """

    def __init__(self):
        """Create the transform; it takes no configuration."""

    def __call__(self, arr):
        return F.to_longtensor(arr)

    def __repr__(self):
        return self.__class__.__name__
[docs]
class AddChannel:
"""Add a "channel" dimension to a tensor.
Transform that makes it easy to treat a spectrogram as an image,
by adding a dimension with a single 'channel', analogous to grayscale.
In this way the tensor can be fed to e.g. convolutional layers.
Parameters
----------
input : torch.Tensor
with two dimensions (height, width).
channel_dim : int
dimension where "channel" is added.
Default is 0, which returns a tensor with dimensions (channel, height, width).
"""
[docs]
def __init__(self, channel_dim: int | float = 0):
if not isinstance(channel_dim, int) or (
isinstance(channel_dim, float)
and channel_dim.is_integer() is False
):
raise ValueError(
f"window size must be an int or a whole number float;"
f" type was {type(channel_dim)} and value was {channel_dim}"
)
channel_dim = int(channel_dim)
if channel_dim < 0 and channel_dim != -1:
raise ValueError(
"value of channel_dim should be a non-negative integer, or -1 (for last dimension). "
f"Value was: {channel_dim}"
)
self.channel_dim = channel_dim
def __call__(self, input):
return F.add_channel(input, channel_dim=self.channel_dim)
def __repr__(self):
args = f"(channel_dim={self.channel_dim})"
return self.__class__.__name__ + args