diff --git a/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs
new file mode 100644
index 0000000..f4c559c
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs
@@ -0,0 +1,38 @@
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services.AsyncActionQueues;
+
+public abstract class AbstractAsyncActionQueue : BackgroundService {
+ private readonly ConcurrentStack<string> _recentIds = new();
+ private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() {
+ SingleReader = true
+ });
+ private static CancellationTokenSource _cts = new();
+
+ /// <summary>
+ /// Enqueue an action to be executed asynchronously
+ /// </summary>
+ /// <param name="id">Reproducible ID</param>
+ /// <param name="action">Action to execute</param>
+ /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns>
+ public virtual async Task<bool> EqueueActionAsync(string id, Func<Task> action) {
+ throw new NotImplementedException();
+ }
+
+ private async Task ProcessQueue() {
+ throw new NotImplementedException();
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+ while (!stoppingToken.IsCancellationRequested) {
+ await ProcessQueue();
+ Console.WriteLine("AbstractAsyncActionQueue waiting for new actions, this should never happen!");
+ }
+
+ //clear backlog and exit
+ await ProcessQueue();
+ }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs
new file mode 100644
index 0000000..3d7c90d
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs
@@ -0,0 +1,60 @@
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services.AsyncActionQueues;
+
+public class FiFoAsyncActionQueue(ILogger<FiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue {
+ // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new();
+ private readonly HashSet<string> _recentIds = new();
+ private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() {
+ SingleReader = true
+ });
+ private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions);
+
+ /// <summary>
+ /// Enqueue an action to be executed asynchronously
+ /// </summary>
+ /// <param name="id">Reproducible ID</param>
+ /// <param name="action">Action to execute</param>
+ /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns>
+ public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) {
+ if (_recentIds.Contains(id)) {
+ logger.LogWarning("Duplicate action ID detected, ignoring action");
+ return false;
+ }
+ await _queue.Writer.WriteAsync(action);
+ _recentIds.Add(id);
+
+ if (_queue.Reader.Count > 100) {
+ logger.LogWarning("Action Queue is getting full, consider increasing the rate limit or exempting the bot!");
+ }
+
+ return true;
+ }
+
+ private async Task ProcessQueue() {
+ await foreach (var task in _queue.Reader.ReadAllAsync()) {
+ await _semaphore.WaitAsync();
+ _ = Task.Run(async () => {
+ try {
+ await task.Invoke();
+ }
+ finally {
+ _semaphore.Release();
+ }
+ });
+ }
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+ while (!stoppingToken.IsCancellationRequested) {
+ await ProcessQueue();
+ logger.LogWarning("Waiting for new actions, ProcessQueue returned early!");
+ }
+
+ //clear backlog and exit
+ await ProcessQueue();
+ }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs
new file mode 100644
index 0000000..631cc74
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs
@@ -0,0 +1,67 @@
+using System.Collections.Concurrent;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services.AsyncActionQueues;
+
+public class LiFoAsyncActionQueue(ILogger<LiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue {
+ // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new();
+ private readonly HashSet<string> _recentIds = new();
+ private readonly ConcurrentStack<(string Id, Func<Task> Action)> _queue = new();
+ private static CancellationTokenSource _cts = new();
+ private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions);
+
+ /// <summary>
+ /// Enqueue an action to be executed asynchronously
+ /// </summary>
+ /// <param name="id">Reproducible ID</param>
+ /// <param name="action">Action to execute</param>
+ /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns>
+ public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) {
+ if (_recentIds.Contains(id)) {
+ logger.LogWarning("Duplicate action ID detected, ignoring action");
+ return false;
+ }
+
+ _queue.Push((id, action));
+ _recentIds.Add(id);
+ _cts.Cancel(false);
+
+ return true;
+ }
+
+ private async Task ProcessQueue() {
+ // await foreach (var task in _queue2.Reader.ReadAllAsync()) {
+ while (_queue.TryPop(out var task)) {
+ await _semaphore.WaitAsync();
+ _ = Task.Run(async () => {
+ try {
+ await task.Action.Invoke();
+ _recentIds.Remove(task.Id);
+ }
+ finally {
+ _semaphore.Release();
+ }
+ });
+ }
+
+ _cts.Dispose();
+ _cts = new CancellationTokenSource();
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+ while (!stoppingToken.IsCancellationRequested) {
+ await ProcessQueue();
+ Console.WriteLine(GetType().Name + " waiting for new actions");
+ try {
+ await Task.Delay(10000, _cts.Token);
+ }
+ catch (TaskCanceledException) {
+ Console.WriteLine(GetType().Name + " _cts cancelled");
+ // ignore
+ }
+ }
+
+ //clear backlog and exit
+ await ProcessQueue();
+ }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/AsyncMessageQueue.cs b/MatrixContentFilter/Services/AsyncMessageQueue.cs
new file mode 100644
index 0000000..7912cc5
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncMessageQueue.cs
@@ -0,0 +1,43 @@
+using System.Collections.Concurrent;
+using LibMatrix.EventTypes.Spec;
+using LibMatrix.RoomTypes;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services;
+
+public class AsyncMessageQueue(ILogger<AsyncMessageQueue> logger, MatrixContentFilterConfiguration cfg) : BackgroundService {
+ private readonly ConcurrentQueue<(GenericRoom Room, RoomMessageEventContent Content)> _queue = new();
+ private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.LogMessages, cfg.ConcurrencyLimits.LogMessages);
+ public void EnqueueMessageAsync(GenericRoom room, RoomMessageEventContent content) {
+ _queue.Enqueue((room, content));
+
+ if (_queue.Count > 100) {
+ logger.LogWarning($"Message Queue is getting full (c={_queue.Count}), consider increasing the rate limit or exempting the bot!");
+ }
+ }
+
+ private async Task ProcessQueue() {
+ while (_queue.TryDequeue(out var message)) {
+ await _semaphore.WaitAsync();
+ _ = Task.Run(async () => {
+ try {
+ await message.Room.SendMessageEventAsync(message.Content);
+ }
+ finally {
+ _semaphore.Release();
+ }
+ });
+ }
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+ while (!stoppingToken.IsCancellationRequested) {
+ await ProcessQueue();
+ await Task.Delay(1000, stoppingToken);
+ }
+
+ //clear backlog and exit
+ await ProcessQueue();
+ }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/BotModeSanityCheckService.cs b/MatrixContentFilter/Services/BotModeSanityCheckService.cs
new file mode 100644
index 0000000..55fe9e8
--- /dev/null
+++ b/MatrixContentFilter/Services/BotModeSanityCheckService.cs
@@ -0,0 +1,56 @@
+using System.Diagnostics;
+using ArcaneLibs;
+using ArcaneLibs.Extensions;
+using LibMatrix;
+using LibMatrix.Filters;
+using LibMatrix.Helpers;
+using LibMatrix.Homeservers;
+using LibMatrix.Responses;
+using MatrixContentFilter.Abstractions;
+using MatrixContentFilter.Handlers.Filters;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services;
+
+public class BotModeSanityCheckService(
+ ILogger<BotModeSanityCheckService> logger,
+ AuthenticatedHomeserverGeneric hs,
+ ConfigurationService filterConfigService,
+ IEnumerable<IContentFilter> filters,
+ AsyncMessageQueue msgQueue
+) : BackgroundService {
+ /// <summary>Triggered when the application host is ready to start the service.</summary>
+ /// <param name="cancellationToken">Indicates that the start process has been aborted.</param>
+ protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
+ while (!cancellationToken.IsCancellationRequested) {
+ await Task.Delay(10000, cancellationToken);
+ var rooms = await hs.GetJoinedRooms();
+ rooms.RemoveAll(x => x.RoomId == filterConfigService.LogRoom.RoomId);
+ rooms.RemoveAll(x => x.RoomId == filterConfigService.ControlRoom.RoomId);
+
+ var timelineFilter = new SyncFilter.RoomFilter.StateFilter(notTypes: ["m.room.redaction"], limit: 5000);
+ var timelines = rooms.Select(async x => {
+ var room = hs.GetRoom(x.RoomId);
+ // var sync = await room.GetMessagesAsync(null, 1500, filter: timelineFilter.ToJson(ignoreNull: true, indent: false).UrlEncode());
+ var iter = room.GetManyMessagesAsync(null, 5000, filter: timelineFilter.ToJson(ignoreNull: true, indent: false).UrlEncode(), chunkSize: 250);
+ await foreach (var sync in iter) {
+ var tasks = Parallel.ForEachAsync(filters, async (filter, ct) => {
+ try {
+ Console.WriteLine("Processing filter {0} (sanity check, chunk[s={1}])", filter.GetType().FullName, sync.Chunk.Count);
+ await filter.ProcessEventListAsync(sync.Chunk);
+ }
+ catch (Exception e) {
+ logger.LogError(e, "Error processing sync with filter {filter}", filter.GetType().FullName);
+ msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice")
+ .WithBody($"Error processing sync with filter {filter.GetType().FullName}: {e.Message}").Build());
+ }
+ });
+
+ await tasks;
+ }
+ }).ToList();
+ await Task.WhenAll(timelines);
+ }
+ }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/ConfigurationService.cs b/MatrixContentFilter/Services/ConfigurationService.cs
new file mode 100644
index 0000000..f83c89a
--- /dev/null
+++ b/MatrixContentFilter/Services/ConfigurationService.cs
@@ -0,0 +1,198 @@
+using ArcaneLibs.Extensions;
+using LibMatrix;
+using LibMatrix.EventTypes.Spec.State;
+using LibMatrix.Helpers;
+using LibMatrix.Homeservers;
+using LibMatrix.Responses;
+using LibMatrix.RoomTypes;
+using LibMatrix.Utilities;
+using MatrixContentFilter.EventTypes;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services;
+
+public class ConfigurationService(ILogger<ConfigurationService> logger, AuthenticatedHomeserverGeneric hs, AsyncMessageQueue msgQueue) : BackgroundService {
+ public BotEnvironmentConfiguration EnvironmentConfiguration { get; private set; }
+ public FilterConfiguration DefaultConfiguration { get; private set; }
+ public Dictionary<string, FilterConfiguration> RoomConfigurationOverrides { get; } = new();
+ public Dictionary<string, FilterConfiguration> FinalRoomConfigurations { get; } = new();
+
+ public GenericRoom LogRoom { get; private set; } = null!;
+ public GenericRoom ControlRoom { get; private set; } = null!;
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+ var syncHelper = new SyncHelper(hs, logger) {
+ NamedFilterName = CommonSyncFilters.GetAccountDataWithRooms
+ };
+
+ await foreach (var sync in syncHelper.EnumerateSyncAsync(stoppingToken).WithCancellation(stoppingToken)) {
+ if (sync is { AccountData: null, Rooms: null }) continue;
+ logger.LogInformation("Received configuration update: {syncData}", sync.ToJson(ignoreNull: true));
+ await OnSyncReceived(sync);
+ }
+ }
+
+ public async Task OnSyncReceived(SyncResponse sync) {
+ if (sync.AccountData?.Events?.FirstOrDefault(x => x.Type == BotEnvironmentConfiguration.EventId) is { } envEvent) {
+ EnvironmentConfiguration = envEvent.TypedContent as BotEnvironmentConfiguration;
+ msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice")
+ .WithColoredBody("#FF0088", "Environment configuration updated from sync.").WithNewline()
+ .WithCollapsibleSection("JSON data:", msb => msb.WithCodeBlock(EnvironmentConfiguration.ToJson(), "json"))
+ // .WithCollapsibleSection("Full event JSON", _msb => _msb.WithCodeBlock(envEvent.ToJson(), "json"))
+ .Build());
+ LogRoom = hs.GetRoom(EnvironmentConfiguration.LogRoomId!);
+ ControlRoom = hs.GetRoom(EnvironmentConfiguration.ControlRoomId!);
+ }
+
+ if (sync.AccountData?.Events?.FirstOrDefault(x => x.Type == FilterConfiguration.EventId) is { } filterEvent) {
+ DefaultConfiguration = filterEvent.TypedContent as FilterConfiguration;
+ msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice")
+ .WithColoredBody("#00FF88", "Default filter configuration updated from sync.").WithNewline()
+ .WithCollapsibleSection("JSON data:", msb => msb.WithCodeBlock(DefaultConfiguration.ToJson(), "json"))
+ // .WithCollapsibleSection("Full event JSON", _msb => _msb.WithCodeBlock(filterEvent.ToJson(), "json"))
+ .Build());
+ }
+
+ await Parallel.ForEachAsync(sync.Rooms?.Join ?? [], async (syncRoom, ct) => {
+ var (roomId, roomData) = syncRoom;
+ if (roomId == LogRoom!.RoomId || roomId == ControlRoom!.RoomId) return;
+ var room = hs.GetRoom(roomId);
+
+ if (roomData.AccountData?.Events?.FirstOrDefault(x => x.Type == FilterConfiguration.EventId) is { } roomFilterEvent) {
+ RoomConfigurationOverrides[roomId] = roomFilterEvent.TypedContent as FilterConfiguration;
+ var roomName = await room.GetNameOrFallbackAsync();
+ msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice")
+ .WithColoredBody("#00FF88", msb => msb.WithBody($"Filter configuration updated for ").WithMention(roomId, roomName).WithBody(" from sync.")).WithNewline()
+ .WithCollapsibleSection("JSON data:", msb => msb.WithCodeBlock(RoomConfigurationOverrides[roomId].ToJson(), "json"))
+ .Build());
+ }
+ });
+ }
+
+ public async Task OnStartup(MatrixContentFilterConfiguration configuration) {
+ BotEnvironmentConfiguration _environmentConfiguration;
+ try {
+ _environmentConfiguration = await hs.GetAccountDataAsync<BotEnvironmentConfiguration>(BotEnvironmentConfiguration.EventId);
+ }
+ catch (MatrixException e) {
+ if (e is not { ErrorCode: MatrixException.ErrorCodes.M_NOT_FOUND }) throw;
+ logger.LogWarning("No environment configuration found, creating one");
+ _environmentConfiguration = new BotEnvironmentConfiguration();
+ }
+
+ if (string.IsNullOrWhiteSpace(_environmentConfiguration.ControlRoomId)) {
+ LogRoom = await hs.CreateRoom(new() {
+ Name = "MatrixContentFilter logs",
+ Invite = configuration.Admins,
+ Visibility = "private"
+ });
+ var powerlevels = await LogRoom.GetPowerLevelsAsync();
+ powerlevels.EventsDefault = 20;
+ foreach (var admin in configuration.Admins) {
+ powerlevels.Users[admin] = 100;
+ }
+
+ await LogRoom.SendStateEventAsync(RoomPowerLevelEventContent.EventId, powerlevels);
+
+ _environmentConfiguration.LogRoomId = LogRoom.RoomId;
+ await hs.SetAccountDataAsync(BotEnvironmentConfiguration.EventId, _environmentConfiguration);
+ }
+ else {
+ LogRoom = hs.GetRoom(_environmentConfiguration.LogRoomId!);
+ }
+
+ if (string.IsNullOrWhiteSpace(_environmentConfiguration.ControlRoomId)) {
+ ControlRoom = await hs.CreateRoom(new() {
+ Name = "MatrixContentFilter control room",
+ Invite = configuration.Admins,
+ Visibility = "private"
+ });
+ var powerlevels = await ControlRoom.GetPowerLevelsAsync();
+ powerlevels.EventsDefault = 20;
+ foreach (var admin in configuration.Admins) {
+ powerlevels.Users[admin] = 100;
+ }
+
+ await ControlRoom.SendStateEventAsync(RoomPowerLevelEventContent.EventId, powerlevels);
+
+ _environmentConfiguration.ControlRoomId = ControlRoom.RoomId;
+ await hs.SetAccountDataAsync(BotEnvironmentConfiguration.EventId, _environmentConfiguration);
+ }
+ else {
+ ControlRoom = hs.GetRoom(_environmentConfiguration.ControlRoomId!);
+ }
+
+ FilterConfiguration _filterConfiguration;
+ try {
+ _filterConfiguration = await hs.GetAccountDataAsync<FilterConfiguration>(FilterConfiguration.EventId);
+ }
+ catch (MatrixException e) {
+ if (e is not { ErrorCode: MatrixException.ErrorCodes.M_NOT_FOUND }) throw;
+ logger.LogWarning("No filter configuration found, creating one");
+ msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice").WithColoredBody("#FF0000", "No filter configuration found, creating one").Build());
+ _filterConfiguration = new FilterConfiguration();
+ }
+
+ Dictionary<string, object> changes = new();
+
+ T Log<T>(string key, T value) {
+ changes[key] = value;
+ return value;
+ }
+
+ _filterConfiguration.IgnoredUsers ??= Log("ignored_users", (List<string>) [
+ hs.WhoAmI.UserId,
+ .. configuration.Admins
+ ]);
+
+ _filterConfiguration.FileFilter ??= new();
+ _filterConfiguration.FileFilter.IgnoredUsers ??= Log("file_filter->ignored_users", (List<string>) []);
+ _filterConfiguration.FileFilter.Allowed ??= Log("file_filter->allowed", false);
+
+ _filterConfiguration.ImageFilter ??= new();
+ _filterConfiguration.ImageFilter.IgnoredUsers ??= Log("image_filter->ignored_users", (List<string>) []);
+ _filterConfiguration.ImageFilter.Allowed ??= Log("image_filter->allowed", false);
+
+ _filterConfiguration.VideoFilter ??= new();
+ _filterConfiguration.VideoFilter.IgnoredUsers ??= Log("video_filter->ignored_users", (List<string>) []);
+ _filterConfiguration.VideoFilter.Allowed ??= Log("video_filter->allowed", false);
+
+ _filterConfiguration.AudioFilter ??= new();
+ _filterConfiguration.AudioFilter.IgnoredUsers ??= Log("audio_filter->ignored_users", (List<string>) []);
+ _filterConfiguration.AudioFilter.Allowed ??= Log("audio_filter->allowed", false);
+
+ _filterConfiguration.UrlFilter ??= new();
+ _filterConfiguration.UrlFilter.IgnoredUsers ??= Log("url_filter->ignored_users", (List<string>) []);
+ _filterConfiguration.UrlFilter.Allowed ??= Log("url_filter->allowed", false);
+
+ if (changes.Count > 0) {
+ await hs.SetAccountDataAsync(FilterConfiguration.EventId, _filterConfiguration);
+ msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice").WithColoredBody("#FF0000", "Default filter configuration updated").WithNewline()
+ .WithTable(msb => {
+ msb = msb.WithTitle("Default configuration changes", 2);
+
+ foreach (var (key, value) in changes) {
+ var formattedValue = value switch {
+ List<string> list => string.Join(", ", list),
+ _ => value.ToString()
+ };
+ msb = msb.WithRow(rb => { rb.WithCell(key).WithCell(formattedValue ?? "formattedValue was null!"); });
+ }
+ }).Build());
+ }
+ }
+
+ private async Task RebuildRoomConfigurations(FilterConfiguration? defaultConfig, Dictionary<string, FilterConfiguration?>? roomConfigurations) {
+ defaultConfig ??= await hs.GetAccountDataAsync<FilterConfiguration>(FilterConfiguration.EventId);
+ }
+
+ public async Task<FilterConfiguration> GetFinalRoomConfiguration(string roomId) {
+ if (FinalRoomConfigurations.TryGetValue(roomId, out var config)) return config;
+ var roomConfig = RoomConfigurationOverrides.GetValueOrDefault(roomId);
+ var defaultConfig = DefaultConfiguration;
+
+ FinalRoomConfigurations[roomId] = config;
+ return config;
+ }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/InfoCacheService.cs b/MatrixContentFilter/Services/InfoCacheService.cs
new file mode 100644
index 0000000..974e873
--- /dev/null
+++ b/MatrixContentFilter/Services/InfoCacheService.cs
@@ -0,0 +1,29 @@
+using ArcaneLibs.Collections;
+using LibMatrix.EventTypes.Spec.State;
+using LibMatrix.Homeservers;
+
+namespace MatrixContentFilter.Services;
+
+public class InfoCacheService(AuthenticatedHomeserverGeneric hs) {
+ private static readonly ExpiringSemaphoreCache<string> DisplayNameCache = new();
+ public static readonly ExpiringSemaphoreCache<string> RoomNameCache = new();
+
+ public async Task<string> GetDisplayNameAsync(string roomId, string userId) =>
+ await DisplayNameCache.GetOrAdd($"{roomId}\t{userId}", async () => {
+ var room = hs.GetRoom(roomId);
+ var userState = await room.GetStateAsync<RoomMemberEventContent>(RoomMemberEventContent.EventId, userId);
+ if (!string.IsNullOrWhiteSpace(userState?.DisplayName)) return userState.DisplayName;
+
+ var user = await hs.GetProfileAsync(userId);
+ if (!string.IsNullOrWhiteSpace(user?.DisplayName)) return user.DisplayName;
+
+ return userId;
+ }, TimeSpan.FromMinutes(5));
+
+ public async Task<string> GetRoomNameAsync(string roomId) =>
+ await RoomNameCache.GetOrAdd(roomId, async () => {
+ var room = hs.GetRoom(roomId);
+ var name = await room.GetNameOrFallbackAsync();
+ return name;
+ }, TimeSpan.FromMinutes(30));
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/MatrixContentFilterBot.cs b/MatrixContentFilter/Services/MatrixContentFilterBot.cs
new file mode 100644
index 0000000..321cdd4
--- /dev/null
+++ b/MatrixContentFilter/Services/MatrixContentFilterBot.cs
@@ -0,0 +1,143 @@
+using System.Diagnostics;
+using ArcaneLibs;
+using ArcaneLibs.Extensions;
+using LibMatrix.Filters;
+using LibMatrix.Helpers;
+using LibMatrix.Homeservers;
+using LibMatrix.Responses;
+using MatrixContentFilter.Abstractions;
+using MatrixContentFilter.Handlers.Filters;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services;
+
+public class MatrixContentFilterBot(
+ ILogger<MatrixContentFilterBot> logger,
+ AuthenticatedHomeserverGeneric hs,
+ MatrixContentFilterConfiguration configuration,
+ ConfigurationService filterConfigService,
+ IEnumerable<IContentFilter> filters,
+ AsyncMessageQueue msgQueue
+) : BackgroundService {
+ /// <summary>Triggered when the application host is ready to start the service.</summary>
+ /// <param name="cancellationToken">Indicates that the start process has been aborted.</param>
+ protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
+ try {
+ await filterConfigService.OnStartup(configuration);
+ msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice").WithColoredBody("#00FF00", "Bot startup successful! Listening for events.")
+ .Build());
+ msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice").WithColoredBody("#00FF00", msb => {
+ msb = msb.WithBody("Inserted filters implementations (internal):").WithNewline();
+ foreach (var filter in filters) {
+ msb = msb.WithBody(filter.GetType().FullName).WithNewline();
+ }
+ }).Build());
+ }
+ catch (Exception e) {
+ logger.LogError(e, "Error on startup");
+ Environment.Exit(1); // We don't want to do a graceful shutdown if we can't start up
+ }
+
+ logger.LogInformation("Bot started!");
+ await Run(cancellationToken);
+ }
+
+ private SyncHelper syncHelper;
+
+ private async Task Run(CancellationToken cancellationToken) {
+ var syncFilter = new SyncFilter() {
+ Room = new() {
+ NotRooms = [filterConfigService.LogRoom.RoomId],
+ Timeline = new(notTypes: ["m.room.redaction"])
+ }
+ };
+ syncHelper = new SyncHelper(hs, logger) {
+ Filter = syncFilter
+ };
+ int i = 0;
+ await foreach (var sync in syncHelper.EnumerateSyncAsync(cancellationToken).WithCancellation(cancellationToken)) {
+ // if (i++ >= 100) {
+ // var sw = Stopwatch.StartNew();
+ // for (int gen = 0; gen < GC.MaxGeneration; gen++) {
+ // GC.Collect(gen, GCCollectionMode.Forced, true, true);
+ // }
+ // i = 0;
+ // msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice")
+ // .WithBody($"Garbage collection took {sw.ElapsedMilliseconds}ms")
+ // .Build());
+ // GC.
+ // }
+
+ // GC.TryStartNoGCRegion(1024 * 1024 * 1024);
+ var sw = Stopwatch.StartNew();
+ int actionCount = filters.Sum(x => x.ActionCount);
+ try {
+ await OnSyncReceived(sync);
+ }
+ catch (Exception e) {
+ logger.LogError(e, "Error processing sync");
+ msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice").WithBody($"Error processing sync: {e.Message}").Build());
+ }
+ finally {
+ Console.WriteLine("Processed sync in {0}, executed {1} actions, {2} of memory usage", sw.Elapsed, filters.Sum(x => x.ActionCount) - actionCount, Util.BytesToString(Environment.WorkingSet));
+ // GC.EndNoGCRegion();
+ }
+
+ // update sync filter
+ if (syncFilter.Room.NotRooms[0] != filterConfigService.LogRoom.RoomId) {
+ syncFilter.Room.NotRooms = [filterConfigService.LogRoom.RoomId];
+ syncHelper.Filter = syncFilter;
+ }
+ }
+ }
+
+ private int _syncCount;
+
+ private async Task OnSyncReceived(SyncResponse sync) {
+ if (_syncCount++ == 0) return; // Skip initial sync :/
+
+ if (sync.Rooms?.Join?.ContainsKey(filterConfigService.LogRoom.RoomId) == true) {
+ sync.Rooms?.Join?.Remove(filterConfigService.LogRoom.RoomId);
+ }
+
+ if (sync.Rooms?.Join?.ContainsKey(filterConfigService.ControlRoom.RoomId) == true) {
+ sync.Rooms?.Join?.Remove(filterConfigService.ControlRoom.RoomId);
+ }
+
+ // HACK: Server likes to send partial timelines during elevated activity, so we need to fetch them in order not to miss events
+ var timelineFilter = new SyncFilter.RoomFilter.StateFilter(notTypes: ["m.room.redaction"], limit: 5000);
+ var limitedTimelineRooms = sync.Rooms?.Join?
+ .Where(x => x.Value.Timeline?.Limited ?? false)
+ .Select(async x => {
+ var (roomId, roomData) = x;
+ var room = hs.GetRoom(roomId);
+ if (roomData.Timeline?.Limited == true) {
+ msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice")
+ .WithColoredBody("FF0000", $"Room {roomId} has limited timeline, fetching! The room may be getting spammed?")
+ .Build());
+ roomData.Timeline.Events ??= [];
+ var newEvents = await room.GetMessagesAsync(roomData.Timeline.PrevBatch ?? "", 500, filter: timelineFilter.ToJson(ignoreNull: true, indent: false));
+ roomData.Timeline.Events.MergeBy(newEvents.Chunk, (x, y) => x.EventId == y.EventId, (x, y) => { });
+ }
+ })
+ .ToList();
+
+ if (limitedTimelineRooms?.Count > 0)
+ await Task.WhenAll(limitedTimelineRooms);
+
+ var tasks = Parallel.ForEachAsync(filters, async (filter, ct) => {
+ try {
+ Console.WriteLine("Processing filter {0}", filter.GetType().FullName);
+ await filter.ProcessSyncAsync(sync);
+ }
+ catch (Exception e) {
+ logger.LogError(e, "Error processing sync with filter {filter}", filter.GetType().FullName);
+ msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice")
+ .WithBody($"Error processing sync with filter {filter.GetType().FullName}: {e.Message}").Build());
+ }
+ });
+
+ await tasks;
+ }
+}
\ No newline at end of file
|