
Conversation

SemyonSinchenko (Collaborator)

Work is in progress, but the overall design is in its final state:

  • subproject graphframes-connect
  • GraphFrames API in protobuf: graphframes-connect/src/main/protobuf/graphframes.proto
  • Proto->GraphFrames->Proto logic: graphframes-connect/src/main/scala/org/apache/spark/sql/graphframes/GraphFramesConnectUtils.scala
  • Connect Relation Plugin: graphframes-connect/src/main/scala/org/apache/spark/sql/graphframes/GraphFramesConnect.scala

For the JVM part, code generation is built into build.sbt; for Python (and possibly other clients) it is done with buf (buf.yaml, buf.gen.yaml).
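For reference, a minimal buf.gen.yaml for generating the Python stubs might look like the sketch below; the plugin choice and the output directory are illustrative assumptions, not necessarily what this PR uses.

```yaml
# Hypothetical buf.gen.yaml: generate Python protobuf stubs with the
# BSR-hosted protoc Python plugin into an assumed package directory.
version: v1
plugins:
  - plugin: buf.build/protocolbuffers/python
    out: python/graphframes/connect/proto
```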

Close #447

codecov-commenter commented Feb 8, 2025

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

Attention: Patch coverage is 42.85714% with 4 lines in your changes missing coverage. Please review.

Project coverage is 90.35%. Comparing base (bc487ef) to head (fc8ebae).
Report is 7 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/main/scala/org/graphframes/lib/Pregel.scala | 25.00% | 3 Missing ⚠️ |
| ...cala/org/graphframes/lib/ConnectedComponents.scala | 66.66% | 1 Missing ⚠️ |

❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #506      +/-   ##
==========================================
- Coverage   91.43%   90.35%   -1.09%     
==========================================
  Files          18       18              
  Lines         829      902      +73     
  Branches       52       96      +44     
==========================================
+ Hits          758      815      +57     
- Misses         71       87      +16     

☔ View full report in Codecov by Sentry.

SemyonSinchenko changed the title from "[WIP] feat: SparkConnect support" to "feat: SparkConnect support" on Feb 8, 2025
SemyonSinchenko marked this pull request as ready for review on February 8, 2025 at 18:54
SemyonSinchenko (Collaborator, Author)

@rjurney Hello!

At the moment the PR provides a (mostly) working version of the GraphFrames API for PySpark Connect. There are a lot of open questions from my side:

  • I would like to change the tests to work for both Connect and classic, but it can be tricky because the current tests are very old; for now I just copied the whole test suite and slightly modified it;
  • I do not know the best way to integrate the Connect API into the current PySpark API: at the moment I implemented everything as a separate GraphFrameConnect class that has (almost) the same API as GraphFrame;

How to try it: run the script in the dev folder, connect to the Spark Connect Server with SparkSession.builder.remote("sc://localhost:15002").getOrCreate(), and use graphframes.connect.graphframes_client.GraphFrameConnect.
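As a minimal end-to-end sketch of those steps (assuming GraphFrameConnect takes vertices and edges DataFrames like the classic GraphFrame, and that degree properties mirror the classic API):

```python
from pyspark.sql import SparkSession

from graphframes.connect.graphframes_client import GraphFrameConnect

# Connect to the Spark Connect Server started by the dev script.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

vertices = spark.createDataFrame([("a",), ("b",), ("c",)], ["id"])
edges = spark.createDataFrame([("a", "b"), ("b", "c")], ["src", "dst"])

# Assumed constructor: mirrors the classic GraphFrame(vertices, edges).
g = GraphFrameConnect(vertices, edges)
g.inDegrees.show()
```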

rjurney (Collaborator) commented Feb 9, 2025

> @rjurney Hello!
>
> At the moment the PR provides a (mostly) working version of the GraphFrames API for PySpark Connect. There are a lot of open questions from my side:
>
>   • I would like to change the tests to work for both Connect and classic, but it can be tricky because the current tests are very old; for now I just copied the whole test suite and slightly modified it;
>   • I do not know the best way to integrate the Connect API into the current PySpark API: at the moment I implemented everything as a separate GraphFrameConnect class that has (almost) the same API as GraphFrame;
>
> How to try it: run the script in the dev folder, connect to the Spark Connect Server with SparkSession.builder.remote("sc://localhost:15002").getOrCreate(), and use graphframes.connect.graphframes_client.GraphFrameConnect.

My PR #473 converts the unit tests to pytest tests. That should help there. My PR is a monster, I will work on breaking it up into pieces tomorrow and that should give you the ability to pull the test-related code into your PR.

It would be nice if there wasn't a completely different API for Spark Connect. How different are the implementations? I'll take a look at the PR tomorrow and see.

This is very cool work, thanks for it!

SemyonSinchenko (Collaborator, Author)

> It would be nice if there wasn't a completely different API for Spark Connect. How different are the implementations? I'll take a look at the PR tomorrow and see.

It is the same, just implemented as a separate class (python/graphframes/connect/graphframe_client.py).

rjurney (Collaborator) commented Feb 11, 2025

Is it feasible to add connect support to GraphFrame? Especially if it could detect a Connect connection and 'just work' or take an argument?

SemyonSinchenko (Collaborator, Author)

> Is it feasible to add connect support to GraphFrame? Especially if it could detect a Connect connection and 'just work' or take an argument?

Of course. That is the idea, but the question is how to do it. In Spark 3.x the devs decided to have two versions of DataFrame (pyspark.sql.DataFrame and pyspark.sql.connect.dataframe.DataFrame). In Spark 4.x this changed, and there are three versions at the moment: the common pyspark.sql.DataFrame base plus the classic and connect implementations.

I did not want to touch the existing code in this PR. I would like to finalize the implementation as a standalone GraphFrameConnect class that has exactly the same API as the existing GraphFrame, and we can deal with integration in the next iteration.

What do you think about it? @rjurney

SemyonSinchenko (Collaborator, Author)

To be honest, I like the idea of dispatch like in PySpark, but it will require big changes in the current GraphFrame, tests, build, etc. I think it would be better to do it in GraphFrames 1.0, because I would like to slightly change the public API.

For example, due to Spark Connect limitations it may be tricky to return both the loss and a DataFrame from svdPlusPlus. I would like to change the signature and not return the loss by default. The same applies to PageRank: I would like to make the computation of edge weights optional, defaulting to False.
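For context, the PySpark-style dispatch mentioned above could look roughly like this sketch; is_remote is an existing PySpark helper, while the factory function itself is illustrative, not this PR's actual code:

```python
from pyspark.sql.utils import is_remote


def make_graph(vertices, edges):
    """Hypothetical factory: route to the Connect or classic implementation
    depending on the kind of active session."""
    if is_remote():
        from graphframes.connect.graphframes_client import GraphFrameConnect

        return GraphFrameConnect(vertices, edges)
    from graphframes import GraphFrame

    return GraphFrame(vertices, edges)
```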

rjurney (Collaborator) commented Feb 11, 2025

> > Is it feasible to add connect support to GraphFrame? Especially if it could detect a Connect connection and 'just work' or take an argument?
>
> Of course. That is the idea, but the question is how to do it. In Spark 3.x the devs decided to have two versions of DataFrame (pyspark.sql.DataFrame and pyspark.sql.connect.dataframe.DataFrame). In Spark 4.x this changed, and there are three versions at the moment: the common pyspark.sql.DataFrame base plus the classic and connect implementations.
>
> I did not want to touch the existing code in this PR. I would like to finalize the implementation as a standalone GraphFrameConnect class that has exactly the same API as the existing GraphFrame, and we can deal with integration in the next iteration.
>
> What do you think about it? @rjurney

Ohhhh wow, okay. Now this all makes sense :) I would not have thought that. Everything makes sense now. I think you're doing what makes sense.

SemyonSinchenko (Collaborator, Author)

Questions / Topics

  1. Should we store the generated Python code in the repository or not? Apache Spark stores the Python code, but the JVM code is generated during the build. At the moment my PR follows the Apache Spark way.
  2. Should we have a separate GraphFramesConnect, or should we use runtime dispatch instead? At the moment my implementation offers a separate class, but Apache Spark uses dispatch.
  3. Should GraphFrames Connect be part of the graphframes distribution, or should we provide a separate JAR instead?
  4. Should the Python Connect client be part of the graphframes distribution, or should it be an extra (like pip install graphframes[connect])?

SemyonSinchenko changed the title from "feat: SparkConnect support" to "[WIP-DO-NOT-MERGE] feat: SparkConnect support" on Feb 23, 2025
rjurney (Collaborator) commented Feb 25, 2025

@SemyonSinchenko thanks for your work here, this is just awesome. I am going to give it a thorough review this weekend, sooner if I can. Will have questions borne of my lack of Scala knowledge as much as anything :)

SemyonSinchenko (Collaborator, Author)

@rjurney What is the plan for the review? Will you review these changes, or should I try to ask someone else to review them?

rjurney (Collaborator) commented Mar 4, 2025

@SemyonSinchenko TODAY

rjurney (Collaborator) commented Mar 4, 2025

@SemyonSinchenko can you describe any changes to the user experience of GraphFrames? Today I am using databricks-connect via VSCode for PySpark on Databricks and I can't use GraphFrame.pageRank because I am on Connect. Will it work with this PR? What about checkpointing code? That doesn't work either. Just wondering.

SemyonSinchenko (Collaborator, Author)

> @SemyonSinchenko can you describe any changes to the user experience of GraphFrames? Today I am using databricks-connect via VSCode for PySpark on Databricks and I can't use GraphFrame.pageRank because I am on Connect. Will it work with this PR? What about checkpointing code? That doesn't work either. Just wondering.

@rjurney

For classic users nothing should change, apart from some minor / questionable things. For example, I added building the JAR to the build process of the graphframes PySpark package. Maybe I should remove it. Should I?

For Connect users it is different. They should add the graphframes-connect JAR to the Spark Connect Server side and add a Spark conf like spark.connect.extensions.relation.classes=org.apache.spark.sql.graphframes.GraphFramesConnect to the configuration of their Spark Connect Server. That is how the Connect plugin system is supposed to be used.
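For illustration, starting a stock Apache Spark Connect server with the plugin might look like this; the JAR path is an assumption, while the conf key and class name are the ones quoted above:

```bash
# start-connect-server.sh ships with Apache Spark 3.4+; the JAR location
# below is illustrative, not a path from this PR.
./sbin/start-connect-server.sh \
  --jars /path/to/graphframes-connect-assembly.jar \
  --conf spark.connect.extensions.relation.classes=org.apache.spark.sql.graphframes.GraphFramesConnect
```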

Answers:

  1. Will it work on Databricks? Most probably not, but I cannot be 100% sure. The story is that Databricks Spark is a fork of Apache Spark and does not match it 100%. As far as I can tell from the delta-io code for PySpark Connect, the folks at Databricks backported the plugin signature from Spark 4.0 to their branches for DBR 14.x and 15.x. There is a strong reason for that, the same reason it was changed from 3.x to 4.x. But my code targets Apache Spark 3.4.x and 3.5.x, with a workaround related to the shading rules to avoid the problem with plugins. So, in my understanding, even with my plugin, GraphFrames won't work on DBR 14.x and DBR 15.x (Spark 3.5.x), but I'm not 100% sure. My question is: should we maintain DBR compatibility? And if so, how do we do it without access to the source code of the DBRs? At the moment there is not even documentation on how to add Spark Connect plugins to Databricks clusters, and you have to provide the right configuration before the runtime starts (I assume the only way is via init scripts, which have terrible documentation).
  2. I worked around the checkpoint problem by using the spark.checkpoint.dir configuration variable (see the sketch below). You can see it, for example, in these lines. For classic the behavior is the same.
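A minimal client-side sketch of that workaround, assuming the conf is read on the server side by the implementation and that the session allows setting it (the directory path is illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# There is no py4j access to sc.setCheckpointDir over Connect, so the
# checkpoint location is communicated via a conf instead.
spark.conf.set("spark.checkpoint.dir", "/tmp/graphframes-checkpoints")
```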

SemyonSinchenko (Collaborator, Author)

@rjurney JFYI. Because I know about the breaking change in the Connect plugin system from Apache Spark 3.5.x to 4.0.x (a change that, in my understanding, is already in DBR 14.x and 15.x), I separated all the plugin logic from the plugin itself: the plugin itself is fewer than five logical lines of code, to simplify the migration (and a DBR shim) as much as possible. In Apache Spark 4.0.x the signature changes to Optional<LogicalPlan> transform(byte[] relation, SparkConnectPlanner planner), and my workaround with shading of the GraphFrames proto messages is not needed anymore.

rjurney (Collaborator) commented Mar 10, 2025

@SemyonSinchenko so is it ready for review then?

SemyonSinchenko (Collaborator, Author)

> @SemyonSinchenko so is it ready for review then?

@rjurney Yes, it is. I still need to resolve merge conflicts, but 99% of them are related to the tests, not to the logic. Also, one additional method will be introduced to wrap the recently added power iteration clustering as well.

SemyonSinchenko changed the title from "[DO-NOT-MERGE] feat: SparkConnect support" to "feat: SparkConnect support" on Mar 10, 2025
SemyonSinchenko (Collaborator, Author)

@rjurney I resolved all the conflicts.

Important changes you should check:

  • I added the GraphFrames JAR to the Python package to avoid JVM errors;
  • I moved the tests to python/tests to avoid storing them inside the package;
  • I significantly simplified the tests by removing all the unittest artifacts;

At the moment only part of the tests is enabled for Connect, because the rest are based on examples that are called through py4j.

rjurney (Collaborator) commented Mar 10, 2025

Wow, awesome! I'll review it now.

rjurney (Collaborator) left a comment


@SemyonSinchenko I'm going ahead and approving this, although I would appreciate a brief explanation of where dispatch is implemented, because I can't tell what the user experience of this new approach is, i.e. normal vs. connect.

```python
_sym_db = _symbol_database.Default()


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
```
rjurney (Collaborator)

@SemyonSinchenko what is this?

SemyonSinchenko (Collaborator, Author)

Wait, do not look at this. This code is generated automatically by protoc (buf)!

```diff
     return _from_java_gf(jdf, self._spark)

-    def filterEdges(self, condition: Union[str, Column]) -> "GraphFrame":
+    def filterEdges(self, condition: str | Column) -> "GraphFrame":
```
rjurney (Collaborator)

What version of Python introduced this? Ahhh, 3.10. Cool, I learned about it :)

SemyonSinchenko (Collaborator, Author)

I think it is quite new, but it works in annotations on all supported Python versions, so all you need to do is add the line:

from __future__ import annotations

https://peps.python.org/pep-0563/
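To make the point concrete: the X | Y union syntax itself (PEP 604) is Python 3.10+, but with the PEP 563 future import annotations stay unevaluated strings, so such signatures parse on older interpreters too. A small self-contained illustration (the function and class here are made up for the example):

```python
from __future__ import annotations  # PEP 563: postpone annotation evaluation


class Column:
    """Stand-in for pyspark.sql.Column, just for this illustration."""


def filter_edges(condition: str | Column) -> None:
    # The annotation above is stored as a string and never evaluated at
    # definition time, so PEP 604 unions work even before Python 3.10.
    print(type(condition).__name__)


filter_edges("src > dst")  # prints: str
```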

```diff
-        raise TypeError("condition should be string or Column")
-        return _from_java_gf(jdf, self._spark)
+        return GraphFrame._from_impl(self._impl.filterEdges(condition=condition))
```
rjurney (Collaborator)

Is this exception handling handled elsewhere?

SemyonSinchenko (Collaborator, Author)

In the proposed design, all the exceptions are the responsibility of the implementation. So the answer is yes, it is handled in the impl.
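A tiny sketch of the facade-plus-implementation split being described, assuming the shape visible in the diff above (everything except filterEdges is illustrative):

```python
class GraphFrame:
    """Thin public facade; validation and error raising live in the impl."""

    def __init__(self, impl):
        self._impl = impl

    @classmethod
    def _from_impl(cls, impl):
        return cls(impl)

    def filterEdges(self, condition):
        # No isinstance checks here: a classic impl or a Connect impl is
        # equally responsible for raising TypeError on a bad condition.
        return GraphFrame._from_impl(self._impl.filterEdges(condition=condition))
```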

rjurney (Collaborator) commented Mar 16, 2025

@SemyonSinchenko regarding the JAR, I think it is right to include it, and we should add it to MANIFEST.in.

rjurney (Collaborator) left a comment

lgtm again

rjurney merged commit 1e702c2 into graphframes:master on Mar 17, 2025
5 checks passed
SemyonSinchenko deleted the 447-spark-connect branch on April 6, 2025 at 09:14