Skip to content

position_dlc_project.py

BodyPart

Bases: SpyglassMixin, Manual

Holds bodyparts for use in DeepLabCut models

Source code in src/spyglass/position/v1/position_dlc_project.py
@schema
class BodyPart(SpyglassMixin, dj.Manual):
    """Holds bodyparts for use in DeepLabCut models"""

    definition = """
    bodypart                : varchar(32)
    ---
    bodypart_description='' : varchar(80)
    """

    @classmethod
    def add_from_config(cls, bodyparts: List, descriptions: List = None):
        """Given a list of bodyparts from the config and
        an optional list of descriptions, inserts into BodyPart table.

        Parameters
        ----------
        bodyparts : List
            list of bodyparts from config
        descriptions : List, default None
            optional list of descriptions for bodyparts.
            If None, description is set to bodypart name
        """
        if descriptions is not None:
            bodyparts_dict = [
                {"bodypart": bp, "bodypart_description": desc}
                for (bp, desc) in zip(bodyparts, descriptions)
            ]
        else:
            bodyparts_dict = [
                {"bodypart": bp, "bodypart_description": bp} for bp in bodyparts
            ]
        cls().insert(bodyparts_dict, skip_duplicates=True)

add_from_config(bodyparts, descriptions=None) classmethod

Given a list of bodyparts from the config and an optional list of descriptions, inserts into BodyPart table.

Parameters:

Name Type Description Default
bodyparts List

list of bodyparts from config

required
descriptions List

optional list of descriptions for bodyparts. If None, description is set to bodypart name

None
Source code in src/spyglass/position/v1/position_dlc_project.py
@classmethod
def add_from_config(cls, bodyparts: List, descriptions: List = None):
    """Given a list of bodyparts from the config and
    an optional list of descriptions, inserts into BodyPart table.

    Parameters
    ----------
    bodyparts : List
        list of bodyparts from config
    descriptions : List, default None
        optional list of descriptions for bodyparts.
        If None, description is set to bodypart name
    """
    if descriptions is not None:
        bodyparts_dict = [
            {"bodypart": bp, "bodypart_description": desc}
            for (bp, desc) in zip(bodyparts, descriptions)
        ]
    else:
        bodyparts_dict = [
            {"bodypart": bp, "bodypart_description": bp} for bp in bodyparts
        ]
    cls().insert(bodyparts_dict, skip_duplicates=True)

DLCProject

Bases: SpyglassMixin, Manual

Table to facilitate creation of a new DeepLabCut model. With ability to edit config, extract frames, label frames

Source code in src/spyglass/position/v1/position_dlc_project.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
@schema
class DLCProject(SpyglassMixin, dj.Manual):
    """Table to facilitate creation of a new DeepLabCut model.
    With ability to edit config, extract frames, label frames
    """

    # Add more parameters as secondary keys...
    # TODO: collapse params into blob dict
    definition = """
    project_name     : varchar(100) # name of DLC project
    ---
    -> LabTeam
    bodyparts        : blob         # list of bodyparts to label
    frames_per_video : int          # number of frames to extract from each video
    config_path      : varchar(120) # path to config.yaml for model
    """

    class BodyPart(SpyglassMixin, dj.Part):
        """Part table to hold bodyparts used in each project."""

        definition = """
        -> DLCProject
        -> BodyPart
        """

    class File(SpyglassMixin, dj.Part):
        definition = """
        # Paths of training files (e.g., labeled pngs, CSV or video)
        -> DLCProject
        file_name: varchar(200) # Concise name to describe file
        file_ext : enum("mp4", "csv", "h5") # extension of file
        ---
        file_path: varchar(255)
        """

    def insert1(self, key, **kwargs):
        """Override insert1 to check types of key values."""
        if not isinstance(key["project_name"], str):
            raise ValueError("project_name must be a string")
        if not isinstance(key["frames_per_video"], int):
            raise ValueError("frames_per_video must be of type `int`")
        super().insert1(key, **kwargs)

    def _existing_project(self, project_name):
        if project_name in self.fetch("project_name"):
            logger.warning(f"project name: {project_name} is already in use.")
            return (self & {"project_name": project_name}).fetch(
                "project_name", "config_path", as_dict=True
            )[0]
        return None

    @classmethod
    def insert_existing_project(
        cls,
        project_name: str,
        lab_team: str,
        config_path: str,
        bodyparts: List = None,
        frames_per_video: int = None,
        add_to_files: bool = True,
        **kwargs,
    ):
        """
        insert an existing project into DLCProject table.
        Parameters
        ----------
        project_name : str
            user-friendly name of project
        lab_team : str
            name of lab team. Should match an entry in LabTeam table
        config_path : str
            path to project directory
        bodyparts : list
            optional list of bodyparts to label that
            are not already in existing config
        """
        from deeplabcut.utils.auxiliaryfunctions import read_config

        if (existing := cls()._existing_project(project_name)) is not None:
            return existing

        cfg = read_config(config_path)
        all_bodyparts = cfg["bodyparts"]
        if bodyparts:
            bodyparts_to_add = [
                bodypart
                for bodypart in bodyparts
                if bodypart not in cfg["bodyparts"]
            ]
            all_bodyparts += bodyparts_to_add

        BodyPart.add_from_config(cfg["bodyparts"])
        for bodypart in all_bodyparts:
            if not bool(BodyPart() & {"bodypart": bodypart}):
                raise ValueError(
                    f"bodypart: {bodypart} not found in BodyPart table"
                )

        # check bodyparts are in config, if not add
        if len(bodyparts_to_add) > 0:
            add_to_config(config_path, bodyparts=bodyparts_to_add)

        # Get frames per video from config. If passed as arg, check match
        if frames_per_video:
            if frames_per_video != cfg["numframes2pick"]:
                add_to_config(
                    config_path, **{"numframes2pick": frames_per_video}
                )

        config_path = Path(config_path)
        project_path = config_path.parent
        dlc_project_path = dlc_project_dir

        if dlc_project_path not in project_path.as_posix():
            project_dirname = project_path.name
            dest_folder = Path(f"{dlc_project_path}/{project_dirname}/")
            if dest_folder.exists():
                new_proj_dir = dest_folder.as_posix()
            else:
                new_proj_dir = shutil.copytree(
                    src=project_path,
                    dst=f"{dlc_project_path}/{project_dirname}/",
                )
            new_config_path = Path(f"{new_proj_dir}/config.yaml")
            assert (
                new_config_path.exists()
            ), "config.yaml does not exist in new project directory"
            config_path = new_config_path
            add_to_config(config_path, **{"project_path": new_proj_dir})

        # TODO still need to copy videos over to video dir
        key = {
            "project_name": project_name,
            "team_name": lab_team,
            "bodyparts": bodyparts,
            "config_path": config_path.as_posix(),
            "frames_per_video": frames_per_video,
        }
        cls().insert1(key, **kwargs)
        cls().BodyPart.insert(
            [
                {"project_name": project_name, "bodypart": bp}
                for bp in all_bodyparts
            ],
            **kwargs,
        )
        if add_to_files:  # Check for training files to add
            cls().add_training_files(key, **kwargs)

        return {
            "project_name": project_name,
            "config_path": config_path.as_posix(),
        }

    @classmethod
    def insert_new_project(
        cls,
        project_name: str,
        bodyparts: List,
        lab_team: str,
        frames_per_video: int,
        video_list: List,
        groupname: str = None,
        project_directory: str = dlc_project_dir,
        output_path: str = dlc_video_dir,
        **kwargs,
    ):
        """Insert a new project into DLCProject table.

        Parameters
        ----------
        project_name : str
            user-friendly name of project
        groupname : str, optional
            Name for project group. If None, defaults to username
        bodyparts : list
            list of bodyparts to label. Should match bodyparts in BodyPart table
        lab_team : str
            name of lab team. Should match an entry in LabTeam table
        project_directory : str
            directory where to create project.
            (Default is '/cumulus/deeplabcut/')
        frames_per_video : int
            number of frames to extract from each video
        video_list : list
            list of (a) dicts of to query VideoFile table for or (b) absolute
            paths to videos to train on. If dict, use format:
            [{'nwb_file_name': nwb_file_name, 'epoch': epoch #},...]
        output_path : str
            target path to output converted videos
            (Default is '/nimbus/deeplabcut/videos/')
        """
        from deeplabcut import create_new_project

        if (existing := cls()._existing_project(project_name)) is not None:
            return existing
        if not bool(LabTeam() & {"team_name": lab_team}):
            raise ValueError(f"LabTeam does not exist: {lab_team}")

        add_to_files = kwargs.pop("add_to_files", True)
        skeleton_node = None
        # If dict, assume of form {'nwb_file_name': nwb_file_name, 'epoch': epoch}
        # and pass to get_video_path to reference VideoFile table for path

        videos = cls()._process_videos(video_list, output_path)

        config_path = create_new_project(
            project=project_name,
            experimenter=sanitize_filename(lab_team),
            videos=videos,
            working_directory=project_directory,
            copy_videos=True,
            multianimal=False,
        )
        for bodypart in bodyparts:
            if not bool(BodyPart() & {"bodypart": bodypart}):
                raise ValueError(
                    f"bodypart: {bodypart} not found in BodyPart table"
                )
        kwargs_copy = copy.deepcopy(kwargs)
        kwargs_copy.update({"numframes2pick": frames_per_video, "dotsize": 3})

        add_to_config(
            config_path, bodyparts, skeleton_node=skeleton_node, **kwargs_copy
        )

        key = {
            "project_name": project_name,
            "team_name": lab_team,
            "bodyparts": bodyparts,
            "config_path": config_path,
            "frames_per_video": frames_per_video,
        }
        cls().insert1(key, **kwargs)
        cls().BodyPart.insert(
            [
                {"project_name": project_name, "bodypart": bp}
                for bp in bodyparts
            ],
            **kwargs,
        )
        if add_to_files:  # Add videos to training files
            cls().add_training_files(key, **kwargs)

        if isinstance(config_path, PosixPath):
            config_path = config_path.as_posix()
        return {"project_name": project_name, "config_path": config_path}

    def _process_videos(self, video_list, output_path):
        # If dict, assume {'nwb_file_name': nwb_file_name, 'epoch': epoch}
        if all(isinstance(n, Dict) for n in video_list):
            videos_to_convert = []
            for video in video_list:
                if (video_path := get_video_info(video))[0] is not None:
                    videos_to_convert.append(video_path)

        else:  # Otherwise, assume list of video file paths
            if not all([Path(video).exists() for video in video_list]):
                raise FileNotFoundError(f"Couldn't find video(s): {video_list}")
            videos_to_convert = []
            for video in video_list:
                vp = Path(video)
                videos_to_convert.append((vp.parent, vp.name))

        videos = [
            find_mp4(
                video_path=video[0],
                output_path=output_path,
                video_filename=video[1],
            )
            for video in videos_to_convert
        ]

        if len(videos) < 1:
            raise ValueError(f"no .mp4 videos found from {video_list}")

        return videos

    @classmethod
    def add_video_files(
        cls,
        video_list,
        config_path=None,
        key=None,
        output_path: str = dlc_video_dir,
        add_new=False,
        add_to_files=True,
        **kwargs,
    ):
        """Add videos to existing project or create new project"""
        has_config_or_key = bool(config_path) or bool(key)
        if add_new and not has_config_or_key:
            raise ValueError("If add_new, must provide key or config_path")

        config_path = config_path or (cls & key).fetch1("config_path")
        has_proj = bool(key) or len(cls & {"config_path": config_path}) == 1
        if add_to_files and not has_proj:
            raise ValueError("Cannot set add_to_files=True without passing key")

        videos = cls()._process_videos(video_list, output_path)

        if add_new:
            from deeplabcut import add_new_videos

            add_new_videos(config=config_path, videos=videos, copy_videos=True)

        if add_to_files:  # Add videos to training files
            cls().add_training_files(key, **kwargs)
        return videos

    @classmethod
    def add_training_files(cls, key, **kwargs):
        """Add training videos and labeled frames .h5
        and .csv to DLCProject.File"""
        from deeplabcut.utils.auxiliaryfunctions import read_config

        config_path = (cls & {"project_name": key["project_name"]}).fetch1(
            "config_path"
        )

        key = {  # Remove non-essential vals from key
            k: v
            for k, v in key.items()
            if k
            not in [
                "bodyparts",
                "team_name",
                "config_path",
                "frames_per_video",
            ]
        }

        cfg = read_config(config_path)
        video_names = list(cfg["video_sets"])
        label_dir = Path(cfg["project_path"]) / "labeled-data"
        training_files = []

        video_inserts = []
        for video in video_names:
            vid_path_obj = Path(video)
            video_name = vid_path_obj.stem
            training_files.extend((label_dir / video_name).glob("*Collected*"))
            key.update(
                {
                    "file_name": video_name,
                    "file_ext": vid_path_obj.suffix[1:],  # remove leading '.'
                    "file_path": video,
                }
            )
        cls().File.insert(video_inserts, **kwargs)

        if len(training_files) == 0:
            logger.warning("No training files to add")
            return

        for file in training_files:
            path_obj = Path(file)
            cls().File.insert1(
                {
                    **key,
                    "file_name": f"{path_obj.name}_labeled_data",
                    "file_ext": path_obj.suffix[1:],
                    "file_path": file,
                },
                **kwargs,
            )

    @classmethod
    def run_extract_frames(cls, key, **kwargs):
        """Convenience function to launch DLC GUI for extracting frames.
        Must be run on local machine to access GUI,
        cannot be run through ssh tunnel
        """
        config_path = (cls & key).fetch1("config_path")
        from deeplabcut import extract_frames

        extract_frames(config_path, **kwargs)

    @classmethod
    def run_label_frames(cls, key):
        """Convenience function to launch DLC GUI for labeling frames.
        Must be run on local machine to access GUI,
        cannot be run through ssh tunnel
        """
        config_path = (cls & key).fetch1("config_path")
        try:
            from deeplabcut import label_frames
        except (ModuleNotFoundError, ImportError):
            logger.error("DLC loaded in light mode, cannot label frames")
            return

        label_frames(config_path)

    @classmethod
    def check_labels(cls, key, **kwargs):
        """Convenience function to check labels on
        previously extracted and labeled frames
        """
        config_path = (cls & key).fetch1("config_path")
        from deeplabcut import check_labels

        check_labels(config_path, **kwargs)

    @classmethod
    def import_labeled_frames(
        cls,
        key: Dict,
        new_proj_path: Union[str, PosixPath],
        video_filenames: Union[str, List],
        **kwargs,
    ):
        """Function to import pre-labeled frames from an existing project
        into a new project

        Parameters
        ----------
        key : Dict
            key to specify entry in DLCProject table to add labeled frames to
        new_proj_path : Union[str, PosixPath]
            absolute path to project directory containing
            labeled frames to import
        video_filenames : str or List
            filename or list of filenames of video(s) from which to import
            frames. Without file extension
        """
        project_entry = (cls & key).fetch1()
        team_name = project_entry["team_name"].replace(" ", "_")
        this_proj_path = Path(project_entry["config_path"]).parent
        this_data_path = this_proj_path / "labeled-data"
        new_proj_path = Path(new_proj_path)  # If Path(Path), no change
        new_data_path = new_proj_path / "labeled-data"

        if not new_data_path.exists():
            raise FileNotFoundError(f"Cannot find directory: {new_data_path}")

        videos = (
            video_filenames
            if isinstance(video_filenames, List)
            else [video_filenames]
        )
        for video_file in videos:
            h5_file = next((new_data_path / video_file).glob("*h5"))
            dlc_df = pd.read_hdf(h5_file)
            dlc_df.columns = dlc_df.columns.set_levels([team_name], level=0)
            new_video_path = this_data_path / video_file
            new_video_path.mkdir(exist_ok=True)
            dlc_df.to_hdf(
                new_video_path / f"CollectedData_{team_name}.h5",
                "df_with_missing",
            )
        cls().add_training_files(key, **kwargs)

BodyPart

Bases: SpyglassMixin, Part

Part table to hold bodyparts used in each project.

Source code in src/spyglass/position/v1/position_dlc_project.py
class BodyPart(SpyglassMixin, dj.Part):
    """Part table to hold bodyparts used in each project."""

    definition = """
    -> DLCProject
    -> BodyPart
    """

insert1(key, **kwargs)

Override insert1 to check types of key values.

Source code in src/spyglass/position/v1/position_dlc_project.py
def insert1(self, key, **kwargs):
    """Override insert1 to check types of key values."""
    if not isinstance(key["project_name"], str):
        raise ValueError("project_name must be a string")
    if not isinstance(key["frames_per_video"], int):
        raise ValueError("frames_per_video must be of type `int`")
    super().insert1(key, **kwargs)

insert_existing_project(project_name, lab_team, config_path, bodyparts=None, frames_per_video=None, add_to_files=True, **kwargs) classmethod

insert an existing project into DLCProject table.

Parameters:

Name Type Description Default
project_name str

user-friendly name of project

required
lab_team str

name of lab team. Should match an entry in LabTeam table

required
config_path str

path to project directory

required
bodyparts list

optional list of bodyparts to label that are not already in existing config

None
Source code in src/spyglass/position/v1/position_dlc_project.py
@classmethod
def insert_existing_project(
    cls,
    project_name: str,
    lab_team: str,
    config_path: str,
    bodyparts: List = None,
    frames_per_video: int = None,
    add_to_files: bool = True,
    **kwargs,
):
    """
    insert an existing project into DLCProject table.
    Parameters
    ----------
    project_name : str
        user-friendly name of project
    lab_team : str
        name of lab team. Should match an entry in LabTeam table
    config_path : str
        path to project directory
    bodyparts : list
        optional list of bodyparts to label that
        are not already in existing config
    """
    from deeplabcut.utils.auxiliaryfunctions import read_config

    if (existing := cls()._existing_project(project_name)) is not None:
        return existing

    cfg = read_config(config_path)
    all_bodyparts = cfg["bodyparts"]
    if bodyparts:
        bodyparts_to_add = [
            bodypart
            for bodypart in bodyparts
            if bodypart not in cfg["bodyparts"]
        ]
        all_bodyparts += bodyparts_to_add

    BodyPart.add_from_config(cfg["bodyparts"])
    for bodypart in all_bodyparts:
        if not bool(BodyPart() & {"bodypart": bodypart}):
            raise ValueError(
                f"bodypart: {bodypart} not found in BodyPart table"
            )

    # check bodyparts are in config, if not add
    if len(bodyparts_to_add) > 0:
        add_to_config(config_path, bodyparts=bodyparts_to_add)

    # Get frames per video from config. If passed as arg, check match
    if frames_per_video:
        if frames_per_video != cfg["numframes2pick"]:
            add_to_config(
                config_path, **{"numframes2pick": frames_per_video}
            )

    config_path = Path(config_path)
    project_path = config_path.parent
    dlc_project_path = dlc_project_dir

    if dlc_project_path not in project_path.as_posix():
        project_dirname = project_path.name
        dest_folder = Path(f"{dlc_project_path}/{project_dirname}/")
        if dest_folder.exists():
            new_proj_dir = dest_folder.as_posix()
        else:
            new_proj_dir = shutil.copytree(
                src=project_path,
                dst=f"{dlc_project_path}/{project_dirname}/",
            )
        new_config_path = Path(f"{new_proj_dir}/config.yaml")
        assert (
            new_config_path.exists()
        ), "config.yaml does not exist in new project directory"
        config_path = new_config_path
        add_to_config(config_path, **{"project_path": new_proj_dir})

    # TODO still need to copy videos over to video dir
    key = {
        "project_name": project_name,
        "team_name": lab_team,
        "bodyparts": bodyparts,
        "config_path": config_path.as_posix(),
        "frames_per_video": frames_per_video,
    }
    cls().insert1(key, **kwargs)
    cls().BodyPart.insert(
        [
            {"project_name": project_name, "bodypart": bp}
            for bp in all_bodyparts
        ],
        **kwargs,
    )
    if add_to_files:  # Check for training files to add
        cls().add_training_files(key, **kwargs)

    return {
        "project_name": project_name,
        "config_path": config_path.as_posix(),
    }

insert_new_project(project_name, bodyparts, lab_team, frames_per_video, video_list, groupname=None, project_directory=dlc_project_dir, output_path=dlc_video_dir, **kwargs) classmethod

Insert a new project into DLCProject table.

Parameters:

Name Type Description Default
project_name str

user-friendly name of project

required
groupname str

Name for project group. If None, defaults to username

None
bodyparts list

list of bodyparts to label. Should match bodyparts in BodyPart table

required
lab_team str

name of lab team. Should match an entry in LabTeam table

required
project_directory str

directory where to create project. (Default is '/cumulus/deeplabcut/')

dlc_project_dir
frames_per_video int

number of frames to extract from each video

required
video_list list

list of (a) dicts of to query VideoFile table for or (b) absolute paths to videos to train on. If dict, use format: [{'nwb_file_name': nwb_file_name, 'epoch': epoch #},...]

required
output_path str

target path to output converted videos (Default is '/nimbus/deeplabcut/videos/')

dlc_video_dir
Source code in src/spyglass/position/v1/position_dlc_project.py
@classmethod
def insert_new_project(
    cls,
    project_name: str,
    bodyparts: List,
    lab_team: str,
    frames_per_video: int,
    video_list: List,
    groupname: str = None,
    project_directory: str = dlc_project_dir,
    output_path: str = dlc_video_dir,
    **kwargs,
):
    """Insert a new project into DLCProject table.

    Parameters
    ----------
    project_name : str
        user-friendly name of project
    groupname : str, optional
        Name for project group. If None, defaults to username
    bodyparts : list
        list of bodyparts to label. Should match bodyparts in BodyPart table
    lab_team : str
        name of lab team. Should match an entry in LabTeam table
    project_directory : str
        directory where to create project.
        (Default is '/cumulus/deeplabcut/')
    frames_per_video : int
        number of frames to extract from each video
    video_list : list
        list of (a) dicts of to query VideoFile table for or (b) absolute
        paths to videos to train on. If dict, use format:
        [{'nwb_file_name': nwb_file_name, 'epoch': epoch #},...]
    output_path : str
        target path to output converted videos
        (Default is '/nimbus/deeplabcut/videos/')
    """
    from deeplabcut import create_new_project

    if (existing := cls()._existing_project(project_name)) is not None:
        return existing
    if not bool(LabTeam() & {"team_name": lab_team}):
        raise ValueError(f"LabTeam does not exist: {lab_team}")

    add_to_files = kwargs.pop("add_to_files", True)
    skeleton_node = None
    # If dict, assume of form {'nwb_file_name': nwb_file_name, 'epoch': epoch}
    # and pass to get_video_path to reference VideoFile table for path

    videos = cls()._process_videos(video_list, output_path)

    config_path = create_new_project(
        project=project_name,
        experimenter=sanitize_filename(lab_team),
        videos=videos,
        working_directory=project_directory,
        copy_videos=True,
        multianimal=False,
    )
    for bodypart in bodyparts:
        if not bool(BodyPart() & {"bodypart": bodypart}):
            raise ValueError(
                f"bodypart: {bodypart} not found in BodyPart table"
            )
    kwargs_copy = copy.deepcopy(kwargs)
    kwargs_copy.update({"numframes2pick": frames_per_video, "dotsize": 3})

    add_to_config(
        config_path, bodyparts, skeleton_node=skeleton_node, **kwargs_copy
    )

    key = {
        "project_name": project_name,
        "team_name": lab_team,
        "bodyparts": bodyparts,
        "config_path": config_path,
        "frames_per_video": frames_per_video,
    }
    cls().insert1(key, **kwargs)
    cls().BodyPart.insert(
        [
            {"project_name": project_name, "bodypart": bp}
            for bp in bodyparts
        ],
        **kwargs,
    )
    if add_to_files:  # Add videos to training files
        cls().add_training_files(key, **kwargs)

    if isinstance(config_path, PosixPath):
        config_path = config_path.as_posix()
    return {"project_name": project_name, "config_path": config_path}

add_video_files(video_list, config_path=None, key=None, output_path=dlc_video_dir, add_new=False, add_to_files=True, **kwargs) classmethod

Add videos to existing project or create new project

Source code in src/spyglass/position/v1/position_dlc_project.py
@classmethod
def add_video_files(
    cls,
    video_list,
    config_path=None,
    key=None,
    output_path: str = dlc_video_dir,
    add_new=False,
    add_to_files=True,
    **kwargs,
):
    """Add videos to existing project or create new project"""
    has_config_or_key = bool(config_path) or bool(key)
    if add_new and not has_config_or_key:
        raise ValueError("If add_new, must provide key or config_path")

    config_path = config_path or (cls & key).fetch1("config_path")
    has_proj = bool(key) or len(cls & {"config_path": config_path}) == 1
    if add_to_files and not has_proj:
        raise ValueError("Cannot set add_to_files=True without passing key")

    videos = cls()._process_videos(video_list, output_path)

    if add_new:
        from deeplabcut import add_new_videos

        add_new_videos(config=config_path, videos=videos, copy_videos=True)

    if add_to_files:  # Add videos to training files
        cls().add_training_files(key, **kwargs)
    return videos

add_training_files(key, **kwargs) classmethod

Add training videos and labeled frames .h5 and .csv to DLCProject.File

Source code in src/spyglass/position/v1/position_dlc_project.py
@classmethod
def add_training_files(cls, key, **kwargs):
    """Add training videos and labeled frames .h5
    and .csv to DLCProject.File"""
    from deeplabcut.utils.auxiliaryfunctions import read_config

    config_path = (cls & {"project_name": key["project_name"]}).fetch1(
        "config_path"
    )

    key = {  # Remove non-essential vals from key
        k: v
        for k, v in key.items()
        if k
        not in [
            "bodyparts",
            "team_name",
            "config_path",
            "frames_per_video",
        ]
    }

    cfg = read_config(config_path)
    video_names = list(cfg["video_sets"])
    label_dir = Path(cfg["project_path"]) / "labeled-data"
    training_files = []

    video_inserts = []
    for video in video_names:
        vid_path_obj = Path(video)
        video_name = vid_path_obj.stem
        training_files.extend((label_dir / video_name).glob("*Collected*"))
        key.update(
            {
                "file_name": video_name,
                "file_ext": vid_path_obj.suffix[1:],  # remove leading '.'
                "file_path": video,
            }
        )
    cls().File.insert(video_inserts, **kwargs)

    if len(training_files) == 0:
        logger.warning("No training files to add")
        return

    for file in training_files:
        path_obj = Path(file)
        cls().File.insert1(
            {
                **key,
                "file_name": f"{path_obj.name}_labeled_data",
                "file_ext": path_obj.suffix[1:],
                "file_path": file,
            },
            **kwargs,
        )

run_extract_frames(key, **kwargs) classmethod

Convenience function to launch DLC GUI for extracting frames. Must be run on local machine to access GUI, cannot be run through ssh tunnel

Source code in src/spyglass/position/v1/position_dlc_project.py
@classmethod
def run_extract_frames(cls, key, **kwargs):
    """Convenience function to launch DLC GUI for extracting frames.
    Must be run on local machine to access GUI,
    cannot be run through ssh tunnel
    """
    config_path = (cls & key).fetch1("config_path")
    from deeplabcut import extract_frames

    extract_frames(config_path, **kwargs)

run_label_frames(key) classmethod

Convenience function to launch DLC GUI for labeling frames. Must be run on local machine to access GUI, cannot be run through ssh tunnel

Source code in src/spyglass/position/v1/position_dlc_project.py
@classmethod
def run_label_frames(cls, key):
    """Convenience function to launch DLC GUI for labeling frames.
    Must be run on local machine to access GUI,
    cannot be run through ssh tunnel
    """
    config_path = (cls & key).fetch1("config_path")
    try:
        from deeplabcut import label_frames
    except (ModuleNotFoundError, ImportError):
        logger.error("DLC loaded in light mode, cannot label frames")
        return

    label_frames(config_path)

check_labels(key, **kwargs) classmethod

Convenience function to check labels on previously extracted and labeled frames

Source code in src/spyglass/position/v1/position_dlc_project.py
@classmethod
def check_labels(cls, key, **kwargs):
    """Convenience function to check labels on
    previously extracted and labeled frames
    """
    config_path = (cls & key).fetch1("config_path")
    from deeplabcut import check_labels

    check_labels(config_path, **kwargs)

import_labeled_frames(key, new_proj_path, video_filenames, **kwargs) classmethod

Function to import pre-labeled frames from an existing project into a new project

Parameters:

Name Type Description Default
key Dict

key to specify entry in DLCProject table to add labeled frames to

required
new_proj_path Union[str, PosixPath]

absolute path to project directory containing labeled frames to import

required
video_filenames str or List

filename or list of filenames of video(s) from which to import frames. Without file extension

required
Source code in src/spyglass/position/v1/position_dlc_project.py
@classmethod
def import_labeled_frames(
    cls,
    key: Dict,
    new_proj_path: Union[str, PosixPath],
    video_filenames: Union[str, List],
    **kwargs,
):
    """Function to import pre-labeled frames from an existing project
    into a new project

    Parameters
    ----------
    key : Dict
        key to specify entry in DLCProject table to add labeled frames to
    new_proj_path : Union[str, PosixPath]
        absolute path to project directory containing
        labeled frames to import
    video_filenames : str or List
        filename or list of filenames of video(s) from which to import
        frames. Without file extension
    """
    project_entry = (cls & key).fetch1()
    team_name = project_entry["team_name"].replace(" ", "_")
    this_proj_path = Path(project_entry["config_path"]).parent
    this_data_path = this_proj_path / "labeled-data"
    new_proj_path = Path(new_proj_path)  # If Path(Path), no change
    new_data_path = new_proj_path / "labeled-data"

    if not new_data_path.exists():
        raise FileNotFoundError(f"Cannot find directory: {new_data_path}")

    videos = (
        video_filenames
        if isinstance(video_filenames, List)
        else [video_filenames]
    )
    for video_file in videos:
        h5_file = next((new_data_path / video_file).glob("*h5"))
        dlc_df = pd.read_hdf(h5_file)
        dlc_df.columns = dlc_df.columns.set_levels([team_name], level=0)
        new_video_path = this_data_path / video_file
        new_video_path.mkdir(exist_ok=True)
        dlc_df.to_hdf(
            new_video_path / f"CollectedData_{team_name}.h5",
            "df_with_missing",
        )
    cls().add_training_files(key, **kwargs)

add_to_config(config, bodyparts=None, skeleton_node=None, **kwargs)

Add necessary items to the config.yaml for the model

Parameters:

Name Type Description Default
config str

Path to config.yaml

required
bodyparts list

list of bodyparts to add to model

None
skeleton_node str

(default is None) node to link LEDs in skeleton

None
kwargs dict

Other parameters of config to modify in key:value pairs

{}
Source code in src/spyglass/position/v1/position_dlc_project.py
def add_to_config(
    config, bodyparts: List = None, skeleton_node: str = None, **kwargs
):
    """Add necessary items to the config.yaml for the model

    Parameters
    ----------
    config : str
        Path to config.yaml
    bodyparts : list
        list of bodyparts to add to model
    skeleton_node : str
        (default is None) node to link LEDs in skeleton
    kwargs : dict
        Other parameters of config to modify in key:value pairs
    """

    yaml = YAML()
    with open(config) as fp:
        data = yaml.load(fp)

    if bodyparts:
        data["bodyparts"] = bodyparts
        led_parts = [bp for bp in bodyparts if "LED" in bp]
        bodypart_skeleton = (
            [
                list(link)
                for link in combinations(led_parts, 2)
                if skeleton_node in link
            ]
            if skeleton_node
            else list(combinations(led_parts, 2))
        )
        other_parts = list(set(bodyparts) - set(led_parts))
        for ind, part in enumerate(other_parts):
            other_parts[ind] = [part, part]
        bodypart_skeleton.append(other_parts)
        data["skeleton"] = bodypart_skeleton

    kwargs.update(
        {str(k): v for k, v in kwargs.items() if not isinstance(k, str)}
    )

    with open(config, "w") as fw:
        yaml.dump(data, fw)

sanitize_filename(filename)

Sanitize filename to remove special characters

Source code in src/spyglass/position/v1/position_dlc_project.py
def sanitize_filename(filename: str) -> str:
    """Sanitize filename to remove special characters"""
    char_map = {
        " ": "_",
        ".": "_",
        ",": "-",
        "&": "and",
        "'": "",
    }
    return "".join([char_map.get(c, c) for c in filename])