about summary refs log tree commit diff
path: root/LibMatrix
diff options
context:
space:
mode:
authorRory& <root@rory.gay>2025-05-03 13:33:48 +0200
committerRory& <root@rory.gay>2025-05-03 13:33:48 +0200
commit229d07261e67a48d93103d6bcec84cce370153ec (patch)
treec8334ad4705617c99fc8c98a8721d1abe0d41468 /LibMatrix
parentAdd support for ignoring users, add user/room/event reporting (diff)
downloadLibMatrix-229d07261e67a48d93103d6bcec84cce370153ec.tar.xz
Sync preprocessor support
Diffstat (limited to 'LibMatrix')
-rw-r--r--LibMatrix/Helpers/SyncHelper.cs17
-rw-r--r--LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs43
-rw-r--r--LibMatrix/LibMatrix.csproj8
3 files changed, 60 insertions, 8 deletions
diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs

index abbf541..e8ca8b7 100644 --- a/LibMatrix/Helpers/SyncHelper.cs +++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -5,6 +5,7 @@ using ArcaneLibs.Collections; using System.Text.Json.Nodes; using ArcaneLibs.Extensions; using LibMatrix.Filters; +using LibMatrix.Helpers.SyncProcessors; using LibMatrix.Homeservers; using LibMatrix.Interfaces.Services; using LibMatrix.Responses; @@ -23,6 +24,10 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public string? SetPresence { get; set; } = "online"; public bool UseInternalStreamingSync { get; set; } = true; + public List<Func<SyncResponse?, SyncResponse?>> SyncPreprocessors { get; } = [ + SimpleSyncProcessors.FillRoomIds + ]; + public string? FilterId { get => _filterId; set { @@ -76,7 +81,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg else if (Filter is not null) _filterId = (await homeserver.UploadFilterAsync(Filter)).FilterId; else _filterId = null; - + _filterIsDirty = false; } @@ -106,6 +111,11 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg var sync = await SyncAsyncInternal(cancellationToken, noDelay); // Ditto here. if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + + foreach (var preprocessor in SyncPreprocessors) { + sync = preprocessor(sync); + } + return sync; } @@ -165,11 +175,10 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg if (sync is null) continue; if (!string.IsNullOrWhiteSpace(sync.NextBatch)) Since = sync.NextBatch; yield return sync; - - + var timeToWait = MinimumDelay.Subtract(sw.Elapsed); if (timeToWait.TotalMilliseconds > 0) { - logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait); + logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait); await Task.Delay(timeToWait); } } diff --git a/LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs b/LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs new file mode 100644
index 0000000..5cf5c36 --- /dev/null +++ b/LibMatrix/Helpers/SyncProcessors/SimpleSyncProcessors.cs
@@ -0,0 +1,43 @@ +using System.Diagnostics; +using LibMatrix.Responses; + +namespace LibMatrix.Helpers.SyncProcessors; + +public class SimpleSyncProcessors { + public static SyncResponse? FillRoomIds(SyncResponse? resp) { + var sw = Stopwatch.StartNew(); + if (resp is not { Rooms: not null }) return resp; + if (resp.Rooms.Join is { Count: > 0 }) + Parallel.ForEach(resp.Rooms.Join, (roomEntry) => { + var (id, data) = roomEntry; + if (data.AccountData is { Events.Count: > 0 }) + Parallel.ForEach(data.AccountData.Events, evt => evt.RoomId = id); + if (data.Ephemeral is { Events.Count: > 0 }) + Parallel.ForEach(data.Ephemeral.Events, evt => evt.RoomId = id); + if (data.Timeline is { Events.Count: > 0 }) + Parallel.ForEach(data.Timeline.Events, evt => evt.RoomId = id); + if (data.State is { Events.Count: > 0 }) + Parallel.ForEach(data.State.Events, evt => evt.RoomId = id); + }); + if (resp.Rooms.Leave is { Count: > 0 }) + Parallel.ForEach(resp.Rooms.Leave, (roomEntry) => { + var (id, data) = roomEntry; + if (data.AccountData is { Events.Count: > 0 }) + Parallel.ForEach(data.AccountData.Events, evt => evt.RoomId = id); + if (data.Timeline is { Events.Count: > 0 }) + Parallel.ForEach(data.Timeline.Events, evt => evt.RoomId = id); + if (data.State is { Events.Count: > 0 }) + Parallel.ForEach(data.State.Events, evt => evt.RoomId = id); + }); + if (resp.Rooms.Invite is { Count: > 0 }) + Parallel.ForEach(resp.Rooms.Invite, (roomEntry) => { + var (id, data) = roomEntry; + if (data.InviteState is { Events.Count: > 0 }) + Parallel.ForEach(data.InviteState.Events, evt => evt.RoomId = id); + }); + + Console.WriteLine($"SimpleSyncProcessors.FillRoomIds took {sw.Elapsed}"); + + return resp; + } +} \ No newline at end of file diff --git a/LibMatrix/LibMatrix.csproj b/LibMatrix/LibMatrix.csproj
index 3d10487..62bb48f 100644 --- a/LibMatrix/LibMatrix.csproj +++ b/LibMatrix/LibMatrix.csproj
@@ -12,14 +12,14 @@ </PropertyGroup> <ItemGroup> - <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.1" /> - <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.1" /> + <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.1"/> + <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.1"/> <ProjectReference Include="..\LibMatrix.EventTypes\LibMatrix.EventTypes.csproj"/> </ItemGroup> <ItemGroup> -<!-- <PackageReference Include="ArcaneLibs" Version="1.0.0-preview.20250313-104848" Condition="'$(Configuration)' == 'Release'" />--> -<!-- <ProjectReference Include="..\ArcaneLibs\ArcaneLibs\ArcaneLibs.csproj" Condition="'$(Configuration)' == 'Debug'"/>--> + <!-- <PackageReference Include="ArcaneLibs" Version="1.0.0-preview.20250313-104848" Condition="'$(Configuration)' == 'Release'" />--> + <!-- <ProjectReference Include="..\ArcaneLibs\ArcaneLibs\ArcaneLibs.csproj" Condition="'$(Configuration)' == 'Debug'"/>--> <ProjectReference Include="..\ArcaneLibs\ArcaneLibs\ArcaneLibs.csproj"/> </ItemGroup>