API Reference

reddwarf.implementations.polis

reddwarf.implementations.polis.run_pipeline(**kwargs)

Source code in reddwarf/implementations/polis.py
def run_pipeline(**kwargs) -> base.PolisClusteringResult:
    kwargs = {
        "reducer": "pca",
        "clusterer": "kmeans",
        **kwargs,
    }
    return base.run_pipeline(**kwargs)
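
Example

A minimal usage sketch. The wrapper only defaults reducer and clusterer before forwarding everything to base.run_pipeline, so any base pipeline argument passes through. The votes keyword below is an assumption inferred from the vote-record format documented under reddwarf.utils.matrix; consult base.run_pipeline for the authoritative signature.

from reddwarf.implementations.polis import run_pipeline

# Hypothetical vote records, mirroring the {"participant_id", "statement_id",
# "vote", "modified"} shape used throughout reddwarf.utils.
votes = [
    {"participant_id": 0, "statement_id": 0, "vote": 1, "modified": 1_735_000_000},
    {"participant_id": 1, "statement_id": 0, "vote": -1, "modified": 1_735_000_001},
    # ...
]

# Defaults are reducer="pca" and clusterer="kmeans"; either can be overridden.
result = run_pipeline(votes=votes)  # `votes=` is an assumed keyword
# result is a base.PolisClusteringResult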

reddwarf.sklearn

Various custom Scikit-Learn estimators to mimic aspects of Polis, suitable for use in Scikit-Learn workflows, pipelines, and APIs.

reddwarf.sklearn.cluster.PolisKMeans

Bases: KMeans

A modified version of scikit-learn's KMeans that allows partial initialization with user-supplied cluster centers and custom fallback strategies.

This subclass extends sklearn.cluster.KMeans with additional features around centroid initialization. Outside the behavior documented, it retains all other parameters and behavior from the base KMeans implementation.

Parameters:
  • init ({'k-means++', 'random', 'polis'}, default: 'k-means++') –

    Strategy to initialize any missing cluster centers if init_centers is not fully specified. The strategies are:

    • 'k-means++': Smart centroid initialization (same as scikit-learn default)
    • 'random': Random selection of initial centers from the data (same as scikit-learn)
    • 'polis': Selects the first unique data points in X as initial centers.
      • This strategy is deterministic for any stable set of X, while determinism in the other strategies depends on random_state.

    Note

    Unlike the KMeans parent class, we prevent passing ndarray args here, and expect init_centers to handle that use case.

  • init_centers (ndarray of shape (n_clusters, n_features), default: None ) –

    Initial cluster centers to use. May contain fewer (or more) than n_clusters:

    • If more, the extras will be trimmed
    • If fewer, the remaining will be filled using the init strategy
Attributes:
  • init_centers_used_ (ndarray of shape (n_clusters, n_features)) –

    The full array of initial cluster centers actually used to initialize the algorithm, including both init_centers and any centers generated from the init strategy.

See Also

sklearn.cluster.KMeans : Original implementation with full parameter list.

Source code in reddwarf/sklearn/cluster.py
class PolisKMeans(KMeans):
    """
    A modified version of scikit-learn's KMeans that allows partial initialization
    with user-supplied cluster centers and custom fallback strategies.

    This subclass extends `sklearn.cluster.KMeans` with additional features
    around centroid initialization. Outside the behavior documented, it retains
    all other parameters and behavior from the base KMeans implementation.

    Parameters
    ----------

    init : {'k-means++', 'random', 'polis'}, default='k-means++'
        Strategy to initialize any missing cluster centers if `init_centers` is
        not fully specified. The strategies are:

        - 'k-means++': Smart centroid initialization (same as scikit-learn default)
        - 'random': Random selection of initial centers from the data (same as scikit-learn)
        - 'polis': Selects the first unique data points in `X` as initial centers.
            - This strategy is deterministic for any stable set of `X`, while
            determinism in the other strategies depends on `random_state`.

        !!! note
            Unlike the `KMeans` parent class, we prevent passing `ndarray` args
            here, and expect `init_centers` to handle that use case.

    init_centers : ndarray of shape (n_clusters, n_features), optional
        Initial cluster centers to use. May contain fewer (or more) than `n_clusters`:

        - If more, the extras will be trimmed
        - If fewer, the remaining will be filled using the `init` strategy

    Attributes
    ----------

    init_centers_used_ : ndarray of shape (n_clusters, n_features)
        The full array of initial cluster centers actually used to initialize the algorithm,
        including both `init_centers` and any centers generated from the `init` strategy.

    See Also
    --------

    `sklearn.cluster.KMeans` : Original implementation with full parameter list.
    """
    def __init__(
        self,
        n_clusters=8,
        init="k-means++",  # or 'random', 'polis'
        init_centers: Optional[ArrayLike] = None,  # array-like, optional
        n_init="auto",
        max_iter=300,
        tol=1e-4,
        verbose=0,
        random_state=None,
        copy_x=True,
        algorithm="lloyd",
    ):
        super().__init__(
            n_clusters=n_clusters,
            init=init,  # will override via set_params, with our center selection logic below
            n_init=n_init,
            max_iter=max_iter,
            tol=tol,
            verbose=verbose,
            random_state=random_state,
            copy_x=copy_x,
            algorithm=algorithm,
        )
        self._init_strategy = init
        self.init_centers = init_centers
        self.init_centers_used_ = None

    def _generate_centers(self, X, x_squared_norms, n_to_generate, random_state):
        if not isinstance(self._init_strategy, str):
            raise ValueError("Internal error: _init_strategy must be a string.")

        if self._init_strategy == "k-means++":
            centers, _ = kmeans_plusplus(
                X, n_clusters=n_to_generate,
                random_state=random_state,
                x_squared_norms=x_squared_norms
            )
        elif self._init_strategy == "random":
            indices = random_state.choice(X.shape[0], n_to_generate, replace=False)
            centers = X[indices]
        elif self._init_strategy == "polis":
            unique_X = np.unique(X, axis=0)
            if len(unique_X) < n_to_generate:
                raise ValueError("Not enough unique rows in X for 'polis' strategy.")
            centers = unique_X[:n_to_generate]
        else:
            raise ValueError(f"Unsupported init strategy: {self._init_strategy}")
        return centers

    def fit(self, X, y=None, sample_weight=None):
        X = check_array(X, accept_sparse="csr", dtype=[np.float64, np.float32]) # type:ignore
        random_state = check_random_state(self.random_state)
        x_squared_norms = np.sum(X ** 2, axis=1)

        # Determine init_centers_used_
        if self.init_centers is not None:
            init_array = np.array(self.init_centers)
            if init_array.ndim != 2 or init_array.shape[1] != X.shape[1]:
                raise ValueError("init_centers must be of shape (n, n_features)")

            n_given = init_array.shape[0]
            if n_given > self.n_clusters:
                init_array = init_array[:self.n_clusters]
            elif n_given < self.n_clusters:
                needed = self.n_clusters - n_given
                extra = self._generate_centers(X, x_squared_norms, needed, random_state)
                init_array = np.vstack([init_array, extra])
            self.init_centers_used_ = init_array.copy()
        else:
            self.init_centers_used_ = self._generate_centers(
                X, x_squared_norms, self.n_clusters, random_state
            )

        # Override the init param passed to sklearn with actual centers.
        # We take control of the initialization strategy (`k-means++`, `random`,
        # `polis`, etc) in our own code.
        super().set_params(init=self.init_centers_used_)

        return super().fit(X, y=y, sample_weight=sample_weight)
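
Example

A minimal sketch with synthetic data, showing partial initialization: two of four centers are supplied via init_centers, and the 'polis' strategy fills in the rest.

import numpy as np
from reddwarf.sklearn.cluster import PolisKMeans

rng = np.random.default_rng(42)
X = rng.normal(size=(100, 2))

kmeans = PolisKMeans(
    n_clusters=4,
    init="polis",  # strategy used to generate the missing centers
    init_centers=np.array([[0.0, 0.0], [1.0, 1.0]]),  # 2 of 4 centers supplied
    random_state=42,
)
kmeans.fit(X)

print(kmeans.init_centers_used_.shape)  # (4, 2): supplied + generated centers
print(kmeans.cluster_centers_.shape)    # (4, 2): final fitted centers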

reddwarf.sklearn.cluster.PolisKMeansDownsampler

Bases: BaseEstimator, TransformerMixin

A transformer that fits PolisKMeans and returns the cluster centers as the downsampled dataset.

This will support mimicking "base clusters" from the Polis platform.

This enables use in sklearn pipelines, where intermediate steps are expected to implement both fit and transform.

Source code in reddwarf/sklearn/cluster.py
class PolisKMeansDownsampler(BaseEstimator, TransformerMixin):
    """
    A transformer that fits `PolisKMeans` and returns the cluster centers as the
    downsampled dataset.

    This will support mimicking "base clusters" from the Polis platform.

    This enables use in sklearn pipelines, where intermediate steps
    are expected to implement both `fit` and `transform`.
    """
    def __init__(self,
        n_clusters=100,
        random_state=None,
        init="k-means++",
        init_centers=None,
    ):
        self.n_clusters = n_clusters
        self.random_state = random_state
        self.init = init
        self.init_centers = init_centers
        self.kmeans_ = None

    def fit(self, X, y=None):
        self.kmeans_ = PolisKMeans(
            n_clusters=self.n_clusters,
            random_state=self.random_state,
            init=self.init,
            init_centers=self.init_centers,
        )
        self.kmeans_.fit(X)
        return self

    def transform(self, X, y=None):
        return self.kmeans_.cluster_centers_ if self.kmeans_ else None
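
Example

A minimal sketch: 1,000 synthetic points are downsampled to their 100 fitted cluster centers, the kind of step that can sit mid-pipeline.

import numpy as np
from reddwarf.sklearn.cluster import PolisKMeansDownsampler

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 2))

downsampler = PolisKMeansDownsampler(n_clusters=100, random_state=0)
X_downsampled = downsampler.fit_transform(X)
print(X_downsampled.shape)  # (100, 2): cluster centers stand in for the raw points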

reddwarf.sklearn.model_selection.GridSearchNonCV

Bases: GridSearchCV

sklearn.model_selection.GridSearchCV, but modified to score against the full dataset (i.e. not cross-validated).

Normally, GridSearchCV splits up the X data and scores each "fold" of data. This class is otherwise identical, but it automatically uses the full dataset in each fold.

Source code in reddwarf/sklearn/model_selection.py
class GridSearchNonCV(GridSearchCV):
    """
    `sklearn.model_selection.GridSearchCV`, but modified to score against the
    full dataset (i.e. not cross-validated).

    Normally, `GridSearchCV` splits up the `X` data and scores each "fold" of data.
    This class is otherwise identical, but it automatically uses the full dataset in each fold.
    """
    def __init__(self, estimator, param_grid, scoring=None, refit=True, **kwargs):
        # Default CV is a single fold: train = test = full dataset
        self._default_cv = None  # we'll set it in fit() when we have data size

        # User can override cv via kwargs
        self._user_provided_cv = 'cv' in kwargs
        super().__init__(
            estimator=estimator,
            param_grid=param_grid,
            scoring=scoring,
            refit=refit,
            cv=None,  # will be overwritten in fit()
            **kwargs
        )

    def fit(self, X, y=None, **fit_params):
        if not self._user_provided_cv:
            # Create full-fold cross-validation only if user didn’t specify their own.
            # This scores the full dataset for each branch of grid.
            # See: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html#:~:text=An%20iterable%20yielding%20(train%2C%20test)%20splits%20as%20arrays%20of%20indices.
            n_samples = len(X)
            full_idx = np.arange(n_samples)
            train_idx = full_idx
            test_idx = full_idx
            # Use full dataset for training/testing
            full_fold = [(train_idx, test_idx)]
            self.cv = full_fold
        return super().fit(X, y, **fit_params)
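
Example

A minimal sketch of a full-dataset grid search over n_clusters scored by silhouette, mirroring how find_best_kmeans (documented below) uses this class.

import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from reddwarf.sklearn.model_selection import GridSearchNonCV

rng = np.random.default_rng(0)
# Two well-separated blobs, so k=2 should score best.
X = np.vstack([rng.normal(-2, 0.5, (50, 2)), rng.normal(2, 0.5, (50, 2))])

def silhouette_scorer(estimator, X):
    labels = estimator.fit_predict(X)
    return silhouette_score(X, labels)

search = GridSearchNonCV(
    estimator=KMeans(n_init="auto", random_state=0),
    param_grid={"n_clusters": [2, 3, 4, 5]},
    scoring=silhouette_scorer,
)
search.fit(X)
print(search.best_params_)  # expected: {'n_clusters': 2}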

reddwarf.sklearn.transformers.SparsityAwareScaler

Bases: BaseEstimator, TransformerMixin

Scale projected points (participants or statements) based on the sparsity of the vote matrix, to account for participants with few votes and prevent them from bunching up in the center.

Attributes:
  • capture_step (str | int | None) –

    Name or index of the capture step in the pipeline.

  • X_sparse (ndarray | None) –

    A sparse array with shape (n_features,)

Source code in reddwarf/sklearn/transformers.py
class SparsityAwareScaler(BaseEstimator, TransformerMixin):
    """
    Scale projected points (participants or statements) based on the sparsity
    of the vote matrix, to account for participants with few votes and prevent
    them from bunching up in the center.

    Attributes:
        capture_step (str | int | None): Name or index of the capture step in the pipeline.
        X_sparse (np.ndarray | None): A sparse array with shape (n_features,)
    """
    def __init__(self, capture_step: Optional[str | int] = None, X_sparse: Optional[Array1D | Array2D] = None):
        self.capture_step = capture_step
        self.X_sparse = X_sparse

    # See: https://scikit-learn.org/stable/modules/generated/sklearn.utils.Tags.html#sklearn.utils.Tags
    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        # Suppresses warning caused by fit() not being required before usage in transform().
        tags.requires_fit = False
        return tags

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        scaling_factors = self._calculate_scaling_factors()
        return X * scaling_factors[:, np.newaxis]

    def inverse_transform(self, X):
        scaling_factors = self._calculate_scaling_factors()
        return X / scaling_factors[:, np.newaxis]


    def _get_pipeline_step(self, step):
        """
        Fetch the parent pipeline when available via PatchedPipeline usage.
        """
        parent = getattr(self, "_parent_pipeline", None)
        if parent is None:
            raise RuntimeError(
                f"{self.__class__.__name__} cannot resolve `capture_step={step}` "
                "because it is not being used inside a `PatchedPipeline`. "
                "Either use a `PatchedPipeline` or pass `X_sparse` directly."
            )
        if isinstance(step, str):
            return parent.named_steps[step]
        elif isinstance(step, int):
            return parent.steps[step][1]
        else:
            raise ValueError("`capture_step` must be a string (name) or int (index).")

    def _resolve_X_sparse(self):
        """
        Resolve X_sparse (a sparse vote matrix) from argument or prior capture step.
        """
        if self.X_sparse is not None:
            return self.X_sparse

        capture = self._get_pipeline_step(self.capture_step)
        if not hasattr(capture, "X_captured_"):
            raise AttributeError(
                f"Step '{self.capture_step}' does not contain `.X_captured_`. "
                f"Did you run `fit/transform` on the pipeline?"
            )
        return capture.X_captured_

    def _calculate_scaling_factors(self):
        X_sparse = self._resolve_X_sparse()
        return calculate_scaling_factors(X_sparse=X_sparse)
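
Example

A minimal sketch of the direct-argument path, which skips the PatchedPipeline capture step by passing the sparse vote matrix as X_sparse. The exact scaling factors come from the internal calculate_scaling_factors helper.

import numpy as np
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from reddwarf.sklearn.transformers import SparsityAwareScaler

# A tiny vote matrix with missing votes (NaN) and varying sparsity per row.
X_sparse = np.array([
    [   1.0,   -1.0, np.nan,    1.0],
    [np.nan,    1.0,    1.0, np.nan],
    [  -1.0,   -1.0,    1.0,    1.0],
    [   1.0,    1.0, np.nan, np.nan],
])

X_dense = SimpleImputer(strategy="mean").fit_transform(X_sparse)
X_projected = PCA(n_components=2).fit_transform(X_dense)

# No fit() needed: requires_fit is False for this transformer.
scaler = SparsityAwareScaler(X_sparse=X_sparse)
X_scaled = scaler.transform(X_projected)
print(X_scaled.shape)  # (4, 2): one scaled projection per participant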

reddwarf.utils.matrix

reddwarf.utils.matrix.generate_raw_matrix(votes, cutoff=None)

Generates a raw vote matrix from a list of vote records.

See filter_votes method for details of cutoff arg.

Parameters:
  • votes (List[Dict]) –

    An unsorted list of vote records, where each record is a dictionary containing:

    • "participant_id": The ID of the voter.
    • "statement_id": The ID of the statement being voted on.
    • "vote": The recorded vote value.
    • "modified": A unix timestamp object representing when the vote was made.
  • cutoff (int, default: None ) –

    A cutoff unix timestamp (ms) or index position in date-sorted votes list.

Returns:
  • raw_matrix( DataFrame ) –

    A full raw vote matrix DataFrame with NaN values where:

    1. rows are voters,
    2. columns are statements, and
    3. values are votes.

    This includes even voters who have no votes, and statements on which no votes were placed.

Source code in reddwarf/utils/matrix.py
def generate_raw_matrix(
        votes: List[Dict],
        cutoff: Optional[int] = None,
) -> VoteMatrix:
    """
    Generates a raw vote matrix from a list of vote records.

    See `filter_votes` method for details of `cutoff` arg.

    Args:
        votes (List[Dict]): An unsorted list of vote records, where each record is a dictionary containing:

            - "participant_id": The ID of the voter.
            - "statement_id": The ID of the statement being voted on.
            - "vote": The recorded vote value.
            - "modified": A unix timestamp object representing when the vote was made.

        cutoff (int): A cutoff unix timestamp (ms) or index position in date-sorted votes list.

    Returns:
        raw_matrix (pd.DataFrame): A full raw vote matrix DataFrame with NaN values where:

            1. rows are voters,
            2. columns are statements, and
            3. values are votes.

            This includes even voters who have no votes, and statements on which no votes were placed.
    """
    if cutoff:
        votes = filter_votes(votes=votes, cutoff=cutoff)

    raw_matrix = pd.DataFrame.from_dict(votes)
    raw_matrix = raw_matrix.pivot(
        values="vote",
        index="participant_id",
        columns="statement_id",
    )

    return raw_matrix
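
Example

A minimal sketch: three vote records pivot into a participant-by-statement matrix, with NaN wherever a participant never voted on a statement.

from reddwarf.utils.matrix import generate_raw_matrix

votes = [
    {"participant_id": 0, "statement_id": 0, "vote": 1, "modified": 1_735_000_000},
    {"participant_id": 0, "statement_id": 1, "vote": -1, "modified": 1_735_000_005},
    {"participant_id": 1, "statement_id": 0, "vote": 0, "modified": 1_735_000_010},
]

raw_matrix = generate_raw_matrix(votes=votes)
print(raw_matrix)
# Roughly:
# statement_id      0    1
# participant_id
# 0               1.0 -1.0
# 1               0.0  NaN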

reddwarf.utils.matrix.simple_filter_matrix(vote_matrix, mod_out_statement_ids=[])

The simple filter on the vote_matrix that is used by Polis prior to running PCA.

Parameters:
  • vote_matrix (VoteMatrix) –

    A raw vote_matrix (with missing values)

  • mod_out_statement_ids (list, default: [] ) –

    A list of moderated-out statement IDs to zero out.

Returns:
  • VoteMatrix( VoteMatrix ) –

    Copy of vote_matrix with statements zeroed out

Source code in reddwarf/utils/matrix.py
def simple_filter_matrix(
        vote_matrix: VoteMatrix,
        mod_out_statement_ids: list[int] = [],
) -> VoteMatrix:
    """
    The simple filter on the vote_matrix that is used by Polis prior to running PCA.

    Args:
        vote_matrix (VoteMatrix): A raw vote_matrix (with missing values)
        mod_out_statement_ids (list): A list of moderated-out statement IDs to zero out.

    Returns:
        VoteMatrix: Copy of vote_matrix with statements zeroed out
    """
    vote_matrix = vote_matrix.copy()
    for tid in mod_out_statement_ids:
        # Zero out column only if already exists (ie. has votes)
        if tid in vote_matrix.columns:
            # TODO: Add a flag to try np.nan instead of zero.
            vote_matrix.loc[:, tid] = 0

    return vote_matrix
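
Example

A minimal sketch: statement 1 is zeroed out, while the absent statement ID 5 is silently skipped.

import numpy as np
import pandas as pd
from reddwarf.utils.matrix import simple_filter_matrix

vote_matrix = pd.DataFrame(
    {0: [1.0, -1.0], 1: [1.0, np.nan], 2: [np.nan, -1.0]},
    index=pd.Index([0, 1], name="participant_id"),
)
vote_matrix.columns.name = "statement_id"

filtered = simple_filter_matrix(vote_matrix, mod_out_statement_ids=[1, 5])
print(filtered[1].tolist())  # [0.0, 0.0]: statement 1 zeroed; id 5 ignored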

reddwarf.utils.matrix.get_clusterable_participant_ids(vote_matrix, vote_threshold)

Find participant IDs that meet a vote threshold in a vote_matrix.

Parameters:
  • vote_matrix (VoteMatrix) –

    A raw vote_matrix (with missing values)

  • vote_threshold (int) –

    Vote threshold that each participant must meet

Returns:
  • participant_ids( list ) –

    A list of participant IDs that meet the threshold

Source code in reddwarf/utils/matrix.py
def get_clusterable_participant_ids(vote_matrix: VoteMatrix, vote_threshold: int) -> list:
    """
    Find participant IDs that meet a vote threshold in a vote_matrix.

    Args:
        vote_matrix (VoteMatrix): A raw vote_matrix (with missing values)
        vote_threshold (int): Vote threshold that each participant must meet

    Returns:
        participant_ids (list): A list of participant IDs that meet the threshold
    """
    # TODO: Make this available outside this function? To match polismath output.
    user_vote_counts = vote_matrix.count(axis="columns")
    participant_ids = list(vote_matrix[user_vote_counts >= vote_threshold].index)
    return participant_ids
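
Example

A minimal sketch: participant 0 has three votes and participant 1 only one, so a threshold of 2 keeps only participant 0.

import numpy as np
import pandas as pd
from reddwarf.utils.matrix import get_clusterable_participant_ids

vote_matrix = pd.DataFrame(
    {0: [1.0, np.nan], 1: [-1.0, np.nan], 2: [1.0, 1.0]},
    index=pd.Index([0, 1], name="participant_id"),
)

print(get_clusterable_participant_ids(vote_matrix, vote_threshold=2))  # [0]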

reddwarf.utils.reducer

reddwarf.utils.reducer.base.run_reducer(vote_matrix, reducer='pca', n_components=2, **reducer_kwargs)

Impute a prepared vote matrix and return participant and (optionally) statement data, projected into reduced n-dimensional space.

The vote matrix should not yet be imputed, as this will happen within the method.

Parameters:
  • vote_matrix (NDArray) –

    A vote matrix of data. Non-imputed values are expected.

  • n_components (int, default: 2 ) –

    Number n of principal components to decompose the vote_matrix into.

  • reducer (Literal['pca', 'pacmap', 'localmap'], default: 'pca' ) –

    Dimensionality reduction method to use.

Returns:
  • X_participants( NDArray ) –

    A numpy array with n-d coordinates for each projected row/participant.

  • X_statements( Optional[NDArray] ) –

    A numpy array with n-d coordinates for each projected col/statement.

  • reducer_model( ReducerModel ) –

    The fitted dimensionality reduction scikit-learn estimator.

Source code in reddwarf/utils/reducer/base.py
def run_reducer(
    vote_matrix: NDArray,
    reducer: ReducerType = "pca",
    n_components: int = 2,
    **reducer_kwargs,
) -> Tuple[NDArray, Optional[NDArray], ReducerModel]:
    """
    Impute a prepared vote matrix and return participant and (optionally) statement data,
    projected into reduced n-dimensional space.

    The vote matrix should not yet be imputed, as this will happen within the method.

    Args:
        vote_matrix (NDArray): A vote matrix of data. Non-imputed values are expected.
        n_components (int): Number n of principal components to decompose the `vote_matrix` into.
        reducer (Literal["pca", "pacmap", "localmap"]): Dimensionality reduction method to use.

    Returns:
        X_participants (NDArray): A numpy array with n-d coordinates for each projected row/participant.
        X_statements (Optional[NDArray]): A numpy array with n-d coordinates for each projected col/statement.
        reducer_model (ReducerModel): The fitted dimensionality reduction scikit-learn estimator.
    """
    match reducer:
        case "pca":
            pipeline = PatchedPipeline(
                [
                    ("capture", SparsityAwareCapturer()),
                    ("impute", SimpleImputer(missing_values=np.nan, strategy="mean")),
                    ("reduce", get_reducer(reducer, n_components=n_components, **reducer_kwargs)),
                    ("scale", SparsityAwareScaler(capture_step="capture")),
                ]
            )
        case "pacmap" | "localmap":
            pipeline = PatchedPipeline(
                [
                    ("impute", SimpleImputer(missing_values=np.nan, strategy="mean")),
                    ("reduce", get_reducer(reducer, n_components=n_components, **reducer_kwargs)),
                ]
            )

    # Generate projections of participants.
    X_participants = pipeline.fit_transform(vote_matrix)

    if reducer == "pca":
        # Generate projections of statements via virtual vote matrix.
        # This projects unit vectors for each feature/statement into PCA space to
        # understand their placement.
        num_cols = vote_matrix.shape[1]
        n_statements = num_cols
        virtual_vote_matrix = generate_virtual_vote_matrix(n_statements)
        X_statements = pipeline.transform(virtual_vote_matrix)
    else:
        X_statements = None

    reducer_model: ReducerModel = pipeline.named_steps["reduce"]

    return X_participants, X_statements, reducer_model
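
Example

A minimal sketch with a synthetic sparse vote matrix. Statement projections are only produced for the PCA reducer; the shapes shown are what the signatures above imply.

import numpy as np
from reddwarf.utils.reducer.base import run_reducer

rng = np.random.default_rng(0)
# 30 participants x 8 statements, with some missing votes.
vote_matrix = rng.choice([-1.0, 0.0, 1.0, np.nan], size=(30, 8))

X_participants, X_statements, reducer_model = run_reducer(vote_matrix, reducer="pca")
print(X_participants.shape)  # (30, 2)
print(X_statements.shape)    # expected (8, 2); None for pacmap/localmap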

reddwarf.utils.reducer.base.get_reducer(reducer='pca', n_components=2, random_state=None, **reducer_kwargs)

Source code in reddwarf/utils/reducer/base.py
def get_reducer(
    reducer: ReducerType = "pca",
    n_components: int = 2,
    random_state: Optional[int] = None,
    **reducer_kwargs,
) -> ReducerModel:
    # Setting n_neighbors to None defaults to 10 below 10,000 samples, and
    # slowly increases it according to a formula beyond that.
    # See: https://github.com/YingfanWang/PaCMAP?tab=readme-ov-file#parameters
    DEFAULT_N_NEIGHBORS = None
    match reducer:
        case "pacmap" | "localmap":
            pacmap = try_import("pacmap", extra="alt-algos")

            # Override with default if not set
            n_neighbors = reducer_kwargs.pop("n_neighbors", DEFAULT_N_NEIGHBORS)

            ReducerCls = pacmap.PaCMAP if reducer == "pacmap" else pacmap.LocalMAP
            return ReducerCls(
                n_components=n_components,
                random_state=random_state,
                n_neighbors=n_neighbors,  # type:ignore
                **reducer_kwargs,
            )
        case "pca" | _:
            from sklearn.decomposition import PCA

            return PCA(
                n_components=n_components,
                random_state=random_state,
                **reducer_kwargs,
            )
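
Example

A minimal sketch: the factory returns a plain scikit-learn PCA estimator by default, while "pacmap"/"localmap" require the optional alt-algos extra.

from reddwarf.utils.reducer.base import get_reducer

reducer = get_reducer("pca", n_components=2, random_state=0)
print(type(reducer).__name__)  # PCA

# Requires the "alt-algos" extra:
# reducer = get_reducer("pacmap", n_components=2)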

reddwarf.utils.clusterer

reddwarf.utils.clusterer.base.run_clusterer(X_participants_clusterable, clusterer='kmeans', force_group_count=None, max_group_count=5, init_centers=None, random_state=None, **clusterer_kwargs)

Source code in reddwarf/utils/clusterer/base.py
def run_clusterer(
    X_participants_clusterable: NDArray,
    clusterer: ClustererType = "kmeans",
    force_group_count=None,
    max_group_count=5,
    init_centers=None,
    random_state=None,
    **clusterer_kwargs,
) -> Optional[ClustererModel]:
    match clusterer:
        case "kmeans":
            if force_group_count:
                k_bounds = [force_group_count, force_group_count]
            else:
                k_bounds = [2, max_group_count]

            _, _, kmeans = find_best_kmeans(
                X_to_cluster=X_participants_clusterable,
                k_bounds=k_bounds,
                # Force polis strategy of initiating cluster centers. See: PolisKMeans.
                init="polis",
                init_centers=init_centers,
                random_state=random_state,
                # TODO: Support passing in arbitrary clusterer_kwargs.
            )

            return kmeans

        case "hdbscan":
            hdbscan = try_import("hdbscan", extra="alt-algos")

            hdb = hdbscan.HDBSCAN(**clusterer_kwargs)
            hdb.fit(X_participants_clusterable)

            return hdb
        case _:
            raise NotImplementedError("clusterer type unknown")
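
Example

A minimal sketch with two synthetic blobs; the "kmeans" path silhouette-searches k between 2 and max_group_count using the 'polis' initialization strategy.

import numpy as np
from reddwarf.utils.clusterer.base import run_clusterer

rng = np.random.default_rng(0)
X = np.vstack([rng.normal(-3, 0.5, (40, 2)), rng.normal(3, 0.5, (40, 2))])

kmeans = run_clusterer(X, clusterer="kmeans", max_group_count=5)
print(kmeans.n_clusters)   # expected: 2 for two clean blobs
print(kmeans.labels_[:5])  # cluster assignments for the first 5 participants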

reddwarf.utils.clusterer.kmeans.find_best_kmeans(X_to_cluster, k_bounds=[2, 5], init='k-means++', init_centers=None, random_state=None)

Use silhouette scores to find the best number of clusters k with which to fit the data.

Parameters:
  • X_to_cluster (NDArray) –

    A n-D numpy array.

  • k_bounds (RangeLike, default: [2, 5] ) –

    Lower and upper bounds on n_clusters to test. (Default: [2, 5])

  • init_centers (List, default: None ) –

    A list of xy coordinates to use as initial center guesses.

  • random_state (int, default: None ) –

    Determines random number generation for centroid initialization. Use an int to make the randomness deterministic.

Returns:
  • best_k( int ) –

    Ideal number of clusters.

  • best_silhouette_score( float ) –

    Silhouette score for this K value.

  • best_kmeans( PolisKMeans | None ) –

    The optimal fitted estimator returned from PolisKMeans.

Source code in reddwarf/utils/clusterer/kmeans.py
def find_best_kmeans(
        X_to_cluster: NDArray,
        k_bounds: RangeLike = [2, 5],
        init="k-means++",
        init_centers: Optional[List] = None,
        random_state: Optional[int] = None,
) -> tuple[int, float, PolisKMeans | None]:
    """
    Use silhouette scores to find the best number of clusters k with which to fit the data.

    Args:
        X_to_cluster (NDArray): A n-D numpy array.
        k_bounds (RangeLike): Lower and upper bounds on n_clusters to test. (Default: [2, 5])
        init_centers (List): A list of xy coordinates to use as initial center guesses.
        random_state (int): Determines random number generation for centroid initialization. Use an int to make the randomness deterministic.

    Returns:
        best_k (int): Ideal number of clusters.
        best_silhouette_score (float): Silhouette score for this K value.
        best_kmeans (PolisKMeans | None): The optimal fitted estimator returned from PolisKMeans.
    """
    param_grid = {
        "n_clusters": to_range(k_bounds),
    }

    def scoring_function(estimator, X):
        labels = estimator.fit_predict(X)
        return silhouette_score(X, labels)

    search = GridSearchNonCV(
        param_grid=param_grid,
        scoring=scoring_function,
        estimator=PolisKMeans(
            init=init, # strategy
            init_centers=init_centers, # guesses
            random_state=random_state,
        ),
    )

    search.fit(X_to_cluster)

    best_k = search.best_params_['n_clusters']
    best_silhouette_score = search.best_score_
    best_kmeans = search.best_estimator_

    return best_k, best_silhouette_score, best_kmeans
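
Example

A minimal sketch on the same kind of two-blob synthetic data:

import numpy as np
from reddwarf.utils.clusterer.kmeans import find_best_kmeans

rng = np.random.default_rng(0)
X = np.vstack([rng.normal(-3, 0.5, (40, 2)), rng.normal(3, 0.5, (40, 2))])

best_k, best_score, best_kmeans = find_best_kmeans(X, k_bounds=[2, 5], random_state=0)
print(best_k)                # expected: 2
print(round(best_score, 2))  # silhouette score for the best k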

reddwarf.utils.consensus

reddwarf.utils.consensus.select_consensus_statements(vote_matrix, mod_out_statement_ids=[], pick_max=5, prob_threshold=0.5, confidence=0.9)

Select consensus statements from a given vote matrix.

Parameters:
  • vote_matrix (VoteMatrix) –

    The full raw vote matrix (not just clusterable participants)

  • mod_out_statement_ids (Optional[list[int]], default: [] ) –

    Statements to ignore from consensus statement selection

  • pick_max (int, default: 5 ) –

    Max number of statements selected per agree/disagree direction

  • prob_threshold (float, default: 0.5 ) –

    The cutoff probability below which statements won't be considered for consensus

  • confidence (float, default: 0.9 ) –

    Percent confidence interval (in decimal), within which selected statements are deemed significant

Returns:
  • ConsensusResult

    A dict of agree/disagree formatted statement dicts.

Source code in reddwarf/utils/consensus.py
def select_consensus_statements(
    vote_matrix: VoteMatrix,
    mod_out_statement_ids: list[int] = [],
    pick_max=5,
    prob_threshold: float = 0.5,
    confidence: float = 0.9,
) -> ConsensusResult:
    """
    Select consensus statements from a given vote matrix.

    Args:
        vote_matrix (VoteMatrix): The full raw vote matrix (not just clusterable participants)
        mod_out_statement_ids (Optional[list[int]]): Statements to ignore from consensus statement selection
        pick_max (int): Max number of statements selected per agree/disagree direction
        prob_threshold (float): The cutoff probability below which statements won't be considered for consensus
        confidence (float): Percent confidence interval (in decimal), within which selected statements are deemed significant

    Returns:
        A dict of agree/disagree formatted statement dicts.
    """
    N_g_c, N_v_g_c, P_v_g_c, _, P_v_g_c_test, *_ = calculate_comment_statistics(
        vote_matrix=vote_matrix,
        cluster_labels=None,
    )
    # When no labels provided above, mock group is used.
    MOCK_GID = 0
    df = pd.DataFrame(
        {
            "na": N_v_g_c[votes.A, MOCK_GID, :],
            "nd": N_v_g_c[votes.D, MOCK_GID, :],
            "ns": N_g_c[MOCK_GID, :],
            "pa": P_v_g_c[votes.A, MOCK_GID, :],
            "pd": P_v_g_c[votes.D, MOCK_GID, :],
            "pat": P_v_g_c_test[votes.A, MOCK_GID, :],
            "pdt": P_v_g_c_test[votes.D, MOCK_GID, :],
            # agree metric = pa * pat
            "am": P_v_g_c[votes.A, MOCK_GID, :] * P_v_g_c_test[votes.A, MOCK_GID, :],
            # disagree metric = pd * pdt
            "dm": P_v_g_c[votes.D, MOCK_GID, :] * P_v_g_c_test[votes.D, MOCK_GID, :],
        },
        index=vote_matrix.columns,
    )

    # Optional filtering for mod_out_statement_ids
    if mod_out_statement_ids:
        df = df[~df.index.isin(mod_out_statement_ids)]

    # Agree candidates: pa > threshold and significant
    agree_mask = (df["pa"] > prob_threshold) & df["pat"].apply(
        lambda x: is_significant(x, confidence)
    )
    agree_candidates = df[agree_mask].copy()
    agree_candidates["consensus_agree_rank"] = (
        (-agree_candidates["am"]).rank(method="dense").astype("Int64")
    )

    # Disagree candidates: pd > threshold and significant
    disagree_mask = (df["pd"] > prob_threshold) & df["pdt"].apply(
        lambda x: is_significant(x, confidence)
    )
    disagree_candidates = df[disagree_mask].copy()
    disagree_candidates["consensus_disagree_rank"] = (
        (-disagree_candidates["dm"]).rank(method="dense").astype("Int64")
    )

    # Merge ranks back into full df
    df["consensus_agree_rank"] = agree_candidates["consensus_agree_rank"]
    df["consensus_disagree_rank"] = disagree_candidates["consensus_disagree_rank"]

    # Select top N agree/disagree statements
    if agree_candidates.empty:
        top_agree = []
    else:
        top_agree = [
            # Drop the cons-for key from final output.
            {k: v for k, v in st.items() if k != "cons-for"}
            for st in agree_candidates.sort_values("consensus_agree_rank")
            .head(pick_max)
            .reset_index()
            .apply(format_comment_stats, axis=1)
        ]

    if disagree_candidates.empty:
        top_disagree = []
    else:
        top_disagree = [
            # Drop the cons-for key from final output.
            {k: v for k, v in st.items() if k != "cons-for"}
            for st in disagree_candidates.sort_values("consensus_disagree_rank")
            .head(pick_max)
            .reset_index()
            .apply(format_comment_stats, axis=1)
        ]

    return {
        "agree": top_agree,
        "disagree": top_disagree,
    }
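
Example

A minimal sketch with an engineered conversation: near-unanimous agreement on statement 0 and disagreement on statement 1 should surface in the respective lists, while the evenly split statement 2 should not. The "tid" key is part of the formatted statement dicts.

import numpy as np
import pandas as pd
from reddwarf.utils.consensus import select_consensus_statements

rng = np.random.default_rng(0)
n = 50
vote_matrix = pd.DataFrame(
    {
        0: np.ones(n),                       # everyone agrees
        1: -np.ones(n),                      # everyone disagrees
        2: rng.choice([-1.0, 1.0], size=n),  # evenly split
    },
    index=pd.Index(range(n), name="participant_id"),
)
vote_matrix.columns.name = "statement_id"

consensus = select_consensus_statements(vote_matrix)
print([st["tid"] for st in consensus["agree"]])     # expected: [0]
print([st["tid"] for st in consensus["disagree"]])  # expected: [1]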

reddwarf.utils.stats

reddwarf.utils.stats.select_representative_statements(grouped_stats_df, mod_out_statement_ids=[], pick_max=5, confidence=0.9)

Selects statistically representative statements from each group cluster.

This is expected to match the Polis outputs when all defaults are set.

Parameters:
  • grouped_stats_df (DataFrame) –

    MultiIndex Dataframe of statement statistics, indexed by group and statement.

  • mod_out_statement_ids (list[int], default: [] ) –

    A list of statements to ignore from selection algorithm

  • pick_max (int, default: 5 ) –

    Max number of statements selected per group

  • confidence (float, default: 0.9 ) –

    Percent confidence interval (in decimal), within which selected statements are deemed significant

Returns:
  • PolisRepness( PolisRepness ) –

    A dict object with lists of statements keyed to groups, matching Polis format.

Source code in reddwarf/utils/stats.py
def select_representative_statements(
    grouped_stats_df: pd.DataFrame,
    mod_out_statement_ids: list[int] = [],
    pick_max: int = 5,
    confidence: float = 0.90,
) -> PolisRepness:
    """
    Selects statistically representative statements from each group cluster.

    This is expected to match the Polis outputs when all defaults are set.

    Args:
        grouped_stats_df (pd.DataFrame): MultiIndex Dataframe of statement statistics, indexed by group and statement.
        mod_out_statement_ids (list[int]): A list of statements to ignore from selection algorithm
        pick_max (int): Max number of statements selected per group
        confidence (float): Percent confidence interval (in decimal), within which selected statements are deemed significant

    Returns:
        PolisRepness: A dict object with lists of statements keyed to groups, matching Polis format.
    """
    repness = {}
    # TODO: Should this be done elsewhere? A column in MultiIndex dataframe?
    mod_out_mask = grouped_stats_df.index.get_level_values("statement_id").isin(
        mod_out_statement_ids
    )
    grouped_stats_df = grouped_stats_df[~mod_out_mask]  # type: ignore
    for gid, group_df in grouped_stats_df.groupby(level="group_id"):
        # Bring statement_id into regular column.
        group_df = group_df.reset_index()

        best_agree = None
        # Track the best-agree, to bring to top if exists.
        for _, row in group_df.iterrows():
            if beats_best_of_agrees(row, best_agree, confidence):
                best_agree = row

        sig_filter = lambda row: is_statement_significant(row, confidence)
        sufficient_statements_row_mask = group_df.apply(sig_filter, axis="columns")
        sufficient_statements = group_df[sufficient_statements_row_mask]

        # Track the best, even if doesn't meet sufficient minimum, to have at least one.
        best_overall = None
        if len(sufficient_statements) == 0:
            for _, row in group_df.iterrows():
                if beats_best_by_repness_test(row, best_overall):
                    best_overall = row
        else:
            # Finalize statements into output format.
            # TODO: Figure out how to finalize only at end in output. Change repness_metric?
            sufficient_statements = (
                pd.DataFrame(
                    [
                        format_comment_stats(row)
                        for _, row in sufficient_statements.iterrows()
                    ]
                )
                # Create a column to sort repness, then remove it.
                .assign(repness_metric=repness_metric)
                .sort_values(by="repness_metric", ascending=False)
                .drop(columns="repness_metric")
            )

        if best_agree is not None:
            best_agree = format_comment_stats(best_agree)
            best_agree.update({"n-agree": best_agree["n-success"], "best-agree": True})
            best_head = [best_agree]
        elif best_overall is not None:
            best_overall = format_comment_stats(best_overall)
            best_head = [best_overall]
        else:
            best_head = []

        selected = best_head
        selected = selected + [
            row.to_dict()
            for _, row in sufficient_statements.iterrows()
            if best_head
            # Skip any statements already in best_head
            and best_head[0]["tid"] != row["tid"]
        ]
        selected = selected[:pick_max]
        # Does the work of agrees-before-disagrees sort in polismath, since "a" before "d".
        selected = sorted(selected, key=lambda row: row["repful-for"])
        repness[gid] = selected

    return repness  # type:ignore

reddwarf.utils.stats.calculate_comment_statistics(vote_matrix, cluster_labels=None, pseudo_count=1)

Calculates comparative statement statistics across all votes and groups, using only efficient numpy operations.

Note: when no cluster_labels are supplied, we internally assign group 0 to every row, and calculated values can be accessed at the first group index.

The representativeness metric is defined as: R_v(g,c) = P_v(g,c) / P_v(~g,c)

Where:

  • P_v(g,c) is the probability of vote v on comment c in group g
  • P_v(~g,c) is the probability of vote v on comment c in all groups except g

And:

  • N(g,c) is the total number of non-missing votes on comment c in group g
  • N_v(g,c) is the total number of vote v on comment c in group g

Parameters:
  • vote_matrix (VoteMatrix) –

    A raw vote_matrix

  • cluster_labels (Optional[list[int]], default: None ) –

    An optional list of cluster labels to determine groups.

Returns:
  • N_g_c( ndarray[int] ) –

    numpy matrix with counts of non-missing votes on comments/groups

  • N_v_g_c( ndarray[int] ) –

    numpy matrix with counts of vote types on comments/groups

  • P_v_g_c( ndarray[float] ) –

    numpy matrix with probabilities of vote types on comments/groups

  • R_v_g_c( ndarray[float] ) –

    numpy matrix with representativeness of vote types on comments/groups

  • P_v_g_c_test( ndarray[float] ) –

    test z-scores for probability of votes/comments/groups

  • R_v_g_c_test( ndarray[float] ) –

    test z-scores for representativeness of votes/comments/groups

  • C_v_c( ndarray[float] ) –

    group-aware consensus scores for each statement.

Source code in reddwarf/utils/stats.py
def calculate_comment_statistics(
    vote_matrix: VoteMatrix,
    cluster_labels: Optional[list[int] | NDArray[np.integer]] = None,
    pseudo_count: int = 1,
) -> Tuple[
    np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray
]:
    """
    Calculates comparative statement statistics across all votes and groups, using only efficient numpy operations.

    Note: when no cluster_labels are supplied, we internally assign group `0` to every row,
    and calculated values can be accessed at the first group index.

    The representativeness metric is defined as:
    R_v(g,c) = P_v(g,c) / P_v(~g,c)

    Where:
    - P_v(g,c) is probability of vote v on comment c in group g
    - P_v(~g,c) is probability of vote v on comment c in all groups except g

    And:
    - N(g,c) is the total number of non-missing votes on comment c in group g
    - N_v(g,c) is the total number of vote v on comment c in group g

    Args:
        vote_matrix (VoteMatrix): A raw vote_matrix
        cluster_labels (Optional[list[int]]): An optional list of cluster labels to determine groups.

    Returns:
        N_g_c (np.ndarray[int]): numpy matrix with counts of non-missing votes on comments/groups
        N_v_g_c (np.ndarray[int]): numpy matrix with counts of vote types on comments/groups
        P_v_g_c (np.ndarray[float]): numpy matrix with probabilities of vote types on comments/groups
        R_v_g_c (np.ndarray[float]): numpy matrix with representativeness of vote types on comments/groups
        P_v_g_c_test (np.ndarray[float]): test z-scores for probability of votes/comments/groups
        R_v_g_c_test (np.ndarray[float]): test z-scores for representativeness of votes/comments/groups
        C_v_c (np.ndarray[float]): group-aware consensus scores for each statement.
    """
    if cluster_labels is None:
        # Make a single group if no labels supplied.
        participant_count = len(vote_matrix.index)
        cluster_labels = [0] * participant_count

    # Get the vote matrix values
    X = vote_matrix.values

    group_count = len(set(cluster_labels))
    statement_ids = vote_matrix.columns

    # Set up all the variables to be populated.
    N_g_c = np.empty([group_count, len(statement_ids)], dtype="int32")
    N_v_g_c = np.empty(
        [len(votes.__dict__), group_count, len(statement_ids)], dtype="int32"
    )
    P_v_g_c = np.empty([len(votes.__dict__), group_count, len(statement_ids)])
    R_v_g_c = np.empty([len(votes.__dict__), group_count, len(statement_ids)])
    P_v_g_c_test = np.empty([len(votes.__dict__), group_count, len(statement_ids)])
    R_v_g_c_test = np.empty([len(votes.__dict__), group_count, len(statement_ids)])
    C_v_c = np.empty([len(votes.__dict__), len(statement_ids)])

    for gid in range(group_count):
        # Create mask for the participants in target group
        in_group_mask = np.asarray(cluster_labels) == gid
        X_in_group = X[in_group_mask]

        # Count any votes [-1, 0, 1] for all statements/features at once

        # NON-GROUP STATS

        # For in-group
        n_agree_in_group = N_v_g_c[votes.A, gid, :] = count_agree(X_in_group)  # na
        n_disagree_in_group = N_v_g_c[votes.D, gid, :] = count_disagree(
            X_in_group
        )  # nd
        n_votes_in_group = N_g_c[gid, :] = count_all_votes(X_in_group)  # ns

        # Calculate probabilities
        p_agree_in_group = P_v_g_c[votes.A, gid, :] = probability(
            n_agree_in_group, n_votes_in_group, pseudo_count
        )  # pa
        p_disagree_in_group = P_v_g_c[votes.D, gid, :] = probability(
            n_disagree_in_group, n_votes_in_group, pseudo_count
        )  # pd

        # Calculate probability test z-scores
        P_v_g_c_test[votes.A, gid, :] = one_prop_test(
            n_agree_in_group, n_votes_in_group
        )  # pat
        P_v_g_c_test[votes.D, gid, :] = one_prop_test(
            n_disagree_in_group, n_votes_in_group
        )  # pdt

        # GROUP COMPARISON STATS

        out_group_mask = ~in_group_mask
        X_out_group = X[out_group_mask]

        # For out-group
        n_agree_out_group = count_agree(X_out_group)
        n_disagree_out_group = count_disagree(X_out_group)
        n_votes_out_group = count_all_votes(X_out_group)

        # Calculate out-group probabilities
        p_agree_out_group = probability(
            n_agree_out_group, n_votes_out_group, pseudo_count
        )
        p_disagree_out_group = probability(
            n_disagree_out_group, n_votes_out_group, pseudo_count
        )

        # Calculate representativeness
        R_v_g_c[votes.A, gid, :] = p_agree_in_group / p_agree_out_group  # ra
        R_v_g_c[votes.D, gid, :] = p_disagree_in_group / p_disagree_out_group  # rd

        # Calculate representativeness test z-scores
        R_v_g_c_test[votes.A, gid, :] = two_prop_test(
            n_agree_in_group, n_agree_out_group, n_votes_in_group, n_votes_out_group
        )  # rat
        R_v_g_c_test[votes.D, gid, :] = two_prop_test(
            n_disagree_in_group,
            n_disagree_out_group,
            n_votes_in_group,
            n_votes_out_group,
        )  # rdt

    # Calculate group-aware consensus
    # For each statement, multiply probabilities across groups (aka the first axis=0)
    # Reference: https://github.com/compdemocracy/polis/blob/edge/math/src/polismath/math/conversation.clj#L615-L636
    C_v_c[votes.A, :] = P_v_g_c[votes.A, :, :].prod(axis=0)
    C_v_c[votes.D, :] = P_v_g_c[votes.D, :, :].prod(axis=0)

    return (
        N_g_c,  # ns
        N_v_g_c,  # na / nd
        P_v_g_c,  # pa / pd
        R_v_g_c,  # ra / rd
        P_v_g_c_test,  # pat / pdt
        R_v_g_c_test,  # rat / rdt
        C_v_c,  # gac
    )
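
Example

A minimal shape-oriented sketch: with no cluster_labels, every participant lands in a single mock group at index 0, and the vote-type axis comes first in the per-vote arrays.

import numpy as np
import pandas as pd
from reddwarf.utils.stats import calculate_comment_statistics

vote_matrix = pd.DataFrame(
    np.array([[1, 1, -1], [1, -1, -1], [1, 1, 0], [-1, 1, 1]]),
    index=pd.Index(range(4), name="participant_id"),
)
vote_matrix.columns.name = "statement_id"

N_g_c, N_v_g_c, P_v_g_c, R_v_g_c, P_v_g_c_test, R_v_g_c_test, C_v_c = (
    calculate_comment_statistics(vote_matrix=vote_matrix)
)
print(N_g_c.shape)    # (1, 3): one mock group x three statements
print(N_v_g_c.shape)  # (n_vote_types, 1, 3)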

reddwarf.utils.stats.calculate_comment_statistics_dataframes(vote_matrix, cluster_labels=None, pseudo_count=1)

Calculates comparative statement statistics across all votes and groups, generating dataframes.

This returns both group-specific statistics, and also overall stats (group-aware consensus).

Parameters:
  • vote_matrix (VoteMatrix) –

    The vote matrix where rows are voters, columns are statements, and values are votes (1 for agree, -1 for disagree, 0 for pass).

  • cluster_labels (ndarray, default: None ) –

    Array of cluster labels for each participant row in the vote matrix.

  • pseudo_count (int, default: 1 ) –

    Smoothing parameter to avoid division by zero. Default is 1.

Returns:
  • DataFrame

    pd.DataFrame: DataFrame (MultiIndex on group/statement) containing verbose statistics for each statement per group.

  • DataFrame

    pd.DataFrame: DataFrame containing group-aware consensus scores for each statement.

Source code in reddwarf/utils/stats.py
def calculate_comment_statistics_dataframes(
    vote_matrix: VoteMatrix,
    cluster_labels: Optional[list[int] | NDArray[np.integer]] = None,
    pseudo_count: int = 1,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Calculates comparative statement statistics across all votes and groups, generating dataframes.

    This returns both group-specific statistics, and also overall stats (group-aware consensus).

    Args:
        vote_matrix (VoteMatrix): The vote matrix where rows are voters, columns are statements,
                                  and values are votes (1 for agree, -1 for disagree, 0 for pass).
        cluster_labels (np.ndarray): Array of cluster labels for each participant row in the vote matrix.
        pseudo_count (int): Smoothing parameter to avoid division by zero. Default is 1.

    Returns:
        pd.DataFrame: DataFrame (MultiIndex on group/statement) containing verbose statistics for each statement per group.
        pd.DataFrame: DataFrame containing group-aware consensus scores for each statement.
    """
    N_g_c, N_v_g_c, P_v_g_c, R_v_g_c, P_v_g_c_test, R_v_g_c_test, C_v_c = (
        calculate_comment_statistics(
            vote_matrix=vote_matrix,
            cluster_labels=cluster_labels,
            pseudo_count=pseudo_count,
        )
    )

    if cluster_labels is None:
        # Make a single group if no labels supplied.
        participant_count = len(vote_matrix.index)
        cluster_labels = [0] * participant_count

    group_count = len(set(cluster_labels))
    group_frames = []
    for group_id in range(group_count):
        group_df = pd.DataFrame(
            {
                "na": N_v_g_c[votes.A, group_id, :],  # number agree votes
                "nd": N_v_g_c[votes.D, group_id, :],  # number disagree votes
                "ns": N_g_c[group_id, :],  # number seen/total/non-missing votes
                "pa": P_v_g_c[votes.A, group_id, :],  # probability agree
                "pd": P_v_g_c[votes.D, group_id, :],  # probability disagree
                "pat": P_v_g_c_test[
                    votes.A, group_id, :
                ],  # probability agree test z-score
                "pdt": P_v_g_c_test[
                    votes.D, group_id, :
                ],  # probability disagree test z-score
                "ra": R_v_g_c[
                    votes.A, group_id, :
                ],  # repness of agree (representativeness)
                "rd": R_v_g_c[
                    votes.D, group_id, :
                ],  # repness of disagree (representativeness)
                "rat": R_v_g_c_test[
                    votes.A, group_id, :
                ],  # repness of agree test z-score
                "rdt": R_v_g_c_test[
                    votes.D, group_id, :
                ],  # repness of disagree test z-score
            },
            index=vote_matrix.columns,
        )
        group_df["group_id"] = group_id
        group_df["statement_id"] = vote_matrix.columns
        group_frames.append(group_df)
    # Create a MultiIndex dataframe
    grouped_stats_df = pd.concat(group_frames, ignore_index=True).set_index(
        ["group_id", "statement_id"]
    )

    group_aware_consensus_df = pd.DataFrame(
        {
            "group-aware-consensus": C_v_c[votes.A, :],
            "group-aware-consensus-agree": C_v_c[votes.A, :],
            "group-aware-consensus-disagree": C_v_c[votes.D, :]
        },
        index=vote_matrix.columns,
    )

    return grouped_stats_df, group_aware_consensus_df
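
Example

A minimal end-to-end sketch with two engineered opinion groups: the groups split on statement 0 and share statement 1, so statement 0 should be representative for each group while statement 1 should top the group-aware consensus scores.

import numpy as np
import pandas as pd
from reddwarf.utils.stats import (
    calculate_comment_statistics_dataframes,
    select_representative_statements,
)

n = 20  # participants per group
vote_matrix = pd.DataFrame(
    np.vstack([
        np.column_stack([np.ones(n), np.ones(n)]),   # group 0: agree, agree
        np.column_stack([-np.ones(n), np.ones(n)]),  # group 1: disagree, agree
    ]),
    index=pd.Index(range(2 * n), name="participant_id"),
)
vote_matrix.columns.name = "statement_id"
cluster_labels = [0] * n + [1] * n

grouped_stats_df, group_aware_consensus_df = calculate_comment_statistics_dataframes(
    vote_matrix=vote_matrix,
    cluster_labels=cluster_labels,
)
print(grouped_stats_df.loc[(0, 0), ["pa", "pd", "ra", "rd"]])
print(group_aware_consensus_df["group-aware-consensus"].idxmax())  # expected: 1

repness = select_representative_statements(grouped_stats_df)
print(list(repness.keys()))  # [0, 1]: statement lists keyed by group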

reddwarf.utils

(These are in the process of being either moved or deprecated.)

reddwarf.utils.filter_votes(votes, cutoff=None)

Filters a list of votes.

If a cutoff is provided, votes are filtered based on either:

  • An int representing unix timestamp (ms), keeping only votes before or at that time.
    • Any int above 1_300_000_000 is considered a timestamp.
  • Any other positive or negative int is considered an index, reflecting where to trim the time-sorted vote list.
    • positive: keeps that many votes from the start
    • negative: drops that many votes from the end
Parameters:
  • votes (List[Dict]) –

    An unsorted list of vote records, where each record is a dictionary containing:

    • "participant_id": The ID of the voter.
    • "statement_id": The ID of the statement being voted on.
    • "vote": The recorded vote value.
    • "modified": A unix timestamp object representing when the vote was made.
  • cutoff (int, default: None ) –

    A cutoff unix timestamp (ms) or index position in date-sorted votes list.

Returns:
  • votes( List[Dict] ) –

    A list of vote records, sorted by modified if index-based filtering occurred.

Source code in reddwarf/utils/matrix.py
def filter_votes(
        votes: List[Dict],
        cutoff: Optional[int] = None,
) -> List[Dict]:
    """
    Filters a list of votes.

    If a `cutoff` is provided, votes are filtered based on either:

    - An `int` representing unix timestamp (ms), keeping only votes before or at that time.
        - Any int above 1_300_000_000 is considered a timestamp.
    - Any other positive or negative `int` is considered an index, reflecting where to trim the time-sorted vote list.
        - positive: keeps that many votes from the start
        - negative: drops that many votes from the end

    Args:
        votes (List[Dict]): An unsorted list of vote records, where each record is a dictionary containing:

            - "participant_id": The ID of the voter.
            - "statement_id": The ID of the statement being voted on.
            - "vote": The recorded vote value.
            - "modified": A unix timestamp object representing when the vote was made.

        cutoff (int): A cutoff unix timestamp (ms) or index position in date-sorted votes list.

    Returns:
        votes (List[Dict]): A list of vote records, sorted by `modified` if index-based filtering occurred.
    """
    if cutoff:
        # TODO: Detect datetime object as arg instead.
        try:
            if cutoff > 1_300_000_000:
                cutoff_timestamp = cutoff
                votes = [v for v in votes if v['modified'] <= cutoff_timestamp]
            else:
                cutoff_index = cutoff
                votes = sorted(votes, key=lambda x: x["modified"])
                votes = votes[:cutoff_index]
        except KeyError as e:
            raise RedDwarfError("The `modified` key is missing from a vote object that must be sorted") from e

    return votes
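
Example

A minimal sketch of the two cutoff modes:

from reddwarf.utils import filter_votes

votes = [
    {"participant_id": 0, "statement_id": 0, "vote": 1, "modified": 1_735_000_000},
    {"participant_id": 1, "statement_id": 0, "vote": -1, "modified": 1_736_000_000},
    {"participant_id": 2, "statement_id": 0, "vote": 1, "modified": 1_737_000_000},
]

# Timestamp mode (cutoff > 1_300_000_000): keep votes at or before the cutoff.
print(len(filter_votes(votes, cutoff=1_736_000_000)))  # 2

# Index mode: sort by `modified`, then keep the first two votes.
print(len(filter_votes(votes, cutoff=2)))  # 2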

reddwarf.utils.filter_matrix(vote_matrix, min_user_vote_threshold=7, active_statement_ids=[], keep_participant_ids=[], unvoted_filter_type='drop')

Generates a filtered vote matrix from a raw matrix and filter config.

Parameters:
  • vote_matrix (DataFrame) –

    The [raw] vote matrix.

  • min_user_vote_threshold (int, default: 7 ) –

    The number of votes a participant must make to avoid being filtered.

  • active_statement_ids (List[int], default: [] ) –

    The statement IDs that are not moderated out.

  • keep_participant_ids (List[int], default: [] ) –

    Preserve specific participants even if below threshold.

  • unvoted_filter_type (drop | zero, default: 'drop' ) –

    When a statement has no votes, it can't be imputed. This determines whether to drop the statement column, or set all its values to zero/pass. (Default: drop)

Returns:
  • filtered_vote_matrix( VoteMatrix ) –

    A vote matrix with the following filtered out:

    1. statements without any votes,
    2. statements that have been moderated out,
    3. participants below the vote count threshold, unless they have been explicitly selected via keep_participant_ids to circumvent that filtering.
Source code in reddwarf/utils/matrix.py
def filter_matrix(
        vote_matrix: VoteMatrix,
        min_user_vote_threshold: int = 7,
        active_statement_ids: List[int] = [],
        keep_participant_ids: List[int] = [],
        unvoted_filter_type: Literal["drop", "zero"] = "drop",
) -> VoteMatrix:
    """
    Generates a filtered vote matrix from a raw matrix and filter config.

    Args:
        vote_matrix (pd.DataFrame): The [raw] vote matrix.
        min_user_vote_threshold (int): The number of votes a participant must make to avoid being filtered.
        active_statement_ids (List[int]): The statement IDs that are not moderated out.
        keep_participant_ids (List[int]): Preserve specific participants even if below threshold.
        unvoted_filter_type ("drop" | "zero"): When a statement has no votes, it can't be imputed. \
            This determines whether to drop the statement column, or set all its values to zero/pass. (Default: drop)

    Returns:
        filtered_vote_matrix (VoteMatrix): A vote matrix with the following filtered out:

            1. statements without any votes,
            2. statements that have been moderated out,
            3. participants below the vote count threshold, unless they have been explicitly selected via keep_participant_ids to circumvent that filtering.
    """
    # Filter out moderated statements.
    vote_matrix = vote_matrix.filter(active_statement_ids, axis='columns')
    # Filter out participants with fewer votes than the threshold (keeping IDs we're forced to)
    # Ref: https://hyp.is/JbNMus5gEe-cQpfc6eVIlg/gwern.net/doc/sociology/2021-small.pdf
    participant_ids_in = get_clusterable_participant_ids(vote_matrix, min_user_vote_threshold)
    # Add in some specific participant IDs for Polismath edge-cases.
    # See: https://github.com/compdemocracy/polis/pull/1893#issuecomment-2654666421
    participant_ids_in = list(set(participant_ids_in + keep_participant_ids))
    vote_matrix = (vote_matrix
        .filter(participant_ids_in, axis='rows')
        # .filter() and .drop() lost the index name, so bring it back.
        .rename_axis("participant_id")
    )

    # This is otherwise the more efficient way, but we want to keep some participant IDs
    # to troubleshoot edge-cases in upstream Polis math.
    # self.matrix = self.matrix.dropna(thresh=self.min_votes, axis='rows')

    unvoted_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)

    # TODO: What about statements with no votes? E.g., 53 in oprah. Filter out? zero?
    # Test this on a conversation where it will actually change statement count.
    if unvoted_filter_type == 'drop':
        vote_matrix = vote_matrix.drop(unvoted_statement_ids, axis='columns')
    elif unvoted_filter_type == 'zero':
        vote_matrix[unvoted_statement_ids] = 0

    return vote_matrix
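
A minimal sketch of the filter config, assuming a hypothetical 3×3 raw matrix (participants as rows, statements as columns, NaN where no vote was cast):

import numpy as np
import pandas as pd

from reddwarf.utils import filter_matrix

raw = pd.DataFrame(
    [[1, -1, np.nan], [1, np.nan, np.nan], [np.nan, 1, np.nan]],
    index=pd.Index([0, 1, 2], name="participant_id"),
    columns=[0, 1, 2],
)

filtered = filter_matrix(
    raw,
    min_user_vote_threshold=2,       # participant 1 (one vote) gets filtered out
    active_statement_ids=[0, 1, 2],  # nothing moderated out in this example
    keep_participant_ids=[2],        # force participant 2 back in despite one vote
    unvoted_filter_type="drop",      # statement 2 has no votes, so its column is dropped
)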

reddwarf.utils.get_unvoted_statement_ids(vote_matrix)

A method intended to be piped into a VoteMatrix DataFrame, returning a list of unvoted statement IDs.

See: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.pipe.html

Parameters:
  • vote_matrix (DataFrame) –

    A pivot of statements (cols), participants (rows), with votes as values.

Returns:
  • unvoted_statement_ids( List[int] ) –

    A list of statement IDs with no votes.

Example:

unused_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)
Source code in reddwarf/utils/matrix.py
def get_unvoted_statement_ids(vote_matrix: VoteMatrix) -> List[int]:
    """
    A method intended to be piped into a VoteMatrix DataFrame, returning a list of unvoted statement IDs.

    See: <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.pipe.html>

    Args:
        vote_matrix (pd.DataFrame): A pivot of statements (cols), participants (rows), with votes as values.

    Returns:
        unvoted_statement_ids (List[int]): A list of statement IDs with no votes.

    Example:

        unused_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)
    """
    null_column_mask = vote_matrix.isnull().all()
    null_column_ids = vote_matrix.columns[null_column_mask].tolist()

    return null_column_ids
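
A self-contained variant of the example above, with a hypothetical two-participant matrix in which statement 2 received no votes:

import numpy as np
import pandas as pd

from reddwarf.utils import get_unvoted_statement_ids

vote_matrix = pd.DataFrame(
    {0: [1, -1], 1: [np.nan, 1], 2: [np.nan, np.nan]},
    index=pd.Index([0, 1], name="participant_id"),
)

print(vote_matrix.pipe(get_unvoted_statement_ids))  # [2]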

reddwarf.data_loader

reddwarf.data_loader.Loader

A comprehensive data loader for Polis conversation data.

The Loader class provides a unified interface for loading Polis conversation data from multiple sources including API endpoints, CSV exports, and local JSON files. It handles data validation, caching, rate limiting, and export functionality.

The class automatically determines the appropriate loading strategy based on the provided parameters and can export data in both JSON and CSV formats compatible with the Polis platform.

Parameters:
  • polis_instance_url (str, default: None ) –

    Base URL of the Polis instance. Defaults to "https://pol.is".

  • filepaths (list[str], default: [] ) –

    List of local file paths to load data from. Defaults to [].

  • polis_id (str, default: None ) –

    Generic Polis identifier (report ID starting with 'r' or conversation ID).

  • conversation_id (str, default: None ) –

    Specific conversation ID for API requests.

  • report_id (str, default: None ) –

    Specific report ID for API requests or CSV exports.

  • is_cache_enabled (bool, default: True ) –

    Enable HTTP request caching. Defaults to True.

  • output_dir (str, default: None ) –

    Directory path for exporting loaded data. If provided, data is automatically exported.

  • data_source (str, default: 'api' ) –

    Data source type ("api" or "csv_export"). Defaults to "api".

  • directory_url (str, default: None ) –

    Direct URL to CSV export directory. If provided, forces csv_export mode.

Attributes:
  • votes_data (list[dict]) –

    Loaded vote data with participant_id, statement_id, vote, and modified fields.

  • comments_data (list[dict]) –

    Loaded statement/comment data with text, metadata, and statistics.

  • math_data (dict) –

    Mathematical analysis data including PCA projections and group clusters.

  • conversation_data (dict) –

    Conversation metadata including topic, description, and settings.

  • report_data (dict) –

    Report metadata when loaded via report_id.

  • skipped_dup_votes (list[dict]) –

    Duplicate votes that were filtered out during processing.

Examples:

Load from API using conversation ID:

>>> loader = Loader(conversation_id="12345")

Load from CSV export using report ID:

>>> loader = Loader(report_id="r67890", data_source="csv_export")

Load from local files:

>>> loader = Loader(filepaths=["votes.json", "comments.json", "math-pca2.json"])

Load and export to directory:

>>> loader = Loader(conversation_id="12345", output_dir="./exported_data")
Source code in reddwarf/data_loader.py
class Loader:
    """
    A comprehensive data loader for Polis conversation data.

    The Loader class provides a unified interface for loading Polis conversation data
    from multiple sources including API endpoints, CSV exports, and local JSON files.
    It handles data validation, caching, rate limiting, and export functionality.

    The class automatically determines the appropriate loading strategy based on the
    provided parameters and can export data in both JSON and CSV formats compatible
    with the Polis platform.

    Args:
        polis_instance_url (str, optional): Base URL of the Polis instance. Defaults to "https://pol.is".
        filepaths (list[str], optional): List of local file paths to load data from. Defaults to [].
        polis_id (str, optional): Generic Polis identifier (report ID starting with 'r' or conversation ID).
        conversation_id (str, optional): Specific conversation ID for API requests.
        report_id (str, optional): Specific report ID for API requests or CSV exports.
        is_cache_enabled (bool, optional): Enable HTTP request caching. Defaults to True.
        output_dir (str, optional): Directory path for exporting loaded data. If provided, data is automatically exported.
        data_source (str, optional): Data source type ("api" or "csv_export"). Defaults to "api".
        directory_url (str, optional): Direct URL to CSV export directory. If provided, forces csv_export mode.

    Attributes:
        votes_data (list[dict]): Loaded vote data with participant_id, statement_id, vote, and modified fields.
        comments_data (list[dict]): Loaded statement/comment data with text, metadata, and statistics.
        math_data (dict): Mathematical analysis data including PCA projections and group clusters.
        conversation_data (dict): Conversation metadata including topic, description, and settings.
        report_data (dict): Report metadata when loaded via report_id.
        skipped_dup_votes (list[dict]): Duplicate votes that were filtered out during processing.

    Examples:
        Load from API using conversation ID:
        >>> loader = Loader(conversation_id="12345")

        Load from CSV export using report ID:
        >>> loader = Loader(report_id="r67890", data_source="csv_export")

        Load from local files:
        >>> loader = Loader(filepaths=["votes.json", "comments.json", "math-pca2.json"])

        Load and export to directory:
        >>> loader = Loader(conversation_id="12345", output_dir="./exported_data")
    """

    def __init__(
        self,
        polis_instance_url=None,
        filepaths=[],
        polis_id=None,
        conversation_id=None,
        report_id=None,
        is_cache_enabled=True,
        output_dir=None,
        data_source="api",
        directory_url=None,
    ):
        self.polis_instance_url = polis_instance_url or "https://pol.is"
        self.polis_id = report_id or conversation_id or polis_id
        self.conversation_id = conversation_id
        self.report_id = report_id
        self.is_cache_enabled = is_cache_enabled
        self.output_dir = output_dir
        self.data_source = data_source
        self.filepaths = filepaths
        self.directory_url = directory_url

        self.votes_data = []
        self.comments_data = []
        self.math_data = {}
        self.conversation_data = {}
        self.report_data = {}
        self.skipped_dup_votes = []

        if self.filepaths:
            self.load_file_data()
        elif (
            self.conversation_id
            or self.report_id
            or self.polis_id
            or self.directory_url
        ):
            self.populate_polis_ids()
            self.init_http_client()
            if self.directory_url:
                self.data_source = "csv_export"

            if self.data_source == "csv_export":
                self.load_remote_export_data()
            elif self.data_source == "api":
                self.load_api_data()
            else:
                raise ValueError("Unknown data_source: {}".format(self.data_source))

        if self.output_dir:
            self.dump_data(self.output_dir)

    def populate_polis_ids(self):
        """
        Normalize and populate Polis ID fields from the provided identifiers.

        This method handles the logic for determining conversation_id and report_id
        from the generic polis_id parameter. (Report IDs start with 'r', while
        conversation IDs start with a number.)
        """
        if self.polis_id:
            # If polis_id set, set report or conversation ID.
            if self.polis_id[0] == "r":
                self.report_id = self.polis_id
            elif self.polis_id[0].isdigit():
                self.conversation_id = self.polis_id
        else:
            # If not set, write it from what's provided.
            self.polis_id = self.report_id or self.conversation_id

    # Deprecated.
    def dump_data(self, output_dir):
        """
        Export loaded data to JSON files in the specified directory.

        Args:
            output_dir (str): Directory path where JSON files will be written.

        Note:
            This method is deprecated. Use export_data() instead.
        """
        self.export_data(output_dir, format="json")

    def export_data(self, output_dir, format="csv"):
        """
        Export loaded data to files in the specified format.

        Args:
            output_dir (str): Directory path where files will be written.
            format (str): Export format, either "json" or "csv". Defaults to "csv".

        The CSV format exports multiple files compatible with Polis platform:
        - votes.csv: Individual vote records
        - comments.csv: Statement/comment data with metadata
        - comment-groups.csv: Group-specific voting statistics per statement
        - participant-votes.csv: Participant voting patterns and group assignments
        - summary.csv: Conversation summary statistics
        """
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        if format == "json":
            self._export_data_json(output_dir)
        elif format == "csv":
            self._export_data_csv(output_dir)

    def _export_data_json(self, output_dir):
        if self.votes_data:
            with open(output_dir + "/votes.json", "w") as f:
                f.write(json.dumps(self.votes_data, indent=4))

        if self.comments_data:
            with open(output_dir + "/comments.json", "w") as f:
                f.write(json.dumps(self.comments_data, indent=4))

        if self.math_data:
            with open(output_dir + "/math-pca2.json", "w") as f:
                f.write(json.dumps(self.math_data, indent=4))

        if self.conversation_data:
            with open(output_dir + "/conversation.json", "w") as f:
                f.write(json.dumps(self.conversation_data, indent=4))

    def _export_data_csv(self, output_dir):
        self._write_polis_votes(output_dir)
        self._write_polis_comments(output_dir)
        self._write_polis_comment_groups(output_dir)
        self._write_polis_participant_votes(output_dir)
        self._write_polis_summary(output_dir)

    def _write_polis_votes(self, output_dir):
        """
        POLIS format:
            timestamp,datetime,comment-id,voter-id,vote
        """
        if not self.votes_data:
            return

        sorted_votes_data = sorted(
            self.votes_data, key=lambda x: (x["statement_id"], x["participant_id"])
        )
        with open(output_dir + "/votes.csv", "w") as f:
            writer = csv.writer(f)
            headers = ["timestamp", "datetime", "comment-id", "voter-id", "vote"]
            writer.writerow(headers)
            for entry in sorted_votes_data:
                ts, dt_str = self._format_polis_times(entry["modified"])
                row = [
                    ts,
                    dt_str,
                    entry["statement_id"],
                    entry["participant_id"],
                    entry["vote"],
                ]
                writer.writerow(row)

    def _write_polis_comments(self, output_dir):
        """
        POLIS format:
            timestamp,datetime,comment-id,author-id,agrees,disagrees,moderated,comment-body
        """
        if not self.comments_data:
            return

        with open(output_dir + "/comments.csv", "w") as f:
            headers = [
                "timestamp",
                "datetime",
                "comment-id",
                "author-id",
                "agrees",
                "disagrees",
                "moderated",
                "comment-body",
            ]
            f.write(",".join(headers) + "\n")
            # Sort comments_data by 'created' timestamp before writing
            sorted_comments = sorted(
                self.comments_data,
                key=lambda x: (x["statement_id"], x["participant_id"]),
            )
            for entry in sorted_comments:
                ts, dt_str = self._format_polis_times(entry["created"])
                quote = '"'
                escaped_quote = '""'
                row = [
                    ts,
                    dt_str,
                    entry["statement_id"],
                    entry["participant_id"],
                    entry["agree_count"],
                    entry["disagree_count"],
                    entry["moderated"],
                    f'"{str(entry["txt"]).replace(single_quote, double_quote)}"',
                ]
                f.write(",".join([str(item) for item in row]) + "\n")

    def _format_polis_times(self, time):
        """Convert timestamp or ISO string to Polis datetime format."""
        from dateutil import parser

        try:
            if isinstance(time, (int, float)):
                # Handle timestamps
                timestamp = int(str(time)[:10]) if time > 10**10 else time
                date_obj = datetime.fromtimestamp(timestamp, tz=timezone.utc)
            else:
                # Handle string inputs in ISO format
                date_obj = parser.parse(time)
                if date_obj.tzinfo is None:
                    date_obj = date_obj.replace(tzinfo=timezone.utc)

            date_obj = date_obj.astimezone(timezone.utc)
            dt_str = date_obj.strftime(
                "%a %b %d %Y %H:%M:%S GMT+0000 (Coordinated Universal Time)"
            )

            return int(date_obj.timestamp()), dt_str
        except (ValueError, OSError) as error:
            raise ValueError(f"Timestamp is not in a recognizable format: {error}")

    def _write_polis_comment_groups(self, output_dir):
        """
        POLIS format:
            comment-id,comment,total-votes,total-agrees,total-disagrees,total-passes,group-a-votes,group-a-agrees,group-a-disagrees,group-a-passes,group-[next alphabetic identifier (b)]-votes,[repeat 'votes/agrees/disagrees/passes' with alphabetic identifier...]

        Each row represents a comment with total votes & votes by group
        """
        if not self.comments_data or not self.math_data:
            return

        group_votes = self.math_data.get("group-votes", {})
        group_clusters = self.math_data.get("group-clusters", [])
        group_ids = [group["id"] for group in group_clusters]
        # Map group indices to letters: 0 -> 'a', 1 -> 'b', etc.
        group_letters = [chr(ord("a") + i) for i in range(len(group_ids))]

        with open(output_dir + "/comment-groups.csv", "w") as f:
            # Build header dynamically based on available groups
            header = [
                "comment-id",
                "comment",
                "total-votes",
                "total-agrees",
                "total-disagrees",
                "total-passes",
            ]
            for i, group in enumerate(group_clusters):
                if i < len(group_letters):
                    group_letter = group_letters[i]
                    header.extend(
                        [
                            f"group-{group_letter}-votes",
                            f"group-{group_letter}-agrees",
                            f"group-{group_letter}-disagrees",
                            f"group-{group_letter}-passes",
                        ]
                    )
            f.write(",".join(header))
            f.write("\n")
            rows = []
            sorted_comments_data = sorted(
                self.comments_data, key=lambda x: x["statement_id"]
            )
            for comment in sorted_comments_data:
                comment_id = str(comment["statement_id"])
                row = [
                    comment_id,
                    comment["txt"]
                    if comment["txt"][0] == '"'
                    else '"' + comment["txt"] + '"',
                    comment["count"],
                    comment["agree_count"],
                    comment["disagree_count"],
                    comment["pass_count"],
                ]

                # Add group-specific data
                for i, group in enumerate(group_clusters):
                    if i < len(group_letters):
                        group_id = str(group["id"])
                        if (
                            group_id in group_votes
                            and comment_id in group_votes[group_id]["votes"]
                        ):
                            vote_data = group_votes[group_id]["votes"][comment_id]
                            total_votes = (
                                vote_data["A"] + vote_data["D"] + vote_data["S"]
                            )
                            row.extend(
                                [
                                    total_votes,
                                    vote_data["A"],  # agrees
                                    vote_data["D"],  # disagrees
                                    vote_data["S"],  # passes (skips)
                                ]
                            )
                        else:
                            # No votes from this group for this comment
                            row.extend([0, 0, 0, 0])
                rows.append(row)
                f.write(",".join([str(item) for item in row]) + "\n")

    def _write_polis_participant_votes(self, output_dir):
        """
        POLIS format:
            participant,group-id,n-comments,n-votes,n-agree,n-disagree,0,1,2,3,...

        Each row represents a participant with:
        - participant: participant ID
        - group-id: which group they belong to (if any)
        - n-comments: number of comments they made
        - n-votes: total number of votes they cast
        - n-agree: number of agree votes
        - n-disagree: number of disagree votes
        - 0,1,2,3...: their vote on each comment (1=agree, -1=disagree, 0=pass, empty=no vote)
        """
        if not self.votes_data:
            return

        # Get all unique participant IDs and statement IDs
        participant_ids = set()
        statement_ids = set()
        for vote in self.votes_data:
            participant_ids.add(vote["participant_id"])
            statement_ids.add(vote["statement_id"])

        # Sort to ensure consistent order
        sorted_participant_ids = sorted(participant_ids)
        sorted_statement_ids = sorted(statement_ids)

        # Build participant vote matrix
        participant_votes = {}
        for vote in self.votes_data:
            pid = vote["participant_id"]
            sid = vote["statement_id"]
            if pid not in participant_votes:
                participant_votes[pid] = {}
            participant_votes[pid][sid] = vote["vote"]

        # Get participant group assignments from math data
        participant_groups = {}
        if self.math_data and "group-clusters" in self.math_data:
            for group in self.math_data["group-clusters"]:
                group_id = group["id"]
                for member in group["members"]:
                    participant_groups[member] = group_id

        # Count comments per participant
        participant_comment_counts = {}
        if self.comments_data:
            for comment in self.comments_data:
                pid = comment["participant_id"]
                participant_comment_counts[pid] = (
                    participant_comment_counts.get(pid, 0) + 1
                )

        with open(output_dir + "/participant-votes.csv", "w") as f:
            # Build header
            header = [
                "participant",
                "group-id",
                "n-comments",
                "n-votes",
                "n-agree",
                "n-disagree",
            ]
            header.extend([str(sid) for sid in sorted_statement_ids])
            f.write(",".join(header) + "\n")

            # Write participant data
            for pid in sorted_participant_ids:
                participant_vote_data = participant_votes.get(pid, {})

                # Count votes
                n_votes = len(participant_vote_data)
                n_agree = sum(1 for v in participant_vote_data.values() if v == 1)
                n_disagree = sum(1 for v in participant_vote_data.values() if v == -1)

                # Get group assignment
                group_id = participant_groups.get(pid, "")

                # Get comment count
                n_comments = participant_comment_counts.get(pid, 0)

                row = [pid, group_id, n_comments, n_votes, n_agree, n_disagree]

                # Add vote for each statement
                for sid in sorted_statement_ids:
                    vote = participant_vote_data.get(sid, "")
                    row.append(vote)

                f.write(",".join([str(item) for item in row]) + "\n")

    def _write_polis_summary(self, output_dir):
        """
        POLIS format:
            topic,[string]
            url,http://pol.is/[report_id]
            voters,[num]
            voters-in-conv,[num]
            commenters,[num]
            comments,[num]
            groups,[num]
            conversation-description,[string]
        """
        if not self.conversation_data:
            return

        # Calculate summary statistics
        total_voters = (
            len(set(vote["participant_id"] for vote in self.votes_data))
            if self.votes_data
            else 0
        )
        total_commenters = (
            len(set(comment["participant_id"] for comment in self.comments_data))
            if self.comments_data
            else 0
        )
        total_comments = len(self.comments_data) if self.comments_data else 0
        total_groups = (
            len(self.math_data.get("group-clusters", [])) if self.math_data else 0
        )

        # Get conversation details
        topic = self.conversation_data.get("topic", "")
        description = self.conversation_data.get("description", "")
        if description:
            description = (
                description.replace("\n", "\\n")
                .replace("\r", "\\r")
                .replace("\t", "\\t")
            )

        # Build URL
        url = (
            f"{self.polis_instance_url}/{self.conversation_id}"
            if self.conversation_id
            else self.polis_id
            if self.polis_id
            else self.report_id
        )

        with open(output_dir + "/summary.csv", "w") as f:
            f.write(f'topic,"{topic}"\n')
            f.write(f"url,{url}\n")
            f.write(f"voters,{total_voters}\n")
            f.write(f"voters-in-conv,{total_voters}\n")
            f.write(f"commenters,{total_commenters}\n")
            f.write(f"comments,{total_comments}\n")
            f.write(f"groups,{total_groups}\n")
            f.write(f'conversation-description,"{description}"\n')

    def init_http_client(self):
        """
        Initialize HTTP session with rate limiting, caching, and Cloudflare bypass.

        Sets up a requests session with:
        - Rate limiting (5 requests per second)
        - Optional SQLite-based response caching (1 hour expiration)
        - Cloudflare bypass adapter for the Polis instance
        - Random user agent headers
        """
        # Throttle requests, but disable when response is already cached.
        if self.is_cache_enabled:
            # Source: https://github.com/JWCook/requests-ratelimiter/tree/main?tab=readme-ov-file#custom-session-example-requests-cache
            self.session = CachedLimiterSession(
                per_second=5,
                expire_after=timedelta(hours=1),
                cache_name="test_cache.sqlite",
                bucket_class=SQLiteBucket,
                bucket_kwargs={
                    "path": "test_cache.sqlite",
                    "isolation_level": "EXCLUSIVE",
                    "check_same_thread": False,
                },
            )
        else:
            self.session = LimiterSession(per_second=5)
        adapter = CloudflareBypassHTTPAdapter()
        self.session.mount(self.polis_instance_url, adapter)
        self.session.headers = {
            "User-Agent": ua.random,
        }

    def get_polis_export_directory_url(self, report_id):
        """
        Generate the CSV export directory URL for a given report ID.

        Args:
            report_id (str): The report ID (typically starts with 'r').

        Returns:
            str: Full URL to the CSV export directory endpoint.
        """
        return f"{self.polis_instance_url}/api/v3/reportExport/{report_id}/"

    def _is_statement_meta_field_missing(self):
        if self.comments_data:
            return self.comments_data[0]["is_meta"] is None
        else:
            # No statements loaded, so can't say.
            return False

    def load_remote_export_data(self):
        """
        Load data from remote CSV export endpoints.

        Downloads and processes CSV files from Polis export directory, including:
        - comments.csv: Statement data
        - votes.csv: Vote records

        Handles missing is_meta field by falling back to API data when necessary.
        Automatically filters duplicate votes, keeping the most recent.

        Raises:
            ValueError: If CSV export URL cannot be determined or API fallback fails.
        """
        if self.directory_url:
            directory_url = self.directory_url
        elif self.report_id:
            directory_url = self.get_polis_export_directory_url(self.report_id)
        else:
            raise ValueError(
                "Cannot determine CSV export URL without report_id or directory_url"
            )

        self.load_remote_export_data_comments(directory_url)
        self.load_remote_export_data_votes(directory_url)

        # Supplement is_meta statement field via API if missing.
        # See: https://github.com/polis-community/red-dwarf/issues/55
        if self._is_statement_meta_field_missing():
            import warnings

            warnings.warn(
                "CSV import is missing is_meta field. Attempting to load comments data from API instead..."
            )
            try:
                if self.report_id and not self.conversation_id:
                    self.load_api_data_report()
                    self.conversation_id = self.report_data["conversation_id"]
                self.load_api_data_comments()
            except Exception:
                raise ValueError(
                    " ".join(
                        [
                            "Due to an upstream bug, we must patch CSV exports using the API,",
                            "so conversation_id or report_id is required.",
                            "See: https://github.com/polis-community/red-dwarf/issues/56",
                        ]
                    )
                )

        # When multiple votes (same tid and pid), keep only most recent (vs first).
        self.filter_duplicate_votes(keep="recent")
        # self.load_remote_export_data_summary()
        # self.load_remote_export_data_participant_votes()
        # self.load_remote_export_data_comment_groups()

    def load_remote_export_data_comments(self, directory_url):
        """
        Load statement/comment data from remote CSV export.

        Args:
            directory_url (str): Base URL of the CSV export directory.
        """
        r = self.session.get(directory_url + "comments.csv")
        comments_csv = r.text
        reader = csv.DictReader(StringIO(comments_csv))
        self.comments_data = [
            Statement(**c).model_dump(mode="json") for c in list(reader)
        ]

    def load_remote_export_data_votes(self, directory_url):
        """
        Load vote data from remote CSV export.

        Args:
            directory_url (str): Base URL of the CSV export directory.
        """
        r = self.session.get(directory_url + "votes.csv")
        votes_csv = r.text
        reader = csv.DictReader(StringIO(votes_csv))
        self.votes_data = [
            Vote(**vote).model_dump(mode="json") for vote in list(reader)
        ]

    def filter_duplicate_votes(self, keep="recent"):
        """
        Remove duplicate votes from the same participant on the same statement.

        Args:
            keep (str): Which vote to keep when duplicates are found.
                       "recent" keeps the most recent vote, "first" keeps the earliest.

        The filtered duplicate votes are stored in self.skipped_dup_votes for reference.

        Raises:
            ValueError: If keep parameter is not "recent" or "first".
        """
        if keep not in {"recent", "first"}:
            raise ValueError("Invalid value for 'keep'. Use 'recent' or 'first'.")

        # Sort by modified time (descending for "recent", ascending for "first")
        if keep == "recent":
            reverse_sort = True
        else:
            reverse_sort = False
        sorted_votes = sorted(
            self.votes_data, key=lambda x: x["modified"], reverse=reverse_sort
        )

        filtered_dict = {}
        for v in sorted_votes:
            key = (v["participant_id"], v["statement_id"])
            if key not in filtered_dict:
                filtered_dict[key] = v
            else:
                # Append skipped votes
                self.skipped_dup_votes.append(v)

        self.votes_data = list(filtered_dict.values())

    def load_remote_export_data_summary(self):
        # r = self.session.get(self.polis_instance_url + "/api/v3/reportExport/{}/summary.csv".format(self.report_id))
        # summary_csv = r.text
        # print(summary_csv)
        raise NotImplementedError

    def load_remote_export_data_participant_votes(self):
        # r = self.session.get(self.polis_instance_url + "/api/v3/reportExport/{}/participant-votes.csv".format(self.report_id))
        # participant_votes_csv = r.text
        # print(participant_votes_csv)
        raise NotImplementedError

    def load_remote_export_data_comment_groups(self):
        # r = self.session.get(self.polis_instance_url + "/api/v3/reportExport/{}/comment-groups.csv".format(self.report_id))
        # comment_groups_csv = r.text
        # print(comment_groups_csv)
        raise NotImplementedError

    def load_file_data(self):
        """
        Load data from local JSON files specified in self.filepaths.

        Automatically detects file types based on filename patterns:
        - votes.json: Vote records
        - comments.json: Statement/comment data
        - conversation.json: Conversation metadata
        - math-pca2.json: Mathematical analysis results

        Raises:
            ValueError: If a file type cannot be determined from its name.
        """
        for f in self.filepaths:
            if f.endswith("votes.json"):
                self.load_file_data_votes(file=f)
            elif f.endswith("comments.json"):
                self.load_file_data_comments(file=f)
            elif f.endswith("conversation.json"):
                self.load_file_data_conversation(file=f)
            elif f.endswith("math-pca2.json"):
                self.load_file_data_math(file=f)
            else:
                raise ValueError("Unknown file type")

    def load_file_data_votes(self, file=None):
        """
        Load vote data from a local JSON file.

        Args:
            file (str): Path to the votes JSON file.
        """
        with open(file) as f:
            votes_data = json.load(f)

        votes_data = [Vote(**vote).model_dump(mode="json") for vote in votes_data]
        self.votes_data = votes_data

    def load_file_data_comments(self, file=None):
        """
        Load statement/comment data from a local JSON file.

        Args:
            file (str): Path to the comments JSON file.
        """
        with open(file) as f:
            comments_data = json.load(f)

        comments_data = [Statement(**c).model_dump(mode="json") for c in comments_data]
        self.comments_data = comments_data

    def load_file_data_conversation(self, file=None):
        """
        Load conversation metadata from a local JSON file.

        Args:
            file (str): Path to the conversation JSON file.
        """
        with open(file) as f:
            convo_data = json.load(f)

        self.conversation_data = convo_data

    def load_file_data_math(self, file=None):
        """
        Load mathematical analysis data from a local JSON file.

        Args:
            file (str): Path to the math-pca2 JSON file.
        """
        with open(file) as f:
            math_data = json.load(f)

        self.math_data = math_data

    def load_api_data(self):
        """
        Load complete dataset from Polis API endpoints.

        Loads data in the following order:
        1. Report data (if report_id provided) to get conversation_id
        2. Conversation metadata
        3. Comments/statements data
        4. Mathematical analysis data (PCA, clustering)
        5. Individual participant votes (up to participant count from math data)

        Automatically handles vote sign correction for API data and resolves
        any conflicts between report_id and conversation_id parameters.

        Raises:
            ValueError: If report_id conflicts with conversation_id.
        """
        if self.report_id:
            self.load_api_data_report()
            convo_id_from_report_id = self.report_data["conversation_id"]
            if self.conversation_id and (
                self.conversation_id != convo_id_from_report_id
            ):
                raise ValueError("report_id conflicts with conversation_id")
            self.conversation_id = convo_id_from_report_id

        self.load_api_data_conversation()
        self.load_api_data_comments()
        self.load_api_data_math()
        # TODO: Add a way to do this without math data, for example
        # by checking until 5 empty responses in a row.
        # This is the best place to check though, as `voters`
        # in summary.csv omits some participants.
        participant_count = self.math_data["n"]
        # DANGER: This is potentially an issue that throws everything off by missing some participants.
        self.load_api_data_votes(last_participant_id=participant_count)

    def load_api_data_report(self):
        """
        Load report metadata from the Polis API.

        Uses the report_id to fetch report information and extract the associated
        conversation_id for subsequent API calls.
        """
        params = {
            "report_id": self.report_id,
        }
        r = self.session.get(self.polis_instance_url + "/api/v3/reports", params=params)
        reports = json.loads(r.text)
        self.report_data = reports[0]

    def load_api_data_conversation(self):
        """
        Load conversation metadata from the Polis API.

        Fetches conversation details including topic, description, and settings
        using the conversation_id.
        """
        params = {
            "conversation_id": self.conversation_id,
        }
        r = self.session.get(
            self.polis_instance_url + "/api/v3/conversations", params=params
        )
        convo = json.loads(r.text)
        self.conversation_data = convo

    def load_api_data_math(self):
        """
        Load mathematical analysis data from the Polis API.

        Fetches PCA projections, clustering results, and group statistics
        from the math/pca2 endpoint.
        """
        params = {
            "conversation_id": self.conversation_id,
        }
        r = self.session.get(
            self.polis_instance_url + "/api/v3/math/pca2", params=params
        )
        math = json.loads(r.text)
        self.math_data = math

    def load_api_data_comments(self):
        """
        Load statement/comment data from the Polis API.

        Fetches all statements with moderation status and voting patterns
        included in the response.
        """
        params = {
            "conversation_id": self.conversation_id,
            "moderation": "true",
            "include_voting_patterns": "true",
        }
        r = self.session.get(
            self.polis_instance_url + "/api/v3/comments", params=params
        )
        comments = json.loads(r.text)
        comments = [Statement(**c).model_dump(mode="json") for c in comments]
        self.comments_data = comments

    def fix_participant_vote_sign(self):
        """
        Correct vote sign inversion in API data.

        The Polis API returns votes with inverted signs compared to the expected
        format (e.g., agree votes come as -1 instead of 1). This method fixes
        the inversion by negating all vote values.
        """
        """For data coming from the API, vote signs are inverted (e.g., agree is -1)"""
        for item in self.votes_data:
            item["vote"] = -item["vote"]

    def load_api_data_votes(self, last_participant_id=None):
        """
        Load individual participant votes from the Polis API.

        Args:
            last_participant_id (int): Maximum participant ID to fetch votes for.
                                     Typically obtained from math data participant count.

        Iterates through all participant IDs from 0 to last_participant_id and
        fetches their vote records. Automatically applies vote sign correction.
        """
        for pid in range(0, last_participant_id + 1):
            params = {
                "pid": pid,
                "conversation_id": self.conversation_id,
            }
            r = self.session.get(
                self.polis_instance_url + "/api/v3/votes", params=params
            )
            participant_votes = json.loads(r.text)
            participant_votes = [
                Vote(**vote).model_dump(mode="json") for vote in participant_votes
            ]
            self.votes_data.extend(participant_votes)

        self.fix_participant_vote_sign()

    def fetch_pid(self, xid):
        """
        Fetch internal participant ID (pid) for a given external ID (xid).

        Args:
            xid (str): External participant identifier.

        Returns:
            int: Internal participant ID used by the Polis system.
        """
        params = {
            "pid": "mypid",
            "xid": xid,
            "conversation_id": self.conversation_id,
        }
        r = self.session.get(
            self.polis_instance_url + "/api/v3/participationInit", params=params
        )
        data = json.loads(r.text)

        return data["ptpt"]["pid"]

    def fetch_xid_to_pid_mappings(self, xids=[]):
        """
        Create mapping dictionary from external IDs to internal participant IDs.

        Args:
            xids (list[str]): List of external participant identifiers.

        Returns:
            dict: Mapping of external IDs to internal participant IDs.
        """
        mappings = {}
        for xid in xids:
            pid = self.fetch_pid(xid)
            mappings[xid] = pid

        return mappings
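
As a minimal sketch of the timestamp normalization performed by _format_polis_times above (the input value is hypothetical): millisecond timestamps are truncated to seconds, then rendered in the Polis export datetime format.

from datetime import datetime, timezone

raw_ms = 1_700_000_000_000  # a vote's "modified" field, in ms
seconds = int(str(raw_ms)[:10]) if raw_ms > 10**10 else raw_ms
date_obj = datetime.fromtimestamp(seconds, tz=timezone.utc)
dt_str = date_obj.strftime("%a %b %d %Y %H:%M:%S GMT+0000 (Coordinated Universal Time)")
print(seconds, dt_str)
# 1700000000 Tue Nov 14 2023 22:13:20 GMT+0000 (Coordinated Universal Time)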

dump_data(output_dir)

Export loaded data to JSON files in the specified directory.

Parameters:
  • output_dir (str) –

    Directory path where JSON files will be written.

Note

This method is deprecated. Use export_data() instead.

Source code in reddwarf/data_loader.py
def dump_data(self, output_dir):
    """
    Export loaded data to JSON files in the specified directory.

    Args:
        output_dir (str): Directory path where JSON files will be written.

    Note:
        This method is deprecated. Use export_data() instead.
    """
    self.export_data(output_dir, format="json")

export_data(output_dir, format='csv')

Export loaded data to files in the specified format.

Parameters:
  • output_dir (str) –

    Directory path where files will be written.

  • format (str, default: 'csv' ) –

    Export format, either "json" or "csv". Defaults to "csv".

The CSV format exports multiple files compatible with the Polis platform:

  • votes.csv: Individual vote records
  • comments.csv: Statement/comment data with metadata
  • comment-groups.csv: Group-specific voting statistics per statement
  • participant-votes.csv: Participant voting patterns and group assignments
  • summary.csv: Conversation summary statistics

Source code in reddwarf/data_loader.py
def export_data(self, output_dir, format="csv"):
    """
    Export loaded data to files in the specified format.

    Args:
        output_dir (str): Directory path where files will be written.
        format (str): Export format, either "json" or "csv". Defaults to "csv".

    The CSV format exports multiple files compatible with Polis platform:
    - votes.csv: Individual vote records
    - comments.csv: Statement/comment data with metadata
    - comment-groups.csv: Group-specific voting statistics per statement
    - participant-votes.csv: Participant voting patterns and group assignments
    - summary.csv: Conversation summary statistics
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    if format == "json":
        self._export_data_json(output_dir)
    elif format == "csv":
        self._export_data_csv(output_dir)
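
A minimal sketch of an offline round-trip, assuming local JSON files previously dumped by the Loader (filenames are hypothetical):

from reddwarf.data_loader import Loader

loader = Loader(filepaths=["votes.json", "comments.json", "math-pca2.json"])
loader.export_data(output_dir="./out", format="csv")
# ./out now contains votes.csv, comments.csv, comment-groups.csv, and
# participant-votes.csv; summary.csv is only written when conversation
# metadata has also been loaded.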

fetch_pid(xid)

Fetch internal participant ID (pid) for a given external ID (xid).

Parameters:
  • xid (str) –

    External participant identifier.

Returns:
  • int

    Internal participant ID used by the Polis system.

Source code in reddwarf/data_loader.py
def fetch_pid(self, xid):
    """
    Fetch internal participant ID (pid) for a given external ID (xid).

    Args:
        xid (str): External participant identifier.

    Returns:
        int: Internal participant ID used by the Polis system.
    """
    params = {
        "pid": "mypid",
        "xid": xid,
        "conversation_id": self.conversation_id,
    }
    r = self.session.get(
        self.polis_instance_url + "/api/v3/participationInit", params=params
    )
    data = json.loads(r.text)

    return data["ptpt"]["pid"]

fetch_xid_to_pid_mappings(xids=[])

Create mapping dictionary from external IDs to internal participant IDs.

Parameters:
  • xids (list[str], default: [] ) –

    List of external participant identifiers.

Returns:
  • dict

    Mapping of external IDs to internal participant IDs.

Source code in reddwarf/data_loader.py
def fetch_xid_to_pid_mappings(self, xids=[]):
    """
    Create mapping dictionary from external IDs to internal participant IDs.

    Args:
        xids (list[str]): List of external participant identifiers.

    Returns:
        dict: Mapping of external IDs to internal participant IDs.
    """
    mappings = {}
    for xid in xids:
        pid = self.fetch_pid(xid)
        mappings[xid] = pid

    return mappings

filter_duplicate_votes(keep='recent')

Remove duplicate votes from the same participant on the same statement.

Parameters:
  • keep (str, default: 'recent' ) –

    Which vote to keep when duplicates are found: "recent" keeps the most recent vote, "first" keeps the earliest.

The filtered duplicate votes are stored in self.skipped_dup_votes for reference.

Raises:
  • ValueError

    If keep parameter is not "recent" or "first".

Source code in reddwarf/data_loader.py
def filter_duplicate_votes(self, keep="recent"):
    """
    Remove duplicate votes from the same participant on the same statement.

    Args:
        keep (str): Which vote to keep when duplicates are found.
                   "recent" keeps the most recent vote, "first" keeps the earliest.

    The filtered duplicate votes are stored in self.skipped_dup_votes for reference.

    Raises:
        ValueError: If keep parameter is not "recent" or "first".
    """
    if keep not in {"recent", "first"}:
        raise ValueError("Invalid value for 'keep'. Use 'recent' or 'first'.")

    # Sort by modified time (descending for "recent", ascending for "first")
    if keep == "recent":
        reverse_sort = True
    else:
        reverse_sort = False
    sorted_votes = sorted(
        self.votes_data, key=lambda x: x["modified"], reverse=reverse_sort
    )

    filtered_dict = {}
    for v in sorted_votes:
        key = (v["participant_id"], v["statement_id"])
        if key not in filtered_dict:
            filtered_dict[key] = v
        else:
            # Append skipped votes
            self.skipped_dup_votes.append(v)

    self.votes_data = list(filtered_dict.values())
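
A standalone sketch of the keep="recent" semantics, using two hypothetical conflicting votes from the same participant on the same statement:

votes_data = [
    {"participant_id": 0, "statement_id": 5, "vote": 1, "modified": 100},
    {"participant_id": 0, "statement_id": 5, "vote": -1, "modified": 200},  # later revote
]

# Sort newest-first, then keep the first record seen per (participant, statement).
sorted_votes = sorted(votes_data, key=lambda v: v["modified"], reverse=True)
kept, skipped = {}, []
for v in sorted_votes:
    key = (v["participant_id"], v["statement_id"])
    if key not in kept:
        kept[key] = v
    else:
        skipped.append(v)

print(list(kept.values()))  # the -1 revote at modified=200 wins
print(skipped)              # the earlier +1 vote lands here (cf. skipped_dup_votes)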

fix_participant_vote_sign()

Correct vote sign inversion in API data.

The Polis API returns votes with inverted signs compared to the expected format (e.g., agree votes come as -1 instead of 1). This method fixes the inversion by negating all vote values.

Source code in reddwarf/data_loader.py
def fix_participant_vote_sign(self):
    """
    Correct vote sign inversion in API data.

    The Polis API returns votes with inverted signs compared to the expected
    format (e.g., agree votes come as -1 instead of 1). This method fixes
    the inversion by negating all vote values.
    """
    """For data coming from the API, vote signs are inverted (e.g., agree is -1)"""
    for item in self.votes_data:
        item["vote"] = -item["vote"]

get_polis_export_directory_url(report_id)

Generate the CSV export directory URL for a given report ID.

Parameters:
  • report_id (str) –

    The report ID (typically starts with 'r').

Returns:
  • str

    Full URL to the CSV export directory endpoint.

Source code in reddwarf/data_loader.py
def get_polis_export_directory_url(self, report_id):
    """
    Generate the CSV export directory URL for a given report ID.

    Args:
        report_id (str): The report ID (typically starts with 'r').

    Returns:
        str: Full URL to the CSV export directory endpoint.
    """
    return f"{self.polis_instance_url}/api/v3/reportExport/{report_id}/"

init_http_client()

Initialize HTTP session with rate limiting, caching, and Cloudflare bypass.

Sets up a requests session with:

  • Rate limiting (5 requests per second)
  • Optional SQLite-based response caching (1 hour expiration)
  • Cloudflare bypass adapter for the Polis instance
  • Random user agent headers

Source code in reddwarf/data_loader.py
def init_http_client(self):
    """
    Initialize HTTP session with rate limiting, caching, and Cloudflare bypass.

    Sets up a requests session with:
    - Rate limiting (5 requests per second)
    - Optional SQLite-based response caching (1 hour expiration)
    - Cloudflare bypass adapter for the Polis instance
    - Random user agent headers
    """
    # Throttle requests, but disable when response is already cached.
    if self.is_cache_enabled:
        # Source: https://github.com/JWCook/requests-ratelimiter/tree/main?tab=readme-ov-file#custom-session-example-requests-cache
        self.session = CachedLimiterSession(
            per_second=5,
            expire_after=timedelta(hours=1),
            cache_name="test_cache.sqlite",
            bucket_class=SQLiteBucket,
            bucket_kwargs={
                "path": "test_cache.sqlite",
                "isolation_level": "EXCLUSIVE",
                "check_same_thread": False,
            },
        )
    else:
        self.session = LimiterSession(per_second=5)
    adapter = CloudflareBypassHTTPAdapter()
    self.session.mount(self.polis_instance_url, adapter)
    self.session.headers = {
        "User-Agent": ua.random,
    }

load_api_data()

Load complete dataset from Polis API endpoints.

Loads data in the following order:

  1. Report data (if report_id provided) to get conversation_id
  2. Conversation metadata
  3. Comments/statements data
  4. Mathematical analysis data (PCA, clustering)
  5. Individual participant votes (up to participant count from math data)

Automatically handles vote sign correction for API data and resolves any conflicts between report_id and conversation_id parameters.

Raises:
  • ValueError

    If report_id conflicts with conversation_id.

Source code in reddwarf/data_loader.py
def load_api_data(self):
    """
    Load complete dataset from Polis API endpoints.

    Loads data in the following order:
    1. Report data (if report_id provided) to get conversation_id
    2. Conversation metadata
    3. Comments/statements data
    4. Mathematical analysis data (PCA, clustering)
    5. Individual participant votes (up to participant count from math data)

    Automatically handles vote sign correction for API data, and checks
    that report_id and conversation_id refer to the same conversation.

    Raises:
        ValueError: If report_id conflicts with conversation_id.
    """
    if self.report_id:
        self.load_api_data_report()
        convo_id_from_report_id = self.report_data["conversation_id"]
        if self.conversation_id and (
            self.conversation_id != convo_id_from_report_id
        ):
            raise ValueError("report_id conflicts with conversation_id")
        self.conversation_id = convo_id_from_report_id

    self.load_api_data_conversation()
    self.load_api_data_comments()
    self.load_api_data_math()
    # TODO: Add a way to do this without math data, for example
    # by checking until 5 empty responses in a row.
    # This is the best place to check though, as `voters`
    # in summary.csv omits some participants.
    participant_count = self.math_data["n"]
    # DANGER: This is potentially an issue that throws everything off by missing some participants.
    self.load_api_data_votes(last_participant_id=participant_count)
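The ID-conflict check above surfaces like this (loader and IDs hypothetical):

loader.report_id = "r1234abcd"
loader.conversation_id = "9xyzabc"  # does not match the report's conversation
loader.load_api_data()  # raises ValueError: report_id conflicts with conversation_id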

load_api_data_comments()

Load statement/comment data from the Polis API.

Fetches all statements with moderation status and voting patterns included in the response.

Source code in reddwarf/data_loader.py
def load_api_data_comments(self):
    """
    Load statement/comment data from the Polis API.

    Fetches all statements with moderation status and voting patterns
    included in the response.
    """
    params = {
        "conversation_id": self.conversation_id,
        "moderation": "true",
        "include_voting_patterns": "true",
    }
    r = self.session.get(
        self.polis_instance_url + "/api/v3/comments", params=params
    )
    comments = json.loads(r.text)
    comments = [Statement(**c).model_dump(mode="json") for c in comments]
    self.comments_data = comments

load_api_data_conversation()

Load conversation metadata from the Polis API.

Fetches conversation details including topic, description, and settings using the conversation_id.

Source code in reddwarf/data_loader.py
def load_api_data_conversation(self):
    """
    Load conversation metadata from the Polis API.

    Fetches conversation details including topic, description, and settings
    using the conversation_id.
    """
    params = {
        "conversation_id": self.conversation_id,
    }
    r = self.session.get(
        self.polis_instance_url + "/api/v3/conversations", params=params
    )
    convo = json.loads(r.text)
    self.conversation_data = convo

load_api_data_math()

Load mathematical analysis data from the Polis API.

Fetches PCA projections, clustering results, and group statistics from the math/pca2 endpoint.

Source code in reddwarf/data_loader.py
def load_api_data_math(self):
    """
    Load mathematical analysis data from the Polis API.

    Fetches PCA projections, clustering results, and group statistics
    from the math/pca2 endpoint.
    """
    params = {
        "conversation_id": self.conversation_id,
    }
    r = self.session.get(
        self.polis_instance_url + "/api/v3/math/pca2", params=params
    )
    math = json.loads(r.text)
    self.math_data = math

load_api_data_report()

Load report metadata from the Polis API.

Uses the report_id to fetch report information and extract the associated conversation_id for subsequent API calls.

Source code in reddwarf/data_loader.py
def load_api_data_report(self):
    """
    Load report metadata from the Polis API.

    Uses the report_id to fetch report information and extract the associated
    conversation_id for subsequent API calls.
    """
    params = {
        "report_id": self.report_id,
    }
    r = self.session.get(self.polis_instance_url + "/api/v3/reports", params=params)
    reports = json.loads(r.text)
    self.report_data = reports[0]

load_api_data_votes(last_participant_id=None)

Load individual participant votes from the Polis API.

Parameters:
  • last_participant_id (int, default: None ) –

    Maximum participant ID to fetch votes for. Typically obtained from math data participant count.

Iterates through all participant IDs from 0 to last_participant_id and fetches their vote records. Automatically applies vote sign correction.

Source code in reddwarf/data_loader.py
def load_api_data_votes(self, last_participant_id=None):
    """
    Load individual participant votes from the Polis API.

    Args:
        last_participant_id (int): Maximum participant ID to fetch votes for.
            Typically obtained from math data participant count.

    Iterates through all participant IDs from 0 to last_participant_id and
    fetches their vote records. Automatically applies vote sign correction.
    """
    for pid in range(0, last_participant_id + 1):
        params = {
            "pid": pid,
            "conversation_id": self.conversation_id,
        }
        r = self.session.get(
            self.polis_instance_url + "/api/v3/votes", params=params
        )
        participant_votes = json.loads(r.text)
        participant_votes = [
            Vote(**vote).model_dump(mode="json") for vote in participant_votes
        ]
        self.votes_data.extend(participant_votes)

    self.fix_participant_vote_sign()
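For example, wired up the way load_api_data() calls it:

participant_count = loader.math_data["n"]
# The loop is inclusive: pids 0 through participant_count are fetched
# (participant_count + 1 requests in total), then vote signs are corrected.
loader.load_api_data_votes(last_participant_id=participant_count)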

load_file_data()

Load data from local JSON files specified in self.filepaths.

Automatically detects file types based on filename patterns:

- votes.json: Vote records
- comments.json: Statement/comment data
- conversation.json: Conversation metadata
- math-pca2.json: Mathematical analysis results

Raises:
  • ValueError

    If a file type cannot be determined from its name.

Source code in reddwarf/data_loader.py
def load_file_data(self):
    """
    Load data from local JSON files specified in self.filepaths.

    Automatically detects file types based on filename patterns:
    - votes.json: Vote records
    - comments.json: Statement/comment data
    - conversation.json: Conversation metadata
    - math-pca2.json: Mathematical analysis results

    Raises:
        ValueError: If a file type cannot be determined from its name.
    """
    for f in self.filepaths:
        if f.endswith("votes.json"):
            self.load_file_data_votes(file=f)
        elif f.endswith("comments.json"):
            self.load_file_data_comments(file=f)
        elif f.endswith("conversation.json"):
            self.load_file_data_conversation(file=f)
        elif f.endswith("math-pca2.json"):
            self.load_file_data_math(file=f)
        else:
            raise ValueError("Unknown file type")

load_file_data_comments(file=None)

Load statement/comment data from a local JSON file.

Parameters:
  • file (str, default: None ) –

    Path to the comments JSON file.

Source code in reddwarf/data_loader.py
def load_file_data_comments(self, file=None):
    """
    Load statement/comment data from a local JSON file.

    Args:
        file (str): Path to the comments JSON file.
    """
    with open(file) as f:
        comments_data = json.load(f)

    comments_data = [Statement(**c).model_dump(mode="json") for c in comments_data]
    self.comments_data = comments_data

load_file_data_conversation(file=None)

Load conversation metadata from a local JSON file.

Parameters:
  • file (str, default: None ) –

    Path to the conversation JSON file.

Source code in reddwarf/data_loader.py
def load_file_data_conversation(self, file=None):
    """
    Load conversation metadata from a local JSON file.

    Args:
        file (str): Path to the conversation JSON file.
    """
    with open(file) as f:
        convo_data = json.load(f)

    self.conversation_data = convo_data

load_file_data_math(file=None)

Load mathematical analysis data from a local JSON file.

Parameters:
  • file (str, default: None ) –

    Path to the math-pca2 JSON file.

Source code in reddwarf/data_loader.py
def load_file_data_math(self, file=None):
    """
    Load mathematical analysis data from a local JSON file.

    Args:
        file (str): Path to the math-pca2 JSON file.
    """
    with open(file) as f:
        math_data = json.load(f)

    self.math_data = math_data

load_file_data_votes(file=None)

Load vote data from a local JSON file.

Parameters:
  • file (str, default: None ) –

    Path to the votes JSON file.

Source code in reddwarf/data_loader.py
def load_file_data_votes(self, file=None):
    """
    Load vote data from a local JSON file.

    Args:
        file (str): Path to the votes JSON file.
    """
    with open(file) as f:
        votes_data = json.load(f)

    votes_data = [Vote(**vote).model_dump(mode="json") for vote in votes_data]
    self.votes_data = votes_data

load_remote_export_data()

Load data from remote CSV export endpoints.

Downloads and processes CSV files from the Polis export directory, including:

- comments.csv: Statement data
- votes.csv: Vote records

Handles missing is_meta field by falling back to API data when necessary. Automatically filters duplicate votes, keeping the most recent.

Raises:
  • ValueError

    If CSV export URL cannot be determined or API fallback fails.

Source code in reddwarf/data_loader.py
def load_remote_export_data(self):
    """
    Load data from remote CSV export endpoints.

    Downloads and processes CSV files from the Polis export directory, including:
    - comments.csv: Statement data
    - votes.csv: Vote records

    Handles missing is_meta field by falling back to API data when necessary.
    Automatically filters duplicate votes, keeping the most recent.

    Raises:
        ValueError: If CSV export URL cannot be determined or API fallback fails.
    """
    if self.directory_url:
        directory_url = self.directory_url
    elif self.report_id:
        directory_url = self.get_polis_export_directory_url(self.report_id)
    else:
        raise ValueError(
            "Cannot determine CSV export URL without report_id or directory_url"
        )

    self.load_remote_export_data_comments(directory_url)
    self.load_remote_export_data_votes(directory_url)

    # Supplement is_meta statement field via API if missing.
    # See: https://github.com/polis-community/red-dwarf/issues/55
    if self._is_statement_meta_field_missing():
        import warnings

        warnings.warn(
            "CSV import is missing is_meta field. Attempting to load comments data from API instead..."
        )
        try:
            if self.report_id and not self.conversation_id:
                self.load_api_data_report()
                self.conversation_id = self.report_data["conversation_id"]
            self.load_api_data_comments()
        except Exception:
            raise ValueError(
                " ".join(
                    [
                        "Due to an upstream bug, we must patch CSV exports using the API,",
                        "so conversation_id or report_id is required.",
                        "See: https://github.com/polis-community/red-dwarf/issues/56",
                    ]
                )
            )

    # When multiple votes (same tid and pid), keep only most recent (vs first).
    self.filter_duplicate_votes(keep="recent")

load_remote_export_data_comments(directory_url)

Load statement/comment data from remote CSV export.

Parameters:
  • directory_url (str) –

    Base URL of the CSV export directory.

Source code in reddwarf/data_loader.py
def load_remote_export_data_comments(self, directory_url):
    """
    Load statement/comment data from remote CSV export.

    Args:
        directory_url (str): Base URL of the CSV export directory.
    """
    r = self.session.get(directory_url + "comments.csv")
    comments_csv = r.text
    reader = csv.DictReader(StringIO(comments_csv))
    self.comments_data = [
        Statement(**c).model_dump(mode="json") for c in list(reader)
    ]

load_remote_export_data_votes(directory_url)

Load vote data from remote CSV export.

Parameters:
  • directory_url (str) –

    Base URL of the CSV export directory.

Source code in reddwarf/data_loader.py
def load_remote_export_data_votes(self, directory_url):
    """
    Load vote data from remote CSV export.

    Args:
        directory_url (str): Base URL of the CSV export directory.
    """
    r = self.session.get(directory_url + "votes.csv")
    votes_csv = r.text
    reader = csv.DictReader(StringIO(votes_csv))
    self.votes_data = [
        Vote(**vote).model_dump(mode="json") for vote in list(reader)
    ]

populate_polis_ids()

Normalize and populate Polis ID fields from the provided identifiers.

This method handles the logic for determining conversation_id and report_id from the generic polis_id parameter. (Report IDs start with 'r', while conversation IDs start with a number.)

Source code in reddwarf/data_loader.py
def populate_polis_ids(self):
    """
    Normalize and populate Polis ID fields from the provided identifiers.

    This method handles the logic for determining conversation_id and report_id
    from the generic polis_id parameter. (Report IDs start with 'r', while
    conversation IDs start with a number.)
    """
    if self.polis_id:
        # If polis_id set, set report or conversation ID.
        if self.polis_id[0] == "r":
            self.report_id = self.polis_id
        elif self.polis_id[0].isdigit():
            self.conversation_id = self.polis_id
    else:
        # If not set, write it from what's provided.
        self.polis_id = self.report_id or self.conversation_id
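For example (IDs hypothetical):

loader.polis_id = "r1234abcd"   # leading "r": treated as a report_id
loader.populate_polis_ids()
assert loader.report_id == "r1234abcd"

loader.polis_id = "91234abcd"   # leading digit: treated as a conversation_id
loader.populate_polis_ids()
assert loader.conversation_id == "91234abcd"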

reddwarf.data_presenter

reddwarf.data_presenter.generate_figure(coord_data, coord_labels=None, cluster_labels=None, flip_x=False, flip_y=False)

Generates a matplotlib scatterplot with optional bounded clusters.

The plot is drawn from an array of xy coordinate pairs, one per point; points can be annotated via coord_labels. When a list of cluster labels is supplied (one per row), concave hulls are drawn around each cluster.

Signs of PCA projection coordinates are arbitrary, and can flip without meaning. Inverting axes can help compare results with Polis platform visualizations.

Parameters:
  • coord_data (ndarray) –

    An array of shape (n, 2) holding xy coordinates, one row per point.

  • coord_labels (List[str], default: None ) –

    Optional text labels, one per row in coord_data, drawn beside each point.

  • cluster_labels (List[int], default: None ) –

    A list of group labels, one for each row in coord_data.

  • flip_x (bool, default: False ) –

    Flip the presentation of the X-axis so it descends left-to-right.

  • flip_y (bool, default: False ) –

    Flip the presentation of the Y-axis so it descends top-to-bottom.

Returns:
  • None

    None.

Source code in reddwarf/data_presenter.py
def generate_figure(
    coord_data,
    coord_labels=None,
    cluster_labels: Optional[List[int]] = None,
    flip_x: bool = False,
    flip_y: bool = False,
) -> None:
    """
    Generates a matplotlib scatterplot with optional bounded clusters.

    The plot is drawn from an array of xy coordinate pairs, one per point; points can be annotated via `coord_labels`.
    When a list of cluster labels is supplied (one per row), concave hulls are drawn around each cluster.

    Signs of PCA projection coordinates are arbitrary, and can flip without
    meaning. Inverting axes can help compare results with Polis platform
    visualizations.

    Args:
        coord_data (np.ndarray): An array of shape (n, 2) holding xy coordinates, one row per point.
        coord_labels (List[str]): Optional text labels, one per row in `coord_data`, drawn beside each point.
        cluster_labels (List[int]): A list of group labels, one for each row in `coord_data`.
        flip_x (bool): Flip the presentation of the X-axis so it descends left-to-right.
        flip_y (bool): Flip the presentation of the Y-axis so it descends top-to-bottom.

    Returns:
        None.
    """
    plt.figure(figsize=(7, 5), dpi=80)
    plt.axhline(y=0, color="k", linestyle="-", linewidth=0.5)
    plt.axvline(x=0, color="k", linestyle="-", linewidth=0.5)

    if flip_x:
        plt.gca().invert_xaxis()
    if flip_y:
        plt.gca().invert_yaxis()

    # Label points when coordinate labels are provided.
    if coord_labels is not None:
        for label, xy in zip(coord_labels, coord_data):
            plt.annotate(
                str(label),
                (float(xy[0]), float(xy[1])),
                xytext=(2, 2),
                color="gray",
                textcoords="offset points",
            )

    scatter_kwargs = defaultdict()
    scatter_kwargs["x"] = coord_data[:, 0]
    scatter_kwargs["y"] = coord_data[:, 1]
    scatter_kwargs["s"] = 10  # point size
    scatter_kwargs["alpha"] = 0.8  # point transparency

    # Wrap clusters in hulls when cluster labels are provided.
    if cluster_labels is not None:
        # Ref: https://matplotlib.org/stable/users/explain/colors/colormaps.html#qualitative
        scatter_kwargs["cmap"] = "Set1"  # color map

        # Pad cluster_labels to match the number of points
        CLUSTER_CENTER_LABEL = -2
        if len(cluster_labels) < len(coord_data):
            pad_length = len(coord_data) - len(cluster_labels)
            cluster_labels = np.concatenate(
                [cluster_labels, [CLUSTER_CENTER_LABEL] * pad_length]
            )

        scatter_kwargs["c"] = cluster_labels  # color indexes

        print("Calculating convex hulls around clusters...")
        # Subset to allow unlabelled points to just be plotted
        unique_labels = np.unique(cluster_labels)
        for label in unique_labels:
            if label in (-1, -2):
                continue  # skip hulls when special-case labels used

            label_mask = cluster_labels == label
            cluster_points = coord_data[label_mask]

            print(f"Hull {label}, bounding {len(cluster_points)} points")

            if len(cluster_points) < 3:
                # TODO: Accommodate 2 points like the Polis platform does.
                print("Cannot create concave hull for fewer than 3 points. Skipping...")
                continue

            hull_point_indices = concave_hull.concave_hull_indexes(cluster_points, concavity=4.0)
            hull_points = cluster_points[hull_point_indices]

            polygon = patches.Polygon(
                hull_points,
                fill=True,
                color="gray",
                alpha=0.3,
                edgecolor=None,
            )
            plt.gca().add_patch(polygon)

    scatter = plt.scatter(**scatter_kwargs)

    # Add a legend if labels are provided
    if cluster_labels is not None:
        unique_labels = np.unique(cluster_labels)
        cbar = plt.colorbar(scatter, label="Cluster", ticks=unique_labels)

        tick_labels = []
        for lbl in unique_labels:
            if lbl == -1:
                tick_labels.append("[Unclustered]")
            elif lbl == -2:
                tick_labels.append("[Center Guess]")
            else:
                tick_labels.append(GROUP_LABEL_NAMES[lbl])
        cbar.ax.set_yticklabels(tick_labels)

    plt.show()

    return None
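A minimal usage sketch with synthetic coordinates (group 0 has enough points for a hull; group 1 is skipped):

import numpy as np

xy = np.array([[0.1, 0.2], [0.3, -0.1], [0.2, 0.3], [-0.4, 0.1]])
generate_figure(
    coord_data=xy,
    coord_labels=[f"p{i}" for i in range(len(xy))],
    cluster_labels=np.array([0, 0, 0, 1]),  # an array gives elementwise masking above
)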

reddwarf.data_presenter.generate_figure_polis(result, show_guesses=False, flip_x=True, flip_y=False, show_pids=True)

Generate a Polis-style visualization from clustering results.

Parameters:
  • result (PolisClusteringResult) –

    The result object from run_pipeline

  • show_guesses (bool, default: False ) –

    Show the initial cluster center guesses on the plot

  • flip_x (bool, default: True ) –

    Flip the X-axis (default True to match Polis interface)

  • flip_y (bool, default: False ) –

    Flip the Y-axis (default False)

  • show_pids (bool, default: True ) –

    Show the participant IDs on the plot

Source code in reddwarf/data_presenter.py
def generate_figure_polis(
    result: PolisClusteringResult,
    show_guesses=False,
    flip_x=True,
    flip_y=False,
    show_pids=True,
):
    """
    Generate a Polis-style visualization from clustering results.

    Args:
        result (PolisClusteringResult): The result object from run_pipeline
        show_guesses (bool): Show the initial cluster center guesses on the plot
        flip_x (bool): Flip the X-axis (default True to match Polis interface)
        flip_y (bool): Flip the Y-axis (default False)
        show_pids (bool): Show the participant IDs on the plot
    """
    participants_clustered_df = result.participants_df[
        result.participants_df["cluster_id"].notnull()
    ]
    cluster_labels = participants_clustered_df["cluster_id"].values

    coord_data = participants_clustered_df.loc[:, ["x", "y"]].values
    coord_labels = None
    # Add the init center guesses to the bottom of the coord stack. Internally,
    # they will be given a fake "-2" label (rendered as "[Center Guess]") that
    # won't be used to draw cluster hulls. This is purely illustrative, to show
    # where the centroids were guessed.
    if show_guesses:
        coord_data = np.vstack(
            [
                coord_data,
                np.asarray(
                    result.clusterer.init_centers_used_ if result.clusterer else []
                ),
            ]
        )

    if show_pids:
        coord_labels = [f"p{pid}" for pid in participants_clustered_df.index]

    generate_figure(
        coord_data=coord_data,
        coord_labels=coord_labels,
        cluster_labels=cluster_labels,
        # Always needs flipping to look like Polis interface.
        flip_x=flip_x,
        # Sometimes needs flipping to look like Polis interface.
        flip_y=flip_y,
    )
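Typical usage chains this onto the pipeline result. A sketch, assuming votes holds the raw vote records accepted by run_pipeline:

from reddwarf.implementations.polis import run_pipeline

result = run_pipeline(votes=votes)
generate_figure_polis(result, show_guesses=True)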

Types

reddwarf.implementations.base.PolisClusteringResult dataclass

Attributes:
  • raw_vote_matrix (DataFrame) –

    Raw sparse vote matrix before any processing.

  • filtered_vote_matrix (DataFrame) –

    Raw sparse vote matrix with moderated statements zeroed out.

  • reducer (ReducerModel) –

    scikit-learn reducer model fitted to vote matrix.

  • clusterer (ClustererModel) –

    scikit-learn clusterer model, fitted to participant projections. (includes labels_)

  • group_comment_stats (DataFrame) –

    A multi-index dataframe of per-group statement statistics, indexed by group ID and statement ID.

  • statements_df (DataFrame) –

    A dataframe with all intermediary and final statement data/calculations/metadata.

  • participants_df (DataFrame) –

    A dataframe with all intermediary and final participant data/calculations/metadata.

  • participant_projections (dict) –

    A dict of participant projected coordinates, keyed to participant ID.

  • statement_projections (Optional[dict]) –

    A dict of statement projected coordinates, keyed to statement ID.

  • group_aware_consensus (dict) –

    A nested dict of statement group-aware-consensus values, keyed first by agree/disagree, then statement ID.

  • consensus (ConsensusResult) –

    A dict of the most statistically significant statements for each of agree/disagree.

  • repness (PolisRepness) –

    A dict of the most statistically significant statements most representative of each group.

Source code in reddwarf/implementations/base.py
@dataclass
class PolisClusteringResult:
    """
    Attributes:
        raw_vote_matrix (DataFrame): Raw sparse vote matrix before any processing.
        filtered_vote_matrix (DataFrame): Raw sparse vote matrix with moderated statements zeroed out.
        reducer (ReducerModel): scikit-learn reducer model fitted to vote matrix.
        clusterer (ClustererModel): scikit-learn clusterer model, fitted to participant projections. (includes `labels_`)
        group_comment_stats (DataFrame): A multi-index dataframe of per-group statement statistics, indexed by group ID and statement ID.
        statements_df (DataFrame): A dataframe with all intermediary and final statement data/calculations/metadata.
        participants_df (DataFrame): A dataframe with all intermediary and final participant data/calculations/metadata.
        participant_projections (dict): A dict of participant projected coordinates, keyed to participant ID.
        statement_projections (Optional[dict]): A dict of statement projected coordinates, keyed to statement ID.
        group_aware_consensus (dict): A nested dict of statement group-aware-consensus values, keyed first by agree/disagree, then statement ID.
        consensus (ConsensusResult): A dict of the most statistically significant statements for each of agree/disagree.
        repness (PolisRepness): A dict of the most statistically significant statements most representative of each group.
    """

    raw_vote_matrix: DataFrame
    filtered_vote_matrix: DataFrame
    reducer: ReducerModel
    # TODO: Figure out how to guarantee PolisKMeans model returned.
    clusterer: ClustererModel | None
    group_comment_stats: DataFrame
    statements_df: DataFrame
    participants_df: DataFrame
    participant_projections: dict
    statement_projections: Optional[dict]
    group_aware_consensus: dict
    consensus: ConsensusResult
    repness: PolisRepness
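Since the result is a plain dataclass, fields read as attributes. A sketch:

result = run_pipeline(votes=votes)
xy = result.participant_projections  # {participant_id: [x, y], ...}
labels = result.clusterer.labels_ if result.clusterer else None  # cluster assignments
result.statements_df.head()  # per-statement calculations and metadata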