Conversation

GaelVaroquaux
Member

A simple dask.distributed example

@GaelVaroquaux
Member Author

For discussion and early review. Cc @TomAugspurger

@codecov

codecov bot commented Jan 26, 2018

Codecov Report

Merging #613 into master will decrease coverage by 0.09%.
The diff coverage is n/a.


@@           Coverage Diff            @@
##           master    #613     +/-   ##
========================================
- Coverage   95.29%   95.2%   -0.1%     
========================================
  Files          39      39             
  Lines        5462    5462             
========================================
- Hits         5205    5200      -5     
- Misses        257     262      +5
Impacted Files Coverage Δ
joblib/_parallel_backends.py 95.25% <0%> (-1.3%) ⬇️
joblib/test/test_memory.py 97.8% <0%> (-0.37%) ⬇️

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9d52110...d110ba5. Read the comment docs.

return i

with joblib.parallel_backend('dask.distributed',
                             scheduler_host=client.scheduler.address):


client.scheduler.address isn't always available, depending on the type of cluster (it is available for the LocalCluster created automatically by Client(), but not necessarily for others)

I believe the recommended way would be

address = client.scheduler_info()['address']

with joblib.parallel_backend('dask.distributed',
                             scheduler_host=address): 
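
For illustration, a minimal, self-contained sketch of that pattern, assuming the pre-release versions discussed in this thread (where importing distributed.joblib registers the backend):

import joblib
import distributed.joblib  # noqa: registers the 'dask.distributed' backend
from distributed import Client

client = Client()  # starts a LocalCluster on this machine
address = client.scheduler_info()['address']

with joblib.parallel_backend('dask.distributed', scheduler_host=address):
    results = joblib.Parallel(verbose=10)(
        joblib.delayed(abs)(i) for i in range(10))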

Contributor


client.scheduler.address is always available; in this case scheduler is actually the connection object, not the scheduler itself. In either event, though, you don't need either.

Also, if relevant, I hope to release dask.distributed within a week.


Realistic usage scenario: combining dask code with joblib code, for
instance using dask for preprocessing data, and scikit-learn for machine
learning.


Would you consider "prototyping a solution, to later be run on a truly distributed cluster" a "realistic usage scenario"?

That (prototyping, before moving to a cluster) and the diagnostics dashboard are my two most common use cases for the distributed scheduler on a single machine.

@GaelVaroquaux GaelVaroquaux changed the title [WIP] Add a dask.distributed example [MRG] Add a dask.distributed example Jan 26, 2018
@@ -0,0 +1,57 @@
"""
Using distributed for single_machine parallel computing
Contributor


single_machine -> single machine

Contributor


FWIW we're trying to avoid referring to the code in the github.com/dask/distributed repository as distributed. The reason here is that it's a fairly generic term. Instead I might recommend just using the term dask here, or, if preferred, dask.distributed.

Member Author


I'll use dask.distributed.

This naming thing is quite confusing (I can't blame you, it's an error that I've run into over and over, first with enthought.mayavi => mayavi, and later with scikits.learn => scikit-learn (imported as sklearn)). However, it will confuse users, and even myself, as it makes the difference and the boundary between the projects quite blurry.

import distributed.joblib # noqa

###############################################################################
# Run parallel computation using dask.distributed
Contributor


we could add dask to intersphinx to link to their documentation?

Member Author


Yey! Great idea!

Contributor


I would love to see this happen. I have no experience with intersphinx myself, but I have seen it in use more often recently and have liked what I've seen.
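
For reference, a hypothetical snippet for doc/conf.py; the exact inventory URL of the dask.distributed documentation is an assumption here:

extensions = [
    # ... existing extensions ...
    'sphinx.ext.intersphinx',
]
intersphinx_mapping = {
    'distributed': ('https://distributed.readthedocs.io/en/latest/', None),
}

With this in place, references such as :class:`distributed.Client` can link to the upstream documentation, provided that project publishes an objects.inv inventory.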

return i


with joblib.parallel_backend('dask.distributed', scheduler_host=address):
Contributor


I would even add backend='dask.distributed' or add a small discussion after the title (it seems a bit empty there).

Contributor


But I am not 100% sure.

Contributor


This should suffice if you have already created a client (requires master). You also don't need the address = line above

with joblib.parallel_backend('dask'):
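
In full, the suggested form would look roughly like this (a sketch, assuming joblib master at the time, where a previously created Client is picked up automatically):

from distributed import Client
client = Client()  # the most recently created Client is used by the backend

with joblib.parallel_backend('dask'):
    joblib.Parallel(verbose=100)(
        joblib.delayed(long_running_function)(i) for i in range(10))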

Member Author


Now backend='dask' is very confusing, because, as @TomAugspurger was explaining to me, by default dask doesn't use the distributed backend, but a threading one.

I'll wait for a release of distributed to update this example, as I would like it to run with a released version.

Contributor


Right, but you'll never use this with the legacy threaded scheduler; it isn't sufficiently flexible to handle Joblib's dynamism. There is only one relevant use case for Dask here, and it's the newer scheduler.

@@ -16,7 +16,7 @@ conda update --yes --quiet conda
conda create -n $CONDA_ENV_NAME --yes --quiet python=3
source activate $CONDA_ENV_NAME

-conda install --yes --quiet pip numpy sphinx matplotlib pillow
+conda install --yes --quiet pip numpy sphinx matplotlib pillow dask distributed
Contributor


Just dask suffices here. The core package is now called dask-core while dask is a metapackage that includes distributed and a few other packages (like numpy, pandas, ...)

###############################################################################
# Setup the distributed client
###############################################################################
from distributed import Client
Contributor


Similarly we tend to encourage from dask.distributed import Client in examples

Member Author


This is terribly confusing, you realize. I haven't looked at the codebases of the packages, but my mental model right now is a bit lost as to what package does what.

Contributor


It's not that confusing if you're coming to it fresh (which most people are). It's really only a pain for the old hands who were around when we called the thing distributed on its own.

# Recover the address
address = client.scheduler_info()['address']

# This import registers the dask backend for joblib
Contributor


imports and registers

Contributor


s/dask/dask.distributed/

Member Author


I think that that was correct: import is used as a noun here.

joblib.Parallel(n_jobs=2, verbose=100)(
    joblib.delayed(long_running_function)(i)
    for i in range(10))
# We can check that joblib is indeed using the dask.distributed
Contributor


reword without we

@GaelVaroquaux
Member Author

I can't get the intersphinx mapping to work. Is distributed.Client documented in what is captured by intersphinx?

@GaelVaroquaux
Member Author

GaelVaroquaux commented Jan 26, 2018 via email

@GaelVaroquaux
Member Author

GaelVaroquaux commented Jan 26, 2018 via email

@mrocklin
Contributor

> OK, and the threaded scheduler is going away? That would explain part of my confusion.

No, it's staying around. It's useful if you don't have tornado, are allergic to dependencies (it's stdlib only), or if you're doing relatively straightforward dask.array work. The newer scheduler is generally a more robust choice, though.

@GaelVaroquaux
Member Author

GaelVaroquaux commented Jan 26, 2018 via email

@mrocklin
Contributor

Is there a way for us to flatten the namespace on our end?

@GaelVaroquaux
Member Author

GaelVaroquaux commented Jan 26, 2018 via email

@mrocklin
Contributor

Ha!

@GaelVaroquaux
Member Author

GaelVaroquaux commented Jan 27, 2018 via email



with joblib.parallel_backend('dask.distributed', scheduler_host=address):
    joblib.Parallel(n_jobs=2, verbose=100)(
Contributor


In current master, there is no need to put n_jobs=2 here anymore.

for i in range(10))
# Check that joblib is indeed using the dask.distributed
# backend
print(joblib.Parallel(n_jobs=1)._backend)
Contributor

@ogrisel commented Feb 7, 2018


This is no longer necessary (on current master); I have added the active backend name to the verbose output of the call to Parallel.

@GaelVaroquaux
Member Author

I've addressed all issues, and CI is green.

Can I have a merge?

###############################################################################
# The verbose messages below show that the backend is indeed the
# dask.distributed one
with joblib.parallel_backend('dask.distributed', scheduler_host=address):
Contributor


Adding scheduler_host=address is no longer strictly necessary. Dask will use the most recently created Client by default.

Leaving it in is ok too.
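
Put together, the example could then be reduced to something like the following sketch (assuming a joblib and dask.distributed recent enough that the most recently created Client is picked up automatically, so no scheduler_host is needed):

import joblib
import distributed.joblib  # noqa: registers the 'dask.distributed' backend
from distributed import Client

client = Client()  # starts a LocalCluster on the current machine

def long_running_function(i):
    return i

with joblib.parallel_backend('dask.distributed'):
    results = joblib.Parallel(verbose=100)(
        joblib.delayed(long_running_function)(i) for i in range(10))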

@GaelVaroquaux force-pushed the distributed_example branch from e18d12a to d110ba5 on May 28, 2018 17:53
@GaelVaroquaux
Member Author

Merging this guy.

@GaelVaroquaux GaelVaroquaux merged commit 2a31976 into joblib:master May 28, 2018
yarikoptic added a commit to yarikoptic/joblib that referenced this pull request Jul 28, 2018
* tag '0.12': (116 commits)
  Release 0.12
  typo
  typo
  typo
  ENH add initializer limiting n_threads for C-libs (joblib#701)
  DOC better parallel docstring (joblib#704)
  [MRG] Nested parallel call thread bomb mitigation (joblib#700)
  MTN vendor loky2.1.3 (joblib#699)
  Make it possible to configure the reusable executor workers timeout (joblib#698)
  MAINT increase timeouts to make test more robust on travis
  DOC: use the .joblib extension instead of .pkl (joblib#697)
  [MRG] Fix exception handling in nested parallel calls (joblib#696)
  Fix skip test lz4 not installed (joblib#695)
  [MRG] numpy_pickle:  several enhancements (joblib#626)
  Introduce Parallel.__call__ backend callbacks (joblib#689)
  Add distributed on readthedocs (joblib#686)
  Support registration of external backends (joblib#655)
  [MRG] Add a dask.distributed example (joblib#613)
  ENH use cloudpickle to pickle interactively defined callable (joblib#677)
  CI freeze the version of sklearn0.19.1 and scipy1.0.1 (joblib#685)
  ...