Source code for vak.transforms.frame_labels.functional

"""Functional forms of transformations
related to frame labels,
i.e., vectors where each element represents
a label for a frame, either a single sample in audio
or a single time bin from a spectrogram.

This module is structured as followed:
- from_segments: transform to get frame labels from annotations
- to_labels: transform to get back just string labels from frame labels,
  used to evaluate a model
- to_segments: transform to get back segment onsets, offsets, and labels from frame labels.
  Inverse of ``from_segments``.
- post-processing transforms that can be used to "clean up" a vector of frame labels
  - segment_inds_list_from_class_labels: helper function used to find segments in a vector of frame labels
  - remove_short_segments: remove any segment less than a minimum duration
  - take_majority_vote: take a "majority vote" within each segment bounded by the background label,
    and apply the most "popular" label within each segment to all timebins in that segment
  - postprocess: combines remove_short_segments and take_majority_vote in one transform
"""
from __future__ import annotations

import numpy as np
import numpy.typing as npt
import scipy.stats
import torch

from ... import common
from ...common.timebins import timebin_dur_from_vec
from ...common.validators import column_or_1d, row_or_1d

__all__ = [
    # keep alphabetized
    "boundary_inds_from_boundary_labels",
    "from_segments",
    "postprocess",
    "remove_short_segments",
    "take_majority_vote",
    "segment_inds_list_from_class_labels",
    "to_boundary_times",
    "to_labels",
    "to_segments",
]


[docs] def from_segments( labels_int: npt.NDArray, onsets_s: npt.NDArray, offsets_s: npt.NDArray, time_bins: npt.NDArray, background_label: int = 0, ) -> npt.NDArray: """Make a vector of labels for a vector of frames, given labeled segments in the form of onset times, offset times, and segment labels. Parameters ---------- labels_int : list, numpy.ndarray A list or array of labels from the annotation for a vocalization, mapped to integers onsets_s : numpy.ndarray 1-d vector of floats, segment onsets in seconds. offsets_s : numpy.ndarray 1-d vector of floats, segment offsets in seconds. time_bins : numpy.ndarray 1-d vector of floats, time in seconds for center of each time bin of a spectrogram. background_label : int Label assigned to frames that do not have labels associated with them. Default is 0. Returns ------- frame_labels : numpy.ndarray same length as time_bins, with each element a label for each time bin """ if ( isinstance(labels_int, list) and not all([isinstance(lbl, int) for lbl in labels_int]) ) or ( isinstance(labels_int, np.ndarray) and labels_int.dtype not in [np.int8, np.int16, np.int32, np.int64] ): raise TypeError( "labels_int must be a list or numpy.ndarray of integers" ) label_vec = ( np.ones((time_bins.shape[-1],), dtype="int8") * background_label ) onset_inds = [np.argmin(np.abs(time_bins - onset)) for onset in onsets_s] offset_inds = [ np.argmin(np.abs(time_bins - offset)) for offset in offsets_s ] for label, onset, offset in zip(labels_int, onset_inds, offset_inds): # offset_inds[ind]+1 because offset time bin is still "part of" syllable label_vec[onset : offset + 1] = label # noqa: E203 return label_vec
[docs] def to_labels( frame_labels: npt.NDArray, labelmap: dict, background_label: str = common.constants.DEFAULT_BACKGROUND_LABEL, ) -> str: """Convert vector of frame labels to a string, one character for each continuous segment. Allows for converting output of network from a label for each frame to one label for each continuous segment, in order to compute string-based metrics like edit distance. Parameters ---------- frame_labels : numpy.ndarray A vector where each element represents a label for a frame, either a single sample in audio or a single time bin from a spectrogram. Typically, the output of a neural network. labelmap : dict That maps string labels to integers. The mapping is inverted to convert back to string labels. background_label: str, optional The string label applied to segments belonging to the background class. Default is :const:`vak.common.constants.DEFAULT_BACKGROUND_LABEL`. Returns ------- labels : str The label at the onset of each continuous segment in ``frame_labels``, mapped back to string labels in ``labelmap``. """ frame_labels = row_or_1d(frame_labels) # NOTE this works differently from `to_boundary_times`; # In that function we do `nonzero(diff(frame_labels))[0] + 1` # to get the indices of boundary times # (the start of a segment, by convention). # Instead, here we call `diff(frame_labels)` and # *importantly* we cast that `astype(bool)`. # Because `diff` will only be non-zero at boundaries, # those elements will be True and all others will be False. # This lets us insert one more `True` as index zero # to shift all other `True`s to the right by one, # and then use the **boolean** vector to index into # frame labels to get the label at the start of each segment. # This is equivalent to adding +1 to the result of # nonzero(diff(frame_labels)) to get the (integer) indices, # and *only after* that, adding a `0` at the start of # the (integer) indexing vector. # We just avoid getting the numeric indices here. # Adding this big verbose comment because I was afraid this was a bug, # after adding `to_boundary_times` and then getting confused # (because I didn't read this closely enough). onset_inds = np.diff(frame_labels, axis=0).astype(bool) onset_inds = np.insert(onset_inds, 0, True) labels = frame_labels[onset_inds] # remove background label if background_label in labelmap: labels = labels[labels != labelmap[background_label]] if len(labels) < 1: # if removing all the 'unlabeled' leaves nothing return "" # only invert mapping and then map integer labels to characters inverse_labelmap = dict((v, k) for k, v in labelmap.items()) labels = labels.tolist() labels = [inverse_labelmap[label] for label in labels] return "".join(labels)
[docs] def to_segments( frame_labels: npt.NDArray, labelmap: dict, frame_times: npt.NDArray, n_decimals_trunc: int = 5, background_label: str = common.constants.DEFAULT_BACKGROUND_LABEL, ) -> tuple[npt.NDArray, npt.NDArray, npt.NDArray]: """Convert a vector of frame labels into segments in the form of onset indices, offset indices, and labels. Finds where continuous runs of a single label start and stop in timebins, and considers each of these runs a segment. The function returns vectors of labels and onsets and offsets in units of seconds. Parameters ---------- frame_labels : numpy.ndarray A vector where each element represents a label for a frame, either a single sample in audio or a single time bin from a spectrogram. Output of a neural network. labelmap : dict That maps labels to consecutive integers. The mapping is inverted to convert back to labels. frame_times : numpy.ndarray Vector of times; the times are either the time of samples in audio, or the bin centers of columns in a spectrogram, returned by function that generated spectrogram. Used to convert onset and offset indices in frame_labels to seconds. n_decimals_trunc : int Number of decimal places to keep when truncating the timebin duration calculated from the vector of times t. Default is 5. Returns ------- labels : numpy.ndarray Vector where each element is a label for a segment with its onset and offset indices given by the corresponding element in onset_inds and offset_inds. onsets_s : numpy.ndarray Vector where each element is the onset in seconds a segment. Each onset corresponds to the value at the same index in labels. offsets_s : numpy.ndarray Vector where each element is the offset in seconds of a segment. Each offset corresponds to the value at the same index in labels. """ frame_labels = column_or_1d(frame_labels) if background_label in labelmap: # handle the case when all time bins are predicted to be unlabeled # see https://github.com/NickleDave/vak/issues/383 uniq_frame_labels = np.unique(frame_labels) if ( len(uniq_frame_labels) == 1 and uniq_frame_labels[0] == labelmap[background_label] ): return None, None, None # used to find onsets/offsets below; compute here so if we fail we do so early timebin_dur = timebin_dur_from_vec(frame_times, n_decimals_trunc) offset_inds = np.nonzero(np.diff(frame_labels, axis=0))[ 0 ] # [0] because nonzero return tuple onset_inds = offset_inds + 1 offset_inds = np.concatenate( (offset_inds, np.asarray([frame_labels.shape[0] - 1])) ) onset_inds = np.concatenate((np.asarray([0]), onset_inds)) labels = frame_labels[onset_inds] # remove background label if background_label in labelmap: keep = np.where(labels != labelmap[background_label])[0] labels = labels[keep] onset_inds = onset_inds[keep] offset_inds = offset_inds[keep] # handle case where removing 'unlabeled' leaves no segments if all([len(vec) == 0 for vec in (labels, onset_inds, offset_inds)]): return None, None, None inverse_labelmap = dict((v, k) for k, v in labelmap.items()) labels = labels.tolist() labels = np.asarray([inverse_labelmap[label] for label in labels]) # the 'best' estimate we can get of onset and offset times, # given binned times, and labels applied to each time bin, # is "some time" between the last labeled bin for one segment, # i.e. its offset, and the first labeled bin for the next # segment, i.e. its onset. In other words if the whole bin is labeled # as belonging to that segment, and the bin preceding it is labeled as # belonging to the previous section, then the onset of the current # segment must be the time between the two bins. To find those times # we use the bin centers and either subtract (for onsets) or add # (for offsets) half a timebin duration. This half a timebin # duration puts our onsets and offsets at the time "between" bins. onsets_s = frame_times[onset_inds] - (timebin_dur / 2) offsets_s = frame_times[offset_inds] + (timebin_dur / 2) # but this estimate will be "wrong" if we set the onset or offset time # outside the possible times in our timebin vector. Need to clean up. if onsets_s[0] < 0.0: onsets_s[0] = 0.0 if offsets_s[-1] > frame_times[-1]: offsets_s[-1] = frame_times[-1] return labels, onsets_s, offsets_s
[docs] def segment_inds_list_from_class_labels( frame_labels: npt.NDArray, background_label: int = 0 ) -> list[npt.NDArray]: """Given a vector of frame labels, returns a list of indexing vectors, one for each segment in the vector that is not labeled with the background label. Parameters ---------- frame_labels : numpy.ndarray A vector where each element represents a label for a frame, either a single sample in audio or a single time bin from a spectrogram. background_label : int Label that was given to segments that were not labeled in annotation, e.g. silent periods between annotated segments. Default is 0. Returns ------- segment_inds_list : list Of fancy indexing arrays. Each array can be used to index one segment in ``frame_labels``. """ segment_inds = np.nonzero(frame_labels != background_label)[0] return np.split(segment_inds, np.where(np.diff(segment_inds) != 1)[0] + 1)
[docs] def remove_short_segments( frame_labels: npt.NDArray, segment_inds_list: list[npt.NDArray], timebin_dur: float, min_segment_dur: float | int, background_label: int = 0, ) -> tuple[npt.NDArray, list[npt.NDArray]]: """Remove segments from vector of frame labels that are shorter than a specified duration. Parameters ---------- frame_labels : numpy.ndarray A vector where each element represents a label for a frame, either a single sample in audio or a single time bin from a spectrogram. Output of a neural network. segment_inds_list : list Of numpy.ndarray, indices that will recover segments list from ``frame_labels``. Returned by function ``vak.labels.frame_labels_segment_inds_list``. timebin_dur : float Duration of a single timebin in the spectrogram, in seconds. Used to convert onset and offset indices in ``frame_labels`` to seconds. min_segment_dur : float Minimum duration of segment, in seconds. If specified, then any segment with a duration less than min_segment_dur is removed from frame_labels. Default is None, in which case no segments are removed. background_label : int Label that was given to segments that were not labeled in annotation, e.g. silent periods between annotated segments. Default is 0. Returns ------- frame_labels : numpy.ndarray A vector where each element represents a label for a frame, either a single sample in audio or a single time bin from a spectrogram. With segments whose duration is shorter than ``min_segment_dur`` set to ``background_label`` segment_inds_list : list Of numpy.ndarray, with arrays removed that represented segments in ``frame_labels`` that were shorter than ``min_segment_dur``. """ new_segment_inds_list = [] for segment_inds in segment_inds_list: if segment_inds.shape[-1] * timebin_dur < min_segment_dur: frame_labels[segment_inds] = background_label # DO NOT keep segment_inds array else: # do keep segment_inds array, don't change frame_labels new_segment_inds_list.append(segment_inds) return frame_labels, new_segment_inds_list
[docs] def take_majority_vote( frame_labels: npt.NDArray, segment_inds_list: list[npt.NDArray] ) -> npt.NDArray: """Transform segments containing multiple labels into segments with a single label by taking a "majority vote", i.e. assign all frames in the segment the most frequently occurring label in the segment. Parameters ---------- frame_labels : numpy.ndarray A vector where each element represents a label for a frame, either a single sample in audio or a single time bin from a spectrogram. Output of a neural network. segment_inds_list : list Of numpy.ndarray, indices that will recover segments list from frame_labels. Returned by function ``vak.labels.frame_labels_segment_inds_list``. Returns ------- frame_labels : numpy.ndarray After the majority vote transform has been applied. """ for segment_inds in segment_inds_list: segment = frame_labels[segment_inds] majority = scipy.stats.mode(segment, keepdims=False)[0].item() frame_labels[segment_inds] = majority return frame_labels
[docs] def boundary_inds_from_boundary_labels( boundary_labels: npt.NDArray, force_boundary_first_ind: bool = True ) -> npt.NDArray: """Return a :class:`numpy.ndarray` with the indices of boundaries, given a 1-D vector of boundary labels. Parameters ---------- boundary_labels : numpy.ndarray Vector of integers ``{0, 1}``, where ``1`` indicates a boundary, and ``0`` indicates no boundary. Output of a frame classification model trained to classify each frame as "boundary" or "no boundary". force_boundary_first_ind : bool If ``True``, and the first index of ``boundary_labels`` is not classified as a boundary, force it to be a boundary. """ boundary_labels = row_or_1d(boundary_labels) boundary_inds = np.nonzero(boundary_labels)[0] if force_boundary_first_ind: if len(boundary_inds) == 0: # handle edge case where no boundaries were predicted boundary_inds = np.array([0]) # replace with a single boundary, at index 0 else: if boundary_inds[0] != 0: # force there to be a boundary at index 0 np.insert(boundary_inds, 0, 0) return boundary_inds
[docs] def segment_inds_list_from_boundary_labels( boundary_labels: npt.NDArray, force_boundary_first_ind: bool = True ) -> list[npt.NDArray]: """Given an array of boundary labels, return a list of :class:`numpy.ndarray` vectors, each of which can be used to index one segment in a vector of frame labels. Parameters ---------- boundary_labels : numpy.ndarray Vector of integers ``{0, 1}``, where ``1`` indicates a boundary, and ``0`` indicates no boundary. Output of a frame classification model trained to classify each frame as "boundary" or "no boundary". force_boundary_first_ind : bool If ``True``, and the first index of ``boundary_labels`` is not classified as a boundary, force it to be a boundary. Returns ------- segment_inds_list : list Of fancy indexing arrays. Each array can be used to index one segment in ``frame_labels``. """ boundary_inds = boundary_inds_from_boundary_labels( boundary_labels, force_boundary_first_ind ) # at the end of `boundary_inds``, insert an imaginary "last" boundary we use just with ``np.arange`` below np.insert(boundary_inds, boundary_inds.shape[0], boundary_labels.shape[0]) segment_inds_list = [] for start, stop in zip(boundary_inds[:-1], boundary_inds[1:]): segment_inds_list.append(np.arange(start, stop)) return segment_inds_list
[docs] def postprocess( frame_labels: npt.NDArray, timebin_dur: float, background_label: int = 0, min_segment_dur: float | None = None, majority_vote: bool = False, boundary_labels: npt.NDArray | None = None, ) -> npt.NDArray: """Apply post-processing transformations to a vector of frame labels. Optional post-processing consist of two transforms, that both rely on there being a label that corresponds to the background class. The first removes any segments that are shorter than a specified duration, by converting labels in those segments to the background class label. The second performs a "majority vote" transform within run of labels that is bordered on both sides by the "background" label. I.e., it counts the number of times any label occurs in that segment, and then assigns all bins the most common label. The function performs those steps in this order (pseudo-code): .. code-block:: if min_segment_dur: frame_labels = remove_short_segments(frame_labels, labelmap, min_segment_dur) if majority_vote: frame_labels = majority_vote(frame_labels, labelmap) return frame_labels Parameters ---------- frame_labels : numpy.ndarray A vector where each element represents a label for a frame, either a single sample in audio or a single time bin from a spectrogram. Output of a neural network. timebin_dur : float Duration of a time bin in a spectrogram, e.g., as estimated from vector of times using ``vak.timebins.timebin_dur_from_vec``. background_label : int Label that was given to segments that were not labeled in annotation, e.g. silent periods between annotated segments. Default is 0. min_segment_dur : float Minimum duration of segment, in seconds. If specified, then any segment with a duration less than min_segment_dur is removed from frame_labels. Default is None, in which case no segments are removed. majority_vote : bool If True, transform segments containing multiple labels into segments with a single label by taking a "majority vote", i.e. assign all time bins in the segment the most frequentlygt occurring label in the segment. This transform requires either a background label or a vector of boundary labels. Default is False. boundary_labels : numpy.ndarray, optional. Vector of integers ``{0, 1}``, where ``1`` indicates a boundary, and ``0`` indicates no boundary. Output of one head of a frame classification model, that has been trained to classify each frame as either "boundary" or "no boundary". Optional, default is None. If supplied, this vector is used to find segments before applying post-processing, instead of recovering them from ``frame_labels`` using the ``background_label`` Returns ------- frame_labels : numpy.ndarray Vector of frame labels after post-processing is applied. """ # NOTE: we need to clone to avoid mutating tensors inside a model, # since calling `torch.tensor.numpy()` inside the model gives us # a numpy array that shares memory with the tensor. # Not doing so can make metrics wrong, # e.g., for frame labels *before* post-processing, # if we mutate that tensor here. # see:https://github.com/vocalpy/vak/issues/821 frame_labels = np.copy(frame_labels) frame_labels = row_or_1d(frame_labels) if boundary_labels is not None: boundary_labels = row_or_1d(boundary_labels) # handle the case when all time bins are predicted to be unlabeled # see https://github.com/NickleDave/vak/issues/383 uniq_frame_labels = np.unique(frame_labels) if ( len(uniq_frame_labels) == 1 and uniq_frame_labels[0] == background_label ): return frame_labels # -> no need to do any of the post-processing if boundary_labels is not None: segment_inds_list = segment_inds_list_from_boundary_labels( boundary_labels ) else: segment_inds_list = segment_inds_list_from_class_labels( frame_labels, background_label=background_label ) if min_segment_dur is not None: frame_labels, segment_inds_list = remove_short_segments( frame_labels, segment_inds_list, timebin_dur, min_segment_dur, background_label, ) if len(segment_inds_list) == 0: # no segments left after removing return frame_labels # -> no need to do any of the post-processing if majority_vote: frame_labels = take_majority_vote(frame_labels, segment_inds_list) return frame_labels
[docs] def to_boundary_times( frame_labels: torch.LongTensor, frame_times: torch.FloatTensor, padval: float = common.constants.DEFAULT_BOUNDARY_TIMES_PADVAL, ) -> torch.FloatTensor: """Convert a 1-dimensional tensor of integer frame labels to a tensor of float boundary times. Parameters ---------- frame_labels : torch.LongTensor Vector of class labels for each frame. frame_times : torch.FloatTensor The time for each frame, e.g., the vector of bin center times that is computed for a spectrogram. Notes ----- By convention, this function places a boundary at the first frame of any continuous run of a single integer class. E.g., ``` [0, 0, 0, 1, 1, 1, 0, 0, 2, 2, 0, 0, 1, 1, 0, 0] ^ ^ ^ ^ ^ ^ ^ ``` (where carets (``^``) indicate boundaries). In other words, the start index of any segment is considered a boundary. """ from vak.common import validators validators.is_2d_tensor(frame_labels) validators.is_2d_tensor(frame_times) if frame_labels.shape[0] != frame_times.shape[0]: raise ValueError( "`frame_labels` and `frame_times` should have same batch size but " f"frame_labels.shape[0]={frame_labels.shape[0]} != frame_times.shape[0]={frame_times.shape[0]}" ) if frame_labels.shape[1] != frame_times.shape[1]: raise ValueError( "`frame_labels` and `frame_times` should have same length (number of frames), but " f"`frame_labels.shape[1]={frame_labels.shape[-1]} != frame_times.shape[1]={frame_times.shape[-1]}" ) if not isinstance(padval, float): raise TypeError( f"Type of `padval` must be float but was {type(padval)}" ) if padval >= 0.0: raise ValueError( "`padval` must be a negative number, to avoid clashing with valid boundary times, " f"but was: {padval}" ) frame_labels_batch = torch.unbind(frame_labels) frame_times_batch = torch.unbind(frame_times) boundary_times_batch = [] for frame_labels_item, frame_times_item in zip( frame_labels_batch, frame_times_batch ): # a boundary occurs in frame labels # wherever the first-order difference is not 0. # **Notice** we add +1 because this is the first-order difference, # so the actually change occurs at :math:`i`=np.diff(frame_labels) + 1. boundary_inds = torch.nonzero( torch.diff(frame_labels_item), as_tuple=True, )[0] + 1 # We force the first index to be a boundary. boundary_inds = torch.cat( (torch.LongTensor([0]).to(boundary_inds.device), boundary_inds) ) boundary_times_item = frame_times_item[boundary_inds] boundary_times_batch.append(boundary_times_item) boundary_times_batch = torch.nn.utils.rnn.pad_sequence( boundary_times_batch, batch_first=True, padding_value=padval, padding_side="right", ) return boundary_times_batch