
Conversation

Mytherin
Collaborator

This PR creates a new MultiFileReader class that provides helper functions for reader functions that can read from multiple files (e.g. Parquet, CSV, JSON functions). The goal of this class is to unify the logic for the various tasks that are required, including:

  • Globbing the file system in different ways (LIST parameter, VARCHAR parameter, etc)
  • The filename parameter (outputting file names)
  • The hive_partitioning parameter (reading from hive partitions and outputting extra columns based on partition info)
  • The union_by_name parameter (unifying the schema of all files)
  • Handling projection & filter pushdown combined with these parameters
  • Handling file pruning based on hive partitions/filename filters

By unifying the code, we can provide a consistent experience for any function that operates on multiple files, while avoiding having to duplicate (often rather complex!) logic.
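
For illustration, a sketch of the kind of multi-file queries whose shared logic moves into MultiFileReader (the paths and file names below are hypothetical):

-- globbing with a VARCHAR pattern vs. an explicit LIST of files
SELECT * FROM read_parquet('data/*.parquet');
SELECT * FROM read_parquet(['data/part1.parquet', 'data/part2.parquet']);

-- filename, hive_partitioning and union_by_name handled by the unified code path
SELECT *
FROM read_parquet('data/year=*/*.parquet',
                  filename=true,
                  hive_partitioning=true,
                  union_by_name=true);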

This PR unifies the logic of the Parquet reader and the CSV reader, and partly the JSON reader (but more work remains on the JSON reader side). This has the following positive effects:

  • Support projection pushdown in the (parallel) CSV reader
  • Support union_by_name, filename and hive_partitioning in the parallel CSV reader
  • glob/parquet_metadata/parquet_schema now support LIST parameters (see the sketch after this list)
  • Fix several issues when combining union_by_name with filter pushdown in the Parquet reader
  • Fix several issues when combining union_by_name and hive_partitioning
  • Clean up Parquet reader code - strip out anything that does not have to do with reading from the Parquet file
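
A sketch of queries exercising the new behavior listed above (the file names and the column a are hypothetical):

-- LIST parameters for glob/parquet_metadata/parquet_schema
SELECT * FROM parquet_metadata(['f1.parquet', 'f2.parquet']);
SELECT * FROM glob(['data/*.csv', 'extra/*.csv']);

-- union_by_name and filename in the parallel CSV reader
SELECT a, filename
FROM read_csv_auto('data/*.csv', union_by_name=true, filename=true);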

Mytherin added 30 commits March 27, 2023 21:31
Mytherin requested a review from pdet March 30, 2023 11:21
@david-cortes
Contributor

@Mytherin By the way, since you are looking into the handling of filename on pushdowns here, and this unifies it across readers, I imagine this might touch the same kind of code that produces the segfault in issue #6881, where a null pointer is dereferenced when filename gets passed to radix sort.

If this is the case (please ignore if it isn't), I am seeing a segfault here with a similar stack trace under -fsanitize=address. This comes from executing the query on the random data from that issue in the CLI interface with .read <file.sql>.

Perhaps the stack trace is connected to this PR:

v0.7.2-dev1230 c4546040aa
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.
D .read crasher.sql
AddressSanitizer:DEADLYSIGNAL
=================================================================
==10798==ERROR: AddressSanitizer: SEGV on unknown address 0x000000000007 (pc 0x7feb485716c0 bp 0x62900271f200 sp 0x7feb417f2fd8 T10)
==10798==The signal is caused by a READ memory access.
==10798==Hint: address points to the zero page.
    #0 0x7feb485716c0 in __memmove_avx_unaligned_erms ../sysdeps/x86_64/multiarch/memmove-vec-unaligned-erms.S:372
    #1 0x5560cead5488 in duckdb::Radix::EncodeStringDataPrefix(unsigned char*, duckdb::string_t, unsigned long) /home/david/Projects/duckdb/src/include/duckdb/common/radix.hpp:53
    #2 0x5560cead5488 in duckdb::RadixScatterStringVector(duckdb::UnifiedVectorFormat&, duckdb::SelectionVector const&, unsigned long, unsigned char**, bool, bool, bool, unsigned long, unsigned long) /home/david/Projects/duckdb/src/common/row_operations/row_radix_scatter.cpp:69
    #3 0x5560ceb3fd2d in duckdb::RowOperations::RadixScatter(duckdb::Vector&, unsigned long, duckdb::SelectionVector const&, unsigned long, unsigned char**, bool, bool, bool, unsigned long, unsigned long, unsigned long) /home/david/Projects/duckdb/src/common/row_operations/row_radix_scatter.cpp:256
    #4 0x5560ceb6e308 in duckdb::LocalSortState::SinkChunk(duckdb::DataChunk&, duckdb::DataChunk&) /home/david/Projects/duckdb/src/common/sort/sort_state.cpp:189
    #5 0x5560cefd5f21 in duckdb::PhysicalOrder::Sink(duckdb::ExecutionContext&, duckdb::GlobalSinkState&, duckdb::LocalSinkState&, duckdb::DataChunk&) const /home/david/Projects/duckdb/src/execution/operator/order/physical_order.cpp:96
    #6 0x5560cd68a1f7 in duckdb::PipelineExecutor::ExecutePushInternal(duckdb::DataChunk&, unsigned long) /home/david/Projects/duckdb/src/parallel/pipeline_executor.cpp:104
    #7 0x5560cd68ad8e in duckdb::PipelineExecutor::Execute(unsigned long) /home/david/Projects/duckdb/src/parallel/pipeline_executor.cpp:52
    #8 0x5560cd6a3195 in duckdb::PipelineTask::ExecuteTask(duckdb::TaskExecutionMode) /home/david/Projects/duckdb/src/parallel/pipeline.cpp:42
    #9 0x5560cd67e5d7 in duckdb::ExecutorTask::Execute(duckdb::TaskExecutionMode) /home/david/Projects/duckdb/src/parallel/executor_task.cpp:17
    #10 0x5560cd675336 in duckdb::TaskScheduler::ExecuteForever(std::atomic<bool>*) /home/david/Projects/duckdb/src/parallel/task_scheduler.cpp:135
    #11 0x7feb482d44a2  (/lib/x86_64-linux-gnu/libstdc++.so.6+0xd44a2)
    #12 0x7feb484a7fd3 in start_thread nptl/pthread_create.c:442
    #13 0x7feb4852866b in clone3 ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

AddressSanitizer can not provide additional info.
SUMMARY: AddressSanitizer: SEGV ../sysdeps/x86_64/multiarch/memmove-vec-unaligned-erms.S:372 in __memmove_avx_unaligned_erms
Thread T10 created by T0 here:
    #0 0x7feb48649726 in __interceptor_pthread_create ../../../../src/libsanitizer/asan/asan_interceptors.cpp:207
    #1 0x7feb482d4578 in std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (/lib/x86_64-linux-gnu/libstdc++.so.6+0xd4578)

==10798==ABORTING

And the SQL file (note that the system paths need to be specified as absolute paths):

SET memory_limit='1GB';
SET temp_directory='/home/david/ddb_example/';
with tbl0 as (
select
    *
from read_parquet(
    '/home/david/ddb_example/*.parquet'
    , filename=true
    , file_row_number=true
    , union_by_name=true
)
where
    dt is not null
    and dt >= '2020-04-01'
    and dt <= '2020-08-01'
    and cat1 is not null
    and num1 is not null
)
, tbl_join1 as (
    select
        cat1
        , favg("num2") as "num2_avg"
        , count(*) as num2_cnt
    from tbl0
    where
        "num2" is not null
        and not isnan("num2"::float)
    group by
        cat1
)
, tbl_join2 as (
    select
        cat3
        , favg("num3") as "num3_avg"
        , count(*) as num3_cnt
    from tbl0
    where
        "num3" is not null
        and not isnan("num3"::float)
    group by cat3
)
select
    tbl0.cat1
    , tbl0.cat3
    , cat2
    , num1
    , num3
    , num2
    , dt
    , num2_avg
    , num2_cnt
    , num3_avg
    , num3_cnt
    , row_number() over(order by filename, file_row_number) as row_num
from tbl0
join tbl_join1
    on tbl_join1.cat1 = tbl0.cat1
join tbl_join2
    on tbl_join2.cat3 = tbl0.cat3
where
    "dt" is not null
order by filename, file_row_number

@Mytherin
Collaborator Author

The issues I found with union_by_name while investigating that issue were indeed why I set out to do this refactor - but after looking at it again, the crash itself does not appear to be related to union_by_name or filename, but rather to the window function partitioning code and how it interacts with low-memory situations.

Contributor

@pdet left a comment


Looks excellent,

The only thing I'm missing is verification that the projections are being pushed down in the CSV reader.
I guess a test that regexes the plan should suffice?
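
Something along these lines might work (a rough sketch; the file and column names are hypothetical, and the exact operator names in the plan output may differ):

-- with projection pushdown, the CSV scan in the plan should only carry column a
EXPLAIN SELECT a FROM read_csv_auto('data/*.csv');

The test would then regex-match the plan output to check that the scan projects only the requested column.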

Otherwise, we could also add tests like the one in this commit (cbdcb1b) to verify we are executing the correct reader (parallel or single-threaded):
https://github.com/duckdb/duckdb/blob/cbdcb1b8e90687f3b531c9d32f674db48a84c072/test/sql/copy/csv/parallel/test_parallel_option.test

Successfully merging this pull request may close these issues.

Segfault when using enable_object_cache alongside union_by_name for parquet files