
feat(ingestion): conversion events buffer consumer #9432


Merged
merged 27 commits into master from events-buffer-consumer-2 on Apr 26, 2022

Conversation


@yakkomajuri commented Apr 15, 2022

Problem

#9182

Changes

Follow up to #9427.

Implements the consumer for the buffer.

This is slightly tricky. Still needs tests.

How did you test this code?

Added tests + ran manually.

To run manually:

  1. I set CONVERSION_BUFFER_ENABLED=1 and BUFFER_CONVERSION_SECONDS=30
  2. I added logging at the producer and consumer
  3. I sent events that would go to the buffer
  4. The logs confirmed the flow (sketched in code after this list):
    • Event is sent to buffer
    • Event is consumed from buffer
    • DelayProcessing is thrown
    • Consumer sleeps
    • Consumer comes back up
    • Event is ingested correctly
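
A minimal sketch of that check-and-delay step, assuming a KafkaJS-style eachBatch handler; DelayProcessing is the error this PR introduces, while the handler name, the constant, and the ingestion step are illustrative rather than the actual implementation.

import { EachBatchPayload } from 'kafkajs'

class DelayProcessing extends Error {} // the error this PR throws to trigger the pause

const BUFFER_CONVERSION_SECONDS = 30 // matches the manual test setup above

async function eachBatchBuffer({ batch }: EachBatchPayload): Promise<void> {
    for (const message of batch.messages) {
        // Kafka timestamps are unix timestamps (ms) serialized as strings
        const processAt = Number(message.timestamp) + BUFFER_CONVERSION_SECONDS * 1000
        if (Date.now() < processAt) {
            // The event reached the buffer too recently: bail out so the consumer can
            // pause this partition, sleep, and pick the event up again later.
            throw new DelayProcessing()
        }
        // ... otherwise ingest the event as usual ...
    }
}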

@yakkomajuri requested a review from tiina303 on April 18, 2022 16:52
}
}

await Promise.all(promises)
Contributor

We should commit the offsets right after this. Currently we do the work, then try to set up the pausing, then commit offsets. If we die while sleeping we'd try to process them again, and we should minimize the amount of time between processing something and committing offsets.

Contributor Author

Tagged you in another comment.

Also, we don't sleep synchronously; setTimeout runs in the background.
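
For context, a rough sketch of a non-blocking per-partition sleep, assuming KafkaJS's consumer.pause/consumer.resume; the actual PR may wire this differently.

import { Consumer } from 'kafkajs'

// Sketch only: pause a single partition and resume it later without blocking
// the event loop, so other partitions keep being consumed in the meantime.
function sleepPartition(consumer: Consumer, topic: string, partition: number, sleepMs: number): void {
    consumer.pause([{ topic, partitions: [partition] }])
    // setTimeout runs in the background; nothing here blocks the consumer loop.
    setTimeout(() => consumer.resume([{ topic, partitions: [partition] }]), sleepMs)
}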

Contributor

Sure, based on all you say it should work, though I'd find it easier to read if we had await commitOffsetsIfNecessary() right after this line, unless there's a reason not to?

Contributor Author

There is a good reason: if I run commitOffsetsIfNecessary it will move the offset past this entire batch. We rely on throw new DelayProcessing() to commit only the messages we processed via resolveOffset, not the entire batch.

I should however do some thinking about the problems of running Promise.all with this. This might be more robust if done in order.
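
A sketch of the offset handling being described, assuming KafkaJS with eachBatchAutoResolve turned off; the function name and the delay constant are placeholders, not the real code.

import { Consumer } from 'kafkajs'

class DelayProcessing extends Error {}

const BUFFER_CONVERSION_SECONDS = 30 // placeholder, as in the test setup above

async function runBufferConsumer(consumer: Consumer): Promise<void> {
    await consumer.run({
        // Without auto-resolve, throwing out of eachBatch leaves the consumer at the
        // last offset passed to resolveOffset(), so only the processed prefix is skipped.
        eachBatchAutoResolve: false,
        eachBatch: async ({ batch, resolveOffset }) => {
            for (const message of batch.messages) {
                const processAt = Number(message.timestamp) + BUFFER_CONVERSION_SECONDS * 1000
                if (Date.now() < processAt) {
                    // Deliberately not commitOffsetsIfNecessary(): that would advance the
                    // offset past this entire batch rather than just the processed part.
                    throw new DelayProcessing()
                }
                // ... ingest the event ...
                resolveOffset(message.offset)
            }
        },
    })
}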

Contributor Author

Yeah, I think the way to go is to do these in order and set partitionsConsumedConcurrently.
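
A sketch of what that could look like, assuming KafkaJS's partitionsConsumedConcurrently option; the handler and the concurrency value are placeholders.

import { Consumer, KafkaMessage } from 'kafkajs'

// Hypothetical per-message handler standing in for the real ingestion logic
async function processBufferMessage(message: KafkaMessage): Promise<void> {
    console.log('processing offset', message.offset)
}

async function runInOrder(consumer: Consumer): Promise<void> {
    await consumer.run({
        // Let KafkaJS make progress on several partitions at once...
        partitionsConsumedConcurrently: 3, // illustrative value
        eachBatch: async ({ batch, resolveOffset }) => {
            // ...while each partition's batch is processed strictly in order,
            // instead of firing everything off with Promise.all.
            for (const message of batch.messages) {
                await processBufferMessage(message)
                resolveOffset(message.offset)
            }
        },
    })
}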

for (const message of batch.messages) {
// kafka timestamps are unix timestamps in string format
const processAt = Number(message.timestamp) + this.pluginsServer.BUFFER_CONVERSION_SECONDS * 1000
const delayUntilTimeToProcess = processAt - new Date().getTime()
Contributor

Instead of keeping track of the max delay seconds, let's track the latest processAt (so we can sleep less later, as this processing and the Promise.all will take some time too). Furthermore, once we start not processing, we probably shouldn't process any other messages later either?

How efficient is using new Date().getTime() vs doing that once up front and using a variable?

Contributor Author

How efficient is using new Date().getTime() vs doing that once up front and using a variable?

This is negligible. I've nevertheless moved it to Date.now(), which is faster and cleaner.

Instead of keeping track of the max delay seconds, let's track the latest processAt (so we can sleep less later, as this processing and the Promise.all will take some time too). Furthermore, once we start not processing, we probably shouldn't process any other messages later either?

I'd rather sleep more, actually. First, note that we're sleeping per partition. Even if a given partition is sleeping, we can still consume from others.

The goal with sleeping longer is to maximize our chances of pulling a full batch we can process synchronously in the future.

Consider we get a batch:

event , ts
--------------
event0, 0
event1, 2
event3, 40
event4, 55

*now == 0

Kafka will feed us batches, so imagine I pull a batch with all the events above. I can process event0, then sleep 2s and pull the next 3 events from Kafka again. Then I sleep some more and pull event3 and event4. I'm incurring the overhead of pulling and filtering the batch multiple times, when I could have just moved on to other partitions and pulled a whole batch from this one once I'm more certain I'll be able to process a larger batch.

By sleeping for the maximum value in the batch, I also increase the chance that after the consumer is back up and gets the assignment, I'll be able to process event5, event6, and so on as well.
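
Putting numbers on that argument (illustrative only, using the example batch above):

// Seconds, as in the example: sleep for the largest remaining delay in the
// batch rather than the smallest one.
const now = 0 // "*now == 0"
const processAtSeconds = [0, 2, 40, 55] // event0, event1, event3, event4

const sleepSeconds = Math.max(...processAtSeconds.map((t) => t - now), 0)
// sleepSeconds === 55: sleeping just 2s would unlock only event1 and force another
// pull of the same partition, while sleeping 55s lets the whole batch through in one pass.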

Contributor

No, sorry, I didn't explain myself well in the proposal for "instead of keeping track of the max delay seconds, let's track the latest processAt".

I'll modify your example a bit:

event , ts
--------------
event0, 0
event1, 5
event3, 40
event4, 55

*now == 2

the idea was to:

  1. get max ts, which in your example would be 55
  2. At the point of sleep calculate the sleep time, e.g. maybe a second has passed, so now instead of sleeping 53s (55-2) we'd only sleep 52s (55-3).

Once we wake from sleep we'll be able to process all the events, and we're less likely to oversleep.

This probably isn't super important ... it depends on how resource-constrained the plugin-server is, etc.
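
A sketch of that calculation, assuming the handler records the latest processAt while scanning the batch; the function name is illustrative.

import { KafkaMessage } from 'kafkajs'

// Track the latest processAt while scanning the batch, then compute the sleep
// against the clock at the moment we actually pause, so time already spent
// processing is not added on top of the delay.
function remainingSleepMs(messages: KafkaMessage[], bufferConversionSeconds: number): number {
    let latestProcessAt = 0
    for (const message of messages) {
        const processAt = Number(message.timestamp) + bufferConversionSeconds * 1000
        latestProcessAt = Math.max(latestProcessAt, processAt)
    }
    // e.g. the 53s -> 52s case above: a second of work elapsed means a second less sleep
    return Math.max(latestProcessAt - Date.now(), 0)
}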

yakkomajuri and others added 2 commits April 25, 2022 12:48
Co-authored-by: Tiina Turban <tiina303@gmail.com>
Base automatically changed from events-buffer to master April 25, 2022 14:10
@yakkomajuri
Copy link
Contributor Author

Updated description to add testing steps

@yakkomajuri merged commit 0a744d8 into master on Apr 26, 2022
@yakkomajuri deleted the events-buffer-consumer-2 branch on April 26, 2022 12:44
fuziontech added a commit that referenced this pull request Apr 28, 2022
* master: (137 commits)
  feat(cohorts): add cohort filter grammars (#9540)
  feat(cohorts): Backwards compatibility of groups and properties (#9462)
  perf(ingestion): unsubscribe from buffer topic while no events are produced to it (#9556)
  fix: Fix `Loading` positioning and `LemonButton` disabled state (#9554)
  test: Speed up backend tests (#9289)
  fix: LemonSpacer -> LemonDivider (#9549)
  feat(funnels): Highlight significant deviations in new funnel viz (#9536)
  docs(storybook): Lemon UI (#9426)
  feat: add support for list of teams to enable the conversion buffer for (#9542)
  chore(onboarding): cleanup framework grid experiment (#9527)
  fix(signup): domain provisioning on cloud (#9515)
  chore: split out async migrations ci (#9539)
  feat(ingestion): enable json ingestion for self-hosted by default (#9448)
  feat(cohort): add all cohort filter selectors to Storybook (#9492)
  feat(ingestion): conversion events buffer consumer (#9432)
  ci(run-backend-tests): remove CH version default (#9532)
  feat: Add person info to events (#9404)
  feat(ingestion): produce to buffer partitioned by team_id:distinct_id (#9518)
  fix: bring latest_migrations.manifest up to date (#9525)
  chore: removes unused feature flag (#9529)
  ...