Implement S3 snapshot manager #4150
Implementation is done. I am going to add unit tests and integration tests now.
lib/collection/Cargo.toml
Outdated
# AWS
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.24.0"
aws-smithy-types = "1.1.8"
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] }
I used the official AWS SDK for S3
Any particular reason for that?
#[derive(Clone, Deserialize, Debug, Default)]
pub struct SnapShotsConfig {
    pub snapshots_storage: SnapshotsStorageConfig,
    pub s3_config: Option<S3Config>,
}

#[derive(Clone, Debug, Default)]
pub enum SnapshotsStorageConfig {
    #[default]
    Local,
    S3,
}

impl<'de> Deserialize<'de> for SnapshotsStorageConfig {
    fn deserialize<D>(deserializer: D) -> Result<SnapshotsStorageConfig, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s: String = Deserialize::deserialize(deserializer)?;
        match s.as_str() {
            "local" => Ok(SnapshotsStorageConfig::Local),
            "s3" => Ok(SnapshotsStorageConfig::S3),
            _ => Err(serde::de::Error::custom(
                "Invalid snapshots_storage. Use 'local' or 's3'",
            )),
        }
    }
}
These are for deserializing the config YAML.
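The string matching in the hand-written `Deserialize` impl can be illustrated without serde. The following is a minimal sketch, assuming only the two accepted values shown in the diff (`"local"` and `"s3"`); it uses `std::str::FromStr` as a stand-in and is not the PR's code. With serde, deriving `Deserialize` with `#[serde(rename_all = "lowercase")]` on the enum would likely accept the same two strings, though the error message would differ.

```rust
use std::str::FromStr;

/// Same shape as the enum in the diff, redefined so the sketch is self-contained.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum SnapshotsStorageConfig {
    #[default]
    Local,
    S3,
}

/// Stand-in for the custom `Deserialize` impl: maps the config string to a variant.
impl FromStr for SnapshotsStorageConfig {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "local" => Ok(SnapshotsStorageConfig::Local),
            "s3" => Ok(SnapshotsStorageConfig::S3),
            // Any other value is rejected, as in the original impl.
            other => Err(format!(
                "Invalid snapshots_storage '{other}'. Use 'local' or 's3'"
            )),
        }
    }
}
```

Calling `"s3".parse::<SnapshotsStorageConfig>()` yields the `S3` variant, and an omitted value can fall back to `SnapshotsStorageConfig::default()`, which is `Local`.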
const CHUNK_SIZE: u64 = 1024 * 1024 * 5;
const MAX_CHUNKS: u64 = 10000;
I just followed the official example below.
This should probably be in the config, but I wasn't sure.
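The two constants match S3's multipart limits: every part except the last must be at least 5 MiB, and one upload may have at most 10,000 parts. A fixed 5 MiB chunk therefore caps the file size at roughly 48.8 GiB. The sketch below shows one way to grow the chunk size for larger files. The names `MIN_CHUNK_SIZE`, `pick_chunk_size`, and `chunk_count` are hypothetical; the commit list mentions a `get_appropriate_chunk_size` function, but its body is not shown here, so this is not that implementation.

```rust
/// Smallest part size S3 accepts for every part except the last (5 MiB).
const MIN_CHUNK_SIZE: u64 = 1024 * 1024 * 5;
/// Maximum number of parts in a single multipart upload.
const MAX_CHUNKS: u64 = 10_000;

/// Hypothetical helper: the smallest chunk size (at least 5 MiB) that keeps
/// a file of `file_size` bytes within `MAX_CHUNKS` parts.
pub fn pick_chunk_size(file_size: u64) -> u64 {
    // Ceiling division without risking overflow on very large sizes.
    let needed = file_size / MAX_CHUNKS + u64::from(file_size % MAX_CHUNKS != 0);
    needed.max(MIN_CHUNK_SIZE)
}

/// Number of parts produced for a given chunk size (`chunk_size` must be non-zero).
pub fn chunk_count(file_size: u64, chunk_size: u64) -> u64 {
    file_size / chunk_size + u64::from(file_size % chunk_size != 0)
}
```

For a 100 GiB snapshot this picks a chunk size of about 10.2 MiB, which lands exactly on the 10,000-part limit; small files keep the 5 MiB minimum.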
    key.map(|k| k.trim_start_matches("./").to_string())
}

pub async fn multi_part_upload(
I used this official example code as the reference.
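The body of `multi_part_upload` is not shown in this excerpt. As a rough illustration of the bookkeeping a multipart upload needs, the sketch below splits a file into numbered byte ranges. `PartRange` and `part_ranges` are hypothetical names; the only S3 fact relied on is that part numbers start at 1. Each range would correspond to one uploaded part, with the SDK call itself omitted.

```rust
/// One part of a multipart upload. S3 part numbers start at 1.
#[derive(Debug, PartialEq, Eq)]
pub struct PartRange {
    pub part_number: u64,
    pub offset: u64,
    pub len: u64,
}

/// Hypothetical helper: split `file_size` bytes into consecutive parts of
/// `chunk_size` bytes; the last part may be shorter.
pub fn part_ranges(file_size: u64, chunk_size: u64) -> Vec<PartRange> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut parts = Vec::new();
    let mut offset = 0;
    while offset < file_size {
        // Never read past the end of the file.
        let len = chunk_size.min(file_size - offset);
        parts.push(PartRange {
            part_number: parts.len() as u64 + 1,
            offset,
            len,
        });
        offset += len;
    }
    parts
}
```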
…test-for-s3-snapshots
🎉🎈 @kemkemG0 has been awarded $200! 🎈🎊
A few things we would need to finalize this PR:
My suggestion is to stream files from either local or S3 as an
In order to do that, we would need to implement something like
Please note that the main issue bounty should still be paid once this PR is merged
@generall Nevermind, it was just an internal thing and we just need to call common
…nload_snapshot function
Fix bugs, adding `flush()` after `write_all()`
Use Streaming instead of Downloading snapshots
@@ -254,3 +310,35 @@ jobs:
trap 'kill $(jobs -p) &>/dev/null || :' EXIT
sleep 10
./tests/shard-snapshot-api.sh test-all

test-shard-snapshot-api-s3-minio:
Add a shard snapshot API integration test for the S3 version.
pub struct SnapShotStreamLocalFS {
    pub snapshot_path: PathBuf,
    pub req: HttpRequest,
}
pub struct SnapShotStreamCloudStrage {
    pub streamer:
        std::pin::Pin<Box<dyn Stream<Item = Result<bytes::Bytes, object_store::Error>> + Send>>,
}

pub enum SnapshotStream {
    LocalFS(SnapShotStreamLocalFS),
    CloudStorage(SnapShotStreamCloudStrage),
}

impl Responder for SnapshotStream {
    type Body = actix_web::body::BoxBody;

    fn respond_to(self, _: &actix_web::HttpRequest) -> HttpResponse<Self::Body> {
        match self {
            SnapshotStream::LocalFS(stream) => match NamedFile::open(stream.snapshot_path) {
                Ok(file) => file.into_response(&stream.req),
                Err(e) => match e.kind() {
                    std::io::ErrorKind::NotFound => {
                        HttpResponse::NotFound().body(format!("File not found: {}", e))
                    }
                    _ => HttpResponse::InternalServerError()
                        .body(format!("Failed to open file: {}", e)),
                },
            },

            SnapshotStream::CloudStorage(stream) => HttpResponse::Ok()
                .content_type("application/octet-stream")
                .streaming(stream.streamer),
        }
    }
}
Add `SnapshotStream` for downloading snapshots with streaming.
@generall
* Add SnapshotsStorageConfig enum (Local or S3) and deserialize implementation
* [refactor] use snapshots_config instead of s3_config
* update config
* add AWS official `aws-sdk-s3`
* implement store_file() WITHOUT error handling
* implement list_snapshots
* implement delete_snapshot
* run `cargo +nightly fmt`
* delete println
* implement get_stored_file
* Add error handling
* Refactor AWS S3 configuration and error handling
* fix bugs
* create an empty test file
* fix `alias_test.rs` for StorageConfig type
* temporarily delete some tests and try s3 test
* Update integration-tests.yml to use snap instead of apt-get for installing yq
* Update integration-tests.yml to use sudo when installing yq
* add sudo
* make (full/non-full) snapshots downloadable
* debug
* small fix
* Add S3 endpoint URL configuration option
* fix
* fix
* debug
* fix endpoint
* update to http://127.0.0.1:9000/
* update
* fix
* fix `#[get("/collections/{collection}/shards/{shard}/snapshots/{snapshot}")]` for s3
* put original tests back
* refactor
* small fix (delete println & echo)
* use object_store and refactor
* create snapshot_storage_ops and implement
* Refactor get_appropriate_chunk_size function to adjust chunk size based on service limits and file size
* cargo +nightly fmt --all
* make it more abstract
* Refactor SnapshotsStorageConfig deserialization in SnapShotsConfig
* small update
* small fix
* Update dependencies in Cargo.lock
* Update minio image to satantime/minio-server
* Refactor snapshot storage paths in snapshots_manager.rs and snapshot_storage_ops.rs
* Fix issue with downloaded file size not matching expected size in download_snapshot function
* add flush
* Use Streaming instead of downloading once
* apply `cargo +nightly fmt --all`
* Fix issue with opening file in SnapshotStream::LocalFS variant
* Fix error handling in SnapshotStream::LocalFS variant
* Add integration test for Shard Snapshot API with S3 storage (#7)
What I changed
I have added `aws-sdk-s3` to the `collection` crate for managing operations on S3. I chose this SDK because it is the official one and version 1.24.0 is well-established and stable. The implementation was based on the official sample code found at https://github.com/awslabs/aws-sdk-rust/tree/main/examples/examples/s3/src/bin.
Removed aws_s3_rust and replaced it with `object_store`. This change allows the same abstract code to be used with S3, GCS, and Azure Storage. Currently, the implementation is only for S3, but it can be easily extended to support other services.
Next, I implemented the S3 functionality in the existing `snapshots_manager.rs`. The necessary functions using `aws-sdk-s3` were implemented in `snapshots_s3_ops.rs` and called from there.
Deleted the previously created `snapshot_s3_ops.rs` and introduced a more abstract `snapshot_storage_ops.rs`.
One challenging aspect, different from what was initially expected, involved the delete and download operations for snapshots. The existing flow used `get_snapshot_path` to verify the path before deleting or downloading, which always failed for S3 because the snapshot paths do not exist on the local disk.
To address this, I implemented `get_s3_snapshot_path` and used a match statement to handle the different scenarios.
The same changes have been made in four places: delete and download of snapshots, and delete and download of full snapshots. (Note: Additional changes were also necessary for downloading shards.)
Addressed a review comment by removing S3-specific functions such as `get_s3_snapshot_path` and `get_full_s3_snapshot_path`. Replaced them with the more abstract functions `get_snapshot_path` and `get_full_snapshot_path`, located in `snapshot_storage_ops.rs`.
The use of `object_store` has resulted in a reduced build size.
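The path problem described above can be sketched as follows: local storage verifies that the snapshot file exists on disk, while S3 only builds the path and leaves the existence check to the object store. This is a minimal sketch under stated assumptions; `StorageKind` and `resolve_snapshot_path` are hypothetical names and do not reproduce the PR's `get_snapshot_path`.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for the configured snapshot storage backend.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StorageKind {
    Local,
    S3,
}

/// Hypothetical helper: resolve where a snapshot lives for the given backend.
pub fn resolve_snapshot_path(
    kind: StorageKind,
    snapshots_dir: &Path,
    snapshot_name: &str,
) -> Result<PathBuf, String> {
    let path = snapshots_dir.join(snapshot_name);
    match kind {
        // Local storage: the file must already be on disk.
        StorageKind::Local => {
            if path.exists() {
                Ok(path)
            } else {
                Err(format!("Snapshot {snapshot_name} not found"))
            }
        }
        // S3: the path is only an object key, so no local check is possible.
        StorageKind::S3 => Ok(path),
    }
}
```

Keeping the backend decision in one function means the delete and download handlers can call the same helper instead of repeating the match in each place.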
Test Modifications
`tests/snapshots/snapshots-recovery.sh` was modified to allow switching between `local` and `s3` based on the input arguments. For S3 tests, the config file is automatically modified using the `yq` command before execution.
✅ I have verified that all tests pass on my forked repository.
resolve: #4109
/claim #4109
All Submissions:
… `dev` branch. Did you create your branch from `dev`?
New Feature Submissions:
… `cargo +nightly fmt --all` command prior to submission?
… `cargo clippy --all --all-features` command?
Changes to Core Features: