about summary refs log tree commit diff
path: root/MatrixUtils.Web/Pages/Labs/Rooms2/Index2Components/RoomsIndex2SyncContainer.razor
diff options
context:
space:
mode:
authorRory& <root@rory.gay>2024-05-14 17:49:09 +0200
committerRory& <root@rory.gay>2024-05-14 17:49:09 +0200
commit41c5a84dacfd036b8d8f01f72226ac5a519995e3 (patch)
treea4bfc76541692cbbb0fc18f34463cf31a57440f5 /MatrixUtils.Web/Pages/Labs/Rooms2/Index2Components/RoomsIndex2SyncContainer.razor
parentImprove the heatmap layout (diff)
downloadMatrixUtils-41c5a84dacfd036b8d8f01f72226ac5a519995e3.tar.xz
Organise tools somewhat, set proper icons for nav menu
Diffstat (limited to 'MatrixUtils.Web/Pages/Labs/Rooms2/Index2Components/RoomsIndex2SyncContainer.razor')
-rw-r--r--MatrixUtils.Web/Pages/Labs/Rooms2/Index2Components/RoomsIndex2SyncContainer.razor201
1 files changed, 201 insertions, 0 deletions
diff --git a/MatrixUtils.Web/Pages/Labs/Rooms2/Index2Components/RoomsIndex2SyncContainer.razor b/MatrixUtils.Web/Pages/Labs/Rooms2/Index2Components/RoomsIndex2SyncContainer.razor
new file mode 100644
index 0000000..91f228d
--- /dev/null
+++ b/MatrixUtils.Web/Pages/Labs/Rooms2/Index2Components/RoomsIndex2SyncContainer.razor
@@ -0,0 +1,201 @@
+@using LibMatrix.Helpers
+@using LibMatrix.Responses
+@using MatrixUtils.Abstractions
+@using System.Diagnostics
+@using LibMatrix.EventTypes.Spec.State
+@using LibMatrix.Extensions
+@using LibMatrix.Utilities
+@using System.Collections.ObjectModel
+@using ArcaneLibs
+@inject ILogger<RoomsIndex2SyncContainer> logger
+<pre>RoomsIndex2SyncContainer</pre>
+@foreach (var (name, value) in _statusList) {
+    <pre>[@name] @value.Status</pre>
+}
+
+@code {
+
+    [Parameter]
+    public Index2.RoomListViewData Data { get; set; } = null!;
+
+    private SyncHelper syncHelper;
+
+    private Queue<KeyValuePair<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>> queue = new();
+
+    private ObservableCollection<(string name, ObservableStatus value)> _statusList = new();
+
+    protected override async Task OnInitializedAsync() {
+        _statusList.CollectionChanged += (sender, args) => {
+            StateHasChanged();
+            if (args.NewItems is { Count: > 0 })
+                foreach (var item in args.NewItems) {
+                    if (item is not (string name, ObservableStatus value)) continue;
+                    value.PropertyChanged += (sender, args) => {
+                        if(value.Show) StateHasChanged();
+                    };
+                }
+        };
+
+        while (Data.Homeserver is null) {
+            await Task.Delay(100);
+        }
+
+        await SetUpSync();
+    }
+
+    private async Task SetUpSync() {
+        var status = await GetOrAddStatus("Main");
+        var syncHelpers = new Dictionary<string, SyncHelper>() {
+            ["Main"] = new SyncHelper(Data.Homeserver, logger) {
+                Timeout = 30000,
+                FilterId = await Data.Homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(CommonSyncFilters.GetBasicRoomInfo),
+                // MinimumDelay = TimeSpan.FromMilliseconds(5000)
+            }
+        };
+        status.Status = "Initial sync... Checking server filter capability...";
+        var syncRes = await syncHelpers["Main"].SyncAsync();
+        if (!syncRes.Rooms?.Join?.Any(x => x.Value.State?.Events?.Any(y => y.Type == SpaceChildEventContent.EventId) ?? false) ?? true) {
+            status.Status = "Initial sync indicates that server supports filters, starting helpers!";
+            syncHelpers.Add("SpaceRelations", new SyncHelper(Data.Homeserver, logger) {
+                Timeout = 30000,
+                FilterId = await Data.Homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(CommonSyncFilters.GetSpaceRelations),
+                // MinimumDelay = TimeSpan.FromMilliseconds(5000)
+            });
+
+            syncHelpers.Add("Profile", new SyncHelper(Data.Homeserver, logger) {
+                Timeout = 30000,
+                FilterId = await Data.Homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(CommonSyncFilters.GetOwnMemberEvents),
+                // MinimumDelay = TimeSpan.FromMilliseconds(5000)
+            });
+        }
+        else status.Status = "Initial sync indicates that server does not support filters, continuing without extra filters!";
+
+        await HandleSyncResponse(syncRes);
+
+        //  profileSyncHelper = new SyncHelper(Homeserver, logger) {
+        //     Timeout = 10000,
+        //     Filter = profileUpdateFilter,
+        //     MinimumDelay = TimeSpan.FromMilliseconds(5000)
+        //  };
+        // profileUpdateFilter.Room.State.Senders.Add(Homeserver.WhoAmI.UserId);
+        RunQueueProcessor();
+        foreach (var helper in syncHelpers) {
+            Console.WriteLine($"Starting sync loop for {helper.Key}");
+            RunSyncLoop(helper.Value, helper.Key);
+        }
+    }
+
+    private async Task RunQueueProcessor() {
+        var status = await GetOrAddStatus("QueueProcessor");
+        var statusd = await GetOrAddStatus("QueueProcessor/D", show: false);
+        while (true) {
+            await Task.Delay(1000);
+            try {
+                var renderTimeSw = Stopwatch.StartNew();
+                while (queue.Count == 0) {
+                    var delay = 1000;
+                    Console.WriteLine("Queue is empty, waiting...");
+                    // Status2 = $"Queue is empty, waiting for {delay}ms...";
+                    await Task.Delay(delay);
+                }
+
+                status.Status = $"Queue no longer empty after {renderTimeSw.Elapsed}!";
+                renderTimeSw.Restart();
+
+                int maxUpdates = 5000;
+                while (maxUpdates-- > 0 && queue.TryDequeue(out var queueEntry)) {
+                    var (roomId, roomData) = queueEntry;
+                    statusd.Status = $"Dequeued room {roomId}";
+                    RoomInfo room;
+
+                    if (Data.Rooms.Any(x => x.Room.RoomId == roomId)) {
+                        room = Data.Rooms.First(x => x.Room.RoomId == roomId);
+                        statusd.Status = $"{roomId} already known with {room.StateEvents?.Count ?? 0} state events";
+                    }
+                    else {
+                        statusd.Status = $"Eencountered new room {roomId}!";
+                        room = new RoomInfo(Data.Homeserver!.GetRoom(roomId), roomData.State?.Events);
+                        Data.Rooms.Add(room);
+                    }
+
+                    if (roomData.State?.Events is { Count: > 0 })
+                        room.StateEvents!.MergeStateEventLists(roomData.State.Events);
+                    else {
+                        statusd.Status = $"Could not merge state for {room.Room.RoomId} as new data contains no state events!";
+                    }
+
+                    // await Task.Delay(10);
+                }
+
+                status.Status = $"Got {Data.Rooms.Count} rooms so far! {queue.Count} entries left in processing queue... Parsed last response in {renderTimeSw.Elapsed}";
+
+                // RenderContents |= queue.Count == 0;
+                // await Task.Delay(Data.Rooms.Count);
+            }
+            catch (Exception e) {
+                Console.WriteLine("QueueWorker exception: " + e);
+            }
+        }
+    }
+
+    private async Task RunSyncLoop(SyncHelper syncHelper, string name = "Unknown") {
+        var status = await GetOrAddStatus($"SYNC/{name}");
+        status.Status = $"Initial syncing...";
+
+        var syncs = syncHelper.EnumerateSyncAsync();
+        await foreach (var sync in syncs) {
+            var sw = Stopwatch.StartNew();
+            status.Status = $"[{DateTime.Now}] Got {Data.Rooms.Count} rooms so far! {sync.Rooms?.Join?.Count ?? 0} new updates!";
+
+            await HandleSyncResponse(sync);
+            status.Status += $"\nProcessed sync in {sw.ElapsedMilliseconds}ms, queue length: {queue.Count}";
+        }
+    }
+
+    private async Task HandleSyncResponse(SyncResponse? sync) {
+        if (sync?.Rooms?.Join is { Count: > 0 })
+            foreach (var joinedRoom in sync.Rooms.Join)
+                queue.Enqueue(joinedRoom);
+
+        if (sync.Rooms.Leave is { Count: > 0 })
+            foreach (var leftRoom in sync.Rooms.Leave)
+                if (Data.Rooms.Any(x => x.Room.RoomId == leftRoom.Key))
+                    Data.Rooms.Remove(Data.Rooms.First(x => x.Room.RoomId == leftRoom.Key));
+    }
+
+    private SemaphoreSlim _syncLock = new(1, 1);
+
+    private async Task<ObservableStatus> GetOrAddStatus(string name, bool show = true, bool log = true) {
+        await _syncLock.WaitAsync();
+        try {
+            if (_statusList.Any(x => x.name == name))
+                return _statusList.First(x => x.name == name).value;
+            var status = new ObservableStatus() {
+                Name = name,
+                Log = log,
+                Show = show
+            };
+            _statusList.Add((name, status));
+            return status;
+        }
+        finally {
+            _syncLock.Release();
+        }
+    }
+
+    private class ObservableStatus : NotifyPropertyChanged {
+        private string _status = "Initialising...";
+        public string Name { get; set; } = "Unknown";
+        public bool Show { get; set; } = true;
+        public bool Log { get; set; } = true;
+
+        public string Status {
+            get => _status;
+            set {
+                if(SetField(ref _status, value) && Log)
+                    Console.WriteLine($"[{Name}]: {value}");
+            }
+        }
+    }
+
+}
\ No newline at end of file