
Conversation

evanh
Member

@evanh evanh commented Jul 18, 2025

The rdkafka library can return error messages at two points: when a message fails to be added to the produce buffer, and when a message fails to be flushed out of the buffer. Currently only the first case is handled.

When a message is sent, attach a channel to it using the DeliveryOpaque concept. When the message is flushed from the buffer, the context's delivery callback is called; use that channel to forward the success or failure back to the send function.

The send function now waits on the receiving end of the channel for the message to be delivered. If that takes longer than 5 seconds, a new error, ProduceWaitTimeout, is returned.

Since this send function is already called inside a synchronous thread, this won't block other messages from being produced.
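
A minimal sketch of that mechanism, assuming rust-rdkafka's `ThreadedProducer`; the names `ChannelContext` and `send_and_wait` and the topic/key/payload values are illustrative, not the PR's actual code:

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::time::Duration;

use rdkafka::producer::{BaseRecord, DeliveryResult, ProducerContext, ThreadedProducer};
use rdkafka::ClientContext;

// Each record carries a channel sender as its DeliveryOpaque; the delivery
// callback forwards the broker's verdict through it.
struct ChannelContext;

impl ClientContext for ChannelContext {}

impl ProducerContext for ChannelContext {
    type DeliveryOpaque = Box<SyncSender<Result<(), String>>>;

    // Called by rdkafka when the message leaves the buffer (or fails to).
    fn delivery(&self, result: &DeliveryResult<'_>, tx: Self::DeliveryOpaque) {
        let outcome = match result {
            Ok(_) => Ok(()),
            Err((err, _msg)) => Err(err.to_string()),
        };
        // The receiver may already have timed out; ignore send errors.
        let _ = tx.send(outcome);
    }
}

fn send_and_wait(producer: &ThreadedProducer<ChannelContext>) -> Result<(), String> {
    let (tx, rx) = sync_channel(1);
    let record = BaseRecord::with_opaque_to("some-topic", Box::new(tx))
        .key("key")
        .payload("value");

    // First failure mode: the message cannot be added to the produce buffer.
    producer
        .send(record)
        .map_err(|(err, _rec)| format!("enqueue failed: {err}"))?;

    // Second failure mode: the message never gets flushed out of the buffer.
    match rx.recv_timeout(Duration::from_secs(5)) {
        Ok(result) => result,
        Err(_) => Err("ProduceWaitTimeout".to_string()),
    }
}
```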

Depends on some concepts added in #472

evanh added 2 commits July 18, 2025 11:28
This adds the error codes to the producer failure metric in Rust. Some new producer errors were also added that explicitly deal with producing (similar to the consumer errors).
Base automatically changed from evanh/fix/add-error-codes-rust-producer to main July 21, 2025 16:24
@@ -118,9 +117,9 @@ impl ArroyoProducer<KafkaPayload> for KafkaProducer {
 // If the producer fails to flush the message out of the buffer, the delivery callback will send an error
 // on the channel.
 // match rx.recv_timeout(Duration::from_secs(5)) {
-match rx.recv() {
+match rx.recv_timeout(Duration::from_secs(5)) {
Member

for the record i think we should make the produce function here return a future, to align it with python. but it's not that important and we can postpone it
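
A purely hypothetical shape for that suggestion (sketch only; `AsyncKafkaProducer` and the error type are made up here, not Arroyo's actual API):

```rust
use std::future::Future;

// Hypothetical trait shape: produce returns a future that resolves once
// delivery is confirmed, so callers choose whether (and when) to await it.
pub trait AsyncKafkaProducer<TPayload> {
    fn produce(
        &self,
        payload: TPayload,
    ) -> impl Future<Output = Result<(), String>> + Send;
}
```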

@evanh evanh changed the title WIP: fix(produce): Ensure produce waits for buffer flush fix(produce): Ensure produce waits for buffer flush Jul 22, 2025
Contributor

@fpacifici fpacifici left a comment

Please address my question before merging.

// If the producer fails to flush the message out of the buffer, the delivery callback will send an error
// on the channel.
// match rx.recv_timeout(Duration::from_secs(5)) {
match rx.recv_timeout(Duration::from_secs(5)) {
Contributor

@fpacifici fpacifici Jul 22, 2025

Am I missing something, or does this make the produce call synchronous, so it returns only when the message has been sent to Kafka?
That would not be acceptable. We rely on produce being asynchronous for throughput.

Member

that is the current API contract. in the produce step we then use a threadpool to wait on those functions. indeed it would be better (and closer to the python version) to have this function async but i don't think it has significant implications for the runtime. maybe we should run snuba locally though to see that it works before deploying to S4S

Member Author

My understanding was that we wanted this to be synchronous? How else would we track errors when producing properly?

Contributor

@fpacifici fpacifici Jul 22, 2025

Ok, let me see if I understood this correctly:

  • The Arroyo producer produce step becomes synchronous here. That does not behave like the rdkafka one.
  • But we use the Producer strategy to produce in the consumers, and that manages production in a separate thread, so that would absorb the latency.

Is that correct?

If yes, I think there are still problems:

  • If we are not in a consumer, we would not use the Producer strategy, so the arroyo produce would become synchronous. The Producer strategy can only be used in consumers. This is not ok.
  • In a high throughput consumer we can produce thousands of messages in each batch that goes to kafka.
    • We would keep a channel open for each one of these thousands of messages.
    • I suspect this is going to be a performance issue. For a produce batch we will have thousands of tasks on the multithread pool waiting for results. This is likely going to create a massive amount of context switches even if the producer releases the thread while waiting (I don't know how the wait call is implemented under the hood).
  • The uptime checker does not use the Producer strategy: https://github.com/getsentry/uptime-checker/blob/main/src/producer/kafka_producer.rs
  • Snuba does not use the Producer strategy: https://github.com/getsentry/snuba/blob/master/rust_snuba/src/strategies/commit_log.rs#L130. Fair, they run the commit log producer in a separate thread. That would change the behavior nevertheless.

Member

okay, i didn't know the uptime checker code. i guess it has to happen.

Contributor

@fpacifici fpacifici Jul 22, 2025

> My understanding was that we wanted this to be synchronous? How else would we track errors when producing properly?

Produce should never be synchronous. That's what allows kafka to be fast. Production and consumption are done in batches asynchronously. You fill a buffer to produce and rdkafka sends it in the background.
In order to be notified about the result of an operation we use callbacks from rdkafka, both in python and rust. That's how we decide when to commit a message in the indexer, for example. We commit only when we get a positive confirmation via the callback that the message was taken by kafka.

Is there an issue here with recording metrics from the producer callback?
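
For reference, a rough sketch of that callback-based approach, assuming the same rust-rdkafka `ProducerContext` trait; the atomic counters and `MetricsContext` name are illustrative stand-ins for a real metrics client, not Arroyo's code:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

use rdkafka::producer::{DeliveryResult, ProducerContext};
use rdkafka::ClientContext;

// Illustrative stand-ins for a real metrics backend.
static DELIVERED: AtomicU64 = AtomicU64::new(0);
static FAILED: AtomicU64 = AtomicU64::new(0);

struct MetricsContext;

impl ClientContext for MetricsContext {}

impl ProducerContext for MetricsContext {
    // No per-message state: produce stays fire-and-forget.
    type DeliveryOpaque = ();

    fn delivery(&self, result: &DeliveryResult<'_>, _opaque: ()) {
        match result {
            Ok(_) => {
                DELIVERED.fetch_add(1, Ordering::Relaxed);
            }
            Err((err, _msg)) => {
                // A real implementation would tag the failure metric with
                // the rdkafka error code here.
                FAILED.fetch_add(1, Ordering::Relaxed);
                eprintln!("delivery failed: {err}");
            }
        }
    }
}
```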

Member Author

No, I can record the metrics in the callback, that's not hard. I only went through this process because it was my understanding that this behaviour was a bug, not a feature. I will close this PR and open a new one.

@evanh
Member Author

evanh commented Jul 22, 2025

This was fixing the wrong problem. The issue is not that the ArroyoProducer is asynchronous, but that the ProduceStrategy is asynchronous. This change would almost certainly cause performance issues.

Closing this so we can fix the other issue.

@evanh evanh closed this Jul 22, 2025