diff options
Diffstat (limited to 'MatrixContentFilter/Services/AsyncActionQueues')
3 files changed, 165 insertions, 0 deletions
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs new file mode 100644 index 0000000..f4c559c --- /dev/null +++ b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs @@ -0,0 +1,38 @@ +using System.Collections.Concurrent; +using System.Threading.Channels; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services.AsyncActionQueues; + +public abstract class AbstractAsyncActionQueue : BackgroundService { + private readonly ConcurrentStack<string> _recentIds = new(); + private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() { + SingleReader = true + }); + private static CancellationTokenSource _cts = new(); + + /// <summary> + /// Enqueue an action to be executed asynchronously + /// </summary> + /// <param name="id">Reproducible ID</param> + /// <param name="action">Action to execute</param> + /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns> + public virtual async Task<bool> EqueueActionAsync(string id, Func<Task> action) { + throw new NotImplementedException(); + } + + private async Task ProcessQueue() { + throw new NotImplementedException(); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + while (!stoppingToken.IsCancellationRequested) { + await ProcessQueue(); + Console.WriteLine("AbstractAsyncActionQueue waiting for new actions, this should never happen!"); + } + + //clear backlog and exit + await ProcessQueue(); + } +} \ No newline at end of file diff --git a/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs new file mode 100644 index 0000000..3d7c90d --- /dev/null +++ b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs @@ -0,0 +1,60 @@ +using System.Collections.Concurrent; +using System.Threading.Channels; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services.AsyncActionQueues; + +public class FiFoAsyncActionQueue(ILogger<FiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue { + // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new(); + private readonly HashSet<string> _recentIds = new(); + private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() { + SingleReader = true + }); + private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions); + + /// <summary> + /// Enqueue an action to be executed asynchronously + /// </summary> + /// <param name="id">Reproducible ID</param> + /// <param name="action">Action to execute</param> + /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns> + public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) { + if (_recentIds.Contains(id)) { + logger.LogWarning("Duplicate action ID detected, ignoring action"); + return false; + } + await _queue.Writer.WriteAsync(action); + _recentIds.Add(id); + + if (_queue.Reader.Count > 100) { + logger.LogWarning("Action Queue is getting full, consider increasing the rate limit or exempting the bot!"); + } + + return true; + } + + private async Task ProcessQueue() { + await foreach (var task in _queue.Reader.ReadAllAsync()) { + await _semaphore.WaitAsync(); + _ = Task.Run(async () => { + try { + await task.Invoke(); + } + finally { + _semaphore.Release(); + } + }); + } + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + while (!stoppingToken.IsCancellationRequested) { + await ProcessQueue(); + logger.LogWarning("Waiting for new actions, ProcessQueue returned early!"); + } + + //clear backlog and exit + await ProcessQueue(); + } +} \ No newline at end of file diff --git a/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs new file mode 100644 index 0000000..631cc74 --- /dev/null +++ b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs @@ -0,0 +1,67 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services.AsyncActionQueues; + +public class LiFoAsyncActionQueue(ILogger<LiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue { + // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new(); + private readonly HashSet<string> _recentIds = new(); + private readonly ConcurrentStack<(string Id, Func<Task> Action)> _queue = new(); + private static CancellationTokenSource _cts = new(); + private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions); + + /// <summary> + /// Enqueue an action to be executed asynchronously + /// </summary> + /// <param name="id">Reproducible ID</param> + /// <param name="action">Action to execute</param> + /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns> + public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) { + if (_recentIds.Contains(id)) { + logger.LogWarning("Duplicate action ID detected, ignoring action"); + return false; + } + + _queue.Push((id, action)); + _recentIds.Add(id); + _cts.Cancel(false); + + return true; + } + + private async Task ProcessQueue() { + // await foreach (var task in _queue2.Reader.ReadAllAsync()) { + while (_queue.TryPop(out var task)) { + await _semaphore.WaitAsync(); + _ = Task.Run(async () => { + try { + await task.Action.Invoke(); + _recentIds.Remove(task.Id); + } + finally { + _semaphore.Release(); + } + }); + } + + _cts.Dispose(); + _cts = new CancellationTokenSource(); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + while (!stoppingToken.IsCancellationRequested) { + await ProcessQueue(); + Console.WriteLine(GetType().Name + " waiting for new actions"); + try { + await Task.Delay(10000, _cts.Token); + } + catch (TaskCanceledException) { + Console.WriteLine(GetType().Name + " _cts cancelled"); + // ignore + } + } + + //clear backlog and exit + await ProcessQueue(); + } +} \ No newline at end of file |