
Conversation

lnkuiper
Contributor

This PR fixes a bunch of allocation-related issues.

Jemalloc

We use jemalloc pretty much out-of-the-box, which is very efficient, but it caused DuckDB to use more memory than it should. I've now set the decay time of jemalloc's dirty pages to 1 second rather than the default 10 seconds. This should reduce memory usage, especially for long-running queries.

I've also configured jemalloc to use one arena per thread, and we now flush thread-local caches and purge thread-local arenas after every task that takes longer than 100ms. This is needed because the decay discussed above is only triggered when allocations happen or when threads exit. DuckDB's threads do not exit, but stay idle when no queries are running, so currently some of jemalloc's allocations are never cleaned up after queries finish.
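To sketch what this looks like in terms of jemalloc's mallctl interface (a simplified illustration, not the exact code in this PR; the mallctl symbol may be prefixed depending on how jemalloc is built, and FlushThreadLocalState is a hypothetical name):

#include "jemalloc/jemalloc.h"
#include <cstdio>
#include <sys/types.h>

// Shorten the dirty-page decay time from the 10 s default to 1 s, so unused
// pages are returned to the operating system sooner.
static void ConfigureDecay() {
	ssize_t dirty_decay_ms = 1000;
	mallctl("arenas.dirty_decay_ms", nullptr, nullptr, &dirty_decay_ms, sizeof(dirty_decay_ms));
}

// After a long-running task: flush this thread's cache and purge its arena.
static void FlushThreadLocalState() {
	// return the thread-local cache's memory to the arena
	mallctl("thread.tcache.flush", nullptr, nullptr, nullptr, 0);
	// look up the arena assigned to this thread and purge its dirty pages
	unsigned arena_index;
	size_t size = sizeof(arena_index);
	if (mallctl("thread.arena", &arena_index, &size, nullptr, 0) == 0) {
		char command[64];
		std::snprintf(command, sizeof(command), "arena.%u.purge", arena_index);
		mallctl(command, nullptr, nullptr, nullptr, 0);
	}
}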

One clear downside to this approach shows up when a query performs many small tasks that each run for just over 100ms. This should be rare because DuckDB is built for bulk processing, so tasks tend to run for much longer and the flush is not triggered very often. In all of our regression tests, this only really regresses one query, 07c in the IMDB benchmark (I hope that's ok?).

Buffer size counting

I've found two places where we didn't route ColumnDataCollection allocations through the buffer manager, namely in the local states of the parquet writer and in the BatchedDataCollection of the PhysicalLimit operator. These buffers are now counted towards our total memory usage, so we shouldn't go over the memory limit there anymore.
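Roughly, the change at those call sites has the following shape (a sketch, assuming ColumnDataCollection's Allocator-taking constructor; includes omitted, as this is a fragment inside DuckDB's sources):

// A sketch of routing a ColumnDataCollection's allocations through the buffer
// manager so its buffers are counted towards DuckDB's memory usage.
// Previously (sketch): Allocator::Get(context), which bypasses the buffer manager.
unique_ptr<ColumnDataCollection> MakeCountedCollection(ClientContext &context, vector<LogicalType> types) {
	// BufferAllocator::Get returns an allocator backed by the buffer manager,
	// so these allocations count towards the memory limit
	return make_uniq<ColumnDataCollection>(BufferAllocator::Get(context), std::move(types));
}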

Redundant string copying

The parquet writer does dictionary compression when writing strings, and builds a dictionary when creating a row group.

We originally did not copy strings when creating the row group from a ColumnDataCollection, but we found out we needed to, because the strings weren't guaranteed to be in memory (we were using a buffer-managed ColumnDataCollection).

However, we've switched to an in-memory ColumnDataCollection since then (using the BufferAllocator so we can count the size!), so the strings are now guaranteed to be in memory again! I've removed the redundant string copying, which reduces the parquet writer's memory usage and improves performance when writing strings.

Other

I've replaced a bunch of duckdb::unique_ptr with unique_ptr in the parquet extension, as we are always inside the duckdb namespace there. The explicit qualification was a side-effect of switching to our own safer unique_ptr implementation.

@lnkuiper
Contributor Author

lnkuiper commented May 26, 2023

@carlopi had the idea that we could have a budget of 100ms across tasks rather than a fixed 100ms per task, so that, for example, two 50ms tasks would also trigger a purge (see the sketch below). I like the idea, but it regresses IMDB q07c even more.

Not sure what's best, since this pesky 07c is the only query that really regresses with these thread-local purges.
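For reference, a minimal sketch of that budget variant (hypothetical names, not the code in this PR):

#include <cstdint>

// Spend a per-thread time budget across tasks; once it is used up, flush the
// thread-local jemalloc state and refill the budget.
static constexpr int64_t FLUSH_BUDGET_MS = 100;
static thread_local int64_t remaining_budget_ms = FLUSH_BUDGET_MS;

void OnTaskFinished(int64_t task_duration_ms) {
	remaining_budget_ms -= task_duration_ms;
	if (remaining_budget_ms <= 0) {
		FlushThreadLocalState(); // tcache flush + arena purge, as sketched above
		remaining_budget_ms = FLUSH_BUDGET_MS;
	}
}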

@hannes
Member

hannes commented May 30, 2023

I'm okay with that regression. @Mytherin?

@Mytherin
Collaborator

I think that's fine as well

Collaborator

@Mytherin Mytherin left a comment


Thanks for the PR! LGTM. Some comments below. I agree that it is likely better to run this after every n milliseconds of accumulated task time rather than only after a single task that runs for more than 100ms, perhaps with a higher threshold, e.g. after executing tasks totalling more than 250ms.

An alternative would be to look at the arena size somehow, e.g. using thread.allocated. That might be even better, as a task being long-running does not necessarily mean a lot of memory was allocated (e.g. we could be aggregating a large data source into a small hash table).

shared_ptr<Task> task;
// loop until the marker is set to false
while (*marker) {
	// wait for a signal with a timeout
	queue->semaphore.wait();
	if (queue->q.try_dequeue(task)) {
		const auto timestamp_before_task = CurrentTimeMS();
Collaborator


Could we use Timestamp::GetCurrentTimestamp here?

void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
#ifndef DUCKDB_NO_THREADS
	constexpr static int64_t TASK_DURATION_FLUSH_THRESHOLD_MS = 100;
Collaborator


Perhaps this should be a configuration parameter, with an option to disable it as well?

@lnkuiper
Contributor Author

I'm now using thread.peak.read rather than thread.allocated, because it can be reset with thread.peak.reset.

I've set the flush threshold to 134217728 bytes (1 << 27, i.e. 128 MiB), and we no longer seem to be regressing on IMDB 07c (at least, locally). Hopefully there are no other regressions in CI.
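Sketched with jemalloc's mallctl interface (again a simplified illustration; the symbol may be prefixed depending on the build, and the surrounding function name is hypothetical):

#include "jemalloc/jemalloc.h"
#include <cstdint>

// Flush/purge only when this thread's peak allocation counter since the last
// reset exceeds the threshold of 1 << 27 bytes (128 MiB).
static constexpr uint64_t FLUSH_THRESHOLD_BYTES = uint64_t(1) << 27;

static bool ShouldFlushThreadLocalState() {
	uint64_t peak = 0;
	size_t size = sizeof(peak);
	if (mallctl("thread.peak.read", &peak, &size, nullptr, 0) != 0) {
		return false;
	}
	if (peak < FLUSH_THRESHOLD_BYTES) {
		return false;
	}
	// reset the counter so the next measurement starts from zero
	mallctl("thread.peak.reset", nullptr, nullptr, nullptr, 0);
	return true;
}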

Collaborator

@Mytherin Mytherin left a comment


Thanks for the changes! LGTM. Ready to merge after CI passes.

@lnkuiper
Contributor Author

lnkuiper commented Jun 5, 2023

This is ready to go; the one failing check is the regression test we OK'd.
