Mytherin commented Mar 4, 2024

This PR reworks the hive partitioned write (i.e. COPY with the PARTITION_BY option) so that writes are flushed to disk every 500K rows instead of waiting to flush everything at the final combine. This primarily reduces memory usage when writing a large table to a small number of partitions. In the current implementation we first gather all data (mostly in the temporary directory) and only then start writing to the output files. With periodic flushing, the data is instead written out to disk in a streaming fashion.
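
To make the mechanism concrete, here is a minimal sketch of the streaming approach, assuming a per-partition buffer and a global row counter; `PartitionedWriter`, `Row`, and `FLUSH_THRESHOLD` are invented names for illustration, not the actual DuckDB implementation:

```cpp
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical sketch: rows are buffered per partition and all buffers are
// flushed to the output files once 500K rows have accumulated, instead of
// holding everything until the final combine.
struct Row {
    // column values elided
};

class PartitionedWriter {
public:
    static constexpr uint64_t FLUSH_THRESHOLD = 500000; // 500K rows

    void Append(int partition_key, Row row) {
        buffers[partition_key].push_back(std::move(row));
        if (++total_buffered >= FLUSH_THRESHOLD) {
            FlushAll();
        }
    }

    void Finalize() {
        FlushAll(); // write out whatever remains at the final combine
    }

private:
    void FlushAll() {
        for (auto &entry : buffers) {
            // append entry.second to the open file for entry.first (elided)
            entry.second.clear();
        }
        total_buffered = 0;
    }

    std::unordered_map<int, std::vector<Row>> buffers;
    uint64_t total_buffered = 0;
};
```

The point is that the buffered in-memory footprint is bounded by the flush threshold rather than by the size of the table being copied.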

Note that while this is effective for a small number of partitions, it is much less effective for many partitions, as the individual Parquet files we write still need to gather enough data to fill each row group. Hence memory pressure remains very high in the many-partition scenario: if you have 10K partitions and a row group size of 120K, you could be caching up to 120K × 10K = 1.2 billion rows per thread in memory. A potential workaround is to reduce the row group size.

Below are some timings for the following query over the full hits data set (100M rows, 14GB source file, ~80GB in CSV format), writing out to 10 partitions:

```sql
COPY (SELECT CounterID % 10 AS partition_key, * FROM '~/Data/hits.parquet')
TO 'hits_partitioned' (FORMAT PARQUET, PARTITION_BY partition_key);
```
| Version | 52GB memory limit, no temp dir | 52GB memory limit, temp dir |
|---------|--------------------------------|-----------------------------|
| v0.10.0 | OOM                            | 212s                        |
| New     | 53s                            | 53s                         |

github-actions bot marked this pull request as draft March 4, 2024 12:17
Mytherin closed this Mar 11, 2024
Mytherin deleted the partitionedwritememory branch June 7, 2024 12:54
Mytherin pushed a commit that referenced this pull request Oct 16, 2024
Mytherin added a commit that referenced this pull request Dec 3, 2024
I was investigating the following crash, where a checkpoint task had its
underlying resources destroyed while it was still running. The two
interesting threads are the following:

```
thread #1, name = 'duckling', stop reason = signal SIGTRAP
    frame #0: 0x0000ffff91bb71ec
    frame #1: 0x0000aaaad73a38e8 duckling`duckdb::InternalException::InternalException(this=<unavailable>, msg=<unavailable>) at exception.cpp:336:2
    frame #2: 0x0000aaaad786eb68 duckling`duckdb::unique_ptr<duckdb::RowGroup, std::default_delete<duckdb::RowGroup>, true>::operator*() const [inlined] duckdb::unique_ptr<duckdb::RowGroup, std::default_delete<duckdb::RowGroup>, true>::AssertNotNull(null=<unavailable>) at unique_ptr.hpp:25:10
    frame #3: 0x0000aaaad786eaf4 duckling`duckdb::unique_ptr<duckdb::RowGroup, std::default_delete<duckdb::RowGroup>, true>::operator*(this=0x0000aaacbb73e008) const at unique_ptr.hpp:34:4
    frame #4: 0x0000aaaad787abbc duckling`duckdb::CheckpointTask::ExecuteTask(this=0x0000aaabec92be60) at row_group_collection.cpp:732:21
    frame #5: 0x0000aaaad7756ea4 duckling`duckdb::BaseExecutorTask::Execute(this=0x0000aaabec92be60, mode=<unavailable>) at task_executor.cpp:72:3
    frame #6: 0x0000aaaad7757e28 duckling`duckdb::TaskScheduler::ExecuteForever(this=0x0000aaaafda30e10, marker=0x0000aaaafda164a8) at task_scheduler.cpp:189:32
    frame #7: 0x0000ffff91a031fc
    frame #8: 0x0000ffff91c0d5c8

thread #120, stop reason = signal 0
    frame #0: 0x0000ffff91c71c24
    frame #1: 0x0000ffff91e1264c
    frame #2: 0x0000ffff91e01888
    frame #3: 0x0000ffff91e018f8
    frame #4: 0x0000ffff91e01c10
    frame #5: 0x0000ffff91e05afc
    frame #6: 0x0000ffff91e05e70
    frame #7: 0x0000aaaad784b63c duckling`duckdb::RowGroup::~RowGroup() [inlined] std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release(this=<unavailable>) at shared_ptr_base.h:184:10
    frame #8: 0x0000aaaad784b5b4 duckling`duckdb::RowGroup::~RowGroup() [inlined] std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count(this=<unavailable>) at shared_ptr_base.h:705:11
    frame #9: 0x0000aaaad784b5ac duckling`duckdb::RowGroup::~RowGroup() [inlined] std::__shared_ptr<duckdb::ColumnData, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr(this=<unavailable>) at shared_ptr_base.h:1154:31
    frame #10: 0x0000aaaad784b5ac duckling`duckdb::RowGroup::~RowGroup() [inlined] duckdb::shared_ptr<duckdb::ColumnData, true>::~shared_ptr(this=<unavailable>) at shared_ptr_ipp.hpp:115:24
    frame #11: 0x0000aaaad784b5ac duckling`duckdb::RowGroup::~RowGroup() [inlined] void std::_Destroy<duckdb::shared_ptr<duckdb::ColumnData, true>>(__pointer=<unavailable>) at stl_construct.h:151:19
    frame #12: 0x0000aaaad784b5ac duckling`duckdb::RowGroup::~RowGroup() [inlined] void std::_Destroy_aux<false>::__destroy<duckdb::shared_ptr<duckdb::ColumnData, true>*>(__first=<unavailable>, __last=<unavailable>) at stl_construct.h:163:6
    frame #13: 0x0000aaaad784b5a0 duckling`duckdb::RowGroup::~RowGroup() [inlined] void std::_Destroy<duckdb::shared_ptr<duckdb::ColumnData, true>*>(__first=<unavailable>, __last=<unavailable>) at stl_construct.h:195:7
    frame #14: 0x0000aaaad784b5a0 duckling`duckdb::RowGroup::~RowGroup() [inlined] void std::_Destroy<duckdb::shared_ptr<duckdb::ColumnData, true>*, duckdb::shared_ptr<duckdb::ColumnData, true>>(__first=<unavailable>, __last=<unavailable>, (null)=<unavailable>) at alloc_traits.h:848:7
    frame #15: 0x0000aaaad784b5a0 duckling`duckdb::RowGroup::~RowGroup() [inlined] std::vector<duckdb::shared_ptr<duckdb::ColumnData, true>, std::allocator<duckdb::shared_ptr<duckdb::ColumnData, true>>>::~vector(this=<unavailable>) at stl_vector.h:680:2
    frame #16: 0x0000aaaad784b5a0 duckling`duckdb::RowGroup::~RowGroup(this=<unavailable>) at row_group.cpp:83:1
    frame #17: 0x0000aaaad786ee18 duckling`std::vector<duckdb::SegmentNode<duckdb::RowGroup>, std::allocator<duckdb::SegmentNode<duckdb::RowGroup>>>::~vector() [inlined] std::default_delete<duckdb::RowGroup>::operator()(this=0x0000aaacbb73e1a8, __ptr=0x0000aaab75ae7860) const at unique_ptr.h:85:2
    frame #18: 0x0000aaaad786ee10 duckling`std::vector<duckdb::SegmentNode<duckdb::RowGroup>, std::allocator<duckdb::SegmentNode<duckdb::RowGroup>>>::~vector() at unique_ptr.h:361:4
    frame #19: 0x0000aaaad786ee08 duckling`std::vector<duckdb::SegmentNode<duckdb::RowGroup>, std::allocator<duckdb::SegmentNode<duckdb::RowGroup>>>::~vector() [inlined] duckdb::SegmentNode<duckdb::RowGroup>::~SegmentNode(this=0x0000aaacbb73e1a0) at segment_tree.hpp:21:8
    frame #20: 0x0000aaaad786ee08 duckling`std::vector<duckdb::SegmentNode<duckdb::RowGroup>, std::allocator<duckdb::SegmentNode<duckdb::RowGroup>>>::~vector() [inlined] void std::_Destroy<duckdb::SegmentNode<duckdb::RowGroup>>(__pointer=0x0000aaacbb73e1a0) at stl_construct.h:151:19
    frame #21: 0x0000aaaad786ee08 duckling`std::vector<duckdb::SegmentNode<duckdb::RowGroup>, std::allocator<duckdb::SegmentNode<duckdb::RowGroup>>>::~vector() [inlined] void std::_Destroy_aux<false>::__destroy<duckdb::SegmentNode<duckdb::RowGroup>*>(__first=0x0000aaacbb73e1a0, __last=0x0000aaacbb751130) at stl_construct.h:163:6
    frame #22: 0x0000aaaad786ede8 duckling`std::vector<duckdb::SegmentNode<duckdb::RowGroup>, std::allocator<duckdb::SegmentNode<duckdb::RowGroup>>>::~vector() [inlined] void std::_Destroy<duckdb::SegmentNode<duckdb::RowGroup>*>(__first=<unavailable>, __last=0x0000aaacbb751130) at stl_construct.h:195:7
    frame #23: 0x0000aaaad786ede8 duckling`std::vector<duckdb::SegmentNode<duckdb::RowGroup>, std::allocator<duckdb::SegmentNode<duckdb::RowGroup>>>::~vector() [inlined] void std::_Destroy<duckdb::SegmentNode<duckdb::RowGroup>*, duckdb::SegmentNode<duckdb::RowGroup>>(__first=<unavailable>, __last=0x0000aaacbb751130, (null)=0x0000fffefc81a908) at alloc_traits.h:848:7
    frame #24: 0x0000aaaad786ede8 duckling`std::vector<duckdb::SegmentNode<duckdb::RowGroup>, std::allocator<duckdb::SegmentNode<duckdb::RowGroup>>>::~vector(this=size=4883) at stl_vector.h:680:2
    frame #25: 0x0000aaaad7857f74 duckling`duckdb::RowGroupCollection::Checkpoint(this=<unavailable>, writer=<unavailable>, global_stats=0x0000fffefc81a9c0) at row_group_collection.cpp:1017:1
    frame #26: 0x0000aaaad788f02c duckling`duckdb::DataTable::Checkpoint(this=0x0000aaab04649e70, writer=0x0000aaab6584ea80, serializer=0x0000fffefc81ab38) at data_table.cpp:1427:14
    frame #27: 0x0000aaaad7881394 duckling`duckdb::SingleFileCheckpointWriter::WriteTable(this=0x0000fffefc81b128, table=0x0000aaab023b78c0, serializer=0x0000fffefc81ab38) at checkpoint_manager.cpp:528:11
    frame #28: 0x0000aaaad787ece4 duckling`duckdb::SingleFileCheckpointWriter::CreateCheckpoint() [inlined] duckdb::SingleFileCheckpointWriter::CreateCheckpoint(this=<unavailable>, obj=0x0000fffefc81ab38)::$_7::operator()(duckdb::Serializer::List&, unsigned long) const::'lambda'(duckdb::Serializer&)::operator()(duckdb::Serializer&) const at checkpoint_manager.cpp:181:43
    frame #29: 0x0000aaaad787ecd8 duckling`duckdb::SingleFileCheckpointWriter::CreateCheckpoint() [inlined] void duckdb::Serializer::List::WriteObject<duckdb::SingleFileCheckpointWriter::CreateCheckpoint()::$_7::operator()(duckdb::Serializer::List&, unsigned long) const::'lambda'(duckdb::Serializer&)>(this=<unavailable>, f=(unnamed class) @ 0x0000600002cbd2b0) at serializer.hpp:385:2
    frame #30: 0x0000aaaad787ecc4 duckling`duckdb::SingleFileCheckpointWriter::CreateCheckpoint() [inlined] duckdb::SingleFileCheckpointWriter::CreateCheckpoint()::$_7::operator()(this=<unavailable>, list=<unavailable>, i=2) const at checkpoint_manager.cpp:181:8
    frame #31: 0x0000aaaad787ecb0 duckling`duckdb::SingleFileCheckpointWriter::CreateCheckpoint() at serializer.hpp:151:4
    frame #32: 0x0000aaaad787ec94 duckling`duckdb::SingleFileCheckpointWriter::CreateCheckpoint(this=0x0000fffefc81b128) at checkpoint_manager.cpp:179:13
    frame #33: 0x0000aaaad78954a8 duckling`duckdb::SingleFileStorageManager::CreateCheckpoint(this=0x0000aaaafe1de140, options=(wal_action = DONT_DELETE_WAL, action = CHECKPOINT_IF_REQUIRED, type = FULL_CHECKPOINT)) at storage_manager.cpp:365:17
    frame #34: 0x0000aaaad78baac0 duckling`duckdb::DuckTransactionManager::Checkpoint(this=0x0000aaaafe167e00, context=<unavailable>, force=<unavailable>) at duck_transaction_manager.cpp:198:18
    frame #35: 0x0000aaaad69d02c0 duckling`md::Server::BackgroundCheckpointIfNeeded(this=0x0000aaaafdbfe900) at server.cpp:1983:31
    frame #36: 0x0000aaaadac5d3f0 duckling`std::thread::_State_impl<std::thread::_Invoker<std::tuple<md::BackgroundCronTask::Start(unsigned long)::$_0>>>::_M_run() [inlined] std::function<void ()>::operator()(this=<unavailable>) const at std_function.h:590:9
    frame #37: 0x0000aaaadac5d3e0 duckling`std::thread::_State_impl<std::thread::_Invoker<std::tuple<md::BackgroundCronTask::Start(unsigned long)::$_0>>>::_M_run() [inlined] md::BackgroundCronTask::Start(unsigned long)::$_0::operator()(this=0x0000aaaafdf169a8) const at background_cron_task.cpp:25:4
    frame #38: 0x0000aaaadac5d30c duckling`std::thread::_State_impl<std::thread::_Invoker<std::tuple<md::BackgroundCronTask::Start(unsigned long)::$_0>>>::_M_run() [inlined] void std::__invoke_impl<void, md::BackgroundCronTask::Start(unsigned long)::$_0>((null)=<unavailable>, __f=0x0000aaaafdf169a8) at invoke.h:61:14
    frame #39: 0x0000aaaadac5d30c duckling`std::thread::_State_impl<std::thread::_Invoker<std::tuple<md::BackgroundCronTask::Start(unsigned long)::$_0>>>::_M_run() [inlined] std::__invoke_result<md::BackgroundCronTask::Start(unsigned long)::$_0>::type std::__invoke<md::BackgroundCronTask::Start(unsigned long)::$_0>(__fn=0x0000aaaafdf169a8) at invoke.h:96:14
    frame #40: 0x0000aaaadac5d30c duckling`std::thread::_State_impl<std::thread::_Invoker<std::tuple<md::BackgroundCronTask::Start(unsigned long)::$_0>>>::_M_run() [inlined] void std::thread::_Invoker<std::tuple<md::BackgroundCronTask::Start(unsigned long)::$_0>>::_M_invoke<0ul>(this=0x0000aaaafdf169a8, (null)=<unavailable>) at std_thread.h:259:13
    frame #41: 0x0000aaaadac5d30c duckling`std::thread::_State_impl<std::thread::_Invoker<std::tuple<md::BackgroundCronTask::Start(unsigned long)::$_0>>>::_M_run() [inlined] std::thread::_Invoker<std::tuple<md::BackgroundCronTask::Start(unsigned long)::$_0>>::operator()(this=0x0000aaaafdf169a8) at std_thread.h:266:11
    frame #42: 0x0000aaaadac5d30c duckling`std::thread::_State_impl<std::thread::_Invoker<std::tuple<md::BackgroundCronTask::Start(unsigned long)::$_0>>>::_M_run(this=0x0000aaaafdf169a0) at std_thread.h:211:13
    frame #43: 0x0000ffff91a031fc
    frame #44: 0x0000ffff91c0d5c8
```

The problem is that if an IO exception is thrown in
`RowGroupCollection::Checkpoint` after some (but not all) checkpoint
tasks have been scheduled, but before
`checkpoint_state.executor.WorkOnTasks();` is called, the result is an
InternalException / DuckDB crash: the `Checkpoint` method does not wait
for the scheduled tasks to complete before destroying the resources they
reference.
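
The shape of the fix this implies can be illustrated with a small self-contained sketch (hypothetical code, not the actual patch): every task that was already scheduled must be drained on the exception path too, before stack unwinding destroys the state those tasks reference.

```cpp
#include <cstddef>
#include <stdexcept>
#include <thread>
#include <vector>

// Illustrative sketch only. Tasks are scheduled against state owned by this
// stack frame; if scheduling throws partway through, the tasks that were
// already scheduled must be drained before the unwind destroys `row_groups`.
void CheckpointAll() {
    std::vector<int> row_groups(8, 0); // stands in for the row group collection
    std::vector<std::thread> tasks;
    try {
        for (std::size_t i = 0; i < row_groups.size(); i++) {
            if (i == 2) {
                // simulate an IO error thrown mid-scheduling
                throw std::runtime_error("simulated IO error during scheduling");
            }
            tasks.emplace_back([&row_groups, i] {
                row_groups[i] += 1; // the task touches the shared state
            });
        }
        // ... equivalent of checkpoint_state.executor.WorkOnTasks() ...
    } catch (...) {
        // The missing piece in the crash above: without draining here, the
        // scheduled tasks may still be running when `row_groups` is destroyed
        // during stack unwinding.
        for (auto &task : tasks) {
            task.join();
        }
        throw;
    }
    for (auto &task : tasks) {
        task.join();
    }
}
```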
Mytherin added a commit that referenced this pull request Feb 18, 2025
We had two users crash with the following backtrace:

```
    frame #0: 0x0000ffffab2571ec
    frame #1: 0x0000aaaaac00c5fc duckling`duckdb::InternalException::InternalException(this=<unavailable>, msg=<unavailable>) at exception.cpp:328:2
    frame #2: 0x0000aaaaac1ee418 duckling`duckdb::optional_ptr<duckdb::OptimisticDataWriter, true>::CheckValid(this=<unavailable>) const at optional_ptr.hpp:34:11
    frame #3: 0x0000aaaaac1eea8c duckling`duckdb::MergeCollectionTask::Execute(duckdb::PhysicalBatchInsert const&, duckdb::ClientContext&, duckdb::GlobalSinkState&, duckdb::LocalSinkState&) [inlined] duckdb::optional_ptr<duckdb::OptimisticDataWriter, true>::operator*(this=<unavailable>) at optional_ptr.hpp:43:3
    frame #4: 0x0000aaaaac1eea84 duckling`duckdb::MergeCollectionTask::Execute(this=0x0000aaaaf1b06150, op=<unavailable>, context=0x0000aaaba820d8d0, gstate_p=0x0000aaab06880f00, lstate_p=<unavailable>) at physical_batch_insert.cpp:219:90
    frame #5: 0x0000aaaaac1d2e10 duckling`duckdb::PhysicalBatchInsert::Sink(duckdb::ExecutionContext&, duckdb::DataChunk&, duckdb::OperatorSinkInput&) const [inlined] duckdb::PhysicalBatchInsert::ExecuteTask(this=0x0000aaaafa62ab40, context=<unavailable>, gstate_p=0x0000aaab06880f00, lstate_p=0x0000aab12d442960) const at physical_batch_insert.cpp:425:8
    frame #6: 0x0000aaaaac1d2dd8 duckling`duckdb::PhysicalBatchInsert::Sink(duckdb::ExecutionContext&, duckdb::DataChunk&, duckdb::OperatorSinkInput&) const [inlined] duckdb::PhysicalBatchInsert::ExecuteTasks(this=0x0000aaaafa62ab40, context=<unavailable>, gstate_p=0x0000aaab06880f00, lstate_p=0x0000aab12d442960) const at physical_batch_insert.cpp:431:9
    frame #7: 0x0000aaaaac1d2dd8 duckling`duckdb::PhysicalBatchInsert::Sink(this=0x0000aaaafa62ab40, context=0x0000aab2fffd7cb0, chunk=<unavailable>, input=<unavailable>) const at physical_batch_insert.cpp:494:4
    frame #8: 0x0000aaaaac353158 duckling`duckdb::PipelineExecutor::ExecutePushInternal(duckdb::DataChunk&, duckdb::ExecutionBudget&, unsigned long) [inlined] duckdb::PipelineExecutor::Sink(this=0x0000aab2fffd7c00, chunk=0x0000aab2fffd7d30, input=0x0000fffec0aba8d8) at pipeline_executor.cpp:521:24
    frame #9: 0x0000aaaaac353130 duckling`duckdb::PipelineExecutor::ExecutePushInternal(this=0x0000aab2fffd7c00, input=0x0000aab2fffd7d30, chunk_budget=0x0000fffec0aba980, initial_idx=0) at pipeline_executor.cpp:332:23
    frame #10: 0x0000aaaaac34f7b4 duckling`duckdb::PipelineExecutor::Execute(this=0x0000aab2fffd7c00, max_chunks=<unavailable>) at pipeline_executor.cpp:201:13
    frame #11: 0x0000aaaaac34f258 duckling`duckdb::PipelineTask::ExecuteTask(duckdb::TaskExecutionMode) [inlined] duckdb::PipelineExecutor::Execute(this=<unavailable>) at pipeline_executor.cpp:278:9
    frame #12: 0x0000aaaaac34f250 duckling`duckdb::PipelineTask::ExecuteTask(this=0x0000aab16dafd630, mode=<unavailable>) at pipeline.cpp:51:33
    frame #13: 0x0000aaaaac348298 duckling`duckdb::ExecutorTask::Execute(this=0x0000aab16dafd630, mode=<unavailable>) at executor_task.cpp:49:11
    frame #14: 0x0000aaaaac356600 duckling`duckdb::TaskScheduler::ExecuteForever(this=0x0000aaaaf0105560, marker=0x0000aaaaf00ee578) at task_scheduler.cpp:189:32
    frame #15: 0x0000ffffab0a31fc
    frame #16: 0x0000ffffab2ad5c8
```

Core dump analysis showed that the assertion `D_ASSERT(lstate.writer);`
in `MergeCollectionTask::Execute` was not satisfied (i.e. it crashed
because `lstate.writer` was a null pointer) while
`PhysicalBatchInsert::Sink` was processing merge tasks from (other)
pipeline executors.

My suspicion is that this is only likely to happen under heavily
concurrent workloads (which applies to the two users that crashed). The
patch submitted as part of this PR has addressed the issue for these
users.
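
As background on why this surfaces as a clean InternalException rather than a raw null-pointer dereference: frames #2 and #3 of the backtrace point at `duckdb::optional_ptr`, which validates the pointer on every dereference. A simplified sketch of that behavior (the real class lives in `optional_ptr.hpp` and throws `duckdb::InternalException`; `std::logic_error` stands in for it here):

```cpp
#include <stdexcept>

// Simplified sketch of the duckdb::optional_ptr behavior visible in the
// backtrace: dereferencing checks for null first, so a null `lstate.writer`
// surfaces as an exception at optional_ptr.hpp instead of a segfault.
template <class T>
class optional_ptr {
public:
    optional_ptr() : ptr(nullptr) {}
    optional_ptr(T *ptr_p) : ptr(ptr_p) {}

    void CheckValid() const {
        if (!ptr) {
            // the real class throws duckdb::InternalException here
            throw std::logic_error("Attempting to dereference an optional pointer that is not set");
        }
    }
    T &operator*() const {
        CheckValid();
        return *ptr;
    }
    T *operator->() const {
        CheckValid();
        return ptr;
    }

private:
    T *ptr;
};
```

So even in a release build, where `D_ASSERT` compiles away, the null `lstate.writer` is caught at the dereference, which is presumably why the users got an exception report rather than silent memory corruption.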