前言

十年前你说生如夏花般绚烂,十年后你说平凡才是唯一的答案.没有经历生如夏花,平凡不是我现在想要的答案.

一.令牌桶算法

令牌桶算法的概念里总共有2个桶: 一个桶叫“令牌桶”,里面装满了令牌,明确标明最多装有多少令牌,这些令牌就是调用的次数,调用一次API会消耗一个令牌,令牌没了将不能调用API。这就是单位时间内能调用多少次API,最多调用的次数即“令牌桶”中令牌的最大承载数。 而另一个桶叫“补桶”,补桶里装有所有的令牌,令牌数等于这个API的“总可调用的次数”,“补桶”定期向“令牌桶”里补令牌,每次补令牌后“令牌桶”中的令牌的总数不能多于令牌桶的最大承载数。因为“令牌桶”用一次就花掉一个令牌,而补桶多久向令牌桶补一次令牌,即是“限流的单位时间”。两个桶进行协调合作就能完成限流。 在对API进行限流时,一般补桶中的令牌可以视为无穷大。  

二.令牌桶实践

2.1.谷歌guava工具RateLimiter

①maven导入guava

<!-- 分布式 https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>

②注解类Limit

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface Limit {

    /**
     * 资源key
     *
     * @return
     */
    String key() default "";

    /**
     * 最多访问次数
     *
     * @return
     */
    double permitsPerSecond();

    /**
     * 时间
     *
     * @return
     */
    long timeout();

    /**
     * 时间类型
     *
     * @return
     */
    TimeUnit timeunit() default TimeUnit.MILLISECONDS;

    /**
     * 提示信息
     *
     * @return
     */
    String msg() default "系统繁忙,请稍后再试";
}

③Aop拦截LimitAspect

@Slf4j
@Aspect
@Component
public class LimitAspect {
    private final Map<String, RateLimiter> limitMap = Maps.newConcurrentMap();

    @Around("@annotation(com.yy.sro.api.annotation.Limit)")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature signature = (MethodSignature)pjp.getSignature();
        Method method = signature.getMethod();
        // 拿limit的注解
        Limit limit = method.getAnnotation(Limit.class);
        if (limit != null) {
            // key作用:不同的接口,不同的流量控制
            String key = limit.key();
            RateLimiter rateLimiter;
            // 验证缓存是否有命中key
            if (!limitMap.containsKey(key)) {
                // 创建令牌桶
                rateLimiter = RateLimiter.create(limit.permitsPerSecond());
                limitMap.put(key, rateLimiter);
                log.info("新建了令牌桶==>{}, 容量==>{}", key, limit.permitsPerSecond());
            }
            rateLimiter = limitMap.get(key);
            // 拿令牌
            boolean acquire = rateLimiter.tryAcquire(limit.timeout(), limit.timeunit());
            // 拿不到命令,直接返回异常提示
            if (!acquire) {
                log.debug("令牌桶==>{}, 获取令牌失败", key);
                throw new LimitException(limit.msg());
            }
        }
        return pjp.proceed();
    }
}

④异常拦截处理

public class LimitException extends RuntimeException{

    private static final long serialVersionUID = -5580240629530833877L;

    protected String errorCode;

    protected String errorMsg;

    public LimitException(String errorMsg) {
        super(errorMsg);
        this.errorMsg = errorMsg;
    }

    public LimitException(Throwable cause) {
        super(cause);
    }

    public LimitException(String errorMsg, Throwable cause) {
        super(errorMsg, cause);
        this.errorMsg = errorMsg;
    }

    public LimitException(String errorCode, String errorMsg) {
        super(errorMsg);
        this.errorCode = errorCode;
        this.errorMsg = errorMsg;
    }

    public LimitException(String errorCode, String errorMsg, Throwable cause) {
        super(errorMsg, cause);
        this.errorCode = errorCode;
        this.errorMsg = errorMsg;
    }

    @Override
    public Throwable fillInStackTrace() {
        return this;
    }

    public String getErrorCode() {
        return errorCode;
    }

    public void setErrorCode(String errorCode) {
        this.errorCode = errorCode;
    }

    public String getErrorMsg() {
        return errorMsg;
    }

    public void setErrorMsg(String errorMsg) {
        this.errorMsg = errorMsg;
    }
}
Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler {

    /**
     * 处理自定义异常
     *
     * @param req
     * @param e
     * @return
     */
    @ExceptionHandler(value = LimitException.class)
    @ResponseBody
    public DResult LimitExceptionHandler(HttpServletRequest req, LimitException e) {
        log.error("发生业务异常!原因是:{}", e.getErrorMsg());
        if (StrUtil.isNotBlank(e.getErrorCode())) {
            return DResult.failed(e.getErrorCode(), e.getErrorMsg());
        }else{
            return DResult.failed(ResultCode.KMS_UNKNOWN_ERROR.getCode(), e.getErrorMsg());
        }
    }

}

⑤接口访问验证

@Slf4j
@RestController
@RequestMapping("index/v1/")
public class IndexController {

    @Limit(key = "limitApi", permitsPerSecond = 1, timeout = 500, msg = "当前排队人数较多,请稍后再试!")
    @GetMapping("limit1")
    public DResult limitApi(){
        User user = new User();
        user.setId(System.currentTimeMillis());
        user.setName(UUID.randomUUID().toString());
        user.setRegisterDate(new Date());
        return DResult.of(user);
    }

}

⑥接口验证结果

{"data":null,"code":"400","msg":"当前排队人数较多,请稍后再试!"}

2.2.Redis+Lua实现

①maven引入以及相关配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

②注解类RedisLimit

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface RedisLimit {

    /**
     * 资源名称
     *
     * @return
     */
    String name() default "";

    /**
     * 资源key
     *
     * @return
     */
    String key() default "";

    /**
     * 前缀
     *
     * @return
     */
    String prefix() default "";

    /**
     * 时间
     *
     * @return
     */
    long period();

    /**
     * 最多访问次数
     *
     * @return
     */
    long count();

    /**
     * 类型
     *
     * @return
     */
    LimitType limitType() default LimitType.CUSTOMER;

    /**
     * 提示信息
     *
     * @return
     */
    String msg() default "系统繁忙,请稍后再试";
}

③Aop处理RedisLimitAspect

@Slf4j
@Aspect
@Component
public class RedisLimitAspect {

    private final RedisTemplate<String, Object> redisTemplate;

    public RedisLimitAspect(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Around("@annotation(com.yy.sro.api.annotation.RedisLimit)")
    public Object around(ProceedingJoinPoint pjp) {
        MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
        Method method = methodSignature.getMethod();
        RedisLimit annotation = method.getAnnotation(RedisLimit.class);
        LimitType limitType = annotation.limitType();

        String name = annotation.name();
        String key;

        long period = annotation.period();
        long count = annotation.count();

        switch (limitType) {
            case IP:
                key = IpUtils.getRequestIp();
                break;
            case CUSTOMER:
                key = annotation.key();
                break;
            default:
                key = StringUtils.upperCase(method.getName());
        }
        ImmutableList<String> keys = ImmutableList.of(StringUtils.join(annotation.prefix(), key));
        try {
            String luaScript = buildLuaScript();
            // Redis int类型对应long
            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript,Long.class);
            Long number = redisTemplate.execute(redisScript, keys, count, period);
            log.info("Access try count is {} for name = {} and key = {}", number, name, key);
            if (number != null && number.intValue() == 1) {
                return pjp.proceed();
            }
            throw new LimitException(annotation.msg());
        } catch (Throwable e) {
            if (e instanceof LimitException) {
                log.debug("令牌桶={},获取令牌失败", key);
                throw new LimitException(e.getLocalizedMessage());
            }
            e.printStackTrace();
            throw new RuntimeException("服务器异常");
        }
    }

    public String buildLuaScript() {
        return "redis.replicate_commands(); local listLen,time" +
                "\nlistLen = redis.call('LLEN', KEYS[1])" +
                // 不超过最大值,则直接写入时间
                "\nif listLen and tonumber(listLen) < tonumber(ARGV[1]) then" +
                "\nlocal a = redis.call('TIME');" +
                "\nredis.call('LPUSH', KEYS[1], a[1]*1000000+a[2])" +
                "\nelse" +
                // 取出现存的最早的那个时间,和当前时间比较,看是小于时间间隔
                "\ntime = redis.call('LINDEX', KEYS[1], -1)" +
                "\nlocal a = redis.call('TIME');" +
                "\nif a[1]*1000000+a[2] - time < tonumber(ARGV[2])*1000000 then" +
                // 访问频率超过了限制,返回0表示失败
                "\nreturn 0;" +
                "\nelse" +
                "\nredis.call('LPUSH', KEYS[1], a[1]*1000000+a[2])" +
                "\nredis.call('LTRIM', KEYS[1], 0, tonumber(ARGV[1])-1)" +
                "\nend" +
                "\nend" +
                "\nreturn 1;";
    }
}

④接口验证

@Slf4j
@RestController
@RequestMapping("index/v1/")
public class IndexController {

@RedisLimit(key = "limitApiRedis", count = 2, period = 2, msg = "当前排队人数较多,请稍后再试!")
@GetMapping("limit2")
public DResult limitApiRedis(){
User user = new User();
user.setId(System.currentTimeMillis());
user.setName(UUID.randomUUID().toString());
user.setRegisterDate(new Date());
return DResult.of(user);
}
}

⑤结果

{"data":null,"code":"400","msg":"当前排队人数较多,请稍后再试!"}

 

今夜江河之源,只亮我的酥油灯,只照我的心上人.