"""A dataset class used to train Parametric UMAP models."""
from __future__ import annotations
import pathlib
import warnings
from typing import Callable
import numpy as np
import numpy.typing as npt
import pandas as pd
import scipy.sparse._coo
from pynndescent import NNDescent
from sklearn.utils import check_random_state
from torch.utils.data import Dataset
# isort: off
# Ignore warnings from Numba deprecation:
# https://numba.readthedocs.io/en/stable/reference/deprecation.html#deprecation-of-object-mode-fall-back-behaviour-when-using-jit
# Numba is required by UMAP.
from numba.core.errors import NumbaDeprecationWarning
warnings.simplefilter("ignore", category=NumbaDeprecationWarning)
from umap.umap_ import fuzzy_simplicial_set # noqa: E402
# isort: on
def get_umap_graph(
    X: npt.NDArray,
    n_neighbors: int = 10,
    metric: str = "euclidean",
    random_state: np.random.RandomState | None = None,
    max_candidates: int = 60,
    verbose: bool = True,
) -> scipy.sparse._coo.coo_matrix:
    r"""Get graph used by UMAP,
    the fuzzy topological representation.

    Parameters
    ----------
    X : numpy.ndarray
        Data from which to build the graph.
    n_neighbors : int
        Number of nearest neighbors to use
        when computing approximate nearest neighbors.
        Parameter passed to :class:`pynndescent.NNDescent`
        and :func:`umap._umap.fuzzy_simplicial_set`.
        Default is 10.
    metric : str
        Distance metric. Default is "euclidean".
        Parameter passed to :class:`pynndescent.NNDescent`
        and :func:`umap._umap.fuzzy_simplicial_set`.
    random_state : numpy.random.RandomState
        Either a numpy.random.RandomState instance,
        or None. If None, a new RandomState is made with
        :func:`sklearn.utils.check_random_state`.
    max_candidates : int
        Default is 60.
        Parameter passed to :class:`pynndescent.NNDescent`.
    verbose : bool
        Whether :class:`pynndescent.NNDescent` should log
        finding the approximate nearest neighbors.
        Default is True.

    Returns
    -------
    graph : scipy.sparse.coo_matrix
        Sparse matrix of edge probabilities,
        the fuzzy simplicial set.

    Notes
    -----
    Adapted from https://github.com/timsainb/ParametricUMAP_paper
    The graph returned is a graph of the probabilities of an edge exists between points.
    Local, one-directional, probabilities (:math:`P^{UMAP}_{i|j}`)
    are computed between a point and its neighbors to determine
    the probability with which an edge (or simplex) exists,
    based upon an assumption that data is uniformly distributed
    across a manifold in a warped dataspace.
    Under this assumption, a local notion of distance
    is set by the distance to the :math:`k^{th}` nearest neighbor
    and the local probability is scaled by that local notion of distance.
    Where :math:`\rho_{i}` is a local connectivity parameter set
    to the distance from :math:`x_i` to its nearest neighbor,
    and :math:`\sigma_{i}` is a local connectivity parameter
    set to match the local distance around :math:`x_i` upon its :math:`k` nearest neighbors
    (where :math:`k` is a hyperparameter).
    In the UMAP package, these are calculated using :func:`umap._umap.smooth_knn_dist`.
    """
    random_state = (
        check_random_state(None) if random_state is None else random_state
    )
    # number of trees in random projection forest
    n_trees = 5 + int(round((X.shape[0]) ** 0.5 / 20.0))
    # max number of nearest neighbor iters to perform
    n_iters = max(5, int(round(np.log2(X.shape[0]))))
    # get approximate nearest neighbors; each sample is flattened to a vector.
    # Note we use ``np.prod``: ``np.product`` was deprecated and removed in NumPy 2.0.
    nnd = NNDescent(
        X.reshape((len(X), np.prod(np.shape(X)[1:]))),
        n_neighbors=n_neighbors,
        metric=metric,
        n_trees=n_trees,
        n_iters=n_iters,
        max_candidates=max_candidates,
        verbose=verbose,
    )
    # get indices and distances for the ``n_neighbors`` nearest neighbors
    # of every point in the dataset
    knn_indices, knn_dists = nnd.neighbor_graph
    # build fuzzy simplicial complex
    umap_graph, sigmas, rhos = fuzzy_simplicial_set(
        X=X,
        n_neighbors=n_neighbors,
        metric=metric,
        random_state=random_state,
        knn_indices=knn_indices,
        knn_dists=knn_dists,
    )
    return umap_graph
[docs]
def get_graph_elements(
graph: scipy.sparse._coo.coo_matrix, n_epochs: int
) -> tuple[
scipy.sparse._coo.coo_matrix,
npt.NDArray,
npt.NDArray,
npt.NDArray,
npt.NDArray,
int,
]:
"""Get graph elements for Parametric UMAP Dataset.
Parameters
----------
graph : scipy.sparse.csr_matrix
The graph returned by :func:`get_umap_graph`.
n_epochs : int
Number of epochs model will be trained
Returns
-------
graph : scipy.sparse._coo.coo_matrix
The graph, now in COOrdinate format.
epochs_per_sample : int
head : numpy.ndarray
Graph rows.
tail : numpy.ndarray
Graph columns.
weight : numpy.ndarray
Graph data.
n_vertices : int
Number of vertices in dataset.
"""
graph = graph.tocoo()
# eliminate duplicate entries by summing them together
graph.sum_duplicates()
# number of vertices in dataset
n_vertices = graph.shape[1]
# get the number of epochs based on the size of the dataset
if n_epochs is None:
# For smaller datasets we can use more epochs
if graph.shape[0] <= 10000:
n_epochs = 500
else:
n_epochs = 200
# remove elements with very low probability
graph.data[graph.data < (graph.data.max() / float(n_epochs))] = 0.0
graph.eliminate_zeros()
# get epochs per sample based upon edge probability
epochs_per_sample = n_epochs * graph.data
head = graph.row
tail = graph.col
weight = graph.data
return graph, epochs_per_sample, head, tail, weight, n_vertices
[docs]
class ParametricUMAPDataset(Dataset):
"""A dataset class used to train Parametric UMAP models."""
[docs]
def __init__(
self,
dataset_path: str | pathlib.Path,
dataset_df: pd.DataFrame,
split: str,
subset: str | None = None,
n_epochs: int = 200,
n_neighbors: int = 10,
metric: str = "euclidean",
random_state: int | None = None,
transform: Callable | None = None,
):
"""Initialize a :class:`ParametricUMAPDataset` instance.
Parameters
----------
dataset_path : pathlib.Path
Path to directory that represents a
parametric UMAP dataset,
as created by
:func:`vak.prep.prep_parametric_umap_dataset`.
dataset_df : pandas.DataFrame
A parametric UMAP dataset,
represented as a :class:`pandas.DataFrame`.
split : str
The name of a split from the dataset,
one of {'train', 'val', 'test'}.
subset : str, optional
Name of subset to use.
If specified, this takes precedence over split.
Subsets are typically taken from the training data
for use when generating a learning curve.
n_epochs : int
Number of epochs model will be trained. Default is 200.
transform : callable, optional
"""
# subset takes precedence over split, if specified
if subset:
dataset_df = dataset_df[dataset_df.subset == subset].copy()
else:
dataset_df = dataset_df[dataset_df.split == split].copy()
data = np.stack(
[
np.load(dataset_path / spect_path)
for spect_path in dataset_df.spect_path.values
]
)
graph = get_umap_graph(
data,
n_neighbors=n_neighbors,
metric=metric,
random_state=random_state,
)
(
graph,
epochs_per_sample,
head,
tail,
weight,
n_vertices,
) = get_graph_elements(graph, n_epochs)
# we repeat each sample in (head, tail) a certain number of times depending on its probability
self.edges_to_exp, self.edges_from_exp = (
np.repeat(head, epochs_per_sample.astype("int")),
np.repeat(tail, epochs_per_sample.astype("int")),
)
# we then shuffle -- not sure this is necessary if the dataset is shuffled during training?
shuffle_mask = np.random.permutation(np.arange(len(self.edges_to_exp)))
self.edges_to_exp = self.edges_to_exp[shuffle_mask].astype(np.int64)
self.edges_from_exp = self.edges_from_exp[shuffle_mask].astype(
np.int64
)
self.data = data
self.dataset_df = dataset_df
self.transform = transform
@property
def duration(self):
return self.dataset_df["duration"].sum()
def __len__(self):
return self.edges_to_exp.shape[0]
@property
def shape(self):
tmp_x_ind = 0
tmp_item = self.__getitem__(tmp_x_ind)
return tmp_item[0].shape
def __getitem__(self, index):
edges_to_exp = self.data[self.edges_to_exp[index]]
edges_from_exp = self.data[self.edges_from_exp[index]]
if self.transform:
edges_to_exp = self.transform(edges_to_exp)
edges_from_exp = self.transform(edges_from_exp)
return (edges_to_exp, edges_from_exp)
[docs]
@classmethod
def from_dataset_path(
cls,
dataset_path: str | pathlib.Path,
split: str,
subset: str | None = None,
n_neighbors: int = 10,
metric: str = "euclidean",
random_state: int | None = None,
n_epochs: int = 200,
transform: Callable | None = None,
):
"""Make a :class:`ParametricUMAPDataset` instance,
given the path to parametric UMAP dataset.
Parameters
----------
dataset_path : pathlib.Path
Path to directory that represents a
parametric UMAP dataset,
as created by
:func:`vak.prep.prep_parametric_umap_dataset`.
split : str
The name of a split from the dataset,
one of {'train', 'val', 'test'}.
subset : str, optional
Name of subset to use.
If specified, this takes precedence over split.
Subsets are typically taken from the training data
for use when generating a learning curve.
n_neighbors : int
Number of nearest neighbors to use
when computing approximate nearest neighbors.
Parameter passed to :class:`pynndescent.NNDescent`
and :func:`umap._umap.fuzzy_simplicial_set`.
metric : str
Distance metric. Default is "cosine".
Parameter passed to :class:`pynndescent.NNDescent`
and :func:`umap._umap.fuzzy_simplicial_set`.
random_state : numpy.random.RandomState
Either a numpy.random.RandomState instance,
or None.
transform : callable
The transform applied to the input to the neural network :math:`x`.
Returns
-------
dataset : vak.datasets.parametric_umap.ParametricUMAPDataset
"""
import vak.datasets # import here just to make classmethod more explicit
dataset_path = pathlib.Path(dataset_path)
metadata = vak.datasets.parametric_umap.Metadata.from_dataset_path(
dataset_path
)
dataset_csv_path = dataset_path / metadata.dataset_csv_filename
dataset_df = pd.read_csv(dataset_csv_path)
return cls(
dataset_path,
dataset_df,
split,
subset,
n_epochs,
n_neighbors,
metric,
random_state,
transform,
)
[docs]
class ParametricUMAPInferenceDataset(Dataset):
[docs]
def __init__(
self,
data: npt.NDArray,
dataset_df: pd.DataFrame,
transform: Callable | None = None,
):
self.data = data
self.dataset_df = dataset_df
self.transform = transform
@property
def duration(self):
return self.dataset_df["duration"].sum()
def __len__(self):
return self.data.shape[0]
@property
def shape(self):
tmp_x_ind = 0
tmp_item = self.__getitem__(tmp_x_ind)
return tmp_item[0].shape
def __getitem__(self, index):
x = self.data[index]
df_index = self.dataset_df.index[index]
if self.transform:
x = self.transform(x)
return {"x": x, "df_index": df_index}
[docs]
@classmethod
def from_dataset_path(
cls,
dataset_path: str | pathlib.Path,
split: str,
n_neighbors: int = 10,
metric: str = "euclidean",
random_state: int | None = None,
n_epochs: int = 200,
transform: Callable | None = None,
):
"""
Parameters
----------
dataset_path : str, pathlib.Path
Path to a directory that represents a dataset.
split
n_neighbors
metric
random_state
n_epochs
transform
Returns
-------
"""
import vak.datasets # import here just to make classmethod more explicit
dataset_path = pathlib.Path(dataset_path)
metadata = vak.datasets.parametric_umap.Metadata.from_dataset_path(
dataset_path
)
dataset_csv_path = dataset_path / metadata.dataset_csv_filename
dataset_df = pd.read_csv(dataset_csv_path)
split_df = dataset_df[dataset_df.split == split]
data = np.stack(
[
np.load(dataset_path / spect_path)
for spect_path in split_df.spect_path.values
]
)
return cls(
data,
split_df,
transform=transform,
)