Description
Consumer hangs if closing together with deleting topic
How to reproduce
Hello,
In ClickHouse we have an issue with an integration test when librdkafka master is more recent than 8e20e1e, in other words when librdkafka contains PR 4117.
Test scenario:
1. Six consumers consume messages from a topic with six partitions.
2. Delete the topic (not via librdkafka).
3. Close the consumers one by one.
One of the consumers hangs during closing more or less reproducibly, yet virtually any change in timing makes the hang disappear: it is enough to add a sleep() between (2) and (3), or even to use a ClickHouse build with a sanitizer.
I tried to create a minimal reproducer that does not use ClickHouse, but did not succeed.
The scenario may seem a bit insane, but it is crucial for us, and the hang effectively prevents us from using recent librdkafka.
How ClickHouse closes a consumer.
• unsubscribe
• drain queue
• free callbacks
• call rd_kafka_consumer_close
ClickHouse sets a rebalance callback (strictly speaking, cppkafka does).
My investigation.
The problematic part of PR 4117 is the call to rd_kafka_toppar_keep(rktp),
specifically the one made from rd_kafka_toppar_pause_resume to perform a resume.
In rd_kafka_broker_thread_main we wait forever in the loop while (!rd_kafka_broker_terminating(rkb)), where rd_kafka_broker_terminating(rkb) is actually rd_refcnt_get(&(rkb)->rkb_refcnt) <= 1, so the loop exits only once the broker's reference count drops to 1 or below.
REFCNT DEBUG output https://pastila.nl/?00659d03/ee47523355fd8a694171a23c8b2a48c6
Some ClickHouse logs https://pastila.nl/?002bbb54/247b8ebbb941432451f7ae5ce10f319b
Am I right in thinking that the problem is that nothing reads RD_KAFKA_OP_BARRIER from the fetch queue?
Is it possible to work around this problem on the application side?
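The suspected mechanism can be illustrated with a toy C model. This is not librdkafka code: toy_broker_t, toy_enqueue_op, toy_serve_one and toy_thread_main are hypothetical names, and the premise that a queued op holds one reference until something reads it is an assumption made only for illustration. The only part taken from the description above is the shape of the termination check (refcnt <= 1).

```c
/* Toy model only: hypothetical types and helpers, not librdkafka internals. */
typedef struct {
    int refcnt;     /* stands in for rkb_refcnt */
    int queued_ops; /* ops sitting in a queue; each is assumed to hold one reference */
} toy_broker_t;

/* Enqueue an op that keeps a reference until it is served. */
static void toy_enqueue_op(toy_broker_t *b) {
    b->refcnt++;
    b->queued_ops++;
}

/* Serve one queued op, if any, and drop the reference it held.
 * Returns 1 if an op was served, 0 if the queue was empty. */
static int toy_serve_one(toy_broker_t *b) {
    if (b->queued_ops == 0)
        return 0;
    b->queued_ops--;
    b->refcnt--;
    return 1;
}

/* Same shape as the check quoted above: terminating once refcnt <= 1. */
static int toy_terminating(const toy_broker_t *b) {
    return b->refcnt <= 1;
}

/* Bounded stand-in for the broker thread main loop.
 * has_reader says whether anything serves the queue while we wait.
 * Returns the number of iterations spent waiting, or -1 if max_iters
 * was reached (the real, unbounded loop would never exit). */
static int toy_thread_main(toy_broker_t *b, int has_reader, int max_iters) {
    int iters = 0;
    while (!toy_terminating(b)) {
        if (iters == max_iters)
            return -1;
        if (has_reader)
            toy_serve_one(b);
        iters++;
    }
    return iters;
}
```

Starting from refcnt = 1 with one op enqueued, toy_thread_main returns after a single iteration when has_reader is 1. With has_reader set to 0 the reference is never dropped, the bound is reached and the function returns -1, which corresponds to the real loop waiting forever.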
Checklist
- librdkafka version 8e20e1e
- Apache Kafka version: image: confluentinc/cp-kafka:5.2.0
- librdkafka client configuration: https://pastila.nl/?07076592/a59038db454e37bda6f455f41a9e4020
- Operating system: ubuntu:22.04
- Provide logs (with debug=.. as necessary) from librdkafka
- Provide broker log excerpts
- Critical issue