lua-resty-kafka

基于 cosocket API 的 Openresty Lua kafka 客户端驱动

$ opm get JoshuaOliphant/lua-resty-kafka

名称

lua-resty-kafka - 基于 cosocket API 的 ngx_lua Lua kafka 客户端驱动

状态

该库仍处于早期开发阶段,目前尚处于实验阶段。

描述

该 Lua 库是 ngx_lua nginx 模块的 Kafka 客户端驱动

http://wiki.nginx.org/HttpLuaModule

该 Lua 库利用了 ngx_lua 的 cosocket API,确保了 100% 的非阻塞行为。

请注意,至少需要 ngx_lua 0.9.3ngx_openresty 1.4.3.7,并且遗憾的是只支持 LuaJIT (--with-luajit)。

概要

        lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";
    
        server {
            location /test {
                content_by_lua '
                    local cjson = require "cjson"
                    local client = require "resty.kafka.client"
                    local producer = require "resty.kafka.producer"
    
                    local broker_list = {
                        { host = "127.0.0.1", port = 9092 },
                    }
    
                    local key = "key"
                    local message = "halo world"
    
                    -- usually we do not use this library directly
                    local cli = client:new(broker_list)
                    local brokers, partitions = cli:fetch_metadata("test")
                    if not brokers then
                        ngx.say("fetch_metadata failed, err:", partitions)
                    end
                    ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
    
    
                    -- sync producer_type
                    local p = producer:new(broker_list)
    
                    local offset, err = p:send("test", key, message)
                    if not offset then
                        ngx.say("send err:", err)
                        return
                    end
                    ngx.say("send success, offset: ", tonumber(offset))
    
                    -- this is async producer_type and bp will be reused in the whole nginx worker
                    local bp = producer:new(broker_list, { producer_type = "async" })
    
                    local ok, err = bp:send("test", key, message)
                    if not ok then
                        ngx.say("send err:", err)
                        return
                    end
    
                    ngx.say("send success, ok:", ok)
                ';
            }
        }

模块

resty.kafka.client

要加载此模块,只需执行以下操作

        local client = require "resty.kafka.client"

方法

new

语法:c = client:new(broker_list, client_config)

broker_list 是代理列表,如以下示例

    [
        {
            "host": "127.0.0.1",
            "port": 9092
        }
    ]

可以指定可选的 client_config 表。以下选项如下

客户端配置

  • socket_timeout

    指定以毫秒为单位的网络超时阈值。应该大于 request_timeout

  • keepalive_timeout

    指定保持活动连接的最大空闲超时时间(以毫秒为单位)。

  • keepalive_size

    指定每个 Nginx worker 允许的连接池中的最大连接数。

  • refresh_interval

    指定以毫秒为单位的自动刷新元数据的间隔时间。如果为 nil,则元数据不会自动刷新。

fetch_metadata

语法:brokers, partitions = c:fetch_metadata(topic)

如果成功,则返回 topic 的所有代理和分区。如果发生错误,则返回 nil 和描述错误的字符串。

refresh

语法:brokers, partitions = c:refresh()

这将刷新所有通过 fetch_metadata 获取的主题的元数据。如果成功,则返回所有代理和所有主题的所有分区。如果发生错误,则返回 nil 和描述错误的字符串。

resty.kafka.producer

要加载此模块,只需执行以下操作

        local producer = require "resty.kafka.producer"

方法

new

语法:p = producer:new(broker_list, producer_config?, cluster_name?)

建议使用异步 producer_type。

broker_listclient 中的相同

可以指定一个可选的选项表。以下选项如下

socket_timeoutkeepalive_timeoutkeepalive_sizerefresh_intervalclient_config 中的相同

生产者配置,最像 <<http://kafka.apache.org/documentation.html#producerconfigs> 中的配置

  • producer_type

    指定 producer.type。 "async" 或 "sync"

  • request_timeout

    指定 request.timeout.ms。默认值为 2000 毫秒

  • required_acks

    指定 request.required.acks不应为零。默认值为 1

  • max_retry

    指定 message.send.max.retries。默认值为 3

  • retry_backoff

    指定 retry.backoff.ms。默认值为 100

  • partitioner

    指定从键和分区数中选择分区的分区器。语法:partitioner = function (key, partition_num, correlation_id) end,correlation_id 是生产者中的自动递增 ID。默认分区器为

  • api_version

    指定生产 API 版本。默认值为 0。如果您使用的是 Kafka 0.10.0.0 或更高版本,api_version 可以使用 012。如果您使用的是 Kafka 0.9.x,api_version 应该为 01。如果您使用的是 Kafka 0.8.x,api_version 应该为 0

    local function default_partitioner(key, num, correlation_id)
        local id = key and crc32(key) or correlation_id
    
        -- partition_id is continuous and start from 0
        return id % num
    end

缓冲区配置(仅适用于 producer_type = "async")

  • flush_time

    指定 queue.buffering.max.ms。默认值为 1000

  • batch_num

    指定 batch.num.messages。默认值为 200

  • batch_size

    指定 send.buffer.bytes。默认值为 1M(可能达到 2M)。请注意,应该小于 kafka 服务器中 socket.request.max.bytes / 2 - 10k 配置。

  • max_buffering

    指定 queue.buffering.max.messages。默认值为 50,000

  • error_handle

    指定错误处理程序,在缓冲区发送到 kafka 时处理数据错误。语法:error_handle = function (topic, partition_id, message_queue, index, err, retryable) end,message_queue 中的失败消息类似于 `{ key1, msg1, key2, msg2 } `,即使原始消息为 nil,message_queue 中的 key 也是空字符串 ""index 是 message_queue 的长度,不应使用 #message_queue。当 retryabletrue 时,表示 kafka 服务器肯定没有提交这些消息,您可以安全地重试发送;否则表示可能没有提交,建议将其记录到某个地方。

目前不支持压缩。

第三个可选的 cluster_name 指定集群的名称,默认值为 1(是的,它是数字)。当您有两个或更多个 kafka 集群时,您可以指定不同的名称。这仅适用于 async producer_type。

send

语法:ok, err = p:send(topic, key, message)

  1. 在同步模型中

    如果成功,则返回当前代理和分区的偏移量 ( cdata: LL )。如果发生错误,则返回 nil 和描述错误的字符串。

  1. 在异步模型中

    message 将首先写入缓冲区。当缓冲区超过 batch_num 或每 flush_time 刷新缓冲区时,它将发送到 kafka 服务器。

    如果成功,则返回 true。如果发生错误,则返回 nil 和描述错误的字符串 (buffer overflow)。

offset

语法:sum, details = p:offset()

    Return the sum of all the topic-partition offset (return by the ProduceRequest api);
    and the details of each topic-partition

flush

语法:ok = p:flush()

始终返回 true

安装

您需要配置 lua_package_path 指令,将 lua-resty-kafka 源代码树的路径添加到 ngx_lua 的 LUA_PATH 搜索路径中,例如

        # nginx.conf
        http {
            lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";
            ...
        }

确保运行 Nginx "worker" 进程的系统帐户具有足够的权限来读取 .lua 文件。

待办事项

  1. 获取 API

  2. 偏移 API

  3. 偏移提交/获取 API

作者

Dejiang Zhu (doujiang24) <doujiang24@gmail.com>.

版权和许可

该模块根据 BSD 许可证授权。

版权所有 (C) 2014-2014,作者:Dejiang Zhu (doujiang24) <doujiang24@gmail.com>。

保留所有权利。

在满足以下条件的情况下,允许以源代码和二进制形式重新分发和使用此软件,无论是否修改:

  • 源代码的再分发必须保留上述版权声明、此条件列表以及以下免责声明。

  • 二进制形式的再分发必须在随分发提供的文档和/或其他材料中复制上述版权声明、此条件列表以及以下免责声明。

本软件由版权持有人和贡献者“按现状”提供,并且任何明示或暗示的担保,包括但不限于适销性和特定用途适用性的暗示担保,均被排除在外。在任何情况下,版权持有人或贡献者均不对因使用本软件而导致的任何直接、间接、附带、特殊、示例性或后果性损害(包括但不限于替代商品或服务的采购;使用、数据或利润损失;或业务中断)承担责任,无论这些损害是基于何种责任理论造成的,无论是合同、严格责任还是侵权行为(包括疏忽或其他原因),即使已告知可能发生此类损害。

另请参阅

  • ngx_lua 模块:http://wiki.nginx.org/HttpLuaModule

  • kafka 协议:https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

  • the lua-resty-redis library

  • the lua-resty-logger-socket library

  • the sarama

作者

doujiang24

许可证

2bsd

版本