about summary refs log tree commit diff
path: root/MatrixContentFilter/Services/AsyncActionQueues
diff options
context:
space:
mode:
Diffstat (limited to 'MatrixContentFilter/Services/AsyncActionQueues')
-rw-r--r--MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs38
-rw-r--r--MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs60
-rw-r--r--MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs67
3 files changed, 165 insertions, 0 deletions
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs
new file mode 100644
index 0000000..f4c559c
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs
@@ -0,0 +1,38 @@
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services.AsyncActionQueues;
+
+public abstract class AbstractAsyncActionQueue : BackgroundService {
+    private readonly ConcurrentStack<string> _recentIds = new();
+    private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() {
+        SingleReader = true
+    });
+    private static CancellationTokenSource _cts = new();
+
+    /// <summary>
+    ///     Enqueue an action to be executed asynchronously
+    /// </summary>
+    /// <param name="id">Reproducible ID</param>
+    /// <param name="action">Action to execute</param>
+    /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns>
+    public virtual async Task<bool> EqueueActionAsync(string id, Func<Task> action) {
+        throw new NotImplementedException();
+    }
+
+    private async Task ProcessQueue() {
+        throw new NotImplementedException();
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+        while (!stoppingToken.IsCancellationRequested) {
+            await ProcessQueue();
+            Console.WriteLine("AbstractAsyncActionQueue waiting for new actions, this should never happen!");
+        }
+
+        //clear backlog and exit
+        await ProcessQueue();
+    }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs
new file mode 100644
index 0000000..3d7c90d
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs
@@ -0,0 +1,60 @@
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services.AsyncActionQueues;
+
+public class FiFoAsyncActionQueue(ILogger<FiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue {
+    // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new();
+    private readonly HashSet<string> _recentIds = new();
+    private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() {
+        SingleReader = true
+    });
+    private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions);
+
+    /// <summary>
+    ///     Enqueue an action to be executed asynchronously
+    /// </summary>
+    /// <param name="id">Reproducible ID</param>
+    /// <param name="action">Action to execute</param>
+    /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns>
+    public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) {
+        if (_recentIds.Contains(id)) {
+            logger.LogWarning("Duplicate action ID detected, ignoring action");
+            return false;
+        }
+        await _queue.Writer.WriteAsync(action);
+        _recentIds.Add(id);
+
+        if (_queue.Reader.Count > 100) {
+            logger.LogWarning("Action Queue is getting full, consider increasing the rate limit or exempting the bot!");
+        }
+
+        return true;
+    }
+
+    private async Task ProcessQueue() {
+        await foreach (var task in _queue.Reader.ReadAllAsync()) {
+            await _semaphore.WaitAsync();
+            _ = Task.Run(async () => {
+                try {
+                    await task.Invoke();
+                }
+                finally {
+                    _semaphore.Release();
+                }
+            });
+        }
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+        while (!stoppingToken.IsCancellationRequested) {
+            await ProcessQueue();
+            logger.LogWarning("Waiting for new actions, ProcessQueue returned early!");
+        }
+
+        //clear backlog and exit
+        await ProcessQueue();
+    }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs
new file mode 100644
index 0000000..631cc74
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs
@@ -0,0 +1,67 @@
+using System.Collections.Concurrent;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services.AsyncActionQueues;
+
+public class LiFoAsyncActionQueue(ILogger<LiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue {
+    // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new();
+    private readonly HashSet<string> _recentIds = new();
+    private readonly ConcurrentStack<(string Id, Func<Task> Action)> _queue = new();
+    private static CancellationTokenSource _cts = new();
+    private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions);
+
+    /// <summary>
+    ///     Enqueue an action to be executed asynchronously
+    /// </summary>
+    /// <param name="id">Reproducible ID</param>
+    /// <param name="action">Action to execute</param>
+    /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns>
+    public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) {
+        if (_recentIds.Contains(id)) {
+            logger.LogWarning("Duplicate action ID detected, ignoring action");
+            return false;
+        }
+
+        _queue.Push((id, action));
+        _recentIds.Add(id);
+        _cts.Cancel(false);
+
+        return true;
+    }
+
+    private async Task ProcessQueue() {
+        // await foreach (var task in _queue2.Reader.ReadAllAsync()) {
+        while (_queue.TryPop(out var task)) {
+            await _semaphore.WaitAsync();
+            _ = Task.Run(async () => {
+                try {
+                    await task.Action.Invoke();
+                    _recentIds.Remove(task.Id);
+                }
+                finally {
+                    _semaphore.Release();
+                }
+            });
+        }
+
+        _cts.Dispose();
+        _cts = new CancellationTokenSource();
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+        while (!stoppingToken.IsCancellationRequested) {
+            await ProcessQueue();
+            Console.WriteLine(GetType().Name + " waiting for new actions");
+            try {
+                await Task.Delay(10000, _cts.Token);
+            }
+            catch (TaskCanceledException) {
+                Console.WriteLine(GetType().Name + " _cts cancelled");
+                // ignore
+            }
+        }
+
+        //clear backlog and exit
+        await ProcessQueue();
+    }
+}
\ No newline at end of file