lua-resty-rabbitmqstomp

基于 cosocket API 的 ngx_lua 应用的 Lua RabbitMQ 客户端库

$ opm get DevonStrawn/lua-resty-rabbitmqstomp

简介

lua-resty-rabbitmqstomp - Lua RabbitMQ 客户端库,使用 cosocket api 通过 STOMP 1.2 与具有 STOMP 插件的 RabbitMQ 代理进行通信。

限制

该库有自己的观点,并有一些假设和限制,这些假设和限制可能在将来得到解决;

  • RabbitMQ 服务器应该启用支持 STOMP v1.2 的 STOMP 适配器

  • 假设用户、虚拟主机、交换机、队列和绑定已经设置

状态

此库被认为可以用于向 RabbitMQ 发布可靠消息。

STOMP v1.2 客户端实现

该库使用 STOMP 1.2 与 RabbitMQ 代理进行通信,并实现了 RabbitMQ Stomp 插件的扩展和限制。

在内部,RabbitMQ 使用 AMQP 进行进一步的通信。这样,该库就可以实现使用 STOMP 通过 AMQP 与 RabbitMQ 代理进行通信的消费者和生产者。该协议是基于帧的,并且在 TCP 流上有一个命令、头和以 EOL(^@)结尾的主体,该 EOL 由 \r (013) 和必需的 \n (010) 组成

    COMMAND
    header1:value1
    header2: value2

    BODY^@

COMMAND 后面是 EOL,然后是 EOL 分隔的键值对格式的头,然后是空白行,空白行之后是 BODY 的开始,帧以 ^@ EOL 结尾。COMMAND 和头是 UTF-8 编码的。

连接

为了连接,我们在 cosocket api 提供的 TCP 套接字上创建并发送 CONNECT 帧,连接到代理 IP,同时支持 IPv4 和 IPv6。在帧中,我们使用登录名、密码进行身份验证,使用 accept-version 强制执行客户端 STOMP 版本支持,并使用 host 选择代理的 VHOST。

    CONNECT
    accept-version:1.2
    login:guest
    passcode:guest
    host:/devnode
    heart-beat:optional

    ^@

发生错误时,将返回 ERROR 帧,例如

    ERROR
    message:Bad CONNECT
    content-type:text/plain
    version:1.0,1.1,1.2
    content-length:32

    Access refused for user 'admin'^@

连接成功时,代理将返回 CONNECTED 帧,例如

    CONNECTED
    session:session-sGF0vjCKH1bLhFr6w9QwuQ
    heart-beat:0,0
    server:RabbitMQ/3.0.4
    version:1.2

为了创建连接,应提供用户名、密码、虚拟主机、心跳、代理主机和端口。

发布

我们可以使用 SEND 命令向交换机发布消息,消息带有路由键、持久性模式、传递模式和其他头。

    SEND
    destination:/exchange/exchange_name/routing_key
    app-id: luaresty
    delivery-mode:2
    persistent:true
    content-type:json/application
    content-length:5

    hello^@

请注意,content-length 包含消息和 EOL 字节。

方法

new

语法:rabbit, err = rabbitmqstomp:new({username = username,password = password, vost = vhost} )

创建 RabbitMQ 对象。如果失败,则返回 nil 和描述错误的字符串。

如果提供的是 nil 表,则将对登录进行默认值假设

  • username: guest

  • password: guest

  • vhost: /

set_timeout

语法:rabbit:set_timeout(time)

设置后续操作(包括 connect 方法)的超时(以毫秒为单位)保护。请注意,在创建对象后调用任何其他方法之前,应先设置超时。

connect

语法:ok, err = rabbit:connect(host,port)

尝试连接到 RabbitMQ STOMP 适配器所在的 stomp 代理,代理监听的是 host,port。

send

语法:rabbit:send(msg, headers)

使用一组头发布消息。请注意,msg 应该是字符串。

可以设置的一些头值

destination: 消息的目标,例如 /exchange/name/binding persistent: 要传递持久消息,如果已声明,则值为“true” receipt: 确认传递的回执 content-type: 消息的类型,例如 application/json

有关支持的头的列表,请参见 STOMP 协议扩展和限制页面:https://rabbitmq.cn/stomp.html

subscribe

语法:rabbit:subscribe(headers)

使用 headers 订阅队列。如果持久,则应该具有 id。订阅成功后,代理将发送 MESSAGE 帧。

unsubscribe

语法:rabbit:unsubscribe(headers)

使用 headers 取消订阅队列。取消订阅成功后,代理将停止发送 MESSAGE 帧。

receive

语法:rabbit:receive()

尝试读取收到的任何 MESSAGE 帧并返回消息。尝试在没有有效订阅的情况下接收会导致错误。

get_reused_times

语法:times, err = rabbit:get_reused_times()

该方法返回当前连接的(成功)重复使用次数。如果发生错误,则返回 nil 和描述错误的字符串。

如果当前连接不是来自内置连接池,则该方法始终返回 0,即连接从未被重复使用过(尚未)。如果连接来自连接池,则返回值始终是非零的。因此,该方法也可以用于确定当前连接是否来自池。

set_keepalive

语法:ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)

将当前 RabbitMQ 连接立即放入 ngx_lua cosocket 连接池。

可以指定连接在池中的最大空闲超时(以毫秒为单位)以及每个 nginx 工作进程池的最大大小。

成功时返回 1。如果发生错误,则返回 nil 和描述错误的字符串。

仅在您原本会调用 close 方法的地方调用该方法。调用该方法将立即将当前 rabbitmq 对象置于关闭状态。对当前对象进行除 connect() 之外的任何后续操作都将返回关闭错误。

close

语法:ok, err = rabbit:close()

通过向 RabbitMQ STOMP 代理发送 DISCONNECT 来优雅地关闭当前 RabbitMQ 连接,并返回状态。

成功时返回 1。如果发生错误,则返回 nil 和描述错误的字符串。

示例

一个简单的生产者,可以向交换机发送可靠的持久消息,交换机具有某些绑定

    local strlen =  string.len
    local cjson = require "cjson"
    local rabbitmq = require "resty.rabbitmqstomp"

    local opts = { username = "guest",
                   password = "guest",
                   vhost = "/" }

    local mq, err = rabbitmq:new(opts)

    if not mq then
          return
    end

    mq:set_timeout(10000)

    local ok, err = mq:connect("127.0.0.1",61613) 

    if not ok then
        return
    end

    local msg = {key="value1", key2="value2"}
    local headers = {}
    headers["destination"] = "/exchange/test/binding"
    headers["receipt"] = "msg#1"
    headers["app-id"] = "luaresty"
    headers["persistent"] = "true"
    headers["content-type"] = "application/json"

    local ok, err = mq:send(cjson.encode(msg), headers)
    if not ok then
        return
    end
    ngx.log(ngx.INFO, "Published: " .. msg)

    local headers = {}
    headers["destination"] = "/amq/queue/queuename"
    headers["persistent"] = "true"
    headers["id"] = "123"

    local ok, err = mq:subscribe(headers)
    if not ok then
        return
    end

    local data, err = mq:receive()
    if not ok then
        return
    end
    ngx.log(ngx.INFO, "Consumed: " .. data)

    local headers = {}
    headers["persistent"] = "true"
    headers["id"] = "123"

    local ok, err = mq:unsubscribe(headers)

    local ok, err = mq:set_keepalive(10000, 10000)
    if not ok then
        return
    end

resty-upstream 池示例

local cjson = require "cjson" local rabbitmq = require "resty.rabbitmqstomp"

local mq, err = rabbitmq:new()

if not mq then return nil, err end

local mq, info = upstream_pool:connect(mq)

if not mq then return nil, info end

local msg = {key="value1", key2="value2"} local headers = {} headers["destination"] = "/exchange/test/binding" headers["receipt"] = "msg#1" headers["app-id"] = "luaresty" headers["persistent"] = "true" headers["content-type"] = "application/json"

local ok, err = mq:send(cjson.encode(msg), headers)

if not ok then return nil, err end

ok, err = mq:set_keepalive(info.pool.keepalive_timeout,info.pool.keepalive_pool)

TODO

  • 编写测试

  • 检查和解析来自代理的回复

作者

Rohit "bhaisaab" Yadav, rohit.yadav@wingify.com

贡献

https://github.com/wingify/lua-resty-rabbitmqstomp 上发送拉取请求

您可以联系作者和 Openresty 社区

版权和许可

此模块在 MIT 许可下获得许可。

版权所有 2013 Rohit "bhaisaab" Yadav, Wingify

特此免费授予任何获得本软件和相关文档文件(以下简称“软件”)副本的人无限制地处理软件的权利,包括但不限于使用、复制、修改、合并、发布、分发、再许可和/或销售软件副本,以及允许获得软件的人员这样做,但须符合以下条件:

以上版权声明和本许可声明应包含在软件的所有副本或重要部分中。

软件按“原样”提供,不提供任何形式的明示或暗示保证,包括但不限于适销性、特定用途的适用性和非侵权的保证。在任何情况下,作者或版权持有者均不对因软件或使用或以其他方式处理软件而引起的任何索赔、损害或其他责任(无论是在合同、侵权或其他情况下)负责。

另请参见

作者

Wingify (@Wingify)

许可证

mit

版本