diff options
m--------- | ArcaneLibs | 0 | ||||
-rw-r--r-- | LibMatrix/Extensions/EnumerableExtensions.cs | 90 | ||||
-rw-r--r-- | LibMatrix/Extensions/MatrixHttpClient.Single.cs | 42 | ||||
-rw-r--r-- | LibMatrix/Helpers/SyncHelper.cs | 26 | ||||
-rw-r--r-- | LibMatrix/Helpers/SyncStateResolver.cs | 251 | ||||
-rw-r--r-- | LibMatrix/Responses/SyncResponse.cs | 6 | ||||
-rw-r--r-- | LibMatrix/RoomTypes/GenericRoom.cs | 10 | ||||
-rw-r--r-- | Tests/LibMatrix.HomeserverEmulator/Services/RoomStore.cs | 2 | ||||
-rw-r--r-- | Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs | 4 | ||||
-rw-r--r-- | Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs | 48 |
10 files changed, 322 insertions, 157 deletions
diff --git a/ArcaneLibs b/ArcaneLibs -Subproject 2befb4f38c101f71a885acf8f6d442f8ace34dd +Subproject b7685c786b29e7f8ae2db6ff0f79a52efc57020 diff --git a/LibMatrix/Extensions/EnumerableExtensions.cs b/LibMatrix/Extensions/EnumerableExtensions.cs index 42d9491..ace2c0c 100644 --- a/LibMatrix/Extensions/EnumerableExtensions.cs +++ b/LibMatrix/Extensions/EnumerableExtensions.cs @@ -1,29 +1,91 @@ +using System.Collections.Frozen; +using System.Collections.Immutable; + namespace LibMatrix.Extensions; public static class EnumerableExtensions { + public static int insertions = 0; + public static int replacements = 0; + public static void MergeStateEventLists(this IList<StateEvent> oldState, IList<StateEvent> newState) { - foreach (var stateEvent in newState) { - var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey); - if (old is null) { - oldState.Add(stateEvent); - continue; + // foreach (var stateEvent in newState) { + // var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey); + // if (old is null) { + // oldState.Add(stateEvent); + // continue; + // } + // + // oldState.Remove(old); + // oldState.Add(stateEvent); + // } + + foreach (var e in newState) { + switch (FindIndex(e)) { + case -1: + oldState.Add(e); + break; + case var index: + oldState[index] = e; + break; } + } - oldState.Remove(old); - oldState.Add(stateEvent); + int FindIndex(StateEvent needle) { + for (int i = 0; i < oldState.Count; i++) { + var old = oldState[i]; + if (old.Type == needle.Type && old.StateKey == needle.StateKey) + return i; + } + + return -1; } } public static void MergeStateEventLists(this IList<StateEventResponse> oldState, IList<StateEventResponse> newState) { - foreach (var stateEvent in newState) { - var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey); - if (old is null) { - oldState.Add(stateEvent); - continue; + foreach (var e in newState) { + switch (FindIndex(e)) { + case -1: + oldState.Add(e); + break; + case var index: + oldState[index] = e; + break; + } + } + + int FindIndex(StateEventResponse needle) { + for (int i = 0; i < oldState.Count; i++) { + var old = oldState[i]; + if (old.Type == needle.Type && old.StateKey == needle.StateKey) + return i; + } + + return -1; + } + } + + public static void MergeStateEventLists(this List<StateEventResponse> oldState, List<StateEventResponse> newState) { + foreach (var e in newState) { + switch (FindIndex(e)) { + case -1: + oldState.Add(e); + insertions++; + break; + case var index: + oldState[index] = e; + replacements++; + break; + } + } + + int FindIndex(StateEventResponse needle) { + for (int i = 0; i < oldState.Count; i++) { + var old = oldState[i]; + if (old.Type == needle.Type && old.StateKey == needle.StateKey) + return i; } - oldState.Remove(old); - oldState.Add(stateEvent); + return -1; } } } \ No newline at end of file diff --git a/LibMatrix/Extensions/MatrixHttpClient.Single.cs b/LibMatrix/Extensions/MatrixHttpClient.Single.cs index 3c8aea4..7548a2c 100644 --- a/LibMatrix/Extensions/MatrixHttpClient.Single.cs +++ b/LibMatrix/Extensions/MatrixHttpClient.Single.cs @@ -50,6 +50,7 @@ public class MatrixHttpClient { internal SemaphoreSlim _rateLimitSemaphore { get; } = new(1, 1); #endif + private const bool LogRequests = true; public Dictionary<string, string> AdditionalQueryParameters { get; set; } = new(); public Uri? BaseAddress { get; set; } @@ -71,20 +72,21 @@ public class MatrixHttpClient { public async Task<HttpResponseMessage> SendUnhandledAsync(HttpRequestMessage request, CancellationToken cancellationToken) { if (request.RequestUri is null) throw new NullReferenceException("RequestUri is null"); // if (!request.RequestUri.IsAbsoluteUri) - request.RequestUri = request.RequestUri.EnsureAbsolute(BaseAddress!); + request.RequestUri = request.RequestUri.EnsureAbsolute(BaseAddress!); var swWait = Stopwatch.StartNew(); #if SYNC_HTTPCLIENT await _rateLimitSemaphore.WaitAsync(cancellationToken); #endif swWait.Stop(); var swExec = Stopwatch.StartNew(); - + foreach (var (key, value) in AdditionalQueryParameters) request.RequestUri = request.RequestUri.AddQuery(key, value); foreach (var (key, value) in DefaultRequestHeaders) request.Headers.Add(key, value); request.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse"), true); - Console.WriteLine("Sending " + request.Summarise(includeHeaders:true, includeQuery: true, includeContentIfText: true)); - + if (LogRequests) + Console.WriteLine("Sending " + request.Summarise(includeHeaders: true, includeQuery: true, includeContentIfText: true, hideHeaders: ["Accept"])); + HttpResponseMessage? responseMessage; try { responseMessage = await Client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); @@ -101,19 +103,25 @@ public class MatrixHttpClient { #endif // Console.WriteLine($"Sending {request.Method} {request.RequestUri} ({Util.BytesToString(request.Content?.Headers.ContentLength ?? 0)}) -> {(int)responseMessage.StatusCode} {responseMessage.StatusCode} ({Util.BytesToString(responseMessage.GetContentLength())}, WAIT={swWait.ElapsedMilliseconds}ms, EXEC={swExec.ElapsedMilliseconds}ms)"); - Console.WriteLine("Received " + responseMessage.Summarise(includeHeaders: true, includeContentIfText: false, hideHeaders: [ - "Server", - "Date", - "Transfer-Encoding", - "Connection", - "Vary", - "Content-Length", - "Access-Control-Allow-Origin", - "Access-Control-Allow-Methods", - "Access-Control-Allow-Headers", - "Access-Control-Expose-Headers", - "Cache-Control" - ])); + if (LogRequests) + Console.WriteLine("Received " + responseMessage.Summarise(includeHeaders: true, includeContentIfText: false, hideHeaders: [ + "Server", + "Date", + "Transfer-Encoding", + "Connection", + "Vary", + "Content-Length", + "Access-Control-Allow-Origin", + "Access-Control-Allow-Methods", + "Access-Control-Allow-Headers", + "Access-Control-Expose-Headers", + "Cache-Control", + "Cross-Origin-Resource-Policy", + "X-Content-Security-Policy", + "Referrer-Policy", + "X-Robots-Tag", + "Content-Security-Policy" + ])); return responseMessage; } diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs index 7d5364b..e8a8e70 100644 --- a/LibMatrix/Helpers/SyncHelper.cs +++ b/LibMatrix/Helpers/SyncHelper.cs @@ -1,5 +1,7 @@ using System.Diagnostics; using System.Net.Http.Json; +using System.Text.Json; +using System.Text.Json.Nodes; using ArcaneLibs.Extensions; using LibMatrix.Filters; using LibMatrix.Homeservers; @@ -18,6 +20,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public string? Since { get; set; } public int Timeout { get; set; } = 30000; public string? SetPresence { get; set; } = "online"; + public bool UseInternalStreamingSync { get; set; } = true; public string? FilterId { get => _filterId; @@ -110,13 +113,22 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg // logger?.LogInformation("SyncHelper: Calling: {}", url); try { - var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None); - if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request"); - logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed); - var deserializeSw = Stopwatch.StartNew(); - var resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None, - jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse); - logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed); + SyncResponse? resp = null; + if (UseInternalStreamingSync) { + resp = await homeserver.ClientHttpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None); + logger?.LogInformation("Got sync response: ~{} bytes, {} elapsed", resp.ToJson(false, true, true).Length, sw.Elapsed); + } + else { + var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None); + if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request"); + logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed); + var deserializeSw = Stopwatch.StartNew(); + // var jsonResp = await httpResp.Content.ReadFromJsonAsync<JsonObject>(cancellationToken: cancellationToken ?? CancellationToken.None); + // var resp = jsonResp.Deserialize<SyncResponse>(); + resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None, + jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse); + logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed); + } var timeToWait = MinimumDelay.Subtract(sw.Elapsed); if (timeToWait.TotalMilliseconds > 0) diff --git a/LibMatrix/Helpers/SyncStateResolver.cs b/LibMatrix/Helpers/SyncStateResolver.cs index 3be4492..282d26f 100644 --- a/LibMatrix/Helpers/SyncStateResolver.cs +++ b/LibMatrix/Helpers/SyncStateResolver.cs @@ -1,6 +1,7 @@ using System.Collections.Frozen; using System.Collections.Immutable; using System.Diagnostics; +using System.Text; using ArcaneLibs.Extensions; using LibMatrix.Extensions; using LibMatrix.Filters; @@ -40,15 +41,16 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge return (sync, MergedState); } - public async Task OptimiseStore() { + public async Task OptimiseStore(Action<int, int>? progressCallback = null) { if (storageProvider is null) return; if (!await storageProvider.ObjectExistsAsync("init")) return; var totalSw = Stopwatch.StartNew(); Console.Write("Optimising sync store..."); var initLoadTask = storageProvider.LoadObjectAsync<SyncResponse>("init"); - var keys = (await storageProvider.GetAllKeysAsync()).Where(x=>!x.StartsWith("old/")).ToFrozenSet(); + var keys = (await storageProvider.GetAllKeysAsync()).Where(x => !x.StartsWith("old/")).ToFrozenSet(); var count = keys.Count - 1; + int total = count; Console.WriteLine($"Found {count} entries to optimise."); var merged = await initLoadTask; @@ -64,6 +66,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge var moveTasks = new List<Task>(); + Dictionary<string, Dictionary<string, TimeSpan>> traces = []; while (keys.Contains(merged.NextBatch)) { Console.Write($"Merging {merged.NextBatch}, {--count} remaining... "); var sw = Stopwatch.StartNew(); @@ -77,28 +80,36 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge moveTasks.Add(storageProvider.MoveObjectAsync(merged.NextBatch, $"{oldPath}/{merged.NextBatch}")); Console.Write($"Move {sw.GetElapsedAndRestart().TotalMilliseconds}ms... "); - merged = MergeSyncs(merged, next); + var trace = new Dictionary<string, TimeSpan>(); + traces[merged.NextBatch] = trace; + merged = MergeSyncs(merged, next, trace); Console.Write($"Merge {sw.GetElapsedAndRestart().TotalMilliseconds}ms... "); Console.WriteLine($"Total {swt.Elapsed.TotalMilliseconds}ms"); // Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining..."); + progressCallback?.Invoke(count, total); } + var traceString = string.Join("\n", traces.Select(x => $"{x.Key}\t{x.Value.ToJson(indent: false)}")); + var ms = new MemoryStream(Encoding.UTF8.GetBytes(traceString)); + await storageProvider.SaveStreamAsync($"traces/{oldPath}", ms); + await storageProvider.SaveObjectAsync("init", merged); await Task.WhenAll(moveTasks); - + Console.WriteLine($"Optimised store in {totalSw.Elapsed.TotalMilliseconds}ms"); + Console.WriteLine($"Insertions: {EnumerableExtensions.insertions}, replacements: {EnumerableExtensions.replacements}"); } /// <summary> /// Remove all but initial sync and last checkpoint /// </summary> public async Task RemoveOldSnapshots() { - if(storageProvider is null) return; + if (storageProvider is null) return; var sw = Stopwatch.StartNew(); var map = await GetCheckpointMap(); if (map is null) return; - if(map.Count < 3) return; + if (map.Count < 3) return; var toRemove = map.Keys.Skip(1).Take(map.Count - 2).ToList(); Console.Write("Cleaning up old snapshots: "); @@ -109,6 +120,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge await storageProvider?.DeleteObjectAsync(path); } } + Console.WriteLine("Done!"); Console.WriteLine($"Removed {toRemove.Count} old snapshots in {sw.Elapsed.TotalMilliseconds}ms"); } @@ -137,6 +149,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge // Console.WriteLine($"[{++i}] {key} -> {resp.NextBatch} ({resp.GetDerivedSyncTime()})"); i++; } + Console.WriteLine($"Iterated {i} syncResponses in {sw.Elapsed}"); Environment.Exit(0); } @@ -188,7 +201,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge if (resp.GetDerivedSyncTime() > unixTime) break; merged = MergeSyncs(merged, resp); } - + return merged; } @@ -208,29 +221,29 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge return map.OrderBy(x => x.Key).ToImmutableSortedDictionary(x => x.Key, x => x.Value.ToFrozenSet()); } - private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync) { + private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync, Dictionary<string, TimeSpan>? trace = null) { + var sw = Stopwatch.StartNew(); oldSync.NextBatch = newSync.NextBatch ?? oldSync.NextBatch; - oldSync.AccountData ??= new EventList(); - oldSync.AccountData.Events ??= []; - if (newSync.AccountData?.Events is not null) - oldSync.AccountData.Events.MergeStateEventLists(newSync.AccountData?.Events ?? []); + oldSync.AccountData = MergeEventList(oldSync.AccountData, newSync.AccountData); + trace?.Add("AccountData", sw.GetElapsedAndRestart()); - oldSync.Presence ??= new(); - oldSync.Presence.Events?.ReplaceBy(newSync.Presence?.Events ?? [], (oldState, newState) => oldState.Sender == newState.Sender && oldState.Type == newState.Type); + oldSync.Presence = MergeEventListBy(oldSync.Presence, newSync.Presence, (oldState, newState) => oldState.Sender == newState.Sender && oldState.Type == newState.Type); + trace?.Add("Presence", sw.GetElapsedAndRestart()); + // TODO: can this be cleaned up? oldSync.DeviceOneTimeKeysCount ??= new(); if (newSync.DeviceOneTimeKeysCount is not null) foreach (var (key, value) in newSync.DeviceOneTimeKeysCount) oldSync.DeviceOneTimeKeysCount[key] = value; + trace?.Add("DeviceOneTimeKeysCount", sw.GetElapsedAndRestart()); if (newSync.Rooms is not null) - oldSync.Rooms = MergeRoomsDataStructure(oldSync.Rooms, newSync.Rooms); + oldSync.Rooms = MergeRoomsDataStructure(oldSync.Rooms, newSync.Rooms, trace); + trace?.Add("Rooms", sw.GetElapsedAndRestart()); - oldSync.ToDevice ??= new EventList(); - oldSync.ToDevice.Events ??= []; - if (newSync.ToDevice?.Events is not null) - oldSync.ToDevice.Events.MergeStateEventLists(newSync.ToDevice?.Events ?? []); + oldSync.ToDevice = MergeEventList(oldSync.ToDevice, newSync.ToDevice); + trace?.Add("ToDevice", sw.GetElapsedAndRestart()); oldSync.DeviceLists ??= new SyncResponse.DeviceListsDataStructure(); oldSync.DeviceLists.Changed ??= []; @@ -241,125 +254,171 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge oldSync.DeviceLists.Changed.Add(s); } + trace?.Add("DeviceLists.Changed", sw.GetElapsedAndRestart()); + if (newSync.DeviceLists?.Left is not null) foreach (var s in newSync.DeviceLists.Left!) { oldSync.DeviceLists.Changed.Remove(s); oldSync.DeviceLists.Left.Add(s); } - return oldSync; - } - - private List<StateEventResponse>? MergePresenceEvents(List<StateEventResponse>? oldEvents, List<StateEventResponse>? newEvents) { - if (oldEvents is null) return newEvents; - if (newEvents is null) return oldEvents; + trace?.Add("DeviceLists.Left", sw.GetElapsedAndRestart()); - foreach (var newEvent in newEvents) { - oldEvents.RemoveAll(x => x.Sender == newEvent.Sender && x.Type == newEvent.Type); - oldEvents.Add(newEvent); - } - - return oldEvents; + return oldSync; } #region Merge rooms - private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure? oldState, SyncResponse.RoomsDataStructure newState) { + private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure? oldState, SyncResponse.RoomsDataStructure newState, + Dictionary<string, TimeSpan>? trace) { + var sw = Stopwatch.StartNew(); if (oldState is null) return newState; - oldState.Join ??= new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>(); - foreach (var (key, value) in newState.Join ?? new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>()) - if (!oldState.Join.ContainsKey(key)) oldState.Join[key] = value; - else oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value); - - oldState.Invite ??= new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>(); - foreach (var (key, value) in newState.Invite ?? new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>()) - if (!oldState.Invite.ContainsKey(key)) oldState.Invite[key] = value; - else oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value); - - oldState.Leave ??= new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>(); - foreach (var (key, value) in newState.Leave ?? new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>()) { - if (!oldState.Leave.ContainsKey(key)) oldState.Leave[key] = value; - else oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value); - if (oldState.Invite.ContainsKey(key)) oldState.Invite.Remove(key); - if (oldState.Join.ContainsKey(key)) oldState.Join.Remove(key); - } + + if (newState.Join is { Count: > 0 }) + if (oldState.Join is null) + oldState.Join = newState.Join; + else + foreach (var (key, value) in newState.Join) + if (!oldState.Join.TryAdd(key, value)) + oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value, trace); + trace?.Add("MergeRoomsDataStructure.Join", sw.GetElapsedAndRestart()); + + if (newState.Invite is { Count: > 0 }) + if (oldState.Invite is null) + oldState.Invite = newState.Invite; + else + foreach (var (key, value) in newState.Invite) + if (!oldState.Invite.TryAdd(key, value)) + oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value, trace); + trace?.Add("MergeRoomsDataStructure.Invite", sw.GetElapsedAndRestart()); + + if (newState.Leave is { Count: > 0 }) + if (oldState.Leave is null) + oldState.Leave = newState.Leave; + else + foreach (var (key, value) in newState.Leave) { + if (!oldState.Leave.TryAdd(key, value)) + oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value, trace); + if (oldState.Invite?.ContainsKey(key) ?? false) oldState.Invite.Remove(key); + if (oldState.Join?.ContainsKey(key) ?? false) oldState.Join.Remove(key); + } + trace?.Add("MergeRoomsDataStructure.Leave", sw.GetElapsedAndRestart()); return oldState; } private static SyncResponse.RoomsDataStructure.LeftRoomDataStructure MergeLeftRoomDataStructure(SyncResponse.RoomsDataStructure.LeftRoomDataStructure oldData, - SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData) { - oldData.AccountData ??= new EventList(); - oldData.AccountData.Events ??= []; - oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure(); - oldData.Timeline.Events ??= []; - oldData.State ??= new EventList(); - oldData.State.Events ??= []; - - if (newData.AccountData?.Events is not null) - oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? []); - - if (newData.Timeline?.Events is not null) - oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? []); + SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) { + var sw = Stopwatch.StartNew(); + + oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData); + trace?.Add($"LeftRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); + + oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure + ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure"); oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited; oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch; + trace?.Add($"LeftRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); - if (newData.State?.Events is not null) - oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? []); + oldData.State = MergeEventList(oldData.State, newData.State); + trace?.Add($"LeftRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); return oldData; } private static SyncResponse.RoomsDataStructure.InvitedRoomDataStructure MergeInvitedRoomDataStructure(SyncResponse.RoomsDataStructure.InvitedRoomDataStructure oldData, - SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData) { - oldData.InviteState ??= new EventList(); - oldData.InviteState.Events ??= []; - if (newData.InviteState?.Events is not null) - oldData.InviteState.Events.MergeStateEventLists(newData.InviteState?.Events ?? []); + SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) { + var sw = Stopwatch.StartNew(); + oldData.InviteState = MergeEventList(oldData.InviteState, newData.InviteState); + trace?.Add($"InvitedRoomDataStructure.InviteState/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); return oldData; } private static SyncResponse.RoomsDataStructure.JoinedRoomDataStructure MergeJoinedRoomDataStructure(SyncResponse.RoomsDataStructure.JoinedRoomDataStructure oldData, - SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData) { - oldData.AccountData ??= new EventList(); - oldData.AccountData.Events ??= []; - oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure(); - oldData.Timeline.Events ??= []; - oldData.State ??= new EventList(); - oldData.State.Events ??= []; - oldData.Ephemeral ??= new EventList(); - oldData.Ephemeral.Events ??= []; - - if (newData.AccountData?.Events is not null) - oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? []); - - if (newData.Timeline?.Events is not null) - oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? []); + SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) { + var sw = Stopwatch.StartNew(); + + oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData); + trace?.Add($"JoinedRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); + + oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure + ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure"); oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited; oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch; + trace?.Add($"JoinedRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); - if (newData.State?.Events is not null) - oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? []); + oldData.State = MergeEventList(oldData.State, newData.State); + trace?.Add($"JoinedRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); - if (newData.Ephemeral?.Events is not null) - oldData.Ephemeral.Events.MergeStateEventLists(newData.Ephemeral?.Events ?? []); + oldData.Ephemeral = MergeEventList(oldData.Ephemeral, newData.Ephemeral); + trace?.Add($"JoinedRoomDataStructure.Ephemeral/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); oldData.UnreadNotifications ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.UnreadNotificationsDataStructure(); oldData.UnreadNotifications.HighlightCount = newData.UnreadNotifications?.HighlightCount ?? oldData.UnreadNotifications.HighlightCount; oldData.UnreadNotifications.NotificationCount = newData.UnreadNotifications?.NotificationCount ?? oldData.UnreadNotifications.NotificationCount; + trace?.Add($"JoinedRoom$DataStructure.UnreadNotifications/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); + + if (oldData.Summary is null) + oldData.Summary = newData.Summary; + else { + oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes; + oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount; + oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount; + } - oldData.Summary ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.SummaryDataStructure { - Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes, - JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount, - InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount - }; - oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes; - oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount; - oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount; + trace?.Add($"JoinedRoomDataStructure.Summary/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); return oldData; } #endregion + + private static EventList? MergeEventList(EventList? oldState, EventList? newState) { + if (newState is null) return oldState; + if (oldState is null) { + return newState; + } + + if (newState.Events is null) return oldState; + if (oldState.Events is null) { + oldState.Events = newState.Events; + return oldState; + } + + oldState.Events.MergeStateEventLists(newState.Events); + return oldState; + } + + private static EventList? MergeEventListBy(EventList? oldState, EventList? newState, Func<StateEventResponse, StateEventResponse, bool> comparer) { + if (newState is null) return oldState; + if (oldState is null) { + return newState; + } + + if (newState.Events is null) return oldState; + if (oldState.Events is null) { + oldState.Events = newState.Events; + return oldState; + } + + oldState.Events.ReplaceBy(newState.Events, comparer); + return oldState; + } + + private static EventList? AppendEventList(EventList? oldState, EventList? newState) { + if (newState is null) return oldState; + if (oldState is null) { + return newState; + } + + if (newState.Events is null) return oldState; + if (oldState.Events is null) { + oldState.Events = newState.Events; + return oldState; + } + + oldState.Events.AddRange(newState.Events); + return oldState; + } } \ No newline at end of file diff --git a/LibMatrix/Responses/SyncResponse.cs b/LibMatrix/Responses/SyncResponse.cs index 2d3d3f8..f2b901d 100644 --- a/LibMatrix/Responses/SyncResponse.cs +++ b/LibMatrix/Responses/SyncResponse.cs @@ -86,7 +86,7 @@ public class SyncResponse { [JsonPropertyName("summary")] public SummaryDataStructure? Summary { get; set; } - public class TimelineDataStructure { + public class TimelineDataStructure : EventList { public TimelineDataStructure() { } public TimelineDataStructure(List<StateEventResponse>? events, bool? limited) { @@ -94,8 +94,8 @@ public class SyncResponse { Limited = limited; } - [JsonPropertyName("events")] - public List<StateEventResponse>? Events { get; set; } + // [JsonPropertyName("events")] + // public List<StateEventResponse>? Events { get; set; } [JsonPropertyName("prev_batch")] public string? PrevBatch { get; set; } diff --git a/LibMatrix/RoomTypes/GenericRoom.cs b/LibMatrix/RoomTypes/GenericRoom.cs index 4f6a4e9..4324dc3 100644 --- a/LibMatrix/RoomTypes/GenericRoom.cs +++ b/LibMatrix/RoomTypes/GenericRoom.cs @@ -1,6 +1,7 @@ using System.Collections.Frozen; using System.Diagnostics; using System.Net.Http.Json; +using System.Security.Cryptography; using System.Text.Json; using System.Text.Json.Nodes; using System.Text.Json.Serialization; @@ -211,10 +212,11 @@ public class GenericRoom { public async Task<RoomIdResponse> JoinAsync(string[]? homeservers = null, string? reason = null, bool checkIfAlreadyMember = true) { if (checkIfAlreadyMember) try { - _ = await GetCreateEventAsync(); - return new RoomIdResponse { - RoomId = RoomId - }; + var ser = await GetStateEventOrNullAsync(RoomMemberEventContent.EventId, Homeserver.UserId); + if (ser?.TypedContent is RoomMemberEventContent { Membership: "join" }) + return new RoomIdResponse { + RoomId = RoomId + }; } catch { } //ignore diff --git a/Tests/LibMatrix.HomeserverEmulator/Services/RoomStore.cs b/Tests/LibMatrix.HomeserverEmulator/Services/RoomStore.cs index 5cdc3ab..c798cce 100644 --- a/Tests/LibMatrix.HomeserverEmulator/Services/RoomStore.cs +++ b/Tests/LibMatrix.HomeserverEmulator/Services/RoomStore.cs @@ -137,7 +137,7 @@ public class RoomStore { public Room(string roomId) { if (string.IsNullOrWhiteSpace(roomId)) throw new ArgumentException("Value cannot be null or whitespace.", nameof(roomId)); - if (roomId[0] != '!') throw new ArgumentException("Room ID must start with !", nameof(roomId)); + if (roomId[0] != '!') throw new ArgumentException($"Room ID must start with '!', provided value: {roomId ?? "null"}", nameof(roomId)); RoomId = roomId; Timeline = new(); AccountData = new(); diff --git a/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs b/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs index 215f28a..5a0b7ad 100644 --- a/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs +++ b/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs @@ -19,7 +19,7 @@ public class BotInstaller(IServiceCollection services) { public BotInstaller AddMatrixBot() { services.AddSingleton<LibMatrixBotConfiguration>(); - services.AddScoped<AuthenticatedHomeserverGeneric>(x => { + services.AddSingleton<AuthenticatedHomeserverGeneric>(x => { var config = x.GetService<LibMatrixBotConfiguration>() ?? throw new Exception("No configuration found!"); var hsProvider = x.GetService<HomeserverProviderService>() ?? throw new Exception("No homeserver provider found!"); var hs = hsProvider.GetAuthenticatedWithToken(config.Homeserver, config.AccessToken).Result; @@ -37,7 +37,7 @@ public class BotInstaller(IServiceCollection services) { } public BotInstaller DiscoverAllCommands() { - foreach (var commandClass in new ClassCollector<ICommand>().ResolveFromAllAccessibleAssemblies()) { + foreach (var commandClass in ClassCollector<ICommand>.ResolveFromAllAccessibleAssemblies()) { Console.WriteLine($"Adding command {commandClass.Name}"); services.AddScoped(typeof(ICommand), commandClass); } diff --git a/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs b/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs index 601e598..9a7585e 100644 --- a/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs +++ b/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs @@ -59,22 +59,44 @@ public class CommandListenerHostedService : IHostedService { FilterId = filter }; - syncHelper.TimelineEventHandlers.Add(async @event => { - try { - var room = _hs.GetRoom(@event.RoomId); - // _logger.LogInformation(eventResponse.ToJson(indent: false)); - if (@event is { Type: "m.room.message", TypedContent: RoomMessageEventContent message }) - if (message is { MessageType: "m.text" }) { - var usedPrefix = await GetUsedPrefix(@event); - if (usedPrefix is null) return; - var res = await InvokeCommand(@event, usedPrefix); - await (_commandResultHandler?.Invoke(res) ?? HandleResult(res)); + syncHelper.SyncReceivedHandlers.Add(async sync => { + _logger.LogInformation("Sync received!"); + foreach (var roomResp in sync.Rooms?.Join ?? []) { + if (roomResp.Value.Timeline?.Events is null or { Count: > 5 }) continue; + foreach (var @event in roomResp.Value.Timeline.Events) { + @event.RoomId = roomResp.Key; + try { + // var room = _hs.GetRoom(@event.RoomId); + // _logger.LogInformation(eventResponse.ToJson(indent: false)); + if (@event is { Type: "m.room.message", TypedContent: RoomMessageEventContent message }) + if (message is { MessageType: "m.text" }) { + var usedPrefix = await GetUsedPrefix(@event); + if (usedPrefix is null) return; + var res = await InvokeCommand(@event, usedPrefix); + await (_commandResultHandler?.Invoke(res) ?? HandleResult(res)); + } } - } - catch (Exception e) { - _logger.LogError(e, "Error in command listener!"); + catch (Exception e) { + _logger.LogError(e, "Error in command listener!"); + Console.WriteLine(@event.ToJson(ignoreNull: false, indent: true)); + var fakeResult = new CommandResult() { + Result = CommandResult.CommandResultType.Failure_Exception, + Exception = e, + Success = false, + Context = new() { + Homeserver = _hs, + CommandName = "[CommandListener.SyncHandler]", + Room = _hs.GetRoom(roomResp.Key), + Args = [], + MessageEvent = @event + } + }; + await (_commandResultHandler?.Invoke(fakeResult) ?? HandleResult(fakeResult)); + } + } } }); + await syncHelper.RunSyncLoopAsync(cancellationToken: cancellationToken); } |