HyperLogLog (HLL) 是一种概率性数据结构,用于高效地估计集合中不同元素的数量(基数)。 由 Philippe Flajolet 等人在 2007 年提出,是著名的 LogLog 算法的改进版本。
HyperLogLog 的神奇之处在于:仅用 12KB 的内存,就可以估计最多 2^64 个不同元素, 误差率仅为 0.81%。
O(1) - 固定约12KB
O(1) - 每元素操作
≈ 0.81%
HyperLogLog 的核心思想可以用一个简单的概率游戏来解释:
问题:连续抛硬币多少次才能第一次看到连续 k 个正面?
答案:平均需要 2^k 次抛掷
例如:看到 3 个连续正面的平均次数是 2³ = 8 次
逆向思考:如果我们看到 k 个连续正面,可以推断大约抛了 2^k 次
将这个思想应用到基数估计:我们将每个元素的哈希值视为一串随机比特,然后观察这些比特串中:
元素 → 哈希值
前 p 位 → 桶编号
剩余位 → 前导零
寄存器[桶号] = max(当前值, 前导零)
哈希函数将任意输入均匀映射到 [0, 2^64-1] 区间,这保证了:
如果使用 m = 2^p 个桶,则估计的标准误差约为:
Standard Error (SE) ≈ 1.04 / √m
或者用桶数表示:SE ≈ 1.04 / 2^(p/2)
常见配置:
| 桶数 (m) | 精度参数 (p) | 标准误差 | 内存占用 |
|---|---|---|---|
| 2^10 = 1024 | 10 | ≈ 3.25% | ~8 KB |
| 2^12 = 4096 | 12 | ≈ 1.63% | ~32 KB |
| 2^14 = 16384 | 14 | ≈ 0.81% | ~128 KB |
| 2^16 = 65536 | 16 | ≈ 0.41% | ~512 KB |
如果我们直接对每个桶的最大前导零取平均,会受到异常值的严重影响。 HyperLogLog 的创新在于使用调和平均数:
Harmonic Mean = m / Σ(1/xᵢ)
其中 xᵢ 是第 i 个桶的 2^Rᵢ
调和平均数对小值更敏感,能更好地处理稀疏桶的情况。
HyperLogLog 的基数估计公式:
Estimate = αₘ × m² × (Σ 2^(-Rᵢ))⁻¹
其中:
• αₘ 是偏差校正系数
• m 是桶的数量
• Rᵢ 是第 i 个桶中记录的最大前导零数
偏差校正系数 αₘ:
αₘ = (0.7213 / (1 + 1.079/m)) 对于 m ≥ 128
假设有 16 个桶 (m=16, p=4),下面是可能的寄存器状态:
高亮的寄存器表示观察到了较大的前导零数量,意味着可能有更多不同的元素。
class HyperLogLog {
constructor(p = 14) {
this.p = p; // 精度参数
this.m = 1 << p; // 桶的数量 = 2^p
this.registers = new Array(this.m).fill(0);
this.alpha = this._getAlpha();
this.count = 0; // 实际插入的元素数(用于小基数校正)
}
_getAlpha() {
if (this.m >= 128) {
return 0.7213 / (1 + 1.079 / this.m);
}
return 1.0; // 小规模时的近似值
}
// MurmurHash3 的简化版本
_hash(value) {
let h = 0x811c9dc5;
const str = String(value);
for (let i = 0; i < str.length; i++) {
h ^= str.charCodeAt(i);
h = Math.imul(h, 0x01000193);
}
return h >>> 0; // 转为无符号整数
}
// 添加一个元素
add(value) {
this.count++;
const hash = this._hash(value);
// 提取前 p 位作为桶编号
const bucket = hash & (this.m - 1);
// 计算剩余比特中前导零的数量
// 跳过前 p 位,从第 p 位开始
const shifted = hash >>> this.p;
const zeros = this._countLeadingZeros(shifted);
// 更新寄存器的最大值
this.registers[bucket] = Math.max(this.registers[bucket], zeros);
}
// 计算前导零的数量
_countLeadingZeros(value) {
if (value === 0) return 32; // 假设32位整数
let count = 0;
while ((value & (1 << 31)) === 0 && count < 32) {
count++;
value <<= 1;
}
return count;
}
// 估计不同元素的数量
countDistinct() {
// 小基数校正:当实际元素数很小时,使用精确计数
if (this.count < this.m * 2.5) {
return new Set(this.registers.filter(r => r > 0)).size;
}
// HyperLogLog 估计公式
let sum = 0;
for (let i = 0; i < this.m; i++) {
sum += 1 / (1 << this.registers[i]);
}
const estimate = this.alpha * this.m * this.m / sum;
// 修正小估计值
return Math.floor(estimate);
}
// 合并两个 HyperLogLog
merge(other) {
if (this.m !== other.m) {
throw new Error('Cannot merge HLL with different precision');
}
for (let i = 0; i < this.m; i++) {
this.registers[i] = Math.max(this.registers[i], other.registers[i]);
}
}
}
// 测试
const hll = new HyperLogLog(12); // 4096 个桶
// 模拟插入 100000 个不同的元素
for (let i = 0; i < 100000; i++) {
hll.add(`element_${i}_${Math.random()}`);
}
console.log(`实际不同元素数: 100000`);
console.log(`HLL 估计值: ${hll.countDistinct()}`);
console.log(`误差率: ${Math.abs(hll.countDistinct() - 100000) / 100000 * 100}%`);
import math
import random
class HyperLogLog:
def __init__(self, p=14):
self.p = p # 精度参数
self.m = 1 << p # 桶的数量 = 2^p
self.registers = [0] * self.m
self.alpha = self._get_alpha()
self.count = 0 # 实际插入的元素数(用于小基数校正)
def _get_alpha(self):
if self.m >= 128:
return 0.7213 / (1 + 1.079 / self.m)
return 1.0 # 小规模时的近似值
# MurmurHash3 的简化版本
def _hash(self, value):
h = 0x811c9dc5
string = str(value)
for char in string:
h ^= ord(char)
h = (h * 0x01000193) & 0xFFFFFFFF
return h
# 添加一个元素
def add(self, value):
self.count += 1
hash_val = self._hash(value)
# 提取前 p 位作为桶编号
bucket = hash_val & (self.m - 1)
# 计算剩余比特中前导零的数量
shifted = hash_val >> self.p
zeros = self._count_leading_zeros(shifted)
# 更新寄存器的最大值
self.registers[bucket] = max(self.registers[bucket], zeros)
# 计算前导零的数量
def _count_leading_zeros(self, value):
if value == 0:
return 32 # 假设32位整数
count = 0
while (value & 0x80000000) == 0 and count < 32:
count += 1
value <<= 1
return count
# 估计不同元素的数量
def count_distinct(self):
# 小基数校正:当实际元素数很小时,使用精确计数
if self.count < self.m * 2.5:
return len(set(self.registers))
# HyperLogLog 估计公式
sum_val = 0
for reg in self.registers:
sum_val += 1 / (1 << reg)
estimate = self.alpha * self.m * self.m / sum_val
# 修正小估计值
return int(estimate)
# 合并两个 HyperLogLog
def merge(self, other):
if self.m != other.m:
raise ValueError('Cannot merge HLL with different precision')
for i in range(self.m):
self.registers[i] = max(self.registers[i], other.registers[i])
# 测试
if __name__ == "__main__":
hll = HyperLogLog(p=12) # 4096 个桶
# 模拟插入 100000 个不同的元素
for i in range(100000):
hll.add(f"element_{i}_{random.random()}")
print(f"实际不同元素数: 100000")
print(f"HLL 估计值: {hll.count_distinct()}")
print(f"误差率: {abs(hll.count_distinct() - 100000) / 100000 * 100}%")
package main
import (
"fmt"
"math"
"math/rand"
)
type HyperLogLog struct {
p int
m int
registers []int
alpha float64
count int
}
func NewHyperLogLog(p int) *HyperLogLog {
m := 1 << p
hll := &HyperLogLog{
p: p,
m: m,
registers: make([]int, m),
count: 0,
}
hll.alpha = hll.getAlpha()
return hll
}
func (h *HyperLogLog) getAlpha() float64 {
if h.m >= 128 {
return 0.7213 / (1 + 1.079/float64(h.m))
}
return 1.0
}
// MurmurHash3 的简化版本
func (h *HyperLogLog) hash(value string) uint32 {
var hVal uint32 = 0x811c9dc5
for i := 0; i < len(value); i++ {
hVal ^= uint32(value[i])
hVal = (hVal * 0x01000193) & 0xFFFFFFFF
}
return hVal
}
// 添加一个元素
func (h *HyperLogLog) Add(value string) {
h.count++
hashVal := h.hash(value)
// 提取前 p 位作为桶编号
bucket := int(hashVal & uint32(h.m-1))
// 计算剩余比特中前导零的数量
shifted := hashVal >> uint(h.p)
zeros := h.countLeadingZeros(shifted)
// 更新寄存器的最大值
if zeros > h.registers[bucket] {
h.registers[bucket] = zeros
}
}
// 计算前导零的数量
func (h *HyperLogLog) countLeadingZeros(value uint32) int {
if value == 0 {
return 32
}
count := 0
for (value&0x80000000) == 0 && count < 32 {
count++
value <<= 1
}
return count
}
// 估计不同元素的数量
func (h *HyperLogLog) CountDistinct() int {
// 小基数校正:当实际元素数很小时,使用精确计数
if h.count < h.m*5/2 {
unique := make(map[int]bool)
for _, r := range h.registers {
if r > 0 {
unique[r] = true
}
}
return len(unique)
}
// HyperLogLog 估计公式
var sum float64
for _, reg := range h.registers {
sum += 1 / math.Pow(2, float64(reg))
}
estimate := h.alpha * float64(h.m) * float64(h.m) / sum
// 修正小估计值
return int(estimate)
}
// 合并两个 HyperLogLog
func (h *HyperLogLog) Merge(other *HyperLogLog) {
if h.m != other.m {
panic("Cannot merge HLL with different precision")
}
for i := 0; i < h.m; i++ {
if other.registers[i] > h.registers[i] {
h.registers[i] = other.registers[i]
}
}
}
func main() {
hll := NewHyperLogLog(12) // 4096 个桶
// 模拟插入 100000 个不同的元素
for i := 0; i < 100000; i++ {
hll.Add(fmt.Sprintf("element_%d_%f", i, rand.Float64()))
}
fmt.Printf("实际不同元素数: 100000\n")
fmt.Printf("HLL 估计值: %d\n", hll.CountDistinct())
diff := float64(hll.CountDistinct() - 100000)
fmt.Printf("误差率: %.2f%%\n", math.Abs(diff)/100000*100)
}
| 方案 | 空间复杂度 | 精度 | 优点 | 缺点 |
|---|---|---|---|---|
| 精确计数 (Set) | O(n) | 100% | 精确 | 内存随 n 增长 |
| LogLog | O(1) | ≈ 1.3 / √m | 空间小 | 精度略低 |
| HyperLogLog | O(1) | ≈ 1.04 / √m | 精度高、空间小 | 概率性、有小偏差 |
| Count-Min Sketch | O(1) | 可配置 | 支持删除 | 需要更多空间 |
Redis 的 PFCOUNT 命令底层就是 HyperLogLog,用于快速计算 UV(独立访客数)。
在流处理系统中,需要实时估计不同用户数、页面访问量等,HLL 是理想选择。
检测网络攻击时,估计不同 IP 地址的数量,HLL 可以在有限内存下处理海量数据。
估计不同用户对某个内容的交互数量,用于计算热度、相关性等指标。
Google 在 2013 年提出的改进版,主要优化了:
当基数较小时(小于 m × 10),使用线性计数代替 HLL 估计,可提高小基数时的精度。
// Redis 命令示例
PFADD mykey "user1" "user2" "user3" // 添加元素
PFCOUNT mykey // 估计基数
PFMERGE mergedkey key1 key2 // 合并两个 HLL