about summary refs log tree commit diff
path: root/MatrixContentFilter/Services
diff options
context:
space:
mode:
authorRory& <root@rory.gay>2024-10-04 19:51:44 +0200
committerRory& <root@rory.gay>2024-10-04 19:51:44 +0200
commitc8f7ef7c1d2bd705a5442c0dc591b8e5a50673a5 (patch)
tree9b951c6e2c120ec370ce8318238aadbdda880a89 /MatrixContentFilter/Services
downloadMatrixContentFilter-master.tar.xz
Initial commit HEAD master
Diffstat (limited to 'MatrixContentFilter/Services')
-rw-r--r--MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs38
-rw-r--r--MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs60
-rw-r--r--MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs67
-rw-r--r--MatrixContentFilter/Services/AsyncMessageQueue.cs43
-rw-r--r--MatrixContentFilter/Services/BotModeSanityCheckService.cs56
-rw-r--r--MatrixContentFilter/Services/ConfigurationService.cs198
-rw-r--r--MatrixContentFilter/Services/InfoCacheService.cs29
-rw-r--r--MatrixContentFilter/Services/MatrixContentFilterBot.cs143
8 files changed, 634 insertions, 0 deletions
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs
new file mode 100644
index 0000000..f4c559c
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncActionQueues/AbstractionAsyncActionQueue.cs
@@ -0,0 +1,38 @@
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services.AsyncActionQueues;
+
+public abstract class AbstractAsyncActionQueue : BackgroundService {
+    private readonly ConcurrentStack<string> _recentIds = new();
+    private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() {
+        SingleReader = true
+    });
+    private static CancellationTokenSource _cts = new();
+
+    /// <summary>
+    ///     Enqueue an action to be executed asynchronously
+    /// </summary>
+    /// <param name="id">Reproducible ID</param>
+    /// <param name="action">Action to execute</param>
+    /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns>
+    public virtual async Task<bool> EqueueActionAsync(string id, Func<Task> action) {
+        throw new NotImplementedException();
+    }
+
+    private async Task ProcessQueue() {
+        throw new NotImplementedException();
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+        while (!stoppingToken.IsCancellationRequested) {
+            await ProcessQueue();
+            Console.WriteLine("AbstractAsyncActionQueue waiting for new actions, this should never happen!");
+        }
+
+        //clear backlog and exit
+        await ProcessQueue();
+    }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs
new file mode 100644
index 0000000..3d7c90d
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncActionQueues/FiFoAsyncActionQueue.cs
@@ -0,0 +1,60 @@
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services.AsyncActionQueues;
+
+public class FiFoAsyncActionQueue(ILogger<FiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue {
+    // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new();
+    private readonly HashSet<string> _recentIds = new();
+    private readonly Channel<Func<Task>> _queue = Channel.CreateUnbounded<Func<Task>>(new UnboundedChannelOptions() {
+        SingleReader = true
+    });
+    private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions);
+
+    /// <summary>
+    ///     Enqueue an action to be executed asynchronously
+    /// </summary>
+    /// <param name="id">Reproducible ID</param>
+    /// <param name="action">Action to execute</param>
+    /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns>
+    public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) {
+        if (_recentIds.Contains(id)) {
+            logger.LogWarning("Duplicate action ID detected, ignoring action");
+            return false;
+        }
+        await _queue.Writer.WriteAsync(action);
+        _recentIds.Add(id);
+
+        if (_queue.Reader.Count > 100) {
+            logger.LogWarning("Action Queue is getting full, consider increasing the rate limit or exempting the bot!");
+        }
+
+        return true;
+    }
+
+    private async Task ProcessQueue() {
+        await foreach (var task in _queue.Reader.ReadAllAsync()) {
+            await _semaphore.WaitAsync();
+            _ = Task.Run(async () => {
+                try {
+                    await task.Invoke();
+                }
+                finally {
+                    _semaphore.Release();
+                }
+            });
+        }
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+        while (!stoppingToken.IsCancellationRequested) {
+            await ProcessQueue();
+            logger.LogWarning("Waiting for new actions, ProcessQueue returned early!");
+        }
+
+        //clear backlog and exit
+        await ProcessQueue();
+    }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs
new file mode 100644
index 0000000..631cc74
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncActionQueues/LiFoAsyncActionQueue.cs
@@ -0,0 +1,67 @@
+using System.Collections.Concurrent;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services.AsyncActionQueues;
+
+public class LiFoAsyncActionQueue(ILogger<LiFoAsyncActionQueue> logger, MatrixContentFilterConfiguration cfg) : AbstractAsyncActionQueue {
+    // private readonly ConcurrentQueue<(string Id, Func<Task> Action)> _queue = new();
+    private readonly HashSet<string> _recentIds = new();
+    private readonly ConcurrentStack<(string Id, Func<Task> Action)> _queue = new();
+    private static CancellationTokenSource _cts = new();
+    private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.Redactions, cfg.ConcurrencyLimits.Redactions);
+
+    /// <summary>
+    ///     Enqueue an action to be executed asynchronously
+    /// </summary>
+    /// <param name="id">Reproducible ID</param>
+    /// <param name="action">Action to execute</param>
+    /// <returns>`true` if action was appended, `false` if action was not added, eg. due to duplicate ID</returns>
+    public override async Task<bool> EqueueActionAsync(string id, Func<Task> action) {
+        if (_recentIds.Contains(id)) {
+            logger.LogWarning("Duplicate action ID detected, ignoring action");
+            return false;
+        }
+
+        _queue.Push((id, action));
+        _recentIds.Add(id);
+        _cts.Cancel(false);
+
+        return true;
+    }
+
+    private async Task ProcessQueue() {
+        // await foreach (var task in _queue2.Reader.ReadAllAsync()) {
+        while (_queue.TryPop(out var task)) {
+            await _semaphore.WaitAsync();
+            _ = Task.Run(async () => {
+                try {
+                    await task.Action.Invoke();
+                    _recentIds.Remove(task.Id);
+                }
+                finally {
+                    _semaphore.Release();
+                }
+            });
+        }
+
+        _cts.Dispose();
+        _cts = new CancellationTokenSource();
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+        while (!stoppingToken.IsCancellationRequested) {
+            await ProcessQueue();
+            Console.WriteLine(GetType().Name + " waiting for new actions");
+            try {
+                await Task.Delay(10000, _cts.Token);
+            }
+            catch (TaskCanceledException) {
+                Console.WriteLine(GetType().Name + " _cts cancelled");
+                // ignore
+            }
+        }
+
+        //clear backlog and exit
+        await ProcessQueue();
+    }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/AsyncMessageQueue.cs b/MatrixContentFilter/Services/AsyncMessageQueue.cs
new file mode 100644
index 0000000..7912cc5
--- /dev/null
+++ b/MatrixContentFilter/Services/AsyncMessageQueue.cs
@@ -0,0 +1,43 @@
+using System.Collections.Concurrent;
+using LibMatrix.EventTypes.Spec;
+using LibMatrix.RoomTypes;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services;
+
+public class AsyncMessageQueue(ILogger<AsyncMessageQueue> logger, MatrixContentFilterConfiguration cfg) : BackgroundService {
+    private readonly ConcurrentQueue<(GenericRoom Room, RoomMessageEventContent Content)> _queue = new();
+    private readonly SemaphoreSlim _semaphore = new(cfg.ConcurrencyLimits.LogMessages, cfg.ConcurrencyLimits.LogMessages);
+    public void EnqueueMessageAsync(GenericRoom room, RoomMessageEventContent content) {
+        _queue.Enqueue((room, content));
+        
+        if (_queue.Count > 100) {
+            logger.LogWarning($"Message Queue is getting full (c={_queue.Count}), consider increasing the rate limit or exempting the bot!");
+        }
+    }
+    
+    private async Task ProcessQueue() {
+        while (_queue.TryDequeue(out var message)) {
+            await _semaphore.WaitAsync();
+            _ = Task.Run(async () => {
+                try {
+                    await message.Room.SendMessageEventAsync(message.Content);
+                }
+                finally {
+                    _semaphore.Release();
+                }
+            });
+        }
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+        while (!stoppingToken.IsCancellationRequested) {
+            await ProcessQueue();
+            await Task.Delay(1000, stoppingToken);
+        }
+        
+        //clear backlog and exit
+        await ProcessQueue();
+    }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/BotModeSanityCheckService.cs b/MatrixContentFilter/Services/BotModeSanityCheckService.cs
new file mode 100644
index 0000000..55fe9e8
--- /dev/null
+++ b/MatrixContentFilter/Services/BotModeSanityCheckService.cs
@@ -0,0 +1,56 @@
+using System.Diagnostics;
+using ArcaneLibs;
+using ArcaneLibs.Extensions;
+using LibMatrix;
+using LibMatrix.Filters;
+using LibMatrix.Helpers;
+using LibMatrix.Homeservers;
+using LibMatrix.Responses;
+using MatrixContentFilter.Abstractions;
+using MatrixContentFilter.Handlers.Filters;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services;
+
+public class BotModeSanityCheckService(
+    ILogger<BotModeSanityCheckService> logger,
+    AuthenticatedHomeserverGeneric hs,
+    ConfigurationService filterConfigService,
+    IEnumerable<IContentFilter> filters,
+    AsyncMessageQueue msgQueue
+) : BackgroundService {
+    /// <summary>Triggered when the application host is ready to start the service.</summary>
+    /// <param name="cancellationToken">Indicates that the start process has been aborted.</param>
+    protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
+        while (!cancellationToken.IsCancellationRequested) {
+            await Task.Delay(10000, cancellationToken);
+            var rooms = await hs.GetJoinedRooms();
+            rooms.RemoveAll(x => x.RoomId == filterConfigService.LogRoom.RoomId);
+            rooms.RemoveAll(x => x.RoomId == filterConfigService.ControlRoom.RoomId);
+
+            var timelineFilter = new SyncFilter.RoomFilter.StateFilter(notTypes: ["m.room.redaction"], limit: 5000);
+            var timelines = rooms.Select(async x => {
+                var room = hs.GetRoom(x.RoomId);
+                // var sync = await room.GetMessagesAsync(null, 1500, filter: timelineFilter.ToJson(ignoreNull: true, indent: false).UrlEncode());
+                var iter = room.GetManyMessagesAsync(null, 5000, filter: timelineFilter.ToJson(ignoreNull: true, indent: false).UrlEncode(), chunkSize: 250);
+                await foreach (var sync in iter) {
+                    var tasks = Parallel.ForEachAsync(filters, async (filter, ct) => {
+                        try {
+                            Console.WriteLine("Processing filter {0} (sanity check, chunk[s={1}])", filter.GetType().FullName, sync.Chunk.Count);
+                            await filter.ProcessEventListAsync(sync.Chunk);
+                        }
+                        catch (Exception e) {
+                            logger.LogError(e, "Error processing sync with filter {filter}", filter.GetType().FullName);
+                            msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice")
+                                .WithBody($"Error processing sync with filter {filter.GetType().FullName}: {e.Message}").Build());
+                        }
+                    });
+
+                    await tasks;
+                }
+            }).ToList();
+            await Task.WhenAll(timelines);
+        }
+    }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/ConfigurationService.cs b/MatrixContentFilter/Services/ConfigurationService.cs
new file mode 100644
index 0000000..f83c89a
--- /dev/null
+++ b/MatrixContentFilter/Services/ConfigurationService.cs
@@ -0,0 +1,198 @@
+using ArcaneLibs.Extensions;
+using LibMatrix;
+using LibMatrix.EventTypes.Spec.State;
+using LibMatrix.Helpers;
+using LibMatrix.Homeservers;
+using LibMatrix.Responses;
+using LibMatrix.RoomTypes;
+using LibMatrix.Utilities;
+using MatrixContentFilter.EventTypes;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services;
+
+public class ConfigurationService(ILogger<ConfigurationService> logger, AuthenticatedHomeserverGeneric hs, AsyncMessageQueue msgQueue) : BackgroundService {
+    public BotEnvironmentConfiguration EnvironmentConfiguration { get; private set; }
+    public FilterConfiguration DefaultConfiguration { get; private set; }
+    public Dictionary<string, FilterConfiguration> RoomConfigurationOverrides { get; } = new();
+    public Dictionary<string, FilterConfiguration> FinalRoomConfigurations { get; } = new();
+
+    public GenericRoom LogRoom { get; private set; } = null!;
+    public GenericRoom ControlRoom { get; private set; } = null!;
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
+        var syncHelper = new SyncHelper(hs, logger) {
+            NamedFilterName = CommonSyncFilters.GetAccountDataWithRooms
+        };
+
+        await foreach (var sync in syncHelper.EnumerateSyncAsync(stoppingToken).WithCancellation(stoppingToken)) {
+            if (sync is { AccountData: null, Rooms: null }) continue;
+            logger.LogInformation("Received configuration update: {syncData}", sync.ToJson(ignoreNull: true));
+            await OnSyncReceived(sync);
+        }
+    }
+
+    public async Task OnSyncReceived(SyncResponse sync) {
+        if (sync.AccountData?.Events?.FirstOrDefault(x => x.Type == BotEnvironmentConfiguration.EventId) is { } envEvent) {
+            EnvironmentConfiguration = envEvent.TypedContent as BotEnvironmentConfiguration;
+            msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice")
+                .WithColoredBody("#FF0088", "Environment configuration updated from sync.").WithNewline()
+                .WithCollapsibleSection("JSON data:", msb => msb.WithCodeBlock(EnvironmentConfiguration.ToJson(), "json"))
+                // .WithCollapsibleSection("Full event JSON", _msb => _msb.WithCodeBlock(envEvent.ToJson(), "json"))
+                .Build());
+            LogRoom = hs.GetRoom(EnvironmentConfiguration.LogRoomId!);
+            ControlRoom = hs.GetRoom(EnvironmentConfiguration.ControlRoomId!);
+        }
+
+        if (sync.AccountData?.Events?.FirstOrDefault(x => x.Type == FilterConfiguration.EventId) is { } filterEvent) {
+            DefaultConfiguration = filterEvent.TypedContent as FilterConfiguration;
+            msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice")
+                .WithColoredBody("#00FF88", "Default filter configuration updated from sync.").WithNewline()
+                .WithCollapsibleSection("JSON data:", msb => msb.WithCodeBlock(DefaultConfiguration.ToJson(), "json"))
+                // .WithCollapsibleSection("Full event JSON", _msb => _msb.WithCodeBlock(filterEvent.ToJson(), "json"))
+                .Build());
+        }
+
+        await Parallel.ForEachAsync(sync.Rooms?.Join ?? [], async (syncRoom, ct) => {
+            var (roomId, roomData) = syncRoom;
+            if (roomId == LogRoom!.RoomId || roomId == ControlRoom!.RoomId) return;
+            var room = hs.GetRoom(roomId);
+
+            if (roomData.AccountData?.Events?.FirstOrDefault(x => x.Type == FilterConfiguration.EventId) is { } roomFilterEvent) {
+                RoomConfigurationOverrides[roomId] = roomFilterEvent.TypedContent as FilterConfiguration;
+                var roomName = await room.GetNameOrFallbackAsync();
+                msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice")
+                    .WithColoredBody("#00FF88", msb => msb.WithBody($"Filter configuration updated for ").WithMention(roomId, roomName).WithBody(" from sync.")).WithNewline()
+                    .WithCollapsibleSection("JSON data:", msb => msb.WithCodeBlock(RoomConfigurationOverrides[roomId].ToJson(), "json"))
+                    .Build());
+            }
+        });
+    }
+
+    public async Task OnStartup(MatrixContentFilterConfiguration configuration) {
+        BotEnvironmentConfiguration _environmentConfiguration;
+        try {
+            _environmentConfiguration = await hs.GetAccountDataAsync<BotEnvironmentConfiguration>(BotEnvironmentConfiguration.EventId);
+        }
+        catch (MatrixException e) {
+            if (e is not { ErrorCode: MatrixException.ErrorCodes.M_NOT_FOUND }) throw;
+            logger.LogWarning("No environment configuration found, creating one");
+            _environmentConfiguration = new BotEnvironmentConfiguration();
+        }
+
+        if (string.IsNullOrWhiteSpace(_environmentConfiguration.ControlRoomId)) {
+            LogRoom = await hs.CreateRoom(new() {
+                Name = "MatrixContentFilter logs",
+                Invite = configuration.Admins,
+                Visibility = "private"
+            });
+            var powerlevels = await LogRoom.GetPowerLevelsAsync();
+            powerlevels.EventsDefault = 20;
+            foreach (var admin in configuration.Admins) {
+                powerlevels.Users[admin] = 100;
+            }
+
+            await LogRoom.SendStateEventAsync(RoomPowerLevelEventContent.EventId, powerlevels);
+
+            _environmentConfiguration.LogRoomId = LogRoom.RoomId;
+            await hs.SetAccountDataAsync(BotEnvironmentConfiguration.EventId, _environmentConfiguration);
+        }
+        else {
+            LogRoom = hs.GetRoom(_environmentConfiguration.LogRoomId!);
+        }
+
+        if (string.IsNullOrWhiteSpace(_environmentConfiguration.ControlRoomId)) {
+            ControlRoom = await hs.CreateRoom(new() {
+                Name = "MatrixContentFilter control room",
+                Invite = configuration.Admins,
+                Visibility = "private"
+            });
+            var powerlevels = await ControlRoom.GetPowerLevelsAsync();
+            powerlevels.EventsDefault = 20;
+            foreach (var admin in configuration.Admins) {
+                powerlevels.Users[admin] = 100;
+            }
+
+            await ControlRoom.SendStateEventAsync(RoomPowerLevelEventContent.EventId, powerlevels);
+
+            _environmentConfiguration.ControlRoomId = ControlRoom.RoomId;
+            await hs.SetAccountDataAsync(BotEnvironmentConfiguration.EventId, _environmentConfiguration);
+        }
+        else {
+            ControlRoom = hs.GetRoom(_environmentConfiguration.ControlRoomId!);
+        }
+
+        FilterConfiguration _filterConfiguration;
+        try {
+            _filterConfiguration = await hs.GetAccountDataAsync<FilterConfiguration>(FilterConfiguration.EventId);
+        }
+        catch (MatrixException e) {
+            if (e is not { ErrorCode: MatrixException.ErrorCodes.M_NOT_FOUND }) throw;
+            logger.LogWarning("No filter configuration found, creating one");
+            msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice").WithColoredBody("#FF0000", "No filter configuration found, creating one").Build());
+            _filterConfiguration = new FilterConfiguration();
+        }
+
+        Dictionary<string, object> changes = new();
+
+        T Log<T>(string key, T value) {
+            changes[key] = value;
+            return value;
+        }
+
+        _filterConfiguration.IgnoredUsers ??= Log("ignored_users", (List<string>) [
+            hs.WhoAmI.UserId,
+            .. configuration.Admins
+        ]);
+
+        _filterConfiguration.FileFilter ??= new();
+        _filterConfiguration.FileFilter.IgnoredUsers ??= Log("file_filter->ignored_users", (List<string>) []);
+        _filterConfiguration.FileFilter.Allowed ??= Log("file_filter->allowed", false);
+
+        _filterConfiguration.ImageFilter ??= new();
+        _filterConfiguration.ImageFilter.IgnoredUsers ??= Log("image_filter->ignored_users", (List<string>) []);
+        _filterConfiguration.ImageFilter.Allowed ??= Log("image_filter->allowed", false);
+
+        _filterConfiguration.VideoFilter ??= new();
+        _filterConfiguration.VideoFilter.IgnoredUsers ??= Log("video_filter->ignored_users", (List<string>) []);
+        _filterConfiguration.VideoFilter.Allowed ??= Log("video_filter->allowed", false);
+
+        _filterConfiguration.AudioFilter ??= new();
+        _filterConfiguration.AudioFilter.IgnoredUsers ??= Log("audio_filter->ignored_users", (List<string>) []);
+        _filterConfiguration.AudioFilter.Allowed ??= Log("audio_filter->allowed", false);
+
+        _filterConfiguration.UrlFilter ??= new();
+        _filterConfiguration.UrlFilter.IgnoredUsers ??= Log("url_filter->ignored_users", (List<string>) []);
+        _filterConfiguration.UrlFilter.Allowed ??= Log("url_filter->allowed", false);
+
+        if (changes.Count > 0) {
+            await hs.SetAccountDataAsync(FilterConfiguration.EventId, _filterConfiguration);
+            msgQueue.EnqueueMessageAsync(LogRoom, new MessageBuilder("m.notice").WithColoredBody("#FF0000", "Default filter configuration updated").WithNewline()
+                .WithTable(msb => {
+                    msb = msb.WithTitle("Default configuration changes", 2);
+
+                    foreach (var (key, value) in changes) {
+                        var formattedValue = value switch {
+                            List<string> list => string.Join(", ", list),
+                            _ => value.ToString()
+                        };
+                        msb = msb.WithRow(rb => { rb.WithCell(key).WithCell(formattedValue ?? "formattedValue was null!"); });
+                    }
+                }).Build());
+        }
+    }
+
+    private async Task RebuildRoomConfigurations(FilterConfiguration? defaultConfig, Dictionary<string, FilterConfiguration?>? roomConfigurations) {
+        defaultConfig ??= await hs.GetAccountDataAsync<FilterConfiguration>(FilterConfiguration.EventId);
+    }
+
+    public async Task<FilterConfiguration> GetFinalRoomConfiguration(string roomId) {
+        if (FinalRoomConfigurations.TryGetValue(roomId, out var config)) return config;
+        var roomConfig = RoomConfigurationOverrides.GetValueOrDefault(roomId);
+        var defaultConfig = DefaultConfiguration;
+
+        FinalRoomConfigurations[roomId] = config;
+        return config;
+    }
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/InfoCacheService.cs b/MatrixContentFilter/Services/InfoCacheService.cs
new file mode 100644
index 0000000..974e873
--- /dev/null
+++ b/MatrixContentFilter/Services/InfoCacheService.cs
@@ -0,0 +1,29 @@
+using ArcaneLibs.Collections;
+using LibMatrix.EventTypes.Spec.State;
+using LibMatrix.Homeservers;
+
+namespace MatrixContentFilter.Services;
+
+public class InfoCacheService(AuthenticatedHomeserverGeneric hs) {
+    private static readonly ExpiringSemaphoreCache<string> DisplayNameCache = new();
+    public static readonly ExpiringSemaphoreCache<string> RoomNameCache = new();
+
+    public async Task<string> GetDisplayNameAsync(string roomId, string userId) =>
+        await DisplayNameCache.GetOrAdd($"{roomId}\t{userId}", async () => {
+            var room = hs.GetRoom(roomId);
+            var userState = await room.GetStateAsync<RoomMemberEventContent>(RoomMemberEventContent.EventId, userId);
+            if (!string.IsNullOrWhiteSpace(userState?.DisplayName)) return userState.DisplayName;
+
+            var user = await hs.GetProfileAsync(userId);
+            if (!string.IsNullOrWhiteSpace(user?.DisplayName)) return user.DisplayName;
+
+            return userId;
+        }, TimeSpan.FromMinutes(5));
+
+    public async Task<string> GetRoomNameAsync(string roomId) =>
+        await RoomNameCache.GetOrAdd(roomId, async () => {
+            var room = hs.GetRoom(roomId);
+            var name = await room.GetNameOrFallbackAsync();
+            return name;
+        }, TimeSpan.FromMinutes(30));
+}
\ No newline at end of file
diff --git a/MatrixContentFilter/Services/MatrixContentFilterBot.cs b/MatrixContentFilter/Services/MatrixContentFilterBot.cs
new file mode 100644
index 0000000..321cdd4
--- /dev/null
+++ b/MatrixContentFilter/Services/MatrixContentFilterBot.cs
@@ -0,0 +1,143 @@
+using System.Diagnostics;
+using ArcaneLibs;
+using ArcaneLibs.Extensions;
+using LibMatrix.Filters;
+using LibMatrix.Helpers;
+using LibMatrix.Homeservers;
+using LibMatrix.Responses;
+using MatrixContentFilter.Abstractions;
+using MatrixContentFilter.Handlers.Filters;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MatrixContentFilter.Services;
+
+public class MatrixContentFilterBot(
+    ILogger<MatrixContentFilterBot> logger,
+    AuthenticatedHomeserverGeneric hs,
+    MatrixContentFilterConfiguration configuration,
+    ConfigurationService filterConfigService,
+    IEnumerable<IContentFilter> filters,
+    AsyncMessageQueue msgQueue
+) : BackgroundService {
+    /// <summary>Triggered when the application host is ready to start the service.</summary>
+    /// <param name="cancellationToken">Indicates that the start process has been aborted.</param>
+    protected override async Task ExecuteAsync(CancellationToken cancellationToken) {
+        try {
+            await filterConfigService.OnStartup(configuration);
+            msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice").WithColoredBody("#00FF00", "Bot startup successful! Listening for events.")
+                .Build());
+            msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice").WithColoredBody("#00FF00", msb => {
+                msb = msb.WithBody("Inserted filters implementations (internal):").WithNewline();
+                foreach (var filter in filters) {
+                    msb = msb.WithBody(filter.GetType().FullName).WithNewline();
+                }
+            }).Build());
+        }
+        catch (Exception e) {
+            logger.LogError(e, "Error on startup");
+            Environment.Exit(1); // We don't want to do a graceful shutdown if we can't start up
+        }
+
+        logger.LogInformation("Bot started!");
+        await Run(cancellationToken);
+    }
+
+    private SyncHelper syncHelper;
+
+    private async Task Run(CancellationToken cancellationToken) {
+        var syncFilter = new SyncFilter() {
+            Room = new() {
+                NotRooms = [filterConfigService.LogRoom.RoomId],
+                Timeline = new(notTypes: ["m.room.redaction"])
+            }
+        };
+        syncHelper = new SyncHelper(hs, logger) {
+            Filter = syncFilter
+        };
+        int i = 0;
+        await foreach (var sync in syncHelper.EnumerateSyncAsync(cancellationToken).WithCancellation(cancellationToken)) {
+            // if (i++ >= 100) {
+            //     var sw = Stopwatch.StartNew();
+            //     for (int gen = 0; gen < GC.MaxGeneration; gen++) {
+            //         GC.Collect(gen, GCCollectionMode.Forced, true, true);
+            //     }
+            //     i = 0;
+            //     msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice")
+            //         .WithBody($"Garbage collection took {sw.ElapsedMilliseconds}ms")
+            //         .Build());
+            //     GC.
+            // }
+
+            // GC.TryStartNoGCRegion(1024 * 1024 * 1024);
+            var sw = Stopwatch.StartNew();
+            int actionCount = filters.Sum(x => x.ActionCount);
+            try {
+                await OnSyncReceived(sync);
+            }
+            catch (Exception e) {
+                logger.LogError(e, "Error processing sync");
+                msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice").WithBody($"Error processing sync: {e.Message}").Build());
+            }
+            finally {
+                Console.WriteLine("Processed sync in {0}, executed {1} actions, {2} of memory usage", sw.Elapsed, filters.Sum(x => x.ActionCount) - actionCount, Util.BytesToString(Environment.WorkingSet));
+                // GC.EndNoGCRegion();
+            }
+            
+            // update sync filter
+            if (syncFilter.Room.NotRooms[0] != filterConfigService.LogRoom.RoomId) {
+                syncFilter.Room.NotRooms = [filterConfigService.LogRoom.RoomId];
+                syncHelper.Filter = syncFilter;
+            }
+        }
+    }
+
+    private int _syncCount;
+
+    private async Task OnSyncReceived(SyncResponse sync) {
+        if (_syncCount++ == 0) return; // Skip initial sync :/
+
+        if (sync.Rooms?.Join?.ContainsKey(filterConfigService.LogRoom.RoomId) == true) {
+            sync.Rooms?.Join?.Remove(filterConfigService.LogRoom.RoomId);
+        }
+
+        if (sync.Rooms?.Join?.ContainsKey(filterConfigService.ControlRoom.RoomId) == true) {
+            sync.Rooms?.Join?.Remove(filterConfigService.ControlRoom.RoomId);
+        }
+
+        // HACK: Server likes to send partial timelines during elevated activity, so we need to fetch them in order not to miss events
+        var timelineFilter = new SyncFilter.RoomFilter.StateFilter(notTypes: ["m.room.redaction"], limit: 5000);
+        var limitedTimelineRooms = sync.Rooms?.Join?
+            .Where(x => x.Value.Timeline?.Limited ?? false)
+            .Select(async x => {
+                var (roomId, roomData) = x;
+                var room = hs.GetRoom(roomId);
+                if (roomData.Timeline?.Limited == true) {
+                    msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice")
+                        .WithColoredBody("FF0000", $"Room {roomId} has limited timeline, fetching! The room may be getting spammed?")
+                        .Build());
+                    roomData.Timeline.Events ??= [];
+                    var newEvents = await room.GetMessagesAsync(roomData.Timeline.PrevBatch ?? "", 500, filter: timelineFilter.ToJson(ignoreNull: true, indent: false));
+                    roomData.Timeline.Events.MergeBy(newEvents.Chunk, (x, y) => x.EventId == y.EventId, (x, y) => { });
+                }
+            })
+            .ToList();
+
+        if (limitedTimelineRooms?.Count > 0)
+            await Task.WhenAll(limitedTimelineRooms);
+
+        var tasks = Parallel.ForEachAsync(filters, async (filter, ct) => {
+            try {
+                Console.WriteLine("Processing filter {0}", filter.GetType().FullName);
+                await filter.ProcessSyncAsync(sync);
+            }
+            catch (Exception e) {
+                logger.LogError(e, "Error processing sync with filter {filter}", filter.GetType().FullName);
+                msgQueue.EnqueueMessageAsync(filterConfigService.LogRoom, new MessageBuilder("m.notice")
+                    .WithBody($"Error processing sync with filter {filter.GetType().FullName}: {e.Message}").Build());
+            }
+        });
+
+        await tasks;
+    }
+}
\ No newline at end of file