Skip to content

dj_merge_tables.py

Merge

Bases: Manual

Adds funcs to support standard Merge table operations.

Many methods have the @classmethod decorator to permit MergeTable.method() syntax. This makes access to instance attributes (e.g., (MergeTable & "example='restriction'").restriction) harder, but these attributes have limited utility when the user wants to, for example, restrict the merged view rather than the master table itself.

Source code in src/spyglass/utils/dj_merge_tables.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
class Merge(dj.Manual):
    """Adds funcs to support standard Merge table operations.

    Many methods have the @classmethod decorator to permit MergeTable.method()
    syntax. This makes access to instance attributes (e.g., (MergeTable &
    "example='restriction'").restriction) harder, but these attributes have
    limited utility when the user wants to, for example, restrict the merged
    view rather than the master table itself.
    """

    def __init__(self):
        """Initialize the table and sanity-check the Merge definition.

        Stores the reserved primary/secondary key names on the instance. If
        the table is not yet declared, logs a warning (without raising) when
        the comment-stripped definition differs from the default Merge
        definition, or when a part table's primary key differs from the
        master's primary key.
        """
        super().__init__()
        self._reserved_pk = RESERVED_PRIMARY_KEY
        self._reserved_sk = RESERVED_SECONDARY_KEY
        merge_def = (
            f"\n    {self._reserved_pk}: uuid\n    ---\n"
            + f"    {self._reserved_sk}: varchar({RESERVED_SK_LENGTH})\n    "
        )
        if not self.is_declared:
            # remove comments after # from each line of definition
            if self._remove_comments(self.definition) != merge_def:
                # `warning` replaces the deprecated `warn` alias
                logger.warning(
                    "Merge table with non-default definition\n\t"
                    + f"Expected: {merge_def.strip()}\n\t"
                    + f"Actual  : {self.definition.strip()}"
                )
            for part in self.parts(as_objects=True):
                if part.primary_key != self.primary_key:
                    logger.warning(
                        f"Unexpected primary key in {part.table_name}"
                        + f"\n\tExpected: {self.primary_key}"
                        + f"\n\tActual  : {part.primary_key}"
                    )
        # Cache slot for the lazily imported AnalysisNwbfile class
        self._analysis_nwbfile = None

    @property  # CB: This is a property to avoid circular import
    def analysis_nwbfile(self):
        if self._analysis_nwbfile is None:
            from spyglass.common import AnalysisNwbfile  # noqa F401

            self._analysis_nwbfile = AnalysisNwbfile
        return self._analysis_nwbfile

    def _remove_comments(self, definition):
        """Use regular expressions to remove comments and blank lines"""
        return re.sub(  # First remove comments, then blank lines
            r"\n\s*\n", "\n", re.sub(r"#.*\n", "\n", definition)
        )

    @classmethod
    def _merge_restrict_parts(
        cls,
        restriction: str = True,
        as_objects: bool = True,
        return_empties: bool = True,
        add_invalid_restrict: bool = True,
    ) -> list:
        """Returns a list of parts with restrictions applied.

        A dict restriction passed by the caller is never modified: the
        reserved source key is dropped from a copy before restricting parts.

        Parameters
        ---------
        restriction: str, optional
            Restriction to apply to the parts. Default True, no restrictions.
            Falsy values are treated as True.
        as_objects: bool, optional
            Default True. Return part tables as objects. If False, return
            their full table names.
        return_empties: bool, optional
            Default True. Return empty part tables
        add_invalid_restrict: bool, optional
            Default True. Include part for which the restriction is invalid.

        Returns
        ------
        list
            list of datajoint tables, parts of Merge Table
        """

        cls._ensure_dependencies_loaded()

        if not restriction:
            restriction = True

        master = cls()  # single instance reused below
        reserved_sk = master._reserved_sk

        # Normalize restriction to sql string
        restr_str = make_condition(master, restriction, set())

        parts_all = cls.parts(as_objects=True)
        # If the restriction makes ref to a source, we only want that part
        sk_token = f"`{reserved_sk}`"
        if (
            not return_empties
            and isinstance(restr_str, str)
            and sk_token in restr_str
        ):
            # Text between the quotes following `<source key>`="
            source_name = restr_str.split(f'{sk_token}="')[-1].split('"')[0]
            parts_all = [
                part
                for part in parts_all
                if from_camel_case(source_name)  # Only look at source part
                in part.full_table_name
            ]
        if isinstance(restriction, dict):  # restr by source already done above
            # Drop the source key on a copy; popping in place would silently
            # strip it from the caller's dict (won't work for str)
            restriction = {
                k: v for k, v in restriction.items() if k != reserved_sk
            }
            # If a dict restriction has all invalid keys, it is treated as True
            if not add_invalid_restrict:
                parts_all = [  # so exclude tables w/ nonmatching attrs
                    p
                    for p in parts_all
                    if all(k in p.heading.names for k in restriction)
                ]

        parts = []
        for part in parts_all:
            try:
                parts.append(part.restrict(restriction))
            except DataJointError:  # If restriction not valid on given part
                if add_invalid_restrict:
                    parts.append(part)

        if not return_empties:
            parts = [p for p in parts if len(p)]
        if not as_objects:
            parts = [p.full_table_name for p in parts]

        return parts

    @classmethod
    def _merge_restrict_parents(
        cls,
        restriction: str = True,
        parent_name: str = None,
        as_objects: bool = True,
        return_empties: bool = True,
        add_invalid_restrict: bool = True,
    ) -> list:
        """Return the parents of the restricted parts, limited to merge rows.

        Each restricted part is mapped to its parent tables (excluding the
        merge master itself); every parent is restricted by the part's
        secondary attributes so only entries present in the merge are kept.

        Parameters
        ---------
        restriction: str, optional
            Restriction to apply to the returned parent. Default True, no
            restrictions.
        parent_name: str, optional
            CamelCase name of the parent. If given, only parents whose full
            table name contains the converted name are returned.
        as_objects: bool, optional
            Default True. Return tables as objects. If False, return full
            table names.
        return_empties: bool, optional
            Default True. Return empty part tables
        add_invalid_restrict: bool, optional
            Default True. Include part for which the restriction is invalid.

        Returns
        ------
        list
            list of datajoint tables, parents of parts of Merge Table
        """
        restricted_parts = cls()._merge_restrict_parts(
            restriction=restriction,
            return_empties=return_empties,
            add_invalid_restrict=add_invalid_restrict,
        )

        part_parents = []
        for part in restricted_parts:
            for parent in part.parents(as_objects=True):
                if cls().table_name in parent.full_table_name:
                    continue  # the merge master is not a data source
                # .restrict(restriction) does not work on the returned
                # FreeTable; restricting by the part's fetched secondary
                # attributes limits the parent to entries in the merge table
                in_merge = part.fetch(
                    *part.heading.secondary_attributes, as_dict=True
                )
                part_parents.append(parent & in_merge)

        if parent_name:
            kept = []
            for candidate in part_parents:
                if from_camel_case(parent_name) in candidate.full_table_name:
                    kept.append(candidate)
            part_parents = kept

        if as_objects:
            return part_parents
        return [p.full_table_name for p in part_parents]

    @classmethod
    def _merge_repr(cls, restriction: str = True) -> dj.expression.Union:
        """Build the merged view, NULL-filling columns a part does not have.

        Parameters
        ---------
        restriction: str, optional
            Restriction passed on to the part tables before merging.

        Returns
        ------
        datajoint.expression.Union
        """
        restricted_parts = cls._merge_restrict_parts(
            restriction=restriction,
            add_invalid_restrict=False,
            return_empties=True,
        )
        # join with master to include sec key (i.e., 'source')
        parts = [cls() * part for part in restricted_parts]

        # Every column seen in any part: first-seen order, no duplicates
        primary_attrs = list(
            dict.fromkeys(name for p in parts for name in p.heading.names)
        )

        def _null_padded(part):
            """Project ``part`` onto all columns, NULL where it lacks one."""
            missing = {
                attr: "NULL"
                for attr in primary_attrs
                if attr not in part.heading.names
            }
            return dj.U(*primary_attrs) * part.proj(..., **missing)

        query = _null_padded(parts[0])  # declare query from the first part
        for part in parts[1:]:  # then add each remaining part
            query += _null_padded(part)
        return query

    @classmethod
    def _merge_insert(
        cls, rows: list, part_name: str = None, mutual_exclusvity=True, **kwargs
    ) -> None:
        """Insert rows into merge, ensuring db integrity and mutual exclusivity

        For each row, the parent of every candidate part is checked for a
        matching entry. The master key is a hash of the matching parent key.
        Master and part entries are inserted together inside the context
        returned by ``_safe_context``.

        Parameters
        ---------
        rows: List[dict]
            An iterable where an element is a dictionary.
        part_name: str, optional
            CamelCase name of the part table. If given, only parts whose full
            table name contains the converted name are considered.
        mutual_exclusvity: bool, optional
            Default True. Raise if a row matches the parent of more than one
            part.
        kwargs: dict
            Additional keyword arguments passed to the DataJoint inserts.

        Raises
        ------
        TypeError
            If rows is not a list of dicts
        ValueError
            If entry already exists, mutual exclusivity errors
            If data doesn't exist in part parents, integrity error
        """
        cls._ensure_dependencies_loaded()

        # Validate explicitly: `assert` is stripped under `python -O` and
        # raised AssertionError rather than the documented TypeError.
        try:
            rows = list(rows)  # materialize so one-shot iterables survive
        except TypeError as err:
            raise TypeError(
                'Input "rows" must be a list of dictionaries'
            ) from err
        if not all(isinstance(r, dict) for r in rows):
            raise TypeError('Input "rows" must be a list of dictionaries')

        parts = cls._merge_restrict_parts(as_objects=True)
        if part_name:
            parts = [
                p
                for p in parts
                if from_camel_case(part_name) in p.full_table_name
            ]

        master = cls()  # single instance reused for reserved key names
        reserved_pk = master._reserved_pk
        reserved_sk = master._reserved_sk

        master_entries = []
        parts_entries = {p: [] for p in parts}
        for row in rows:
            keys = []  # empty to-be-inserted key
            for part in parts:  # check each part
                part_parent = part.parents(as_objects=True)[-1]
                # Separate name so the `part_name` argument is not shadowed
                source_name = to_camel_case(part.table_name.split("__")[-1])
                if part_parent & row:  # if row is in part parent
                    if keys and mutual_exclusvity:  # if key from other part
                        raise ValueError(
                            "Mutual Exclusivity Error! Entry exists in more "
                            + f"than one table - Entry: {row}"
                        )

                    keys = (part_parent & row).fetch("KEY")  # get pk
                    if len(keys) > 1:
                        raise ValueError(
                            "Ambiguous entry. Data has mult rows in "
                            + f"{source_name}:\n\tData:{row}\n\t{keys}"
                        )
                    master_pk = {  # make uuid
                        reserved_pk: dj.hash.key_hash(keys[0]),
                    }
                    parts_entries[part].append({**master_pk, **keys[0]})
                    master_entries.append(
                        {**master_pk, reserved_sk: source_name}
                    )

            if not keys:
                raise ValueError(
                    "Non-existing entry in any of the parent tables - Entry: "
                    + f"{row}"
                )

        with cls._safe_context():
            super().insert(master, master_entries, **kwargs)
            for part, part_entries in parts_entries.items():
                part.insert(part_entries, **kwargs)

    @classmethod
    def _safe_context(cls):
        """Return a transaction context, or a no-op context if inside one."""
        if cls.connection.in_transaction:
            # Already in a transaction: hand back a context that does nothing
            return nullcontext()
        return cls.connection.transaction

    @classmethod
    def _ensure_dependencies_loaded(cls) -> None:
        """Load the connection's dependencies if not already loaded.

        Otherwise parts returns none
        """
        dependencies = dj.conn.connection.dependencies
        if dependencies._loaded:
            return
        dependencies.load()

    def insert(self, rows: list, mutual_exclusvity=True, **kwargs):
        """Insert rows through the Merge-specific insert routine.

        Thin wrapper that forwards everything to ``_merge_insert``, which
        performs the integrity and mutual-exclusivity checks.

        Parameters
        ---------
        rows: List[dict]
            An iterable where an element is a dictionary.
        mutual_exclusvity: bool
            Check for mutual exclusivity before insert. Default True.
        kwargs: dict
            Additional keyword arguments forwarded to ``_merge_insert``.

        Raises
        ------
        TypeError
            If rows is not a list of dicts
        ValueError
            If entry already exists, mutual exclusivity errors
            If data doesn't exist in part parents, integrity error
        """
        insert_opts = dict(kwargs, mutual_exclusvity=mutual_exclusvity)
        self._merge_insert(rows, **insert_opts)

    @classmethod
    def merge_view(cls, restriction: str = True):
        """Pretty-print the merged view, with nulls for part-unique columns.

        Note: To handle this Union as a table-like object, use
        `merge_restrict`

        Parameters
        ---------
        restriction: str, optional
            Restriction to apply to the merged view
        """

        # If we overwrite `preview`, we then encounter issues with operators
        # getting passed a `Union`, which doesn't have a method we can
        # intercept to manage master/parts

        merged = cls._merge_repr(restriction=restriction)
        return pprint(merged)

    @classmethod
    def merge_html(cls, restriction: str = True):
        """Return the merged view wrapped in ``HTML`` for notebook display.

        Parameters
        ---------
        restriction: str, optional
            Restriction to apply to the merged view
        """
        merged = cls._merge_repr(restriction=restriction)
        markup = repr_html(merged)
        return HTML(markup)

    @classmethod
    def merge_restrict(cls, restriction: str = True) -> dj.U:
        """Return the merged view built with the given restriction.

        Example
        -------
            >>> MergeTable.merge_restrict("field = 1")

        Parameters
        ----------
        restriction: str
            Restriction one would apply if `merge_view` was a real table.

        Returns
        -------
        datajoint.Union
            Merged view with restriction applied, as built by `_merge_repr`.
        """
        merged_view = cls._merge_repr(restriction=restriction)
        return merged_view

    @classmethod
    def merge_delete(cls, restriction: str = True, **kwargs):
        """Given a restriction string, delete corresponding entries.

        The keys of the restricted merged view are reduced to the reserved
        primary key, and the master table restricted by those keys is
        deleted.

        Parameters
        ----------
        restriction: str
            Optional restriction to apply before deletion from master/part
            tables. If not provided, delete all entries.
        kwargs: dict
            Additional keyword arguments for DataJoint delete.

        Example
        -------
            >>> MergeTable.merge_delete("field = 1")
        """
        # Look the key name up once rather than instantiating the table for
        # every field of every fetched key.
        reserved_pk = cls()._reserved_pk
        uuids = [
            {reserved_pk: entry[reserved_pk]}
            for entry in cls.merge_restrict(restriction).fetch("KEY")
            if reserved_pk in entry
        ]
        (cls() & uuids).delete(**kwargs)

    @classmethod
    def merge_delete_parent(
        cls, restriction: str = True, dry_run=True, **kwargs
    ) -> list:
        """Delete entries from merge master, part, and respective part parents

        Note: Clears merge entries from their respective parents.

        Parameters
        ----------
        restriction: str
            Optional restriction to apply before deletion from parents. If not
            provided, delete all entries present in Merge Table.
        dry_run: bool
            Default True. If true, return list of tables with entries that would
            be deleted. Otherwise, delete the entries.
        kwargs: dict
            Additional keyword arguments for DataJoint delete.

        Returns
        -------
        list or None
            With `dry_run`, the restricted part parents. Otherwise None.
        """
        # Non-empty parents of the restricted parts, limited to merge entries
        part_parents = cls._merge_restrict_parents(
            restriction=restriction, as_objects=True, return_empties=False
        )

        if dry_run:
            return part_parents

        # Master primary keys of the entries matching the restriction
        merge_ids = cls.merge_restrict(restriction).fetch(
            RESERVED_PRIMARY_KEY, as_dict=True
        )

        # CB: Removed transaction protection here bc 'no' confirmation resp
        # still resulted in deletes. If re-add, consider transaction=False
        super().delete((cls & merge_ids), **kwargs)

        # Entries still present means the delete above did not go through
        if cls & merge_ids:  # If 'no' on del prompt from above, skip below
            return  # User can still abort del below, but yes/no is unlikely

        # Master entries are gone; now delete from each part parent
        for part_parent in part_parents:
            super().delete(part_parent, **kwargs)

    @classmethod
    def fetch_nwb(
        cls,
        restriction: str = True,
        multi_source=False,
        disable_warning=False,
        *attrs,
        **kwargs,
    ):
        """Return the AnalysisNwbfile file linked in the source.

        Parameters
        ----------
        restriction: str, optional
            Restriction to apply to parents before running fetch. Default True.
        multi_source: bool
            Return from multiple parents. Default False.
        disable_warning: bool
            Default False. If True, skip the `_warn_on_restriction` call.
        attrs, kwargs
            Extra arguments forwarded to the `fetch_nwb` helper called for
            each parent.

        Returns
        -------
        list
            Concatenated results of the helper across all matching parents.

        Raises
        ------
        ValueError
            If `multi_source` is False and the number of matching parents is
            not exactly one.
        """
        if not disable_warning:
            _warn_on_restriction(table=cls, restriction=restriction)

        part_parents = cls._merge_restrict_parents(
            restriction=restriction,
            return_empties=False,
            add_invalid_restrict=False,
        )

        n_sources = len(part_parents)
        if n_sources != 1 and not multi_source:
            source_names = [p.full_table_name for p in part_parents]
            raise ValueError(
                f"{n_sources} possible sources found in Merge Table:"
                + " and ".join(source_names)
            )

        nwbs = []
        for part_parent in part_parents:
            # Inside the method body, the bare name `fetch_nwb` is not this
            # classmethod; it is looked up outside the class namespace
            fetched = fetch_nwb(
                part_parent,
                (cls().analysis_nwbfile, "analysis_file_abs_path"),
                *attrs,
                **kwargs,
            )
            nwbs += fetched
        return nwbs

    @classmethod
    def merge_get_part(
        cls,
        restriction: str = True,
        join_master: bool = False,
        restrict_part=True,
        multi_source=False,
    ) -> dj.Table:
        """Retrieve part table from a restricted Merge table.

        Note: unlike other Merge Table methods, returns the native table, not
        a FreeTable

        Parameters
        ----------
        restriction: str
            Optional restriction to apply before determining part to return.
            Default True.
        join_master: bool
            Join part with Merge master to show source field. Default False.
        restrict_part: bool
            Apply restriction to part. Default True. If False, return the
            native part table.
        multi_source: bool
            Return multiple parts. Default False.

        Returns
        ------
        Union[dj.Table, List[dj.Table]]
            Native part table(s) of Merge. If `multi_source`, returns list.

        Example
        -------
            >>> (MergeTable & restriction).get_part_table()
            >>> MergeTable().merge_get_part(restriction, join_master=True)

        Raises
        ------
        ValueError
            If multiple sources are found, but not expected lists and suggests
            restricting
        """
        part_names = cls._merge_restrict_parts(
            restriction=restriction,
            as_objects=False,
            return_empties=False,
            add_invalid_restrict=False,
        )

        sources = []
        for full_name in part_names:
            # friendly part name: text after the last "__", sans backticks
            short_name = full_name.split("__")[-1].strip("`")
            sources.append(to_camel_case(short_name))

        if len(sources) != 1 and not multi_source:
            raise ValueError(
                f"Found {len(sources)} potential parts: {sources}\n\t"
                + "Try adding a restriction before invoking `get_part`.\n\t"
                + "Or permitting multiple sources with `multi_source=True`."
            )

        parts = []
        for source in sources:
            part = getattr(cls, source)()  # native part class on the master
            if restrict_part:  # Re-apply restriction or don't
                part = part.restrict(restriction)
            parts.append(part)

        if join_master:
            parts = [cls * part for part in parts]

        if multi_source:
            return parts
        return parts[0]

    @classmethod
    def merge_get_parent(
        cls,
        restriction: str = True,
        join_master: bool = False,
        multi_source=False,
    ) -> dj.FreeTable:
        """Return the parent(s) of the parts matching a restriction.

        Rather than part tables, we look at parents of those parts, the source
        of the data, and only the rows that have keys inserted in the merge
        table.

        Parameters
        ----------
        restriction: str
            Optional restriction to apply before determining parent to return.
            Default True.
        join_master: bool
            Default False. Join part with Merge master to show uuid and source
        multi_source: bool
            Default False. If True, return a list of all matching parents
            instead of requiring exactly one.

        Returns
        ------
        dj.FreeTable
            Parent of parts of Merge Table as FreeTable. A list of them if
            `multi_source`.

        Raises
        ------
        ValueError
            If `multi_source` is False and the number of matching parents is
            not exactly one.
        """

        part_parents = cls._merge_restrict_parents(
            restriction=restriction,
            as_objects=True,
            return_empties=False,
            add_invalid_restrict=False,
        )

        if len(part_parents) != 1 and not multi_source:
            raise ValueError(
                f"Found  {len(part_parents)} potential parents: {part_parents}"
                + "\n\tTry adding a string restriction when invoking "
                + "`get_parent`. Or permitting multiple sources with "
                + "`multi_source=True`."
            )

        if join_master:
            joined = []
            for parent in part_parents:
                joined.append(cls * parent)
            part_parents = joined

        if multi_source:
            return part_parents
        return part_parents[0]

    @classmethod
    def merge_fetch(cls, restriction: str = True, *attrs, **kwargs) -> list:
        """Perform a fetch across all parts. If >1 result, return as a list.

        Each non-empty restricted part is fetched with the same arguments and
        the results are concatenated. A part whose fetch raises
        `DataJointError` is skipped with a logged warning.

        Parameters
        ----------
        restriction: str
            Optional restriction to apply before determining parent to return.
            Default True.
        attrs, kwargs
            arguments passed to DataJoint `fetch` call

        Returns
        -------
        Union[ List[np.array], List[dict], List[pd.DataFrame] ]
            Table contents, with type determined by kwargs. If exactly one
            result was collected, that single element is returned instead of
            a list.
        """
        results = []
        # Classmethod: call through `cls` directly, no instance needed
        parts = cls._merge_restrict_parts(
            restriction=restriction,
            as_objects=True,
            return_empties=False,
            add_invalid_restrict=False,
        )

        for part in parts:
            try:
                results.extend(part.fetch(*attrs, **kwargs))
            except DataJointError as e:
                # `warning` replaces the deprecated `warn` alias
                logger.warning(
                    f"{e.args[0]} Skipping "
                    + to_camel_case(part.table_name.split("__")[-1])
                )

        # Note: this could collapse results like merge_view, but user may call
        # for recarray, pd.DataFrame, or dict, and fetched contents differ if
        # attrs or "KEY" called. Intercept format, merge, and then transform?

        if not results:
            logger.info(
                "No merge_fetch results.\n\t"
                + "If not restricting, try: `M.merge_fetch(True,'attr')\n\t"
                + "If restricting by source, use dict: "
                + "`M.merge_fetch({'source':'X'})"
            )
        return results[0] if len(results) == 1 else results

    @classmethod
    def merge_populate(source: str, key=None):
        raise NotImplementedError(
            "CBroz: In the future, this command will support executing "
            + "part_parent `make` and then inserting all entries into Merge"
        )

insert(rows, mutual_exclusvity=True, **kwargs)

Merges table specific insert

Ensuring db integrity and mutual exclusivity

Parameters:

Name Type Description Default
rows list

An iterable where an element is a dictionary.

required
mutual_exclusvity

Check for mutual exclusivity before insert. Default True.

True

Raises:

Type Description
TypeError

If rows is not a list of dicts

ValueError

If entry already exists (mutual exclusivity error), or if data doesn't exist in part parents (integrity error).

Source code in src/spyglass/utils/dj_merge_tables.py
def insert(self, rows: list, mutual_exclusvity=True, **kwargs):
    """Insert rows through the Merge-specific insert routine.

    Thin wrapper that forwards everything to ``_merge_insert``, which
    performs the integrity and mutual-exclusivity checks.

    Parameters
    ---------
    rows: List[dict]
        An iterable where an element is a dictionary.
    mutual_exclusvity: bool
        Check for mutual exclusivity before insert. Default True.
    kwargs: dict
        Additional keyword arguments forwarded to ``_merge_insert``.

    Raises
    ------
    TypeError
        If rows is not a list of dicts
    ValueError
        If entry already exists, mutual exclusivity errors
        If data doesn't exist in part parents, integrity error
    """
    insert_opts = dict(kwargs, mutual_exclusvity=mutual_exclusvity)
    self._merge_insert(rows, **insert_opts)

merge_view(restriction=True) classmethod

Prints merged view, including null entries for unique columns.

Note: To handle this Union as a table-like object, use merge_restrict

Parameters:

Name Type Description Default
restriction str

Restriction to apply to the merged view

True
Source code in src/spyglass/utils/dj_merge_tables.py
@classmethod
def merge_view(cls, restriction: str = True):
    """Pretty-print the merged view, with nulls for part-unique columns.

    Note: To handle this Union as a table-like object, use `merge_restrict`

    Parameters
    ---------
    restriction: str, optional
        Restriction to apply to the merged view
    """

    # If we overwrite `preview`, we then encounter issues with operators
    # getting passed a `Union`, which doesn't have a method we can
    # intercept to manage master/parts

    merged = cls._merge_repr(restriction=restriction)
    return pprint(merged)

merge_html(restriction=True) classmethod

Displays HTML in notebooks.

Source code in src/spyglass/utils/dj_merge_tables.py
@classmethod
def merge_html(cls, restriction: str = True):
    """Return the merged view wrapped in ``HTML`` for notebook display.

    Parameters
    ---------
    restriction: str, optional
        Restriction to apply to the merged view
    """
    merged = cls._merge_repr(restriction=restriction)
    markup = repr_html(merged)
    return HTML(markup)

merge_restrict(restriction=True) classmethod

Given a restriction, return a merged view with restriction applied.

Example
>>> MergeTable.merge_restrict("field = 1")

Parameters:

Name Type Description Default
restriction str

Restriction one would apply if merge_view was a real table.

True

Returns:

Type Description
Union

Merged view with restriction applied.

Source code in src/spyglass/utils/dj_merge_tables.py
@classmethod
def merge_restrict(cls, restriction: str = True) -> dj.U:
    """Return the merged view built with the given restriction.

    Example
    -------
        >>> MergeTable.merge_restrict("field = 1")

    Parameters
    ----------
    restriction: str
        Restriction one would apply if `merge_view` was a real table.

    Returns
    -------
    datajoint.Union
        Merged view with restriction applied, as built by `_merge_repr`.
    """
    merged_view = cls._merge_repr(restriction=restriction)
    return merged_view

merge_delete(restriction=True, **kwargs) classmethod

Given a restriction string, delete corresponding entries.

Parameters:

Name Type Description Default
restriction str

Optional restriction to apply before deletion from master/part tables. If not provided, delete all entries.

True
kwargs

Additional keyword arguments for DataJoint delete.

{}
Example
>>> MergeTable.merge_delete("field = 1")
Source code in src/spyglass/utils/dj_merge_tables.py
@classmethod
def merge_delete(cls, restriction: str = True, **kwargs):
    """Given a restriction string, delete corresponding entries.

    The keys of the restricted merged view are reduced to the reserved
    primary key, and the master table restricted by those keys is deleted.

    Parameters
    ----------
    restriction: str
        Optional restriction to apply before deletion from master/part
        tables. If not provided, delete all entries.
    kwargs: dict
        Additional keyword arguments for DataJoint delete.

    Example
    -------
        >>> MergeTable.merge_delete("field = 1")
    """
    # Look the key name up once rather than instantiating the table for
    # every field of every fetched key.
    reserved_pk = cls()._reserved_pk
    uuids = [
        {reserved_pk: entry[reserved_pk]}
        for entry in cls.merge_restrict(restriction).fetch("KEY")
        if reserved_pk in entry
    ]
    (cls() & uuids).delete(**kwargs)

merge_delete_parent(restriction=True, dry_run=True, **kwargs) classmethod

Delete entries from merge master, part, and respective part parents

Note: Clears merge entries from their respective parents.

Parameters:

Name Type Description Default
restriction str

Optional restriction to apply before deletion from parents. If not provided, delete all entries present in Merge Table.

True
dry_run

Default True. If true, return list of tables with entries that would be deleted. Otherwise, delete the entries.

True
kwargs

Additional keyword arguments for DataJoint delete.

{}
Source code in src/spyglass/utils/dj_merge_tables.py
@classmethod
def merge_delete_parent(
    cls, restriction: str = True, dry_run=True, **kwargs
) -> list:
    """Delete entries from merge master, part, and respective part parents.

    Note: Clears merge entries from their respective parents.

    Parameters
    ----------
    restriction: str
        Optional restriction to apply before deletion from parents. If not
        provided, delete all entries present in Merge Table.
    dry_run: bool
        Default True. If True, only return the restricted part parents and
        delete nothing. If False, perform the deletes.
    kwargs: dict
        Additional keyword arguments for DataJoint delete.

    Returns
    -------
    list
        The restricted part parents when `dry_run` is True. Nothing is
        returned when `dry_run` is False.
    """
    parents_to_clear = cls._merge_restrict_parents(
        restriction=restriction, as_objects=True, return_empties=False
    )
    if dry_run:
        return parents_to_clear

    restricted_view = cls.merge_restrict(restriction)
    id_keys = restricted_view.fetch(RESERVED_PRIMARY_KEY, as_dict=True)

    # CB: Removed transaction protection here bc 'no' confirmation resp
    # still resulted in deletes. If re-add, consider transaction=False
    super().delete((cls & id_keys), **kwargs)

    # Entries still present means the delete above did not go through
    # (e.g. 'no' at the prompt), so the parents are left alone. The user can
    # still abort the deletes below, but a yes-then-no is unlikely.
    entries_remain = bool(cls & id_keys)
    if not entries_remain:
        for parent in parents_to_clear:
            super().delete(parent, **kwargs)

fetch_nwb(restriction=True, multi_source=False, disable_warning=False, *attrs, **kwargs) classmethod

Return the AnalysisNwbfile file linked in the source.

Parameters:

Name Type Description Default
restriction str

Restriction to apply to parents before running fetch. Default True.

True
multi_source

Return from multiple parents. Default False.

False
Source code in src/spyglass/utils/dj_merge_tables.py
@classmethod
def fetch_nwb(
    cls,
    restriction: str = True,
    multi_source=False,
    disable_warning=False,
    *attrs,
    **kwargs,
):
    """Return the AnalysisNwbfile file linked in the source.

    Parameters
    ----------
    restriction: str, optional
        Restriction to apply to parents before running fetch. Default True.
    multi_source: bool
        Return from multiple parents. Default False.
    disable_warning: bool
        Default False. If True, skip the `_warn_on_restriction` check.
    attrs, kwargs
        Forwarded to the module-level `fetch_nwb` helper for each parent.

    Returns
    -------
    list
        Concatenated results of the helper across all restricted parents.

    Raises
    ------
    ValueError
        If `multi_source` is False and the number of restricted parents is
        not exactly one.
    """
    if not disable_warning:
        _warn_on_restriction(table=cls, restriction=restriction)

    sources = cls._merge_restrict_parents(
        restriction=restriction,
        return_empties=False,
        add_invalid_restrict=False,
    )

    if len(sources) != 1 and not multi_source:
        source_names = [p.full_table_name for p in sources]
        raise ValueError(
            f"{len(sources)} possible sources found in Merge Table:"
            + " and ".join(source_names)
        )

    # Inside the body, `fetch_nwb` resolves to the module-level helper, not
    # to this method.
    return [
        nwb
        for source in sources
        for nwb in fetch_nwb(
            source,
            (cls().analysis_nwbfile, "analysis_file_abs_path"),
            *attrs,
            **kwargs,
        )
    ]

merge_get_part(restriction=True, join_master=False, restrict_part=True, multi_source=False) classmethod

Retrieve part table from a restricted Merge table.

Note: unlike other Merge Table methods, returns the native table, not a FreeTable

Parameters:

Name Type Description Default
restriction str

Optional restriction to apply before determining part to return. Default True.

True
join_master bool

Join part with Merge master to show source field. Default False.

False
restrict_part

Apply restriction to part. Default True. If False, return the native part table.

True
multi_source

Return multiple parts. Default False.

False

Returns:

Type Description
Union[Table, List[Table]]

Native part table(s) of Merge. If multi_source, returns list.

Example
>>> (MergeTable & restriction).get_part_table()
>>> MergeTable().merge_get_part(restriction, join_master=True)

Raises:

Type Description
ValueError

If `multi_source` is False and the number of matching parts is not exactly one. The message lists the parts found and suggests adding a restriction.

Source code in src/spyglass/utils/dj_merge_tables.py
@classmethod
def merge_get_part(
    cls,
    restriction: str = True,
    join_master: bool = False,
    restrict_part=True,
    multi_source=False,
) -> dj.Table:
    """Retrieve part table from a restricted Merge table.

    Note: unlike other Merge Table methods, returns the native table, not
    a FreeTable

    Parameters
    ----------
    restriction: str
        Optional restriction to apply before determining part to return.
        Default True.
    join_master: bool
        Join part with Merge master to show source field. Default False.
    restrict_part: bool
        Apply restriction to part. Default True. If False, return the
        native part table.
    multi_source: bool
        Return multiple parts. Default False.

    Returns
    -------
    Union[dj.Table, List[dj.Table]]
        Native part table(s) of Merge. If `multi_source`, returns list.

    Example
    -------
        >>> (MergeTable & restriction).get_part_table()
        >>> MergeTable().merge_get_part(restriction, join_master=True)

    Raises
    ------
    ValueError
        If `multi_source` is False and the number of matching parts is not
        exactly one. The message lists the parts found and suggests
        restricting.
    """
    part_names = cls._merge_restrict_parts(
        restriction=restriction,
        as_objects=False,
        return_empties=False,
        add_invalid_restrict=False,
    )
    # Friendly part name: last "__"-delimited piece of the table name,
    # backticks stripped, converted to CamelCase.
    sources = [
        to_camel_case(name.split("__")[-1].strip("`")) for name in part_names
    ]

    if len(sources) != 1 and not multi_source:
        raise ValueError(
            f"Found {len(sources)} potential parts: {sources}\n\t"
            + "Try adding a restriction before invoking `get_part`.\n\t"
            + "Or permitting multiple sources with `multi_source=True`."
        )

    parts = []
    for source in sources:
        part = getattr(cls, source)()  # part class looked up by name on cls
        if restrict_part:  # Re-apply restriction or don't
            part = part.restrict(restriction)
        parts.append(part)

    if join_master:
        parts = [cls * part for part in parts]

    if multi_source:
        return parts
    return parts[0]

merge_get_parent(restriction=True, join_master=False, multi_source=False) classmethod

Returns a list of part parents with restrictions applied.

Rather than part tables, we look at parents of those parts, the source of the data, and only the rows that have keys inserted in the merge table.

Parameters:

Name Type Description Default
restriction str

Optional restriction to apply before determining parent to return. Default True.

True
join_master bool

Default False. Join part with Merge master to show uuid and source

False

Returns:

Type Description
FreeTable

Parent of parts of Merge Table as FreeTable.

Source code in src/spyglass/utils/dj_merge_tables.py
@classmethod
def merge_get_parent(
    cls,
    restriction: str = True,
    join_master: bool = False,
    multi_source=False,
) -> dj.FreeTable:
    """Return the parent(s) of the parts, with restrictions applied.

    Rather than part tables, we look at parents of those parts, the source
    of the data, and only the rows that have keys inserted in the merge
    table.

    Parameters
    ----------
    restriction: str
        Optional restriction to apply before determining parent to return.
        Default True.
    join_master: bool
        Default False. Join part with Merge master to show uuid and source
    multi_source: bool
        Default False. If True, return every parent found as a list instead
        of requiring exactly one.

    Returns
    -------
    dj.FreeTable
        Parent of parts of Merge Table as FreeTable. If `multi_source`,
        a list of them.

    Raises
    ------
    ValueError
        If `multi_source` is False and the number of parents found is not
        exactly one.
    """
    parents = cls._merge_restrict_parents(
        restriction=restriction,
        as_objects=True,
        return_empties=False,
        add_invalid_restrict=False,
    )

    n_found = len(parents)
    if n_found != 1 and not multi_source:
        raise ValueError(
            f"Found  {n_found} potential parents: {parents}"
            + "\n\tTry adding a string restriction when invoking "
            + "`get_parent`. Or permitting multiple sources with "
            + "`multi_source=True`."
        )

    if join_master:
        parents = [cls * parent for parent in parents]

    if multi_source:
        return parents
    return parents[0]

merge_fetch(restriction=True, *attrs, **kwargs) classmethod

Perform a fetch across all parts. If >1 result, return as a list.

Parameters:

Name Type Description Default
restriction str

Optional restriction to apply before determining parent to return. Default True.

True
attrs

arguments passed to DataJoint fetch call

()
kwargs

arguments passed to DataJoint fetch call

{}

Returns:

Type Description
Union[List[array], List[dict], List[DataFrame]]

Table contents, with type determined by kwargs

Source code in src/spyglass/utils/dj_merge_tables.py
@classmethod
def merge_fetch(cls, restriction: str = True, *attrs, **kwargs) -> list:
    """Perform a fetch across all parts. If >1 result, return as a list.

    Parameters
    ----------
    restriction: str
        Optional restriction to apply before determining parent to return.
        Default True.
    attrs, kwargs
        arguments passed to DataJoint `fetch` call

    Returns
    -------
    Union[ List[np.array], List[dict], List[pd.DataFrame] ]
        Table contents, with type determined by kwargs. When exactly one
        item is collected, that item itself is returned rather than a
        one-element list.
    """
    results = []
    parts = cls()._merge_restrict_parts(
        restriction=restriction,
        as_objects=True,
        return_empties=False,
        add_invalid_restrict=False,
    )

    for part in parts:
        try:
            results.extend(part.fetch(*attrs, **kwargs))
        except DataJointError as e:
            # A part that cannot satisfy the fetch is skipped with a warning
            # rather than aborting. `Logger.warn` is a deprecated alias that
            # Python 3.13 removed, hence `warning`.
            logger.warning(
                f"{e.args[0]} Skipping "
                + to_camel_case(part.table_name.split("__")[-1])
            )

    # Note: this could collapse results like merge_view, but user may call
    # for recarray, pd.DataFrame, or dict, and fetched contents differ if
    # attrs or "KEY" called. Intercept format, merge, and then transform?

    if not results:
        logger.info(
            "No merge_fetch results.\n\t"
            + "If not restricting, try: `M.merge_fetch(True,'attr')\n\t"
            + "If restricting by source, use dict: "
            + "`M.merge_fetch({'source':'X'})"
        )
    return results[0] if len(results) == 1 else results

delete_downstream_merge(table, restriction=None, dry_run=True, recurse_level=2, disable_warning=False, **kwargs)

Given a table/restriction, identify or delete relevant downstream merge entries.

Parameters:

Name Type Description Default
table Table

DataJoint table or restriction thereof

required
restriction str

Optional restriction to apply before deletion from merge/part tables. If not provided, delete all downstream entries.

None
dry_run

Default True. If true, return list of tuples, merge/part tables downstream of table input. Otherwise, delete merge/part table entries.

True
recurse_level

Default 2. Depth to recurse into table descendants.

2
disable_warning

Default False. If True, don't warn about restrictions on table object.

False
kwargs

Additional keyword arguments for DataJoint delete.

{}

Returns:

Type Description
List[Tuple[Table, Table]]

Entries in merge/part tables downstream of table input.

Source code in src/spyglass/utils/dj_merge_tables.py
def delete_downstream_merge(
    table: dj.Table,
    restriction: str = None,
    dry_run=True,
    recurse_level=2,
    disable_warning=False,
    **kwargs,
) -> list:
    """Given a table/restriction, identify or delete downstream merge entries.

    Parameters
    ----------
    table: dj.Table
        DataJoint table or restriction thereof
    restriction: str
        Optional restriction to apply before deletion from merge/part
        tables. If not provided, delete all downstream entries.
    dry_run: bool
        Default True. If true, return list of tuples, merge/part tables
        downstream of table input. Otherwise, delete merge/part table entries.
    recurse_level: int
        Default 2. Depth to recurse into table descendants.
    disable_warning: bool
        Default False. If True, don't warn about restrictions on table object.
    kwargs: dict
        Additional keyword arguments for DataJoint delete.

    Returns
    -------
    List[Tuple[dj.Table, dj.Table]]
        Entries in merge/part tables downstream of table input. Only
        returned when `dry_run` is True; otherwise nothing is returned.
    """
    # The warning check sees the restriction exactly as the caller passed it.
    if not disable_warning:
        _warn_on_restriction(table, restriction)

    # Any falsy restriction (None, "") means "no restriction".
    restriction = restriction or True

    merge_table_pairs = _master_table_pairs(
        table_list=_unique_descendants(table, recurse_level),
        restricted_parent=(table & restriction),
    )

    # restrict the merge table based on uuids in part
    # don't need part for del, but show on dry_run
    merge_pairs = []
    for master, part in merge_table_pairs:
        part_ids = part.fetch(RESERVED_PRIMARY_KEY, as_dict=True)
        merge_pairs.append((master & part_ids, part))

    if dry_run:
        return merge_pairs

    for restricted_master, _ in merge_pairs:
        restricted_master.delete(**kwargs)