diff --git a/.idea/.idea.LibMatrix/.idea/.name b/.idea/.idea.LibMatrix/.idea/.name
deleted file mode 100644
index 2f382b8..0000000
--- a/.idea/.idea.LibMatrix/.idea/.name
+++ /dev/null
@@ -1 +0,0 @@
-LibMatrix
\ No newline at end of file
diff --git a/LibMatrix/Extensions/EnumerableExtensions.cs b/LibMatrix/Extensions/EnumerableExtensions.cs
index 42d9491..ace2c0c 100644
--- a/LibMatrix/Extensions/EnumerableExtensions.cs
+++ b/LibMatrix/Extensions/EnumerableExtensions.cs
@@ -1,29 +1,91 @@
+using System.Collections.Frozen;
+using System.Collections.Immutable;
+
namespace LibMatrix.Extensions;
public static class EnumerableExtensions {
+ public static int insertions = 0;
+ public static int replacements = 0;
+
public static void MergeStateEventLists(this IList<StateEvent> oldState, IList<StateEvent> newState) {
- foreach (var stateEvent in newState) {
- var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey);
- if (old is null) {
- oldState.Add(stateEvent);
- continue;
+ // foreach (var stateEvent in newState) {
+ // var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey);
+ // if (old is null) {
+ // oldState.Add(stateEvent);
+ // continue;
+ // }
+ //
+ // oldState.Remove(old);
+ // oldState.Add(stateEvent);
+ // }
+
+ foreach (var e in newState) {
+ switch (FindIndex(e)) {
+ case -1:
+ oldState.Add(e);
+ break;
+ case var index:
+ oldState[index] = e;
+ break;
}
+ }
- oldState.Remove(old);
- oldState.Add(stateEvent);
+ int FindIndex(StateEvent needle) {
+ for (int i = 0; i < oldState.Count; i++) {
+ var old = oldState[i];
+ if (old.Type == needle.Type && old.StateKey == needle.StateKey)
+ return i;
+ }
+
+ return -1;
}
}
public static void MergeStateEventLists(this IList<StateEventResponse> oldState, IList<StateEventResponse> newState) {
- foreach (var stateEvent in newState) {
- var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey);
- if (old is null) {
- oldState.Add(stateEvent);
- continue;
+ foreach (var e in newState) {
+ switch (FindIndex(e)) {
+ case -1:
+ oldState.Add(e);
+ break;
+ case var index:
+ oldState[index] = e;
+ break;
+ }
+ }
+
+ int FindIndex(StateEventResponse needle) {
+ for (int i = 0; i < oldState.Count; i++) {
+ var old = oldState[i];
+ if (old.Type == needle.Type && old.StateKey == needle.StateKey)
+ return i;
+ }
+
+ return -1;
+ }
+ }
+
+ public static void MergeStateEventLists(this List<StateEventResponse> oldState, List<StateEventResponse> newState) {
+ foreach (var e in newState) {
+ switch (FindIndex(e)) {
+ case -1:
+ oldState.Add(e);
+ insertions++;
+ break;
+ case var index:
+ oldState[index] = e;
+ replacements++;
+ break;
+ }
+ }
+
+ int FindIndex(StateEventResponse needle) {
+ for (int i = 0; i < oldState.Count; i++) {
+ var old = oldState[i];
+ if (old.Type == needle.Type && old.StateKey == needle.StateKey)
+ return i;
}
- oldState.Remove(old);
- oldState.Add(stateEvent);
+ return -1;
}
}
}
\ No newline at end of file
diff --git a/LibMatrix/Extensions/MatrixHttpClient.Single.cs b/LibMatrix/Extensions/MatrixHttpClient.Single.cs
index 42f81a8..a3ea409 100644
--- a/LibMatrix/Extensions/MatrixHttpClient.Single.cs
+++ b/LibMatrix/Extensions/MatrixHttpClient.Single.cs
@@ -1,5 +1,5 @@
#define SINGLE_HTTPCLIENT // Use a single HttpClient instance for all MatrixHttpClient instances
-// #define SYNC_HTTPCLIENT // Only allow one request as a time, for debugging
+// #define SYNC_HTTPCLIENT // Only allow one request as a time, for debugging
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Net;
@@ -15,7 +15,7 @@ using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Requests;
namespace LibMatrix.Extensions;
#if SINGLE_HTTPCLIENT
-// TODO: Add URI wrapper for
+// TODO: Add URI wrapper for
public class MatrixHttpClient {
private static readonly HttpClient Client;
@@ -51,6 +51,7 @@ public class MatrixHttpClient {
internal SemaphoreSlim _rateLimitSemaphore { get; } = new(1, 1);
#endif
+ private const bool LogRequests = true;
public Dictionary<string, string> AdditionalQueryParameters { get; set; } = new();
public Uri? BaseAddress { get; set; }
@@ -60,7 +61,7 @@ public class MatrixHttpClient {
typeof(HttpRequestHeaders).GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, [], null)?.Invoke([]) as HttpRequestHeaders ??
throw new InvalidOperationException("Failed to create HttpRequestHeaders");
- private JsonSerializerOptions GetJsonSerializerOptions(JsonSerializerOptions? options = null) {
+ private static JsonSerializerOptions GetJsonSerializerOptions(JsonSerializerOptions? options = null) {
options ??= new JsonSerializerOptions();
options.Converters.Add(new JsonFloatStringConverter());
options.Converters.Add(new JsonDoubleStringConverter());
@@ -70,6 +71,10 @@ public class MatrixHttpClient {
}
public async Task<HttpResponseMessage> SendUnhandledAsync(HttpRequestMessage request, CancellationToken cancellationToken) {
+ if (request.RequestUri is null) throw new NullReferenceException("RequestUri is null");
+ // if (!request.RequestUri.IsAbsoluteUri)
+ request.RequestUri = request.RequestUri.EnsureAbsolute(BaseAddress!);
+ var swWait = Stopwatch.StartNew();
#if SYNC_HTTPCLIENT
await _rateLimitSemaphore.WaitAsync(cancellationToken);
#endif
@@ -79,6 +84,9 @@ public class MatrixHttpClient {
if (request.RequestUri is null) throw new NullReferenceException("RequestUri is null");
if (!request.RequestUri.IsAbsoluteUri)
request.RequestUri = new Uri(BaseAddress ?? throw new InvalidOperationException("Relative URI passed, but no BaseAddress is specified!"), request.RequestUri);
+ swWait.Stop();
+ var swExec = Stopwatch.StartNew();
+
foreach (var (key, value) in AdditionalQueryParameters) request.RequestUri = request.RequestUri.AddQuery(key, value);
foreach (var (key, value) in DefaultRequestHeaders) {
if (request.Headers.Contains(key)) continue;
@@ -87,6 +95,9 @@ public class MatrixHttpClient {
request.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse"), true);
+ if (LogRequests)
+ Console.WriteLine("Sending " + request.Summarise(includeHeaders: true, includeQuery: true, includeContentIfText: true, hideHeaders: ["Accept"]));
+
HttpResponseMessage? responseMessage;
try {
responseMessage = await Client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
@@ -110,8 +121,26 @@ public class MatrixHttpClient {
}
#endif
- Console.WriteLine(
- $"Sending {request.Method} {request.RequestUri} ({Util.BytesToString(request.Content?.Headers.ContentLength ?? 0)}) -> {(int)responseMessage.StatusCode} {responseMessage.StatusCode} ({Util.BytesToString(responseMessage.Content.Headers.ContentLength ?? 0)})");
+ // Console.WriteLine($"Sending {request.Method} {request.RequestUri} ({Util.BytesToString(request.Content?.Headers.ContentLength ?? 0)}) -> {(int)responseMessage.StatusCode} {responseMessage.StatusCode} ({Util.BytesToString(responseMessage.GetContentLength())}, WAIT={swWait.ElapsedMilliseconds}ms, EXEC={swExec.ElapsedMilliseconds}ms)");
+ if (LogRequests)
+ Console.WriteLine("Received " + responseMessage.Summarise(includeHeaders: true, includeContentIfText: false, hideHeaders: [
+ "Server",
+ "Date",
+ "Transfer-Encoding",
+ "Connection",
+ "Vary",
+ "Content-Length",
+ "Access-Control-Allow-Origin",
+ "Access-Control-Allow-Methods",
+ "Access-Control-Allow-Headers",
+ "Access-Control-Expose-Headers",
+ "Cache-Control",
+ "Cross-Origin-Resource-Policy",
+ "X-Content-Security-Policy",
+ "Referrer-Policy",
+ "X-Robots-Tag",
+ "Content-Security-Policy"
+ ]));
return responseMessage;
}
@@ -228,7 +257,7 @@ public class MatrixHttpClient {
await foreach (var resp in result) yield return resp;
}
- public async Task<bool> CheckSuccessStatus(string url) {
+ public static async Task<bool> CheckSuccessStatus(string url) {
//cors causes failure, try to catch
try {
var resp = await Client.GetAsync(url);
@@ -259,4 +288,4 @@ public class MatrixHttpClient {
return await SendAsync(request);
}
}
-#endif
\ No newline at end of file
+#endif
diff --git a/LibMatrix/Filters/LocalRoomQueryFilter.cs b/LibMatrix/Filters/LocalRoomQueryFilter.cs
deleted file mode 100644
index b3bd4c0..0000000
--- a/LibMatrix/Filters/LocalRoomQueryFilter.cs
+++ /dev/null
@@ -1,27 +0,0 @@
-namespace LibMatrix.Filters;
-
-public class LocalRoomQueryFilter {
- public string RoomIdContains { get; set; } = "";
- public string NameContains { get; set; } = "";
- public string CanonicalAliasContains { get; set; } = "";
- public string VersionContains { get; set; } = "";
- public string CreatorContains { get; set; } = "";
- public string EncryptionContains { get; set; } = "";
- public string JoinRulesContains { get; set; } = "";
- public string GuestAccessContains { get; set; } = "";
- public string HistoryVisibilityContains { get; set; } = "";
-
- public bool Federatable { get; set; } = true;
- public bool Public { get; set; } = true;
-
- public int JoinedMembersGreaterThan { get; set; }
- public int JoinedMembersLessThan { get; set; } = int.MaxValue;
-
- public int JoinedLocalMembersGreaterThan { get; set; }
- public int JoinedLocalMembersLessThan { get; set; } = int.MaxValue;
- public int StateEventsGreaterThan { get; set; }
- public int StateEventsLessThan { get; set; } = int.MaxValue;
-
- public bool CheckFederation { get; set; }
- public bool CheckPublic { get; set; }
-}
\ No newline at end of file
diff --git a/LibMatrix/Helpers/MessageBuilder.cs b/LibMatrix/Helpers/MessageBuilder.cs
index b639e1f..d3bd6a5 100644
--- a/LibMatrix/Helpers/MessageBuilder.cs
+++ b/LibMatrix/Helpers/MessageBuilder.cs
@@ -105,7 +105,7 @@ public class MessageBuilder(string msgType = "m.text", string format = "org.matr
public MessageBuilder WithTable(Action<TableBuilder> tableBuilder) {
var tb = new TableBuilder(this);
- this.WithHtmlTag("table", msb => tableBuilder(tb));
+ WithHtmlTag("table", msb => tableBuilder(tb));
return this;
}
diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index 05bfb47..adcc714 100644
--- a/LibMatrix/Helpers/SyncHelper.cs
+++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,15 +1,18 @@
using System.Diagnostics;
+using System.Net.Http.Json;
using System.Text.Json;
using ArcaneLibs.Collections;
+using System.Text.Json.Nodes;
using ArcaneLibs.Extensions;
using LibMatrix.Filters;
using LibMatrix.Homeservers;
+using LibMatrix.Interfaces.Services;
using LibMatrix.Responses;
using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
-public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) {
+public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) {
private SyncFilter? _filter;
private string? _namedFilterName;
private bool _filterIsDirty;
@@ -18,6 +21,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
public string? SetPresence { get; set; } = "online";
+ public bool UseInternalStreamingSync { get; set; } = true;
public string? FilterId {
get => _filterId;
@@ -53,6 +57,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
public TimeSpan MinimumDelay { get; set; } = new(0);
+ public async Task<int> GetUnoptimisedStoreCount() {
+ if (storageProvider is null) return -1;
+ var keys = await storageProvider.GetAllKeysAsync();
+ return keys.Count(x => !x.StartsWith("old/")) - 1;
+ }
+
private async Task UpdateFilterAsync() {
if (!string.IsNullOrWhiteSpace(NamedFilterName)) {
_filterId = await homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(NamedFilterName);
@@ -76,6 +86,25 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!");
}
+ if (storageProvider is null) return await SyncAsyncInternal(cancellationToken);
+
+ var key = Since ?? "init";
+ if (await storageProvider.ObjectExistsAsync(key)) {
+ var cached = await storageProvider.LoadObjectAsync<SyncResponse>(key);
+ // We explicitly check that NextBatch doesn't match since to prevent infinite loops...
+ if (cached is not null && cached.NextBatch != Since) {
+ logger?.LogInformation("SyncHelper: Using cached sync response for {}", key);
+ return cached;
+ }
+ }
+
+ var sync = await SyncAsyncInternal(cancellationToken);
+ // Ditto here.
+ if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync);
+ return sync;
+ }
+
+ private async Task<SyncResponse?> SyncAsyncInternal(CancellationToken? cancellationToken = null) {
var sw = Stopwatch.StartNew();
if (_filterIsDirty) await UpdateFilterAsync();
@@ -86,15 +115,23 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
// logger?.LogInformation("SyncHelper: Calling: {}", url);
try {
- var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
- if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
- logger?.LogTrace("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed);
- var deserializeSw = Stopwatch.StartNew();
- var stream = await httpResp.Content.ReadAsStreamAsync();
- await using var seekableStream = new SeekableStream(stream);
- var resp = await JsonSerializer.DeserializeAsync<SyncResponse>(seekableStream, cancellationToken: cancellationToken ?? CancellationToken.None,
- jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
- logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", seekableStream.Position, deserializeSw.Elapsed, sw.Elapsed);
+ SyncResponse? resp = null;
+ if (UseInternalStreamingSync) {
+ resp = await homeserver.ClientHttpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None);
+ logger?.LogInformation("Got sync response: ~{} bytes, {} elapsed", resp.ToJson(false, true, true).Length, sw.Elapsed);
+ }
+ else {
+ var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
+ if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
+ logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed);
+ var deserializeSw = Stopwatch.StartNew();
+ // var jsonResp = await httpResp.Content.ReadFromJsonAsync<JsonObject>(cancellationToken: cancellationToken ?? CancellationToken.None);
+ // var resp = jsonResp.Deserialize<SyncResponse>();
+ resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None,
+ jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
+ logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed);
+ }
+
var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
if (timeToWait.TotalMilliseconds > 0)
await Task.Delay(timeToWait);
@@ -214,4 +251,9 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
/// Event fired when an account data event is received
/// </summary>
public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new();
-}
\ No newline at end of file
+
+ private void Log(string message) {
+ if (logger is null) Console.WriteLine(message);
+ else logger.LogInformation(message);
+ }
+}
diff --git a/LibMatrix/Helpers/SyncStateResolver.cs b/LibMatrix/Helpers/SyncStateResolver.cs
index 4633f06..282d26f 100644
--- a/LibMatrix/Helpers/SyncStateResolver.cs
+++ b/LibMatrix/Helpers/SyncStateResolver.cs
@@ -1,12 +1,18 @@
+using System.Collections.Frozen;
+using System.Collections.Immutable;
+using System.Diagnostics;
+using System.Text;
+using ArcaneLibs.Extensions;
using LibMatrix.Extensions;
using LibMatrix.Filters;
using LibMatrix.Homeservers;
+using LibMatrix.Interfaces.Services;
using LibMatrix.Responses;
using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
-public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) {
+public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) {
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
public string? SetPresence { get; set; } = "online";
@@ -15,7 +21,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
public SyncResponse? MergedState { get; set; }
- private SyncHelper _syncHelper = new(homeserver, logger);
+ private SyncHelper _syncHelper = new(homeserver, logger, storageProvider);
public async Task<(SyncResponse next, SyncResponse merged)> ContinueAsync(CancellationToken? cancellationToken = null) {
// copy properties
@@ -24,156 +30,395 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
_syncHelper.SetPresence = SetPresence;
_syncHelper.Filter = Filter;
_syncHelper.FullState = FullState;
- // run sync
+
var sync = await _syncHelper.SyncAsync(cancellationToken);
if (sync is null) return await ContinueAsync(cancellationToken);
- MergedState = MergedState is null ? sync : MergeSyncs(MergedState, sync);
+
+ if (MergedState is null) MergedState = sync;
+ else MergedState = MergeSyncs(MergedState, sync);
Since = sync.NextBatch;
+
return (sync, MergedState);
}
- private SyncResponse MergeSyncs(SyncResponse oldState, SyncResponse newState) {
- oldState.NextBatch = newState.NextBatch;
+ public async Task OptimiseStore(Action<int, int>? progressCallback = null) {
+ if (storageProvider is null) return;
+ if (!await storageProvider.ObjectExistsAsync("init")) return;
+
+ var totalSw = Stopwatch.StartNew();
+ Console.Write("Optimising sync store...");
+ var initLoadTask = storageProvider.LoadObjectAsync<SyncResponse>("init");
+ var keys = (await storageProvider.GetAllKeysAsync()).Where(x => !x.StartsWith("old/")).ToFrozenSet();
+ var count = keys.Count - 1;
+ int total = count;
+ Console.WriteLine($"Found {count} entries to optimise.");
+
+ var merged = await initLoadTask;
+ if (merged is null) return;
+ if (!keys.Contains(merged.NextBatch)) {
+ Console.WriteLine("Next response after initial sync is not present, not checkpointing!");
+ return;
+ }
- oldState.AccountData ??= new EventList();
- oldState.AccountData.Events ??= new List<StateEventResponse>();
- if (newState.AccountData?.Events is not null)
- oldState.AccountData.Events.MergeStateEventLists(newState.AccountData?.Events ?? new List<StateEventResponse>());
+ // We back up old entries
+ var oldPath = $"old/{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
+ await storageProvider.MoveObjectAsync("init", $"{oldPath}/init");
+
+ var moveTasks = new List<Task>();
+
+ Dictionary<string, Dictionary<string, TimeSpan>> traces = [];
+ while (keys.Contains(merged.NextBatch)) {
+ Console.Write($"Merging {merged.NextBatch}, {--count} remaining... ");
+ var sw = Stopwatch.StartNew();
+ var swt = Stopwatch.StartNew();
+ var next = await storageProvider.LoadObjectAsync<SyncResponse>(merged.NextBatch);
+ Console.Write($"Load {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+ if (next is null || merged.NextBatch == next.NextBatch) break;
+
+ Console.Write($"Check {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+ // back up old entry
+ moveTasks.Add(storageProvider.MoveObjectAsync(merged.NextBatch, $"{oldPath}/{merged.NextBatch}"));
+ Console.Write($"Move {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+
+ var trace = new Dictionary<string, TimeSpan>();
+ traces[merged.NextBatch] = trace;
+ merged = MergeSyncs(merged, next, trace);
+ Console.Write($"Merge {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+ Console.WriteLine($"Total {swt.Elapsed.TotalMilliseconds}ms");
+ // Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining...");
+ progressCallback?.Invoke(count, total);
+ }
- oldState.Presence ??= new SyncResponse.PresenceDataStructure();
- if (newState.Presence?.Events is { Count: > 0 })
- if (oldState.Presence.Events is { Count: > 0 })
- oldState.Presence.Events.MergeStateEventLists(newState.Presence.Events);
- else
- oldState.Presence.Events = newState.Presence?.Events;
-
- oldState.DeviceOneTimeKeysCount ??= new Dictionary<string, int>();
- if (newState.DeviceOneTimeKeysCount is not null)
- foreach (var (key, value) in newState.DeviceOneTimeKeysCount)
- oldState.DeviceOneTimeKeysCount[key] = value;
-
- oldState.Rooms ??= new SyncResponse.RoomsDataStructure();
- if (newState.Rooms is not null)
- oldState.Rooms = MergeRoomsDataStructure(oldState.Rooms, newState.Rooms);
-
- oldState.ToDevice ??= new EventList();
- oldState.ToDevice.Events ??= new List<StateEventResponse>();
- if (newState.ToDevice?.Events is not null)
- oldState.ToDevice.Events.MergeStateEventLists(newState.ToDevice?.Events ?? new List<StateEventResponse>());
-
- oldState.DeviceLists ??= new SyncResponse.DeviceListsDataStructure();
- if (newState.DeviceLists?.Changed is not null) {
- oldState.DeviceLists.Changed ??= new List<string>();
- foreach (var s in newState.DeviceLists.Changed)
- oldState.DeviceLists.Changed.Add(s);
+ var traceString = string.Join("\n", traces.Select(x => $"{x.Key}\t{x.Value.ToJson(indent: false)}"));
+ var ms = new MemoryStream(Encoding.UTF8.GetBytes(traceString));
+ await storageProvider.SaveStreamAsync($"traces/{oldPath}", ms);
+
+ await storageProvider.SaveObjectAsync("init", merged);
+ await Task.WhenAll(moveTasks);
+
+ Console.WriteLine($"Optimised store in {totalSw.Elapsed.TotalMilliseconds}ms");
+ Console.WriteLine($"Insertions: {EnumerableExtensions.insertions}, replacements: {EnumerableExtensions.replacements}");
+ }
+
+ /// <summary>
+ /// Remove all but initial sync and last checkpoint
+ /// </summary>
+ public async Task RemoveOldSnapshots() {
+ if (storageProvider is null) return;
+ var sw = Stopwatch.StartNew();
+
+ var map = await GetCheckpointMap();
+ if (map is null) return;
+ if (map.Count < 3) return;
+
+ var toRemove = map.Keys.Skip(1).Take(map.Count - 2).ToList();
+ Console.Write("Cleaning up old snapshots: ");
+ foreach (var key in toRemove) {
+ var path = $"old/{key}/init";
+ if (await storageProvider?.ObjectExistsAsync(path)) {
+ Console.Write($"{key}... ");
+ await storageProvider?.DeleteObjectAsync(path);
+ }
}
- if (newState.DeviceLists?.Left is not null) {
- oldState.DeviceLists.Left ??= new List<string>();
- foreach (var s in newState.DeviceLists.Left)
- oldState.DeviceLists.Left.Add(s);
+ Console.WriteLine("Done!");
+ Console.WriteLine($"Removed {toRemove.Count} old snapshots in {sw.Elapsed.TotalMilliseconds}ms");
+ }
+
+ public async Task UnrollOptimisedStore() {
+ if (storageProvider is null) return;
+ Console.WriteLine("WARNING: Unrolling sync store!");
+ }
+
+ public async Task SquashOptimisedStore(int targetCountPerCheckpoint) {
+ Console.Write($"Balancing optimised store to {targetCountPerCheckpoint} per checkpoint...");
+ var checkpoints = await GetCheckpointMap();
+ if (checkpoints is null) return;
+
+ Console.WriteLine(
+ $" Stats: {checkpoints.Count} checkpoints with [{checkpoints.Min(x => x.Value.Count)} < ~{checkpoints.Average(x => x.Value.Count)} < {checkpoints.Max(x => x.Value.Count)}] entries");
+ Console.WriteLine($"Found {checkpoints?.Count ?? 0} checkpoints.");
+ }
+
+ public async Task dev() {
+ int i = 0;
+ var sw = Stopwatch.StartNew();
+ var hist = GetSerialisedHistory();
+ await foreach (var (key, resp) in hist) {
+ if (resp is null) continue;
+ // Console.WriteLine($"[{++i}] {key} -> {resp.NextBatch} ({resp.GetDerivedSyncTime()})");
+ i++;
}
- return oldState;
+ Console.WriteLine($"Iterated {i} syncResponses in {sw.Elapsed}");
+ Environment.Exit(0);
}
-#region Merge rooms
+ private async IAsyncEnumerable<(string key, SyncResponse? resp)> GetSerialisedHistory() {
+ if (storageProvider is null) yield break;
+ var map = await GetCheckpointMap();
+ var currentRange = map.First();
+ var nextKey = $"old/{map.First().Key}/init";
+ var next = storageProvider.LoadObjectAsync<SyncResponse>(nextKey);
+ while (true) {
+ var data = await next;
+ if (data is null) break;
+ yield return (nextKey, data);
+ if (currentRange.Value.Contains(data.NextBatch)) {
+ nextKey = $"old/{currentRange.Key}/{data.NextBatch}";
+ }
+ else if (map.Any(x => x.Value.Contains(data.NextBatch))) {
+ currentRange = map.First(x => x.Value.Contains(data.NextBatch));
+ nextKey = $"old/{currentRange.Key}/{data.NextBatch}";
+ }
+ else if (await storageProvider.ObjectExistsAsync(data.NextBatch)) {
+ nextKey = data.NextBatch;
+ }
+ else break;
+
+ next = storageProvider.LoadObjectAsync<SyncResponse>(nextKey);
+ }
+ }
+
+ public async Task<SyncResponse?> GetMergedUpTo(DateTime time) {
+ if (storageProvider is null) return null;
+ var unixTime = new DateTimeOffset(time.ToUniversalTime()).ToUnixTimeMilliseconds();
+ var map = await GetCheckpointMap();
+ if (map is null) return new();
+ var stream = GetSerialisedHistory().GetAsyncEnumerator();
+ SyncResponse? merged = await stream.MoveNextAsync() ? stream.Current.resp : null;
+
+ if (merged.GetDerivedSyncTime() > unixTime) {
+ Console.WriteLine("Initial sync is already past the target time!");
+ Console.WriteLine($"CURRENT: {merged.GetDerivedSyncTime()} (UTC: {DateTimeOffset.FromUnixTimeMilliseconds(merged.GetDerivedSyncTime())})");
+ Console.WriteLine($" TARGET: {unixTime} ({time.Kind}: {time}, UTC: {time.ToUniversalTime()})");
+ return null;
+ }
+
+ while (await stream.MoveNextAsync()) {
+ var (key, resp) = stream.Current;
+ if (resp is null) continue;
+ if (resp.GetDerivedSyncTime() > unixTime) break;
+ merged = MergeSyncs(merged, resp);
+ }
+
+ return merged;
+ }
- private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure oldState, SyncResponse.RoomsDataStructure newState) {
- oldState.Join ??= new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>();
- foreach (var (key, value) in newState.Join ?? new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>())
- if (!oldState.Join.TryAdd(key, value))
- oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value);
-
- oldState.Invite ??= new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>();
- foreach (var (key, value) in newState.Invite ?? new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>())
- if (!oldState.Invite.TryAdd(key, value))
- oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value);
-
- oldState.Leave ??= new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>();
- foreach (var (key, value) in newState.Leave ?? new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>()) {
- if (!oldState.Leave.TryAdd(key, value))
- oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value);
- if (oldState.Invite.ContainsKey(key)) oldState.Invite.Remove(key);
- if (oldState.Join.ContainsKey(key)) oldState.Join.Remove(key);
+ private async Task<ImmutableSortedDictionary<ulong, FrozenSet<string>>> GetCheckpointMap() {
+ if (storageProvider is null) return null;
+ var keys = (await storageProvider.GetAllKeysAsync()).ToFrozenSet();
+ var map = new Dictionary<ulong, List<string>>();
+ foreach (var key in keys) {
+ if (!key.StartsWith("old/")) continue;
+ var parts = key.Split('/');
+ if (parts.Length < 3) continue;
+ if (!ulong.TryParse(parts[1], out var checkpoint)) continue;
+ if (!map.ContainsKey(checkpoint)) map[checkpoint] = new();
+ map[checkpoint].Add(parts[2]);
}
+ return map.OrderBy(x => x.Key).ToImmutableSortedDictionary(x => x.Key, x => x.Value.ToFrozenSet());
+ }
+
+ private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync, Dictionary<string, TimeSpan>? trace = null) {
+ var sw = Stopwatch.StartNew();
+ oldSync.NextBatch = newSync.NextBatch ?? oldSync.NextBatch;
+
+ oldSync.AccountData = MergeEventList(oldSync.AccountData, newSync.AccountData);
+ trace?.Add("AccountData", sw.GetElapsedAndRestart());
+
+ oldSync.Presence = MergeEventListBy(oldSync.Presence, newSync.Presence, (oldState, newState) => oldState.Sender == newState.Sender && oldState.Type == newState.Type);
+ trace?.Add("Presence", sw.GetElapsedAndRestart());
+
+ // TODO: can this be cleaned up?
+ oldSync.DeviceOneTimeKeysCount ??= new();
+ if (newSync.DeviceOneTimeKeysCount is not null)
+ foreach (var (key, value) in newSync.DeviceOneTimeKeysCount)
+ oldSync.DeviceOneTimeKeysCount[key] = value;
+ trace?.Add("DeviceOneTimeKeysCount", sw.GetElapsedAndRestart());
+
+ if (newSync.Rooms is not null)
+ oldSync.Rooms = MergeRoomsDataStructure(oldSync.Rooms, newSync.Rooms, trace);
+ trace?.Add("Rooms", sw.GetElapsedAndRestart());
+
+ oldSync.ToDevice = MergeEventList(oldSync.ToDevice, newSync.ToDevice);
+ trace?.Add("ToDevice", sw.GetElapsedAndRestart());
+
+ oldSync.DeviceLists ??= new SyncResponse.DeviceListsDataStructure();
+ oldSync.DeviceLists.Changed ??= [];
+ oldSync.DeviceLists.Left ??= [];
+ if (newSync.DeviceLists?.Changed is not null)
+ foreach (var s in newSync.DeviceLists.Changed!) {
+ oldSync.DeviceLists.Left.Remove(s);
+ oldSync.DeviceLists.Changed.Add(s);
+ }
+
+ trace?.Add("DeviceLists.Changed", sw.GetElapsedAndRestart());
+
+ if (newSync.DeviceLists?.Left is not null)
+ foreach (var s in newSync.DeviceLists.Left!) {
+ oldSync.DeviceLists.Changed.Remove(s);
+ oldSync.DeviceLists.Left.Add(s);
+ }
+
+ trace?.Add("DeviceLists.Left", sw.GetElapsedAndRestart());
+
+ return oldSync;
+ }
+
+#region Merge rooms
+
+ private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure? oldState, SyncResponse.RoomsDataStructure newState,
+ Dictionary<string, TimeSpan>? trace) {
+ var sw = Stopwatch.StartNew();
+ if (oldState is null) return newState;
+
+ if (newState.Join is { Count: > 0 })
+ if (oldState.Join is null)
+ oldState.Join = newState.Join;
+ else
+ foreach (var (key, value) in newState.Join)
+ if (!oldState.Join.TryAdd(key, value))
+ oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value, trace);
+ trace?.Add("MergeRoomsDataStructure.Join", sw.GetElapsedAndRestart());
+
+ if (newState.Invite is { Count: > 0 })
+ if (oldState.Invite is null)
+ oldState.Invite = newState.Invite;
+ else
+ foreach (var (key, value) in newState.Invite)
+ if (!oldState.Invite.TryAdd(key, value))
+ oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value, trace);
+ trace?.Add("MergeRoomsDataStructure.Invite", sw.GetElapsedAndRestart());
+
+ if (newState.Leave is { Count: > 0 })
+ if (oldState.Leave is null)
+ oldState.Leave = newState.Leave;
+ else
+ foreach (var (key, value) in newState.Leave) {
+ if (!oldState.Leave.TryAdd(key, value))
+ oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value, trace);
+ if (oldState.Invite?.ContainsKey(key) ?? false) oldState.Invite.Remove(key);
+ if (oldState.Join?.ContainsKey(key) ?? false) oldState.Join.Remove(key);
+ }
+ trace?.Add("MergeRoomsDataStructure.Leave", sw.GetElapsedAndRestart());
+
return oldState;
}
- private SyncResponse.RoomsDataStructure.LeftRoomDataStructure MergeLeftRoomDataStructure(SyncResponse.RoomsDataStructure.LeftRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData) {
- oldData.AccountData ??= new EventList();
- oldData.AccountData.Events ??= new List<StateEventResponse>();
- oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure();
- oldData.Timeline.Events ??= new List<StateEventResponse>();
- oldData.State ??= new EventList();
- oldData.State.Events ??= new List<StateEventResponse>();
+ private static SyncResponse.RoomsDataStructure.LeftRoomDataStructure MergeLeftRoomDataStructure(SyncResponse.RoomsDataStructure.LeftRoomDataStructure oldData,
+ SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) {
+ var sw = Stopwatch.StartNew();
- if (newData.AccountData?.Events is not null)
- oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? new List<StateEventResponse>());
+ oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData);
+ trace?.Add($"LeftRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
- if (newData.Timeline?.Events is not null)
- oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? new List<StateEventResponse>());
+ oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure
+ ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure");
oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited;
oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch;
+ trace?.Add($"LeftRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
- if (newData.State?.Events is not null)
- oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? new List<StateEventResponse>());
+ oldData.State = MergeEventList(oldData.State, newData.State);
+ trace?.Add($"LeftRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
return oldData;
}
- private SyncResponse.RoomsDataStructure.InvitedRoomDataStructure MergeInvitedRoomDataStructure(SyncResponse.RoomsDataStructure.InvitedRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData) {
- oldData.InviteState ??= new EventList();
- oldData.InviteState.Events ??= new List<StateEventResponse>();
- if (newData.InviteState?.Events is not null)
- oldData.InviteState.Events.MergeStateEventLists(newData.InviteState?.Events ?? new List<StateEventResponse>());
+ private static SyncResponse.RoomsDataStructure.InvitedRoomDataStructure MergeInvitedRoomDataStructure(SyncResponse.RoomsDataStructure.InvitedRoomDataStructure oldData,
+ SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) {
+ var sw = Stopwatch.StartNew();
+ oldData.InviteState = MergeEventList(oldData.InviteState, newData.InviteState);
+ trace?.Add($"InvitedRoomDataStructure.InviteState/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
return oldData;
}
- private SyncResponse.RoomsDataStructure.JoinedRoomDataStructure MergeJoinedRoomDataStructure(SyncResponse.RoomsDataStructure.JoinedRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData) {
- oldData.AccountData ??= new EventList();
- oldData.AccountData.Events ??= new List<StateEventResponse>();
- oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure();
- oldData.Timeline.Events ??= new List<StateEventResponse>();
- oldData.State ??= new EventList();
- oldData.State.Events ??= new List<StateEventResponse>();
- oldData.Ephemeral ??= new EventList();
- oldData.Ephemeral.Events ??= new List<StateEventResponse>();
+ private static SyncResponse.RoomsDataStructure.JoinedRoomDataStructure MergeJoinedRoomDataStructure(SyncResponse.RoomsDataStructure.JoinedRoomDataStructure oldData,
+ SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) {
+ var sw = Stopwatch.StartNew();
- if (newData.AccountData?.Events is not null)
- oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? new List<StateEventResponse>());
+ oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData);
+ trace?.Add($"JoinedRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
- if (newData.Timeline?.Events is not null)
- oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? new List<StateEventResponse>());
+ oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure
+ ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure");
oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited;
oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch;
+ trace?.Add($"JoinedRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
- if (newData.State?.Events is not null)
- oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? new List<StateEventResponse>());
+ oldData.State = MergeEventList(oldData.State, newData.State);
+ trace?.Add($"JoinedRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
- if (newData.Ephemeral?.Events is not null)
- oldData.Ephemeral.Events.MergeStateEventLists(newData.Ephemeral?.Events ?? new List<StateEventResponse>());
+ oldData.Ephemeral = MergeEventList(oldData.Ephemeral, newData.Ephemeral);
+ trace?.Add($"JoinedRoomDataStructure.Ephemeral/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
oldData.UnreadNotifications ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.UnreadNotificationsDataStructure();
oldData.UnreadNotifications.HighlightCount = newData.UnreadNotifications?.HighlightCount ?? oldData.UnreadNotifications.HighlightCount;
oldData.UnreadNotifications.NotificationCount = newData.UnreadNotifications?.NotificationCount ?? oldData.UnreadNotifications.NotificationCount;
+ trace?.Add($"JoinedRoom$DataStructure.UnreadNotifications/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+
+ if (oldData.Summary is null)
+ oldData.Summary = newData.Summary;
+ else {
+ oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes;
+ oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount;
+ oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount;
+ }
- oldData.Summary ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.SummaryDataStructure {
- Heroes = newData.Summary?.Heroes ?? oldData.Summary?.Heroes,
- JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary?.JoinedMemberCount,
- InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary?.InvitedMemberCount
- };
- oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes;
- oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount;
- oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount;
+ trace?.Add($"JoinedRoomDataStructure.Summary/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
return oldData;
}
#endregion
+
+ private static EventList? MergeEventList(EventList? oldState, EventList? newState) {
+ if (newState is null) return oldState;
+ if (oldState is null) {
+ return newState;
+ }
+
+ if (newState.Events is null) return oldState;
+ if (oldState.Events is null) {
+ oldState.Events = newState.Events;
+ return oldState;
+ }
+
+ oldState.Events.MergeStateEventLists(newState.Events);
+ return oldState;
+ }
+
+ private static EventList? MergeEventListBy(EventList? oldState, EventList? newState, Func<StateEventResponse, StateEventResponse, bool> comparer) {
+ if (newState is null) return oldState;
+ if (oldState is null) {
+ return newState;
+ }
+
+ if (newState.Events is null) return oldState;
+ if (oldState.Events is null) {
+ oldState.Events = newState.Events;
+ return oldState;
+ }
+
+ oldState.Events.ReplaceBy(newState.Events, comparer);
+ return oldState;
+ }
+
+ private static EventList? AppendEventList(EventList? oldState, EventList? newState) {
+ if (newState is null) return oldState;
+ if (oldState is null) {
+ return newState;
+ }
+
+ if (newState.Events is null) return oldState;
+ if (oldState.Events is null) {
+ oldState.Events = newState.Events;
+ return oldState;
+ }
+
+ oldState.Events.AddRange(newState.Events);
+ return oldState;
+ }
}
\ No newline at end of file
diff --git a/LibMatrix/Homeservers/AuthenticatedHomeserverGeneric.cs b/LibMatrix/Homeservers/AuthenticatedHomeserverGeneric.cs
index 8c95bc3..c1bbc5a 100644
--- a/LibMatrix/Homeservers/AuthenticatedHomeserverGeneric.cs
+++ b/LibMatrix/Homeservers/AuthenticatedHomeserverGeneric.cs
@@ -47,7 +47,6 @@ public class AuthenticatedHomeserverGeneric : RemoteHomeserver {
public HsNamedCaches NamedCaches { get; set; }
public GenericRoom GetRoom(string roomId) {
- if (roomId is null || !roomId.StartsWith("!")) throw new ArgumentException("Room ID must start with !", nameof(roomId));
return new GenericRoom(this, roomId);
}
@@ -185,6 +184,17 @@ public class AuthenticatedHomeserverGeneric : RemoteHomeserver {
#endregion
+#region MSC 4133
+
+ public async Task UpdateProfilePropertyAsync(string name, object? value) {
+ var caps = await GetCapabilitiesAsync();
+ if(caps is null) throw new Exception("Failed to get capabilities");
+
+ }
+
+#endregion
+
+ [Obsolete("This method assumes no support for MSC 4069 and MSC 4133")]
public async Task UpdateProfileAsync(UserProfileResponse? newProfile, bool preserveCustomRoomProfile = true) {
if (newProfile is null) return;
Console.WriteLine($"Updating profile for {WhoAmI.UserId} to {newProfile.ToJson(ignoreNull: true)} (preserving room profiles: {preserveCustomRoomProfile})");
@@ -522,4 +532,8 @@ public class AuthenticatedHomeserverGeneric : RemoteHomeserver {
}
#endregion
-}
\ No newline at end of file
+ private class CapabilitiesResponse {
+ [JsonPropertyName("capabilities")]
+ public Dictionary<string, object>? Capabilities { get; set; }
+ }
+}
diff --git a/LibMatrix/Homeservers/ImplementationDetails/Synapse/SynapseAdminApiClient.cs b/LibMatrix/Homeservers/ImplementationDetails/Synapse/SynapseAdminApiClient.cs
index 3ed7311..a48402a 100644
--- a/LibMatrix/Homeservers/ImplementationDetails/Synapse/SynapseAdminApiClient.cs
+++ b/LibMatrix/Homeservers/ImplementationDetails/Synapse/SynapseAdminApiClient.cs
@@ -3,11 +3,19 @@
using System.Net.Http.Json;
using System.Text.Json;
using System.Text.Json.Nodes;
+using System.Net.Http.Json;
+using System.Text.Json;
+using System.Text.Json.Nodes;
+using System.Text.Json.Serialization;
using ArcaneLibs.Extensions;
using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Filters;
using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Requests;
using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Responses;
using LibMatrix.Responses;
+using LibMatrix.Filters;
+using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Filters;
+using LibMatrix.Homeservers.ImplementationDetails.Synapse.Models.Responses;
+using LibMatrix.Responses;
namespace LibMatrix.Homeservers.ImplementationDetails.Synapse;
diff --git a/LibMatrix/Homeservers/RemoteHomeServer.cs b/LibMatrix/Homeservers/RemoteHomeServer.cs
index 4ee523f..45ecb18 100644
--- a/LibMatrix/Homeservers/RemoteHomeServer.cs
+++ b/LibMatrix/Homeservers/RemoteHomeServer.cs
@@ -54,6 +54,8 @@ public class RemoteHomeserver {
return data;
}
+
+ // TODO: Do we need to support retrieving individual profile properties? Is there any use for that besides just getting the full profile?
public async Task<ClientVersionsResponse> GetClientVersionsAsync() {
var resp = await ClientHttpClient.GetAsync($"/_matrix/client/versions");
diff --git a/LibMatrix/Interfaces/Services/IStorageProvider.cs b/LibMatrix/Interfaces/Services/IStorageProvider.cs
index 165e7df..fb7bb6d 100644
--- a/LibMatrix/Interfaces/Services/IStorageProvider.cs
+++ b/LibMatrix/Interfaces/Services/IStorageProvider.cs
@@ -31,7 +31,7 @@ public interface IStorageProvider {
}
// get all keys
- public Task<List<string>> GetAllKeysAsync() {
+ public Task<IEnumerable<string>> GetAllKeysAsync() {
Console.WriteLine($"StorageProvider<{GetType().Name}> does not implement GetAllKeys()!");
throw new NotImplementedException();
}
@@ -53,4 +53,18 @@ public interface IStorageProvider {
Console.WriteLine($"StorageProvider<{GetType().Name}> does not implement LoadStream(key)!");
throw new NotImplementedException();
}
+
+ // copy
+ public async Task CopyObjectAsync(string sourceKey, string destKey) {
+ Console.WriteLine($"StorageProvider<{GetType().Name}> does not implement CopyObject(sourceKey, destKey), using load + save!");
+ var data = await LoadObjectAsync<object>(sourceKey);
+ await SaveObjectAsync(destKey, data);
+ }
+
+ // move
+ public async Task MoveObjectAsync(string sourceKey, string destKey) {
+ Console.WriteLine($"StorageProvider<{GetType().Name}> does not implement MoveObject(sourceKey, destKey), using copy + delete!");
+ await CopyObjectAsync(sourceKey, destKey);
+ await DeleteObjectAsync(sourceKey);
+ }
}
\ No newline at end of file
diff --git a/LibMatrix/MatrixException.cs b/LibMatrix/MatrixException.cs
index 3a79af8..519f99e 100644
--- a/LibMatrix/MatrixException.cs
+++ b/LibMatrix/MatrixException.cs
@@ -26,43 +26,47 @@ public class MatrixException : Exception {
public string GetAsJson() => GetAsObject().ToJson(ignoreNull: true);
public override string Message =>
- $"{ErrorCode}: {ErrorCode switch {
- // common
- "M_FORBIDDEN" => $"You do not have permission to perform this action: {Error}",
- "M_UNKNOWN_TOKEN" => $"The access token specified was not recognised: {Error}{(SoftLogout == true ? " (soft logout)" : "")}",
- "M_MISSING_TOKEN" => $"No access token was specified: {Error}",
- "M_BAD_JSON" => $"Request contained valid JSON, but it was malformed in some way: {Error}",
- "M_NOT_JSON" => $"Request did not contain valid JSON: {Error}",
- "M_NOT_FOUND" => $"The requested resource was not found: {Error}",
- "M_LIMIT_EXCEEDED" => $"Too many requests have been sent in a short period of time. Wait a while then try again: {Error}",
- "M_UNRECOGNISED" => $"The server did not recognise the request: {Error}",
- "M_UNKOWN" => $"The server encountered an unexpected error: {Error}",
- // endpoint specific
- "M_UNAUTHORIZED" => $"The request did not contain valid authentication information for the target of the request: {Error}",
- "M_USER_DEACTIVATED" => $"The user ID associated with the request has been deactivated: {Error}",
- "M_USER_IN_USE" => $"The user ID associated with the request is already in use: {Error}",
- "M_INVALID_USERNAME" => $"The requested user ID is not valid: {Error}",
- "M_ROOM_IN_USE" => $"The room alias requested is already taken: {Error}",
- "M_INVALID_ROOM_STATE" => $"The room associated with the request is not in a valid state to perform the request: {Error}",
- "M_THREEPID_IN_USE" => $"The threepid requested is already associated with a user ID on this server: {Error}",
- "M_THREEPID_NOT_FOUND" => $"The threepid requested is not associated with any user ID: {Error}",
- "M_THREEPID_AUTH_FAILED" => $"The provided threepid and/or token was invalid: {Error}",
- "M_THREEPID_DENIED" => $"The homeserver does not permit the third party identifier in question: {Error}",
- "M_SERVER_NOT_TRUSTED" => $"The homeserver does not trust the identity server: {Error}",
- "M_UNSUPPORTED_ROOM_VERSION" => $"The room version is not supported: {Error}",
- "M_INCOMPATIBLE_ROOM_VERSION" => $"The room version is incompatible: {Error}",
- "M_BAD_STATE" => $"The request was invalid because the state was invalid: {Error}",
- "M_GUEST_ACCESS_FORBIDDEN" => $"Guest access is forbidden: {Error}",
- "M_CAPTCHA_NEEDED" => $"Captcha needed: {Error}",
- "M_CAPTCHA_INVALID" => $"Captcha invalid: {Error}",
- "M_MISSING_PARAM" => $"Missing parameter: {Error}",
- "M_INVALID_PARAM" => $"Invalid parameter: {Error}",
- "M_TOO_LARGE" => $"The request or entity was too large: {Error}",
- "M_EXCLUSIVE" => $"The resource being requested is reserved by an application service, or the application service making the request has not created the resource: {Error}",
- "M_RESOURCE_LIMIT_EXCEEDED" => $"Exceeded resource limit: {Error}",
- "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" => $"Cannot leave server notice room: {Error}",
- _ => $"Unknown error: {new { ErrorCode, Error, SoftLogout, RetryAfterMs }.ToJson(ignoreNull: true)}"
- }}";
+ $"{ErrorCode}: " +
+ (!string.IsNullOrWhiteSpace(Error)
+ ? Error
+ : ErrorCode switch {
+ // common
+ "M_FORBIDDEN" => $"You do not have permission to perform this action: {Error}",
+ "M_UNKNOWN_TOKEN" => $"The access token specified was not recognised: {Error}{(SoftLogout == true ? " (soft logout)" : "")}",
+ "M_MISSING_TOKEN" => $"No access token was specified: {Error}",
+ "M_BAD_JSON" => $"Request contained valid JSON, but it was malformed in some way: {Error}",
+ "M_NOT_JSON" => $"Request did not contain valid JSON: {Error}",
+ "M_NOT_FOUND" => $"The requested resource was not found: {Error}",
+ "M_LIMIT_EXCEEDED" => $"Too many requests have been sent in a short period of time. Wait a while then try again: {Error}",
+ "M_UNRECOGNISED" => $"The server did not recognise the request: {Error}",
+ "M_UNKOWN" => $"The server encountered an unexpected error: {Error}",
+ // endpoint specific
+ "M_UNAUTHORIZED" => $"The request did not contain valid authentication information for the target of the request: {Error}",
+ "M_USER_DEACTIVATED" => $"The user ID associated with the request has been deactivated: {Error}",
+ "M_USER_IN_USE" => $"The user ID associated with the request is already in use: {Error}",
+ "M_INVALID_USERNAME" => $"The requested user ID is not valid: {Error}",
+ "M_ROOM_IN_USE" => $"The room alias requested is already taken: {Error}",
+ "M_INVALID_ROOM_STATE" => $"The room associated with the request is not in a valid state to perform the request: {Error}",
+ "M_THREEPID_IN_USE" => $"The threepid requested is already associated with a user ID on this server: {Error}",
+ "M_THREEPID_NOT_FOUND" => $"The threepid requested is not associated with any user ID: {Error}",
+ "M_THREEPID_AUTH_FAILED" => $"The provided threepid and/or token was invalid: {Error}",
+ "M_THREEPID_DENIED" => $"The homeserver does not permit the third party identifier in question: {Error}",
+ "M_SERVER_NOT_TRUSTED" => $"The homeserver does not trust the identity server: {Error}",
+ "M_UNSUPPORTED_ROOM_VERSION" => $"The room version is not supported: {Error}",
+ "M_INCOMPATIBLE_ROOM_VERSION" => $"The room version is incompatible: {Error}",
+ "M_BAD_STATE" => $"The request was invalid because the state was invalid: {Error}",
+ "M_GUEST_ACCESS_FORBIDDEN" => $"Guest access is forbidden: {Error}",
+ "M_CAPTCHA_NEEDED" => $"Captcha needed: {Error}",
+ "M_CAPTCHA_INVALID" => $"Captcha invalid: {Error}",
+ "M_MISSING_PARAM" => $"Missing parameter: {Error}",
+ "M_INVALID_PARAM" => $"Invalid parameter: {Error}",
+ "M_TOO_LARGE" => $"The request or entity was too large: {Error}",
+ "M_EXCLUSIVE" =>
+ $"The resource being requested is reserved by an application service, or the application service making the request has not created the resource: {Error}",
+ "M_RESOURCE_LIMIT_EXCEEDED" => $"Exceeded resource limit: {Error}",
+ "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" => $"Cannot leave server notice room: {Error}",
+ _ => $"Unknown error: {new { ErrorCode, Error, SoftLogout, RetryAfterMs }.ToJson(ignoreNull: true)}"
+ });
[SuppressMessage("ReSharper", "InconsistentNaming", Justification = "Follows spec naming")]
public static class ErrorCodes {
diff --git a/LibMatrix/Responses/SyncResponse.cs b/LibMatrix/Responses/SyncResponse.cs
index 529bd5c..977de3e 100644
--- a/LibMatrix/Responses/SyncResponse.cs
+++ b/LibMatrix/Responses/SyncResponse.cs
@@ -1,4 +1,7 @@
using System.Text.Json.Serialization;
+using LibMatrix.EventTypes.Spec.Ephemeral;
+using LibMatrix.EventTypes.Spec.State;
+using LibMatrix.EventTypes.Spec.State.RoomInfo;
namespace LibMatrix.Responses;
@@ -14,7 +17,7 @@ public class SyncResponse {
public EventList? AccountData { get; set; }
[JsonPropertyName("presence")]
- public PresenceDataStructure? Presence { get; set; }
+ public EventList? Presence { get; set; }
[JsonPropertyName("device_one_time_keys_count")]
public Dictionary<string, int>? DeviceOneTimeKeysCount { get; set; }
@@ -61,6 +64,12 @@ public class SyncResponse {
[JsonPropertyName("state")]
public EventList? State { get; set; }
+
+ public override string ToString() {
+ var lastEvent = Timeline?.Events?.LastOrDefault(x => x.Type == "m.room.member");
+ var membership = (lastEvent?.TypedContent as RoomMemberEventContent);
+ return $"LeftRoomDataStructure: {lastEvent?.Sender} {membership?.Membership} ({membership?.Reason})";
+ }
}
public class JoinedRoomDataStructure {
@@ -82,7 +91,7 @@ public class SyncResponse {
[JsonPropertyName("summary")]
public SummaryDataStructure? Summary { get; set; }
- public class TimelineDataStructure {
+ public class TimelineDataStructure : EventList {
public TimelineDataStructure() { }
public TimelineDataStructure(List<StateEventResponse>? events, bool? limited) {
@@ -90,8 +99,8 @@ public class SyncResponse {
Limited = limited;
}
- [JsonPropertyName("events")]
- public List<StateEventResponse>? Events { get; set; }
+ // [JsonPropertyName("events")]
+ // public List<StateEventResponse>? Events { get; set; }
[JsonPropertyName("prev_batch")]
public string? PrevBatch { get; set; }
@@ -125,4 +134,15 @@ public class SyncResponse {
public EventList? InviteState { get; set; }
}
}
-}
\ No newline at end of file
+
+ public long GetDerivedSyncTime() {
+ return ((long[]) [
+ AccountData?.Events?.Max(x => x.OriginServerTs) ?? 0,
+ Presence?.Events?.Max(x => x.OriginServerTs) ?? 0,
+ ToDevice?.Events?.Max(x => x.OriginServerTs) ?? 0,
+ Rooms?.Join?.Values?.Max(x => x.Timeline?.Events?.Max(y => y.OriginServerTs)) ?? 0,
+ Rooms?.Invite?.Values?.Max(x => x.InviteState?.Events?.Max(y => y.OriginServerTs)) ?? 0,
+ Rooms?.Leave?.Values?.Max(x => x.Timeline?.Events?.Max(y => y.OriginServerTs)) ?? 0
+ ]).Max();
+ }
+}
diff --git a/LibMatrix/Responses/UserProfileResponse.cs b/LibMatrix/Responses/UserProfileResponse.cs
index 6c9380f..30e4c32 100644
--- a/LibMatrix/Responses/UserProfileResponse.cs
+++ b/LibMatrix/Responses/UserProfileResponse.cs
@@ -1,3 +1,4 @@
+using System.Text.Json;
using System.Text.Json.Serialization;
namespace LibMatrix.Responses;
@@ -8,4 +9,18 @@ public class UserProfileResponse {
[JsonPropertyName("displayname")]
public string? DisplayName { get; set; }
+
+ // MSC 4133 - Extending User Profile API with Key:Value pairs
+ [JsonExtensionData]
+ public Dictionary<string, JsonElement>? CustomKeys { get; set; }
+
+ public JsonElement? this[string key] {
+ get => CustomKeys?[key];
+ set {
+ if (value is null)
+ CustomKeys?.Remove(key);
+ else
+ (CustomKeys ??= [])[key] = value.Value;
+ }
+ }
}
\ No newline at end of file
diff --git a/LibMatrix/RoomTypes/GenericRoom.cs b/LibMatrix/RoomTypes/GenericRoom.cs
index fb56f2e..ec61a33 100644
--- a/LibMatrix/RoomTypes/GenericRoom.cs
+++ b/LibMatrix/RoomTypes/GenericRoom.cs
@@ -1,5 +1,6 @@
using System.Collections.Frozen;
using System.Net.Http.Json;
+using System.Security.Cryptography;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
@@ -327,8 +328,8 @@ public class GenericRoom {
catch {
try {
var alias = await GetCanonicalAliasAsync();
- if (alias?.Alias is not null) return alias.Alias;
- throw new Exception("No name or alias");
+ if (!string.IsNullOrWhiteSpace(alias?.Alias)) return alias.Alias;
+ throw new Exception("No alias");
}
catch {
try {
@@ -336,7 +337,8 @@ public class GenericRoom {
var memberList = new List<string>();
var memberCount = 0;
await foreach (var member in members)
- memberList.Add(member.RawContent?["displayname"]?.GetValue<string>() ?? "");
+ if (member.StateKey != Homeserver.UserId)
+ memberList.Add(member.RawContent?["displayname"]?.GetValue<string>() ?? "");
memberCount = memberList.Count;
memberList.RemoveAll(string.IsNullOrWhiteSpace);
memberList = memberList.OrderBy(x => x).ToList();
@@ -554,4 +556,4 @@ public class GenericRoom {
public class RoomIdResponse {
[JsonPropertyName("room_id")]
public string RoomId { get; set; }
-}
\ No newline at end of file
+}
diff --git a/LibMatrix/Services/HomeserverProviderService.cs b/LibMatrix/Services/HomeserverProviderService.cs
index 601087d..36bc828 100644
--- a/LibMatrix/Services/HomeserverProviderService.cs
+++ b/LibMatrix/Services/HomeserverProviderService.cs
@@ -22,9 +22,11 @@ public class HomeserverProviderService(ILogger<HomeserverProviderService> logger
AuthenticatedHomeserverGeneric? hs = null;
if (!useGeneric) {
+ var clientVersionsTask = rhs.GetClientVersionsAsync();
+ var serverVersionTask = rhs.FederationClient?.GetServerVersionAsync() ?? Task.FromResult<ServerVersionResponse?>(null)!;
ClientVersionsResponse clientVersions = new();
try {
- clientVersions = await rhs.GetClientVersionsAsync();
+ clientVersions = await clientVersionsTask;
}
catch (Exception e) {
logger.LogError(e, "Failed to get client versions for {homeserver}", homeserver);
@@ -32,7 +34,7 @@ public class HomeserverProviderService(ILogger<HomeserverProviderService> logger
ServerVersionResponse? serverVersion;
try {
- serverVersion = await (rhs.FederationClient?.GetServerVersionAsync() ?? Task.FromResult<ServerVersionResponse?>(null)!);
+ serverVersion = await serverVersionTask;
}
catch (Exception e) {
logger.LogWarning(e, "Failed to get server version for {homeserver}", homeserver);
diff --git a/LibMatrix/Services/HomeserverResolverService.cs b/LibMatrix/Services/HomeserverResolverService.cs
index 01b11cc..53cd2dd 100644
--- a/LibMatrix/Services/HomeserverResolverService.cs
+++ b/LibMatrix/Services/HomeserverResolverService.cs
@@ -62,9 +62,9 @@ public class HomeserverResolverService {
clientWellKnown ??= await _httpClient.TryGetFromJsonAsync<ClientWellKnown>($"http://{homeserver}/.well-known/matrix/client");
if (clientWellKnown is null) {
- if (await _httpClient.CheckSuccessStatus($"https://{homeserver}/_matrix/client/versions"))
+ if (await MatrixHttpClient.CheckSuccessStatus($"https://{homeserver}/_matrix/client/versions"))
return $"https://{homeserver}";
- if (await _httpClient.CheckSuccessStatus($"http://{homeserver}/_matrix/client/versions"))
+ if (await MatrixHttpClient.CheckSuccessStatus($"http://{homeserver}/_matrix/client/versions"))
return $"http://{homeserver}";
}
}
@@ -100,16 +100,16 @@ public class HomeserverResolverService {
var resolved = serverWellKnown.Homeserver.TrimEnd('/');
if (resolved.StartsWith("https://") || resolved.StartsWith("http://"))
return resolved;
- if (await _httpClient.CheckSuccessStatus($"https://{resolved}/_matrix/federation/v1/version"))
+ if (await MatrixHttpClient.CheckSuccessStatus($"https://{resolved}/_matrix/federation/v1/version"))
return $"https://{resolved}";
- if (await _httpClient.CheckSuccessStatus($"http://{resolved}/_matrix/federation/v1/version"))
+ if (await MatrixHttpClient.CheckSuccessStatus($"http://{resolved}/_matrix/federation/v1/version"))
return $"http://{resolved}";
_logger.LogWarning("Server well-known points to invalid server: {resolved}", resolved);
}
// fallback: most servers host C2S and S2S on the same domain
var clientUrl = (await _tryResolveClientEndpoint(homeserver)).TrimEnd('/');
- if (clientUrl is not null && await _httpClient.CheckSuccessStatus($"{clientUrl}/_matrix/federation/v1/version"))
+ if (clientUrl is not null && await MatrixHttpClient.CheckSuccessStatus($"{clientUrl}/_matrix/federation/v1/version"))
return clientUrl;
_logger.LogInformation("No server well-known for {server}...", homeserver);
diff --git a/LibMatrix/StateEvent.cs b/LibMatrix/StateEvent.cs
index 6d8f195..ef760e1 100644
--- a/LibMatrix/StateEvent.cs
+++ b/LibMatrix/StateEvent.cs
@@ -141,7 +141,7 @@ public class StateEventResponse : StateEvent {
public string? EventId { get; set; }
public class UnsignedData {
- [JsonPropertyName("age")]
+ [JsonPropertyName("age"), JsonNumberHandling(JsonNumberHandling.AllowReadingFromString)]
public ulong? Age { get; set; }
[JsonPropertyName("redacted_because")]
diff --git a/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs b/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs
index c15fe7d..b6fe7c2 100644
--- a/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs
+++ b/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs
@@ -120,7 +120,7 @@ public class RoomStore {
public Room(string roomId) {
if (string.IsNullOrWhiteSpace(roomId)) throw new ArgumentException("Value cannot be null or whitespace.", nameof(roomId));
- if (roomId[0] != '!') throw new ArgumentException("Room ID must start with !", nameof(roomId));
+ if (roomId[0] != '!') throw new ArgumentException($"Room ID must start with '!', provided value: {roomId ?? "null"}", nameof(roomId));
RoomId = roomId;
Timeline = new();
AccountData = new();
diff --git a/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs b/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs
index ca6a4d8..8501d41 100644
--- a/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs
+++ b/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs
@@ -18,7 +18,7 @@ public class BotInstaller(IServiceCollection services) {
public BotInstaller AddMatrixBot() {
services.AddSingleton<LibMatrixBotConfiguration>();
- services.AddScoped<AuthenticatedHomeserverGeneric>(x => {
+ services.AddSingleton<AuthenticatedHomeserverGeneric>(x => {
var config = x.GetService<LibMatrixBotConfiguration>() ?? throw new Exception("No configuration found!");
var hsProvider = x.GetService<HomeserverProviderService>() ?? throw new Exception("No homeserver provider found!");
diff --git a/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs b/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs
index b5b5a2b..d07090f 100644
--- a/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs
+++ b/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs
@@ -1,3 +1,4 @@
+using ArcaneLibs.Extensions;
using LibMatrix.EventTypes.Spec;
using LibMatrix.EventTypes.Spec.State.RoomInfo;
using LibMatrix.Filters;
@@ -57,22 +58,44 @@ public class CommandListenerHostedService : IHostedService {
FilterId = filter
};
- syncHelper.TimelineEventHandlers.Add(async @event => {
- try {
- var room = _hs.GetRoom(@event.RoomId);
- // _logger.LogInformation(eventResponse.ToJson(indent: false));
- if (@event is { Type: "m.room.message", TypedContent: RoomMessageEventContent message })
- if (message is { MessageType: "m.text" }) {
- var usedPrefix = await GetUsedPrefix(@event);
- if (usedPrefix is null) return;
- var res = await InvokeCommand(@event, usedPrefix);
- await (_commandResultHandler?.Invoke(res) ?? HandleResult(res));
+ syncHelper.SyncReceivedHandlers.Add(async sync => {
+ _logger.LogInformation("Sync received!");
+ foreach (var roomResp in sync.Rooms?.Join ?? []) {
+ if (roomResp.Value.Timeline?.Events is null or { Count: > 5 }) continue;
+ foreach (var @event in roomResp.Value.Timeline.Events) {
+ @event.RoomId = roomResp.Key;
+ try {
+ // var room = _hs.GetRoom(@event.RoomId);
+ // _logger.LogInformation(eventResponse.ToJson(indent: false));
+ if (@event is { Type: "m.room.message", TypedContent: RoomMessageEventContent message })
+ if (message is { MessageType: "m.text" }) {
+ var usedPrefix = await GetUsedPrefix(@event);
+ if (usedPrefix is null) return;
+ var res = await InvokeCommand(@event, usedPrefix);
+ await (_commandResultHandler?.Invoke(res) ?? HandleResult(res));
+ }
}
- }
- catch (Exception e) {
- _logger.LogError(e, "Error in command listener!");
+ catch (Exception e) {
+ _logger.LogError(e, "Error in command listener!");
+ Console.WriteLine(@event.ToJson(ignoreNull: false, indent: true));
+ var fakeResult = new CommandResult() {
+ Result = CommandResult.CommandResultType.Failure_Exception,
+ Exception = e,
+ Success = false,
+ Context = new() {
+ Homeserver = _hs,
+ CommandName = "[CommandListener.SyncHandler]",
+ Room = _hs.GetRoom(roomResp.Key),
+ Args = [],
+ MessageEvent = @event
+ }
+ };
+ await (_commandResultHandler?.Invoke(fakeResult) ?? HandleResult(fakeResult));
+ }
+ }
}
});
+
await syncHelper.RunSyncLoopAsync(cancellationToken: cancellationToken);
}
|