lua-resty-qless

OpenResty 的 Qless(队列/管道管理)Lua 绑定

$ opm get pintsized/lua-resty-qless

lua-resty-qless

lua-resty-qless 是对 qless-core(来自 Moz)的绑定 - 一个强大的基于 Redis 的作业排队系统,灵感来自 resque,但它是作为 Redis 的 Lua 脚本集合实现的。

此绑定通过在 OpenResty / lua-nginx-module 中运行的 Lua 脚本提供了 Qless 的完整实现,包括可以在 init_worker_by_lua 阶段启动的 worker。

本质上,有了这个模块和一个现代的 Redis 实例,你可以将你的 OpenResty 服务器变成一个相当复杂但轻量级的作业排队系统,它也与参考 Ruby 实现 Qless 兼容。

注意:此模块不是为在纯 Lua 环境中工作而设计的。

状态

此模块应被视为实验性的。

需求

哲学和命名法

一个 job 是一个工作单位,由作业 ID 或 jid 标识。一个 queue 可以包含多个作业,这些作业计划在特定时间运行,多个正在等待运行的作业,以及正在运行的作业。一个 worker 是主机上的一个进程,它被唯一标识,它从队列中请求作业,执行与该作业相关的某些进程,然后将其标记为已完成。完成时,它可以被放到另一个队列中。

作业一次只能在一个队列中。该队列是它们最后被放入的任何队列。因此,如果 worker 正在处理一个作业,而你移动了它,则 worker 完成作业的请求将被忽略。

一个作业可以被 canceled,这意味着它会消失在空中,我们再也不会理它了。一个作业可以被 dropped,这是当 worker 无法及时发送心跳或完成作业时,或者一个作业可以被 failed,这是当主机识别到作业的一些系统性问题状态时。worker 只有在错误不太可能是瞬态错误时才应该失败一个作业;否则,该 worker 应该直接将其丢弃,让系统回收它。

特性

  1. 作业不会掉到地上 有时 worker 会丢弃作业。Qless 会自动将它们捡起来,交给另一个 worker

  2. 标记/跟踪 有些作业比其他作业更有趣。跟踪这些作业以获取有关其进度的更新。

  3. 作业依赖关系 一个作业可能需要等待另一个作业完成

  4. 统计数据 Qless 会自动记录有关作业等待处理的时间和处理时间的信息。目前,我们跟踪这些时间的数量、平均值、标准差和直方图。

  5. 作业数据临时存储 作业信息会保留一段时间(可配置),这样你仍然可以查看作业的历史记录、数据等。

  6. 优先级 具有相同优先级的作业按插入顺序弹出;优先级越高,弹出速度越快

  7. 重试逻辑 每个作业都与一定数量的重试相关联,这些重试会在将其放入新队列或完成时被更新。如果一个作业被反复丢弃,那么它被认为是有问题的,并且会自动失败。

  8. Web 应用程序 Ruby 绑定 有一个基于 Sinatra 的 Web 应用程序,它让你可以控制某些操作问题

  9. 计划工作 直到作业等待规定的延迟(默认为 0)之前,作业才可能被 worker 弹出

  10. 定期作业 计划很好,但我们也支持需要定期运行的作业。

  11. 通知 跟踪的作业在完成、失败、放入、弹出等操作时,会在 pubsub 通道上发出事件。使用这些事件来获取你感兴趣的作业的进度通知。

连接

首先,需要 resty.qless 并创建一个客户端,指定你的 Redis 连接详细信息。

    local qless = require("resty.qless").new({
        host = "127.0.0.1",
        port = 6379,
    })

传递给 new 的参数会转发到 lua-resty-redis-connector。请查看那里的文档以了解连接选项,包括如何使用 Redis Sentinel 等。

此外,如果你的应用程序有一个你希望重用的 Redis 连接,那么你可以通过两种方式进行集成

1) 直接使用已建立的连接

    local qless = require("resty.qless").new({
        redis_client = my_redis,
    })

2) 为连接和关闭连接提供回调

    local qless = require("resty.qless").new({
        get_redis_client = my_connection_callback,
        close_redis_client = my_close_callback,
    })

完成与 Qless 的操作后,你应该调用 qless:set_keepalive(),这将尝试将 Redis 放回 keepalive 池中,使用你直接提供的设置,或者通过传递给 lua-resty-redis-connector 的参数,或者通过调用你的 close_redis_client 回调。

排队作业

作业本身是模块,必须通过 require 加载,并提供一个 perform 函数,该函数接受一个 job 参数。

    -- my/test/job.lua (the job's "klass" becomes "my.test.job")
    
    local _M = {}
    
    function _M.perform(job)
        -- job is an instance of Qless_Job and provides access to
        -- job.data (which is a Lua table), a means to cancel the
        -- job (job:cancel()), and more.
    
        -- return "nil, err_type, err_msg" to indicate an unexpected failure
    
        if not job.data then
            return nil, "job-error", "data missing"
        end
    
        -- Do work
    end
    
    return _M

现在你可以访问队列,并将作业添加到该队列中。

    -- This references a new or existing queue 'testing'
    local queue = qless.queues['testing']
    
    -- Let's add a job, with some data. Returns Job ID
    local jid = queue:put("my.test.job", { hello = "howdy" })
    -- = "0c53b0404c56012f69fa482a1427ab7d"
    
    -- Now we can ask for a job
    local job = queue:pop()
    
    -- And we can do the work associated with it!
    job:perform()

作业数据必须是一个表(在内部序列化为 JSON)。

queue:put() 返回的值是作业 ID,或 jid。每个 Qless 作业都有一个唯一的 jid,它提供了一种与现有作业交互的方式

    -- find an existing job by it's jid
    local job = qless.jobs:get(jid)
    
    -- Query it to find out details about it:
    job.klass -- the class of the job
    job.queue -- the queue the job is in
    job.data  -- the data for the job
    job.history -- the history of what has happened to the job sofar
    job.dependencies -- the jids of other jobs that must complete before this one
    job.dependents -- the jids of other jobs that depend on this one
    job.priority -- the priority of this job
    job.tags -- table of tags for this job
    job.original_retries -- the number of times the job is allowed to be retried
    job.retries_left -- the number of retries left
    
    -- You can also change the job in various ways:
    job:requeue("some_other_queue") -- move it to a new queue
    job:cancel() -- cancel the job
    job:tag("foo") -- add a tag
    job:untag("foo") -- remove a tag

运行 Worker

传统上,Qless 提供了一个受 Resque 启发的分叉 Ruby worker 脚本。

在 lua-resty-qless 中,我们利用 init_lua_by_worker 阶段和 ngx.timer.at API,以便在独立的“轻线程”中运行 worker,并在你的 worker 进程中进行扩展。

你可以在每个 worker 进程中并发运行多个轻线程,Nginx 会为你调度它们。

    init_worker_by_lua '
        local resty_qless_worker = require "resty.qless.worker"
        
        local worker = resty_qless_worker.new(redis_params)
        
        worker:start({
            interval = 1,
            concurrency = 4,
            reserver = "ordered",
            queues = { "my_queue", "my_other_queue" },
        })
    ';

Worker 支持三种策略(预留器)来决定从队列中弹出作业的顺序:orderedround-robinshuffled round-robin

ordered 预留器会一直从第一个队列中弹出作业,直到它为空,然后才会尝试从第二个队列中弹出作业。round-robin 预留器会从第一个队列中弹出作业,然后是第二个队列,以此类推。Shuffled 只是确保 round-robin 选择是不可预测的。

你也可以轻松地实现自己的预留器。以其他预留器为指导,确保你的预留器可以用 require "resty.qless.reserver.myreserver" 来“require”。

中间件

Worker 还支持中间件,可以用来在处理单个作业时注入逻辑。例如,当你需要重新建立数据库连接时,这很有用。

为此,你将 worker 的 middleware 设置为一个函数,并在你希望执行作业的地方调用 coroutine.yield

    local worker = resty_qless_worker.new(redis_params)
    
    worker.middleware = function(job)
        -- Do pre job work
        coroutine.yield()
        -- Do post job work
    end
    
    worker:start({ queues = "my_queue" })

作业依赖关系

假设你有一个作业依赖于另一个作业,但任务定义在根本上是不同的。你需要烤一只火鸡,还需要做一些填料,但你不能在做填料之前烤火鸡

    local queue = qless.queues['cook']
    local stuffing_jid = queue:put("jobs.make.stuffing", 
      { lots = "of butter" }
    )
    local turkey_jid  = queue:put("jobs.make.turkey", 
      { with = "stuffing" }, 
      { depends = stuffing_jid }
    )

当填料作业完成时,火鸡作业将被解锁并可以被处理。

优先级

有些作业需要比其他作业更快地弹出。无论是故障单还是调试,当你将作业放入队列时,你都可以很容易地做到这一点

    queue:put("jobs.test", { foo = "bar" }, { priority = 10 })

当你想要调整作业在队列中等待时的优先级时会发生什么?

    local job = qless.jobs:get("0c53b0404c56012f69fa482a1427ab7d")
    job.priority = 10
    -- Now this will get popped before any job of lower priority

*注意:由于 Lua 元方法(用于更新 Redis)被调用,因此设置上面的 priority 字段是你需要做的全部操作。这可能看起来有点“自动魔法”,但目的是尽可能地保留与 Ruby 客户端的 API 设计兼容性。*

计划作业

如果你不希望作业立即运行,而是希望在未来某个时间运行,你可以指定一个延迟

    -- Run at least 10 minutes from now
    queue:put("jobs.test", { foo = "bar" }, { delay = 600 })

这并不能保证作业会在 10 分钟后精确运行。你可以通过更改作业的优先级来实现这一点,这样,一旦 10 分钟过去,它就会被放在优先级较低的作业之前

    -- Run in 10 minutes
    queue:put("jobs.test", 
      { foo = "bar" }, 
      { delay = 600, priority = 100 }
    )

定期作业

有时仅仅计划一个作业是不够的,你需要定期运行作业。特别是,也许你有一些需要每小时运行一次的批处理操作,你并不关心哪个 worker 运行它。定期作业的指定方式与其他作业非常相似

    -- Run every hour
    local recurring_jid = queue:recur("jobs.test", { widget = "warble" }, 3600)
    -- = 22ac75008a8011e182b24cf9ab3a8f3b

你也可以以与正常作业相同的方式访问它们

    local job = qless.jobs:get("22ac75008a8011e182b24cf9ab3a8f3b")

更改它运行的间隔非常简单

    -- I think I only need it to run once every two hours
    job.interval = 7200

如果你希望它每小时运行一次,但现在是 2:37,你可以指定一个偏移量,即它应该等待多长时间才能弹出第一个作业

    -- 23 minutes of waiting until it should go
    queue:recur("jobs.test", 
      { howdy = "hello" }, 
      3600,
      { offset = (23 * 60) }
    )

定期作业也有优先级、可配置的重试次数和标签。这些设置不适用于定期作业,而是适用于它们生成的作业。在多个时间间隔过去后,worker 才会尝试弹出作业的情况下,__会创建多个作业__。思路是,虽然它是完全由客户端管理的,但状态不应该依赖于 worker 尝试弹出作业的频率。

    -- Recur every minute
    queue:recur("jobs.test", { lots = "of jobs" }, 60)
     
    -- Wait 5 minutes
    
    local jobs = queue:pop(10)
    ngx.say(#jobs, " jobs got popped")
    
    -- = 5 jobs got popped

配置选项

你可以获取和设置全局(在同一个 Redis 实例的上下文中)配置,以更改心跳等的行为。配置选项并不多,但一个重要的选项是作业数据保留多长时间。作业数据在完成 jobs-history 秒后会过期,但仅限于最后 jobs-history-count 个已完成的作业。默认情况下,它们是 5 万个作业和 30 天,但根据你的需求,你的需求可能会改变。如果只想保留最后 500 个作业,最多保留 7 天

    qless:config_set("jobs-history", 7 * 86400)
    qless:config_get("jobs-history-count", 500)

标记/跟踪

在 qless 中,“跟踪”意味着将作业标记为重要。跟踪的作业在它们取得进展时会发出可订阅的事件(有关更多信息,请参见下文)。

    local job = qless.jobs:get("b1882e009a3d11e192d0b174d751779d")
    job:track()

作业可以使用字符串进行标记,这些字符串被索引以进行快速搜索。例如,作业可能与客户帐户或其他对你的项目有意义的关键内容相关联。

    queue:put("jobs.test", {}, 
      { tags = { "12345", "foo", "bar" } }
    )

这使它们可以在 Ruby/Sinatra Web 界面或代码中进行搜索

    local jids = qless.jobs:tagged("foo")

你也可以随时添加或删除标签

    local job = qless.jobs:get('b1882e009a3d11e192d0b174d751779d')
    job:tag("howdy", "hello")
    job:untag("foo", "bar")

通知

跟踪 的作业在发生事件时会在特定 pubsub 通道上发出事件。无论是从队列中弹出,还是由 worker 完成等。

熟悉 Redis pub/sub 的人会注意到,Redis 连接只能在监听后才能用于 pubsub 命令。为此,events 模块会独立地传递 Redis 连接参数。

    local events = qless.events(redis_params)
    
    events:listen({ "canceled", "failed" }, function(channel, jid)
        ngx.log(ngx.INFO, jid, ": ", channel)
        -- logs "b1882e009a3d11e192d0b174d751779d: canceled" etc.
    end

你也可以监听“log”通道,它会提供所有已记录事件的 JSON 结构。

    local events = qless.events(redis_params)
    
    events:listen({ "log" }, function(channel, message)
        local message = cjson.decode(message)
        ngx.log(ngx.INFO, message.event, " ", message.jid)
    end

心跳

当 worker 获得一个作业时,它会获得该作业的独占锁。这意味着,只要 worker 报告了作业的进度,该作业就不会被分配给任何其他 worker。默认情况下,作业必须每 60 秒报告一次进度,或者完成作业,但这是一个可配置选项。对于较长的作业,这可能没有意义。

    -- Hooray! We've got a piece of work!
    local job = queue:pop()
    
    -- How long until I have to check in?
    job:ttl()
    -- = 59
    
    -- Hey! I'm still working on it!
    job:heartbeat()
    -- = 1331326141.0
    
    -- Ok, I've got some more time. Oh! Now I'm done!
    job:complete()

如果你想增加所有队列的心跳,

    -- Now jobs get 10 minutes to check in
    qless:set_config("heartbeat", 600)
    
    -- But the testing queue doesn't get as long.
    qless.queues["testing"].heartbeat = 300

在选择心跳间隔时,请注意,这是 qless 意识到作业是否已丢失之前可能经过的时间。同时,如果您的作业预计需要几个小时才能完成,您不希望 qless 每 10 秒钟就进行一次心跳。

建议您对希望定期检查其进度的长时间运行作业使用的一种习惯用法

    -- Wait until we have 5 minutes left on the heartbeat, and if we find that
    -- we've lost our lock on a job, then honorably fall on our sword
    if job:ttl() < 300 and not job:heartbeat() then
      -- exit
    end

统计

Qless 的一项很棒的功能是您可以获取有关使用情况的统计信息。统计信息按天汇总,因此当您想要了解有关队列的统计信息时,您需要说出您要谈论的队列和日期。默认情况下,您只会获得当天的统计信息。这些统计信息包括有关平均作业等待时间、标准差和直方图的信息。完成作业也会提供相同的数据。

    -- So, how're we doing today?
    local stats = queue:stats()
    -- = { 'run' = { 'mean' = ..., }, 'wait' = {'mean' = ..., } }

时间

重要的是要注意,如果您要对数据进行任何操作(我们的脚本会执行这些操作),Redis 不允许访问系统时间。但是,我们有心跳。这意味着客户端在发出大多数请求时实际上会发送当前时间,为了保持一致性,这意味着您的工作者必须相对同步。这并不意味着要精确到十毫秒,但如果您遇到明显的时钟漂移,则应调查 NTP。

确保作业唯一性

如上所述,作业通过一个 id(它们的 jid)进行唯一标识。Qless 将为每个排队的作业生成一个 UUID,或者您也可以手动指定一个。

    queue:put("jobs.test", { hello = 'howdy' }, { jid = 'my-job-jid' })

当您希望确保作业的唯一性时,这很有用:只需创建一个 jid,该 jid 是作业的类和数据的函数,它将保证 Qless 不会有多个具有相同类和数据的作业。

作者

James Hurst <[email protected]>

基于 Ruby Qless 参考实现。文档也改编自原始项目。

许可证

本模块根据 2 条款 BSD 许可证授权。

版权所有 (c) James Hurst <[email protected]>

保留所有权利。

在满足以下条件的情况下,允许以源代码和二进制形式重新分发和使用,无论是否修改:

  • 重新分发源代码必须保留上述版权声明、此条件列表和以下免责声明。

  • 以二进制形式重新分发必须在随分发提供的文档和/或其他材料中复制上述版权声明、此条件列表和以下免责声明。

本软件由版权持有人和贡献者“按现状”提供,任何明示或暗示的担保,包括但不限于适销性担保和特定用途适用性的担保,均不予以保证。在任何情况下,版权持有人或贡献者均不对任何直接的、间接的、偶然的、特殊的、惩罚性的或后果性的损害(包括但不限于替代商品或服务的采购;使用、数据或利润损失;或业务中断)负责,无论其是由使用本软件引起的,还是与之有关,无论损害的理论是基于合同、严格责任或侵权行为(包括疏忽或其他原因),即使已被告知此类损害的可能性。

作者

James Hurst

许可证

2bsd

依赖项

版本