about summary refs log tree commit diff
diff options
context:
space:
mode:
authorRory& <root@rory.gay>2025-05-07 20:58:12 +0200
committerRory& <root@rory.gay>2025-05-07 20:58:12 +0200
commita0fc4d145c77cc14af09cbbe285a0835e842728a (patch)
tree6958d610b44d11f90ba31059eb35ae8447828054
parentSync preprocessor support (diff)
downloadLibMatrix-a0fc4d145c77cc14af09cbbe285a0835e842728a.tar.xz
MSC4222 emulation for left rooms
-rw-r--r--LibMatrix.EventTypes/LibMatrix.EventTypes.csproj2
-rw-r--r--LibMatrix/Helpers/SyncHelper.cs46
-rw-r--r--LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs125
-rw-r--r--LibMatrix/Responses/SyncResponse.cs19
-rw-r--r--Utilities/LibMatrix.DevTestBot/LibMatrix.DevTestBot.csproj2
5 files changed, 188 insertions, 6 deletions
diff --git a/LibMatrix.EventTypes/LibMatrix.EventTypes.csproj b/LibMatrix.EventTypes/LibMatrix.EventTypes.csproj

index 02ce82d..0924aba 100644 --- a/LibMatrix.EventTypes/LibMatrix.EventTypes.csproj +++ b/LibMatrix.EventTypes/LibMatrix.EventTypes.csproj
@@ -7,7 +7,7 @@ </PropertyGroup> <ItemGroup> - <PackageReference Include="ArcaneLibs" Version="1.0.0-preview.20250313-104848" Condition="'$(Configuration)' == 'Release'" /> + <PackageReference Include="ArcaneLibs" Version="1.0.0-preview.20250419-174711" Condition="'$(Configuration)' == 'Release'" /> <ProjectReference Include="..\ArcaneLibs\ArcaneLibs\ArcaneLibs.csproj" Condition="'$(Configuration)' == 'Debug'"/> </ItemGroup> diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index e8ca8b7..862c8ad 100644 --- a/LibMatrix/Helpers/SyncHelper.cs +++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,5 +1,6 @@ using System.Diagnostics; using System.Net.Http.Json; +using System.Reflection; using System.Text.Json; using ArcaneLibs.Collections; using System.Text.Json.Nodes; @@ -14,6 +15,8 @@ using Microsoft.Extensions.Logging; namespace LibMatrix.Helpers; public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) { + private readonly Func<SyncResponse?, Task<SyncResponse?>> _msc4222EmulationSyncProcessor = new Msc4222EmulationSyncProcessor(homeserver).EmulateMsc4222; + private SyncFilter? _filter; private string? _namedFilterName; private bool _filterIsDirty; @@ -24,10 +27,27 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public string? SetPresence { get; set; } = "online"; public bool UseInternalStreamingSync { get; set; } = true; + public bool UseMsc4222StateAfter { + get; + set { + field = value; + if (value) { + AsyncSyncPreprocessors.Add(_msc4222EmulationSyncProcessor); + Console.WriteLine($"Added MSC4222 emulation sync processor"); + } + else { + AsyncSyncPreprocessors.Remove(_msc4222EmulationSyncProcessor); + Console.WriteLine($"Removed MSC4222 emulation sync processor"); + } + } + } = false; + public List<Func<SyncResponse?, SyncResponse?>> SyncPreprocessors { get; } = [ SimpleSyncProcessors.FillRoomIds ]; + public List<Func<SyncResponse?, Task<SyncResponse?>>> AsyncSyncPreprocessors { get; } = []; + public string? FilterId { get => _filterId; set { @@ -96,7 +116,21 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!"); } - if (storageProvider is null) return await SyncAsyncInternal(cancellationToken, noDelay); + if (storageProvider is null) { + var res = await SyncAsyncInternal(cancellationToken, noDelay); + if (res is null) return null; + if (UseMsc4222StateAfter) res.Msc4222Method = SyncResponse.Msc4222SyncType.Server; + + foreach (var preprocessor in SyncPreprocessors) { + res = preprocessor(res); + } + + foreach (var preprocessor in AsyncSyncPreprocessors) { + res = await preprocessor(res); + } + + return res; + } var key = Since ?? "init"; if (await storageProvider.ObjectExistsAsync(key)) { @@ -109,13 +143,20 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg } var sync = await SyncAsyncInternal(cancellationToken, noDelay); + if (sync is null) return null; // Ditto here. - if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + if (sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + + if (UseMsc4222StateAfter) sync.Msc4222Method = SyncResponse.Msc4222SyncType.Server; foreach (var preprocessor in SyncPreprocessors) { sync = preprocessor(sync); } + foreach (var preprocessor in AsyncSyncPreprocessors) { + sync = await preprocessor(sync); + } + return sync; } @@ -126,6 +167,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}"; if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}"; if (_filterId is not null) url += $"&filter={_filterId}"; + if (UseMsc4222StateAfter) url += "&org.matrix.msc4222.use_state_after=true&use_state_after=true"; // We use both unstable and stable names for compatibility // logger?.LogInformation("SyncHelper: Calling: {}", url); diff --git a/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs b/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs new file mode 100644
index 0000000..b18a8e0 --- /dev/null +++ b/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs
@@ -0,0 +1,125 @@ +using System.Diagnostics; +using ArcaneLibs.Extensions; +using LibMatrix.Homeservers; +using LibMatrix.Responses; + +namespace LibMatrix.Helpers.SyncProcessors; + +public class Msc4222EmulationSyncProcessor(AuthenticatedHomeserverGeneric homeserver) { + private static bool StateEventsMatch(StateEventResponse a, StateEventResponse b) { + return a.Type == b.Type && a.StateKey == b.StateKey && a.OriginServerTs < b.OriginServerTs; + } + + public async Task<SyncResponse?> EmulateMsc4222(SyncResponse? resp) { + var sw = Stopwatch.StartNew(); + if (resp is null or { Rooms: null }) return resp; + + if ( + resp.Rooms.Join?.Any(x => x.Value.StateAfter is { Events.Count: > 0 }) == true + || resp.Rooms.Leave?.Any(x => x.Value.StateAfter is { Events.Count: > 0 }) == true + ) { + Console.WriteLine($"Msc4222EmulationSyncProcessor.EmulateMsc4222 determined that no emulation is needed in {sw.Elapsed}"); + return resp; + } + + var modified = false; + List<Task<bool>> tasks = []; + if (resp.Rooms is { Join.Count: > 0 }) { + tasks.AddRange(resp.Rooms.Join.Select(ProcessJoinedRooms)); + } + + if (resp.Rooms is { Leave.Count: > 0 }) { + tasks.AddRange(resp.Rooms.Leave.Select(ProcessLeftRooms)); + } + + var tasksEnum = tasks.ToAsyncEnumerable(); + await foreach (var wasModified in tasksEnum) { + if (wasModified) { + modified = true; + } + } + + Console.WriteLine($"Msc4222EmulationSyncProcessor.EmulateMsc4222 processed {resp.Rooms?.Join?.Count}/{resp.Rooms?.Leave?.Count} rooms in {sw.Elapsed}"); + if (modified) + resp.Msc4222Method = SyncResponse.Msc4222SyncType.Emulated; + + return resp; + } + + private async Task<bool> ProcessJoinedRooms(KeyValuePair<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure> roomData) { + var (roomId, data) = roomData; + + if (data.StateAfter is { Events.Count: > 0 }) { + return false; + } + + data.StateAfter = new() { }; + + return false; + } + + private async Task<bool> ProcessLeftRooms(KeyValuePair<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure> roomData) { + var (roomId, data) = roomData; + var room = homeserver.GetRoom(roomId); + + if (data.StateAfter is { Events.Count: > 0 }) { + return false; + } + + data.StateAfter = new() { + Events = [] + }; + + try { + data.StateAfter.Events = await room.GetFullStateAsListAsync(); + return true; + } + catch (Exception e) { + Console.WriteLine($"Msc4222Emulation: Failed to get full state for room {roomId}, state may be incomplete!\n{e}"); + } + + var oldState = new List<StateEventResponse>(); + if (data.State is { Events.Count: > 0 }) { + oldState.ReplaceBy(data.State.Events, StateEventsMatch); + } + + if (data.Timeline is { Limited: true }) { + if (data.Timeline.Events != null) + oldState.ReplaceBy(data.Timeline.Events, StateEventsMatch); + + try { + var timeline = await homeserver.GetRoom(roomId).GetMessagesAsync(limit: 250); + if (timeline is { State.Count: > 0 }) { + oldState.ReplaceBy(timeline.State, StateEventsMatch); + } + + if (timeline is { Chunk.Count: > 0 }) { + oldState.ReplaceBy(timeline.Chunk.Where(x => x.StateKey != null), StateEventsMatch); + } + } + catch (Exception e) { + Console.WriteLine($"Msc4222Emulation: Failed to get timeline for room {roomId}, state may be incomplete!\n{e}"); + } + } + + oldState = oldState.DistinctBy(x => (x.Type, x.StateKey)).ToList(); + + var tasks = oldState + .Select(async oldEvt => { + try { + return await room.GetStateEventAsync(oldEvt.Type, oldEvt.StateKey!); + } + catch (Exception e) { + Console.WriteLine($"Msc4222Emulation: Failed to get state event {oldEvt.Type}/{oldEvt.StateKey} for room {roomId}, state may be incomplete!\n{e}"); + return oldEvt; + } + }); + + var tasksEnum = tasks.ToAsyncEnumerable(); + await foreach (var evt in tasksEnum) { + data.StateAfter.Events.Add(evt); + } + + return true; + } +} \ No newline at end of file diff --git a/LibMatrix/Responses/SyncResponse.cs b/LibMatrix/Responses/SyncResponse.cs
index 977de3e..a7aebda 100644 --- a/LibMatrix/Responses/SyncResponse.cs +++ b/LibMatrix/Responses/SyncResponse.cs
@@ -1,6 +1,4 @@ using System.Text.Json.Serialization; -using LibMatrix.EventTypes.Spec.Ephemeral; -using LibMatrix.EventTypes.Spec.State; using LibMatrix.EventTypes.Spec.State.RoomInfo; namespace LibMatrix.Responses; @@ -30,6 +28,9 @@ public class SyncResponse { [JsonPropertyName("device_lists")] public DeviceListsDataStructure? DeviceLists { get; set; } + + [JsonPropertyName("gay.rory.libmatrix.msc4222_sync_type")] + public Msc4222SyncType Msc4222Method { get; set; } = Msc4222SyncType.None; public class DeviceListsDataStructure { [JsonPropertyName("changed")] @@ -64,6 +65,10 @@ public class SyncResponse { [JsonPropertyName("state")] public EventList? State { get; set; } + + [JsonPropertyName("state_after")] + public EventList? StateAfter { get; set; } + public override string ToString() { var lastEvent = Timeline?.Events?.LastOrDefault(x => x.Type == "m.room.member"); @@ -78,6 +83,9 @@ public class SyncResponse { [JsonPropertyName("state")] public EventList? State { get; set; } + + [JsonPropertyName("state_after")] + public EventList? StateAfter { get; set; } [JsonPropertyName("account_data")] public EventList? AccountData { get; set; } @@ -145,4 +153,11 @@ public class SyncResponse { Rooms?.Leave?.Values?.Max(x => x.Timeline?.Events?.Max(y => y.OriginServerTs)) ?? 0 ]).Max(); } + + [JsonConverter(typeof(JsonStringEnumConverter))] + public enum Msc4222SyncType { + None, + Server, + Emulated + } } diff --git a/Utilities/LibMatrix.DevTestBot/LibMatrix.DevTestBot.csproj b/Utilities/LibMatrix.DevTestBot/LibMatrix.DevTestBot.csproj
index 2a0c329..acff3e2 100644 --- a/Utilities/LibMatrix.DevTestBot/LibMatrix.DevTestBot.csproj +++ b/Utilities/LibMatrix.DevTestBot/LibMatrix.DevTestBot.csproj
@@ -18,7 +18,7 @@ </PropertyGroup> <ItemGroup> - <PackageReference Include="ArcaneLibs.StringNormalisation" Version="1.0.0-preview.20250313-104848" Condition="'$(Configuration)' == 'Release'" /> + <PackageReference Include="ArcaneLibs.StringNormalisation" Version="1.0.0-preview.20250419-174711" Condition="'$(Configuration)' == 'Release'" /> <ProjectReference Include="..\..\ArcaneLibs\ArcaneLibs.StringNormalisation\ArcaneLibs.StringNormalisation.csproj" Condition="'$(Configuration)' == 'Debug'"/> <ProjectReference Include="..\..\LibMatrix\LibMatrix.csproj"/> <ProjectReference Include="..\LibMatrix.Utilities.Bot\LibMatrix.Utilities.Bot.csproj" />