Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]


### Fixed

- Prevent data loss for unsynchronized files in case of power failure for unfinished blocks, [PR-901](https://github.com/reductstore/reductstore/pull/901)

## [1.16.1] - 2025-08-09

### Fixed
Expand Down
4 changes: 4 additions & 0 deletions reductstore/src/core/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ impl<K: Eq + Hash + Clone, V> Cache<K, V> {
self.store.keys().collect()
}

/// Iterates over all cached entries, yielding each key together with a
/// mutable reference to its value.
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
    self.store
        .iter_mut()
        .map(|(key, entry)| (key, &mut entry.value))
}

fn discard_old_descriptors(&mut self) -> Vec<(K, V)> {
// remove old descriptors
let mut removed = Vec::new();
Expand Down
184 changes: 165 additions & 19 deletions reductstore/src/core/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,29 @@
// Licensed under the Business Source License 1.1

use crate::core::cache::Cache;
use log::{debug, warn};
use reduct_base::error::ReductError;
use reduct_base::internal_server_error;
use std::fs::{remove_dir_all, remove_file, rename, File};
use std::io::{Seek, SeekFrom};
use std::path::PathBuf;
use std::sync::{Arc, LazyLock, RwLock, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, LazyLock, RwLock, RwLockWriteGuard, Weak};
use std::thread::spawn;
use std::time::Duration;

const FILE_CACHE_MAX_SIZE: usize = 1024;
const FILE_CACHE_TIME_TO_LIVE: Duration = Duration::from_secs(60);

pub(crate) static FILE_CACHE: LazyLock<FileCache> =
LazyLock::new(|| FileCache::new(FILE_CACHE_MAX_SIZE, FILE_CACHE_TIME_TO_LIVE));
const FILECACHE_SYNC_INTERVAL: Duration = Duration::from_millis(100);

pub(crate) static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {
FileCache::new(
FILE_CACHE_MAX_SIZE,
FILE_CACHE_TIME_TO_LIVE,
FILECACHE_SYNC_INTERVAL,
)
});

pub(crate) struct FileWeak {
file: Weak<RwLock<File>>,
Expand Down Expand Up @@ -48,15 +58,19 @@ enum AccessMode {
/// A cached open file handle together with its access mode and sync state.
struct FileDescriptor {
/// Shared reference-counted handle to the open file.
file_ref: FileRc,
/// Read-only or read-write; a read-only descriptor is reopened with
/// write access on demand.
mode: AccessMode,
/// True once the file has been fsync'ed to disk and not written since;
/// the background sync worker skips descriptors that are already synced.
synced: bool,
}

/// A cache to keep file descriptors open
///
/// This optimization is needed for network file systems because opening
/// and closing files for writing causes synchronization overhead.
///
/// Additionally, it periodically syncs files to disk to ensure data integrity.
#[derive(Clone)]
pub(crate) struct FileCache {
/// Open file descriptors keyed by file path.
cache: Arc<RwLock<Cache<PathBuf, FileDescriptor>>>,
/// Flag polled by the background sync worker thread; setting it to true
/// asks the worker to exit.
stop_sync_worker: Arc<AtomicBool>,
}

impl FileCache {
Expand All @@ -66,9 +80,50 @@ impl FileCache {
///
/// * `max_size` - The maximum number of file descriptors to keep open
/// * `ttl` - The time to live for a file descriptor
pub fn new(max_size: usize, ttl: Duration) -> Self {
/// * `sync_interval` - The interval to sync files from cache to disk
pub fn new(max_size: usize, ttl: Duration, sync_interval: Duration) -> Self {
let cache = Arc::new(RwLock::new(Cache::<PathBuf, FileDescriptor>::new(
max_size, ttl,
)));
let cache_clone = Arc::clone(&cache);
let stop_sync_worker = Arc::new(AtomicBool::new(false));
let stop_sync_worker_clone = Arc::clone(&stop_sync_worker);

spawn(move || {
// Periodically sync files from cache to disk
while !stop_sync_worker.load(Ordering::Relaxed) {
std::thread::sleep(sync_interval);
Self::sync_rw_and_unused_files(&cache);
}
});

FileCache {
cache: Arc::new(RwLock::new(Cache::new(max_size, ttl))),
cache: cache_clone,
stop_sync_worker: stop_sync_worker_clone,
}
}

/// Fsyncs every cached read-write file that still has unsynced data and is
/// not currently referenced outside the cache.
///
/// Runs on the background sync worker thread; holds the cache write lock
/// for the duration of the pass.
fn sync_rw_and_unused_files(cache: &Arc<RwLock<Cache<PathBuf, FileDescriptor>>>) {
    let mut cache = cache.write().unwrap();

    for (path, file_desc) in cache.iter_mut() {
        // Sync only writeable files that are not synced yet
        // and are not used by other threads (extra strong refs or any
        // outstanding weak handles mean the file is in use).
        if file_desc.mode != AccessMode::ReadWrite
            || file_desc.synced
            || Arc::strong_count(&file_desc.file_ref) > 1
            || Arc::weak_count(&file_desc.file_ref) > 0
        {
            continue;
        }

        // Don't unwrap the per-file lock: a poisoned lock would otherwise
        // panic and permanently kill the sync worker thread. Syncing is
        // best-effort, so recover the guard and continue.
        let sync_result = match file_desc.file_ref.write() {
            Ok(file) => file.sync_all(),
            Err(poisoned) => poisoned.into_inner().sync_all(),
        };

        match sync_result {
            Ok(()) => {
                debug!("File {} synced to disk", path.display());
                file_desc.synced = true; // Mark as synced after successful sync
            }
            Err(err) => warn!("Failed to sync file {}: {}", path.display(), err),
        }
    }
}

Expand All @@ -91,12 +146,11 @@ impl FileCache {
} else {
let file = File::options().read(true).open(path)?;
let file = Arc::new(RwLock::new(file));
cache.insert(
Self::save_in_cache_and_sync_discarded(
path.clone(),
FileDescriptor {
file_ref: Arc::clone(&file),
mode: AccessMode::Read,
},
&mut cache,
&file,
AccessMode::Read,
);
file
};
Expand Down Expand Up @@ -124,13 +178,14 @@ impl FileCache {
let mut cache = self.cache.write()?;

let file = if let Some(desc) = cache.get_mut(path) {
desc.synced = false;

if desc.mode == AccessMode::ReadWrite {
Arc::clone(&desc.file_ref)
} else {
let rw_file = File::options().write(true).read(true).open(path)?;
desc.file_ref = Arc::new(RwLock::new(rw_file));
desc.mode = AccessMode::ReadWrite;

Arc::clone(&desc.file_ref)
}
} else {
Expand All @@ -140,12 +195,11 @@ impl FileCache {
.read(true)
.open(path)?;
let file = Arc::new(RwLock::new(file));
cache.insert(
Self::save_in_cache_and_sync_discarded(
path.clone(),
FileDescriptor {
file_ref: Arc::clone(&file),
mode: AccessMode::ReadWrite,
},
&mut cache,
&file,
AccessMode::ReadWrite,
);
file
};
Expand Down Expand Up @@ -178,7 +232,6 @@ impl FileCache {
pub fn remove(&self, path: &PathBuf) -> Result<(), ReductError> {
let mut cache = self.cache.write()?;
cache.remove(path);

if path.try_exists()? {
remove_file(path)?;
}
Expand All @@ -195,6 +248,12 @@ impl FileCache {
Ok(())
}

/// Discards all files in the cache that are under the specified path.
///
/// This function iterates through the cache and removes all file descriptors
/// whose paths start with the specified `path`. If a file is in read-write mode
/// and has not been synced, it attempts to sync the file before removing it from the cache.
///
pub fn discard_recursive(&self, path: &PathBuf) -> Result<(), ReductError> {
let mut cache = self.cache.write()?;
let files_to_remove = cache
Expand All @@ -203,9 +262,18 @@ impl FileCache {
.filter(|file_path| file_path.starts_with(path))
.map(|file_path| (*file_path).clone())
.collect::<Vec<PathBuf>>();

for file_path in files_to_remove {
cache.remove(&file_path);
if let Some(file) = cache.remove(&file_path) {
// If the file is in read-write mode and not synced, we need to sync it before removing from cache
if file.mode == AccessMode::ReadWrite && !file.synced {
if let Err(err) = file.file_ref.write()?.sync_all() {
warn!("Failed to sync file {}: {}", file_path.display(), err);
}
}
}
}

Ok(())
}

Expand All @@ -228,6 +296,37 @@ impl FileCache {
rename(old_path, new_path)?;
Ok(())
}

/// Saves a file descriptor in the cache and fsyncs any descriptors the
/// insertion evicted.
///
/// We need to make sure that we sync all files that were discarded, so no
/// unsynced data is lost when their handles are dropped. Called with the
/// cache write lock already held.
fn save_in_cache_and_sync_discarded(
    path: PathBuf,
    cache: &mut RwLockWriteGuard<Cache<PathBuf, FileDescriptor>>,
    file: &Arc<RwLock<File>>,
    mode: AccessMode,
) {
    let discarded_files = cache.insert(
        path,
        FileDescriptor {
            file_ref: Arc::clone(file),
            mode,
            synced: false,
        },
    );

    // Best-effort sync of evicted files; log the *discarded* file's path
    // (the original code mistakenly logged the newly inserted path here).
    for (discarded_path, discarded) in discarded_files {
        let sync_result = match discarded.file_ref.write() {
            Ok(file) => file.sync_all(),
            // Recover from a poisoned lock rather than panicking.
            Err(poisoned) => poisoned.into_inner().sync_all(),
        };
        if let Err(err) = sync_result {
            warn!("Failed to sync file {}: {}", discarded_path.display(), err);
        }
    }
}
}

impl Drop for FileCache {
fn drop(&mut self) {
self.stop_sync_worker.store(true, Ordering::Relaxed);
}
}

#[cfg(test)]
Expand Down Expand Up @@ -383,9 +482,56 @@ mod tests {
assert_eq!(tmp_dir.exists(), false);
}

mod sync_rw_and_unused_files {
    use super::*;

    /// Polls `pred` every 10 ms until it holds or `timeout` elapses.
    /// Avoids the flakiness of asserting after a single fixed sleep.
    fn wait_until(mut pred: impl FnMut() -> bool, timeout: Duration) -> bool {
        let start = std::time::Instant::now();
        while start.elapsed() < timeout {
            if pred() {
                return true;
            }
            sleep(Duration::from_millis(10));
        }
        pred()
    }

    #[rstest]
    fn test_sync_unused_files(cache: FileCache, tmp_dir: PathBuf) {
        let file_path = tmp_dir.join("test_sync_rw_and_unused_files.txt");
        {
            // Hold the handle inside a scope so the file counts as "used"
            // only until the scope ends.
            let _file_ref = cache
                .write_or_create(&file_path, SeekFrom::Start(0))
                .unwrap()
                .upgrade()
                .unwrap();

            assert!(
                !cache.cache.write().unwrap().get(&file_path).unwrap().synced,
                "File should not be synced initially"
            );
        }

        // Wait (bounded) for the sync worker to pick the file up.
        let synced = wait_until(
            || cache.cache.write().unwrap().get(&file_path).unwrap().synced,
            Duration::from_secs(2),
        );
        assert!(synced, "File should be synced after sync worker runs");
    }

    #[rstest]
    fn test_not_sync_used_files(cache: FileCache, tmp_dir: PathBuf) {
        let file_path = tmp_dir.join("test_not_sync_unused_files.txt");
        // Keep the strong reference alive for the whole test: the worker
        // must skip files that are in use.
        let _file_ref = cache
            .write_or_create(&file_path, SeekFrom::Start(0))
            .unwrap()
            .upgrade()
            .unwrap();

        // Give the worker several sync intervals to (incorrectly) act.
        sleep(Duration::from_millis(150));

        assert!(
            !cache.cache.write().unwrap().get(&file_path).unwrap().synced,
            "File should not be synced after sync worker runs"
        );
    }
}

#[fixture]
fn cache() -> FileCache {
    // Small cache, short TTL, and a 100 ms sync interval so the worker
    // runs several times within a test.
    FileCache::new(2, Duration::from_millis(100), Duration::from_millis(100))
}

#[fixture]
Expand Down
Loading