Faster wal-delta transfer #6458
Conversation
dd3b50d to d6d650f
📝 Walkthrough

This change set introduces batch update capabilities and related structural enhancements to the internal points service. At the protocol level, new gRPC messages (`UpdateOperation`, `UpdateBatchInternal`) are added. On the internal service side, the implementation is refactored to encapsulate core update logic within dedicated methods, and a new handler for batch updates is introduced. This handler processes each operation in the batch sequentially, aggregating hardware usage metrics and returning the final result. Hardware usage accumulation is facilitated by a new `add` method on the `HardwareUsage` type. In the collection and shard management layer, version-aware logic is introduced to determine if a remote shard supports batch updates, and batch transfer mechanisms are implemented accordingly. The transfer logic ensures consistency by controlling when to wait for batch application, especially during the last batch transfer. New methods are added to facilitate version checking and batch forwarding in the remote shard layer.

📜 Recent review details

Configuration used: CodeRabbit UI

📒 Files selected for processing (1)
Actionable comments posted: 5
🔭 Outside diff range comments (1)
lib/collection/src/shards/queue_proxy_shard.rs (1)
`748-764`: 🛠️ Refactor suggestion

Propagate the refined `wait` logic to the per-operation fallback

Following the earlier remark, the loop can be adjusted:

```diff
-for (_idx, operation) in batch {
+for (i, (_idx, operation)) in batch.iter().enumerate() {
     let mut operation = operation.clone();
@@
     remote_shard
         .forward_update(
             operation,
-            wait,
+            wait_for_last_op_only && i == batch.len() - 1,
             WriteOrdering::Weak,
             hw_measurement_acc.clone(),
         )
```

This keeps the non-batched path behaviourally identical to the new batched API in terms of client-perceived durability.
🧹 Nitpick comments (12)
lib/api/src/grpc/proto/points_internal_service.proto (2)
`100-114`: Add `reserved` numbers / names for forward compatibility

Once shipped, the field numbers inside `UpdateOperation` are frozen. If a variant is ever removed/renamed we will not be able to reuse its tag. Reserve a small range now to avoid a breaking change later:

```proto
message UpdateOperation {
  reserved 12 to 20; // keep room for future variants

  oneof update {
    // …
  }
}
```

Also consider using distinct message types for `set_payload` vs `overwrite_payload` to prevent accidental misuse by code-gen tools that rely on variant names rather than numbers.
`117-119`: Nit: shorten `operations` or rename wrapper

The wrapper is called `UpdateBatchInternal` yet the field inside is already `operations`. For terseness & protobuf style consistency this could be:

```diff
-message UpdateBatchInternal {
-  repeated UpdateOperation operations = 1;
-}
+message UpdateBatchInternal {
+  repeated UpdateOperation ops = 1;
+}
```

Purely cosmetic, but keeps the definition in line with existing messages like `search_points`, `query_points`, etc.

lib/collection/src/shards/remote_shard.rs (3)
`105-112`: `check_version` may return stale/unknown results – clarify semantics

`peer_is_at_version` looks like a cache lookup. If the peer hasn't announced its version yet we'll return `false`, which the caller may interpret as "feature unsupported" and silently fall back to the slow path. That defeats the purpose of the optimisation.

Consider returning an enum (`Unknown | Below | AtLeast`) or exposing a `refresh_version` helper so the caller can trigger an explicit health-check when the version is unknown.
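The tri-state suggestion above can be sketched in plain Rust. All names here (`VersionCheck`, `check_version`, the tuple version model) are illustrative stand-ins, not actual Qdrant APIs:

```rust
// Hypothetical sketch of the suggested tri-state version check.
#[derive(Debug, PartialEq)]
enum VersionCheck {
    Unknown, // peer has not announced its version yet
    Below,   // peer version is known and too old
    AtLeast, // peer version is known and sufficient
}

/// Compare a cached peer version (if any) against a required minimum.
/// Versions are modeled as (major, minor, patch) tuples for simplicity;
/// tuple comparison in Rust is lexicographic, which matches semver ordering here.
fn check_version(peer: Option<(u32, u32, u32)>, min: (u32, u32, u32)) -> VersionCheck {
    match peer {
        None => VersionCheck::Unknown,
        Some(v) if v >= min => VersionCheck::AtLeast,
        Some(_) => VersionCheck::Below,
    }
}

fn main() {
    // Unknown is distinguishable from "too old", so the caller can
    // trigger a refresh instead of silently taking the slow path.
    assert_eq!(check_version(None, (1, 10, 0)), VersionCheck::Unknown);
    assert_eq!(check_version(Some((1, 9, 2)), (1, 10, 0)), VersionCheck::Below);
    assert_eq!(check_version(Some((1, 11, 0)), (1, 10, 0)), VersionCheck::AtLeast);
}
```

With this shape, only a genuine `Below` result needs to force the non-batched fallback; `Unknown` can trigger a one-off health-check first.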
`229-237`: Potentially large allocation – build `updates` in place

`updates` is initialised with `operations.len()` capacity – good – but we then iterate by value and push converted Protobuf structs, which can cost an extra intermediate allocation per element. If `operations` is large this adds avoidable memory pressure. Consider building the vector in place instead:

```rust
let updates: Vec<UpdateOperation> = operations
    .into_iter()
    .map(|operation| {
        // conversion
    })
    .collect();
```
`242-415`: Method doing too much – extract conversion logic

`forward_update_batch` contains ~160 lines of nested `match` statements making it hard to read and maintain. Extract the "local operation ➜ protobuf `Update`" mapping into a dedicated helper:

```rust
fn into_update_proto(
    op: OperationWithClockTag,
    shard_id: ShardId,
    collection: &str,
    wait: bool,
    ordering: Option<WriteOrdering>,
) -> CollectionResult<Update> {
    …
}
```

Then:

```rust
let updates: Result<Vec<_>, _> = operations
    .into_iter()
    .map(|op| into_update_proto(op, …))
    .try_collect()?;
```

This shortens the method, improves testability, and keeps each concern in its own place.

lib/api/src/grpc/qdrant.rs (4)
`9334-9345`: Derive `Deserialize` and align JSON naming to avoid inter-service incompatibilities

The new `UpdateOperation` is only `Serialize`-able. Any component that might consume batched requests or log them in JSON now has no straightforward way to de-serialize them back (e.g. for re-play, testing or audit). This is especially painful once the batch travels through message queues or is cached.

```diff
-#[derive(serde::Serialize)]
+#[derive(serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "snake_case")]
```

(The same change is advisable for every struct / enum that is exposed outside the binary.) Adding `Deserialize` is zero-cost while future-proofing the API.
`9381-9384`: Implement `Default` for convenience & testability

`UpdateBatchInternal` is frequently constructed in unit / integration tests. Deriving `Default` (cheap because all fields already implement it) streamlines that workflow and avoids verbose `...::default()` wrappers.

```diff
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, PartialEq, ::prost::Message, Default)]
```
`10530-10536`: Add minimal rust-doc for the new trait method

`PointsInternal::update_batch` is a public contract but ships without documentation. Two-line docs explaining ordering guarantees and error propagation will save future readers a trip through the implementation.
`11172-11217`: Beware of very large batches – expose max message size knobs

The server handler uses the global `max_decoding_message_size`, but client code defaults to tonic's 4 MiB limit. For delta WAL replication, a single batch can easily exceed that. Consider:

- Documenting the recommended limits in `README` / config.
- Allowing per-request overrides (`update_batch(max_size=…)`).
- Emitting a clear error when the limit is hit so operators know which knob to tweak.

This avoids mysterious `ResourceExhausted: received message larger than max` errors in production.

lib/collection/src/shards/queue_proxy_shard.rs (1)
`719-746`: Avoid re-computing feature-flag per batch

`remote_shard.check_version(&MINIMAL_VERSION_FOR_BATCH_WAL_TRANSFER)` is evaluated for every WAL batch. For long catch-ups this extra RPC / parse can add noticeable overhead.

Suggestion:

- Cache the boolean once in `Inner` (e.g. as a `supports_batch_updates` field initialised during the first transfer).
- Re-use the cached flag inside `transfer_operations_batch`.

This tiny change keeps the critical path free of redundant version checks.
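A minimal caching sketch using the standard library's `OnceLock`. `Inner` and `supports_batching` are hypothetical stand-ins for the real types; the point is that the probe closure runs at most once:

```rust
use std::sync::OnceLock;

// Illustrative sketch of caching a feature probe across WAL batches.
struct Inner {
    supports_batch_updates: OnceLock<bool>,
}

impl Inner {
    fn new() -> Self {
        Inner { supports_batch_updates: OnceLock::new() }
    }

    /// Run the (potentially expensive) version check at most once and
    /// reuse the cached answer for every subsequent batch.
    fn supports_batching(&self, probe: impl FnOnce() -> bool) -> bool {
        *self.supports_batch_updates.get_or_init(probe)
    }
}

fn main() {
    let inner = Inner::new();
    let mut probes = 0;
    for _ in 0..100 {
        let supported = inner.supports_batching(|| {
            probes += 1; // simulate the expensive version check
            true
        });
        assert!(supported);
    }
    // The expensive check ran exactly once despite 100 batches.
    assert_eq!(probes, 1);
}
```

A caveat of this shape: the answer is cached for the lifetime of `Inner`, so it must be reset (or re-created) if the remote peer restarts with a different version mid-transfer.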
src/tonic/api/points_internal_api.rs (2)
`561-632`: `time` metric for `update_batch` only reflects the last operation

`last_result.time` is left untouched after aggregating multiple operations, so the response under-reports the wall-clock cost when the batch contains >1 update.

Consider accumulating the timings similarly to `usage`:

```diff
-let mut total_usage = HardwareUsage::default();
+let mut total_usage = HardwareUsage::default();
+let mut total_time = 0.0_f64;
@@
-    let mut response = result.into_inner();
+    let mut response = result.into_inner();
+
+    total_time += response.time;
@@
-    last_result = Some(response)
+    last_result = Some(response)
@@
-if let Some(mut last_result) = last_result.take() {
-    last_result.usage = Some(total_usage);
+if let Some(mut last_result) = last_result.take() {
+    last_result.usage = Some(total_usage);
+    last_result.time = total_time;
```

This gives clients an accurate latency picture while preserving backward-compatibility.
`438-447`: Nit: unnecessary string clone on every call

`get_request_collection_hw_usage_counter_for_internal` takes `collection_name: String` by value, forcing each caller to allocate a new `String`. A lightweight `&str` suffices:

```diff
-fn get_request_collection_hw_usage_counter_for_internal(
-    &self,
-    collection_name: String,
-) -> RequestHwCounter {
-    let counter = HwMeasurementAcc::new_with_metrics_drain(
-        self.toc.get_collection_hw_metrics(collection_name),
+fn get_request_collection_hw_usage_counter_for_internal(
+    &self,
+    collection_name: &str,
+) -> RequestHwCounter {
+    let counter = HwMeasurementAcc::new_with_metrics_drain(
+        self.toc.get_collection_hw_metrics(collection_name.to_owned()),
     );
```

and call-sites become:

```diff
-    self.get_request_collection_hw_usage_counter_for_internal(collection_name.clone());
+    self.get_request_collection_hw_usage_counter_for_internal(&collection_name);
```

This removes a few pointless heap allocations on a hot path.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)

- `lib/api/build.rs` (1 hunks)
- `lib/api/src/grpc/mod.rs` (1 hunks)
- `lib/api/src/grpc/ops.rs` (1 hunks)
- `lib/api/src/grpc/proto/points_internal_service.proto` (2 hunks)
- `lib/api/src/grpc/qdrant.rs` (4 hunks)
- `lib/api/src/grpc/validate.rs` (1 hunks)
- `lib/collection/src/shards/queue_proxy_shard.rs` (7 hunks)
- `lib/collection/src/shards/remote_shard.rs` (4 hunks)
- `src/tonic/api/points_internal_api.rs` (14 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
lib/api/src/grpc/qdrant.rs (2)

- src/tonic/api/points_internal_api.rs (2): `update_batch` (551-631), `new` (49-54)
- src/tonic/api/update_common.rs (1): `update_batch` (417-662)
⏰ Context from checks skipped due to timeout of 90000ms (13)
- GitHub Check: Basic TLS/HTTPS tests
- GitHub Check: test-snapshot-operations-s3-minio
- GitHub Check: test-shard-snapshot-api-s3-minio
- GitHub Check: test-low-resources
- GitHub Check: test-consistency
- GitHub Check: rust-tests (macos-latest)
- GitHub Check: test-consensus-compose
- GitHub Check: rust-tests (windows-latest)
- GitHub Check: integration-tests-consensus
- GitHub Check: lint
- GitHub Check: rust-tests (ubuntu-latest)
- GitHub Check: integration-tests
- GitHub Check: storage-compat-test
🔇 Additional comments (5)
lib/api/src/grpc/mod.rs (1)
`10-10`: Module addition looks good

Adding the new `ops` module that implements hardware usage accumulation for batch updates is a clean approach for extending functionality.

lib/api/build.rs (1)
`303-304`: Validation entries correctly added for new message types

The validation entries for `UpdateOperation.update` and `UpdateBatchInternal.operations` are properly configured with empty constraints, which will enable nested validation of these fields, maintaining consistency with the existing validation pattern.

lib/api/src/grpc/validate.rs (1)
`198-215`: Validation implementation follows established patterns

The implementation for `grpc::update_operation::Update` correctly delegates validation to each variant's `validate` method, following the same pattern as other enum validations in this file. This ensures validation rules are applied consistently across all update operation types.

lib/api/src/grpc/ops.rs (1)
`1-23`: HardwareUsage aggregation implementation looks good

The `add` method provides a clean way to accumulate hardware usage metrics from multiple operations, which is essential for the batch update feature. The implementation correctly adds all relevant fields from the provided instance to the corresponding fields in `self`.

lib/api/src/grpc/qdrant.rs (1)
`10254-10278`: Client stub addition LGTM

The stub follows the same readiness / codec / path pattern as the existing RPCs and correctly wires the `GrpcMethod` metadata.
```diff
@@ -17,6 +17,7 @@ service PointsInternal {
   rpc ClearPayload (ClearPayloadPointsInternal) returns (PointsOperationResponseInternal) {}
   rpc CreateFieldIndex (CreateFieldIndexCollectionInternal) returns (PointsOperationResponseInternal) {}
   rpc DeleteFieldIndex (DeleteFieldIndexCollectionInternal) returns (PointsOperationResponseInternal) {}
+  rpc UpdateBatch (UpdateBatchInternal) returns (PointsOperationResponseInternal) {}
```
💡 Verification agent
❓ Verification inconclusive
Document message-size expectations for `UpdateBatch`

`UpdateBatch` allows an unbounded list of operations. Clients may unintentionally exceed gRPC's default 4 MiB limit or server RAM, leading to `RESOURCE_EXHAUSTED` errors or OOM kills. Consider one (or both) of the following:

```diff
 rpc UpdateBatch (UpdateBatchInternal) returns (PointsOperationResponseInternal) {
-}
+} // Expect ≤ N MiB total encoded size. Large transfers should be chunked.
```

or introduce a streaming variant (`rpc UpdateBatch(stream UpdateOperation) …`).

This heads off hard-to-debug production failures.
Enforce size limits or switch to streaming for UpdateBatch RPC

The `UpdateBatch` RPC currently accepts an unbounded list of operations, which risks exceeding gRPC's default 4 MiB message limit or exhausting server memory. Please update `lib/api/src/grpc/proto/points_internal_service.proto` (around line 20) to either:

- Document an explicit maximum encoded payload size:

  ```diff
   rpc UpdateBatch (UpdateBatchInternal) returns (PointsOperationResponseInternal) {
  -}
  +} // Expect ≤ N MiB total encoded size. Large batches should be chunked.
  ```

- Or introduce a streaming variant to safely handle large workloads:

  ```proto
  rpc UpdateBatchStream(stream UpdateOperation) returns (PointsOperationResponseInternal);
  ```

This will help prevent unexpected `RESOURCE_EXHAUSTED` errors or OOM failures in production.
```rust
let batch_request = &UpdateBatchInternal {
    operations: updates,
};

let point_operation_response = self
    .with_points_client(|mut client| async move {
        client
            .update_batch(tonic::Request::new(batch_request.clone()))
            .await
    })
    .await?
    .into_inner();
```
🛠️ Refactor suggestion
Avoid borrowing issues – move the batch request instead of cloning refs

`&UpdateBatchInternal` lives on the stack; passing a reference into an async block relies on implicit lifetimes and clones through a reference rather than moving the struct. Create the request by value and move it:

```diff
- let batch_request = &UpdateBatchInternal {
-     operations: updates,
- };
+ let batch_request = UpdateBatchInternal {
+     operations: updates,
+ };
  // …
- client.update_batch(tonic::Request::new(batch_request.clone()))
+ client.update_batch(tonic::Request::new(batch_request))
```

This removes an unnecessary clone and lifetime gymnastics.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-let batch_request = &UpdateBatchInternal {
-    operations: updates,
-};
+let batch_request = UpdateBatchInternal {
+    operations: updates,
+};

 let point_operation_response = self
     .with_points_client(|mut client| async move {
         client
-            .update_batch(tonic::Request::new(batch_request.clone()))
+            .update_batch(tonic::Request::new(batch_request))
             .await
     })
     .await?
     .into_inner();
```
```rust
    let request = internal_sync_points(
        shard_id,
        None, // TODO!?
        collection_name.clone(),
        operation,
        wait,
        ordering,
    )?;
    Update::Sync(request)
}
```
Clock-tag accidentally dropped for SyncPoints

`internal_sync_points` is called with `None` instead of `operation.clock_tag`, losing idempotency and ordering guarantees:

```diff
- let request = internal_sync_points(
-     shard_id,
-     None, // TODO!?
+ let request = internal_sync_points(
+     shard_id,
+     operation.clock_tag,
```

This bug also exists in `execute_update_operation`; fixing both keeps WAL re-sync deterministic.
📝 Committable suggestion
```diff
 let request = internal_sync_points(
     shard_id,
-    None, // TODO!?
+    operation.clock_tag,
     collection_name.clone(),
     operation,
     wait,
     ordering,
 )?;
 Update::Sync(request)
```
This is not an accident: `sync_points` doesn't require a clock tag by design. The TODO was migrated from the previous implementation as-is and is not related to this PR.
@generall Thank you for the clarification. I understand that `sync_points` doesn't require a clock tag by design, and the TODO comment was carried over from the previous implementation. I'll remember this pattern for future reviews.
✏️ Learnings added
Learnt from: generall
PR: qdrant/qdrant#6458
File: lib/collection/src/shards/remote_shard.rs:280-289
Timestamp: 2025-04-29T15:26:33.253Z
Learning: In the Qdrant codebase, `sync_points` operations don't require a clock tag by design. The `internal_sync_points` function is intentionally called with `None` for the clock tag parameter in both the `forward_update_batch` and `execute_update_operation` methods.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
```rust
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Update {
    #[prost(message, tag = "1")]
    Sync(super::SyncPointsInternal),
    #[prost(message, tag = "2")]
    Upsert(super::UpsertPointsInternal),
    #[prost(message, tag = "3")]
    Delete(super::DeletePointsInternal),
    #[prost(message, tag = "4")]
    UpdateVectors(super::UpdateVectorsInternal),
    #[prost(message, tag = "5")]
    DeleteVectors(super::DeleteVectorsInternal),
    #[prost(message, tag = "6")]
    SetPayload(super::SetPayloadPointsInternal),
    #[prost(message, tag = "7")]
    OverwritePayload(super::SetPayloadPointsInternal),
    #[prost(message, tag = "8")]
    DeletePayload(super::DeletePayloadPointsInternal),
    #[prost(message, tag = "9")]
    ClearPayload(super::ClearPayloadPointsInternal),
    #[prost(message, tag = "10")]
    CreateFieldIndex(super::CreateFieldIndexCollectionInternal),
    #[prost(message, tag = "11")]
    DeleteFieldIndex(super::DeleteFieldIndexCollectionInternal),
}
```
🛠️ Refactor suggestion
Use a dedicated payload message for the `OverwritePayload` variant

`OverwritePayload` currently re-uses `SetPayloadPointsInternal`. Apart from being semantically confusing, it silently allows the `key` field that overwrite operations must not accept (see `overwrite_payload_internal`, which ignores `key`). A dedicated `OverwritePayloadPointsInternal` (without the `key` field) would let the type-system catch such errors early.

```diff
-OverwritePayload(super::SetPayloadPointsInternal),
+OverwritePayload(super::OverwritePayloadPointsInternal),
```

If the dedicated message already exists in the proto folder, this is a one-line swap; if it doesn't, defining it keeps the contract crystal-clear.

Committable suggestion skipped: line range outside the PR's diff.
This is an auto-generated file.
@generall Understood—no manual changes will be suggested for auto-generated files. Any required changes should be made in the source definitions (e.g., .proto files).
```rust
// If we are transferring the last batch, we need to wait for it to be applied.
// - Why can we not wait? Assuming that order of operations is still enforced by the WAL,
//   we should end up in exactly the same state with or without waiting.
// - Why do we need to wait on the last batch? If we switch to ready state before
//   updates are actually applied, we might create an inconsistency for read operations.
let wait = last_batch;
```
🛠️ Refactor suggestion
`wait` flag is applied to every op in the last batch, not only to the final one

`wait` is set to `true` for the whole last batch, which means every single `forward_update` in the fallback path will block until it's applied. The intended optimisation (wait only once, on the very last op) is therefore lost, and the last batch may take up to N × RTT.

```diff
- let wait = last_batch;
+ // We only need to block on the *very last* operation if batching is unsupported.
+ // For batched transfer (`supports_update_batching == true`) the single RPC already
+ // covers the whole batch, so `wait` can stay as is.
+ let wait_for_last_op_only = last_batch;
```

Then, in the non-batched loop (see below) pass `wait: wait_for_last_op_only && i == batch.len() - 1`.

This preserves the latency win introduced by the PR while still guaranteeing consistency for the final visible state.
Committable suggestion skipped: line range outside the PR's diff.
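The proposed per-operation `wait` computation can be isolated into a tiny pure function and checked directly. `wait_flags` is a hypothetical helper for illustration, not code from the PR:

```rust
/// Compute the `wait` flag for each operation in a batch of `batch_len`
/// operations: only the final op of the last batch should block.
fn wait_flags(batch_len: usize, wait_for_last_op_only: bool) -> Vec<bool> {
    (0..batch_len)
        .map(|i| wait_for_last_op_only && i == batch_len - 1)
        .collect()
}

fn main() {
    // Last batch: only the final operation blocks on application.
    assert_eq!(wait_flags(4, true), vec![false, false, false, true]);
    // Non-last batches never block.
    assert_eq!(wait_flags(4, false), vec![false, false, false, false]);
    // A single-op last batch still blocks, preserving consistency.
    assert_eq!(wait_flags(1, true), vec![true]);
}
```

This keeps the fallback path at roughly one blocking round-trip per transfer instead of one per operation.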
this is fine
@generall Understood! If you're satisfied with the current implementation, we'll leave it as is. Thank you for considering the suggestion.
- implement internal batch-update api
- use batch update in wal-delta
- fix ci
Speed-up wal-delta transfer.
I think I was able to reproduce the issue reported by users. The setup is the following: a 3-node cluster, 2 CPUs per node, and far more memory than needed.
Create collection with:
and upload points with
When one pod of the cluster is killed, it restarts with a `wal_delta` transfer, which never finishes.
This might be expected if we were operating at maximum disk throughput, but I tried the following: increase the shard count from 3 to 12, assuming that `wal_delta` would then only need to transfer 1/4 of the volume, and it didn't change the outcome.

Also, according to the charts, the recovering pod only utilizes a fraction of the disk IO compared to the other pods. This is explained by the fact that 7 out of 8 shards of this pod are dead and do not receive updates.
After this PR, I can no longer reproduce the issue; `wal_delta` transfers are significantly faster.

ToDo: