HTTP parquet optimizations #5405
Conversation
Thanks for the PR! LGTM - one minor comment
@Mytherin this one is good to go from my side, the CI failure is unrelated I think!
Thanks!
Wondering if this optimizes the following:
Performing the query:
takes . Since these are hive partitioned, I would expect it to be able to list all the files out. Not sure if this is the cost of many S3 LIST operations or something else that causes such an increase in processing time.
Excited for this feature, as my use case involves many parquet files on S3, and I find it very slow unless running on EC2.
An example. Hard to tell if it's doing HTTP range requests or downloading each parquet file in its entirety?
@polidore If files are relatively small, or you scan little from each file, you should see a good improvement from this. Note that you can already try it out using a dev build. With regards to your query: DuckDB always uses range requests, first to read only the metadata, then to fetch the required columns. Telling whether only metadata is scanned is currently not easily possible, as far as I know. With this PR you could sort of use the GET request counter to deduce it, but that's not something I would rely on.

@danthegoodman1 that's a good question. Filtering on hive-partitioned/filename columns requires a specific optimization that prevents scanning, which seems not to be working in the ordered case there. Would you mind trying the same query again with this PR using one of the dev builds, then opening a new issue if it persists?
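To illustrate why a range request is enough to read only the metadata: a parquet file ends with a fixed 8-byte trailer (a 4-byte little-endian footer length followed by the `PAR1` magic), so one small range request for the tail tells a reader exactly which byte range holds the Thrift-encoded footer. A minimal sketch (the function name and sizes here are illustrative, not DuckDB's actual code):

```python
import struct

def footer_range(file_size, tail):
    # tail = the last 8 bytes of the file, fetched with one HTTP range request
    assert tail[-4:] == b"PAR1", "not a parquet file"
    (meta_len,) = struct.unpack("<I", tail[:4])
    # the Thrift-encoded footer sits directly before these 8 bytes
    return file_size - 8 - meta_len, meta_len

# simulate a 4096-byte file whose footer metadata is 100 bytes long
tail = struct.pack("<I", 100) + b"PAR1"
start, length = footer_range(4096, tail)
# start = 3988, length = 100
```

A reader would then issue a second range request for bytes `start .. start + length` to get the row-group and column layout, and further range requests only for the columns a query actually touches.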
This is a case where the files are small because they are incrementally created day by day. I could use the duckdb storage engine to do inserts and it would be very fast, but then I can't run analysis on my laptop easily. My plan is to write to s3 and then be able to do analysis on any machine using duckdb. In this way s3 is basically my database "server". I tried it on the dev build, and wow, that's a big improvement! It's 10x faster and goes from unusable to usable. Thanks! Hopefully this gets released soon.
@samansmink getting some build errors in my codespace (dev laptop died):
This is
You probably need #5671?
Will try!
That is showing as
A brief look at the PR doesn't indicate that this would be solved from a partitioning perspective. My assumption is that it could optimize by listing only the files that match that prefix, or at least, after listing all files, figure out what matches before reading any. Would the optimization from this PR be to speed up the concurrency? And if I am understanding correctly, you're saying that it's not working as expected if my assumption is not the observed case? Still working on getting this tested with this build in the meantime. Here is the
Assuming you built it yourself: git doesn't pull tags automatically; pull the latest tags with:

Looking at your queries again, I missed the following yesterday: without the 4.5s could very well be the expected cost due to the number of LIST calls that need to be made, but that's hard to tell from this info alone. The optimization I mentioned was added in #4211; it pushes down filters on hive-partitioned columns into the parquet scan. In your case, however, you're not filtering on the hive-partitioned column but doing a top N on it, which requires falling back to reading all the metadata. This should still be a lot faster with this PR, though. To really reduce your query to only LIST calls would require the order by + limit to be pushed into the parquet scan somehow; this could be something interesting to look into. For now, you could consider this:
This query should read all metadata once to get the max year, then query the first file in the folder matching the top year. Note that it assumes there are 15 or more tuples in there, to match your original query.
@samansmink That query took 35s as well
Even with 0.6.2.dev484, PRAGMA enable_http_metadata_cache; doesn't seem to be working yet?
@djouallah Are you sure? It seems to be working fine for me on that version from Python.

@danthegoodman1 I'll take a look into this tomorrow; there may be some room for further optimizations there.
@djouallah sorry again, it works; it should be SET enable_http_metadata_cache=true;
That's a great improvement! Will this then also land in the DuckDB WASM version?
Thanks for implementing this feature. When will this be available in the Node.js client? Also: what does "EC" mean in the analyze results? I was trying to find it in the docs but without success. Is it a parameter that should be configured on the client side, or is it just a performance metric?
@tobilg WASM is generally updated a bit after the main release; the next one is planned for early February. However, note this will not have as much of an impact in DuckDB WASM.

@dberardo-com recent dev versions of the Node client should have this feature (https://www.npmjs.com/package/duckdb?activeTab=versions); for a production version, the next release is planned for early February. EC stands for Estimated Cardinality: the number of tuples the operator is expected to produce. The query planner/optimizer uses this information to determine the best query plan, and it can help figure out why a specific plan was chosen.
Thanks for the complete answer; I could not find an explanation for "EC" in the main docs, so you provided helpful input. I still see some confusing figures in the query plan, where the number of seconds per block in the diagram is usually higher than the actual total time elapsed for the whole query. E.g. in this comment one can see a single-stage estimate of 300s, but the actual query took 23s: #5405 (comment). Is it perhaps because we have to factor in the number of cores on the machine running the query (16 cores in that case)? I will try to dig deeper into how to improve the performance of parquet_scan, because I still feel that 20s for a query of about 1500 samples has quite a big impact on UX. Thanks for providing this relevant information.
@dberardo-com that's indeed due to parallel execution; it combines the time spent in the operators by the different threads.
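A quick sanity check on the numbers from the comment above: if operator time is summed across threads, then on a fully busy 16-core machine the summed time divided by the core count gives a lower bound on wall-clock time, which is indeed consistent with the observed 23s:

```python
summed_operator_time = 300  # seconds, summed across all threads (from the comment)
wall_clock_time = 23        # seconds actually elapsed (from the comment)
cores = 16

# with perfect parallelism, summed operator time / cores bounds wall time from below
lower_bound = summed_operator_time / cores
print(lower_bound)  # 18.75
assert lower_bound <= wall_clock_time
```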
Thanks @samansmink for the feedback! I was asking about the WASM version because I saw that it produces an "HTTP waterfall", as outlined in duckdb/duckdb-wasm#1087, when querying a lot of files in sequence instead of possibly in parallel... If it could profit from this change, the UX of DuckDB WASM would really improve, but I have to admit that I don't know the technical implications. Thanks for your work!
Is that because of threading limitations in the browser?
tl;dr: reading many small parquet files over HTTP is now much faster (when using many threads)
This PR adds two optimizations focused on improving the performance of scanning parquet files over HTTP:
Essentially, this PR aims to improve performance of queries like:
select count(*) from 's3://bucket/*.parquet';
with a lot of files. In these queries only the metadata of the files needs to be read, so opening HTTP files and reading parquet metadata were major bottlenecks here. Additionally, the PR adds an HTTP stats counter to the output of
EXPLAIN ANALYZE
which I needed for testing this and could also be useful for debugging your AWS bill.
Parallel ParquetReader creation
In the parquet reader, the parquet metadata is read when the ParquetReader is created. Previously, this happened while holding a lock on the ParquetReadGlobalState, which resulted in single-threaded reading of parquet files.
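The pattern behind the fix can be sketched as follows: hold the shared lock only long enough to claim the next file, and release it while the slow reader construction (which reads metadata over HTTP) runs. This is a simplified Python sketch, not DuckDB's actual C++ code; all names (`open_reader`, `parallel_state_next`, the file list) are hypothetical stand-ins:

```python
import threading

files = [f"part-{i}.parquet" for i in range(8)]  # hypothetical file list
state_lock = threading.Lock()  # stands in for the ParquetReadGlobalState lock
next_idx = 0
opened = []
opened_lock = threading.Lock()

def open_reader(path):
    # stands in for ParquetReader construction (metadata read over HTTP)
    return {"file": path}

def parallel_state_next():
    global next_idx
    while True:
        with state_lock:  # hold the lock only to claim the next file
            if next_idx >= len(files):
                return
            path = files[next_idx]
            next_idx += 1
        # the slow part runs WITHOUT the lock, so other threads
        # can claim and open files concurrently
        reader = open_reader(path)
        with opened_lock:
            opened.append(reader["file"])

threads = [threading.Thread(target=parallel_state_next) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert sorted(opened) == sorted(files)
```

With the lock held only for the index bookkeeping, N threads can read the metadata of N files concurrently instead of serializing on reader creation.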
To solve this, now when a thread enters
ParquetParallelStateNext
and creates a ParquetReader for a file, it will release the lock on the ParquetReadGlobalState. This allows other threads to either:
HTTP metadata cache
Due to the filesystem abstraction of the HTTPFS extension, we require a HEAD request every time we create a FileHandle, to get the file size. In the parquet reader, multiple handles to the same file will be opened to allow parallel reads of the same file. This can get expensive in the case of our example query, as it will make twice as many HEAD requests as it needs to. The solution is to cache the metadata returned by the HEAD requests.
By default, this cache is invalidated at the end of each query. However, a setting is also added that switches to a global cache shared between all requests made through HTTPFS:
SET enable_http_metadata_cache=true
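The caching idea can be sketched in a few lines: key the HEAD response by URL, serve repeat opens from the cache, and clear it at query end unless the global-cache setting is on. A simplified Python sketch with hypothetical names, not the actual HTTPFS implementation:

```python
head_requests = 0

def head(url):
    # stands in for a real HTTP HEAD request returning file metadata
    global head_requests
    head_requests += 1
    return {"content-length": 1234}

class HTTPMetadataCache:
    def __init__(self):
        self._cache = {}

    def get(self, url):
        # first handle to a file pays for the HEAD; later handles reuse it
        if url not in self._cache:
            self._cache[url] = head(url)
        return self._cache[url]

    def clear(self):
        # called at end of query by default; skipped when the
        # global-cache setting (enable_http_metadata_cache) is enabled
        self._cache.clear()

cache = HTTPMetadataCache()
cache.get("s3://bucket/a.parquet")
cache.get("s3://bucket/a.parquet")  # second FileHandle: no extra HEAD
assert head_requests == 1
```

For the example query, where every file is opened at least twice (once per parallel read handle), this roughly halves the number of HEAD requests.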
Benchmark
Uploaded a dataset of 120 parquet files to S3, then ran:
select count(*) from 's3://bucket/*.parquet';
from my laptop. Note that the RTT to S3 from DuckDB HQ is ~20ms. Here's the output for the HTTP stats from the
EXPLAIN ANALYZE
of this query: