From 229d07261e67a48d93103d6bcec84cce370153ec Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 3 May 2025 13:33:48 +0200 Subject: Sync preprocessor support --- LibMatrix/Helpers/SyncHelper.cs | 17 +++++++-- .../Helpers/SyncProcessors/SimpleSyncProcessors.cs | 43 ++++++++++++++++++++++ LibMatrix/LibMatrix.csproj | 8 ++-- 3 files changed, 60 insertions(+), 8 deletions(-) create mode 100644 LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs (limited to 'LibMatrix') diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs index abbf541..e8ca8b7 100644 --- a/LibMatrix/Helpers/SyncHelper.cs +++ b/LibMatrix/Helpers/SyncHelper.cs @@ -5,6 +5,7 @@ using ArcaneLibs.Collections; using System.Text.Json.Nodes; using ArcaneLibs.Extensions; using LibMatrix.Filters; +using LibMatrix.Helpers.SyncProcessors; using LibMatrix.Homeservers; using LibMatrix.Interfaces.Services; using LibMatrix.Responses; @@ -23,6 +24,10 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public string? SetPresence { get; set; } = "online"; public bool UseInternalStreamingSync { get; set; } = true; + public List> SyncPreprocessors { get; } = [ + SimpleSyncProcessors.FillRoomIds + ]; + public string? FilterId { get => _filterId; set { @@ -76,7 +81,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg else if (Filter is not null) _filterId = (await homeserver.UploadFilterAsync(Filter)).FilterId; else _filterId = null; - + _filterIsDirty = false; } @@ -106,6 +111,11 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg var sync = await SyncAsyncInternal(cancellationToken, noDelay); // Ditto here. if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + + foreach (var preprocessor in SyncPreprocessors) { + sync = preprocessor(sync); + } + return sync; } @@ -165,11 +175,10 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg if (sync is null) continue; if (!string.IsNullOrWhiteSpace(sync.NextBatch)) Since = sync.NextBatch; yield return sync; - - + var timeToWait = MinimumDelay.Subtract(sw.Elapsed); if (timeToWait.TotalMilliseconds > 0) { - logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait); + logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait); await Task.Delay(timeToWait); } } diff --git a/LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs b/LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs new file mode 100644 index 0000000..5cf5c36 --- /dev/null +++ b/LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs @@ -0,0 +1,43 @@ +using System.Diagnostics; +using LibMatrix.Responses; + +namespace LibMatrix.Helpers.SyncProcessors; + +public class SimpleSyncProcessors { + public static SyncResponse? FillRoomIds(SyncResponse? resp) { + var sw = Stopwatch.StartNew(); + if (resp is not { Rooms: not null }) return resp; + if (resp.Rooms.Join is { Count: > 0 }) + Parallel.ForEach(resp.Rooms.Join, (roomEntry) => { + var (id, data) = roomEntry; + if (data.AccountData is { Events.Count: > 0 }) + Parallel.ForEach(data.AccountData.Events, evt => evt.RoomId = id); + if (data.Ephemeral is { Events.Count: > 0 }) + Parallel.ForEach(data.Ephemeral.Events, evt => evt.RoomId = id); + if (data.Timeline is { Events.Count: > 0 }) + Parallel.ForEach(data.Timeline.Events, evt => evt.RoomId = id); + if (data.State is { Events.Count: > 0 }) + Parallel.ForEach(data.State.Events, evt => evt.RoomId = id); + }); + if (resp.Rooms.Leave is { Count: > 0 }) + Parallel.ForEach(resp.Rooms.Leave, (roomEntry) => { + var (id, data) = roomEntry; + if (data.AccountData is { Events.Count: > 0 }) + Parallel.ForEach(data.AccountData.Events, evt => evt.RoomId = id); + if (data.Timeline is { Events.Count: > 0 }) + Parallel.ForEach(data.Timeline.Events, evt => evt.RoomId = id); + if (data.State is { Events.Count: > 0 }) + Parallel.ForEach(data.State.Events, evt => evt.RoomId = id); + }); + if (resp.Rooms.Invite is { Count: > 0 }) + Parallel.ForEach(resp.Rooms.Invite, (roomEntry) => { + var (id, data) = roomEntry; + if (data.InviteState is { Events.Count: > 0 }) + Parallel.ForEach(data.InviteState.Events, evt => evt.RoomId = id); + }); + + Console.WriteLine($"SimpleSyncProcessors.FillRoomIds took {sw.Elapsed}"); + + return resp; + } +} \ No newline at end of file diff --git a/LibMatrix/LibMatrix.csproj b/LibMatrix/LibMatrix.csproj index 3d10487..62bb48f 100644 --- a/LibMatrix/LibMatrix.csproj +++ b/LibMatrix/LibMatrix.csproj @@ -12,14 +12,14 @@ - - + + - - + + -- cgit 1.5.1