
Conversation

untitaker (Member)

Full stacktrace we are encountering:

  File "sentry/utils/kafka.py", line 52, in run_processor_with_signals
    processor.run()
  File "arroyo/processing/processor.py", line 335, in run
    self._run_once()
  File "arroyo/processing/processor.py", line 418, in _run_once
    self.__processing_strategy.poll()
  File "arroyo/processing/strategies/healthcheck.py", line 32, in poll
    self.__next_step.poll()
  File "arroyo/processing/strategies/guard.py", line 101, in poll
    self.__inner_strategy.poll()
  File "arroyo/processing/strategies/run_task.py", line 55, in poll
    self.__next_step.poll()
  File "arroyo/processing/strategies/guard.py", line 37, in poll
    self.__next_step.poll()
  File "arroyo/processing/strategies/batching.py", line 82, in poll
    self.__reduce_step.poll()
  File "arroyo/processing/strategies/reduce.py", line 110, in poll
    self.__buffer_step.poll()
  File "arroyo/processing/strategies/buffer.py", line 165, in poll
    self.__flush(force=False)
  File "arroyo/processing/strategies/buffer.py", line 125, in __flush
    self.__next_step.submit(buffer_msg)
  File "arroyo/processing/strategies/guard.py", line 82, in submit
    self.__inner_strategy.submit(message)
  File "arroyo/processing/strategies/run_task.py", line 52, in submit
    self.__next_step.submit(Message(value))
  File "arroyo/processing/strategies/guard.py", line 34, in submit
    self.__next_step.submit(message)
  File "sentry/spans/consumers/process/flusher.py", line 316, in submit
    self.next_step.submit(message)
  File "arroyo/processing/strategies/commit.py", line 34, in submit
    self.__commit(message.committable)
  File "arroyo/processing/processor.py", line 321, in __commit
    self.__consumer.commit_offsets()
  File "arroyo/backends/kafka/consumer.py", line 624, in commit_offsets
    return self.__commit_retry_policy.call(self.__commit)
  File "arroyo/utils/retries.py", line 88, in call
    return callable()
  File "arroyo/backends/kafka/consumer.py", line 582, in __commit
    result = self.__consumer.commit(

KafkaException: KafkaError{code=UNKNOWN_MEMBER_ID,val=25,str="Commit failed: Broker: Unknown member"}

untitaker requested review from a team as code owners (June 27, 2025 10:36)
Comment on lines +520 to +525
# If shutdown_strategy_before_consumer is set, work around an issue
# where rdkafka would revoke our partition, but then also immediately
# revoke our member ID as well, causing join() of the CommitStrategy
# (that is running in the partition revocation callback) to crash.
if self.__shutdown_strategy_before_consumer:
    self._close_processing_strategy()
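
For context, the ordering change this flag enables can be sketched as follows. This is a simplified, hypothetical model of the shutdown path, not arroyo's actual StreamProcessor; all names other than those in the quoted hunk are illustrative:

```python
# Hypothetical sketch of the two shutdown orderings. The real arroyo
# StreamProcessor is far more involved; this only illustrates ordering.

class Processor:
    def __init__(self, shutdown_strategy_before_consumer: bool) -> None:
        self.__shutdown_strategy_before_consumer = shutdown_strategy_before_consumer
        self.events: list[str] = []

    def _close_processing_strategy(self) -> None:
        # join() flushes in-flight work and commits offsets; doing this
        # while our member ID is presumably still valid is the point of
        # the new ordering.
        self.events.append("strategy.join+commit")

    def _close_consumer(self) -> None:
        # consumer.close() triggers partition revocation. Previously the
        # strategy was only shut down inside the revocation callback, by
        # which time the broker may already consider us a dead member.
        self.events.append("consumer.close")
        if not self.__shutdown_strategy_before_consumer:
            # old behavior: strategy shutdown happens during revocation,
            # racing the member-ID revocation
            self._close_processing_strategy()

    def shutdown(self) -> None:
        if self.__shutdown_strategy_before_consumer:
            self._close_processing_strategy()
        self._close_consumer()
```

With the flag set, the commit happens before `consumer.close()` ever starts the rebalance; without it, the commit runs inside the revocation window where UNKNOWN_MEMBER_ID can occur.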
Contributor

I am not sure I understand the failure mode here. I am aware there is a race condition somewhere in arroyo during shutdown, but could you please elaborate on the sequence of events we are seeing here?
Specifically:

  1. Is this happening at rebalancing only, or during shutdown only, or both? This method is only called during consumer shutdown, so even if the commit fails there and crashes, does it really represent a problem in your consumer?
  2. What do you mean by "immediately revoke our member id as well"? If that were causing the commit to fail, wouldn't we experience crashes constantly in all consumers that commit during partition revocation?

Member Author

Is this happening at rebalancing only, or during shutdown only, or both? This method is only called during consumer shutdown, so even if the commit fails there and crashes, does it really represent a problem in your consumer?

Only during shutdown. During shutdown, we trigger rebalancing through rdkafka_consumer.close(), which in turn closes the strategy.

This method is only called during consumer shutdown, so even if the commit fails there and crashes, does it really represent a problem in your consumer?

self._close_processing_strategy() is also called during partition revocation. I'm trying to call it also before partition revocation to reduce the likelihood that we get a failing commit. I don't yet know if that will work, to be honest.

What do you mean by "immediately revoke our member id as well"? If that were causing the commit to fail, wouldn't we experience crashes constantly in all consumers that commit during partition revocation?

From what I can tell this also happens in other consumers, but much less frequently. I plainly don't understand why the span buffer is more affected.

Contributor

Only during shutdown. During shutdown, we trigger rebalancing through rdkafka_consumer.close(), which in turn closes the strategy.

So if this happens only during shutdown, is there any actual impact on the work the consumer does? Wouldn't the consumer just crash instead of terminating cleanly? Does this have any impact?

fpacifici (Contributor), Jun 29, 2025

From what I can tell this also happens in other consumers, but much less frequently. I plainly don't understand why the span buffer is more affected.

So are you saying that, at times, in some consumers, the member id is revoked before running the revocation callback? And that we do not know why?

Member Author

So are you saying that, at times, in some consumers, the member id is revoked before running the revocation callback? And that we do not know why?

Yes, that's the issue, I think.

Contributor

Could you please add a metric to check whether, when we close the consumer, the member_id is populated? While it may be possible that rdkafka still thinks it has a member id and thus the field is still populated, maybe we can see what is actually happening. I think it is worth a try.
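
One way such a check could look (a sketch, not a patch: `memberid()` is the confluent-kafka/librdkafka call that returns the broker-assigned group member ID, while the metric name and the metrics interface here are hypothetical; stub classes stand in for the real consumer and metrics backend so the sketch is self-contained):

```python
# Sketch: before closing the consumer, record whether rdkafka still
# believes it holds a broker-assigned member ID. With confluent-kafka,
# Consumer.memberid() returns the group member ID, or None/"" when the
# consumer is not (or no longer) a group member.

def record_member_id_metric(consumer, metrics) -> None:
    member_id = consumer.memberid()
    populated = bool(member_id)
    # Tagged counter so populated vs. missing can be compared at close time.
    metrics.increment(
        "consumer.close.member_id_populated", tags={"populated": str(populated)}
    )


# Minimal stubs standing in for confluent_kafka.Consumer and a metrics
# backend, purely so the sketch runs.
class StubConsumer:
    def __init__(self, member_id):
        self._member_id = member_id

    def memberid(self):
        return self._member_id


class StubMetrics:
    def __init__(self):
        self.calls = []

    def increment(self, name, tags=None):
        self.calls.append((name, tags))
```

Calling `record_member_id_metric(...)` immediately before `consumer.close()` would show whether the member ID is already gone by the time shutdown starts.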

fpacifici (Contributor) left a comment

Please see my other questions.

Aside from your implementation, I have a suggestion to get to the bottom of this.
We have a log line right before we close the consumer and one at the beginning of the partition revoked callback.
Do you see a long delay between the two?

Comment on lines +280 to +285
try:
    self.__processing_strategy.join(self.__join_timeout)
    self.__metrics_buffer.incr_timing(
        "arroyo.consumer.join.time", time.time() - start_join
    )
    break
Contributor

This code may raise MessageRejected. If that happens we would skip these lines

self.__consumer.close()
self.__processor_factory.shutdown()

at line 530.

Are you sure that is not a problem?

Member Author

I don't think we are supposed to raise MessageRejected in join(). If join() calls submit() and that internally raises MessageRejected, join() is supposed to handle it. Empirically this is true: the code you are reviewing was previously part of partition revocation, and we've not seen MessageRejected crashes there.
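
The contract described here can be illustrated with a minimal sketch (hypothetical strategy classes, not arroyo code): a join() that handles MessageRejected from the next step internally by retrying until a timeout, rather than letting the exception escape to the caller.

```python
import time


class MessageRejected(Exception):
    """Raised by submit() when the next step applies backpressure."""


class FlakyNextStep:
    # Rejects the first few submits to simulate backpressure.
    def __init__(self, rejections: int) -> None:
        self.rejections = rejections
        self.submitted = []

    def submit(self, message) -> None:
        if self.rejections > 0:
            self.rejections -= 1
            raise MessageRejected
        self.submitted.append(message)


class BufferingStrategy:
    def __init__(self, next_step) -> None:
        self.buffer = []
        self.next_step = next_step

    def join(self, timeout=None) -> None:
        # Per the contract: swallow MessageRejected and retry instead of
        # letting it propagate out of join().
        deadline = None if timeout is None else time.monotonic() + timeout
        while self.buffer:
            message = self.buffer[0]
            try:
                self.next_step.submit(message)
            except MessageRejected:
                if deadline is not None and time.monotonic() > deadline:
                    return  # give up on timeout; still no exception escapes
                continue
            self.buffer.pop(0)
```

A join() written this way never surfaces MessageRejected, which is why the shutdown path that calls it would not see such crashes.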

Contributor

If join() calls submit() and that internally raises MessageRejected, join() is supposed to handle it. Empirically this is true: the code you are reviewing was previously part of partition revocation, and we've not seen MessageRejected crashes there.

I agree they should not. Could you please do a quick check whether this is happening in custom strategies? It should be a quick search.

untitaker (Member Author) commented Jun 30, 2025

We have a log line right before we close the consumer and one at the beginning of the partition revoked callback.
Do you see a long delay between the two?

No, they happen within the same second; join() itself also doesn't take long.

fpacifici (Contributor) left a comment

Please consider doing the check on custom join methods before merging.

untitaker (Member Author)

Please consider doing the check on custom join methods before merging.

Checked getsentry sources and also searched for MessageRejected errors in our Sentry project.

untitaker merged commit 7cb0b8c into main on Jul 2, 2025; 15 checks passed.
untitaker deleted the shutdown-strategy-before-consumer branch on July 2, 2025 at 18:07.
untitaker (Member Author)

Could you please add a metric to check whether, when we close the consumer, the member_id is populated? While it may be possible that rdkafka still thinks it has a member id and thus the field is still populated, maybe we can see what is actually happening. I think it is worth a try.

I missed this comment. I will do this before bumping arroyo.

untitaker (Member Author)

#468
