API Reference

reddwarf.implementations.polis

reddwarf.implementations.polis.run_pipeline(**kwargs)

Source code in reddwarf/implementations/polis.py
def run_pipeline(**kwargs) -> base.PolisClusteringResult:
    kwargs = {
        "reducer": "pca",
        "clusterer": "kmeans",
        **kwargs,
    }
    return base.run_pipeline(**kwargs)
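
A minimal usage sketch: run_pipeline merely merges in reducer="pca" and clusterer="kmeans" as defaults before delegating to base.run_pipeline, so any keyword can be overridden. The votes argument below is an assumption, based on the vote-record format documented under generate_raw_matrix further down.

from reddwarf.implementations.polis import run_pipeline

# Assumed: `votes` is a list of vote dicts with participant_id,
# statement_id, vote, and modified keys (see generate_raw_matrix below).
result = run_pipeline(votes=votes)

# Defaults can be overridden; all kwargs pass through to base.run_pipeline.
result = run_pipeline(votes=votes, reducer="pacmap")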

reddwarf.sklearn

Various custom Scikit-Learn estimators to mimic aspects of Polis, suitable for use in Scikit-Learn workflows, pipelines, and APIs.

reddwarf.sklearn.cluster.PolisKMeans

Bases: KMeans

A modified version of scikit-learn's KMeans that allows partial initialization with user-supplied cluster centers and custom fallback strategies.

This subclass extends sklearn.cluster.KMeans with additional features around centroid initialization. Outside the behavior documented, it retains all other parameters and behavior from the base KMeans implementation.

Parameters:
  • init ({'k-means++', 'random', 'polis'}, default: 'k-means++' ) –

    Strategy to initialize any missing cluster centers if init_centers is not fully specified. The strategies are:

    • 'k-means++': Smart centroid initialization (same as scikit-learn default)
    • 'random': Random selection of initial centers from the data (same as scikit-learn)
    • 'polis': Selects the first unique data points in X as initial centers.
      • This strategy is deterministic for any stable set of X, while determinism in the other strategies depends on random_state.

    Note

    Unlike the KMeans parent class, we prevent passing ndarray args here, and expect init_centers to handle that use case.

  • init_centers (ndarray of shape (n_clusters, n_features), default: None ) –

    Initial cluster centers to use. May contain fewer (or more) than n_clusters:

    • If more, the extras will be trimmed
    • If fewer, the remaining will be filled using the init strategy
Attributes:
  • init_centers_used_ (ndarray of shape (n_clusters, n_features)) –

    The full array of initial cluster centers actually used to initialize the algorithm, including both init_centers and any centers generated from the init strategy.

See Also

sklearn.cluster.KMeans : Original implementation with full parameter list.

Source code in reddwarf/sklearn/cluster.py
class PolisKMeans(KMeans):
    """
    A modified version of scikit-learn's KMeans that allows partial initialization
    with user-supplied cluster centers and custom fallback strategies.

    This subclass extends `sklearn.cluster.KMeans` with additional features
    around centroid initialization. Outside the behavior documented, it retains
    all other parameters and behavior from the base KMeans implementation.

    Parameters
    ----------

    init : {'k-means++', 'random', 'polis'}, default='k-means++'
        Strategy to initialize any missing cluster centers if `init_centers` is
        not fully specified. The strategies are:

        - 'k-means++': Smart centroid initialization (same as scikit-learn default)
        - 'random': Random selection of initial centers from the data (same as scikit-learn)
        - 'polis': Selects the first unique data points in `X` as initial centers.
            - This strategy is deterministic for any stable set of `X`, while
            determinism in the other strategies depends on `random_state`.

        !!! note
            Unlike the `KMeans` parent class, we prevent passing `ndarray` args
            here, and expect `init_centers` to handle that use case.

    init_centers : ndarray of shape (n_clusters, n_features), optional
        Initial cluster centers to use. May contain fewer (or more) than `n_clusters`:

        - If more, the extras will be trimmed
        - If fewer, the remaining will be filled using the `init` strategy

    Attributes
    ----------

    init_centers_used_ : ndarray of shape (n_clusters, n_features)
        The full array of initial cluster centers actually used to initialize the algorithm,
        including both `init_centers` and any centers generated from the `init` strategy.

    See Also
    --------

    `sklearn.cluster.KMeans` : Original implementation with full parameter list.
    """
    def __init__(
        self,
        n_clusters=8,
        init="k-means++",  # or 'random', 'polis'
        init_centers: Optional[ArrayLike] = None,  # array-like, optional
        n_init="auto",
        max_iter=300,
        tol=1e-4,
        verbose=0,
        random_state=None,
        copy_x=True,
        algorithm="lloyd",
    ):
        super().__init__(
            n_clusters=n_clusters,
            init=init,  # will override via set_params, with our center selection logic below
            n_init=n_init,
            max_iter=max_iter,
            tol=tol,
            verbose=verbose,
            random_state=random_state,
            copy_x=copy_x,
            algorithm=algorithm,
        )
        self._init_strategy = init
        self.init_centers = init_centers
        self.init_centers_used_ = None

    def _generate_centers(self, X, x_squared_norms, n_to_generate, random_state):
        if not isinstance(self._init_strategy, str):
            raise ValueError("Internal error: _strategy must be a string.")

        if self._init_strategy == "k-means++":
            centers, _ = kmeans_plusplus(
                X, n_clusters=n_to_generate,
                random_state=random_state,
                x_squared_norms=x_squared_norms
            )
        elif self._init_strategy == "random":
            indices = random_state.choice(X.shape[0], n_to_generate, replace=False)
            centers = X[indices]
        elif self._init_strategy == "polis":
            unique_X = np.unique(X, axis=0)
            if len(unique_X) < n_to_generate:
                raise ValueError("Not enough unique rows in X for 'polis' strategy.")
            centers = unique_X[:n_to_generate]
        else:
            raise ValueError(f"Unsupported init strategy: {self._init_strategy}")
        return centers

    def fit(self, X, y=None, sample_weight=None):
        X = check_array(X, accept_sparse="csr", dtype=[np.float64, np.float32]) # type:ignore
        random_state = check_random_state(self.random_state)
        x_squared_norms = np.sum(X ** 2, axis=1)

        # Determine init_centers_used_
        if self.init_centers is not None:
            init_array = np.array(self.init_centers)
            if init_array.ndim != 2 or init_array.shape[1] != X.shape[1]:
                raise ValueError("init_centers must be of shape (n, n_features)")

            n_given = init_array.shape[0]
            if n_given > self.n_clusters:
                init_array = init_array[:self.n_clusters]
            elif n_given < self.n_clusters:
                needed = self.n_clusters - n_given
                extra = self._generate_centers(X, x_squared_norms, needed, random_state)
                init_array = np.vstack([init_array, extra])
            self.init_centers_used_ = init_array.copy()
        else:
            self.init_centers_used_ = self._generate_centers(
                X, x_squared_norms, self.n_clusters, random_state
            )

        # Override the init param passed to sklearn with actual centers.
        # We take control of the initialization strategy (`k-means++`, `random`,
        # `polis`, etc) in our own code.
        super().set_params(init=self.init_centers_used_)

        return super().fit(X, y=y, sample_weight=sample_weight)
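
A short sketch of the partial-initialization behavior described above: one center is supplied via init_centers, and the remaining centers are filled in using the 'polis' strategy (the first unique rows of X).

import numpy as np
from reddwarf.sklearn.cluster import PolisKMeans

X = np.random.default_rng(42).normal(size=(100, 2))

kmeans = PolisKMeans(
    n_clusters=3,
    init="polis",                         # fallback strategy for missing centers
    init_centers=np.array([[0.0, 0.0]]),  # 1 given; 2 filled from first unique rows of X
)
kmeans.fit(X)

print(kmeans.init_centers_used_.shape)  # (3, 2): supplied + generated centers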

reddwarf.sklearn.cluster.PolisKMeansDownsampler

Bases: BaseEstimator, TransformerMixin

A transformer that fits PolisKMeans and returns the cluster centers as the downsampled dataset.

This will support mimicking "base clusters" from the Polis platform.

This enables use in sklearn pipelines, where intermediate steps are expected to implement both fit and transform.

Source code in reddwarf/sklearn/cluster.py
class PolisKMeansDownsampler(BaseEstimator, TransformerMixin):
    """
    A transformer that fits `PolisKMeans` and returns the cluster centers as the
    downsampled dataset.

    This will support mimicking "base clusters" from the Polis platform.

    This enables use in sklearn pipelines, where intermediate steps
    are expected to implement both `fit` and `transform`.
    """
    def __init__(self,
        n_clusters=100,
        random_state=None,
        init="k-means++",
        init_centers=None,
    ):
        self.n_clusters = n_clusters
        self.random_state = random_state
        self.init = init
        self.init_centers = init_centers
        self.kmeans_ = None

    def fit(self, X, y=None):
        self.kmeans_ = PolisKMeans(
            n_clusters=self.n_clusters,
            random_state=self.random_state,
            init=self.init,
            init_centers=self.init_centers,
        )
        self.kmeans_.fit(X)
        return self

    def transform(self, X, y=None):
        return self.kmeans_.cluster_centers_ if self.kmeans_ else None

reddwarf.sklearn.model_selection.GridSearchNonCV

Bases: GridSearchCV

sklearn.model_selection.GridSearchCV, but modified to score against the full dataset (i.e. not cross-validated).

Normally, GridSearchCV splits up the X data and scores each "fold" of data. This subclass is otherwise identical, but automatically uses the full dataset as the single train/test fold.

Source code in reddwarf/sklearn/model_selection.py
class GridSearchNonCV(GridSearchCV):
    """
    `sklearn.model_selection.GridSearchCV`, but modified to score against the
    full dataset (i.e. not cross-validated).

    Normally, `GridSearchCV` splits up the `X` data and scores each "fold" of data.
    This subclass is otherwise identical, but automatically uses the full dataset
    as the single train/test fold.
    """
    def __init__(self, estimator, param_grid, scoring=None, refit=True, **kwargs):
        # Default CV is a single fold: train = test = full dataset
        self._default_cv = None  # we'll set it in fit() when we have data size

        # User can override cv via kwargs
        self._user_provided_cv = 'cv' in kwargs
        super().__init__(
            estimator=estimator,
            param_grid=param_grid,
            scoring=scoring,
            refit=refit,
            cv=None,  # will be overwritten in fit()
            **kwargs
        )

    def fit(self, X, y=None, **fit_params):
        if not self._user_provided_cv:
            # Create full-fold cross-validation only if user didn’t specify their own.
            # This scores the full dataset for each branch of grid.
            # See: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html#:~:text=An%20iterable%20yielding%20(train%2C%20test)%20splits%20as%20arrays%20of%20indices.
            n_samples = len(X)
            full_idx = np.arange(n_samples)
            train_idx = full_idx
            test_idx = full_idx
            # Use full dataset for training/testing
            full_fold = [(train_idx, test_idx)]
            self.cv = full_fold
        return super().fit(X, y, **fit_params)
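
A sketch of a grid search scored against the full dataset, mirroring how find_best_kmeans (documented below) uses this class:

import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from reddwarf.sklearn.model_selection import GridSearchNonCV

X = np.random.default_rng(0).normal(size=(60, 2))

def silhouette_scorer(estimator, X):
    labels = estimator.fit_predict(X)
    return silhouette_score(X, labels)

# Each n_clusters candidate is fit and scored on the full dataset
# (a single fold where train == test == all indices).
search = GridSearchNonCV(
    estimator=KMeans(n_init="auto", random_state=0),
    param_grid={"n_clusters": [2, 3, 4]},
    scoring=silhouette_scorer,
)
search.fit(X)

print(search.best_params_, search.best_score_)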

reddwarf.sklearn.transformers.SparsityAwareScaler

Bases: BaseEstimator, TransformerMixin

Scale projected points (participants or statements) based on the sparsity of the vote matrix, to account for participants with only a small number of votes and prevent them from bunching up in the center.

Attributes:
  • capture_step (str | int | None) –

    Name or index of the capture step in the pipeline.

  • X_sparse (ndarray | None) –

    A sparse vote matrix (containing missing values) from which scaling factors are calculated.

Source code in reddwarf/sklearn/transformers.py
class SparsityAwareScaler(BaseEstimator, TransformerMixin):
    """
    Scale projected points (participants or statements) based on the sparsity
    of the vote matrix, to account for participants with only a small number of
    votes and prevent them from bunching up in the center.

    Attributes:
        capture_step (str | int | None): Name or index of the capture step in the pipeline.
        X_sparse (np.ndarray | None): A sparse vote matrix (containing missing values) from which scaling factors are calculated.
    """
    def __init__(self, capture_step: Optional[str | int] = None, X_sparse: Optional[Array1D | Array2D] = None):
        self.capture_step = capture_step
        self.X_sparse = X_sparse

    # See: https://scikit-learn.org/stable/modules/generated/sklearn.utils.Tags.html#sklearn.utils.Tags
    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        # Suppresses warning caused by fit() not being required before usage in transform().
        tags.requires_fit = False
        return tags

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        scaling_factors = self._calculate_scaling_factors()
        return X * scaling_factors[:, np.newaxis]

    def inverse_transform(self, X):
        scaling_factors = self._calculate_scaling_factors()
        return X / scaling_factors[:, np.newaxis]


    def _get_pipeline_step(self, step):
        """
        Fetch the parent pipeline when available via PatchedPipeline usage.
        """
        parent = getattr(self, "_parent_pipeline", None)
        if parent is None:
            raise RuntimeError(
                f"{self.__class__.__name__} cannot resolve `capture_step={step}` "
                "because it is not being used inside a `PatchedPipeline`. "
                "Either use a `PatchedPipeline` or pass `X_sparse` directly."
            )
        if isinstance(step, str):
            return parent.named_steps[step]
        elif isinstance(step, int):
            return parent.steps[step][1]
        else:
            raise ValueError("`capture_step` must be a string (name) or int (index).")

    def _resolve_X_sparse(self):
        """
        Resolve X_sparse (a sparse vote matrix) from argument or prior capture step.
        """
        if self.X_sparse is not None:
            return self.X_sparse

        capture = self._get_pipeline_step(self.capture_step)
        if not hasattr(capture, "X_captured_"):
            raise AttributeError(
                f"Step '{self.capture_step}' does not contain `.X_captured_`. "
                f"Did you run `fit/transform` on the pipeline?"
            )
        return capture.X_captured_

    def _calculate_scaling_factors(self):
        X_sparse = self._resolve_X_sparse()
        return calculate_scaling_factors(X_sparse=X_sparse)
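
A sketch of standalone usage, passing X_sparse directly rather than resolving it from a PatchedPipeline capture step (the toy matrices here are illustrative only):

import numpy as np
from reddwarf.sklearn.transformers import SparsityAwareScaler

# A toy sparse vote matrix: rows are participants, NaN marks missing votes.
X_sparse = np.array([
    [1.0, -1.0, np.nan, np.nan],
    [1.0,  1.0, -1.0,    1.0],
])
# Toy 2-D projections of the same two participants.
X_projected = np.array([
    [0.5,  0.2],
    [0.4, -0.1],
])

scaler = SparsityAwareScaler(X_sparse=X_sparse)
# requires_fit=False, so transform() can be called without fit().
X_scaled = scaler.transform(X_projected)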

reddwarf.utils.matrix

reddwarf.utils.matrix.generate_raw_matrix(votes, cutoff=None)

Generates a raw vote matrix from a list of vote records.

See filter_votes method for details of cutoff arg.

Parameters:
  • votes (List[Dict]) –

    An unsorted list of vote records, where each record is a dictionary containing:

    • "participant_id": The ID of the voter.
    • "statement_id": The ID of the statement being voted on.
    • "vote": The recorded vote value.
    • "modified": A unix timestamp object representing when the vote was made.
  • cutoff (int, default: None ) –

    A cutoff unix timestamp (ms) or index position in date-sorted votes list.

Returns:
  • raw_matrix( DataFrame ) –

    A full raw vote matrix DataFrame with NaN values where:

    1. rows are voters,
    2. columns are statements, and
    3. values are votes.

    This includes even voters who cast no votes, and statements that received no votes.

Source code in reddwarf/utils/matrix.py
def generate_raw_matrix(
        votes: List[Dict],
        cutoff: Optional[int] = None,
) -> VoteMatrix:
    """
    Generates a raw vote matrix from a list of vote records.

    See `filter_votes` method for details of `cutoff` arg.

    Args:
        votes (List[Dict]): An unsorted list of vote records, where each record is a dictionary containing:

            - "participant_id": The ID of the voter.
            - "statement_id": The ID of the statement being voted on.
            - "vote": The recorded vote value.
            - "modified": A unix timestamp object representing when the vote was made.

        cutoff (int): A cutoff unix timestamp (ms) or index position in date-sorted votes list.

    Returns:
        raw_matrix (pd.DataFrame): A full raw vote matrix DataFrame with NaN values where:

            1. rows are voters,
            2. columns are statements, and
            3. values are votes.

            This includes even voters who cast no votes, and statements that received no votes.
    """
    if cutoff:
        votes = filter_votes(votes=votes, cutoff=cutoff)

    raw_matrix = pd.DataFrame.from_dict(votes)
    raw_matrix = raw_matrix.pivot(
        values="vote",
        index="participant_id",
        columns="statement_id",
    )

    return raw_matrix
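
A sketch of the expected vote-record format and the resulting matrix:

from reddwarf.utils.matrix import generate_raw_matrix

votes = [
    {"participant_id": 0, "statement_id": 0, "vote": 1,  "modified": 1_700_000_000_000},
    {"participant_id": 0, "statement_id": 1, "vote": -1, "modified": 1_700_000_000_500},
    {"participant_id": 1, "statement_id": 0, "vote": 0,  "modified": 1_700_000_001_000},
]

raw_matrix = generate_raw_matrix(votes=votes)
# Rows are participant IDs, columns are statement IDs, NaN where no vote:
# statement_id      0    1
# participant_id
# 0               1.0 -1.0
# 1               0.0  NaN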

reddwarf.utils.matrix.simple_filter_matrix(vote_matrix, mod_out_statement_ids=[])

The simple filter on the vote_matrix that is used by Polis prior to running PCA.

Parameters:
  • vote_matrix (VoteMatrix) –

    A raw vote_matrix (with missing values)

  • mod_out_statement_ids (list, default: [] ) –

    A list of moderated-out statement IDs to zero out.

Returns:
  • VoteMatrix( VoteMatrix ) –

    Copy of vote_matrix with those statements zeroed out

Source code in reddwarf/utils/matrix.py
def simple_filter_matrix(
        vote_matrix: VoteMatrix,
        mod_out_statement_ids: list[int] = [],
) -> VoteMatrix:
    """
    The simple filter on the vote_matrix that is used by Polis prior to running PCA.

    Args:
        vote_matrix (VoteMatrix): A raw vote_matrix (with missing values)
        mod_out_statement_ids (list): A list of moderated-out statement IDs to zero out.

    Returns:
        VoteMatrix: Copy of vote_matrix with those statements zeroed out
    """
    vote_matrix = vote_matrix.copy()
    for tid in mod_out_statement_ids:
        # Zero out column only if already exists (ie. has votes)
        if tid in vote_matrix.columns:
            # TODO: Add a flag to try np.nan instead of zero.
            vote_matrix.loc[:, tid] = 0

    return vote_matrix
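
Continuing the sketch above, zeroing out a moderated-out statement column:

from reddwarf.utils.matrix import simple_filter_matrix

# Column 1 is zeroed out (only if it exists in the matrix); all else is copied.
filtered_matrix = simple_filter_matrix(
    vote_matrix=raw_matrix,
    mod_out_statement_ids=[1],
)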

reddwarf.utils.matrix.get_clusterable_participant_ids(vote_matrix, vote_threshold)

Find participant IDs that meet a vote threshold in a vote_matrix.

Parameters:
  • vote_matrix (VoteMatrix) –

    A raw vote_matrix (with missing values)

  • vote_threshold (int) –

    Vote threshold that each participant must meet

Returns:
  • participant_ids( list ) –

    A list of participant IDs that meet the threshold

Source code in reddwarf/utils/matrix.py
def get_clusterable_participant_ids(vote_matrix: VoteMatrix, vote_threshold: int) -> list:
    """
    Find participant IDs that meet a vote threshold in a vote_matrix.

    Args:
        vote_matrix (VoteMatrix): A raw vote_matrix (with missing values)
        vote_threshold (int): Vote threshold that each participant must meet

    Returns:
        participant_ids (list): A list of participant IDs that meet the threshold
    """
    # TODO: Make this available outside this function? To match polismath output.
    user_vote_counts = vote_matrix.count(axis="columns")
    participant_ids = list(vote_matrix[user_vote_counts >= vote_threshold].index)
    return participant_ids
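
For example, finding participants who meet the Polis default threshold of 7 votes:

from reddwarf.utils.matrix import get_clusterable_participant_ids

participant_ids = get_clusterable_participant_ids(raw_matrix, vote_threshold=7)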

reddwarf.utils.reducer

reddwarf.utils.reducer.base.run_reducer(vote_matrix, reducer='pca', n_components=2, **reducer_kwargs)

Process a prepared vote matrix to be imputed and return participant and (optionally) statement data, projected into reduced n-dimensional space.

The vote matrix should not yet be imputed, as this will happen within the method.

Parameters:
  • vote_matrix (NDArray) –

    A vote matrix of data. Non-imputed values are expected.

  • n_components (int, default: 2 ) –

    Number n of principal components to decompose the vote_matrix into.

  • reducer (Literal['pca', 'pacmap', 'localmap'], default: 'pca' ) –

    Dimensionality reduction method to use.

Returns:
  • X_participants( NDArray ) –

    A numpy array with n-d coordinates for each projected row/participant.

  • X_statements( Optional[NDArray] ) –

    A numpy array with n-d coordinates for each projected col/statement.

  • reducer_model( ReducerModel ) –

    The fitted scikit-learn dimensionality reduction estimator.

Source code in reddwarf/utils/reducer/base.py
def run_reducer(
    vote_matrix: NDArray,
    reducer: ReducerType = "pca",
    n_components: int = 2,
    **reducer_kwargs,
) -> Tuple[NDArray, Optional[NDArray], ReducerModel]:
    """
    Process a prepared vote matrix to be imputed and return participant and (optionally) statement data,
    projected into reduced n-dimensional space.

    The vote matrix should not yet be imputed, as this will happen within the method.

    Args:
        vote_matrix (NDArray): A vote matrix of data. Non-imputed values are expected.
        n_components (int): Number n of principal components to decompose the `vote_matrix` into.
        reducer (Literal["pca", "pacmap", "localmap"]): Dimensionality reduction method to use.

    Returns:
        X_participants (NDArray): A numpy array with n-d coordinates for each projected row/participant.
        X_statements (Optional[NDArray]): A numpy array with n-d coordinates for each projected col/statement.
        reducer_model (ReducerModel): The fitted scikit-learn dimensionality reduction estimator.
    """
    match reducer:
        case "pca":
            pipeline = PatchedPipeline(
                [
                    ("capture", SparsityAwareCapturer()),
                    ("impute", SimpleImputer(missing_values=np.nan, strategy="mean")),
                    ("reduce", get_reducer(reducer, n_components=n_components, **reducer_kwargs)),
                    ("scale", SparsityAwareScaler(capture_step="capture")),
                ]
            )
        case "pacmap" | "localmap":
            pipeline = PatchedPipeline(
                [
                    ("impute", SimpleImputer(missing_values=np.nan, strategy="mean")),
                    ("reduce", get_reducer(reducer, n_components=n_components, **reducer_kwargs)),
                ]
            )

    # Generate projections of participants.
    X_participants = pipeline.fit_transform(vote_matrix)

    if reducer == "pca":
        # Generate projections of statements via virtual vote matrix.
        # This projects unit vectors for each feature/statement into PCA space to
        # understand their placement.
        num_cols = vote_matrix.shape[1]
        n_statements = num_cols
        virtual_vote_matrix = generate_virtual_vote_matrix(n_statements)
        X_statements = pipeline.transform(virtual_vote_matrix)
    else:
        X_statements = None

    reducer_model: ReducerModel = pipeline.named_steps["reduce"]

    return X_participants, X_statements, reducer_model
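
A minimal sketch with a tiny non-imputed vote matrix (values are illustrative only):

import numpy as np
from reddwarf.utils.reducer.base import run_reducer

vote_matrix = np.array([
    [ 1,     -1, np.nan],
    [ 1,      1, -1    ],
    [-1,      1,  1    ],
    [np.nan, -1,  1    ],
], dtype=float)

X_participants, X_statements, reducer_model = run_reducer(vote_matrix, reducer="pca")
# X_participants: one 2-D coordinate per row/participant.
# X_statements: one 2-D coordinate per column/statement (PCA only; None for pacmap/localmap).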

reddwarf.utils.reducer.base.get_reducer(reducer='pca', n_components=2, random_state=None, **reducer_kwargs)

Source code in reddwarf/utils/reducer/base.py
def get_reducer(
    reducer: ReducerType = "pca",
    n_components: int = 2,
    random_state: Optional[int] = None,
    **reducer_kwargs,
) -> ReducerModel:
    # Setting n_neighbors to None defaults to 10 below 10,000 samples, and
    # slowly increases it according to a formula beyond that.
    # See: https://github.com/YingfanWang/PaCMAP?tab=readme-ov-file#parameters
    DEFAULT_N_NEIGHBORS = None
    match reducer:
        case "pacmap" | "localmap":
            from pacmap import PaCMAP, LocalMAP

            # Override with default if not set
            n_neighbors = reducer_kwargs.pop("n_neighbors", DEFAULT_N_NEIGHBORS)

            ReducerCls = PaCMAP if reducer == "pacmap" else LocalMAP
            return ReducerCls(
                n_components=n_components,
                random_state=random_state,
                n_neighbors=n_neighbors,  # type:ignore
                **reducer_kwargs,
            )
        case "pca" | _:
            from sklearn.decomposition import PCA

            return PCA(
                n_components=n_components,
                random_state=random_state,
                **reducer_kwargs,
            )
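
get_reducer is a small factory: it lazily imports and returns an unfitted estimator, e.g.:

from reddwarf.utils.reducer.base import get_reducer

pca = get_reducer("pca", n_components=2, random_state=42)  # sklearn.decomposition.PCA
localmap = get_reducer("localmap", n_components=2)         # pacmap.LocalMAP (requires the pacmap package)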

reddwarf.utils.clusterer

reddwarf.utils.clusterer.base.run_clusterer(X_participants_clusterable, clusterer='kmeans', force_group_count=None, max_group_count=5, init_centers=None, random_state=None, **clusterer_kwargs)

Source code in reddwarf/utils/clusterer/base.py
def run_clusterer(
    X_participants_clusterable: NDArray,
    clusterer: ClustererType = "kmeans",
    force_group_count=None,
    max_group_count=5,
    init_centers=None,
    random_state=None,
    **clusterer_kwargs,
) -> Optional[ClustererModel]:
    match clusterer:
        case "kmeans":
            if force_group_count:
                k_bounds = [force_group_count, force_group_count]
            else:
                k_bounds = [2, max_group_count]

            _, _, kmeans = find_best_kmeans(
                X_to_cluster=X_participants_clusterable,
                k_bounds=k_bounds,
                # Force polis strategy of initiating cluster centers. See: PolisKMeans.
                init="polis",
                init_centers=init_centers,
                random_state=random_state,
                # TODO: Support passing in arbitrary clusterer_kwargs.
            )

            return kmeans

        case "hdbscan":
            from sklearn.cluster import HDBSCAN

            hdb = HDBSCAN(**clusterer_kwargs)
            hdb.fit(X_participants_clusterable)

            return hdb
        case _:
            raise NotImplementedError("clusterer type unknown")
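
A sketch of clustering reduced participant coordinates (X_participants as returned by run_reducer above):

from reddwarf.utils.clusterer.base import run_clusterer

# K-means: silhouette search over k=2..5 using the "polis" init strategy.
kmeans = run_clusterer(X_participants, clusterer="kmeans", max_group_count=5)
labels = kmeans.labels_ if kmeans is not None else None

# HDBSCAN: extra kwargs pass straight through to sklearn.cluster.HDBSCAN.
hdb = run_clusterer(X_participants, clusterer="hdbscan", min_cluster_size=5)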

reddwarf.utils.clusterer.kmeans.find_best_kmeans(X_to_cluster, k_bounds=[2, 5], init='k-means++', init_centers=None, random_state=None)

Use silhouette scores to find the best number of clusters k for fitting the data.

Parameters:
  • X_to_cluster (NDArray) –

    A n-D numpy array.

  • k_bounds (RangeLike, default: [2, 5] ) –

    Lower and upper bounds on n_clusters to test. (Default: [2, 5])

  • init_centers (List, default: None ) –

    A list of xy coordinates to use as initial center guesses.

  • random_state (int, default: None ) –

    Determines random number generation for centroid initialization. Use an int to make the randomness deterministic.

Returns:
  • best_k( int ) –

    Ideal number of clusters.

  • best_silhouette_score( float ) –

    Silhouette score for this K value.

  • best_kmeans( PolisKMeans | None ) –

    The optimal fitted estimator returned from PolisKMeans.

Source code in reddwarf/utils/clusterer/kmeans.py
def find_best_kmeans(
        X_to_cluster: NDArray,
        k_bounds: RangeLike = [2, 5],
        init="k-means++",
        init_centers: Optional[List] = None,
        random_state: Optional[int] = None,
) -> tuple[int, float, PolisKMeans | None]:
    """
    Use silhouette scores to find the best number of clusters k for fitting the data.

    Args:
        X_to_cluster (NDArray): A n-D numpy array.
        k_bounds (RangeLike): Lower and upper bounds on n_clusters to test. (Default: [2, 5])
        init_centers (List): A list of xy coordinates to use as initial center guesses.
        random_state (int): Determines random number generation for centroid initialization. Use an int to make the randomness deterministic.

    Returns:
        best_k (int): Ideal number of clusters.
        best_silhouette_score (float): Silhouette score for this K value.
        best_kmeans (PolisKMeans | None): The optimal fitted estimator returned from PolisKMeans.
    """
    param_grid = {
        "n_clusters": to_range(k_bounds),
    }

    def scoring_function(estimator, X):
        labels = estimator.fit_predict(X)
        return silhouette_score(X, labels)

    search = GridSearchNonCV(
        param_grid=param_grid,
        scoring=scoring_function,
        estimator=PolisKMeans(
            init=init, # strategy
            init_centers=init_centers, # guesses
            random_state=random_state,
        ),
    )

    search.fit(X_to_cluster)

    best_k = search.best_params_['n_clusters']
    best_silhouette_score = search.best_score_
    best_kmeans = search.best_estimator_

    return best_k, best_silhouette_score, best_kmeans
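
For example:

from reddwarf.utils.clusterer.kmeans import find_best_kmeans

best_k, best_silhouette_score, best_kmeans = find_best_kmeans(
    X_to_cluster=X_participants,
    k_bounds=[2, 5],
    random_state=42,
)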

reddwarf.utils.consensus

reddwarf.utils.consensus.select_consensus_statements(vote_matrix, mod_out_statement_ids=[], pick_max=5, prob_threshold=0.5, confidence=0.9)

Select consensus statements from a given vote matrix.

Parameters:
  • vote_matrix (VoteMatrix) –

    The full raw vote matrix (not just clusterable participants)

  • mod_out_statement_ids (Optional[list[int]], default: [] ) –

    Statements to ignore from consensus statement selection

  • pick_max (int, default: 5 ) –

    Max number of statements selected per agree/disagree direction

  • prob_threshold (float, default: 0.5 ) –

    The cutoff probability below which statements won't be considered for consensus

  • confidence (float, default: 0.9 ) –

    Percent confidence interval (in decimal), within which selected statements are deemed significant

Returns:
  • ConsensusResult

    A dict with "agree" and "disagree" keys, each a list of formatted statement dicts.

Source code in reddwarf/utils/consensus.py
def select_consensus_statements(
    vote_matrix: VoteMatrix,
    mod_out_statement_ids: list[int] = [],
    pick_max=5,
    prob_threshold: float = 0.5,
    confidence: float = 0.9,
) -> ConsensusResult:
    """
    Select consensus statements from a given vote matrix.

    Args:
        vote_matrix (VoteMatrix): The full raw vote matrix (not just clusterable participants)
        mod_out_statement_ids (Optional[list[int]]): Statements to ignore from consensus statement selection
        pick_max (int): Max number of statements selected per agree/disagree direction
        prob_threshold (float): The cutoff probability below which statements won't be considered for consensus
        confidence (float): Percent confidence interval (in decimal), within which selected statements are deemed significant

    Returns:
        A dict with "agree" and "disagree" keys, each a list of formatted statement dicts.
    """
    N_g_c, N_v_g_c, P_v_g_c, _, P_v_g_c_test, *_ = calculate_comment_statistics(
        vote_matrix=vote_matrix,
        cluster_labels=None,
    )
    # When no labels provided above, mock group is used.
    MOCK_GID = 0
    df = pd.DataFrame(
        {
            "na": N_v_g_c[votes.A, MOCK_GID, :],
            "nd": N_v_g_c[votes.D, MOCK_GID, :],
            "ns": N_g_c[MOCK_GID, :],
            "pa": P_v_g_c[votes.A, MOCK_GID, :],
            "pd": P_v_g_c[votes.D, MOCK_GID, :],
            "pat": P_v_g_c_test[votes.A, MOCK_GID, :],
            "pdt": P_v_g_c_test[votes.D, MOCK_GID, :],
            # agree metric = pa * pat
            "am": P_v_g_c[votes.A, MOCK_GID, :] * P_v_g_c_test[votes.A, MOCK_GID, :],
            # disagree metric = pd * pdt
            "dm": P_v_g_c[votes.D, MOCK_GID, :] * P_v_g_c_test[votes.D, MOCK_GID, :],
        },
        index=vote_matrix.columns,
    )

    # Optional filtering for mod_out_statement_ids
    if mod_out_statement_ids:
        df = df[~df.index.isin(mod_out_statement_ids)]

    # Agree candidates: pa > threshold and significant
    agree_mask = (df["pa"] > prob_threshold) & df["pat"].apply(
        lambda x: is_significant(x, confidence)
    )
    agree_candidates = df[agree_mask].copy()
    agree_candidates["consensus_agree_rank"] = (
        (-agree_candidates["am"]).rank(method="dense").astype("Int64")
    )

    # Disagree candidates: pd > threshold and significant
    disagree_mask = (df["pd"] > prob_threshold) & df["pdt"].apply(
        lambda x: is_significant(x, confidence)
    )
    disagree_candidates = df[disagree_mask].copy()
    disagree_candidates["consensus_disagree_rank"] = (
        (-disagree_candidates["dm"]).rank(method="dense").astype("Int64")
    )

    # Merge ranks back into full df
    df["consensus_agree_rank"] = agree_candidates["consensus_agree_rank"]
    df["consensus_disagree_rank"] = disagree_candidates["consensus_disagree_rank"]

    # Select top N agree/disagree statements
    if agree_candidates.empty:
        top_agree = []
    else:
        top_agree = [
            # Drop the cons-for key from final output.
            {k: v for k, v in st.items() if k != "cons-for"}
            for st in agree_candidates.sort_values("consensus_agree_rank")
            .head(pick_max)
            .reset_index()
            .apply(format_comment_stats, axis=1)
        ]

    if disagree_candidates.empty:
        top_disagree = []
    else:
        top_disagree = [
            # Drop the cons-for key from final output.
            {k: v for k, v in st.items() if k != "cons-for"}
            for st in disagree_candidates.sort_values("consensus_disagree_rank")
            .head(pick_max)
            .reset_index()
            .apply(format_comment_stats, axis=1)
        ]

    return {
        "agree": top_agree,
        "disagree": top_disagree,
    }
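
A sketch of selecting consensus statements from a raw vote matrix (as produced by generate_raw_matrix above):

from reddwarf.utils.consensus import select_consensus_statements

consensus = select_consensus_statements(
    vote_matrix=raw_matrix,
    pick_max=5,
    confidence=0.9,
)
# {"agree": [...], "disagree": [...]}: up to 5 formatted statement dicts each.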

reddwarf.utils.stats

reddwarf.utils.stats.select_representative_statements(grouped_stats_df, mod_out_statement_ids=[], pick_max=5, confidence=0.9)

Selects statistically representative statements from each group cluster.

This is expected to match the Polis outputs when all defaults are set.

Parameters:
  • grouped_stats_df (DataFrame) –

    MultiIndex Dataframe of statement statistics, indexed by group and statement.

  • mod_out_statement_ids (list[int], default: [] ) –

    A list of statements to ignore from selection algorithm

  • pick_max (int, default: 5 ) –

    Max number of statements selected per group

  • confidence (float, default: 0.9 ) –

    Percent confidence interval (in decimal), within which selected statements are deemed significant

Returns:
  • PolisRepness( PolisRepness ) –

    A dict object with lists of statements keyed to groups, matching Polis format.

Source code in reddwarf/utils/stats.py
def select_representative_statements(
    grouped_stats_df: pd.DataFrame,
    mod_out_statement_ids: list[int] = [],
    pick_max: int = 5,
    confidence: float = 0.90,
) -> PolisRepness:
    """
    Selects statistically representative statements from each group cluster.

    This is expected to match the Polis outputs when all defaults are set.

    Args:
        grouped_stats_df (pd.DataFrame): MultiIndex Dataframe of statement statistics, indexed by group and statement.
        mod_out_statement_ids (list[int]): A list of statements to ignore from selection algorithm
        pick_max (int): Max number of statements selected per group
        confidence (float): Percent confidence interval (in decimal), within which selected statements are deemed significant

    Returns:
        PolisRepness: A dict object with lists of statements keyed to groups, matching Polis format.
    """
    repness = {}
    # TODO: Should this be done elsewhere? A column in MultiIndex dataframe?
    mod_out_mask = grouped_stats_df.index.get_level_values("statement_id").isin(
        mod_out_statement_ids
    )
    grouped_stats_df = grouped_stats_df[~mod_out_mask]  # type: ignore
    for gid, group_df in grouped_stats_df.groupby(level="group_id"):
        # Bring statement_id into regular column.
        group_df = group_df.reset_index()

        best_agree = None
        # Track the best-agree, to bring to top if exists.
        for _, row in group_df.iterrows():
            if beats_best_of_agrees(row, best_agree, confidence):
                best_agree = row

        sig_filter = lambda row: is_statement_significant(row, confidence)
        sufficient_statements_row_mask = group_df.apply(sig_filter, axis="columns")
        sufficient_statements = group_df[sufficient_statements_row_mask]

        # Track the best, even if doesn't meet sufficient minimum, to have at least one.
        best_overall = None
        if len(sufficient_statements) == 0:
            for _, row in group_df.iterrows():
                if beats_best_by_repness_test(row, best_overall):
                    best_overall = row
        else:
            # Finalize statements into output format.
            # TODO: Figure out how to finalize only at end in output. Change repness_metric?
            sufficient_statements = (
                pd.DataFrame(
                    [
                        format_comment_stats(row)
                        for _, row in sufficient_statements.iterrows()
                    ]
                )
                # Create a column to sort by repness, then remove it.
                .assign(repness_metric=repness_metric)
                .sort_values(by="repness_metric", ascending=False)
                .drop(columns="repness_metric")
            )

        if best_agree is not None:
            best_agree = format_comment_stats(best_agree)
            best_agree.update({"n-agree": best_agree["n-success"], "best-agree": True})
            best_head = [best_agree]
        elif best_overall is not None:
            best_overall = format_comment_stats(best_overall)
            best_head = [best_overall]
        else:
            best_head = []

        selected = best_head
        selected = selected + [
            row.to_dict()
            for _, row in sufficient_statements.iterrows()
            if best_head
            # Skip any statements already in best_head
            and best_head[0]["tid"] != row["tid"]
        ]
        selected = selected[:pick_max]
        # Does the work of agrees-before-disagrees sort in polismath, since "a" before "d".
        selected = sorted(selected, key=lambda row: row["repful-for"])
        repness[gid] = selected

    return repness  # type:ignore
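
A sketch of the typical call chain: build the MultiIndex statistics frame with calculate_comment_statistics_dataframes (documented below), then select representative statements per group. Here, cluster_labels is assumed to come from a fitted clusterer such as run_clusterer above.

from reddwarf.utils.stats import (
    calculate_comment_statistics_dataframes,
    select_representative_statements,
)

grouped_stats_df, _ = calculate_comment_statistics_dataframes(
    vote_matrix=raw_matrix,
    cluster_labels=cluster_labels,  # assumed: one group label per participant row
)
repness = select_representative_statements(grouped_stats_df, pick_max=5)
# {0: [...], 1: [...]}: up to 5 statement dicts per group, Polis-formatted.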

reddwarf.utils.stats.calculate_comment_statistics(vote_matrix, cluster_labels=None, pseudo_count=1)

Calculates comparative statement statistics across all votes and groups, using only efficient numpy operations.

Note: when no cluster_labels are supplied, we internally apply the group 0 to each row, and calculated values can be accessed in the first group index.

The representativeness metric is defined as:

R_v(g,c) = P_v(g,c) / P_v(~g,c)

Where:

  • P_v(g,c) is the probability of vote v on comment c in group g
  • P_v(~g,c) is the probability of vote v on comment c in all groups except g

And:

  • N(g,c) is the total number of non-missing votes on comment c in group g
  • N_v(g,c) is the total number of vote v on comment c in group g

Parameters:
  • vote_matrix (VoteMatrix) –

    A raw vote_matrix

  • cluster_labels (Optional[list[int]], default: None ) –

    An optional list of cluster labels to determine groups.

Returns:
  • N_g_c( ndarray[int] ) –

    numpy matrix with counts of non-missing votes on comments/groups

  • N_v_g_c( ndarray[int] ) –

    numpy matrix with counts of vote types on comments/groups

  • P_v_g_c( ndarray[float] ) –

    numpy matrix with probabilities of vote types on comments/groups

  • R_v_g_c( ndarray[float] ) –

    numpy matrix with representativeness of vote types on comments/groups

  • P_v_g_c_test( ndarray[float] ) –

    test z-scores for probability of votes/comments/groups

  • R_v_g_c_test( ndarray[float] ) –

    test z-scores for representativeness of votes/comments/groups

  • C_v_c( ndarray[float] ) –

    group-aware consensus scores for each statement.

Source code in reddwarf/utils/stats.py
def calculate_comment_statistics(
    vote_matrix: VoteMatrix,
    cluster_labels: Optional[list[int] | NDArray[np.integer]] = None,
    pseudo_count: int = 1,
) -> Tuple[
    np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray
]:
    """
    Calculates comparative statement statistics across all votes and groups, using only efficient numpy operations.

    Note: when no cluster_labels are supplied, we internally apply the group `0` to each row,
    and calculated values can be accessed in the first group index.

    The representativeness metric is defined as:
    R_v(g,c) = P_v(g,c) / P_v(~g,c)

    Where:
    - P_v(g,c) is probability of vote v on comment c in group g
    - P_v(~g,c) is probability of vote v on comment c in all groups except g

    And:
    - N(g,c) is the total number of non-missing votes on comment c in group g
    - N_v(g,c) is the total number of vote v on comment c in group g

    Args:
        vote_matrix (VoteMatrix): A raw vote_matrix
        cluster_labels (Optional[list[int]]): An optional list of cluster labels to determine groups.

    Returns:
        N_g_c (np.ndarray[int]): numpy matrix with counts of non-missing votes on comments/groups
        N_v_g_c (np.ndarray[int]): numpy matrix with counts of vote types on comments/groups
        P_v_g_c (np.ndarray[float]): numpy matrix with probabilities of vote types on comments/groups
        R_v_g_c (np.ndarray[float]): numpy matrix with representativeness of vote types on comments/groups
        P_v_g_c_test (np.ndarray[float]): test z-scores for probability of votes/comments/groups
        R_v_g_c_test (np.ndarray[float]): test z-scores for representativeness of votes/comments/groups
        C_v_c (np.ndarray[float]): group-aware consensus scores for each statement.
    """
    if cluster_labels is None:
        # Make a single group if no labels supplied.
        participant_count = len(vote_matrix.index)
        cluster_labels = [0] * participant_count

    # Get the vote matrix values
    X = vote_matrix.values

    group_count = len(set(cluster_labels))
    statement_ids = vote_matrix.columns

    # Set up all the variables to be populated.
    N_g_c = np.empty([group_count, len(statement_ids)], dtype="int32")
    N_v_g_c = np.empty(
        [len(votes.__dict__), group_count, len(statement_ids)], dtype="int32"
    )
    P_v_g_c = np.empty([len(votes.__dict__), group_count, len(statement_ids)])
    R_v_g_c = np.empty([len(votes.__dict__), group_count, len(statement_ids)])
    P_v_g_c_test = np.empty([len(votes.__dict__), group_count, len(statement_ids)])
    R_v_g_c_test = np.empty([len(votes.__dict__), group_count, len(statement_ids)])
    C_v_c = np.empty([len(votes.__dict__), len(statement_ids)])

    for gid in range(group_count):
        # Create mask for the participants in target group
        in_group_mask = np.asarray(cluster_labels) == gid
        X_in_group = X[in_group_mask]

        # Count any votes [-1, 0, 1] for all statements/features at once

        # NON-GROUP STATS

        # For in-group
        n_agree_in_group = N_v_g_c[votes.A, gid, :] = count_agree(X_in_group)  # na
        n_disagree_in_group = N_v_g_c[votes.D, gid, :] = count_disagree(
            X_in_group
        )  # nd
        n_votes_in_group = N_g_c[gid, :] = count_all_votes(X_in_group)  # ns

        # Calculate probabilities
        p_agree_in_group = P_v_g_c[votes.A, gid, :] = probability(
            n_agree_in_group, n_votes_in_group, pseudo_count
        )  # pa
        p_disagree_in_group = P_v_g_c[votes.D, gid, :] = probability(
            n_disagree_in_group, n_votes_in_group, pseudo_count
        )  # pd

        # Calculate probability test z-scores
        P_v_g_c_test[votes.A, gid, :] = one_prop_test(
            n_agree_in_group, n_votes_in_group
        )  # pat
        P_v_g_c_test[votes.D, gid, :] = one_prop_test(
            n_disagree_in_group, n_votes_in_group
        )  # pdt

        # GROUP COMPARISON STATS

        out_group_mask = ~in_group_mask
        X_out_group = X[out_group_mask]

        # For out-group
        n_agree_out_group = count_agree(X_out_group)
        n_disagree_out_group = count_disagree(X_out_group)
        n_votes_out_group = count_all_votes(X_out_group)

        # Calculate out-group probabilities
        p_agree_out_group = probability(
            n_agree_out_group, n_votes_out_group, pseudo_count
        )
        p_disagree_out_group = probability(
            n_disagree_out_group, n_votes_out_group, pseudo_count
        )

        # Calculate representativeness
        R_v_g_c[votes.A, gid, :] = p_agree_in_group / p_agree_out_group  # ra
        R_v_g_c[votes.D, gid, :] = p_disagree_in_group / p_disagree_out_group  # rd

        # Calculate representativeness test z-scores
        R_v_g_c_test[votes.A, gid, :] = two_prop_test(
            n_agree_in_group, n_agree_out_group, n_votes_in_group, n_votes_out_group
        )  # rat
        R_v_g_c_test[votes.D, gid, :] = two_prop_test(
            n_disagree_in_group,
            n_disagree_out_group,
            n_votes_in_group,
            n_votes_out_group,
        )  # rdt

    # Calculate group-aware consensus
    # For each statement, multiply probabilities across groups (aka the first axis=0)
    # Reference: https://github.com/compdemocracy/polis/blob/edge/math/src/polismath/math/conversation.clj#L615-L636
    C_v_c[votes.A, :] = P_v_g_c[votes.A, :, :].prod(axis=0)
    C_v_c[votes.D, :] = P_v_g_c[votes.D, :, :].prod(axis=0)

    return (
        N_g_c,  # ns
        N_v_g_c,  # na / nd
        P_v_g_c,  # pa / pd
        R_v_g_c,  # ra / rd
        P_v_g_c_test,  # pat / pdt
        R_v_g_c_test,  # rat / rdt
        C_v_c,  # gac
    )

reddwarf.utils.stats.calculate_comment_statistics_dataframes(vote_matrix, cluster_labels=None, pseudo_count=1)

Calculates comparative statement statistics across all votes and groups, generating dataframes.

This returns both group-specific statistics, and also overall stats (group-aware consensus).

Parameters:
  • vote_matrix (VoteMatrix) –

    The vote matrix where rows are voters, columns are statements, and values are votes (1 for agree, -1 for disagree, 0 for pass).

  • cluster_labels (ndarray, default: None ) –

    Array of cluster labels for each participant row in the vote matrix.

  • pseudo_count (int, default: 1 ) –

    Smoothing parameter to avoid division by zero. Default is 1.

Returns:
  • DataFrame

    pd.DataFrame: DataFrame (MultiIndex on group/statement) containing verbose statistics for each statement per group.

  • DataFrame

    pd.DataFrame: DataFrame containing group-aware consensus scores for each statement.

Source code in reddwarf/utils/stats.py
def calculate_comment_statistics_dataframes(
    vote_matrix: VoteMatrix,
    cluster_labels: Optional[list[int] | NDArray[np.integer]] = None,
    pseudo_count: int = 1,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Calculates comparative statement statistics across all votes and groups, generating dataframes.

    This returns both group-specific statistics, and also overall stats (group-aware consensus).

    Args:
        vote_matrix (VoteMatrix): The vote matrix where rows are voters, columns are statements,
                                  and values are votes (1 for agree, -1 for disagree, 0 for pass).
        cluster_labels (np.ndarray): Array of cluster labels for each participant row in the vote matrix.
        pseudo_count (int): Smoothing parameter to avoid division by zero. Default is 1.

    Returns:
        pd.DataFrame: DataFrame (MultiIndex on group/statement) containing verbose statistics for each statement per group.
        pd.DataFrame: DataFrame containing group-aware consensus scores for each statement.
    """
    N_g_c, N_v_g_c, P_v_g_c, R_v_g_c, P_v_g_c_test, R_v_g_c_test, C_v_c = (
        calculate_comment_statistics(
            vote_matrix=vote_matrix,
            cluster_labels=cluster_labels,
            pseudo_count=pseudo_count,
        )
    )

    if cluster_labels is None:
        # Make a single group if no labels supplied.
        participant_count = len(vote_matrix.index)
        cluster_labels = [0] * participant_count

    group_count = len(set(cluster_labels))
    group_frames = []
    for group_id in range(group_count):
        group_df = pd.DataFrame(
            {
                "na": N_v_g_c[votes.A, group_id, :],  # number agree votes
                "nd": N_v_g_c[votes.D, group_id, :],  # number disagree votes
                "ns": N_g_c[group_id, :],  # number seen/total/non-missing votes
                "pa": P_v_g_c[votes.A, group_id, :],  # probability agree
                "pd": P_v_g_c[votes.D, group_id, :],  # probability disagree
                "pat": P_v_g_c_test[
                    votes.A, group_id, :
                ],  # probability agree test z-score
                "pdt": P_v_g_c_test[
                    votes.D, group_id, :
                ],  # probability disagree test z-score
                "ra": R_v_g_c[
                    votes.A, group_id, :
                ],  # repness of agree (representativeness)
                "rd": R_v_g_c[
                    votes.D, group_id, :
                ],  # repness of disagree (representativeness)
                "rat": R_v_g_c_test[
                    votes.A, group_id, :
                ],  # repress of agree test z-score
                "rdt": R_v_g_c_test[
                    votes.D, group_id, :
                ],  # repress of disagree test z-score
            },
            index=vote_matrix.columns,
        )
        group_df["group_id"] = group_id
        group_df["statement_id"] = vote_matrix.columns
        group_frames.append(group_df)
    # Create a MultiIndex dataframe
    grouped_stats_df = pd.concat(group_frames, ignore_index=True).set_index(
        ["group_id", "statement_id"]
    )

    group_aware_consensus_df = pd.DataFrame(
        {
            "group-aware-consensus": C_v_c[votes.A, :],
            "group-aware-consensus-agree": C_v_c[votes.A, :],
            "group-aware-consensus-disagree": C_v_c[votes.D, :]
        },
        index=vote_matrix.columns,
    )

    return grouped_stats_df, group_aware_consensus_df

reddwarf.utils

(These are in the process of being either moved or deprecated.)

reddwarf.utils.filter_votes(votes, cutoff=None)

Filters a list of votes.

If a cutoff is provided, votes are filtered based on either:

  • An int representing unix timestamp (ms), keeping only votes before or at that time.
    • Any int above 1_300_000_000 is considered a timestamp.
  • Any other positive or negative int is considered an index, reflecting where to trim the time-sorted vote list.
    • positive: filters in votes that many indices from start
    • negative: filters out votes that many indices from end
Parameters:
  • votes (List[Dict]) –

    An unsorted list of vote records, where each record is a dictionary containing:

    • "participant_id": The ID of the voter.
    • "statement_id": The ID of the statement being voted on.
    • "vote": The recorded vote value.
    • "modified": A unix timestamp object representing when the vote was made.
  • cutoff (int, default: None ) –

    A cutoff unix timestamp (ms) or index position in date-sorted votes list.

Returns:
  • votes( List[Dict] ) –

    A list of vote records, sorted by modified if index-based filtering occurred.

Source code in reddwarf/utils/matrix.py
def filter_votes(
        votes: List[Dict],
        cutoff: Optional[int] = None,
) -> List[Dict]:
    """
    Filters a list of votes.

    If a `cutoff` is provided, votes are filtered based on either:

    - An `int` representing unix timestamp (ms), keeping only votes before or at that time.
        - Any int above 1_300_000_000 is considered a timestamp.
    - Any other positive or negative `int` is considered an index, reflecting where to trim the time-sorted vote list.
        - positive: filters in votes that many indices from start
        - negative: filters out votes that many indices from end

    Args:
        votes (List[Dict]): An unsorted list of vote records, where each record is a dictionary containing:

            - "participant_id": The ID of the voter.
            - "statement_id": The ID of the statement being voted on.
            - "vote": The recorded vote value.
            - "modified": A unix timestamp object representing when the vote was made.

        cutoff (int): A cutoff unix timestamp (ms) or index position in date-sorted votes list.

    Returns:
        votes (List[Dict]): A list of vote records, sorted by `modified` if index-based filtering occurred.
    """
    if cutoff:
        # TODO: Detect datetime object as arg instead.
        try:
            if cutoff > 1_300_000_000:
                cutoff_timestamp = cutoff
                votes = [v for v in votes if v['modified'] <= cutoff_timestamp]
            else:
                cutoff_index = cutoff
                votes = sorted(votes, key=lambda x: x["modified"])
                votes = votes[:cutoff_index]
        except KeyError as e:
            raise RedDwarfError("The `modified` key is missing from a vote object that must be sorted") from e

    return votes
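
A sketch of the two cutoff modes, reusing the votes list from the generate_raw_matrix example:

from reddwarf.utils import filter_votes

# Timestamp cutoff: keep votes modified at or before this unix time (ms).
votes_by_time = filter_votes(votes, cutoff=1_700_000_000_500)

# Index cutoff: sort by `modified`, then keep the first 2 votes.
votes_by_index = filter_votes(votes, cutoff=2)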

reddwarf.utils.filter_matrix(vote_matrix, min_user_vote_threshold=7, active_statement_ids=[], keep_participant_ids=[], unvoted_filter_type='drop')

Generates a filtered vote matrix from a raw matrix and filter config.

Parameters:
  • vote_matrix (DataFrame) –

    The [raw] vote matrix.

  • min_user_vote_threshold (int, default: 7 ) –

    The number of votes a participant must make to avoid being filtered.

  • active_statement_ids (List[int], default: [] ) –

    The statement IDs that are not moderated out.

  • keep_participant_ids (List[int], default: [] ) –

    Preserve specific participants even if below threshold.

  • unvoted_filter_type (drop | zero, default: 'drop' ) –

    When a statement has no votes, it can't be imputed. This determines whether to drop the statement column, or set all its values to zero (i.e., treat it as a "pass" vote). (Default: drop)

Returns:
  • filtered_vote_matrix( VoteMatrix ) –

    A vote matrix with the following filtered out:

    1. statements without any votes,
    2. statements that have been moderated out,
    3. participants below the vote count threshold,
    4. excepting participants explicitly selected via keep_participant_ids to circumvent the above filtering.
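
A minimal usage sketch with a hypothetical 3-participant, 3-statement raw matrix (NaN marks an unvoted cell):

import pandas as pd
from reddwarf.utils import filter_matrix

raw = pd.DataFrame(
    {0: [1, 1, None], 1: [None, -1, None], 2: [None, None, None]},
).rename_axis("participant_id")

filtered = filter_matrix(
    raw,
    min_user_vote_threshold=1,
    active_statement_ids=[0, 1, 2],
    keep_participant_ids=[2],    # keep participant 2 despite having no votes
    unvoted_filter_type="drop",  # statement 2 has no votes, so its column is dropped
)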
Source code in reddwarf/utils/matrix.py
def filter_matrix(
        vote_matrix: VoteMatrix,
        min_user_vote_threshold: int = 7,
        active_statement_ids: List[int] = [],
        keep_participant_ids: List[int] = [],
        unvoted_filter_type: Literal["drop", "zero"] = "drop",
) -> VoteMatrix:
    """
    Generates a filtered vote matrix from a raw matrix and filter config.

    Args:
        vote_matrix (pd.DataFrame): The [raw] vote matrix.
        min_user_vote_threshold (int): The number of votes a participant must make to avoid being filtered.
        active_statement_ids (List[int]): The statement IDs that are not moderated out.
        keep_participant_ids (List[int]): Preserve specific participants even if below threshold.
        unvoted_filter_type ("drop" | "zero"): When a statement has no votes, it can't be imputed. \
            This determined whether to drop the statement column, or set all the value to zero/pass. (Default: drop)

    Returns:
        filtered_vote_matrix (VoteMatrix): A vote matrix with the following filtered out:

            1. statements without any votes,
            2. statements that have been moderated out,
            3. participants below the vote count threshold,
            4. excepting participants explicitly selected via `keep_participant_ids` to circumvent the above filtering.
    """
    # Filter out moderated statements.
    vote_matrix = vote_matrix.filter(active_statement_ids, axis='columns')
    # Filter out participants with fewer than min_user_vote_threshold votes (keeping IDs we're forced to keep).
    # Ref: https://hyp.is/JbNMus5gEe-cQpfc6eVIlg/gwern.net/doc/sociology/2021-small.pdf
    participant_ids_in = get_clusterable_participant_ids(vote_matrix, min_user_vote_threshold)
    # Add in some specific participant IDs for Polismath edge-cases.
    # See: https://github.com/compdemocracy/polis/pull/1893#issuecomment-2654666421
    participant_ids_in = list(set(participant_ids_in + keep_participant_ids))
    vote_matrix = (vote_matrix
        .filter(participant_ids_in, axis='rows')
        # .filter() and .drop() lost the index name, so bring it back.
        .rename_axis("participant_id")
    )

    # This is otherwise the more efficient way, but we want to keep some participant IDs
    # to troubleshoot edge-cases in upstream Polis math.
    # self.matrix = self.matrix.dropna(thresh=self.min_votes, axis='rows')

    unvoted_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)

    # TODO: What about statements with no votes? E.g., 53 in oprah. Filter out? zero?
    # Test this on a conversation where it will actually change statement count.
    if unvoted_filter_type == 'drop':
        vote_matrix = vote_matrix.drop(unvoted_statement_ids, axis='columns')
    elif unvoted_filter_type == 'zero':
        vote_matrix[unvoted_statement_ids] = 0

    return vote_matrix

reddwarf.utils.get_unvoted_statement_ids(vote_matrix)

A method intended to be piped into a VoteMatrix DataFrame, returning a list of unvoted statement IDs.

See: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.pipe.html

Parameters:
  • vote_matrix (DataFrame) –

    A pivot of statements (cols), participants (rows), with votes as values.

Returns:
  • unvoted_statement_ids( List[int] ) –

    A list of statement IDs with no votes.

Example:

unvoted_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)
Source code in reddwarf/utils/matrix.py
def get_unvoted_statement_ids(vote_matrix: VoteMatrix) -> List[int]:
    """
    A method intended to be piped into a VoteMatrix DataFrame, returning a list of unvoted statement IDs.

    See: <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.pipe.html>

    Args:
        vote_matrix (pd.DataFrame): A pivot of statements (cols), participants (rows), with votes as values.

    Returns:
        unvoted_statement_ids (List[int]): A list of statement IDs with no votes.

    Example:

        unvoted_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)
    """
    null_column_mask = vote_matrix.isnull().all()
    null_column_ids = vote_matrix.columns[null_column_mask].tolist()

    return null_column_ids

reddwarf.data_presenter

reddwarf.data_presenter.generate_figure(coord_data, coord_labels=None, cluster_labels=None, flip_x=False, flip_y=False)

Generates a matplotlib scatterplot with optional bounded clusters.

The plot is drawn from an (n, 2) array of xy values, each point optionally annotated via coord_labels. When a list of cluster labels is supplied (one per row), concave hulls are drawn around each cluster.

Signs of PCA projection coordinates are arbitrary, and can flip without meaning. Inverting axes can help compare results with Polis platform visualizations.

Parameters:
  • coord_data (ndarray) –

    An (n, 2) array of xy coordinates, one row per point.

  • coord_labels (List, default: None ) –

    Optional text labels to annotate each point with, one per row in coord_data.

  • cluster_labels (List[int], default: None ) –

    A list of group labels, one for each row in coord_data.

  • flip_x (bool, default: False ) –

    Flip the presentation of the X-axis so it descends left-to-right

  • flip_y (bool, default: False ) –

    Flip the presentation of the Y-axis so it descends top-to-bottom

Returns:
  • None

    None.
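
A minimal usage sketch with hypothetical coordinates and cluster labels:

import numpy as np
from reddwarf.data_presenter import generate_figure

# Hypothetical 2D projections: two clusters of three points each.
xy = np.array([
    [0.5, 1.2], [0.7, 1.0], [0.4, 0.8],
    [-0.9, -1.0], [-1.1, -0.7], [-0.8, -1.2],
])

generate_figure(
    coord_data=xy,
    coord_labels=[f"p{i}" for i in range(len(xy))],
    cluster_labels=[0, 0, 0, 1, 1, 1],  # one cluster label per row
    flip_x=True,  # match the Polis interface orientation
)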

Source code in reddwarf/data_presenter.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def generate_figure(
    coord_data,
    coord_labels=None,
    cluster_labels: Optional[List[int]] = None,
    flip_x: bool = False,
    flip_y: bool = False,
) -> None:
    """
    Generates a matplotlib scatterplot with optional bounded clusters.

    The plot is drawn from an (n, 2) array of xy values, each point optionally annotated via `coord_labels`.
    When a list of cluster labels is supplied (one per row), concave hulls are drawn around each cluster.

    Signs of PCA projection coordinates are arbitrary, and can flip without
    meaning. Inverting axes can help compare results with Polis platform
    visualizations.

    Args:
        coord_data (np.ndarray): An (n, 2) array of xy coordinates, one row per point.
        coord_labels (List): Optional text labels to annotate each point with, one per row in `coord_data`.
        cluster_labels (List[int]): A list of group labels, one for each row in `coord_data`.
        flip_x (bool): Flip the presentation of the X-axis so it descends left-to-right.
        flip_y (bool): Flip the presentation of the Y-axis so it descends top-to-bottom.

    Returns:
        None.
    """
    plt.figure(figsize=(7, 5), dpi=80)
    plt.axhline(y=0, color="k", linestyle="-", linewidth=0.5)
    plt.axvline(x=0, color="k", linestyle="-", linewidth=0.5)

    if flip_x:
        plt.gca().invert_xaxis()
    if flip_y:
        plt.gca().invert_yaxis()

    # Label points when coordinate labels are provided.
    if coord_labels is not None:
        for label, xy in zip(coord_labels, coord_data):
            plt.annotate(
                str(label),
                (float(xy[0]), float(xy[1])),
                xytext=(2, 2),
                color="gray",
                textcoords="offset points",
            )

    scatter_kwargs = {}
    scatter_kwargs["x"] = coord_data[:, 0]
    scatter_kwargs["y"] = coord_data[:, 1]
    scatter_kwargs["s"] = 10  # point size
    scatter_kwargs["alpha"] = 0.8  # point transparency

    # Wrap clusters in hulls when cluster labels are provided.
    if cluster_labels is not None:
        # Ref: https://matplotlib.org/stable/users/explain/colors/colormaps.html#qualitative
        scatter_kwargs["cmap"] = "Set1"  # color map

        # Pad cluster_labels to match the number of points
        CLUSTER_CENTER_LABEL = -2
        if len(cluster_labels) < len(coord_data):
            pad_length = len(coord_data) - len(cluster_labels)
            cluster_labels = np.concatenate(
                [cluster_labels, [CLUSTER_CENTER_LABEL] * pad_length]
            )

        scatter_kwargs["c"] = cluster_labels  # color indexes

        print("Calculating convex hulls around clusters...")
        # Subset to allow unlabelled points to just be plotted
        unique_labels = np.unique(cluster_labels)
        for label in unique_labels:
            if label in (-1, -2):
                continue  # skip hulls when special-case labels used

            label_mask = cluster_labels == label
            cluster_points = coord_data[label_mask]

            print(f"Hull {label}, bounding {len(cluster_points)} points")

            if len(cluster_points) < 3:
                # TODO: Accommodate 2 points like the Polis platform does.
                print("Cannot create concave hull for fewer than 3 points. Skipping...")
                continue

            hull_point_indices = concave_hull_indexes(cluster_points, concavity=4.0)
            hull_points = cluster_points[hull_point_indices]

            polygon = patches.Polygon(
                hull_points,
                fill=True,
                color="gray",
                alpha=0.3,
                edgecolor=None,
            )
            plt.gca().add_patch(polygon)

    scatter = plt.scatter(**scatter_kwargs)

    # Add a legend if labels are provided
    if cluster_labels is not None:
        unique_labels = np.unique(cluster_labels)
        cbar = plt.colorbar(scatter, label="Cluster", ticks=unique_labels)

        tick_labels = []
        for lbl in unique_labels:
            if lbl == -1:
                tick_labels.append("[Unclustered]")
            elif lbl == -2:
                tick_labels.append("[Center Guess]")
            else:
                tick_labels.append(GROUP_LABEL_NAMES[lbl])
        cbar.ax.set_yticklabels(tick_labels)

    plt.show()

    return None

reddwarf.data_presenter.generate_figure_polis(result, show_guesses=False, flip_x=True, flip_y=False, show_pids=True)

Generate a Polis-style visualization from clustering results.

Parameters:
  • result (PolisClusteringResult) –

    The result object from run_pipeline

  • show_guesses (bool, default: False ) –

    Show the initial cluster center guesses on the plot

  • flip_x (bool, default: True ) –

    Flip the X-axis (default True to match Polis interface)

  • flip_y (bool, default: False ) –

    Flip the Y-axis (default False)

  • show_pids (bool, default: True ) –

    Show the participant IDs on the plot
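
A minimal end-to-end sketch (assumes `votes` is a list of vote dicts as described for filter_votes above, and that run_pipeline accepts it via the votes keyword):

from reddwarf.implementations.polis import run_pipeline
from reddwarf.data_presenter import generate_figure_polis

result = run_pipeline(votes=votes)
generate_figure_polis(result, show_guesses=True)  # also plot the initial center guesses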

Source code in reddwarf/data_presenter.py
def generate_figure_polis(
    result: PolisClusteringResult,
    show_guesses=False,
    flip_x=True,
    flip_y=False,
    show_pids=True,
):
    """
    Generate a Polis-style visualization from clustering results.

    Args:
        result (PolisClusteringResult): The result object from run_pipeline
        show_guesses (bool): Show the initial cluster center guesses on the plot
        flip_x (bool): Flip the X-axis (default True to match Polis interface)
        flip_y (bool): Flip the Y-axis (default False)
        show_pids (bool): Show the participant IDs on the plot
    """
    participants_clustered_df = result.participants_df[
        result.participants_df["cluster_id"].notnull()
    ]
    cluster_labels = participants_clustered_df["cluster_id"].values

    coord_data = participants_clustered_df.loc[:, ["x", "y"]].values
    coord_labels = None
    # Add the init center guesses to the bottom of the coord stack. Internally, they
    # will be given a fake "-2" label ([Center Guess]) that won't be used to draw hulls.
    # This is purely illustrative, to show where the centroid guesses were.
    if show_guesses:
        coord_data = np.vstack(
            [
                coord_data,
                np.asarray(
                    result.clusterer.init_centers_used_ if result.clusterer else []
                ),
            ]
        )

    if show_pids:
        coord_labels = [f"p{pid}" for pid in participants_clustered_df.index]

    generate_figure(
        coord_data=coord_data,
        coord_labels=coord_labels,
        cluster_labels=cluster_labels,
        # Always needs flipping to look like Polis interface.
        flip_x=flip_x,
        # Sometimes needs flipping to look like Polis interface.
        flip_y=flip_y,
    )

Types

reddwarf.implementations.base.PolisClusteringResult dataclass

Attributes:
  • raw_vote_matrix (DataFrame) –

    Raw sparse vote matrix before any processing.

  • filtered_vote_matrix (DataFrame) –

    Raw sparse vote matrix with moderated statements zeroed out.

  • reducer (ReducerModel) –

    scikit-learn reducer model fitted to vote matrix.

  • clusterer (ClustererModel) –

    scikit-learn clusterer model, fitted to participant projections. (includes labels_)

  • group_comment_stats (DataFrame) –

    A multi-index dataframe of statement statistics, indexed by group ID and statement ID.

  • statements_df (DataFrame) –

    A dataframe with all intermediary and final statement data/calculations/metadata.

  • participants_df (DataFrame) –

    A dataframe with all intermediary and final participant data/calculations/metadata.

  • participant_projections (dict) –

    A dict of participant projected coordinates, keyed to participant ID.

  • statement_projections (Optional[dict]) –

    A dict of statement projected coordinates, keyed to statement ID.

  • group_aware_consensus (dict) –

    A nested dict of statement group-aware-consensus values, keyed first by agree/disagree, then statement ID.

  • consensus (ConsensusResult) –

    A dict of the most statistically significant statements for each of agree/disagree.

  • repness (PolisRepness) –

    A dict of the most statistically significant statements most representative of each group.
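
A brief access sketch (attribute names per the list above; the `votes` input is an assumption, as for run_pipeline earlier):

result = run_pipeline(votes=votes)
result.participants_df.head()   # per-participant projections, cluster IDs, etc.
result.statements_df.head()     # per-statement stats and metadata
result.consensus                # top consensus statements for agree/disagree
result.repness                  # representative statements per group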

Source code in reddwarf/implementations/base.py
@dataclass
class PolisClusteringResult:
    """
    Attributes:
        raw_vote_matrix (DataFrame): Raw sparse vote matrix before any processing.
        filtered_vote_matrix (DataFrame): Raw sparse vote matrix with moderated statements zeroed out.
        reducer (ReducerModel): scikit-learn reducer model fitted to vote matrix.
        clusterer (ClustererModel): scikit-learn clusterer model, fitted to participant projections. (includes `labels_`)
        group_comment_stats (DataFrame): A multi-index dataframe of statement statistics, indexed by group ID and statement ID.
        statements_df (DataFrame): A dataframe with all intermediary and final statement data/calculations/metadata.
        participants_df (DataFrame): A dataframe with all intermediary and final participant data/calculations/metadata.
        participant_projections (dict): A dict of participant projected coordinates, keyed to participant ID.
        statement_projections (Optional[dict]): A dict of statement projected coordinates, keyed to statement ID.
        group_aware_consensus (dict): A nested dict of statement group-aware-consensus values, keyed first by agree/disagree, then statement ID.
        consensus (ConsensusResult): A dict of the most statistically significant statements for each of agree/disagree.
        repness (PolisRepness): A dict of the most statistically significant statements most representative of each group.
    """

    raw_vote_matrix: DataFrame
    filtered_vote_matrix: DataFrame
    reducer: ReducerModel
    # TODO: Figure out how to guarantee PolisKMeans model returned.
    clusterer: ClustererModel | None
    group_comment_stats: DataFrame
    statements_df: DataFrame
    participants_df: DataFrame
    participant_projections: dict
    statement_projections: Optional[dict]
    group_aware_consensus: dict
    consensus: ConsensusResult
    repness: PolisRepness