🔍 Unsupervised Learning Algorithms
Discover hidden patterns and structure in unlabeled data and explore its intrinsic regularities.
📊 K-Means
A classic clustering algorithm that partitions data into K clusters. Simple, efficient, and widely used.
🌳 Hierarchical Clustering
Builds a tree of clusters (dendrogram) that visualizes the hierarchy of cluster relationships.
🌀 DBSCAN
Density-based clustering that finds clusters of arbitrary shape and automatically flags noise points.
📉 PCA
Principal component analysis: dimensionality reduction and visualization that preserves the directions of maximum variance.
🎨 t-SNE
A manifold-learning algorithm and a workhorse for visualizing high-dimensional data; preserves local similarities.
📈 GMM
Gaussian mixture model: soft clustering that yields the probability of each sample belonging to each cluster.
💡 Core Tasks of Unsupervised Learning
🔷 Clustering
Group similar data points to reveal the data's intrinsic structure
📉 Dimensionality Reduction
Reduce the number of features while retaining the important information
🔎 Anomaly Detection
Identify points that differ markedly from the majority of the data
📊 Association Rules
Discover interesting relationships among items in the data (the first three tasks are sketched in code below)
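To make these tasks concrete, here is a minimal Python sketch, assuming scikit-learn is available; the dataset and parameters are illustrative only. Association-rule mining is omitted because it typically requires a separate library such as mlxtend.

from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest

X, _ = make_blobs(n_samples=200, centers=3, n_features=5, random_state=0)

labels = KMeans(n_clusters=3, n_init=10, random_state=0).fit_predict(X)  # clustering
X_2d = PCA(n_components=2).fit_transform(X)                              # dimensionality reduction
flags = IsolationForest(random_state=0).fit_predict(X)                   # anomaly detection (-1 = anomaly)
print(labels[:10], X_2d.shape, (flags == -1).sum())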
📊 1. K-Means Clustering
Algorithm Principle
K-Means iteratively optimizes the cluster centers, partitioning the data into K clusters so that points within a cluster are as similar as possible and points in different clusters are as dissimilar as possible.
Objective function: J = Σᵢ Σₓ∈Cᵢ ||x - μᵢ||²
Update rule: μᵢ = (1/|Cᵢ|) Σₓ∈Cᵢ x
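To make the two formulas concrete, a minimal NumPy sketch that applies one centroid update and evaluates the objective J for a fixed assignment (data and assignment are illustrative):

import numpy as np

X = np.array([[1.0, 2.0], [1.5, 1.8], [5.0, 8.0], [8.0, 8.0]])
labels = np.array([0, 0, 1, 1])  # a candidate cluster assignment

# Update rule: each centroid becomes the mean of its cluster
centroids = np.array([X[labels == c].mean(axis=0) for c in (0, 1)])
# Objective: sum of squared distances from points to their centroid
J = sum(np.sum((X[labels == c] - centroids[c]) ** 2) for c in (0, 1))
print(centroids, J)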
Training time complexity
O(n × k × d × iter)
Prediction time complexity
O(n × k × d)
Space complexity
O(n × d)
✅ Pros
- Simple to understand and easy to implement
- Converges quickly
- Highly interpretable
- Scales well to large datasets
❌ Cons
- K must be specified in advance (see the elbow-method sketch after this list)
- Sensitive to centroid initialization
- Finds only convex, roughly spherical clusters
- Sensitive to outliers
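A common way to pick K is the elbow method: plot the within-cluster sum of squares against K and look for the bend. A sketch using scikit-learn's KMeans, whose default k-means++ initialization and multiple restarts also mitigate the sensitivity to initial centroids:

import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

X, _ = make_blobs(n_samples=300, centers=4, random_state=42)
inertias = [KMeans(n_clusters=k, n_init=10, random_state=0).fit(X).inertia_
            for k in range(1, 9)]
plt.plot(range(1, 9), inertias, marker="o")  # look for the "elbow"
plt.xlabel("K")
plt.ylabel("Within-cluster SSE (inertia)")
plt.show()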
Code Implementation
class KMeans {
  constructor(k = 3, maxIter = 100) {
    this.k = k;
    this.maxIter = maxIter;
    this.centroids = null;
  }

  fit(X) {
    const nSamples = X.length;
    // Randomly initialize centroids by sampling k distinct points
    const indices = this._randomSample(nSamples, this.k);
    this.centroids = indices.map(i => [...X[i]]);
    for (let iter = 0; iter < this.maxIter; iter++) {
      // Assignment step: attach each point to its nearest centroid
      const clusters = Array.from({length: this.k}, () => []);
      for (let i = 0; i < nSamples; i++) {
        const clusterIdx = this._nearestCentroid(X[i]);
        clusters[clusterIdx].push(X[i]);
      }
      // Update step: move each centroid to the mean of its cluster
      const newCentroids = clusters.map((cluster, i) => {
        if (cluster.length === 0) return this.centroids[i]; // keep empty clusters in place
        return cluster[0].map((_, j) =>
          cluster.reduce((sum, row) => sum + row[j], 0) / cluster.length
        );
      });
      // Stop once the centroids no longer move
      if (this._converged(this.centroids, newCentroids)) break;
      this.centroids = newCentroids;
    }
  }

  predict(X) {
    return X.map(x => this._nearestCentroid(x));
  }

  _nearestCentroid(x) {
    let minDist = Infinity;
    let clusterIdx = 0;
    for (let i = 0; i < this.k; i++) {
      const dist = this._euclideanDistance(x, this.centroids[i]);
      if (dist < minDist) {
        minDist = dist;
        clusterIdx = i;
      }
    }
    return clusterIdx;
  }

  _euclideanDistance(x1, x2) {
    return Math.sqrt(x1.reduce((sum, val, i) => sum + Math.pow(val - x2[i], 2), 0));
  }

  _randomSample(n, k) {
    // Fisher–Yates shuffle, then take the first k indices
    const indices = Array.from({length: n}, (_, i) => i);
    for (let i = n - 1; i > 0; i--) {
      const j = Math.floor(Math.random() * (i + 1));
      [indices[i], indices[j]] = [indices[j], indices[i]];
    }
    return indices.slice(0, k);
  }

  _converged(oldCentroids, newCentroids) {
    return oldCentroids.every((old, i) =>
      this._euclideanDistance(old, newCentroids[i]) < 1e-6
    );
  }
}
// Usage example
const X = [[1, 2], [1.5, 1.8], [5, 8], [8, 8], [1, 0.6], [9, 11]];
const kmeans = new KMeans(2);
kmeans.fit(X);
console.log('Cluster labels:', kmeans.predict(X));
console.log('Centroids:', kmeans.centroids);
import numpy as np
class KMeans:
    def __init__(self, k=3, max_iter=100):
        self.k = k
        self.max_iter = max_iter
        self.centroids = None

    def fit(self, X):
        n_samples, n_features = X.shape
        # Randomly initialize centroids by sampling k distinct points
        random_idx = np.random.choice(n_samples, self.k, replace=False)
        self.centroids = X[random_idx]
        for _ in range(self.max_iter):
            # Assignment step: attach each point to its nearest centroid
            clusters = [[] for _ in range(self.k)]
            for x in X:
                clusters[self._nearest_centroid(x)].append(x)
            # Update step: mean of each cluster (empty clusters keep their centroid)
            new_centroids = np.array([
                np.mean(cluster, axis=0) if cluster else old
                for cluster, old in zip(clusters, self.centroids)
            ])
            # Stop once the centroids no longer move
            if np.allclose(self.centroids, new_centroids):
                break
            self.centroids = new_centroids

    def predict(self, X):
        # Distances of every sample to every centroid, then argmin per sample
        return np.argmin(np.linalg.norm(X[:, np.newaxis] - self.centroids, axis=2), axis=1)

    def _nearest_centroid(self, x):
        distances = [np.linalg.norm(x - centroid) for centroid in self.centroids]
        return np.argmin(distances)
# Usage example
if __name__ == "__main__":
    from sklearn.datasets import make_blobs
    from sklearn.metrics import silhouette_score

    X, _ = make_blobs(n_samples=300, centers=4, random_state=42)
    kmeans = KMeans(k=4)
    kmeans.fit(X)
    labels = kmeans.predict(X)
print(f"轮廓系数:{silhouette_score(X, labels):.3f}")package main
package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

type KMeans struct {
    k         int
    maxIter   int
    centroids [][]float64
}

func NewKMeans(k, maxIter int) *KMeans {
    return &KMeans{k: k, maxIter: maxIter}
}

func (km *KMeans) Fit(X [][]float64) {
    nSamples := len(X)
    rand.Seed(time.Now().UnixNano())
    // Randomly initialize centroids by sampling k distinct points
    indices := km.randomSample(nSamples, km.k)
    km.centroids = make([][]float64, km.k)
    for i, idx := range indices {
        km.centroids[i] = make([]float64, len(X[idx]))
        copy(km.centroids[i], X[idx])
    }
    for iter := 0; iter < km.maxIter; iter++ {
        // Assignment step: attach each point to its nearest centroid
        clusters := make([][][]float64, km.k)
        for i := range clusters {
            clusters[i] = [][]float64{}
        }
        for _, x := range X {
            clusterIdx := km.nearestCentroid(x)
            clusters[clusterIdx] = append(clusters[clusterIdx], x)
        }
        // Update step: move each centroid to the mean of its cluster
        newCentroids := make([][]float64, km.k)
        for i, cluster := range clusters {
            if len(cluster) == 0 {
                newCentroids[i] = km.centroids[i] // keep empty clusters in place
            } else {
                newCentroids[i] = km.computeMean(cluster)
            }
        }
        // Stop once the centroids no longer move
        if km.converged(km.centroids, newCentroids) {
            break
        }
        km.centroids = newCentroids
    }
}

func (km *KMeans) Predict(X [][]float64) []int {
    labels := make([]int, len(X))
    for i, x := range X {
        labels[i] = km.nearestCentroid(x)
    }
    return labels
}

func (km *KMeans) nearestCentroid(x []float64) int {
    minDist := math.MaxFloat64
    clusterIdx := 0
    for i, centroid := range km.centroids {
        dist := km.euclideanDistance(x, centroid)
        if dist < minDist {
            minDist = dist
            clusterIdx = i
        }
    }
    return clusterIdx
}

func (km *KMeans) euclideanDistance(x1, x2 []float64) float64 {
    sum := 0.0
    for i := range x1 {
        diff := x1[i] - x2[i]
        sum += diff * diff
    }
    return math.Sqrt(sum)
}

func (km *KMeans) randomSample(n, k int) []int {
    // Fisher–Yates shuffle, then take the first k indices
    indices := make([]int, n)
    for i := range indices {
        indices[i] = i
    }
    for i := n - 1; i > 0; i-- {
        j := rand.Intn(i + 1)
        indices[i], indices[j] = indices[j], indices[i]
    }
    return indices[:k]
}

func (km *KMeans) converged(oldC, newC [][]float64) bool {
    for i := range oldC {
        if km.euclideanDistance(oldC[i], newC[i]) > 1e-6 {
            return false
        }
    }
    return true
}

func main() {
    X := [][]float64{
        {1, 2}, {1.5, 1.8}, {5, 8}, {8, 8}, {1, 0.6}, {9, 11},
    }
    kmeans := NewKMeans(2, 100)
    kmeans.Fit(X)
    labels := kmeans.Predict(X)
    fmt.Printf("Cluster labels: %v\n", labels)
    fmt.Printf("Centroids: %v\n", kmeans.centroids)
}
🌀 2. DBSCAN Density-Based Clustering
Algorithm Principle
DBSCAN discovers clusters of arbitrary shape via density reachability; it identifies noise points and does not require the number of clusters to be specified in advance.
ε-neighborhood: Nε(p) = {q ∈ D | dist(p, q) ≤ ε}
Core point: |Nε(p)| ≥ MinPts
Density-reachable: a point reachable from a core point through a chain of core points' ε-neighborhoods
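To make the definitions concrete, a small NumPy sketch that classifies each point as core, border, or noise for a given ε and MinPts (data and parameter values are illustrative):

import numpy as np

X = np.array([[1, 2], [1.5, 1.8], [1, 0.6], [5, 8], [100, 100]], dtype=float)
eps, min_pts = 2.0, 3

dists = np.linalg.norm(X[:, None] - X[None, :], axis=2)  # pairwise distances
counts = (dists <= eps).sum(axis=1)                      # |N_eps(p)|, p included
core = counts >= min_pts
border = ~core & (dists[:, core] <= eps).any(axis=1)     # non-core but near a core point
noise = ~core & ~border
print(core, border, noise)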
Training time complexity
O(n²) (without a spatial index)
Prediction time complexity
O(n)
Space complexity
O(n)
✅ Pros
- No need to specify the number of clusters
- Finds clusters of arbitrary shape
- Identifies noise points
- Robust to outliers
❌ Cons
- Sensitive to the parameters ε and MinPts (see the k-distance sketch after this list)
- Struggles when cluster densities vary widely
- Degrades on high-dimensional data
- Relatively high computational cost
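A common heuristic for the parameter sensitivity is the k-distance plot: sort every point's distance to its k-th nearest neighbor (with k = MinPts) and read ε off the "knee". A sketch with scikit-learn:

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons
from sklearn.neighbors import NearestNeighbors

X, _ = make_moons(n_samples=300, noise=0.05, random_state=42)
k = 5  # set k = MinPts
dists, _ = NearestNeighbors(n_neighbors=k + 1).fit(X).kneighbors(X)
kth = np.sort(dists[:, -1])  # distance to the k-th neighbor (self excluded), ascending
plt.plot(kth)                # the "knee" suggests a value for eps
plt.xlabel("points (sorted)")
plt.ylabel(f"{k}-NN distance")
plt.show()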
Code Implementation
class DBSCAN {
  constructor(eps = 0.5, minSamples = 5) {
    this.eps = eps;
    this.minSamples = minSamples;
    this.labels = null;
  }

  fit(X) {
    const n = X.length;
    // -1 marks both "unvisited" and "noise" in this simplified version
    this.labels = new Array(n).fill(-1);
    let clusterId = 0;
    for (let i = 0; i < n; i++) {
      if (this.labels[i] !== -1) continue; // already assigned to a cluster
      const neighbors = this._getNeighbors(X, i);
      if (neighbors.length < this.minSamples) {
        this.labels[i] = -1; // noise (may later be absorbed as a border point)
      } else {
        this._expandCluster(X, i, neighbors, clusterId);
        clusterId++;
      }
    }
  }

  _getNeighbors(X, idx) {
    const neighbors = [];
    for (let j = 0; j < X.length; j++) {
      if (this._euclideanDistance(X[idx], X[j]) <= this.eps) {
        neighbors.push(j);
      }
    }
    return neighbors;
  }

  _expandCluster(X, idx, neighbors, clusterId) {
    this.labels[idx] = clusterId;
    let i = 0;
    while (i < neighbors.length) {
      const neighbor = neighbors[i];
      if (this.labels[neighbor] === -1) {
        this.labels[neighbor] = clusterId;
        // Only core points propagate the expansion further
        const newNeighbors = this._getNeighbors(X, neighbor);
        if (newNeighbors.length >= this.minSamples) {
          neighbors.push(...newNeighbors);
        }
      }
      i++;
    }
  }

  _euclideanDistance(x1, x2) {
    return Math.sqrt(x1.reduce((sum, val, i) => sum + Math.pow(val - x2[i], 2), 0));
  }
}
// Usage example
const X = [[1, 2], [1.5, 1.8], [5, 8], [8, 8], [1, 0.6], [9, 11], [100, 100]];
const dbscan = new DBSCAN(3, 2);
dbscan.fit(X);
console.log('Cluster labels:', dbscan.labels);
import numpy as np
class DBSCAN:
    def __init__(self, eps=0.5, min_samples=5):
        self.eps = eps
        self.min_samples = min_samples
        self.labels = None

    def fit(self, X):
        n_samples = len(X)
        # -1 marks both "unvisited" and "noise" in this simplified version
        self.labels = np.full(n_samples, -1)
        cluster_id = 0
        for i in range(n_samples):
            if self.labels[i] != -1:
                continue  # already assigned to a cluster
            neighbors = self._get_neighbors(X, i)
            if len(neighbors) < self.min_samples:
                self.labels[i] = -1  # noise (may later be absorbed as a border point)
            else:
                self._expand_cluster(X, i, neighbors, cluster_id)
                cluster_id += 1

    def _get_neighbors(self, X, idx):
        distances = np.linalg.norm(X - X[idx], axis=1)
        return np.where(distances <= self.eps)[0].tolist()

    def _expand_cluster(self, X, idx, neighbors, cluster_id):
        self.labels[idx] = cluster_id
        i = 0
        while i < len(neighbors):
            neighbor = neighbors[i]
            if self.labels[neighbor] == -1:
                self.labels[neighbor] = cluster_id
                # Only core points propagate the expansion further
                new_neighbors = self._get_neighbors(X, neighbor)
                if len(new_neighbors) >= self.min_samples:
                    neighbors.extend(new_neighbors)
            i += 1
# Usage example
if __name__ == "__main__":
    from sklearn.datasets import make_moons
    from sklearn.metrics import silhouette_score

    X, _ = make_moons(n_samples=300, noise=0.05, random_state=42)
    dbscan = DBSCAN(eps=0.3, min_samples=5)
    dbscan.fit(X)
    mask = dbscan.labels != -1  # exclude noise points from scoring
    if len(set(dbscan.labels[mask])) > 1:
        print(f"Silhouette score: {silhouette_score(X[mask], dbscan.labels[mask]):.3f}")
print(f"簇数量:{len(set(dbscan.labels)) - (1 if -1 in dbscan.labels else 0)}")package main
package main

import (
    "fmt"
    "math"
)

type DBSCAN struct {
    eps        float64
    minSamples int
    labels     []int
}

func NewDBSCAN(eps float64, minSamples int) *DBSCAN {
    return &DBSCAN{eps: eps, minSamples: minSamples}
}

func (db *DBSCAN) Fit(X [][]float64) {
    n := len(X)
    // -1 marks both "unvisited" and "noise" in this simplified version
    db.labels = make([]int, n)
    for i := range db.labels {
        db.labels[i] = -1
    }
    clusterId := 0
    for i := 0; i < n; i++ {
        if db.labels[i] != -1 {
            continue // already assigned to a cluster
        }
        neighbors := db.getNeighbors(X, i)
        if len(neighbors) < db.minSamples {
            db.labels[i] = -1 // noise (may later be absorbed as a border point)
        } else {
            db.expandCluster(X, i, neighbors, clusterId)
            clusterId++
        }
    }
}

func (db *DBSCAN) getNeighbors(X [][]float64, idx int) []int {
    neighbors := []int{}
    for j := 0; j < len(X); j++ {
        if db.euclideanDistance(X[idx], X[j]) <= db.eps {
            neighbors = append(neighbors, j)
        }
    }
    return neighbors
}

func (db *DBSCAN) expandCluster(X [][]float64, idx int, neighbors []int, clusterId int) {
    db.labels[idx] = clusterId
    i := 0
    for i < len(neighbors) {
        neighbor := neighbors[i]
        if db.labels[neighbor] == -1 {
            db.labels[neighbor] = clusterId
            // Only core points propagate the expansion further
            newNeighbors := db.getNeighbors(X, neighbor)
            if len(newNeighbors) >= db.minSamples {
                neighbors = append(neighbors, newNeighbors...)
            }
        }
        i++
    }
}

func (db *DBSCAN) euclideanDistance(x1, x2 []float64) float64 {
    sum := 0.0
    for i := range x1 {
        diff := x1[i] - x2[i]
        sum += diff * diff
    }
    return math.Sqrt(sum)
}

func main() {
    X := [][]float64{
        {1, 2}, {1.5, 1.8}, {5, 8}, {8, 8}, {1, 0.6}, {9, 11}, {100, 100},
    }
    dbscan := NewDBSCAN(3.0, 2)
    dbscan.Fit(X)
    fmt.Printf("Cluster labels: %v\n", dbscan.labels)
}
📉 3. PCA (Principal Component Analysis)
Algorithm Principle
PCA linearly projects the data onto the directions of maximum variance, reducing dimensionality while retaining as much information as possible.
Covariance matrix: Σ = (1/(n-1)) XᵀX (with X column-centered)
Eigendecomposition: Σv = λv
Projection: X' = XWₖ (Wₖ: matrix of the top-k eigenvectors)
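The same projection can also be obtained from the SVD of the centered data matrix, which is how most libraries implement PCA in practice. A minimal NumPy sketch (variable names are ours):

import numpy as np

X = np.random.default_rng(0).normal(size=(100, 5))
Xc = X - X.mean(axis=0)                    # center the columns
U, S, Vt = np.linalg.svd(Xc, full_matrices=False)
W_k = Vt[:2].T                             # W_k: top-2 principal directions
X_proj = Xc @ W_k                          # projection X' = XW_k
eigvals = S**2 / (len(X) - 1)              # eigenvalues of the covariance matrix
print(X_proj.shape, eigvals[:2])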
Training time complexity
O(n × d² + d³)
Prediction time complexity
O(n × d × k)
Space complexity
O(d²)
✅ Pros
- Simple and fast to compute
- Removes correlations between features
- Enables visualization of high-dimensional data
- Essentially hyperparameter-free (only the number of components)
❌ Cons
- Captures only linear structure
- Sensitive to outliers
- Requires feature scaling (see the standardization sketch after this list)
- Principal components can be hard to interpret
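Because PCA maximizes variance, features on large scales dominate unless the data is standardized first; cumulative explained variance is then a common way to choose the number of components. A sketch with scikit-learn:

from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

X = load_iris().data
X_std = StandardScaler().fit_transform(X)  # zero mean, unit variance per feature
pca = PCA().fit(X_std)
print(pca.explained_variance_ratio_.cumsum())  # pick the smallest k reaching, e.g., 95%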
Code Implementation
// Simplified PCA implementation (eigendecomposition via power iteration)
class PCA {
  constructor(nComponents = 2) {
    this.nComponents = nComponents;
    this.components = null;
    this.mean = null;
    this.explainedVariance = null;
  }

  fit(X) {
    const nSamples = X.length;
    // Center the data
    this.mean = X[0].map((_, j) =>
      X.reduce((sum, row) => sum + row[j], 0) / nSamples
    );
    const XCentered = X.map(row =>
      row.map((val, j) => val - this.mean[j])
    );
    // Covariance matrix of the centered data
    const covMatrix = this._computeCovariance(XCentered);
    // Eigendecomposition via power iteration with deflation
    const { eigenvalues, eigenvectors } = this._eigendecomposition(covMatrix);
    // Keep the top-k principal components, sorted by eigenvalue
    const sortedIdx = eigenvalues.map((_, i) => i)
      .sort((a, b) => eigenvalues[b] - eigenvalues[a]);
    this.components = sortedIdx.slice(0, this.nComponents)
      .map(i => eigenvectors[i]);
    this.explainedVariance = sortedIdx.slice(0, this.nComponents)
      .map(i => eigenvalues[i]);
  }

  transform(X) {
    // Project each centered row onto the principal components
    return X.map(row => {
      const centered = row.map((val, j) => val - this.mean[j]);
      return this.components.map(comp =>
        comp.reduce((sum, w, j) => sum + w * centered[j], 0)
      );
    });
  }

  fitTransform(X) {
    this.fit(X);
    return this.transform(X);
  }

  _computeCovariance(X) {
    const n = X.length;
    const d = X[0].length;
    const cov = Array.from({length: d}, () => Array(d).fill(0));
    for (let i = 0; i < n; i++) {
      for (let j = 0; j < d; j++) {
        for (let k = 0; k < d; k++) {
          cov[j][k] += X[i][j] * X[i][k];
        }
      }
    }
    for (let j = 0; j < d; j++) {
      for (let k = 0; k < d; k++) {
        cov[j][k] /= n - 1;
      }
    }
    return cov;
  }

  _eigendecomposition(matrix) {
    // Power iteration with deflation: repeatedly find the dominant
    // eigenpair, then subtract it from the (symmetric PSD) matrix.
    const d = matrix.length;
    const A = matrix.map(row => [...row]); // working copy, deflated in place
    const eigenvalues = [];
    const eigenvectors = [];
    for (let c = 0; c < d; c++) {
      let v = Array.from({length: d}, () => Math.random() + 1e-3);
      const norm0 = Math.sqrt(v.reduce((s, x) => s + x * x, 0));
      v = v.map(x => x / norm0);
      let lambda = 0;
      for (let it = 0; it < 200; it++) {
        const Av = A.map(row => row.reduce((s, a, j) => s + a * v[j], 0));
        lambda = Math.sqrt(Av.reduce((s, x) => s + x * x, 0));
        if (lambda < 1e-12) break; // remaining spectrum is (numerically) zero
        v = Av.map(x => x / lambda);
      }
      eigenvalues.push(lambda);
      eigenvectors.push(v);
      // Deflation: A <- A - lambda * v vᵀ
      for (let i = 0; i < d; i++) {
        for (let j = 0; j < d; j++) {
          A[i][j] -= lambda * v[i] * v[j];
        }
      }
    }
    return { eigenvalues, eigenvectors };
  }
}
// Usage example
const X = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]];
const pca = new PCA(2);
const XReduced = pca.fitTransform(X);
console.log('Reduced data:', XReduced);
import numpy as np
class PCA:
    def __init__(self, n_components=2):
        self.n_components = n_components
        self.components = None
        self.mean = None
        self.explained_variance = None

    def fit(self, X):
        n_samples, n_features = X.shape
        # Center the data
        self.mean = np.mean(X, axis=0)
        X_centered = X - self.mean
        # Covariance matrix of the centered data
        cov_matrix = np.cov(X_centered, rowvar=False)
        # Eigendecomposition (eigh is for symmetric matrices)
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
        # Sort by eigenvalue, descending, and keep the top k components
        idx = np.argsort(eigenvalues)[::-1]
        eigenvectors = eigenvectors[:, idx]
        self.components = eigenvectors[:, :self.n_components]
        self.explained_variance = eigenvalues[idx][:self.n_components]

    def transform(self, X):
        X_centered = X - self.mean
        return X_centered @ self.components

    def fit_transform(self, X):
        self.fit(X)
        return self.transform(X)
# Usage example
if __name__ == "__main__":
    from sklearn.datasets import load_iris

    X = load_iris().data
    pca = PCA(n_components=2)
    X_reduced = pca.fit_transform(X)
    print(f"Shape after reduction: {X_reduced.shape}")
print(f"解释方差比:{pca.explained_variance / np.sum(pca.explained_variance)}")package main
package main

import "fmt"

// Simplified PCA: centers the data, then projects onto the first
// nComponents coordinate axes. A real implementation would
// eigendecompose the covariance matrix (cf. the Python version).
type PCA struct {
    nComponents int
    components  [][]float64
    mean        []float64
}

func NewPCA(nComponents int) *PCA {
    return &PCA{nComponents: nComponents}
}

func (pca *PCA) Fit(X [][]float64) {
    nSamples := len(X)
    nFeatures := len(X[0])
    // Compute the per-feature mean for centering
    pca.mean = make([]float64, nFeatures)
    for j := 0; j < nFeatures; j++ {
        for i := 0; i < nSamples; i++ {
            pca.mean[j] += X[i][j]
        }
        pca.mean[j] /= float64(nSamples)
    }
    // Simplified: use the identity basis as "principal components"
    pca.components = make([][]float64, pca.nComponents)
    for i := 0; i < pca.nComponents; i++ {
        pca.components[i] = make([]float64, nFeatures)
        pca.components[i][i] = 1.0
    }
}

func (pca *PCA) Transform(X [][]float64) [][]float64 {
    result := make([][]float64, len(X))
    for i, row := range X {
        result[i] = make([]float64, pca.nComponents)
        for j := 0; j < pca.nComponents; j++ {
            for k := range row {
                result[i][j] += (row[k] - pca.mean[k]) * pca.components[j][k]
            }
        }
    }
    return result
}

func (pca *PCA) FitTransform(X [][]float64) [][]float64 {
    pca.Fit(X)
    return pca.Transform(X)
}

func main() {
    X := [][]float64{
        {1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12},
    }
    pca := NewPCA(2)
    XReduced := pca.FitTransform(X)
    fmt.Printf("Shape after reduction: %d x %d\n", len(XReduced), len(XReduced[0]))
    fmt.Printf("Reduced data: %v\n", XReduced)
}
📊 Algorithm Comparison and Selection
Clustering Algorithm Comparison
📊 K-Means
- K must be specified
- Spherical clusters only
- Sensitive to noise
- Suits large datasets
🌳 Hierarchical Clustering
- K optional (cut the dendrogram at any level)
- Arbitrary cluster shapes
- Sensitive to noise
- Suits small datasets
🌀 DBSCAN
- Determines the number of clusters automatically
- Arbitrary cluster shapes
- Strong noise resistance
- Suits medium-sized datasets
📈 GMM
- K must be specified
- Elliptical clusters
- Fairly robust
- Soft clustering
Dimensionality Reduction Comparison
📉 PCA
- Linear reduction
- Preserves global structure
- Fast to compute
- Suited to feature compression
🎨 t-SNE
- Nonlinear reduction
- Preserves local structure
- Slow to compute
- Excellent for visualization
🎯 Algorithm Selection Guide (an empirical comparison sketch follows this list)
- Data has clear cluster structure: K-Means, GMM
- Irregular cluster shapes: DBSCAN, hierarchical clustering
- Need to identify noise points: DBSCAN
- Visualizing high-dimensional data: t-SNE, PCA
- Feature compression: PCA
- Need probabilistic output: GMM
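As a closing illustration, a sketch that runs three of the clustering algorithms above on one dataset and compares silhouette scores (dataset and parameters chosen purely for demonstration). Note that the silhouette score favors convex clusters, so it can underrate DBSCAN on the moons data even when its partition is the intuitive one.

from sklearn.cluster import DBSCAN, KMeans
from sklearn.datasets import make_moons
from sklearn.metrics import silhouette_score
from sklearn.mixture import GaussianMixture

X, _ = make_moons(n_samples=300, noise=0.05, random_state=42)
candidates = {
    "K-Means": KMeans(n_clusters=2, n_init=10, random_state=0).fit_predict(X),
    "DBSCAN": DBSCAN(eps=0.3, min_samples=5).fit_predict(X),
    "GMM": GaussianMixture(n_components=2, random_state=0).fit_predict(X),
}
for name, labels in candidates.items():
    mask = labels != -1  # drop DBSCAN noise before scoring
    if len(set(labels[mask])) > 1:
        print(f"{name}: silhouette = {silhouette_score(X[mask], labels[mask]):.3f}")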