SemyonSinchenko
Collaborator

What changes were proposed in this pull request?

I added a breakable/break to the Pregel loop. If the algorithm converges before reaching maxIter, this gives a significant performance benefit. If it does not converge before reaching maxIter, it may introduce a small overhead, but because Dataset.isEmpty is not an action and is quite cheap, the performance degradation is very small. Another strong benefit of merging it is that it allows users to set a very high value of maxIter if they are sure that their Pregel run should converge, which simplifies development and usage.
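
To illustrate the breakable/break idea outside of Spark, here is a minimal plain-Scala sketch. It is not the code from this PR: the object `EarlyStopSketch`, the function `propagateMin`, and the toy "minimum label" algorithm are hypothetical names chosen for illustration. The loop exits as soon as an iteration produces no messages, instead of always running maxIter times.

```scala
import scala.util.control.Breaks.{break, breakable}

object EarlyStopSketch {
  // Hypothetical toy algorithm: propagate the minimum label along directed edges.
  // Returns the final labels and the number of iterations actually executed.
  def propagateMin(
      edges: Seq[(Int, Int)],
      init: Map[Int, Int],
      maxIter: Int,
      earlyStopping: Boolean): (Map[Int, Int], Int) = {
    var labels = init
    var iterations = 0
    breakable {
      while (iterations < maxIter) {
        // A message is produced only when it would lower the destination's label.
        val msgs = edges.collect {
          case (src, dst) if labels(src) < labels(dst) => dst -> labels(src)
        }
        // No messages means nothing can change any more, so leave the loop.
        if (earlyStopping && msgs.isEmpty) break()
        labels = msgs.foldLeft(labels) { case (acc, (v, m)) =>
          acc.updated(v, math.min(acc(v), m))
        }
        iterations += 1
      }
    }
    (labels, iterations)
  }
}
```

On a chain 1 -> 2 -> 3 -> 4 with each vertex initially labeled by its own id, this sketch converges after three iterations when `earlyStopping` is true, while it keeps looping until `maxIter` when it is false; the final labels are the same in both cases.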

Why are the changes needed?

As described in #549 / #368

Close #549
Close #368

@codecov-commenter

Codecov Report

Attention: Patch coverage is 80.00000% with 6 lines in your changes missing coverage. Please review.

Project coverage is 89.60%. Comparing base (bc487ef) to head (6265d19).
Report is 17 commits behind head on master.

Files with missing lines Patch % Lines
src/main/scala/org/graphframes/lib/Pregel.scala 80.00% 6 Missing ⚠️

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #550      +/-   ##
==========================================
- Coverage   91.43%   89.60%   -1.83%     
==========================================
  Files          18       18              
  Lines         829      914      +85     
  Branches       52      110      +58     
==========================================
+ Hits          758      819      +61     
- Misses         71       95      +24     

@SemyonSinchenko
Collaborator Author

@SauronShepherd could you please take a look? It is exactly as we discussed by email. I also created #553 and I'm going to write a deep dive into how to use Pregel, so I will describe earlyStopping vs maxIter as part of my work on #553. Does that sound good to you?
cc: @rjurney

@SauronShepherd
Contributor

SauronShepherd commented Mar 22, 2025

Some Thoughts:

  1. It looks like you're executing the filter msgDF.filter(Pregel.msg.isNotNull) twice—once to check if it's empty (to stop the loop) and again to retrieve new messages. This transformation should be performed only once.
  2. Why is the checkpointDir check inside the loop when it doesn't depend on the iteration? Also, why was this block committed under an unrelated topic (Spark Connect)?
  3. msgDF is declared as a var, but it should be a val since it is never updated.
  4. setMaxIter is the setter for maxIter, and setCheckpointInterval is the setter for checkpointInterval. For consistency, shouldn't setEarlyStopping be the setter for earlyStopping instead of the current doEarlyStopping?
  5. Caching after checkpointing the DataFrame may be redundant.
  6. Is newVertexUpdateColDF.count() really necessary?
  7. Since the execution plan isn't that large, I think we should set the checkpointInterval to 5.
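
The first point can be illustrated with a plain-Scala analogy that uses lazy iterators rather than Spark DataFrames. This is only a sketch under that assumption: `FilterOnceSketch`, `filterTwice`, and `filterOnce` are hypothetical names, and the evaluation counter exists purely to make the repeated work visible. In Spark itself, avoiding the second evaluation would typically also involve persisting the filtered DataFrame.

```scala
object FilterOnceSketch {
  // Re-derives the lazy filter for the emptiness check and again for the result.
  // Returns the kept messages and the number of predicate evaluations.
  def filterTwice(msgs: Vector[Option[Int]]): (Vector[Int], Int) = {
    var evals = 0
    def nonNull: Iterator[Option[Int]] =
      msgs.iterator.filter { m => evals += 1; m.isDefined }
    val kept =
      if (nonNull.isEmpty) Vector.empty[Int] // first pass, stops at the first hit
      else nonNull.map(_.get).toVector       // second pass over all messages
    (kept, evals)
  }

  // Materializes the filtered messages once and reuses that single result.
  def filterOnce(msgs: Vector[Option[Int]]): (Vector[Int], Int) = {
    var evals = 0
    val kept = msgs.filter { m => evals += 1; m.isDefined }.map(_.get)
    (kept, evals)
  }
}
```

Both functions return the same messages, but `filterTwice` evaluates the predicate more often because the filter is derived twice.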

Other Observations:

  • sendMsgs should also be a val.
  • CHECKPOINT_NAME_PREFIX is not used.

More Thoughts:
isEmpty is actually an action and not a cheap operation, especially when AQE is enabled. I ran your unit test without caching, unpersisting, and counting DataFrames, and the tests completed in 48 seconds without "early stopping" (but with maxIter reduced to 9 and checkpointInterval increased to 5, so only one checkpoint was performed). I also believe we could further improve the overall performance of this algorithm.

I think your "early stopping" feature is a great addition, particularly for cases where users don't know or can't estimate the required number of iterations, to prevent having many more iterations than necessary. However, it may introduce some overhead, so when maxIter is low, it should be discouraged. We could mention this in the documentation, for example.
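
The trade-off described above can be made concrete with a back-of-the-envelope cost model. This is a rough sketch, not a measurement: it assumes a fixed cost per iteration and a fixed cost per emptiness check, and `EarlyStoppingCostSketch`, `costWithout`, `costWith`, and `convergeAt` are hypothetical names.

```scala
object EarlyStoppingCostSketch {
  // Without early stopping, all maxIter iterations always run.
  def costWithout(maxIter: Int, iterCost: Double): Double =
    maxIter * iterCost

  // With early stopping, the loop runs until convergence (or maxIter) and pays
  // for one emptiness check per loop pass, including the pass that detects
  // convergence and stops.
  def costWith(maxIter: Int, convergeAt: Int, iterCost: Double, checkCost: Double): Double = {
    val iters = math.min(convergeAt, maxIter)
    val checks = if (convergeAt < maxIter) iters + 1 else maxIter
    iters * iterCost + checks * checkCost
  }
}
```

Under this model, early stopping pays off when convergence happens well before maxIter, and costs slightly more when maxIter is already close to the number of iterations needed.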

All that being said, I don’t believe any of my comments prevent this PR from being approved. We can merge it now and revisit it in the future.

@SemyonSinchenko
Collaborator Author

Thanks for the review!

> It looks like you're executing the filter msgDF.filter(Pregel.msg.isNotNull) twice—once to check if it's empty (to stop the loop) and again to retrieve new messages. This transformation should be performed only once.

Nice catch, thanks! Fixed!

> Why is the checkpointDir check inside the loop when it doesn't depend on the iteration? Also, why was this block committed under an unrelated topic (Spark Connect)?

It was added with a Spark Connect PR. It is not related in any way to the current PR; it shows up as a change only because I wrapped the whole while loop in breakable {}. Fixed!

> msgDF is declared as a var, but it should be a val since it is never updated.

Fixed (actually, it was like that before this PR)!

> setMaxIter is the setter for maxIter, and setCheckpointInterval is the setter for checkpointInterval. For consistency, shouldn't setEarlyStopping be the setter for earlyStopping instead of the current doEarlyStopping?

Fixed!

> Caching after checkpointing the DataFrame may be redundant.

Let's leave it for a future PR? Otherwise it is hard to track performance improvements...

> Is newVertexUpdateColDF.count() really necessary?

I don't think so, but let's leave it for the next PR?

> We could mention this in the documentation, for example.

I added it to both the Scala docstring and the Python docstring:

  /**
   * Should Pregel stop earlier in case of no new messages to send?
   *
   * Early stopping allows Pregel to terminate before reaching maxIter by checking whether there
   * is any non-null message. While in some cases it may give a significant performance boost, in
   * other cases it can lead to performance degradation, because checking whether the messages
   * DataFrame is empty is an action and requires materialization of the Spark plan with some
   * additional computations.
   *
   * When the user can assume a good value of maxIter, it is recommended to leave this value at
   * the default "false". When it is hard to estimate the number of iterations required for
   * convergence, it is recommended to set this value to "true" to avoid iterating past
   * convergence until reaching maxIter. When this value is "true", maxIter can be set to a
   * bigger value without risks.
   *
   * @param value
   *   should Pregel check for the termination condition on each step
   * @return
   */
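
As a rough illustration of the setter naming agreed on in this thread, the following plain-Scala sketch mimics a builder with setMaxIter, setCheckpointInterval, and setEarlyStopping. It is a hypothetical stand-in, not the GraphFrames Pregel class: `PregelConfig`, `describe`, and the numeric defaults are assumptions made for the example; only the setter names and the "false" default for early stopping come from the discussion and docstring above.

```scala
object PregelConfigSketch {
  // Hypothetical stand-in for a builder with consistently named setters.
  final class PregelConfig {
    private var maxIter = 10           // placeholder default, assumed for illustration
    private var checkpointInterval = 2 // placeholder default, assumed for illustration
    private var earlyStopping = false  // default "false", as stated in the docstring

    def setMaxIter(value: Int): this.type = {
      require(value >= 1, "maxIter must be at least 1")
      maxIter = value
      this
    }

    def setCheckpointInterval(value: Int): this.type = {
      checkpointInterval = value
      this
    }

    def setEarlyStopping(value: Boolean): this.type = {
      earlyStopping = value
      this
    }

    // Human-readable summary of the current settings.
    def describe: String =
      s"maxIter=$maxIter, checkpointInterval=$checkpointInterval, earlyStopping=$earlyStopping"
  }
}
```

With early stopping enabled, a caller who cannot estimate the number of iterations might chain `new PregelConfig().setMaxIter(1000).setEarlyStopping(true)` and rely on the loop ending once no messages remain.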

I also plan to write a comprehensive guide about Pregel.

> Since the execution plan isn't that large, I think we should set the checkpointInterval to 5.

It may fail when the edges DataFrame carries some transformations, because we join with it on every iteration... Let's leave it for a future PR?

@SauronShepherd
Contributor

I'll have to add some minor changes in the Pregel class because of #554, but I'll keep the cache too.

@SemyonSinchenko
Collaborator Author

@SauronShepherd Let's merge this one to unblock your work on #554 and avoid merge conflicts?

@SauronShepherd
Contributor

I don't see any other option but commenting on this issue.

@SemyonSinchenko
Collaborator Author

@rjurney Hi! Could you take a look and if everything is good, could you please merge it?

@SemyonSinchenko SemyonSinchenko requested a review from rjurney March 27, 2025 18:07
@SemyonSinchenko
Collaborator Author

@rjurney what do you think about it?

@SemyonSinchenko
Collaborator Author

@SauronShepherd I cannot get an answer from @rjurney, could you please review it? I need it to finalize the new proposed version of the "no-graphx" ShortestPaths.

@rjurney
Collaborator

rjurney commented Mar 31, 2025

@SemyonSinchenko sorry about that, I opened it to review last night. On it.

Collaborator

@rjurney rjurney left a comment

lgtm

@SemyonSinchenko SemyonSinchenko merged commit 5db86a2 into graphframes:master Mar 31, 2025
5 checks passed
@SemyonSinchenko SemyonSinchenko deleted the 549-pregel-early-stopping branch April 6, 2025 09:14
Successfully merging this pull request may close these issues.

feat: stop Pregel earlier in case there is no non-null messages
Stop Pregel interations after no more new messages are created
4 participants