lua-resty-upstream

上游连接负载均衡和故障转移模块

$ opm get hamishforbes/lua-resty-upstream

lua-resty-upstream

上游连接负载均衡和故障转移模块

状态

实验性,API 可能会在未经预告的情况下更改。

需要 ngx_lua > 0.9.5

概述

创建一个 lua 共享字典。在 init_by_lua 中定义你的上游池和主机,这将被保存到共享字典中。

使用 connect 方法返回一个已连接的 tcp 套接字

或者传入一个 resty 模块(例如 lua-resty-redislua-resty-http),该模块实现了 connect()set_timeout()

调用 process_failed_hosts 处理失败的主机,而不会阻塞当前请求。

使用 resty.upstream.api 在初始化或运行时修改上游配置,建议这样做!

resty.upstream.http 包装了来自 @pintsized 的 lua-resty-http

它允许根据 HTTP 状态代码以及套接字连接状态进行故障转移。

    lua_shared_dict my_upstream_dict 1m;
    init_by_lua '
        upstream_socket  = require("resty.upstream.socket")
        upstream_api = require("resty.upstream.api")
    
        upstream, configured = upstream_socket:new("my_upstream_dict")
        if not upstream then
            error(configured)
        end
        api = upstream_api:new(upstream)
    
        if not configured then -- Only reconfigure on start, shared mem persists across a HUP
            api:create_pool({id = "primary", timeout = 100})
            api:set_priority("primary", 0)
            api:set_method("primary", "round_robin")
            api:add_host("primary", { id="a", host = "127.0.0.1", port = "80", weight = 10 })
            api:add_host("primary", { id="b", host = "127.0.0.1", port = "81",  weight = 10 })
    
            api:create_pool({id = "dr"})
            api:set_priority("dr", 10)
            api:add_host("dr", { host = "127.0.0.1", port = "82", weight = 5 })
            api:add_host("dr", { host = "127.0.0.1", port = "83", weight = 10 })
    
            api:create_pool({id = "test", priority = 5})
            api:add_host("primary", { id="c", host = "127.0.0.1", port = "82", weight = 10 })
            api:add_host("primary", { id="d", host = "127.0.0.1", port = "83", weight = 10 })
        end
    ';
    
    init_worker_by_lua 'upstream:init_background_thread()';
    
    server {
    
        location / {
            content_by_lua '
                local sock, err = upstream:connect()
                upstream:process_failed_hosts()
            ';
        }
    
    }

upstream.socket

新建

语法:upstream, configured = upstream_socket:new(dictionary, id?)

使用提供的字典名称返回一个新的上游对象。当在 init_by_lua 中调用时,如果字典中已存在配置,则返回一个额外的变量。接收一个可选的 id 参数,如果 upstream.socket 的多个实例使用相同的字典,则此参数必须唯一。

初始化后台线程

语法:ok, err = upstream:init_background_thread()

初始化后台线程,应在 init_worker_by_lua 中调用

连接

语法:ok, err = upstream:connect(client?, key?)

尝试按照定义的池的优先级顺序使用选定的负载均衡方法连接到主机。返回一个已连接的套接字和一个包含已连接的hostpoolidpool 的表,或者返回 nil 和错误消息。

当传入一个 套接字 或 resty 模块时,它将在成功连接后返回相同的对象,或者返回 nil。

此外,哈希方法可以接收一个可选的 key 来定义如何对连接进行哈希以确定主机。默认情况下使用 ngx.var.remote_addr。当池的方法是轮循时,此值将被忽略。

    resty_redis = require('resty.redis')
    local redis = resty_redis.new()
    
    local key = ngx.req.get_headers()["X-Forwarded-For"]
    
    local redis, err = upstream:connect(redis, key)
    
    if not redis then
        ngx.log(ngx.ERR, err)
        ngx.status = 500
        return ngx.exit(ngx.status)
    end
    
    ngx.log(ngx.info, 'Connected to ' .. err.host.host .. ':' .. err.host.port)
    local ok, err = redis:get('key')

处理失败的主机

语法:ok, err = upstream:process_failed_hosts()

处理当前请求中任何失败或恢复的主机。通过 ngx.timer.at 生成一个立即回调,不会阻塞当前请求。

获取池

语法:pools = usptream:get_pools()

返回一个包含当前池和主机配置的表,例如:

    {
        primary = {
            up = true,
            method = 'round_robin',
            timeout = 100,
            priority = 0,
            hosts = {
                web01 = {
                    host = "127.0.0.1",
                    weight = 10,
                    port = "80",
                    lastfail = 0,
                    failcount = 0,
                    up = true,
                    healthcheck = true
                }
                web02 = {
                    host = "127.0.0.1",
                    weight = 10,
                    port = "80",
                    lastfail = 0,
                    failcount = 0,
                    up = true,
                    healthcheck = { interval = 30, path = '/check' }
                }
            }
        },
        secondary = {
            up = true,
            method = 'round_robin',
            timeout = 2000,
            priority = 10,
            hosts = {
                dr01 = {
                    host = "10.10.10.1",
                    weight = 10,
                    port = "80",
                    lastfail = 0,
                    failcount = 0,
                    up = true
                }
    
            }
        },
    }

保存池

语法:ok, err = upstream:save_pools(pools)

将池表保存到共享字典中,pools 必须与 get_pools 返回的格式相同

排序池

语法:ok, err = upstream:sort_pools(pools)

根据提供的池表在共享字典中生成优先级顺序

绑定

语法:ok, err = upstream:bind(event, func)

绑定一个函数,以便在事件发生时调用。func 应接收一个包含事件数据的参数。

在成功绑定时返回 true,在失败时返回 nil 和错误消息。

    local function host_down_handler(event)
        ngx.log(ngx.ERR, "Host: ", event.host.host, ":", event.host.port, " in pool '", event.pool.id,'" is down!')
    end
    local ok, err = upstream:bind('host_down', host_down_handler)

事件:host_up

当主机状态从 down 变为 up 时触发。事件数据是一个包含受影响的主机和池的表。

事件:host_down

当主机状态从 up 变为 down 时触发。事件数据是一个包含受影响的主机和池的表。

upstream.api

这些函数允许你动态重新配置上游池和主机

新建

语法:api, err = upstream_api:new(upstream)

使用提供的上游对象返回一个新的 api 对象。

设置方法

语法:ok, err = api:set_method(poolid, method)

设置指定池的负载均衡方法。目前支持随机轮循和哈希方法。

创建池

语法:ok, err = api:create_pool(pool)

从选项表中创建一个新的池,pool 必须至少包含一个键 id,该键在当前上游对象中必须唯一。

其他有效选项为

  • method 平衡方法

  • timeout 连接超时时间(毫秒)

  • priority 优先级更高的池将在后面使用

  • read_timeout

  • keepalive_timeout

  • keepalive_pool

  • status_codes 请参阅 status_codes

此时无法定义主机。

注意:此函数会将 ID 转换为字符串

默认池值

    { method = 'round_robin', timeout = 2000, priority = 0 }

设置优先级

语法:ok, err = api:set_priority(poolid, priority)

优先级必须是数字,错误时返回 nil。

添加主机

语法:ok, err = api:add_host(poolid, host)

接收池 ID 和选项表,host 必须至少包含 host。如果未指定主机 ID,它将是基于池中主机数量的数字索引。

注意:此函数会将 ID 转换为字符串

默认值

    { host = '', port = 80, weight = 0}

移除主机

语法:ok, err = api:remove_host(poolid, host)

接收一个 poolid 和一个 hostid 以将其从池中移除

主机失效

语法:ok,err = api:down_host(poolid, host)

手动将主机标记为失效,此主机不会自动恢复。

主机恢复

语法:ok,err = api:up_host(poolid, host)

手动将失效的主机恢复到池中

upstream.http

用于对上游主机发出 HTTP 请求的函数。

状态码

此池选项是一个指示请求失败的状态码数组。默认为无。

字符 x 掩盖一个数字

    {
        ['5xx'] = true, -- Matches 500, 503, 524
        ['400'] = true  -- Matches only 400
    }

新建

语法:httpc, err = upstream_http:new(upstream, ssl_opts?)

使用提供的上游对象返回一个新的 http 上游对象。

ssl_opts 是一个用于配置 SSL 支持的可选表。

  • ssl 设置为 true 以启用 SSL 握手,默认为 false

  • ssl_verify 设置为 false 以禁用 SSL 证书验证,默认为 true

  • sni_host 用于作为 sni 主机名的字符串,默认为请求的 Host 标头

    `lua https_upstream = Upstream_HTTP:new(upstream_ssl, { ssl = true, ssl_verify = true, sni_host = "foo.example.com" }) `

初始化后台线程

语法:ok, err = upstream_http:init_background_thread()

初始化后台线程,应在 init_worker_by_lua 中调用。

如果使用 upstream.http 后台线程,则不要upstream.socket 中调用 init_background_thread 方法

请求

语法:res, err_or_conn_info, status? = upstream_api:request(params)

接收与 lua-resty-http 的 request 方法相同的参数。

在请求成功时返回 lua-resty-http 对象和一个包含已连接的主机和池的表。

如果请求失败,则返回 nil、错误和建议的 http 状态代码

    local ok, err, status = upstream_http:request({
            path = "/helloworld",
            headers = {
                ["Host"] = "example.com",
            }
        })
    if not ok then
        ngx.status = status
        ngx.say(err)
        ngx.exit(status)
    else
        local host = err.host
        local pool = err.pool
    end

设置保持活动连接

语法:ok, err = upstream_http:set_keepalive()

将保持活动连接超时/池从池配置传递到 lua-resty-http 的 set_keepalive 方法。

获取重用次数

语法:ok, err = upstream_http:get_reused_times()

传递到 lua-resty-http 的 get_reused_times 方法。

关闭

语法:ok, err = upstream_http:close()

传递到 lua-resty-http 的 close 方法。

HTTP 健康检查

可以通过将 healthcheck 参数添加到主机来启用活动后台健康检查。

值为 true 将启用默认检查,即对 /GET 请求。

healthcheck 参数也可以是 lua-resty-http 的 request 方法的有效参数表。

以及一些其他参数

  • interval 用于设置健康检查之间的时间间隔(秒)。必须 >= 10 秒。默认为 60 秒

  • timeout 设置健康检查的连接超时时间。默认为池设置。

  • read_timeout 设置健康检查的读取超时时间。默认为池设置。

  • status_codes 无效响应状态码表。默认为池设置。

后台检查的失败与前端请求的参数相同,除非显式覆盖。

    -- Custom check parameters
    api:add_host("primary", {
         host = 123.123.123.123,
         port = 80,
         healthcheck = {
            interval = 30, -- check every 30s
            timeout      = (5*1000), -- 5s connect timeout
            read_timeout = (15*1000), -- 15s connect timeout
            status_codes = {["5xx"] = true, ["403"] = true}, -- 5xx and 403 responses are a fail
            -- resty-http params
            path = "/check",
            headers = {
                ["Host"] = "domain.com",
                ["Accept-Encoding"] = "gzip"
            }
         }
    })
    
    -- Default check parameters
    api:add_host("primary", {host = 123.123.123.123, port = 80, healthcheck = true})
    

待办事项

  • 基于 IP 的粘性会话

  • 缓慢启动 - 恢复的主机权重较低

  • 活动 TCP 健康检查

  • 使用 Cap'n Proto 代替 JSON 进行序列化

  • HTTP 最小上升 - 主机必须进行 n 次成功的健康检查才能被标记为正常

  • HTTP 特定选项

    • 基于 Cookie 的粘性会话

作者

Hamish Forbes

许可证

mit

依赖项

版本