Skip to content

Commit

Permalink
add redis-cluster test case
Browse files Browse the repository at this point in the history
  • Loading branch information
theweakgod committed Feb 8, 2024
1 parent 680cacf commit 4219d33
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 398 deletions.
115 changes: 20 additions & 95 deletions apisix/plugins/limit-req.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local limit_req_new = require("resty.limit.req").new
local core = require("apisix.core")
local plugin_name = "limit-req"
local limit_req_new = require("resty.limit.req").new
local core = require("apisix.core")
local redis_schema = require("apisix.utils.redis-schema")
local policy_to_additional_properties = redis_schema.schema
local plugin_name = "limit-req"
local sleep = core.sleep

local redis_single_new
local redis_cluster_new
do
local redis_src = "apisix.plugins.limit-req.limit-req-redis-single"
local redis_src = "apisix.plugins.limit-req.limit-req-redis"
redis_single_new = require(redis_src).new

local cluster_src = "apisix.plugins.limit-req.limit-req-redis-cluster"
Expand All @@ -34,74 +36,7 @@ local lrucache = core.lrucache.new({
type = "plugin",
})

local counter_type_to_additional_properties = {
redis = {
properties = {
redis_host = {
type = "string", minLength = 2
},
redis_port = {
type = "integer", minimum = 1, default = 6379,
},
redis_username = {
type = "string", minLength = 1,
},
redis_password = {
type = "string", minLength = 0,
},
redis_prefix = {
type = "string", minLength = 0, default = "limit_req", pattern = "^[0-9a-zA-Z|_]+$"
},
redis_database = {
type = "integer", minimum = 0, default = 0,
},
redis_timeout = {
type = "integer", minimum = 1, default = 1000,
},
redis_ssl = {
type = "boolean", default = false,
},
redis_ssl_verify = {
type = "boolean", default = false,
},
},
required = {"redis_host"},
},
["shared-dict"] = {
properties = {
},
},
["redis-cluster"] = {
properties = {
redis_cluster_nodes = {
type = "array",
minItems = 2,
items = {
type = "string", minLength = 2, maxLength = 100
},
},
redis_password = {
type = "string", minLength = 0,
},
redis_prefix = {
type = "string", minLength = 0, default = "limit_req", pattern = "^[0-9a-zA-Z|_]+$"
},
redis_timeout = {
type = "integer", minimum = 1, default = 1000,
},
redis_cluster_name = {
type = "string",
},
redis_cluster_ssl = {
type = "boolean", default = false,
},
redis_cluster_ssl_verify = {
type = "boolean", default = false,
},
},
required = {"redis_cluster_nodes", "redis_cluster_name"},
},
}

local schema = {
type = "object",
properties = {
Expand All @@ -112,10 +47,10 @@ local schema = {
enum = {"var", "var_combination"},
default = "var",
},
counter_type = {
policy = {
type = "string",
enum = {"redis", "redis-cluster", "shared-dict"},
default = "shared-dict",
enum = {"redis", "redis-cluster", "local"},
default = "local",
},
rejected_code = {
type = "integer", minimum = 200, maximum = 599, default = 503
Expand All @@ -131,31 +66,21 @@ local schema = {
required = {"rate", "burst", "key"},
["if"] = {
properties = {
counter_type = {
policy = {
enum = {"redis"},
},
},
},
["then"] = counter_type_to_additional_properties.redis,
["then"] = policy_to_additional_properties.redis,
["else"] = {
["if"] = {
properties = {
counter_type = {
enum = {"shared-dict"},
policy = {
enum = {"redis-cluster"},
},
},
},
["then"] = counter_type_to_additional_properties["shared-dict"],
["else"] = {
["if"] = {
properties = {
counter_type = {
enum = {"redis-cluster"},
},
},
},
["then"] = counter_type_to_additional_properties["redis-cluster"],
}
["then"] = policy_to_additional_properties["redis-cluster"],
}
}

Expand All @@ -179,22 +104,22 @@ end


local function create_limit_obj(conf)
if conf.counter_type == "shared-dict" then
if conf.policy == "local" then
core.log.info("create new limit-req plugin instance")
return limit_req_new(shdict_name, conf.rate, conf.burst)
elseif conf.counter_type == "redis" then
return limit_req_new("plugin-limit-req", conf.rate, conf.burst)
elseif conf.policy == "redis" then

core.log.info("create new limit-req redis plugin instance")

return redis_single_new("plugin-limit-req", conf, conf.rate, conf.burst)

elseif conf.counter_type == "redis-cluster" then
elseif conf.policy == "redis-cluster" then

core.log.info("create new limit-req redis-cluster plugin instance")

return redis_cluster_new("plugin-limit-req", conf, conf.rate, conf.burst)
else
return nil, "counter_type enum not match"
return nil, "policy enum not match"
end
end

Expand Down
120 changes: 24 additions & 96 deletions apisix/plugins/limit-req/limit-req-redis-cluster.lua
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
local rediscluster = require("resty.rediscluster")
local core = require("apisix.core")
local assert = assert
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local redis_cluster = require("apisix.utils.rediscluster")
local setmetatable = setmetatable
local math = require "math"
local abs = math.abs
local floor = math.floor
local max = math.max
local ipairs = ipairs
local ngx_now = ngx.now
local ngx_null = ngx.null
local util = require("apisix.plugins.limit-req.util")

local _M = {version = 0.1}

Expand All @@ -18,106 +26,26 @@ local mt = {
}


local function new_redis_cluster(conf)
local config = {
name = conf.redis_cluster_name,
serv_list = {},
read_timeout = conf.redis_timeout,
auth = conf.redis_password,
dict_name = "plugin-limit-req-redis-cluster-slot-lock",
connect_opts = {
ssl = conf.redis_cluster_ssl,
ssl_verify = conf.redis_cluster_ssl_verify,
}
}

for i, conf_item in ipairs(conf.redis_cluster_nodes) do
local host, port, err = core.utils.parse_addr(conf_item)
if err then
return nil, "failed to parse address: " .. conf_item
.. " err: " .. err
end

config.serv_list[i] = {ip = host, port = port}
end

local red_cli, err = rediscluster:new(config)
function _M.new(plugin_name, conf, rate, burst)
local red_cli, err = redis_cluster.new(conf, "plugin-limit-req-redis-cluster-slot-lock")
if not red_cli then
return nil, "failed to new redis cluster: " .. err
return nil, err
end

return red_cli
end


function _M.new(plugin_name, conf, rate, burst)
local self = {
conf = conf,
plugin_name = plugin_name,
burst = burst * 1000,
rate = rate * 1000,
red_cli = red_cli,
}
return setmetatable(self, mt)
end


-- the "commit" argument controls whether should we record the event in shm.
function _M.incoming(self, key, commit)
local rate = self.rate
local now = ngx_now() * 1000
local conf = self.conf

local excess

-- init redis
local red, err = new_redis_cluster(conf)
if not red then
return red, err
end

local prefix = conf.redis_prefix
key = prefix .. ":" .. key

local excess, err = red:hget(key, "excess")
if err then
return nil, err
end
local last, err = red:hget(key, "last")
if err then
return nil, err
end
core.log.error("excess: ", excess)
core.log.error("last: ", last)
if excess ~= ngx_null and last ~= ngx_null then
excess = tonumber(excess)
last = tonumber(last)
local elapsed = now - last
excess = max(excess - rate * abs(elapsed) / 1000 + 1000, 0)

if excess > self.burst then
return nil, "rejected"
end
else
excess = 0
end

if commit then
local ok
local err
ok, err = red:hset(key, "excess", excess)
if not ok then
return nil, err
end

ok, err = red:hset(key, "last", now)
if not ok then
return nil, err
end
end

-- return the delay in seconds, as well as excess
return excess / rate, excess / 1000
return util.incoming(self, self.red_cli, key, commit)
end



return _M
Loading

0 comments on commit 4219d33

Please sign in to comment.