
Conversation

@fcharras (Contributor) commented Jan 16, 2018

Ok, I think I did it! The Parallel API now includes two new keywords:

  • as_generator: if True, the output of __call__ will be a generator. The user is free to iterate over this generator as they wish (this does not affect the speed of the computations).
  • async_output: if True, the output of __call__ will not be ordered according to the input iterable; instead, the first elements of the output list (or of the output generator if as_generator=True) are the results that are returned first by the child processes (or threads, depending on the backend).

These two parameters are completely independent. Also, all the nice features of joblib are left untouched (logging, ...). Here is a little snippet to test the result (requires Python 3):

import time
import random
from joblib import Parallel, delayed


def some_function(i):
    time.sleep(0.001 * random.randint(1, 100))
    return i + 1


pool = Parallel(n_jobs=2, async_output=True, backend='multiprocessing')
pool2 = Parallel(n_jobs=2, async_output=False, backend='multiprocessing')

res = pool(delayed(some_function)(i) for i in range(100))
res2 = pool2(delayed(some_function)(i) for i in range(100))
print(res)   # order differs at each call
print(res2)  # order is as expected
assert sorted(res) == res2

/!\ for now, async_output only works in Python 3 with backend='multiprocessing'. I've also laid some foundations for supporting an error_callback keyword in _parallel_backends.

I've also added a helper function, get_last_async_result, that is useful if the user wishes to add callbacks inside the input iterable.

Implements #79 and #217, and follows the previous (closed) PRs #582, #585 and #586 (which partially implemented the present changes).

codecov bot commented Jan 16, 2018

Codecov Report

Merging #587 into master will increase coverage by 0.07%.
The diff coverage is 96.59%.


@@            Coverage Diff             @@
##           master     #587      +/-   ##
==========================================
+ Coverage   95.17%   95.24%   +0.07%     
==========================================
  Files          39       39              
  Lines        5427     5534     +107     
==========================================
+ Hits         5165     5271     +106     
- Misses        262      263       +1
| Impacted Files | Coverage Δ |
|----------------|------------|
| joblib/pool.py | 93.27% <100%> (+1.89%) ⬆️ |
| joblib/test/test_parallel.py | 96.66% <100%> (+0.52%) ⬆️ |
| joblib/test/common.py | 86.79% <100%> (+0.25%) ⬆️ |
| joblib/_parallel_backends.py | 95.45% <89.47%> (-0.24%) ⬇️ |
| joblib/parallel.py | 97.97% <95.16%> (-0.75%) ⬇️ |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@fcharras (Contributor, Author) commented Jan 16, 2018

There was a race condition that I just fixed (dumb error: the definitions of job and job_idx had been swapped in _dispatch).

@fcharras (Contributor, Author) commented:

Ok, I've reverted some changes (notably using deque instead of lists) and now the tests pass on older Python versions, hurrah.

@fcharras (Contributor, Author) commented Jan 16, 2018

All the tests passed except the pep8 check; it seems this was caused by an intermediate faulty commit. I rebased and squashed all commits to fix it. Hope everything finally turns green...
edit: all green!

@fcharras fcharras force-pushed the async_option branch 4 times, most recently from b77db6a to e95fea5 Compare January 17, 2018 09:58
@lesteve (Member) commented Jan 17, 2018

Thanks a lot for your PR! This is quite a significant change and it's not likely that I will have time to review this in the near future.

Off the top of my head: the as_generator feature is what I was trying to aim for and could not get to work. This may be the first aspect I look at when I get the time.

In general it's best not to do too many things in one PR. Small, focused PRs increase the likelihood of getting merged one day.

BTW I cancelled the Travis builds on this PR because they were taking 10+ hours to complete, not sure why. Update: Travis seems to be having issues: https://www.traviscistatus.com/

@fcharras (Contributor, Author) commented Jan 17, 2018

The tests froze because Travis was down last night; it's up again now. Before that, the tests were green, but unfortunately the history of the previous runs was lost in the rebase. Since then I've only added some refactoring plus a unittest for the async parameter; those haven't been run on CI yet but pass on my local machine, so I think you can enable it again.

I'd be happy to help the review process :) It could be broken down into two PRs: one for the generator output, and another for the async part (they are 100% independent, except for some shared refactoring).

Hints for the review: overall there is no radical change to parallel.py; it follows the same strategy and methods as before, but the queue I/O is managed slightly differently. The default behavior is still the same as current master (I haven't removed or altered any of the previous unittests). The additions to the other files are minor (two unittests, and adding the error_callback keyword to apply_async in _parallel_backends).

@lesteve (Member) commented Jan 17, 2018

I quickly checked, and your PR has the same problem as my old attempt in https://github.com/lesteve/joblib/tree/parallel-return-generator: the program hangs if you don't consume the generator completely. Here is a snippet that highlights the problem in your PR:

import numpy as np

from joblib import Parallel, delayed


def func(arg):
    print('func({})'.format(arg))
    result = np.zeros(int(1e7))
    result[0] = arg
    return result


result = Parallel(
    n_jobs=2, as_generator=True,
    verbose=2, backend='multiprocessing')(
        delayed(func)(i) for i in range(10))

next(result)
next(result)

# Uncomment the following line and the program does not hang
# list(result)

del result

Output:

func(0)
func(1)
func(2)
func(3)

Troubleshooting this problem is a great first challenging step.

@fcharras (Contributor, Author) commented:

Interestingly, this does not hang with backend='loky' or backend='threading'. The generator seems to work fine with those backends, so there's at least that.

When the generator is deleted, it raises an error that is caught and processed by _handle_error as expected. What seems to happen with backend='multiprocessing' is that backend.abort_everything(ensure_ready=ensure_ready), which triggers multiprocessing.Pool.terminate(), ends in a deadlock somewhere in the CustomizablePicklingQueue in pool.py. There seems to be an unexpected interaction between multiprocessing.Pool.terminate() and CustomizablePicklingQueue...

@fcharras (Contributor, Author) commented Jan 17, 2018

It seems that the following is what happens in CustomizablePicklingQueue in pool.py. Indeed, this line hangs: self._writer.send_bytes(buffer.getvalue()).

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

Source: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Pipe

@fcharras (Contributor, Author) commented:

Replacing super(MemmappingPool, self).terminate() with super(MemmappingPool, self).join() removes the deadlock! The issue is that terminate is what we really want, but we're getting closer...

@fcharras (Contributor, Author) commented Jan 17, 2018

This patch solves the deadlock on my local machine. What seemed to happen is that a message was sent to a killed process, resulting in a deadlock because the process would never answer. The solution is to close the faulty pipe before terminating the process. I don't exactly understand the role of each of the four pipes in PicklingPool though; closing the other pipes either does nothing or creates BrokenPipeError errors.

@lesteve (Member) commented Jan 17, 2018

Nice, well done! IMO it is better to open a PR with only the as_generator functionality. Personally I think it is more likely to get merged than the async_output part, though I could be wrong about this. Please add a test based on my snippet for all the backends. Have a look at my old branch to see whether you can reuse some of the tests I wrote at the time: master...lesteve:parallel-return-generator

Not sure about the PicklingPool and its four pipes. Maybe @ogrisel can comment on this and on the fix that you found (essentially doing self._inqueue._reader.close() before .terminate()).

I have some old benchmarks that I could rerun to see how much we reduce memory consumption when returning an iterator rather than a list.

franck added 5 commits April 25, 2018 22:22
@fcharras fcharras closed this Aug 18, 2018
@letalvoj commented May 2, 2021

@lesteve this would be so cool to have ... any chance this could get revisited?

@fcharras (Contributor, Author) commented:

The return_generator part is being developed here #588
