Skip to content

dlc_utils.py

validate_option(option=None, options=None, name='option', types=None, val_range=None, permit_none=False)

Validate that option is one of the allowed options and/or an instance of one of the allowed types.

Parameters:

Name Type Description Default
option str

If none, runs no checks.

None
options list

If provided, option must be in options.

None
name str

If provided, name of option to use in error message.

'option'
types tuple

If provided, option must be an instance of one of the types in types.

None
val_range tuple

If provided, option must be in range (min, max)

None
permit_none bool

If True, permit option to be None. Default False.

False

Raises:

Type Description
ValueError

If option is not in options.

Source code in src/spyglass/position/v1/dlc_utils.py
def validate_option(
    option=None,
    options: list = None,
    name="option",
    types: tuple = None,
    val_range: tuple = None,
    permit_none=False,
):
    """Validate one option against allowed values, types, and a value range.

    Parameters
    ----------
    option : Any, optional
        Value to validate. If None and ``permit_none`` is True, no further
        checks are run.
    options : list, optional
        If provided, option must be in options.
    name : str, optional
        Name of option to use in error messages. Default 'option'.
    types : tuple, optional
        If provided, option must be an instance of one of the types in types.
    val_range : tuple, optional
        If provided, option must be in the inclusive range (min, max).
    permit_none : bool, optional
        If True, permit option to be None. Default False.

    Raises
    ------
    ValueError
        If option is None while ``permit_none`` is False, or if option falls
        outside ``val_range``.
    KeyError
        If option is not in options.
    TypeError
        If option is not an instance of any of the given types.
    """
    if option is None:
        if not permit_none:
            raise ValueError(f"{name} cannot be None")
        # None was explicitly permitted: membership, type, and range checks
        # are meaningless for it (and the range comparison would raise).
        return

    if options and option not in options:
        raise KeyError(
            f"Unknown {name}: {option} " f"Available options: {options}"
        )

    if types and not isinstance(option, tuple(types)):
        raise TypeError(f"{name} is {type(option)}. Available types {types}")

    if val_range and not (val_range[0] <= option <= val_range[1]):
        raise ValueError(f"{name} must be in range {val_range}")

validate_list(required_items, option_list=None, name='List', condition='', permit_none=False)

Validate that option_list contains all items in required_items.

Parameters:

Name Type Description Default
required_items list
required
option_list list

If provided, option_list must contain all items in required_items.

None
name str

If provided, name of option_list to use in error message.

'List'
condition str

If provided, condition in error message as 'when using X'.

''
permit_none bool

If True, permit option_list to be None. Default False.

False
Source code in src/spyglass/position/v1/dlc_utils.py
def validate_list(
    required_items: list,
    option_list: list = None,
    name="List",
    condition="",
    permit_none=False,
):
    """Check the items of option_list against required_items.

    Parameters
    ----------
    required_items : list
        Reference collection each item of option_list is looked up in.
    option_list : list, optional
        Items to check. None is accepted only when ``permit_none`` is True.
    name : str, optional
        Name of option_list to use in error messages. Default 'List'.
    condition : str, optional
        If provided, appended to the error message as ' when using X'.
    permit_none : bool, optional
        If True, permit option_list to be None. Default False.

    Raises
    ------
    ValueError
        If option_list is None and ``permit_none`` is False.
    KeyError
        If any item of option_list is not found in required_items.

    Notes
    -----
    NOTE(review): the error message says option_list "must contain all items
    in" required_items, but the check performed is that every item of
    option_list is a member of required_items. Confirm which is intended.
    """
    if option_list is None and permit_none:
        return
    if option_list is None:
        raise ValueError(f"{name} cannot be None")

    # A falsy condition is interpolated into the message unchanged.
    suffix = f" when using {condition}" if condition else condition

    for item in option_list:
        if item not in required_items:
            raise KeyError(
                f"{name} must contain all items in {required_items}{suffix}."
            )

validate_smooth_params(params)

If params['smooth'], validate method is in list and duration type

Source code in src/spyglass/position/v1/dlc_utils.py
def validate_smooth_params(params):
    """Validate smoothing settings when ``params['smooth']`` is truthy.

    Parameters
    ----------
    params : dict
        Parameter dictionary. When ``params.get('smooth')`` is falsy nothing
        is checked. Otherwise ``params['smoothing_params']`` must be present
        and is expected to be a dict holding 'smooth_method' and
        'smoothing_duration'.

    Raises
    ------
    ValueError
        If 'smoothing_params', 'smooth_method', or 'smoothing_duration' is
        None or missing.
    KeyError
        If 'smooth_method' is not a key of ``_key_to_smooth_func_dict``.
    TypeError
        If 'smoothing_duration' is not an int or float.
    """
    if not params.get("smooth"):
        return
    smoothing_params = params.get("smoothing_params")
    # validate_option takes the value as ``option``; any other keyword name
    # raises TypeError before validation even starts.
    validate_option(option=smoothing_params, name="smoothing_params")
    validate_option(
        option=smoothing_params.get("smooth_method"),
        name="smooth_method",
        options=_key_to_smooth_func_dict,
    )
    validate_option(
        option=smoothing_params.get("smoothing_duration"),
        name="smoothing_duration",
        types=(int, float),
    )

OutputLogger

A class to wrap a logging.Logger object in order to provide context manager capabilities.

This class uses contextlib.redirect_stdout to temporarily redirect sys.stdout and thus print statements to the log file instead of, or as well as the console.

Attributes:

Name Type Description
logger Logger

logger object

name str

name of logger

level int

level of logging that the logger is set to handle

Methods:

Name Description
setup_logger

initialize or get logger object with name_logfile that writes to path_logfile

Examples:

>>> with OutputLogger(name, path, print_console=True) as logger:
...    print("this will print to logfile")
...    logger.logger.info("this will log to the logfile")
... print("this will print to the console")
... logger.logger.info("this will log to the logfile")
Source code in src/spyglass/position/v1/dlc_utils.py
class OutputLogger:  # TODO: migrate to spyglass.utils.logger
    """
    A class to wrap a logging.Logger object in order to provide context manager capabilities.

    This class uses contextlib.redirect_stdout to temporarily redirect sys.stdout and thus
    print statements to the log file instead of, or as well as the console.

    Attributes
    ----------
    logger : logging.Logger
        logger object
    name : str
        name of logger
    level : int
        level at which redirected ``print`` output is logged

    Methods
    -------
    setup_logger(name_logfile, path_logfile, print_console=False)
        initialize or get logger object with name_logfile
        that writes to path_logfile

    Examples
    --------
    >>> with OutputLogger(name, path, print_console=True) as logger:
    ...    print("this will print to logfile")
    ...    logger.logger.info("this will log to the logfile")
    ... print("this will print to the console")
    ... logger.logger.info("this will log to the logfile")

    """

    def __init__(self, name, path, level="INFO", **kwargs):
        """Create (or reconfigure) the named logger writing to ``path``.

        Parameters
        ----------
        name : str
            name passed to ``logging.getLogger``
        path : str
            path of the log file
        level : str, default 'INFO'
            name of a ``logging`` level attribute used by ``write``
        **kwargs
            forwarded to ``setup_logger`` (e.g. ``print_console``)
        """
        self.logger = self.setup_logger(name, path, **kwargs)
        self.name = self.logger.name
        # NOTE: the logger threshold itself is fixed to INFO in setup_logger,
        # so a level below INFO here means redirected output is dropped.
        self.level = getattr(logging, level)

    def setup_logger(
        self, name_logfile, path_logfile, print_console=False
    ) -> logging.Logger:
        """
        Sets up a logger that outputs to a file and, optionally, the console

        Existing handlers on the named logger are reconciled: plain file
        handlers pointing elsewhere are replaced, and plain stream handlers
        are removed unless ``print_console`` is True.

        Parameters
        ----------
        name_logfile : str
            name of the logfile to use
        path_logfile : str
            path to the file that should be used as the file handler
        print_console : bool, default-False
            if True, prints to console as well as log file.

        Returns
        -------
        logger : logging.Logger
            the logger object with specified handlers
        """
        logger = logging.getLogger(name_logfile)
        has_file_handler = False
        has_stream_handler = False

        # Iterate over a snapshot: removeHandler mutates logger.handlers, and
        # mutating a list while iterating it silently skips elements.
        for handler in list(logger.handlers):
            # Exact type checks are deliberate: FileHandler subclasses
            # StreamHandler, so isinstance would match both branches.
            if type(handler) is logging.FileHandler:
                if self._is_same_file(handler.baseFilename, path_logfile):
                    has_file_handler = True
                else:
                    handler.close()
                    logger.removeHandler(handler)
            elif type(handler) is logging.StreamHandler:
                if print_console:
                    has_stream_handler = True
                else:
                    handler.close()
                    logger.removeHandler(handler)

        if not has_file_handler:
            logger.addHandler(self._get_file_handler(path_logfile))
        if print_console and not has_stream_handler:
            logger.addHandler(self._get_stream_handler())

        logger.setLevel(logging.INFO)
        return logger

    @staticmethod
    def _is_same_file(path_a, path_b):
        """Return True if both paths refer to the same file.

        ``os.path.samefile`` needs both files to exist; when one does not
        (e.g. a log file not created yet) compare resolved paths instead.
        """
        try:
            return os.path.samefile(path_a, path_b)
        except OSError:
            return os.path.realpath(path_a) == os.path.realpath(path_b)

    def _get_file_handler(self, path):
        """Return an appending FileHandler for ``path``, creating its folder."""
        output_dir = pathlib.Path(os.path.dirname(path))
        if not os.path.exists(output_dir):
            output_dir.mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(path, mode="a")
        file_handler.setFormatter(self._get_formatter())
        return file_handler

    def _get_stream_handler(self):
        """Return a StreamHandler using the shared formatter."""
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(self._get_formatter())
        return stream_handler

    def _get_formatter(self):
        """Return the formatter shared by the file and stream handlers."""
        return logging.Formatter(
            "[%(asctime)s] in %(pathname)s, line %(lineno)d: %(message)s",
            datefmt="%d-%b-%y %H:%M:%S",
        )

    def write(self, msg):
        """File-like ``write``: log non-blank text at ``self.level``."""
        # print() emits the trailing newline as a separate write; skip it.
        if msg and not msg.isspace():
            self.logger.log(self.level, msg)

    def flush(self):
        """File-like ``flush``; a no-op."""
        pass

    def __enter__(self):
        """Start redirecting ``sys.stdout`` to this object."""
        self._redirector = redirect_stdout(self)
        self._redirector.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Restore ``sys.stdout``; exceptions are not suppressed."""
        # let contextlib do any exception handling here
        self._redirector.__exit__(exc_type, exc_value, traceback)

setup_logger(name_logfile, path_logfile, print_console=False)

Sets up a logger that outputs to a file and, optionally, the console

Parameters:

Name Type Description Default
name_logfile str

name of the logfile to use

required
path_logfile str

path to the file that should be used as the file handler

required
print_console (bool, default - False)

if True, prints to console as well as log file.

False

Returns:

Name Type Description
logger Logger

the logger object with specified handlers

Source code in src/spyglass/position/v1/dlc_utils.py
def setup_logger(
    self, name_logfile, path_logfile, print_console=False
) -> logging.Logger:
    """
    Sets up a logger that outputs to a file and, optionally, the console

    Existing handlers on the named logger are reconciled: plain file
    handlers pointing elsewhere are replaced, and plain stream handlers
    are removed unless ``print_console`` is True.

    Parameters
    ----------
    name_logfile : str
        name of the logfile to use
    path_logfile : str
        path to the file that should be used as the file handler
    print_console : bool, default-False
        if True, prints to console as well as log file.

    Returns
    -------
    logger : logging.Logger
        the logger object with specified handlers
    """

    def _same_file(path_a, path_b):
        # os.path.samefile needs both files to exist; fall back to comparing
        # resolved paths when one of them has not been created yet.
        try:
            return os.path.samefile(path_a, path_b)
        except OSError:
            return os.path.realpath(path_a) == os.path.realpath(path_b)

    logger = logging.getLogger(name_logfile)
    has_file_handler = False
    has_stream_handler = False

    # Iterate over a snapshot: removeHandler mutates logger.handlers, and
    # mutating a list while iterating it silently skips elements.
    for handler in list(logger.handlers):
        # Exact type checks are deliberate: FileHandler subclasses
        # StreamHandler, so isinstance would match both branches.
        if type(handler) is logging.FileHandler:
            if _same_file(handler.baseFilename, path_logfile):
                has_file_handler = True
            else:
                handler.close()
                logger.removeHandler(handler)
        elif type(handler) is logging.StreamHandler:
            if print_console:
                has_stream_handler = True
            else:
                handler.close()
                logger.removeHandler(handler)

    if not has_file_handler:
        logger.addHandler(self._get_file_handler(path_logfile))
    if print_console and not has_stream_handler:
        logger.addHandler(self._get_stream_handler())

    logger.setLevel(logging.INFO)
    return logger

get_dlc_processed_data_dir()

Returns session_dir relative to custom 'dlc_output_dir' root

Source code in src/spyglass/position/v1/dlc_utils.py
def get_dlc_processed_data_dir() -> pathlib.Path:
    """Return the root directory for DLC output.

    Reads ``dj.config['custom']['dlc_output_dir']``. When that entry is
    missing or empty, falls back to '/nimbus/deeplabcut/output/'.

    Returns
    -------
    pathlib.Path
        configured DLC output directory, or the fallback path
    """
    # Look the value up without assuming either key exists; the previous
    # nested ``in`` checks left the local unbound when a key was absent.
    custom_config = dj.config.get("custom") or {}
    dlc_output_dir = custom_config.get("dlc_output_dir")
    if dlc_output_dir:
        return pathlib.Path(dlc_output_dir)
    return pathlib.Path("/nimbus/deeplabcut/output/")

find_full_path(root_directories, relative_path)

from Datajoint Elements - unused Given a relative path, search and return the full-path from provided potential root directories (in the given order) :param root_directories: potential root directories :param relative_path: the relative path to find the valid root directory :return: full-path (pathlib.Path object)

Source code in src/spyglass/position/v1/dlc_utils.py
def find_full_path(root_directories, relative_path):
    """Resolve a relative path against candidate root directories.

    From Datajoint Elements - unused.

    Parameters
    ----------
    root_directories : str, pathlib.Path, or iterable of those
        potential root directories, tried in the given order
    relative_path : str or pathlib.Path
        path to locate; returned as-is if it already exists

    Returns
    -------
    pathlib.Path
        ``relative_path`` itself if it exists, otherwise the first
        ``root / relative_path`` that exists

    Raises
    ------
    FileNotFoundError
        If no candidate exists.
    """
    relative_path = _to_Path(relative_path)
    if relative_path.exists():
        return relative_path

    # A lone root is wrapped so the loop below handles both call styles.
    if isinstance(root_directories, (str, pathlib.Path)):
        root_directories = [_to_Path(root_directories)]

    for root in root_directories:
        candidate = _to_Path(root) / relative_path
        if candidate.exists():
            return candidate

    raise FileNotFoundError(
        f"No valid full-path found (from {root_directories})"
        f" for {relative_path}"
    )

find_root_directory(root_directories, full_path)

From datajoint elements - unused Given multiple potential root directories and a full-path, search and return one directory that is the parent of the given path :param root_directories: potential root directories :param full_path: the full path to search the root directory :return: root_directory (pathlib.Path object)

Source code in src/spyglass/position/v1/dlc_utils.py
def find_root_directory(root_directories, full_path):
    """Find which candidate root directory is an ancestor of full_path.

    From Datajoint Elements - unused.

    Parameters
    ----------
    root_directories : str, pathlib.Path, or iterable of those
        potential root directories, tried in the given order
    full_path : str or pathlib.Path
        existing path whose root directory is wanted

    Returns
    -------
    pathlib.Path
        the first root directory found among the parents of ``full_path``

    Raises
    ------
    FileNotFoundError
        If ``full_path`` does not exist or none of the roots is a parent.
    """
    full_path = _to_Path(full_path)
    if not full_path.exists():
        raise FileNotFoundError(f"{full_path} does not exist!")

    # A lone root is wrapped so the search below handles both call styles.
    if isinstance(root_directories, (str, pathlib.Path)):
        root_directories = [_to_Path(root_directories)]

    ancestors = set(full_path.parents)
    matching_roots = (
        root for root in map(_to_Path, root_directories) if root in ancestors
    )
    try:
        return next(matching_roots)
    except StopIteration as exc:
        raise FileNotFoundError(
            f"No valid root directory found (from {root_directories})"
            f" for {full_path}"
        ) from exc

infer_output_dir(key, makedir=True)

Return the expected pose_estimation_output_dir.

Parameters:

Name Type Description Default
key
required
Source code in src/spyglass/position/v1/dlc_utils.py
def infer_output_dir(key, makedir=True):
    """Return the expected pose_estimation_output_dir.

    The directory is built under the module-level ``dlc_output_dir`` as
    ``<nwb>/<nwb>_<epoch:02>_model_<model-name>``, where ``<nwb>`` is the
    part of ``key['nwb_file_name']`` before the first '_.' and spaces in
    the model name are replaced by '-'.

    Parameters
    ----------
    key: DataJoint key specifying a pairing of VideoFile and Model.
        Must provide 'nwb_file_name', 'epoch' and 'dlc_model_name'.
    makedir : bool, default True
        Only the literal ``True`` triggers creation of a missing directory.

    Returns
    -------
    pathlib.Path
        the inferred output directory
    """
    # TODO: add check to make sure interval_list_name refers to a single epoch
    # Or make key include epoch in and of itself instead of interval_list_name
    session = key["nwb_file_name"].split("_.")[0]
    model_tag = key["dlc_model_name"].replace(" ", "-")
    relative_dir = f"{session}/{session}_{key['epoch']:02}" f"_model_" + model_tag
    output_dir = pathlib.Path(dlc_output_dir) / pathlib.Path(relative_dir)

    if makedir is True and not os.path.exists(output_dir):
        output_dir.mkdir(parents=True, exist_ok=True)
    return output_dir

get_video_path(key)

Given nwb_file_name and interval_list_name returns specified video file filename and path

Parameters:

Name Type Description Default
key dict

Dictionary containing nwb_file_name and interval_list_name as keys

required

Returns:

Name Type Description
video_filepath str

path to the video file, including video filename

video_filename str

filename of the video

Source code in src/spyglass/position/v1/dlc_utils.py
def get_video_path(key):
    """
    Locate the video file for an nwb_file_name / epoch pair and read its
    metadata from the NWB file.

    Parameters
    ----------
    key : dict
        Dictionary containing 'nwb_file_name' and 'epoch' as keys

    Returns
    -------
    video_dir : str
        directory containing the video file, with a trailing '/'
    video_filename : str
        filename of the video
    meters_per_pixel : float
        value of ``device.meters_per_pixel`` on the NWB video object
    timestamps : np.ndarray
        timestamps of the NWB video object

    Notes
    -----
    If the VideoFile query does not match exactly one entry, a message is
    printed and ``(None, None, None, None)`` is returned.
    """
    import pynwb

    from ...common.common_behav import VideoFile

    epoch_key = {"nwb_file_name": key["nwb_file_name"], "epoch": key["epoch"]}
    VideoFile()._no_transaction_make(epoch_key, verbose=False)
    matches = VideoFile & epoch_key

    n_matches = len(matches)
    if n_matches != 1:
        print(f"Found {n_matches} videos for {epoch_key}")
        return None, None, None, None

    entry = matches.fetch1()
    nwb_path = f"{raw_dir}/{entry['nwb_file_name']}"

    with pynwb.NWBHDF5IO(path=nwb_path, mode="r") as nwb_io:
        video_obj = nwb_io.read().objects[entry["video_file_object_id"]]
        abs_path = VideoFile.get_abs_path(
            {"nwb_file_name": key["nwb_file_name"], "epoch": key["epoch"]}
        )
        video_dir = os.path.dirname(abs_path) + "/"
        video_filename = abs_path.split(video_dir)[-1]
        # Read while the file is open; the NWB objects are backed by it.
        meters_per_pixel = video_obj.device.meters_per_pixel
        timestamps = np.asarray(video_obj.timestamps)

    return video_dir, video_filename, meters_per_pixel, timestamps

check_videofile(video_path, output_path=dlc_video_dir, video_filename=None, video_filetype='h264')

Checks the file extension of a video file to make sure it is .mp4 for DeepLabCut processes. Converts to MP4 if not already.

Parameters:

Name Type Description Default
video_path str or PosixPath object

path to directory of the existing video file without filename

required
output_path str or PosixPath object

path to directory where converted video will be saved

dlc_video_dir
video_filename (str, Optional)

filename of the video to convert, if not provided, video_filetype must be and all video files of video_filetype in the directory will be converted

None
video_filetype str or List, Default 'h264', Optional

If video_filename is not provided, all videos of this filetype will be converted to .mp4

'h264'

Returns:

Name Type Description
output_files List of PosixPath objects

paths to converted video file(s)

Source code in src/spyglass/position/v1/dlc_utils.py
def check_videofile(
    video_path: Union[str, pathlib.PosixPath],
    output_path: Union[str, pathlib.PosixPath] = dlc_video_dir,
    video_filename: str = None,
    video_filetype: str = "h264",
):
    """
    Checks the file extension of a video file to make sure it is .mp4 for
    DeepLabCut processes. Converts to MP4 if not already.

    Parameters
    ----------
    video_path : str or PosixPath object
        path to directory of the existing video file without filename
    output_path : str or PosixPath object
        path to directory where converted video will be saved
    video_filename : str, Optional
        filename of the video to convert. If not provided, all files in
        ``video_path`` matching ``video_filetype`` are processed.
    video_filetype : str or List, Default 'h264', Optional
        Extension (or list of extensions), without the leading dot, used to
        find videos when video_filename is not provided.

    Returns
    -------
    output_files : List
        existing .mp4 paths are returned unchanged; for every other file the
        result of ``_convert_mp4`` is appended
    """
    if video_filename:
        video_files = [pathlib.Path(f"{video_path}/{video_filename}")]
    else:
        filetypes = (
            [video_filetype]
            if isinstance(video_filetype, str)
            else list(video_filetype)
        )
        video_files = [
            found
            for filetype in filetypes
            for found in pathlib.Path(video_path).glob(f"*.{filetype}")
        ]

    output_files = []
    for video_filepath in video_files:
        if video_filepath.exists() and video_filepath.suffix == ".mp4":
            output_files.append(video_filepath)
            continue
        # Path.name is the final component. Splitting the posix string on the
        # parent breaks when the parent is '.' ('x.h264' would give 'h264').
        # A missing file is still handed to _convert_mp4, as before.
        output_files.append(
            _convert_mp4(
                video_filepath.name, video_path, output_path, videotype="mp4"
            )
        )
    return output_files

get_gpu_memory()

Queries the gpu cluster and returns the memory use for each core. This is used to evaluate which GPU cores are available to run jobs on (i.e. pose estimation, DLC model training)

Returns:

Name Type Description
memory_use_values dict

dictionary with core number as key and memory in use as value.

Raises:

Type Description
RuntimeError

if the subprocess command errors.

Source code in src/spyglass/position/v1/dlc_utils.py
def get_gpu_memory():
    """Query ``nvidia-smi`` for the memory in use on each GPU.

    This is used to evaluate which GPU cores are available to run jobs on
    (i.e. pose estimation, DLC model training)

    Returns
    -------
    memory_use_values : dict
        maps the row index of each GPU in the ``nvidia-smi`` output to the
        integer at the start of its ``memory.used`` row

    Raises
    ------
    RuntimeError
        if the subprocess command exits with an error.
    """
    query_cmd = "nvidia-smi --query-gpu=memory.used --format=csv"
    try:
        raw_output = subprocess.check_output(
            query_cmd.split(), stderr=subprocess.STDOUT
        )
        # Drop the piece after the final newline, then the first line
        # (presumably the CSV header).
        memory_rows = raw_output.decode("ascii").split("\n")[:-1][1:]
    except subprocess.CalledProcessError as err:
        raise RuntimeError(
            f"command {err.cmd} return with error (code {err.returncode}): "
            + f"{err.output}"
        ) from err

    memory_use_values = {}
    for core, row in enumerate(memory_rows):
        memory_use_values[core] = int(row.split()[0])
    return memory_use_values

get_span_start_stop(indices)

Return the (start, stop) values of each run of consecutive indices.

Parameters:

Name Type Description Default
indices _type_

description

required

Returns:

Type Description
_type_

description

Source code in src/spyglass/position/v1/dlc_utils.py
def get_span_start_stop(indices):
    """Return the first and last value of each run of consecutive indices.

    A run is a stretch of ``indices`` in which each value is one greater
    than the previous one (value minus position stays constant).

    Parameters
    ----------
    indices : iterable of int
        index values to group

    Returns
    -------
    list of tuple
        one ``(start, stop)`` pair per run, both ends inclusive
    """

    def _offset(pair):
        # pair is (position, value); the difference is constant in a run
        return pair[1] - pair[0]

    def _bounds(run):
        values = [value for _, value in run]
        return (values[0], values[-1])

    return [_bounds(run) for _, run in groupby(enumerate(indices), _offset)]

convert_to_pixels(data, frame_size, cm_to_pixels=1.0)

Converts from cm to pixels by dividing by cm_to_pixels (the y-axis is not flipped; frame_size is unused).

Parameters:

Name Type Description Default
data (ndarray, shape(n_time, 2))
required
frame_size (array_like, shape(2))
required
cm_to_pixels float
1.0

Returns:

Name Type Description
converted_data (ndarray, shape(n_time, 2))
Source code in src/spyglass/position/v1/dlc_utils.py
def convert_to_pixels(data, frame_size, cm_to_pixels=1.0):
    """Converts from cm to pixels by dividing by ``cm_to_pixels``.

    No y-axis flip is applied, and ``frame_size`` is accepted but unused.

    Parameters
    ----------
    data : ndarray, shape (n_time, 2)
    frame_size : array_like, shape (2,)
        unused
    cm_to_pixels : float

    Returns
    -------
    converted_data : ndarray, shape (n_time, 2)
    """
    converted_data = data / cm_to_pixels
    return converted_data