diff options
author | Rory& <root@rory.gay> | 2024-10-04 19:51:44 +0200 |
---|---|---|
committer | Rory& <root@rory.gay> | 2024-10-04 19:51:44 +0200 |
commit | c8f7ef7c1d2bd705a5442c0dc591b8e5a50673a5 (patch) | |
tree | 9b951c6e2c120ec370ce8318238aadbdda880a89 /MatrixContentFilter/Services | |
download | MatrixContentFilter-master.tar.xz |
Diffstat (limited to 'MatrixContentFilter/Services')
8 files changed, 634 insertions, 0 deletions
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs new file mode 100644 index 0000000..f4c559c --- /dev/null +++ b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs @@ -0,0 +1,38 @@ +using System.Collections.Concurrent; +using System.Threading.Channels; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services.AsyncActionQueues; + +public abstract class AbstractAsyncActionQueue : BackgroundService { + private readonly ConcurrentStack<string> _recentIds = new(); + private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() { + SingleReader = true + }); + private static CancellationTokenSource _cts = new(); + + /// <summary> + /// Enqueue an action to be executed asynchronously + /// </summary> + /// <param name="id">Reproducible ID</param> + /// <param name="action">Action to execute</param> + /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns> + public virtual async Task<bool> EqueueActionAsync(string id, Func<Task> action) { + throw new NotImplementedException(); + } + + private async Task ProcessQueue() { + throw new NotImplementedException(); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + while (!stoppingToken.IsCancellationRequested) { + await ProcessQueue(); + Console.WriteLine("AbstractAsyncActionQueue waiting for new actions, this should never happen!"); + } + + //clear backlog and exit + await ProcessQueue(); + } +} \ No newline at end of file diff --git a/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs new file mode 100644 index 0000000..3d7c90d --- /dev/null +++ b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs @@ -0,0 +1,60 @@ +using System.Collections.Concurrent; +using System.Threading.Channels; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services.AsyncActionQueues; + +public class FiFoAsyncActionQueue(ILogger<FiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue { + // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new(); + private readonly HashSet<string> _recentIds = new(); + private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() { + SingleReader = true + }); + private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions); + + /// <summary> + /// Enqueue an action to be executed asynchronously + /// </summary> + /// <param name="id">Reproducible ID</param> + /// <param name="action">Action to execute</param> + /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns> + public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) { + if (_recentIds.Contains(id)) { + logger.LogWarning("Duplicate action ID detected, ignoring action"); + return false; + } + await _queue.Writer.WriteAsync(action); + _recentIds.Add(id); + + if (_queue.Reader.Count > 100) { + logger.LogWarning("Action Queue is getting full, consider increasing the rate limit or exempting the bot!"); + } + + return true; + } + + private async Task ProcessQueue() { + await foreach (var task in _queue.Reader.ReadAllAsync()) { + await _semaphore.WaitAsync(); + _ = Task.Run(async () => { + try { + await task.Invoke(); + } + finally { + _semaphore.Release(); + } + }); + } + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + while (!stoppingToken.IsCancellationRequested) { + await ProcessQueue(); + logger.LogWarning("Waiting for new actions, ProcessQueue returned early!"); + } + + //clear backlog and exit + await ProcessQueue(); + } +} \ No newline at end of file diff --git a/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs new file mode 100644 index 0000000..631cc74 --- /dev/null +++ b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs @@ -0,0 +1,67 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services.AsyncActionQueues; + +public class LiFoAsyncActionQueue(ILogger<LiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue { + // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new(); + private readonly HashSet<string> _recentIds = new(); + private readonly ConcurrentStack<(string Id, Func<Task> Action)> _queue = new(); + private static CancellationTokenSource _cts = new(); + private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions); + + /// <summary> + /// Enqueue an action to be executed asynchronously + /// </summary> + /// <param name="id">Reproducible ID</param> + /// <param name="action">Action to execute</param> + /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns> + public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) { + if (_recentIds.Contains(id)) { + logger.LogWarning("Duplicate action ID detected, ignoring action"); + return false; + } + + _queue.Push((id, action)); + _recentIds.Add(id); + _cts.Cancel(false); + + return true; + } + + private async Task ProcessQueue() { + // await foreach (var task in _queue2.Reader.ReadAllAsync()) { + while (_queue.TryPop(out var task)) { + await _semaphore.WaitAsync(); + _ = Task.Run(async () => { + try { + await task.Action.Invoke(); + _recentIds.Remove(task.Id); + } + finally { + _semaphore.Release(); + } + }); + } + + _cts.Dispose(); + _cts = new CancellationTokenSource(); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + while (!stoppingToken.IsCancellationRequested) { + await ProcessQueue(); + Console.WriteLine(GetType().Name + " waiting for new actions"); + try { + await Task.Delay(10000, _cts.Token); + } + catch (TaskCanceledException) { + Console.WriteLine(GetType().Name + " _cts cancelled"); + // ignore + } + } + + //clear backlog and exit + await ProcessQueue(); + } +} \ No newline at end of file diff --git a/MatrixContentFilter/Services/AsyncMessageQueue.cs b/MatrixContentFilter/Services/AsyncMessageQueue.cs new file mode 100644 index 0000000..7912cc5 --- /dev/null +++ b/MatrixContentFilter/Services/AsyncMessageQueue.cs @@ -0,0 +1,43 @@ +using System.Collections.Concurrent; +using LibMatrix.EventTypes.Spec; +using LibMatrix.RoomTypes; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services; + +public class AsyncMessageQueue(ILogger<AsyncMessageQueue> logger, MatrixContentFilterConfiguration cfg) : BackgroundService { + private readonly ConcurrentQueue<(GenericRoom Room, RoomMessageEventContent Content)> _queue = new(); + private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.LogMessages, cfg.ConcurrencyLimits.LogMessages); + public void EnqueueMessageAsync(GenericRoom room, RoomMessageEventContent content) { + _queue.Enqueue((room, content)); + + if (_queue.Count > 100) { + logger.LogWarning($"Message Queue is getting full (c={_queue.Count}), consider increasing the rate limit or exempting the bot!"); + } + } + + private async Task ProcessQueue() { + while (_queue.TryDequeue(out var message)) { + await _semaphore.WaitAsync(); + _ = Task.Run(async () => { + try { + await message.Room.SendMessageEventAsync(message.Content); + } + finally { + _semaphore.Release(); + } + }); + } + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + while (!stoppingToken.IsCancellationRequested) { + await ProcessQueue(); + await Task.Delay(1000, stoppingToken); + } + + //clear backlog and exit + await ProcessQueue(); + } +} \ No newline at end of file diff --git a/MatrixContentFilter/Services/BotModeSanityCheckService.cs b/MatrixContentFilter/Services/BotModeSanityCheckService.cs new file mode 100644 index 0000000..55fe9e8 --- /dev/null +++ b/MatrixContentFilter/Services/BotModeSanityCheckService.cs @@ -0,0 +1,56 @@ +using System.Diagnostics; +using ArcaneLibs; +using ArcaneLibs.Extensions; +using LibMatrix; +using LibMatrix.Filters; +using LibMatrix.Helpers; +using LibMatrix.Homeservers; +using LibMatrix.Responses; +using MatrixContentFilter.Abstractions; +using MatrixContentFilter.Handlers.Filters; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services; + +public class BotModeSanityCheckService( + ILogger<BotModeSanityCheckService> logger, + AuthenticatedHomeserverGeneric hs, + ConfigurationService filterConfigService, + IEnumerable<IContentFilter> filters, + AsyncMessageQueue msgQueue +) : BackgroundService { + /// <summary>Triggered when the application host is ready to start the service.</summary> + /// <param name="cancellationToken">Indicates that the start process has been aborted.</param> + protected override async Task ExecuteAsync(CancellationToken cancellationToken) { + while (!cancellationToken.IsCancellationRequested) { + await Task.Delay(10000, cancellationToken); + var rooms = await hs.GetJoinedRooms(); + rooms.RemoveAll(x => x.RoomId == filterConfigService.LogRoom.RoomId); + rooms.RemoveAll(x => x.RoomId == filterConfigService.ControlRoom.RoomId); + + var timelineFilter = new SyncFilter.RoomFilter.StateFilter(notTypes: ["m.room.redaction"], limit: 5000); + var timelines = rooms.Select(async x => { + var room = hs.GetRoom(x.RoomId); + // var sync = await room.GetMessagesAsync(null, 1500, filter: timelineFilter.ToJson(ignoreNull: true, indent: false).UrlEncode()); + var iter = room.GetManyMessagesAsync(null, 5000, filter: timelineFilter.ToJson(ignoreNull: true, indent: false).UrlEncode(), chunkSize: 250); + await foreach (var sync in iter) { + var tasks = Parallel.ForEachAsync(filters, async (filter, ct) => { + try { + Console.WriteLine("Processing filter {0} (sanity check, chunk[s={1}])", filter.GetType().FullName, sync.Chunk.Count); + await filter.ProcessEventListAsync(sync.Chunk); + } + catch (Exception e) { + logger.LogError(e, "Error processing sync with filter {filter}", filter.GetType().FullName); + msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice") + .WithBody($"Error processing sync with filter {filter.GetType().FullName}: {e.Message}").Build()); + } + }); + + await tasks; + } + }).ToList(); + await Task.WhenAll(timelines); + } + } +} \ No newline at end of file diff --git a/MatrixContentFilter/Services/ConfigurationService.cs b/MatrixContentFilter/Services/ConfigurationService.cs new file mode 100644 index 0000000..f83c89a --- /dev/null +++ b/MatrixContentFilter/Services/ConfigurationService.cs @@ -0,0 +1,198 @@ +using ArcaneLibs.Extensions; +using LibMatrix; +using LibMatrix.EventTypes.Spec.State; +using LibMatrix.Helpers; +using LibMatrix.Homeservers; +using LibMatrix.Responses; +using LibMatrix.RoomTypes; +using LibMatrix.Utilities; +using MatrixContentFilter.EventTypes; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services; + +public class ConfigurationService(ILogger<ConfigurationService> logger, AuthenticatedHomeserverGeneric hs, AsyncMessageQueue msgQueue) : BackgroundService { + public BotEnvironmentConfiguration EnvironmentConfiguration { get; private set; } + public FilterConfiguration DefaultConfiguration { get; private set; } + public Dictionary<string, FilterConfiguration> RoomConfigurationOverrides { get; } = new(); + public Dictionary<string, FilterConfiguration> FinalRoomConfigurations { get; } = new(); + + public GenericRoom LogRoom { get; private set; } = null!; + public GenericRoom ControlRoom { get; private set; } = null!; + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + var syncHelper = new SyncHelper(hs, logger) { + NamedFilterName = CommonSyncFilters.GetAccountDataWithRooms + }; + + await foreach (var sync in syncHelper.EnumerateSyncAsync(stoppingToken).WithCancellation(stoppingToken)) { + if (sync is { AccountData: null, Rooms: null }) continue; + logger.LogInformation("Received configuration update: {syncData}", sync.ToJson(ignoreNull: true)); + await OnSyncReceived(sync); + } + } + + public async Task OnSyncReceived(SyncResponse sync) { + if (sync.AccountData?.Events?.FirstOrDefault(x => x.Type == BotEnvironmentConfiguration.EventId) is { } envEvent) { + EnvironmentConfiguration = envEvent.TypedContent as BotEnvironmentConfiguration; + msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice") + .WithColoredBody("#FF0088", "Environment configuration updated from sync.").WithNewline() + .WithCollapsibleSection("JSON data:", msb => msb.WithCodeBlock(EnvironmentConfiguration.ToJson(), "json")) + // .WithCollapsibleSection("Full event JSON", _msb => _msb.WithCodeBlock(envEvent.ToJson(), "json")) + .Build()); + LogRoom = hs.GetRoom(EnvironmentConfiguration.LogRoomId!); + ControlRoom = hs.GetRoom(EnvironmentConfiguration.ControlRoomId!); + } + + if (sync.AccountData?.Events?.FirstOrDefault(x => x.Type == FilterConfiguration.EventId) is { } filterEvent) { + DefaultConfiguration = filterEvent.TypedContent as FilterConfiguration; + msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice") + .WithColoredBody("#00FF88", "Default filter configuration updated from sync.").WithNewline() + .WithCollapsibleSection("JSON data:", msb => msb.WithCodeBlock(DefaultConfiguration.ToJson(), "json")) + // .WithCollapsibleSection("Full event JSON", _msb => _msb.WithCodeBlock(filterEvent.ToJson(), "json")) + .Build()); + } + + await Parallel.ForEachAsync(sync.Rooms?.Join ?? [], async (syncRoom, ct) => { + var (roomId, roomData) = syncRoom; + if (roomId == LogRoom!.RoomId || roomId == ControlRoom!.RoomId) return; + var room = hs.GetRoom(roomId); + + if (roomData.AccountData?.Events?.FirstOrDefault(x => x.Type == FilterConfiguration.EventId) is { } roomFilterEvent) { + RoomConfigurationOverrides[roomId] = roomFilterEvent.TypedContent as FilterConfiguration; + var roomName = await room.GetNameOrFallbackAsync(); + msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice") + .WithColoredBody("#00FF88", msb => msb.WithBody($"Filter configuration updated for ").WithMention(roomId, roomName).WithBody(" from sync.")).WithNewline() + .WithCollapsibleSection("JSON data:", msb => msb.WithCodeBlock(RoomConfigurationOverrides[roomId].ToJson(), "json")) + .Build()); + } + }); + } + + public async Task OnStartup(MatrixContentFilterConfiguration configuration) { + BotEnvironmentConfiguration _environmentConfiguration; + try { + _environmentConfiguration = await hs.GetAccountDataAsync<BotEnvironmentConfiguration>(BotEnvironmentConfiguration.EventId); + } + catch (MatrixException e) { + if (e is not { ErrorCode: MatrixException.ErrorCodes.M_NOT_FOUND }) throw; + logger.LogWarning("No environment configuration found, creating one"); + _environmentConfiguration = new BotEnvironmentConfiguration(); + } + + if (string.IsNullOrWhiteSpace(_environmentConfiguration.ControlRoomId)) { + LogRoom = await hs.CreateRoom(new() { + Name = "MatrixContentFilter logs", + Invite = configuration.Admins, + Visibility = "private" + }); + var powerlevels = await LogRoom.GetPowerLevelsAsync(); + powerlevels.EventsDefault = 20; + foreach (var admin in configuration.Admins) { + powerlevels.Users[admin] = 100; + } + + await LogRoom.SendStateEventAsync(RoomPowerLevelEventContent.EventId, powerlevels); + + _environmentConfiguration.LogRoomId = LogRoom.RoomId; + await hs.SetAccountDataAsync(BotEnvironmentConfiguration.EventId, _environmentConfiguration); + } + else { + LogRoom = hs.GetRoom(_environmentConfiguration.LogRoomId!); + } + + if (string.IsNullOrWhiteSpace(_environmentConfiguration.ControlRoomId)) { + ControlRoom = await hs.CreateRoom(new() { + Name = "MatrixContentFilter control room", + Invite = configuration.Admins, + Visibility = "private" + }); + var powerlevels = await ControlRoom.GetPowerLevelsAsync(); + powerlevels.EventsDefault = 20; + foreach (var admin in configuration.Admins) { + powerlevels.Users[admin] = 100; + } + + await ControlRoom.SendStateEventAsync(RoomPowerLevelEventContent.EventId, powerlevels); + + _environmentConfiguration.ControlRoomId = ControlRoom.RoomId; + await hs.SetAccountDataAsync(BotEnvironmentConfiguration.EventId, _environmentConfiguration); + } + else { + ControlRoom = hs.GetRoom(_environmentConfiguration.ControlRoomId!); + } + + FilterConfiguration _filterConfiguration; + try { + _filterConfiguration = await hs.GetAccountDataAsync<FilterConfiguration>(FilterConfiguration.EventId); + } + catch (MatrixException e) { + if (e is not { ErrorCode: MatrixException.ErrorCodes.M_NOT_FOUND }) throw; + logger.LogWarning("No filter configuration found, creating one"); + msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice").WithColoredBody("#FF0000", "No filter configuration found, creating one").Build()); + _filterConfiguration = new FilterConfiguration(); + } + + Dictionary<string, object> changes = new(); + + T Log<T>(string key, T value) { + changes[key] = value; + return value; + } + + _filterConfiguration.IgnoredUsers ??= Log("ignored_users", (List<string>) [ + hs.WhoAmI.UserId, + .. configuration.Admins + ]); + + _filterConfiguration.FileFilter ??= new(); + _filterConfiguration.FileFilter.IgnoredUsers ??= Log("file_filter->ignored_users", (List<string>) []); + _filterConfiguration.FileFilter.Allowed ??= Log("file_filter->allowed", false); + + _filterConfiguration.ImageFilter ??= new(); + _filterConfiguration.ImageFilter.IgnoredUsers ??= Log("image_filter->ignored_users", (List<string>) []); + _filterConfiguration.ImageFilter.Allowed ??= Log("image_filter->allowed", false); + + _filterConfiguration.VideoFilter ??= new(); + _filterConfiguration.VideoFilter.IgnoredUsers ??= Log("video_filter->ignored_users", (List<string>) []); + _filterConfiguration.VideoFilter.Allowed ??= Log("video_filter->allowed", false); + + _filterConfiguration.AudioFilter ??= new(); + _filterConfiguration.AudioFilter.IgnoredUsers ??= Log("audio_filter->ignored_users", (List<string>) []); + _filterConfiguration.AudioFilter.Allowed ??= Log("audio_filter->allowed", false); + + _filterConfiguration.UrlFilter ??= new(); + _filterConfiguration.UrlFilter.IgnoredUsers ??= Log("url_filter->ignored_users", (List<string>) []); + _filterConfiguration.UrlFilter.Allowed ??= Log("url_filter->allowed", false); + + if (changes.Count > 0) { + await hs.SetAccountDataAsync(FilterConfiguration.EventId, _filterConfiguration); + msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice").WithColoredBody("#FF0000", "Default filter configuration updated").WithNewline() + .WithTable(msb => { + msb = msb.WithTitle("Default configuration changes", 2); + + foreach (var (key, value) in changes) { + var formattedValue = value switch { + List<string> list => string.Join(", ", list), + _ => value.ToString() + }; + msb = msb.WithRow(rb => { rb.WithCell(key).WithCell(formattedValue ?? "formattedValue was null!"); }); + } + }).Build()); + } + } + + private async Task RebuildRoomConfigurations(FilterConfiguration? defaultConfig, Dictionary<string, FilterConfiguration?>? roomConfigurations) { + defaultConfig ??= await hs.GetAccountDataAsync<FilterConfiguration>(FilterConfiguration.EventId); + } + + public async Task<FilterConfiguration> GetFinalRoomConfiguration(string roomId) { + if (FinalRoomConfigurations.TryGetValue(roomId, out var config)) return config; + var roomConfig = RoomConfigurationOverrides.GetValueOrDefault(roomId); + var defaultConfig = DefaultConfiguration; + + FinalRoomConfigurations[roomId] = config; + return config; + } +} \ No newline at end of file diff --git a/MatrixContentFilter/Services/InfoCacheService.cs b/MatrixContentFilter/Services/InfoCacheService.cs new file mode 100644 index 0000000..974e873 --- /dev/null +++ b/MatrixContentFilter/Services/InfoCacheService.cs @@ -0,0 +1,29 @@ +using ArcaneLibs.Collections; +using LibMatrix.EventTypes.Spec.State; +using LibMatrix.Homeservers; + +namespace MatrixContentFilter.Services; + +public class InfoCacheService(AuthenticatedHomeserverGeneric hs) { + private static readonly ExpiringSemaphoreCache<string> DisplayNameCache = new(); + public static readonly ExpiringSemaphoreCache<string> RoomNameCache = new(); + + public async Task<string> GetDisplayNameAsync(string roomId, string userId) => + await DisplayNameCache.GetOrAdd($"{roomId}\t{userId}", async () => { + var room = hs.GetRoom(roomId); + var userState = await room.GetStateAsync<RoomMemberEventContent>(RoomMemberEventContent.EventId, userId); + if (!string.IsNullOrWhiteSpace(userState?.DisplayName)) return userState.DisplayName; + + var user = await hs.GetProfileAsync(userId); + if (!string.IsNullOrWhiteSpace(user?.DisplayName)) return user.DisplayName; + + return userId; + }, TimeSpan.FromMinutes(5)); + + public async Task<string> GetRoomNameAsync(string roomId) => + await RoomNameCache.GetOrAdd(roomId, async () => { + var room = hs.GetRoom(roomId); + var name = await room.GetNameOrFallbackAsync(); + return name; + }, TimeSpan.FromMinutes(30)); +} \ No newline at end of file diff --git a/MatrixContentFilter/Services/MatrixContentFilterBot.cs b/MatrixContentFilter/Services/MatrixContentFilterBot.cs new file mode 100644 index 0000000..321cdd4 --- /dev/null +++ b/MatrixContentFilter/Services/MatrixContentFilterBot.cs @@ -0,0 +1,143 @@ +using System.Diagnostics; +using ArcaneLibs; +using ArcaneLibs.Extensions; +using LibMatrix.Filters; +using LibMatrix.Helpers; +using LibMatrix.Homeservers; +using LibMatrix.Responses; +using MatrixContentFilter.Abstractions; +using MatrixContentFilter.Handlers.Filters; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MatrixContentFilter.Services; + +public class MatrixContentFilterBot( + ILogger<MatrixContentFilterBot> logger, + AuthenticatedHomeserverGeneric hs, + MatrixContentFilterConfiguration configuration, + ConfigurationService filterConfigService, + IEnumerable<IContentFilter> filters, + AsyncMessageQueue msgQueue +) : BackgroundService { + /// <summary>Triggered when the application host is ready to start the service.</summary> + /// <param name="cancellationToken">Indicates that the start process has been aborted.</param> + protected override async Task ExecuteAsync(CancellationToken cancellationToken) { + try { + await filterConfigService.OnStartup(configuration); + msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice").WithColoredBody("#00FF00", "Bot startup successful! Listening for events.") + .Build()); + msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice").WithColoredBody("#00FF00", msb => { + msb = msb.WithBody("Inserted filters implementations (internal):").WithNewline(); + foreach (var filter in filters) { + msb = msb.WithBody(filter.GetType().FullName).WithNewline(); + } + }).Build()); + } + catch (Exception e) { + logger.LogError(e, "Error on startup"); + Environment.Exit(1); // We don't want to do a graceful shutdown if we can't start up + } + + logger.LogInformation("Bot started!"); + await Run(cancellationToken); + } + + private SyncHelper syncHelper; + + private async Task Run(CancellationToken cancellationToken) { + var syncFilter = new SyncFilter() { + Room = new() { + NotRooms = [filterConfigService.LogRoom.RoomId], + Timeline = new(notTypes: ["m.room.redaction"]) + } + }; + syncHelper = new SyncHelper(hs, logger) { + Filter = syncFilter + }; + int i = 0; + await foreach (var sync in syncHelper.EnumerateSyncAsync(cancellationToken).WithCancellation(cancellationToken)) { + // if (i++ >= 100) { + // var sw = Stopwatch.StartNew(); + // for (int gen = 0; gen < GC.MaxGeneration; gen++) { + // GC.Collect(gen, GCCollectionMode.Forced, true, true); + // } + // i = 0; + // msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice") + // .WithBody($"Garbage collection took {sw.ElapsedMilliseconds}ms") + // .Build()); + // GC. + // } + + // GC.TryStartNoGCRegion(1024 * 1024 * 1024); + var sw = Stopwatch.StartNew(); + int actionCount = filters.Sum(x => x.ActionCount); + try { + await OnSyncReceived(sync); + } + catch (Exception e) { + logger.LogError(e, "Error processing sync"); + msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice").WithBody($"Error processing sync: {e.Message}").Build()); + } + finally { + Console.WriteLine("Processed sync in {0}, executed {1} actions, {2} of memory usage", sw.Elapsed, filters.Sum(x => x.ActionCount) - actionCount, Util.BytesToString(Environment.WorkingSet)); + // GC.EndNoGCRegion(); + } + + // update sync filter + if (syncFilter.Room.NotRooms[0] != filterConfigService.LogRoom.RoomId) { + syncFilter.Room.NotRooms = [filterConfigService.LogRoom.RoomId]; + syncHelper.Filter = syncFilter; + } + } + } + + private int _syncCount; + + private async Task OnSyncReceived(SyncResponse sync) { + if (_syncCount++ == 0) return; // Skip initial sync :/ + + if (sync.Rooms?.Join?.ContainsKey(filterConfigService.LogRoom.RoomId) == true) { + sync.Rooms?.Join?.Remove(filterConfigService.LogRoom.RoomId); + } + + if (sync.Rooms?.Join?.ContainsKey(filterConfigService.ControlRoom.RoomId) == true) { + sync.Rooms?.Join?.Remove(filterConfigService.ControlRoom.RoomId); + } + + // HACK: Server likes to send partial timelines during elevated activity, so we need to fetch them in order not to miss events + var timelineFilter = new SyncFilter.RoomFilter.StateFilter(notTypes: ["m.room.redaction"], limit: 5000); + var limitedTimelineRooms = sync.Rooms?.Join? + .Where(x => x.Value.Timeline?.Limited ?? false) + .Select(async x => { + var (roomId, roomData) = x; + var room = hs.GetRoom(roomId); + if (roomData.Timeline?.Limited == true) { + msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice") + .WithColoredBody("FF0000", $"Room {roomId} has limited timeline, fetching! The room may be getting spammed?") + .Build()); + roomData.Timeline.Events ??= []; + var newEvents = await room.GetMessagesAsync(roomData.Timeline.PrevBatch ?? "", 500, filter: timelineFilter.ToJson(ignoreNull: true, indent: false)); + roomData.Timeline.Events.MergeBy(newEvents.Chunk, (x, y) => x.EventId == y.EventId, (x, y) => { }); + } + }) + .ToList(); + + if (limitedTimelineRooms?.Count > 0) + await Task.WhenAll(limitedTimelineRooms); + + var tasks = Parallel.ForEachAsync(filters, async (filter, ct) => { + try { + Console.WriteLine("Processing filter {0}", filter.GetType().FullName); + await filter.ProcessSyncAsync(sync); + } + catch (Exception e) { + logger.LogError(e, "Error processing sync with filter {filter}", filter.GetType().FullName); + msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice") + .WithBody($"Error processing sync with filter {filter.GetType().FullName}: {e.Message}").Build()); + } + }); + + await tasks; + } +} \ No newline at end of file |