feat: SparkConnect support #506
Conversation
Codecov Report

@@            Coverage Diff             @@
##           master     #506      +/-   ##
==========================================
- Coverage   91.43%   90.35%    -1.09%
==========================================
  Files          18       18
  Lines         829      902      +73
  Branches       52       96      +44
==========================================
+ Hits          758      815      +57
- Misses         71       87      +16
@rjurney Hello! At the moment the PR provides a (mostly) working version of the GraphFrames API for PySpark Connect. There are a lot of open questions from my side.

How to try it: run the script in
My PR #473 converts the unit tests to

It would be nice if there wasn't a completely different API for Spark Connect. How different are the implementations? I'll take a look at the PR tomorrow and see. This is very cool work, thanks for it!
It is the same, just implemented as a separate class (
Is it feasible to add connect support to
Of course. That is the idea, but the question is how to do it. In Spark 3.x, the devs made a decision to make two versions of
I did not want to touch the existing code in this PR. I would like to finalize the implementation as a standalone

What do you think about it?
To be honest, I like the idea of dispatch like in PySpark, but it will require big changes in the current

For example, due to Spark Connect limitations it may be tricky to return both
Ohhhh wow, okay. Now this all makes sense :) I would not have thought that. I think you're doing what makes sense.
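The dispatch idea discussed above can be sketched as a thin facade that picks an implementation object at construction time. This is a hedged illustration only: the class and method names below are assumptions for the sketch, not the actual classes in this PR.

```python
# Hedged sketch of facade-style dispatch: the public GraphFrame class
# delegates every call to an implementation chosen when the graph is built.
# All names here are illustrative, not the PR's real API.

class _ClassicGraphFrameImpl:
    """Stand-in for a backend that calls the JVM library via py4j."""
    def connected_components(self) -> str:
        return "classic: connectedComponents via JVM call"

class _ConnectGraphFrameImpl:
    """Stand-in for a backend that talks to a graphframes-connect plugin."""
    def connected_components(self) -> str:
        return "connect: connectedComponents via protobuf plugin message"

class GraphFrame:
    def __init__(self, vertices, edges, *, is_remote: bool):
        # In real code the session type (classic vs connect) would be
        # detected from the DataFrames themselves, not passed as a flag.
        impl_cls = _ConnectGraphFrameImpl if is_remote else _ClassicGraphFrameImpl
        self._impl = impl_cls()

    def connectedComponents(self) -> str:
        # Public API is identical for both backends; only _impl differs.
        return self._impl.connected_components()

print(GraphFrame(None, None, is_remote=False).connectedComponents())
print(GraphFrame(None, None, is_remote=True).connectedComponents())
```

The design choice this sketch captures is that callers never branch on the session type; only the constructor does.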
Questions / Topics
@SemyonSinchenko thanks for your work here, this is just awesome. I am going to give it a thorough review this weekend, sooner if I can. Will have questions borne of my lack of Scala knowledge as much as anything :)
@rjurney What is the plan for the review? Will you review these changes, or should I try to ask someone else to review them?
@SemyonSinchenko TODAY
@SemyonSinchenko can you describe any changes to the user experience of GraphFrames? Today I am using
For classic users nothing should change, only some minor / questionable things. For example, I introduced the build of the JAR into the process of building the graphframes PySpark package. Maybe I should remove it. Should I? For connect users it is different: they should add the graphframes-connect JAR to the Spark Connect server part and add a spark conf like

Answers:
|
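As a hedged illustration of the connect-user setup described above: Spark Connect servers register relation plugins through the `spark.connect.extensions.relation.classes` conf, and the plugin JAR must be on the server classpath. The JAR artifact name and plugin class name below are made-up placeholders, not values from this PR.

```python
# Sketch of the server-side configuration a connect user would need.
# The conf key is Spark Connect's real extension mechanism; the JAR and
# class names are hypothetical placeholders for this sketch.
server_conf = {
    "spark.jars": "graphframes-connect-assembly.jar",  # hypothetical artifact
    "spark.connect.extensions.relation.classes":
        "org.graphframes.connect.GraphFramesPlugin",   # hypothetical class
}

# Flags one might pass when launching the Spark Connect server:
flags = " ".join(f"--conf {k}={v}" for k, v in server_conf.items())
print(flags)
```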
@rjurney JFYI: because I know about the breaking change in the connect plugins system from Apache Spark 3.5.x to 4.0.x (a change that, to my understanding, is already in DBR 14.x and 15.x), I separated all the plugin logic from the plugin itself: the plugin itself is less than five logical lines of code, to simplify the migration (and a DBR shim) as much as possible. In Apache Spark 4.0.x the signature is changed to
@SemyonSinchenko so is it ready for review then?
@rjurney Yes, it is. I still need to resolve merge conflicts, but 99% of them are related to the tests, not to the logic. Also, one additional method will be introduced to wrap the recently added power iteration clustering.
Classic tests are passing; plus some changes.
@rjurney I resolved all the conflicts. Important changes you should check:
At the moment, only part of the tests are turned on for Connect, because the other part is based on the
Wow, awesome! I'll review it now.
@SemyonSinchenko I'm going ahead and approving this, although I would appreciate a brief explanation of where dispatch is implemented, because I can't tell what the user experience of this new approach is (i.e. normal vs connect).
_sym_db = _symbol_database.Default()

DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
@SemyonSinchenko what is this?
Wait, do not look at this. This code is generated automatically by protoc (buf)!
return _from_java_gf(jdf, self._spark)

- def filterEdges(self, condition: Union[str, Column]) -> "GraphFrame":
+ def filterEdges(self, condition: str | Column) -> "GraphFrame":
What version of Python introduced this? Ahhh, 3.10. Cool, I learned about it :)
I think it is quite new (the `X | Y` annotation syntax landed in Python 3.10), but postponed annotation evaluation makes it usable on all supported versions, so all you need to do is add the line:

from __future__ import annotations
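A minimal sketch of why the future import is enough: with postponed evaluation (PEP 563), annotations are stored as strings and never evaluated at definition time, so the PEP 604 `str | Column` form parses even on Python 3.7-3.9. The `Column` class here is a stand-in for `pyspark.sql.Column`, not the real import.

```python
from __future__ import annotations  # must be the first statement in the module

class Column:  # stand-in for pyspark.sql.Column in this sketch
    pass

def filter_edges(condition: str | Column) -> str:
    # With PEP 563, the annotation above is stored as the literal string
    # "str | Column" and is never evaluated, so no TypeError is raised on
    # Python versions older than 3.10.
    return "GraphFrame"

print(filter_edges.__annotations__["condition"])  # -> str | Column
```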
raise TypeError("condition should be string or Column")
return _from_java_gf(jdf, self._spark)

return GraphFrame._from_impl(self._impl.filterEdges(condition=condition))
Is this exception handling handled elsewhere?
In the proposed design, all the exceptions are the responsibility of the implementation. So the answer is yes: it is handled in the impl.
@SemyonSinchenko regarding the JAR, I think it is right to include it, and I think we should add it to
lgtm again
Work is in progress but the overall design is in the final state:
- graphframes-connect
- For the JVM part, code generation is built into build.sbt; for Python (and possibly other clients), buf (buf.yaml, buf.gen.yaml)

Close #447