about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.idea/.idea.LibMatrix/.idea/.name1
-rw-r--r--LibMatrix/Extensions/EnumerableExtensions.cs90
-rw-r--r--LibMatrix/Extensions/MatrixHttpClient.Single.cs43
-rw-r--r--LibMatrix/Filters/LocalRoomQueryFilter.cs27
-rw-r--r--LibMatrix/Helpers/MessageBuilder.cs2
-rw-r--r--LibMatrix/Helpers/SyncHelper.cs64
-rw-r--r--LibMatrix/Helpers/SyncStateResolver.cs455
-rw-r--r--LibMatrix/Homeservers/AuthenticatedHomeserverGeneric.cs18
-rw-r--r--LibMatrix/Homeservers/ImplementationDetails/Synapse/SynapseAdminApiClient.cs8
-rw-r--r--LibMatrix/Homeservers/RemoteHomeServer.cs2
-rw-r--r--LibMatrix/Interfaces/Services/IStorageProvider.cs16
-rw-r--r--LibMatrix/MatrixException.cs78
-rw-r--r--LibMatrix/Responses/SyncResponse.cs30
-rw-r--r--LibMatrix/Responses/UserProfileResponse.cs15
-rw-r--r--LibMatrix/RoomTypes/GenericRoom.cs10
-rw-r--r--LibMatrix/Services/HomeserverProviderService.cs6
-rw-r--r--LibMatrix/Services/HomeserverResolverService.cs10
-rw-r--r--LibMatrix/StateEvent.cs2
-rw-r--r--Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs2
-rw-r--r--Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs2
-rw-r--r--Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs49
21 files changed, 692 insertions, 238 deletions
diff --git a/.idea/.idea.LibMatrix/.idea/.name b/.idea/.idea.LibMatrix/.idea/.name
deleted file mode 100644

index 2f382b8..0000000 --- a/.idea/.idea.LibMatrix/.idea/.name +++ /dev/null
@@ -1 +0,0 @@ -LibMatrix \ No newline at end of file diff --git a/LibMatrix/Extensions/EnumerableExtensions.cs b/LibMatrix/Extensions/EnumerableExtensions.cs
index 42d9491..ace2c0c 100644 --- a/LibMatrix/Extensions/EnumerableExtensions.cs +++ b/LibMatrix/Extensions/EnumerableExtensions.cs
@@ -1,29 +1,91 @@ +using System.Collections.Frozen; +using System.Collections.Immutable; + namespace LibMatrix.Extensions; public static class EnumerableExtensions { + public static int insertions = 0; + public static int replacements = 0; + public static void MergeStateEventLists(this IList<StateEvent> oldState, IList<StateEvent> newState) { - foreach (var stateEvent in newState) { - var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey); - if (old is null) { - oldState.Add(stateEvent); - continue; + // foreach (var stateEvent in newState) { + // var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey); + // if (old is null) { + // oldState.Add(stateEvent); + // continue; + // } + // + // oldState.Remove(old); + // oldState.Add(stateEvent); + // } + + foreach (var e in newState) { + switch (FindIndex(e)) { + case -1: + oldState.Add(e); + break; + case var index: + oldState[index] = e; + break; } + } - oldState.Remove(old); - oldState.Add(stateEvent); + int FindIndex(StateEvent needle) { + for (int i = 0; i < oldState.Count; i++) { + var old = oldState[i]; + if (old.Type == needle.Type && old.StateKey == needle.StateKey) + return i; + } + + return -1; } } public static void MergeStateEventLists(this IList<StateEventResponse> oldState, IList<StateEventResponse> newState) { - foreach (var stateEvent in newState) { - var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey); - if (old is null) { - oldState.Add(stateEvent); - continue; + foreach (var e in newState) { + switch (FindIndex(e)) { + case -1: + oldState.Add(e); + break; + case var index: + oldState[index] = e; + break; + } + } + + int FindIndex(StateEventResponse needle) { + for (int i = 0; i < oldState.Count; i++) { + var old = oldState[i]; + if (old.Type == needle.Type && old.StateKey == needle.StateKey) + return i; + } + + return -1; + } + } + + public static void MergeStateEventLists(this List<StateEventResponse> oldState, List<StateEventResponse> newState) { + foreach (var e in newState) { + switch (FindIndex(e)) { + case -1: + oldState.Add(e); + insertions++; + break; + case var index: + oldState[index] = e; + replacements++; + break; + } + } + + int FindIndex(StateEventResponse needle) { + for (int i = 0; i < oldState.Count; i++) { + var old = oldState[i]; + if (old.Type == needle.Type && old.StateKey == needle.StateKey) + return i; } - oldState.Remove(old); - oldState.Add(stateEvent); + return -1; } } } \ No newline at end of file diff --git a/LibMatrix/Extensions/MatrixHttpClient.Single.cs b/LibMatrix/Extensions/MatrixHttpClient.Single.cs
index 42f81a8..a3ea409 100644 --- a/LibMatrix/Extensions/MatrixHttpClient.Single.cs +++ b/LibMatrix/Extensions/MatrixHttpClient.Single.cs
@@ -1,5 +1,5 @@ #define SINGLE_HTTPCLIENT // Use a single HttpClient instance for all MatrixHttpClient instances -// #define SYNC_HTTPCLIENT // Only allow one request as a time, for debugging +// #define SYNC_HTTPCLIENT // Only allow one request as a time, for debugging using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Net; @@ -15,7 +15,7 @@ using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Requests; namespace LibMatrix.Extensions; #if SINGLE_HTTPCLIENT -// TODO: Add URI wrapper for +// TODO: Add URI wrapper for public class MatrixHttpClient { private static readonly HttpClient Client; @@ -51,6 +51,7 @@ public class MatrixHttpClient { internal SemaphoreSlim _rateLimitSemaphore { get; } = new(1, 1); #endif + private const bool LogRequests = true; public Dictionary<string, string> AdditionalQueryParameters { get; set; } = new(); public Uri? BaseAddress { get; set; } @@ -60,7 +61,7 @@ public class MatrixHttpClient { typeof(HttpRequestHeaders).GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, [], null)?.Invoke([]) as HttpRequestHeaders ?? throw new InvalidOperationException("Failed to create HttpRequestHeaders"); - private JsonSerializerOptions GetJsonSerializerOptions(JsonSerializerOptions? options = null) { + private static JsonSerializerOptions GetJsonSerializerOptions(JsonSerializerOptions? options = null) { options ??= new JsonSerializerOptions(); options.Converters.Add(new JsonFloatStringConverter()); options.Converters.Add(new JsonDoubleStringConverter()); @@ -70,6 +71,10 @@ public class MatrixHttpClient { } public async Task<HttpResponseMessage> SendUnhandledAsync(HttpRequestMessage request, CancellationToken cancellationToken) { + if (request.RequestUri is null) throw new NullReferenceException("RequestUri is null"); + // if (!request.RequestUri.IsAbsoluteUri) + request.RequestUri = request.RequestUri.EnsureAbsolute(BaseAddress!); + var swWait = Stopwatch.StartNew(); #if SYNC_HTTPCLIENT await _rateLimitSemaphore.WaitAsync(cancellationToken); #endif @@ -79,6 +84,9 @@ public class MatrixHttpClient { if (request.RequestUri is null) throw new NullReferenceException("RequestUri is null"); if (!request.RequestUri.IsAbsoluteUri) request.RequestUri = new Uri(BaseAddress ?? throw new InvalidOperationException("Relative URI passed, but no BaseAddress is specified!"), request.RequestUri); + swWait.Stop(); + var swExec = Stopwatch.StartNew(); + foreach (var (key, value) in AdditionalQueryParameters) request.RequestUri = request.RequestUri.AddQuery(key, value); foreach (var (key, value) in DefaultRequestHeaders) { if (request.Headers.Contains(key)) continue; @@ -87,6 +95,9 @@ public class MatrixHttpClient { request.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse"), true); + if (LogRequests) + Console.WriteLine("Sending " + request.Summarise(includeHeaders: true, includeQuery: true, includeContentIfText: true, hideHeaders: ["Accept"])); + HttpResponseMessage? responseMessage; try { responseMessage = await Client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); @@ -110,8 +121,26 @@ public class MatrixHttpClient { } #endif - Console.WriteLine( - $"Sending {request.Method} {request.RequestUri} ({Util.BytesToString(request.Content?.Headers.ContentLength ?? 0)}) -> {(int)responseMessage.StatusCode} {responseMessage.StatusCode} ({Util.BytesToString(responseMessage.Content.Headers.ContentLength ?? 0)})"); + // Console.WriteLine($"Sending {request.Method} {request.RequestUri} ({Util.BytesToString(request.Content?.Headers.ContentLength ?? 0)}) -> {(int)responseMessage.StatusCode} {responseMessage.StatusCode} ({Util.BytesToString(responseMessage.GetContentLength())}, WAIT={swWait.ElapsedMilliseconds}ms, EXEC={swExec.ElapsedMilliseconds}ms)"); + if (LogRequests) + Console.WriteLine("Received " + responseMessage.Summarise(includeHeaders: true, includeContentIfText: false, hideHeaders: [ + "Server", + "Date", + "Transfer-Encoding", + "Connection", + "Vary", + "Content-Length", + "Access-Control-Allow-Origin", + "Access-Control-Allow-Methods", + "Access-Control-Allow-Headers", + "Access-Control-Expose-Headers", + "Cache-Control", + "Cross-Origin-Resource-Policy", + "X-Content-Security-Policy", + "Referrer-Policy", + "X-Robots-Tag", + "Content-Security-Policy" + ])); return responseMessage; } @@ -228,7 +257,7 @@ public class MatrixHttpClient { await foreach (var resp in result) yield return resp; } - public async Task<bool> CheckSuccessStatus(string url) { + public static async Task<bool> CheckSuccessStatus(string url) { //cors causes failure, try to catch try { var resp = await Client.GetAsync(url); @@ -259,4 +288,4 @@ public class MatrixHttpClient { return await SendAsync(request); } } -#endif \ No newline at end of file +#endif diff --git a/LibMatrix/Filters/LocalRoomQueryFilter.cs b/LibMatrix/Filters/LocalRoomQueryFilter.cs deleted file mode 100644
index b3bd4c0..0000000 --- a/LibMatrix/Filters/LocalRoomQueryFilter.cs +++ /dev/null
@@ -1,27 +0,0 @@ -namespace LibMatrix.Filters; - -public class LocalRoomQueryFilter { - public string RoomIdContains { get; set; } = ""; - public string NameContains { get; set; } = ""; - public string CanonicalAliasContains { get; set; } = ""; - public string VersionContains { get; set; } = ""; - public string CreatorContains { get; set; } = ""; - public string EncryptionContains { get; set; } = ""; - public string JoinRulesContains { get; set; } = ""; - public string GuestAccessContains { get; set; } = ""; - public string HistoryVisibilityContains { get; set; } = ""; - - public bool Federatable { get; set; } = true; - public bool Public { get; set; } = true; - - public int JoinedMembersGreaterThan { get; set; } - public int JoinedMembersLessThan { get; set; } = int.MaxValue; - - public int JoinedLocalMembersGreaterThan { get; set; } - public int JoinedLocalMembersLessThan { get; set; } = int.MaxValue; - public int StateEventsGreaterThan { get; set; } - public int StateEventsLessThan { get; set; } = int.MaxValue; - - public bool CheckFederation { get; set; } - public bool CheckPublic { get; set; } -} \ No newline at end of file diff --git a/LibMatrix/Helpers/MessageBuilder.cs b/LibMatrix/Helpers/MessageBuilder.cs
index b639e1f..d3bd6a5 100644 --- a/LibMatrix/Helpers/MessageBuilder.cs +++ b/LibMatrix/Helpers/MessageBuilder.cs
@@ -105,7 +105,7 @@ public class MessageBuilder(string msgType = "m.text", string format = "org.matr public MessageBuilder WithTable(Action<TableBuilder> tableBuilder) { var tb = new TableBuilder(this); - this.WithHtmlTag("table", msb => tableBuilder(tb)); + WithHtmlTag("table", msb => tableBuilder(tb)); return this; } diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index 05bfb47..adcc714 100644 --- a/LibMatrix/Helpers/SyncHelper.cs +++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,15 +1,18 @@ using System.Diagnostics; +using System.Net.Http.Json; using System.Text.Json; using ArcaneLibs.Collections; +using System.Text.Json.Nodes; using ArcaneLibs.Extensions; using LibMatrix.Filters; using LibMatrix.Homeservers; +using LibMatrix.Interfaces.Services; using LibMatrix.Responses; using Microsoft.Extensions.Logging; namespace LibMatrix.Helpers; -public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) { +public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) { private SyncFilter? _filter; private string? _namedFilterName; private bool _filterIsDirty; @@ -18,6 +21,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public string? Since { get; set; } public int Timeout { get; set; } = 30000; public string? SetPresence { get; set; } = "online"; + public bool UseInternalStreamingSync { get; set; } = true; public string? FilterId { get => _filterId; @@ -53,6 +57,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public TimeSpan MinimumDelay { get; set; } = new(0); + public async Task<int> GetUnoptimisedStoreCount() { + if (storageProvider is null) return -1; + var keys = await storageProvider.GetAllKeysAsync(); + return keys.Count(x => !x.StartsWith("old/")) - 1; + } + private async Task UpdateFilterAsync() { if (!string.IsNullOrWhiteSpace(NamedFilterName)) { _filterId = await homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(NamedFilterName); @@ -76,6 +86,25 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!"); } + if (storageProvider is null) return await SyncAsyncInternal(cancellationToken); + + var key = Since ?? "init"; + if (await storageProvider.ObjectExistsAsync(key)) { + var cached = await storageProvider.LoadObjectAsync<SyncResponse>(key); + // We explicitly check that NextBatch doesn't match since to prevent infinite loops... + if (cached is not null && cached.NextBatch != Since) { + logger?.LogInformation("SyncHelper: Using cached sync response for {}", key); + return cached; + } + } + + var sync = await SyncAsyncInternal(cancellationToken); + // Ditto here. + if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + return sync; + } + + private async Task<SyncResponse?> SyncAsyncInternal(CancellationToken? cancellationToken = null) { var sw = Stopwatch.StartNew(); if (_filterIsDirty) await UpdateFilterAsync(); @@ -86,15 +115,23 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg // logger?.LogInformation("SyncHelper: Calling: {}", url); try { - var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None); - if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request"); - logger?.LogTrace("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed); - var deserializeSw = Stopwatch.StartNew(); - var stream = await httpResp.Content.ReadAsStreamAsync(); - await using var seekableStream = new SeekableStream(stream); - var resp = await JsonSerializer.DeserializeAsync<SyncResponse>(seekableStream, cancellationToken: cancellationToken ?? CancellationToken.None, - jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse); - logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", seekableStream.Position, deserializeSw.Elapsed, sw.Elapsed); + SyncResponse? resp = null; + if (UseInternalStreamingSync) { + resp = await homeserver.ClientHttpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None); + logger?.LogInformation("Got sync response: ~{} bytes, {} elapsed", resp.ToJson(false, true, true).Length, sw.Elapsed); + } + else { + var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None); + if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request"); + logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed); + var deserializeSw = Stopwatch.StartNew(); + // var jsonResp = await httpResp.Content.ReadFromJsonAsync<JsonObject>(cancellationToken: cancellationToken ?? CancellationToken.None); + // var resp = jsonResp.Deserialize<SyncResponse>(); + resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None, + jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse); + logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed); + } + var timeToWait = MinimumDelay.Subtract(sw.Elapsed); if (timeToWait.TotalMilliseconds > 0) await Task.Delay(timeToWait); @@ -214,4 +251,9 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg /// Event fired when an account data event is received /// </summary> public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new(); -} \ No newline at end of file + + private void Log(string message) { + if (logger is null) Console.WriteLine(message); + else logger.LogInformation(message); + } +} diff --git a/LibMatrix/Helpers/SyncStateResolver.cs b/LibMatrix/Helpers/SyncStateResolver.cs
index 4633f06..282d26f 100644 --- a/LibMatrix/Helpers/SyncStateResolver.cs +++ b/LibMatrix/Helpers/SyncStateResolver.cs
@@ -1,12 +1,18 @@ +using System.Collections.Frozen; +using System.Collections.Immutable; +using System.Diagnostics; +using System.Text; +using ArcaneLibs.Extensions; using LibMatrix.Extensions; using LibMatrix.Filters; using LibMatrix.Homeservers; +using LibMatrix.Interfaces.Services; using LibMatrix.Responses; using Microsoft.Extensions.Logging; namespace LibMatrix.Helpers; -public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) { +public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) { public string? Since { get; set; } public int Timeout { get; set; } = 30000; public string? SetPresence { get; set; } = "online"; @@ -15,7 +21,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge public SyncResponse? MergedState { get; set; } - private SyncHelper _syncHelper = new(homeserver, logger); + private SyncHelper _syncHelper = new(homeserver, logger, storageProvider); public async Task<(SyncResponse next, SyncResponse merged)> ContinueAsync(CancellationToken? cancellationToken = null) { // copy properties @@ -24,156 +30,395 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge _syncHelper.SetPresence = SetPresence; _syncHelper.Filter = Filter; _syncHelper.FullState = FullState; - // run sync + var sync = await _syncHelper.SyncAsync(cancellationToken); if (sync is null) return await ContinueAsync(cancellationToken); - MergedState = MergedState is null ? sync : MergeSyncs(MergedState, sync); + + if (MergedState is null) MergedState = sync; + else MergedState = MergeSyncs(MergedState, sync); Since = sync.NextBatch; + return (sync, MergedState); } - private SyncResponse MergeSyncs(SyncResponse oldState, SyncResponse newState) { - oldState.NextBatch = newState.NextBatch; + public async Task OptimiseStore(Action<int, int>? progressCallback = null) { + if (storageProvider is null) return; + if (!await storageProvider.ObjectExistsAsync("init")) return; + + var totalSw = Stopwatch.StartNew(); + Console.Write("Optimising sync store..."); + var initLoadTask = storageProvider.LoadObjectAsync<SyncResponse>("init"); + var keys = (await storageProvider.GetAllKeysAsync()).Where(x => !x.StartsWith("old/")).ToFrozenSet(); + var count = keys.Count - 1; + int total = count; + Console.WriteLine($"Found {count} entries to optimise."); + + var merged = await initLoadTask; + if (merged is null) return; + if (!keys.Contains(merged.NextBatch)) { + Console.WriteLine("Next response after initial sync is not present, not checkpointing!"); + return; + } - oldState.AccountData ??= new EventList(); - oldState.AccountData.Events ??= new List<StateEventResponse>(); - if (newState.AccountData?.Events is not null) - oldState.AccountData.Events.MergeStateEventLists(newState.AccountData?.Events ?? new List<StateEventResponse>()); + // We back up old entries + var oldPath = $"old/{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}"; + await storageProvider.MoveObjectAsync("init", $"{oldPath}/init"); + + var moveTasks = new List<Task>(); + + Dictionary<string, Dictionary<string, TimeSpan>> traces = []; + while (keys.Contains(merged.NextBatch)) { + Console.Write($"Merging {merged.NextBatch}, {--count} remaining... "); + var sw = Stopwatch.StartNew(); + var swt = Stopwatch.StartNew(); + var next = await storageProvider.LoadObjectAsync<SyncResponse>(merged.NextBatch); + Console.Write($"Load {sw.GetElapsedAndRestart().TotalMilliseconds}ms... "); + if (next is null || merged.NextBatch == next.NextBatch) break; + + Console.Write($"Check {sw.GetElapsedAndRestart().TotalMilliseconds}ms... "); + // back up old entry + moveTasks.Add(storageProvider.MoveObjectAsync(merged.NextBatch, $"{oldPath}/{merged.NextBatch}")); + Console.Write($"Move {sw.GetElapsedAndRestart().TotalMilliseconds}ms... "); + + var trace = new Dictionary<string, TimeSpan>(); + traces[merged.NextBatch] = trace; + merged = MergeSyncs(merged, next, trace); + Console.Write($"Merge {sw.GetElapsedAndRestart().TotalMilliseconds}ms... "); + Console.WriteLine($"Total {swt.Elapsed.TotalMilliseconds}ms"); + // Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining..."); + progressCallback?.Invoke(count, total); + } - oldState.Presence ??= new SyncResponse.PresenceDataStructure(); - if (newState.Presence?.Events is { Count: > 0 }) - if (oldState.Presence.Events is { Count: > 0 }) - oldState.Presence.Events.MergeStateEventLists(newState.Presence.Events); - else - oldState.Presence.Events = newState.Presence?.Events; - - oldState.DeviceOneTimeKeysCount ??= new Dictionary<string, int>(); - if (newState.DeviceOneTimeKeysCount is not null) - foreach (var (key, value) in newState.DeviceOneTimeKeysCount) - oldState.DeviceOneTimeKeysCount[key] = value; - - oldState.Rooms ??= new SyncResponse.RoomsDataStructure(); - if (newState.Rooms is not null) - oldState.Rooms = MergeRoomsDataStructure(oldState.Rooms, newState.Rooms); - - oldState.ToDevice ??= new EventList(); - oldState.ToDevice.Events ??= new List<StateEventResponse>(); - if (newState.ToDevice?.Events is not null) - oldState.ToDevice.Events.MergeStateEventLists(newState.ToDevice?.Events ?? new List<StateEventResponse>()); - - oldState.DeviceLists ??= new SyncResponse.DeviceListsDataStructure(); - if (newState.DeviceLists?.Changed is not null) { - oldState.DeviceLists.Changed ??= new List<string>(); - foreach (var s in newState.DeviceLists.Changed) - oldState.DeviceLists.Changed.Add(s); + var traceString = string.Join("\n", traces.Select(x => $"{x.Key}\t{x.Value.ToJson(indent: false)}")); + var ms = new MemoryStream(Encoding.UTF8.GetBytes(traceString)); + await storageProvider.SaveStreamAsync($"traces/{oldPath}", ms); + + await storageProvider.SaveObjectAsync("init", merged); + await Task.WhenAll(moveTasks); + + Console.WriteLine($"Optimised store in {totalSw.Elapsed.TotalMilliseconds}ms"); + Console.WriteLine($"Insertions: {EnumerableExtensions.insertions}, replacements: {EnumerableExtensions.replacements}"); + } + + /// <summary> + /// Remove all but initial sync and last checkpoint + /// </summary> + public async Task RemoveOldSnapshots() { + if (storageProvider is null) return; + var sw = Stopwatch.StartNew(); + + var map = await GetCheckpointMap(); + if (map is null) return; + if (map.Count < 3) return; + + var toRemove = map.Keys.Skip(1).Take(map.Count - 2).ToList(); + Console.Write("Cleaning up old snapshots: "); + foreach (var key in toRemove) { + var path = $"old/{key}/init"; + if (await storageProvider?.ObjectExistsAsync(path)) { + Console.Write($"{key}... "); + await storageProvider?.DeleteObjectAsync(path); + } } - if (newState.DeviceLists?.Left is not null) { - oldState.DeviceLists.Left ??= new List<string>(); - foreach (var s in newState.DeviceLists.Left) - oldState.DeviceLists.Left.Add(s); + Console.WriteLine("Done!"); + Console.WriteLine($"Removed {toRemove.Count} old snapshots in {sw.Elapsed.TotalMilliseconds}ms"); + } + + public async Task UnrollOptimisedStore() { + if (storageProvider is null) return; + Console.WriteLine("WARNING: Unrolling sync store!"); + } + + public async Task SquashOptimisedStore(int targetCountPerCheckpoint) { + Console.Write($"Balancing optimised store to {targetCountPerCheckpoint} per checkpoint..."); + var checkpoints = await GetCheckpointMap(); + if (checkpoints is null) return; + + Console.WriteLine( + $" Stats: {checkpoints.Count} checkpoints with [{checkpoints.Min(x => x.Value.Count)} < ~{checkpoints.Average(x => x.Value.Count)} < {checkpoints.Max(x => x.Value.Count)}] entries"); + Console.WriteLine($"Found {checkpoints?.Count ?? 0} checkpoints."); + } + + public async Task dev() { + int i = 0; + var sw = Stopwatch.StartNew(); + var hist = GetSerialisedHistory(); + await foreach (var (key, resp) in hist) { + if (resp is null) continue; + // Console.WriteLine($"[{++i}] {key} -> {resp.NextBatch} ({resp.GetDerivedSyncTime()})"); + i++; } - return oldState; + Console.WriteLine($"Iterated {i} syncResponses in {sw.Elapsed}"); + Environment.Exit(0); } -#region Merge rooms + private async IAsyncEnumerable<(string key, SyncResponse? resp)> GetSerialisedHistory() { + if (storageProvider is null) yield break; + var map = await GetCheckpointMap(); + var currentRange = map.First(); + var nextKey = $"old/{map.First().Key}/init"; + var next = storageProvider.LoadObjectAsync<SyncResponse>(nextKey); + while (true) { + var data = await next; + if (data is null) break; + yield return (nextKey, data); + if (currentRange.Value.Contains(data.NextBatch)) { + nextKey = $"old/{currentRange.Key}/{data.NextBatch}"; + } + else if (map.Any(x => x.Value.Contains(data.NextBatch))) { + currentRange = map.First(x => x.Value.Contains(data.NextBatch)); + nextKey = $"old/{currentRange.Key}/{data.NextBatch}"; + } + else if (await storageProvider.ObjectExistsAsync(data.NextBatch)) { + nextKey = data.NextBatch; + } + else break; + + next = storageProvider.LoadObjectAsync<SyncResponse>(nextKey); + } + } + + public async Task<SyncResponse?> GetMergedUpTo(DateTime time) { + if (storageProvider is null) return null; + var unixTime = new DateTimeOffset(time.ToUniversalTime()).ToUnixTimeMilliseconds(); + var map = await GetCheckpointMap(); + if (map is null) return new(); + var stream = GetSerialisedHistory().GetAsyncEnumerator(); + SyncResponse? merged = await stream.MoveNextAsync() ? stream.Current.resp : null; + + if (merged.GetDerivedSyncTime() > unixTime) { + Console.WriteLine("Initial sync is already past the target time!"); + Console.WriteLine($"CURRENT: {merged.GetDerivedSyncTime()} (UTC: {DateTimeOffset.FromUnixTimeMilliseconds(merged.GetDerivedSyncTime())})"); + Console.WriteLine($" TARGET: {unixTime} ({time.Kind}: {time}, UTC: {time.ToUniversalTime()})"); + return null; + } + + while (await stream.MoveNextAsync()) { + var (key, resp) = stream.Current; + if (resp is null) continue; + if (resp.GetDerivedSyncTime() > unixTime) break; + merged = MergeSyncs(merged, resp); + } + + return merged; + } - private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure oldState, SyncResponse.RoomsDataStructure newState) { - oldState.Join ??= new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>(); - foreach (var (key, value) in newState.Join ?? new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>()) - if (!oldState.Join.TryAdd(key, value)) - oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value); - - oldState.Invite ??= new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>(); - foreach (var (key, value) in newState.Invite ?? new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>()) - if (!oldState.Invite.TryAdd(key, value)) - oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value); - - oldState.Leave ??= new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>(); - foreach (var (key, value) in newState.Leave ?? new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>()) { - if (!oldState.Leave.TryAdd(key, value)) - oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value); - if (oldState.Invite.ContainsKey(key)) oldState.Invite.Remove(key); - if (oldState.Join.ContainsKey(key)) oldState.Join.Remove(key); + private async Task<ImmutableSortedDictionary<ulong, FrozenSet<string>>> GetCheckpointMap() { + if (storageProvider is null) return null; + var keys = (await storageProvider.GetAllKeysAsync()).ToFrozenSet(); + var map = new Dictionary<ulong, List<string>>(); + foreach (var key in keys) { + if (!key.StartsWith("old/")) continue; + var parts = key.Split('/'); + if (parts.Length < 3) continue; + if (!ulong.TryParse(parts[1], out var checkpoint)) continue; + if (!map.ContainsKey(checkpoint)) map[checkpoint] = new(); + map[checkpoint].Add(parts[2]); } + return map.OrderBy(x => x.Key).ToImmutableSortedDictionary(x => x.Key, x => x.Value.ToFrozenSet()); + } + + private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync, Dictionary<string, TimeSpan>? trace = null) { + var sw = Stopwatch.StartNew(); + oldSync.NextBatch = newSync.NextBatch ?? oldSync.NextBatch; + + oldSync.AccountData = MergeEventList(oldSync.AccountData, newSync.AccountData); + trace?.Add("AccountData", sw.GetElapsedAndRestart()); + + oldSync.Presence = MergeEventListBy(oldSync.Presence, newSync.Presence, (oldState, newState) => oldState.Sender == newState.Sender && oldState.Type == newState.Type); + trace?.Add("Presence", sw.GetElapsedAndRestart()); + + // TODO: can this be cleaned up? + oldSync.DeviceOneTimeKeysCount ??= new(); + if (newSync.DeviceOneTimeKeysCount is not null) + foreach (var (key, value) in newSync.DeviceOneTimeKeysCount) + oldSync.DeviceOneTimeKeysCount[key] = value; + trace?.Add("DeviceOneTimeKeysCount", sw.GetElapsedAndRestart()); + + if (newSync.Rooms is not null) + oldSync.Rooms = MergeRoomsDataStructure(oldSync.Rooms, newSync.Rooms, trace); + trace?.Add("Rooms", sw.GetElapsedAndRestart()); + + oldSync.ToDevice = MergeEventList(oldSync.ToDevice, newSync.ToDevice); + trace?.Add("ToDevice", sw.GetElapsedAndRestart()); + + oldSync.DeviceLists ??= new SyncResponse.DeviceListsDataStructure(); + oldSync.DeviceLists.Changed ??= []; + oldSync.DeviceLists.Left ??= []; + if (newSync.DeviceLists?.Changed is not null) + foreach (var s in newSync.DeviceLists.Changed!) { + oldSync.DeviceLists.Left.Remove(s); + oldSync.DeviceLists.Changed.Add(s); + } + + trace?.Add("DeviceLists.Changed", sw.GetElapsedAndRestart()); + + if (newSync.DeviceLists?.Left is not null) + foreach (var s in newSync.DeviceLists.Left!) { + oldSync.DeviceLists.Changed.Remove(s); + oldSync.DeviceLists.Left.Add(s); + } + + trace?.Add("DeviceLists.Left", sw.GetElapsedAndRestart()); + + return oldSync; + } + +#region Merge rooms + + private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure? oldState, SyncResponse.RoomsDataStructure newState, + Dictionary<string, TimeSpan>? trace) { + var sw = Stopwatch.StartNew(); + if (oldState is null) return newState; + + if (newState.Join is { Count: > 0 }) + if (oldState.Join is null) + oldState.Join = newState.Join; + else + foreach (var (key, value) in newState.Join) + if (!oldState.Join.TryAdd(key, value)) + oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value, trace); + trace?.Add("MergeRoomsDataStructure.Join", sw.GetElapsedAndRestart()); + + if (newState.Invite is { Count: > 0 }) + if (oldState.Invite is null) + oldState.Invite = newState.Invite; + else + foreach (var (key, value) in newState.Invite) + if (!oldState.Invite.TryAdd(key, value)) + oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value, trace); + trace?.Add("MergeRoomsDataStructure.Invite", sw.GetElapsedAndRestart()); + + if (newState.Leave is { Count: > 0 }) + if (oldState.Leave is null) + oldState.Leave = newState.Leave; + else + foreach (var (key, value) in newState.Leave) { + if (!oldState.Leave.TryAdd(key, value)) + oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value, trace); + if (oldState.Invite?.ContainsKey(key) ?? false) oldState.Invite.Remove(key); + if (oldState.Join?.ContainsKey(key) ?? false) oldState.Join.Remove(key); + } + trace?.Add("MergeRoomsDataStructure.Leave", sw.GetElapsedAndRestart()); + return oldState; } - private SyncResponse.RoomsDataStructure.LeftRoomDataStructure MergeLeftRoomDataStructure(SyncResponse.RoomsDataStructure.LeftRoomDataStructure oldData, - SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData) { - oldData.AccountData ??= new EventList(); - oldData.AccountData.Events ??= new List<StateEventResponse>(); - oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure(); - oldData.Timeline.Events ??= new List<StateEventResponse>(); - oldData.State ??= new EventList(); - oldData.State.Events ??= new List<StateEventResponse>(); + private static SyncResponse.RoomsDataStructure.LeftRoomDataStructure MergeLeftRoomDataStructure(SyncResponse.RoomsDataStructure.LeftRoomDataStructure oldData, + SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) { + var sw = Stopwatch.StartNew(); - if (newData.AccountData?.Events is not null) - oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? new List<StateEventResponse>()); + oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData); + trace?.Add($"LeftRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); - if (newData.Timeline?.Events is not null) - oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? new List<StateEventResponse>()); + oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure + ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure"); oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited; oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch; + trace?.Add($"LeftRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); - if (newData.State?.Events is not null) - oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? new List<StateEventResponse>()); + oldData.State = MergeEventList(oldData.State, newData.State); + trace?.Add($"LeftRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); return oldData; } - private SyncResponse.RoomsDataStructure.InvitedRoomDataStructure MergeInvitedRoomDataStructure(SyncResponse.RoomsDataStructure.InvitedRoomDataStructure oldData, - SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData) { - oldData.InviteState ??= new EventList(); - oldData.InviteState.Events ??= new List<StateEventResponse>(); - if (newData.InviteState?.Events is not null) - oldData.InviteState.Events.MergeStateEventLists(newData.InviteState?.Events ?? new List<StateEventResponse>()); + private static SyncResponse.RoomsDataStructure.InvitedRoomDataStructure MergeInvitedRoomDataStructure(SyncResponse.RoomsDataStructure.InvitedRoomDataStructure oldData, + SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) { + var sw = Stopwatch.StartNew(); + oldData.InviteState = MergeEventList(oldData.InviteState, newData.InviteState); + trace?.Add($"InvitedRoomDataStructure.InviteState/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); return oldData; } - private SyncResponse.RoomsDataStructure.JoinedRoomDataStructure MergeJoinedRoomDataStructure(SyncResponse.RoomsDataStructure.JoinedRoomDataStructure oldData, - SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData) { - oldData.AccountData ??= new EventList(); - oldData.AccountData.Events ??= new List<StateEventResponse>(); - oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure(); - oldData.Timeline.Events ??= new List<StateEventResponse>(); - oldData.State ??= new EventList(); - oldData.State.Events ??= new List<StateEventResponse>(); - oldData.Ephemeral ??= new EventList(); - oldData.Ephemeral.Events ??= new List<StateEventResponse>(); + private static SyncResponse.RoomsDataStructure.JoinedRoomDataStructure MergeJoinedRoomDataStructure(SyncResponse.RoomsDataStructure.JoinedRoomDataStructure oldData, + SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) { + var sw = Stopwatch.StartNew(); - if (newData.AccountData?.Events is not null) - oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? new List<StateEventResponse>()); + oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData); + trace?.Add($"JoinedRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); - if (newData.Timeline?.Events is not null) - oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? new List<StateEventResponse>()); + oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure + ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure"); oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited; oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch; + trace?.Add($"JoinedRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); - if (newData.State?.Events is not null) - oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? new List<StateEventResponse>()); + oldData.State = MergeEventList(oldData.State, newData.State); + trace?.Add($"JoinedRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); - if (newData.Ephemeral?.Events is not null) - oldData.Ephemeral.Events.MergeStateEventLists(newData.Ephemeral?.Events ?? new List<StateEventResponse>()); + oldData.Ephemeral = MergeEventList(oldData.Ephemeral, newData.Ephemeral); + trace?.Add($"JoinedRoomDataStructure.Ephemeral/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); oldData.UnreadNotifications ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.UnreadNotificationsDataStructure(); oldData.UnreadNotifications.HighlightCount = newData.UnreadNotifications?.HighlightCount ?? oldData.UnreadNotifications.HighlightCount; oldData.UnreadNotifications.NotificationCount = newData.UnreadNotifications?.NotificationCount ?? oldData.UnreadNotifications.NotificationCount; + trace?.Add($"JoinedRoom$DataStructure.UnreadNotifications/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); + + if (oldData.Summary is null) + oldData.Summary = newData.Summary; + else { + oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes; + oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount; + oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount; + } - oldData.Summary ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.SummaryDataStructure { - Heroes = newData.Summary?.Heroes ?? oldData.Summary?.Heroes, - JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary?.JoinedMemberCount, - InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary?.InvitedMemberCount - }; - oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes; - oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount; - oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount; + trace?.Add($"JoinedRoomDataStructure.Summary/{oldData.GetHashCode()}", sw.GetElapsedAndRestart()); return oldData; } #endregion + + private static EventList? MergeEventList(EventList? oldState, EventList? newState) { + if (newState is null) return oldState; + if (oldState is null) { + return newState; + } + + if (newState.Events is null) return oldState; + if (oldState.Events is null) { + oldState.Events = newState.Events; + return oldState; + } + + oldState.Events.MergeStateEventLists(newState.Events); + return oldState; + } + + private static EventList? MergeEventListBy(EventList? oldState, EventList? newState, Func<StateEventResponse, StateEventResponse, bool> comparer) { + if (newState is null) return oldState; + if (oldState is null) { + return newState; + } + + if (newState.Events is null) return oldState; + if (oldState.Events is null) { + oldState.Events = newState.Events; + return oldState; + } + + oldState.Events.ReplaceBy(newState.Events, comparer); + return oldState; + } + + private static EventList? AppendEventList(EventList? oldState, EventList? newState) { + if (newState is null) return oldState; + if (oldState is null) { + return newState; + } + + if (newState.Events is null) return oldState; + if (oldState.Events is null) { + oldState.Events = newState.Events; + return oldState; + } + + oldState.Events.AddRange(newState.Events); + return oldState; + } } \ No newline at end of file diff --git a/LibMatrix/Homeservers/AuthenticatedHomeserverGeneric.cs b/LibMatrix/Homeservers/AuthenticatedHomeserverGeneric.cs
index 8c95bc3..c1bbc5a 100644 --- a/LibMatrix/Homeservers/AuthenticatedHomeserverGeneric.cs +++ b/LibMatrix/Homeservers/AuthenticatedHomeserverGeneric.cs
@@ -47,7 +47,6 @@ public class AuthenticatedHomeserverGeneric : RemoteHomeserver { public HsNamedCaches NamedCaches { get; set; } public GenericRoom GetRoom(string roomId) { - if (roomId is null || !roomId.StartsWith("!")) throw new ArgumentException("Room ID must start with !", nameof(roomId)); return new GenericRoom(this, roomId); } @@ -185,6 +184,17 @@ public class AuthenticatedHomeserverGeneric : RemoteHomeserver { #endregion +#region MSC 4133 + + public async Task UpdateProfilePropertyAsync(string name, object? value) { + var caps = await GetCapabilitiesAsync(); + if(caps is null) throw new Exception("Failed to get capabilities"); + + } + +#endregion + + [Obsolete("This method assumes no support for MSC 4069 and MSC 4133")] public async Task UpdateProfileAsync(UserProfileResponse? newProfile, bool preserveCustomRoomProfile = true) { if (newProfile is null) return; Console.WriteLine($"Updating profile for {WhoAmI.UserId} to {newProfile.ToJson(ignoreNull: true)} (preserving room profiles: {preserveCustomRoomProfile})"); @@ -522,4 +532,8 @@ public class AuthenticatedHomeserverGeneric : RemoteHomeserver { } #endregion -} \ No newline at end of file + private class CapabilitiesResponse { + [JsonPropertyName("capabilities")] + public Dictionary<string, object>? Capabilities { get; set; } + } +} diff --git a/LibMatrix/Homeservers/ImplementationDetails/Synapse/SynapseAdminApiClient.cs b/LibMatrix/Homeservers/ImplementationDetails/Synapse/SynapseAdminApiClient.cs
index 3ed7311..a48402a 100644 --- a/LibMatrix/Homeservers/ImplementationDetails/Synapse/SynapseAdminApiClient.cs +++ b/LibMatrix/Homeservers/ImplementationDetails/Synapse/SynapseAdminApiClient.cs
@@ -3,11 +3,19 @@ using System.Net.Http.Json; using System.Text.Json; using System.Text.Json.Nodes; +using System.Net.Http.Json; +using System.Text.Json; +using System.Text.Json.Nodes; +using System.Text.Json.Serialization; using ArcaneLibs.Extensions; using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Filters; using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Requests; using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Responses; using LibMatrix.Responses; +using LibMatrix.Filters; +using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Filters; +using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Responses; +using LibMatrix.Responses; namespace LibMatrix.Homeservers.ImplementationDetails.Synapse; diff --git a/LibMatrix/Homeservers/RemoteHomeServer.cs b/LibMatrix/Homeservers/RemoteHomeServer.cs
index 4ee523f..45ecb18 100644 --- a/LibMatrix/Homeservers/RemoteHomeServer.cs +++ b/LibMatrix/Homeservers/RemoteHomeServer.cs
@@ -54,6 +54,8 @@ public class RemoteHomeserver { return data; } + + // TODO: Do we need to support retrieving individual profile properties? Is there any use for that besides just getting the full profile? public async Task<ClientVersionsResponse> GetClientVersionsAsync() { var resp = await ClientHttpClient.GetAsync($"/_matrix/client/versions"); diff --git a/LibMatrix/Interfaces/Services/IStorageProvider.cs b/LibMatrix/Interfaces/Services/IStorageProvider.cs
index 165e7df..fb7bb6d 100644 --- a/LibMatrix/Interfaces/Services/IStorageProvider.cs +++ b/LibMatrix/Interfaces/Services/IStorageProvider.cs
@@ -31,7 +31,7 @@ public interface IStorageProvider { } // get all keys - public Task<List<string>> GetAllKeysAsync() { + public Task<IEnumerable<string>> GetAllKeysAsync() { Console.WriteLine($"StorageProvider<{GetType().Name}> does not implement GetAllKeys()!"); throw new NotImplementedException(); } @@ -53,4 +53,18 @@ public interface IStorageProvider { Console.WriteLine($"StorageProvider<{GetType().Name}> does not implement LoadStream(key)!"); throw new NotImplementedException(); } + + // copy + public async Task CopyObjectAsync(string sourceKey, string destKey) { + Console.WriteLine($"StorageProvider<{GetType().Name}> does not implement CopyObject(sourceKey, destKey), using load + save!"); + var data = await LoadObjectAsync<object>(sourceKey); + await SaveObjectAsync(destKey, data); + } + + // move + public async Task MoveObjectAsync(string sourceKey, string destKey) { + Console.WriteLine($"StorageProvider<{GetType().Name}> does not implement MoveObject(sourceKey, destKey), using copy + delete!"); + await CopyObjectAsync(sourceKey, destKey); + await DeleteObjectAsync(sourceKey); + } } \ No newline at end of file diff --git a/LibMatrix/MatrixException.cs b/LibMatrix/MatrixException.cs
index 3a79af8..519f99e 100644 --- a/LibMatrix/MatrixException.cs +++ b/LibMatrix/MatrixException.cs
@@ -26,43 +26,47 @@ public class MatrixException : Exception { public string GetAsJson() => GetAsObject().ToJson(ignoreNull: true); public override string Message => - $"{ErrorCode}: {ErrorCode switch { - // common - "M_FORBIDDEN" => $"You do not have permission to perform this action: {Error}", - "M_UNKNOWN_TOKEN" => $"The access token specified was not recognised: {Error}{(SoftLogout == true ? " (soft logout)" : "")}", - "M_MISSING_TOKEN" => $"No access token was specified: {Error}", - "M_BAD_JSON" => $"Request contained valid JSON, but it was malformed in some way: {Error}", - "M_NOT_JSON" => $"Request did not contain valid JSON: {Error}", - "M_NOT_FOUND" => $"The requested resource was not found: {Error}", - "M_LIMIT_EXCEEDED" => $"Too many requests have been sent in a short period of time. Wait a while then try again: {Error}", - "M_UNRECOGNISED" => $"The server did not recognise the request: {Error}", - "M_UNKOWN" => $"The server encountered an unexpected error: {Error}", - // endpoint specific - "M_UNAUTHORIZED" => $"The request did not contain valid authentication information for the target of the request: {Error}", - "M_USER_DEACTIVATED" => $"The user ID associated with the request has been deactivated: {Error}", - "M_USER_IN_USE" => $"The user ID associated with the request is already in use: {Error}", - "M_INVALID_USERNAME" => $"The requested user ID is not valid: {Error}", - "M_ROOM_IN_USE" => $"The room alias requested is already taken: {Error}", - "M_INVALID_ROOM_STATE" => $"The room associated with the request is not in a valid state to perform the request: {Error}", - "M_THREEPID_IN_USE" => $"The threepid requested is already associated with a user ID on this server: {Error}", - "M_THREEPID_NOT_FOUND" => $"The threepid requested is not associated with any user ID: {Error}", - "M_THREEPID_AUTH_FAILED" => $"The provided threepid and/or token was invalid: {Error}", - "M_THREEPID_DENIED" => $"The homeserver does not permit the third party identifier in question: {Error}", - "M_SERVER_NOT_TRUSTED" => $"The homeserver does not trust the identity server: {Error}", - "M_UNSUPPORTED_ROOM_VERSION" => $"The room version is not supported: {Error}", - "M_INCOMPATIBLE_ROOM_VERSION" => $"The room version is incompatible: {Error}", - "M_BAD_STATE" => $"The request was invalid because the state was invalid: {Error}", - "M_GUEST_ACCESS_FORBIDDEN" => $"Guest access is forbidden: {Error}", - "M_CAPTCHA_NEEDED" => $"Captcha needed: {Error}", - "M_CAPTCHA_INVALID" => $"Captcha invalid: {Error}", - "M_MISSING_PARAM" => $"Missing parameter: {Error}", - "M_INVALID_PARAM" => $"Invalid parameter: {Error}", - "M_TOO_LARGE" => $"The request or entity was too large: {Error}", - "M_EXCLUSIVE" => $"The resource being requested is reserved by an application service, or the application service making the request has not created the resource: {Error}", - "M_RESOURCE_LIMIT_EXCEEDED" => $"Exceeded resource limit: {Error}", - "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" => $"Cannot leave server notice room: {Error}", - _ => $"Unknown error: {new { ErrorCode, Error, SoftLogout, RetryAfterMs }.ToJson(ignoreNull: true)}" - }}"; + $"{ErrorCode}: " + + (!string.IsNullOrWhiteSpace(Error) + ? Error + : ErrorCode switch { + // common + "M_FORBIDDEN" => $"You do not have permission to perform this action: {Error}", + "M_UNKNOWN_TOKEN" => $"The access token specified was not recognised: {Error}{(SoftLogout == true ? " (soft logout)" : "")}", + "M_MISSING_TOKEN" => $"No access token was specified: {Error}", + "M_BAD_JSON" => $"Request contained valid JSON, but it was malformed in some way: {Error}", + "M_NOT_JSON" => $"Request did not contain valid JSON: {Error}", + "M_NOT_FOUND" => $"The requested resource was not found: {Error}", + "M_LIMIT_EXCEEDED" => $"Too many requests have been sent in a short period of time. Wait a while then try again: {Error}", + "M_UNRECOGNISED" => $"The server did not recognise the request: {Error}", + "M_UNKOWN" => $"The server encountered an unexpected error: {Error}", + // endpoint specific + "M_UNAUTHORIZED" => $"The request did not contain valid authentication information for the target of the request: {Error}", + "M_USER_DEACTIVATED" => $"The user ID associated with the request has been deactivated: {Error}", + "M_USER_IN_USE" => $"The user ID associated with the request is already in use: {Error}", + "M_INVALID_USERNAME" => $"The requested user ID is not valid: {Error}", + "M_ROOM_IN_USE" => $"The room alias requested is already taken: {Error}", + "M_INVALID_ROOM_STATE" => $"The room associated with the request is not in a valid state to perform the request: {Error}", + "M_THREEPID_IN_USE" => $"The threepid requested is already associated with a user ID on this server: {Error}", + "M_THREEPID_NOT_FOUND" => $"The threepid requested is not associated with any user ID: {Error}", + "M_THREEPID_AUTH_FAILED" => $"The provided threepid and/or token was invalid: {Error}", + "M_THREEPID_DENIED" => $"The homeserver does not permit the third party identifier in question: {Error}", + "M_SERVER_NOT_TRUSTED" => $"The homeserver does not trust the identity server: {Error}", + "M_UNSUPPORTED_ROOM_VERSION" => $"The room version is not supported: {Error}", + "M_INCOMPATIBLE_ROOM_VERSION" => $"The room version is incompatible: {Error}", + "M_BAD_STATE" => $"The request was invalid because the state was invalid: {Error}", + "M_GUEST_ACCESS_FORBIDDEN" => $"Guest access is forbidden: {Error}", + "M_CAPTCHA_NEEDED" => $"Captcha needed: {Error}", + "M_CAPTCHA_INVALID" => $"Captcha invalid: {Error}", + "M_MISSING_PARAM" => $"Missing parameter: {Error}", + "M_INVALID_PARAM" => $"Invalid parameter: {Error}", + "M_TOO_LARGE" => $"The request or entity was too large: {Error}", + "M_EXCLUSIVE" => + $"The resource being requested is reserved by an application service, or the application service making the request has not created the resource: {Error}", + "M_RESOURCE_LIMIT_EXCEEDED" => $"Exceeded resource limit: {Error}", + "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" => $"Cannot leave server notice room: {Error}", + _ => $"Unknown error: {new { ErrorCode, Error, SoftLogout, RetryAfterMs }.ToJson(ignoreNull: true)}" + }); [SuppressMessage("ReSharper", "InconsistentNaming", Justification = "Follows spec naming")] public static class ErrorCodes { diff --git a/LibMatrix/Responses/SyncResponse.cs b/LibMatrix/Responses/SyncResponse.cs
index 529bd5c..977de3e 100644 --- a/LibMatrix/Responses/SyncResponse.cs +++ b/LibMatrix/Responses/SyncResponse.cs
@@ -1,4 +1,7 @@ using System.Text.Json.Serialization; +using LibMatrix.EventTypes.Spec.Ephemeral; +using LibMatrix.EventTypes.Spec.State; +using LibMatrix.EventTypes.Spec.State.RoomInfo; namespace LibMatrix.Responses; @@ -14,7 +17,7 @@ public class SyncResponse { public EventList? AccountData { get; set; } [JsonPropertyName("presence")] - public PresenceDataStructure? Presence { get; set; } + public EventList? Presence { get; set; } [JsonPropertyName("device_one_time_keys_count")] public Dictionary<string, int>? DeviceOneTimeKeysCount { get; set; } @@ -61,6 +64,12 @@ public class SyncResponse { [JsonPropertyName("state")] public EventList? State { get; set; } + + public override string ToString() { + var lastEvent = Timeline?.Events?.LastOrDefault(x => x.Type == "m.room.member"); + var membership = (lastEvent?.TypedContent as RoomMemberEventContent); + return $"LeftRoomDataStructure: {lastEvent?.Sender} {membership?.Membership} ({membership?.Reason})"; + } } public class JoinedRoomDataStructure { @@ -82,7 +91,7 @@ public class SyncResponse { [JsonPropertyName("summary")] public SummaryDataStructure? Summary { get; set; } - public class TimelineDataStructure { + public class TimelineDataStructure : EventList { public TimelineDataStructure() { } public TimelineDataStructure(List<StateEventResponse>? events, bool? limited) { @@ -90,8 +99,8 @@ public class SyncResponse { Limited = limited; } - [JsonPropertyName("events")] - public List<StateEventResponse>? Events { get; set; } + // [JsonPropertyName("events")] + // public List<StateEventResponse>? Events { get; set; } [JsonPropertyName("prev_batch")] public string? PrevBatch { get; set; } @@ -125,4 +134,15 @@ public class SyncResponse { public EventList? InviteState { get; set; } } } -} \ No newline at end of file + + public long GetDerivedSyncTime() { + return ((long[]) [ + AccountData?.Events?.Max(x => x.OriginServerTs) ?? 0, + Presence?.Events?.Max(x => x.OriginServerTs) ?? 0, + ToDevice?.Events?.Max(x => x.OriginServerTs) ?? 0, + Rooms?.Join?.Values?.Max(x => x.Timeline?.Events?.Max(y => y.OriginServerTs)) ?? 0, + Rooms?.Invite?.Values?.Max(x => x.InviteState?.Events?.Max(y => y.OriginServerTs)) ?? 0, + Rooms?.Leave?.Values?.Max(x => x.Timeline?.Events?.Max(y => y.OriginServerTs)) ?? 0 + ]).Max(); + } +} diff --git a/LibMatrix/Responses/UserProfileResponse.cs b/LibMatrix/Responses/UserProfileResponse.cs
index 6c9380f..30e4c32 100644 --- a/LibMatrix/Responses/UserProfileResponse.cs +++ b/LibMatrix/Responses/UserProfileResponse.cs
@@ -1,3 +1,4 @@ +using System.Text.Json; using System.Text.Json.Serialization; namespace LibMatrix.Responses; @@ -8,4 +9,18 @@ public class UserProfileResponse { [JsonPropertyName("displayname")] public string? DisplayName { get; set; } + + // MSC 4133 - Extending User Profile API with Key:Value pairs + [JsonExtensionData] + public Dictionary<string, JsonElement>? CustomKeys { get; set; } + + public JsonElement? this[string key] { + get => CustomKeys?[key]; + set { + if (value is null) + CustomKeys?.Remove(key); + else + (CustomKeys ??= [])[key] = value.Value; + } + } } \ No newline at end of file diff --git a/LibMatrix/RoomTypes/GenericRoom.cs b/LibMatrix/RoomTypes/GenericRoom.cs
index fb56f2e..ec61a33 100644 --- a/LibMatrix/RoomTypes/GenericRoom.cs +++ b/LibMatrix/RoomTypes/GenericRoom.cs
@@ -1,5 +1,6 @@ using System.Collections.Frozen; using System.Net.Http.Json; +using System.Security.Cryptography; using System.Text.Json; using System.Text.Json.Nodes; using System.Text.Json.Serialization; @@ -327,8 +328,8 @@ public class GenericRoom { catch { try { var alias = await GetCanonicalAliasAsync(); - if (alias?.Alias is not null) return alias.Alias; - throw new Exception("No name or alias"); + if (!string.IsNullOrWhiteSpace(alias?.Alias)) return alias.Alias; + throw new Exception("No alias"); } catch { try { @@ -336,7 +337,8 @@ public class GenericRoom { var memberList = new List<string>(); var memberCount = 0; await foreach (var member in members) - memberList.Add(member.RawContent?["displayname"]?.GetValue<string>() ?? ""); + if (member.StateKey != Homeserver.UserId) + memberList.Add(member.RawContent?["displayname"]?.GetValue<string>() ?? ""); memberCount = memberList.Count; memberList.RemoveAll(string.IsNullOrWhiteSpace); memberList = memberList.OrderBy(x => x).ToList(); @@ -554,4 +556,4 @@ public class GenericRoom { public class RoomIdResponse { [JsonPropertyName("room_id")] public string RoomId { get; set; } -} \ No newline at end of file +} diff --git a/LibMatrix/Services/HomeserverProviderService.cs b/LibMatrix/Services/HomeserverProviderService.cs
index 601087d..36bc828 100644 --- a/LibMatrix/Services/HomeserverProviderService.cs +++ b/LibMatrix/Services/HomeserverProviderService.cs
@@ -22,9 +22,11 @@ public class HomeserverProviderService(ILogger<HomeserverProviderService> logger AuthenticatedHomeserverGeneric? hs = null; if (!useGeneric) { + var clientVersionsTask = rhs.GetClientVersionsAsync(); + var serverVersionTask = rhs.FederationClient?.GetServerVersionAsync() ?? Task.FromResult<ServerVersionResponse?>(null)!; ClientVersionsResponse clientVersions = new(); try { - clientVersions = await rhs.GetClientVersionsAsync(); + clientVersions = await clientVersionsTask; } catch (Exception e) { logger.LogError(e, "Failed to get client versions for {homeserver}", homeserver); @@ -32,7 +34,7 @@ public class HomeserverProviderService(ILogger<HomeserverProviderService> logger ServerVersionResponse? serverVersion; try { - serverVersion = await (rhs.FederationClient?.GetServerVersionAsync() ?? Task.FromResult<ServerVersionResponse?>(null)!); + serverVersion = await serverVersionTask; } catch (Exception e) { logger.LogWarning(e, "Failed to get server version for {homeserver}", homeserver); diff --git a/LibMatrix/Services/HomeserverResolverService.cs b/LibMatrix/Services/HomeserverResolverService.cs
index 01b11cc..53cd2dd 100644 --- a/LibMatrix/Services/HomeserverResolverService.cs +++ b/LibMatrix/Services/HomeserverResolverService.cs
@@ -62,9 +62,9 @@ public class HomeserverResolverService { clientWellKnown ??= await _httpClient.TryGetFromJsonAsync<ClientWellKnown>($"http://{homeserver}/.well-known/matrix/client"); if (clientWellKnown is null) { - if (await _httpClient.CheckSuccessStatus($"https://{homeserver}/_matrix/client/versions")) + if (await MatrixHttpClient.CheckSuccessStatus($"https://{homeserver}/_matrix/client/versions")) return $"https://{homeserver}"; - if (await _httpClient.CheckSuccessStatus($"http://{homeserver}/_matrix/client/versions")) + if (await MatrixHttpClient.CheckSuccessStatus($"http://{homeserver}/_matrix/client/versions")) return $"http://{homeserver}"; } } @@ -100,16 +100,16 @@ public class HomeserverResolverService { var resolved = serverWellKnown.Homeserver.TrimEnd('/'); if (resolved.StartsWith("https://") || resolved.StartsWith("http://")) return resolved; - if (await _httpClient.CheckSuccessStatus($"https://{resolved}/_matrix/federation/v1/version")) + if (await MatrixHttpClient.CheckSuccessStatus($"https://{resolved}/_matrix/federation/v1/version")) return $"https://{resolved}"; - if (await _httpClient.CheckSuccessStatus($"http://{resolved}/_matrix/federation/v1/version")) + if (await MatrixHttpClient.CheckSuccessStatus($"http://{resolved}/_matrix/federation/v1/version")) return $"http://{resolved}"; _logger.LogWarning("Server well-known points to invalid server: {resolved}", resolved); } // fallback: most servers host C2S and S2S on the same domain var clientUrl = (await _tryResolveClientEndpoint(homeserver)).TrimEnd('/'); - if (clientUrl is not null && await _httpClient.CheckSuccessStatus($"{clientUrl}/_matrix/federation/v1/version")) + if (clientUrl is not null && await MatrixHttpClient.CheckSuccessStatus($"{clientUrl}/_matrix/federation/v1/version")) return clientUrl; _logger.LogInformation("No server well-known for {server}...", homeserver); diff --git a/LibMatrix/StateEvent.cs b/LibMatrix/StateEvent.cs
index 6d8f195..ef760e1 100644 --- a/LibMatrix/StateEvent.cs +++ b/LibMatrix/StateEvent.cs
@@ -141,7 +141,7 @@ public class StateEventResponse : StateEvent { public string? EventId { get; set; } public class UnsignedData { - [JsonPropertyName("age")] + [JsonPropertyName("age"), JsonNumberHandling(JsonNumberHandling.AllowReadingFromString)] public ulong? Age { get; set; } [JsonPropertyName("redacted_because")] diff --git a/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs b/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs
index c15fe7d..b6fe7c2 100644 --- a/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs +++ b/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs
@@ -120,7 +120,7 @@ public class RoomStore { public Room(string roomId) { if (string.IsNullOrWhiteSpace(roomId)) throw new ArgumentException("Value cannot be null or whitespace.", nameof(roomId)); - if (roomId[0] != '!') throw new ArgumentException("Room ID must start with !", nameof(roomId)); + if (roomId[0] != '!') throw new ArgumentException($"Room ID must start with '!', provided value: {roomId ?? "null"}", nameof(roomId)); RoomId = roomId; Timeline = new(); AccountData = new(); diff --git a/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs b/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs
index ca6a4d8..8501d41 100644 --- a/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs +++ b/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs
@@ -18,7 +18,7 @@ public class BotInstaller(IServiceCollection services) { public BotInstaller AddMatrixBot() { services.AddSingleton<LibMatrixBotConfiguration>(); - services.AddScoped<AuthenticatedHomeserverGeneric>(x => { + services.AddSingleton<AuthenticatedHomeserverGeneric>(x => { var config = x.GetService<LibMatrixBotConfiguration>() ?? throw new Exception("No configuration found!"); var hsProvider = x.GetService<HomeserverProviderService>() ?? throw new Exception("No homeserver provider found!"); diff --git a/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs b/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs
index b5b5a2b..d07090f 100644 --- a/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs +++ b/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs
@@ -1,3 +1,4 @@ +using ArcaneLibs.Extensions; using LibMatrix.EventTypes.Spec; using LibMatrix.EventTypes.Spec.State.RoomInfo; using LibMatrix.Filters; @@ -57,22 +58,44 @@ public class CommandListenerHostedService : IHostedService { FilterId = filter }; - syncHelper.TimelineEventHandlers.Add(async @event => { - try { - var room = _hs.GetRoom(@event.RoomId); - // _logger.LogInformation(eventResponse.ToJson(indent: false)); - if (@event is { Type: "m.room.message", TypedContent: RoomMessageEventContent message }) - if (message is { MessageType: "m.text" }) { - var usedPrefix = await GetUsedPrefix(@event); - if (usedPrefix is null) return; - var res = await InvokeCommand(@event, usedPrefix); - await (_commandResultHandler?.Invoke(res) ?? HandleResult(res)); + syncHelper.SyncReceivedHandlers.Add(async sync => { + _logger.LogInformation("Sync received!"); + foreach (var roomResp in sync.Rooms?.Join ?? []) { + if (roomResp.Value.Timeline?.Events is null or { Count: > 5 }) continue; + foreach (var @event in roomResp.Value.Timeline.Events) { + @event.RoomId = roomResp.Key; + try { + // var room = _hs.GetRoom(@event.RoomId); + // _logger.LogInformation(eventResponse.ToJson(indent: false)); + if (@event is { Type: "m.room.message", TypedContent: RoomMessageEventContent message }) + if (message is { MessageType: "m.text" }) { + var usedPrefix = await GetUsedPrefix(@event); + if (usedPrefix is null) return; + var res = await InvokeCommand(@event, usedPrefix); + await (_commandResultHandler?.Invoke(res) ?? HandleResult(res)); + } } - } - catch (Exception e) { - _logger.LogError(e, "Error in command listener!"); + catch (Exception e) { + _logger.LogError(e, "Error in command listener!"); + Console.WriteLine(@event.ToJson(ignoreNull: false, indent: true)); + var fakeResult = new CommandResult() { + Result = CommandResult.CommandResultType.Failure_Exception, + Exception = e, + Success = false, + Context = new() { + Homeserver = _hs, + CommandName = "[CommandListener.SyncHandler]", + Room = _hs.GetRoom(roomResp.Key), + Args = [], + MessageEvent = @event + } + }; + await (_commandResultHandler?.Invoke(fakeResult) ?? HandleResult(fakeResult)); + } + } } }); + await syncHelper.RunSyncLoopAsync(cancellationToken: cancellationToken); }