

fpacifici
Contributor

This is meant to troubleshoot SENTRY-3N9F.

https://sentry.sentry.io/issues/6268915744/?project=1&query=server_name%3Agetsentry-consumer-ingest-events%2A&referrer=issue-stream
All the evidence seems to point to a scenario where we do not clean up the consumer's state properly between a partition revocation and the following partition assignment when the consumer is paused.
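To make the suspected bug concrete, here is a minimal sketch of the cleanup that revocation should perform. It does not use arroyo's real classes: `Partition`, `ConsumerState`, `carried_over_partition` and `topic_partitions` are hypothetical stand-ins that model only the state mentioned in this PR (assigned partitions, paused partitions, and the partition of a carried-over message). The sequence replays the logs below: partitions 12 to 15 are paused with a message held for partition 15, then revoked, then 8, 9 and 10 are assigned.

```python
from dataclasses import dataclass, field
from typing import Iterable, NamedTuple, Optional, Set


class Partition(NamedTuple):
    """Hypothetical, simplified stand-in for a topic partition."""
    topic: str
    index: int


@dataclass
class ConsumerState:
    """Hypothetical model of the state a paused consumer keeps per assignment."""
    assigned: Set[Partition] = field(default_factory=set)
    paused: Set[Partition] = field(default_factory=set)
    # Partition of a message held back while paused (assumption: one at a time).
    carried_over_partition: Optional[Partition] = None

    def on_assign(self, partitions: Set[Partition]) -> None:
        self.assigned |= partitions

    def on_revoke(self, partitions: Set[Partition]) -> None:
        self.assigned -= partitions
        # Forget pause state for partitions we no longer own.
        self.paused -= partitions
        # Drop the carried-over message if it belongs to a revoked partition.
        if self.carried_over_partition in partitions:
            self.carried_over_partition = None


def topic_partitions(indices: Iterable[int]) -> Set[Partition]:
    return {Partition("ingest-occurrences", i) for i in indices}


# Replay the sequence from the logs.
state = ConsumerState()
state.on_assign(topic_partitions(range(12, 16)))
state.paused = set(state.assigned)
state.carried_over_partition = Partition("ingest-occurrences", 15)
state.on_revoke(topic_partitions(range(12, 16)))
state.on_assign(topic_partitions([8, 9, 10]))
```

If `on_revoke` skipped the last two steps, `state.paused` and `state.carried_over_partition` would still point at partitions 12 to 15 after the new assignment, which matches the state captured in the error.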

Logs from the issue (most recent first):

Aug 8, 7:58:50.629 AM   Caught exception, shutting down...
Aug 8, 7:58:47.495 AM   Member id: 'rdkafka-1e9f43ea-923b-4406-aac2-3a97c7cd8673'
Aug 8, 7:58:47.494 AM   New partitions assigned: {Partition(topic=Topic(name='ingest-occurrences'), index=8): 258338255, Partition(topic=Topic(name='ingest-occurrences'), index=9): 254545584, Partition(topic=Topic(name='ingest-occurrences'), index=10): 266927758}
Aug 8, 7:58:47.484 AM   Partition revocation complete.
Aug 8, 7:58:47.482 AM   <arroyo.processing.strategies.run_task_with_multiprocessing.RunTaskWithMultiprocessing object at 0x790dffc686b0> exited successfully
Aug 8, 7:58:34.097 AM   Waiting for <arroyo.processing.strategies.run_task_with_multiprocessing.RunTaskWithMultiprocessing object at 0x790dffc686b0> to exit...
Aug 8, 7:58:34.097 AM   Member id: 'rdkafka-1e9f43ea-923b-4406-aac2-3a97c7cd8673'
Aug 8, 7:58:34.097 AM   Closing <arroyo.processing.strategies.run_task_with_multiprocessing.RunTaskWithMultiprocessing object at 0x790dffc686b0>...
Aug 8, 7:58:34.096 AM   Partitions to revoke: [Partition(topic=Topic(name='ingest-occurrences'), index=12), Partition(topic=Topic(name='ingest-occurrences'), index=13), Partition(topic=Topic(name='ingest-occurrences'), index=14), Partition(topic=Topic(name='ingest-occurrences'), index=15)]
Aug 8, 7:58:28.902 AM   Member id: 'rdkafka-1e9f43ea-923b-4406-aac2-3a97c7cd8673'

The error shows this state at the moment of the exception:

paused_partitions = [
Partition(topic=Topic(name='ingest-occurrences'), index=13),
Partition(topic=Topic(name='ingest-occurrences'), index=14),
Partition(topic=Topic(name='ingest-occurrences'), index=15),
Partition(topic=Topic(name='ingest-occurrences'), index=12)
]
plus a carried-over message for partition 15.

This should be impossible, since partitions 12 to 15 had been revoked and only partitions 8, 9 and 10 were assigned before the error.

So this log should at least confirm whether the consumer is left in a wrong state after partition revocation.
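The log line actually added by this PR is not reproduced here. As a hedged illustration of the invariant it is meant to check, the sketch below flags paused partitions or a carried-over message that refer to partitions not in the current assignment. `Partition` and `find_stale_state` are hypothetical names, not arroyo APIs; the inputs mirror the state captured in the error.

```python
import logging
from typing import Iterable, List, NamedTuple, Optional

logger = logging.getLogger(__name__)


class Partition(NamedTuple):
    """Hypothetical, simplified stand-in for a topic partition."""
    topic: str
    index: int


def find_stale_state(
    assigned: Iterable[Partition],
    paused: Iterable[Partition],
    carried_over_partition: Optional[Partition],
) -> List[str]:
    """Return (and log) descriptions of state referring to partitions we do not own."""
    owned = set(assigned)
    problems = []
    stale_paused = sorted(set(paused) - owned)
    if stale_paused:
        problems.append(f"paused but not assigned: {stale_paused}")
    if carried_over_partition is not None and carried_over_partition not in owned:
        problems.append(
            f"carried-over message for unassigned partition {carried_over_partition}"
        )
    for problem in problems:
        logger.error("Inconsistent consumer state after assignment: %s", problem)
    return problems


# State from the error: 12-15 still paused, message held for 15, but 8-10 assigned.
topic = "ingest-occurrences"
problems = find_stale_state(
    assigned=[Partition(topic, i) for i in (8, 9, 10)],
    paused=[Partition(topic, i) for i in (13, 14, 15, 12)],
    carried_over_partition=Partition(topic, 15),
)
```

With a correct cleanup, the same call made right after the new assignment would return an empty list and log nothing.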

@fpacifici fpacifici requested review from a team as code owners August 8, 2025 23:04

sentry-io bot commented Aug 8, 2025

🔍 Existing Issues For Review

Your pull request is modifying functions with the following pre-existing issues:

📄 File: arroyo/backends/kafka/consumer.py

Function Unhandled Issue
assignment_callback KafkaException: KafkaError{code=_DESTROY,val=-197,str="Failed to get committed offsets: Local: Broker handle destroyed"} ...
Event Count: 1


@fpacifici fpacifici merged commit cad6b54 into main Aug 8, 2025
15 checks passed
@fpacifici fpacifici deleted the fpacifici/fix_pause_error branch August 8, 2025 23:11