using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using ArcaneLibs;
using ArcaneLibs.Collections;
using ArcaneLibs.Extensions;
using LibMatrix;
using LibMatrix.EventTypes.Spec.State;
using LibMatrix.EventTypes.Spec.State.RoomInfo;
using LibMatrix.Filters;
using LibMatrix.Helpers;
using LibMatrix.Responses;
using LibMatrix.Services;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ModerationClient.Models.SpaceTreeNodes;
using ModerationClient.Services;

namespace ModerationClient.ViewModels;

/// <summary>
/// View model behind the client window. It owns the displayed space/room tree and starts a
/// background sync loop (<see cref="Run"/>) that feeds sync responses into that tree.
/// </summary>
public partial class ClientViewModel : ViewModelBase {
    /// <summary>
    /// Creates the view model, registers the "All rooms" and "Direct messages" nodes and starts
    /// <see cref="Run"/> on the thread pool. A failure of the loop is reported through
    /// <see cref="Status"/> and the logger rather than being rethrown.
    /// </summary>
    public ClientViewModel(ILogger<ClientViewModel> logger, MatrixAuthenticationService authService, CommandLineConfiguration cfg) {
        _logger = logger;
        _authService = authService;
        _cfg = cfg;
        DisplayedSpaces.Add(_allRoomsNode = new AllRoomsSpaceNode(this));
        DisplayedSpaces.Add(DirectMessages = new SpaceNode(false) { Name = "Direct messages" });
        _ = Task.Run(Run).ContinueWith(x => {
            if (x.IsFaulted) {
                Status = "Critical error running client view model: " + x.Exception?.Message;
                _logger.LogError(x.Exception, "Error running client view model.");
            }
        });
    }

    private readonly ILogger<ClientViewModel> _logger;
    private readonly MatrixAuthenticationService _authService;
    private readonly CommandLineConfiguration _cfg;
    private readonly SpaceNode _allRoomsNode;

    // Caches profile lookups so repeated m.direct updates do not refetch the same user.
    private readonly ExpiringSemaphoreCache<UserProfileResponse> _profileCache = new();

    /// <summary>Top-level nodes shown in the space tree.</summary>
    public ObservableCollection<SpaceNode> DisplayedSpaces { get; } = [];

    /// <summary>Every joined room seen so far, keyed by room ID.</summary>
    public ObservableDictionary<string, RoomNode> AllRooms { get; } = new();

    /// <summary>Node grouping direct-message rooms, one child space per user.</summary>
    public SpaceNode DirectMessages { get; }

    /// <summary>While true, the sync loop idles after each processed response. Starts paused.</summary>
    public bool Paused { get; set; } = true;

    /// <summary>Status line; the getter appends the current local time to the stored text.</summary>
    public string Status {
        get => field + " " + DateTime.Now;
        set => SetProperty(ref field, value);
    } = "Loading...";

    /// <summary>Currently selected space; falls back to the "All rooms" node when unset.</summary>
    [field: AllowNull, MaybeNull]
    public SpaceNode CurrentSpace {
        get => field ?? _allRoomsNode;
        set => SetProperty(ref field, value);
    }

    /// <summary>Currently selected room; changing it also notifies <see cref="CurrentRoomViewModel"/>.</summary>
    public RoomNode? CurrentRoom {
        get;
        set {
            if (SetProperty(ref field, value)) {
                OnPropertyChanged(nameof(CurrentRoomViewModel));
            }
        }
    }

    /// <summary>View model for <see cref="CurrentRoom"/>; a new instance is created on every read.</summary>
    public RoomViewModel? CurrentRoomViewModel => CurrentRoom is not null ? new RoomViewModel(_authService.Homeserver!, CurrentRoom) : null;

    /// <summary>
    /// Runs the sync loop: optionally compacts the on-disk sync store, then enumerates sync
    /// responses and applies each of them to the room tree.
    /// </summary>
    /// <exception cref="ArgumentNullException">The authentication service has no homeserver.</exception>
    public async Task Run() {
        Console.WriteLine("Running client view model loop...");
        ArgumentNullException.ThrowIfNull(_authService.Homeserver, nameof(_authService.Homeserver));

        var store = new FileStorageProvider(Path.Combine(_cfg.ProfileDirectory, "syncCache"));
        // Not used below; kept because constructing the provider may have side effects
        // (presumably creating the directory) - TODO confirm before removing.
        _ = new FileStorageProvider(Path.Combine(_cfg.ProfileDirectory, "mediaCache"));
        Console.WriteLine($"Sync store at {store.TargetPath}");

        var sh = new SyncHelper(_authService.Homeserver, new NullLogger<SyncHelper>(), storageProvider: store);
        Console.WriteLine("Sync helper created.");

        // Counting is slow, so it is done once; afterwards the variable is reused as a local counter.
        var unoptimised = await sh.GetUnoptimisedStoreCount();
        if (unoptimised > 100) {
            // Done in a separate method so the resolver goes out of scope before the sync loop starts.
            await OptimiseSyncStore(store);
        }

        GC.Collect();
        unoptimised = 0;
        Status = "Doing initial sync...";
        var currentSyncRes = "init";
        var lastGc = DateTime.Now;
        int syncGatewayTimeouts = 0;

        sh.ExceptionHandlers.Add(async (ex) => {
            if (ex is HttpRequestException { StatusCode: HttpStatusCode.GatewayTimeout }) {
                syncGatewayTimeouts++;
                Status = $"Got {syncGatewayTimeouts} sync gateway timeouts...";
            }
        });

        await foreach (var res in sh.EnumerateSyncAsync()) {
            var sw = Stopwatch.StartNew();
            Status = $"Processing sync... {res.NextBatch}";

            LogOwnRemovals(res);

            var applySw = Stopwatch.StartNew();
            await ApplySyncChanges(res);
            applySw.Stop();
            Program.Beep(0, 0);

            if (DateTime.Now - lastGc > TimeSpan.FromMinutes(1)) {
                lastGc = DateTime.Now;
                GC.Collect();
            }

            // currentSyncRes still holds the token of the previous response at this point.
            Console.WriteLine($"Processed sync {currentSyncRes} in {sw.ElapsedMilliseconds}ms (applied in: {applySw.ElapsedMilliseconds}ms)");
            if (Paused) {
                Status = "Sync loop interrupted... Press pause/break to resume.";
                while (Paused) {
                    await Task.Delay(100);
                }
            }
            else {
                Status = $"Syncing... {unoptimised++} unoptimised sync responses, last={currentSyncRes}...";
            }

            currentSyncRes = res.NextBatch;
        }
    }

    /// <summary>
    /// Compacts the sync store into a new snapshot and removes old snapshots, reporting progress
    /// through <see cref="Status"/>.
    /// </summary>
    private async Task OptimiseSyncStore(FileStorageProvider store) {
        Console.WriteLine("RUN - Optimising sync store...");
        Status = "Optimising sync store, please wait...";
        var ssr = new SyncStateResolver(_authService.Homeserver!, _logger, storageProvider: store);
        Console.WriteLine("Created sync state resolver...");
        Status = "Optimising sync store, please wait... Creating new snapshot...";

        // Throttle progress updates to at most one per 100ms.
        var sw = Stopwatch.StartNew();
        await ssr.OptimiseStore((remaining, total) => {
            if (sw.ElapsedMilliseconds > 100) {
                Status = $"Optimising sync store, please wait... {remaining}/{total} remaining...";
                sw.Restart();
            }
        });

        Status = "Optimising sync store, please wait... Deleting old intermediate snapshots...";
        await ssr.RemoveOldSnapshots();

        // Only reports committed memory once per generation; no collection is triggered here.
        for (int gen = 0; gen < GC.MaxGeneration; gen++) {
            Status = $"Collecting garbage #{gen}: {Util.BytesToString(GC.GetGCMemoryInfo().TotalCommittedBytes)}";
            await Task.Delay(1000);
        }

        // GetTotalMemory(true) forces a full collection before measuring.
        Status = $"Collecting garbage: {GC.GetTotalMemory(true) / 1024 / 1024}MB";
        await Task.Delay(1000);
    }

    /// <summary>
    /// Writes one console line per membership event targeting the logged-in user in each room
    /// listed under "leave" in the sync response, followed by a summary line.
    /// </summary>
    private void LogOwnRemovals(SyncResponse res) {
        if (res.Rooms?.Leave is not { Count: > 0 } leftRooms) {
            return;
        }

        var ownUserId = _authService.Homeserver!.WhoAmI.UserId;
        var maxKeyLength = leftRooms.Keys.Max(x => x.Length);
        foreach (var (key, value) in leftRooms) {
            List<StateEventResponse> events = [..value.Timeline?.Events ?? [], ..value.State?.Events ?? []];
            var ownEvents = events.Where(x => x.StateKey == ownUserId && x.Type == RoomMemberEventContent.EventId).ToList();
            // Max() throws on an empty sequence; a left room may carry no membership event for us.
            if (ownEvents.Count == 0) {
                continue;
            }

            var maxSenderLength = ownEvents.Max(x => x.Sender!.Length);
            foreach (var evt in ownEvents) {
                var ct = evt.ContentAs<RoomMemberEventContent>()!;
                var reasonString = string.IsNullOrWhiteSpace(ct.Reason) ? "" : $" (reason: {ct.Reason})";
                Console.WriteLine(
                    $"Room {key.PadLeft(maxKeyLength)} removed: {evt.StateKey} by {evt.Sender!.PadLeft(maxSenderLength)} at {DateTimeOffset.FromUnixTimeMilliseconds((long)evt.OriginServerTs!)}, membership: {ct.Membership,6}{reasonString}");
            }
        }

        Console.WriteLine($"Processed {leftRooms.Count} left rooms");
    }

    /// <summary>Applies room changes, then direct-message changes if the response carries an m.direct event.</summary>
    private async Task ApplySyncChanges(SyncResponse newSync) {
        await ApplySpaceChanges(newSync);
        if (newSync.AccountData?.Events?.FirstOrDefault(x => x.Type == "m.direct") is { } evt) {
            await ApplyDirectMessagesChanges(evt);
        }
    }

    /// <summary>
    /// Merges the joined rooms of a sync response into <see cref="AllRooms"/>: creates missing
    /// nodes, updates names, and appends timeline and state events.
    /// </summary>
    private async Task ApplySpaceChanges(SyncResponse newSync) {
        List<Task> tasks = [];
        foreach (var room in newSync.Rooms?.Join ?? []) {
            if (!AllRooms.TryGetValue(room.Key, out var node)) {
                node = new RoomNode { Name = "", RoomID = room.Key };
                AllRooms.Add(room.Key, node);
            }

            var nameEvent = room.Value.State?.Events?.FirstOrDefault(x => x is { Type: "m.room.name", StateKey: "" });
            if (nameEvent is not null) {
                node.Name = (nameEvent.TypedContent as RoomNameEventContent)?.Name ?? "";
            }

            if (string.IsNullOrWhiteSpace(node.Name)) {
                // No usable name in this response: resolve it in the background and show a placeholder meanwhile.
                node.Name = "Loading...";
                tasks.Add(_authService.Homeserver!.GetRoom(room.Key).GetNameOrFallbackAsync().ContinueWith(r => {
                    // IsCompletedSuccessfully also covers cancellation, where reading Result would throw.
                    if (!r.IsCompletedSuccessfully) {
                        _logger.LogError(r.Exception, "Error getting room name for {RoomKey}", room.Key);
                        node.Name = "Error loading room name";
                        return;
                    }

                    node.Name = r.Result;
                }));
            }

            if (room.Value.Timeline?.Events is not null) {
                foreach (var evt in room.Value.Timeline.Events) {
                    node.Timeline.Add(evt);
                }
            }

            if (room.Value.State?.Events is not null) {
                foreach (var evt in room.Value.State.Events) {
                    node.State.Add(evt);
                }
            }
        }

        await AwaitTasks(tasks, "Waiting for {0}/{1} tasks while applying room changes...");
    }

    /// <summary>
    /// Rebuilds the children of <see cref="DirectMessages"/> from an m.direct event: one child
    /// space per user, containing that user's direct-message rooms.
    /// </summary>
    private async Task ApplyDirectMessagesChanges(StateEventResponse evt) {
        _logger.LogCritical("Direct messages updated!");
        var dms = evt.RawContent.Deserialize<Dictionary<string, string[]?>>();
        // Deserialize yields null for a JSON null payload; nothing to apply in that case.
        if (dms is null) {
            return;
        }

        List<Task> tasks = [];
        foreach (var (userId, roomIds) in dms) {
            if (roomIds is null || roomIds.Length == 0) {
                continue;
            }

            var space = DirectMessages.ChildSpaces.FirstOrDefault(x => x.RoomID == userId);
            if (space is null) {
                // Shown under the user ID until the profile lookup supplies a display name.
                var newSpace = new SpaceNode { Name = userId, RoomID = userId };
                tasks.Add(_profileCache.GetOrAdd(userId, async () => await _authService.Homeserver!.GetProfileAsync(userId), TimeSpan.FromMinutes(5))
                    .ContinueWith(r => {
                        var displayName = r.IsCompletedSuccessfully ? r.Result.DisplayName : null;
                        newSpace.Name = string.IsNullOrWhiteSpace(displayName) ? userId : displayName;
                    }));
                DirectMessages.ChildSpaces.Add(newSpace);
                space = newSpace;
            }

            foreach (var roomId in roomIds) {
                if (space.ChildRooms.Any(x => x.RoomID == roomId)) {
                    continue;
                }

                var room = AllRooms.TryGetValue(roomId, out var existing)
                    ? existing
                    : new RoomNode { Name = "Unknown: " + roomId, RoomID = roomId };
                space.ChildRooms.Add(room);
            }

            // Iterate over a copy because rooms are removed from the collection while scanning.
            foreach (var spaceChildRoom in space.ChildRooms.ToList()) {
                if (!roomIds.Contains(spaceChildRoom.RoomID)) {
                    space.ChildRooms.Remove(spaceChildRoom);
                }
            }
        }

        await AwaitTasks(tasks, "Waiting for {0}/{1} tasks while applying DM changes...");
    }

    /// <summary>
    /// Waits until every task in <paramref name="tasks"/> has completed, periodically reporting
    /// progress through <see cref="Status"/>. Completed tasks are removed from the list.
    /// </summary>
    /// <param name="tasks">Tasks to wait for; the list is mutated.</param>
    /// <param name="message">Composite format string; {0} is the number of incomplete tasks, {1} the initial total.</param>
    private async Task AwaitTasks(List<Task> tasks, string message) {
        if (tasks.Count == 0) {
            return;
        }

        int total = tasks.Count;
        // Report roughly every 10% of progress; never let the divisor reach zero.
        int step = Math.Max(1, total / 10);
        while (tasks.Any(x => !x.IsCompleted)) {
            int incomplete = tasks.Count(x => !x.IsCompleted);
            if (incomplete < 10 || incomplete % step == 0) {
                Status = string.Format(message, incomplete, total);
            }

            await Task.WhenAny(tasks);
            tasks.RemoveAll(x => x.IsCompleted);
        }

        Program.Beep(0, 0);
    }
}

// implementation details
/// <summary>
/// Space node whose child rooms mirror <see cref="ClientViewModel.AllRooms"/> by listening to
/// its collection-changed notifications.
/// </summary>
public class AllRoomsSpaceNode : SpaceNode {
    public AllRoomsSpaceNode(ClientViewModel vm) : base(false) {
        Name = "All rooms";
        vm.AllRooms.CollectionChanged += (_, args) => {
            switch (args.Action) {
                case NotifyCollectionChangedAction.Add:
                case NotifyCollectionChangedAction.Remove:
                case NotifyCollectionChangedAction.Replace: {
                    foreach (var room in args.NewItems?.Cast<KeyValuePair<string, RoomNode>>() ?? []) {
                        ChildRooms.Add(room.Value);
                    }

                    foreach (var room in args.OldItems?.Cast<KeyValuePair<string, RoomNode>>() ?? []) {
                        ChildRooms.Remove(room.Value);
                    }

                    break;
                }
                case NotifyCollectionChangedAction.Reset: {
                    ChildSpaces.Clear();
                    ChildRooms.Clear();
                    break;
                }
            }
        };
    }
}