@generall generall commented Apr 28, 2025

Speed-up wal-delta transfer.

I think I was able to reproduce the issue reported by users. The setup is the following: a 3-node cluster with 2 CPUs each and far more memory than needed.
Create a collection with:

PUT collections/benchmark
{
  "vectors": {
    "size": 32,
    "distance": "Cosine"
  },
  "shard_number": 3,
  "replication_factor": 2,
  "write_consistency_factor": 1,
  "on_disk_payload": true,
  "wal_config": {
    "wal_capacity_mb": 1024
  },
  "optimizer_config": {
    "indexing_threshold": 100000
  }
}

and upload points with:

docker run --rm -it -e QDRANT_API_KEY=xxxx --network=host qdrant/bfb:dev ./bfb --uri https://xxxxxx.europe-west3-0.gcp.cloud.qdrant.io:6334/ -n 100000000 --max-id 1000000 --replication-factor 2 --text-payloads --text-payload-length 100 --skip-field-indices -d 32 --indexing-threshold 100000 -p 8 -t 4 --create-if-missing

When one of the cluster's pods is killed, recovery starts with a wal_delta transfer, which never finishes.
This might be expected if we were operating at maximum disk throughput, so I tried the following: increase the shard count from 3 to 12, assuming that each wal_delta would then need to transfer only 1/4 of the volume. It didn't change the outcome.

Also, according to the charts, the recovering pod utilizes only a fraction of the disk IO compared to the other pods. This is explained by the fact that 7 out of 8 shards on this pod are dead and do not receive updates.

After this PR, I can no longer reproduce the issue: wal_delta transfers are significantly faster.


ToDo:

  • use batch update API instead of independent requests
  • do not wait for updates except for the last one (see the sketch below)
  • run experiment
  • make batch size larger (optional)
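
A minimal sketch of the "do not wait" item, assuming a forward_update-style call on the remote shard (the helper below is illustrative, not the exact PR code):

// Illustrative sketch only: forward a batch of WAL operations, blocking
// only on the final operation of the final batch. Type names follow the
// identifiers used elsewhere in this PR; exact signatures are assumptions.
async fn transfer_operations(
    remote_shard: &RemoteShard,
    batch: Vec<OperationWithClockTag>,
    last_batch: bool,
) -> CollectionResult<()> {
    let last_idx = batch.len().saturating_sub(1);
    for (i, operation) in batch.into_iter().enumerate() {
        // WAL ordering guarantees that once the last operation is applied,
        // all earlier ones are applied too, so waiting once is enough.
        let wait = last_batch && i == last_idx;
        remote_shard
            .forward_update(operation, wait, WriteOrdering::Weak)
            .await?;
    }
    Ok(())
}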

@generall generall marked this pull request as ready for review April 29, 2025 15:11
coderabbitai bot commented Apr 29, 2025

📝 Walkthrough

This change set introduces batch update capabilities and related structural enhancements to the internal points service. At the protocol level, new gRPC messages (UpdateOperation, UpdateBatchInternal) and the UpdateBatch RPC method are added, allowing clients to send multiple update operations in a single request. The Rust gRPC client and server implementations are updated to support these new types and endpoints, including methods for sending and handling batch updates. Validation logic is extended to cover the new message types.

On the internal service side, the implementation is refactored to encapsulate core update logic within dedicated methods, and a new handler for batch updates is introduced. This handler processes each operation in the batch sequentially, aggregating hardware usage metrics and returning the final result. Hardware usage accumulation is facilitated by a new method on the HardwareUsage struct.
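
In sketch form, the handler described above might look like this (helper names are taken from this review; exact signatures are assumptions):

// Illustrative sketch: process batched operations sequentially, accumulate
// hardware usage, and return the last operation's result.
async fn update_batch(
    request: UpdateBatchInternal,
) -> Result<PointsOperationResponseInternal, tonic::Status> {
    let mut total_usage = HardwareUsage::default();
    let mut last_result = None;

    for operation in request.operations {
        // `execute_update_operation` is the per-operation helper referenced
        // later in this review; its signature here is an assumption.
        let mut response = execute_update_operation(operation).await?;
        if let Some(usage) = response.usage.take() {
            total_usage.add(usage); // accumulate via the new HardwareUsage::add
        }
        last_result = Some(response);
    }

    let mut result =
        last_result.ok_or_else(|| tonic::Status::invalid_argument("empty batch"))?;
    result.usage = Some(total_usage);
    Ok(result)
}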

In the collection and shard management layer, version-aware logic is introduced to determine if a remote shard supports batch updates, and batch transfer mechanisms are implemented accordingly. The transfer logic ensures consistency by controlling when to wait for batch application, especially during the last batch transfer. New methods are added to facilitate version checking and batch forwarding in the RemoteShard struct.
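
In sketch form, the version-aware dispatch might look like this (identifiers follow the review comments below; exact signatures are assumptions):

// Illustrative sketch: use the batched RPC when the peer is new enough,
// otherwise fall back to per-operation forwarding.
if remote_shard.check_version(&MINIMAL_VERSION_FOR_BATCH_WAL_TRANSFER) {
    // One RPC carries the whole batch; `wait` only matters for the last batch.
    remote_shard
        .forward_update_batch(batch, wait, WriteOrdering::Weak)
        .await?;
} else {
    for (_idx, operation) in batch {
        remote_shard
            .forward_update(operation.clone(), wait, WriteOrdering::Weak)
            .await?;
    }
}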


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d6d650f and 984d60c.

📒 Files selected for processing (1)
  • lib/collection/src/shards/queue_proxy_shard.rs (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • lib/collection/src/shards/queue_proxy_shard.rs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🔭 Outside diff range comments (1)
lib/collection/src/shards/queue_proxy_shard.rs (1)

748-764: 🛠️ Refactor suggestion

Propagate the refined wait logic to the per-operation fallback

Following the earlier remark, the loop can be adjusted:

-for (_idx, operation) in batch {
+for (i, (_idx, operation)) in batch.iter().enumerate() {
     let mut operation = operation.clone();
@@
-        remote_shard
+        remote_shard
             .forward_update(
                 operation,
-                wait,
+                wait_for_last_op_only && i == batch.len() - 1,
                 WriteOrdering::Weak,
                 hw_measurement_acc.clone(),
             )

This keeps the non-batched path behaviourally identical to the new batched API in terms of client-perceived durability.

🧹 Nitpick comments (12)
lib/api/src/grpc/proto/points_internal_service.proto (2)

100-114: Add reserved numbers / names for forward compatibility

Once shipped, the field numbers inside UpdateOperation are frozen.
If a variant is ever removed/renamed we will not be able to reuse its tag.
Reserve a small range now to avoid a breaking change later:

message UpdateOperation {
  reserved 12 to 20;       // keep room for future variants
  oneof update {
      // …
  }
}

Also consider using distinct message types for set_payload vs overwrite_payload
to prevent accidental misuse by code-gen tools that rely on variant names rather
than numbers.


117-119: Nit: pluralise operations or rename wrapper

The wrapper is called UpdateBatchInternal yet the field inside is already
operations. For terseness & protobuf style consistency this could be:

-message UpdateBatchInternal {
-    repeated UpdateOperation operations = 1;
-}
+message UpdateBatchInternal {
+    repeated UpdateOperation ops = 1;
+}

Purely cosmetic, but keeps request payloads a few bytes smaller and in line with
existing messages like search_points, query_points, etc.

lib/collection/src/shards/remote_shard.rs (3)

105-112: check_version may return stale/unknown results – clarify semantics

peer_is_at_version looks like a cache lookup. If the peer hasn’t announced its
version yet we’ll return false, which the caller may interpret as “feature
unsupported” and silently fall back to the slow path. That defeats the purpose
of the optimisation.

Consider returning an enum (Unknown | Below | AtLeast) or exposing a
refresh_version helper so the caller can trigger an explicit health-check
when the version is unknown.
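
For illustration, the suggested tri-state result might look like this (a sketch, not existing code):

// Sketch of the proposed tri-state version check.
enum PeerVersion {
    Unknown, // peer has not announced a version yet; consider refreshing
    Below,   // peer is confirmed older than the required version
    AtLeast, // peer meets or exceeds the required version
}

fn supports_batch_updates(check: PeerVersion) -> bool {
    // Only a confirmed `AtLeast` enables the fast path; `Unknown` could
    // instead trigger an explicit health-check before falling back.
    matches!(check, PeerVersion::AtLeast)
}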


229-237: Potentially large allocation – pre-size updates correctly

updates is initialised with operations.len() capacity – good – but we
immediately iterate by value, causing another allocation when converting
to Protobuf structs.
If operations is large this doubles memory usage.

Build in-place with `.map().collect()` instead of pushing into a pre-sized Vec:

- let mut updates = Vec::with_capacity(operations.len());
+ let updates: Vec<UpdateOperation> = operations.into_iter().map(|operation| {
+     // conversion
+ }).collect();

242-415: Method doing too much – extract conversion logic

forward_update_batch contains ~160 lines of nested match statements making
it hard to read and maintain. Extract the “local operation ➜ protobuf
Update” mapping into a dedicated helper:

fn into_update_proto(
    op: OperationWithClockTag,
    shard_id: ShardId,
    collection: &str,
    wait: bool,
    ordering: Option<WriteOrdering>,
) -> CollectionResult<Update> {}

Then:

let updates: Result<Vec<_>, _> = operations
    .into_iter()
    .map(|op| into_update_proto(op,))
    .try_collect()?;

This shortens the method, improves testability, and keeps each concern in its
own place.

lib/api/src/grpc/qdrant.rs (4)

9334-9345: Derive Deserialize and align JSON naming to avoid inter-service incompatibilities
The new UpdateOperation is only Serialize-able. Any component that might consume batched requests or log them in JSON now has no straightforward way to de-serialize them back (e.g. for re-play, testing or audit). This is especially painful once the batch travels through message queues or is cached.

-#[derive(serde::Serialize)]
+#[derive(serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "snake_case")]

(The same change is advisable for every struct / enum that is exposed outside the binary.)

Adding Deserialize is zero-cost while future-proofing the API.


9381-9384: Implement Default for convenience & testability
UpdateBatchInternal is frequently constructed in unit / integration
tests. Deriving Default (cheap because all fields already implement
it) streamlines that workflow and avoids verbose ...::default()
wrappers.

-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, PartialEq, ::prost::Message, Default)]

10530-10536: Add minimal rust-doc for the new trait method
PointsInternal::update_batch is a public contract but ships without
documentation. Two-line docs explaining ordering guarantees and error-
propagation will save future readers a trip through the implementation.


11172-11217: Beware of very large batches – expose max message size knobs
The server handler uses the global max_decoding_message_size, but
client code defaults to tonic’s 4 MiB limit. For delta WAL replication,
a single batch can easily exceed that. Consider:

  1. Documenting the recommended limits in README / config.
  2. Allowing per-request overrides (update_batch(max_size=…)).
  3. Emitting a clear error when the limit is hit so operators know which
    knob to tweak.

This avoids mysterious ResourceExhausted: received message larger than max errors in production.
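
For reference, tonic-generated clients expose per-client size setters; a sketch of raising both limits (the client type name and the 64 MiB value are assumptions for illustration):

// Sketch: raise tonic's default 4 MiB message limits for large WAL-delta batches.
// `PointsInternalClient` stands in for the generated client type.
let channel = tonic::transport::Channel::from_static("http://peer:6335")
    .connect()
    .await?;
let client = PointsInternalClient::new(channel)
    .max_decoding_message_size(64 * 1024 * 1024)
    .max_encoding_message_size(64 * 1024 * 1024);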

lib/collection/src/shards/queue_proxy_shard.rs (1)

719-746: Avoid re-computing feature-flag per batch

remote_shard.check_version(&MINIMAL_VERSION_FOR_BATCH_WAL_TRANSFER) is evaluated for every WAL batch.
For long catch-ups this extra RPC / parse can add noticeable overhead.

Suggestion:

  1. Cache the boolean once in Inner (e.g. a supports_batch_updates field initialised during the first transfer).
  2. Re-use the cached flag inside transfer_operations_batch.

This tiny change keeps the critical path free of redundant version checks.
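
A sketch of the caching idea, using std::sync::OnceLock as a stand-in for whatever synchronization Inner already has (the field and method names are hypothetical):

use std::sync::OnceLock;

// Hypothetical addition to `Inner`: compute the capability once, then reuse it.
struct Inner {
    batch_updates_supported: OnceLock<bool>,
    // ... existing fields ...
}

impl Inner {
    fn supports_batch_updates(&self, remote_shard: &RemoteShard) -> bool {
        *self.batch_updates_supported.get_or_init(|| {
            remote_shard.check_version(&MINIMAL_VERSION_FOR_BATCH_WAL_TRANSFER)
        })
    }
}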

src/tonic/api/points_internal_api.rs (2)

561-632: time metric for update_batch only reflects the last operation

last_result.time is left untouched after aggregating multiple operations, so the response under-reports
the wall-clock cost when the batch contains >1 update.

Consider accumulating the timings similarly to usage:

-let mut total_usage = HardwareUsage::default();
+let mut total_usage = HardwareUsage::default();
+let mut total_time   = 0.0_f64;
@@
-        let mut response = result.into_inner();
+        let mut response = result.into_inner();
+
+        total_time += response.time;
@@
-            last_result = Some(response)
+            last_result = Some(response)
@@
-if let Some(mut last_result) = last_result.take() {
-    last_result.usage = Some(total_usage);
+if let Some(mut last_result) = last_result.take() {
+    last_result.usage = Some(total_usage);
+    last_result.time  = total_time;

This gives clients an accurate latency picture while preserving backward-compatibility.


438-447: Nit: unnecessary string clone on every call

get_request_collection_hw_usage_counter_for_internal takes collection_name: String by value,
forcing each caller to allocate/clamp a new String. A lightweight &str suffices:

-fn get_request_collection_hw_usage_counter_for_internal(
-    &self,
-    collection_name: String,
-) -> RequestHwCounter {
-    let counter = HwMeasurementAcc::new_with_metrics_drain(
-        self.toc.get_collection_hw_metrics(collection_name),
+fn get_request_collection_hw_usage_counter_for_internal(
+    &self,
+    collection_name: &str,
+) -> RequestHwCounter {
+    let counter = HwMeasurementAcc::new_with_metrics_drain(
+        self.toc.get_collection_hw_metrics(collection_name.to_owned()),
     );

and call-sites become:

- self.get_request_collection_hw_usage_counter_for_internal(collection_name.clone());
+ self.get_request_collection_hw_usage_counter_for_internal(&collection_name);

This removes a few pointless heap allocations on a hot path.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9c723aa and d6d650f.

📒 Files selected for processing (9)
  • lib/api/build.rs (1 hunks)
  • lib/api/src/grpc/mod.rs (1 hunks)
  • lib/api/src/grpc/ops.rs (1 hunks)
  • lib/api/src/grpc/proto/points_internal_service.proto (2 hunks)
  • lib/api/src/grpc/qdrant.rs (4 hunks)
  • lib/api/src/grpc/validate.rs (1 hunks)
  • lib/collection/src/shards/queue_proxy_shard.rs (7 hunks)
  • lib/collection/src/shards/remote_shard.rs (4 hunks)
  • src/tonic/api/points_internal_api.rs (14 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
lib/api/src/grpc/qdrant.rs (2)
src/tonic/api/points_internal_api.rs (2)
  • update_batch (551-631)
  • new (49-54)
src/tonic/api/update_common.rs (1)
  • update_batch (417-662)
🔇 Additional comments (5)
lib/api/src/grpc/mod.rs (1)

10-10: Module addition looks good

Adding the new ops module that implements hardware usage accumulation for batch updates is a clean approach for extending functionality.

lib/api/build.rs (1)

303-304: Validation entries correctly added for new message types

The validation entries for UpdateOperation.update and UpdateBatchInternal.operations are properly configured with empty constraints, which will enable nested validation of these fields, maintaining consistency with the existing validation pattern.

lib/api/src/grpc/validate.rs (1)

198-215: Validation implementation follows established patterns

The implementation for grpc::update_operation::Update correctly delegates validation to each variant's validate method, following the same pattern as other enum validations in this file. This ensures validation rules are applied consistently across all update operation types.

lib/api/src/grpc/ops.rs (1)

1-23: HardwareUsage aggregation implementation looks good

The add method provides a clean way to accumulate hardware usage metrics from multiple operations, which is essential for the batch update feature. The implementation correctly adds all relevant fields from the provided instance to the corresponding fields in self.
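
For illustration, the accumulation method likely has this shape (field names are assumed; the real struct is generated from the proto and may differ):

// Illustrative shape of `HardwareUsage::add`; the actual field set may differ.
impl HardwareUsage {
    pub fn add(&mut self, other: HardwareUsage) {
        self.cpu += other.cpu;
        self.payload_io_read += other.payload_io_read;
        self.payload_io_write += other.payload_io_write;
        self.vector_io_read += other.vector_io_read;
        self.vector_io_write += other.vector_io_write;
    }
}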

lib/api/src/grpc/qdrant.rs (1)

10254-10278: Client stub addition LGTM
The stub follows the same readiness / codec / path pattern as the
existing RPCs and correctly wires the GrpcMethod metadata.

@@ -17,6 +17,7 @@ service PointsInternal {
   rpc ClearPayload (ClearPayloadPointsInternal) returns (PointsOperationResponseInternal) {}
   rpc CreateFieldIndex (CreateFieldIndexCollectionInternal) returns (PointsOperationResponseInternal) {}
   rpc DeleteFieldIndex (DeleteFieldIndexCollectionInternal) returns (PointsOperationResponseInternal) {}
+  rpc UpdateBatch (UpdateBatchInternal) returns (PointsOperationResponseInternal) {}
💡 Verification agent

❓ Verification inconclusive

Document message‐size expectations for UpdateBatch

UpdateBatch allows an unbounded list of operations. Clients may unintentionally exceed gRPC’s default 4 MiB limit or server RAM, leading to RESOURCE_EXHAUSTED errors or OOM kills.
Consider one (or both) of the following:

 rpc UpdateBatch (UpdateBatchInternal) returns (PointsOperationResponseInternal) {
-}
+}  // Expect ≤ N MiB total encoded size.  Large transfers should be chunked.

or introduce a streaming variant (rpc UpdateBatch(stream UpdateOperation) …).
This heads off hard-to-debug production failures.


Enforce size limits or switch to streaming for UpdateBatch RPC

The UpdateBatch RPC currently accepts an unbounded list of operations, which risks exceeding gRPC’s default 4 MiB message limit or exhausting server memory.

Please update lib/api/src/grpc/proto/points_internal_service.proto (around line 20) to either:

  • Document an explicit maximum encoded payload size:

     rpc UpdateBatch (UpdateBatchInternal) returns (PointsOperationResponseInternal) {
    -}
    +}  // Expect ≤ N MiB total encoded size. Large batches should be chunked.
  • Or introduce a streaming variant to safely handle large workloads:

    rpc UpdateBatchStream(stream UpdateOperation) returns (PointsOperationResponseInternal);

This will help prevent unexpected RESOURCE_EXHAUSTED errors or OOM failures in production.

Comment on lines +417 to +428
let batch_request = &UpdateBatchInternal {
    operations: updates,
};

let point_operation_response = self
    .with_points_client(|mut client| async move {
        client
            .update_batch(tonic::Request::new(batch_request.clone()))
            .await
    })
    .await?
    .into_inner();

🛠️ Refactor suggestion

Avoid borrowing issues – move the batch request instead of cloning refs

&UpdateBatchInternal lives on the stack; passing a reference into an async
block relies on implicit lifetimes and clones a reference, not the struct.
Create the request by value and move it:

- let batch_request = &UpdateBatchInternal {
-     operations: updates,
- };
+ let batch_request = UpdateBatchInternal {
+     operations: updates,
+ };
  // …
- client.update_batch(tonic::Request::new(batch_request.clone()))
+ client.update_batch(tonic::Request::new(batch_request))

This removes an unnecessary clone and lifetime gymnastics.


Comment on lines +280 to +289
    let request = internal_sync_points(
        shard_id,
        None, // TODO!?
        collection_name.clone(),
        operation,
        wait,
        ordering,
    )?;
    Update::Sync(request)
}

@coderabbitai coderabbitai bot Apr 29, 2025

⚠️ Potential issue

Clock-tag accidentally dropped for SyncPoints

internal_sync_points is called with None instead of
operation.clock_tag, losing idempotency and ordering guarantees:

- let request = internal_sync_points(
-     shard_id,
-     None, // TODO!?
+ let request = internal_sync_points(
+     shard_id,
+     operation.clock_tag,

This bug also exists in execute_update_operation; fixing both keeps WAL
re-sync deterministic.


generall (Member, Author) replied:

This is not an accident: sync_points doesn't require a clock tag by design. The ToDo was migrated from the previous implementation as-is and is not related to this PR.

coderabbitai bot replied:

@generall Thank you for the clarification. I understand that sync_points doesn't require a clock tag by design, and the TODO comment was carried over from the previous implementation. I'll remember this pattern for future reviews.


✏️ Learnings added
Learnt from: generall
PR: qdrant/qdrant#6458
File: lib/collection/src/shards/remote_shard.rs:280-289
Timestamp: 2025-04-29T15:26:33.253Z
Learning: In the Qdrant codebase, `sync_points` operations don't require a clock tag by design. The `internal_sync_points` function is intentionally called with `None` for the clock tag parameter in both the `forward_update_batch` and `execute_update_operation` methods.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines +9350 to +9374
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Update {
    #[prost(message, tag = "1")]
    Sync(super::SyncPointsInternal),
    #[prost(message, tag = "2")]
    Upsert(super::UpsertPointsInternal),
    #[prost(message, tag = "3")]
    Delete(super::DeletePointsInternal),
    #[prost(message, tag = "4")]
    UpdateVectors(super::UpdateVectorsInternal),
    #[prost(message, tag = "5")]
    DeleteVectors(super::DeleteVectorsInternal),
    #[prost(message, tag = "6")]
    SetPayload(super::SetPayloadPointsInternal),
    #[prost(message, tag = "7")]
    OverwritePayload(super::SetPayloadPointsInternal),
    #[prost(message, tag = "8")]
    DeletePayload(super::DeletePayloadPointsInternal),
    #[prost(message, tag = "9")]
    ClearPayload(super::ClearPayloadPointsInternal),
    #[prost(message, tag = "10")]
    CreateFieldIndex(super::CreateFieldIndexCollectionInternal),
    #[prost(message, tag = "11")]
    DeleteFieldIndex(super::DeleteFieldIndexCollectionInternal),
}

@coderabbitai coderabbitai bot Apr 29, 2025

🛠️ Refactor suggestion

Use a dedicated payload message for the OverwritePayload variant
OverwritePayload currently re-uses SetPayloadPointsInternal.
Apart from being semantically confusing, it silently allows the key
field that overwrite operations must not accept (see
overwrite_payload_internal which ignores key). A dedicated
OverwritePayloadPointsInternal (without the key field) would let the
type-system catch such errors early.

-OverwritePayload(super::SetPayloadPointsInternal),
+OverwritePayload(super::OverwritePayloadPointsInternal),

If the dedicated message already exists in the proto folder, this is a
one-line swap; if it doesn’t, defining it keeps the contract crystal-clear.

Committable suggestion skipped: line range outside the PR's diff.

generall (Member, Author) replied:

This is an auto-generated file.

coderabbitai bot replied:

@generall Understood—no manual changes will be suggested for auto-generated files. Any required changes should be made in the source definitions (e.g., .proto files).

Comment on lines +514 to +520
// If we are transferring the last batch, we need to wait for it to be applied.
// - Why can we not wait? Assuming that order of operations is still enforced by the WAL,
// we should end up in exactly the same state with or without waiting.
// - Why do we need to wait on the last batch? If we switch to ready state before
// updates are actually applied, we might create an inconsistency for read operations.
let wait = last_batch;

@coderabbitai coderabbitai bot Apr 29, 2025

🛠️ Refactor suggestion

wait flag is applied to every op in the last batch, not only to the final one

wait is set to true for the whole last batch, which means every single forward_update in the fallback path will block until it’s applied.
The intended optimisation (wait only once – the very last op) is therefore lost and the last batch may take up to N × RTT.

-        let wait = last_batch;
+        // We only need to block on the *very last* operation if batching is unsupported.
+        // For batched transfer (`supports_update_batching == true`) the single RPC already
+        // covers the whole batch, so `wait` can stay as is.
+        let wait_for_last_op_only = last_batch;

Then, in the non-batched loop (see below) pass
wait: wait_for_last_op_only && i == batch.len() - 1.

This preserves the latency win introduced by the PR while still guaranteeing consistency for the final visible state.

Committable suggestion skipped: line range outside the PR's diff.

generall (Member, Author) replied:

This is fine.

coderabbitai bot replied:

@generall Understood! If you're satisfied with the current implementation, we'll leave it as is. Thank you for considering the suggestion.

@generall generall merged commit 0ea6bc0 into dev Apr 29, 2025
17 checks passed
@generall generall deleted the faster-wal-delta branch April 29, 2025 17:31
generall added a commit that referenced this pull request May 22, 2025
* implement internal batch-update api

* use batch update in wal-delta

* fix ci