发表文章

[C] 有关本地的问题: 队列已满, 如何从队列中删除消息 A problem about Local:Queue full and how can I delete message from queue[librdkafka]

q1f3 2017-10-9 145

您好, 我有一个关于 "本地: 队列已满" 的问题。
我设置 "rd_kafka_conf_set (安排," queue.buffering.max.messages "," 10000 ", errstr, sizeof (errstr));"。
我做了一个测试, printf 消息号, 当消息号大于队列. 缓冲最大消息时, 它将打印 "本地: 队列已满"。
如何从队列中删除消息。
9991
9992
9993
9994
9995
9996
9997
9998
9999
10000
10001
% 未能生成主题 snort_packet: 本地: 队列已满
10001
% 未能生成主题 snort_packet: 本地: 队列已满
10001
% 未能生成主题 snort_packet: 本地: 队列已满
10001
% 未能生成主题 snort_packet: 本地: 队列已满
如何重现

清单

请提供以下信息:

  • librdkafka 版本 (发行号或 git 标记): 最新
  • 卡夫卡版本:
  • librdkafka 客户端配置:
  • 操作系统:
  • 使用旧版消费者
  • 使用高级 KafkaConsumer
  • 从 librdkafka 提供日志 (必要时使用 debug=.. )
  • 提供代理日志摘录
  • 关键问题
原文:

Hi,I have a problem about "Local:Queue full".
I set " rd_kafka_conf_set(conf,"queue.buffering.max.messages","10000",errstr,sizeof(errstr));".
And I do a test that printf message number, when message number is bigger than queue.buffering.max.messages,it will print "Local:Queue full".
How can I delete message from queue.
9991
9992
9993
9994
9995
9996
9997
9998
9999
10000
10001
% Failed to produce to topic snort_packet: Local: Queue full
10001
% Failed to produce to topic snort_packet: Local: Queue full
10001
% Failed to produce to topic snort_packet: Local: Queue full
10001
% Failed to produce to topic snort_packet: Local: Queue full
How to reproduce

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag):the latest
  • Apache Kafka version:
  • librdkafka client configuration:
  • Operating system:
  • Using the legacy Consumer
  • Using the high-level KafkaConsumer
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
相关推荐
最新评论 (20)
zhaowq32 2017-10-9
1

我认为你可以设置 "queue.buffering.max.messages" 更大。

原文:

I think you can set "queue.buffering.max.messages" much bigger.

q1f3 2017-10-9
2

@zhaowq32谢谢你的回信!我为 "queue.buffering.max.messages" 设定了最大的价值, 而且问题也无法解决。

原文:

@zhaowq32 Thanks for your reply! I have set biggest value for "queue.buffering.max.messages",and problem also can't be solved.

zhaowq32 2017-10-9
3

消息是否已成功发送?

原文:

Has the message been sent successfully?

q1f3 2017-10-9
4

@zhaowq32消息在 "本地: 队列已满" 发生时无法发送。

原文:

@zhaowq32 message can't be sent when "Local:Queue full" happened.

zhaowq32 2017-10-9
5

我的意思是, 如果消息可以发送成功之前, 错误发生

原文:

I mean if the message can be sent successfully before the error occurs

edenhill 2017-10-9
6

你能提供一些代码吗?

原文:

Can you provide some code?

edenhill 2017-10-9
7

如果您已注册了传递报告回调 (dr_msg_cb), 则必须不断致电 rd_kafka_poll () 来为报告提供服务。

原文:

If you've registered a delivery report callback (dr_msg_cb) you must call rd_kafka_poll() continually to serve the reports.

q1f3 2017-10-9
8

@zhaowq32消息在发生错误之前可以成功发送

原文:

@zhaowq32 message can be sent successfully before the error occurs

q1f3 2017-10-9
9

@edenhill谢谢你的回信。我已经调用了 rd_kafka_poll (), 它将发生在4小时后的 "本地: 队列已满"。
if(rd_kafka_produce(rkt,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,len,NULL,0,NULL) == -1) { fprintf(stderr,"%% Failed to produce to topic %s: %s\n",rd_kafka_topic_name(rkt),rd_kafka_err2str(rd_kafka_last_error())); rd_kafka_poll(rk,1000); }

原文:

@edenhill Thanks for your reply.I have called rd_kafka_poll(),and it will be occur "Local: Queue full" about 4 hours later.
if(rd_kafka_produce(rkt,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_FREE,buf,len,NULL,0,NULL) == -1) { fprintf(stderr,"%% Failed to produce to topic %s: %s\n",rd_kafka_topic_name(rkt),rd_kafka_err2str(rd_kafka_last_error())); rd_kafka_poll(rk,1000); }

q1f3 2017-10-9
10

@edenhill我已经注释了代码。我不知道会发生什么, 如果我注释了代码。
while (unlikely(rk->rk_curr_msgs.cnt + cnt > rk->rk_curr_msgs.max_cnt || (unsigned long long)(rk->rk_curr_msgs.size + size) > (unsigned long long)rk->rk_curr_msgs.max_size)) { if (!block) { //mtx_unlock(&rk->rk_curr_msgs.lock); //return RD_KAFKA_RESP_ERR__QUEUE_FULL; } cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock); }

原文:

@edenhill I have commented out the code.I don't know what will be happened,if I comment out the code.
while (unlikely(rk->rk_curr_msgs.cnt + cnt > rk->rk_curr_msgs.max_cnt || (unsigned long long)(rk->rk_curr_msgs.size + size) > (unsigned long long)rk->rk_curr_msgs.max_size)) { if (!block) { //mtx_unlock(&rk->rk_curr_msgs.lock); //return RD_KAFKA_RESP_ERR__QUEUE_FULL; } cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock); }

zhaowq32 2017-10-9
11

我想你可以添加一些代码, 如下面的 rd_kafka_produce。

while(rd_kafka_outq_len(rk) > 0)
{
    rd_kafka_poll(rk, 1000);
}
原文:

I think you could add some code like below after rd_kafka_produce.

while(rd_kafka_outq_len(rk) > 0)
{
    rd_kafka_poll(rk, 1000);
}
q1f3 2017-10-9
12

@zhaowq32好的, 谢谢, 我应该试一下。我想知道邮件被传递到卡夫卡服务器后是否可以从队列中删除。

原文:

@zhaowq32 OK,thank you,I should have a try.And I want to know if the message can be deleted from queue after the message has been delivered to kafka server.

zhaowq32 2017-10-9
13

我认为消息将被删除的队列自动后, 它传递到卡夫卡服务器。

原文:

I think the message would be deleted from queue automaticly after it delivered to kafka server.

q1f3 2017-10-9
14

@zhaowq32我添加代码 {当 (rd_kafka_outq_len (rk) > 0)...} 您在 rd_kafka_produce 后建议, 它会调用 rd_kafka_msg_destroy 删除消息。而我得到另一个错误 "自由 (): 无效的指针: 0x0000000000ed29"。错误可能是由 rd_kafka_poll (rk, 1000) 引起的。

原文:

@zhaowq32 I add the code {while(rd_kafka_outq_len(rk) > 0) ...} that you suggested after rd_kafka_produce,it calls rd_kafka_msg_destroy to delete message.And I get the other error "free(): invalid pointer: 0x0000000000ed29".The error may be caused by rd_kafka_poll(rk, 1000).

zhaowq32 2017-10-9
15

你在 rd_kafka_produce 后打电话给 rd_kafka_msg_destroy 吗?如果你的产品 msgflags 是 ' RD_KAFKA_MSG_F_FREE ', 那么你不应该打电话给 rd_kafka_msg_destroy。

原文:

Did you call rd_kafka_msg_destroy after rd_kafka_produce?If your produce msgflags is 'RD_KAFKA_MSG_F_FREE',then you shouldn't call rd_kafka_msg_destroy.

q1f3 2017-10-9
16

@zhaowq32非常感谢, 一切都很好。

原文:

@zhaowq32 Thank you very much,everything is good.

duoluowangzi 2017-10-9
17

我遇到了同样的问题。当回调失败时, 我重新发送 msg, 几天后就会出现错误。我不知道为什么味精会留在排队这么多。

原文:

I have encountered the same problem. I resend the msg when callback is fail, and after a few days the error will occur. I dont known why msg stay in the queue for so many.

zhaowq32 2017-10-9
18

@duoluowangzi在消息超时 (消息. 超时) 之前, 本地队列将存储未传递到卡夫卡服务器的消息。当网络出现波动或出现其他错误时, 队列中会堆积大量的消息, 从而导致队列已满。

原文:

@duoluowangzi Local queue will storage the message that not delivered to kafka server until the message timeout(message.timeout.ms).When the network has fluctuated or there are other errors, there will be a lot of messages stacked in the queue, resulting in the queue full.

baiwfg2 2017-10-9
19

@edenhill如果没有注册任何回调, 是否仍需要调用 poll ?如果需要, 是否在单独的线程中调用 poll

原文:

@edenhill If I haven't registered any callback, do I still need to call poll ? If need, whether to call poll in a separate thread or not ?

edenhill 2017-10-9
20

@baiwfg2不, 没有回调, 将没有任何排队轮询 ()

原文:

@baiwfg2 Nope, without callbacks there will be nothing enqueued to poll()

返回
发表文章
q1f3
文章数
2
评论数
16
注册排名
19549