diff --git a/LibMatrix/Extensions/EnumerableExtensions.cs b/LibMatrix/Extensions/EnumerableExtensions.cs
index 42d9491..ace2c0c 100644
--- a/LibMatrix/Extensions/EnumerableExtensions.cs
+++ b/LibMatrix/Extensions/EnumerableExtensions.cs
@@ -1,29 +1,91 @@
+using System.Collections.Frozen;
+using System.Collections.Immutable;
+
namespace LibMatrix.Extensions;
public static class EnumerableExtensions {
+ public static int insertions = 0;
+ public static int replacements = 0;
+
public static void MergeStateEventLists(this IList<StateEvent> oldState, IList<StateEvent> newState) {
- foreach (var stateEvent in newState) {
- var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey);
- if (old is null) {
- oldState.Add(stateEvent);
- continue;
+ // foreach (var stateEvent in newState) {
+ // var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey);
+ // if (old is null) {
+ // oldState.Add(stateEvent);
+ // continue;
+ // }
+ //
+ // oldState.Remove(old);
+ // oldState.Add(stateEvent);
+ // }
+
+ foreach (var e in newState) {
+ switch (FindIndex(e)) {
+ case -1:
+ oldState.Add(e);
+ break;
+ case var index:
+ oldState[index] = e;
+ break;
}
+ }
- oldState.Remove(old);
- oldState.Add(stateEvent);
+ int FindIndex(StateEvent needle) {
+ for (int i = 0; i < oldState.Count; i++) {
+ var old = oldState[i];
+ if (old.Type == needle.Type && old.StateKey == needle.StateKey)
+ return i;
+ }
+
+ return -1;
}
}
public static void MergeStateEventLists(this IList<StateEventResponse> oldState, IList<StateEventResponse> newState) {
- foreach (var stateEvent in newState) {
- var old = oldState.FirstOrDefault(x => x.Type == stateEvent.Type && x.StateKey == stateEvent.StateKey);
- if (old is null) {
- oldState.Add(stateEvent);
- continue;
+ foreach (var e in newState) {
+ switch (FindIndex(e)) {
+ case -1:
+ oldState.Add(e);
+ break;
+ case var index:
+ oldState[index] = e;
+ break;
+ }
+ }
+
+ int FindIndex(StateEventResponse needle) {
+ for (int i = 0; i < oldState.Count; i++) {
+ var old = oldState[i];
+ if (old.Type == needle.Type && old.StateKey == needle.StateKey)
+ return i;
+ }
+
+ return -1;
+ }
+ }
+
+ public static void MergeStateEventLists(this List<StateEventResponse> oldState, List<StateEventResponse> newState) {
+ foreach (var e in newState) {
+ switch (FindIndex(e)) {
+ case -1:
+ oldState.Add(e);
+ insertions++;
+ break;
+ case var index:
+ oldState[index] = e;
+ replacements++;
+ break;
+ }
+ }
+
+ int FindIndex(StateEventResponse needle) {
+ for (int i = 0; i < oldState.Count; i++) {
+ var old = oldState[i];
+ if (old.Type == needle.Type && old.StateKey == needle.StateKey)
+ return i;
}
- oldState.Remove(old);
- oldState.Add(stateEvent);
+ return -1;
}
}
}
\ No newline at end of file
diff --git a/LibMatrix/Extensions/MatrixHttpClient.Single.cs b/LibMatrix/Extensions/MatrixHttpClient.Single.cs
index cdc0dca..0e6d467 100644
--- a/LibMatrix/Extensions/MatrixHttpClient.Single.cs
+++ b/LibMatrix/Extensions/MatrixHttpClient.Single.cs
@@ -51,6 +51,7 @@ public class MatrixHttpClient {
internal SemaphoreSlim _rateLimitSemaphore { get; } = new(1, 1);
#endif
+ private const bool LogRequests = true;
public Dictionary<string, string> AdditionalQueryParameters { get; set; } = new();
public Uri? BaseAddress { get; set; }
@@ -72,7 +73,7 @@ public class MatrixHttpClient {
public async Task<HttpResponseMessage> SendUnhandledAsync(HttpRequestMessage request, CancellationToken cancellationToken) {
if (request.RequestUri is null) throw new NullReferenceException("RequestUri is null");
// if (!request.RequestUri.IsAbsoluteUri)
- request.RequestUri = request.RequestUri.EnsureAbsolute(BaseAddress!);
+ request.RequestUri = request.RequestUri.EnsureAbsolute(BaseAddress!);
var swWait = Stopwatch.StartNew();
#if SYNC_HTTPCLIENT
await _rateLimitSemaphore.WaitAsync(cancellationToken);
@@ -82,6 +83,9 @@ public class MatrixHttpClient {
if (request.RequestUri is null) throw new NullReferenceException("RequestUri is null");
if (!request.RequestUri.IsAbsoluteUri) request.RequestUri = new Uri(BaseAddress, request.RequestUri);
+ swWait.Stop();
+ var swExec = Stopwatch.StartNew();
+
foreach (var (key, value) in AdditionalQueryParameters) request.RequestUri = request.RequestUri.AddQuery(key, value);
foreach (var (key, value) in DefaultRequestHeaders) {
if (request.Headers.Contains(key)) continue;
@@ -90,7 +94,8 @@ public class MatrixHttpClient {
request.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse"), true);
- Console.WriteLine("Sending " + request.Summarise(includeHeaders:true, includeQuery: true, includeContentIfText: true));
+ if (LogRequests)
+ Console.WriteLine("Sending " + request.Summarise(includeHeaders: true, includeQuery: true, includeContentIfText: true, hideHeaders: ["Accept"]));
HttpResponseMessage? responseMessage;
try {
@@ -108,19 +113,25 @@ public class MatrixHttpClient {
#endif
// Console.WriteLine($"Sending {request.Method} {request.RequestUri} ({Util.BytesToString(request.Content?.Headers.ContentLength ?? 0)}) -> {(int)responseMessage.StatusCode} {responseMessage.StatusCode} ({Util.BytesToString(responseMessage.GetContentLength())}, WAIT={swWait.ElapsedMilliseconds}ms, EXEC={swExec.ElapsedMilliseconds}ms)");
- Console.WriteLine("Received " + responseMessage.Summarise(includeHeaders: true, includeContentIfText: false, hideHeaders: [
- "Server",
- "Date",
- "Transfer-Encoding",
- "Connection",
- "Vary",
- "Content-Length",
- "Access-Control-Allow-Origin",
- "Access-Control-Allow-Methods",
- "Access-Control-Allow-Headers",
- "Access-Control-Expose-Headers",
- "Cache-Control"
- ]));
+ if (LogRequests)
+ Console.WriteLine("Received " + responseMessage.Summarise(includeHeaders: true, includeContentIfText: false, hideHeaders: [
+ "Server",
+ "Date",
+ "Transfer-Encoding",
+ "Connection",
+ "Vary",
+ "Content-Length",
+ "Access-Control-Allow-Origin",
+ "Access-Control-Allow-Methods",
+ "Access-Control-Allow-Headers",
+ "Access-Control-Expose-Headers",
+ "Cache-Control",
+ "Cross-Origin-Resource-Policy",
+ "X-Content-Security-Policy",
+ "Referrer-Policy",
+ "X-Robots-Tag",
+ "Content-Security-Policy"
+ ]));
return responseMessage;
}
diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index a7010ee..f95d6f8 100644
--- a/LibMatrix/Helpers/SyncHelper.cs
+++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -2,6 +2,7 @@ using System.Diagnostics;
using System.Net.Http.Json;
using System.Text.Json;
using ArcaneLibs.Collections;
+using System.Text.Json.Nodes;
using ArcaneLibs.Extensions;
using LibMatrix.Filters;
using LibMatrix.Homeservers;
@@ -21,6 +22,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
public string? SetPresence { get; set; } = "online";
+ public bool UseInternalStreamingSync { get; set; } = true;
public string? FilterId {
get => _filterId;
@@ -114,15 +116,23 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
// logger?.LogInformation("SyncHelper: Calling: {}", url);
try {
- var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
- if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
- logger?.LogTrace("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed);
- var deserializeSw = Stopwatch.StartNew();
- var stream = await httpResp.Content.ReadAsStreamAsync();
- await using var seekableStream = new SeekableStream(stream);
- var resp = await JsonSerializer.DeserializeAsync<SyncResponse>(seekableStream, cancellationToken: cancellationToken ?? CancellationToken.None,
- jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
- logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", seekableStream.Position, deserializeSw.Elapsed, sw.Elapsed);
+ SyncResponse? resp = null;
+ if (UseInternalStreamingSync) {
+ resp = await homeserver.ClientHttpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None);
+ logger?.LogInformation("Got sync response: ~{} bytes, {} elapsed", resp.ToJson(false, true, true).Length, sw.Elapsed);
+ }
+ else {
+ var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
+ if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
+ logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed);
+ var deserializeSw = Stopwatch.StartNew();
+ // var jsonResp = await httpResp.Content.ReadFromJsonAsync<JsonObject>(cancellationToken: cancellationToken ?? CancellationToken.None);
+ // var resp = jsonResp.Deserialize<SyncResponse>();
+ resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None,
+ jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
+ logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed);
+ }
+
var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
if (timeToWait.TotalMilliseconds > 0)
await Task.Delay(timeToWait);
diff --git a/LibMatrix/Helpers/SyncStateResolver.cs b/LibMatrix/Helpers/SyncStateResolver.cs
index 3be4492..282d26f 100644
--- a/LibMatrix/Helpers/SyncStateResolver.cs
+++ b/LibMatrix/Helpers/SyncStateResolver.cs
@@ -1,6 +1,7 @@
using System.Collections.Frozen;
using System.Collections.Immutable;
using System.Diagnostics;
+using System.Text;
using ArcaneLibs.Extensions;
using LibMatrix.Extensions;
using LibMatrix.Filters;
@@ -40,15 +41,16 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
return (sync, MergedState);
}
- public async Task OptimiseStore() {
+ public async Task OptimiseStore(Action<int, int>? progressCallback = null) {
if (storageProvider is null) return;
if (!await storageProvider.ObjectExistsAsync("init")) return;
var totalSw = Stopwatch.StartNew();
Console.Write("Optimising sync store...");
var initLoadTask = storageProvider.LoadObjectAsync<SyncResponse>("init");
- var keys = (await storageProvider.GetAllKeysAsync()).Where(x=>!x.StartsWith("old/")).ToFrozenSet();
+ var keys = (await storageProvider.GetAllKeysAsync()).Where(x => !x.StartsWith("old/")).ToFrozenSet();
var count = keys.Count - 1;
+ int total = count;
Console.WriteLine($"Found {count} entries to optimise.");
var merged = await initLoadTask;
@@ -64,6 +66,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
var moveTasks = new List<Task>();
+ Dictionary<string, Dictionary<string, TimeSpan>> traces = [];
while (keys.Contains(merged.NextBatch)) {
Console.Write($"Merging {merged.NextBatch}, {--count} remaining... ");
var sw = Stopwatch.StartNew();
@@ -77,28 +80,36 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
moveTasks.Add(storageProvider.MoveObjectAsync(merged.NextBatch, $"{oldPath}/{merged.NextBatch}"));
Console.Write($"Move {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
- merged = MergeSyncs(merged, next);
+ var trace = new Dictionary<string, TimeSpan>();
+ traces[merged.NextBatch] = trace;
+ merged = MergeSyncs(merged, next, trace);
Console.Write($"Merge {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
Console.WriteLine($"Total {swt.Elapsed.TotalMilliseconds}ms");
// Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining...");
+ progressCallback?.Invoke(count, total);
}
+ var traceString = string.Join("\n", traces.Select(x => $"{x.Key}\t{x.Value.ToJson(indent: false)}"));
+ var ms = new MemoryStream(Encoding.UTF8.GetBytes(traceString));
+ await storageProvider.SaveStreamAsync($"traces/{oldPath}", ms);
+
await storageProvider.SaveObjectAsync("init", merged);
await Task.WhenAll(moveTasks);
-
+
Console.WriteLine($"Optimised store in {totalSw.Elapsed.TotalMilliseconds}ms");
+ Console.WriteLine($"Insertions: {EnumerableExtensions.insertions}, replacements: {EnumerableExtensions.replacements}");
}
/// <summary>
/// Remove all but initial sync and last checkpoint
/// </summary>
public async Task RemoveOldSnapshots() {
- if(storageProvider is null) return;
+ if (storageProvider is null) return;
var sw = Stopwatch.StartNew();
var map = await GetCheckpointMap();
if (map is null) return;
- if(map.Count < 3) return;
+ if (map.Count < 3) return;
var toRemove = map.Keys.Skip(1).Take(map.Count - 2).ToList();
Console.Write("Cleaning up old snapshots: ");
@@ -109,6 +120,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
await storageProvider?.DeleteObjectAsync(path);
}
}
+
Console.WriteLine("Done!");
Console.WriteLine($"Removed {toRemove.Count} old snapshots in {sw.Elapsed.TotalMilliseconds}ms");
}
@@ -137,6 +149,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
// Console.WriteLine($"[{++i}] {key} -> {resp.NextBatch} ({resp.GetDerivedSyncTime()})");
i++;
}
+
Console.WriteLine($"Iterated {i} syncResponses in {sw.Elapsed}");
Environment.Exit(0);
}
@@ -188,7 +201,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
if (resp.GetDerivedSyncTime() > unixTime) break;
merged = MergeSyncs(merged, resp);
}
-
+
return merged;
}
@@ -208,29 +221,29 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
return map.OrderBy(x => x.Key).ToImmutableSortedDictionary(x => x.Key, x => x.Value.ToFrozenSet());
}
- private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync) {
+ private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync, Dictionary<string, TimeSpan>? trace = null) {
+ var sw = Stopwatch.StartNew();
oldSync.NextBatch = newSync.NextBatch ?? oldSync.NextBatch;
- oldSync.AccountData ??= new EventList();
- oldSync.AccountData.Events ??= [];
- if (newSync.AccountData?.Events is not null)
- oldSync.AccountData.Events.MergeStateEventLists(newSync.AccountData?.Events ?? []);
+ oldSync.AccountData = MergeEventList(oldSync.AccountData, newSync.AccountData);
+ trace?.Add("AccountData", sw.GetElapsedAndRestart());
- oldSync.Presence ??= new();
- oldSync.Presence.Events?.ReplaceBy(newSync.Presence?.Events ?? [], (oldState, newState) => oldState.Sender == newState.Sender && oldState.Type == newState.Type);
+ oldSync.Presence = MergeEventListBy(oldSync.Presence, newSync.Presence, (oldState, newState) => oldState.Sender == newState.Sender && oldState.Type == newState.Type);
+ trace?.Add("Presence", sw.GetElapsedAndRestart());
+ // TODO: can this be cleaned up?
oldSync.DeviceOneTimeKeysCount ??= new();
if (newSync.DeviceOneTimeKeysCount is not null)
foreach (var (key, value) in newSync.DeviceOneTimeKeysCount)
oldSync.DeviceOneTimeKeysCount[key] = value;
+ trace?.Add("DeviceOneTimeKeysCount", sw.GetElapsedAndRestart());
if (newSync.Rooms is not null)
- oldSync.Rooms = MergeRoomsDataStructure(oldSync.Rooms, newSync.Rooms);
+ oldSync.Rooms = MergeRoomsDataStructure(oldSync.Rooms, newSync.Rooms, trace);
+ trace?.Add("Rooms", sw.GetElapsedAndRestart());
- oldSync.ToDevice ??= new EventList();
- oldSync.ToDevice.Events ??= [];
- if (newSync.ToDevice?.Events is not null)
- oldSync.ToDevice.Events.MergeStateEventLists(newSync.ToDevice?.Events ?? []);
+ oldSync.ToDevice = MergeEventList(oldSync.ToDevice, newSync.ToDevice);
+ trace?.Add("ToDevice", sw.GetElapsedAndRestart());
oldSync.DeviceLists ??= new SyncResponse.DeviceListsDataStructure();
oldSync.DeviceLists.Changed ??= [];
@@ -241,125 +254,171 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
oldSync.DeviceLists.Changed.Add(s);
}
+ trace?.Add("DeviceLists.Changed", sw.GetElapsedAndRestart());
+
if (newSync.DeviceLists?.Left is not null)
foreach (var s in newSync.DeviceLists.Left!) {
oldSync.DeviceLists.Changed.Remove(s);
oldSync.DeviceLists.Left.Add(s);
}
- return oldSync;
- }
-
- private List<StateEventResponse>? MergePresenceEvents(List<StateEventResponse>? oldEvents, List<StateEventResponse>? newEvents) {
- if (oldEvents is null) return newEvents;
- if (newEvents is null) return oldEvents;
+ trace?.Add("DeviceLists.Left", sw.GetElapsedAndRestart());
- foreach (var newEvent in newEvents) {
- oldEvents.RemoveAll(x => x.Sender == newEvent.Sender && x.Type == newEvent.Type);
- oldEvents.Add(newEvent);
- }
-
- return oldEvents;
+ return oldSync;
}
#region Merge rooms
- private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure? oldState, SyncResponse.RoomsDataStructure newState) {
+ private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure? oldState, SyncResponse.RoomsDataStructure newState,
+ Dictionary<string, TimeSpan>? trace) {
+ var sw = Stopwatch.StartNew();
if (oldState is null) return newState;
- oldState.Join ??= new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>();
- foreach (var (key, value) in newState.Join ?? new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>())
- if (!oldState.Join.ContainsKey(key)) oldState.Join[key] = value;
- else oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value);
-
- oldState.Invite ??= new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>();
- foreach (var (key, value) in newState.Invite ?? new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>())
- if (!oldState.Invite.ContainsKey(key)) oldState.Invite[key] = value;
- else oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value);
-
- oldState.Leave ??= new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>();
- foreach (var (key, value) in newState.Leave ?? new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>()) {
- if (!oldState.Leave.ContainsKey(key)) oldState.Leave[key] = value;
- else oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value);
- if (oldState.Invite.ContainsKey(key)) oldState.Invite.Remove(key);
- if (oldState.Join.ContainsKey(key)) oldState.Join.Remove(key);
- }
+
+ if (newState.Join is { Count: > 0 })
+ if (oldState.Join is null)
+ oldState.Join = newState.Join;
+ else
+ foreach (var (key, value) in newState.Join)
+ if (!oldState.Join.TryAdd(key, value))
+ oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value, trace);
+ trace?.Add("MergeRoomsDataStructure.Join", sw.GetElapsedAndRestart());
+
+ if (newState.Invite is { Count: > 0 })
+ if (oldState.Invite is null)
+ oldState.Invite = newState.Invite;
+ else
+ foreach (var (key, value) in newState.Invite)
+ if (!oldState.Invite.TryAdd(key, value))
+ oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value, trace);
+ trace?.Add("MergeRoomsDataStructure.Invite", sw.GetElapsedAndRestart());
+
+ if (newState.Leave is { Count: > 0 })
+ if (oldState.Leave is null)
+ oldState.Leave = newState.Leave;
+ else
+ foreach (var (key, value) in newState.Leave) {
+ if (!oldState.Leave.TryAdd(key, value))
+ oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value, trace);
+ if (oldState.Invite?.ContainsKey(key) ?? false) oldState.Invite.Remove(key);
+ if (oldState.Join?.ContainsKey(key) ?? false) oldState.Join.Remove(key);
+ }
+ trace?.Add("MergeRoomsDataStructure.Leave", sw.GetElapsedAndRestart());
return oldState;
}
private static SyncResponse.RoomsDataStructure.LeftRoomDataStructure MergeLeftRoomDataStructure(SyncResponse.RoomsDataStructure.LeftRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData) {
- oldData.AccountData ??= new EventList();
- oldData.AccountData.Events ??= [];
- oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure();
- oldData.Timeline.Events ??= [];
- oldData.State ??= new EventList();
- oldData.State.Events ??= [];
-
- if (newData.AccountData?.Events is not null)
- oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? []);
-
- if (newData.Timeline?.Events is not null)
- oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? []);
+ SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) {
+ var sw = Stopwatch.StartNew();
+
+ oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData);
+ trace?.Add($"LeftRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+
+ oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure
+ ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure");
oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited;
oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch;
+ trace?.Add($"LeftRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
- if (newData.State?.Events is not null)
- oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? []);
+ oldData.State = MergeEventList(oldData.State, newData.State);
+ trace?.Add($"LeftRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
return oldData;
}
private static SyncResponse.RoomsDataStructure.InvitedRoomDataStructure MergeInvitedRoomDataStructure(SyncResponse.RoomsDataStructure.InvitedRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData) {
- oldData.InviteState ??= new EventList();
- oldData.InviteState.Events ??= [];
- if (newData.InviteState?.Events is not null)
- oldData.InviteState.Events.MergeStateEventLists(newData.InviteState?.Events ?? []);
+ SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) {
+ var sw = Stopwatch.StartNew();
+ oldData.InviteState = MergeEventList(oldData.InviteState, newData.InviteState);
+ trace?.Add($"InvitedRoomDataStructure.InviteState/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
return oldData;
}
private static SyncResponse.RoomsDataStructure.JoinedRoomDataStructure MergeJoinedRoomDataStructure(SyncResponse.RoomsDataStructure.JoinedRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData) {
- oldData.AccountData ??= new EventList();
- oldData.AccountData.Events ??= [];
- oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure();
- oldData.Timeline.Events ??= [];
- oldData.State ??= new EventList();
- oldData.State.Events ??= [];
- oldData.Ephemeral ??= new EventList();
- oldData.Ephemeral.Events ??= [];
-
- if (newData.AccountData?.Events is not null)
- oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? []);
-
- if (newData.Timeline?.Events is not null)
- oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? []);
+ SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) {
+ var sw = Stopwatch.StartNew();
+
+ oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData);
+ trace?.Add($"JoinedRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+
+ oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure
+ ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure");
oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited;
oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch;
+ trace?.Add($"JoinedRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
- if (newData.State?.Events is not null)
- oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? []);
+ oldData.State = MergeEventList(oldData.State, newData.State);
+ trace?.Add($"JoinedRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
- if (newData.Ephemeral?.Events is not null)
- oldData.Ephemeral.Events.MergeStateEventLists(newData.Ephemeral?.Events ?? []);
+ oldData.Ephemeral = MergeEventList(oldData.Ephemeral, newData.Ephemeral);
+ trace?.Add($"JoinedRoomDataStructure.Ephemeral/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
oldData.UnreadNotifications ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.UnreadNotificationsDataStructure();
oldData.UnreadNotifications.HighlightCount = newData.UnreadNotifications?.HighlightCount ?? oldData.UnreadNotifications.HighlightCount;
oldData.UnreadNotifications.NotificationCount = newData.UnreadNotifications?.NotificationCount ?? oldData.UnreadNotifications.NotificationCount;
+ trace?.Add($"JoinedRoom$DataStructure.UnreadNotifications/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+
+ if (oldData.Summary is null)
+ oldData.Summary = newData.Summary;
+ else {
+ oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes;
+ oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount;
+ oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount;
+ }
- oldData.Summary ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.SummaryDataStructure {
- Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes,
- JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount,
- InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount
- };
- oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes;
- oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount;
- oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount;
+ trace?.Add($"JoinedRoomDataStructure.Summary/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
return oldData;
}
#endregion
+
+ private static EventList? MergeEventList(EventList? oldState, EventList? newState) {
+ if (newState is null) return oldState;
+ if (oldState is null) {
+ return newState;
+ }
+
+ if (newState.Events is null) return oldState;
+ if (oldState.Events is null) {
+ oldState.Events = newState.Events;
+ return oldState;
+ }
+
+ oldState.Events.MergeStateEventLists(newState.Events);
+ return oldState;
+ }
+
+ private static EventList? MergeEventListBy(EventList? oldState, EventList? newState, Func<StateEventResponse, StateEventResponse, bool> comparer) {
+ if (newState is null) return oldState;
+ if (oldState is null) {
+ return newState;
+ }
+
+ if (newState.Events is null) return oldState;
+ if (oldState.Events is null) {
+ oldState.Events = newState.Events;
+ return oldState;
+ }
+
+ oldState.Events.ReplaceBy(newState.Events, comparer);
+ return oldState;
+ }
+
+ private static EventList? AppendEventList(EventList? oldState, EventList? newState) {
+ if (newState is null) return oldState;
+ if (oldState is null) {
+ return newState;
+ }
+
+ if (newState.Events is null) return oldState;
+ if (oldState.Events is null) {
+ oldState.Events = newState.Events;
+ return oldState;
+ }
+
+ oldState.Events.AddRange(newState.Events);
+ return oldState;
+ }
}
\ No newline at end of file
diff --git a/LibMatrix/Responses/SyncResponse.cs b/LibMatrix/Responses/SyncResponse.cs
index 9b4ce05..a4391b7 100644
--- a/LibMatrix/Responses/SyncResponse.cs
+++ b/LibMatrix/Responses/SyncResponse.cs
@@ -90,7 +90,7 @@ public class SyncResponse {
[JsonPropertyName("summary")]
public SummaryDataStructure? Summary { get; set; }
- public class TimelineDataStructure {
+ public class TimelineDataStructure : EventList {
public TimelineDataStructure() { }
public TimelineDataStructure(List<StateEventResponse>? events, bool? limited) {
@@ -98,8 +98,8 @@ public class SyncResponse {
Limited = limited;
}
- [JsonPropertyName("events")]
- public List<StateEventResponse>? Events { get; set; }
+ // [JsonPropertyName("events")]
+ // public List<StateEventResponse>? Events { get; set; }
[JsonPropertyName("prev_batch")]
public string? PrevBatch { get; set; }
diff --git a/LibMatrix/RoomTypes/GenericRoom.cs b/LibMatrix/RoomTypes/GenericRoom.cs
index 02bd555..84a3d30 100644
--- a/LibMatrix/RoomTypes/GenericRoom.cs
+++ b/LibMatrix/RoomTypes/GenericRoom.cs
@@ -1,6 +1,7 @@
using System.Collections.Frozen;
using System.Diagnostics;
using System.Net.Http.Json;
+using System.Security.Cryptography;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
diff --git a/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs b/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs
index 5cdc3ab..c798cce 100644
--- a/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs
+++ b/Utilities/LibMatrix.HomeserverEmulator/Services/RoomStore.cs
@@ -137,7 +137,7 @@ public class RoomStore {
public Room(string roomId) {
if (string.IsNullOrWhiteSpace(roomId)) throw new ArgumentException("Value cannot be null or whitespace.", nameof(roomId));
- if (roomId[0] != '!') throw new ArgumentException("Room ID must start with !", nameof(roomId));
+ if (roomId[0] != '!') throw new ArgumentException($"Room ID must start with '!', provided value: {roomId ?? "null"}", nameof(roomId));
RoomId = roomId;
Timeline = new();
AccountData = new();
diff --git a/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs b/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs
index ca6a4d8..8501d41 100644
--- a/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs
+++ b/Utilities/LibMatrix.Utilities.Bot/BotCommandInstaller.cs
@@ -18,7 +18,7 @@ public class BotInstaller(IServiceCollection services) {
public BotInstaller AddMatrixBot() {
services.AddSingleton<LibMatrixBotConfiguration>();
- services.AddScoped<AuthenticatedHomeserverGeneric>(x => {
+ services.AddSingleton<AuthenticatedHomeserverGeneric>(x => {
var config = x.GetService<LibMatrixBotConfiguration>() ?? throw new Exception("No configuration found!");
var hsProvider = x.GetService<HomeserverProviderService>() ?? throw new Exception("No homeserver provider found!");
diff --git a/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs b/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs
index 601e598..9a7585e 100644
--- a/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs
+++ b/Utilities/LibMatrix.Utilities.Bot/Services/CommandListenerHostedService.cs
@@ -59,22 +59,44 @@ public class CommandListenerHostedService : IHostedService {
FilterId = filter
};
- syncHelper.TimelineEventHandlers.Add(async @event => {
- try {
- var room = _hs.GetRoom(@event.RoomId);
- // _logger.LogInformation(eventResponse.ToJson(indent: false));
- if (@event is { Type: "m.room.message", TypedContent: RoomMessageEventContent message })
- if (message is { MessageType: "m.text" }) {
- var usedPrefix = await GetUsedPrefix(@event);
- if (usedPrefix is null) return;
- var res = await InvokeCommand(@event, usedPrefix);
- await (_commandResultHandler?.Invoke(res) ?? HandleResult(res));
+ syncHelper.SyncReceivedHandlers.Add(async sync => {
+ _logger.LogInformation("Sync received!");
+ foreach (var roomResp in sync.Rooms?.Join ?? []) {
+ if (roomResp.Value.Timeline?.Events is null or { Count: > 5 }) continue;
+ foreach (var @event in roomResp.Value.Timeline.Events) {
+ @event.RoomId = roomResp.Key;
+ try {
+ // var room = _hs.GetRoom(@event.RoomId);
+ // _logger.LogInformation(eventResponse.ToJson(indent: false));
+ if (@event is { Type: "m.room.message", TypedContent: RoomMessageEventContent message })
+ if (message is { MessageType: "m.text" }) {
+ var usedPrefix = await GetUsedPrefix(@event);
+ if (usedPrefix is null) return;
+ var res = await InvokeCommand(@event, usedPrefix);
+ await (_commandResultHandler?.Invoke(res) ?? HandleResult(res));
+ }
}
- }
- catch (Exception e) {
- _logger.LogError(e, "Error in command listener!");
+ catch (Exception e) {
+ _logger.LogError(e, "Error in command listener!");
+ Console.WriteLine(@event.ToJson(ignoreNull: false, indent: true));
+ var fakeResult = new CommandResult() {
+ Result = CommandResult.CommandResultType.Failure_Exception,
+ Exception = e,
+ Success = false,
+ Context = new() {
+ Homeserver = _hs,
+ CommandName = "[CommandListener.SyncHandler]",
+ Room = _hs.GetRoom(roomResp.Key),
+ Args = [],
+ MessageEvent = @event
+ }
+ };
+ await (_commandResultHandler?.Invoke(fakeResult) ?? HandleResult(fakeResult));
+ }
+ }
}
});
+
await syncHelper.RunSyncLoopAsync(cancellationToken: cancellationToken);
}
|