
kemkemG0 (Contributor) commented May 1, 2024

What I changed

  • I have added aws-sdk-s3 to the collection crate for managing operations on S3. I chose this SDK because it is the official one, and version 1.24.0 is well-established and stable. The implementation was based on the official sample code found at https://github.com/awslabs/aws-sdk-rust/tree/main/examples/examples/s3/src/bin.

  • Removed aws_s3_rust and replaced it with object_store. This change allows the same abstract code to be used with S3, GCS, and Azure Storage. Currently, the implementation is only for S3, but it can be easily extended to support other services.

  • Next, I implemented the S3 functionality in the existing snapshots_manager.rs. The necessary functions using aws-sdk-s3 were implemented in snapshots_s3_ops.rs and called from there.

  • Deleted the previously created snapshot_s3_ops.rs and introduced a more abstract snapshot_storage_ops.rs.

  • One challenging aspect, different from what was initially expected, involved operations like deleting and downloading snapshots. These operations called get_snapshot_path to verify the path before deleting or downloading, which always failed for snapshots stored on S3, because those paths do not exist on the local filesystem.

  • To address this, I implemented get_s3_snapshot_path and used a match statement to handle different scenarios.

  • The same changes have been made in four places: delete and download of snapshots, and delete and download of full snapshots. (Note: Additional changes were also necessary for downloading shards.)

  • Addressed a review comment by removing S3-specific functions such as get_s3_snapshot_path and get_full_s3_snapshot_path. Replaced them with more abstract functions get_snapshot_path and get_full_snapshot_path located in snapshot_storage_ops.rs to enhance abstraction.

  • The use of object_store has resulted in a reduced build size.
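The path problem described above (a local existence check failing for snapshots that only exist in the bucket) can be avoided by branching on the storage backend before touching the filesystem. A minimal sketch, assuming a simplified SnapshotsStorageConfig and a hypothetical SnapshotLocation return type; the real signatures in snapshot_storage_ops.rs are not shown on this page:

```rust
use std::path::{Path, PathBuf};

// Simplified stand-in for the PR's SnapshotsStorageConfig.
#[derive(Clone, Debug, PartialEq)]
enum SnapshotsStorageConfig {
    Local,
    S3,
}

// Hypothetical return type: either a local file or an object-store key.
#[derive(Debug, PartialEq)]
enum SnapshotLocation {
    Local(PathBuf),
    ObjectKey(String),
}

// Hypothetical storage-agnostic path resolution. Only the local backend
// checks that the file exists; for S3 a missing object surfaces later,
// when the object store is asked to read or delete the key.
fn get_snapshot_path(
    storage: &SnapshotsStorageConfig,
    snapshots_dir: &Path,
    collection: &str,
    snapshot_name: &str,
) -> Result<SnapshotLocation, String> {
    match storage {
        SnapshotsStorageConfig::Local => {
            let path = snapshots_dir.join(collection).join(snapshot_name);
            if path.exists() {
                Ok(SnapshotLocation::Local(path))
            } else {
                Err(format!("snapshot not found: {}", path.display()))
            }
        }
        SnapshotsStorageConfig::S3 => {
            let key = format!("{}/{}/{}", snapshots_dir.display(), collection, snapshot_name);
            // Object keys carry no "./" prefix, mirroring the
            // trim_start_matches("./") call used in the PR.
            Ok(SnapshotLocation::ObjectKey(key.trim_start_matches("./").to_string()))
        }
    }
}
```

Callers then match on SnapshotLocation: a Local path can be opened or removed directly, while an ObjectKey is handed to the object store, so a missing snapshot is reported by whichever backend actually holds it.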

Test Modifications

  • The test script tests/snapshots/snapshots-recovery.sh was modified to allow switching between local and s3 based on the input arguments. For S3 tests, the config file is automatically modified using the yq command before execution.

✅ I have verified that all tests pass on my forked repository.

resolve: #4109

/claim #4109

All Submissions:

  • Contributions should target the dev branch. Did you create your branch from dev?
  • Have you followed the guidelines in our Contributing document?
  • Have you checked to ensure there aren't other open Pull Requests for the same update/change?

New Feature Submissions:

  1. Does your submission pass tests?
  2. Have you formatted your code locally using cargo +nightly fmt --all command prior to submission?
  3. Have you checked your code using cargo clippy --all --all-features command?

Changes to Core Features:

  • Have you added an explanation of what your changes do and why you'd like us to include them?
  • Have you written new tests for your core changes, as applicable?
  • Have you successfully run tests with your changes locally?

kemkemG0 marked this pull request as ready for review May 1, 2024 06:12
kemkemG0 changed the title from "Implement S3 snapshot manager" to "[WIP] Implement S3 snapshot manager" May 1, 2024
kemkemG0 (Contributor Author) commented May 1, 2024

Implementation is done.

I am going to add unit tests and integration tests next.

Comment on lines 78 to 82
# AWS
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.24.0"
aws-smithy-types = "1.1.8"
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] }
kemkemG0 (Contributor Author):

I used the official AWS SDK for S3.

Member:

Any particular reason for that?

Comment on lines 16 to 44
#[derive(Clone, Deserialize, Debug, Default)]
pub struct SnapShotsConfig {
    pub snapshots_storage: SnapshotsStorageConfig,
    pub s3_config: Option<S3Config>,
}

#[derive(Clone, Debug, Default)]
pub enum SnapshotsStorageConfig {
    #[default]
    Local,
    S3,
}

impl<'de> Deserialize<'de> for SnapshotsStorageConfig {
    fn deserialize<D>(deserializer: D) -> Result<SnapshotsStorageConfig, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s: String = Deserialize::deserialize(deserializer)?;
        match s.as_str() {
            "local" => Ok(SnapshotsStorageConfig::Local),
            "s3" => Ok(SnapshotsStorageConfig::S3),
            _ => Err(serde::de::Error::custom(
                "Invalid snapshots_storage. Use 'local' or 's3'",
            )),
        }
    }
}

kemkemG0 (Contributor Author):

These are for deserializing the config YAML.
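The string-to-variant mapping in the Deserialize impl above can also be expressed as a FromStr implementation, which keeps it testable without serde. A std-only sketch; InvalidStorage is a hypothetical error type, and the enum is a trimmed copy of the one above:

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum SnapshotsStorageConfig {
    #[default]
    Local,
    S3,
}

// Hypothetical error type carrying the rejected value.
#[derive(Debug, PartialEq, Eq)]
struct InvalidStorage(String);

impl fmt::Display for InvalidStorage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Invalid snapshots_storage '{}'. Use 'local' or 's3'", self.0)
    }
}

impl std::error::Error for InvalidStorage {}

impl FromStr for SnapshotsStorageConfig {
    type Err = InvalidStorage;

    // Same accepted values as the hand-written Deserialize impl.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "local" => Ok(Self::Local),
            "s3" => Ok(Self::S3),
            other => Err(InvalidStorage(other.to_string())),
        }
    }
}
```

With this in place, the body of deserialize could shrink to reading the String and calling s.parse().map_err(serde::de::Error::custom). Alternatively, deriving Deserialize with #[serde(rename_all = "lowercase")] accepts the same "local" and "s3" values, with serde generating the error for unknown ones.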

Comment on lines 32 to 33
const CHUNK_SIZE: u64 = 1024 * 1024 * 5;
const MAX_CHUNKS: u64 = 10000;
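These constants match two S3 multipart-upload limits: every part except the last must be at least 5 MiB, and an upload may have at most 10,000 parts. A fixed 5 MiB part size therefore caps uploads at about 48.8 GiB, so larger snapshots need bigger parts. The commit log mentions a get_appropriate_chunk_size function that adjusts the chunk size to service limits and file size; its body is not shown, so the following is a hypothetical reconstruction:

```rust
const CHUNK_SIZE: u64 = 1024 * 1024 * 5; // 5 MiB: minimum size of every part but the last
const MAX_CHUNKS: u64 = 10000; // maximum number of parts per multipart upload

// Hypothetical: smallest part size >= CHUNK_SIZE such that the whole file
// fits into at most MAX_CHUNKS parts.
fn get_appropriate_chunk_size(file_size: u64) -> u64 {
    if file_size <= CHUNK_SIZE * MAX_CHUNKS {
        return CHUNK_SIZE;
    }
    // Overflow-free ceiling division.
    file_size / MAX_CHUNKS + u64::from(file_size % MAX_CHUNKS != 0)
}

// Split a file into (offset, length) ranges, one per uploaded part.
fn part_ranges(file_size: u64) -> Vec<(u64, u64)> {
    let chunk = get_appropriate_chunk_size(file_size);
    let mut ranges = Vec::new();
    let mut offset = 0;
    while offset < file_size {
        let len = chunk.min(file_size - offset);
        ranges.push((offset, len));
        offset += len;
    }
    ranges
}
```

Each (offset, length) pair would then be read from the snapshot file and sent as one part, with part numbers starting at 1 in upload order.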

key.map(|k| k.trim_start_matches("./").to_string())
}

pub async fn multi_part_upload(

algora-pbc bot commented May 7, 2024

🎉🎈 @kemkemG0 has been awarded $200! 🎈🎊

generall (Member) commented May 7, 2024

A few things we would need to finalize this PR:

  • In do_get_snapshot and similar functions, we download the snapshot from S3 and only then respond to the user. This may not work well with large files: the user would wait indefinitely without receiving a single response byte. Additionally, it leaves behind temporary files that are never cleaned up.

My suggestion is to stream files from either local storage or S3 using

HttpResponse::Ok().content_type("application/octet-stream").streaming(stream)

In order to do that, we would need to implement something like:

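pub struct SnapshotStreamer {
    snapshot_manager: SnapshotStorageManager,
    snapshot_path: PathBuf,
}

impl Stream for SnapshotStreamer {
    type Item = Result<bytes::Bytes, CollectionError>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Read the next chunk from local disk or S3 via `snapshot_manager`.
        todo!()
    }
}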

NamedFile from actix does a lot of nice things; maybe we could keep it for local files, or re-implement that behavior for the octet-stream response.

  • Make sure the tests pass.
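The core of the proposed SnapshotStreamer is to hand the snapshot out piece by piece, so the first bytes reach the client before the whole file has been read and no temporary copy is needed. The sketch below shows that chunking logic with std only, using a blocking Iterator over any Read as a stand-in for the async futures::Stream the comment proposes; ChunkedReader is a hypothetical name:

```rust
use std::io::{self, Read};

// Hypothetical std-only analogue of SnapshotStreamer: yields the source
// in fixed-size chunks instead of buffering it entirely first.
struct ChunkedReader<R: Read> {
    reader: R,
    chunk_size: usize,
    done: bool,
}

impl<R: Read> ChunkedReader<R> {
    fn new(reader: R, chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self { reader, chunk_size, done: false }
    }
}

impl<R: Read> Iterator for ChunkedReader<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let mut buf = vec![0u8; self.chunk_size];
        let mut filled = 0;
        // `read` may return fewer bytes than requested, so keep filling
        // until the chunk is full or the source is exhausted.
        while filled < buf.len() {
            match self.reader.read(&mut buf[filled..]) {
                Ok(0) => {
                    self.done = true;
                    break;
                }
                Ok(n) => filled += n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    self.done = true;
                    return Some(Err(e));
                }
            }
        }
        if filled == 0 {
            return None;
        }
        buf.truncate(filled);
        Some(Ok(buf))
    }
}
```

In the actix handler, each chunk would be wrapped in bytes::Bytes and fed to HttpResponse::Ok().streaming(...). Because that method expects an async Stream, blocking reads like these would have to run off the async executor, or be replaced by an async source such as the byte stream object_store returns for a GET.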

Please note that the main issue bounty should still be paid once this PR is merged.

kemkemG0 changed the title from "Implement S3 snapshot manager" to "[WIP] Implement S3 snapshot manager" May 9, 2024
kemkemG0 (Contributor Author) commented May 10, 2024

@generall
I also noticed that shard snapshots have a separate implementation, so create_shard_snapshot, recover_shard_snapshot and delete_shard_snapshot would also need to be added under snapshot_storage_manager.
These look complicated and may take some time.

Never mind, it was just an internal detail; we only need to call the common create_store after these are done.

kemkemG0 (Contributor Author) commented May 10, 2024

  • Fixed bugs; tests should now pass
  • Implemented downloading with streaming
  • Added an integration test for the Shard Snapshot API with S3 storage

kemkemG0 changed the title from "[WIP] Implement S3 snapshot manager" to "Implement S3 snapshot manager" May 10, 2024
@@ -254,3 +310,35 @@ jobs:
trap 'kill $(jobs -p) &>/dev/null || :' EXIT
sleep 10
./tests/shard-snapshot-api.sh test-all

test-shard-snapshot-api-s3-minio:
kemkemG0 (Contributor Author) commented May 10, 2024

Added a shard snapshot API integration test for the S3 version.

Comment on lines +6 to +42

pub struct SnapShotStreamLocalFS {
    pub snapshot_path: PathBuf,
    pub req: HttpRequest,
}

pub struct SnapShotStreamCloudStrage {
    pub streamer:
        std::pin::Pin<Box<dyn Stream<Item = Result<bytes::Bytes, object_store::Error>> + Send>>,
}

pub enum SnapshotStream {
    LocalFS(SnapShotStreamLocalFS),
    CloudStorage(SnapShotStreamCloudStrage),
}

impl Responder for SnapshotStream {
    type Body = actix_web::body::BoxBody;

    fn respond_to(self, _: &actix_web::HttpRequest) -> HttpResponse<Self::Body> {
        match self {
            SnapshotStream::LocalFS(stream) => match NamedFile::open(stream.snapshot_path) {
                Ok(file) => file.into_response(&stream.req),
                Err(e) => match e.kind() {
                    std::io::ErrorKind::NotFound => {
                        HttpResponse::NotFound().body(format!("File not found: {}", e))
                    }
                    _ => HttpResponse::InternalServerError()
                        .body(format!("Failed to open file: {}", e)),
                },
            },

            SnapshotStream::CloudStorage(stream) => HttpResponse::Ok()
                .content_type("application/octet-stream")
                .streaming(stream.streamer),
        }
    }
}
kemkemG0 (Contributor Author):

Added SnapshotStream for downloading snapshots with streaming.

kemkemG0 (Contributor Author) commented:

@generall
I fixed them up and added an extra integration test!

generall merged commit 0d46aeb into qdrant:dev May 10, 2024
generall pushed a commit that referenced this pull request May 26, 2024
* Add SnapshotsStorageConfig enum(Local or S3) and deserialize implementation

* [refactor]  use snapshots_config instead of s3_config

* update config

* add AWS official `aws-sdk-s3`

* implement store_file() WITHOUT error handling

* implement list_snapshots

* implement delete_snapshot

* run `cargo +nightly fmt`

* delete println

* implement get_stored_file

* Add error handlings

* Refactor AWS S3 configuration and error handling

* fix bugs

* create an empty test file

* fix `alias_test.rs` for StorageConfig type

* temporarily delete some tests and try s3 test

* Update integration-tests.yml to use snap instead of apt-get for installing yq

* Update integration-tests.yml to use sudo when installing yq

* add sudo

* make (full/non-full) snapshots downloadable

* debug

* small fix

* Add S3 endpoint URL configuration option

* fix

* fix

* debug

* fix endpoint

* update to http://127.0.0.1:9000/

* update

* fix

* fix `#[get("/collections/{collection}/shards/{shard}/snapshots/{snapshot}")]` for s3

* put original tests back

* refactor

* small fix (delete println & echo)

* use object_store and refactor

* create snapshot_storage_ops and implement

* Refactor get_appropriate_chunk_size function to adjust chunk size based on service limits and file size

* cargo +nightly fmt --all

* make it more abstract

* Refactor SnapshotsStorageConfig deserialization in SnapShotsConfig

* small update

* small fix

* Update dependencies in Cargo.lock

* Update minio image to satantime/minio-server

* Refactor snapshot storage paths in snapshots_manager.rs and snapshot_storage_ops.rs

* Fix issue with downloaded file size not matching expected size in download_snapshot function

* add flush

* Use Streaming instead of downloading once

* apply `cargo +nightly fmt --all`

* Fix issue with opening file in SnapshotStream::LocalFS variant

* Fix error handling in SnapshotStream::LocalFS variant

* Add integration test for Shard Snapshot API with S3 storage (#7)