保护系统:
- 防止系统过载
- 保证服务稳定性
- 公平分配资源
应用场景:
- API限流
- 秒杀系统
- 防止爬虫
- DDoS防护
在固定时间窗口内统计请求数。
import (
"sync"
"time"
)
type FixedWindowLimiter struct {
limit int
window time.Duration
counter int
lastTime time.Time
mu sync.Mutex
}
func NewFixedWindowLimiter(limit int, window time.Duration) *FixedWindowLimiter {
return &FixedWindowLimiter{
limit: limit,
window: window,
lastTime: time.Now(),
}
}
func (l *FixedWindowLimiter) Allow() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
// 新窗口,重置计数器
if now.Sub(l.lastTime) >= l.window {
l.counter = 0
l.lastTime = now
}
// 检查是否超限
if l.counter < l.limit {
l.counter++
return true
}
return false
}缺点:窗口边界突刺问题
将时间窗口分为多个小格子,更精细地控制流量。
import (
"sync"
"time"
)
type SlidingWindowLimiter struct {
limit int
window time.Duration
slotCount int
slots []int
slotTime time.Duration
currentIdx int
lastTime time.Time
mu sync.Mutex
}
func NewSlidingWindowLimiter(limit int, window time.Duration, slotCount int) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
limit: limit,
window: window,
slotCount: slotCount,
slots: make([]int, slotCount),
slotTime: window / time.Duration(slotCount),
lastTime: time.Now(),
}
}
func (l *SlidingWindowLimiter) Allow() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
// 计算时间差
elapsed := now.Sub(l.lastTime)
slotsToMove := int(elapsed / l.slotTime)
// 移动窗口
if slotsToMove > 0 {
if slotsToMove >= l.slotCount {
// 清空所有槽
for i := range l.slots {
l.slots[i] = 0
}
} else {
// 移动部分槽
for i := 0; i < slotsToMove; i++ {
l.currentIdx = (l.currentIdx + 1) % l.slotCount
l.slots[l.currentIdx] = 0
}
}
l.lastTime = now
}
// 统计当前窗口内的请求数
total := 0
for _, count := range l.slots {
total += count
}
if total < l.limit {
l.slots[l.currentIdx]++
return true
}
return false
}- 固定速率产生令牌
- 桶有容量上限
- 请求消耗令牌
import (
"sync"
"time"
)
type TokenBucket struct {
capacity int // 桶容量
tokens int // 当前令牌数
rate int // 每秒产生令牌数
lastTokenTime time.Time
mu sync.Mutex
}
func NewTokenBucket(capacity, rate int) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
rate: rate,
lastTokenTime: time.Now(),
}
}
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
now := time.Now()
// 计算应该产生的令牌数
elapsed := now.Sub(tb.lastTokenTime)
tokensToAdd := int(elapsed.Seconds() * float64(tb.rate))
if tokensToAdd > 0 {
tb.tokens += tokensToAdd
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastTokenTime = now
}
// 消耗令牌
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}
// AllowN 消耗N个令牌
func (tb *TokenBucket) AllowN(n int) bool {
tb.mu.Lock()
defer tb.mu.Unlock()
now := time.Now()
elapsed := now.Sub(tb.lastTokenTime)
tokensToAdd := int(elapsed.Seconds() * float64(tb.rate))
if tokensToAdd > 0 {
tb.tokens += tokensToAdd
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastTokenTime = now
}
if tb.tokens >= n {
tb.tokens -= n
return true
}
return false
}优点:
- 允许突发流量(桶满时)
- 平滑限流
- 支持预消费
- 固定速率处理请求
- 请求进入队列
- 队列满则拒绝
import (
"sync"
"time"
)
type LeakyBucket struct {
capacity int // 桶容量
water int // 当前水量
rate int // 漏水速率(每秒)
lastLeakTime time.Time
mu sync.Mutex
}
func NewLeakyBucket(capacity, rate int) *LeakyBucket {
lb := &LeakyBucket{
capacity: capacity,
water: 0,
rate: rate,
lastLeakTime: time.Now(),
}
// 启动漏水协程
go lb.leak()
return lb
}
func (lb *LeakyBucket) Allow() bool {
lb.mu.Lock()
defer lb.mu.Unlock()
// 检查桶是否满
if lb.water < lb.capacity {
lb.water++
return true
}
return false
}
func (lb *LeakyBucket) leak() {
ticker := time.NewTicker(time.Second / time.Duration(lb.rate))
defer ticker.Stop()
for range ticker.C {
lb.mu.Lock()
if lb.water > 0 {
lb.water--
}
lb.mu.Unlock()
}
}优点:
- 流量整形
- 平滑输出
缺点:
- 无法应对突发流量
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = redis.call('TIME')
local timestamp = tonumber(now[1]) + tonumber(now[2]) / 1000000
local info = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(info[1]) or capacity
local last_time = tonumber(info[2]) or timestamp
local delta = math.max(0, timestamp - last_time)
local new_tokens = math.min(capacity, tokens + delta * rate)
if new_tokens >= requested then
new_tokens = new_tokens - requested
redis.call('HMSET', key, 'tokens', new_tokens, 'last_time', timestamp)
redis.call('EXPIRE', key, 3600)
return 1
else
return 0
endimport (
"context"
"github.com/go-redis/redis/v8"
)
type RedisRateLimiter struct {
client *redis.Client
script *redis.Script
capacity int
rate float64
}
func NewRedisRateLimiter(client *redis.Client, capacity int, rate float64) *RedisRateLimiter {
script := redis.NewScript(`
-- Lua脚本内容
`)
return &RedisRateLimiter{
client: client,
script: script,
capacity: capacity,
rate: rate,
}
}
func (rl *RedisRateLimiter) Allow(ctx context.Context, key string) (bool, error) {
result, err := rl.script.Run(ctx, rl.client, []string{key},
rl.capacity, rl.rate, 1).Int()
if err != nil {
return false, err
}
return result == 1, nil
}| 算法 | 突发流量 | 平滑性 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|
| 固定窗口 | ❌ | 低 | 简单 | 粗粒度限流 |
| 滑动窗口 | ✅ | 中 | 中等 | API限流 |
| 令牌桶 | ✅ | 高 | 中等 | 网络流量整形 |
| 漏桶 | ❌ | 高 | 中等 | 削峰填谷 |
- API限流:滑动窗口或令牌桶
- 秒杀系统:令牌桶(预热)
- 防爬虫:固定窗口(IP维度)
- 流量整形:漏桶
type MultiLevelLimiter struct {
globalLimiter *TokenBucket // 全局限流
userLimiter map[string]*TokenBucket // 用户级限流
ipLimiter map[string]*TokenBucket // IP级限流
}
func (ml *MultiLevelLimiter) Allow(userID, ip string) bool {
// 1. 全局限流
if !ml.globalLimiter.Allow() {
return false
}
// 2. 用户级限流
if limiter, ok := ml.userLimiter[userID]; ok {
if !limiter.Allow() {
return false
}
}
// 3. IP级限流
if limiter, ok := ml.ipLimiter[ip]; ok {
if !limiter.Allow() {
return false
}
}
return true
}关键要点:
- ✅ 令牌桶支持突发流量,最常用
- ✅ 漏桶平滑输出,适合流量整形
- ✅ 滑动窗口解决固定窗口的边界问题
- ✅ Redis实现分布式限流
💡 思考题:
- 令牌桶和漏桶的区别是什么?
- 如何实现分布式限流?
- 秒杀系统应该选择哪种限流算法?