lua-resty-kafka
基于 cosocket API 的 OpenResty Lua Kafka 客户端驱动
$ opm get doujiang24/lua-resty-kafka
名称
lua-resty-kafka - 基于 cosocket API 的 ngx_lua Lua Kafka 客户端驱动
状态
该库仍处于早期开发阶段,尚处于实验性阶段。
描述
此 Lua 库是 ngx_lua nginx 模块的 Kafka 客户端驱动
http://wiki.nginx.org/HttpLuaModule
此 Lua 库利用了 ngx_lua 的 cosocket API,确保了 100% 的非阻塞行为。
请注意,至少需要 ngx_lua 0.9.3 或 openresty 1.4.3.7,并且不幸的是只支持 LuaJIT (--with-luajit
)。
对于 ssl
连接,至少需要 ngx_lua 0.9.11 或 openresty 1.7.4.1,并且不幸的是只支持 LuaJIT (--with-luajit
)。
概要
lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";
server {
location /test {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{
host = "127.0.0.1",
port = 9092,
-- optional auth
sasl_config = {
mechanism = "PLAIN",
user = "USERNAME",
password = "PASSWORD",
},
},
}
local key = "key"
local message = "halo world"
-- usually we do not use this library directly
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata failed, err:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- sync producer_type
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("send err:", err)
return
end
ngx.say("send success, offset: ", tonumber(offset))
-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test", key, message)
if not ok then
ngx.say("send err:", err)
return
end
ngx.say("send success, ok:", ok)
';
}
}
模块
resty.kafka.client
要加载此模块,只需执行以下操作
local client = require "resty.kafka.client"
方法
new
语法: c = client:new(broker_list, client_config)
broker_list
是代理列表,如下所示
[
{
"host": "127.0.0.1",
"port": 9092,
// optional auth
"sasl_config": {
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
可以指定一个可选的 client_config
表格。以下选项如下所示
客户端配置
socket_timeout
指定网络超时阈值(以毫秒为单位)。应该大于
request_timeout
。
keepalive_timeout
指定保持活动连接的最大空闲超时时间(以毫秒为单位)。
keepalive_size
指定每个 Nginx 工作进程允许的最大连接数。
refresh_interval
指定以毫秒为单位自动刷新元数据的间隔时间。如果为 nil,则元数据不会自动刷新。
ssl
指定客户端是否应使用 ssl 连接。默认为 false。请参阅:https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
ssl_verify
指定客户端是否应执行 SSL 验证。默认为 false。请参阅:https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
resolver
指定一个主机解析函数,该函数返回一个 IP 字符串或
nil
,以覆盖系统默认的主机解析器。默认为nil
,不执行解析。示例function(host) if host == "some_host" then return "10.11.12.13" end end
fetch_metadata
语法: brokers, partitions = c:fetch_metadata(topic)
如果成功,则返回 topic
的所有代理和分区。如果出错,则返回 nil
和描述错误的字符串。
refresh
语法: brokers, partitions = c:refresh()
这将刷新所有通过 fetch_metadata
获取的主题的元数据。如果成功,则返回所有代理和所有主题的所有分区。如果出错,则返回 nil
和描述错误的字符串。
choose_api_version
语法: api_version = c:choose_api_version(api_key, min_version, max_version)
这有助于客户端为 API 选择正确的 api_key
版本。
当提供 min_version
和 max_version
时,它将充当限制,并且返回值中的所选版本无论代理支持的 API 版本有多高或多低都不会超过其限制。当它们未提供时,它将遵循代理支持的版本范围。
提示:版本选择策略是在允许的范围内选择最大版本。
resty.kafka.producer
要加载此模块,只需执行以下操作
local producer = require "resty.kafka.producer"
方法
new
语法: p = producer:new(broker_list, producer_config?, cluster_name?)
建议使用异步 producer_type。
broker_list
与 client
中的相同
可以指定一个可选的选项表。以下选项如下所示
socket_timeout
、keepalive_timeout
、keepalive_size
、refresh_interval
、ssl
、ssl_verify
与 client_config
中的相同
生产者配置,大多类似于 <http://kafka.apache.org/documentation.html#producerconfigs>
producer_type
指定
producer.type
。“async” 或 “sync”
request_timeout
指定
request.timeout.ms
。默认为2000 ms
required_acks
指定
request.required.acks
,不应为零。默认为1
。
max_retry
指定
message.send.max.retries
。默认为3
。
retry_backoff
指定
retry.backoff.ms
。默认为100
。
api_version
指定生产 API 版本。默认为
0
。如果使用 Kafka 0.10.0.0 或更高版本,api_version
可以使用0
、1
或2
。如果使用 Kafka 0.9.x,api_version
应为0
或1
。如果使用 Kafka 0.8.x,api_version
应为0
。
partitioner
指定从键和分区数中选择分区的分配器。
语法: partitioner = function (key, partition_num, correlation_id) end
,correlation_id 是生产者中的自动递增 ID。默认分配器为`lua local function default_partitioner(key, num, correlation_id) local id = key and crc32(key) or correlation_id -- partition_id is continuous and start from 0 return id % num end
`
缓冲区配置(仅当 producer_type
= “async” 时有效)
flush_time
指定
queue.buffering.max.ms
。默认为1000
。
batch_num
指定
batch.num.messages
。默认为200
。
batch_size
指定
send.buffer.bytes
。默认为1M
(可能达到 2M)。请注意,应小于 kafka 服务器中socket.request.max.bytes / 2 - 10k
配置。
max_buffering
指定
queue.buffering.max.messages
。默认为50,000
。
error_handle
指定错误处理程序,处理缓冲区发送到 kafka 错误时的数据。
语法: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end
,message_queue 中失败的消息类似于`{ key1, msg1, key2, msg2 }
`,即使原始值为
nil
,message_queue 中的key
也是空字符串""
。index
是 message_queue 的长度,不应使用#message_queue
。当retryable
为true
时,表示 kafka 服务器肯定未提交这些消息,您可以安全地重试发送;否则表示可能,建议记录到某个位置。
目前不支持压缩。
第三个可选的 cluster_name
指定集群的名称,默认为 1
(是的,它是数字)。当您有两个或多个 kafka 集群时,可以指定不同的名称。这仅在使用 async
producer_type 时有效。
send
语法: ok, err = p:send(topic, key, message)
在同步模式下
如果成功,则返回当前代理和分区的偏移量 ( cdata: LL )。如果出错,则返回
nil
和描述错误的字符串。
在异步模式下
message
将首先写入缓冲区。当缓冲区超过batch_num
或每隔flush_time
刷新缓冲区时,它将发送到 kafka 服务器。如果成功,则返回
true
。如果出错,则返回nil
和描述错误的字符串(buffer overflow
)。
offset
语法: sum, details = p:offset()
Return the sum of all the topic-partition offset (return by the ProduceRequest api);
and the details of each topic-partition
flush
语法: ok = p:flush()
始终返回 true
。
resty.kafka.basic-consumer
要加载此模块,只需执行以下操作
local bconsumer = require "resty.kafka.basic-consumer"
此模块是消费者的一种极简实现,提供用于按时间查询或获取起始和结束偏移量的 list_offset
API,以及用于获取主题中消息的 fetch
API。
在单个调用中,只能获取单个主题中单个分区的相关信息,目前不支持批量获取。基本消费者不支持与消费者组相关的 API,因此您需要在通过 list_offset
API 获取偏移量后获取消息,或者您的服务可以自行管理偏移量。
方法
new
语法: c = bconsumer:new(broker_list, client_config)
broker_list
是代理列表,如下所示
[
{
"host": "127.0.0.1",
"port": 9092,
// optional auth
"sasl_config": {
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
可以指定一个可选的 client_config
表格。以下选项如下所示
客户端配置
socket_timeout
指定网络超时阈值(以毫秒为单位)。应该大于
request_timeout
。
keepalive_timeout
指定保持活动连接的最大空闲超时时间(以毫秒为单位)。
keepalive_size
指定每个 Nginx 工作进程允许的最大连接数。
refresh_interval
指定以毫秒为单位自动刷新元数据的间隔时间。如果为 nil,则元数据不会自动刷新。
ssl
指定客户端是否应使用 ssl 连接。默认为 false。请参阅:https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
ssl_verify
指定客户端是否应执行 SSL 验证。默认为 false。请参阅:https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
isolation_level
此设置控制事务记录的可见性。请参阅:https://kafka.apache.org/protocol.html
client_rack
发出此请求的消费者的机架 ID。请参阅:https://kafka.apache.org/protocol.html
list_offset
语法: offset, err = c:list_offset(topic, partition, timestamp)
参数 timestamp 可以是 UNIX 时间戳或 resty.kafka.protocol.consumer
中定义的常量,LIST_OFFSET_TIMESTAMP_LAST
、LIST_OFFSET_TIMESTAMP_FIRST
、LIST_OFFSET_TIMESTAMP_MAX
,用于获取初始和最新偏移量等,语义与 Apache Kafka 中的 ListOffsets API 相同。请参阅:https://kafka.apache.org/protocol.html#The_Messages_ListOffsets
如果成功,则返回指定情况下的偏移量。如果出错,则返回 nil
和描述错误的字符串。
fetch
语法: result, err = c:fetch(topic, partition, offset)
如果成功,则返回指定情况下的以下 result
。如果出错,则返回 nil
和描述错误的字符串。
result
将包含更多信息,例如消息
records
包含消息内容的表。
errcode
Fetch API 的错误代码。请参阅:https://kafka.apache.org/protocol.html#protocol_error_codes
high_watermark
Fetch API 的高水位线。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch
last_stable_offset
Fetch API 的最后一个稳定偏移量。内容取决于 API 版本,可能为 nil。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch 响应 API 版本高于 v4
log_start_offset
Fetch API 的日志起始偏移量。内容取决于 API 版本,可能为 nil。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch 响应 API 版本高于 v5
aborted_transactions
Fetch API 中中止的事务。内容取决于 API 版本,可能为 nil。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch 响应 API 版本高于 v4
preferred_read_replica
Fetch API 的首选读取副本。内容取决于 API 版本,可能为 nil。请参阅:https://kafka.apache.org/protocol.html#The_Messages_Fetch 响应 API 版本高于 v11
错误
当您调用此库中提供的模块时,可能会遇到一些错误。根据来源,它们可以分为以下几类。
网络错误:例如连接被拒绝、连接超时等。您需要检查环境中每个服务的连接状态。
与元数据相关的错误:例如无法正确检索元数据或 ApiVersion 数据;指定的主题或分区不存在等。您需要检查 Kafka Broker 和客户端配置。
Kafka 返回的错误:有时 Kafka 会在响应数据中包含 err_code 数据,当此问题发生时,返回值中的
err
类似于OFFSET_OUT_OF_RANGE
,所有字符均为大写,并以下划线分隔,在当前库中,我们提供 一个映射错误列表,对应于文本描述。要了解有关这些错误的更多信息,请参阅 Kafka 文档 中的描述。
安装
您需要配置 lua_package_path 指令,将 lua-resty-kafka 源代码树的路径添加到 ngx_lua 的 LUA_PATH 搜索路径中,如下所示
# nginx.conf
http {
lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";
...
}
确保运行 Nginx ''worker'' 进程的系统帐户具有读取 .lua
文件的足够权限。
待办事项
Fetch API
Offset API
Offset Commit/Fetch API
作者
朱德江 (doujiang24) <doujiang24@gmail.com>。
版权和许可证
此模块根据 BSD 许可证发布。
版权所有 (C) 2014-2020,朱德江 (doujiang24) <doujiang24@gmail.com>。
保留所有权利。
允许重新分发和使用源代码和二进制形式,无论是否修改,前提是满足以下条件
源代码的重新分发必须保留上述版权声明、此条件列表和以下免责声明。
二进制形式的重新分发必须在随分发提供的文档和/或其他材料中复制上述版权声明、此条件列表和以下免责声明。
本软件由版权持有人和贡献者“按原样”提供,并且不提供任何明示或暗示的担保,包括但不限于适销性和特定用途适用性的暗示担保。在任何情况下,版权持有人或贡献者均不对任何直接、间接、附带、特殊、惩罚性或后果性损害(包括但不限于替代商品或服务的采购;使用、数据或利润损失;或业务中断)负责,无论这些损害是如何造成的以及基于任何责任理论,无论是合同、严格责任或侵权行为(包括疏忽或其他原因),即使已被告知此类损害的可能性。
另请参阅
ngx_lua 模块:http://wiki.nginx.org/HttpLuaModule
Kafka 协议:https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
the lua-resty-redis 库
the lua-resty-logger-socket 库
the sarama
作者
doujiang24
许可证
2bsd
版本
-
基于 cosocket API 的 Openresty Lua Kafka 客户端驱动程序 2022-05-06 00:58:31
-
基于 cosocket API 的 Openresty Lua Kafka 客户端驱动程序 2021-10-14 08:20:56
-
基于 cosocket API 的 Openresty Lua Kafka 客户端驱动程序 2020-04-19 14:00:03
-
基于 cosocket API 的 Openresty Lua Kafka 客户端驱动程序 2019-08-31 00:57:45
-
基于 cosocket API 的 Openresty Lua Kafka 客户端驱动程序 2016-09-30 07:50:43