
Conversation

JPVenson
Member

Fixes several issues with system overload when a library contains multiple folders, as each folder would spawn its own queue limited to the task limit.

The new scheduler uses a global limit and cleans up tasks afterwards.
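
For context, a minimal sketch of the idea, not the merged code: a single shared queue drained by a fixed pool of workers, so the concurrency limit is global rather than per folder. All names here are illustrative, and the real scheduler additionally handles progress reporting, cancellation, and nested-loop deadlock detection.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Illustrative only: one global queue and a fixed worker pool shared by
// all callers, instead of one queue (and worker set) per library folder.
public sealed class GlobalWorkQueueSketch : IDisposable
{
    private readonly BlockingCollection<Func<Task>> _work = new();
    private readonly Task[] _workers;

    public GlobalWorkQueueSketch(int concurrency)
    {
        _workers = new Task[concurrency];
        for (var i = 0; i < concurrency; i++)
        {
            // Each worker drains the shared queue until adding completes.
            _workers[i] = Task.Run(async () =>
            {
                foreach (var job in _work.GetConsumingEnumerable())
                {
                    await job().ConfigureAwait(false);
                }
            });
        }
    }

    public void Enqueue(Func<Task> job) => _work.Add(job);

    public void Dispose()
    {
        _work.CompleteAdding();
        Task.WaitAll(_workers);
        _work.Dispose();
    }
}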

@JPVenson JPVenson requested review from Shadowghost, crobibero and a team June 11, 2025 13:13
@crobibero
Member

I haven't tested this but it looks like this has a CTS leak (+ a few other tweaks)

Index: MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
diff --git a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
--- a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs	(revision a2fb1f63d9d30e09de4244e2b6085bb6ec0e75b6)
+++ b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs	(date 1749650016869)
@@ -13,19 +13,16 @@
 /// <summary>
 /// Provides Parallel action interface to process tasks with a set concurrency level.
 /// </summary>
-public class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler
+public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IDisposable
 {
     private readonly IHostApplicationLifetime _hostApplicationLifetime;
     private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
     private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
-
-    private static AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
-
-    /// <summary>
-    /// Gets used to lock all operations on the Tasks queue and creating workers.
-    /// </summary>
+    private readonly BlockingCollection<TaskQueueItem> _tasks = new();
     private readonly Lock _taskLock = new();
 
+    private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
+
     private Task? _cleanupTask;
 
     /// <summary>
@@ -39,8 +36,6 @@
         _logger = logger;
     }
 
-    private BlockingCollection<TaskQueueItem> Tasks { get; set; } = new();
-
     private void ScheduleTaskCleanup()
     {
         lock (_taskLock)
@@ -57,13 +52,13 @@
         void RunCleanupTask()
         {
             _cleanupTask = Task.Delay(TimeSpan.FromSeconds(10)).ContinueWith(
-                t =>
+                _ =>
                 {
                     lock (_taskLock)
                     {
-                        if (Tasks.Count > 0)
+                        if (_tasks.Count > 0)
                         {
-                            // tasks are still there so its still in use. Reschedule cleanup task.
+                            // tasks are still there so it is still in use. Reschedule cleanup task.
                             // we cannot just exit here and rely on the other invoker because there is a considerable timeframe where it could have already ended.
                             RunCleanupTask();
                             return;
@@ -88,8 +83,7 @@
             var parallelism = (fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount) - _taskRunners.Count;
             for (int i = 0; i < parallelism; i++)
             {
-                var stopToken = new CancellationTokenSource();
-                var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping);
+                var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(new CancellationTokenSource().Token, _hostApplicationLifetime.ApplicationStopping);
                 _taskRunners.Add(
                     combinedSource,
                     Task.Factory.StartNew(
@@ -108,19 +102,20 @@
         _deadlockDetector.Value = stopToken;
         try
         {
-            foreach (var item in Tasks.GetConsumingEnumerable(stopToken.Token))
+            foreach (var item in _tasks.GetConsumingEnumerable(stopToken.Token))
             {
                 await ProcessItem(item).ConfigureAwait(false);
             }
         }
         catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
         {
-            // thats how you do it, interupt the waiter thread. There is nothing to do here when it was on purpose.
+            // Task was cancelled, nothing to do.
         }
         finally
         {
-            _deadlockDetector.Value = default!;
+            _deadlockDetector.Value = null!;
             _taskRunners.Remove(stopToken);
+            stopToken.Dispose();
         }
     }
 
@@ -130,15 +125,15 @@
         {
             if (item.CancellationToken.IsCancellationRequested)
             {
-                // if item is cancled, just skip it
+                // if item is cancelled, just skip it
                 return;
             }
 
             await item.Worker(item.Data).ConfigureAwait(true);
         }
-        catch (System.Exception ex)
+        catch (Exception ex)
         {
-            _logger.LogError(ex, "Error while performing a library operation.");
+            _logger.LogError(ex, "Error while performing a library operation");
         }
         finally
         {
@@ -167,7 +162,7 @@
                     {
                         // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls
                         var innerPercentRounded = Math.Round(innerPercent);
-                        if (queueItem!.ProgressValue != innerPercentRounded)
+                        if (queueItem.ProgressValue != innerPercentRounded)
                         {
                             queueItem.ProgressValue = innerPercentRounded;
                             UpdateProgress();
@@ -181,13 +176,13 @@
         for (var i = 0; i < workItems.Length; i++)
         {
             var item = workItems[i]!;
-            Tasks.Add(item, CancellationToken.None);
+            _tasks.Add(item, CancellationToken.None);
         }
 
         if (_deadlockDetector.Value is not null)
         {
-            // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achived
-            while (workItems.Any(e => !e.Done.Task.IsCompleted) && Tasks.TryTake(out var item, 0, _deadlockDetector.Value.Token))
+            // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achieved
+            while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 0, _deadlockDetector.Value.Token))
             {
                 await ProcessItem(item).ConfigureAwait(false);
             }
@@ -200,6 +195,13 @@
         }
     }
 
+    /// <inheritdoc />
+    public void Dispose()
+    {
+        _tasks.Dispose();
+        _cleanupTask?.Dispose();
+    }
+
     private class TaskQueueItem
     {
         public required object Data { get; init; }

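For background on the leak: CancellationTokenSource.CreateLinkedTokenSource registers a callback on each source token, and that registration is only released when the linked source is disposed, so per-runner sources that are never disposed accumulate. A minimal illustration of why the disposal matters (illustrative names, not the exact shape of the patch):

using System.Threading;

var appStopping = new CancellationTokenSource();
var stopToken = new CancellationTokenSource();

// The linked source holds a registration on each source token until
// it is disposed.
var combined = CancellationTokenSource.CreateLinkedTokenSource(
    stopToken.Token, appStopping.Token);
try
{
    // ... consume the queue with combined.Token ...
}
finally
{
    // Releasing both sources here is what prevents the leak.
    combined.Dispose();
    stopToken.Dispose();
}
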
cvium previously requested changes Jun 11, 2025
@JPVenson
Member Author

Yeah, good catch on the CTS, totally overlooked that one

JPVenson and others added 2 commits June 11, 2025 17:24
…yLibraryScheduler.cs

Co-authored-by: Cody Robibero <cody@robibe.ro>

github-actions bot commented Jun 11, 2025

No changes to OpenAPI specification found. See history of this comment for previous changes.

@JPVenson JPVenson requested a review from crobibero June 11, 2025 16:46
@Shadowghost Shadowghost requested a review from a team June 11, 2025 18:41
Member

@crobibero crobibero left a comment

Just a spelling pass, testing is later

@crobibero
Member

Seems to be working fine for me, but we shouldn't add more usages of BaseItem.ConfigurationManager if we don't absolutely have to

Index: MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
diff --git a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs
--- a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs	(revision 3ea69ff0f219af32f41c507710679f38bac4dd1c)
+++ b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs	(date 1749917894574)
@@ -5,7 +5,7 @@
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
-using MediaBrowser.Controller.Entities;
+using MediaBrowser.Controller.Configuration;
 using Microsoft.Extensions.Hosting;
 using Microsoft.Extensions.Logging;
 
@@ -19,6 +19,7 @@
     private const int CleanupGracePeriod = 60;
     private readonly IHostApplicationLifetime _hostApplicationLifetime;
     private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
+    private readonly IServerConfigurationManager _serverConfigurationManager;
     private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
 
     private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
@@ -39,10 +40,15 @@
     /// </summary>
     /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
     /// <param name="logger">The logger.</param>
-    public LimitedConcurrencyLibraryScheduler(IHostApplicationLifetime hostApplicationLifetime, ILogger<LimitedConcurrencyLibraryScheduler> logger)
+    /// <param name="serverConfigurationManager">The server configuration manager.</param>
+    public LimitedConcurrencyLibraryScheduler(
+        IHostApplicationLifetime hostApplicationLifetime,
+        ILogger<LimitedConcurrencyLibraryScheduler> logger,
+        IServerConfigurationManager serverConfigurationManager)
     {
         _hostApplicationLifetime = hostApplicationLifetime;
         _logger = logger;
+        _serverConfigurationManager = serverConfigurationManager;
     }
 
     private void ScheduleTaskCleanup()
@@ -94,7 +100,7 @@
     {
         lock (_taskLock)
         {
-            var fanoutConcurrency = BaseItem.ConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
+            var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
             var parallelism = (fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount) - _taskRunners.Count;
             _logger.LogDebug("Spawn {NumberRunners} new runners.", parallelism);
             for (int i = 0; i < parallelism; i++)
@@ -217,7 +223,7 @@
             };
         }).ToArray();
 
-        if (BaseItem.ConfigurationManager.Configuration.LibraryScanFanoutConcurrency == 1)
+        if (_serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency == 1)
         {
             _logger.LogDebug("Process sequentially.");
             try

@crobibero
Member

An alternate approach might be to use Channels to manage the queue and just have the concurrent number of workers as sleeping threads

@JPVenson
Member Author

> An alternate approach might be to use Channels to manage the queue and just have the concurrent number of workers as sleeping threads

I thought about that too. Even with the current code that would make things way easier, because I wouldn't need to clean everything up after running, but it also hogs resources, and I don't see why we should reserve those threads for something that only runs once a day.
There are many better patterns, like scoped scheduler runners per operation or more interactive sharing of resources, all of which I want to utilise, but they require much more effort and integration into the code, and I am not willing to touch more code in the RC than absolutely necessary.

Channels are also a very nice pattern, but IMO they are more suited for static usage. I think a channel is overkill for this internal usage.
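
For reference, a rough sketch of the Channel-based alternative under discussion, assuming System.Threading.Channels; this is illustrative and not what was merged:

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustrative sketch: a fixed set of long-lived workers sleep on a
// channel and wake when work arrives, so nothing has to be spawned or
// cleaned up per scan.
public sealed class ChannelSchedulerSketch
{
    private readonly Channel<Func<Task>> _channel =
        Channel.CreateUnbounded<Func<Task>>();

    public ChannelSchedulerSketch(int concurrency, CancellationToken shutdown)
    {
        for (var i = 0; i < concurrency; i++)
        {
            _ = Task.Run(async () =>
            {
                // ReadAllAsync parks the worker until an item is written.
                await foreach (var job in _channel.Reader.ReadAllAsync(shutdown))
                {
                    await job().ConfigureAwait(false);
                }
            }, shutdown);
        }
    }

    public ValueTask EnqueueAsync(Func<Task> job, CancellationToken ct) =>
        _channel.Writer.WriteAsync(job, ct);
}

The trade-off raised above is that these workers live for the lifetime of the service even though a library scan typically runs only once a day.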

@crobibero crobibero requested a review from cvium June 14, 2025 17:50
Contributor

@Shadowghost Shadowghost left a comment

Depending on user testing we might need to change the default concurrency

@crobibero
Member

This can be improved in a future RC/release, but merging for now

@crobibero crobibero merged commit 0e1be6c into jellyfin:master Jun 15, 2025
18 checks passed
@JPVenson JPVenson linked an issue Jul 8, 2025 that may be closed by this pull request

Successfully merging this pull request may close these issues.

Jellyfin creates enormous amounts of processes