
WeichenXu123 (Contributor) commented Jun 3, 2024

Fix the connected components algorithm implementation.

The algorithm is described in this paper: https://dl.acm.org/doi/pdf/10.1145/2670979.2670997

Summary of the algorithm: it repeatedly applies "large-star" and "small-star" transformations to the graph until convergence, i.e. until the transformations no longer change the graph structure:

graph = input_graph  # every node in the input graph must have a unique, comparable label
while (!converged) {
   graph = large_star(graph)
   graph = small_star(graph)
}

Large-star transformation:
For each node U, find the neighbors V whose labels are larger than U's, and the node M with the minimal label in {U} ∪ neighbors(U). For every such V, emit an edge from V to M. The emitted edges form the output graph of the large-star transformation.

Small-star transformation:
For each node U, find the neighbors V whose labels are smaller than U's, and the node M with the minimal label in {U} ∪ neighbors(U) (equivalently, among U and its smaller neighbors). For every such V, emit an edge from V to M, and additionally emit an edge from U to M. The emitted edges form the output graph of the small-star transformation.
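The two transformations can be sketched in plain Scala on an in-memory set of undirected edges. This is a minimal illustration under stated assumptions, not the GraphFrames implementation: the object `StarTransforms`, the `Edge` alias, and the helper `neighbors` are hypothetical names, labels are assumed to be unique `Long`s, and would-be self-loops (V, M) with V == M are dropped.

```scala
object StarTransforms {
  // An undirected edge between two uniquely labeled nodes (hypothetical representation).
  type Edge = (Long, Long)

  // Undirected adjacency map: node -> set of its neighbors.
  def neighbors(edges: Set[Edge]): Map[Long, Set[Long]] =
    edges.toSeq
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (node, pairs) => node -> pairs.map(_._2).toSet }

  // Large-star: for each node u, let m be the minimal label in {u} plus neighbors(u);
  // emit (v, m) for every neighbor v with a larger label than u (v > u >= m, so never a self-loop).
  def largeStar(edges: Set[Edge]): Set[Edge] =
    neighbors(edges).toSeq.flatMap { case (u, nbrs) =>
      val m = (nbrs + u).min
      nbrs.filter(_ > u).map(v => (v, m))
    }.toSet

  // Small-star: for each node u, let m be the minimal label in {u} plus neighbors(u);
  // emit (v, m) for every neighbor v with a smaller label than u, and (u, m) as well.
  // Pairs with v == m would be self-loops, so they are skipped.
  def smallStar(edges: Set[Edge]): Set[Edge] =
    neighbors(edges).toSeq.flatMap { case (u, nbrs) =>
      val m = (nbrs + u).min
      (nbrs.filter(_ < u) + u).collect { case v if v != m => (v, m) }
    }.toSet
}
```

For example, on the path 1-2-3, `largeStar` returns the star {(2, 1), (3, 1)} centered on node 1; `smallStar` on the edges {(3, 1), (3, 2)} also returns {(2, 1), (3, 1)}, because node 3 re-attaches its smaller neighbor 2 and itself to the minimum 1.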


Convergence criteria:

Convergence means that the large-star and small-star transformations no longer change the graph: a pass outputs the same graph it received. One can prove that the transformations preserve the connected components, and that at convergence the graph is a union of disjoint stars, one per connected component, each centered on the node with the minimal label in its component.

To check for convergence, we can compute, for each node, the minimal label in {node} ∪ neighbors(node), and sum these values over all nodes. The sum decreases with each large-star/small-star pass until convergence; once it stops decreasing, the graph has converged, and each node's connected component ID is that same minimal label.
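A minimal sketch of this convergence statistic on in-memory edge sets, with hypothetical names (it is not the PR's Spark code, whose `minNbrs` appears to compute the same per-node minimum on DataFrames). `g0` is the graph {1-2, 2-3, 4-5}; `g1` is the output of one large-star/small-star pass over `g0`, worked out by hand; a further pass leaves `g1` unchanged.

```scala
object ConvergenceStatistic {
  // An undirected edge between two uniquely labeled nodes (hypothetical representation).
  type Edge = (Long, Long)

  // Undirected adjacency map: node -> set of its neighbors.
  def neighbors(edges: Set[Edge]): Map[Long, Set[Long]] =
    edges.toSeq
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (node, pairs) => node -> pairs.map(_._2).toSet }

  // For each node u: the minimal label in {u} plus neighbors(u).
  def minNeighbors(edges: Set[Edge]): Map[Long, Long] =
    neighbors(edges).map { case (u, nbrs) => u -> (nbrs + u).min }

  // Convergence statistic: the sum of those minima over all nodes (BigInt avoids overflow).
  def minNeighborSum(edges: Set[Edge]): BigInt =
    minNeighbors(edges).values.map(BigInt(_)).sum

  // Input graph, and the result of one large-star/small-star pass over it.
  val g0: Set[Edge] = Set((1L, 2L), (2L, 3L), (4L, 5L))
  val g1: Set[Edge] = Set((2L, 1L), (3L, 1L), (5L, 4L))
}
```

Here `minNeighborSum(g0)` is 12 and `minNeighborSum(g1)` is 11. Since the next pass yields `g1` again, the sum stops decreasing, and `minNeighbors(g1)` gives component ID 1 for nodes 1, 2, 3 and component ID 4 for nodes 4, 5.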

The current GraphFrames connected components implementation has an error in its convergence check; this PR corrects it.

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Comment on lines 364 to 372
- val (currSum, cnt) = ee.select(sum(col(SRC).cast(DecimalType(20, 0))), count("*")).rdd
+ val (currSum, cnt) = minNbrs1.select(sum(col(MIN_NBR).cast(DecimalType(20, 0))), count("*")).rdd
Contributor Author:

This is the convergence criteria correction.

Ref: Lemma 8 in paper https://dl.acm.org/doi/pdf/10.1145/2670979.2670997

var minNbrs1: DataFrame = minNbrs(ee) // src >= min_nbr
.persist(intermediateStorageLevel)

var prevSum: BigDecimal = minNbrs1.select(sum(col(MIN_NBR).cast(DecimalType(20, 0))), count("*")).rdd


Is there any concern about overflow when capping the precision at 20?

Contributor Author:

Added verification.
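The arithmetic behind the overflow question can be checked with a few lines of plain Scala (hypothetical names; this is not the verification added in the PR). A `Long` label has at most 19 digits, so casting one label to `DecimalType(20, 0)` cannot overflow; what matters is the width of the sum. The sketch assumes that Spark's `sum` widens a `DecimalType(p, 0)` input to `DecimalType(p + 10, 0)`, i.e. 30 digits here.

```scala
object DecimalHeadroom {
  // Largest integer representable with `digits` decimal digits at scale 0, e.g. DecimalType(digits, 0).
  def maxWithDigits(digits: Int): BigInt = BigInt(10).pow(digits) - 1

  // Largest positive vertex label: Long.MaxValue has 19 digits.
  val longMax: BigInt = BigInt(Long.MaxValue)

  // Per-row cast: every Long label fits in 20 digits, so the cast to DecimalType(20, 0) is safe.
  val castFits: Boolean = longMax <= maxWithDigits(20)

  // If the sum itself were capped at 20 digits, only this many worst-case labels would fit.
  val rowsIfSumCappedAt20: BigInt = maxWithDigits(20) / longMax

  // Assumption: the sum is widened to 30 digits (precision + 10), leaving far more headroom.
  val rowsIfSumWidenedTo30: BigInt = maxWithDigits(30) / longMax
}
```

Under that assumption, more than 10^11 maximal labels could be summed before the aggregate overflows, whereas a sum capped at 20 digits would hold only 10 of them.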

var prevSum: BigDecimal = null

// compute min neighbors (including self-min)
var minNbrs1: DataFrame = minNbrs(ee) // src >= min_nbr


It makes sense to do the first min-neighbor reduction outside of the while loop. +1

@@ -356,12 +362,14 @@ object ConnectedComponents extends Logging {

ee.persist(intermediateStorageLevel)

// test convergence
minNbrs1 = minNbrs(ee) // src >= min_nbr
.persist(intermediateStorageLevel)


Within the while loop and at its termination, should we also explicitly call unpersist() to release the cache references held by these pseudo-mutable vars, each of which represents an immutable cached state?

Contributor Author:

I will revisit and fix the persist/unpersist operations.

@BenWilson2

Let's have the original reporter confirm with the customer, on the customer's data, that the convergence issue is fixed. Let's also be sure to clean up the persisted data (the customer complained about this), since the memory cache holds on to the orphaned states persisted during the iterative convergence loop.
Once these are done, LGTM!

@serena-ruan (Collaborator) left a comment:

Do we have a test to validate that the algorithm is correct?

@WeichenXu123 (Contributor Author):


I will generate some random graphs to test it.
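One way such a test could look, sketched in plain Scala on in-memory edge sets rather than Spark DataFrames (all names are hypothetical; this is not the PR's test code): run large-star/small-star with the min-neighbor-sum convergence check on seeded random graphs, and compare the result against a union-find reference that labels each component by its minimal node.

```scala
import scala.collection.mutable
import scala.util.Random

object RandomGraphCheck {
  type Edge = (Long, Long)

  // Undirected adjacency map: node -> set of its neighbors.
  def neighbors(edges: Set[Edge]): Map[Long, Set[Long]] =
    edges.toSeq
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (node, pairs) => node -> pairs.map(_._2).toSet }

  // Large-star: edge (v, m) for every neighbor v larger than u, m = min of {u} and neighbors(u).
  def largeStar(edges: Set[Edge]): Set[Edge] =
    neighbors(edges).toSeq.flatMap { case (u, nbrs) =>
      val m = (nbrs + u).min
      nbrs.filter(_ > u).map(v => (v, m))
    }.toSet

  // Small-star: edge (v, m) for every neighbor v smaller than u and for u itself; self-loops dropped.
  def smallStar(edges: Set[Edge]): Set[Edge] =
    neighbors(edges).toSeq.flatMap { case (u, nbrs) =>
      val m = (nbrs + u).min
      (nbrs.filter(_ < u) + u).collect { case v if v != m => (v, m) }
    }.toSet

  // Per-node min over {u} and neighbors(u), and its sum (the convergence statistic).
  def minNeighbors(edges: Set[Edge]): Map[Long, Long] =
    neighbors(edges).map { case (u, nbrs) => u -> (nbrs + u).min }

  def minNeighborSum(edges: Set[Edge]): BigInt =
    minNeighbors(edges).values.map(BigInt(_)).sum

  // Alternate passes until the sum stops decreasing; then each node's min neighbor is its component ID.
  def starComponents(input: Set[Edge]): Map[Long, Long] = {
    var graph = input
    var prevSum = minNeighborSum(graph)
    var converged = false
    while (!converged) {
      graph = smallStar(largeStar(graph))
      val currSum = minNeighborSum(graph)
      converged = currSum == prevSum
      prevSum = currSum
    }
    minNeighbors(graph)
  }

  // Reference: union-find whose root is always the smallest label in its set.
  def unionFindComponents(edges: Set[Edge]): Map[Long, Long] = {
    val parent = mutable.Map.empty[Long, Long]
    def find(x: Long): Long = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x else { val r = find(p); parent(x) = r; r }
    }
    for ((a, b) <- edges) {
      val ra = find(a)
      val rb = find(b)
      if (ra < rb) parent(rb) = ra else if (rb < ra) parent(ra) = rb
    }
    parent.keys.toList.map(x => x -> find(x)).toMap
  }

  // Random undirected graph: `edgeCount` draws over labels 0 until nodeCount, self-loops skipped.
  def randomGraph(rng: Random, nodeCount: Int, edgeCount: Int): Set[Edge] =
    Iterator
      .continually((rng.nextInt(nodeCount).toLong, rng.nextInt(nodeCount).toLong))
      .filter { case (a, b) => a != b }
      .take(edgeCount)
      .toSet

  // True if both methods agree on every seeded random graph.
  def allAgree(trials: Int, seed: Long = 42L): Boolean = {
    val rng = new Random(seed)
    (1 to trials).forall { _ =>
      val g = randomGraph(rng, 2 + rng.nextInt(30), 1 + rng.nextInt(40))
      starComponents(g) == unionFindComponents(g)
    }
  }
}
```

A fixed seed keeps failures reproducible; `RandomGraphCheck.allAgree(200)` exercises 200 small graphs with a mix of sparse and dense components.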

@WeichenXu123 merged commit 107eebe into graphframes:master on Jun 20, 2024
@minerjaime

This is great!

4 participants