1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
using System.Diagnostics;
using ArcaneLibs;
using ArcaneLibs.Extensions;
using LibMatrix;
using LibMatrix.Filters;
using LibMatrix.Helpers;
using LibMatrix.Homeservers;
using LibMatrix.Responses;
using MatrixContentFilter.Abstractions;
using MatrixContentFilter.Handlers.Filters;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace MatrixContentFilter.Services;
public class BotModeSanityCheckService(
ILogger<BotModeSanityCheckService> logger,
AuthenticatedHomeserverGeneric hs,
ConfigurationService filterConfigService,
IEnumerable<IContentFilter> filters,
AsyncMessageQueue msgQueue
) : BackgroundService {
/// <summary>Triggered when the application host is ready to start the service.</summary>
/// <param name="cancellationToken">Indicates that the start process has been aborted.</param>
protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
while (!cancellationToken.IsCancellationRequested) {
await Task.Delay(10000, cancellationToken);
var rooms = await hs.GetJoinedRooms();
rooms.RemoveAll(x => x.RoomId == filterConfigService.LogRoom.RoomId);
rooms.RemoveAll(x => x.RoomId == filterConfigService.ControlRoom.RoomId);
var timelineFilter = new SyncFilter.RoomFilter.StateFilter(notTypes: ["m.room.redaction"], limit: 5000);
var timelines = rooms.Select(async x => {
var room = hs.GetRoom(x.RoomId);
// var sync = await room.GetMessagesAsync(null, 1500, filter: timelineFilter.ToJson(ignoreNull: true, indent: false).UrlEncode());
var iter = room.GetManyMessagesAsync(null, 5000, filter: timelineFilter.ToJson(ignoreNull: true, indent: false).UrlEncode(), chunkSize: 250);
await foreach (var sync in iter) {
var tasks = Parallel.ForEachAsync(filters, async (filter, ct) => {
try {
Console.WriteLine("Processing filter {0} (sanity check, chunk[s={1}])", filter.GetType().FullName, sync.Chunk.Count);
await filter.ProcessEventListAsync(sync.Chunk);
}
catch (Exception e) {
logger.LogError(e, "Error processing sync with filter {filter}", filter.GetType().FullName);
msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice")
.WithBody($"Error processing sync with filter {filter.GetType().FullName}: {e.Message}").Build());
}
});
await tasks;
}
}).ToList();
await Task.WhenAll(timelines);
}
}
}
|