feat(runtime): make spawned tasks abortable through futures mechanism #7247
📝 Walkthrough
This change refactors asynchronous task management across multiple crates by replacing the use of `JoinHandle` with `AbortHandle`.
Sequence Diagram(s)
sequenceDiagram
participant Caller
participant AsyncRuntime
participant Task
Caller->>AsyncRuntime: spawn_as_abortable(future)
AsyncRuntime->>Task: spawn abortable(future)
AsyncRuntime-->>Caller: AbortHandle
Note over Caller,Task: Caller uses AbortHandle to abort the task when needed
Caller->>Task: AbortHandle.abort()
Task-->>Caller: Task is aborted
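The flow in the diagram can be illustrated without any async runtime. The following is a minimal, std-only sketch of the abortable-future idea. `Abortable`, `AbortHandle`, `Aborted`, `abortable` and `block_on` are simplified stand-ins written for this example; they are not the `futures` crate types and not the PR's `spawn_as_abortable`, which additionally hands the wrapped future to the runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Stand-in for the error returned when a task is aborted (assumption).
#[derive(Debug, PartialEq, Eq)]
pub struct Aborted;

/// State shared between the wrapped future and its handle.
struct Shared {
    aborted: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

/// Stand-in for an abort handle (assumption, not the `futures` type).
#[derive(Clone)]
pub struct AbortHandle(Arc<Shared>);

impl AbortHandle {
    /// Synchronous: sets the flag and wakes the task so it can observe it.
    pub fn abort(&self) {
        self.0.aborted.store(true, Ordering::SeqCst);
        if let Some(waker) = self.0.waker.lock().unwrap().take() {
            waker.wake();
        }
    }
}

/// Future wrapper that resolves to `Err(Aborted)` once the flag is set.
pub struct Abortable<F> {
    inner: Pin<Box<F>>,
    shared: Arc<Shared>,
}

pub fn abortable<F: Future>(fut: F) -> (Abortable<F>, AbortHandle) {
    let shared = Arc::new(Shared {
        aborted: AtomicBool::new(false),
        waker: Mutex::new(None),
    });
    let task = Abortable { inner: Box::pin(fut), shared: shared.clone() };
    (task, AbortHandle(shared))
}

impl<F: Future> Future for Abortable<F> {
    type Output = Result<F::Output, Aborted>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.shared.aborted.load(Ordering::SeqCst) {
            return Poll::Ready(Err(Aborted));
        }
        if let Poll::Ready(value) = self.inner.as_mut().poll(cx) {
            return Poll::Ready(Ok(value));
        }
        // Remember who to wake, then re-check to avoid a lost wake-up.
        *self.shared.waker.lock().unwrap() = Some(cx.waker().clone());
        if self.shared.aborted.load(Ordering::SeqCst) {
            return Poll::Ready(Err(Aborted));
        }
        Poll::Pending
    }
}

/// Waker that unparks the thread driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor, only here to make the sketch runnable.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

Calling `abortable(fut)` yields the wrapped future and its handle; once `handle.abort()` has been called, driving the wrapped future with `block_on` returns `Err(Aborted)` even if `fut` itself would never complete.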
Closes #7113
Actionable comments posted: 1
🔭 Outside diff range comments (2)
chain/indexer/src/block.rs (1)
339-345: Abort the background task on early error to avoid leakage.
If `rx.next().await` yields `None`, you return an error but the indexing task (`indexing_abort_handle`) is left running in the background, potentially holding DB connections and CPU.
Ensure cleanup before bubbling the error:

    - } else {
    -     Err(crate::errors::CoreEthereumIndexerError::ProcessError(
    -         "Error during indexing start".into(),
    -     ))
    + } else {
    +     // stop the spawned task – it is either unhealthy or
    +     // entered its completion path already
    +     indexing_abort_handle.abort();
    +     Err(crate::errors::CoreEthereumIndexerError::ProcessError(
    +         "Error during indexing start".into(),
    +     ))
      }

transport/probe/src/probe.rs (1)
179-181: Avoid moving out of `self` – clone the tuple instead.
`let me = self.me;` moves the `(OffchainPublicKey, Address)` out of `self`.
If either field is not `Copy`, the compiler will complain that `self` is partially moved and the rest of the method can no longer use it.
Clone (or borrow) instead:

    - let me = self.me;
    + let me = self.me.clone();
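Whether the assignment compiles depends on the field types. The sketch below uses hypothetical stand-in types (`PeerKey`, `Addr`, `CopyProbe`, `NamedProbe`); whether the real `OffchainPublicKey` and `Address` are `Copy` is not stated in this review. For `Copy` tuples the assignment duplicates the value and `self` stays usable; for non-`Copy` fields behind `&self`, an explicit `clone()` (or a borrow) is needed.

```rust
/// Hypothetical stand-in for a small, `Copy` key type.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct PeerKey(pub [u8; 4]);

/// Hypothetical stand-in for a small, `Copy` address type.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Addr(pub u32);

pub struct CopyProbe {
    pub me: (PeerKey, Addr),
}

impl CopyProbe {
    pub fn identity(&self) -> (PeerKey, Addr) {
        // Both tuple fields are `Copy`, so this copies instead of moving.
        let me = self.me;
        me
    }
}

pub struct NamedProbe {
    pub me: (String, Addr),
}

impl NamedProbe {
    pub fn identity(&self) -> (String, Addr) {
        // `let me = self.me;` would be rejected here because `String` is
        // not `Copy` and `self` is only borrowed; cloning keeps `self` intact.
        self.me.clone()
    }
}
```

In both cases the original field remains available after the call, which is what the rest of the method relies on.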
🧹 Nitpick comments (17)
common/async-runtime/Cargo.toml (1)
20-26: Specify `default-features = false` if you only need `AbortHandle`.
`futures` pulls in a fair amount of optional code.
If the crate only relies on `AbortHandle`/`abortable`, disable the default feature set to keep your compile graph lean:

    -futures = { workspace = true }
    +futures = { workspace = true, default-features = false }

transport/protocol/src/lib.rs (2)
72-73: Redundant `Box::pin` indirection.
`spawn_as_abortable` accepts any `Send + 'static` future; `execute_on_tick(...)` already returns one.
Dropping the heap allocation trims an indirection and a `Pin<Box<..>>`:

    -spawn_as_abortable(Box::pin(execute_on_tick(
    +spawn_as_abortable(execute_on_tick(
155-156: Prefer a single `AbortHandle` type path for clarity.
Here the public signature binds to `hopr_async_runtime::AbortHandle` while the implementation receives the handle directly from `futures::future::AbortHandle` via `spawn_as_abortable`.
Keeping only the re-exported path avoids accidental type mismatch if the runtime stops re-exporting:

    pub async fn run_msg_ack_protocol<Db>(...) -> HashMap<ProtocolProcesses, AbortHandle> // import AbortHandle from hopr_async_runtime

hoprd/hoprd/src/main.rs (1)
258-270: Consider awaiting terminated tasks for graceful shutdown.
Calling `AbortHandle::abort()` is fire-and-forget; long-running clean-up work inside the tasks (flush logs, close sockets, persist state) may be dropped mid-flight.
If predictable shutdown is desirable, keep the `JoinHandle` (or a completion channel) and `await` it after aborting, or add a short timeout sleep before exiting.
hoprd/rest-api/src/session.rs (1)
900-906: Ignore `abort()` result?
`AbortHandle::abort()` is infallible but instant; if you want to be sure the listener is actually gone (e.g. to reclaim the port before recreating), keep the matching `AbortRegistration` or a completion signal and wait for confirmation before returning 204.
transport/probe/src/probe.rs (1)
173-177: Minor readability nit – pull the future out before spawning.
Long, nested expressions inside `spawn_as_abortable` are harder to read and debug. Extract the future into a variable first, then spawn:

    - hopr_async_runtime::spawn_as_abortable(direct_neighbors
    -     .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
    + {
    +     let fut = direct_neighbors.for_each_concurrent(
    +         max_parallel_probes,
    +         move |(peer, notifier)| {
                /* … */
    -     })
    - )
    +         },
    +     );
    +     hopr_async_runtime::spawn_as_abortable(fut)
    + }

hopr/hopr-lib/tests/chain_integration_tests.rs (1)
807-808: Unnecessary async/`join_all` around `abort()`
`AbortHandle::abort()` is synchronous and returns `()`.
Wrapping each call in an async block and then `join_all`ing adds overhead without benefit:

    -futures::future::join_all(alice_node.node_tasks.into_iter().map(|ah| async move { ah.abort() })).await;
    -futures::future::join_all(bob_node.node_tasks.into_iter().map(|ah| async move { ah.abort() })).await;
    +for ah in alice_node.node_tasks { ah.abort(); }
    +for ah in bob_node.node_tasks { ah.abort(); }

hopr/hopr-lib/src/lib.rs (2)
907-926: Session-server task is now abortable but `for_each_concurrent` ignores abort errors
`AbortHandle::abort()` will immediately short-circuit the stream future, but the internal closure swallows the `Aborted` result.
If you want to surface early termination (for metrics / debug), wrap the spawn in an async block and log the `Err(Aborted)`.
That said, functionally this change is correct.
976-994: Strategy-tick now abortable – potential minor improvement
Aborting will instantaneously end the outer async block, but any in-flight `on_tick()` awaits will still run to completion.
If finer-grained cancellation is ever required, consider passing a cancellation token into `on_tick()`.
No blocking change required for this PR.
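As a rough illustration of the cancellation-token idea, the sketch below uses a shared atomic flag that the tick body checks between units of work. `CancelToken` and this `on_tick` signature are hypothetical; the real `on_tick()` is async and its parameters are not shown in this review.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical cooperative cancellation token: a cloneable shared flag.
#[derive(Clone, Default)]
pub struct CancelToken(Arc<AtomicBool>);

impl CancelToken {
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical tick body: runs `step` for each item and stops at the
/// first item boundary after the token was cancelled.
/// Returns how many items were fully processed.
pub fn on_tick(items: &[u32], token: &CancelToken, mut step: impl FnMut(u32)) -> usize {
    let mut completed = 0;
    for &item in items {
        if token.is_cancelled() {
            break;
        }
        step(item);
        completed += 1;
    }
    completed
}
```

The design point is that the callee chooses where stopping is safe (here, between items), so a step that has started always finishes before the loop exits.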
common/async-runtime/src/lib.rs (1)
26-34: `spawn_as_abortable` drops the `JoinHandle` – consider exposing it
Right now the `JoinHandle` is assigned to `_jh` and immediately dropped, which means:
- Panics inside the task are not observable.
- You cannot `.await` completion for graceful shutdown.
A lightweight enhancement would be to return `(AbortHandle, JoinHandle<Result<T, Aborted>>)`:

    -pub fn spawn_as_abortable<F, T>(f: F) -> AbortHandle
    +pub fn spawn_as_abortable<F, T>(f: F) -> (AbortHandle, prelude::JoinHandle<Result<T, futures::future::Aborted>>)
    @@
    -    let _jh = prelude::spawn(proc);
    -    abort_handle
    +    let jh = prelude::spawn(proc);
    +    (abort_handle, jh)

Consumers that do not care about the `JoinHandle` can simply ignore the second tuple element, preserving ergonomics while enabling richer lifecycle management when needed.
transport/session/src/manager.rs (3)
356-362: Prefer a single `AbortHandle` path for clarity
`start` currently returns `Vec<hopr_async_runtime::AbortHandle>` while the file already imports `futures::future::AbortHandle` as `AbortHandle`.
Unless `hopr_async_runtime::AbortHandle` is a public type-alias, mixing the two paths is needlessly confusing.

    -) -> crate::errors::Result<Vec<hopr_async_runtime::AbortHandle>> {
    +) -> crate::errors::Result<Vec<AbortHandle>> {

Keeps the code self-consistent and avoids surprising type-path mismatches.
370-393: Loss of panic visibility after switching to `spawn_as_abortable`
`spawn_as_abortable` discards the underlying `JoinHandle`; panics in the spawned future will therefore be silently dropped.
If observing task failure is important, consider wrapping the future and logging / metrics-recording the result before returning the `AbortHandle`.
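One way to picture "wrapping the future" is a small adapter that catches a panic raised while polling and turns it into an ordinary `Err` value that can be logged. The sketch below is std-only and illustrative: `ReportPanic`, `report_panic`, `panic_message` and `block_on` are hypothetical names for this example and not part of the PR or of `hopr_async_runtime`.

```rust
use std::any::Any;
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical wrapper: a panic inside `inner` becomes `Err(message)`.
pub struct ReportPanic<F> {
    inner: Pin<Box<F>>,
}

pub fn report_panic<F: Future>(fut: F) -> ReportPanic<F> {
    ReportPanic { inner: Box::pin(fut) }
}

/// Extracts a readable message from a panic payload when possible.
fn panic_message(payload: Box<dyn Any + Send>) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        (*s).to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "non-string panic payload".to_string()
    }
}

impl<F: Future> Future for ReportPanic<F> {
    type Output = Result<F::Output, String>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = self.inner.as_mut();
        // Poll the inner future inside `catch_unwind` so a panic is captured
        // here instead of unwinding through the executor.
        match catch_unwind(AssertUnwindSafe(|| inner.poll(cx))) {
            Ok(Poll::Pending) => Poll::Pending,
            Ok(Poll::Ready(value)) => Poll::Ready(Ok(value)),
            Err(payload) => Poll::Ready(Err(panic_message(payload))),
        }
    }
}

/// Waker that unparks the thread driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor, only here to make the sketch runnable.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

A caller could wrap the task body with `report_panic(...)` before spawning and log the `Err` branch inside the task, so a failure is recorded even though no join handle is kept.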
1183-1185: Unnecessary async hop when aborting handles in tests
`AbortHandle::abort()` is synchronous, so spawning an async loop adds overhead for no gain.

    -futures::stream::iter(ahs)
    -    .for_each(|ah| async move { ah.abort() })
    -    .await;
    +for ah in ahs {
    +    ah.abort();
    +}

Reduces test runtime and avoids an extra task.
transport/api/src/lib.rs (4)
39-40: Shadow-import of `AbortHandle`
The import adds `AbortHandle` from `hopr_async_runtime`, yet several modules rely on the re-export from `futures`.
Pick one path consistently (ideally the short unqualified one imported here) to avoid ambiguity.
271-272: Return type path consistency
Same note as in `SessionManager::start`: prefer the already-imported `AbortHandle` instead of the fully-qualified one for readability:

    -) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {
    +) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {

(only the path changed in the diff - no semantic change).
380-382: HashMap now stores only `AbortHandle`s – make key names reflect that
Variable names like `processes` still refer to “processes”, but inner variables later named `jh` were not adjusted (`jh` now holds an `AbortHandle`).
Renaming avoids mental friction for future readers.
542-566: Long-running dispatch loop swallows errors
Inside the abortable future, any error logged in `dispatch_message` is discarded.
If unintended, bubble the error out so test/ops can detect mis-behaviour before aborting:

    if let Err(e) = smgr.dispatch_message(pseudonym, data).await {
        error!(%e, "error…");
        return Err(e); // let outer task log/panic if needed
    }

Even logging the first error and breaking out would help operational visibility.
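The "log the first error and break out" variant can be sketched in synchronous form. `dispatch_until_error` and its closure parameter are hypothetical and stand in for the async loop around `dispatch_message`; the sketch only shows the control flow, not the PR's code.

```rust
use std::fmt::Display;

/// Hypothetical dispatch loop: forwards each message to `dispatch`,
/// logs the first failure and stops instead of silently continuing.
/// Returns the number of messages delivered before any failure.
pub fn dispatch_until_error<M, E: Display>(
    messages: impl IntoIterator<Item = M>,
    mut dispatch: impl FnMut(M) -> Result<(), E>,
) -> Result<usize, E> {
    let mut delivered = 0;
    for message in messages {
        if let Err(e) = dispatch(message) {
            // Surface the failure once, then hand it to the caller.
            eprintln!("dispatch failed after {delivered} messages: {e}");
            return Err(e);
        }
        delivered += 1;
    }
    Ok(delivered)
}
```

Returning the error lets whatever owns the task decide whether to restart, abort sibling tasks, or just record a metric.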
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (17)
- chain/api/Cargo.toml (1 hunks)
- chain/api/src/lib.rs (2 hunks)
- chain/indexer/src/block.rs (4 hunks)
- common/async-runtime/Cargo.toml (2 hunks)
- common/async-runtime/src/lib.rs (2 hunks)
- hopr/hopr-lib/src/lib.rs (7 hunks)
- hopr/hopr-lib/tests/chain_integration_tests.rs (4 hunks)
- hoprd/hoprd/src/main.rs (4 hunks)
- hoprd/rest-api/src/session.rs (5 hunks)
- transport/api/Cargo.toml (1 hunks)
- transport/api/src/lib.rs (5 hunks)
- transport/probe/Cargo.toml (1 hunks)
- transport/probe/src/probe.rs (4 hunks)
- transport/protocol/Cargo.toml (1 hunks)
- transport/protocol/src/lib.rs (5 hunks)
- transport/session/Cargo.toml (1 hunks)
- transport/session/src/manager.rs (10 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
transport/protocol/src/lib.rs (2)
- common/async-runtime/src/lib.rs (1): spawn_as_abortable (26-34)
- transport/protocol/src/timer.rs (1): execute_on_tick (15-43)
transport/api/src/lib.rs (1)
- common/async-runtime/src/lib.rs (1): spawn_as_abortable (26-34)
hopr/hopr-lib/src/lib.rs (2)
- common/async-runtime/src/lib.rs (1): spawn_as_abortable (26-34)
- transport/protocol/src/timer.rs (1): execute_on_tick (15-43)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Linter
- GitHub Check: hopli / docker
- GitHub Check: zizmor
- GitHub Check: Build Candidate Binaries (hopli)
- GitHub Check: tests-unit-nightly
- GitHub Check: tests-unit
- GitHub Check: Build Candidate Binaries (hoprd)
- GitHub Check: Cargo Audit
🔇 Additional comments (13)
transport/session/Cargo.toml (1)
3-3: Version bump aligns with breaking changes.
The minor version increment from `0.8.0` to `0.9.0` correctly reflects the breaking migration from `JoinHandle` to `AbortHandle` semantics under 0.x semver conventions.
transport/api/Cargo.toml (1)
3-3: Minor version bump signals breaking API change.
Updating from `0.11.0` to `0.12.0` is appropriate to indicate the refactor of task spawning and cancellation from `JoinHandle` to `AbortHandle`.
transport/probe/Cargo.toml (1)
3-3: Incremented minor version for breaking task API change.
The change from `0.1.0` to `0.2.0` correctly denotes the shift to abortable futures in the probe crate.
chain/api/Cargo.toml (1)
3-3: Appropriate version bump for breaking async runtime update.
Bumping `hopr-chain-api` from `0.5.1` to `0.6.0` signals the breaking transition to `AbortHandle`-based task management.
transport/protocol/Cargo.toml (1)
3-3: Reflect breaking change via minor version bump.
The upgrade from `0.7.0` to `0.8.0` is consistent with semver practices for breaking async API changes.
chain/api/src/lib.rs (1)
248-252: Consider propagating failures from `action_queue.start()`
`spawn_as_abortable(self.action_queue.clone().start())` will discard any error returned by `start()`, whereas the previous `JoinHandle` allowed awaiting the result and propagating it.
If `start()` can fail, wrap it to log/convert the error before spawning:

    -let handle = spawn_as_abortable(self.action_queue.clone().start());
    +let handle = spawn_as_abortable(async {
    +    if let Err(e) = self.action_queue.clone().start().await {
    +        tracing::error!(%e, "OutgoingOnchainActionQueue terminated unexpectedly");
    +    }
    +});

hopr/hopr-lib/src/lib.rs (5)
34-40: Import of `AbortHandle` correctly replaces `JoinHandle`.
Good call switching to the futures-crate handle and cleaning up the old import.
No further action required.
609-613: Down-stream code must be adapted to new `run()` signature.
`run()` now returns `HashMap<HoprLibProcesses, AbortHandle>`.
Please confirm all call-sites (e.g. `hoprd`, integration tests, REST API) compile and abort tasks with `.abort()`.
623-623: Processes map switched to `AbortHandle` – 👍
The local map definition is consistent with the new API and avoids type mismatch.
887-901: `OnReceivedAcknowledgement` loop made abortable – nice upgrade
Wrapping the endless loop in `spawn_as_abortable` gives external cancellation while keeping behaviour unchanged.
Consider logging the `Aborted` case (returned by the wrapped future) if observability is desired, but otherwise this looks good.
946-961: Periodic ticket-flush converted to abortable future – OK
Good to see the 5-second tick running through a single utility (`execute_on_tick`) and now cancellable via the returned handle.
No issues spotted.
common/async-runtime/src/lib.rs (1)
4-6: Exposing `AbortHandle` at crate root – looks good
Re-exporting the handle simplifies downstream imports.
No concerns.
transport/api/src/lib.rs (1)
464-466: `transport_layer.run()` left without a way to abort/observe panics
`spawn_as_abortable` gives cancellation, but the earlier `_mixing_process_before_sending_out` (line 461) is spawned with plain `spawn` and its handle is ignored – it can neither be aborted nor observed.
Consider:

    processes.insert(
        HoprTransportProcess::MediumMixer,
        spawn_as_abortable(_mixing_process_before_sending_out),
    );

(or convert the earlier `spawn` call to `spawn_as_abortable` directly).
This pull request introduces a significant refactor across multiple modules to replace the use of `JoinHandle` with `AbortHandle` for managing asynchronous tasks. This change improves task cancellation capabilities and aligns the codebase with modern async patterns. The changes span several files and include updates to function signatures, imports, and task spawning logic.
Key Changes by Theme:
Transition to `AbortHandle`:
- Replaced `JoinHandle` with `AbortHandle` in task management across multiple modules, including `chain/api`, `chain/indexer`, `hopr-lib`, `hoprd`, and `hoprd/rest-api`. This required updating function signatures, return types, and task storage structures.
Task Spawning Updates:
- Introduced `spawn_as_abortable` in `common/async-runtime` to spawn tasks with `AbortHandle` support. This function wraps tasks in an abortable future and returns the corresponding `AbortHandle`.
Dependency and Import Adjustments:
- Added `futures` as a dependency in `Cargo.toml` to access `AbortHandle` and related utilities. Updated imports across files to include `AbortHandle` and remove `JoinHandle` where no longer needed.
Test and Integration Updates:
- Updated `chain_integration_tests.rs` to use `AbortHandle` for task management. Adjusted cleanup logic to call `.abort()` on `AbortHandle` instances instead of canceling `JoinHandle`.
REST API Session Management:
- Updated `hoprd/rest-api/src/session.rs` to store `AbortHandle` in `StoredSessionEntry` instead of `JoinHandle`. Updated task spawning for session listeners to use `spawn_as_abortable`.
These changes collectively enhance the codebase's robustness and maintainability by adopting a more flexible and modern approach to asynchronous task management.