External File Cache #16463
Conversation
I would assume we could ideally get to only 1 request per query when everything is cached. Other question: what happens in evil cases of byte ranges? This might blow up the cache / add complexity. We do have somewhat similar logic in duckdb-wasm to handle complex cases; I can later check and compare with that (Wasm piggybacks on the browser, so in theory it could do better).
My knee-jerk reaction to this is: can't this result in problems when this data is accidentally modified? Are we doing something to prevent this?
Also, I believe you are bumping into #16177; my suspicion is that disabling delta might solve this.
@carlopi The Etag is checked once for every […]. As for the evil byte ranges: if we merge non-overlapping byte ranges, that will cause overhead. I think with the prefetching mechanism in Parquet it should be OK, since the byte ranges it fetches are pretty big. Would be cool if we can get close to WASM speeds.
@lnkuiper this is so cool! Will it work with extensions automatically, i.e. delta/iceberg scanners?
looks amazing @lnkuiper!
@@ -56,7 +56,7 @@ struct ParquetReaderScanState {
 	vector<idx_t> group_idx_list;
 	int64_t current_group;
 	idx_t group_offset;
-	unique_ptr<FileHandle> file_handle;
+	unique_ptr<CachingFileHandle> file_handle;
I'm thinking about whether this needs to be aware that it's holding a CachingFileHandle instead of a base FileHandle.
Could the VirtualFileSystem hand out CachingFileHandles based on the flags passed to OpenFile?
Ideally we would also have a way for the filesystems to use the cache without the calling code being aware of it. As discussed in person already, this would break the zero-copy side of things, but that might still be worth it.
Absolutely.
Just a recap of our discussion: the API implemented in this PR requires us to modify IO code in the Parquet extension (and anywhere else we would like to use this). Ideally, this would not be the case. However, if we want to enable caching without changing the API, this would create a double copy (`memcpy` overhead and, briefly, a 2x of memory usage).
I've implemented it like this to prevent that overhead, and to eventually allow zero-copy reads. However, we should easily be able to build an API on top of this that caches data implicitly using the tried-and-tested API of `FileSystem`, allowing for easy integration in, e.g., the Avro extension.
Perhaps it would be nice to consider that we may want to hold the different types of caches behind a unified interface. We currently already have things like:
- http metadata cache
- azure client cache
- parquet object cache
which all would benefit from a coherent way to:
- inspect the cache entries
- purge cache entries (could even be a SQL statement, perhaps `DELETE FROM duckdb_cache_entries`? see the sketch below)
- enforce global limits on caching
- configure eviction strategies / priorities on different caches
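A rough sketch of how that could surface at the SQL level (everything here is hypothetical; neither `duckdb_cache_entries` nor these column names exist today):

```sql
-- hypothetical: one table function listing entries across all registered caches
SELECT cache_name, entry_key, size_bytes
FROM duckdb_cache_entries;

-- hypothetical: purge the entries of a single cache
DELETE FROM duckdb_cache_entries WHERE cache_name = 'http_metadata';
```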
I think this internal issue/discussion is relevant here https://github.com/duckdblabs/duckdb-internal/issues/1833
I really like this idea. If we abstract it such that for every cache we have some generic function to request some metadata (JSON?) about the current state of the cache, we could query all registered caches using a single table function.
Really cool work!
I wanted to add that such a persistent cache would be super useful for pg_duckdb. For it to work in pg_duckdb it'd be important that the cache would allow concurrent access to the same persistent cache from different duckdb processes, because in pg_duckdb each Postgres connection (i.e. process) has its own dedicated in-memory duckdb instance.
we need to protect @lnkuiper at all costs :) let him merge his first version and then we can ask him for more :)
@Tishj Indeed, we compute a checksum in DEBUG mode to prevent things like this from happening.

@rudolfix Delta/Iceberg should benefit, as these use our Parquet extension, in which this is integrated. Iceberg also uses Avro scans, which do not benefit yet, but we can look into this in the future.

@JelteF Definitely. We want to develop this into a persistent cache in the future, but will refine the in-memory version first.
Thanks!
@lnkuiper are you planning a follow-up PR to make it configurable whether to perform the cache-invalidation ETag HEAD requests? For immutable Parquet files, e.g. in Iceberg, this would improve performance a lot.
Hi @nicornk, this will come in a follow-up PR. We're still debating the best way to do this, but I think we'll use the new […].
@lnkuiper 2.6x improvement, not bad at all!
This PR follows up on #15119 with:
- Switching the HTTP Logger to use the new logging infrastructure by default (logging to a file is still possible using the old setting)
- Adding a new mechanism for FileHandle logging
- Refactoring the loggers to support structured logging

My apologies for the slightly chaotic PR here. It took a few tries to understand what makes sense here and I still feel like we are not 100% there yet. However, this *should* be a decent step in the right direction.

## FileHandle logging
This PR adds logging infra for filehandles. Basically, what I wanted is to create a unified way to log the FileSystem APIs. I think this can be very useful, for example in testing/optimizing Iceberg/Delta workloads, but also to easily analyze IO patterns of our various readers like the Parquet reader, JSON reader, or Avro readers. Since the log messages will be in JSON format, they are easy to parse and do analysis on. Here's a demo:
```
D set enable_logging=true;
D set logging_level='trace';
D COPY (SELECT 1 as a) TO './test.csv';
D FROM "./test.csv";
┌───────┐
│   a   │
│ int64 │
├───────┤
│   1   │
└───────┘
D SELECT message FROM duckdb_logs WHERE type = 'FileSystem';
┌────────────────────────────────────────────────────────────────────────┐
│                                 message                                │
│                                 varchar                                │
├────────────────────────────────────────────────────────────────────────┤
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"OPEN"}               │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"WRITE","bytes":"2"}  │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"WRITE","bytes":"1"}  │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"WRITE","bytes":"1"}  │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"CLOSE"}              │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"OPEN"}               │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"READ","bytes":"4"}   │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"READ","bytes":"0"}   │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"CLOSE"}              │
└────────────────────────────────────────────────────────────────────────┘
```

### Implementation
While conceptually simple, the complexity lies in the fact that we want to generally log to the ClientContext to ensure the log entries have connection-level context info. This is done by changing what was previously a `unique_ptr<Logger>` to a `shared_ptr<Logger>` in the ClientContext and copying the pointer to the logger into the filehandles on creation. What this means is that a filehandle will log using the logging configuration of the client context that created it. Because, for now, filehandles will not outlive the ClientContext that created them, this is fine, but in theory this can create some confusion if we were to share filehandles between connections or even between queries. I think we probably just want to ensure we don't keep filehandles open between queries.

I've created some macros for standardization of the common filehandle operations that we want to log:
```C++
#define DUCKDB_LOG_FILE_SYSTEM_READ(HANDLE, ...)  DUCKDB_LOG_FILE_HANDLE_VA(HANDLE, "READ", __VA_ARGS__);
#define DUCKDB_LOG_FILE_SYSTEM_WRITE(HANDLE, ...) DUCKDB_LOG_FILE_HANDLE_VA(HANDLE, "WRITE", __VA_ARGS__);
#define DUCKDB_LOG_FILE_SYSTEM_OPEN(HANDLE)       DUCKDB_LOG_FILE_HANDLE(HANDLE, "OPEN");
#define DUCKDB_LOG_FILE_SYSTEM_CLOSE(HANDLE)      DUCKDB_LOG_FILE_HANDLE(HANDLE, "CLOSE");
```
Then in the code, for example in `LocalFileSystem::Read()`, we can easily call the logger in an efficient way with the centrally defined log type and level for the FileHandle logging:
```C++
DUCKDB_LOG_FILE_SYSTEM_READ(handle, bytes_to_read, location);
```
which will ensure that extensions that implement their own filesystems can easily adhere to the logging convention.

## Logging refactor
To neatly support writing structured log messages, this PR makes a few conceptual overhauls to the logging. First of all, we remove the log types as manually passed strings. In the initial logging implementation, you would log like:
```C++
DUCKDB_LOG_<level>(context, "<log type string>", "message");
```
However, these manually provided strings are not really practical and likely to result in utter chaos. We change this now to:
```C++
// Default log type (empty string for now)
DUCKDB_LOG_<level>(context, "message");
// Predefined log type
DUCKDB_LOG(context, LogTypeClassName, "message");
```
The `DUCKDB_LOG` macro is defined as
```C++
DUCKDB_LOG_INTERNAL(SOURCE, LOG_TYPE_CLASS::NAME, LOG_TYPE_CLASS::LEVEL, LOG_TYPE_CLASS::ConstructLogMessage(__VA_ARGS__))
```
which will ensure that the log types can only be created using the corresponding log message construction methods. The `LogType` class will then also contain the logic to deserialize the log message string into a predefined datatype for easy parsing. What this allows us to do is to easily enable a specific log type, and let DuckDB automatically deserialize and unnest the resulting data:
```SQL
PRAGMA enable_logging('FileSystem');
FROM './test.csv';
SELECT fs, path, bytes, pos FROM duckdb_logs_parsed('FileSystem');
```
which yields:
```
LocalFileSystem	test.csv	OPEN	NULL	NULL
LocalFileSystem	test.csv	READ	4	0
LocalFileSystem	test.csv	READ	0	4
LocalFileSystem	test.csv	CLOSE	NULL	NULL
```
Note that `duckdb_logs_parsed` is simply a table macro for:
```SQL
SELECT * EXCLUDE (message), UNNEST(parse_duckdb_log_message(log_type, message))
FROM duckdb_logs
WHERE type = log_type
```

## TODOs
Some general follow-ups:
- Add logging to azure
- Add logging to fsspec
- Add logging to #16463
- More benchmarking
- Make HTTPLogType a structured type

A more complex follow-up is to think about how to make this performant. Currently, enabling `PRAGMA enable_logging('FileSystem');` could become quite expensive because it needs to string-compare the logging type for every single `TRACE`-level call. For now we actually get away with it because we only add the logger to the filehandle if, at the time of filehandle creation, we are logging for `FileSystem`. However, as soon as we were to add more TRACE-level calls that are hit a lot during execution, things will likely slow down fast.
External File Cache (duckdb/duckdb#16463) [Dev] Add "registries" to `vcpkg.json`, add script to list the packages of the registry. (duckdb/duckdb#17124)
This PR implements an in-memory cache for external files (e.g., Parquet/CSV/JSON) that caches reads in memory to speed up subsequent reads. It is enabled by default, but can be disabled with `SET enable_external_file_cache=false;`.
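For example, toggling and inspecting the setting from SQL (a minimal sketch; `SET` and `current_setting` are existing DuckDB features, the setting name is the one introduced by this PR):

```sql
-- disable the external file cache for this DuckDB instance
SET enable_external_file_cache = false;
-- check the current value
SELECT current_setting('enable_external_file_cache');
-- re-enable it (the default)
SET enable_external_file_cache = true;
```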
### CachingFileSystem

The main implementation is in `CachingFileSystem`, a class that (sort of) wraps `FileSystem`, but with a slightly different API. Instead of reading a byte range from a file with the regular `Read` call, which reads `nr_bytes` from `location` into `buffer` (which must be pre-allocated to fit `nr_bytes`), we instead issue a read that returns a `BufferHandle` to a managed buffer holding the requested `nr_bytes` from `location`, with `buffer` set to point into it. This entails that we can (in theory) do zero-copy reads if the data is already cached in memory.

This is currently integrated into the Parquet reader, but only when prefetching is enabled (which is the default for remote reads, but can be enabled for local Parquet files with `SET prefetch_all_parquet_files=true;`).

Reads have a size of `nr_bytes`, and are identified by `path` and `location`. If the same byte range is (partially) queried twice, we first try to read from the in-memory cache. If a larger byte range fully encompasses a smaller byte range (e.g., when a full row group is queried after first querying a single column), the smaller byte range is removed in favor of the larger one; the smaller byte range is used to construct the larger byte range.
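As an illustration of that subsumption behaviour (purely a sketch: the URL is a placeholder and the columns assume a TPC-H `lineitem` file):

```sql
-- first query a single column: a relatively small byte range is cached
SELECT l_orderkey FROM 'https://example.com/lineitem.parquet' LIMIT 1;
-- then query all columns: the prefetcher reads the full row group, which
-- encompasses the earlier range, so the smaller cache entry is replaced
SELECT * FROM 'https://example.com/lineitem.parquet' LIMIT 1;
```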
### Cache Invalidation

Because the buffers holding the byte ranges are in-memory only (for now), they might become invalid if the buffer manager decides to evict them. If an invalid buffer is encountered when trying to read from the corresponding file, it is removed from the cache.

Another reason for cache entries becoming invalid is files being modified. If we detect that a file has been modified, all of the byte ranges of that file are removed from the cache. For remote files, we detect this using the HTTP `Etag` field.

For local files, we use the "last modified" timestamp. For some file systems, the resolution of this timestamp is in nanoseconds, which is great: reads and writes are so fast nowadays that we really need that resolution to be sure files have not been modified in the meantime. However, some file systems use a resolution of seconds, and our implementation should work regardless of the underlying file system and hardware. So, we require the last modified time to be more than two seconds in the past for the cache entry to be considered valid.
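A sketch of the local-file case (illustrative only; `data.parquet` is a placeholder, and caching for local Parquet files requires the prefetch setting mentioned above):

```sql
-- enable caching for local Parquet reads via the prefetch path
SET prefetch_all_parquet_files = true;
-- first read populates the cache with byte ranges of data.parquet
SELECT count(*) FROM 'data.parquet';
-- overwriting the file bumps its "last modified" timestamp ...
COPY (SELECT 42 AS x) TO 'data.parquet' (FORMAT parquet);
-- ... so the next read detects the change and drops the stale entries
SELECT count(*) FROM 'data.parquet';
```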
### Example: Remote File
The most compelling example is for remote reads. For example:
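(The exact snippet is not reproduced here; the following is an illustrative sketch with a placeholder URL, run twice in the DuckDB CLI with the timer enabled.)

```sql
.timer on
-- first read: byte ranges are fetched over HTTP and inserted into the cache
SELECT count(*) FROM 'https://example.com/some_file.parquet';
-- second read: the same byte ranges are served from the in-memory cache
SELECT count(*) FROM 'https://example.com/some_file.parquet';
```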
As we can see, the second read is much faster than the initial read. In theory, it could take just a few milliseconds, but the second query still checks the `Etag` to see if the file was modified in between queries. We may add a config setting to skip this check, but that can of course lead to very confusing errors when parts of the file are cached and others are not, while the file has been changed in between queries.

The external file cache can be queried to check which files and byte ranges are currently in the cache:
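For instance (a sketch; I am assuming the inspection table function added by this PR is called `duckdb_external_file_cache()`, and the column names shown are assumptions as well):

```sql
-- list the cached files and their byte ranges
SELECT path, nr_bytes, location, loaded
FROM duckdb_external_file_cache();
```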
As we can see, there are two byte ranges in cache: a small one for the footer, and a larger one for the single row group that is in the file.
External file cache entries also show up under their own tag in `duckdb_memory()`:
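For example (`duckdb_memory()` is an existing table function; the exact tag name used for these entries is not shown here):

```sql
-- per-tag memory usage; cache entries appear under their own tag
SELECT tag, memory_usage_bytes
FROM duckdb_memory()
ORDER BY memory_usage_bytes DESC;
```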
### Example: Local File

I have the TPC-H `lineitem` table at SF10 in a local Parquet file (`lineitem10.parquet`). When I run the following query on current `main`, I get an execution time of ~0.44s, and `SET prefetch_all_parquet_files=true;` does not seem to make much of a difference.
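(The exact benchmark query is not shown; purely as an illustrative stand-in, an aggregation over the file could look like this.)

```sql
-- illustrative stand-in for the benchmark query (not the one from the PR)
SELECT l_returnflag, l_linestatus, sum(l_extendedprice)
FROM 'lineitem10.parquet'
GROUP BY ALL;
```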
On this branch, with `SET prefetch_all_parquet_files=true;`, I get an execution time of ~0.46s on the first read, and ~0.39s on subsequent reads.

So, there seems to be a slight overhead from enabling the cache on the first read (most likely due to all threads inserting into and reading from the cache in parallel, which requires some locking), but subsequent reads are a bit faster.
### Future Work
In the future, I'd like to enable this not only for prefetch reads, but for almost all Parquet reads. Currently, this would cause overhead, since the Parquet reader copies the data from the cached buffer into a local buffer. However, this can (and should) be entirely zero-copy. We can also cache decompressed buffers, which would speed up subsequent reads significantly.
We could also enable persistent caching, so that users can close their remote datalake querying session, and pick it back up later without such a high initial cost of reading files over the network.
Finally, we could consider integrating this into the CSV/JSON readers too, so that users can enjoy the same benefits for those file formats as well.
Lots of work to do still, but I think it's best to send this PR now to receive some feedback before going all-in on all kinds of optimizations.
EDIT: I've made it so that we also cache the last 8 bytes of Parquet files (footer_length + "PAR1"). On a second query, only the file handle is opened and everything else is served from the cache, reducing the time of the second read of the remote Parquet file from 0.78s to 0.66s.