Skip to content

copairs.matching

copairs.matching

Sample pairs with given column restrictions.

Matcher

Class to get pairs of rows given constraints in the columns.

Source code in src/copairs/matching.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
class Matcher:
    """Class to get pairs of rows given constraints in the columns.

    Pairs can be required to share values in some columns (``sameby``) while
    differing in others (``diffby``). Queries are answered using precomputed
    per-column reverse indices (value -> set of row positions).
    """

    def __init__(self, dframe: pd.DataFrame, columns: ColumnList, seed: int):
        """Precompute reverse indices for `columns` of `dframe`.

        :dframe: data to pair rows from.
        :columns: columns used to express pairing constraints.
        :seed: seed for the random generator used by the sampling helpers.
        """
        rng = np.random.default_rng(seed)
        self.original_index = dframe.index
        dframe = dframe[columns].reset_index(drop=True).copy()
        # If the incoming index is already 0..n-1 there is nothing to map back to.
        if (self.original_index == dframe.index).all():
            self.original_index = None
        dframe.index.name = "__copairs_ix"

        mappers = [reverse_index(dframe[col]) for col in dframe]

        # Create a column order based on the number of potential row matches
        # Useful to solve queries with more than one sameby
        n_pairs = {}
        for mapper in mappers:
            n_combs = mapper.apply(lambda x: comb(len(x), 2)).sum()
            n_pairs[mapper.name] = n_combs
        col_order = sorted(n_pairs, key=n_pairs.get)
        self.col_order = {column: i for i, column in enumerate(col_order)}

        self.values = dframe[columns].values
        self.reverse = {mapper.name: mapper.apply(set).to_dict() for mapper in mappers}
        self.rng = rng
        self.frozen_valid = frozenset(range(len(self.values)))
        self.col_to_ix = {c: i for i, c in enumerate(columns)}
        self.columns = columns
        self.n_pairs = n_pairs
        self.rand_iter = iter([])

    def _null_sample(self, diffby_all: ColumnList, diffby_any: ColumnList):
        """Sample a pair from the frame.

        Raises UnpairedException when the first sampled row has no valid partner.
        """
        valid = set(self.frozen_valid)
        id1 = self.integers(0, len(valid) - 1)
        valid.remove(id1)
        valid = self._filter_diffby(id1, diffby_all, diffby_any, valid)

        if len(valid) == 0:
            raise UnpairedException(f"{id1} has no pairs")
        id2 = self.choice(list(valid))
        return id1, id2

    def sample_null_pair(self, diffby: ColumnList, n_tries=5):
        """Sample pairs from the data. It tries multiple times before raising an error."""
        if isinstance(diffby, dict):
            diffby_all, diffby_any = diffby.get("all", []), diffby.get("any", [])
            if len(diffby_any) == 1:
                raise ValueError("diffby: any should have more than one column")
        else:
            diffby_all = [diffby] if isinstance(diffby, str) else diffby
            diffby_any = []

        for _ in range(n_tries):
            try:
                return self._null_sample(diffby_all, diffby_any)
            except UnpairedException:
                pass
        raise ValueError("Number of tries exhausted. Could not find a valid pair")

    def rand_next(self):
        """Get next value from the precomputed values, refilling the buffer when empty."""
        try:
            value = next(self.rand_iter)
        except StopIteration:
            # Refill with a large batch to amortize the RNG call overhead.
            rands = self.rng.uniform(size=int(1e6))
            self.rand_iter = iter(rands)
            value = next(self.rand_iter)
        return value

    def integers(self, min_val, max_val):
        """Get a random integer value between the specified range (inclusive)."""
        return int(self.rand_next() * (max_val - min_val + 1) + min_val)

    def choice(self, items):
        """Select a random item from the given list."""
        min_val, max_val = 0, len(items) - 1
        pos = self.integers(min_val, max_val)
        return items[pos]

    def get_all_pairs(
        self,
        sameby: Union[str, ColumnList, ColumnDict],
        diffby: Union[str, ColumnList, ColumnDict],
        original_index: bool = True,
    ):
        """Get all pairs with given params.

        :sameby: column(s) whose values must match within a pair.
        :diffby: column(s) whose values must differ within a pair.
        :original_index: when True and the input frame had a non-default
            index, report pairs in terms of that original index.
        :returns: dict mapping a key (value(s) shared by the pair, or None)
            to a list of (ix1, ix2) tuples.
        """
        sameby, diffby = self._normalize_sameby_diffby(sameby, diffby)
        sameby, diffby = self._validate_inputs(sameby, diffby)

        if not sameby["all"] and not sameby["any"]:
            return self._no_sameby(diffby)

        pairs = dict()
        if sameby["all"]:
            pairs = self._sameby_all(sameby, diffby)

        if sameby["any"]:
            pairs = self._sameby_any(sameby, diffby, pairs)

        if original_index and self.original_index is not None:
            return self._get_original_index(pairs)

        return pairs

    def _get_original_index(self, pairs):
        """Translate positional pair indices back to the frame's original index."""
        return {
            k: [tuple(self.original_index[i] for i in p) for p in v]
            for k, v in pairs.items()
        }

    def _normalize_sameby_diffby(self, sameby, diffby):
        """Convert sameby and diffby to a consistent format: {'all': [...], 'any': [...]}."""
        keys = ["all", "any"]
        result = []

        for param in [sameby, diffby]:
            param_dict = {key: [] for key in keys}
            if isinstance(param, dict):
                for key in keys:
                    param_dict[key] = param.get(key, [])
            else:
                # A bare string or list is interpreted as an 'all' constraint.
                param_list = [param] if isinstance(param, str) else param
                param_dict["all"] = param_list
            result.append(param_dict)

        return tuple(result)

    def _validate_inputs(self, sameby, diffby):
        """Resolve query expressions to column names and check constraint sanity."""

        def validate_condition(condition_dict):
            new_condition_dict = {"all": [], "any": []}
            for key in ["all", "any"]:
                for item in condition_dict[key]:
                    evaluated_columns = self._evaluate_and_filter(item)
                    new_condition_dict[key].extend(evaluated_columns)
            return new_condition_dict

        sameby = validate_condition(sameby)
        diffby = validate_condition(diffby)

        if set(sameby["all"] + sameby["any"]) & set(diffby["all"] + diffby["any"]):
            raise ValueError("sameby and diffby must be disjoint lists")
        if not any([sameby["all"], sameby["any"], diffby["all"], diffby["any"]]):
            raise ValueError("sameby, diffby: at least one should be provided")
        # A single-column 'any' constraint is equivalent to 'all'; reject it
        # to force callers to be explicit.
        if len(sameby["any"]) == 1:
            raise ValueError("sameby: any should have more than one column")
        if len(diffby["any"]) == 1:
            raise ValueError("diffby: any should have more than one column")

        return sameby, diffby

    def _evaluate_and_filter(self, item: str) -> list:
        """Return the column names referenced by `item` (a column or a query string)."""
        if item in self.columns:
            return [item]

        # Extract column names from a pandas-query-like expression, e.g. "col > 3".
        column_names = re.findall(r"(\w+)\s*[=<>!]+", item)
        valid_column_names = [col for col in column_names if col in self.columns]
        if not valid_column_names:
            raise ValueError(f"Invalid query or column name: {item}")

        return valid_column_names

    def _no_sameby(self, diffby):
        """Dispatch to the diffby-only strategy matching the constraint shape."""
        if not diffby["any"]:
            return self._only_diffby_all(diffby["all"])
        elif not diffby["all"]:
            return self._only_diffby_any(diffby["any"])
        else:
            return self._only_diffby_all_any(diffby["all"], diffby["any"])

    def _sameby_all(self, sameby, diffby):
        """Find pairs sharing values in every sameby['all'] column."""
        if len(sameby["all"]) == 1:
            key = next(iter(sameby["all"]))
            return self._get_all_pairs_single(key, diffby["all"], diffby["any"])
        else:
            ComposedKey = namedtuple("ComposedKey", sameby["all"])
            # Start from the column with the fewest candidate pairs, then
            # filter the remaining columns row by row.
            sameby["all"] = sorted(sameby["all"], key=self.col_order.get)
            candidates = self._get_all_pairs_single(
                sameby["all"][0], diffby["all"], diffby["any"]
            )
            col_ix = [self.col_to_ix[col] for col in sameby["all"][1:]]

            pairs = dict()
            for key, indices in candidates.items():
                for id1, id2 in indices:
                    row1 = self.values[id1]
                    row2 = self.values[id2]
                    if np.all(row1[col_ix] == row2[col_ix]):
                        vals = key, *row1[col_ix]
                        key_tuple = ComposedKey(**dict(zip(sameby["all"], vals)))
                        pair = (id1, id2)
                        pairs.setdefault(key_tuple, list()).append(pair)

            return pairs

    def _sameby_any(self, sameby, diffby, pairs):
        """Keep (or build) pairs that match in at least one sameby['any'] column."""
        if pairs:
            pair_values = list(set(itertools.chain.from_iterable(pairs.values())))
            pair_values = np.asarray([list(pair) for pair in pair_values])
            pairs_any = self._filter_pairs_by_condition(
                pair_values, sameby["any"], condition="any_same"
            )
            # Build the membership set once; building it inside the
            # comprehension condition would rebuild it for every pair.
            valid_pairs = set(map(tuple, pairs_any))
            return {
                k: [p for p in v if p in valid_pairs] for k, v in pairs.items()
            }
        else:
            pairs = set()
            for col in sameby["any"]:
                col_pairs = self._get_all_pairs_single(
                    col, diffby["all"], diffby["any"]
                )
                pairs.update(set(itertools.chain.from_iterable(col_pairs.values())))
            pairs = list(pairs)
            pairs.sort(key=lambda x: (x[0], x[1]))
            return {None: pairs}

    def _get_all_pairs_single(
        self, sameby: str, diffby_all: ColumnList, diffby_any: ColumnList
    ):
        """Get all valid pairs for a single column."""
        mapper = self.reverse[sameby]
        pairs = dict()
        for key, rows in mapper.items():
            processed = set()
            for id1 in rows:
                valid = set(rows)
                processed.add(id1)
                # Drop already-processed rows so each pair is emitted once.
                valid -= processed
                valid = self._filter_diffby(id1, diffby_all, diffby_any, valid)
                for id2 in valid:
                    pair = (id1, id2)
                    pairs.setdefault(key, list()).append(pair)
        return pairs

    def _only_diffby_all(self, diffby_all: ColumnList):
        """Generate a dict with single None key containing all of the pairs with different values in the column list."""
        diffby_all = sorted(diffby_all, key=self.col_order.get)

        # Cartesian product for one of the diffby columns
        mapper = self.reverse[diffby_all[0]]
        pairs = self._get_full_pairs(mapper)

        if len(diffby_all) > 1:
            pairs = self._filter_pairs_by_condition(
                pairs, diffby_all[1:], condition="all_diff"
            )

        pairs = np.unique(pairs, axis=0)
        return {None: list(map(tuple, pairs))}

    def _only_diffby_any(self, diffby: ColumnList):
        """Generate a dict with single None key containing all of the pairs with different values in any of the specified columns."""
        diffby = sorted(diffby, key=self.col_order.get)

        pairs = []
        for diff_col in diffby:
            mapper = self.reverse[diff_col]
            pairs.extend(self._get_full_pairs(mapper))

        # Sort within each pair so (a, b)/(b, a) duplicates collapse in unique.
        pairs = np.sort(np.asarray(pairs))
        pairs = np.unique(pairs, axis=0)
        return {None: list(map(tuple, pairs))}

    def _only_diffby_all_any(self, diffby_all: ColumnList, diffby_any: ColumnList):
        """Generate a dict with single None key containing all of the pairs differing in all of `diffby_all` and any of `diffby_any` columns."""
        diffby_all_pairs = np.asarray(self._only_diffby_all(diffby_all)[None])
        diffby_all_any = self._filter_pairs_by_condition(
            diffby_all_pairs, diffby_any, condition="any_diff"
        )
        return {None: list(map(tuple, diffby_all_any))}

    def _filter_diffby(
        self, idx: int, diffby_all: ColumnList, diffby_any: ColumnList, valid: Set[int]
    ):
        """
        Remove from valid rows that have matches with idx in any of the diffby columns.

        :idx: index of the row to be compared
        :diffby_all: columns that should all have different values
        :diffby_any: columns where at least one value should differ
        :valid: candidate rows to be evaluated
        :returns: subset of valid after removing indices.
        """
        row = self.values[idx]
        for col in diffby_all:
            val = row[self.col_to_ix[col]]
            # NaN never equals anything, so it cannot invalidate a candidate.
            if pd.isna(val):
                continue
            mapper = self.reverse[col]
            valid = valid - mapper[val]
        if diffby_any:
            mapped = []
            for col in diffby_any:
                val = row[self.col_to_ix[col]]
                if pd.isna(val):
                    continue
                mapper = self.reverse[col]
                mapped.append(mapper[val])
            if mapped:
                # Rows matching idx in EVERY diffby_any column violate the
                # "at least one differs" requirement.
                valid = valid - set.intersection(*mapped)
        return valid

    def _get_full_pairs(self, mapper):
        """Cartesian product of rows across every distinct pair of values in `mapper`."""
        pairs = []
        for key_a, key_b in itertools.combinations(mapper.keys(), 2):
            pairs.extend(itertools.product(mapper[key_a], mapper[key_b]))
        pairs = np.array(pairs)
        return pairs

    def _filter_pairs_by_condition(self, pairs, columns, condition="all_same"):
        """Vectorized filter keeping pairs whose values in `columns` satisfy `condition`.

        :condition: one of 'all_same', 'any_same', 'all_diff', 'any_diff'.
        """
        col_ix = [self.col_to_ix[col] for col in columns]
        vals_a = self.values[pairs[:, 0]][:, col_ix]
        vals_b = self.values[pairs[:, 1]][:, col_ix]

        if "same" in condition:
            valid = vals_a == vals_b
        elif "diff" in condition:
            valid = vals_a != vals_b
        else:
            raise ValueError(f"Invalid condition: {condition}")

        if "all" in condition:
            valid = np.all(valid, axis=1)
        elif "any" in condition:
            valid = np.any(valid, axis=1)
        else:
            raise ValueError(f"Invalid condition: {condition}")

        return pairs[valid]

__init__(dframe, columns, seed)

max_size: max number of rows to consider from the same value.

Source code in src/copairs/matching.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __init__(self, dframe: pd.DataFrame, columns: ColumnList, seed: int):
    """Precompute per-column reverse indices over `columns` of `dframe`."""
    generator = np.random.default_rng(seed)
    self.original_index = dframe.index
    dframe = dframe[columns].reset_index(drop=True).copy()
    # No mapping needed when the incoming index is already positional.
    if (self.original_index == dframe.index).all():
        self.original_index = None
    dframe.index.name = "__copairs_ix"

    indexers = [reverse_index(dframe[col]) for col in dframe]

    # Rank columns by how many candidate row pairs each one can produce;
    # the cheapest column is processed first in multi-column queries.
    pair_counts = {
        ix.name: ix.apply(lambda group: comb(len(group), 2)).sum()
        for ix in indexers
    }
    ordered = sorted(pair_counts, key=pair_counts.get)
    self.col_order = dict(zip(ordered, range(len(ordered))))

    self.values = dframe[columns].values
    self.reverse = {ix.name: ix.apply(set).to_dict() for ix in indexers}
    self.rng = generator
    self.frozen_valid = frozenset(range(len(self.values)))
    self.col_to_ix = {c: i for i, c in enumerate(columns)}
    self.columns = columns
    self.n_pairs = pair_counts
    self.rand_iter = iter([])

choice(items)

Select a random item from the given list.

Source code in src/copairs/matching.py
146
147
148
149
150
def choice(self, items):
    """Pick one element of `items` uniformly at random."""
    last = len(items) - 1
    return items[self.integers(0, last)]

get_all_pairs(sameby, diffby, original_index=True)

Get all pairs with given params.

Source code in src/copairs/matching.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def get_all_pairs(
    self,
    sameby: Union[str, ColumnList, ColumnDict],
    diffby: Union[str, ColumnList, ColumnDict],
    original_index: bool = True,
):
    """Find every valid row pair for the given sameby/diffby constraints."""
    sameby, diffby = self._normalize_sameby_diffby(sameby, diffby)
    sameby, diffby = self._validate_inputs(sameby, diffby)

    has_all = bool(sameby["all"])
    has_any = bool(sameby["any"])

    # Pure diffby query: delegate to the diffby-only strategies.
    if not has_all and not has_any:
        return self._no_sameby(diffby)

    pairs = self._sameby_all(sameby, diffby) if has_all else dict()
    if has_any:
        pairs = self._sameby_any(sameby, diffby, pairs)

    if original_index and self.original_index is not None:
        return self._get_original_index(pairs)
    return pairs

integers(min_val, max_val)

Get a random integer value between the specified range.

Source code in src/copairs/matching.py
142
143
144
def integers(self, min_val, max_val):
    """Draw a uniformly distributed integer in [min_val, max_val]."""
    span = max_val - min_val + 1
    return int(self.rand_next() * span + min_val)

rand_next()

Get next value from the precomputed value.

Source code in src/copairs/matching.py
132
133
134
135
136
137
138
139
140
def rand_next(self):
    """Return the next precomputed uniform value, refilling the buffer when exhausted."""
    try:
        return next(self.rand_iter)
    except StopIteration:
        # Refill with a large batch to amortize the RNG call overhead.
        self.rand_iter = iter(self.rng.uniform(size=int(1e6)))
        return next(self.rand_iter)

sample_null_pair(diffby, n_tries=5)

Sample pairs from the data. It tries multiple times before raising an error.

Source code in src/copairs/matching.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def sample_null_pair(self, diffby: ColumnList, n_tries=5):
    """Sample pairs from the data. It tries multiple times before raising an error.

    :diffby: column(s) whose values must differ within the pair; either a
        list/str (treated as 'all') or a dict with 'all'/'any' keys.
    :n_tries: number of sampling attempts before giving up.
    :returns: a (id1, id2) tuple of row positions.
    :raises ValueError: if no valid pair is found after `n_tries` attempts.
    """
    if isinstance(diffby, dict):
        diffby_all, diffby_any = diffby.get("all", []), diffby.get("any", [])
        # A single-column 'any' is equivalent to 'all'; reject the ambiguity.
        if len(diffby_any) == 1:
            raise ValueError("diffby: any should have more than one column")
    else:
        diffby_all = [diffby] if isinstance(diffby, str) else diffby
        diffby_any = []

    for _ in range(n_tries):
        try:
            return self._null_sample(diffby_all, diffby_any)
        except UnpairedException:
            pass
    # Fixed typo in the original message ("exhusted").
    raise ValueError("Number of tries exhausted. Could not find a valid pair")

MatcherMultilabel

Class to get pairs of rows given constraints in the columns.

Support one multilabel column.

Source code in src/copairs/matching.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
class MatcherMultilabel:
    """
    Class to get pairs of rows given constraints in the columns.

    Supports one multilabel column: a column whose cells contain lists of
    labels. The frame is exploded on that column so the single-label
    Matcher machinery can be reused; results are mapped back to the
    original (pre-explosion) row indices.
    """

    def __init__(
        self, dframe: pd.DataFrame, columns: ColumnList, multilabel_col: str, seed: int
    ):
        """Explode `multilabel_col` and build an inner Matcher over the result."""
        self.multilabel_col = multilabel_col
        self.size = dframe.shape[0]
        # Label set per original row, used by the diffby-multilabel filter below.
        self.multilabel_set = dframe[multilabel_col].apply(set)
        dframe = dframe.explode(multilabel_col)
        dframe = dframe.reset_index(names="__original_index")
        # Maps each exploded row position back to the row it came from.
        self.original_index = dframe["__original_index"]
        self.matcher = Matcher(dframe, columns, seed)

    def get_all_pairs(self, sameby: Union[str, ColumnList], diffby: ColumnList):
        """Get all pairs with given params."""
        diffby_multi = self.multilabel_col in diffby
        if diffby_multi:
            # Multilabel in diffby must be 'ALL' instead of 'ANY'
            # Doing this filter afterwards
            diffby = [col for col in diffby if self.multilabel_col != col]
        if not diffby and not sameby and diffby_multi:
            # Special case: the only constraint is the multilabel column.
            return self._only_diffby_multi()
        pairs = self.matcher.get_all_pairs(sameby, diffby)
        for key, values in pairs.items():
            values = np.asarray(values)
            # Map to original_index
            values[:, 0] = self.original_index[values[:, 0]]
            values[:, 1] = self.original_index[values[:, 1]]

            # Check all of the values in the multilabel_col are different
            if diffby_multi:
                labels_a = self.multilabel_set.iloc[values[:, 0]]
                labels_b = self.multilabel_set.iloc[values[:, 1]]
                # Keep only pairs whose label sets are disjoint.
                valid = [len(a & b) == 0 for a, b in zip(labels_a, labels_b)]
                values = values[valid]
            pairs[key] = list(zip(*values.T))
        return pairs

    def sample_null_pair(self, diffby: ColumnList, n_tries=5):
        """Sample pairs from the data. It tries multiple times before raising an error."""
        null_pair = self.matcher.sample_null_pair(diffby, n_tries)
        # Translate exploded-row positions back to the original row indices.
        id1, id2 = self.original_index[list(null_pair)].values
        return id1, id2

    def get_null_pairs(
        self,
        diffby: ColumnList,
        size: int,
        n_tries=5,
        progress_bar: bool = True,
    ):
        """Sample multiple null pairs at the same time."""
        null_pairs = []

        iterator = range(size)
        if progress_bar:
            # Imported lazily so tqdm is only required when a bar is shown.
            from tqdm.auto import tqdm

            iterator = tqdm(iterator)

        for _ in iterator:
            null_pairs.append(self.matcher.sample_null_pair(diffby, n_tries))
        null_pairs = np.array(null_pairs)
        # Translate exploded-row positions back to the original row indices.
        null_pairs[:, 0] = self.original_index[null_pairs[:, 0]].values
        null_pairs[:, 1] = self.original_index[null_pairs[:, 1]].values
        return null_pairs

    def _only_diffby_multi(self):
        """Process special case when it is filtered only by diffby=multilabel_col."""
        # Pairs sharing at least one label, expressed over original indices.
        pairs = self.get_all_pairs(self.multilabel_col, [])
        pairs = itertools.chain.from_iterable(pairs.values())
        pairs = set(map(frozenset, pairs))
        all_pairs = itertools.combinations(range(self.size), 2)

        def filter_fn(x):
            return set(x) not in pairs

        # Complement: keep only the pairs with no shared label.
        return {None: list(filter(filter_fn, all_pairs))}

get_all_pairs(sameby, diffby)

Get all pairs with given params.

Source code in src/copairs/matching.py
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
def get_all_pairs(self, sameby: Union[str, ColumnList], diffby: ColumnList):
    """Get all pairs with given params."""
    diffby_multi = self.multilabel_col in diffby
    if diffby_multi:
        # Multilabel in diffby must be 'ALL' instead of 'ANY'
        # Doing this filter afterwards
        diffby = [col for col in diffby if self.multilabel_col != col]
    if not diffby and not sameby and diffby_multi:
        # Special case: the only constraint is the multilabel column itself.
        return self._only_diffby_multi()
    pairs = self.matcher.get_all_pairs(sameby, diffby)
    for key, values in pairs.items():
        values = np.asarray(values)
        # Map to original_index
        values[:, 0] = self.original_index[values[:, 0]]
        values[:, 1] = self.original_index[values[:, 1]]

        # Check all of the values in the multilabel_col are different
        if diffby_multi:
            labels_a = self.multilabel_set.iloc[values[:, 0]]
            labels_b = self.multilabel_set.iloc[values[:, 1]]
            # Keep only pairs whose label sets are disjoint.
            valid = [len(a & b) == 0 for a, b in zip(labels_a, labels_b)]
            values = values[valid]
        pairs[key] = list(zip(*values.T))
    return pairs

get_null_pairs(diffby, size, n_tries=5, progress_bar=True)

Sample multiple null pairs at the same time.

Source code in src/copairs/matching.py
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def get_null_pairs(
    self,
    diffby: ColumnList,
    size: int,
    n_tries=5,
    progress_bar: bool = True,
):
    """Sample multiple null pairs at the same time."""
    null_pairs = []

    iterator = range(size)
    if progress_bar:
        from tqdm.auto import tqdm

        iterator = tqdm(iterator)

    for _ in iterator:
        null_pairs.append(self.matcher.sample_null_pair(diffby, n_tries))
    null_pairs = np.array(null_pairs)
    null_pairs[:, 0] = self.original_index[null_pairs[:, 0]].values
    null_pairs[:, 1] = self.original_index[null_pairs[:, 1]].values
    return null_pairs

sample_null_pair(diffby, n_tries=5)

Sample pairs from the data. It tries multiple times before raising an error.

Source code in src/copairs/matching.py
442
443
444
445
446
def sample_null_pair(self, diffby: ColumnList, n_tries=5):
    """Sample a single null pair, translated to the original row indices."""
    pair = self.matcher.sample_null_pair(diffby, n_tries)
    first, second = self.original_index[list(pair)].values
    return first, second

UnpairedException

Bases: Exception

Exception raised when a row can not be paired with any other row in the data.

Source code in src/copairs/matching.py
65
66
class UnpairedException(Exception):
    """Exception raised when a row cannot be paired with any other row in the data."""

assign_reference_index(df, condition, reference_col='Metadata_Reference_Index', default_value=-1, inplace=False)

Assign reference index to a specified column based on a given condition.

Source code in src/copairs/matching.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def assign_reference_index(
    df: pd.DataFrame,
    condition: Union[str, pd.Index],
    reference_col: str = "Metadata_Reference_Index",
    default_value: int = -1,
    inplace: bool = False,
):
    """Assign reference index to a specified column based on a given condition.

    Rows selected by `condition` (a pandas query string or an Index) get
    their own index value in `reference_col`; all other rows get
    `default_value`. Returns the modified frame, or None when `inplace`.
    """
    target = df if inplace else df.copy()
    target[reference_col] = default_value
    rows = target.query(condition).index if isinstance(condition, str) else condition
    target.loc[rows, reference_col] = rows
    return None if inplace else target

dict_to_dframe(dict_pairs, sameby)

Convert the Matcher.get_all_pairs output to pd.DataFrame.

Source code in src/copairs/matching.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def dict_to_dframe(dict_pairs, sameby: Union[str, list]):
    """Convert the Matcher.get_all_pairs output to pd.DataFrame.

    Each key in `dict_pairs` is repeated once per pair it maps to; pair
    indices go into the 'ix1'/'ix2' columns.
    """
    if not dict_pairs:
        raise ValueError("dict_pairs empty")
    key_arr = np.array(list(dict_pairs.keys()))
    sizes = [len(v) for v in dict_pairs.values()]
    key_arr = np.repeat(key_arr, sizes, axis=0)

    if key_arr.ndim > 1:
        # Keys are ComposedKey tuples: one positional column per field.
        keys_df = pd.DataFrame(key_arr)
    else:
        label = sameby[0] if isinstance(sameby, list) else sameby
        keys_df = pd.DataFrame({label: key_arr})

    # Concat all pairs
    flat_pairs = itertools.chain.from_iterable(dict_pairs.values())
    pairs_df = pd.DataFrame(flat_pairs, columns=["ix1", "ix2"])
    return pd.concat([keys_df, pairs_df], axis=1)

find_pairs(dframe, sameby, diffby, rev=False)

Find the indices pairs sharing values in sameby columns but not on diffby columns.

If rev is True sameby and diffby are swapped.

Source code in src/copairs/matching.py
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
def find_pairs(
    dframe: Union[pd.DataFrame, duckdb.DuckDBPyRelation],
    sameby: Union[str, ColumnList],
    diffby: Union[str, ColumnList],
    rev: bool = False,
) -> np.ndarray:
    """Find the indices pairs sharing values in `sameby` columns but not on `diffby` columns.

    If `rev` is True sameby and diffby are swapped.
    """
    sameby, diffby = _validate(sameby, diffby)

    if len(set(sameby).intersection(diffby)):
        raise ValueError("sameby and diffby must be disjoint lists")

    df = dframe
    if isinstance(df, pd.DataFrame):
        df = dframe.reset_index()
    with duckdb.connect(":memory:"):
        # If rev is True, diffby and sameby are swapped
        # Index trick: i - rev is 0 or 1 when rev is False (sameby -> '',
        # diffby -> 'NOT'); when rev is True it is -1 or 0, so the tuple
        # indices wrap around and the prefixes are swapped.
        group_1, group_2 = [
            [f"{('', 'NOT')[i - rev]} A.{x} = B.{x}" for x in y]
            for i, y in enumerate((sameby, diffby))
        ]
        string = (
            f"SELECT A.index,B.index"
            " FROM df A"
            " JOIN df B"
            " ON A.index < B.index"  #  Ensures only one of (a,b)/(b,a) and no (a,a)
            f" AND {' AND '.join((*group_1, *group_2))}"
        )
        # duckdb resolves 'df' from the enclosing Python scope.
        index_d = duckdb.sql(string).fetchnumpy()

        result = np.array((index_d["index"], index_d["index_1"]), dtype=np.uint32).T
        return result

find_pairs_multilabel(dframe, sameby, diffby, multilabel_col)

Find pairs of rows in a DataFrame that have the same or different values in certain columns.

The function takes into account columns with multiple labels (i.e., a list of identifiers).

Parameters:

  • dframe (Union[DataFrame, DuckDBPyRelation]) –

    Input DataFrame.

  • sameby (Union[str, ColumnList]) –

    List of column names to consider for finding identical values.

  • diffby (Union[str, ColumnList]) –

    List of column names to consider for finding different values.

  • multilabel_col (str) –

    Name of the column containing multiple labels.

Returns:

  • ndarray

    Array of pairs of indices with matching or non-matching values in the specified columns.

Notes

The function asserts that multilabel_col is present in either sameby or diffby.

Source code in src/copairs/matching.py
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
def find_pairs_multilabel(
    dframe: Union[pd.DataFrame, duckdb.DuckDBPyRelation],
    sameby: Union[str, ColumnList],
    diffby: Union[str, ColumnList],
    multilabel_col: str,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
    """
    Find pairs of rows in a DataFrame that have the same or different values in certain columns.

    The function takes into account columns with multiple labels (i.e., a list of identifiers).

    Parameters
    ----------
    dframe : Union[pd.DataFrame, duckdb.DuckDBPyRelation]
        Input DataFrame.
    sameby : Union[str, ColumnList]
        List of column names to consider for finding identical values.
    diffby : Union[str, ColumnList]
        List of column names to consider for finding different values.
    multilabel_col : str
        Name of the column containing multiple labels.

    Returns
    -------
    np.ndarray
        Array of pairs of indices with matching or non-matching values in the specified columns.
        When `multilabel_col` is in `sameby`, a (pairs, keys, counts) tuple is returned instead.

    Notes
    -----
    The function asserts that `multilabel_col` is present in either `sameby` or `diffby`.
    """
    sameby, diffby = _validate(sameby, diffby)
    sameby = list(sameby)
    diffby = list(diffby)

    assert (multilabel_col in sameby) or (multilabel_col in diffby), (
        f"Missing {multilabel_col} in sameby and diffby"
    )

    df = dframe.reset_index()

    # The multilabel column is handled by list intersection in SQL; drop it
    # from the plain (monolabel) sameby/diffby sets. `shared_item` becomes the
    # SQL prefix for the intersection test: '' keeps pairs that share a label
    # (sameby), 'NOT' keeps pairs that share none (diffby).
    if multilabel_col in sameby:
        sameby = copy(sameby)
        sameby.remove(multilabel_col)
        shared_item = ""
    else:
        diffby = copy(diffby)
        diffby.remove(multilabel_col)
        shared_item = "NOT"

    with duckdb.connect(":memory:"):
        # duckdb resolves 'df' (and later relations) from the Python scope.
        result = duckdb.sql(
            "SELECT * "
            " FROM (SELECT *,"
            f"list_intersect(A.{multilabel_col},B.{multilabel_col}) AS shared_items"
            " FROM df A JOIN df B ON A.index < B.index)"
            f" WHERE {shared_item} len(shared_items) > 0"
        )
        if len(sameby) or len(diffby):
            # Restrict to pairs that also satisfy the monolabel constraints.
            monolabel_result = find_pairs(df, sameby, diffby).T
            result = duckdb.sql(
                f"SELECT *"
                " FROM result A JOIN monolabel_result B"
                " ON A.index = B.column0"
                " AND A.index_1 = B.column1"
            )

        if shared_item == "":  # If multilabel_col is in sameby
            counts_col = "_c"

            # We assign a pair if any of the other items in the list is a pair too
            unnested = duckdb.sql(
                "SELECT *,UNNEST(shared_items) AS matched_item FROM result"
            )
            string = (
                "SELECT * FROM unnested A"
                " NATURAL JOIN (SELECT matched_item,COUNT(matched_item)"
                f" AS {counts_col} FROM unnested GROUP BY matched_item) B"
            )
            results = duckdb.sql(string)

            # Sort them to match the original implementation
            results = duckdb.sql("SELECT * FROM results ORDER BY matched_item")

            # Sorted pairs of indices (we select to reduce memory footprint)
            pairs = duckdb.sql("SELECT index,index_1 FROM results")
            pairs_np = pairs.fetchnumpy()

            # Keys are the items inside multilabel col
            # Counts are the number of occurrences of each one
            # It is important to sort again!
            keys_counts = duckdb.sql(
                f"SELECT distinct matched_item,{counts_col} FROM results ORDER BY matched_item"
            )
            keys_counts_np = keys_counts.fetchnumpy()

            result = (
                np.array(
                    [pairs_np[f"index{k}"] for k in ("", "_1")], dtype=np.uint32
                ).T,
                *[keys_counts_np[k] for k in ("matched_item", counts_col)],
            )
        else:  # if multilabel_col is in diffby return only the index
            index_d = result.fetchnumpy()
            result = np.array(
                [index_d[k] for k in ("index", "index_1")], dtype=np.uint32
            ).T

    return result

reverse_index(col)

Build a reverse_index for a given column in the DataFrame.

Source code in src/copairs/matching.py
38
39
40
def reverse_index(col: pd.Series) -> pd.Series:
    """Build a reverse index: each distinct value in `col` maps to the row positions holding it."""
    groups = col.groupby(col, observed=True).indices
    return pd.Series(groups, name=col.name)