diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index e8ca8b7..862c8ad 100644
--- a/LibMatrix/Helpers/SyncHelper.cs
+++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,5 +1,6 @@
using System.Diagnostics;
using System.Net.Http.Json;
+using System.Reflection;
using System.Text.Json;
using ArcaneLibs.Collections;
using System.Text.Json.Nodes;
@@ -14,6 +15,8 @@ using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) {
+ private readonly Func<SyncResponse?, Task<SyncResponse?>> _msc4222EmulationSyncProcessor = new Msc4222EmulationSyncProcessor(homeserver).EmulateMsc4222;
+
private SyncFilter? _filter;
private string? _namedFilterName;
private bool _filterIsDirty;
@@ -24,10 +27,27 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
public string? SetPresence { get; set; } = "online";
public bool UseInternalStreamingSync { get; set; } = true;
+ public bool UseMsc4222StateAfter {
+ get;
+ set {
+ field = value;
+ if (value) {
+ AsyncSyncPreprocessors.Add(_msc4222EmulationSyncProcessor);
+ Console.WriteLine($"Added MSC4222 emulation sync processor");
+ }
+ else {
+ AsyncSyncPreprocessors.Remove(_msc4222EmulationSyncProcessor);
+ Console.WriteLine($"Removed MSC4222 emulation sync processor");
+ }
+ }
+ } = false;
+
public List<Func<SyncResponse?, SyncResponse?>> SyncPreprocessors { get; } = [
SimpleSyncProcessors.FillRoomIds
];
+ public List<Func<SyncResponse?, Task<SyncResponse?>>> AsyncSyncPreprocessors { get; } = [];
+
public string? FilterId {
get => _filterId;
set {
@@ -96,7 +116,21 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!");
}
- if (storageProvider is null) return await SyncAsyncInternal(cancellationToken, noDelay);
+ if (storageProvider is null) {
+ var res = await SyncAsyncInternal(cancellationToken, noDelay);
+ if (res is null) return null;
+ if (UseMsc4222StateAfter) res.Msc4222Method = SyncResponse.Msc4222SyncType.Server;
+
+ foreach (var preprocessor in SyncPreprocessors) {
+ res = preprocessor(res);
+ }
+
+ foreach (var preprocessor in AsyncSyncPreprocessors) {
+ res = await preprocessor(res);
+ }
+
+ return res;
+ }
var key = Since ?? "init";
if (await storageProvider.ObjectExistsAsync(key)) {
@@ -109,13 +143,20 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
}
var sync = await SyncAsyncInternal(cancellationToken, noDelay);
+ if (sync is null) return null;
// Ditto here.
- if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync);
+ if (sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync);
+
+ if (UseMsc4222StateAfter) sync.Msc4222Method = SyncResponse.Msc4222SyncType.Server;
foreach (var preprocessor in SyncPreprocessors) {
sync = preprocessor(sync);
}
+ foreach (var preprocessor in AsyncSyncPreprocessors) {
+ sync = await preprocessor(sync);
+ }
+
return sync;
}
@@ -126,6 +167,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}";
if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}";
if (_filterId is not null) url += $"&filter={_filterId}";
+ if (UseMsc4222StateAfter) url += "&org.matrix.msc4222.use_state_after=true&use_state_after=true"; // We use both unstable and stable names for compatibility
// logger?.LogInformation("SyncHelper: Calling: {}", url);
diff --git a/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs b/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs
new file mode 100644
index 0000000..b18a8e0
--- /dev/null
+++ b/LibMatrix/Helpers/SyncProcessors/Msc4222EmulationSyncProcessor.cs
@@ -0,0 +1,125 @@
+using System.Diagnostics;
+using ArcaneLibs.Extensions;
+using LibMatrix.Homeservers;
+using LibMatrix.Responses;
+
+namespace LibMatrix.Helpers.SyncProcessors;
+
+public class Msc4222EmulationSyncProcessor(AuthenticatedHomeserverGeneric homeserver) {
+ private static bool StateEventsMatch(StateEventResponse a, StateEventResponse b) {
+ return a.Type == b.Type && a.StateKey == b.StateKey && a.OriginServerTs < b.OriginServerTs;
+ }
+
+ public async Task<SyncResponse?> EmulateMsc4222(SyncResponse? resp) {
+ var sw = Stopwatch.StartNew();
+ if (resp is null or { Rooms: null }) return resp;
+
+ if (
+ resp.Rooms.Join?.Any(x => x.Value.StateAfter is { Events.Count: > 0 }) == true
+ || resp.Rooms.Leave?.Any(x => x.Value.StateAfter is { Events.Count: > 0 }) == true
+ ) {
+ Console.WriteLine($"Msc4222EmulationSyncProcessor.EmulateMsc4222 determined that no emulation is needed in {sw.Elapsed}");
+ return resp;
+ }
+
+ var modified = false;
+ List<Task<bool>> tasks = [];
+ if (resp.Rooms is { Join.Count: > 0 }) {
+ tasks.AddRange(resp.Rooms.Join.Select(ProcessJoinedRooms));
+ }
+
+ if (resp.Rooms is { Leave.Count: > 0 }) {
+ tasks.AddRange(resp.Rooms.Leave.Select(ProcessLeftRooms));
+ }
+
+ var tasksEnum = tasks.ToAsyncEnumerable();
+ await foreach (var wasModified in tasksEnum) {
+ if (wasModified) {
+ modified = true;
+ }
+ }
+
+ Console.WriteLine($"Msc4222EmulationSyncProcessor.EmulateMsc4222 processed {resp.Rooms?.Join?.Count}/{resp.Rooms?.Leave?.Count} rooms in {sw.Elapsed}");
+ if (modified)
+ resp.Msc4222Method = SyncResponse.Msc4222SyncType.Emulated;
+
+ return resp;
+ }
+
+ private async Task<bool> ProcessJoinedRooms(KeyValuePair<string, SyncResponse.RoomsDataStructure.JoinedRoomDataStructure> roomData) {
+ var (roomId, data) = roomData;
+
+ if (data.StateAfter is { Events.Count: > 0 }) {
+ return false;
+ }
+
+ data.StateAfter = new() { };
+
+ return false;
+ }
+
+ private async Task<bool> ProcessLeftRooms(KeyValuePair<string, SyncResponse.RoomsDataStructure.LeftRoomDataStructure> roomData) {
+ var (roomId, data) = roomData;
+ var room = homeserver.GetRoom(roomId);
+
+ if (data.StateAfter is { Events.Count: > 0 }) {
+ return false;
+ }
+
+ data.StateAfter = new() {
+ Events = []
+ };
+
+ try {
+ data.StateAfter.Events = await room.GetFullStateAsListAsync();
+ return true;
+ }
+ catch (Exception e) {
+ Console.WriteLine($"Msc4222Emulation: Failed to get full state for room {roomId}, state may be incomplete!\n{e}");
+ }
+
+ var oldState = new List<StateEventResponse>();
+ if (data.State is { Events.Count: > 0 }) {
+ oldState.ReplaceBy(data.State.Events, StateEventsMatch);
+ }
+
+ if (data.Timeline is { Limited: true }) {
+ if (data.Timeline.Events != null)
+ oldState.ReplaceBy(data.Timeline.Events, StateEventsMatch);
+
+ try {
+ var timeline = await homeserver.GetRoom(roomId).GetMessagesAsync(limit: 250);
+ if (timeline is { State.Count: > 0 }) {
+ oldState.ReplaceBy(timeline.State, StateEventsMatch);
+ }
+
+ if (timeline is { Chunk.Count: > 0 }) {
+ oldState.ReplaceBy(timeline.Chunk.Where(x => x.StateKey != null), StateEventsMatch);
+ }
+ }
+ catch (Exception e) {
+ Console.WriteLine($"Msc4222Emulation: Failed to get timeline for room {roomId}, state may be incomplete!\n{e}");
+ }
+ }
+
+ oldState = oldState.DistinctBy(x => (x.Type, x.StateKey)).ToList();
+
+ var tasks = oldState
+ .Select(async oldEvt => {
+ try {
+ return await room.GetStateEventAsync(oldEvt.Type, oldEvt.StateKey!);
+ }
+ catch (Exception e) {
+ Console.WriteLine($"Msc4222Emulation: Failed to get state event {oldEvt.Type}/{oldEvt.StateKey} for room {roomId}, state may be incomplete!\n{e}");
+ return oldEvt;
+ }
+ });
+
+ var tasksEnum = tasks.ToAsyncEnumerable();
+ await foreach (var evt in tasksEnum) {
+ data.StateAfter.Events.Add(evt);
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
|