
Conversation

GerHobbelt

magic number 7: the big one. Fixes various race conditions surrounding normal and abnormal threadpool / application termination.

This has been what I've been hunting down the last couple of days, while using thread-pool in larger applications.

This PR incorporates #70 and #71, as without those the solution is still half-arsed.¹ Merging all as-is should not give merge conflicts; at least it didn't when I checked this myself.

Note:

While some of the PRs that spun off of this one are due to code review, this one was a scorcher observed repeatedly on the mentioned hardware. The 'odd stuff' mentioned below has been observed in the wild with both thread_pool v2 and v3; while those v2 vs. v3 codebases are quite different in places, it turned out they both suffer from the same set of issues, perhaps just triggered by different circumstances.

Where it is mentioned that the thread(s) were spinning like mad, this was mostly when switching back & forth between regular and paused mode; part of the craziness was observed in the debugger, part of it was deduced from code review, as the application at times refused to behave sensibly when interrupted by a debugger: we had to reason through the codebase instead, since debugging significantly changed the timing and caused all kinds of other artefacts. 😪

It turns out that, at least on MSWindows, when application code calls the exit() RTL API to abort the application (or calls abort(), or similar), threadpool worker threads may fail to terminate (causing lockup at application exit) or have already 'disappeared' before that last wait_for_tasks() gets a chance to tickle them into terminating.

Yes, I know this commit introduces "that last wait_for_tasks()" call, as can be seen in the destroy_threads() part of the diff, but that call is key to resolving the lockup situation: when lockup at application termination occurs, it is because the thread workers are not getting signaled to wake up when they have just entered condition.wait() at that time. So what wait_for_tasks() must do is "keep on screaming", i.e. repeatedly invoke notify_all() to ensure all threads get a chance to reach their cond.wait() and then get signalled, now that destroy_threads() has switched to 'shutdown mode' by setting 'running = false', which the worker threads can then see and act on accordingly, i.e. terminate.

Because this is a kind of wild mix at run-time (some threads MAY still be busy with other stuff and not yet at their cond.wait(), while others have already reached it and terminated), wait_for_tasks() also checks repeatedly whether any threads still need an opportunity to get notified -- this is taken care of by the repeated notify_all(); only when all threads have terminated (and thus the thread.joinable() API returns true for them all) does the call finish and pass control over to the next bit: the loop where threads are cleaned up through thread.join().
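To make that handshake concrete, here is a simplified sketch of how destroy_threads() and a polling wait_for_tasks() can cooperate. This is not the library's exact code; member names such as task_available_cv, task_done_cv, task_done_mutex and all_workers_done() are assumptions made purely for illustration.

```cpp
// Simplified sketch of the shutdown handshake; names are illustrative only.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

struct pool_sketch
{
    std::atomic<bool> running{true};
    std::condition_variable task_available_cv;    // workers sleep on this one
    std::mutex task_done_mutex;
    std::condition_variable task_done_cv;         // wait_for_tasks() sleeps on this one
    std::chrono::milliseconds sleep_duration{10};
    std::vector<std::thread> threads;

    bool all_workers_done();   // placeholder: "no pending tasks and every worker has terminated"

    void wait_for_tasks()
    {
        std::unique_lock<std::mutex> lock(task_done_mutex);
        while (!all_workers_done())
        {
            // "Keep on screaming": wake every worker each poll cycle so that it
            // (eventually) re-checks the pause / shutdown / queue state.
            task_available_cv.notify_all();
            task_done_cv.wait_for(lock, sleep_duration);
        }
    }

    void destroy_threads()
    {
        running = false;      // switch to 'shutdown mode' first...
        wait_for_tasks();     // ...then keep notifying until every worker has seen it
        for (std::thread& t : threads)
            if (t.joinable())
                t.join();
    }
};
```

The essential point is that the notify_all() sits inside the poll loop, so a worker that was still busy (or not yet parked in its cond.wait()) simply gets another chance on the next cycle.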

This commit assumes various other patches to have been included already; this is extracted from this series of dev commits:


Revision: a6ea090

  • better document the new purpose of sleep_duration:

    @brief The wait_for_tasks() poll cycle period, in milliseconds, where after each sleep_duration wait_for_tasks() will be incited to broadcast its waiting state to the worker threads so they wake from their potential short slumbers, check all shutdown / reactivation conditions and signal back when they've done so. Consequently, when there are no tasks pending or incoming, thread workers are not woken up, ever, unless wait_for_tasks() explicitly asks for them to wake up and check everything, including 'pause mode' and 'shutdown/cleanup' (running == 0).

    If set to 0, then instead of sleeping for a bit when termination conditions have not yet been met, wait_for_tasks() will execute std::this_thread::yield(). The default value is 10 milliseconds.

This re-introduces the sleep_duration member variable, but for a slightly different purpose than in v2.x - or so I gathered from code review while I upgraded from v2 to the latest v3 of thread-pool.

Note that the sleep_duration value is only relevant to the execution timing of wait_for_tasks() when one of these conditions applies:

  • the threadpool has been put into 'pause mode' and there are no more lingering tasks from the previous era being finished, while the queue stays in 'pause mode'.
  • the threadpool is shutting down (running == false, due to threadpool instance cleanup & destruction, usually part of an application shutdown).

       std::chrono::milliseconds sleep_duration = std::chrono::milliseconds(10);

  • Limit the maximum poll period of wait_for_tasks() to 1 second. (This is only relevant in spurious 'exit on catastrophic failure' cases and has yet to be observed 'in the wild'.) Yet the poll period is important for the 'yell at worker threads to wake up and check' bit that makes them terminate (as intended) when you're in shutting-down mode; see the sketch below.
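A minimal sketch of one such poll step under the semantics just described (the function name poll_step is mine and purely illustrative): sleep_duration == 0 yields, anything larger sleeps, capped at 1 second.

```cpp
// Illustrative only: one wait_for_tasks() poll step under the assumed semantics.
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

void poll_step(std::condition_variable& task_done_cv,
               std::unique_lock<std::mutex>& lock,
               std::chrono::milliseconds sleep_duration)
{
    if (sleep_duration.count() == 0)
        std::this_thread::yield();   // keep polling, but without sleeping
    else
        // never poll less often than once per second, so the shutdown 'yelling'
        // still reaches the workers in a timely fashion
        task_done_cv.wait_for(lock, std::min(sleep_duration, std::chrono::milliseconds(1000)));
}
```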

Revision: 12253b6

fixes:

  • push_task(): document which (member) variables are under which mutex's overwatch and make sure the code matches this.

    • Case in point: the tasks_total counter MUST be kept in sync with the actual tasks queue size, hence it must be managed by the same mutex, or you will have situations where get_tasks_running() is lying to you, and we CANNOT afford that.
    • Second case in point: task_done_cv has an opposing purpose and MUST be wrapped by its own mutex to prevent deadlock between wait_for_tasks() and any worker threads. Introducing task_done_mutex for that (see the sketch after this list).
  • reset(): first (re)create threads, only then unpause() them -- provided they weren't already paused before. Use the unpause() API explicitly, as it has been augmented and signals the threadpool to observe the state change (going out of 'pause mode').

    Previously, this wasn't necessary, because the threadpool threads would cycle like crazed rodents on meth when they got put into 'pause mode'; now that we have improved the waiting game, any state change that expects the threadpool threads to start working again requires a signal to them (.notify_all()). That signal sent by unpause() will only be observed by the threads when we're sure they are actively waiting already, hence they MUST have been created again first.

  • unpause(): see above. Now signals all worker threads that sleepy times are over and the task queue must be checked, once again. The previous code didn't need this, as the worker threads were not sleeping in 'pause mode' but were continuously polling.
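A hedged sketch of the locking discipline from the push_task() item above (member names such as task_available_cv and report_task_done() are assumptions for illustration, not the exact patch code): tasks_total only ever changes under tasks_mutex, together with the queue, while task_done_cv gets its own task_done_mutex so that waking wait_for_tasks() never requires holding the queue lock.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>

struct queue_sketch
{
    std::mutex tasks_mutex;                       // guards `tasks` AND `tasks_total`
    std::queue<std::function<void()>> tasks;
    std::size_t tasks_total = 0;                  // must never diverge from the queue contents
    std::condition_variable task_available_cv;

    std::mutex task_done_mutex;                   // separate mutex, opposite purpose
    std::condition_variable task_done_cv;

    void push_task(std::function<void()> task)
    {
        {
            std::scoped_lock lock(tasks_mutex);
            tasks.push(std::move(task));
            ++tasks_total;                        // same lock as the queue, so the two stay in sync
        }
        task_available_cv.notify_one();
    }

    // Called by a worker after it has finished executing one task.
    void report_task_done()
    {
        {
            std::scoped_lock lock(tasks_mutex);
            --tasks_total;
        }
        {
            std::scoped_lock lock(task_done_mutex);
            task_done_cv.notify_all();            // wakes wait_for_tasks() without holding tasks_mutex
        }
    }
};
```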

augmentations:

  • [TO BE SUBMITTED] now accepts NEGATIVE thread_counts during pool creation (or reset()) to create a pool occupying all CPU cores minus the given (absolute) thread_count. This is useful when you want the pool to occupy ALMOST all cores, but leave one or more free for other, unrelated, jobs.

  • thread workers now wait for a signal to wake up when in 'pause mode', just as they already did in normal run mode.

    Q: Why don't the thread workers have to use wait_for() with a timeout to allow them to check independent state changes, such as 'running', to help them detect when the application/threadpool is shutting down/destroyed?

    A: because the threadpool destructor invokes wait_for_tasks(), which will incessantly broadcast to the workers, waking them periodically to have them inspect the shutdown state until they all have responded. wait_for_tasks() already necessitated the use of a wait_for() timeout-based poll loop for other reasons (one of the two sides of the fence needs to time out and periodically check for spurious wake-ups / race conditions, because .notify_one() and .notify_all(), by definition, do not wait for the intended observers to start observing through their .wait() calls). wait_for_tasks(), as part of its own 'poll loop', will repeatedly send those signals (.notify_all()) to reach all worker threads, no matter how busy and occupied they have been with previous tasks. This same 'keep on yelling' approach helps us when shutting down the pool, as the same repeated .notify_all() will now reach all threads in a most timely fashion and have each worker thread check the shutdown (running) state and act accordingly. Which is why we can safely have the worker threads use the simpler .wait()-only approach.

    Note that, at least under Windows 10, threads MAY be nuked silently under fatal/crash/exception conditions, in which case the thread disappears even before it was able to do anything simple, like updating the alive_threads_count counter, because the thread worker code simply ceases to run and exist. Fortunately, thread::joinable() detects this fact -- which was a very important reason to have wait_for_tasks() be a wait_for()-timed poll loop, as we CANNOT guarantee on all OSes that worker threads will be able to wake up and act once a severe-enough fault condition has occurred in the application/task code. From our own observations, it already seems sufficient to directly, bluntly, call the standard exit(N) RTL API to have this happen to you: no exception of any kind will be reported, yet all threads will be joinable as they will have vanished already.

WARNING: hence, get_alive_threads_count() will be unsafely optimistic, and depending on that number at such disparate times will surely cause your application to lock up on exit on semi-random occasions. This is completely independent of the running state, as it is driven by external factors.

  • [TO BE SUBMITTED] catch unexpected C++ and SEH/hardware exceptions occurring in your tasks/worker-threads in the outer layers of the worker thread: as this is a catastrophic, fatal, condition anyway (your application state is largely unpredictable by then already), the thread will terminate, but we now have a fighting chance of catching and reporting such errors at least. As C++ and SEH exception handling cannot co-exist in a single function, we have the following call chain, where each wraps the next one: workerthread_main() --> __worker_SEH() --> __worker() --> worker(), where worker() is the core threadpool thread code, waiting for and executing tasks once they arrive.

    When you need special handling of this (fatal) scenario in your application, you can create a derived class and override the workerthread_main() method with your own. Observe the code comments when you do, to ensure continued proper operation of the threadpool. (__worker_SEH() returns a boolean indicating whether its termination was normal or abnormal (i.e. catastrophic failure), while the string you pass in when starting will by then have been filled with the relevant and available error information.) The given implementation prints this info to STDERR -- but you can replace that behaviour in your override. A sketch of this call chain follows below.
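A hedged sketch of that call chain, assuming MSVC for the SEH layer; the function bodies are illustrative, not the patch's exact code.

```cpp
// Illustrative sketch of workerthread_main() --> __worker_SEH() --> __worker() --> worker().
#include <cstdio>
#include <exception>
#include <string>
#if defined(_MSC_VER)
#include <windows.h>
#endif

void worker();   // the core threadpool loop: wait for tasks and execute them (not shown here)

// Innermost C++ wrapper: catches C++ exceptions escaping from worker().
static bool __worker(std::string& errmsg)
{
    try
    {
        worker();
        return true;
    }
    catch (const std::exception& e)
    {
        errmsg = e.what();
        return false;
    }
    catch (...)
    {
        errmsg = "unknown C++ exception in worker thread";
        return false;
    }
}

// SEH wrapper: catches hardware/structured exceptions. C++ try/catch and SEH
// __try/__except cannot co-exist in one function, hence the separate layer.
static bool __worker_SEH(std::string& errmsg)
{
#if defined(_MSC_VER)
    __try
    {
        return __worker(errmsg);
    }
    __except (EXCEPTION_EXECUTE_HANDLER)
    {
        errmsg = "SEH/hardware exception in worker thread";
        return false;
    }
#else
    return __worker(errmsg);
#endif
}

// Outermost thread entry point; a derived class could override something like
// this to customize the (fatal) error reporting.
void workerthread_main()
{
    std::string errmsg;
    if (!__worker_SEH(errmsg))
        std::fprintf(stderr, "worker thread terminated abnormally: %s\n", errmsg.c_str());
}
```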


Testing

This was tested as part of a larger work (other PRs are forthcoming shortly) after hunting down shutdown issues (application lockups, etc.) in a large application.

Tested this code via your provided test code rig; see my own fork and the referenced commits, which point into that fork.

Tested on an AMD Ryzen 3700X, 128GB RAM, latest Win10/64, latest MSVC2019 dev environment. Using in-house project files which apply an (in-house) standardized set of optimizations.

Additional information

TBD

The patches are hopefully largely self-explanatory. Where deemed useful, the original commit messages from the dev fork have been referenced and included.

Footnotes

  1. OK, those are minor details when viewed in perspective, but I consider them part of all this, as I can still spot opportunities for trouble when those are not included. Hence they're part of this patch, as I did my best to make this as rock-solid as I could. Hopefully Sunday didn't lead me astray there. 😅✝️

@GerHobbelt
Author

Note: the commit message itself has a couple of bad typos/autofill-mistakes; the pullreq message above has the corrected contents.

@GerHobbelt
Author

See #78 (comment).

As stated there: this one needs to be reconsidered. There's still worth in here, but as it is, it's overzealous on the making-sure-it-allows-everyone-to-properly-kill-itself bit and -- I now believe -- DOES NOT need .wait_for() in wait_for_tasks() even for abnormal termination scenarios.

This'll take a few days, for I need to check & doublecheck my thought processes (2300 hours local now, so probably not my best time for a fresh think about this 😉) before I (re)submit.

We can do that either with a fresh Pull Req or as a follow-up in this one. Whichever way you prefer to continue this process. (I suggest closing this one; the follow-up can then ref this one by github mention.)

@bshoshany
Owner

I see. Of course, take your time. I will close this pull request now. But if you believe there's a bug in the library, I would greatly appreciate if you first provided a minimal working example that reproduces the error, so I can see it for myself. This can be done as a follow-up here. Once I understand where the error comes from, I may have other ways to solve it, so don't create a new pull request just yet.
