🔍 无监督学习算法

从无标签数据中发现隐藏模式和结构,探索数据的内在规律

📊 K-Means

经典聚类算法,将数据分成 K 个簇。简单高效,广泛应用。

🌳 层次聚类

构建聚类树状结构,可以树状图(谱系图)直观地展示聚类层次关系。

🌀 DBSCAN

基于密度的聚类,可发现任意形状簇,自动识别噪声点。

📉 PCA

主成分分析,降维可视化,保留数据最大方差信息。

🎨 t-SNE

流形学习算法,高维数据可视化神器,保持局部相似性。

📈 GMM

高斯混合模型,软聚类,给出样本属于各簇的概率。

💡 无监督学习核心任务
🔷 聚类

将相似数据分组,发现数据内在结构

📉 降维

减少特征数量,保留重要信息

🔎 异常检测

识别与大多数数据不同的异常点

📊 关联规则

发现数据项之间的有趣关系

📊 1. K-Means 聚类

算法原理

K-Means 通过迭代优化簇中心,将数据分成 K 个簇,使得簇内数据点尽可能相似,簇间尽可能不同。

目标函数: J = Σᵢ Σₓ∈Cᵢ ||x - μᵢ||²

更新规则: μᵢ = (1/|Cᵢ|) Σₓ∈Cᵢ x
训练时间复杂度
O(n × k × d × iter)
预测时间复杂度
O(n × k × d)
空间复杂度
O(n × d)

✅ 优点

  • 简单易懂,实现方便
  • 收敛速度快
  • 可解释性强
  • 适用于大数据集

❌ 缺点

  • 需要预先指定 K 值
  • 对初始值敏感
  • 只能发现球形簇
  • 对异常值敏感

代码实现

/**
 * K-Means clustering via Lloyd's algorithm.
 * Iteratively assigns each sample to its nearest centroid and recomputes
 * centroids as cluster means until they stop moving (or maxIter is hit).
 */
class KMeans {
    /**
     * @param {number} k - number of clusters.
     * @param {number} maxIter - maximum assignment/update rounds.
     */
    constructor(k = 3, maxIter = 100) {
        this.k = k;
        this.maxIter = maxIter;
        this.centroids = null; // k x d array after fit()
    }

    /**
     * Learn cluster centroids from data.
     * @param {number[][]} X - samples, shape (n, d).
     */
    fit(X) {
        const nSamples = X.length; // (the original also read X[0].length into an unused local)

        // Initialize centroids as copies of k distinct random samples.
        const indices = this._randomSample(nSamples, this.k);
        this.centroids = indices.map(i => [...X[i]]);

        for (let iter = 0; iter < this.maxIter; iter++) {
            // Assignment step: group samples by nearest centroid.
            const clusters = Array.from({length: this.k}, () => []);
            for (let i = 0; i < nSamples; i++) {
                clusters[this._nearestCentroid(X[i])].push(X[i]);
            }

            // Update step: each centroid becomes the mean of its cluster;
            // an empty cluster keeps its previous centroid.
            const newCentroids = clusters.map((cluster, i) => {
                if (cluster.length === 0) return this.centroids[i];
                return cluster[0].map((_, j) =>
                    cluster.reduce((sum, row) => sum + row[j], 0) / cluster.length
                );
            });

            // Stop once no centroid moved more than the tolerance.
            if (this._converged(this.centroids, newCentroids)) break;
            this.centroids = newCentroids;
        }
    }

    /**
     * Assign each sample to its nearest learned centroid.
     * @param {number[][]} X - samples, shape (n, d).
     * @returns {number[]} cluster index per sample.
     */
    predict(X) {
        return X.map(x => this._nearestCentroid(x));
    }

    // Index of the centroid closest to x (Euclidean distance).
    _nearestCentroid(x) {
        let minDist = Infinity;
        let clusterIdx = 0;
        for (let i = 0; i < this.k; i++) {
            const dist = this._euclideanDistance(x, this.centroids[i]);
            if (dist < minDist) {
                minDist = dist;
                clusterIdx = i;
            }
        }
        return clusterIdx;
    }

    _euclideanDistance(x1, x2) {
        return Math.sqrt(x1.reduce((sum, val, i) => sum + (val - x2[i]) ** 2, 0));
    }

    // Fisher-Yates shuffle, then take the first k indices (no replacement).
    _randomSample(n, k) {
        const indices = Array.from({length: n}, (_, i) => i);
        for (let i = n - 1; i > 0; i--) {
            const j = Math.floor(Math.random() * (i + 1));
            [indices[i], indices[j]] = [indices[j], indices[i]];
        }
        return indices.slice(0, k);
    }

    // True when every centroid moved less than the tolerance.
    _converged(oldCentroids, newCentroids) {
        return oldCentroids.every((old, i) =>
            this._euclideanDistance(old, newCentroids[i]) < 1e-6
        );
    }
}

// 使用示例
const X = [[1, 2], [1.5, 1.8], [5, 8], [8, 8], [1, 0.6], [9, 11]];
// BUG FIX: `new KMeans(k = 2)` assigned to an implicit global `k`
// (a ReferenceError in strict mode/ESM) — JS has no keyword arguments,
// so pass the value positionally.
const kmeans = new KMeans(2);
kmeans.fit(X);
console.log('簇标签:', kmeans.predict(X));
console.log('质心:', kmeans.centroids);
import numpy as np

class KMeans:
    """K-Means clustering (Lloyd's algorithm) over NumPy arrays.

    Attributes:
        k: number of clusters.
        max_iter: maximum assignment/update rounds.
        centroids: (k, n_features) array after ``fit``.
    """

    def __init__(self, k=3, max_iter=100):
        self.k = k
        self.max_iter = max_iter
        self.centroids = None

    def fit(self, X):
        """Learn ``self.centroids`` from samples ``X`` of shape (n, d)."""
        n_samples = X.shape[0]  # the original also unpacked an unused n_features

        # Initialize centroids from k distinct random samples.
        random_idx = np.random.choice(n_samples, self.k, replace=False)
        self.centroids = X[random_idx]

        for _ in range(self.max_iter):
            # Assignment step: group samples by nearest centroid.
            clusters = [[] for _ in range(self.k)]
            for x in X:  # enumerate index was unused in the original
                clusters[self._nearest_centroid(x)].append(x)

            # Update step: mean of each cluster; an empty cluster keeps
            # its previous centroid.
            new_centroids = np.array([
                np.mean(cluster, axis=0) if cluster else old
                for cluster, old in zip(clusters, self.centroids)
            ])

            # Converged once centroids stop moving.
            if np.allclose(self.centroids, new_centroids):
                break
            self.centroids = new_centroids

    def predict(self, X):
        """Return the nearest-centroid index for each row of ``X``."""
        return np.argmin(np.linalg.norm(X[:, np.newaxis] - self.centroids, axis=2), axis=1)

    def _nearest_centroid(self, x):
        # Index of the centroid closest to a single sample.
        distances = [np.linalg.norm(x - centroid) for centroid in self.centroids]
        return np.argmin(distances)

# 使用示例
if __name__ == "__main__":
    from sklearn.datasets import make_blobs
    from sklearn.metrics import silhouette_score

    # Cluster four synthetic Gaussian blobs and score the assignment.
    X, y = make_blobs(n_samples=300, centers=4, random_state=42)

    model = KMeans(k=4)
    model.fit(X)
    labels = model.predict(X)

    print(f"轮廓系数:{silhouette_score(X, labels):.3f}")
package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

// KMeans implements Lloyd's algorithm: iteratively assign samples to the
// nearest centroid and recompute centroids until they stop moving.
type KMeans struct {
    k         int
    maxIter   int
    centroids [][]float64
}

// NewKMeans returns an unfitted model with k clusters and an iteration cap.
func NewKMeans(k, maxIter int) *KMeans {
    return &KMeans{k: k, maxIter: maxIter}
}

// Fit learns km.centroids from samples X (shape n x d).
func (km *KMeans) Fit(X [][]float64) {
    nSamples := len(X)
    // BUG FIX: use a local RNG instead of the global rand.Seed, which is
    // deprecated since Go 1.20.
    rng := rand.New(rand.NewSource(time.Now().UnixNano()))

    // Initialize centroids as copies of k distinct random samples, so
    // centroid updates never alias the caller's data.
    indices := km.randomSample(rng, nSamples, km.k)
    km.centroids = make([][]float64, km.k)
    for i, idx := range indices {
        km.centroids[i] = make([]float64, len(X[idx]))
        copy(km.centroids[i], X[idx])
    }

    for iter := 0; iter < km.maxIter; iter++ {
        // Assignment step: group samples by nearest centroid.
        clusters := make([][][]float64, km.k)
        for i := range clusters {
            clusters[i] = [][]float64{}
        }
        for _, x := range X {
            clusterIdx := km.nearestCentroid(x)
            clusters[clusterIdx] = append(clusters[clusterIdx], x)
        }

        // Update step: mean of each cluster; an empty cluster keeps its
        // previous centroid.
        newCentroids := make([][]float64, km.k)
        for i, cluster := range clusters {
            if len(cluster) == 0 {
                newCentroids[i] = km.centroids[i]
            } else {
                newCentroids[i] = km.computeMean(cluster)
            }
        }

        // Stop once no centroid moved more than the tolerance.
        if km.converged(km.centroids, newCentroids) {
            break
        }
        km.centroids = newCentroids
    }
}

// Predict returns the nearest-centroid index for every sample in X.
func (km *KMeans) Predict(X [][]float64) []int {
    labels := make([]int, len(X))
    for i, x := range X {
        labels[i] = km.nearestCentroid(x)
    }
    return labels
}

// nearestCentroid returns the index of the centroid closest to x.
func (km *KMeans) nearestCentroid(x []float64) int {
    minDist := math.MaxFloat64
    clusterIdx := 0
    for i, centroid := range km.centroids {
        dist := km.euclideanDistance(x, centroid)
        if dist < minDist {
            minDist = dist
            clusterIdx = i
        }
    }
    return clusterIdx
}

// euclideanDistance returns the L2 distance between two vectors.
func (km *KMeans) euclideanDistance(x1, x2 []float64) float64 {
    sum := 0.0
    for i := range x1 {
        diff := x1[i] - x2[i]
        sum += diff * diff
    }
    return math.Sqrt(sum)
}

// randomSample draws k distinct indices from [0, n) via Fisher-Yates.
func (km *KMeans) randomSample(rng *rand.Rand, n, k int) []int {
    indices := make([]int, n)
    for i := range indices {
        indices[i] = i
    }
    for i := n - 1; i > 0; i-- {
        j := rng.Intn(i + 1)
        indices[i], indices[j] = indices[j], indices[i]
    }
    return indices[:k]
}

// computeMean returns the element-wise mean of the cluster's samples.
func (km *KMeans) computeMean(cluster [][]float64) []float64 {
    n := len(cluster)
    d := len(cluster[0])
    mean := make([]float64, d)
    for _, x := range cluster {
        for j, val := range x {
            mean[j] += val
        }
    }
    for j := range mean {
        mean[j] /= float64(n)
    }
    return mean
}

// converged reports whether every centroid moved less than the tolerance.
// (Parameters renamed from old/new — "new" shadowed the builtin.)
func (km *KMeans) converged(prev, next [][]float64) bool {
    for i := range prev {
        if km.euclideanDistance(prev[i], next[i]) > 1e-6 {
            return false
        }
    }
    return true
}

func main() {
    X := [][]float64{
        {1, 2}, {1.5, 1.8}, {5, 8}, {8, 8}, {1, 0.6}, {9, 11},
    }

    kmeans := NewKMeans(2, 100)
    kmeans.Fit(X)
    labels := kmeans.Predict(X)

    fmt.Printf("簇标签:%v\n", labels)
    fmt.Printf("质心:%v\n", kmeans.centroids)
}

🌀 2. DBSCAN 密度聚类

算法原理

DBSCAN 基于密度可达性发现任意形状的簇,能够识别噪声点,不需要预先指定簇数量。

ε-邻域: Nε(p) = {q ∈ D | dist(p, q) ≤ ε}

核心点: |Nε(p)| ≥ MinPts

密度可达: 从核心点出发可达的点
训练时间复杂度
O(n²)
预测时间复杂度
O(n)
空间复杂度
O(n)

✅ 优点

  • 不需要指定簇数量
  • 可发现任意形状簇
  • 能识别噪声点
  • 对异常值鲁棒

❌ 缺点

  • 对参数敏感
  • 密度差异大时效果差
  • 高维数据效果差
  • 计算复杂度较高

代码实现

/**
 * DBSCAN density-based clustering (simplified).
 * A label of -1 marks both "unvisited" and "noise"; clusters are numbered
 * from 0. A point with at least `minSamples` neighbors within `eps`
 * (itself included) is a core point and seeds cluster expansion.
 */
class DBSCAN {
    constructor(eps = 0.5, minSamples = 5) {
        this.eps = eps;
        this.minSamples = minSamples;
        this.labels = null; // per-sample cluster id after fit(); -1 = noise
    }

    fit(X) {
        const n = X.length;
        this.labels = new Array(n).fill(-1);
        let clusterId = 0;

        for (let i = 0; i < n; i++) {
            if (this.labels[i] !== -1) continue; // already clustered

            const neighbors = this._getNeighbors(X, i);
            if (neighbors.length < this.minSamples) {
                this.labels[i] = -1; // noise (may later join a cluster as a border point)
            } else {
                this._expandCluster(X, i, neighbors, clusterId);
                clusterId++;
            }
        }
    }

    // All indices (including idx itself) within eps of X[idx].
    _getNeighbors(X, idx) {
        const neighbors = [];
        for (let j = 0; j < X.length; j++) {
            if (this._euclideanDistance(X[idx], X[j]) <= this.eps) {
                neighbors.push(j);
            }
        }
        return neighbors;
    }

    // Breadth-first growth of one cluster from a core point. The neighbor
    // list is extended in place while we walk it.
    _expandCluster(X, idx, neighbors, clusterId) {
        this.labels[idx] = clusterId;
        let i = 0;
        while (i < neighbors.length) {
            const neighbor = neighbors[i];
            if (this.labels[neighbor] === -1) {
                this.labels[neighbor] = clusterId;
                const newNeighbors = this._getNeighbors(X, neighbor);
                // Only core points propagate the expansion further.
                if (newNeighbors.length >= this.minSamples) {
                    neighbors.push(...newNeighbors);
                }
            }
            // BUG FIX: the original had a second `else if` with the exact
            // same condition (labels[neighbor] === -1) — unreachable dead
            // code, removed.
            i++;
        }
    }

    _euclideanDistance(x1, x2) {
        return Math.sqrt(x1.reduce((sum, val, i) => sum + (val - x2[i]) ** 2, 0));
    }
}

// 使用示例
const X = [[1, 2], [1.5, 1.8], [5, 8], [8, 8], [1, 0.6], [9, 11], [100, 100]];
// BUG FIX: `eps = 3, minSamples = 2` inside the call assigned implicit
// globals (ReferenceError in strict mode/ESM) — JS has no keyword
// arguments, so pass the values positionally.
const dbscan = new DBSCAN(3, 2);
dbscan.fit(X);
console.log('簇标签:', dbscan.labels);
import numpy as np

class DBSCAN:
    """Density-based clustering; label -1 marks unvisited/noise points."""

    def __init__(self, eps=0.5, min_samples=5):
        self.eps = eps
        self.min_samples = min_samples
        self.labels = None

    def fit(self, X):
        """Cluster samples ``X``; per-sample labels go to ``self.labels``."""
        self.labels = np.full(len(X), -1)
        next_cluster = 0

        for idx in range(len(X)):
            if self.labels[idx] != -1:
                continue  # already assigned to a cluster

            nbrs = self._get_neighbors(X, idx)
            if len(nbrs) >= self.min_samples:
                self._expand_cluster(X, idx, nbrs, next_cluster)
                next_cluster += 1
            else:
                self.labels[idx] = -1  # noise

    def _get_neighbors(self, X, idx):
        # Indices (idx included) whose distance to X[idx] is within eps.
        dists = np.linalg.norm(X - X[idx], axis=1)
        return np.where(dists <= self.eps)[0].tolist()

    def _expand_cluster(self, X, idx, neighbors, cluster_id):
        # Breadth-first growth; the frontier list grows while being walked.
        self.labels[idx] = cluster_id
        pos = 0
        while pos < len(neighbors):
            point = neighbors[pos]
            pos += 1
            if self.labels[point] != -1:
                continue
            self.labels[point] = cluster_id
            reachable = self._get_neighbors(X, point)
            if len(reachable) >= self.min_samples:
                neighbors.extend(reachable)

# 使用示例
if __name__ == "__main__":
    from sklearn.datasets import make_moons
    from sklearn.metrics import silhouette_score

    # Two interleaving half-moons: a shape centroid-based methods miss.
    X, _ = make_moons(n_samples=300, noise=0.05, random_state=42)

    model = DBSCAN(eps=0.3, min_samples=5)
    model.fit(X)

    # Score only the clustered (non-noise) points.
    clustered = model.labels != -1
    if clustered.sum() > 1:
        print(f"轮廓系数:{silhouette_score(X[clustered], model.labels[clustered]):.3f}")
    print(f"簇数量:{len(set(model.labels)) - (1 if -1 in model.labels else 0)}")
package main

import (
    "fmt"
    "math"
)

type DBSCAN struct {
    eps        float64
    minSamples int
    labels     []int
}

func NewDBSCAN(eps float64, minSamples int) *DBSCAN {
    return &DBSCAN{eps: eps, minSamples: minSamples}
}

func (db *DBSCAN) Fit(X [][]float64) {
    n := len(X)
    db.labels = make([]int, n)
    for i := range db.labels {
        db.labels[i] = -1
    }
    clusterId := 0

    for i := 0; i < n; i++ {
        if db.labels[i] != -1 {
            continue
        }

        neighbors := db.getNeighbors(X, i)
        if len(neighbors) < db.minSamples {
            db.labels[i] = -1
        } else {
            db.expandCluster(X, i, neighbors, clusterId)
            clusterId++
        }
    }
}

func (db *DBSCAN) getNeighbors(X [][]float64, idx int) []int {
    neighbors := []int{}
    for j := 0; j < len(X); j++ {
        if db.euclideanDistance(X[idx], X[j]) <= db.eps {
            neighbors = append(neighbors, j)
        }
    }
    return neighbors
}

func (db *DBSCAN) expandCluster(X [][]float64, idx int, neighbors []int, clusterId int) {
    db.labels[idx] = clusterId
    i := 0
    for i < len(neighbors) {
        neighbor := neighbors[i]
        if db.labels[neighbor] == -1 {
            db.labels[neighbor] = clusterId
            newNeighbors := db.getNeighbors(X, neighbor)
            if len(newNeighbors) >= db.minSamples {
                neighbors = append(neighbors, newNeighbors...)
            }
        }
        i++
    }
}

func (db *DBSCAN) euclideanDistance(x1, x2 []float64) float64 {
    sum := 0.0
    for i := range x1 {
        diff := x1[i] - x2[i]
        sum += diff * diff
    }
    return math.Sqrt(sum)
}

func main() {
    X := [][]float64{
        {1, 2}, {1.5, 1.8}, {5, 8}, {8, 8}, {1, 0.6}, {9, 11}, {100, 100},
    }

    dbscan := NewDBSCAN(3.0, 2)
    dbscan.Fit(X)

    fmt.Printf("簇标签:%v\n", dbscan.labels)
}

📉 3. PCA 主成分分析

算法原理

PCA 通过线性变换将数据投影到方差最大的方向,实现降维同时保留最多信息。

协方差矩阵: Σ = (1/(n-1)) XᵀX(X 为中心化后的数据;与代码中除以 n-1 的样本协方差一致)

特征分解: Σv = λv

投影: X' = XWₖ
训练时间复杂度
O(n × d² + d³)
预测时间复杂度
O(n × k)
空间复杂度
O(d²)

✅ 优点

  • 简单高效,计算快
  • 去除特征相关性
  • 可用于高维数据可视化
  • 无参数

❌ 缺点

  • 只能捕捉线性关系
  • 对异常值敏感
  • 需要特征缩放
  • 可解释性差

代码实现

// PCA via eigendecomposition of the covariance matrix, using power
// iteration with re-orthogonalization (no external linear-algebra library).
class PCA {
    /**
     * @param {number} nComponents - number of principal components to keep.
     */
    constructor(nComponents = 2) {
        this.nComponents = nComponents;
        this.components = null;        // nComponents x d projection vectors
        this.mean = null;              // per-feature mean used for centering
        this.explainedVariance = null; // eigenvalue of each kept component
    }

    fit(X) {
        const nSamples = X.length;

        // Center the data so the covariance is taken about the mean.
        this.mean = X[0].map((_, j) =>
            X.reduce((sum, row) => sum + row[j], 0) / nSamples
        );
        const XCentered = X.map(row =>
            row.map((val, j) => val - this.mean[j])
        );

        // Sample covariance matrix (divides by n - 1).
        const covMatrix = this._computeCovariance(XCentered);

        // Eigendecomposition by power iteration.
        const { eigenvalues, eigenvectors } = this._eigendecomposition(covMatrix);

        // Keep the components with the largest eigenvalues.
        const sortedIdx = eigenvalues.map((v, i) => i)
            .sort((a, b) => eigenvalues[b] - eigenvalues[a]);

        this.components = sortedIdx.slice(0, this.nComponents)
            .map(i => eigenvectors[i]);
        this.explainedVariance = sortedIdx.slice(0, this.nComponents)
            .map(i => eigenvalues[i]);
    }

    /**
     * Project rows of X onto the learned components.
     * BUG FIX: the original mapped over the d features instead of the k
     * components (returning d-dimensional rows) and mixed up the
     * component/feature indices inside the reduce.
     * @returns {number[][]} shape (n, nComponents).
     */
    transform(X) {
        return X.map(row => {
            const centered = row.map((val, j) => val - this.mean[j]);
            return this.components.map(comp =>
                comp.reduce((sum, c, j) => sum + c * centered[j], 0)
            );
        });
    }

    fitTransform(X) {
        this.fit(X);
        return this.transform(X);
    }

    // Sample covariance of already-centered data (divides by n - 1).
    _computeCovariance(X) {
        const n = X.length;
        const d = X[0].length;
        const cov = Array.from({length: d}, () => Array(d).fill(0));

        for (let i = 0; i < n; i++) {
            for (let j = 0; j < d; j++) {
                for (let k = 0; k < d; k++) {
                    cov[j][k] += X[i][j] * X[i][k];
                }
            }
        }

        for (let j = 0; j < d; j++) {
            for (let k = 0; k < d; k++) {
                cov[j][k] /= n - 1;
            }
        }
        return cov;
    }

    // BUG FIX: the original "simplified" stub returned the identity matrix,
    // which made the whole PCA a no-op projection despite the header
    // claiming power iteration. This is real power iteration with
    // Gram-Schmidt re-orthogonalization against eigenvectors found so far.
    _eigendecomposition(matrix) {
        const d = matrix.length;
        const eigenvalues = [];
        const eigenvectors = [];

        for (let c = 0; c < d; c++) {
            // Deterministic start: a basis vector outside the found span.
            let v = this._orthonormalStart(d, c, eigenvectors);
            for (let it = 0; it < 200; it++) {
                // w = A v, then strip components along found eigenvectors.
                let w = matrix.map(row =>
                    row.reduce((s, val, j) => s + val * v[j], 0)
                );
                w = this._orthogonalize(w, eigenvectors);
                const norm = Math.sqrt(w.reduce((s, x) => s + x * x, 0));
                if (norm < 1e-12) break; // eigenvalue ~0: keep current v
                const next = w.map(x => x / norm);
                const moved = Math.sqrt(next.reduce((s, x, j) => s + (x - v[j]) ** 2, 0));
                v = next;
                if (moved < 1e-12) break; // converged
            }
            // Rayleigh quotient v^T A v estimates the eigenvalue
            // (covariance matrices are PSD, so no sign flipping occurs).
            const Av = matrix.map(row => row.reduce((s, val, j) => s + val * v[j], 0));
            eigenvalues.push(v.reduce((s, x, j) => s + x * Av[j], 0));
            eigenvectors.push(v);
        }
        return { eigenvalues, eigenvectors };
    }

    // Subtract from w its projection onto each vector in basis.
    _orthogonalize(w, basis) {
        const out = [...w];
        for (const b of basis) {
            const dot = out.reduce((s, x, j) => s + x * b[j], 0);
            for (let j = 0; j < out.length; j++) out[j] -= dot * b[j];
        }
        return out;
    }

    // A unit vector orthogonal to `basis`, built from standard basis vectors.
    _orthonormalStart(d, c, basis) {
        for (let k = 0; k < d; k++) {
            const e = Array.from({length: d}, (_, j) => (j === (c + k) % d ? 1 : 0));
            const v = this._orthogonalize(e, basis);
            const norm = Math.sqrt(v.reduce((s, x) => s + x * x, 0));
            if (norm > 1e-8) return v.map(x => x / norm);
        }
        // Degenerate fallback (cannot happen while basis has < d vectors).
        return Array.from({length: d}, (_, j) => (j === c ? 1 : 0));
    }
}

// 使用示例
const X = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]];
// BUG FIX: `nComponents = 2` inside the call assigned an implicit global
// (ReferenceError in strict mode/ESM) — JS has no keyword arguments,
// so pass the value positionally.
const pca = new PCA(2);
const XReduced = pca.fitTransform(X);
console.log('降维后:', XReduced);
import numpy as np

class PCA:
    """Principal component analysis via covariance eigendecomposition."""

    def __init__(self, n_components=2):
        self.n_components = n_components
        self.components = None          # (d, n_components) projection matrix
        self.mean = None                # per-feature mean used for centering
        self.explained_variance = None  # top eigenvalues, descending

    def fit(self, X):
        """Learn the projection from samples ``X`` of shape (n, d)."""
        # Center the data so the covariance is taken about the mean.
        self.mean = X.mean(axis=0)
        centered = X - self.mean

        # The covariance matrix is symmetric -> eigh (ascending order).
        eigenvalues, eigenvectors = np.linalg.eigh(np.cov(centered, rowvar=False))

        # Reorder to descending variance and keep the top components.
        order = np.argsort(eigenvalues)[::-1]
        self.components = eigenvectors[:, order][:, :self.n_components]
        self.explained_variance = eigenvalues[order][:self.n_components]

    def transform(self, X):
        """Project rows of ``X`` onto the learned components."""
        return (X - self.mean) @ self.components

    def fit_transform(self, X):
        """Fit on ``X`` and return its projection."""
        self.fit(X)
        return self.transform(X)

# 使用示例
if __name__ == "__main__":
    from sklearn.datasets import load_iris
    from sklearn.metrics import silhouette_score

    # Project the 4-D iris measurements down to two dimensions.
    X = load_iris().data

    pca = PCA(n_components=2)
    X_reduced = pca.fit_transform(X)

    print(f"降维后形状:{X_reduced.shape}")
    print(f"解释方差比:{pca.explained_variance / np.sum(pca.explained_variance)}")
package main

import (
    "fmt"
)

// PCA projects data onto its "principal components". This simplified port
// uses the first nComponents coordinate axes as components (matching the
// JS/Python tutorial stubs), so Transform just mean-centers and truncates.
type PCA struct {
    nComponents       int
    components        [][]float64 // nComponents x d projection vectors
    mean              []float64   // per-feature mean used for centering
    explainedVariance []float64   // not computed by this simplified version
}

// NewPCA returns an unfitted PCA keeping nComponents components.
func NewPCA(nComponents int) *PCA {
    return &PCA{nComponents: nComponents}
}

// Fit computes the per-feature mean and the (identity) components.
func (pca *PCA) Fit(X [][]float64) {
    nSamples := len(X)
    nFeatures := len(X[0])

    // Per-feature mean for centering.
    pca.mean = make([]float64, nFeatures)
    for j := 0; j < nFeatures; j++ {
        for i := 0; i < nSamples; i++ {
            pca.mean[j] += X[i][j]
        }
        pca.mean[j] /= float64(nSamples)
    }

    // BUG FIX: the original also built a centered copy of X here that was
    // never read (dead work) and imported "math" without using it — an
    // unused import is a compile error in Go. Both removed.

    // Simplified: use the identity basis as principal components
    // (assumes nComponents <= nFeatures).
    pca.components = make([][]float64, pca.nComponents)
    for i := 0; i < pca.nComponents; i++ {
        pca.components[i] = make([]float64, nFeatures)
        pca.components[i][i] = 1.0
    }
}

// Transform projects each mean-centered row onto the components,
// returning an n x nComponents matrix.
func (pca *PCA) Transform(X [][]float64) [][]float64 {
    result := make([][]float64, len(X))
    for i, row := range X {
        result[i] = make([]float64, pca.nComponents)
        for j := 0; j < pca.nComponents; j++ {
            for k := range row {
                result[i][j] += (row[k] - pca.mean[k]) * pca.components[j][k]
            }
        }
    }
    return result
}

// FitTransform fits on X and returns its projection.
func (pca *PCA) FitTransform(X [][]float64) [][]float64 {
    pca.Fit(X)
    return pca.Transform(X)
}

func main() {
    X := [][]float64{
        {1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12},
    }

    pca := NewPCA(2)
    XReduced := pca.FitTransform(X)

    fmt.Printf("降维后形状:%d x %d\n", len(XReduced), len(XReduced[0]))
    fmt.Printf("降维结果:%v\n", XReduced)
}

📊 算法对比与选择

聚类算法对比

📊 K-Means

  • 需指定 K 值
  • 球形簇
  • 对噪声敏感
  • 适合大数据

🌳 层次聚类

  • K 值可选
  • 任意形状
  • 对噪声敏感
  • 适合小数据

🌀 DBSCAN

  • 自动确定 K
  • 任意形状
  • 抗噪声强
  • 适合中等数据

📈 GMM

  • 需指定 K 值
  • 椭圆簇
  • 较鲁棒
  • 软聚类

降维算法对比

📉 PCA

  • 线性降维
  • 保留全局结构
  • 计算快
  • 特征压缩

🎨 t-SNE

  • 非线性降维
  • 保留局部结构
  • 计算慢
  • 可视化神器
🎯 算法选择指南
  • 数据有明确簇结构:K-Means、GMM
  • 簇形状不规则:DBSCAN、层次聚类
  • 需要识别噪声:DBSCAN
  • 高维数据可视化:t-SNE、PCA
  • 特征压缩:PCA
  • 需要概率输出:GMM