about summary refs log tree commit diff
path: root/LibMatrix/Helpers
diff options
context:
space:
mode:
authorRory& <root@rory.gay>2025-05-07 20:58:12 +0200
committerRory& <root@rory.gay>2025-05-07 20:58:12 +0200
commita0fc4d145c77cc14af09cbbe285a0835e842728a (patch)
tree6958d610b44d11f90ba31059eb35ae8447828054 /LibMatrix/Helpers
parentSync preprocessor support (diff)
downloadLibMatrix-a0fc4d145c77cc14af09cbbe285a0835e842728a.tar.xz
MSC4222 emulation for left rooms
Diffstat (limited to 'LibMatrix/Helpers')
-rw-r--r--LibMatrix/Helpers/SyncHelper.cs46
-rw-r--r--LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs125
2 files changed, 169 insertions, 2 deletions
diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs

index e8ca8b7..862c8ad 100644 --- a/LibMatrix/Helpers/SyncHelper.cs +++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,5 +1,6 @@ using System.Diagnostics; using System.Net.Http.Json; +using System.Reflection; using System.Text.Json; using ArcaneLibs.Collections; using System.Text.Json.Nodes; @@ -14,6 +15,8 @@ using Microsoft.Extensions.Logging; namespace LibMatrix.Helpers; public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) { + private readonly Func<SyncResponse?, Task<SyncResponse?>> _msc4222EmulationSyncProcessor = new Msc4222EmulationSyncProcessor(homeserver).EmulateMsc4222; + private SyncFilter? _filter; private string? _namedFilterName; private bool _filterIsDirty; @@ -24,10 +27,27 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public string? SetPresence { get; set; } = "online"; public bool UseInternalStreamingSync { get; set; } = true; + public bool UseMsc4222StateAfter { + get; + set { + field = value; + if (value) { + AsyncSyncPreprocessors.Add(_msc4222EmulationSyncProcessor); + Console.WriteLine($"Added MSC4222 emulation sync processor"); + } + else { + AsyncSyncPreprocessors.Remove(_msc4222EmulationSyncProcessor); + Console.WriteLine($"Removed MSC4222 emulation sync processor"); + } + } + } = false; + public List<Func<SyncResponse?, SyncResponse?>> SyncPreprocessors { get; } = [ SimpleSyncProcessors.FillRoomIds ]; + public List<Func<SyncResponse?, Task<SyncResponse?>>> AsyncSyncPreprocessors { get; } = []; + public string? FilterId { get => _filterId; set { @@ -96,7 +116,21 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!"); } - if (storageProvider is null) return await SyncAsyncInternal(cancellationToken, noDelay); + if (storageProvider is null) { + var res = await SyncAsyncInternal(cancellationToken, noDelay); + if (res is null) return null; + if (UseMsc4222StateAfter) res.Msc4222Method = SyncResponse.Msc4222SyncType.Server; + + foreach (var preprocessor in SyncPreprocessors) { + res = preprocessor(res); + } + + foreach (var preprocessor in AsyncSyncPreprocessors) { + res = await preprocessor(res); + } + + return res; + } var key = Since ?? "init"; if (await storageProvider.ObjectExistsAsync(key)) { @@ -109,13 +143,20 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg } var sync = await SyncAsyncInternal(cancellationToken, noDelay); + if (sync is null) return null; // Ditto here. - if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + if (sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + + if (UseMsc4222StateAfter) sync.Msc4222Method = SyncResponse.Msc4222SyncType.Server; foreach (var preprocessor in SyncPreprocessors) { sync = preprocessor(sync); } + foreach (var preprocessor in AsyncSyncPreprocessors) { + sync = await preprocessor(sync); + } + return sync; } @@ -126,6 +167,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}"; if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}"; if (_filterId is not null) url += $"&filter={_filterId}"; + if (UseMsc4222StateAfter) url += "&org.matrix.msc4222.use_state_after=true&use_state_after=true"; // We use both unstable and stable names for compatibility // logger?.LogInformation("SyncHelper: Calling: {}", url); diff --git a/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs b/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs new file mode 100644
index 0000000..b18a8e0 --- /dev/null +++ b/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs
@@ -0,0 +1,125 @@ +using System.Diagnostics; +using ArcaneLibs.Extensions; +using LibMatrix.Homeservers; +using LibMatrix.Responses; + +namespace LibMatrix.Helpers.SyncProcessors; + +public class Msc4222EmulationSyncProcessor(AuthenticatedHomeserverGeneric homeserver) { + private static bool StateEventsMatch(StateEventResponse a, StateEventResponse b) { + return a.Type == b.Type && a.StateKey == b.StateKey && a.OriginServerTs < b.OriginServerTs; + } + + public async Task<SyncResponse?> EmulateMsc4222(SyncResponse? resp) { + var sw = Stopwatch.StartNew(); + if (resp is null or { Rooms: null }) return resp; + + if ( + resp.Rooms.Join?.Any(x => x.Value.StateAfter is { Events.Count: > 0 }) == true + || resp.Rooms.Leave?.Any(x => x.Value.StateAfter is { Events.Count: > 0 }) == true + ) { + Console.WriteLine($"Msc4222EmulationSyncProcessor.EmulateMsc4222 determined that no emulation is needed in {sw.Elapsed}"); + return resp; + } + + var modified = false; + List<Task<bool>> tasks = []; + if (resp.Rooms is { Join.Count: > 0 }) { + tasks.AddRange(resp.Rooms.Join.Select(ProcessJoinedRooms)); + } + + if (resp.Rooms is { Leave.Count: > 0 }) { + tasks.AddRange(resp.Rooms.Leave.Select(ProcessLeftRooms)); + } + + var tasksEnum = tasks.ToAsyncEnumerable(); + await foreach (var wasModified in tasksEnum) { + if (wasModified) { + modified = true; + } + } + + Console.WriteLine($"Msc4222EmulationSyncProcessor.EmulateMsc4222 processed {resp.Rooms?.Join?.Count}/{resp.Rooms?.Leave?.Count} rooms in {sw.Elapsed}"); + if (modified) + resp.Msc4222Method = SyncResponse.Msc4222SyncType.Emulated; + + return resp; + } + + private async Task<bool> ProcessJoinedRooms(KeyValuePair<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure> roomData) { + var (roomId, data) = roomData; + + if (data.StateAfter is { Events.Count: > 0 }) { + return false; + } + + data.StateAfter = new() { }; + + return false; + } + + private async Task<bool> ProcessLeftRooms(KeyValuePair<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure> roomData) { + var (roomId, data) = roomData; + var room = homeserver.GetRoom(roomId); + + if (data.StateAfter is { Events.Count: > 0 }) { + return false; + } + + data.StateAfter = new() { + Events = [] + }; + + try { + data.StateAfter.Events = await room.GetFullStateAsListAsync(); + return true; + } + catch (Exception e) { + Console.WriteLine($"Msc4222Emulation: Failed to get full state for room {roomId}, state may be incomplete!\n{e}"); + } + + var oldState = new List<StateEventResponse>(); + if (data.State is { Events.Count: > 0 }) { + oldState.ReplaceBy(data.State.Events, StateEventsMatch); + } + + if (data.Timeline is { Limited: true }) { + if (data.Timeline.Events != null) + oldState.ReplaceBy(data.Timeline.Events, StateEventsMatch); + + try { + var timeline = await homeserver.GetRoom(roomId).GetMessagesAsync(limit: 250); + if (timeline is { State.Count: > 0 }) { + oldState.ReplaceBy(timeline.State, StateEventsMatch); + } + + if (timeline is { Chunk.Count: > 0 }) { + oldState.ReplaceBy(timeline.Chunk.Where(x => x.StateKey != null), StateEventsMatch); + } + } + catch (Exception e) { + Console.WriteLine($"Msc4222Emulation: Failed to get timeline for room {roomId}, state may be incomplete!\n{e}"); + } + } + + oldState = oldState.DistinctBy(x => (x.Type, x.StateKey)).ToList(); + + var tasks = oldState + .Select(async oldEvt => { + try { + return await room.GetStateEventAsync(oldEvt.Type, oldEvt.StateKey!); + } + catch (Exception e) { + Console.WriteLine($"Msc4222Emulation: Failed to get state event {oldEvt.Type}/{oldEvt.StateKey} for room {roomId}, state may be incomplete!\n{e}"); + return oldEvt; + } + }); + + var tasksEnum = tasks.ToAsyncEnumerable(); + await foreach (var evt in tasksEnum) { + data.StateAfter.Events.Add(evt); + } + + return true; + } +} \ No newline at end of file