Fix connected component #454
- val (currSum, cnt) = ee.select(sum(col(SRC).cast(DecimalType(20, 0))), count("*")).rdd
+ val (currSum, cnt) = minNbrs1.select(sum(col(MIN_NBR).cast(DecimalType(20, 0))), count("*")).rdd
This is the convergence criteria correction.
Ref: Lemma 8 in paper https://dl.acm.org/doi/pdf/10.1145/2670979.2670997
var minNbrs1: DataFrame = minNbrs(ee) // src >= min_nbr
  .persist(intermediateStorageLevel)

var prevSum: BigDecimal = minNbrs1.select(sum(col(MIN_NBR).cast(DecimalType(20, 0))), count("*")).rdd
Is there any concern with an overflow by capping this at 20?
Added verification.
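A rough worst-case bound helps frame the overflow question. DECIMAL(20, 0) holds integers up to 10^20 - 1, and a `Long` label has at most 19 digits, so each individual cast value always fits; whether the *sum* fits depends on how many labels there are, how large they are, and on the result type Spark assigns to `sum` over a decimal column, which this sketch does not model. The `DecimalSumBound` helper is hypothetical and only does the arithmetic with `BigInt`:

```scala
object DecimalSumBound {
  // Largest integer representable by DECIMAL(precision, 0): 10^precision - 1.
  def maxDecimal(precision: Int): BigInt = BigInt(10).pow(precision) - 1

  // Worst case: n labels, each at most maxLabel. Returns true when even
  // their largest possible sum still fits in DECIMAL(precision, 0).
  def sumFits(n: Long, maxLabel: Long, precision: Int): Boolean =
    BigInt(n) * BigInt(maxLabel) <= maxDecimal(precision)
}
```

For example, 10 labels near `Long.MaxValue` still fit in 20 digits but 11 do not, while a billion labels that are all below 10^10 fit comfortably.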
var prevSum: BigDecimal = null

// compute min neighbors (including self-min)
var minNbrs1: DataFrame = minNbrs(ee) // src >= min_nbr
It makes sense to do the first min-neighbor reduction, minNbrs(ee), outside of the while loop. +1
@@ -356,12 +362,14 @@ object ConnectedComponents extends Logging {

ee.persist(intermediateStorageLevel)

// test convergence
minNbrs1 = minNbrs(ee) // src >= min_nbr
  .persist(intermediateStorageLevel)
When the while loop terminates, should we also explicitly call unpersist() to drop the cache references held by these pseudo-mutable vars, each of which represents an immutable cached state?
I will revisit and fix the persist/unpersist operations.
Let's have the original reporter confirm on the customer's data that the convergence issue is fixed. Let's also make sure to clean up the persisted data (the customer complained about this too), since the memory cache holds on to the orphaned states that were persisted during the iterative convergence.
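The cleanup discussed above comes down to an ordering rule that is common in iterative Spark jobs: persist (and materialize) the new state, then unpersist the previous one, and unpersist the last one once the loop ends. The sketch below models only that bookkeeping. `ToyCached`, its registry set, and `PersistPattern.run` are hypothetical stand-ins, not Spark's `Dataset.persist`/`unpersist`, so the sketch runs without Spark and simply reports the peak number of states cached at the same time.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a cached DataFrame: it only records which
// states are currently "cached" in a shared registry. This is NOT Spark's API.
final class ToyCached(val name: String, registry: mutable.Set[String]) {
  def persist(): ToyCached = { registry += name; this }
  def unpersist(): Unit = { registry -= name }
}

object PersistPattern {
  /** Runs `iterations` rounds of a toy iterative job and returns the peak
    * number of simultaneously cached states. */
  def run(iterations: Int, registry: mutable.Set[String]): Int = {
    var current = new ToyCached("state-0", registry).persist()
    var peak = registry.size
    for (i <- 1 to iterations) {
      // Cache the new state first; in Spark it would also be materialized
      // with an action here, because it is derived from `current`.
      val next = new ToyCached(s"state-$i", registry).persist()
      peak = math.max(peak, registry.size)
      // Only then release the previous state, so orphaned caches do not pile up.
      current.unpersist()
      current = next
    }
    // Release the last cached state once the loop has terminated.
    current.unpersist()
    peak
  }
}
```

With this ordering at most two states are cached at once (the old one is kept until the new one exists), and nothing stays cached after the loop.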
Do we have a test to validate that the algorithm is correct?
I will generate some random graphs to test it.
This is great!
Fix the connected components algorithm implementation:
The algorithm is described in this paper: https://dl.acm.org/doi/pdf/10.1145/2670979.2670997
Summary of the algorithm: it repeatedly applies "large-star" and "small-star" transformations to the graph until convergence (i.e., until the transformations no longer change the graph structure).
Large-star transformation:
For each node U, find the neighbor nodes V whose labels are larger than U's, and find the node M with the minimal label in the set {*node_U_neighbor_node_set, node_U}. For every such node V, generate an edge from V to M. The generated edges form the output graph of the "large star" transformation.
Small-star transformation:
For each node U, find the neighbor nodes V whose labels are smaller than U's, and find the node M with the minimal label in node_U_neighbor_node_set. For every such node V, generate an edge from V to M, and additionally generate an edge from U to M. The generated edges form the output graph of the "small star" transformation.
Convergence criteria:
Convergence means that the large-star and small-star transformations generate an identical output graph. One can prove that this output graph has the same component connectivity as the input, and that it is a union of disjoint stars, one per connected component, where the center of each star is the node with the minimal label in its component.
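A minimal in-memory sketch of the two transformations, assuming undirected edges between `Long` labels (the `StarCC` object and its method names are hypothetical, not the GraphFrames implementation, which works on DataFrames). Every output edge points from a larger label to a smaller one, so edge sets can be compared directly; the driver stops once a full large-star plus small-star round leaves the edge set unchanged.

```scala
object StarCC {
  // An undirected edge between two Long labels (hypothetical representation).
  type Edge = (Long, Long)

  // Undirected adjacency lists, ignoring self-loops.
  def neighbors(edges: Set[Edge]): Map[Long, Set[Long]] =
    edges.toSeq
      .flatMap { case (a, b) => if (a == b) Seq.empty[Edge] else Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (u, pairs) => u -> pairs.map(_._2).toSet }

  // Large-star: for each node u, link every neighbor v with a larger label
  // to m = the minimal label in N(u) ∪ {u}.
  def largeStar(edges: Set[Edge]): Set[Edge] =
    neighbors(edges).toSeq.flatMap { case (u, nbrs) =>
      val m = (nbrs + u).min
      nbrs.filter(_ > u).map(v => (v, m))
    }.toSet

  // Small-star: for each node u with smaller neighbors, link those neighbors
  // and u itself to m = the minimal label among them (dropping m -> m).
  def smallStar(edges: Set[Edge]): Set[Edge] =
    neighbors(edges).toSeq.flatMap { case (u, nbrs) =>
      val smaller = nbrs.filter(_ < u)
      if (smaller.isEmpty) Set.empty[Edge]
      else {
        val m = smaller.min
        (smaller + u).filter(_ != m).map(v => (v, m))
      }
    }.toSet

  // Alternate the two transformations until a full round changes nothing;
  // the result is a union of stars, each centered on its component's minimum.
  def run(edges: Set[Edge]): Set[Edge] = {
    var current: Set[Edge] =
      edges.collect { case (a, b) if a != b => (math.max(a, b), math.min(a, b)) }
    var converged = false
    while (!converged) {
      val next = smallStar(largeStar(current))
      converged = next == current
      current = next
    }
    current
  }
}
```

For the path 1-2-3-4 this returns {(2,1), (3,1), (4,1)}: a single star centered on node 1.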
To check whether the graph has converged, we can compute the sum over all nodes of the "minimal label value in set {*neighbor_nodes, self_node}". This sum keeps decreasing with each pass of the large-star / small-star transformations; once it stops decreasing, the graph has converged, and each node's connected component ID is its "minimal label value in set {*neighbor_nodes, self_node}".
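To make the convergence statistic concrete, here is a sketch over an in-memory edge set (the `MinNbrCheck` object is hypothetical; undirected edges between `Long` labels are an assumption). It computes each node's minimal label in {*neighbor_nodes, self_node} and sums those values as a `BigInt`; on a converged star forest the per-node minimum is exactly the component ID.

```scala
object MinNbrCheck {
  // An undirected edge between two Long labels (hypothetical representation).
  type Edge = (Long, Long)

  // For every node u: the minimal label in {*neighbor_nodes, self_node}.
  def minNbrs(edges: Set[Edge]): Map[Long, Long] =
    edges.toSeq
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (u, pairs) => u -> math.min(u, pairs.map(_._2).min) }

  // Convergence statistic: the sum of all nodes' min-neighbor labels,
  // accumulated as BigInt so it cannot overflow.
  def minNbrSum(edges: Set[Edge]): BigInt =
    minNbrs(edges).values.foldLeft(BigInt(0)) { (acc, label) => acc + BigInt(label) }
}
```

On the path 1-2-3 the sum is 1 + 1 + 2 = 4; once that path has collapsed into the star centered on node 1 the sum drops to 3, and every node maps to component 1.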
The current GraphFrames connected components implementation has an error in its convergence check; this PR corrects it.