Teebor-Choka
Contributor

@Teebor-Choka Teebor-Choka commented Nov 27, 2024

Minor improvements, tweaks and enhancements to improve the node throughput and overall functionality.

  • Increase the combined msg and ack stream count and make it configurable
  • Introduce a limit on total concurrent active websocket connections

Notes

Fix #6676
Fix #6677

@Teebor-Choka Teebor-Choka self-assigned this Nov 27, 2024
Contributor

coderabbitai bot commented Nov 27, 2024

📝 Walkthrough

Walkthrough

The pull request includes modifications to several files, mainly focusing on dependency management in Cargo.toml, enhancements to WebSocket connection handling in the API, and updates to the README.md documentation. Key changes involve the introduction of new environment variables, improved error handling for WebSocket connections, and adjustments to the internal state management of the API. These changes aim to address issues related to managing multiple WebSocket connections effectively.

Changes

File: Change summary
Cargo.toml: Updated dependency management, excluded vendor/cargo/scale-info-2.10.0, set resolver version to "2", and modified libp2p comment.
README.md: Added two new environment variables: HOPR_INTERNAL_LIBP2P_MSG_ACK_MAX_TOTAL_STREAMS and HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT; corrected typographical errors.
hoprd/rest-api/src/lib.rs: Added websocket_active_count to InternalState, introduced TooManyOpenWebsocketConnections to ApiErrorStatus, and updated response handling.
hoprd/rest-api/src/messages.rs: Added HTTP status code 429 for WebSocket endpoint; refined error handling in send_message.
hoprd/rest-api/src/preconditions.rs: Introduced is_a_websocket_uri function and cap_websockets middleware for managing WebSocket connections.
hoprd/rest-api/src/session.rs: Updated WebSocket function to include status code 429; modified SessionWebsocketClientQueryRequest and SessionClientRequest structs to add new fields.
transport/p2p/src/lib.rs: Enhanced HoprNetworkBehavior initialization with new parameter for concurrent streams from environment variable.
hoprd/rest-api/Cargo.toml: Updated package version from "3.7.1" to "3.7.2".
transport/p2p/Cargo.toml: Updated package version from "0.5.1" to "0.5.2".

Assessment against linked issues

Objective Addressed Explanation
Limit the number of WebSocket connections (#6676)
Improve error handling for high throughput (#6677)

Possibly related PRs

Suggested labels

toolchain, crate:core-network, testing

Suggested reviewers

  • NumberFour8
  • esterlus

📜 Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between be1b675 and 9665598.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • hoprd/rest-api/Cargo.toml (1 hunks)
  • transport/p2p/Cargo.toml (1 hunks)
✅ Files skipped from review due to trivial changes (2)
  • hoprd/rest-api/Cargo.toml
  • transport/p2p/Cargo.toml

@Teebor-Choka Teebor-Choka added bug Something isn't working component:p2p labels Nov 27, 2024
@Teebor-Choka Teebor-Choka added this to the 2.2.0-rc.1 milestone Nov 27, 2024
@github-actions github-actions bot added the dependencies Pull requests that update a dependency file label Nov 28, 2024
@Teebor-Choka Teebor-Choka marked this pull request as ready for review November 28, 2024 11:23
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (16)
hoprd/rest-api/src/preconditions.rs (2)

30-32: Handle parsing errors for maximum websocket count

Currently, if the environment variable HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT is set but cannot be parsed as a u16, it defaults to 10 without indicating an error. This could lead to unintended limits if there's a misconfiguration.

Consider handling the parsing error explicitly and logging a warning if the value is invalid:

 let max_websocket_count = std::env::var("HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT")
     .and_then(|v| v.parse::<u16>().map_err(|e| {
+        eprintln!("Invalid HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT: {}", e);
         std::env::VarError::NotPresent
     }))
     .unwrap_or(10);
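Both new variables (HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT and HOPR_INTERNAL_LIBP2P_MSG_ACK_MAX_TOTAL_STREAMS) follow the same read, parse, default pattern, so a small shared helper could report invalid values instead of silently falling back. A minimal sketch, assuming a hypothetical `parse_env_or` helper that takes the raw `std::env::var` result (so it can be tested without touching the process environment) and returns the warning text instead of logging it, since which logging macro to use is left open here:

```rust
use std::env::VarError;
use std::str::FromStr;

/// Hypothetical helper: interpret the result of `std::env::var` for a numeric setting.
/// Returns the value to use and, if the variable was set but invalid, a warning message.
fn parse_env_or<T: FromStr>(name: &str, raw: Result<String, VarError>, default: T) -> (T, Option<String>)
where
    T::Err: std::fmt::Display,
{
    match raw {
        Ok(v) => match v.parse::<T>() {
            // Variable set and parseable: use it as-is
            Ok(parsed) => (parsed, None),
            // Variable set but invalid: fall back, but surface the misconfiguration
            Err(e) => (default, Some(format!("invalid {name}={v:?}: {e}, using default"))),
        },
        // Variable missing (or not valid unicode): silently use the default
        Err(_) => (default, None),
    }
}
```

A call site would look like `parse_env_or("HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT", std::env::var("HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT"), 10u16)`, logging the second tuple element when it is `Some`.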

16-21: Enhance scalability by generalizing websocket URI detection

The is_a_websocket_uri function currently checks for specific websocket paths. If more websocket endpoints are added in the future, this function will need updating.

Consider generalizing the detection to accommodate any websocket paths under a common pattern:

 fn is_a_websocket_uri(uri: &OriginalUri) -> bool {
-    const MESSAGES_PATH: &str = const_format::formatcp!("{BASE_PATH}/messages/websocket");
-    const SESSION_PATH: &str = const_format::formatcp!("{BASE_PATH}/session/websocket");
-
-    uri.path().starts_with(MESSAGES_PATH) || uri.path().starts_with(SESSION_PATH)
+    const WEBSOCKET_PATH_SUFFIX: &str = "/websocket";
+    uri.path().ends_with(WEBSOCKET_PATH_SUFFIX)
 }
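The two checks accept different sets of paths: the suffix check matches any current or future path ending in "/websocket", including routes that are not websocket endpoints, while the prefix check only matches the listed routes (plus anything nested under them). A minimal std-only comparison, assuming a BASE_PATH of "/api/v3" for illustration (the real constant is defined in the rest-api crate) and plain string literals in place of const_format::formatcp!:

```rust
// Assumption: "/api/v3" stands in for the crate's BASE_PATH constant.
const MESSAGES_WS_PATH: &str = "/api/v3/messages/websocket";
const SESSION_WS_PATH: &str = "/api/v3/session/websocket";

/// Current approach: explicit allow-list of websocket routes (prefix match).
fn is_websocket_path_prefix(path: &str) -> bool {
    path.starts_with(MESSAGES_WS_PATH) || path.starts_with(SESSION_WS_PATH)
}

/// Suggested approach: any path whose last segment is "websocket".
fn is_websocket_path_suffix(path: &str) -> bool {
    path.ends_with("/websocket")
}
```

The allow-list needs an update for every new endpoint, but it cannot accidentally relax authentication or connection capping for an unrelated route that happens to end in "/websocket".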
hoprd/rest-api/src/session.rs (4)

Line range hint 265-271: Reconsider using HTTP status code 409 for address-in-use errors

The HTTP status code 409 ("Conflict") indicates a request conflict with the current state of the server. In the context of attempting to bind to an address that's already in use, it might be more appropriate to use status code 422 ("Unprocessable Entity") or 400 ("Bad Request") to reflect an invalid input error.

Consider changing the status code and error message to provide a clearer indication of the error:

-            return Err((StatusCode::CONFLICT, ApiErrorStatus::InvalidInput));
+            return Err((
+                StatusCode::UNPROCESSABLE_ENTITY,
+                ApiErrorStatus::UnknownFailure(format!("Address {} is already in use", bind_host)),
+            ));

Line range hint 495-505: Simplify the fallback parsing in build_binding_host

In the build_binding_host function, the IP and port fallbacks are parsed inline inside the SocketAddr::new call. Note that Result::unwrap_or and Option::unwrap_or never panic, so an input that is neither a valid IP address nor a ":port" string already falls back to default.ip() and default.port(); the change below is a readability refactor with the same behavior, not a crash fix.

Split the IP and port parsing into separate bindings:

 fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
     match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
         Some(Err(requested)) => {
             // If the requested host is not parseable as a whole as `SocketAddr`, try only its parts
             debug!(requested, %default, "using partially default listen host");
-            std::net::SocketAddr::new(
-                requested.parse().unwrap_or(default.ip()),
-                requested
-                    .strip_prefix(":")
-                    .and_then(|p| u16::from_str(p).ok())
-                    .unwrap_or(default.port()),
-            )
+            let ip = requested.parse().ok().unwrap_or(default.ip());
+            let port = requested
+                .strip_prefix(":")
+                .and_then(|p| u16::from_str(p).ok())
+                .unwrap_or(default.port());
+            std::net::SocketAddr::new(ip, port)
         }

Line range hint 809-823: Add unit tests for additional edge cases in build_binding_host

The current tests for build_binding_host cover basic scenarios. Adding unit tests for edge cases, such as invalid IP addresses, malformed strings, and unusual inputs, would improve robustness.

Would you like assistance in creating more comprehensive unit tests for this function?
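The edge cases mentioned above can be exercised against a standalone copy of the function. A minimal sketch: the tracing call is dropped, and the `Some(Ok(addr))` and `None` arms are assumed to return the parsed address and the default respectively, since they are not shown in the excerpt:

```rust
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;

/// Standalone copy of build_binding_host for testing (remaining match arms assumed).
fn build_binding_host(requested: Option<&str>, default: SocketAddr) -> SocketAddr {
    match requested.map(|r| SocketAddr::from_str(r).map_err(|_| r)) {
        Some(Err(requested)) => {
            // Not a full `SocketAddr`: try it as a bare IP, or as ":port"
            let ip = requested.parse::<IpAddr>().unwrap_or(default.ip());
            let port = requested
                .strip_prefix(':')
                .and_then(|p| u16::from_str(p).ok())
                .unwrap_or(default.port());
            SocketAddr::new(ip, port)
        }
        Some(Ok(addr)) => addr,
        None => default,
    }
}
```

Inputs worth covering include a bare IPv6 address such as "::1" (the ":" prefix strip yields ":1", which is not a valid port, so the default port is kept), an out-of-range port such as ":99999", and arbitrary garbage, all of which fall back without panicking.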


The websocket connection limit handling needs to be implemented

Based on the API documentation comment found in the code, the endpoint is expected to return a 406 status code when there are too many open websocket connections:

(status = 406, description = "Too many open websocket connections.", body = ApiError)

However, the implementation does not actually check for or enforce this connection limit. The current error handling only covers:

  • 422 (Unprocessable Entity) for general failures
  • 409 (Conflict) for invalid input
  • 404 (Not Found) for missing resources

Locations that need attention:

  • websocket function in hoprd/rest-api/src/session.rs should be updated to check the number of active connections and return a 406 status code when the limit is exceeded
🔗 Analysis chain

Line range hint 220-239: Ensure websocket function returns the correct status code when limiting connections

In the websocket function, when the maximum number of websocket connections is reached, ensure that it returns the appropriate HTTP status code (e.g., 429) and a descriptive error message to the client.

Run the following script to check if the websocket function returns the correct status code:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if the `websocket` function returns StatusCode::TOO_MANY_REQUESTS when the connection limit is reached

rg -A 10 'async fn websocket' hoprd/rest-api/src/session.rs | rg 'StatusCode::TOO_MANY_REQUESTS'

Length of output: 96


Script:

#!/bin/bash
# Let's first check the websocket function implementation
rg -A 20 'async fn websocket' hoprd/rest-api/src/session.rs

# Also check for any error handling related to connection limits
rg -B 2 -A 5 'MAX_CONNECTIONS|max_connections' hoprd/rest-api/src/session.rs

Length of output: 1628


Script:

#!/bin/bash
# Let's check for any constants or configuration related to connection limits
rg -B 2 -A 2 'const.*CONNECTIONS|connections.*=|ConnectionLimit' hoprd/rest-api/src/session.rs

# Let's also check the entire file for any error handling patterns
rg -A 3 'StatusCode::|ApiErrorStatus::' hoprd/rest-api/src/session.rs

Length of output: 2918

transport/p2p/src/lib.rs (1)

130-149: Overall implementation aligns with PR objectives but could be more robust.

The changes successfully address the throughput improvements by making stream counts configurable. However, consider enhancing the implementation with:

  1. Documentation explaining the relationship between WebSocket connections and stream limits
  2. Metrics or logging to monitor stream usage
  3. Unit tests for the configuration parsing logic

Consider adding metrics to track:

  • Current number of active streams
  • Stream usage patterns
  • Configuration changes

This would help in:

  • Validating the effectiveness of the changes
  • Tuning the configuration values
  • Early detection of capacity issues
hoprd/rest-api/src/lib.rs (3)

330-337: Consider documenting the middleware ordering

The middleware stack is correctly ordered with authentication before connection capping. However, it would be helpful to add a comment explaining why this order is important, as future maintainers might not realize that authentication must precede connection limits to prevent unauthorized requests from affecting the connection count.

+        // Authentication must happen before connection capping to prevent unauthorized
+        // requests from affecting the connection count
         .layer(middleware::from_fn_with_state(
             inner_state.clone(),
             preconditions::authenticate,
         ))
         .layer(middleware::from_fn_with_state(
             inner_state,
             preconditions::cap_websockets,
         )),

389-389: Add documentation for the new error variant

While the error name is self-explanatory, it would be beneficial to add documentation explaining when this error occurs and what users should do when they encounter it.

     Unauthorized,
-    TooManyOpenWebsocketConnections,
+    /// Returned when attempting to open a new WebSocket connection would exceed
+    /// the maximum allowed concurrent connections (configured via HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT).
+    /// Users should either wait for existing connections to close or increase the limit if needed.
+    TooManyOpenWebsocketConnections,
     InvalidQuality,

82-82: Consider adding metrics for WebSocket connection tracking

To better monitor and understand the WebSocket connection patterns in production, consider adding metrics to track:

  1. Current number of active connections
  2. Number of rejected connections due to limits
  3. Connection duration statistics

This would help in tuning the connection limits and understanding usage patterns.

Would you like me to provide an example implementation using the existing prometheus middleware?

Also applies to: 257-257, 330-337, 389-389
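As a rough illustration of the first two metrics (plus a peak gauge), independent of the Prometheus middleware, the counters could be kept as atomics next to websocket_active_count. A minimal std-only sketch; the `WsMetrics` type and its methods are hypothetical, and exporting the values to Prometheus is left out:

```rust
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};

/// Hypothetical connection statistics kept alongside the active-connection counter.
#[derive(Default)]
struct WsMetrics {
    active: AtomicU16,
    peak: AtomicU16,
    rejected: AtomicU64,
}

impl WsMetrics {
    /// Record an accepted connection and raise the peak gauge if needed.
    fn on_accept(&self) {
        // `fetch_add` returns the previous value, so add 1 to get the current count
        let now = self.active.fetch_add(1, Ordering::SeqCst).saturating_add(1);
        self.peak.fetch_max(now, Ordering::SeqCst);
    }

    /// Record a connection refused because the limit was reached.
    fn on_reject(&self) {
        self.rejected.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a closed connection.
    fn on_close(&self) {
        self.active.fetch_sub(1, Ordering::SeqCst);
    }
}
```

Comparing the peak against the configured limit, together with the rejection count, indicates whether the default of 10 is too low for a given deployment.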

README.md (2)

213-213: LGTM! Consider enhancing the documentation.

The new environment variable HOPR_INTERNAL_LIBP2P_MSG_ACK_MAX_TOTAL_STREAMS is well documented and directly addresses the node saturation issue. To make it even more helpful for users, consider adding:

  • Default value
  • Recommended values for different load scenarios
  • Impact on node performance

216-216: LGTM! Consider enhancing the documentation.

The new environment variable HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT is well documented and directly addresses the WebSocket connection limit issue. To make it even more helpful for users, consider adding:

  • Default value
  • Recommended values for different deployment scenarios (e.g., local testing vs production)
  • Warning about potential resource implications of setting too high values
hoprd/rest-api/src/messages.rs (4)

296-296: Consider using HTTP status code 503 instead of 406

The status code 406 (Not Acceptable) is typically used for content negotiation failures. For indicating "Too many open websocket connections", status code 503 (Service Unavailable) would be more appropriate as it better represents the temporary inability to handle the request due to resource exhaustion.

-            (status = 406, description = "Too many open websocket connections.", body = ApiError),
+            (status = 503, description = "Too many open websocket connections.", body = ApiError),

Line range hint 391-392: Improve error handling with proper error types

The current implementation uses string literals for error messages. Consider creating a dedicated error enum to:

  • Maintain consistent error messages
  • Provide structured error information
  • Enable better error handling by consumers
  • Facilitate future internationalization

Example implementation:

#[derive(Debug, thiserror::Error)]
pub enum WebSocketError {
    #[error("Missing destination")]
    MissingDestination,
    #[error("Invalid destination")]
    InvalidDestination,
    #[error("Invalid number of intermediate hops")]
    InvalidHops,
    #[error("One of hops or intermediate path must be provided")]
    MissingRoutingOptions,
}

// Then replace string errors with the enum
return Err(WebSocketError::MissingDestination);

Also applies to: 396-397, 401-402, 407-408


Line range hint 108-170: Extract duplicated routing options validation logic

The routing options validation logic is duplicated between send_message and handle_send_message. Consider extracting this into a separate function to:

  • Reduce code duplication
  • Ensure consistent validation
  • Simplify maintenance

Example implementation:

fn validate_routing_options(
    msg: &SendMessageBodyRequest,
    hopr: Arc<HoprNode>,
) -> Result<(PeerId, RoutingOptions), ApiErrorStatus> {
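    // Move the checks currently duplicated in send_message and handle_send_message here
    // and return the validated PeerId and RoutingOptions
    todo!("extract the shared routing validation")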
}

// Then use in both functions:
let (peer_id, options) = validate_routing_options(&args, hopr)?;

Also applies to: 391-425
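To make the shape of such a shared validator concrete without the codebase's types, here is a std-only sketch with simplified, hypothetical request and result types. `RoutingRequest`, `Routing`, `RoutingError`, `MAX_HOPS = 3` and the rule that an explicit path takes precedence over a hop count are all assumptions for illustration, not the actual hoprd rules:

```rust
/// Hypothetical, simplified request; the real SendMessageBodyRequest has more fields.
struct RoutingRequest {
    destination: Option<String>,
    hops: Option<u8>,
    path: Option<Vec<String>>,
}

#[derive(Debug, PartialEq)]
enum Routing {
    Hops(u8),
    Path(Vec<String>),
}

#[derive(Debug, PartialEq)]
enum RoutingError {
    MissingDestination,
    InvalidHops,
    MissingRoutingOptions,
}

/// Assumed upper bound on intermediate hops (illustrative only).
const MAX_HOPS: u8 = 3;

/// Single validation routine that both the REST and the websocket handler could call.
fn validate_routing(req: &RoutingRequest) -> Result<(String, Routing), RoutingError> {
    let destination = req.destination.clone().ok_or(RoutingError::MissingDestination)?;
    let routing = match (&req.path, req.hops) {
        // Assumption: an explicit path wins over a hop count
        (Some(path), _) => Routing::Path(path.clone()),
        (None, Some(h)) if h <= MAX_HOPS => Routing::Hops(h),
        (None, Some(_)) => return Err(RoutingError::InvalidHops),
        (None, None) => return Err(RoutingError::MissingRoutingOptions),
    };
    Ok((destination, routing))
}
```

With one function, the error variants proposed in the previous comment are produced in exactly one place, so both endpoints report identical errors for identical input.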


Line range hint 339-367: Enhance WebSocket connection handling

The current WebSocket connection handling could be improved in several ways:

  1. Add backpressure handling to prevent memory exhaustion
  2. Implement rate limiting for message sending
  3. Enhance error logging with more context

Consider implementing these improvements:

use std::time::Instant;

use futures::{SinkExt, StreamExt};
use futures_concurrency::stream::Merge; // tuple `.merge()`, as used by the existing handler
use tokio::time::{sleep, Duration};

async fn websocket_connection(socket: WebSocket, state: Arc<InternalState>) {
    let (mut sender, receiver) = socket.split();
    let ws_rx = state.websocket_rx.activate_cloned();

    // Fixed-window rate limiting: process at most `max_messages_per_window` items per `window`
    let window = Duration::from_secs(1);
    let max_messages_per_window = 100;
    let mut window_start = Instant::now();
    let mut message_count = 0;

    let mut queue = (
        receiver.map(WebSocketInput::WsInput),
        ws_rx.map(WebSocketInput::Network),
    )
        .merge();

    while let Some(v) = queue.next().await {
        // Start a new window once the current one has elapsed
        if window_start.elapsed() >= window {
            window_start = Instant::now();
            message_count = 0;
        }

        message_count += 1;
        if message_count > max_messages_per_window {
            // Budget exhausted: wait for the rest of the window, then start a new one
            sleep(window.saturating_sub(window_start.elapsed())).await;
            window_start = Instant::now();
            message_count = 1;
        }

        match v {
            WebSocketInput::Network(net_in) => {
                if let Err(e) = sender
                    .send(Message::Text(json!(WebSocketReadMsg::from(net_in)).to_string()))
                    .await
                {
                    // `socket` was consumed by `split()`, so it cannot be referenced here
                    error!(error = %e, "Failed to emit read data onto websocket");
                    break; // Stop on send errors instead of looping on a broken socket
                };
            }
            // ... rest of the match arms
        }
    }
}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between f1d908c and d6a097b.

📒 Files selected for processing (7)
  • Cargo.toml (1 hunks)
  • README.md (1 hunks)
  • hoprd/rest-api/src/lib.rs (5 hunks)
  • hoprd/rest-api/src/messages.rs (1 hunks)
  • hoprd/rest-api/src/preconditions.rs (2 hunks)
  • hoprd/rest-api/src/session.rs (1 hunks)
  • transport/p2p/src/lib.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • Cargo.toml
🔇 Additional comments (5)
hoprd/rest-api/src/preconditions.rs (2)

Line range hint 75-92: Confirm authentication for websocket connections

The authentication logic for websockets relies on query parameters, which may not be secure. Ensure that authentication headers are properly handled for websocket connections.

Run the following script to check if authentication headers are passed correctly in websocket requests:

Consider implementing a more secure method for authenticating websocket connections, such as using cookies or tokens in the connection upgrade request headers.


35-46: ⚠️ Potential issue

Ensure websocket_active_count is decremented when the websocket connection closes

The websocket_active_count is incremented when a websocket connection is accepted but only decremented immediately if the connection is rejected. To prevent the active count from increasing indefinitely, ensure that websocket_active_count is decremented when the websocket connection closes.
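One way to guarantee the decrement is to tie it to a value that lives exactly as long as the connection, so it runs on Drop however the connection task ends. A minimal sketch; `WsSlot` and `try_acquire` are hypothetical names, and handing the guard from the middleware to the upgrade callback (for example via request extensions) is an assumption about how the pieces would be wired:

```rust
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;

/// Hypothetical guard: holds one websocket slot and releases it when dropped.
struct WsSlot {
    counter: Arc<AtomicU16>,
}

impl WsSlot {
    /// Try to reserve a slot; returns None if `max` connections are already active.
    /// `fetch_update` only stores the incremented value when it stays within the limit,
    /// so a rejected request never has to undo an increment.
    fn try_acquire(counter: &Arc<AtomicU16>, max: u16) -> Option<Self> {
        counter
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| (n < max).then(|| n + 1))
            .ok()
            .map(|_| WsSlot { counter: Arc::clone(counter) })
    }
}

impl Drop for WsSlot {
    // Runs when the guard is dropped: normal close, send error, or unwinding
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::SeqCst);
    }
}
```

Moving the guard into the `on_upgrade` closure keeps the slot reserved for the lifetime of the connection task and frees it automatically afterwards.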

Run the following script to verify if websocket_active_count is decremented upon websocket closure:

transport/p2p/src/lib.rs (1)

143-149: Verify if message and acknowledgment protocols need identical configurations.

The acknowledgment protocol uses the same configuration as the message protocol. While this might be intentional, consider:

  1. Whether both protocols require the same number of concurrent streams
  2. If separate environment variables would provide better control
  3. If the timeout values should also be configurable via environment variables

Let's analyze the usage patterns of both protocols:

✅ Verification successful

Based on the gathered evidence, I can now provide a final response:

Identical configurations for message and acknowledgment protocols are appropriate

The code analysis reveals that both message and acknowledgment protocols are part of the same request-response flow, where:

  1. Both protocols share the same timeout (MSG_ACK_TIMEOUT) as they are part of the same message acknowledgment cycle
  2. Both use the same environment variable HOPR_INTERNAL_LIBP2P_MSG_ACK_MAX_TOTAL_STREAMS for concurrent streams, which is appropriate as they handle related traffic
  3. The default value of 1024 streams is consistent with other protocol configurations in the codebase (e.g., Yamux transport)

The shared configuration is intentional and well-designed for this use case, as the acknowledgment protocol is tightly coupled with the message protocol, requiring similar performance characteristics and resource limits.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Analyze message and acknowledgment protocol usage patterns

# Search for message protocol usage
echo "Message protocol usage:"
rg -A 5 "HOPR_MESSAGE_PROTOCOL_V_0_1_0"

# Search for acknowledgment protocol usage
echo "Acknowledgment protocol usage:"
rg -A 5 "HOPR_ACKNOWLEDGE_PROTOCOL_V_0_1_0"

# Search for any configuration or documentation about these protocols
echo "Protocol configuration documentation:"
rg -l "message.*protocol|acknowledge.*protocol" "*.md"

Length of output: 3200


Script:

#!/bin/bash
# Let's analyze the protocol implementations to understand their usage patterns

# Search for message protocol handler implementations
echo "Message protocol handler implementations:"
ast-grep --pattern 'impl $_ for MessageHandler { $$$  }'

# Search for acknowledgment protocol handler implementations
echo "Acknowledgment protocol handler implementations:"
ast-grep --pattern 'impl $_ for AcknowledgmentHandler { $$$ }'

# Search for protocol usage in tests or examples
echo "Protocol usage in tests:"
rg -B2 -A5 "msg.*protocol|ack.*protocol" "*test*.rs"

# Look for protocol configuration usage
echo "Protocol configuration usage:"
rg -B2 -A2 "with_max_concurrent_streams|with_request_timeout" --type rust

Length of output: 323


Script:

#!/bin/bash
# Let's try a different approach to analyze the protocols

# Look for struct definitions and their fields
echo "Protocol handler struct definitions:"
ast-grep --pattern 'struct $_($$_) { $$$ }'

# Search for any protocol-related type definitions and implementations
echo "Protocol-related implementations:"
rg -B2 -A5 "MessageHandler|AcknowledgmentHandler" --type rust

# Search for environment variable usage patterns
echo "Environment variable usage for protocols:"
rg -B2 -A2 "HOPR_INTERNAL_LIBP2P" --type rust

# Look for request/response protocol configurations
echo "Request/Response protocol configurations:"
rg -B3 -A3 "request_response::Config" --type rust

Length of output: 17895

hoprd/rest-api/src/lib.rs (2)

257-257: LGTM: Proper initialization of atomic counter

The counter is correctly initialized as a thread-safe atomic value starting at 0.


82-82: Verify if u16 is sufficient for websocket_active_count

The AtomicU16 type limits the maximum concurrent WebSocket connections to 65,535. Based on issue #6676, the requirement is to handle up to 600 connections per node, so this limit should be sufficient. However, let's verify there are no other requirements that might need a higher limit.

✅ Verification successful

Let me gather more information about the specific WebSocket connection limit configuration.


Using AtomicU16 for websocket_active_count is appropriate

The choice of AtomicU16 (max 65,535) for tracking WebSocket connections is well within requirements:

  • Default limit is set to 10 concurrent connections
  • Configurable via HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT
  • Historical precedent shows max connections of 1024-2000 in other parts of the system
  • Issue #6676 target of 600 connections is well below the type's capacity
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any configuration or documentation that might indicate required connection limits
rg -i "concurrent|connection.*limit|max.*connection" README.md docs/

Length of output: 823


Script:

#!/bin/bash
# Search for the specific environment variable usage and its default value
rg -A 5 "HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT" --type rust

Length of output: 589

@Teebor-Choka Teebor-Choka force-pushed the kauki/libp2p/tweaking-the-internal-parameters-for-session-throughput branch from 3c9e298 to 6e0481b Compare November 28, 2024 11:40
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (5)
hoprd/rest-api/src/preconditions.rs (2)

30-32: Consider distinguishing between missing and invalid environment variable values.

The current implementation treats parsing errors the same as a missing variable. This could mask configuration issues where an invalid value is provided.

Consider this approach:

-    let max_websocket_count = std::env::var("HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT")
-        .and_then(|v| v.parse::<u16>().map_err(|_e| std::env::VarError::NotPresent))
-        .unwrap_or(10);
+    let max_websocket_count = match std::env::var("HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT") {
+        Ok(v) => v.parse::<u16>().unwrap_or_else(|e| {
+            tracing::warn!("Invalid HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT value: {}", e);
+            10
+        }),
+        Err(_) => 10,
+    };

Line range hint 75-89: Consider refactoring the WebSocket authentication logic for better readability.

The nested conditions and multiple authentication methods make the code harder to maintain.

Consider extracting the WebSocket authentication logic into a separate function:

+fn authenticate_websocket(uri: &OriginalUri, expected_token: &str) -> bool {
+    uri.query()
+        .map(|q| {
+            if q.len() > 2048 {
+                return false;
+            }
+            decode(q)
+                .map(|decoded| decoded.into_owned().contains(&format!("apiToken={}", expected_token)))
+                .unwrap_or(false)
+        })
+        .unwrap_or(false)
+}
+
 let is_ws_auth = if is_a_websocket_uri(&uri) {
-    uri.query()
-        .map(|q| {
-            // Reasonable limit for query string
-            if q.len() > 2048 {
-                return false;
-            }
-            match decode(q) {
-                Ok(decoded) => decoded.into_owned().contains(&format!("apiToken={}", expected_token)),
-                Err(_) => false,
-            }
-        })
-        .unwrap_or(false)
+    authenticate_websocket(&uri, expected_token)
 } else {
     false
 };
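Separately from the extraction, `contains(&format!("apiToken={}", expected_token))` also accepts queries in which the token only appears as a substring, such as `xapiToken=<token>` or `apiToken=<token>suffix`. A minimal std-only sketch of an exact key/value match; the helper names are hypothetical, and the percent-decoding done with `decode` in the original is left out (this assumes the token needs no URL encoding):

```rust
/// Hypothetical helper: true if the query has an `apiToken` parameter equal to `expected`.
fn query_has_exact_token(query: &str, expected: &str) -> bool {
    // Same length guard as the existing code
    if query.len() > 2048 {
        return false;
    }
    query
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .any(|(key, value)| key == "apiToken" && bytes_eq_no_early_exit(value.as_bytes(), expected.as_bytes()))
}

/// Compares equal-length inputs without returning at the first differing byte.
/// This is a best-effort idiom; a crate such as `subtle` gives stronger guarantees.
fn bytes_eq_no_early_exit(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
```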
hoprd/rest-api/src/session.rs (3)

Line range hint 236-238: Consider tuning buffer size for improved throughput

The current WS_MAX_SESSION_READ_SIZE of 4096 bytes might be a bottleneck for high-throughput scenarios. Consider making this configurable or increasing it based on your performance requirements.

-const WS_MAX_SESSION_READ_SIZE: usize = 4096;
+// Increased buffer size for better throughput
+const WS_MAX_SESSION_READ_SIZE: usize = 16384; // 16KB

Line range hint 241-308: Add timeout handling for WebSocket connections

The WebSocket connection handler could benefit from timeout handling to prevent resource exhaustion from stale connections, especially important given the PR's focus on connection management.

Consider adding a timeout mechanism:

use tokio::time::{timeout, Duration};

const WS_TIMEOUT: Duration = Duration::from_secs(30);

async fn websocket_connection(socket: WebSocket, session: HoprSession) {
    let session_id = *session.id();
    let (rx, mut tx) = session.split();
    let (mut sender, receiver) = socket.split();
    
    let mut queue = (
        receiver.map(WebSocketInput::WsInput),
        AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
    ).merge();

    let mut bytes_to_session = 0;
    let mut bytes_from_session = 0;

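    loop {
        // `timeout` yields Err(Elapsed) on timeout, otherwise the stream's Option<item>
        match timeout(WS_TIMEOUT, queue.next()).await {
            Ok(Some(input)) => {
                // existing input handling...
            }
            // Both the websocket and the session streams have ended
            Ok(None) => break,
            // No input arrived within WS_TIMEOUT
            Err(_elapsed) => {
                info!("WebSocket connection timed out");
                break;
            }
        }
    }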
}

Line range hint 644-648: Enhance monitoring for network transfers

Given the focus on improving throughput, consider adding more detailed metrics and logging for network transfers.

Add metrics for:

  • Connection establishment latency
  • Transfer rates
  • Connection counts by protocol
  • Error rates

#[cfg(feature = "prometheus")]
lazy_static::lazy_static! {
    // Metrics built with `::new` are only exported once registered,
    // e.g. `prometheus::register(Box::new(METRIC_TRANSFER_BYTES.clone()))`.
    static ref METRIC_TRANSFER_BYTES: prometheus::IntCounterVec = prometheus::IntCounterVec::new(
        prometheus::Opts::new(
            "hopr_session_transfer_bytes",
            "Number of bytes transferred through sessions"
        ),
        &["direction", "protocol"]
    ).unwrap();
    
    static ref METRIC_TRANSFER_LATENCY: prometheus::HistogramVec = prometheus::HistogramVec::new(
        prometheus::HistogramOpts::new(
            "hopr_session_transfer_latency",
            "Latency of session data transfers"
        ),
        &["protocol"]
    ).unwrap();
}
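`prometheus::HistogramOpts::new` falls back to the crate's `DEFAULT_BUCKETS` (upper bounds from 0.005 to 10), which are scaled for seconds. Latencies therefore need to be observed in seconds (for example `elapsed.as_secs_f64()`) for the buckets to resolve anything. The following std-only sketch mimics how a histogram files observations into cumulative `le` buckets. The bucket values are copied from `DEFAULT_BUCKETS`, and `cumulative_counts` is a hypothetical helper:

```rust
/// Bucket upper bounds, copied from the `prometheus` crate's `DEFAULT_BUCKETS`.
const BUCKETS: [f64; 11] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];

/// For each bucket, counts observations <= its upper bound (Prometheus `le`
/// buckets are cumulative). Values above the last bound only reach `+Inf`.
fn cumulative_counts(observations: &[f64]) -> Vec<u64> {
    BUCKETS
        .iter()
        .map(|&le| observations.iter().filter(|&&v| v <= le).count() as u64)
        .collect()
}

fn main() {
    // 3 ms, 40 ms and 2 s recorded in seconds spread across the buckets.
    println!("{:?}", cumulative_counts(&[0.003, 0.040, 2.0]));
    // [1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3]

    // The same latencies recorded in milliseconds collapse into the top buckets.
    println!("{:?}", cumulative_counts(&[3.0, 40.0, 2000.0]));
    // [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
}
```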
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between d6a097b and 6e0481b.

📒 Files selected for processing (3)
  • hoprd/rest-api/src/messages.rs (1 hunks)
  • hoprd/rest-api/src/preconditions.rs (2 hunks)
  • hoprd/rest-api/src/session.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • hoprd/rest-api/src/messages.rs
🧰 Additional context used
📓 Learnings (1)
hoprd/rest-api/src/preconditions.rs (1)
Learnt from: Teebor-Choka
PR: hoprnet/hoprnet#6683
File: hoprd/rest-api/src/preconditions.rs:37-45
Timestamp: 2024-11-28T11:36:08.684Z
Learning: When using `AtomicU16::fetch_add` in Rust, `fetch_add` returns the previous value before addition, so comparisons should account for this behavior.
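Because `fetch_add` returns the value before the increment, a cap check compares the returned value with `>= max` (equivalently, the new value `prev + 1` with `> max`). Below is a minimal sketch of that pattern with an RAII guard that releases the slot on drop. `try_acquire` and `WsSlot` are hypothetical names, not the PR's actual `cap_websockets` code:

```rust
use std::sync::atomic::{AtomicU16, Ordering::Relaxed};
use std::sync::Arc;

/// Hypothetical guard: holding it means one WebSocket slot is in use.
struct WsSlot {
    count: Arc<AtomicU16>,
}

impl Drop for WsSlot {
    fn drop(&mut self) {
        // Release the slot when the connection handler finishes.
        self.count.fetch_sub(1, Relaxed);
    }
}

/// Tries to reserve a slot; returns `None` if `max` slots are already taken.
fn try_acquire(count: &Arc<AtomicU16>, max: u16) -> Option<WsSlot> {
    // `prev` is the count *before* this increment.
    let prev = count.fetch_add(1, Relaxed);
    if prev >= max {
        // Over the limit: undo our increment and reject (e.g. with HTTP 429).
        count.fetch_sub(1, Relaxed);
        None
    } else {
        Some(WsSlot { count: Arc::clone(count) })
    }
}

fn main() {
    let count = Arc::new(AtomicU16::new(0));
    let a = try_acquire(&count, 2);
    let b = try_acquire(&count, 2);
    let c = try_acquire(&count, 2);
    println!("{} {} {}", a.is_some(), b.is_some(), c.is_some()); // true true false
    drop(a);
    println!("{}", try_acquire(&count, 2).is_some()); // true
}
```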
🔇 Additional comments (3)
hoprd/rest-api/src/preconditions.rs (2)

16-21: LGTM! Well-structured WebSocket URI validation.

The function efficiently validates WebSocket URIs using compile-time string concatenation and simple prefix checking.


Line range hint 75-100: Review security implications of multiple authentication methods.

The code allows multiple authentication methods (Bearer token, X-Auth-Token header, WebSocket protocol, query parameter) which increases the attack surface.

Let's analyze the authentication methods usage:

hoprd/rest-api/src/session.rs (1)

179-180: LGTM! Appropriate use of HTTP 429 status code

The implementation correctly uses HTTP 429 (Too Many Requests) for indicating WebSocket connection limits, which is the proper status code for this scenario.

@Teebor-Choka Teebor-Choka force-pushed the kauki/libp2p/tweaking-the-internal-parameters-for-session-throughput branch from 6e0481b to be1b675 November 28, 2024 11:46

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (3)
hoprd/rest-api/src/preconditions.rs (2)

16-21: Consider adding path validation for security.

While the path checking logic is correct, consider rejecting malformed paths, such as those containing null bytes or `..` traversal segments, before the prefix check.

 fn is_a_websocket_uri(uri: &OriginalUri) -> bool {
     const MESSAGES_PATH: &str = const_format::formatcp!("{BASE_PATH}/messages/websocket");
     const SESSION_PATH: &str = const_format::formatcp!("{BASE_PATH}/session/websocket");
 
-    uri.path().starts_with(MESSAGES_PATH) || uri.path().starts_with(SESSION_PATH)
+    let path = uri.path();
+    if path.contains('\0') || path.contains("..") {
+        return false;
+    }
+    path.starts_with(MESSAGES_PATH) || path.starts_with(SESSION_PATH)
 }
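To see what the suggested guard accepts and rejects, here is the same logic over a plain `&str`. The full paths assume `BASE_PATH` is `/api/v3`, purely for illustration; the real prefix comes from the crate's `BASE_PATH`, and `is_a_websocket_path` is a hypothetical name:

```rust
// Assumed for illustration only; the real values are built from `BASE_PATH`.
const MESSAGES_PATH: &str = "/api/v3/messages/websocket";
const SESSION_PATH: &str = "/api/v3/session/websocket";

/// Same checks as the suggested `is_a_websocket_uri`, applied to a raw path.
fn is_a_websocket_path(path: &str) -> bool {
    if path.contains('\0') || path.contains("..") {
        return false;
    }
    path.starts_with(MESSAGES_PATH) || path.starts_with(SESSION_PATH)
}

fn main() {
    for p in [
        "/api/v3/messages/websocket",
        "/api/v3/session/websocket",
        "/api/v3/session/websocket/../../admin",
        "/api/v3/messages",
    ] {
        println!("{p} -> {}", is_a_websocket_path(p));
    }
}
```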

40-44: Enhance error response with connection limits.

Consider including the current connection count and maximum limit in the error response to help clients better understand the resource constraints.

-            return (
-                StatusCode::TOO_MANY_REQUESTS,
-                ApiErrorStatus::TooManyOpenWebsocketConnections,
-            )
-                .into_response();
+            return (
+                StatusCode::TOO_MANY_REQUESTS,
+                ApiErrorStatus::TooManyOpenWebsocketConnections {
+                    current: ws_count.load(Relaxed),
+                    maximum: max_websocket_count,
+                },
+            )
+                .into_response();
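The suggested response implies turning `TooManyOpenWebsocketConnections` from a unit variant into a struct variant. Below is a std-only sketch of how such a variant could carry and render the numbers. This cut-down `ApiErrorStatus` and its `Display` text are hypothetical; the real enum's other variants and its serialization are not shown:

```rust
use std::fmt;

/// Cut-down, hypothetical version of the API error enum.
#[derive(Debug, Clone, PartialEq)]
enum ApiErrorStatus {
    Unauthorized,
    /// Carries the observed and allowed connection counts.
    TooManyOpenWebsocketConnections { current: u16, maximum: u16 },
}

impl fmt::Display for ApiErrorStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ApiErrorStatus::Unauthorized => write!(f, "unauthorized"),
            ApiErrorStatus::TooManyOpenWebsocketConnections { current, maximum } => {
                write!(f, "too many open websocket connections ({current}/{maximum})")
            }
        }
    }
}

fn main() {
    let err = ApiErrorStatus::TooManyOpenWebsocketConnections { current: 10, maximum: 10 };
    println!("{err}"); // too many open websocket connections (10/10)
    println!("{}", ApiErrorStatus::Unauthorized); // unauthorized
}
```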
hoprd/rest-api/src/session.rs (1)

Line range hint 258-265: Enhance error monitoring for WebSocket connections

Consider adding more detailed metrics for better throughput monitoring:

  • Connection duration
  • Error type distribution
  • Buffer utilization rates
  • Message size distribution

This would help identify performance bottlenecks and optimize throughput.

Example addition:

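    while let Some(v) = queue.next().await {
+       // Times the handling of this single input, not the whole connection lifetime.
+       let start_time = std::time::Instant::now();
        match v {
            WebSocketInput::Network(bytes) => match bytes {
                Ok(bytes) => {
                    let len = bytes.len();
+                   // Two-argument form of the `metrics` crate macro (0.21 and earlier);
+                   // from 0.22 on this is `metrics::histogram!(name).record(value)`.
+                   metrics::histogram!("websocket.message.size", len as f64);
                    if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
                        error!(
                            error = %e,
+                           duration = ?start_time.elapsed(),
                            "Failed to emit read data onto the websocket, closing connection"
                        );
                        break;
                    };
                    bytes_from_session += len;
                }
                // ... remaining match arms unchanged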
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 6e0481b and be1b675.

📒 Files selected for processing (3)
  • hoprd/rest-api/src/messages.rs (1 hunks)
  • hoprd/rest-api/src/preconditions.rs (2 hunks)
  • hoprd/rest-api/src/session.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • hoprd/rest-api/src/messages.rs
🧰 Additional context used
📓 Learnings (1)
hoprd/rest-api/src/preconditions.rs (1)
Learnt from: Teebor-Choka
PR: hoprnet/hoprnet#6683
File: hoprd/rest-api/src/preconditions.rs:37-45
Timestamp: 2024-11-28T11:36:08.684Z
Learning: When using `AtomicU16::fetch_add` in Rust, `fetch_add` returns the previous value before addition, so comparisons should account for this behavior.
🔇 Additional comments (6)
hoprd/rest-api/src/preconditions.rs (3)

11-11: LGTM! Appropriate use of atomic ordering.

The Relaxed ordering is suitable for the connection counter as we don't need strict synchronization guarantees in this context.

Also applies to: 14-14
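`Relaxed` still makes each `fetch_add`/`fetch_sub` a single indivisible read-modify-write; it only drops ordering guarantees relative to other memory locations, which a standalone counter does not rely on. A small std-only check, assuming a cap that tests the value returned by `fetch_add` (the helper names are hypothetical): many threads race for slots that are never released, and exactly `max` get in.

```rust
use std::sync::atomic::{AtomicU16, AtomicUsize, Ordering::Relaxed};
use std::sync::{Arc, Barrier};
use std::thread;

/// Hypothetical cap check: admit only if the value *before* the increment is below `max`.
fn try_admit(count: &AtomicU16, max: u16) -> bool {
    if count.fetch_add(1, Relaxed) >= max {
        count.fetch_sub(1, Relaxed); // roll back the rejected increment
        false
    } else {
        true
    }
}

/// Races `threads` threads for `max` slots that are never released; returns how many got in.
fn admitted_count(max: u16, threads: usize) -> usize {
    let count = Arc::new(AtomicU16::new(0));
    let admitted = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(threads));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let count = Arc::clone(&count);
            let admitted = Arc::clone(&admitted);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                barrier.wait(); // release all threads at roughly the same moment
                if try_admit(&count, max) {
                    admitted.fetch_add(1, Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // `join` synchronizes with each thread's completion, so this load sees every increment.
    admitted.load(Relaxed)
}

fn main() {
    println!("{}", admitted_count(10, 64)); // 10
}
```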


Line range hint 75-89: LGTM! Good security practices in authentication.

The changes demonstrate good security practices:

  • Reasonable query string length limit
  • Clear documentation about browser limitations
  • Proper URL decoding with error handling

30-32: ⚠️ Potential issue

Increase the default WebSocket connection limit.

The default limit of 10 concurrent WebSocket connections is insufficient for the system requirements. According to issue #6676, the system needs to handle up to 600 concurrent WebSocket connections in a local cluster setup.

-        .unwrap_or(10);
+        .unwrap_or(600);

Likely invalid or redundant comment.
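For reference, the pattern behind `.unwrap_or(10)` reads an override from the environment and falls back to a compiled-in default when the variable is unset or unparsable. A sketch, assuming the variable is `HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT` (listed in the PR's README changes) and a `u16` limit; whether the PR parses it exactly this way is not shown here:

```rust
use std::env;

const MAX_WS_ENV: &str = "HOPR_INTERNAL_REST_API_MAX_CONCURRENT_WEBSOCKET_COUNT";
const DEFAULT_MAX_WS: u16 = 10;

/// Parses an optional raw value, falling back to `default` when it is
/// missing or not a valid `u16` (surrounding whitespace is ignored).
fn parse_limit(raw: Option<&str>, default: u16) -> u16 {
    raw.and_then(|v| v.trim().parse().ok()).unwrap_or(default)
}

/// Reads the WebSocket connection limit from the environment.
fn max_websocket_count() -> u16 {
    parse_limit(env::var(MAX_WS_ENV).ok().as_deref(), DEFAULT_MAX_WS)
}

fn main() {
    println!("effective limit: {}", max_websocket_count());
    println!("{}", parse_limit(Some("600"), DEFAULT_MAX_WS)); // 600
    println!("{}", parse_limit(Some("lots"), DEFAULT_MAX_WS)); // 10
}
```

Keeping the parsing in a pure function like `parse_limit` makes the fallback behaviour testable without mutating the process environment.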

hoprd/rest-api/src/session.rs (3)

179-179: LGTM! Appropriate use of HTTP 429 status code

The addition of HTTP 429 "Too Many Requests" status code is the correct choice for indicating WebSocket connection limits, following REST API best practices.


Line range hint 32-34: Consider buffer size tuning for improved throughput

The current buffer sizes might need optimization for better throughput:

  • TCP buffer: 4KB
  • UDP buffer: 16KB
  • UDP queue: 8KB

Consider:

  1. Increasing buffer sizes for better throughput during high load
  2. Making these values configurable via environment variables
  3. Adding metrics to monitor buffer utilization

Let's verify the current buffer utilization:

Also applies to: 36-38


Line range hint 644-648: Verify UDP packet handling during high load

The current UDP implementation discards data from all clients except the first one. While this prevents unauthorized access, verify that it doesn't impact legitimate reconnection attempts during high load scenarios.

Let's check for any reported UDP connection issues:

✅ Verification successful

Let me gather more specific information about our UDP implementation.


Let me gather more specific information about the UDP implementation and packet handling.


Let me try a different search pattern to understand the UDP implementation.


Based on the code analysis, I can now provide a definitive response about the UDP packet handling concern.

UDP packet handling during high load is properly managed

The implementation uses reasonable buffer and queue sizes to handle UDP traffic:

  • UDP buffer size is set to 16KB (HOPR_UDP_BUFFER_SIZE = 16384)
  • Queue size for back-pressure is set to 8KB (HOPR_UDP_QUEUE_SIZE = 8192)
  • The ForeignDataMode::Discard setting is intentional for security, only affecting unauthorized clients, not legitimate reconnection attempts

The configuration provides sufficient buffering for legitimate traffic while maintaining security. Reconnection attempts from the original client are not impacted by the foreign data mode as they would reestablish the connection as the primary client.
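The following is a simplified model of the discard policy described above. It assumes the stream locks onto the full socket address (IP and port) of the first sender and drops datagrams from any other address. `FirstPeerFilter` is hypothetical and is not the `ConnectedUdpStream` API:

```rust
use std::net::SocketAddr;

/// Hypothetical filter: the first sender becomes the peer; others are dropped.
#[derive(Default)]
struct FirstPeerFilter {
    peer: Option<SocketAddr>,
    dropped: usize,
}

impl FirstPeerFilter {
    /// Returns `true` if a datagram from `from` should be forwarded.
    fn accept(&mut self, from: SocketAddr) -> bool {
        match self.peer {
            None => {
                self.peer = Some(from);
                true
            }
            Some(peer) if peer == from => true,
            Some(_) => {
                self.dropped += 1;
                false
            }
        }
    }
}

fn main() {
    let first: SocketAddr = "127.0.0.1:40000".parse().unwrap();
    let other: SocketAddr = "127.0.0.1:40001".parse().unwrap();
    let mut filter = FirstPeerFilter::default();
    println!("{}", filter.accept(first)); // true
    println!("{}", filter.accept(first)); // true
    println!("{}", filter.accept(other)); // false
}
```

In this model, identity is the full address. A client reconnecting from a different source port would therefore be treated as foreign.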

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for UDP-related error patterns
rg -A 3 "UDP.*error|UDP.*failed|packet.*drop" 

# Look for any UDP-related issues in tests
ast-grep --pattern 'test.*UDP|UDP.*test'

Length of output: 15572


Script:

#!/bin/bash
# Search for UDP stream implementation details
rg -A 5 "ConnectedUdpStream|ForeignDataMode::Discard" 

# Look for UDP connection handling code
ast-grep --pattern 'struct ConnectedUdpStream {
  $$$
}'

# Search for UDP-related tests
rg -A 5 "test.*udp|#\[test\].*UDP"

Length of output: 136697


Script:

#!/bin/bash
# Search for UDP stream implementation and packet handling
rg -A 5 "ForeignDataMode::Discard.*packet|drop.*UDP|UDP.*drop" hoprd/

# Look for UDP connection and reconnection handling
ast-grep --pattern 'fn connect_udp($$$)' hoprd/

# Search for UDP-related tests that might show connection behavior
rg -A 5 "test.*udp.*connect|#\[test\].*UDP.*reconnect" hoprd/

Length of output: 185


Script:

#!/bin/bash
# Look for UDP stream and connection handling code
rg -A 5 "ConnectedUdpStream.*with_foreign_data_mode" hoprd/

# Search for UDP reconnection logic
rg -A 5 "UDP.*reconnect|reconnect.*UDP" hoprd/

# Look for UDP connection tests
rg -A 5 "test.*UDP.*connection|UDP.*test.*connect" hoprd/

# Search for UDP packet handling configuration
rg -A 5 "HOPR_UDP|UDP_BUFFER|UDP_QUEUE" hoprd/

Length of output: 4574


@tolbrino tolbrino left a comment


Crate version bumps missing.

@Teebor-Choka Teebor-Choka merged commit c440fc3 into master Nov 28, 2024
28 checks passed
@Teebor-Choka Teebor-Choka deleted the kauki/libp2p/tweaking-the-internal-parameters-for-session-throughput branch November 28, 2024 13:21
@coderabbitai coderabbitai bot mentioned this pull request Jan 27, 2025
Labels
bug (Something isn't working), component:p2p, crate:core-p2p, crate:hoprd-api, dependencies (Pull requests that update a dependency file)
Development

Successfully merging this pull request may close these issues.

  • The nodes saturates and closes the session with high throughput
  • Cannot open too many websocket connections
2 participants