about summary refs log tree commit diff
path: root/MatrixUtils.Web/Pages/Rooms/Index2Components/RoomsIndex2SyncContainer.razor
blob: 1fb3f893e6981c67ce82f2a4811456364e020b82 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@using LibMatrix.Helpers
@using LibMatrix.Responses
@using MatrixUtils.Abstractions
@using System.Diagnostics
@using System.Diagnostics.CodeAnalysis
@using LibMatrix.EventTypes.Spec.State
@using LibMatrix.Extensions
@using LibMatrix.Utilities
@using System.Collections.ObjectModel
@using ArcaneLibs
@inject ILogger<RoomsIndex2SyncContainer> logger
<pre>RoomsIndex2SyncContainer</pre>
@foreach (var (name, value) in _statusList) {
    <pre>[@name] @value.Status</pre>
}

@code {

    // View data supplied by the parent component through the [Parameter] binding.
    // This component reads Data.Homeserver and reads/mutates Data.Rooms.
    [Parameter]
    public Index2.RoomListViewData Data { get; set; } = null!;

    // NOTE(review): nothing visible in this file assigns or reads this field; the
    // RunSyncLoop parameter of the same name shadows it — confirm whether it is still needed.
    private SyncHelper syncHelper;

    // Joined-room updates waiting to be merged into Data.Rooms:
    // filled by HandleSyncResponse, drained by RunQueueProcessor.
    private Queue<KeyValuePair<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>> queue = new();

    // Named status entries; the component markup renders one <pre> line per entry.
    private ObservableCollection<(string name, ObservableStatus value)> _statusList = new();

    /// <summary>
    /// Hooks re-rendering up to the status list, waits until <c>Data.Homeserver</c> is set,
    /// then starts the sync machinery via <c>SetUpSync</c>.
    /// </summary>
    protected override async Task OnInitializedAsync() {
        _statusList.CollectionChanged += (list, change) => {
            // Any change to the list itself triggers a re-render.
            StateHasChanged();
            if (change.NewItems is null || change.NewItems.Count <= 0) return;

            foreach (var added in change.NewItems) {
                // Entries are stored as (name, status) tuples; anything else is ignored.
                if (added is (string _, ObservableStatus entry)) {
                    // Only entries flagged as visible cause a re-render when they change.
                    entry.PropertyChanged += (_, _) => {
                        if (entry.Show) StateHasChanged();
                    };
                }
            }
        };

        // Poll every 100 ms until the homeserver has been provided.
        while (Data.Homeserver is null)
            await Task.Delay(100);

        await SetUpSync();
    }

    /// <summary>
    /// Performs the initial sync with the basic-room-info filter, decides from the result whether
    /// additional filtered sync helpers are started, queues the initial response, and then starts
    /// the queue processor and one sync loop per helper without awaiting them.
    /// </summary>
    private async Task SetUpSync() {
        var mainStatus = await GetOrAddStatus("Main");

        var mainHelper = new SyncHelper(Data.Homeserver, logger) {
            Timeout = 30000,
            FilterId = await Data.Homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(CommonSyncFilters.GetBasicRoomInfo)
        };
        var helpersByName = new Dictionary<string, SyncHelper> { ["Main"] = mainHelper };

        mainStatus.Status = "Initial sync... Checking server filter capability...";
        var initialSync = await mainHelper.SyncAsync();

        // The basic filter is treated as honoured when no joined room in the response
        // carries a space-child state event (a missing room list counts as "none seen").
        var sawSpaceChildEvents = initialSync.Rooms?.Join?.Any(room =>
            room.Value.State?.Events?.Any(ev => ev.Type == SpaceChildEventContent.EventId) ?? false) ?? false;

        if (sawSpaceChildEvents) {
            mainStatus.Status = "Initial sync indicates that server does not support filters, continuing without extra filters!";
        }
        else {
            mainStatus.Status = "Initial sync indicates that server supports filters, starting helpers!";

            var spaceRelationsHelper = new SyncHelper(Data.Homeserver, logger) {
                Timeout = 30000,
                FilterId = await Data.Homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(CommonSyncFilters.GetSpaceRelations)
            };
            helpersByName.Add("SpaceRelations", spaceRelationsHelper);

            var profileHelper = new SyncHelper(Data.Homeserver, logger) {
                Timeout = 30000,
                FilterId = await Data.Homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(CommonSyncFilters.GetOwnMemberEvents)
            };
            helpersByName.Add("Profile", profileHelper);
        }

        await HandleSyncResponse(initialSync);

        // The workers below run for the lifetime of the component and are intentionally not awaited.
        _ = RunQueueProcessor();
        foreach (var (helperName, helper) in helpersByName) {
            Console.WriteLine($"Starting sync loop for {helperName}");
            _ = RunSyncLoop(helper, helperName);
        }
    }

    /// <summary>
    /// Endless worker that drains <c>queue</c> and folds each joined-room update into <c>Data.Rooms</c>.
    /// </summary>
    /// <remarks>
    /// Each pass waits one second, polls until the queue is non-empty, then handles at most
    /// 5000 entries before publishing a progress status. Exceptions are written to the console
    /// and the loop carries on with the next pass.
    /// </remarks>
    private async Task RunQueueProcessor() {
        var status = await GetOrAddStatus("QueueProcessor");
        // Per-room detail status; registered with show: false.
        var statusd = await GetOrAddStatus("QueueProcessor/D", show: false);
        while (true) {
            await Task.Delay(1000);
            try {
                var renderTimeSw = Stopwatch.StartNew();
                while (queue.Count == 0) {
                    Console.WriteLine("Queue is empty, waiting...");
                    await Task.Delay(1000);
                }

                status.Status = $"Queue no longer empty after {renderTimeSw.Elapsed}!";
                renderTimeSw.Restart();

                // Cap the work done per pass so the progress status is refreshed regularly.
                int maxUpdates = 5000;
                while (maxUpdates-- > 0 && queue.TryDequeue(out var queueEntry)) {
                    var (roomId, roomData) = queueEntry;
                    statusd.Status = $"Dequeued room {roomId}";

                    // Single lookup instead of scanning the room list twice (Any + First).
                    var room = Data.Rooms.FirstOrDefault(x => x.Room.RoomId == roomId);
                    if (room is not null) {
                        statusd.Status = $"{roomId} already known with {room.StateEvents?.Count ?? 0} state events";
                    }
                    else {
                        statusd.Status = $"Encountered new room {roomId}!";
                        room = new RoomInfo(Data.Homeserver!.GetRoom(roomId), roomData.State?.Events);
                        Data.Rooms.Add(room);
                    }

                    if (roomData.State?.Events is { Count: > 0 })
                        room.StateEvents!.MergeStateEventLists(roomData.State.Events);
                    else {
                        statusd.Status = $"Could not merge state for {room.Room.RoomId} as new data contains no state events!";
                    }
                }

                status.Status = $"Got {Data.Rooms.Count} rooms so far! {queue.Count} entries left in processing queue... Parsed last response in {renderTimeSw.Elapsed}";
            }
            catch (Exception e) {
                // Keep the worker alive; a single bad entry must not stop queue processing.
                Console.WriteLine("QueueWorker exception: " + e);
            }
        }
    }

    /// <summary>
    /// Consumes the sync stream of <paramref name="syncHelper"/> and hands every response to
    /// <c>HandleSyncResponse</c>, reporting progress on the <c>SYNC/{name}</c> status entry.
    /// </summary>
    /// <param name="syncHelper">Helper whose sync responses are enumerated.</param>
    /// <param name="name">Label used in the status entry name.</param>
    private async Task RunSyncLoop(SyncHelper syncHelper, string name = "Unknown") {
        var loopStatus = await GetOrAddStatus($"SYNC/{name}");
        loopStatus.Status = "Initial syncing...";

        await foreach (var response in syncHelper.EnumerateSyncAsync()) {
            var timer = Stopwatch.StartNew();
            var updateCount = response.Rooms?.Join?.Count ?? 0;
            loopStatus.Status = $"[{DateTime.Now}] Got {Data.Rooms.Count} rooms so far! {updateCount} new updates!";

            await HandleSyncResponse(response);

            // Append the timing line to the message set above rather than replacing it.
            loopStatus.Status = loopStatus.Status + $"\nProcessed sync in {timer.ElapsedMilliseconds}ms, queue length: {queue.Count}";
        }
    }

    /// <summary>
    /// Applies the room section of a sync response: joined rooms are queued for the queue
    /// processor, left rooms are removed from <c>Data.Rooms</c>.
    /// </summary>
    /// <param name="sync">Sync response to apply; a null response or one without a room section is a no-op.</param>
    private async Task HandleSyncResponse(SyncResponse? sync) {
        // Guard once for both branches: previously only the join branch was null-safe, so a
        // response without a room section threw NullReferenceException on the leave branch.
        var rooms = sync?.Rooms;
        if (rooms is null) return;

        if (rooms.Join is { Count: > 0 })
            foreach (var joinedRoom in rooms.Join)
                queue.Enqueue(joinedRoom);

        if (rooms.Leave is { Count: > 0 })
            foreach (var leftRoom in rooms.Leave) {
                // Single lookup instead of scanning the room list twice (Any + First).
                var known = Data.Rooms.FirstOrDefault(x => x.Room.RoomId == leftRoom.Key);
                if (known is not null)
                    Data.Rooms.Remove(known);
            }
    }

    // Guards _statusList so that lookup-then-add in GetOrAddStatus runs one caller at a time.
    private SemaphoreSlim _syncLock = new(1, 1);

    /// <summary>
    /// Returns the status entry registered under <paramref name="name"/>, creating and
    /// registering a new one when none exists yet.
    /// </summary>
    /// <param name="name">Key of the status entry.</param>
    /// <param name="show">Value for <c>Show</c> on a newly created entry; ignored for existing entries.</param>
    /// <param name="log">Value for <c>Log</c> on a newly created entry; ignored for existing entries.</param>
    /// <returns>The existing or newly created status entry.</returns>
    private async Task<ObservableStatus> GetOrAddStatus(string name, bool show = true, bool log = true) {
        await _syncLock.WaitAsync();
        try {
            foreach (var (existingName, existing) in _statusList) {
                if (existingName == name)
                    return existing;
            }

            var created = new ObservableStatus {
                Name = name,
                Log = log,
                Show = show
            };
            _statusList.Add((name, created));
            return created;
        }
        finally {
            _syncLock.Release();
        }
    }

    /// <summary>
    /// A named status message. Assigning <see cref="Status"/> goes through the base class's
    /// <c>SetField</c> and, when that call returns true and <see cref="Log"/> is set, also
    /// writes the new message to the console.
    /// </summary>
    private class ObservableStatus : NotifyPropertyChanged {
        private string _status = "Initialising...";

        /// <summary>Label printed in front of logged messages.</summary>
        public string Name { get; set; } = "Unknown";

        /// <summary>Whether new status messages are echoed to the console.</summary>
        public bool Log { get; set; } = true;

        /// <summary>Whether changes to this entry should cause the component to re-render.</summary>
        public bool Show { get; set; } = true;

        /// <summary>Current status message.</summary>
        public string Status {
            get { return _status; }
            set {
                var accepted = SetField(ref _status, value);
                if (!accepted || !Log) return;
                Console.WriteLine($"[{Name}]: {value}");
            }
        }
    }

}