@samansmink (Contributor) commented Nov 18, 2022

tl;dr: reading many small Parquet files over HTTP is now much faster (use many threads).

This PR adds two optimizations focused on improving the performance of scanning Parquet files over HTTP:

  • parallel Parquet metadata reading
  • HTTP metadata caching

Essentially, this PR aims to improve the performance of queries like select count(*) from 's3://bucket/*.parquet'; over many files. Such queries only need to read the metadata of each file, so opening HTTP files and reading Parquet metadata were the major bottlenecks.

Additionally, the PR adds an HTTP stats counter to the output of EXPLAIN ANALYZE, which I needed for testing this and which could also be useful for debugging your AWS bill.

Parallel ParquetReader creation

In the Parquet reader, the Parquet metadata is read when the ParquetReader is created. Previously, this happened while holding a lock on the ParquetReadGlobalState, which resulted in single-threaded reading of Parquet files.

To solve this, a thread that enters ParquetParallelStateNext and starts creating a ParquetReader for a file now releases the lock on the ParquetReadGlobalState. This allows other threads to do one of the following (a sketch of the pattern follows the list):

  • pick up the next row group to be scanned,
  • start creating a ParquetReader for another file that needs to be scanned, or
  • when all files are scanned or already being opened: wait for the current file to be opened by another thread.
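
For illustration, here is a minimal, self-contained sketch of this unlock-while-opening pattern. The types and names are hypothetical stand-ins, not the actual DuckDB code, and the real implementation also tracks row-group assignment per reader:

// Claim work under the lock, but run the slow open (HEAD request plus
// metadata fetch) with the lock released so other threads keep working.
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct Reader { // stand-in for ParquetReader
    std::string file;
};

// Stub for the expensive part: opening the file and reading its metadata.
static std::shared_ptr<Reader> OpenReader(const std::string &file) {
    auto reader = std::make_shared<Reader>();
    reader->file = file;
    return reader;
}

struct GlobalState { // stand-in for ParquetReadGlobalState
    std::mutex lock;
    std::condition_variable cv;
    std::queue<std::string> files_to_open;        // files without a reader yet
    std::vector<std::shared_ptr<Reader>> readers; // readers with row groups left
    int opens_in_flight = 0;
};

// Returns a reader to scan a row group from, or nullptr when all work is done.
std::shared_ptr<Reader> ParallelStateNext(GlobalState &g) {
    std::unique_lock<std::mutex> guard(g.lock);
    while (true) {
        if (!g.readers.empty()) {
            return g.readers.back(); // cheap: claim the next row group
        }
        if (!g.files_to_open.empty()) {
            auto file = g.files_to_open.front();
            g.files_to_open.pop();
            ++g.opens_in_flight;
            guard.unlock();                 // other threads can claim work now
            auto reader = OpenReader(file); // slow: runs without the lock
            guard.lock();
            --g.opens_in_flight;
            g.readers.push_back(std::move(reader));
            g.cv.notify_all(); // wake threads waiting on an in-flight open
            continue;
        }
        if (g.opens_in_flight == 0) {
            return nullptr; // everything scanned or opened
        }
        g.cv.wait(guard); // wait for another thread to finish opening a file
    }
}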

HTTP metadata cache

Due to the Filesystem abstraction of the HTTPFS extension, we require a HEAD request every time we create a FileHandle, in order to get the file size. In the Parquet reader, multiple handles to the same file are opened to allow parallel reads from that file. This can get expensive: in the case of our example query, it issues twice as many HEAD requests as it needs to. The solution is to cache the metadata returned by the HEAD requests.

By default, this cache is invalidated at the end of each query. However, a setting is also added that switches to a global cache shared between all requests made through the HTTPFS: SET enable_http_metadata_cache=true;
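
As a rough illustration, here is a minimal sketch of such a cache; the names are hypothetical and the real HTTPFS implementation differs in detail. Entries are keyed by URL; a query-scoped instance is cleared when the query ends, while the opt-in global instance is shared across queries:

// Cache of HEAD-response metadata, so later FileHandles for the same
// URL can skip their own HEAD request.
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

struct HTTPMetadata {
    uint64_t length;           // from the Content-Length header
    std::string last_modified; // from the Last-Modified header
};

class HTTPMetadataCache {
public:
    // On a hit, fills `out` and returns true, saving one HEAD request.
    bool TryGet(const std::string &url, HTTPMetadata &out) {
        std::lock_guard<std::mutex> guard(lock);
        auto it = entries.find(url);
        if (it == entries.end()) {
            return false;
        }
        out = it->second;
        return true;
    }
    // Recorded after a real HEAD request has been made.
    void Put(const std::string &url, HTTPMetadata meta) {
        std::lock_guard<std::mutex> guard(lock);
        entries[url] = std::move(meta);
    }
    // In query-scoped mode this is invoked at the end of each query;
    // the global cache (enable_http_metadata_cache) skips this step.
    void Clear() {
        std::lock_guard<std::mutex> guard(lock);
        entries.clear();
    }
private:
    std::mutex lock;
    std::unordered_map<std::string, HTTPMetadata> entries;
};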

Benchmark

I uploaded a dataset of 120 Parquet files to S3, then ran select count(*) from 's3://bucket/*.parquet'; from my laptop. Note that the RTT to S3 from DuckDB HQ is ~20ms.

implementation                   threads   time
current master                   8         33s
current master                   120       34s
parallel metadata                8         13.3s
parallel metadata                120       13s
parallel metadata + http cache   8         3.26s
parallel metadata + http cache   120       0.97s

Here's the output for the HTTP Stats from the EXPLAIN ANALYZE of this query:

┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││            HTTP Stats:            ││
││                                   ││
││             in: 234KB             ││
││            out: 0 bytes           ││
││             #HEAD: 119            ││
││             #GET: 241             ││
││              #PUT: 0              ││
││              #POST: 0             ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘

@Mytherin changed the base branch from master to feature, November 19, 2022 15:45
@Mytherin (Collaborator) left a comment:
Thanks for the PR! LGTM - one minor comment

@samansmink (Contributor, Author) replied:

@Mytherin this one is good to go from my side; CI is unrelated, I think!

@Mytherin merged commit a386815 into duckdb:feature, Dec 8, 2022
@Mytherin (Collaborator) commented Dec 8, 2022:

Thanks!

@danthegoodman1 commented:

Wondering if this optimizes the following:

CREATE VIEW test AS
  SELECT *, type, year, quarter, filename FROM read_parquet('s3://ookla-open-data/parquet/performance/type=*/year=*/quarter=*/*.parquet', HIVE_PARTITIONING = 1, filename=true)

Performing the query:

select filename from test ORDER BY year DESC LIMIT 15

takes 35253.36670875549 ms from a GitHub Codespace in my testing, and 4450 ms without the ORDER BY.

Since these are hive-partitioned, I would expect it to be able to list all the files out. I'm not sure if this is the cost of many S3 LIST operations or something else that causes such an increase in processing time.

@polidore commented:

Excited for this feature, as my use case involves many Parquet files on S3, and I find it very slow unless running on EC2.

@polidore commented:

An example. It's hard to tell whether it's doing HTTP range requests or downloading each Parquet file in its entirety:

┌─────────────┴─────────────┐
│        PARQUET_SCAN       │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│          datadate         │
│           ticker          │
│          currency         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│ Filters: datadate=2022-06 │
│-07 AND datadate IS NO...  │
│          EC=14966         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            1562           │
│         (2448.31s)        │
└───────────────────────────┘

@samansmink (Contributor, Author) replied:

@polidore If files are relatively small, or you scan little from each file, you should see a good improvement from this. Note that you can already try it out using a dev build. With regards to your query: DuckDB always uses range requests, first to query only the metadata, then to fetch the required columns. Telling whether only the metadata is scanned is currently not easily possible, AFAIK. With this PR you could sort of use the GET request counter to deduce it, but that's not something I would rely on.

@danthegoodman1 That's a good question. Filtering on hive-partitioned/filename columns requires a specific optimization that prevents scanning, and it seems not to kick in for the ordered case there. Would you mind trying the same query again with this PR using one of the dev builds, then opening a new issue if it persists?

@polidore replied:

This is a case where the files are small because they are incrementally created day by day. I could use the DuckDB storage engine to do inserts and it would be very fast, but then I can't easily run analysis on my laptop. My plan is to write to S3 and then be able to do analysis on any machine using DuckDB. In this way, S3 is basically my database "server".

I tried it on the dev build, and wow, that's a big improvement! It's 10x faster and goes from unusable to usable. Thanks! Hopefully this gets released soon.

┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││            HTTP Stats:            ││
││                                   ││
││             in: 11.1MB            ││
││            out: 0 bytes           ││
││            #HEAD: 1070            ││
││             #GET: 2143            ││
││              #PUT: 0              ││
││              #POST: 0             ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││         Total Time: 26.59s        ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
┌───────────────────────────┐
│      EXPLAIN_ANALYZE      │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             0             │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│          ORDER_BY         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│          ORDERS:          │
│  "s3://atom-tca/routes/*  │
│    .parquet".ticker ASC   │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            1562           │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│        PARQUET_SCAN       │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│          datadate         │
│           ticker          │
│          currency         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│ Filters: datadate=2022-06 │
│-07 AND datadate IS NO...  │
│          EC=14966         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            1562           │
│         (388.61s)         │
└───────────────────────────┘

@danthegoodman1 commented:

@samansmink getting some build errors in my codespace (dev laptop died):

[ 29%] Building CXX object src/main/CMakeFiles/duckdb_main.dir/ub_duckdb_main.cpp.o
In file included from /workspaces/codespaces-blank/duckdb/build/release/src/main/ub_duckdb_main.cpp:7:
/workspaces/codespaces-blank/duckdb/src/main/config.cpp:26:101: error: ‘ResetLocal’ is not a member of ‘duckdb::EnableProgressBarPrintSetting’
   26 |   _PARAM::Name, _PARAM::Description, _PARAM::InputType, nullptr, _PARAM::SetLocal, nullptr, _PARAM::ResetLocal,  \
      |                                                                                                     ^~~~~~~~~~
/workspaces/codespaces-blank/duckdb/src/main/config.cpp:65:50: note: in expansion of macro ‘DUCKDB_LOCAL’
   65 |                                                  DUCKDB_LOCAL(EnableProgressBarPrintSetting),
      |                                                  ^~~~~~~~~~~~
make[3]: *** [src/main/CMakeFiles/duckdb_main.dir/build.make:63: src/main/CMakeFiles/duckdb_main.dir/ub_duckdb_main.cpp.o] Error 1
make[3]: Leaving directory '/workspaces/codespaces-blank/duckdb/build/release'
make[2]: *** [CMakeFiles/Makefile2:5907: src/main/CMakeFiles/duckdb_main.dir/all] Error 2
make[2]: Leaving directory '/workspaces/codespaces-blank/duckdb/build/release'
make[1]: *** [Makefile:130: all] Error 2
make[1]: Leaving directory '/workspaces/codespaces-blank/duckdb/build/release'
make: *** [Makefile:167: release] Error 2

This is master

@Mause (Member) commented Dec 13, 2022:

> @samansmink getting some build errors in my codespace (dev laptop died):
> [build log quoted above]
> This is master

You probably need #5671?

@danthegoodman1 replied:

Will try!

@danthegoodman1 commented:

That is showing as v0.3.4-dev7245 for me, not sure if that's accurate or not

@danthegoodman1 commented:

> Filtering on hive-partitioned/filename columns requires a specific optimization that prevents scanning, and it seems not to kick in for the ordered case there. Would you mind trying the same query again with this PR using one of the dev builds, then opening a new issue if it persists?

A brief look at the PR doesn't indicate that this would be solved from a partitioning perspective. My assumption is that it could optimize by listing only the files that match that prefix, or at least, after listing all files, figure out which match before reading any. Would the optimization from this PR be to speed up the concurrency?

And if I am understanding correctly, you're saying that it's not working as expected if my assumption is not the observed case?

Still working on getting this tested with this build in the meantime.

Here is the EXPLAIN ANALYZE of the query in v0.6.1 for reference:

┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││    Query Profiling Information    ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
EXPLAIN ANALYZE select filename from test ORDER BY year DESC LIMIT 15
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││         Total Time: 32.97s        ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
┌───────────────────────────┐
│      EXPLAIN_ANALYZE      │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             0             │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         PROJECTION        │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             #0            │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             15            │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│           TOP_N           │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           Top 15          │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│          #1 DESC          │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             15            │
│          (12.48s)         │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│        PARQUET_SCAN       │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│          filename         │
│            year           │
│        EC=146311080       │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│         155194705         │
│         (111.76s)         │
└───────────────────────────┘

@samansmink (Contributor, Author) replied:

@danthegoodman1

> That is showing as v0.3.4-dev7245 for me, not sure if that's accurate or not

Assuming you built it yourself: git doesn't pull tags automatically; pull the latest tags with git fetch --all --tags.

Looking at your queries again, I missed something yesterday: without the ORDER BY, your query only needs to scan a single file, assuming it contains more than 15 tuples. With the ORDER BY, all files will be scanned.

4.5s could very well be the expected cost given the number of LIST calls that need to be made, but that's hard to tell from this information alone.

The optimization I mentioned was added in #4211; it pushes down filters on hive-partitioned columns into the Parquet scan. In your case, however, you're not filtering on the hive-partitioned column but doing a top-N on it, which requires falling back to reading all the metadata. That should still be a lot faster with this PR, though.

Really reducing your query to only LIST calls would require the ORDER BY + LIMIT to be pushed into the Parquet scan somehow; that could be something interesting to look into.

For now, you could consider this:

select filename from test WHERE year=(
    SELECT max(year) from test
) LIMIT 15;

This query should read all the metadata once to get the max year, then query the first file in the folder matching that top year. Note that it assumes there are 15 or more tuples in that file, to match your original query.

@danthegoodman1 replied:

@samansmink That query took 35s as well

@djouallah commented:

Even with 0.6.2.dev484, PRAGMA enable_http_metadata_cache; doesn't seem to be working yet?

@samansmink (Contributor, Author) replied:

@djouallah Are you sure? It seems to be working fine for me on that version from Python.

@danthegoodman1 I'll take a look into this tomorrow; there may be some room for further optimization there.

@djouallah replied:

Sorry again, it works; it should be SET enable_http_metadata_cache=true;

@tobilg commented Dec 15, 2022:

That's a great improvement! Will this then also land in the DuckDB WASM version?

tobilg added a commit to tobilg/duckdb-nodejs-layer that referenced this pull request Dec 16, 2022
@dberardo-com commented:

Thanks for implementing this feature. When will this be available in the Node.js client?

Also: what does "EC" mean in the EXPLAIN ANALYZE results? I was trying to find it in the docs, but without success. Is it a parameter that should be configured on the client side, or is it just a performance metric?

@samansmink (Contributor, Author) replied:

@tobilg WASM is generally updated a bit after the main release; the next one is planned for early February. Note, however, that this will not have as much of an impact in DuckDB-Wasm.

@dberardo-com Recent dev versions of the Node client should have this feature (https://www.npmjs.com/package/duckdb?activeTab=versions); for a production version, the next release is planned for early February.

EC stands for Estimated Cardinality: the number of tuples an operator is expected to produce. The query planner/optimizer uses this information to determine the best query plan, and it can help explain why a specific plan was chosen.

@dberardo-com replied:

Thanks for the complete answer. I could not find an explanation of "EC" in the main docs, so this was helpful input.

I still see some confusing figures in the query plan, where the seconds per block in the diagram are often higher than the actual total time elapsed for the whole query. E.g., in this comment one can see a single-stage figure of 300s while the actual query took 23s: #5405 (comment). Is it perhaps because we have to factor in the number of cores on the machine running the query (16 cores in that case)?

I will try to dig deeper into how to improve the performance of parquet_scan, because I still feel that ~20s for a query over about 1500 samples has quite a big impact on UX. Thanks for providing this relevant information.

@samansmink (Contributor, Author) replied:

@dberardo-com That's indeed due to parallel execution: the profiler sums the time spent in each operator across threads, so on a 16-core machine an operator can report far more time than the query's wall-clock duration.

@tobilg commented Dec 21, 2022:

Thanks @samansmink for the feedback! I was asking about the Wasm version because I saw that it produces an "HTTP waterfall", as outlined in duckdb/duckdb-wasm#1087, when querying a lot of files in sequence instead of in parallel... If it could profit from this change, the UX of DuckDB WASM would really improve, but I have to admit that I don't know the technical implications. Thanks for your work!

@kylebarron commented:

> note this will not have as much of an impact in duckdb wasm

Is that because of threading limitations in the browser?
