lua-resty-kafka
基于 cosocket API 的 Openresty Lua kafka 客户端驱动
$ opm get JoshuaOliphant/lua-resty-kafka
名称
lua-resty-kafka - 基于 cosocket API 的 ngx_lua Lua kafka 客户端驱动
状态
该库仍处于早期开发阶段,目前尚处于实验阶段。
描述
该 Lua 库是 ngx_lua nginx 模块的 Kafka 客户端驱动
http://wiki.nginx.org/HttpLuaModule
该 Lua 库利用了 ngx_lua 的 cosocket API,确保了 100% 的非阻塞行为。
请注意,至少需要 ngx_lua 0.9.3 或 ngx_openresty 1.4.3.7,并且遗憾的是只支持 LuaJIT (--with-luajit
)。
概要
lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";
server {
location /test {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "127.0.0.1", port = 9092 },
}
local key = "key"
local message = "halo world"
-- usually we do not use this library directly
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata failed, err:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- sync producer_type
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("send err:", err)
return
end
ngx.say("send success, offset: ", tonumber(offset))
-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test", key, message)
if not ok then
ngx.say("send err:", err)
return
end
ngx.say("send success, ok:", ok)
';
}
}
模块
resty.kafka.client
要加载此模块,只需执行以下操作
local client = require "resty.kafka.client"
方法
new
语法:c = client:new(broker_list, client_config)
broker_list
是代理列表,如以下示例
[
{
"host": "127.0.0.1",
"port": 9092
}
]
可以指定可选的 client_config
表。以下选项如下
客户端配置
socket_timeout
指定以毫秒为单位的网络超时阈值。应该大于
request_timeout
。
keepalive_timeout
指定保持活动连接的最大空闲超时时间(以毫秒为单位)。
keepalive_size
指定每个 Nginx worker 允许的连接池中的最大连接数。
refresh_interval
指定以毫秒为单位的自动刷新元数据的间隔时间。如果为 nil,则元数据不会自动刷新。
fetch_metadata
语法:brokers, partitions = c:fetch_metadata(topic)
如果成功,则返回 topic
的所有代理和分区。如果发生错误,则返回 nil
和描述错误的字符串。
refresh
语法:brokers, partitions = c:refresh()
这将刷新所有通过 fetch_metadata
获取的主题的元数据。如果成功,则返回所有代理和所有主题的所有分区。如果发生错误,则返回 nil
和描述错误的字符串。
resty.kafka.producer
要加载此模块,只需执行以下操作
local producer = require "resty.kafka.producer"
方法
new
语法:p = producer:new(broker_list, producer_config?, cluster_name?)
建议使用异步 producer_type。
broker_list
与 client
中的相同
可以指定一个可选的选项表。以下选项如下
socket_timeout
、keepalive_timeout
、keepalive_size
、refresh_interval
与 client_config
中的相同
生产者配置,最像 <<http://kafka.apache.org/documentation.html#producerconfigs> 中的配置
producer_type
指定
producer.type
。 "async" 或 "sync"
request_timeout
指定
request.timeout.ms
。默认值为2000 毫秒
required_acks
指定
request.required.acks
,不应为零。默认值为1
。
max_retry
指定
message.send.max.retries
。默认值为3
。
retry_backoff
指定
retry.backoff.ms
。默认值为100
。
partitioner
指定从键和分区数中选择分区的分区器。
语法:partitioner = function (key, partition_num, correlation_id) end
,correlation_id 是生产者中的自动递增 ID。默认分区器为
api_version
指定生产 API 版本。默认值为
0
。如果您使用的是 Kafka 0.10.0.0 或更高版本,api_version
可以使用0
、1
或2
。如果您使用的是 Kafka 0.9.x,api_version
应该为0
或1
。如果您使用的是 Kafka 0.8.x,api_version
应该为0
。
local function default_partitioner(key, num, correlation_id)
local id = key and crc32(key) or correlation_id
-- partition_id is continuous and start from 0
return id % num
end
缓冲区配置(仅适用于 producer_type
= "async")
flush_time
指定
queue.buffering.max.ms
。默认值为1000
。
batch_num
指定
batch.num.messages
。默认值为200
。
batch_size
指定
send.buffer.bytes
。默认值为1M
(可能达到 2M)。请注意,应该小于 kafka 服务器中socket.request.max.bytes / 2 - 10k
配置。
max_buffering
指定
queue.buffering.max.messages
。默认值为50,000
。
error_handle
指定错误处理程序,在缓冲区发送到 kafka 时处理数据错误。
语法:error_handle = function (topic, partition_id, message_queue, index, err, retryable) end
,message_queue 中的失败消息类似于`{ key1, msg1, key2, msg2 }
`,即使原始消息为
nil
,message_queue 中的key
也是空字符串""
。index
是 message_queue 的长度,不应使用#message_queue
。当retryable
为true
时,表示 kafka 服务器肯定没有提交这些消息,您可以安全地重试发送;否则表示可能没有提交,建议将其记录到某个地方。
目前不支持压缩。
第三个可选的 cluster_name
指定集群的名称,默认值为 1
(是的,它是数字)。当您有两个或更多个 kafka 集群时,您可以指定不同的名称。这仅适用于 async
producer_type。
send
语法:ok, err = p:send(topic, key, message)
在同步模型中
如果成功,则返回当前代理和分区的偏移量 ( cdata: LL )。如果发生错误,则返回
nil
和描述错误的字符串。
在异步模型中
message
将首先写入缓冲区。当缓冲区超过batch_num
或每flush_time
刷新缓冲区时,它将发送到 kafka 服务器。如果成功,则返回
true
。如果发生错误,则返回nil
和描述错误的字符串 (buffer overflow
)。
offset
语法:sum, details = p:offset()
Return the sum of all the topic-partition offset (return by the ProduceRequest api);
and the details of each topic-partition
flush
语法:ok = p:flush()
始终返回 true
。
安装
您需要配置 lua_package_path 指令,将 lua-resty-kafka 源代码树的路径添加到 ngx_lua 的 LUA_PATH 搜索路径中,例如
# nginx.conf
http {
lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";
...
}
确保运行 Nginx "worker" 进程的系统帐户具有足够的权限来读取 .lua
文件。
待办事项
获取 API
偏移 API
偏移提交/获取 API
作者
Dejiang Zhu (doujiang24) <doujiang24@gmail.com>.
版权和许可
该模块根据 BSD 许可证授权。
版权所有 (C) 2014-2014,作者:Dejiang Zhu (doujiang24) <doujiang24@gmail.com>。
保留所有权利。
在满足以下条件的情况下,允许以源代码和二进制形式重新分发和使用此软件,无论是否修改:
源代码的再分发必须保留上述版权声明、此条件列表以及以下免责声明。
二进制形式的再分发必须在随分发提供的文档和/或其他材料中复制上述版权声明、此条件列表以及以下免责声明。
本软件由版权持有人和贡献者“按现状”提供,并且任何明示或暗示的担保,包括但不限于适销性和特定用途适用性的暗示担保,均被排除在外。在任何情况下,版权持有人或贡献者均不对因使用本软件而导致的任何直接、间接、附带、特殊、示例性或后果性损害(包括但不限于替代商品或服务的采购;使用、数据或利润损失;或业务中断)承担责任,无论这些损害是基于何种责任理论造成的,无论是合同、严格责任还是侵权行为(包括疏忽或其他原因),即使已告知可能发生此类损害。
另请参阅
ngx_lua 模块:http://wiki.nginx.org/HttpLuaModule
kafka 协议:https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
the lua-resty-redis library
the lua-resty-logger-socket library
the sarama
作者
doujiang24
许可证
2bsd
版本
-
基于 cosocket API 的 Openresty Lua kafka 客户端驱动 2019-08-30 23:22:08