Parquet writer - reduce memory usage of order-preserving write #10756
This PR reduces the maximum memory usage of writing Parquet files in parallel with order preservation enabled.
The order-preserving parallel Parquet writer (implemented in #7375) materializes batches of data based on their batch index, repartitions them into the desired row group sizes, and finally writes the batches out to disk.
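As a rough illustration of that flow, here is a minimal sketch of handing batches to a single writer strictly in batch-index order. This is not DuckDB's actual code; all names (`OrderedBatchQueue`, `MaterializedBatch`, `TryPopNext`) are hypothetical, and synchronization is elided:

```cpp
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a materialized ColumnDataCollection.
struct MaterializedBatch {
	std::vector<char> rows;
};

// Batches are keyed by their batch index; a single writer thread pops them in
// strictly increasing index order, repartitions them into row groups of the
// desired size, and appends them to the file.
class OrderedBatchQueue {
public:
	void AddBatch(uint64_t batch_index, MaterializedBatch batch) {
		batches.emplace(batch_index, std::move(batch));
	}

	// Returns false if the next batch in line has not been materialized yet.
	bool TryPopNext(MaterializedBatch &out) {
		auto it = batches.find(next_index);
		if (it == batches.end()) {
			return false;
		}
		out = std::move(it->second);
		batches.erase(it);
		next_index++;
		return true;
	}

private:
	std::map<uint64_t, MaterializedBatch> batches;
	uint64_t next_index = 0;
};
```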
The problem is that writing is disconnected from the materialization of batches, and only a single thread can write to the file at a time. If the other threads materialize data faster than that single thread can write it out, memory usage keeps growing, as nothing prevents those threads from accumulating more data in their buffers. In the worst case, memory usage grows until the file's entire uncompressed contents are buffered in memory. This is a problem for Parquet files whose uncompressed contents do not fit in memory.
In this PR we address this issue by adding a backpressure mechanism to the batch copy. We use the temporary memory manager to reserve space for the materialized batches (with a maximum of 25% of the memory limit). As threads materialize data, we track how much memory has been gathered using the size of the respective `ColumnDataCollection`s, obtained through a new `AllocationSize` method. If we exceed the available memory, we block the thread from materializing more data. Blocked threads may still help with repartitioning and preparing batches through `ExecuteTasks`, however. The thread that is processing the minimum batch index can always continue, as the data that thread is materializing can be written to disk immediately. A minimal sketch of this bookkeeping follows below.

Note that we do not yet handle the case where a single batch index contains a very large amount of data gracefully, as we currently only repartition and flush after a batch index has been exhausted. This scenario is relatively rare in practice, however, and can only really happen when streaming directly from Parquet files that were written with very large row groups.
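As a rough illustration of the backpressure scheme (a minimal sketch under the assumptions above, not DuckDB's actual implementation; names such as `BatchMemoryTracker` and `WaitUntilSpaceAvailable` are hypothetical):

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical sketch of the backpressure bookkeeping described above.
// `reservation` stands in for the space obtained from the temporary memory
// manager (capped at 25% of the memory limit); all names are illustrative.
class BatchMemoryTracker {
public:
	explicit BatchMemoryTracker(uint64_t reservation) : reservation(reservation) {}

	// Called when a thread has materialized more data; `allocation_size` is the
	// growth in its ColumnDataCollection's AllocationSize.
	void AddAllocation(uint64_t allocation_size) {
		std::lock_guard<std::mutex> guard(lock);
		unflushed += allocation_size;
	}

	// A thread that is NOT processing the minimum batch index blocks here when
	// the reservation is exhausted; in the real design it could instead help
	// with repartitioning and batch preparation while waiting.
	void WaitUntilSpaceAvailable(bool is_minimum_batch_index) {
		if (is_minimum_batch_index) {
			return; // the minimum batch can always proceed: it can be flushed to disk
		}
		std::unique_lock<std::mutex> guard(lock);
		cv.wait(guard, [&] { return unflushed < reservation; });
	}

	// Called after batches are written to disk, releasing buffered memory.
	// Assumes `flushed_size` never exceeds the currently tracked total.
	void FlushedToDisk(uint64_t flushed_size) {
		{
			std::lock_guard<std::mutex> guard(lock);
			unflushed -= flushed_size;
		}
		cv.notify_all();
	}

private:
	std::mutex lock;
	std::condition_variable cv;
	uint64_t reservation;
	uint64_t unflushed = 0;
};
```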
Max Threads
This PR adds a new `MaxThreads` function to the `PhysicalSink`, which can be used to limit the parallelism of a pipeline at the sink level. We use it for the order-preserving write because processing data with many threads and a low memory budget is counter-productive: it leads to lower performance than using fewer threads, which also have a lower memory footprint. As a heuristic, the batch copy requires at least 4MB of available memory per column per thread, and the number of threads is scaled down automatically when too little memory is available (a sketch of this heuristic follows below).
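A minimal sketch of such a thread-scaling heuristic; the constant and names are illustrative, not DuckDB's actual code:

```cpp
#include <algorithm>
#include <cstdint>

// Require at least 4MB of available memory per column per thread, and scale
// the thread count down when memory is scarce.
static constexpr uint64_t MEMORY_PER_COLUMN_PER_THREAD = 4ULL * 1024 * 1024; // 4MB

uint64_t MaxWriterThreads(uint64_t available_memory, uint64_t column_count,
                          uint64_t requested_threads) {
	// Each thread needs roughly 4MB per column to make progress without
	// constantly hitting the backpressure limit.
	uint64_t per_thread = std::max<uint64_t>(1, column_count) * MEMORY_PER_COLUMN_PER_THREAD;
	uint64_t max_threads = std::max<uint64_t>(1, available_memory / per_thread);
	return std::min(requested_threads, max_threads);
}
```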
Tests & Benchmarks
Below are some tests and benchmarks:
- Lineitem SF10 Parquet (2.5GB compressed, 5.4GB uncompressed, 10GB CSV)
- ClickBench Hits Parquet (14GB compressed, 36GB uncompressed, 80GB CSV)