about summary refs log tree commit diff
path: root/MiniUtils.Core/PolicyListFetcher.cs
diff options
context:
space:
mode:
Diffstat (limited to 'MiniUtils.Core/PolicyListFetcher.cs')
-rw-r--r--MiniUtils.Core/PolicyListFetcher.cs86
1 files changed, 86 insertions, 0 deletions
diff --git a/MiniUtils.Core/PolicyListFetcher.cs b/MiniUtils.Core/PolicyListFetcher.cs
new file mode 100644

index 0000000..b177112 --- /dev/null +++ b/MiniUtils.Core/PolicyListFetcher.cs
@@ -0,0 +1,86 @@ +using LibMatrix.Filters; +using LibMatrix.Helpers; +using LibMatrix.Homeservers; +using LibMatrix.RoomTypes; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MiniUtils.Core; + +public class PolicyListFetcher(ILogger<PolicyListFetcher> logger, AntiDmSpamConfiguration config, AuthenticatedHomeserverGeneric homeserver, PolicyStore policyStore) + : IHostedService { + private CancellationTokenSource _cts = new(); + + public async Task StartAsync(CancellationToken cancellationToken) { + // _ = Enumerable.Range(0, 10_000_000).Select(x => { + // policyStore.AllPolicies.Add(Guid.NewGuid().ToString(), new UserPolicyRuleEventContent() { + // Entity = Guid.NewGuid().ToString() + x, + // Reason = "meow " + x, + // Recommendation = "m.ban" + // }); + // return 0; + // }).ToList(); + + logger.LogInformation("Starting policy list fetcher"); + await EnsurePolicyListsJoined(); + _ = SyncPolicyLists().ContinueWith(x => { + if (x.IsFaulted) { + logger.LogError(x.Exception, "Error in policy list fetcher"); + } + }, TaskContinuationOptions.OnlyOnFaulted); + } + + public async Task StopAsync(CancellationToken cancellationToken) { + logger.LogInformation("Stopping policy list fetcher"); + await _cts.CancelAsync(); + } + + private async Task EnsurePolicyListsJoined() { + var joinedRooms = await homeserver.GetJoinedRooms(); + var expectedPolicyRooms = config.PolicyLists; + var missingRooms = expectedPolicyRooms.Where(room => !joinedRooms.Any(r => r.RoomId == room.RoomId)).ToList(); + + await Task.WhenAll(missingRooms.Select(async room => { + logger.LogInformation("Joining policy list room {}", room.RoomId); + await homeserver.GetRoom(room.RoomId).JoinAsync(room.Vias); + }).ToList()); + } + + private async Task SyncPolicyLists() { + var syncHelper = new SyncHelper(homeserver, logger) { + Timeout = 30_000, + MinimumDelay = config.MinimumSyncTime, + FilterId = (await homeserver.UploadFilterAsync(new SyncFilter { + AccountData = SyncFilter.EventFilter.Empty, + Presence = SyncFilter.EventFilter.Empty, + Room = new SyncFilter.RoomFilter { + AccountData = SyncFilter.RoomFilter.StateFilter.Empty, + Ephemeral = SyncFilter.RoomFilter.StateFilter.Empty, + State = new SyncFilter.RoomFilter.StateFilter(types: PolicyRoom.SpecPolicyEventTypes.ToList()), + Timeline = new SyncFilter.RoomFilter.StateFilter(types: PolicyRoom.SpecPolicyEventTypes.ToList()), + Rooms = config.PolicyLists.Select(x => x.RoomId).ToList(), + IncludeLeave = false + } + })).FilterId, + UseMsc4222StateAfter = true + }; + + await foreach (var syncResponse in syncHelper.EnumerateSyncAsync(_cts.Token)) { + if (_cts.IsCancellationRequested) return; + + if (syncResponse is { Rooms.Join.Count: > 0 }) { + var newPolicies = syncResponse.Rooms.Join + .Where(x => config.PolicyLists.Any(y => y.RoomId == x.Key)) + .SelectMany(x => x.Value.StateAfter?.Events ?? []) + .Where(x => PolicyRoom.SpecPolicyEventTypes.Contains(x.Type)) + .ToList(); + + logger.LogWarning("Received non-empty sync response with {}/{}/{} rooms, resulting in {} policies", syncResponse.Rooms?.Join?.Count, + syncResponse.Rooms?.Invite?.Count, + syncResponse.Rooms?.Leave?.Count, newPolicies.Count); + + await policyStore.AddPoliciesAsync(newPolicies); + } + } + } +} \ No newline at end of file