Conversation

Mytherin
Collaborator

This PR reduces the maximum memory usage of writing Parquet files with order-preservation enabled in parallel.

The way the order-preserving parallel Parquet writer works is that it materializes batches of data based on their batch index, then repartitions them into the desired row group sizes, and finally writes the batches out to disk (implemented in #7375).
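The pipeline described above can be sketched as follows. This is a hedged, hypothetical illustration (the `Batch` struct, `Repartition` function, and `int` rows are stand-ins, not DuckDB's actual types): batches arrive tagged with a batch index, are collected in index order, and are repartitioned into fixed-size row groups before being flushed.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <vector>

// Hypothetical stand-in for a materialized batch of rows.
struct Batch {
	uint64_t batch_index;
	std::vector<int> rows; // stand-in for the actual row data
};

// Repartition batches (kept sorted by batch index) into chunks of
// `row_group_size` rows each, preserving the original order.
std::deque<std::vector<int>> Repartition(const std::map<uint64_t, Batch> &batches,
                                         std::size_t row_group_size) {
	std::deque<std::vector<int>> row_groups;
	std::vector<int> current;
	for (auto &entry : batches) { // std::map iterates in batch-index order
		for (int row : entry.second.rows) {
			current.push_back(row);
			if (current.size() == row_group_size) {
				row_groups.push_back(std::move(current));
				current.clear();
			}
		}
	}
	if (!current.empty()) {
		row_groups.push_back(std::move(current)); // final partial row group
	}
	return row_groups;
}
```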

The problem is that writing is disconnected from the materialization of batches, and only a single thread can write to the file at a time. If the other threads materialize data faster than the single writer thread can flush it, memory usage keeps growing, as nothing prevents those threads from accumulating more data in their buffers. In the worst case, memory usage keeps increasing until the entire input has been materialized. This is a problem when writing Parquet files whose uncompressed contents do not fit in memory.

In this PR we address this issue by adding a backpressure mechanism to the batch copy. We use the temporary memory manager to reserve space for the materialized batches (with a maximum of 25% of the memory limit). As threads materialize data, we use the size of the respective ColumnDataCollections (obtained through a new AllocationSize method) to track how much data we have buffered. If we exceed the available memory, we block the thread from materializing further data. Halted threads may still help with repartitioning and preparing batches through ExecuteTasks, however.

The thread that is processing the minimum batch index can always continue, since the data that thread is materializing can be flushed to disk immediately. Note that we do not yet handle the case where a single batch index contains a very large amount of data gracefully, as we currently only repartition and flush after a batch index has been exhausted. However, this scenario is rare in practice and can only really happen when streaming directly from Parquet files that were written with very large row groups.
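The backpressure scheme described above can be sketched with a small budget class. This is a minimal, hypothetical illustration using a condition variable (the class name and interface are assumptions, not DuckDB's implementation): threads report the bytes they buffer, block once the shared budget is exhausted, and the thread holding the minimum batch index is exempt so the pipeline can always make progress.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical backpressure budget for materialized batch data.
class BatchMemoryBudget {
public:
	explicit BatchMemoryBudget(std::size_t budget_bytes) : budget(budget_bytes) {}

	// Called before a thread materializes `bytes` of batch data. Blocks while
	// the budget is exhausted, unless `is_min_batch_index` is true -- the
	// thread holding the minimum batch index may always proceed, since its
	// data can be flushed to disk immediately.
	void Reserve(std::size_t bytes, bool is_min_batch_index) {
		std::unique_lock<std::mutex> lock(mtx);
		cv.wait(lock, [&] { return is_min_batch_index || used < budget; });
		used += bytes;
	}

	// Called once a batch has been written out and its buffers freed.
	void Release(std::size_t bytes) {
		std::lock_guard<std::mutex> lock(mtx);
		used -= bytes;
		cv.notify_all();
	}

	std::size_t Used() const {
		std::lock_guard<std::mutex> lock(mtx);
		return used;
	}

private:
	mutable std::mutex mtx;
	std::condition_variable cv;
	std::size_t used = 0;
	const std::size_t budget;
};
```

Note that `Reserve` deliberately allows the budget to be overshot by one batch; blocking on `used + bytes <= budget` instead could deadlock when a single batch exceeds the whole budget.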

Max Threads

This PR adds a new MaxThreads function to the PhysicalSink, which can be used to limit the parallelism of a pipeline at the sink level. We use this for the order-preserving write, since processing data with many threads under a low memory budget is counter-productive: using fewer threads yields both better performance and a lower memory footprint. As a heuristic, the batch copy requires at least 4MB of available memory per column per thread. The number of threads is scaled down automatically when too little memory is available.
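The heuristic above can be sketched as a simple calculation. The function name and structure here are illustrative assumptions, not DuckDB's exact code; only the 4MB-per-column-per-thread constant comes from the description above.

```cpp
#include <algorithm>
#include <cstdint>

// Heuristic from the PR description: require ~4MB of available memory
// per column per thread.
static constexpr uint64_t MEMORY_PER_COLUMN_PER_THREAD = 4ULL * 1024 * 1024;

// Scale the requested thread count down to what the memory budget can
// sustain, but always allow at least one thread.
uint64_t MaxWriterThreads(uint64_t requested_threads, uint64_t column_count,
                          uint64_t available_memory) {
	uint64_t per_thread = column_count * MEMORY_PER_COLUMN_PER_THREAD;
	if (per_thread == 0) {
		return requested_threads;
	}
	uint64_t sustainable = std::max<uint64_t>(1, available_memory / per_thread);
	return std::min(requested_threads, sustainable);
}
```

For example, writing a 16-column file with a 256MB budget would sustain at most 4 threads under this heuristic, regardless of how many threads the scheduler offers.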

Tests & Benchmarks

Below are some tests and benchmarks:

Lineitem SF10 Parquet (2.5GB compressed, 5.4GB uncompressed, 10GB CSV)

| Memory Limit | v0.10.0 (10T) | New | 1 Thread | preserve_insertion_order=false |
|--------------|---------------|-------|----------|--------------------------------|
| 500MB | OOM | 30.5s | 57s | 8.7s |
| 1GB | OOM | 17.2s | 57s | 8.7s |
| 2GB | OOM | 12.3s | 57s | 8.7s |
| 3GB | OOM | 10.8s | 57s | 8.7s |
| 4GB | 8.5s | 9.5s | 57s | 8.7s |
| 5GB | 8.7s | 9.0s | 57s | 8.7s |

ClickBench Hits Parquet (14GB compressed, 36GB uncompressed, 80GB CSV)

| Memory Limit | v0.10.0 (10T) | New | 1 Thread | preserve_insertion_order=false |
|--------------|---------------|------|----------|--------------------------------|
| 2GB | OOM | OOM | 397s | 63s |
| 4GB | OOM | 197s | 397s | 63s |
| 8GB | OOM | 168s | 397s | 63s |
| 16GB | OOM | 143s | 397s | 63s |
| 32GB | OOM | 113s | 397s | 63s |
| 64GB | OOM | 93s | 397s | 63s |
| 128GB | 130s | 98s | 397s | 63s |

@Mytherin Mytherin merged commit 0301182 into duckdb:main Feb 20, 2024
@Mytherin Mytherin mentioned this pull request Feb 20, 2024
krlmlr added a commit to duckdb/duckdb-r that referenced this pull request Mar 15, 2024
@Mytherin Mytherin deleted the preserveinsertionordermemory branch July 5, 2024 11:30