-
-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Use proper scheduler that honors the parallel task limit #14281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use proper scheduler that honors the parallel task limit #14281
Conversation
I haven't tested this, but it looks like this has a CancellationTokenSource (CTS) leak (+ a few other tweaks). Index: MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
--- a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs (revision a2fb1f63d9d30e09de4244e2b6085bb6ec0e75b6)
+++ b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs (date 1749650016869)
@@ -13,19 +13,16 @@
/// <summary>
/// Provides Parallel action interface to process tasks with a set concurrency level.
/// </summary>
-public class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler
+public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IDisposable
{
private readonly IHostApplicationLifetime _hostApplicationLifetime;
private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
-
- private static AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
-
- /// <summary>
- /// Gets used to lock all operations on the Tasks queue and creating workers.
- /// </summary>
+ private readonly BlockingCollection<TaskQueueItem> _tasks = new();
private readonly Lock _taskLock = new();
+ private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
+
private Task? _cleanupTask;
/// <summary>
@@ -39,8 +36,6 @@
_logger = logger;
}
- private BlockingCollection<TaskQueueItem> Tasks { get; set; } = new();
-
private void ScheduleTaskCleanup()
{
lock (_taskLock)
@@ -57,13 +52,13 @@
void RunCleanupTask()
{
_cleanupTask = Task.Delay(TimeSpan.FromSeconds(10)).ContinueWith(
- t =>
+ _ =>
{
lock (_taskLock)
{
- if (Tasks.Count > 0)
+ if (_tasks.Count > 0)
{
- // tasks are still there so its still in use. Reschedule cleanup task.
+ // tasks are still there so it is still in use. Reschedule cleanup task.
// we cannot just exit here and rely on the other invoker because there is a considerable timeframe where it could have already ended.
RunCleanupTask();
return;
@@ -88,8 +83,7 @@
var parallelism = (fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount) - _taskRunners.Count;
for (int i = 0; i < parallelism; i++)
{
- var stopToken = new CancellationTokenSource();
- var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping);
+ var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(new CancellationTokenSource().Token, _hostApplicationLifetime.ApplicationStopping);
_taskRunners.Add(
combinedSource,
Task.Factory.StartNew(
@@ -108,19 +102,20 @@
_deadlockDetector.Value = stopToken;
try
{
- foreach (var item in Tasks.GetConsumingEnumerable(stopToken.Token))
+ foreach (var item in _tasks.GetConsumingEnumerable(stopToken.Token))
{
await ProcessItem(item).ConfigureAwait(false);
}
}
catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
{
- // thats how you do it, interupt the waiter thread. There is nothing to do here when it was on purpose.
+ // Task was cancelled, nothing to do.
}
finally
{
- _deadlockDetector.Value = default!;
+ _deadlockDetector.Value = null!;
_taskRunners.Remove(stopToken);
+ stopToken.Dispose();
}
}
@@ -130,15 +125,15 @@
{
if (item.CancellationToken.IsCancellationRequested)
{
- // if item is cancled, just skip it
+ // if item is cancelled, just skip it
return;
}
await item.Worker(item.Data).ConfigureAwait(true);
}
- catch (System.Exception ex)
+ catch (Exception ex)
{
- _logger.LogError(ex, "Error while performing a library operation.");
+ _logger.LogError(ex, "Error while performing a library operation");
}
finally
{
@@ -167,7 +162,7 @@
{
// round the percent and only update progress if it changed to prevent excessive UpdateProgress calls
var innerPercentRounded = Math.Round(innerPercent);
- if (queueItem!.ProgressValue != innerPercentRounded)
+ if (queueItem.ProgressValue != innerPercentRounded)
{
queueItem.ProgressValue = innerPercentRounded;
UpdateProgress();
@@ -181,13 +176,13 @@
for (var i = 0; i < workItems.Length; i++)
{
var item = workItems[i]!;
- Tasks.Add(item, CancellationToken.None);
+ _tasks.Add(item, CancellationToken.None);
}
if (_deadlockDetector.Value is not null)
{
- // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achived
- while (workItems.Any(e => !e.Done.Task.IsCompleted) && Tasks.TryTake(out var item, 0, _deadlockDetector.Value.Token))
+ // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achieved
+ while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 0, _deadlockDetector.Value.Token))
{
await ProcessItem(item).ConfigureAwait(false);
}
@@ -200,6 +195,13 @@
}
}
+ /// <inheritdoc />
+ public void Dispose()
+ {
+ _tasks.Dispose();
+ _cleanupTask?.Dispose();
+ }
+
private class TaskQueueItem
{
public required object Data { get; init; } |
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Outdated
Show resolved
Hide resolved
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Show resolved
Hide resolved
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Outdated
Show resolved
Hide resolved
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Show resolved
Hide resolved
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Outdated
Show resolved
Hide resolved
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Outdated
Show resolved
Hide resolved
Yeah, good catch on the CTS; I totally overlooked that one. |
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Show resolved
Hide resolved
…yLibraryScheduler.cs Co-authored-by: Cody Robibero <cody@robibe.ro>
No changes to OpenAPI specification found. See history of this comment for previous changes. |
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Show resolved
Hide resolved
Don't use LongRunning tasks
…tConcurrencyOnScan
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a spelling pass; testing will come later.
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Outdated
Show resolved
Hide resolved
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Outdated
Show resolved
Hide resolved
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Outdated
Show resolved
Hide resolved
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
Outdated
Show resolved
Hide resolved
Seems to be working fine for me, but we shouldn't add more usages of BaseItem.ConfigurationManager unless we absolutely have to. Index: MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
--- a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs (revision 3ea69ff0f219af32f41c507710679f38bac4dd1c)
+++ b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs (date 1749917894574)
@@ -5,7 +5,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
-using MediaBrowser.Controller.Entities;
+using MediaBrowser.Controller.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@@ -19,6 +19,7 @@
private const int CleanupGracePeriod = 60;
private readonly IHostApplicationLifetime _hostApplicationLifetime;
private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
+ private readonly IServerConfigurationManager _serverConfigurationManager;
private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
@@ -39,10 +40,15 @@
/// </summary>
/// <param name="hostApplicationLifetime">The hosting lifetime.</param>
/// <param name="logger">The logger.</param>
- public LimitedConcurrencyLibraryScheduler(IHostApplicationLifetime hostApplicationLifetime, ILogger<LimitedConcurrencyLibraryScheduler> logger)
+ /// <param name="serverConfigurationManager">The server configuration manager.</param>
+ public LimitedConcurrencyLibraryScheduler(
+ IHostApplicationLifetime hostApplicationLifetime,
+ ILogger<LimitedConcurrencyLibraryScheduler> logger,
+ IServerConfigurationManager serverConfigurationManager)
{
_hostApplicationLifetime = hostApplicationLifetime;
_logger = logger;
+ _serverConfigurationManager = serverConfigurationManager;
}
private void ScheduleTaskCleanup()
@@ -94,7 +100,7 @@
{
lock (_taskLock)
{
- var fanoutConcurrency = BaseItem.ConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
+ var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
var parallelism = (fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount) - _taskRunners.Count;
_logger.LogDebug("Spawn {NumberRunners} new runners.", parallelism);
for (int i = 0; i < parallelism; i++)
@@ -217,7 +223,7 @@
};
}).ToArray();
- if (BaseItem.ConfigurationManager.Configuration.LibraryScanFanoutConcurrency == 1)
+ if (_serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency == 1)
{
_logger.LogDebug("Process sequentially.");
try |
An alternative approach might be to use Channels to manage the queue and keep the configured number of concurrent workers alive as sleeping threads. |
I thought about that too. Even with the current code, that would make things much easier, because I wouldn't need to clean everything up after running. However, it also hogs resources, and I don't see why we should reserve those threads for something that only runs once a day. Channels are also a very nice pattern, but IMO they are better suited to static (long-lived) usage. I think a channel is overkill for this internal usage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depending on user testing, we might need to change the default concurrency.
This can be improved in a future RC/release, but merging for now. |
Fixes several issues with system overload when a library contains multiple folders, as each folder would spawn its own queue limited to the task limit.
The new scheduler uses a global limit and cleans up its tasks afterwards.