diff --git a/ModerationClient/ViewModels/ClientViewModel.cs b/ModerationClient/ViewModels/ClientViewModel.cs
index 312b46a..378c0de 100644
--- a/ModerationClient/ViewModels/ClientViewModel.cs
+++ b/ModerationClient/ViewModels/ClientViewModel.cs
@@ -1,17 +1,29 @@
-using System;
+using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Text.Json;
+using System.Threading;
using System.Threading.Tasks;
+using ArcaneLibs;
using ArcaneLibs.Collections;
+using ArcaneLibs.Extensions;
+using LibMatrix;
using LibMatrix.EventTypes.Spec.State;
+using LibMatrix.EventTypes.Spec.State.RoomInfo;
+using LibMatrix.Filters;
using LibMatrix.Helpers;
using LibMatrix.Responses;
-using MatrixUtils.Abstractions;
+using LibMatrix.Services;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using ModerationClient.Models.SpaceTreeNodes;
using ModerationClient.Services;
namespace ModerationClient.ViewModels;
@@ -22,48 +34,157 @@ public partial class ClientViewModel : ViewModelBase {
_authService = authService;
_cfg = cfg;
DisplayedSpaces.Add(_allRoomsNode = new AllRoomsSpaceNode(this));
- _ = Task.Run(Run);
+ DisplayedSpaces.Add(DirectMessages = new SpaceNode(false) { Name = "Direct messages" });
+ _ = Task.Run(Run).ContinueWith(x => {
+ if (x.IsFaulted) {
+ Status = "Critical error running client view model: " + x.Exception?.Message;
+ _logger.LogError(x.Exception, "Error running client view model.");
+ }
+ });
}
private readonly ILogger<ClientViewModel> _logger;
private readonly MatrixAuthenticationService _authService;
private readonly CommandLineConfiguration _cfg;
- private SpaceNode? _currentSpace;
private readonly SpaceNode _allRoomsNode;
- private string _status = "Loading...";
public ObservableCollection<SpaceNode> DisplayedSpaces { get; } = [];
public ObservableDictionary<string, RoomNode> AllRooms { get; } = new();
+ public SpaceNode DirectMessages { get; }
+
+ public bool Paused { get; set; } = true;
+
+ public string Status {
+ get => field + " " + DateTime.Now;
+ set => SetProperty(ref field, value);
+ } = "Loading...";
+ [field: AllowNull, MaybeNull]
public SpaceNode CurrentSpace {
- get => _currentSpace ?? _allRoomsNode;
- set => SetProperty(ref _currentSpace, value);
+ get => field ?? _allRoomsNode;
+ set => SetProperty(ref field, value);
}
- public string Status {
- get => _status + " " + DateTime.Now;
- set => SetProperty(ref _status, value);
+ public RoomNode? CurrentRoom {
+ get;
+ set {
+ if (SetProperty(ref field, value)) OnPropertyChanged(nameof(CurrentRoomViewModel));
+ }
}
+ public RoomViewModel? CurrentRoomViewModel => CurrentRoom is not null ? new RoomViewModel(_authService.Homeserver!, CurrentRoom) : null;
+
public async Task Run() {
- Status = "Interrupted.";
- return;
+ Console.WriteLine("Running client view model loop...");
+ ArgumentNullException.ThrowIfNull(_authService.Homeserver, nameof(_authService.Homeserver));
+ // var sh = new SyncStateResolver(_authService.Homeserver, _logger, storageProvider: new FileStorageProvider(Path.Combine(_cfg.ProfileDirectory, "syncCache")));
+ var store = new FileStorageProvider(Path.Combine(_cfg.ProfileDirectory, "syncCache"));
+ var mediaCache = new FileStorageProvider(Path.Combine(_cfg.ProfileDirectory, "mediaCache"));
+ Console.WriteLine($"Sync store at {store.TargetPath}");
+
+ var sh = new SyncHelper(_authService.Homeserver, new NullLogger<SyncHelper>(), storageProvider: store) {
+ // MinimumDelay = TimeSpan.FromSeconds(1)
+ };
+ Console.WriteLine("Sync helper created.");
+
+ var unoptimised = await sh.GetUnoptimisedStoreCount(); // this is slow, so we cache
+ //optimise - we create a new scope here to make ssr go out of scope
+ if (unoptimised > 100) {
+ Console.WriteLine("RUN - Optimising sync store...");
+ Status = "Optimising sync store, please wait...";
+ var ssr = new SyncStateResolver(_authService.Homeserver, _logger, storageProvider: store);
+ Console.WriteLine("Created sync state resolver...");
+ Status = "Optimising sync store, please wait... Creating new snapshot...";
+ var sw = Stopwatch.StartNew();
+ await ssr.OptimiseStore((remaining, total) => {
+ // if (remaining % (remaining / 10) == 0)
+ if (sw.ElapsedMilliseconds > 100) {
+ Status = $"Optimising sync store, please wait... {remaining}/{total} remaining...";
+ sw.Restart();
+ }
+ });
+ Status = "Optimising sync store, please wait... Deleting old intermediate snapshots...";
+ await ssr.RemoveOldSnapshots();
+ for (int gen = 0; gen < GC.MaxGeneration; gen++) {
+ Status = $"Collecting garbage #{gen}: {Util.BytesToString(GC.GetGCMemoryInfo().TotalCommittedBytes)}";
+ await Task.Delay(1000);
+ }
+
+ Status = $"Collecting garbage: {GC.GetTotalMemory(true) / 1024 / 1024}MB";
+ await Task.Delay(1000);
+ }
+
+ GC.Collect();
+ unoptimised = 0;
Status = "Doing initial sync...";
- var sh = new SyncStateResolver(_authService.Homeserver, _logger, storageProvider: new FileStorageProvider(Path.Combine(_cfg.ProfileDirectory, "syncCache")));
- // var res = await sh.SyncAsync();
- //await sh.OptimiseStore();
- while (true) {
- // Status = "Syncing...";
- var res = await sh.ContinueAsync();
- Status = $"Processing sync... {res.next.NextBatch}";
- await ApplySpaceChanges(res.next);
- //OnPropertyChanged(nameof(CurrentSpace));
- //OnPropertyChanged(nameof(CurrentSpace.ChildRooms));
- // Console.WriteLine($"mow A={AllRooms.Count}|D={DisplayedSpaces.Count}");
- // for (int i = 0; i < GC.MaxGeneration; i++) {
- // GC.Collect(i, GCCollectionMode.Forced, blocking: true);
- // GC.WaitForPendingFinalizers();
- // }
- Status = "Syncing...";
+ var currentSyncRes = "init";
+ var lastGc = DateTime.Now;
+ int syncGatewayTimeouts = 0;
+ // var filter = new SyncFilter() {
+ // Room = new SyncFilter.RoomFilter() {
+ // Rooms = []
+ // }
+ // };
+ //
+ // sh.Filter = filter;
+
+ sh.ExceptionHandlers.Add(async (ex) => {
+ if (ex is HttpRequestException { StatusCode: HttpStatusCode.GatewayTimeout }) {
+ syncGatewayTimeouts++;
+ Status = $"Got {syncGatewayTimeouts} sync gateway timeouts...";
+ }
+ });
+ await foreach (var res in sh.EnumerateSyncAsync()) {
+ var sw = Stopwatch.StartNew();
+ //log thing
+
+ // Program.Beep((short)250, 0);
+ Status = $"Processing sync... {res.NextBatch}";
+
+ if (res.Rooms?.Leave is { Count: > 0 }) {
+ var maxKeyLength = res.Rooms.Leave.Keys.Max(x => x.Length);
+ foreach (var (key, value) in res.Rooms?.Leave ?? []) {
+ List<StateEventResponse> events = [..value.Timeline?.Events ?? [], ..value.State?.Events ?? []];
+ var ownEvents = events.Where(x => x is { StateKey: "@emma:rory.gay", Type: RoomMemberEventContent.EventId }).ToList();
+ var maxSenderLength = ownEvents.Max(x => x.Sender!.Length);
+ foreach (var evt in ownEvents) {
+ var ct = evt.ContentAs<RoomMemberEventContent>()!;
+ var reasonString = string.IsNullOrWhiteSpace(ct.Reason)
+ ? ""
+ : $" (reason: {ct.Reason})";
+ Console.WriteLine($"Room {key.PadLeft(maxKeyLength)} removed: {evt.StateKey} by {evt.Sender!.PadLeft(maxSenderLength)} at {DateTimeOffset.FromUnixTimeMilliseconds((long)evt.OriginServerTs!)}, membership: {ct.Membership,6}{reasonString}");
+ }
+ }
+ }
+
+ if (res.Rooms?.Leave is { Count: > 0 })
+ Console.WriteLine($"Processed {res.Rooms?.Leave?.Count} left rooms");
+ // await Task.Delay(10000);
+
+ var applySw = Stopwatch.StartNew();
+ await ApplySyncChanges(res);
+ applySw.Stop();
+
+ Program.Beep(0, 0);
+ if (DateTime.Now - lastGc > TimeSpan.FromMinutes(1)) {
+ lastGc = DateTime.Now;
+ GC.Collect();
+ }
+
+ Console.WriteLine($"Processed sync {currentSyncRes} in {sw.ElapsedMilliseconds}ms (applied in: {applySw.ElapsedMilliseconds}ms)");
+ if (Paused) {
+ Status = "Sync loop interrupted... Press pause/break to resume.";
+ while (Paused) await Task.Delay(100);
+ }
+ else Status = $"Syncing... {unoptimised++} unoptimised sync responses, last={currentSyncRes}...";
+
+ currentSyncRes = res.NextBatch;
+ }
+ }
+
+ private async Task ApplySyncChanges(SyncResponse newSync) {
+ await ApplySpaceChanges(newSync);
+ if (newSync.AccountData?.Events?.FirstOrDefault(x => x.Type == "m.direct") is { } evt) {
+ await ApplyDirectMessagesChanges(evt);
}
}
@@ -71,60 +192,105 @@ public partial class ClientViewModel : ViewModelBase {
List<Task> tasks = [];
foreach (var room in newSync.Rooms?.Join ?? []) {
if (!AllRooms.ContainsKey(room.Key)) {
- AllRooms.Add(room.Key, new RoomNode { Name = "Loading..." });
+ // AllRooms.Add(room.Key, new RoomNode { Name = "Loading..." });
+ AllRooms.Add(room.Key, new RoomNode { Name = "", RoomID = room.Key });
}
- if (room.Value.State?.Events is not null) {
- var nameEvent = room.Value.State!.Events!.FirstOrDefault(x => x.Type == "m.room.name" && x.StateKey == "");
+ var nameEvent = room.Value.State?.Events?.FirstOrDefault(x => x is { Type: "m.room.name", StateKey: "" });
+ if (nameEvent != null)
AllRooms[room.Key].Name = (nameEvent?.TypedContent as RoomNameEventContent)?.Name ?? "";
- if (string.IsNullOrWhiteSpace(AllRooms[room.Key].Name)) {
- AllRooms[room.Key].Name = "Loading...";
- tasks.Add(_authService.Homeserver!.GetRoom(room.Key).GetNameOrFallbackAsync().ContinueWith(r => AllRooms[room.Key].Name = r.Result));
+
+ if (string.IsNullOrWhiteSpace(AllRooms[room.Key].Name)) {
+ AllRooms[room.Key].Name = "Loading...";
+ tasks.Add(_authService.Homeserver!.GetRoom(room.Key).GetNameOrFallbackAsync().ContinueWith(r => {
+ if (r.IsFaulted) {
+ _logger.LogError(r.Exception, "Error getting room name for {RoomKey}", room.Key);
+ return AllRooms[room.Key].Name = "Error loading room name";
+ }
+
+ return AllRooms[room.Key].Name = r.Result;
+ }));
+ // Status = $"Getting room name for {room.Key}...";
+ // AllRooms[room.Key].Name = await _authService.Homeserver!.GetRoom(room.Key).GetNameOrFallbackAsync();
+ }
+
+ if (room.Value.Timeline?.Events is not null) {
+ foreach (var evt in room.Value.Timeline.Events) {
+ AllRooms[room.Key].Timeline.Add(evt);
+ }
+ }
+
+ if (room.Value.State?.Events is not null) {
+ foreach (var evt in room.Value.State.Events) {
+ AllRooms[room.Key].State.Add(evt);
}
}
}
-
- await Task.WhenAll(tasks);
-
- return;
-
- List<string> handledRoomIds = [];
- var spaces = newSync.Rooms?.Join?
- .Where(x => x.Value.State?.Events is not null)
- .Where(x => x.Value.State!.Events!.Any(y => y.Type == "m.room.create" && (y.TypedContent as RoomCreateEventContent)!.Type == "m.space"))
- .ToList();
- Console.WriteLine("spaces: " + spaces.Count);
- var nonRootSpaces = spaces
- .Where(x => spaces.Any(x => x.Value.State!.Events!.Any(y => y.Type == "m.space.child" && y.StateKey == x.Key)))
- .ToDictionary();
-
- var rootSpaces = spaces
- .Where(x => !nonRootSpaces.ContainsKey(x.Key))
- .ToDictionary();
- // var rootSpaces = spaces
- // .Where(x=>!spaces.Any(x=>x.Value.State!.Events!.Any(y=>y.Type == "m.space.child" && y.StateKey == x.Key)))
- // .ToList();
-
- foreach (var (roomId, room) in rootSpaces) {
- var space = new SpaceNode { Name = (room.State!.Events!.First(x => x.Type == "m.room.name")!.TypedContent as RoomNameEventContent).Name };
- DisplayedSpaces.Add(space);
- handledRoomIds.Add(roomId);
+
+ await AwaitTasks(tasks, "Waiting for {0}/{1} tasks while applying room changes...");
+ }
+
+ private ExpiringSemaphoreCache<UserProfileResponse> _profileCache = new();
+
+ private async Task ApplyDirectMessagesChanges(StateEventResponse evt) {
+ _logger.LogCritical("Direct messages updated!");
+ var dms = evt.RawContent.Deserialize<Dictionary<string, string[]?>>();
+ List<Task> tasks = [];
+ foreach (var (userId, roomIds) in dms) {
+ if (roomIds is null || roomIds.Length == 0) continue;
+ var space = DirectMessages.ChildSpaces.FirstOrDefault(x => x.RoomID == userId);
+ if (space is null) {
+ space = new SpaceNode { Name = userId, RoomID = userId };
+ // tasks.Add(_authService.Homeserver!.GetProfileAsync(userId)
+ // .ContinueWith(r => space.Name = string.IsNullOrWhiteSpace(r.Result.DisplayName) ? userId : r.Result.DisplayName));
+ tasks.Add(_profileCache.GetOrAdd(userId, async () => await _authService.Homeserver!.GetProfileAsync(userId), TimeSpan.FromMinutes(5))
+ .ContinueWith(r => {
+ if (!r.IsFaulted)
+ return string.IsNullOrWhiteSpace(r.Result.DisplayName) ? userId : r.Result.DisplayName;
+ return userId;
+ }));
+ DirectMessages.ChildSpaces.Add(space);
+ }
+
+ foreach (var roomId in roomIds) {
+ var room = space.ChildRooms.FirstOrDefault(x => x.RoomID == roomId);
+ if (room is null) {
+ room = AllRooms.TryGetValue(roomId, out var existing) ? existing : new RoomNode { Name = "Unknown: " + roomId, RoomID = roomId };
+ space.ChildRooms.Add(room);
+ }
+ }
+
+ foreach (var spaceChildRoom in space.ChildRooms.ToList()) {
+ if (!roomIds.Contains(spaceChildRoom.RoomID)) {
+ space.ChildRooms.Remove(spaceChildRoom);
+ }
+ }
}
+
+ await AwaitTasks(tasks, "Waiting for {0}/{1} tasks while applying DM changes...");
}
-}
-public class SpaceNode : RoomNode {
- public ObservableCollection<SpaceNode> ChildSpaces { get; set; } = [];
- public ObservableCollection<RoomNode> ChildRooms { get; set; } = [];
-}
+ private async Task AwaitTasks(List<Task> tasks, string message) {
+ if (tasks.Count > 0) {
+ int total = tasks.Count;
+ while (tasks.Any(x => !x.IsCompleted)) {
+ int incomplete = tasks.Count(x => !x.IsCompleted);
+ // Program.Beep((short)MathUtil.Map(incomplete, 0, total, 20, 7500), 5);
+ // Program.Beep(0, 0);
+ if (incomplete < 10 || incomplete % (total / 10) == 0)
+ Status = string.Format(message, incomplete, total);
+ await Task.WhenAny(tasks);
+ tasks.RemoveAll(x => x.IsCompleted);
+ }
-public class RoomNode {
- public string Name { get; set; }
+ Program.Beep(0, 0);
+ }
+ }
}
// implementation details
public class AllRoomsSpaceNode : SpaceNode {
- public AllRoomsSpaceNode(ClientViewModel vm) {
+ public AllRoomsSpaceNode(ClientViewModel vm) : base(false) {
Name = "All rooms";
vm.AllRooms.CollectionChanged += (_, args) => {
switch (args.Action) {
|