fix(produce): Ensure produce waits for buffer flush #473
Conversation
This adds the error codes to the producer failure metric in Rust. Some new producer errors were also added that explicitly deal with producing (similar to the consumer errors).
@@ -118,9 +117,9 @@ impl ArroyoProducer&lt;KafkaPayload&gt; for KafkaProducer {
     // If the producer fails to flush the message out of the buffer, the delivery callback will send an error
     // on the channel.
     // match rx.recv_timeout(Duration::from_secs(5)) {
-    match rx.recv() {
+    match rx.recv_timeout(Duration::from_secs(5)) {
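The hunk above replaces an unbounded `rx.recv()` with a bounded `recv_timeout`. A minimal std-only sketch (no rdkafka involved; the function name and error string are illustrative, not the real API) of why the bound matters:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Wait for the delivery callback to report on the channel. With plain recv(),
// a callback that never fires would hang send() forever; recv_timeout turns
// that hang into an explicit error (5 seconds in the PR, shortened here).
fn wait_for_delivery(rx: mpsc::Receiver<Result<(), String>>) -> Result<(), String> {
    match rx.recv_timeout(Duration::from_millis(100)) {
        Ok(result) => result,
        Err(_) => Err("ProduceWaitTimeout".to_string()),
    }
}

fn main() {
    // Case 1: the callback fires in time.
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || tx.send(Ok(())).unwrap());
    assert_eq!(wait_for_delivery(rx), Ok(()));

    // Case 2: the callback never fires; the timeout bounds the wait.
    let (_tx, rx) = mpsc::channel::<Result<(), String>>();
    assert_eq!(
        wait_for_delivery(rx),
        Err("ProduceWaitTimeout".to_string())
    );
}
```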
for the record i think we should make the produce function here return a future, to align it with python. but it's not that important and we can postpone it
Please address my question before merging.
// If the producer fails to flush the message out of the buffer, the delivery callback will send an error
// on the channel.
// match rx.recv_timeout(Duration::from_secs(5)) {
match rx.recv_timeout(Duration::from_secs(5)) {
Am I missing something, or is this making the `produce` call synchronous, so that it returns only when the message has been sent to Kafka?
That would not be acceptable. We rely on produce being asynchronous for throughput.
that is the current API contract. in the produce step we then use a threadpool to wait on those functions. indeed it would be better (and closer to the python version) to make this function async, but i don't think it has significant implications for the runtime. maybe we should run snuba locally though to see that it works before deploying to S4S
My understanding was that we wanted this to be synchronous? How else would we track errors when producing properly?
Ok, let me see if I understood this correctly:
- The Arroyo producer `produce` step becomes synchronous here. That does not behave like the rdkafka one.
- But we use the Producer strategy to produce in the consumers, and that manages production in a separate thread, so that would absorb the latency.

Is that correct?
If yes, I think there are still problems:
- If we are not in a consumer, we would not use the Producer strategy, thus arroyo produce would become synchronous. The `Producer` strategy can only be used in consumers. This is not ok.
- In a high throughput consumer we can produce thousands of messages in each batch that goes to kafka.
  - We would keep a channel open for each one of these thousands of messages.
  - I suspect this is going to be a performance issue. For a produced batch we will have thousands of tasks on the multithreaded pool waiting for results. This is likely going to create a massive amount of context switches, even if the producer releases the thread while waiting (I don't know how the wait call is implemented under the hood).
- The uptime checker does not use the Producer strategy: https://github.com/getsentry/uptime-checker/blob/main/src/producer/kafka_producer.rs
- Snuba does not use the Producer strategy: https://github.com/getsentry/snuba/blob/master/rust_snuba/src/strategies/commit_log.rs#L130. Fair, they run the commit log producer in a separate thread. That would change the behavior nevertheless.
okay, i didn't know the uptime checker code. i guess it has to happen.
> My understanding was that we wanted this to be synchronous? How else would we track errors when producing properly?
Produce should never be synchronous. That's what allows Kafka to be fast. Production and consumption are done in batches asynchronously. You fill in a buffer to produce and rdkafka sends it in the background.
In order to be notified about the result of an operation we use callbacks from rdkafka, both in python and rust. That's how we decide when to commit a message in the indexer, for example. We commit only when we get a positive confirmation, via the callback, that the message was taken by Kafka.
Is there an issue here in recording metrics from the producer callback ?
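The callback-based approach described above keeps `produce` asynchronous while still tracking failures. A std-only sketch of the idea (the struct and counter names are illustrative; in the real code the hook would be rdkafka's delivery callback, not a hand-rolled one):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Hypothetical metric sink shared with the producer context. The delivery
// callback runs on librdkafka's poll thread, so produce() never blocks on it.
struct ProducerMetrics {
    delivered: AtomicU64,
    failed: AtomicU64,
}

impl ProducerMetrics {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            delivered: AtomicU64::new(0),
            failed: AtomicU64::new(0),
        })
    }

    // Called once per message when it is flushed out of the buffer; the error
    // code (if any) can be attached as a metric tag here.
    fn on_delivery(&self, result: Result<(), &str>) {
        match result {
            Ok(()) => {
                self.delivered.fetch_add(1, Ordering::Relaxed);
            }
            Err(_error_code) => {
                self.failed.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}

fn main() {
    let metrics = ProducerMetrics::new();
    metrics.on_delivery(Ok(()));
    metrics.on_delivery(Err("MessageTimedOut"));
    assert_eq!(metrics.delivered.load(Ordering::Relaxed), 1);
    assert_eq!(metrics.failed.load(Ordering::Relaxed), 1);
}
```

Atomics are used because the delivery callback and the metrics reader live on different threads; no channel per message is needed.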
No I can record the metrics in the callback, that's not hard. I only went through this process because it was my understanding that this behaviour was a bug not a feature. I will close this PR and open a new one.
This was fixing the wrong problem. The issue is not that the ArroyoProducer is asynchronous, but that the ProduceStrategy is asynchronous. This change would almost certainly cause performance issues. Closing this so we can fix the other issue.
The rdkafka library has two times when it will return error messages: if the message fails to be added to the produce buffer, and if the message fails to be flushed out of the buffer. Currently only the first case was being handled.

When a message is sent, attach a channel using the `DeliveryOpaque` concept. When the message is flushed from the buffer, the context callback is called. Use that channel to forward on the success or failure to the `send` function.

The `send` function will now wait on the receive channel for the message to be sent. If it takes longer than 5 seconds, a new error `ProduceWaitTimeout` will be sent.

Since this `send` function is already called inside a synchronous thread, this won't block other messages from producing.

Depends on some concepts added in #472
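The description above can be sketched end to end with std primitives, using a spawned thread to stand in for librdkafka's poll/callback thread (all names here are illustrative stand-ins, not the rdkafka API; the real code attaches the sender via `DeliveryOpaque`):

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;
use std::time::Duration;

// One sender per in-flight message, handed to the producer alongside the payload.
type DeliveryTx = Sender<Result<(), String>>;

// Stand-in for an async produce(): the delivery report arrives later,
// on a different thread, via the attached channel.
fn fake_produce(payload: &'static str, tx: DeliveryTx) {
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(10)); // simulated broker latency
        let result = if payload.is_empty() {
            Err("EmptyPayload".to_string())
        } else {
            Ok(())
        };
        let _ = tx.send(result);
    });
}

// Stand-in for the PR's send(): block on the per-message channel, with a
// bounded wait (5 seconds in the PR, shortened here for the demo).
fn send(payload: &'static str) -> Result<(), String> {
    let (tx, rx) = mpsc::channel();
    fake_produce(payload, tx);
    rx.recv_timeout(Duration::from_secs(1))
        .unwrap_or(Err("ProduceWaitTimeout".to_string()))
}

fn main() {
    assert_eq!(send("hello"), Ok(()));
    assert!(send("").is_err());
}
```

This also makes the reviewers' objection concrete: each `send` holds a channel open and blocks a thread until its delivery report arrives, which is exactly the per-message overhead flagged in the discussion.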