using System.Collections.Concurrent;
using LibMatrix.EventTypes.Spec;
using LibMatrix.RoomTypes;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace MatrixContentFilter.Services;
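
/// <summary>
/// Background service that queues outgoing log messages and sends them with a
/// bounded degree of concurrency, so the bot stays within the homeserver's rate limits.
/// </summary>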
public class AsyncMessageQueue(ILogger<AsyncMessageQueue> logger, MatrixContentFilterConfiguration cfg) : BackgroundService {
    private readonly ConcurrentQueue<(GenericRoom Room, RoomMessageEventContent Content)> _queue = new();
    // Limits how many messages are in flight at once, per the configured concurrency limit.
    private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.LogMessages, cfg.ConcurrencyLimits.LogMessages);

    // Fire-and-forget enqueue; the background loop picks the message up and sends it later.
    public void EnqueueMessageAsync(GenericRoom room, RoomMessageEventContent content) {
        _queue.Enqueue((room, content));
        if (_queue.Count > 100) {
            logger.LogWarning("Message queue is getting full (c={Count}), consider increasing the rate limit or exempting the bot!", _queue.Count);
        }
    }
    private async Task ProcessQueue() {
        // Drain everything currently queued, bounded by the semaphore.
        while (_queue.TryDequeue(out var message)) {
            await _semaphore.WaitAsync();
            _ = Task.Run(async () => {
                try {
                    await message.Room.SendMessageEventAsync(message.Content);
                }
                catch (Exception e) {
                    // Don't let a single failed send go unobserved or tear down the loop.
                    logger.LogError(e, "Failed to send queued message to {RoomId}", message.Room.RoomId);
                }
                finally {
                    _semaphore.Release();
                }
            });
        }
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
        while (!stoppingToken.IsCancellationRequested) {
            await ProcessQueue();
            try {
                await Task.Delay(1000, stoppingToken);
            }
            catch (OperationCanceledException) {
                // Shutdown requested; fall through so the remaining backlog is still drained.
            }
        }

        // Clear any remaining backlog before exiting.
        await ProcessQueue();
    }
}