Skip to content

Conversation

Teebor-Choka
Copy link
Contributor

This pull request introduces a significant refactor across multiple modules to replace the use of JoinHandle with AbortHandle for managing asynchronous tasks. This change improves task cancellation capabilities and aligns the codebase with modern async patterns. The changes span several files and include updates to function signatures, imports, and task spawning logic.

Key Changes by Theme:

Transition to AbortHandle:

  • Replaced JoinHandle with AbortHandle in task management across multiple modules, including chain/api, chain/indexer, hopr-lib, hoprd, and hoprd/rest-api. This required updating function signatures, return types, and task storage structures. [1] [2] [3] [4] [5]

Task Spawning Updates:

  • Introduced and utilized a new utility function spawn_as_abortable in common/async-runtime to spawn tasks with AbortHandle support. This function wraps tasks in an abortable future and returns the corresponding AbortHandle. [1] [2] [3] [4]

Dependency and Import Adjustments:

  • Added futures as a dependency in Cargo.toml to access AbortHandle and related utilities. Updated imports across files to include AbortHandle and remove JoinHandle where no longer needed. [1] [2] [3]

Test and Integration Updates:

  • Updated test logic in chain_integration_tests.rs to use AbortHandle for task management. Adjusted cleanup logic to call .abort() on AbortHandle instances instead of canceling JoinHandle. [1] [2]

REST API Session Management:

  • Refactored session handling in hoprd/rest-api/src/session.rs to store AbortHandle in StoredSessionEntry instead of JoinHandle. Updated task spawning for session listeners to use spawn_as_abortable. [1] [2]

These changes collectively enhance the codebase's robustness and maintainability by adopting a more flexible and modern approach to asynchronous task management.

@Teebor-Choka Teebor-Choka added this to the 3.1.0 milestone Jun 16, 2025
@Teebor-Choka Teebor-Choka self-assigned this Jun 16, 2025
Copy link
Contributor

coderabbitai bot commented Jun 16, 2025

📝 Walkthrough

Walkthrough

This change refactors asynchronous task management across multiple crates by replacing the use of JoinHandle with AbortHandle and transitioning from spawning tasks with spawn to spawn_as_abortable. The update standardizes task cancellation and abortion, affecting method signatures, struct fields, and test logic throughout the codebase. Several package versions are also incremented.

Changes

File(s) Change Summary
chain/api/Cargo.toml, common/async-runtime/Cargo.toml, transport/api/Cargo.toml,
transport/probe/Cargo.toml, transport/protocol/Cargo.toml, transport/session/Cargo.toml
Incremented package version numbers.
common/async-runtime/src/lib.rs Removed cancel_join_handle, added spawn_as_abortable function and re-exported AbortHandle.
chain/api/src/lib.rs, chain/indexer/src/block.rs, hopr/hopr-lib/src/lib.rs,
hoprd/hoprd/src/main.rs, hoprd/rest-api/src/session.rs, transport/api/src/lib.rs,
transport/probe/src/probe.rs, transport/protocol/src/lib.rs, transport/session/src/manager.rs
Replaced JoinHandle with AbortHandle in method signatures, struct fields, and task spawning; updated imports and spawning logic to use spawn_as_abortable.
hopr/hopr-lib/tests/chain_integration_tests.rs, transport/session/src/manager.rs (tests) Updated tests to use AbortHandle and abort() for task cancellation instead of JoinHandle and cancel_join_handle.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant AsyncRuntime
    participant Task

    Caller->>AsyncRuntime: spawn_as_abortable(future)
    AsyncRuntime->>Task: spawn abortable(future)
    AsyncRuntime-->>Caller: AbortHandle

    Note over Caller,Task: Caller uses AbortHandle to abort the task when needed
    Caller->>Task: AbortHandle.abort()
    Task-->>Caller: Task is aborted
Loading

Suggested labels

toolchain, crate:chain-api, crate:chain-indexer

Suggested reviewers

  • tolbrino
  • NumberFour8
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@NumberFour8
Copy link
Contributor

Closes #7113

@Teebor-Choka Teebor-Choka marked this pull request as ready for review June 16, 2025 12:47
@Teebor-Choka Teebor-Choka modified the milestones: 3.1.0, 3.0.0 Jun 16, 2025
@Teebor-Choka Teebor-Choka enabled auto-merge (squash) June 16, 2025 12:51
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (2)
chain/indexer/src/block.rs (1)

339-345: Abort the background task on early‐error to avoid leakage.

If rx.next().await yields None you return an error but the indexing task (indexing_abort_handle) is left running in the background, potentially holding DB connections and CPU.
Ensure cleanup before bubbling the error:

-        } else {
-            Err(crate::errors::CoreEthereumIndexerError::ProcessError(
-                "Error during indexing start".into(),
-            ))
+        } else {
+            // stop the spawned task – it is either unhealthy or
+            // entered its completion path already
+            indexing_abort_handle.abort();
+            Err(crate::errors::CoreEthereumIndexerError::ProcessError(
+                "Error during indexing start".into(),
+            ))
         }
transport/probe/src/probe.rs (1)

179-181: Avoid moving out of self – clone the tuple instead.

let me = self.me; moves the (OffchainPublicKey, Address) out of self.
If either field is not Copy, the compiler will complain that self is partially moved and the rest of the method can no longer use it.
Clone (or borrow) instead:

-                    let me = self.me;
+                    let me = self.me.clone();
🧹 Nitpick comments (17)
common/async-runtime/Cargo.toml (1)

20-26: Specify default-features = false if you only need AbortHandle.

futures pulls in a fair amount of optional code.
If the crate only relies on AbortHandle / abortable, disable the default feature set to keep your compile graph lean:

-futures = { workspace = true }
+futures = { workspace = true, default-features = false }
transport/protocol/src/lib.rs (2)

72-73: Redundant Box::pin indirection.

spawn_as_abortable accepts any Send + 'static future; execute_on_tick(...) already returns one.
Dropping the heap allocation trims an indirection and a Pin<Box<..>>:

-spawn_as_abortable(Box::pin(execute_on_tick(
+spawn_as_abortable(execute_on_tick(

155-156: Prefer a single AbortHandle type path for clarity.

Here the public signature binds to hopr_async_runtime::AbortHandle while the implementation receives the handle directly from futures::future::AbortHandle via spawn_as_abortable.
Keeping only the re-exported path avoids accidental type mismatch if the runtime stops re-exporting:

pub async fn run_msg_ack_protocol<Db>(...) 
    -> HashMap<ProtocolProcesses, AbortHandle> // import AbortHandle from hopr_async_runtime
hoprd/hoprd/src/main.rs (1)

258-270: Consider awaiting terminated tasks for graceful shutdown.

Calling AbortHandle::abort() is fire-and-forget; long-running clean-up work inside the tasks (flush logs, close sockets, persist state) may be dropped mid-flight.
If predictable shutdown is desirable, keep the JoinHandle (or a completion channel) and await it after aborting, or add a short timeout sleep before exiting.

hoprd/rest-api/src/session.rs (1)

900-906: Ignore abort() result?

AbortHandle::abort() is infallible but instant; if you want to be sure the listener is actually gone (e.g. to reclaim the port before recreating), keep the matching AbortRegistration or a completion signal and wait for confirmation before returning 204.

transport/probe/src/probe.rs (1)

173-177: Minor readability nit – pull the future out before spawning.

Long, nested expressions inside spawn_as_abortable are harder to read and debug. Extract the future into a variable first, then spawn:

-            hopr_async_runtime::spawn_as_abortable(direct_neighbors
-                .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
+            {
+                let fut = direct_neighbors.for_each_concurrent(
+                    max_parallel_probes,
+                    move |(peer, notifier)| {
                         /* … */
-                })
-            )
+                    },
+                );
+                hopr_async_runtime::spawn_as_abortable(fut)
+            }
hopr/hopr-lib/tests/chain_integration_tests.rs (1)

807-808: Un-necessary async/join_all around abort()

AbortHandle::abort() is synchronous and returns ().
Wrapping each call in an async block and then join_alling adds overhead without benefit:

-futures::future::join_all(alice_node.node_tasks.into_iter().map(|ah| async move { ah.abort() })).await;
-futures::future::join_all(bob_node.node_tasks.into_iter().map(|ah| async move { ah.abort() })).await;
+for ah in alice_node.node_tasks { ah.abort(); }
+for ah in bob_node.node_tasks { ah.abort(); }
hopr/hopr-lib/src/lib.rs (2)

907-926: Session-server task is now abortable but for_each_concurrent ignores abort errors

AbortHandle::abort() will immediately short-circuit the stream future, but the internal closure swallows the Aborted result.
If you want to surface early termination (for metrics / debug), wrap the spawn in an async block and log the Err(Aborted).

That said, functionally this change is correct.


976-994: Strategy-tick now abortable – potential minor improvement

Aborting will instantaneously end the outer async block, but any in-flight on_tick() awaits will still run to completion.
If finer-grained cancellation is ever required, consider passing a cancellation token into on_tick().

No blocking change required for this PR.

common/async-runtime/src/lib.rs (1)

26-34: spawn_as_abortable drops the JoinHandle – consider exposing it

Right now the JoinHandle is assigned to _jh and immediately dropped, which means:

  1. Panics inside the task are not observable.
  2. You cannot .await completion for graceful shutdown.

A lightweight enhancement would be to return (AbortHandle, JoinHandle<Result<T, Aborted>>):

-pub fn spawn_as_abortable<F, T>(f: F) -> AbortHandle
+pub fn spawn_as_abortable<F, T>(f: F) -> (AbortHandle, prelude::JoinHandle<Result<T, futures::future::Aborted>>)
@@
-    let _jh = prelude::spawn(proc);
-    abort_handle
+    let jh = prelude::spawn(proc);
+    (abort_handle, jh)

Consumers that do not care about the JoinHandle can simply ignore the second tuple element, preserving ergonomics while enabling richer lifecycle management when needed.

transport/session/src/manager.rs (3)

356-362: Prefer a single AbortHandle path for clarity

start currently returns Vec<hopr_async_runtime::AbortHandle> while the file already imports futures::future::AbortHandle as AbortHandle.
Unless hopr_async_runtime::AbortHandle is a public type-alias, mixing the two paths is needlessly confusing.

-) -> crate::errors::Result<Vec<hopr_async_runtime::AbortHandle>> {
+) -> crate::errors::Result<Vec<AbortHandle>> {

Keeps the code self-consistent and avoids surprising type-path mismatches.


370-393: Loss of panic visibility after switching to spawn_as_abortable

spawn_as_abortable discards the underlying JoinHandle; panics in the spawned future will therefore be silently dropped.
If observing task failure is important, consider wrapping the future and logging / metrics-recording the result before returning the AbortHandle.


1183-1185: Unnecessary async hop when aborting handles in tests

AbortHandle::abort() is synchronous, so spawning an async loop adds overhead for no gain.

-futures::stream::iter(ahs)
-    .for_each(|ah| async move { ah.abort() })
-    .await;
+for ah in ahs {
+    ah.abort();
+}

Reduces test runtime and avoids an extra task.

transport/api/src/lib.rs (4)

39-40: Shadow-import of AbortHandle

The import adds AbortHandle from hopr_async_runtime, yet several modules rely on the re-export from futures.
Pick one path consistently (ideally the short unqualified one imported here) to avoid ambiguity.


271-272: Return type path consistency

Same note as in SessionManager::start: prefer the already-imported AbortHandle instead of the fully-qualified one for readability:

-) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {
+) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {

(only the path changed in the diff ‑ no semantic change).


380-382: HashMap now stores only AbortHandles – make key names reflect that

Variable names like processes still refer to “processes”, but inner variables later named jh were not adjusted (jh now holds an AbortHandle).
Renaming avoids mental friction for future readers.


542-566: Long-running dispatch loop swallows errors

Inside the abortable future, any error logged in dispatch_message is discarded.
If unintended, bubble the error out so test/ops can detect mis-behaviour before aborting:

if let Err(e) = smgr.dispatch_message(pseudonym, data).await {
    error!(%e, "error…");
    return Err(e); // let outer task log/panic if needed
}

Even logging the first error and breaking out would help operational visibility.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dbbf429 and 652dde2.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (17)
  • chain/api/Cargo.toml (1 hunks)
  • chain/api/src/lib.rs (2 hunks)
  • chain/indexer/src/block.rs (4 hunks)
  • common/async-runtime/Cargo.toml (2 hunks)
  • common/async-runtime/src/lib.rs (2 hunks)
  • hopr/hopr-lib/src/lib.rs (7 hunks)
  • hopr/hopr-lib/tests/chain_integration_tests.rs (4 hunks)
  • hoprd/hoprd/src/main.rs (4 hunks)
  • hoprd/rest-api/src/session.rs (5 hunks)
  • transport/api/Cargo.toml (1 hunks)
  • transport/api/src/lib.rs (5 hunks)
  • transport/probe/Cargo.toml (1 hunks)
  • transport/probe/src/probe.rs (4 hunks)
  • transport/protocol/Cargo.toml (1 hunks)
  • transport/protocol/src/lib.rs (5 hunks)
  • transport/session/Cargo.toml (1 hunks)
  • transport/session/src/manager.rs (10 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
transport/protocol/src/lib.rs (2)
common/async-runtime/src/lib.rs (1)
  • spawn_as_abortable (26-34)
transport/protocol/src/timer.rs (1)
  • execute_on_tick (15-43)
transport/api/src/lib.rs (1)
common/async-runtime/src/lib.rs (1)
  • spawn_as_abortable (26-34)
hopr/hopr-lib/src/lib.rs (2)
common/async-runtime/src/lib.rs (1)
  • spawn_as_abortable (26-34)
transport/protocol/src/timer.rs (1)
  • execute_on_tick (15-43)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Linter
  • GitHub Check: hopli / docker
  • GitHub Check: zizmor
  • GitHub Check: Build Candidate Binaries (hopli)
  • GitHub Check: tests-unit-nightly
  • GitHub Check: tests-unit
  • GitHub Check: Build Candidate Binaries (hoprd)
  • GitHub Check: Cargo Audit
🔇 Additional comments (13)
transport/session/Cargo.toml (1)

3-3: Version bump aligns with breaking changes.

The minor version increment from 0.8.0 to 0.9.0 correctly reflects the breaking migration from JoinHandle to AbortHandle semantics under 0.x semver conventions.

transport/api/Cargo.toml (1)

3-3: Minor version bump signals breaking API change.

Updating from 0.11.0 to 0.12.0 is appropriate to indicate the refactor of task spawning and cancellation from JoinHandle to AbortHandle.

transport/probe/Cargo.toml (1)

3-3: Incremented minor version for breaking task API change.

The change from 0.1.0 to 0.2.0 correctly denotes the shift to abortable futures in the probe crate.

chain/api/Cargo.toml (1)

3-3: Appropriate version bump for breaking async runtime update.

Bumping hopr-chain-api from 0.5.1 to 0.6.0 signals the breaking transition to AbortHandle-based task management.

transport/protocol/Cargo.toml (1)

3-3: Reflect breaking change via minor version bump.

The upgrade from 0.7.0 to 0.8.0 is consistent with semver practices for breaking async API changes.

chain/api/src/lib.rs (1)

248-252: Consider propagating failures from action_queue.start()

spawn_as_abortable(self.action_queue.clone().start()) will discard any error returned by start(), whereas the previous JoinHandle allowed awaiting the result and propagating it.
If start() can fail, wrap it to log/convert the error before spawning:

-let handle = spawn_as_abortable(self.action_queue.clone().start());
+let handle = spawn_as_abortable(async {
+    if let Err(e) = self.action_queue.clone().start().await {
+        tracing::error!(%e, "OutgoingOnchainActionQueue terminated unexpectedly");
+    }
+});
hopr/hopr-lib/src/lib.rs (5)

34-40: Import of AbortHandle correctly replaces JoinHandle.

Good call switching to the futures-crate handle and cleaning up the old import.
No further action required.


609-613: Down-stream code must be adapted to new run() signature.

run() now returns HashMap<HoprLibProcesses, AbortHandle>.
Please confirm all call-sites (e.g. hoprd, integration tests, REST API) compile and abort tasks with .abort().


623-623: Processes map switched to AbortHandle – 👍

The local map definition is consistent with the new API and avoids type mismatch.


887-901: OnReceivedAcknowledgement loop made abortable – nice upgrade

Wrapping the endless loop in spawn_as_abortable gives external cancellation while keeping behaviour unchanged.
Consider logging the Aborted case (returned by the wrapped future) if observability is desired, but otherwise this looks good.


946-961: Periodic ticket-flush converted to abortable future – OK

Good to see the 5-second tick running through a single utility (execute_on_tick) and now cancellable via the returned handle.
No issues spotted.

common/async-runtime/src/lib.rs (1)

4-6: Exposing AbortHandle at crate root – looks good

Re-exporting the handle simplifies downstream imports.
No concerns.

transport/api/src/lib.rs (1)

464-466: transport_layer.run() left without a way to abort/observe panics

spawn_as_abortable gives cancellation, but the earlier _mixing_process_before_sending_out (line 461) is spawned with plain spawn and its handle is ignored – it can neither be aborted nor observed.

Consider:

processes.insert(
    HoprTransportProcess::MediumMixer,
    spawn_as_abortable(_mixing_process_before_sending_out),
);

(or convert the earlier spawn call to spawn_as_abortable directly).
[ suggest_essential_refactor ]

@Teebor-Choka Teebor-Choka merged commit defb66e into master Jun 16, 2025
49 checks passed
@Teebor-Choka Teebor-Choka deleted the kauki/hopr-lib/abortable-join-handles branch June 16, 2025 13:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Migrate codebase to use futures::future::AbortHandle instead of runtime-specific JoinHandle
2 participants