Giovanni's Diary > Subjects > Programming > Gists >

Python / k-means.py

Implementation of k-means clustering in Python.

# k-means
# =======
#
# Implementation of k-means clustering in Python.
#
# Credits to: https://towardsdatascience.com/create-your-own-k-means-clustering-algorithm-in-python-d7d4c9077670/
# Requirements:
#  - matplotlib
#  - seaborn
#  - scikit-learn
#

import seaborn as sns
import matplotlib
matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
import sklearn.preprocessing as skp
import sklearn.datasets as skd
import numpy as np
import numpy.random as npr
import random

def euclidean(point, data):
    """
    Euclidean distance between point and data.
    Point has dimensions (m,), data has dimensions (n,m),
    and output will be of size (n,).
    """
    return np.sqrt(np.sum((point - data)**2, axis=1))

class KMeans:

    def __init__(self, n_clusters=8, max_iter=300):
        self.n_clusters = n_clusters
        self.max_iter = max_iter

    def fit(self, X_train):
        # Randomly select centroid start points, uniformly
        # distributed across the domain of the dataset
        # min_ = np.min(X_train, axis=0)
        # max_ = np.max(X_train, axis=0)
        #self.centroids = [npr.uniform(min_, max_) for _ in range(self.n_clusters)]

        # Initialize the centroids, using k-means++ method
        # where a random datapoint is selected as the first,
        # then the rest are initialized w/ probabilities
        # proportional to their distances to the first
        self.centroids = [random.choice(X_train)]
        for _ in range(self.n_clusters-1):
            dists = np.sum([euclidean(centroid, X_train) for centroid in self.centroids], axis=0)
            dists /= np.sum(dists)
            new_centroid_idx, = np.random.choice(range(len(X_train)), size=1, p=dists)
            self.centroids += [X_train[new_centroid_idx]]

        iteration = 0
        prev_centroids = None
        while np.not_equal(self.centroids, prev_centroids).any() and iteration < self.max_iter:
            # Sort each data point, assigning to nearest centroid
            sorted_points = [[] for _ in range(self.n_clusters)]
            for x in X_train:
                dists = euclidean(x, self.centroids)
                centroid_idx = np.argmin(dists)
                sorted_points[centroid_idx].append(x)

            # Push current centroids to previous, reassign
            # centroids as mean of the points belonging to them
            prev_centroids = self.centroids
            self.centroids = [np.mean(cluster, axis=0) for cluster in sorted_points]
            for i, centroid in enumerate(self.centroids):
                # Catch any np.nans, resulting from a centroid
                # having no points
                if np.isnan(centroid).any():
                    self.centroids[i] = prev_centroids[i]
            iteration += 1

    def evaluate(self, X):
        centroids = []
        centroid_idxs = []
        for x in X:
            dists = euclidean(x, self.centroids)
            centroid_idx = np.argmin(dists)
            centroids.append(self.centroids[centroid_idx])
            centroid_idxs.append(centroid_idx)

        return centroids, centroid_idx

centers = 5

# Generate random data
X_train, true_labels = skd.make_blobs(n_samples=100,
                                     centers=centers,
                                     random_state=42)
X_train = skp.StandardScaler().fit_transform(X_train)

kmeans = KMeans(n_clusters=centers)
kmeans.fit(X_train)

# View results
class_centers, classification = kmeans.evaluate(X_train)
sns.scatterplot(x=[X[0] for X in X_train],
                y=[X[1] for X in X_train],
                hue=true_labels,
                style=classification,
                palette="deep",
                legend=None)

plt.plot([x for x, _ in kmeans.centroids],
         [y for _, y in kmeans.centroids],
         '+',
         markersize=10);
plt.xlabel("x")
plt.ylabel("y")
plt.show()

Travel: Gists, Index