从实际生活中的场景来理解限流:一个人能够挑100斤的担子,如果给他的肩膀上放150
斤的重物,他可能直接就趴下了,运输能力变成了0,所以我们必须保障给他肩上加的重物不超过100斤。
限流也是同样的道理,通过限流让系统工作在最高吞吐量的水位上,防止系统被击垮。
四个限流器:
单机QPS - RateLimiter -- guava
单机并发 - Semaphore -- memory
多机QPS - redis key -- redis
多机并发 - RPermitExpirableSemaphore -- redission
单机QPS - RateLimiter -- guava
通过guava的ratelimiter,实现了令牌桶算法,它不是记录上次的请求时间,而是保存下次请求到来的时间
阻塞方法 tryAccuire()
非阻塞方法 tryAccuire()带超时时间,表示允许等待该超时时间
单机并发 - Semaphore -- memory
操作系统的信号量是个很重要的概念,Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
阻塞方法 acquire()
非阻塞方法 tryAcquire()
多机QPS - redis key -- redis
redis key incr方法
计数器是 Redis 的原子性自增操作可实现的最直观的模式了,它的想法相当简单:每当某个操作发生时,向 Redis 发送一个 命令。
比如在一个 web 应用程序中,如果想知道用户在一年中每天的点击量,那么只要将用户 ID 以及相关的日期信息作为键,并在每次用户点击页面时,执行一次自增操作即可。
1. 通过lua脚本实现原子性
local currentcurrent = redis.call("incr",KEYS[1])if tonumber(current) == 1 then redis.call("expire",KEYS[1],1)end
2. 思路是每秒进来的请求都是同一个key,无需设置超时时间,会有些许的精度丢失
protected static long getCurrentSecond() { return System.currentTimeMillis() / 1000; }--Long currentSecond = getCurrentSecond(); String key = config.getId() + currentSecond; long currentLimit = jedis.incr(key); if (currentLimit == 1) { jedis.expire(key, (int) (long) config.getExpireTime()); } if (currentLimit > config.getLimit()) { return false; } else { return true; }
3.思路3 使用redis的列表 + redis事务
FUNCTION LIMIT_API_CALL(ip)current = LLEN(ip)IF current > 10 THEN ERROR "too many requests per second"ELSE IF EXISTS(ip) == FALSE MULTI RPUSH(ip,ip) EXPIRE(ip,1) EXEC ELSE RPUSHX(ip,ip) END PERFORM_API_CALL()END
多机并发 - RPermitExpirableSemaphore -- redission
Redisson的可过期性信号量(PermitExpirableSemaphore)实在RSemaphore对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。
两个算法
1、令牌桶算法 :
(1)每秒会有r个令牌放入桶中,或者说,每过1/r秒桶中增加一个令牌。(2)桶中最多存放b个令牌,如果桶满了,新放入的令牌会被丢弃。(3)当一个n字节的数据包到达时,消耗n个令牌,请求通过。(4)如果桶中可用令牌小于n,请求被拒绝实现 Guava RateLimiter
2、漏桶算法:
水(请求)先进入漏桶里,漏桶以一定的速度出水,当水流入速度过大时会直接溢出
http://ifeve.com/guava-ratelimiter/
//平滑限流 if (!needRateSmooth) { return rateLimiter.tryAcquire(); //直接返回false } else { return rateLimiter.tryAcquire(1, TimeUnit.SECONDS); //在1秒内等待 }
if (needRateSmooth) { long currentTimeMicros = getCurrentMicroSecond(); long expectedTimeMicros = getNextExecutableMicros(config, currentLimit); if (currentTimeMicros <= expectedTimeMicros) { long needToParkMicros = expectedTimeMicros - currentTimeMicros; if (needToParkMicros > 100) { LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(needToParkMicros)); } else if (needToParkMicros > 15) { while (getCurrentMicroSecond() < expectedTimeMicros) { Thread.yield(); } } else { while (getCurrentMicroSecond() < expectedTimeMicros) ; } } }