about summary refs log tree commit diff
path: root/MatrixContentFilter/Services/AsyncMessageQueue.cs
blob: 7912cc523be64f24ef089bc09d36d89d8b0a1275 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
using System.Collections.Concurrent;
using LibMatrix.EventTypes.Spec;
using LibMatrix.RoomTypes;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace MatrixContentFilter.Services;

public class AsyncMessageQueue(ILogger<AsyncMessageQueue> logger, MatrixContentFilterConfiguration cfg) : BackgroundService {
    private readonly ConcurrentQueue<(GenericRoom Room, RoomMessageEventContent Content)> _queue = new();
    private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.LogMessages, cfg.ConcurrencyLimits.LogMessages);
    public void EnqueueMessageAsync(GenericRoom room, RoomMessageEventContent content) {
        _queue.Enqueue((room, content));
        
        if (_queue.Count > 100) {
            logger.LogWarning($"Message Queue is getting full (c={_queue.Count}), consider increasing the rate limit or exempting the bot!");
        }
    }
    
    private async Task ProcessQueue() {
        while (_queue.TryDequeue(out var message)) {
            await _semaphore.WaitAsync();
            _ = Task.Run(async () => {
                try {
                    await message.Room.SendMessageEventAsync(message.Content);
                }
                finally {
                    _semaphore.Release();
                }
            });
        }
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
        while (!stoppingToken.IsCancellationRequested) {
            await ProcessQueue();
            await Task.Delay(1000, stoppingToken);
        }
        
        //clear backlog and exit
        await ProcessQueue();
    }
}