
Conversation

lnkuiper
Contributor

@lnkuiper lnkuiper commented Feb 28, 2025

This PR implements an in-memory cache for external files (e.g., Parquet/CSV/JSON) that caches reads in memory to speed up subsequent reads. It is enabled by default, but can be disabled with SET enable_external_file_cache=false;.

CachingFileSystem

The main implementation is in CachingFileSystem, a class that (sort of) wraps FileSystem, but with a slightly different API. Instead of reading a byte range from a file like so:

void Read(void *buffer, idx_t nr_bytes, idx_t location);

which reads nr_bytes from location into buffer (which must be pre-allocated to fit nr_bytes), we instead read like so:

BufferHandle Read(data_ptr_t &buffer, idx_t nr_bytes, idx_t location);

which returns a BufferHandle to a managed buffer holding the requested nr_bytes from location, and buffer is set to point into it. This entails that we can (in theory) do zero-copy reads if the data is already cached in memory.
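
Roughly, usage of the new API looks like this (an illustrative sketch under the assumptions noted in the comments, not code from this PR):

```C++
// Illustrative sketch only, not code from this PR. Assumes DuckDB's internal
// types (data_ptr_t, idx_t, BufferHandle) and glosses over whether Read() is
// called on the CachingFileSystem itself or on a CachingFileHandle obtained
// from it. ConsumeBytes is a hypothetical consumer of the bytes.
void ReadRangeExample(CachingFileHandle &caching_handle, idx_t nr_bytes, idx_t location) {
	data_ptr_t data;
	// The returned BufferHandle pins the (possibly cached) buffer; keep it
	// alive for as long as `data` is used, otherwise the buffer manager may
	// evict the memory underneath us.
	BufferHandle pin = caching_handle.Read(data, nr_bytes, location);
	// `data` points directly into the managed buffer, so no copy into a
	// caller-owned buffer is needed; this is what enables zero-copy reads.
	ConsumeBytes(data, nr_bytes);
}
```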

This is currently integrated into the Parquet reader, but only when prefetching is enabled (which is the default for remote reads, but can be enabled for local Parquet files with SET prefetch_all_parquet_files=true;).

Reads have a size of nr_bytes and are identified by path and location. If the same byte range is (partially) queried twice, we first try to read from the in-memory cache. If a larger byte range fully encompasses a smaller, already-cached byte range (e.g., when a full row group is queried after first querying a single column), the smaller byte range is used to help construct the larger one and is then dropped from the cache in favor of it.
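
Roughly, the containment bookkeeping can be pictured like this (an illustrative sketch; CachedRange and its methods are hypothetical names, not DuckDB classes):

```C++
#include <cstdint>

using idx_t = uint64_t; // stand-in for DuckDB's idx_t

// Hypothetical sketch of the range bookkeeping described above.
struct CachedRange {
	idx_t location; // start offset of the cached byte range within the file
	idx_t nr_bytes; // length of the cached byte range

	// An incoming read [loc, loc + len) can be served entirely from this entry.
	bool Covers(idx_t loc, idx_t len) const {
		return loc >= location && loc + len <= location + nr_bytes;
	}
	// This entry is fully encompassed by a larger incoming read: its bytes can
	// be copied into the new, larger buffer, after which the entry is dropped
	// from the cache in favor of the larger one.
	bool EncompassedBy(idx_t loc, idx_t len) const {
		return location >= loc && location + nr_bytes <= loc + len;
	}
};
```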

Cache Invalidation

Because the buffers holding the byte ranges are in-memory only (for now), they might become invalid if the buffer manager decides to evict them. If an invalid buffer is encountered when trying to read from the corresponding file, it is removed from the cache.

Cache entries can also become invalid because the underlying files are modified. If we detect that a file has been modified, all byte ranges of that file are removed from the cache. For remote files, we detect this using the HTTP ETag header.

For local files, we use the "last modified" timestamp. For some file systems, the resolution of this timestamp is in nanoseconds, which is great: reads and writes are so fast nowadays that we really need that precision to be sure files have not been modified in the meantime. However, some file systems use a resolution of seconds, and our implementation should work regardless of the underlying file system and hardware. We therefore require the last modified time to be more than two seconds in the past for a cache entry to be considered valid.
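
In rough pseudocode, the local-file validity check boils down to something like the following (an illustrative sketch using std::chrono; the actual implementation goes through DuckDB's file system abstractions):

```C++
#include <chrono>

// Illustrative sketch of the two-second rule described above.
bool LocalCacheEntryValid(std::chrono::system_clock::time_point cached_last_modified,
                          std::chrono::system_clock::time_point current_last_modified) {
	using namespace std::chrono;
	// If the file's last-modified time changed since the range was cached, the entry is stale.
	if (current_last_modified != cached_last_modified) {
		return false;
	}
	// Some file systems only store second-granularity timestamps, so a write within the
	// same second as the cached read could go unnoticed. Only trust entries whose last
	// modification lies more than two seconds in the past.
	return system_clock::now() - current_last_modified > seconds(2);
}
```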

Example: Remote File

The most compelling example is for remote reads. For example:

D .timer on
D .mode trash
D from 's3://duckdb-blobs/data/shakespeare.parquet';
Run Time (s): real 1.826 user 0.042848 sys 0.024513
D from 's3://duckdb-blobs/data/shakespeare.parquet';
Run Time (s): real 0.661 user 0.043247 sys 0.005002

As we can see, the second read is much faster than the initial read. In theory, it could take just a few milliseconds, but the second query still checks the ETag to see if the file was modified between queries. We may add a config setting to skip this check, but that can of course lead to very confusing errors when some parts of the file are cached and others are not, while the file has changed between queries.

The external file cache can be queried to check which files and byte ranges are currently in the cache:

D .mode duckbox
D from duckdb_external_file_cache();
┌────────────────────────────────────────────┬──────────┬──────────┬─────────┐
│                    path                    │ nr_bytes │ location │ loaded  │
│                  varchar                   │  int64   │  int64   │ boolean │
├────────────────────────────────────────────┼──────────┼──────────┼─────────┤
│ s3://duckdb-blobs/data/shakespeare.parquet │  1697483 │        4 │ true    │
│ s3://duckdb-blobs/data/shakespeare.parquet │      697 │  1697487 │ true    │
└────────────────────────────────────────────┴──────────┴──────────┴─────────┘

As we can see, there are two byte ranges in cache: a small one for the footer, and a larger one for the single row group that is in the file.

External file cache entries also show up under their own tag in duckdb_memory():

D from duckdb_memory() where tag = 'EXTERNAL_FILE_CACHE';
┌─────────────────────┬────────────────────┬─────────────────────────┐
│         tag         │ memory_usage_bytes │ temporary_storage_bytes │
│       varchar       │       int64        │          int64          │
├─────────────────────┼────────────────────┼─────────────────────────┤
│ EXTERNAL_FILE_CACHE │            1703936 │                       0 │
└─────────────────────┴────────────────────┴─────────────────────────┘

Example: Local File

I have the TPC-H lineitem table at SF10 in a local Parquet file (lineitem10.parquet).
When I run the following query on current main:

select any_value(columns(*)) from 'lineitem10.parquet';

I get an execution time of ~0.44s, and SET prefetch_all_parquet_files=true; does not seem to make much of a difference.

On this branch, with SET prefetch_all_parquet_files=true;, I get an execution time of ~0.46s on the first read, and an execution time of ~0.39s on subsequent reads.

So, there seems to be a slight overhead from enabling the cache on the first read (most likely due to all threads inserting/reading from the cache in parallel, requiring some amount of locking), but subsequent reads are a bit faster.

Future Work

In the future, I'd like to enable this not only for prefetch reads, but for almost all Parquet reads. Currently, this would cause overhead, since the Parquet reader copies the data from the cached buffer into a local buffer. However, this can (and should) be entirely zero-copy. We can also cache decompressed buffers, which would speed up subsequent reads significantly.

We could also enable persistent caching, so that users can close their remote datalake querying session, and pick it back up later without such a high initial cost of reading files over the network.

Finally, we could consider integrating this into the CSV/JSON readers, so that users of those file formats enjoy the same benefits.

Lots of work to do still, but I think it's best to send this PR now to receive some feedback before going all-in on all kinds of optimizations.

EDIT: I've made it so we also cache the last 8 bytes of Parquet files (footer_length + "PAR1"). A second query of the same file now only opens the file handle; everything else is served from the cache, reducing the time of the second read of the remote Parquet file from 0.78s to 0.66s.
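
For reference, those last 8 bytes are the standard Parquet tail: a 4-byte little-endian footer length followed by the "PAR1" magic. A minimal parsing sketch (generic Parquet layout, not DuckDB's reader code; assumes a little-endian host):

```C++
#include <cstdint>
#include <cstring>

struct ParquetTail {
	uint32_t footer_length; // size of the Thrift-encoded footer that precedes these 8 bytes
	char magic[4];          // must be 'P', 'A', 'R', '1'
};

// Parse the last 8 bytes of a Parquet file. Caching them means a repeated query
// does not have to re-read the file tail.
bool ParseParquetTail(const uint8_t last_8_bytes[8], ParquetTail &tail) {
	// footer_length is stored little-endian; a plain memcpy assumes a little-endian host.
	std::memcpy(&tail.footer_length, last_8_bytes, sizeof(uint32_t));
	std::memcpy(tail.magic, last_8_bytes + 4, sizeof(tail.magic));
	return std::memcmp(tail.magic, "PAR1", 4) == 0;
}
```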

@lnkuiper lnkuiper requested a review from samansmink February 28, 2025 10:07
@carlopi
Contributor

carlopi commented Feb 28, 2025

In the from 's3://duckdb-blobs/data/shakespeare.parquet'; example, how many times is the Etag checked per query?
I would check properly how many HEAD / GET are performed with / without cache.

I would assume we could ideally get to only 1 request per query when stuff is cached.
This is relevant if there are N ranges instead of only 1.

Other question: what happens in the evil case of ranges like:
0-1000, 1-1001, 2-1002, etc?

This might blow up the cache / add complexity.

We do have somewhat similar logic in duckdb-wasm to handle complex cases; I can check later and compare with that (Wasm piggybacks on the browser, so in theory it could do better).

@Tishj
Contributor

Tishj commented Feb 28, 2025

This entails that we can (in theory) do zero-copy reads if the data is already cached in memory.

My knee-jerk reaction to this is: can't this result in problems when this data is accidentally modified? Are we doing something to prevent this? I don't think BufferHandle enforces that the underlying memory is const.

EDIT:
As we talked about, the debug checksums solve this, you can ignore this comment 👍

@carlopi
Contributor

carlopi commented Feb 28, 2025

Also, I believe you are bumping into #16177; my suspicion is that disabling delta might solve this.

@lnkuiper
Contributor Author

@carlopi The Etag is checked once for every HTTPFileHandle, which (I think) is once per thread. I think it's not caching the PAR1 and footer length yet, which it really should.

As for the evil byte ranges, if we merge non-overlapping byte ranges, that will cause overhead. I think with the prefetching mechanism in Parquet it should be OK since the byte ranges it fetches are pretty big.

Would be cool if we can get close to WASM speeds.

@rudolfix

@lnkuiper this is so cool! will it work with extensions automatically? ie. delta/iceberg scanners?

Contributor

@samansmink samansmink left a comment


looks amazing @lnkuiper!

@@ -56,7 +56,7 @@ struct ParquetReaderScanState {
vector<idx_t> group_idx_list;
int64_t current_group;
idx_t group_offset;
-	unique_ptr<FileHandle> file_handle;
+	unique_ptr<CachingFileHandle> file_handle;
Contributor


I'm wondering whether this needs to be aware that it's holding a CachingFileHandle instead of a base FileHandle.
Could the VirtualFileSystem hand out CachingFileHandles based on the flags passed to OpenFile?

Ideally we would also have a way for the filesystems to use the cache without the calling code being aware of it. As discussed in person already, this would break the zero-copy side of things, but that might still be worth it.

Contributor Author


Absolutely.

Just a recap of our discussion: the API implemented in this PR requires us to modify IO code in the Parquet extension (and anywhere else we would like to use this). Ideally, this would not be the case. However, if we want to enable caching without changing the API, this would create a double copy (memcpy overhead and, briefly, 2x memory usage).

I've implemented it like this to prevent this overhead, and to eventually allow zero-copy reads. However, we should easily be able to build an API on top of this that caches data implicitly using the tried-and-tested API of FileSystem, allowing for easy integration in, e.g., the Avro extension.
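
To make the trade-off concrete, an implicit-caching read behind the classic FileSystem-style signature could look roughly like this (an illustrative sketch, not code from this PR; the memcpy is exactly the extra copy discussed above):

```C++
#include <cstring>

// Illustrative sketch only; assumes DuckDB's internal types (data_ptr_t, idx_t,
// BufferHandle, CachingFileHandle) as used elsewhere in this PR.
void ReadViaCache(CachingFileHandle &caching_handle, void *buffer, idx_t nr_bytes, idx_t location) {
	data_ptr_t cached_data;
	// Fetch (or populate) the cached byte range; the BufferHandle keeps it
	// pinned for the duration of the copy.
	BufferHandle pin = caching_handle.Read(cached_data, nr_bytes, location);
	// The double copy: data goes from the cached buffer into the caller-provided buffer.
	std::memcpy(buffer, cached_data, nr_bytes);
}
```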

Contributor


Perhaps it would be nice to consider holding the different types of caches that we have behind a unified interface. We currently already have things like:

  • http metadata cache
  • azure client cache
  • parquet object cache

which all would benefit from a coherent way to:

  • inspect the cache entries
  • purge cache entries (could even be a SQL statement, perhaps DELETE FROM duckdb_cache_entries)
  • enforce global limits on caching
  • configure eviction strategies / priorities on different caches

Contributor


I think this internal issue/discussion is relevant here https://github.com/duckdblabs/duckdb-internal/issues/1833

Contributor Author


I really like this idea. If we abstract it such that for every cache we have some generic function to request some metadata (JSON?) about the current state of the cache, we could query all registered caches using a single table function.
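
A minimal sketch of what such a unified interface could look like (purely illustrative; none of these class or function names exist in DuckDB):

```C++
#include <cstdint>
#include <string>
#include <vector>

struct CacheEntryInfo {
	std::string cache_name; // e.g. "external_file_cache", "http_metadata_cache"
	std::string key;        // e.g. a file path or URL
	uint64_t size_bytes;    // memory used by the entry
	std::string metadata;   // free-form (e.g. JSON) description of the entry's state
};

class CacheProvider {
public:
	virtual ~CacheProvider() = default;
	// Report current entries, so a single table function can expose all registered caches.
	virtual std::vector<CacheEntryInfo> GetEntries() const = 0;
	// Drop all entries, e.g. to back a future purge statement.
	virtual void Purge() = 0;
};
```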

@JelteF
Contributor

JelteF commented Feb 28, 2025

Really cool work!

We could also enable persistent caching, so that users can close their remote datalake querying session, and pick it back up later without such a high initial cost of reading files over the network.

I wanted to add that such a persistent cache would be super useful for pg_duckdb. For it to work there, it would be important that the cache allows concurrent access from different DuckDB processes, because in pg_duckdb each Postgres connection (i.e. process) has its own dedicated in-memory DuckDB instance.

@djouallah

we need to protect @lnkuiper at all cost :) let him merge his first version and then we can ask him for more :)

@lnkuiper
Contributor Author

lnkuiper commented Mar 3, 2025

@Tishj Indeed, we compute a checksum in DEBUG mode to prevent things like this from happening.

@rudolfix Delta/Iceberg should benefit as these use our Parquet extension, in which this is integrated. Iceberg also uses Avro scans, which do not benefit yet, but we can look into this in the future.

@JelteF Definitely. We want to develop this into a persistent cache in the future, but will refine the in-memory version first.

@duckdb-draftbot duckdb-draftbot marked this pull request as draft April 14, 2025 09:02
@lnkuiper lnkuiper marked this pull request as ready for review April 14, 2025 11:28
@duckdb-draftbot duckdb-draftbot marked this pull request as draft April 14, 2025 13:16
@lnkuiper lnkuiper marked this pull request as ready for review April 14, 2025 14:04
@duckdb-draftbot duckdb-draftbot marked this pull request as draft April 15, 2025 06:54
@lnkuiper lnkuiper marked this pull request as ready for review April 15, 2025 07:03
@duckdb-draftbot duckdb-draftbot marked this pull request as draft April 15, 2025 13:16
@lnkuiper lnkuiper marked this pull request as ready for review April 15, 2025 13:22
@Mytherin Mytherin merged commit 9e7fedd into duckdb:main Apr 16, 2025
48 checks passed
@Mytherin
Collaborator

Thanks!

@nicornk
Contributor

nicornk commented Apr 19, 2025

@lnkuiper are you planning a follow-up PR to make the cache-invalidation ETag HEAD requests configurable? For immutable Parquet files, e.g. in Iceberg, this would improve performance a lot.

@lnkuiper
Contributor Author

Hi @nicornk, this will come in a follow-up PR. We're still debating the best way to do this, but I think we'll use the new ExtendedOpenFileInfo in OpenFileInfo that was recently added.

@djouallah

djouallah commented May 15, 2025

@lnkuiper 2.6x improvement, not bad at all!!!

Mytherin added a commit that referenced this pull request May 15, 2025
This PR follows up on #15119 with:
- Switching the HTTP Logger to use the new logging infrastructure by
default (logging to a file is still possible using old setting)
- Adds a new mechanism for FileHandle logging
- Refactoring the loggers to support structured logging

My apologies for the slightly chaotic PR here. It took a few tries to
understand what makes sense here and I still feel like we are not 100%
there yet. However, this *should* be a decent step in the right
direction.

## FileHandle logging

This PR adds logging infra for filehandles. Basically, what I wanted is
to create a unified way to log calls to the FileSystem APIs. I think this
can be very useful, for example when testing/optimizing Iceberg/Delta
workloads, but also to easily analyze the IO patterns of our various
readers, like the Parquet, JSON, or Avro readers. Since the log messages
are in JSON format, they are easy to parse and do analysis on.

Here's a demo:

```
D set enable_logging=true;
D set logging_level='trace';
D COPY (SELECT 1 as a) TO './test.csv';
D FROM "./test.csv";
┌───────┐
│   a   │
│ int64 │
├───────┤
│   1   │
└───────┘
D SELECT message FROM duckdb_logs WHERE type = 'FileSystem';
┌───────────────────────────────────────────────────────────────────────────┐
│                                  message                                  │
│                                  varchar                                  │
├───────────────────────────────────────────────────────────────────────────┤
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"OPEN"}                  │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"WRITE","bytes":"2"}     │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"WRITE","bytes":"1"}     │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"WRITE","bytes":"1"}     │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"CLOSE"}                 │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"OPEN"}                  │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"READ","bytes":"4"}      │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"READ","bytes":"0"}      │
│ {"fs":"LocalFileSystem","path":"./test.csv","op":"CLOSE"}                 │
└───────────────────────────────────────────────────────────────────────────┘
```

### Implementation
While conceptually simple, the complexity lies in the fact that we
generally want to log to the ClientContext to ensure the log entries have
connection-level context info. This is done by changing what was
previously a `unique_ptr<Logger>` to a `shared_ptr<Logger>` in the
ClientContext and copying the pointer to the logger into the filehandles
on creation. What this means is that a filehandle will log using the
logging configuration of the client context that created it. Because,
for now, filehandles will not outlive the ClientContext that created
them, this is fine, but in theory this can create some confusion if we
were to share filehandles between connections or even between queries.
I think we probably just want to ensure we don't keep filehandles open
between queries.

I've created some macros for standardization of the common filehandle
operations that we want to log:
```C++
#define DUCKDB_LOG_FILE_SYSTEM_READ(HANDLE, ...)  DUCKDB_LOG_FILE_HANDLE_VA(HANDLE, "READ", __VA_ARGS__);
#define DUCKDB_LOG_FILE_SYSTEM_WRITE(HANDLE, ...) DUCKDB_LOG_FILE_HANDLE_VA(HANDLE, "WRITE", __VA_ARGS__);
#define DUCKDB_LOG_FILE_SYSTEM_OPEN(HANDLE)       DUCKDB_LOG_FILE_HANDLE(HANDLE, "OPEN");
#define DUCKDB_LOG_FILE_SYSTEM_CLOSE(HANDLE)      DUCKDB_LOG_FILE_HANDLE(HANDLE, "CLOSE");

```

Then in the code, for example in `LocalFileSystem::Read()`, we can
easily call the logger in an efficient way with the centrally defined
log type and level for FileHandle logging:

```C++
DUCKDB_LOG_FILE_SYSTEM_READ(handle, bytes_to_read, location);

```

which will ensure that extensions that implement their own filesystems
will be able to easily adhere to the logging convention.

## Logging refactor
To neatly support writing structured log messages, this PR makes a few
conceptual overhauls to the logging.

First of all, we remove the log types as manually passed strings. In
the initial logging implementation, you would log like:

```C++
DUCKDB_LOG_<level>(context, "<log type string>", "message");
```
However, these manually provided strings are not really practical and
are likely to result in utter chaos.

We change this now to:

```C++
// Default log type (empty string for now)
DUCKDB_LOG_<level>(context, "message");

// Predefined log type
DUCKDB_LOG(context, LogTypeClassName, "message");
```

The `DUCKDB_LOG` macro is defined as 

```C++
DUCKDB_LOG_INTERNAL(SOURCE, LOG_TYPE_CLASS::NAME, LOG_TYPE_CLASS::LEVEL, LOG_TYPE_CLASS::ConstructLogMessage(__VA_ARGS__))
```

which ensures that log types can only be created using the corresponding
log-message construction methods. The `LogType` class will then also
contain the logic to deserialize the log message string into a
predefined data type for easy parsing.

What this allows us to do is to easily enable a specific log type, and
let DuckDB automatically deserialize and unnest the resulting data:

```SQL
PRAGMA enable_logging('FileSystem');
FROM './test.csv';
SELECT fs, path, op, bytes, pos FROM duckdb_logs_parsed('FileSystem');
```
which yields:
```
LocalFileSystem	test.csv	OPEN	NULL	NULL
LocalFileSystem	test.csv	READ	4	0
LocalFileSystem	test.csv	READ	0	4
LocalFileSystem	test.csv	CLOSE	NULL	NULL
```

Note that `duckdb_logs_parsed` is simply a table macro for:

```SQL
SELECT * EXCLUDE (message), UNNEST(parse_duckdb_log_message(log_type, message))
FROM duckdb_logs
WHERE type = log_type
```

## TODOs
Some general follow ups:
- Add logging to azure
- Add logging to fsspec 
- Add logging to #16463
- More benchmarking
- Make HTTPLogType a structured type

A more complex follow-up is to think about how to make this performant.
Currently, enabling `PRAGMA enable_logging('FileSystem');` could become
quite expensive because it needs to string-compare the logging type for
every single `TRACE`-level call. For now we actually get away with it
because we only add the logger to the filehandle if, at the time of
filehandle creation, we are logging for `FileSystem`. However, as soon
as we were to add more trace-level calls that are hit a lot during
execution, things will likely slow down fast.
krlmlr added a commit to duckdb/duckdb-r that referenced this pull request May 18, 2025
External File Cache (duckdb/duckdb#16463)
[Dev] Add "registries" to `vcpkg.json`, add script to list the packages of the registry. (duckdb/duckdb#17124)