lua-resty-kafka

基于 cosocket API 的 OpenResty Lua Kafka 客户端驱动

$ opm get doujiang24/lua-resty-kafka

名称

lua-resty-kafka - 基于 cosocket API 的 ngx_lua Lua Kafka 客户端驱动

状态

该库仍处于早期开发阶段,尚处于实验性阶段。

描述

此 Lua 库是 ngx_lua nginx 模块的 Kafka 客户端驱动

http://wiki.nginx.org/HttpLuaModule

此 Lua 库利用了 ngx_lua 的 cosocket API,确保了 100% 的非阻塞行为。

请注意,至少需要 ngx_lua 0.9.3openresty 1.4.3.7,并且不幸的是只支持 LuaJIT (--with-luajit)。

对于 ssl 连接,至少需要 ngx_lua 0.9.11openresty 1.7.4.1,并且不幸的是只支持 LuaJIT (--with-luajit)。

概要

        lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";
    
        server {
            location /test {
                content_by_lua '
                    local cjson = require "cjson"
                    local client = require "resty.kafka.client"
                    local producer = require "resty.kafka.producer"
    
                    local broker_list = {
                        {
                            host = "127.0.0.1",
                            port = 9092,
    
                            -- optional auth
                            sasl_config = {
                                mechanism = "PLAIN",
                                user = "USERNAME",
                                password = "PASSWORD",
                            },
                        },
                    }
    
                    local key = "key"
                    local message = "halo world"
    
                    -- usually we do not use this library directly
                    local cli = client:new(broker_list)
                    local brokers, partitions = cli:fetch_metadata("test")
                    if not brokers then
                        ngx.say("fetch_metadata failed, err:", partitions)
                    end
                    ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
    
    
                    -- sync producer_type
                    local p = producer:new(broker_list)
    
                    local offset, err = p:send("test", key, message)
                    if not offset then
                        ngx.say("send err:", err)
                        return
                    end
                    ngx.say("send success, offset: ", tonumber(offset))
    
                    -- this is async producer_type and bp will be reused in the whole nginx worker
                    local bp = producer:new(broker_list, { producer_type = "async" })
    
                    local ok, err = bp:send("test", key, message)
                    if not ok then
                        ngx.say("send err:", err)
                        return
                    end
    
                    ngx.say("send success, ok:", ok)
                ';
            }
        }

模块

resty.kafka.client

要加载此模块,只需执行以下操作

        local client = require "resty.kafka.client"

方法

new

语法: c = client:new(broker_list, client_config)

broker_list 是代理列表,如下所示

    [
        {
            "host": "127.0.0.1",
            "port": 9092,
    
            // optional auth
            "sasl_config": {
                "mechanism": "PLAIN",
                "user": "USERNAME",
                "password": "PASSWORD"
            }
        }
    ]

可以指定一个可选的 client_config 表格。以下选项如下所示

客户端配置

  • socket_timeout

    指定网络超时阈值(以毫秒为单位)。应该大于 request_timeout

  • keepalive_timeout

    指定保持活动连接的最大空闲超时时间(以毫秒为单位)。

  • keepalive_size

    指定每个 Nginx 工作进程允许的最大连接数。

  • refresh_interval

    指定以毫秒为单位自动刷新元数据的间隔时间。如果为 nil,则元数据不会自动刷新。

  • ssl

    指定客户端是否应使用 ssl 连接。默认为 false。请参阅:https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    指定客户端是否应执行 SSL 验证。默认为 false。请参阅:https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • resolver

    指定一个主机解析函数,该函数返回一个 IP 字符串或 nil,以覆盖系统默认的主机解析器。默认为 nil,不执行解析。示例 function(host) if host == "some_host" then return "10.11.12.13" end end

fetch_metadata

语法: brokers, partitions = c:fetch_metadata(topic)

如果成功,则返回 topic 的所有代理和分区。如果出错,则返回 nil 和描述错误的字符串。

refresh

语法: brokers, partitions = c:refresh()

这将刷新所有通过 fetch_metadata 获取的主题的元数据。如果成功,则返回所有代理和所有主题的所有分区。如果出错,则返回 nil 和描述错误的字符串。

choose_api_version

语法: api_version = c:choose_api_version(api_key, min_version, max_version)

这有助于客户端为 API 选择正确的 api_key 版本。

当提供 min_versionmax_version 时,它将充当限制,并且返回值中的所选版本无论代理支持的 API 版本有多高或多低都不会超过其限制。当它们未提供时,它将遵循代理支持的版本范围。

提示:版本选择策略是在允许的范围内选择最大版本。

resty.kafka.producer

要加载此模块,只需执行以下操作

        local producer = require "resty.kafka.producer"

方法

new

语法: p = producer:new(broker_list, producer_config?, cluster_name?)

建议使用异步 producer_type。

broker_listclient 中的相同

可以指定一个可选的选项表。以下选项如下所示

socket_timeoutkeepalive_timeoutkeepalive_sizerefresh_intervalsslssl_verifyclient_config 中的相同

生产者配置,大多类似于 <http://kafka.apache.org/documentation.html#producerconfigs>

  • producer_type

    指定 producer.type。“async” 或 “sync”

  • request_timeout

    指定 request.timeout.ms。默认为 2000 ms

  • required_acks

    指定 request.required.acks不应为零。默认为 1

  • max_retry

    指定 message.send.max.retries。默认为 3

  • retry_backoff

    指定 retry.backoff.ms。默认为 100

  • api_version

    指定生产 API 版本。默认为 0。如果使用 Kafka 0.10.0.0 或更高版本,api_version 可以使用 012。如果使用 Kafka 0.9.x,api_version 应为 01。如果使用 Kafka 0.8.x,api_version 应为 0

  • partitioner

    指定从键和分区数中选择分区的分配器。语法: partitioner = function (key, partition_num, correlation_id) end,correlation_id 是生产者中的自动递增 ID。默认分配器为

    `lua local function default_partitioner(key, num, correlation_id) local id = key and crc32(key) or correlation_id -- partition_id is continuous and start from 0 return id % num end `

缓冲区配置(仅当 producer_type = “async” 时有效)

  • flush_time

    指定 queue.buffering.max.ms。默认为 1000

  • batch_num

    指定 batch.num.messages。默认为 200

  • batch_size

    指定 send.buffer.bytes。默认为 1M(可能达到 2M)。请注意,小于 kafka 服务器中 socket.request.max.bytes / 2 - 10k 配置。

  • max_buffering

    指定 queue.buffering.max.messages。默认为 50,000

  • error_handle

    指定错误处理程序,处理缓冲区发送到 kafka 错误时的数据。语法: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end,message_queue 中失败的消息类似于 `{ key1, msg1, key2, msg2 } `,即使原始值为 nil,message_queue 中的 key 也是空字符串 ""index 是 message_queue 的长度,不应使用 #message_queue。当 retryabletrue 时,表示 kafka 服务器肯定未提交这些消息,您可以安全地重试发送;否则表示可能,建议记录到某个位置。

目前不支持压缩。

第三个可选的 cluster_name 指定集群的名称,默认为 1(是的,它是数字)。当您有两个或多个 kafka 集群时,可以指定不同的名称。这仅在使用 async producer_type 时有效。

send

语法: ok, err = p:send(topic, key, message)

  1. 在同步模式下

    如果成功,则返回当前代理和分区的偏移量 ( cdata: LL )。如果出错,则返回 nil 和描述错误的字符串。

  1. 在异步模式下

    message 将首先写入缓冲区。当缓冲区超过 batch_num 或每隔 flush_time 刷新缓冲区时,它将发送到 kafka 服务器。

    如果成功,则返回 true。如果出错,则返回 nil 和描述错误的字符串(buffer overflow)。

offset

语法: sum, details = p:offset()

    Return the sum of all the topic-partition offset (return by the ProduceRequest api);
    and the details of each topic-partition

flush

语法: ok = p:flush()

始终返回 true

resty.kafka.basic-consumer

要加载此模块,只需执行以下操作

        local bconsumer = require "resty.kafka.basic-consumer"

此模块是消费者的一种极简实现,提供用于按时间查询或获取起始和结束偏移量的 list_offset API,以及用于获取主题中消息的 fetch API。

在单个调用中,只能获取单个主题中单个分区的相关信息,目前不支持批量获取。基本消费者不支持与消费者组相关的 API,因此您需要在通过 list_offset API 获取偏移量后获取消息,或者您的服务可以自行管理偏移量。

方法

new

语法: c = bconsumer:new(broker_list, client_config)

broker_list 是代理列表,如下所示

    [
        {
            "host": "127.0.0.1",
            "port": 9092,
    
            // optional auth
            "sasl_config": {
                "mechanism": "PLAIN",
                "user": "USERNAME",
                "password": "PASSWORD"
            }
        }
    ]

可以指定一个可选的 client_config 表格。以下选项如下所示

客户端配置

  • socket_timeout

    指定网络超时阈值(以毫秒为单位)。应该大于 request_timeout

  • keepalive_timeout

    指定保持活动连接的最大空闲超时时间(以毫秒为单位)。

  • keepalive_size

    指定每个 Nginx 工作进程允许的最大连接数。

  • refresh_interval

    指定以毫秒为单位自动刷新元数据的间隔时间。如果为 nil,则元数据不会自动刷新。

  • ssl

    指定客户端是否应使用 ssl 连接。默认为 false。请参阅:https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    指定客户端是否应执行 SSL 验证。默认为 false。请参阅:https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • isolation_level 此设置控制事务记录的可见性。请参阅:https://kafka.apache.org/protocol.html

  • client_rack

    发出此请求的消费者的机架 ID。请参阅:https://kafka.apache.org/protocol.html

list_offset

语法: offset, err = c:list_offset(topic, partition, timestamp)

参数 timestamp 可以是 UNIX 时间戳或 resty.kafka.protocol.consumer 中定义的常量,LIST_OFFSET_TIMESTAMP_LASTLIST_OFFSET_TIMESTAMP_FIRSTLIST_OFFSET_TIMESTAMP_MAX,用于获取初始和最新偏移量等,语义与 Apache Kafka 中的 ListOffsets API 相同。请参阅:https://kafka.apache.org/protocol.html#The_Messages_ListOffsets

如果成功,则返回指定情况下的偏移量。如果出错,则返回 nil 和描述错误的字符串。

fetch

语法: result, err = c:fetch(topic, partition, offset)

如果成功,则返回指定情况下的以下 result。如果出错,则返回 nil 和描述错误的字符串。

result 将包含更多信息,例如消息

  • records

    包含消息内容的表。

  • errcode

    Fetch API 的错误代码。请参阅:https://kafka.apache.org/protocol.html#protocol_error_codes

  • high_watermark

    Fetch API 的高水位线。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch

  • last_stable_offset

    Fetch API 的最后一个稳定偏移量。内容取决于 API 版本,可能为 nil。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch 响应 API 版本高于 v4

  • log_start_offset

    Fetch API 的日志起始偏移量。内容取决于 API 版本,可能为 nil。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch 响应 API 版本高于 v5

  • aborted_transactions

    Fetch API 中中止的事务。内容取决于 API 版本,可能为 nil。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch 响应 API 版本高于 v4

  • preferred_read_replica

    Fetch API 的首选读取副本。内容取决于 API 版本,可能为 nil。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch 响应 API 版本高于 v11

错误

当您调用此库中提供的模块时,可能会遇到一些错误。根据来源,它们可以分为以下几类。

  • 网络错误:例如连接被拒绝、连接超时等。您需要检查环境中每个服务的连接状态。

  • 与元数据相关的错误:例如无法正确检索元数据或 ApiVersion 数据;指定的主题或分区不存在等。您需要检查 Kafka Broker 和客户端配置。

  • Kafka 返回的错误:有时 Kafka 会在响应数据中包含 err_code 数据,当此问题发生时,返回值中的 err 类似于 OFFSET_OUT_OF_RANGE,所有字符均为大写,并以下划线分隔,在当前库中,我们提供 一个映射错误列表,对应于文本描述。要了解有关这些错误的更多信息,请参阅 Kafka 文档 中的描述。

安装

您需要配置 lua_package_path 指令,将 lua-resty-kafka 源代码树的路径添加到 ngx_lua 的 LUA_PATH 搜索路径中,如下所示

        # nginx.conf
        http {
            lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";
            ...
        }

确保运行 Nginx ''worker'' 进程的系统帐户具有读取 .lua 文件的足够权限。

待办事项

  1. Fetch API

  2. Offset API

  3. Offset Commit/Fetch API

作者

朱德江 (doujiang24) <doujiang24@gmail.com>。

版权和许可证

此模块根据 BSD 许可证发布。

版权所有 (C) 2014-2020,朱德江 (doujiang24) <doujiang24@gmail.com>。

保留所有权利。

允许重新分发和使用源代码和二进制形式,无论是否修改,前提是满足以下条件

  • 源代码的重新分发必须保留上述版权声明、此条件列表和以下免责声明。

  • 二进制形式的重新分发必须在随分发提供的文档和/或其他材料中复制上述版权声明、此条件列表和以下免责声明。

本软件由版权持有人和贡献者“按原样”提供,并且不提供任何明示或暗示的担保,包括但不限于适销性和特定用途适用性的暗示担保。在任何情况下,版权持有人或贡献者均不对任何直接、间接、附带、特殊、惩罚性或后果性损害(包括但不限于替代商品或服务的采购;使用、数据或利润损失;或业务中断)负责,无论这些损害是如何造成的以及基于任何责任理论,无论是合同、严格责任或侵权行为(包括疏忽或其他原因),即使已被告知此类损害的可能性。

另请参阅

作者

doujiang24

许可证

2bsd

版本