diff --git a/LibMatrix/Helpers/HomeserverWeightEstimation.cs b/LibMatrix/Helpers/HomeserverWeightEstimation.cs
deleted file mode 100644
index 5735af3..0000000
--- a/LibMatrix/Helpers/HomeserverWeightEstimation.cs
+++ /dev/null
@@ -1,58 +0,0 @@
-namespace LibMatrix.Helpers;
-
-public class HomeserverWeightEstimation {
- public static Dictionary<string, int> EstimatedSize = new() {
- { "matrix.org", 843870 },
- { "anontier.nl", 44809 },
- { "nixos.org", 8195 },
- { "the-apothecary.club", 6983 },
- { "waifuhunter.club", 3953 },
- { "neko.dev", 2666 },
- { "nerdsin.space", 2647 },
- { "feline.support", 2633 },
- { "gitter.im", 2584 },
- { "midov.pl", 2219 },
- { "no.lgbtqia.zone", 2083 },
- { "nheko.im", 1883 },
- { "fachschaften.org", 1849 },
- { "pixelthefox.net", 1478 },
- { "arcticfoxes.net", 981 },
- { "pixie.town", 817 },
- { "privacyguides.org", 809 },
- { "rory.gay", 653 },
- { "artemislena.eu", 599 },
- { "alchemi.dev", 445 },
- { "jameskitt616.one", 390 },
- { "hackint.org", 382 },
- { "pikaviestin.fi", 368 },
- { "matrix.nomagic.uk", 337 },
- { "thearcanebrony.net", 178 },
- { "fairydust.space", 176 },
- { "grin.hu", 176 },
- { "envs.net", 165 },
- { "tastytea.de", 143 },
- { "koneko.chat", 121 },
- { "vscape.tk", 115 },
- { "funklause.de", 112 },
- { "seirdy.one", 107 },
- { "pcg.life", 72 },
- { "draupnir.midnightthoughts.space", 22 },
- { "tchncs.de", 19 },
- { "catgirl.cloud", 16 },
- { "possum.city", 16 },
- { "tu-dresden.de", 9 },
- { "fosscord.com", 9 },
- { "nightshade.fun", 8 },
- { "matrix.eclipse.org", 8 },
- { "masfloss.net", 8 },
- { "e2e.zone", 8 },
- { "hyteck.de", 8 }
- };
-
- public static Dictionary<string, int> LargeRooms = new() {
- { "!ehXvUhWNASUkSLvAGP:matrix.org", 21957 },
- { "!fRRqjOaQcUbKOfCjvc:anontier.nl", 19117 },
- { "!OGEhHVWSdvArJzumhm:matrix.org", 101457 },
- { "!YTvKGNlinIzlkMTVRl:matrix.org", 30164 }
- };
-}
\ No newline at end of file
diff --git a/LibMatrix/Helpers/MessageBuilder.cs b/LibMatrix/Helpers/MessageBuilder.cs
index d897078..6f55739 100644
--- a/LibMatrix/Helpers/MessageBuilder.cs
+++ b/LibMatrix/Helpers/MessageBuilder.cs
@@ -10,9 +10,9 @@ public class MessageBuilder(string msgType = "m.text", string format = "org.matr
public RoomMessageEventContent Build() => Content;
- public MessageBuilder WithBody(string body) {
+ public MessageBuilder WithBody(string body, string? formattedBody = null) {
Content.Body += body;
- Content.FormattedBody += body;
+ Content.FormattedBody += formattedBody ?? body;
return this;
}
@@ -37,6 +37,10 @@ public class MessageBuilder(string msgType = "m.text", string format = "org.matr
return this;
}
+ public static string GetColoredBody(string color, string body) {
+ return $"<font color=\"{color}\">{body}</font>";
+ }
+
public MessageBuilder WithColoredBody(string color, string body) {
Content.Body += body;
Content.FormattedBody += $"<font color=\"{color}\">{body}</font>";
@@ -91,16 +95,38 @@ public class MessageBuilder(string msgType = "m.text", string format = "org.matr
return this;
}
+ public MessageBuilder WithMention(string id, string? displayName = null) {
+ Content.Body += $"@{displayName ?? id}";
+ Content.FormattedBody += $"<a href=\"https://matrix.to/#/{id}\">{displayName ?? id}</a>";
+ if (id == "@room") {
+ Content.Mentions ??= new();
+ Content.Mentions.Room = true;
+ }
+ else if (id.StartsWith('@')) {
+ Content.Mentions ??= new();
+ Content.Mentions.Users ??= new();
+ Content.Mentions.Users.Add(id);
+ }
+
+ return this;
+ }
+
+ public MessageBuilder WithNewline() {
+ Content.Body += "\n";
+ Content.FormattedBody += "<br>";
+ return this;
+ }
+
public MessageBuilder WithTable(Action<TableBuilder> tableBuilder) {
var tb = new TableBuilder(this);
- this.WithHtmlTag("table", msb => tableBuilder(tb));
+ WithHtmlTag("table", msb => tableBuilder(tb));
return this;
}
public class TableBuilder(MessageBuilder msb) {
public TableBuilder WithTitle(string title, int colspan) {
msb.Content.Body += title + "\n";
- msb.Content.FormattedBody += $"<thead><tr><th colspan=\"{colspan}\">{title}</th></tr></thead>";
+ msb.Content.FormattedBody += $"<thead><tr><th colspan=\"{colspan}\">{title}</th></tr></thead><br/>";
return this;
}
diff --git a/LibMatrix/Helpers/RoomBuilder.cs b/LibMatrix/Helpers/RoomBuilder.cs
new file mode 100644
index 0000000..bef7568
--- /dev/null
+++ b/LibMatrix/Helpers/RoomBuilder.cs
@@ -0,0 +1,173 @@
+using System.Runtime.Intrinsics.X86;
+using LibMatrix.EventTypes.Spec.State.RoomInfo;
+using LibMatrix.Homeservers;
+using LibMatrix.Responses;
+using LibMatrix.RoomTypes;
+
+namespace LibMatrix.Helpers;
+
+public class RoomBuilder {
+ public string? Type { get; set; }
+ public string Version { get; set; } = "11";
+ public RoomNameEventContent Name { get; set; } = new();
+ public RoomTopicEventContent Topic { get; set; } = new();
+ public RoomAvatarEventContent Avatar { get; set; } = new();
+ public RoomCanonicalAliasEventContent CanonicalAlias { get; set; } = new();
+ public string AliasLocalPart { get; set; } = string.Empty;
+ public bool IsFederatable { get; set; } = true;
+ public long OwnPowerLevel { get; set; } = MatrixConstants.MaxSafeJsonInteger;
+
+ public RoomJoinRulesEventContent JoinRules { get; set; } = new() {
+ JoinRule = RoomJoinRulesEventContent.JoinRules.Public
+ };
+
+ public RoomHistoryVisibilityEventContent HistoryVisibility { get; set; } = new() {
+ HistoryVisibility = RoomHistoryVisibilityEventContent.HistoryVisibilityTypes.Shared
+ };
+
+ /// <summary>
+ /// State events to be sent *before* room access is configured. Keep this small!
+ /// </summary>
+ public List<StateEvent> ImportantState { get; set; } = [];
+
+ /// <summary>
+ /// State events to be sent *after* room access is configured, but before invites are sent.
+ /// </summary>
+ public List<StateEvent> InitialState { get; set; } = [];
+
+ /// <summary>
+ /// Users to invite, with optional reason
+ /// </summary>
+ public Dictionary<string, string?> Invites { get; set; } = new();
+
+ public RoomPowerLevelEventContent PowerLevels { get; init; } = new() {
+ EventsDefault = 0,
+ UsersDefault = 0,
+ Kick = 50,
+ Invite = 50,
+ Ban = 50,
+ Redact = 50,
+ StateDefault = 50,
+ NotificationsPl = new() {
+ Room = 50
+ },
+ Users = [],
+ Events = new Dictionary<string, long> {
+ { RoomAvatarEventContent.EventId, 50 },
+ { RoomCanonicalAliasEventContent.EventId, 50 },
+ { RoomEncryptionEventContent.EventId, 100 },
+ { RoomHistoryVisibilityEventContent.EventId, 100 },
+ { RoomNameEventContent.EventId, 50 },
+ { RoomPowerLevelEventContent.EventId, 100 },
+ { RoomServerAclEventContent.EventId, 100 },
+ { RoomTombstoneEventContent.EventId, 100 },
+ { RoomPolicyServerEventContent.EventId, 100 }
+ }
+ };
+
+ public async Task<GenericRoom> Create(AuthenticatedHomeserverGeneric homeserver) {
+ var crq = new CreateRoomRequest() {
+ PowerLevelContentOverride = new() {
+ EventsDefault = 1000000,
+ UsersDefault = 1000000,
+ Kick = 1000000,
+ Invite = 1000000,
+ Ban = 1000000,
+ Redact = 1000000,
+ StateDefault = 1000000,
+ NotificationsPl = new() {
+ Room = 1000000
+ },
+ Users = new Dictionary<string, long>() {
+ { homeserver.WhoAmI.UserId, MatrixConstants.MaxSafeJsonInteger }
+ },
+ Events = new Dictionary<string, long> {
+ { RoomAvatarEventContent.EventId, 1000000 },
+ { RoomCanonicalAliasEventContent.EventId, 1000000 },
+ { RoomEncryptionEventContent.EventId, 1000000 },
+ { RoomHistoryVisibilityEventContent.EventId, 1000000 },
+ { RoomNameEventContent.EventId, 1000000 },
+ { RoomPowerLevelEventContent.EventId, 1000000 },
+ { RoomServerAclEventContent.EventId, 1000000 },
+ { RoomTombstoneEventContent.EventId, 1000000 },
+ { RoomPolicyServerEventContent.EventId, 1000000 }
+ }
+ },
+ Visibility = "private",
+ RoomVersion = Version
+ };
+
+ if (!string.IsNullOrWhiteSpace(Type))
+ crq.CreationContent.Add("type", Type);
+
+ if (!IsFederatable)
+ crq.CreationContent.Add("m.federate", false);
+
+ var room = await homeserver.CreateRoom(crq);
+
+ await SetBasicRoomInfoAsync(room);
+ await SetStatesAsync(room, ImportantState);
+ await SetAccessAsync(room);
+ await SetStatesAsync(room, InitialState);
+ await SendInvites(room);
+
+ return room;
+ }
+
+ private async Task SendInvites(GenericRoom room) {
+ if (Invites.Count == 0) return;
+
+ var inviteTasks = Invites.Select(async kvp => {
+ try {
+ await room.InviteUserAsync(kvp.Key, kvp.Value);
+ }
+ catch (MatrixException e) {
+ Console.Error.WriteLine("Failed to invite {0} to {1}: {2}", kvp.Key, room.RoomId, e.Message);
+ }
+ });
+
+ await Task.WhenAll(inviteTasks);
+ }
+
+ private async Task SetStatesAsync(GenericRoom room, List<StateEvent> state) {
+ foreach (var ev in state) {
+ await (string.IsNullOrWhiteSpace(ev.StateKey)
+ ? room.SendStateEventAsync(ev.Type, ev.RawContent)
+ : room.SendStateEventAsync(ev.Type, ev.StateKey, ev.RawContent));
+ }
+ }
+
+ private async Task SetBasicRoomInfoAsync(GenericRoom room) {
+ if (!string.IsNullOrWhiteSpace(Name.Name))
+ await room.SendStateEventAsync(RoomNameEventContent.EventId, Name);
+
+ if (!string.IsNullOrWhiteSpace(Topic.Topic))
+ await room.SendStateEventAsync(RoomTopicEventContent.EventId, Topic);
+
+ if (!string.IsNullOrWhiteSpace(Avatar.Url))
+ await room.SendStateEventAsync(RoomAvatarEventContent.EventId, Avatar);
+
+ if (!string.IsNullOrWhiteSpace(AliasLocalPart))
+ CanonicalAlias.Alias = $"#{AliasLocalPart}:{room.Homeserver.ServerName}";
+
+ if (!string.IsNullOrWhiteSpace(CanonicalAlias.Alias)) {
+ await room.Homeserver.SetRoomAliasAsync(CanonicalAlias.Alias!, room.RoomId);
+ await room.SendStateEventAsync(RoomCanonicalAliasEventContent.EventId, CanonicalAlias);
+ }
+ }
+
+ private async Task SetAccessAsync(GenericRoom room) {
+ PowerLevels.Users![room.Homeserver.WhoAmI.UserId] = OwnPowerLevel;
+ await room.SendStateEventAsync(RoomPowerLevelEventContent.EventId, PowerLevels);
+
+ if (!string.IsNullOrWhiteSpace(HistoryVisibility.HistoryVisibility))
+ await room.SendStateEventAsync(RoomHistoryVisibilityEventContent.EventId, HistoryVisibility);
+
+ if (!string.IsNullOrWhiteSpace(JoinRules.JoinRuleValue))
+ await room.SendStateEventAsync(RoomJoinRulesEventContent.EventId, JoinRules);
+ }
+}
+
+public class MatrixConstants {
+ public const long MaxSafeJsonInteger = 9007199254740991L; // 2^53 - 1
+}
\ No newline at end of file
diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index 1833bd0..c8e2928 100644
--- a/LibMatrix/Helpers/SyncHelper.cs
+++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,22 +1,56 @@
using System.Diagnostics;
using System.Net.Http.Json;
+using System.Reflection;
+using System.Text.Json;
+using ArcaneLibs.Collections;
+using System.Text.Json.Nodes;
using ArcaneLibs.Extensions;
using LibMatrix.Filters;
+using LibMatrix.Helpers.SyncProcessors;
using LibMatrix.Homeservers;
+using LibMatrix.Interfaces.Services;
using LibMatrix.Responses;
using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
-public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) {
+public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) {
+ private readonly Func<SyncResponse?, Task<SyncResponse?>> _msc4222EmulationSyncProcessor = new Msc4222EmulationSyncProcessor(homeserver, logger).EmulateMsc4222;
+
private SyncFilter? _filter;
private string? _namedFilterName;
- private bool _filterIsDirty = false;
- private string? _filterId = null;
+ private bool _filterIsDirty;
+ private string? _filterId;
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
- public string? SetPresence { get; set; } = "online";
+ public string? SetPresence { get; set; }
+
+ /// <summary>
+ /// Disabling this uses a technically slower code path, useful for checking whether delay comes from waiting for server or deserialising responses
+ /// </summary>
+ public bool UseInternalStreamingSync { get; set; } = true;
+
+ public bool UseMsc4222StateAfter {
+ get;
+ set {
+ field = value;
+ if (value) {
+ AsyncSyncPreprocessors.Add(_msc4222EmulationSyncProcessor);
+ logger?.LogInformation($"Added MSC4222 emulation sync processor");
+ }
+ else {
+ AsyncSyncPreprocessors.Remove(_msc4222EmulationSyncProcessor);
+ logger?.LogInformation($"Removed MSC4222 emulation sync processor");
+ }
+ }
+ } = false;
+
+ public List<Func<SyncResponse?, SyncResponse?>> SyncPreprocessors { get; } = [
+ SimpleSyncProcessors.FillRoomIds
+ ];
+
+ public List<Func<SyncResponse?, Task<SyncResponse?>>> AsyncSyncPreprocessors { get; } = [];
public string? FilterId {
get => _filterId;
@@ -42,16 +76,26 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
_filter = value;
_filterIsDirty = true;
_filterId = null;
+ _namedFilterName = null;
}
}
+ /// <summary>
+ /// Always include all rooms, and their full state according to passed filter
+ /// </summary>
public bool FullState { get; set; }
public bool IsInitialSync { get; set; } = true;
public TimeSpan MinimumDelay { get; set; } = new(0);
- private async Task updateFilterAsync() {
+ public async Task<int> GetUnoptimisedStoreCount() {
+ if (storageProvider is null) return -1;
+ var keys = await storageProvider.GetAllKeysAsync();
+ return keys.Count(static x => !x.StartsWith("old/")) - 1;
+ }
+
+ private async Task UpdateFilterAsync() {
if (!string.IsNullOrWhiteSpace(NamedFilterName)) {
_filterId = await homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(NamedFilterName);
if (_filterId is null)
@@ -61,9 +105,11 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
else if (Filter is not null)
_filterId = (await homeserver.UploadFilterAsync(Filter)).FilterId;
else _filterId = null;
+
+ _filterIsDirty = false;
}
- public async Task<SyncResponse?> SyncAsync(CancellationToken? cancellationToken = null) {
+ public async Task<SyncResponse?> SyncAsync(CancellationToken? cancellationToken = null, bool noDelay = false) {
if (homeserver is null) {
Console.WriteLine("Null passed as homeserver for SyncHelper!");
throw new ArgumentNullException(nameof(homeserver), "Null passed as homeserver for SyncHelper!");
@@ -74,26 +120,86 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!");
}
+ if (storageProvider is null) {
+ var res = await SyncAsyncInternal(cancellationToken, noDelay);
+ if (res is null) return null;
+ if (UseMsc4222StateAfter) res.Msc4222Method = SyncResponse.Msc4222SyncType.Server;
+
+ foreach (var preprocessor in SyncPreprocessors) {
+ res = preprocessor(res);
+ }
+
+ foreach (var preprocessor in AsyncSyncPreprocessors) {
+ res = await preprocessor(res);
+ }
+
+ return res;
+ }
+
+ var key = Since ?? "init";
+ if (await storageProvider.ObjectExistsAsync(key)) {
+ var cached = await storageProvider.LoadObjectAsync<SyncResponse>(key);
+ // We explicitly check that NextBatch doesn't match since to prevent infinite loops...
+ if (cached is not null && cached.NextBatch != Since) {
+ logger?.LogInformation("SyncHelper: Using cached sync response for {}", key);
+ return cached;
+ }
+ }
+
+ var sync = await SyncAsyncInternal(cancellationToken, noDelay);
+ if (sync is null) return null;
+ // Ditto here.
+ if (sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync);
+
+ if (UseMsc4222StateAfter) sync.Msc4222Method = SyncResponse.Msc4222SyncType.Server;
+
+ foreach (var preprocessor in SyncPreprocessors) {
+ sync = preprocessor(sync);
+ }
+
+ foreach (var preprocessor in AsyncSyncPreprocessors) {
+ sync = await preprocessor(sync);
+ }
+
+ return sync;
+ }
+
+ private async Task<SyncResponse?> SyncAsyncInternal(CancellationToken? cancellationToken = null, bool noDelay = false) {
var sw = Stopwatch.StartNew();
- if (_filterIsDirty) await updateFilterAsync();
+ if (_filterIsDirty) await UpdateFilterAsync();
- var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}";
+ var url = $"/_matrix/client/v3/sync?timeout={Timeout}";
+ if (!string.IsNullOrWhiteSpace(SetPresence)) url += $"&set_presence={SetPresence}";
if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}";
if (_filterId is not null) url += $"&filter={_filterId}";
+ if (FullState) url += "&full_state=true";
+ if (UseMsc4222StateAfter) url += "&org.matrix.msc4222.use_state_after=true&use_state_after=true"; // We use both unstable and stable names for compatibility
- logger?.LogInformation("SyncHelper: Calling: {}", url);
+ // logger?.LogInformation("SyncHelper: Calling: {}", url);
try {
- var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
- if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
- logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.Content.Headers.ContentLength ?? -1, sw.Elapsed);
- var deserializeSw = Stopwatch.StartNew();
- var resp = await httpResp.Content.ReadFromJsonAsync<SyncResponse>(cancellationToken: cancellationToken ?? CancellationToken.None,
- jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
- logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.Content.Headers.ContentLength ?? -1, deserializeSw.Elapsed, sw.Elapsed);
+ SyncResponse? resp;
+ if (UseInternalStreamingSync) {
+ resp = await homeserver.ClientHttpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None);
+ logger?.LogInformation("Got sync response: ~{} bytes, {} elapsed", resp.ToJson(false, true, true).Length, sw.Elapsed);
+ }
+ else {
+ var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
+ if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
+ var receivedTime = sw.Elapsed;
+ var deserializeSw = Stopwatch.StartNew();
+ resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None,
+ jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
+ logger?.LogInformation("Deserialized sync response: {} bytes, {} response time, {} deserialize time, {} total", httpResp.GetContentLength(), receivedTime,
+ deserializeSw.Elapsed, sw.Elapsed);
+ }
+
var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
- if (timeToWait.TotalMilliseconds > 0)
+ if (!noDelay && timeToWait.TotalMilliseconds > 0) {
+ logger?.LogWarning("SyncAsyncInternal: Waiting {delay}", timeToWait);
await Task.Delay(timeToWait);
+ }
+
return resp;
}
catch (TaskCanceledException) {
@@ -103,6 +209,8 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
catch (Exception e) {
Console.WriteLine(e);
logger?.LogError(e, "Failed to sync!\n{}", e.ToString());
+ await Task.WhenAll(ExceptionHandlers.Select(x => x.Invoke(e)).ToList());
+ if (e is MatrixException { ErrorCode: MatrixException.ErrorCodes.M_UNKNOWN_TOKEN }) throw;
}
return null;
@@ -110,10 +218,17 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
public async IAsyncEnumerable<SyncResponse> EnumerateSyncAsync(CancellationToken? cancellationToken = null) {
while (!cancellationToken?.IsCancellationRequested ?? true) {
- var sync = await SyncAsync(cancellationToken);
+ var sw = Stopwatch.StartNew();
+ var sync = await SyncAsync(cancellationToken, noDelay: true);
if (sync is null) continue;
if (!string.IsNullOrWhiteSpace(sync.NextBatch)) Since = sync.NextBatch;
yield return sync;
+
+ var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
+ if (timeToWait.TotalMilliseconds > 0) {
+ logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait);
+ await Task.Delay(timeToWait);
+ }
}
}
@@ -183,7 +298,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
if (syncResponse.Rooms is { Join.Count: > 0 })
foreach (var updatedRoom in syncResponse.Rooms.Join) {
if (updatedRoom.Value.Timeline is null) continue;
- foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events) {
+ foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events ?? []) {
stateEventResponse.RoomId = updatedRoom.Key;
var tasks = TimelineEventHandlers.Select(x => x(stateEventResponse)).ToList();
await Task.WhenAll(tasks);
@@ -210,7 +325,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
/// Event fired when an account data event is received
/// </summary>
public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new();
-
+
+ /// <summary>
+ /// Event fired when an exception is thrown
+ /// </summary>
+ public List<Func<Exception, Task>> ExceptionHandlers { get; } = new();
+
private void Log(string message) {
if (logger is null) Console.WriteLine(message);
else logger.LogInformation(message);
diff --git a/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs b/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs
new file mode 100644
index 0000000..e34b5cf
--- /dev/null
+++ b/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs
@@ -0,0 +1,210 @@
+using System.Diagnostics;
+using System.Timers;
+using ArcaneLibs.Extensions;
+using LibMatrix.Homeservers;
+using LibMatrix.Responses;
+using Microsoft.Extensions.Logging;
+
+namespace LibMatrix.Helpers.SyncProcessors;
+
+public class Msc4222EmulationSyncProcessor(AuthenticatedHomeserverGeneric homeserver, ILogger? logger) {
+ private static bool StateEventsMatch(StateEventResponse a, StateEventResponse b) {
+ return a.Type == b.Type && a.StateKey == b.StateKey;
+ }
+
+ private static bool StateEventIsNewer(StateEventResponse a, StateEventResponse b) {
+ return StateEventsMatch(a, b) && a.OriginServerTs < b.OriginServerTs;
+ }
+
+ public async Task<SyncResponse?> EmulateMsc4222(SyncResponse? resp) {
+ var sw = Stopwatch.StartNew();
+ if (resp is null or { Rooms: null }) return resp;
+
+ if (
+ resp.Rooms.Join?.Any(x => x.Value.StateAfter is { Events.Count: > 0 }) == true
+ || resp.Rooms.Leave?.Any(x => x.Value.StateAfter is { Events.Count: > 0 }) == true
+ ) {
+ logger?.Log(sw.ElapsedMilliseconds > 100 ? LogLevel.Warning : LogLevel.Debug,
+ "Msc4222EmulationSyncProcessor.EmulateMsc4222 determined that no emulation is needed in {elapsed}", sw.Elapsed);
+ return resp;
+ }
+
+ resp = await EmulateMsc4222Internal(resp, sw);
+
+ return SimpleSyncProcessors.FillRoomIds(resp);
+ }
+
+ private async Task<SyncResponse?> EmulateMsc4222Internal(SyncResponse? resp, Stopwatch sw) {
+ var modified = false;
+ List<Task<bool>> tasks = [];
+ if (resp.Rooms is { Join.Count: > 0 }) {
+ tasks.AddRange(resp.Rooms.Join.Select(ProcessJoinedRooms).ToList());
+ }
+
+ if (resp.Rooms is { Leave.Count: > 0 }) {
+ tasks.AddRange(resp.Rooms.Leave.Select(ProcessLeftRooms).ToList());
+ }
+
+ var tasksEnum = tasks.ToAsyncEnumerable();
+ await foreach (var wasModified in tasksEnum) {
+ if (wasModified) {
+ modified = true;
+ }
+ }
+
+ logger?.Log(sw.ElapsedMilliseconds > 100 ? LogLevel.Warning : LogLevel.Debug,
+ "Msc4222EmulationSyncProcessor.EmulateMsc4222 processed {joinCount}/{leaveCount} rooms in {elapsed} (modified: {modified})",
+ resp.Rooms?.Join?.Count ?? 0, resp.Rooms?.Leave?.Count ?? 0, sw.Elapsed, modified);
+
+ if (modified)
+ resp.Msc4222Method = SyncResponse.Msc4222SyncType.Emulated;
+
+ return resp;
+ }
+
+ private async Task<bool> ProcessJoinedRooms(KeyValuePair<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure> roomData) {
+ var (roomId, data) = roomData;
+ var room = homeserver.GetRoom(roomId);
+
+ if (data.StateAfter is { Events.Count: > 0 }) {
+ return false;
+ }
+
+ data.StateAfter = new() { Events = [] };
+
+ data.StateAfter = new() {
+ Events = []
+ };
+
+ var oldState = new List<StateEventResponse>();
+ if (data.State is { Events.Count: > 0 }) {
+ oldState.ReplaceBy(data.State.Events, StateEventIsNewer);
+ }
+
+ if (data.Timeline is { Limited: true }) {
+ if (data.Timeline.Events != null)
+ oldState.ReplaceBy(data.Timeline.Events, StateEventIsNewer);
+
+ try {
+ var timeline = await homeserver.GetRoom(roomId).GetMessagesAsync(limit: 250);
+ if (timeline is { State.Count: > 0 }) {
+ oldState.ReplaceBy(timeline.State, StateEventIsNewer);
+ }
+
+ if (timeline is { Chunk.Count: > 0 }) {
+ oldState.ReplaceBy(timeline.Chunk.Where(x => x.StateKey != null), StateEventIsNewer);
+ }
+ }
+ catch (Exception e) {
+ logger?.LogWarning("Msc4222Emulation: Failed to get timeline for room {roomId}, state may be incomplete!\n{exception}", roomId, e);
+ }
+ }
+
+ oldState = oldState.DistinctBy(x => (x.Type, x.StateKey)).ToList();
+
+ // Different order: we need oldState here to reduce the set
+ try {
+ data.StateAfter.Events = (await room.GetFullStateAsListAsync())
+ // .Where(x=> oldState.Any(y => StateEventsMatch(x, y)))
+ // .Join(oldState, x => (x.Type, x.StateKey), y => (y.Type, y.StateKey), (x, y) => x)
+ .IntersectBy(oldState.Select(s => (s.Type, s.StateKey)), s => (s.Type, s.StateKey))
+ .ToList();
+
+ data.State = null;
+ return true;
+ }
+ catch (Exception e) {
+ logger?.LogWarning("Msc4222Emulation: Failed to get full state for room {roomId}, state may be incomplete!\n{exception}", roomId, e);
+ }
+
+ var tasks = oldState
+ .Select(async oldEvt => {
+ try {
+ return await room.GetStateEventAsync(oldEvt.Type, oldEvt.StateKey!);
+ }
+ catch (Exception e) {
+ logger?.LogWarning("Msc4222Emulation: Failed to get state event {type}/{stateKey} for room {roomId}, state may be incomplete!\n{exception}",
+ oldEvt.Type, oldEvt.StateKey, roomId, e);
+ return oldEvt;
+ }
+ });
+
+ var tasksEnum = tasks.ToAsyncEnumerable();
+ await foreach (var evt in tasksEnum) {
+ data.StateAfter.Events.Add(evt);
+ }
+
+ data.State = null;
+
+ return true;
+ }
+
+ private async Task<bool> ProcessLeftRooms(KeyValuePair<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure> roomData) {
+ var (roomId, data) = roomData;
+ var room = homeserver.GetRoom(roomId);
+
+ if (data.StateAfter is { Events.Count: > 0 }) {
+ return false;
+ }
+
+ data.StateAfter = new() {
+ Events = []
+ };
+
+ try {
+ data.StateAfter.Events = await room.GetFullStateAsListAsync();
+ data.State = null;
+ return true;
+ }
+ catch (Exception e) {
+ logger?.LogWarning("Msc4222Emulation: Failed to get full state for room {roomId}, state may be incomplete!\n{exception}", roomId, e);
+ }
+
+ var oldState = new List<StateEventResponse>();
+ if (data.State is { Events.Count: > 0 }) {
+ oldState.ReplaceBy(data.State.Events, StateEventIsNewer);
+ }
+
+ if (data.Timeline is { Limited: true }) {
+ if (data.Timeline.Events != null)
+ oldState.ReplaceBy(data.Timeline.Events, StateEventIsNewer);
+
+ try {
+ var timeline = await homeserver.GetRoom(roomId).GetMessagesAsync(limit: 250);
+ if (timeline is { State.Count: > 0 }) {
+ oldState.ReplaceBy(timeline.State, StateEventIsNewer);
+ }
+
+ if (timeline is { Chunk.Count: > 0 }) {
+ oldState.ReplaceBy(timeline.Chunk.Where(x => x.StateKey != null), StateEventIsNewer);
+ }
+ }
+ catch (Exception e) {
+ logger?.LogWarning("Msc4222Emulation: Failed to get timeline for room {roomId}, state may be incomplete!\n{exception}", roomId, e);
+ }
+ }
+
+ oldState = oldState.DistinctBy(x => (x.Type, x.StateKey)).ToList();
+
+ var tasks = oldState
+ .Select(async oldEvt => {
+ try {
+ return await room.GetStateEventAsync(oldEvt.Type, oldEvt.StateKey!);
+ }
+ catch (Exception e) {
+ logger?.LogWarning("Msc4222Emulation: Failed to get state event {type}/{stateKey} for room {roomId}, state may be incomplete!\n{exception}",
+ oldEvt.Type, oldEvt.StateKey, roomId, e);
+ return oldEvt;
+ }
+ });
+
+ var tasksEnum = tasks.ToAsyncEnumerable();
+ await foreach (var evt in tasksEnum) {
+ data.StateAfter.Events.Add(evt);
+ }
+
+ data.State = null;
+
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs b/LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs
new file mode 100644
index 0000000..5981cb5
--- /dev/null
+++ b/LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs
@@ -0,0 +1,47 @@
+using System.Diagnostics;
+using LibMatrix.Responses;
+
+namespace LibMatrix.Helpers.SyncProcessors;
+
+public class SimpleSyncProcessors {
+ public static SyncResponse? FillRoomIds(SyncResponse? resp) {
+ var sw = Stopwatch.StartNew();
+ if (resp is not { Rooms: not null }) return resp;
+ if (resp.Rooms.Join is { Count: > 0 })
+ Parallel.ForEach(resp.Rooms.Join, (roomEntry) => {
+ var (id, data) = roomEntry;
+ if (data.AccountData is { Events.Count: > 0 })
+ Parallel.ForEach(data.AccountData.Events, evt => evt.RoomId = id);
+ if (data.Ephemeral is { Events.Count: > 0 })
+ Parallel.ForEach(data.Ephemeral.Events, evt => evt.RoomId = id);
+ if (data.Timeline is { Events.Count: > 0 })
+ Parallel.ForEach(data.Timeline.Events, evt => evt.RoomId = id);
+ if (data.State is { Events.Count: > 0 })
+ Parallel.ForEach(data.State.Events, evt => evt.RoomId = id);
+ if (data.StateAfter is { Events.Count: > 0 })
+ Parallel.ForEach(data.StateAfter.Events, evt => evt.RoomId = id);
+ });
+ if (resp.Rooms.Leave is { Count: > 0 })
+ Parallel.ForEach(resp.Rooms.Leave, (roomEntry) => {
+ var (id, data) = roomEntry;
+ if (data.AccountData is { Events.Count: > 0 })
+ Parallel.ForEach(data.AccountData.Events, evt => evt.RoomId = id);
+ if (data.Timeline is { Events.Count: > 0 })
+ Parallel.ForEach(data.Timeline.Events, evt => evt.RoomId = id);
+ if (data.State is { Events.Count: > 0 })
+ Parallel.ForEach(data.State.Events, evt => evt.RoomId = id);
+ if (data.StateAfter is { Events.Count: > 0 })
+ Parallel.ForEach(data.StateAfter.Events, evt => evt.RoomId = id);
+ });
+ if (resp.Rooms.Invite is { Count: > 0 })
+ Parallel.ForEach(resp.Rooms.Invite, (roomEntry) => {
+ var (id, data) = roomEntry;
+ if (data.InviteState is { Events.Count: > 0 })
+ Parallel.ForEach(data.InviteState.Events, evt => evt.RoomId = id);
+ });
+
+ Console.WriteLine($"SimpleSyncProcessors.FillRoomIds took {sw.Elapsed}");
+
+ return resp;
+ }
+}
\ No newline at end of file
diff --git a/LibMatrix/Helpers/SyncStateResolver.cs b/LibMatrix/Helpers/SyncStateResolver.cs
index 72d600d..f111c79 100644
--- a/LibMatrix/Helpers/SyncStateResolver.cs
+++ b/LibMatrix/Helpers/SyncStateResolver.cs
@@ -1,12 +1,20 @@
+using System.Collections.Concurrent;
+using System.Collections.Frozen;
+using System.Collections.Immutable;
+using System.Diagnostics;
+using System.Text.Json;
+using System.Threading.Tasks.Dataflow;
+using ArcaneLibs.Extensions;
using LibMatrix.Extensions;
using LibMatrix.Filters;
using LibMatrix.Homeservers;
+using LibMatrix.Interfaces.Services;
using LibMatrix.Responses;
using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
-public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) {
+public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) {
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
public string? SetPresence { get; set; } = "online";
@@ -15,7 +23,20 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
public SyncResponse? MergedState { get; set; }
- private SyncHelper _syncHelper = new(homeserver, logger);
+ private SyncHelper _syncHelper = new(homeserver, logger, storageProvider);
+
+ private async Task<SyncResponse?> LoadSyncResponse(string key) {
+ if (storageProvider is null) ArgumentNullException.ThrowIfNull(storageProvider);
+ var stream = await storageProvider.LoadStreamAsync(key);
+ return JsonSerializer.Deserialize<SyncResponse>(stream!, SyncResponseSerializerContext.Default.SyncResponse);
+ }
+
+ private async Task SaveSyncResponse(string key, SyncResponse value) {
+ ArgumentNullException.ThrowIfNull(storageProvider);
+ var ms = new MemoryStream();
+ await JsonSerializer.SerializeAsync(ms, value, SyncResponseSerializerContext.Default.SyncResponse);
+ await storageProvider.SaveStreamAsync(key, ms);
+ }
public async Task<(SyncResponse next, SyncResponse merged)> ContinueAsync(CancellationToken? cancellationToken = null) {
// copy properties
@@ -24,149 +45,615 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
_syncHelper.SetPresence = SetPresence;
_syncHelper.Filter = Filter;
_syncHelper.FullState = FullState;
- // run sync
- var sync = await _syncHelper.SyncAsync(cancellationToken);
+
+ var sync = await _syncHelper.SyncAsync(cancellationToken, noDelay: false);
if (sync is null) return await ContinueAsync(cancellationToken);
+
if (MergedState is null) MergedState = sync;
- else MergedState = MergeSyncs(MergedState, sync);
+ else MergedState = await MergeSyncs(MergedState, sync);
Since = sync.NextBatch;
+
return (sync, MergedState);
}
- private SyncResponse MergeSyncs(SyncResponse oldState, SyncResponse newState) {
- oldState.NextBatch = newState.NextBatch ?? oldState.NextBatch;
+ // private async IAsyncEnumerable<List<SyncResponse>> MergeP() {
+
+ // }
+
+ private async Task<SyncResponse?> OptimiseFrom(string start, Action<int, int>? progressCallback = null) {
+ var a = GetSerializedUnoptimisedResponses(start);
+ SyncResponse merged = null!;
+ int iters = 0;
+ var sw = Stopwatch.StartNew();
+ await foreach (var (key, resp) in a) {
+ if (resp is null) continue;
+ iters++;
+ // if (key == "init") _merged = resp;
+ // else _merged = await MergeSyncs(_merged, resp);
+ // Console.WriteLine($"{key} @ {resp.GetDerivedSyncTime()} -> {resp.NextBatch}");
+ }
- oldState.AccountData ??= new EventList();
- oldState.AccountData.Events ??= new List<StateEventResponse>();
- if (newState.AccountData?.Events is not null)
- oldState.AccountData.Events.MergeStateEventLists(newState.AccountData?.Events ?? new List<StateEventResponse>());
+ Console.WriteLine($"OptimiseFrom {start} finished in {sw.Elapsed.TotalMilliseconds}ms with {iters} iterations");
- oldState.Presence ??= new SyncResponse.PresenceDataStructure();
- if (newState.Presence?.Events is not null)
- oldState.Presence.Events.MergeStateEventLists(newState.Presence?.Events ?? new List<StateEventResponse>());
+ return merged;
+ }
- oldState.DeviceOneTimeKeysCount ??= new Dictionary<string, int>();
- if (newState.DeviceOneTimeKeysCount is not null)
- foreach (var (key, value) in newState.DeviceOneTimeKeysCount)
- oldState.DeviceOneTimeKeysCount[key] = value;
+ private async Task<List<string>> GetSerializedUnoptimisedKeysParallel(string start = "init") {
+ Dictionary<string, string> pairs = [];
+ var unoptimisedKeys = (await storageProvider.GetAllKeysAsync()).Where(static x => !x.Contains('/')).ToFrozenSet();
+ await Parallel.ForEachAsync(unoptimisedKeys, async (key, _) => {
+ var data = await storageProvider.LoadObjectAsync<SyncResponse>(key, SyncResponseSerializerContext.Default.SyncResponse);
+ if (data is null) return;
+ lock (pairs)
+ pairs.Add(key, data.NextBatch);
+ });
+
+ var serializedKeys = new List<string>();
+ var currentKey = start;
+ while (pairs.TryGetValue(currentKey, out var nextKey)) {
+ serializedKeys.Add(currentKey);
+ currentKey = nextKey;
+ }
- oldState.Rooms ??= new SyncResponse.RoomsDataStructure();
- if (newState.Rooms is not null)
- oldState.Rooms = MergeRoomsDataStructure(oldState.Rooms, newState.Rooms);
+ return serializedKeys;
+ }
- oldState.ToDevice ??= new EventList();
- oldState.ToDevice.Events ??= new List<StateEventResponse>();
- if (newState.ToDevice?.Events is not null)
- oldState.ToDevice.Events.MergeStateEventLists(newState.ToDevice?.Events ?? new List<StateEventResponse>());
+ private async Task<SyncResponse> MergeRecursive(string[] keys, int depth = 0) {
+ if (keys.Length > 10) {
+ var newKeys = keys.Chunk((keys.Length / 2) + 1).ToArray();
+ var (left, right) = (MergeRecursive(newKeys[0], depth + 1), MergeRecursive(newKeys[1], depth + 1));
+ await Task.WhenAll(left, right);
+ return await MergeSyncs(await left, await right);
+ }
- oldState.DeviceLists ??= new SyncResponse.DeviceListsDataStructure();
- if (newState.DeviceLists?.Changed is not null)
- foreach (var s in oldState.DeviceLists.Changed!)
- oldState.DeviceLists.Changed.Add(s);
- if (newState.DeviceLists?.Left is not null)
- foreach (var s in oldState.DeviceLists.Left!)
- oldState.DeviceLists.Left.Add(s);
+ // Console.WriteLine("Hit max depth: " + depth);
+ SyncResponse merged = await LoadSyncResponse(keys[0]);
+ foreach (var key in keys[1..]) {
+ merged = await MergeSyncs(merged, await LoadSyncResponse(key));
+ }
- return oldState;
+ return merged;
}
-#region Merge rooms
+ public async Task OptimiseStore(Action<int, int>? progressCallback = null) {
+ if (storageProvider is null) return;
+ if (!await storageProvider.ObjectExistsAsync("init")) return;
+ //
+ // {
+ // var a = GetSerializedUnoptimisedResponses();
+ // SyncResponse _merged = null!;
+ // await foreach (var (key, resp) in a) {
+ // if (resp is null) continue;
+ // // if (key == "init") _merged = resp;
+ // // else _merged = await MergeSyncs(_merged, resp);
+ // // Console.WriteLine($"{key} @ {resp.GetDerivedSyncTime()} -> {resp.NextBatch}");
+ // }
+ // Environment.Exit(0);
+ // }
+
+ {
+ // List<string> serialisedKeys = new(4000000);
+ // await foreach (var res in GetSerializedUnoptimisedResponses()) {
+ // if (res.resp is null) continue;
+ // serialisedKeys.Add(res.key);
+ // if (serialisedKeys.Count % 1000 == 0) _ = Console.Out.WriteAsync($"{serialisedKeys.Count}\r");
+ // }
+
+ List<string> serialisedKeys = await GetSerializedUnoptimisedKeysParallel();
+
+ await MergeRecursive(serialisedKeys.ToArray());
+
+ // var chunkSize = serialisedKeys.Count / Environment.ProcessorCount;
+ // var chunks = serialisedKeys.Chunk(chunkSize+1).Select(x => (x.First(), x.Length)).ToList();
+ // Console.WriteLine($"Got {chunks.Count} chunks:");
+ // foreach (var chunk in chunks) {
+ // Console.WriteLine($"Chunk {chunk.Item1} with length {chunk.Length}");
+ // }
+ //
+ // var mergeTasks = chunks.Select(async chunk => {
+ // var (startKey, length) = chunk;
+ // string currentKey = startKey;
+ // SyncResponse merged = await storageProvider.LoadObjectAsync<SyncResponse>(currentKey, SyncResponseSerializerContext.Default.SyncResponse);
+ // for (int i = 0; i < length; i++) {
+ // if (i % 1000 == 0) Console.Write($"{i}... \r");
+ // var newData = await storageProvider.LoadObjectAsync<SyncResponse>(currentKey, SyncResponseSerializerContext.Default.SyncResponse);
+ // merged = await MergeSyncs(merged, newData);
+ // currentKey = merged.NextBatch;
+ // }
+ //
+ // return merged;
+ // }).ToList();
+ //
+ // var mergedResults = await Task.WhenAll(mergeTasks);
+ // SyncResponse _merged = mergedResults[0];
+ // foreach (var key in mergedResults[1..]) {
+ // _merged = await MergeSyncs(_merged, key);
+ // }
+ }
+
+ Environment.Exit(0);
+
+ return;
+
+ var totalSw = Stopwatch.StartNew();
+ Console.Write("Optimising sync store...");
+ var initLoadTask = LoadSyncResponse("init");
+ var keys = (await storageProvider.GetAllKeysAsync()).Where(static x => !x.StartsWith("old/")).ToFrozenSet();
+ var count = keys.Count - 1;
+ int total = count;
+ Console.WriteLine($"Found {count} entries to optimise in {totalSw.Elapsed}.");
+
+ var merged = await initLoadTask;
+ if (merged is null) return;
+ if (!keys.Contains(merged.NextBatch)) {
+ Console.WriteLine("Next response after initial sync is not present, not checkpointing!");
+ return;
+ }
+
+ // if (keys.Count > 100_000) {
+ // // batch data by core count
+ //
+ // return;
+ // }
+
+ // We back up old entries
+ var oldPath = $"old/{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
+ await storageProvider.MoveObjectAsync("init", $"{oldPath}/init");
+
+ var moveTasks = new List<Task>();
+
+ Dictionary<string, Dictionary<string, TimeSpan>> traces = [];
+ string[] loopTrace = new string[4];
+ while (keys.Contains(merged.NextBatch)) {
+ loopTrace[0] = $"Merging {merged.NextBatch}, {--count} remaining";
+ var sw = Stopwatch.StartNew();
+ var swt = Stopwatch.StartNew();
+ var next = await LoadSyncResponse(merged.NextBatch);
+ loopTrace[1] = $"Load {sw.GetElapsedAndRestart().TotalMilliseconds}ms";
+ if (next is null || merged.NextBatch == next.NextBatch) break;
+
+ // back up old entry
+ moveTasks.Add(storageProvider.MoveObjectAsync(merged.NextBatch, $"{oldPath}/{merged.NextBatch}"));
+
+ if (moveTasks.Count >= 250)
+ moveTasks.RemoveAll(t => t.IsCompleted);
+
+ if (moveTasks.Count >= 500) {
+ Console.Write("Reached 500 moveTasks... ");
+ moveTasks.RemoveAll(t => t.IsCompleted);
+ Console.WriteLine($"{moveTasks.Count} remaining");
+ }
+
+ var trace = new Dictionary<string, TimeSpan>();
+ traces[merged.NextBatch] = trace;
+ merged = await MergeSyncs(merged, next, trace);
+ loopTrace[2] = $"Merge {sw.GetElapsedAndRestart().TotalMilliseconds}ms";
+ loopTrace[3] = $"Total {swt.Elapsed.TotalMilliseconds}ms";
+
+ if (swt.ElapsedMilliseconds >= 25)
+ Console.WriteLine(string.Join("... ", loopTrace));
+
+ if (count % 50 == 0)
+ progressCallback?.Invoke(count, total);
+#if WRITE_TRACE
+ var traceString = string.Join("\n", traces.Select(x => $"{x.Key}\t{x.Value.ToJson(indent: false, ignoreNull: true)}"));
+ var ms = new MemoryStream(Encoding.UTF8.GetBytes(traceString));
+ var traceSaveTask = storageProvider.SaveStreamAsync($"traces/{oldPath}", ms);
+ var slowtraceString = string.Join("\n",
+ traces
+ .Where(x=>x.Value.Max(y=>y.Value.TotalMilliseconds) >= 100)
+ .OrderBy(x=>x.Value.Max(y=>y.Value))
+ .Select(x => $"{x.Key}\t{x.Value.Where(y => y.Value.TotalMilliseconds >= 100).ToDictionary().ToJson(indent: false, ignoreNull: true)}"));
+ var slowms = new MemoryStream(Encoding.UTF8.GetBytes(slowtraceString));
+ var slowTraceSaveTask = storageProvider.SaveStreamAsync($"traces/{oldPath}-slow", slowms);
+ var slow1straceString = string.Join("\n",
+ traces
+ .Where(x=>x.Value.Max(y=>y.Value.TotalMilliseconds) >= 1000)
+ .OrderBy(x=>x.Value.Max(y=>y.Value))
+ .Select(x => $"{x.Key}\t{x.Value.Where(y => y.Value.TotalMilliseconds >= 1000).ToDictionary().ToJson(indent: false, ignoreNull: true)}"));
+ var slow1sms = new MemoryStream(Encoding.UTF8.GetBytes(slow1straceString));
+ var slow1sTraceSaveTask = storageProvider.SaveStreamAsync($"traces/{oldPath}-slow-1s", slow1sms);
+
+ await Task.WhenAll(traceSaveTask, slowTraceSaveTask, slow1sTraceSaveTask);
+#endif
+ }
- private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure oldState, SyncResponse.RoomsDataStructure newState) {
- oldState.Join ??= new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>();
- foreach (var (key, value) in newState.Join ?? new Dictionary<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure>())
- if (!oldState.Join.ContainsKey(key)) oldState.Join[key] = value;
- else oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value);
+ await SaveSyncResponse("init", merged);
+ await Task.WhenAll(moveTasks);
- oldState.Invite ??= new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>();
- foreach (var (key, value) in newState.Invite ?? new Dictionary<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>())
- if (!oldState.Invite.ContainsKey(key)) oldState.Invite[key] = value;
- else oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value);
+ Console.WriteLine($"Optimised store in {totalSw.Elapsed.TotalMilliseconds}ms");
+ }
+
+ /// <summary>
+ /// Remove all but initial sync and last checkpoint
+ /// </summary>
+ public async Task RemoveOldSnapshots() {
+ if (storageProvider is null) return;
+ var sw = Stopwatch.StartNew();
+
+ var map = await GetCheckpointMap();
+ if (map is null) return;
+ if (map.Count < 3) return;
+
+ var toRemove = map.Keys.Skip(1).Take(map.Count - 2).ToList();
+ Console.Write("Cleaning up old snapshots: ");
+ foreach (var key in toRemove) {
+ var path = $"old/{key}/init";
+ if (await storageProvider.ObjectExistsAsync(path)) {
+ Console.Write($"{key}... ");
+ await storageProvider.DeleteObjectAsync(path);
+ }
+ }
+
+ Console.WriteLine("Done!");
+ Console.WriteLine($"Removed {toRemove.Count} old snapshots in {sw.Elapsed.TotalMilliseconds}ms");
+ }
+
+ public async Task UnrollOptimisedStore() {
+ if (storageProvider is null) return;
+ Console.WriteLine("WARNING: Unrolling sync store!");
+ }
+
+ public async Task SquashOptimisedStore(int targetCountPerCheckpoint) {
+ Console.Write($"Balancing optimised store to {targetCountPerCheckpoint} per checkpoint...");
+ var checkpoints = await GetCheckpointMap();
+ if (checkpoints is null) return;
+
+ Console.WriteLine(
+ $" Stats: {checkpoints.Count} checkpoints with [{checkpoints.Min(x => x.Value.Count)} < ~{checkpoints.Average(x => x.Value.Count)} < {checkpoints.Max(x => x.Value.Count)}] entries");
+ Console.WriteLine($"Found {checkpoints?.Count ?? 0} checkpoints.");
+ }
+
+ public async Task dev() {
+ int i = 0;
+ var sw = Stopwatch.StartNew();
+ var hist = GetSerialisedHistory();
+ await foreach (var (key, resp) in hist) {
+ if (resp is null) continue;
+ // Console.WriteLine($"[{++i}] {key} -> {resp.NextBatch} ({resp.GetDerivedSyncTime()})");
+ i++;
+ }
+
+ Console.WriteLine($"Iterated {i} syncResponses in {sw.Elapsed}");
+ Environment.Exit(0);
+ }
+
+ private async IAsyncEnumerable<(string key, SyncResponse? resp)> GetSerialisedHistory() {
+ if (storageProvider is null) yield break;
+ var map = await GetCheckpointMap();
+ var currentRange = map.First();
+ var nextKey = $"old/{map.First().Key}/init";
+ var next = storageProvider.LoadObjectAsync<SyncResponse>(nextKey);
+ while (true) {
+ var data = await next;
+ if (data is null) break;
+ yield return (nextKey, data);
+ if (currentRange.Value.Contains(data.NextBatch)) {
+ nextKey = $"old/{currentRange.Key}/{data.NextBatch}";
+ }
+ else if (map.Any(x => x.Value.Contains(data.NextBatch))) {
+ currentRange = map.First(x => x.Value.Contains(data.NextBatch));
+ nextKey = $"old/{currentRange.Key}/{data.NextBatch}";
+ }
+ else if (await storageProvider.ObjectExistsAsync(data.NextBatch)) {
+ nextKey = data.NextBatch;
+ }
+ else break;
+
+ next = storageProvider.LoadObjectAsync<SyncResponse>(nextKey);
+ }
+ }
+
+ private async IAsyncEnumerable<(string key, SyncResponse? resp)> GetSerializedUnoptimisedResponses(string since = "init") {
+ if (storageProvider is null) yield break;
+ var nextKey = since;
+ var next = storageProvider.LoadObjectAsync<SyncResponse>(nextKey);
+ while (true) {
+ var data = await next;
+
+ if (data is null) break;
+ yield return (nextKey, data);
+ if (await storageProvider.ObjectExistsAsync(data.NextBatch)) {
+ nextKey = data.NextBatch;
+ }
+ else break;
+
+ next = storageProvider.LoadObjectAsync<SyncResponse>(nextKey);
+ }
+ }
+
+ public async Task<SyncResponse?> GetMergedUpTo(DateTime time) {
+ if (storageProvider is null) return null;
+ var unixTime = new DateTimeOffset(time.ToUniversalTime()).ToUnixTimeMilliseconds();
+ var map = await GetCheckpointMap();
+ if (map is null) return new();
+ var stream = GetSerialisedHistory().GetAsyncEnumerator();
+ SyncResponse? merged = await stream.MoveNextAsync() ? stream.Current.resp : null;
+
+ if (merged.GetDerivedSyncTime() > unixTime) {
+ Console.WriteLine("Initial sync is already past the target time!");
+ Console.WriteLine($"CURRENT: {merged.GetDerivedSyncTime()} (UTC: {DateTimeOffset.FromUnixTimeMilliseconds(merged.GetDerivedSyncTime())})");
+ Console.WriteLine($" TARGET: {unixTime} ({time.Kind}: {time}, UTC: {time.ToUniversalTime()})");
+ return null;
+ }
- oldState.Leave ??= new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>();
- foreach (var (key, value) in newState.Leave ?? new Dictionary<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure>()) {
- if (!oldState.Leave.ContainsKey(key)) oldState.Leave[key] = value;
- else oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value);
- if (oldState.Invite.ContainsKey(key)) oldState.Invite.Remove(key);
- if (oldState.Join.ContainsKey(key)) oldState.Join.Remove(key);
+ while (await stream.MoveNextAsync()) {
+ var (key, resp) = stream.Current;
+ if (resp is null) continue;
+ if (resp.GetDerivedSyncTime() > unixTime) break;
+ merged = await MergeSyncs(merged, resp);
}
+ return merged;
+ }
+
+ private async Task<ImmutableSortedDictionary<ulong, FrozenSet<string>>> GetCheckpointMap() {
+ if (storageProvider is null) return null;
+ var keys = (await storageProvider.GetAllKeysAsync()).ToFrozenSet();
+ var map = new Dictionary<ulong, List<string>>();
+ foreach (var key in keys) {
+ if (!key.StartsWith("old/")) continue;
+ var parts = key.Split('/');
+ if (parts.Length < 3) continue;
+ if (!ulong.TryParse(parts[1], out var checkpoint)) continue;
+ if (!map.ContainsKey(checkpoint)) map[checkpoint] = new();
+ map[checkpoint].Add(parts[2]);
+ }
+
+ return map.OrderBy(static x => x.Key).ToImmutableSortedDictionary(static x => x.Key, x => x.Value.ToFrozenSet());
+ }
+
+ private async Task<SyncResponse> MergeSyncs(SyncResponse oldSync, SyncResponse newSync, Dictionary<string, TimeSpan>? trace = null) {
+ // var sw = Stopwatch.StartNew();
+ oldSync.NextBatch = newSync.NextBatch;
+
+ void Trace(string key, TimeSpan span) {
+ if (trace is not null) {
+ lock (trace)
+ trace.Add(key, span);
+ }
+ }
+
+ var accountDataTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ oldSync.AccountData = MergeEventList(oldSync.AccountData, newSync.AccountData);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: AccountData took {sw.ElapsedMilliseconds}ms");
+ Trace("AccountData", sw.GetElapsedAndRestart());
+ });
+
+ var presenceTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ oldSync.Presence = MergeEventListBy(oldSync.Presence, newSync.Presence,
+ static (oldState, newState) => oldState.Sender == newState.Sender && oldState.Type == newState.Type);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: Presence took {sw.ElapsedMilliseconds}ms");
+ Trace("Presence", sw.GetElapsedAndRestart());
+ });
+
+ {
+ var sw = Stopwatch.StartNew();
+ // TODO: can this be cleaned up?
+ oldSync.DeviceOneTimeKeysCount ??= new();
+ if (newSync.DeviceOneTimeKeysCount is not null)
+ foreach (var (key, value) in newSync.DeviceOneTimeKeysCount)
+ oldSync.DeviceOneTimeKeysCount[key] = value;
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: DeviceOneTimeKeysCount took {sw.ElapsedMilliseconds}ms");
+ Trace("DeviceOneTimeKeysCount", sw.GetElapsedAndRestart());
+ }
+
+ var roomsTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ if (newSync.Rooms is not null)
+ oldSync.Rooms = MergeRoomsDataStructure(oldSync.Rooms, newSync.Rooms, Trace);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: Rooms took {sw.ElapsedMilliseconds}ms");
+ Trace("Rooms", sw.GetElapsedAndRestart());
+ });
+
+ var toDeviceTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ // oldSync.ToDevice = MergeEventList(oldSync.ToDevice, newSync.ToDevice);
+ oldSync.ToDevice = AppendEventList(oldSync.ToDevice, newSync.ToDevice);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: ToDevice took {sw.ElapsedMilliseconds}ms");
+ Trace("ToDevice", sw.GetElapsedAndRestart());
+ });
+
+ var deviceListsTask = Task.Run(() => {
+ var sw = Stopwatch.StartNew();
+ oldSync.DeviceLists ??= new SyncResponse.DeviceListsDataStructure();
+ oldSync.DeviceLists.Changed ??= [];
+ oldSync.DeviceLists.Left ??= [];
+ if (newSync.DeviceLists?.Changed is not null)
+ foreach (var s in newSync.DeviceLists.Changed!) {
+ oldSync.DeviceLists.Left.Remove(s);
+ oldSync.DeviceLists.Changed.Add(s);
+ }
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: DeviceLists.Changed took {sw.ElapsedMilliseconds}ms");
+ Trace("DeviceLists.Changed", sw.GetElapsedAndRestart());
+
+ if (newSync.DeviceLists?.Left is not null)
+ foreach (var s in newSync.DeviceLists.Left!) {
+ oldSync.DeviceLists.Changed.Remove(s);
+ oldSync.DeviceLists.Left.Add(s);
+ }
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: DeviceLists.Left took {sw.ElapsedMilliseconds}ms");
+ Trace("DeviceLists.Left", sw.GetElapsedAndRestart());
+ });
+
+ await Task.WhenAll(accountDataTask, presenceTask, roomsTask, toDeviceTask, deviceListsTask);
+
+ return oldSync;
+ }
+
+#region Merge rooms
+
+ private SyncResponse.RoomsDataStructure MergeRoomsDataStructure(SyncResponse.RoomsDataStructure? oldState, SyncResponse.RoomsDataStructure newState,
+ Action<string, TimeSpan> trace) {
+ var sw = Stopwatch.StartNew();
+ if (oldState is null) return newState;
+
+ if (newState.Join is { Count: > 0 })
+ if (oldState.Join is null)
+ oldState.Join = newState.Join;
+ else
+ foreach (var (key, value) in newState.Join)
+ if (!oldState.Join.TryAdd(key, value))
+ oldState.Join[key] = MergeJoinedRoomDataStructure(oldState.Join[key], value, key, trace);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeRoomsDataStructure.Join took {sw.ElapsedMilliseconds}ms");
+ trace("MergeRoomsDataStructure.Join", sw.GetElapsedAndRestart());
+
+ if (newState.Invite is { Count: > 0 })
+ if (oldState.Invite is null)
+ oldState.Invite = newState.Invite;
+ else
+ foreach (var (key, value) in newState.Invite)
+ if (!oldState.Invite.TryAdd(key, value))
+ oldState.Invite[key] = MergeInvitedRoomDataStructure(oldState.Invite[key], value, key, trace);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeRoomsDataStructure.Invite took {sw.ElapsedMilliseconds}ms");
+ trace("MergeRoomsDataStructure.Invite", sw.GetElapsedAndRestart());
+
+ if (newState.Leave is { Count: > 0 })
+ if (oldState.Leave is null)
+ oldState.Leave = newState.Leave;
+ else
+ foreach (var (key, value) in newState.Leave) {
+ if (!oldState.Leave.TryAdd(key, value))
+ oldState.Leave[key] = MergeLeftRoomDataStructure(oldState.Leave[key], value, key, trace);
+ if (oldState.Invite?.ContainsKey(key) ?? false) oldState.Invite.Remove(key);
+ if (oldState.Join?.ContainsKey(key) ?? false) oldState.Join.Remove(key);
+ }
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeRoomsDataStructure.Leave took {sw.ElapsedMilliseconds}ms");
+ trace("MergeRoomsDataStructure.Leave", sw.GetElapsedAndRestart());
+
return oldState;
}
- private SyncResponse.RoomsDataStructure.LeftRoomDataStructure MergeLeftRoomDataStructure(SyncResponse.RoomsDataStructure.LeftRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData) {
- oldData.AccountData ??= new EventList();
- oldData.AccountData.Events ??= new List<StateEventResponse>();
- oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure();
- oldData.Timeline.Events ??= new List<StateEventResponse>();
- oldData.State ??= new EventList();
- oldData.State.Events ??= new List<StateEventResponse>();
+ private static SyncResponse.RoomsDataStructure.LeftRoomDataStructure MergeLeftRoomDataStructure(SyncResponse.RoomsDataStructure.LeftRoomDataStructure oldData,
+ SyncResponse.RoomsDataStructure.LeftRoomDataStructure newData, string roomId, Action<string, TimeSpan> trace) {
+ var sw = Stopwatch.StartNew();
- if (newData.AccountData?.Events is not null)
- oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? new List<StateEventResponse>());
+ oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData);
+ trace($"LeftRoomDataStructure.AccountData/{roomId}", sw.GetElapsedAndRestart());
- if (newData.Timeline?.Events is not null)
- oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? new List<StateEventResponse>());
+ oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure
+ ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure");
oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited;
oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch;
+ trace($"LeftRoomDataStructure.Timeline/{roomId}", sw.GetElapsedAndRestart());
- if (newData.State?.Events is not null)
- oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? new List<StateEventResponse>());
+ oldData.State = MergeEventList(oldData.State, newData.State);
+ trace($"LeftRoomDataStructure.State/{roomId}", sw.GetElapsedAndRestart());
return oldData;
}
- private SyncResponse.RoomsDataStructure.InvitedRoomDataStructure MergeInvitedRoomDataStructure(SyncResponse.RoomsDataStructure.InvitedRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData) {
- oldData.InviteState ??= new EventList();
- oldData.InviteState.Events ??= new List<StateEventResponse>();
- if (newData.InviteState?.Events is not null)
- oldData.InviteState.Events.MergeStateEventLists(newData.InviteState?.Events ?? new List<StateEventResponse>());
+ private static SyncResponse.RoomsDataStructure.InvitedRoomDataStructure MergeInvitedRoomDataStructure(SyncResponse.RoomsDataStructure.InvitedRoomDataStructure oldData,
+ SyncResponse.RoomsDataStructure.InvitedRoomDataStructure newData, string roomId, Action<string, TimeSpan> trace) {
+ var sw = Stopwatch.StartNew();
+ oldData.InviteState = MergeEventList(oldData.InviteState, newData.InviteState);
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeInvitedRoomDataStructure.InviteState took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"InvitedRoomDataStructure.InviteState/{roomId}", sw.GetElapsedAndRestart());
return oldData;
}
- private SyncResponse.RoomsDataStructure.JoinedRoomDataStructure MergeJoinedRoomDataStructure(SyncResponse.RoomsDataStructure.JoinedRoomDataStructure oldData,
- SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData) {
- oldData.AccountData ??= new EventList();
- oldData.AccountData.Events ??= new List<StateEventResponse>();
- oldData.Timeline ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure();
- oldData.Timeline.Events ??= new List<StateEventResponse>();
- oldData.State ??= new EventList();
- oldData.State.Events ??= new List<StateEventResponse>();
- oldData.Ephemeral ??= new EventList();
- oldData.Ephemeral.Events ??= new List<StateEventResponse>();
+ private static SyncResponse.RoomsDataStructure.JoinedRoomDataStructure MergeJoinedRoomDataStructure(SyncResponse.RoomsDataStructure.JoinedRoomDataStructure oldData,
+ SyncResponse.RoomsDataStructure.JoinedRoomDataStructure newData, string roomId, Action<string, TimeSpan> trace) {
+ var sw = Stopwatch.StartNew();
+
+ oldData.AccountData = MergeEventList(oldData.AccountData, newData.AccountData);
- if (newData.AccountData?.Events is not null)
- oldData.AccountData.Events.MergeStateEventLists(newData.AccountData?.Events ?? new List<StateEventResponse>());
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.AccountData took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.AccountData/{roomId}", sw.GetElapsedAndRestart());
- if (newData.Timeline?.Events is not null)
- oldData.Timeline.Events.MergeStateEventLists(newData.Timeline?.Events ?? new List<StateEventResponse>());
+ oldData.Timeline = AppendEventList(oldData.Timeline, newData.Timeline) as SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.TimelineDataStructure
+ ?? throw new InvalidOperationException("Merged room timeline was not TimelineDataStructure");
oldData.Timeline.Limited = newData.Timeline?.Limited ?? oldData.Timeline.Limited;
oldData.Timeline.PrevBatch = newData.Timeline?.PrevBatch ?? oldData.Timeline.PrevBatch;
- if (newData.State?.Events is not null)
- oldData.State.Events.MergeStateEventLists(newData.State?.Events ?? new List<StateEventResponse>());
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.Timeline took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.Timeline/{roomId}", sw.GetElapsedAndRestart());
- if (newData.Ephemeral?.Events is not null)
- oldData.Ephemeral.Events.MergeStateEventLists(newData.Ephemeral?.Events ?? new List<StateEventResponse>());
+ oldData.State = MergeEventList(oldData.State, newData.State);
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.State took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.State/{roomId}", sw.GetElapsedAndRestart());
+
+ oldData.Ephemeral = MergeEventList(oldData.Ephemeral, newData.Ephemeral);
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.Ephemeral took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.Ephemeral/{roomId}", sw.GetElapsedAndRestart());
oldData.UnreadNotifications ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.UnreadNotificationsDataStructure();
oldData.UnreadNotifications.HighlightCount = newData.UnreadNotifications?.HighlightCount ?? oldData.UnreadNotifications.HighlightCount;
oldData.UnreadNotifications.NotificationCount = newData.UnreadNotifications?.NotificationCount ?? oldData.UnreadNotifications.NotificationCount;
- oldData.Summary ??= new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure.SummaryDataStructure {
- Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes,
- JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount,
- InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount
- };
- oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes;
- oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount;
- oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount;
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.UnreadNotifications took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoom$DataStructure.UnreadNotifications/{roomId}", sw.GetElapsedAndRestart());
+
+ if (oldData.Summary is null)
+ oldData.Summary = newData.Summary;
+ else {
+ oldData.Summary.Heroes = newData.Summary?.Heroes ?? oldData.Summary.Heroes;
+ oldData.Summary.JoinedMemberCount = newData.Summary?.JoinedMemberCount ?? oldData.Summary.JoinedMemberCount;
+ oldData.Summary.InvitedMemberCount = newData.Summary?.InvitedMemberCount ?? oldData.Summary.InvitedMemberCount;
+ }
+
+ if (sw.ElapsedMilliseconds > 100) Console.WriteLine($"WARN: MergeJoinedRoomDataStructure.Summary took {sw.ElapsedMilliseconds}ms for {roomId}");
+ trace($"JoinedRoomDataStructure.Summary/{roomId}", sw.GetElapsedAndRestart());
return oldData;
}
#endregion
+
+ private static EventList? MergeEventList(EventList? oldState, EventList? newState) {
+ if (newState is null) return oldState;
+ if (oldState is null) {
+ return newState;
+ }
+
+ if (newState.Events is null) return oldState;
+ if (oldState.Events is null) {
+ oldState.Events = newState.Events;
+ return oldState;
+ }
+
+ // oldState.Events.MergeStateEventLists(newState.Events);
+ oldState = MergeEventListBy(oldState, newState, static (oldEvt, newEvt) => oldEvt.Type == newEvt.Type && oldEvt.StateKey == newEvt.StateKey);
+ return oldState;
+ }
+
+ private static EventList? MergeEventListBy(EventList? oldState, EventList? newState, Func<StateEventResponse, StateEventResponse, bool> comparer) {
+ if (newState is null) return oldState;
+ if (oldState is null) {
+ return newState;
+ }
+
+ if (newState.Events is null) return oldState;
+ if (oldState.Events is null) {
+ oldState.Events = newState.Events;
+ return oldState;
+ }
+
+ oldState.Events.ReplaceBy(newState.Events, comparer);
+ return oldState;
+ }
+
+ private static EventList? AppendEventList(EventList? oldState, EventList? newState) {
+ if (newState is null) return oldState;
+ if (oldState is null) {
+ return newState;
+ }
+
+ if (newState.Events is null) return oldState;
+ if (oldState.Events is null) {
+ oldState.Events = newState.Events;
+ return oldState;
+ }
+
+ oldState.Events.AddRange(newState.Events);
+ return oldState;
+ }
}
\ No newline at end of file
|