This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions node/core/dispute-coordinator/Cargo.toml
@@ -27,3 +27,4 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
assert_matches = "1.4.0"
polkadot-overseer = { path = "../../overseer" }
137 changes: 131 additions & 6 deletions node/core/dispute-coordinator/src/lib.rs
@@ -35,7 +35,7 @@ use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
messages::{
ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
DisputeParticipationMessage, ImportStatementsResult
DisputeParticipationMessage, ImportStatementsResult,
}
};
use polkadot_node_subsystem_util::rolling_session_window::{
@@ -71,10 +71,27 @@ const ACTIVE_DURATION_SECS: Timestamp = 180;
/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots.
type Timestamp = u64;

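/// Tracks whether the one-time participation recovery in `handle_startup` has run yet; it only
/// needs to happen once, on the first leaf activation after a (re)start.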
#[derive(Eq, PartialEq)]
enum Participation {
Pending,
Complete,
}

impl Participation {
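/// Returns whether recovery was already complete, marking it as complete if it was not.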
fn complete(&mut self) -> bool {
let complete = *self == Participation::Complete;
if !complete {
*self = Participation::Complete
}
complete
}
}

struct State {
keystore: Arc<LocalKeystore>,
highest_session: Option<SessionIndex>,
rolling_session_window: RollingSessionWindow,
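/// Set to `Participation::Complete` once startup recovery (`handle_startup`) has run.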
recovery_state: Participation,
}

/// Configuration for the dispute coordinator subsystem.
@@ -277,7 +294,7 @@ where
B: Backend,
{
loop {
let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await;
let res = run_until_error(&mut ctx, &subsystem, &mut backend, &*clock).await;
match res {
Err(e) => {
e.trace();
@@ -299,7 +316,7 @@ where
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
async fn run_iteration<B, Context>(
async fn run_until_error<B, Context>(
ctx: &mut Context,
subsystem: &DisputeCoordinatorSubsystem,
backend: &mut B,
@@ -314,6 +331,7 @@ where
keystore: subsystem.keystore.clone(),
highest_session: None,
rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
recovery_state: Participation::Pending,
};

loop {
@@ -328,7 +346,14 @@ where
&mut overlay_db,
&mut state,
update.activated.into_iter().map(|a| a.hash),
).await?
).await?;
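// On the first leaf activation after a (re)start, replay participation for any active
// disputes that are still missing a local statement.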
if !state.recovery_state.complete() {
handle_startup(
ctx,
&mut overlay_db,
&mut state,
).await?;
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
FromOverseer::Communication { msg } => {
@@ -349,6 +374,98 @@ where
}
}

// Restores the subsystem's state before proceeding with the main event loop. Primarily, this
// repopulates the rolling session window with the relevant session information needed to handle
// incoming import statement requests.
//
// This method also retransmits a `DisputeParticipationMessage::Participate` for any non-concluded
// dispute for which the subsystem doesn't have a local statement, ensuring it eventually issues
// its own verdict on the dispute.
async fn handle_startup<Context>(
ctx: &mut Context,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<Message = DisputeCoordinatorMessage>,
Context: SubsystemContext<Message = DisputeCoordinatorMessage>,
{
let recent_disputes = match overlay_db.load_recent_disputes() {
Ok(Some(disputes)) => disputes,
Ok(None) => return Ok(()),
Err(e) => {
tracing::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
return Err(e.into());
},
};

// Filter out disputes that have already concluded.
let active_disputes = recent_disputes.into_iter()
.filter(|(_, status)| *status == DisputeStatus::Active)
.collect::<RecentDisputes>();

for ((session, ref candidate_hash), _) in active_disputes.into_iter() {
let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) {
Ok(Some(votes)) => votes.into(),
Ok(None) => continue,
Err(e) => {
tracing::error!(target: LOG_TARGET, "Failed initial load of candidate votes: {:?}", e);
continue
},
};

let validators = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
continue
}
Some(info) => info.validators.clone(),
};

let n_validators = validators.len();
let voted_indices: HashSet<_> = votes.voted_indices().into_iter().collect();

// Determine if there are any missing local statements for this dispute. Validators are
// filtered out if:
// 1) their statement already exists, or
// 2) the validator key is not in the local keystore (i.e. the validator is remote).
// The remaining set only contains local validators that are also missing statements.
let missing_local_statement = validators.iter()
.enumerate()
.map(|(index, validator)| (ValidatorIndex(index as _), validator))
.any(|(index, validator)|
!voted_indices.contains(&index) &&
state.keystore
.key_pair::<ValidatorPair>(validator)
.ok()
.map_or(false, |v| v.is_some())
);

// Send a `DisputeParticipationMessage` for all non-concluded disputes which do not have a
// recorded local statement.
if missing_local_statement {
let (report_availability, receive_availability) = oneshot::channel();
ctx.send_message(DisputeParticipationMessage::Participate {
candidate_hash: *candidate_hash,
candidate_receipt: votes.candidate_receipt.clone(),
session,
n_validators: n_validators as u32,
report_availability,
}).await;

if !receive_availability.await? {
tracing::debug!(target: LOG_TARGET, "Participation failed. Candidate not available");
}
}
}

Ok(())
}

async fn handle_new_activations(
ctx: &mut (impl SubsystemContext<Message = DisputeCoordinatorMessage> + overseer::SubsystemContext<Message = DisputeCoordinatorMessage>),
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
@@ -470,8 +587,8 @@ async fn handle_incoming(
) => {
issue_local_statement(
ctx,
state,
overlay_db,
state,
candidate_hash,
candidate_receipt,
session,
@@ -547,6 +664,10 @@ async fn handle_import_statements(
"Missing info for session which has an active dispute",
);

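// Report the failed import back to the requester so the request does not hang.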
pending_confirmation
.send(ImportStatementsResult::InvalidImport)
.map_err(|_| Error::OneshotSend)?;

return Ok(())
}
Some(info) => info.validators.clone(),
@@ -671,13 +792,17 @@ async fn handle_import_statements(

overlay_db.write_candidate_votes(session, candidate_hash, votes.into());

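// Confirm the successful import to the requester.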
pending_confirmation
.send(ImportStatementsResult::ValidImport)
.map_err(|_| Error::OneshotSend)?;

Ok(())
}

async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
state: &mut State,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,