feat(ingestion): conversion events buffer consumer #9432
Conversation
}
}

await Promise.all(promises)
We should commit the offsets right after this. Currently we do the work, then set up the pausing, and only then commit offsets. If we die while sleeping, we'd try to process these messages again, so we should minimize the time between processing something and committing its offset.
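As a rough sketch of the ordering being suggested here, assuming a KafkaJS `eachBatch` handler (the handler and the `processBufferedEvent` helper below are illustrative, not the PR's actual code):

```ts
import { EachBatchPayload, KafkaMessage } from 'kafkajs'

// Hypothetical stand-in for the real per-message work done in the PR
async function processBufferedEvent(message: KafkaMessage): Promise<void> {
    // e.g. re-produce the event to the main ingestion topic
}

async function eachBatchBuffer({
    batch,
    resolveOffset,
    commitOffsetsIfNecessary,
}: EachBatchPayload): Promise<void> {
    const promises: Promise<void>[] = []
    for (const message of batch.messages) {
        promises.push(processBufferedEvent(message))
    }
    await Promise.all(promises)

    // Mark the batch as handled, then commit straight away, before any
    // pause/sleep logic, to shrink the window in which a crash would
    // cause reprocessing
    batch.messages.forEach((message) => resolveOffset(message.offset))
    await commitOffsetsIfNecessary()

    // ...pausing / sleeping / resuming the partition would follow here...
}
```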
Tagged you in another comment.
Also, we don't synchronously sleep; `setTimeout` runs in the background.
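A minimal illustration of the distinction, assuming the resume is scheduled with a fire-and-forget `setTimeout` rather than an awaited sleep (the function names are made up for the example):

```ts
// Fire-and-forget: schedules the resume callback and returns immediately,
// so the batch handler is not blocked while the partition "sleeps".
function scheduleResume(resume: () => void, delayMs: number): void {
    setTimeout(resume, delayMs)
}

// An awaited sleep, by contrast, would hold the handler up for the full delay:
function sleep(ms: number): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, ms))
}
// await sleep(delayMs)  // <- the blocking behaviour the comment rules out
```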
Sure, based on all you say it should work, though I'd find it easier to read if we had `await commitOffsetsIfNecessary()` right after this line, unless there's a reason not to?
There is a good reason: if I run `commitOffsetsIfNecessary` it will move the offset past this entire batch. We rely on `throw new DelayProcessing()` to commit only the messages we processed via `resolveOffset`, and not the entire batch.

I should, however, do some thinking about the problems of running `Promise.all` with this. This might be more robust if done in order.
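A hedged sketch of the mechanism described above, assuming the consumer runs with `eachBatchAutoResolve: false` so that throwing stops the batch and only the offsets passed to `resolveOffset` get committed. `DelayProcessing` mirrors the PR's custom error; `processBufferedEvent` and the value of `BUFFER_CONVERSION_SECONDS` are illustrative:

```ts
import { EachBatchPayload, KafkaMessage } from 'kafkajs'

const BUFFER_CONVERSION_SECONDS = 60 // illustrative value

// Custom error used purely as a signal to stop the batch early
class DelayProcessing extends Error {}

// Hypothetical stand-in for the real per-message work
async function processBufferedEvent(message: KafkaMessage): Promise<void> {}

// With eachBatchAutoResolve: false, throwing here commits only the offsets
// already passed to resolveOffset(); unprocessed messages are redelivered later.
async function eachBatchBuffer({ batch, resolveOffset }: EachBatchPayload): Promise<void> {
    for (const message of batch.messages) {
        const processAt = Number(message.timestamp) + BUFFER_CONVERSION_SECONDS * 1000
        if (processAt > Date.now()) {
            // Not ready yet: abort the batch rather than commit past it
            throw new DelayProcessing()
        }
        await processBufferedEvent(message)
        resolveOffset(message.offset)
    }
}
```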
Yeah, I think the way to go is to do these in order and set `partitionsConsumedConcurrently`.
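For reference, `partitionsConsumedConcurrently` is a `consumer.run` option in KafkaJS. A minimal sketch of combining it with strictly in-order handling within each partition (broker, group id, and topic name are placeholders):

```ts
import { Kafka, KafkaMessage } from 'kafkajs'

// Hypothetical stand-in for the real per-message work
async function processBufferedEvent(message: KafkaMessage): Promise<void> {}

async function startBufferConsumer(): Promise<void> {
    const kafka = new Kafka({ brokers: ['localhost:9092'] }) // illustrative broker
    const consumer = kafka.consumer({ groupId: 'events-buffer-consumer' }) // illustrative group id

    await consumer.connect()
    await consumer.subscribe({ topic: 'events_buffer' }) // illustrative topic name

    await consumer.run({
        // Several partitions can be worked on at once...
        partitionsConsumedConcurrently: 3,
        eachBatchAutoResolve: false,
        eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
            // ...while each partition's batch is handled strictly in order
            for (const message of batch.messages) {
                await processBufferedEvent(message)
                resolveOffset(message.offset)
            }
            await commitOffsetsIfNecessary()
        },
    })
}
```

Processing in order trades some per-partition throughput for simpler offset handling, while the concurrency setting recovers parallelism across partitions.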
for (const message of batch.messages) {
    // kafka timestamps are unix timestamps in string format
    const processAt = Number(message.timestamp) + this.pluginsServer.BUFFER_CONVERSION_SECONDS * 1000
    const delayUntilTimeToProcess = processAt - new Date().getTime()
Instead of keeping track of the max delay in seconds, let's track the latest `processAt` (so we can sleep less later, as this processing and the `Promise.all` later will take some time too). Furthermore, once we start skipping a message, we probably shouldn't process any later messages either?

How efficient is using `new Date().getTime()` in the loop vs. doing that once up front and using a variable?
> How efficient is using `new Date().getTime()` in the loop vs. doing that once up front and using a variable?

The difference is negligible. I've nevertheless moved it to `Date.now()`, which is faster and cleaner.
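A tiny sketch combining both points, with the clock read hoisted out of the loop and switched to `Date.now()` (whether the PR ultimately hoisted it isn't shown here; the declarations below are stand-ins):

```ts
declare const batch: { messages: { timestamp: string }[] } // stand-in for the KafkaJS batch
const BUFFER_CONVERSION_SECONDS = 60 // illustrative value

// Before: allocates a Date object for every message
// const delayUntilTimeToProcess = processAt - new Date().getTime()

// After: read the clock once, up front, with the cheaper static call
const now = Date.now()
for (const message of batch.messages) {
    const processAt = Number(message.timestamp) + BUFFER_CONVERSION_SECONDS * 1000
    const delayUntilTimeToProcess = processAt - now
    // ...
}
```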
> Instead of keeping track of the max delay in seconds, let's track the latest `processAt` (so we can sleep less later, as this processing and the `Promise.all` later will take some time too). Furthermore, once we start skipping a message, we probably shouldn't process any later messages either?

I'd rather sleep more, actually. First, note that we're sleeping per partition: even if a given partition is sleeping, we can still consume from others.
The goal with sleeping longer is to maximize our chances of pulling a full batch we can process synchronously in the future.
Consider we get a batch:
event , ts
--------------
event0, 0
event1, 2
event3, 40
event4, 55
*now == 0
Kafka will feed us batches, so imagine I pull a batch with all the events above. I can process `event0`, and then I sleep 2s and pull the next 3 events from Kafka again. Now I sleep some more and pull `event3` and `event4`. I'm incurring the overhead of pulling and filtering the batch multiple times, when I could have just moved on to other partitions and pulled a whole batch from this one once I'm more certain I'll be able to process a larger batch.

By sleeping for the maximum possible value in a batch, I also increase the chance that after the consumer is back up and gets the assignment, I'll be able to process `event5`, `event6`, etc. as well.
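A sketch of the strategy described above, under the same `eachBatchAutoResolve: false` assumption: pause just this partition for the largest delay in the batch, schedule a background resume, and abort so only the resolved offsets are committed (the pause/resume wiring is illustrative and may differ from the PR's actual code):

```ts
import { Consumer, EachBatchPayload, KafkaMessage } from 'kafkajs'

const BUFFER_CONVERSION_SECONDS = 60 // illustrative value

class DelayProcessing extends Error {}

// Hypothetical stand-in for the real per-message work
async function processBufferedEvent(message: KafkaMessage): Promise<void> {}

async function eachBatchBuffer(
    { batch, resolveOffset }: EachBatchPayload,
    consumer: Consumer
): Promise<void> {
    // Largest delay in the batch: sleeping that long maximises the chance that
    // the whole redelivered batch is processable in one go
    const now = Date.now()
    const maxDelayMs = Math.max(
        0,
        ...batch.messages.map((m) => Number(m.timestamp) + BUFFER_CONVERSION_SECONDS * 1000 - now)
    )

    for (const message of batch.messages) {
        const processAt = Number(message.timestamp) + BUFFER_CONVERSION_SECONDS * 1000
        if (processAt > Date.now()) {
            // Pause only this partition; other partitions keep being consumed
            const topicPartition = { topic: batch.topic, partitions: [batch.partition] }
            consumer.pause([topicPartition])
            setTimeout(() => consumer.resume([topicPartition]), maxDelayMs)

            // Abort so only the offsets resolved above are committed
            throw new DelayProcessing()
        }
        await processBufferedEvent(message)
        resolveOffset(message.offset)
    }
}
```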
No, sorry, I didn't explain myself well in the proposal to "instead of keeping track of the max delay, track the latest `processAt`".
I'll modify your example a bit:
event , ts
--------------
event0, 0
event1, 5
event3, 40
event4, 55
*now == 2
The idea was to:
- get the max `ts`, which in your example would be 55
- at the point of sleeping, calculate the sleep time; e.g. maybe a second has passed, so instead of sleeping 53s (55-2) we'd only sleep 52s (55-3)

Once we wake from sleep we'll be able to process all events, and we're less likely to oversleep.
This probably isn't super important... it depends on how resource-constrained the plugin-server is, etc.
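A small sketch of this refinement under the same assumptions as above: remember the latest `processAt` and derive the sleep from the clock only at the moment the partition is actually paused (all names and values are illustrative):

```ts
import { Consumer } from 'kafkajs'

declare const consumer: Consumer // stand-in for the running consumer
declare const batch: { topic: string; partition: number; messages: { timestamp: string }[] }
const BUFFER_CONVERSION_SECONDS = 60 // illustrative value

// Track the latest processAt rather than a pre-computed delay
const maxProcessAt = Math.max(
    ...batch.messages.map((m) => Number(m.timestamp) + BUFFER_CONVERSION_SECONDS * 1000)
)

// ...processing happens here and takes some wall-clock time...

// At the point of sleeping, derive the delay from the clock *now*, so the time
// already spent processing is not slept again and we're less likely to oversleep
const sleepMs = Math.max(0, maxProcessAt - Date.now())
const topicPartition = { topic: batch.topic, partitions: [batch.partition] }
consumer.pause([topicPartition])
setTimeout(() => consumer.resume([topicPartition]), sleepMs)
```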
Co-authored-by: Tiina Turban <tiina303@gmail.com>
Updated description to add testing steps
* master: (137 commits)
  feat(cohorts): add cohort filter grammars (#9540)
  feat(cohorts): Backwards compatibility of groups and properties (#9462)
  perf(ingestion): unsubscribe from buffer topic while no events are produced to it (#9556)
  fix: Fix `Loading` positioning and `LemonButton` disabled state (#9554)
  test: Speed up backend tests (#9289)
  fix: LemonSpacer -> LemonDivider (#9549)
  feat(funnels): Highlight significant deviations in new funnel viz (#9536)
  docs(storybook): Lemon UI (#9426)
  feat: add support for list of teams to enable the conversion buffer for (#9542)
  chore(onboarding): cleanup framework grid experiment (#9527)
  fix(signup): domain provisioning on cloud (#9515)
  chore: split out async migrations ci (#9539)
  feat(ingestion): enable json ingestion for self-hosted by default (#9448)
  feat(cohort): add all cohort filter selectors to Storybook (#9492)
  feat(ingestion): conversion events buffer consumer (#9432)
  ci(run-backend-tests): remove CH version default (#9532)
  feat: Add person info to events (#9404)
  feat(ingestion): produce to buffer partitioned by team_id:distinct_id (#9518)
  fix: bring latest_migrations.manifest up to date (#9525)
  chore: removes unused feature flag (#9529)
  ...
Problem
#9182
Changes
Follow up to #9427.
Implements the consumer for the buffer.
This is slightly tricky. Still needs tests.
How did you test this code?
Added tests + ran manually.
To run manually: set `CONVERSION_BUFFER_ENABLED=1` and `BUFFER_CONVERSION_SECONDS=30`, ingest some events, and verify that `DelayProcessing` is thrown while the buffered events wait out the conversion window.