Add shutdown_strategy_before_consumer flag #467
Conversation
Full stacktrace we are encountering:

```
File "sentry/utils/kafka.py", line 52, in run_processor_with_signals
  processor.run()
File "arroyo/processing/processor.py", line 335, in run
  self._run_once()
File "arroyo/processing/processor.py", line 418, in _run_once
  self.__processing_strategy.poll()
File "arroyo/processing/strategies/healthcheck.py", line 32, in poll
  self.__next_step.poll()
File "arroyo/processing/strategies/guard.py", line 101, in poll
  self.__inner_strategy.poll()
File "arroyo/processing/strategies/run_task.py", line 55, in poll
  self.__next_step.poll()
File "arroyo/processing/strategies/guard.py", line 37, in poll
  self.__next_step.poll()
File "arroyo/processing/strategies/batching.py", line 82, in poll
  self.__reduce_step.poll()
File "arroyo/processing/strategies/reduce.py", line 110, in poll
  self.__buffer_step.poll()
File "arroyo/processing/strategies/buffer.py", line 165, in poll
  self.__flush(force=False)
File "arroyo/processing/strategies/buffer.py", line 125, in __flush
  self.__next_step.submit(buffer_msg)
File "arroyo/processing/strategies/guard.py", line 82, in submit
  self.__inner_strategy.submit(message)
File "arroyo/processing/strategies/run_task.py", line 52, in submit
  self.__next_step.submit(Message(value))
File "arroyo/processing/strategies/guard.py", line 34, in submit
  self.__next_step.submit(message)
File "sentry/spans/consumers/process/flusher.py", line 316, in submit
  self.next_step.submit(message)
File "arroyo/processing/strategies/commit.py", line 34, in submit
  self.__commit(message.committable)
File "arroyo/processing/processor.py", line 321, in __commit
  self.__consumer.commit_offsets()
File "arroyo/backends/kafka/consumer.py", line 624, in commit_offsets
  return self.__commit_retry_policy.call(self.__commit)
File "arroyo/utils/retries.py", line 88, in call
  return callable()
File "arroyo/backends/kafka/consumer.py", line 582, in __commit
  result = self.__consumer.commit(
KafkaException: KafkaError{code=UNKNOWN_MEMBER_ID,val=25,str="Commit failed: Broker: Unknown member"}
```
```python
# If shutdown_strategy_before_consumer is set, work around an issue
# where rdkafka would revoke our partition, but then also immediately
# revoke our member ID as well, causing join() of the CommitStrategy
# (that is running in the partition revocation callback) to crash.
if self.__shutdown_strategy_before_consumer:
    self._close_processing_strategy()
```
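The ordering change the flag introduces can be sketched without a broker. Everything below is a hypothetical toy model (the class names and the fake consumer are assumptions, not the actual Arroyo or rdkafka code); it only illustrates why a commit issued from inside the revocation callback fails after `close()` has dropped the member id, while a commit issued before `close()` succeeds:

```python
class UnknownMemberError(Exception):
    """Stands in for KafkaError{code=UNKNOWN_MEMBER_ID}."""

class FakeConsumer:
    """Toy consumer: close() drops the member id, then runs revocation."""
    def __init__(self, revocation_callback):
        self.member_known = True
        self.revocation_callback = revocation_callback

    def close(self):
        # Modeled failure mode: the member id is gone *before*
        # the revocation callback gets a chance to run.
        self.member_known = False
        self.revocation_callback()

    def commit(self):
        if not self.member_known:
            raise UnknownMemberError("Commit failed: Broker: Unknown member")

class Processor:
    """Toy processor with the proposed flag."""
    def __init__(self, shutdown_strategy_before_consumer):
        self.shutdown_strategy_before_consumer = shutdown_strategy_before_consumer
        self.strategy_closed = False
        self.consumer = FakeConsumer(self._close_processing_strategy)

    def _close_processing_strategy(self):
        if self.strategy_closed:
            return  # already shut down before the consumer; nothing left to commit
        self.strategy_closed = True
        self.consumer.commit()  # the CommitStrategy commits during join()

    def shutdown(self):
        if self.shutdown_strategy_before_consumer:
            self._close_processing_strategy()
        self.consumer.close()

# Without the flag, the commit runs after the member id is gone:
try:
    Processor(shutdown_strategy_before_consumer=False).shutdown()
    old_path_ok = True
except UnknownMemberError:
    old_path_ok = False

# With the flag, the strategy (and its commit) finishes first:
Processor(shutdown_strategy_before_consumer=True).shutdown()
new_path_ok = True

print(old_path_ok, new_path_ok)  # → False True
```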
I am not sure I understand the failure mode here. I am aware there is a race condition somewhere in Arroyo during shutdown, but could you please elaborate on the sequence of events we are seeing here?
Specifically:
- Is this happening only at rebalancing, only during shutdown, or both? This method is only called during consumer shutdown, so even if the commit fails there and crashes, does it really represent a problem in your consumer?
- What do you mean by "immediately revoke our member ID as well"? If that were causing the commit to fail, wouldn't we see crashes constantly in all consumers that commit during partition revocation?
> Is this happening only at rebalancing, only during shutdown, or both?

Only during shutdown. During shutdown, we trigger rebalancing through `rdkafka_consumer.close()`, which in turn closes the strategy.

> This method is only called during consumer shutdown, so even if the commit fails there and crashes, does it really represent a problem in your consumer?

`self._close_consumer_strategy()` is also called during partition revocation. I'm trying to call it before partition revocation as well, to reduce the likelihood that we get a failing commit. I don't yet know if that will work, to be honest.

> What do you mean by "immediately revoke our member ID as well"? If that were causing the commit to fail, wouldn't we see crashes constantly in all consumers that commit during partition revocation?

From what I can tell, this also happens in other consumers, but much less frequently. I plainly don't understand why the span buffer is more affected.
> Only during shutdown. During shutdown, we trigger rebalancing through `rdkafka_consumer.close()`, which in turn closes the strategy.

So if this happens only during shutdown, is there any actual impact on the work the consumer does? Wouldn't the consumer just crash instead of terminating cleanly? Does this have any impact?
> From what I can tell, this also happens in other consumers, but much less frequently. I plainly don't understand why the span buffer is more affected.

So are you saying that, at times, in some consumers, the member id is revoked before running the revocation callback? And that we do not know why?
> So are you saying that, at times, in some consumers, the member id is revoked before running the revocation callback? And that we do not know why?

Yes, that's the issue, I think.
Could you please add a metric to check whether, when we close the consumer, the member_id is populated? While it may be possible that rdkafka still thinks it has a member id and thus the field is still populated, maybe we can see what is actually happening. I think it is worth a try.
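For reference, confluent-kafka-python exposes the broker-assigned member id via `Consumer.memberid()`, which to my knowledge returns `None` when the consumer has no active group membership. A minimal, hedged sketch of the tagging helper such a metric could use; the `metrics` object and the metric name in the comment are assumptions, not existing Sentry code:

```python
def member_id_status(member_id):
    """Classify an rdkafka member id for use as a metric tag.

    rdkafka assigns ids like 'rdkafka-<uuid>'; None or an empty
    string means the consumer currently has no group membership.
    """
    if not member_id:
        return "absent"
    return "present"

# At shutdown, right before consumer.close(), one could emit e.g.:
#   metrics.incr(
#       "consumer.close.member_id",
#       tags={"status": member_id_status(consumer.memberid())},
#   )
print(member_id_status(None), member_id_status("rdkafka-1234"))  # → absent present
```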
Please see my other questions.
Aside from your implementation, I have a suggestion to get to the bottom of this.
We have a log line right before we close the consumer and one at the beginning of the partition-revoked callback.
Do you see a long delay between the two?
```python
try:
    self.__processing_strategy.join(self.__join_timeout)
    self.__metrics_buffer.incr_timing(
        "arroyo.consumer.join.time", time.time() - start_join
    )
    break
```
This code may raise `MessageRejected`. If that happens, we would skip these lines at line 530:

```python
self.__consumer.close()
self.__processor_factory.shutdown()
```

Are you sure that is not a problem?
I don't think we are supposed to raise `MessageRejected` in `join()`. If `join()` calls `submit()` and it internally raises `MessageRejected`, `join()` is supposed to handle it. Empirically this is true, because the code you are reviewing has been part of partition revocation before, and we've not seen `MessageRejected` crashes there.
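The contract described here, where `join()` absorbs `MessageRejected` from its own downstream `submit()` calls and retries until a deadline instead of letting the exception escape, can be sketched as follows (hypothetical strategy classes, not the actual Arroyo code):

```python
import time

class MessageRejected(Exception):
    """Raised by submit() when the next step cannot accept more work."""

class FlushingStrategy:
    """Drains an internal buffer on join(), retrying rejected submits."""
    def __init__(self, next_step, buffer):
        self.next_step = next_step
        self.buffer = list(buffer)

    def join(self, timeout):
        deadline = time.monotonic() + timeout
        while self.buffer:
            try:
                self.next_step.submit(self.buffer[0])
            except MessageRejected:
                # Handled internally: back off and retry instead of
                # letting the exception propagate to the caller.
                if time.monotonic() > deadline:
                    break
                time.sleep(0.01)
                continue
            self.buffer.pop(0)

class FlakyNextStep:
    """Rejects the first submit of each message, then accepts it."""
    def __init__(self):
        self.accepted = []
        self._reject_next = True

    def submit(self, message):
        if self._reject_next:
            self._reject_next = False
            raise MessageRejected
        self._reject_next = True
        self.accepted.append(message)

step = FlakyNextStep()
strategy = FlushingStrategy(step, ["a", "b"])
strategy.join(timeout=1.0)  # no MessageRejected escapes
print(step.accepted)  # → ['a', 'b']
```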
> If `join()` calls `submit()` and it internally raises `MessageRejected`, `join()` is supposed to handle it. Empirically this is true, because the code you are reviewing has been part of partition revocation before, and we've not seen `MessageRejected` crashes there.

I agree they should not. Could you please do a quick check whether this is happening in custom strategies? It should be a quick search.
> Do you see a long delay between the two?

No, they happen within the same second.
Please consider doing the check on custom `join()` methods before merging.
Checked the getsentry sources and also searched for `MessageRejected` errors in our Sentry project.
I missed this comment. I will do this before bumping Arroyo.