using System.Collections.Concurrent;
using ArcaneLibs.Extensions;
using LibMatrix;
using LibMatrix.EventTypes.Spec.State.RoomInfo;
using LibMatrix.Helpers;
using LibMatrix.Homeservers;
using LibMatrix.Responses;
using LibMatrix.RoomTypes;
using MatrixContentFilter.Abstractions;
using MatrixContentFilter.EventTypes;
using MatrixContentFilter.Services;
using MatrixContentFilter.Services.AsyncActionQueues;

namespace MatrixContentFilter.Handlers.Filters;

/// <summary>
/// Content filter that watches <c>m.room.member</c> invite events and keeps a per-room, per-inviter
/// running count. Once an inviter's count exceeds <see cref="HeuristicInviteThreshold"/>, the filter
/// reports to the log room and replaces its running counts for that room with the number of invites
/// that are actually pending according to the room's member list.
/// </summary>
public class PendingInviteLimiter(
    ConfigurationService cfgService,
    AuthenticatedHomeserverGeneric hs,
    AsyncMessageQueue msgQueue,
    InfoCacheService infoCache,
    AbstractAsyncActionQueue actionQueue,
    MatrixContentFilterMetrics metrics) : IContentFilter {
    /// <summary>Running invite count above which the real pending-invite counts are fetched.</summary>
    private const int HeuristicInviteThreshold = 5;

    /// <summary>Running invite counts. Key format: <c>roomid:sender</c>.</summary>
    private readonly ConcurrentDictionary<string, int> _heuristicInviteCount = new();

    /// <summary>
    /// Processes the timeline of every joined room in a sync response.
    /// </summary>
    /// <param name="syncResponse">The sync response to inspect.</param>
    /// <returns>A task that completes when all rooms have been processed.</returns>
    public override Task ProcessSyncAsync(SyncResponse syncResponse) {
        if (syncResponse.Rooms?.Join is null) return Task.CompletedTask;

        var roomTasks = syncResponse.Rooms.Join
            .Select(joined => ProcessRoomAsync(joined.Key, joined.Value.Timeline?.Events));
        return Task.WhenAll(roomTasks);
    }

    /// <summary>
    /// Processes the timeline events of a single joined room. The log room and the control room are skipped.
    /// </summary>
    /// <param name="roomId">ID of the room the events belong to.</param>
    /// <param name="timelineEvents">Timeline events of that room, or <c>null</c> when there are none.</param>
    private Task ProcessRoomAsync(string roomId, IEnumerable<StateEventResponse>? timelineEvents) {
        if (roomId == cfgService.LogRoom.RoomId || roomId == cfgService.ControlRoom.RoomId) return Task.CompletedTask;
        if (timelineEvents is null) return Task.CompletedTask;

        var config = cfgService.RoomConfigurationOverrides.GetValueOrDefault(roomId)?.InviteLimiter;
        var room = hs.GetRoom(roomId);
        var eventTasks = timelineEvents.Select(evt => ProcessEventAsync(room, evt, config)).ToList();
        return Task.WhenAll(eventTasks);
    }

    /// <summary>
    /// Processes an arbitrary list of events, grouped by the room each event reports.
    /// </summary>
    /// <param name="events">Events to inspect.</param>
    /// <returns>A task that completes when all events have been processed.</returns>
    public override Task ProcessEventListAsync(List<StateEventResponse> events) {
        var roomTasks = events.GroupBy(x => x.RoomId).Select(async roomEvents => {
            var room = hs.GetRoom(roomEvents.Key);
            var config = cfgService.RoomConfigurationOverrides.GetValueOrDefault(roomEvents.Key)?.InviteLimiter;
            var eventTasks = roomEvents.Select(evt => ProcessEventAsync(room, evt, config)).ToList();
            await Task.WhenAll(eventTasks);
        }).ToList();
        return Task.WhenAll(roomTasks);
    }

    /// <summary>
    /// Counts a single invite event and, when the inviter's running count exceeds
    /// <see cref="HeuristicInviteThreshold"/>, queues a refresh of the real pending-invite counts.
    /// Events that are not <c>m.room.member</c> invites are ignored.
    /// </summary>
    /// <param name="room">Room the event belongs to; its ID is used for the counter key.</param>
    /// <param name="evt">Event to inspect.</param>
    /// <param name="roomConfiguration">Per-room limiter override. Currently not consulted by this method.</param>
    private async Task ProcessEventAsync(GenericRoom room, StateEventResponse evt, FilterConfiguration.BasicLimiterConfiguration? roomConfiguration) {
        if (evt.Type != "m.room.member") return;
        if (evt.TypedContent is not RoomMemberEventContent { Membership: "invite" }) return;

        metrics.Increment("pending_invite_limiter_processed_event_count");

        // Key on room.RoomId rather than evt.RoomId: both callers build `room` from the authoritative
        // room ID, and the refresh below uses room.RoomId as well, so the two paths must agree.
        // NOTE(review): events delivered through /sync presumably do not carry their own room ID - confirm.
        var key = $"{room.RoomId}:{evt.Sender}";

        // AddOrUpdate returns the stored value, which avoids a second lookup that could race with other tasks.
        var runningCount = _heuristicInviteCount.AddOrUpdate(key, 1, (_, count) => count + 1);
        if (runningCount > HeuristicInviteThreshold) {
            await actionQueue.EqueueActionAsync(evt.EventId, async () => await RefreshInviteCountsAsync(room, evt, runningCount));
        }

        // Incremented for every invite event seen, whether or not the threshold was exceeded.
        ActionCount++;
    }

    /// <summary>
    /// Reports a tripped heuristic to the log room, then replaces the running counts of every inviter
    /// in <paramref name="room"/> with the number of invites that are still pending.
    /// </summary>
    /// <param name="room">Room whose member list is fetched.</param>
    /// <param name="trigger">The invite event that pushed the running count over the threshold.</param>
    /// <param name="trippedCount">Running count of the triggering inviter at the time it tripped.</param>
    private async Task RefreshInviteCountsAsync(GenericRoom room, StateEventResponse trigger, int trippedCount) {
        var roomName = await infoCache.GetRoomNameAsync(room.RoomId);

        msgQueue.EnqueueMessageAsync(cfgService.LogRoom, new MessageBuilder("m.notice")
            .WithBody("Pending invite limiter heuristic tripped for ").WithMention(trigger.Sender!).WithBody(" in ").WithMention(room.RoomId, roomName).WithBody($" ({trippedCount})!").WithNewline()
            .WithBody("Updating heuristics with real counts...")
            .Build());

        var members = await room.GetMembersListAsync(joinedOnly: false);
        var pendingInvitesByInviter = members
            .Where(x => x.ContentAs<RoomMemberEventContent>()?.Membership == "invite")
            .GroupBy(x => x.Sender!);

        foreach (var pendingInvites in pendingInvitesByInviter) {
            var pendingCount = pendingInvites.Count();

            // Replace (not add to) the running count so it reflects what is actually pending.
            _heuristicInviteCount[$"{room.RoomId}:{pendingInvites.Key}"] = pendingCount;

            msgQueue.EnqueueMessageAsync(cfgService.LogRoom, new MessageBuilder("m.notice")
                .WithBody("Updated heuristic count for ").WithMention(pendingInvites.Key).WithBody(" in ").WithMention(room.RoomId, roomName).WithBody($" ({pendingCount})!").WithNewline()
                .Build());
        }

        // TODO: nothing is enforced yet - the limiter only reports to the log room.
    }
}