diff --git a/ArcaneLibs b/ArcaneLibs
-Subproject 2956a7ce4e8d12034322a91b6afa449e7035485
+Subproject 3d116cf672f881a67732f2cb6cc5c7efa1b8deb
diff --git a/LibMatrix/Helpers/SyncStateResolver.cs b/LibMatrix/Helpers/SyncStateResolver.cs
index 282d26f..35360d2 100644
--- a/LibMatrix/Helpers/SyncStateResolver.cs
+++ b/LibMatrix/Helpers/SyncStateResolver.cs
@@ -35,7 +35,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
if (sync is null) return await ContinueAsync(cancellationToken);
if (MergedState is null) MergedState = sync;
- else MergedState = MergeSyncs(MergedState, sync);
+ else MergedState = await MergeSyncs(MergedState, sync);
Since = sync.NextBatch;
return (sync, MergedState);
@@ -82,16 +82,37 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
var trace = new Dictionary<string, TimeSpan>();
traces[merged.NextBatch] = trace;
- merged = MergeSyncs(merged, next, trace);
+ merged = await MergeSyncs(merged, next, trace);
Console.Write($"Merge {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
Console.WriteLine($"Total {swt.Elapsed.TotalMilliseconds}ms");
// Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining...");
progressCallback?.Invoke(count, total);
+#if WRITE_TRACE
+ var traceString = string.Join("\n", traces.Select(x => $"{x.Key}\t{x.Value.ToJson(indent: false, ignoreNull: true)}"));
+ var ms = new MemoryStream(Encoding.UTF8.GetBytes(traceString));
+ var traceSaveTask = storageProvider.SaveStreamAsync($"traces/{oldPath}", ms);
+ var slowtraceString = string.Join("\n",
+ traces
+ .Where(x=>x.Value.Max(y=>y.Value.TotalMilliseconds) >= 100)
+ .OrderBy(x=>x.Value.Max(y=>y.Value))
+ .Select(x => $"{x.Key}\t{x.Value.Where(y => y.Value.TotalMilliseconds >= 100).ToDictionary().ToJson(indent: false, ignoreNull: true)}"));
+ var slowms = new MemoryStream(Encoding.UTF8.GetBytes(slowtraceString));
+ var slowTraceSaveTask = storageProvider.SaveStreamAsync($"traces/{oldPath}-slow", slowms);
+ var slow1straceString = string.Join("\n",
+ traces
+ .Where(x=>x.Value.Max(y=>y.Value.TotalMilliseconds) >= 1000)
+ .OrderBy(x=>x.Value.Max(y=>y.Value))
+ .Select(x => $"{x.Key}\t{x.Value.Where(y => y.Value.TotalMilliseconds >= 1000).ToDictionary().ToJson(indent: false, ignoreNull: true)}"));
+ var slow1sms = new MemoryStream(Encoding.UTF8.GetBytes(slow1straceString));
+ var slow1sTraceSaveTask = storageProvider.SaveStreamAsync($"traces/{oldPath}-slow-1s", slow1sms);
+
+ await Task.WhenAll(traceSaveTask, slowTraceSaveTask, slow1sTraceSaveTask);
+#endif
}
- var traceString = string.Join("\n", traces.Select(x => $"{x.Key}\t{x.Value.ToJson(indent: false)}"));
- var ms = new MemoryStream(Encoding.UTF8.GetBytes(traceString));
- await storageProvider.SaveStreamAsync($"traces/{oldPath}", ms);
+ // var traceString = string.Join("\n", traces.Select(x => $"{x.Key}\t{x.Value.ToJson(indent: false)}"));
+ // var ms = new MemoryStream(Encoding.UTF8.GetBytes(traceString));
+ // await storageProvider.SaveStreamAsync($"traces/{oldPath}", ms);
await storageProvider.SaveObjectAsync("init", merged);
await Task.WhenAll(moveTasks);
@@ -115,9 +136,9 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
Console.Write("Cleaning up old snapshots: ");
foreach (var key in toRemove) {
var path = $"old/{key}/init";
- if (await storageProvider?.ObjectExistsAsync(path)) {
+ if (await storageProvider.ObjectExistsAsync(path)) {
Console.Write($"{key}... ");
- await storageProvider?.DeleteObjectAsync(path);
+ await storageProvider.DeleteObjectAsync(path);
}
}
@@ -199,7 +220,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
var (key, resp) = stream.Current;
if (resp is null) continue;
if (resp.GetDerivedSyncTime() > unixTime) break;
- merged = MergeSyncs(merged, resp);
+ merged = await MergeSyncs(merged, resp);
}
return merged;
@@ -221,48 +242,82 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
return map.OrderBy(x => x.Key).ToImmutableSortedDictionary(x => x.Key, x => x.Value.ToFrozenSet());
}
- private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync, Dictionary<string, TimeSpan>? trace = null) {
- var sw = Stopwatch.StartNew();
- oldSync.NextBatch = newSync.NextBatch ?? oldSync.NextBatch;
-
- oldSync.AccountData = MergeEventList(oldSync.AccountData, newSync.AccountData);
- trace?.Add("AccountData", sw.GetElapsedAndRestart());
-
- oldSync.Presence = MergeEventListBy(oldSync.Presence, newSync.Presence, (oldState, newState) => oldState.Sender == newState.Sender && oldState.Type == newState.Type);
- trace?.Add("Presence", sw.GetElapsedAndRestart());
-
- // TODO: can this be cleaned up?
- oldSync.DeviceOneTimeKeysCount ??= new();
- if (newSync.DeviceOneTimeKeysCount is not null)
- foreach (var (key, value) in newSync.DeviceOneTimeKeysCount)
- oldSync.DeviceOneTimeKeysCount[key] = value;
- trace?.Add("DeviceOneTimeKeysCount", sw.GetElapsedAndRestart());
-
- if (newSync.Rooms is not null)
- oldSync.Rooms = MergeRoomsDataStructure(oldSync.Rooms, newSync.Rooms, trace);
- trace?.Add("Rooms", sw.GetElapsedAndRestart());
-
- oldSync.ToDevice = MergeEventList(oldSync.ToDevice, newSync.ToDevice);
- trace?.Add("ToDevice", sw.GetElapsedAndRestart());
-
- oldSync.DeviceLists ??= new SyncResponse.DeviceListsDataStructure();
- oldSync.DeviceLists.Changed ??= [];
- oldSync.DeviceLists.Left ??= [];
- if (newSync.DeviceLists?.Changed is not null)
- foreach (var s in newSync.DeviceLists.Changed!) {
- oldSync.DeviceLists.Left.Remove(s);
- oldSync.DeviceLists.Changed.Add(s);
+ private async Task<SyncResponse> MergeSyncs(SyncResponse oldSync, SyncResponse newSync, Dictionary<string, TimeSpan>? trace = null) {
+ // var sw = Stopwatch.StartNew();
+ oldSync.NextBatch = newSync.NextBatch;
+
+ void Trace(string key, TimeSpan span) {
+ if (trace is not null) {
+ lock (trace)
+ trace.Add(key, span);
}
+ }
- trace?.Add("DeviceLists.Changed", sw.GetElapsedAndRestart());
+ var accountDataTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ oldSync.AccountData = MergeEventList(oldSync.AccountData, newSync.AccountData);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: AccountData took {sw.ElapsedMilliseconds}ms");
+ Trace("AccountData", sw.GetElapsedAndRestart());
+ });
- if (newSync.DeviceLists?.Left is not null)
- foreach (var s in newSync.DeviceLists.Left!) {
- oldSync.DeviceLists.Changed.Remove(s);
- oldSync.DeviceLists.Left.Add(s);
- }
+ var presenceTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ oldSync.Presence = MergeEventListBy(oldSync.Presence, newSync.Presence, (oldState, newState) => oldState.Sender == newState.Sender && oldState.Type == newState.Type);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: Presence took {sw.ElapsedMilliseconds}ms");
+ Trace("Presence", sw.GetElapsedAndRestart());
+ });
+
+ var deviceOneTimeKeysTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ // TODO: can this be cleaned up?
+ oldSync.DeviceOneTimeKeysCount ??= new();
+ if (newSync.DeviceOneTimeKeysCount is not null)
+ foreach (var (key, value) in newSync.DeviceOneTimeKeysCount)
+ oldSync.DeviceOneTimeKeysCount[key] = value;
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: DeviceOneTimeKeysCount took {sw.ElapsedMilliseconds}ms");
+ Trace("DeviceOneTimeKeysCount", sw.GetElapsedAndRestart());
+ });
+
+ var roomsTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ if (newSync.Rooms is not null)
+ oldSync.Rooms = MergeRoomsDataStructure(oldSync.Rooms, newSync.Rooms, Trace);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: Rooms took {sw.ElapsedMilliseconds}ms");
+ Trace("Rooms", sw.GetElapsedAndRestart());
+ });
+
+ var toDeviceTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ oldSync.ToDevice = MergeEventList(oldSync.ToDevice, newSync.ToDevice);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: ToDevice took {sw.ElapsedMilliseconds}ms");
+ Trace("ToDevice", sw.GetElapsedAndRestart());
+ });
+
+ var deviceListsTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ oldSync.DeviceLists ??= new SyncResponse.DeviceListsDataStructure();
+ oldSync.DeviceLists.Changed ??= [];
+ oldSync.DeviceLists.Left ??= [];
+ if (newSync.DeviceLists?.Changed is not null)
+ foreach (var s in newSync.DeviceLists.Changed!) {
+ oldSync.DeviceLists.Left.Remove(s);
+ oldSync.DeviceLists.Changed.Add(s);
+ }
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: DeviceLists.Changed took {sw.ElapsedMilliseconds}ms");
+ Trace("DeviceLists.Changed", sw.GetElapsedAndRestart());
+
+ if (newSync.DeviceLists?.Left is not null)
+ foreach (var s in newSync.DeviceLists.Left!) {
+ oldSync.DeviceLists.Changed.Remove(s);
+ oldSync.DeviceLists.Left.Add(s);
+ }
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: DeviceLists.Left took {sw.ElapsedMilliseconds}ms");
+ Trace("DeviceLists.Left", sw.GetElapsedAndRestart());
+ });
- trace?.Add("DeviceLists.Left", sw.GetElapsedAndRestart());
+ await Task.WhenAll(accountDataTask, presenceTask, deviceOneTimeKeysTask, roomsTask, toDeviceTask, deviceListsTask);
return oldSync;
}
@@ -270,7 +325,7 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
#region Merge rooms
private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure? oldState, SyncResponse.RoomsDataStructure newState,
- Dictionary<string, TimeSpan>? trace) {
+ Action<string, TimeSpan> trace) {
var sw = Stopwatch.StartNew();
if (oldState is null) return newState;
@@ -280,8 +335,9 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
else
foreach (var (key, value) in newState.Join)
if (!oldState.Join.TryAdd(key, value))
- oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value, trace);
- trace?.Add("MergeRoomsDataStructure.Join", sw.GetElapsedAndRestart());
+ oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value, key, trace);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeRoomsDataStructure.Join took {sw.ElapsedMilliseconds}ms");
+ trace("MergeRoomsDataStructure.Join", sw.GetElapsedAndRestart());
if (newState.Invite is { Count: > 0 })
if (oldState.Invite is null)
@@ -289,8 +345,9 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
else
foreach (var (key, value) in newState.Invite)
if (!oldState.Invite.TryAdd(key, value))
- oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value, trace);
- trace?.Add("MergeRoomsDataStructure.Invite", sw.GetElapsedAndRestart());
+ oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value, key, trace);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeRoomsDataStructure.Invite took {sw.ElapsedMilliseconds}ms");
+ trace("MergeRoomsDataStructure.Invite", sw.GetElapsedAndRestart());
if (newState.Leave is { Count: > 0 })
if (oldState.Leave is null)
@@ -298,66 +355,79 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
else
foreach (var (key, value) in newState.Leave) {
if (!oldState.Leave.TryAdd(key, value))
- oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value, trace);
+ oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value, key, trace);
if (oldState.Invite?.ContainsKey(key) ?? false) oldState.Invite.Remove(key);
if (oldState.Join?.ContainsKey(key) ?? false) oldState.Join.Remove(key);
}
- trace?.Add("MergeRoomsDataStructure.Leave", sw.GetElapsedAndRestart());
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeRoomsDataStructure.Leave took {sw.ElapsedMilliseconds}ms");
+ trace("MergeRoomsDataStructure.Leave", sw.GetElapsedAndRestart());
return oldState;
}
private static SyncResponse.RoomsDataStructure.LeftRoomDataStructure MergeLeftRoomDataStructure(SyncResponse.RoomsDataStructure.LeftRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) {
+ SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData, string roomId, Action<string, TimeSpan> trace) {
var sw = Stopwatch.StartNew();
oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData);
- trace?.Add($"LeftRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+ trace($"LeftRoomDataStructure.AccountData/{roomId}", sw.GetElapsedAndRestart());
oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure
?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure");
oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited;
oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch;
- trace?.Add($"LeftRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+ trace($"LeftRoomDataStructure.Timeline/{roomId}", sw.GetElapsedAndRestart());
oldData.State = MergeEventList(oldData.State, newData.State);
- trace?.Add($"LeftRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+ trace($"LeftRoomDataStructure.State/{roomId}", sw.GetElapsedAndRestart());
return oldData;
}
private static SyncResponse.RoomsDataStructure.InvitedRoomDataStructure MergeInvitedRoomDataStructure(SyncResponse.RoomsDataStructure.InvitedRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) {
+ SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData, string roomId, Action<string, TimeSpan> trace) {
var sw = Stopwatch.StartNew();
oldData.InviteState = MergeEventList(oldData.InviteState, newData.InviteState);
- trace?.Add($"InvitedRoomDataStructure.InviteState/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeInvitedRoomDataStructure.InviteState took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"InvitedRoomDataStructure.InviteState/{roomId}", sw.GetElapsedAndRestart());
return oldData;
}
private static SyncResponse.RoomsDataStructure.JoinedRoomDataStructure MergeJoinedRoomDataStructure(SyncResponse.RoomsDataStructure.JoinedRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData, Dictionary<string, TimeSpan>? trace) {
+ SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData, string roomId, Action<string, TimeSpan> trace) {
var sw = Stopwatch.StartNew();
oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData);
- trace?.Add($"JoinedRoomDataStructure.AccountData/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.AccountData took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.AccountData/{roomId}", sw.GetElapsedAndRestart());
oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure
?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure");
oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited;
oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch;
- trace?.Add($"JoinedRoomDataStructure.Timeline/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.Timeline took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.Timeline/{roomId}", sw.GetElapsedAndRestart());
oldData.State = MergeEventList(oldData.State, newData.State);
- trace?.Add($"JoinedRoomDataStructure.State/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.State took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.State/{roomId}", sw.GetElapsedAndRestart());
oldData.Ephemeral = MergeEventList(oldData.Ephemeral, newData.Ephemeral);
- trace?.Add($"JoinedRoomDataStructure.Ephemeral/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.Ephemeral took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.Ephemeral/{roomId}", sw.GetElapsedAndRestart());
oldData.UnreadNotifications ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.UnreadNotificationsDataStructure();
oldData.UnreadNotifications.HighlightCount = newData.UnreadNotifications?.HighlightCount ?? oldData.UnreadNotifications.HighlightCount;
oldData.UnreadNotifications.NotificationCount = newData.UnreadNotifications?.NotificationCount ?? oldData.UnreadNotifications.NotificationCount;
- trace?.Add($"JoinedRoom$DataStructure.UnreadNotifications/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.UnreadNotifications took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoom$DataStructure.UnreadNotifications/{roomId}", sw.GetElapsedAndRestart());
if (oldData.Summary is null)
oldData.Summary = newData.Summary;
@@ -367,7 +437,8 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount;
}
- trace?.Add($"JoinedRoomDataStructure.Summary/{oldData.GetHashCode()}", sw.GetElapsedAndRestart());
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.Summary took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.Summary/{roomId}", sw.GetElapsedAndRestart());
return oldData;
}
@@ -386,7 +457,8 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
return oldState;
}
- oldState.Events.MergeStateEventLists(newState.Events);
+ // oldState.Events.MergeStateEventLists(newState.Events);
+ oldState = MergeEventListBy(oldState, newState, (oldEvt, newEvt) => oldEvt.Type == newEvt.Type && oldEvt.StateKey == newEvt.StateKey);
return oldState;
}
|