API Reference
def generate_raw_matrix(
votes: List[Dict],
cutoff: Optional[int] = None,
) -> VoteMatrix:
Generates a raw vote matrix from a list of vote records.
If a `cutoff` is provided, votes are filtered based on either:
- An `int` representing unix timestamp (ms), keeping only votes before or at that time.
- Any int above 1_300_000_000 is considered a timestamp.
- Any other positive or negative `int` is considered an index, reflecting where to trim the time-sorted vote list.
- positive: filters in votes that many indices from start
- negative: filters out votes that many indices from end
votes (List[Dict]): A date-sorted list of vote records, where each record is a dictionary containing:
- "participant_id": The ID of the voter.
- "statement_id": The ID of the statement being voted on.
- "vote": The recorded vote value.
- "modified": A unix timestamp object representing when the vote was made.
cutoff (int): A cutoff unix timestamp (ms) or index position in date-sorted votes list.
raw_matrix (pd.DataFrame): A full raw vote matrix DataFrame with NaN values where:
1. rows are voters,
2. columns are statements, and
3. values are votes.
This includes even voters that have no votes, and statements on which no votes were placed.
if cutoff:
    # TODO: Add tests to confirm votes list is already date-sorted for each data_source.
    # TODO: Detect datetime object as arg instead.
    if cutoff > 1_300_000_000:
        # Large values are treated as unix timestamps.
        cutoff_timestamp = cutoff
        votes = [v for v in votes if v['modified'] <= cutoff_timestamp]
    else:
        # Smaller values are treated as an index into the date-sorted list.
        cutoff_index = cutoff
        votes = votes[:cutoff_index]
else:
    votes = votes
raw_matrix = pd.DataFrame.from_dict(votes)
raw_matrix = raw_matrix.pivot(
participant_count = raw_matrix.index.max() + 1
comment_count = raw_matrix.columns.max() + 1
raw_matrix = raw_matrix.reindex(
return raw_matrix
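The cutoff rules described above can be illustrated in isolation. The following is a minimal sketch, not the library's implementation: `apply_cutoff` and `TIMESTAMP_THRESHOLD` are hypothetical names introduced here, and the sample votes are made up. It only mirrors the timestamp-versus-index branching shown in the source listing.

```python
from typing import Dict, List, Optional

# Threshold copied from the source listing; larger values are treated as timestamps.
TIMESTAMP_THRESHOLD = 1_300_000_000


def apply_cutoff(votes: List[Dict], cutoff: Optional[int] = None) -> List[Dict]:
    """Hypothetical helper mirroring the cutoff branch of generate_raw_matrix."""
    if not cutoff:
        # None (or 0) means no trimming at all.
        return votes
    if cutoff > TIMESTAMP_THRESHOLD:
        # Timestamp: keep votes made at or before the cutoff.
        return [v for v in votes if v["modified"] <= cutoff]
    # Index: positive keeps that many from the start, negative drops from the end.
    return votes[:cutoff]


# Made-up, date-sorted vote records with millisecond timestamps.
votes = [
    {"participant_id": 0, "statement_id": 0, "vote": 1, "modified": 1_700_000_000_000},
    {"participant_id": 1, "statement_id": 0, "vote": -1, "modified": 1_700_000_001_000},
    {"participant_id": 0, "statement_id": 1, "vote": 0, "modified": 1_700_000_002_000},
]

by_timestamp = apply_cutoff(votes, 1_700_000_001_000)
first_two = apply_cutoff(votes, 2)
without_last = apply_cutoff(votes, -1)
```

Note that, as in the source, a `cutoff` of `0` is falsy and therefore leaves the vote list untouched rather than returning an empty list.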
Generates a filtered vote matrix from a raw matrix and filter config.
def generate_filtered_matrix(
vote_matrix: VoteMatrix,
min_user_vote_threshold: int = 7,
active_statement_ids: List[int] = [],
keep_participant_ids: List[int] = [],
unvoted_filter_type: Literal["drop", "zero"] = "drop",
) -> VoteMatrix:
Generates a filtered vote matrix from a raw matrix and filter config.
vote_matrix (pd.DataFrame): The [raw] vote matrix.
min_user_vote_threshold (int): The number of votes a participant must make to avoid being filtered.
active_statement_ids (List[int]): The statement IDs that are not moderated out.
keep_participant_ids (List[int]): Preserve specific participants even if below threshold.
unvoted_filter_type ("drop" | "zero"): When a statement has no votes, it can't be imputed. \
This determines whether to drop the statement column or set all of its values to zero/pass. (Default: drop)
filtered_vote_matrix (VoteMatrix): A vote matrix with the following filtered out:
1. statements without any votes (when `unvoted_filter_type` is "drop"),
2. statements that have been moderated out,
3. participants below the vote count threshold, unless explicitly listed in `keep_participant_ids`.
# Filter out moderated statements.
vote_matrix = vote_matrix.filter(active_statement_ids, axis='columns')
# Filter out participants with fewer than `min_user_vote_threshold` votes (default 7), keeping IDs we're forced to.
# Ref: https://hyp.is/JbNMus5gEe-cQpfc6eVIlg/gwern.net/doc/sociology/2021-small.pdf
participant_ids_meeting_vote_thresh = vote_matrix[vote_matrix.count(axis="columns") >= min_user_vote_threshold].index.to_list()
# Add in some specific participant IDs for Polismath edge-cases.
# See: https://github.com/compdemocracy/polis/pull/1893#issuecomment-2654666421
participant_ids_in = participant_ids_meeting_vote_thresh + keep_participant_ids
participant_ids_in_unique = list(set(participant_ids_in))
vote_matrix = vote_matrix.filter(participant_ids_in_unique, axis='rows')
# This is otherwise the more efficient way, but we want to keep some participant IDs
# to troubleshoot edge-cases in upstream Polis math.
# self.matrix = self.matrix.dropna(thresh=self.min_votes, axis='rows')
unvoted_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)
# TODO: What about statements with no votes? E.g., 53 in oprah. Filter out? zero?
# Test this on a conversation where it will actually change statement count.
if unvoted_filter_type == 'drop':
vote_matrix = vote_matrix.drop(unvoted_statement_ids, axis='columns')
elif unvoted_filter_type == 'zero':
vote_matrix[unvoted_statement_ids] = 0
return vote_matrix
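To make the filter order concrete, here is a small worked example on a made-up 3x4 matrix. It is a sketch that replays the same pandas calls as the listing above with hypothetical inputs (a threshold of 2 rather than the default 7), not a substitute for calling `generate_filtered_matrix` itself.

```python
import numpy as np
import pandas as pd

# Made-up raw matrix: rows are participants, columns are statements.
vote_matrix = pd.DataFrame(
    [
        [1, -1, np.nan, np.nan],
        [1, np.nan, np.nan, np.nan],
        [-1, 1, 0, np.nan],
    ],
    index=pd.Index([0, 1, 2], name="participant_id"),
    columns=pd.Index([0, 1, 2, 3], name="statement_id"),
)

active_statement_ids = [0, 1, 3]  # assumption: statement 2 was moderated out
min_votes = 2                     # assumption: lowered threshold for the demo
keep_participant_ids = [1]        # participant 1 is kept despite having 1 vote

# 1. Keep only active statements.
filtered = vote_matrix.filter(active_statement_ids, axis="columns")

# 2. Keep participants meeting the threshold, plus the forced-in IDs.
meets = filtered[filtered.count(axis="columns") >= min_votes].index.to_list()
keep = list(set(meets + keep_participant_ids))
filtered = filtered.filter(keep, axis="rows")

# 3. Drop statements that ended up with no votes at all.
unvoted = filtered.columns[filtered.isnull().all()].tolist()
filtered = filtered.drop(unvoted, axis="columns")
```

Here statement 3 is active but has no votes, so it is the only column removed in the last step. The vote counts in step 2 are taken after moderation, so participant 2's vote on statement 2 no longer counts toward the threshold.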
Small, C. (2021). "Polis: Scaling Deliberation by Mapping High Dimensional Opinion Spaces." Specific highlight: https://hyp.is/8zUyWM5fEe-uIO-J34vbkg/gwern.net/doc/sociology/2021-small.pdf
def impute_missing_votes(vote_matrix: VoteMatrix) -> VoteMatrix:
Imputes missing votes in a voting matrix using column-wise mean.
Small, C. (2021). "Polis: Scaling Deliberation by Mapping High Dimensional Opinion Spaces."
Specific highlight: <https://hyp.is/8zUyWM5fEe-uIO-J34vbkg/gwern.net/doc/sociology/2021-small.pdf>
vote_matrix (pd.DataFrame): A vote matrix DataFrame with NaN values where: \
1. rows are voters, \
2. columns are statements, and \
3. values are votes.
imputed_matrix (pd.DataFrame): The same vote matrix DataFrame, with NaN values replaced by the column mean.
mean_imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
imputed_matrix = pd.DataFrame(
return imputed_matrix
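Column-mean imputation can be checked by hand on a tiny matrix. The sketch below deliberately swaps in plain pandas (`fillna` with per-column means) instead of the `SimpleImputer` used in the source, so it illustrates the arithmetic rather than the actual implementation; the data is made up.

```python
import numpy as np
import pandas as pd

# Made-up vote matrix: rows are voters, columns are statements.
vote_matrix = pd.DataFrame(
    [
        [1.0, np.nan],
        [-1.0, 1.0],
        [np.nan, 0.0],
    ],
    index=pd.Index([0, 1, 2], name="participant_id"),
    columns=pd.Index([0, 1], name="statement_id"),
)

# Mean of each statement column, ignoring NaN values.
column_means = vote_matrix.mean()

# Fill each missing vote with the mean of its own column.
imputed = vote_matrix.fillna(column_means)
```

Statement 0 has votes `1` and `-1`, so its missing cell becomes `0.0`; statement 1 has votes `1` and `0`, so its missing cell becomes `0.5`. A column with no votes at all has no mean to fill in, which is why unvoted statements are handled during filtering.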
Imputes a prepared vote matrix and returns projected participant data, as well as eigenvectors and eigenvalues.
The vote matrix should not yet be imputed, as this will happen within the method.
def run_pca(
vote_matrix: VoteMatrix,
n_components: int,
) -> Tuple[ pd.DataFrame, np.ndarray, np.ndarray ]:
Imputes a prepared vote matrix and returns projected participant data,
as well as eigenvectors and eigenvalues.
The vote matrix should not yet be imputed, as this will happen within the method.
vote_matrix (pd.DataFrame): A vote matrix of data. Non-imputed values are expected.
n_components (int): Number n of principal components to decompose the `vote_matrix` into.
projected_data (pd.DataFrame): A dataframe of projected xy coordinates for each `vote_matrix` row.
eigenvectors (np.ndarray): The `n` principal components, one per row.
eigenvalues (np.ndarray): Explained variance, one per component.
imputed_matrix = impute_missing_votes(vote_matrix)
pca = PCA(n_components=n_components)
pca.fit(imputed_matrix)  # Rows are participants (samples), so no transpose (.T) is applied.
eigenvectors = pca.components_
eigenvalues = pca.explained_variance_
# Project participant vote data onto 2D using eigenvectors.
projected_data = pca.transform(imputed_matrix)
projected_data = pd.DataFrame(projected_data, index=imputed_matrix.index, columns=["x", "y"])
projected_data.index.name = "participant_id"
return projected_data, eigenvectors, eigenvalues
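The relationship between the three return values can be sketched with NumPy alone. This is an illustration under stated assumptions, not the code path used above: it computes an eigendecomposition of the sample covariance matrix by hand on random made-up data, whereas the source delegates to scikit-learn's `PCA`. Component signs may differ from what scikit-learn returns.

```python
import numpy as np

# Made-up, already-imputed data: 20 participants, 4 statements.
rng = np.random.default_rng(0)
X = rng.normal(size=(20, 4))

# PCA operates on mean-centered columns.
X_centered = X - X.mean(axis=0)

# Sample covariance between statements (n - 1 in the denominator).
cov = np.cov(X_centered, rowvar=False)

# eigh returns eigenvalues in ascending order; take the two largest.
eigvals, eigvecs = np.linalg.eigh(cov)
order = np.argsort(eigvals)[::-1][:2]
eigenvalues = eigvals[order]
eigenvectors = eigvecs[:, order].T  # one component per row

# Project each participant onto the two components.
projected = X_centered @ eigenvectors.T
```

Each eigenvalue equals the variance of the projected data along its component, which is what "explained variance" refers to in the return description.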
Scale projected participant xy points based on vote matrix, to account for any small number of votes by a participant and prevent those participants from bunching up in the center.
def scale_projected_data(
projected_data: pd.DataFrame,
vote_matrix: VoteMatrix
) -> pd.DataFrame:
Scale projected participant xy points based on vote matrix, to account for any small number of
votes by a participant and prevent those participants from bunching up in the center.
projected_data (pd.DataFrame): the projected xy coords of participants.
vote_matrix (VoteMatrix): the processed vote matrix data frame, from which to generate scaling factors.
scaled_projected_data (pd.DataFrame): The coord data rescaled based on participant votes.
total_active_comment_count = vote_matrix.shape[1]
participant_vote_counts = vote_matrix.count(axis="columns")
# Ref: https://hyp.is/x6nhItMMEe-v1KtYFgpOiA/gwern.net/doc/sociology/2021-small.pdf
# Ref: https://github.com/compdemocracy/polis/blob/15aa65c9ca9e37ecf57e2786d7d81a4bd4ad37ef/math/src/polismath/math/pca.clj#L155-L156
participant_scaling_coeffs = np.sqrt(total_active_comment_count / participant_vote_counts).values
# See: https://numpy.org/doc/stable/reference/generated/numpy.reshape.html
# Reshape scaling_coeffs list to match the shape of projected_data matrix
participant_scaling_coeffs = np.reshape(participant_scaling_coeffs, (-1, 1))
return projected_data * participant_scaling_coeffs
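A short worked example may help show what the scaling factor does. The sketch below applies the same square-root formula as the listing above to made-up data: with 4 active statements, a participant who voted on all 4 is left unchanged, while a participant with a single vote is pushed outward by a factor of 2.

```python
import numpy as np
import pandas as pd

# Made-up vote matrix: participant 0 voted on everything, participant 1 only once.
vote_matrix = pd.DataFrame(
    [
        [1, -1, 0, 1],
        [1, np.nan, np.nan, np.nan],
    ],
    index=pd.Index([0, 1], name="participant_id"),
)

# Made-up projected coordinates for the same two participants.
projected = pd.DataFrame(
    {"x": [0.5, 0.25], "y": [-1.0, 0.5]},
    index=pd.Index([0, 1], name="participant_id"),
)

total_statements = vote_matrix.shape[1]
vote_counts = vote_matrix.count(axis="columns")

# sqrt(total statements / votes cast), reshaped to a column so it
# broadcasts across both the x and y columns.
coeffs = np.sqrt(total_statements / vote_counts).values.reshape(-1, 1)

scaled = projected * coeffs
```

Because the coefficient divides by the vote count, a participant with zero votes would produce a division by zero; the vote threshold applied during filtering normally excludes such rows before this step.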
Runs K-Means clustering on a 2D DataFrame of xy points, for a specific K, and returns labels for each row and cluster centers. Optionally accepts guesses on cluster centers.
def run_kmeans(
dataframe: pd.DataFrame,
n_clusters: int = 2,
# TODO: Improve this type. 3d?
init_centers: Optional[List] = None,
) -> Tuple[np.ndarray, np.ndarray]:
Runs K-Means clustering on a 2D DataFrame of xy points, for a specific K,
and returns labels for each row and cluster centers. Optionally accepts
guesses on cluster centers.
dataframe (pd.DataFrame): A dataframe with two columns (assumed `x` and `y`).
n_clusters (int): How many clusters k to assume.
init_centers (List): A list of xy coordinates to use as initial center guesses.
cluster_labels (np.ndarray): A list of zero-indexed labels for each row in the dataframe
cluster_centers (np.ndarray): A list of center coords for clusters.
if init_centers:
    # Pass an array of xy coords to seed kmeans guesses.
    init_arg = init_centers[:n_clusters]
else:
    # Use the default strategy in sklearn.
    init_arg = "k-means++"
# TODO: Set random_state to a value eventually, so calculation is deterministic.
kmeans = KMeans(n_clusters=n_clusters, random_state=None, init=init_arg, n_init="auto").fit(dataframe)
return kmeans.labels_, kmeans.cluster_centers_
Use silhouette scores to find the best number of clusters k to assume to fit the data.
def find_optimal_k(
projected_data: pd.DataFrame,
max_group_count: int = 5,
debug: bool = False,
) -> Tuple[int, float, np.ndarray]:
Use silhouette scores to find the best number of clusters k to assume to fit the data.
projected_data (pd.DataFrame): A dataframe with two columns (assumed `x` and `y`).
max_group_count (int): The max K number of groups to test for. (Default: 5)
debug (bool): Whether to print debug output. (Default: False)
optimal_k (int): Ideal number of clusters.
optimal_silhouette_score (float): Silhouette score for this K value.
optimal_cluster_labels (np.ndarray): A list of group labels, one for each row in `projected_data`.
K_RANGE = range(2, max_group_count+1)
k_best = 0 # Best K so far.
best_silhouette_score = -np.inf
for k_test in K_RANGE:
cluster_labels, _ = run_kmeans(dataframe=projected_data, n_clusters=k_test)
this_silhouette_score = silhouette_score(projected_data, cluster_labels)
if debug:
print(f"{k_test=}, {this_silhouette_score=}")
if this_silhouette_score >= best_silhouette_score:
k_best = k_test
best_silhouette_score = this_silhouette_score
best_cluster_labels = cluster_labels
optimal_k = k_best
optimal_silhouette = best_silhouette_score
optimal_cluster_labels = best_cluster_labels
return optimal_k, optimal_silhouette, optimal_cluster_labels
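The selection loop has one subtle property: because it compares with `>=`, a tie in silhouette score is resolved in favor of the larger K. The sketch below isolates that logic. `pick_best_k` is a hypothetical helper and the scores are made up; no clustering is run here.

```python
import math
from typing import Dict, Tuple


def pick_best_k(scores_by_k: Dict[int, float]) -> Tuple[int, float]:
    """Hypothetical helper replaying the comparison used in find_optimal_k."""
    best_k, best_score = 0, -math.inf
    # Test K values in ascending order, as range(2, max_group_count + 1) does.
    for k in sorted(scores_by_k):
        score = scores_by_k[k]
        # ">=" means a later (larger) K wins when scores are equal.
        if score >= best_score:
            best_k, best_score = k, score
    return best_k, best_score


# Made-up silhouette scores with a tie between K=3 and K=4.
scores = {2: 0.41, 3: 0.57, 4: 0.57, 5: 0.30}
best_k, best_score = pick_best_k(scores)
```

With these scores the loop settles on K=4 rather than K=3. Since `run_kmeans` currently leaves `random_state` unset, actual scores can also vary slightly between runs.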
See: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.pipe.html
unused_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)
def get_unvoted_statement_ids(vote_matrix: VoteMatrix) -> List[int]:
A function intended to be used with `DataFrame.pipe` on a VoteMatrix DataFrame, returning a list of unvoted statement IDs.
See: <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.pipe.html>
vote_matrix (pd.DataFrame): A pivot of statements (cols), participants (rows), with votes as values.
unvoted_statement_ids (List[int]): list of statement IDs with no votes.
unused_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)
null_column_mask = vote_matrix.isnull().all()
null_column_ids = vote_matrix.columns[null_column_mask].tolist()
return null_column_ids
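As a quick illustration of the piping pattern, the sketch below restates the function body from the listing above and applies it to a made-up matrix in which statement 1 has received no votes.

```python
from typing import List

import numpy as np
import pandas as pd


def get_unvoted_statement_ids(vote_matrix: pd.DataFrame) -> List[int]:
    # Same logic as the listing above: columns where every value is null.
    null_column_mask = vote_matrix.isnull().all()
    return vote_matrix.columns[null_column_mask].tolist()


# Made-up matrix: statement 1 has no votes from either participant.
vote_matrix = pd.DataFrame(
    {0: [1, np.nan], 1: [np.nan, np.nan], 2: [-1, 0]},
    index=pd.Index([0, 1], name="participant_id"),
)

# pipe passes the DataFrame as the first argument to the function.
unvoted_statement_ids = vote_matrix.pipe(get_unvoted_statement_ids)
```

Calling `get_unvoted_statement_ids(vote_matrix)` directly gives the same result; `pipe` simply lets the call sit inside a method chain.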
Generates a matplotlib scatterplot with optional bounded clusters.
The plot is drawn from a dataframe of xy values, each point labelled by index `participant_id`.
When a list of labels is supplied (one per row), concave hulls are drawn around each labelled group.
def generate_figure(
coord_dataframe: pd.DataFrame,
labels: Optional[List[int]] = None,
) -> None:
Generates a matplotlib scatterplot with optional bounded clusters.
The plot is drawn from a dataframe of xy values, each point labelled by index `participant_id`.
When a list of labels is supplied (one per row), concave hulls are drawn around each labelled group.
coord_dataframe (pd.DataFrame): A dataframe of coordinates with columns named `x` and `y`, indexed by `participant_id`.
labels (List[int]): A list of labels, one for each row in `coord_dataframe`.
plt.figure(figsize=(7, 5), dpi=80)
plt.axhline(y=0, color='k', linestyle='-', linewidth=0.5)
plt.axvline(x=0, color='k', linestyle='-', linewidth=0.5)
# Label points with participant_id if no labels set.
for participant_id, row in coord_dataframe.iterrows():
(row["x"], row["y"]),
xytext=(2, 2),
textcoords='offset points')
scatter_kwargs = defaultdict()
scatter_kwargs["x"] = coord_dataframe.loc[:,"x"]
scatter_kwargs["y"] = coord_dataframe.loc[:,"y"]
scatter_kwargs["s"] = 10 # point size
scatter_kwargs["alpha"] = 0.8 # point transparency
if labels is not None:
# Ref: https://matplotlib.org/stable/users/explain/colors/colormaps.html#qualitative
scatter_kwargs["cmap"] = "Set1" # color map
scatter_kwargs["c"] = labels # color indexes
print("Calculating concave hulls around clusters...")
unique_labels = set(labels)
for label in unique_labels:
points = coord_dataframe[np.asarray(labels) == label]  # asarray so a plain list of labels also yields a boolean mask
print(f"Hull {str(label)}, bounding {len(points)} points")
if len(points) < 3:
# TODO: Accommodate 2 points like Polis platform does.
print("Cannot create concave hull for fewer than 3 points. Skipping...")
vertex_indices = concave_hull_indexes(points, concavity=4.0)
hull_points = points.iloc[vertex_indices, :]
polygon = patches.Polygon(
return None