-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Description
Description
We encountered an issue when closing the consumer in the Rust SDK: it hangs while calling rd_kafka_assign() in rebalance_cb.
The code looks like this (shown here as C code):
/* Ask for rebalance, offset-commit, error and OAUTHBEARER token-refresh
 * notifications to be delivered as events. */
rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE |
                                   RD_KAFKA_EVENT_OFFSET_COMMIT |
                                   RD_KAFKA_EVENT_ERROR |
                                   RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH);
rd_kafka_poll_set_consumer(rk);
rd_kafka_queue_t *queue = rd_kafka_queue_get_consumer(rk);

// should poll here, but we don't do poll to reproduce the bug

// close consumer
rd_kafka_consumer_close_queue(rk, queue);
while (!rd_kafka_consumer_closed(rk)) {
        rd_kafka_event_t *rkev = rd_kafka_queue_poll(queue, 60 * 1000);

        fprintf(stderr, "%% Get event %s \n", rd_kafka_event_name(rkev));
        if (!rkev) {
                /* Nothing was returned by this poll; check the closed
                 * state again. */
                continue;
        }

        if (rd_kafka_event_type(rkev) == RD_KAFKA_EVENT_REBALANCE) {
                rebalance_cb_event(rk, rkev);
        }

        /* Events handed out by rd_kafka_queue_poll() belong to the caller
         * and must be released, otherwise one event leaks per iteration. */
        rd_kafka_event_destroy(rkev);
}
/**
 * Tell whether the consumer is using the COOPERATIVE rebalance protocol.
 *
 * rd_kafka_rebalance_protocol() may return NULL, so the result is checked
 * before it is handed to strcmp().
 */
static int rebalance_is_cooperative(rd_kafka_t *rk) {
        const char *protocol = rd_kafka_rebalance_protocol(rk);

        return protocol != NULL && strcmp(protocol, "COOPERATIVE") == 0;
}

/**
 * Handle a rebalance event taken from the consumer queue.
 *
 * Dispatches on the event's error code:
 *  - ASSIGN_PARTITIONS: (incrementally) assigns an empty partition list.
 *  - REVOKE_PARTITIONS: incrementally unassigns an empty list, or clears
 *    the assignment with rd_kafka_assign(rk, NULL) and resets wait_eof.
 *  - anything else: clears the assignment.
 * Any failure is reported on stderr.
 *
 * @param rk   consumer instance the event belongs to
 * @param rkev rebalance event; it is only inspected here, not destroyed
 */
static void rebalance_cb_event(rd_kafka_t *rk,
                               rd_kafka_event_t *rkev) {
        rd_kafka_error_t *error     = NULL;
        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        /* A single empty list is shared by every branch and destroyed on
         * exit; previously each use allocated a fresh list that leaked. */
        rd_kafka_topic_partition_list_t *empty =
            rd_kafka_topic_partition_list_new(0);

        switch (rd_kafka_event_error(rkev)) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                print_partition_list(stderr, empty);
                if (rebalance_is_cooperative(rk)) {
                        error = rd_kafka_incremental_assign(rk, empty);
                } else {
                        ret_err = rd_kafka_assign(rk, empty);
                        /* Was printf(stderr, ...), which passed the stream
                         * as the format string. */
                        fprintf(stderr, "assign ret: %s\n",
                                rd_kafka_err2str(ret_err));
                        sleep(3);
                }
                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                print_partition_list(stderr, empty);
                if (rebalance_is_cooperative(rk)) {
                        error = rd_kafka_incremental_unassign(rk, empty);
                } else {
                        ret_err  = rd_kafka_assign(rk, NULL);
                        wait_eof = 0;
                }
                break;

        default:
                /* Keep the result so a failed unassign is reported below
                 * instead of being silently dropped. */
                ret_err = rd_kafka_assign(rk, NULL);
                break;
        }

        rd_kafka_topic_partition_list_destroy(empty);

        if (error) {
                fprintf(stderr, "incremental assign failure: %s\n",
                        rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
        } else if (ret_err) {
                fprintf(stderr, "assign failure: %s\n",
                        rd_kafka_err2str(ret_err));
        }
}
In some cases the program never exits, and the call stack shows it hanging in rd_kafka_assign:
* frame #0: 0x00000001916019ec libsystem_kernel.dylib`__psynch_cvwait + 8
frame #1: 0x000000019163f55c libsystem_pthread.dylib`_pthread_cond_wait + 1228
frame #2: 0x00000001003ee258 rdkafka_complex_consumer_example`cnd_wait + 12
frame #3: 0x0000000100397c14 rdkafka_complex_consumer_example`rd_kafka_q_pop_serve + 620
frame #4: 0x000000010039a764 rdkafka_complex_consumer_example`rd_kafka_op_req + 148
frame #5: 0x00000001003ce394 rdkafka_complex_consumer_example`rd_kafka_assign + 80
frame #6: 0x00000001003686f0 rdkafka_complex_consumer_example`main [inlined] rebalance_cb_event(rk=0x000000014e010a00, rkev=<unavailable>) at rdkafka_complex_consumer_example.c:175:35 [opt]
frame #7: 0x0000000100368634 rdkafka_complex_consumer_example`main(argc=<unavailable>, argv=<unavailable>) at rdkafka_complex_consumer_example.c:619:33 [opt]
After investigation, it seems that:
- The consumer starts and joins/syncs the group successfully.
- A rebalance event is triggered; since the user didn't poll, it is not handled.
- rd_kafka_consumer_close_queue is called, which triggers an RD_KAFKA_OP_TERMINATE in the cgrp.
- While closing, rd_kafka_queue_poll is called and the rebalance event is handled.
- RD_KAFKA_OP_ASSIGN is sent to the cgrp ops queue.
- RD_KAFKA_OP_TERMINATE is handled, and rd_kafka_q_serve exits.
- rd_kafka_cgrp_serve calls rd_kafka_q_purge(rkcg->rkcg_ops);, which purges the RD_KAFKA_OP_ASSIGN op.
- The main thread never gets a reply to RD_KAFKA_OP_ASSIGN, so it hangs forever.
// in rdkafka.c thread_main:
/* The rk_ops queue is served first; the return value is kept in cnt. */
int cnt = rd_kafka_q_serve(rk->rk_ops, timeout_ms, 0,
RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
/* The consumer group is then served, but only when rk->rk_cgrp is set.
 * NOTE(review): per the analysis in this report, this is the call that
 * purges rkcg->rkcg_ops - confirm against rd_kafka_cgrp_serve(). */
if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
rd_kafka_cgrp_serve(rk->rk_cgrp);
A proper fix is to handle all ops in rkcg->rkcg_ops before calling rd_kafka_q_purge(rkcg->rkcg_ops);
PR
How to reproduce
The hang occurs randomly; I managed to reproduce it reliably after modifying the code in several places:
- Change the heartbeat response to always be an error (so we are not in the group).
- Change rd_kafka_q_serve in the main thread to handle one op at a time.
- Disable rd_kafka_cgrp_serve until RD_KAFKA_OP_GET_REBALANCE_PROTOCOL has been called, so that RD_KAFKA_OP_ASSIGN can be enqueued first and is only handled after rd_kafka_cgrp_serve is called.
aiquestion@1bfc1e4
Build rdkafka_complex_consumer_example.c and run it; it will hang. After applying the fix in the PR, it closes successfully.
IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/confluentinc/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- librdkafka version (release number or git tag):
<REPLACE with e.g., v0.10.5 or a git sha. NOT "latest" or "current">
- Apache Kafka version:
<REPLACE with e.g., 0.10.2.3>
- librdkafka client configuration:
<REPLACE with e.g., message.timeout.ms=123, auto.reset.offset=earliest, ..>
- Operating system:
<REPLACE with e.g., Centos 5 (x64)>
- Provide logs (with `debug=..` as necessary) from librdkafka
- Provide broker log excerpts
- Critical issue