[MRG] Add a dask.distributed example #613
Conversation
For discussion and early review. Cc @TomAugspurger
Codecov Report

@@           Coverage Diff           @@
##           master     #613    +/-  ##
========================================
- Coverage   95.29%    95.2%   -0.1%
========================================
  Files          39       39
  Lines        5462     5462
========================================
- Hits         5205     5200      -5
- Misses        257      262      +5

Continue to review full report at Codecov.
return i


with joblib.parallel_backend('dask.distributed',
                             scheduler_host=client.scheduler.address):
client.scheduler.address isn't always available, depending on the type of cluster (it is available for the LocalCluster created automatically by Client(), but not necessarily for others).
I believe the recommended way would be:

address = client.scheduler_info()['address']
with joblib.parallel_backend('dask.distributed',
                             scheduler_host=address):
client.scheduler.address is always available; in this case, scheduler is actually the connection object, not the scheduler itself. In either event, though, you don't need either.
Also, if relevant, I hope to release dask.distributed within a week.
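For context, here is a minimal self-contained sketch of the pattern discussed in this thread, assembled from the example under review (long_running_function stands in for any expensive task); it assumes joblib plus a version of distributed that ships the joblib plugin:

```python
import joblib
import distributed.joblib  # noqa  (registers the 'dask.distributed' joblib backend)
from distributed import Client


def long_running_function(i):
    return i


# Client() with no arguments starts a LocalCluster on the local machine.
client = Client()

# Cluster-agnostic way to recover the scheduler address, as suggested above.
address = client.scheduler_info()['address']

with joblib.parallel_backend('dask.distributed', scheduler_host=address):
    results = joblib.Parallel(n_jobs=2, verbose=100)(
        joblib.delayed(long_running_function)(i)
        for i in range(10))
```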
Realistic usage scenario: combining dask code with joblib code, for
instance using dask for preprocessing data, and scikit-learn for machine
learning.
Would you consider "prototyping a solution, to later be run on a truly distributed cluster" a "realistic usage scenario"?
That (prototyping, before moving to a cluster) and the diagnostics dashboard are my two most common use cases for the distributed scheduler on a single machine.
@@ -0,0 +1,57 @@
"""
Using distributed for single_machine parallel computing
single_machine -> single machine
FWIW we're trying to avoid referring to the code in the github.com/dask/distributed repository as distributed. The reason is that it's a fairly generic term. Instead I might recommend just using the term dask here, or, if preferred, dask.distributed.
I'll use dask.distributed.
This naming thing is quite confusing (I can't blame you, it's an error that I have run into over and over, first with enthought.mayavi => mayavi, and later with scikits.learn => scikit-learn, imported as sklearn). However, it will confuse the users, and even myself, as it makes the difference and the boundary between the projects quite blurry.
import distributed.joblib  # noqa

###############################################################################
# Run parallel computation using dask.distributed
we could add dask to intersphinx to link to their documentation?
Yey! Great idea!
I would love to see this happen. I have no experience with intersphinx myself, but I have seen it in use more often recently and have liked what I've seen.
return i


with joblib.parallel_backend('dask.distributed', scheduler_host=address):
I would even add backend='dask.distributed', or add a small discussion after the title (it seems a bit empty there).
But I am not 100% sure.
This should suffice if you have already created a client (requires master). You also don't need the address = line above:

with joblib.parallel_backend('dask'):
Now backend='dask' is very confusing, because, as @TomAugspurger was explaining to me, by default dask doesn't use the distributed backend, but a threading one.
I'll wait for a release of distributed to update this example, as I would like it to run with a released version.
Right, but you'll never use this with the legacy threaded scheduler, it isn't sufficiently flexible to handle Joblib's dynamism. There is only one relevant use case for Dask here, and it's the newer scheduler.
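For comparison, a sketch of the simpler spelling suggested above; it assumes the then-unreleased distributed master, where an existing Client is picked up automatically and the short backend name 'dask' is registered by the same import:

```python
import joblib
import distributed.joblib  # noqa  (registers the backend with joblib)
from distributed import Client


def long_running_function(i):
    return i


client = Client()  # on master, the most recently created Client is found automatically

with joblib.parallel_backend('dask'):
    joblib.Parallel(n_jobs=2, verbose=100)(
        joblib.delayed(long_running_function)(i)
        for i in range(10))
```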
@@ -16,7 +16,7 @@ conda update --yes --quiet conda
conda create -n $CONDA_ENV_NAME --yes --quiet python=3
source activate $CONDA_ENV_NAME

conda install --yes --quiet pip numpy sphinx matplotlib pillow
conda install --yes --quiet pip numpy sphinx matplotlib pillow dask distributed
Just dask suffices here. The core package is now called dask-core, while dask is a metapackage that includes distributed and a few other packages (like numpy, pandas, ...).
###############################################################################
# Setup the distributed client
###############################################################################
from distributed import Client
Similarly, we tend to encourage from dask.distributed import Client in examples.
This is terribly confusing, you realize. I haven't looked at the codebases of the packages, but my mental model right now is a bit lost as to what package does what.
It's not that confusing if you're coming to it fresh (which most people are). It's really only a pain for the old hands who were around when we called the thing distributed on its own.
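A tiny sketch of the recommended spelling; both imports refer to the same object, and Client() with no arguments starts a LocalCluster, as noted earlier in the thread:

```python
# Preferred import path (equivalent to `from distributed import Client`).
from dask.distributed import Client

client = Client()  # no arguments: launches a LocalCluster on this machine
```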
# Recover the address
address = client.scheduler_info()['address']

# This import registers the dask backend for joblib
imports and registers
s/dask/dask.distributed/
I think that that was correct: import is used as a noun here.
joblib.Parallel(n_jobs=2, verbose=100)(
    joblib.delayed(long_running_function)(i)
    for i in range(10))
# We can check that joblib is indeed using the dask.distributed
reword without we
I can't get the intersphinx mapping to work. Is distributed.Client documented in what is captured by intersphinx?
Also, if relevant, I hope to release dask.distributed within a week.

Cool! I think that I'd like to merge this example first, and modify it as soon as you release dask.distributed. I also hope that we'll release an alpha of the new joblib soon after. That way, we'll all be in production soon.
Right, but you'll never use this with the legacy threaded scheduler, it isn't sufficiently flexible to handle Joblib's dynamism. There is only one relevant use case for Dask here, and it's the newer scheduler.

OK, and the threaded scheduler is going away? That would explain part of my confusion.
No, it's staying around. It's useful if you don't have tornado, are allergic to dependencies (it's stdlib only), or if you're doing relatively straightforward dask.array work. The newer scheduler is generally the more robust choice, though.
I found the problem with intersphinx: Client is documented as distributed.client.Client (https://github.com/dask/distributed/blob/master/docs/source/api.rst) while we use it as distributed.Client. I believe that the "currentmodule" should be changed in the file above.
Is there a way for us to flatten the namespace on our end?
are allergic to dependencies (it's stdlib only),
Thanks for having my health in mind :).
Ha!
Is there a way for us to flatten the namespace on our end?

In the API documentation file, just use "dask" as the "currentmodule": the currentmodule defines what sphinx considers to be the public path of the objects.
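For reference, a sketch of what the wiring could look like on the joblib side, in the Sphinx conf.py; the documentation URL is an assumption, and the mapping only resolves references whose dotted path matches the one used in the distributed docs (the distributed.client.Client vs distributed.Client issue above):

```python
# Sketch for doc/conf.py: enable intersphinx and map 'distributed' to its
# hosted documentation so references like distributed.Client can be
# cross-linked. The URL is an assumption, not taken from this thread.
extensions = ['sphinx.ext.intersphinx']

intersphinx_mapping = {
    'distributed': ('https://distributed.readthedocs.io/en/latest/', None),
}
```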
with joblib.parallel_backend('dask.distributed', scheduler_host=address):
    joblib.Parallel(n_jobs=2, verbose=100)(
In current master, there is no need to put n_jobs=2 here anymore.
        for i in range(10))
# Check that joblib is indeed using the dask.distributed
# backend
print(joblib.Parallel(n_jobs=1)._backend)
This is no longer necessary (on current master); I have added the active backend name to the verbose output of the call to Parallel.
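Putting those two review points together, a sketch of how the call could look against then-current joblib master (a trivial built-in stands in for the example's long_running_function):

```python
import joblib
import distributed.joblib  # noqa  (registers the backend)
from distributed import Client

client = Client()
address = client.scheduler_info()['address']

with joblib.parallel_backend('dask.distributed', scheduler_host=address):
    # Per the review: on current master n_jobs=2 is no longer needed here,
    # and the verbose output already names the active backend, so the
    # explicit print(joblib.Parallel(n_jobs=1)._backend) check can be dropped.
    joblib.Parallel(verbose=100)(
        joblib.delayed(abs)(i) for i in range(10))
```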
I've addressed all issues, and CI is green. Can I have a merge?
###############################################################################
# The verbose messages below show that the backend is indeed the
# dask.distributed one
with joblib.parallel_backend('dask.distributed', scheduler_host=address):
Adding scheduler_host=address is no longer strictly necessary. Dask will use the most recently created Client by default. Leaving it in is ok too.
Hence it is visible in the example, which is better.
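For completeness, a sketch of the shorter form described in the comment above: once a Client exists, the backend falls back to the most recently created Client, so scheduler_host can be omitted (this assumes the newer joblib/distributed versions discussed in this thread):

```python
import joblib
import distributed.joblib  # noqa  (registers the backend)
from distributed import Client

client = Client()  # the most recently created Client is used by default

with joblib.parallel_backend('dask.distributed'):
    joblib.Parallel(verbose=100)(
        joblib.delayed(abs)(i) for i in range(10))
```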
Merging this guy.
* tag '0.12': (116 commits)
  Release 0.12
  typo
  typo
  typo
  ENH add initializer limiting n_threads for C-libs (joblib#701)
  DOC better parallel docstring (joblib#704)
  [MRG] Nested parallel call thread bomb mitigation (joblib#700)
  MTN vendor loky 2.1.3 (joblib#699)
  Make it possible to configure the reusable executor workers timeout (joblib#698)
  MAINT increase timeouts to make test more robust on travis
  DOC: use the .joblib extension instead of .pkl (joblib#697)
  [MRG] Fix exception handling in nested parallel calls (joblib#696)
  Fix skip test lz4 not installed (joblib#695)
  [MRG] numpy_pickle: several enhancements (joblib#626)
  Introduce Parallel.__call__ backend callbacks (joblib#689)
  Add distributed on readthedocs (joblib#686)
  Support registration of external backends (joblib#655)
  [MRG] Add a dask.distributed example (joblib#613)
  ENH use cloudpickle to pickle interactively defined callable (joblib#677)
  CI freeze the version of sklearn 0.19.1 and scipy 1.0.1 (joblib#685)
  ...
A simple dask.distributed example