about summary refs log tree commit diff
path: root/LibMatrix/Helpers/SyncHelper.cs
diff options
context:
space:
mode:
Diffstat (limited to 'LibMatrix/Helpers/SyncHelper.cs')
-rw-r--r--LibMatrix/Helpers/SyncHelper.cs91
1 files changed, 74 insertions, 17 deletions
diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs

index abbf541..ebe653c 100644 --- a/LibMatrix/Helpers/SyncHelper.cs +++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,10 +1,12 @@ using System.Diagnostics; using System.Net.Http.Json; +using System.Reflection; using System.Text.Json; using ArcaneLibs.Collections; using System.Text.Json.Nodes; using ArcaneLibs.Extensions; using LibMatrix.Filters; +using LibMatrix.Helpers.SyncProcessors; using LibMatrix.Homeservers; using LibMatrix.Interfaces.Services; using LibMatrix.Responses; @@ -13,6 +15,8 @@ using Microsoft.Extensions.Logging; namespace LibMatrix.Helpers; public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) { + private readonly Func<SyncResponse?, Task<SyncResponse?>> _msc4222EmulationSyncProcessor = new Msc4222EmulationSyncProcessor(homeserver, logger).EmulateMsc4222; + private SyncFilter? _filter; private string? _namedFilterName; private bool _filterIsDirty; @@ -20,9 +24,34 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public string? Since { get; set; } public int Timeout { get; set; } = 30000; - public string? SetPresence { get; set; } = "online"; + public string? SetPresence { get; set; } + + /// <summary> + /// Disabling this uses a technically slower code path, useful for checking whether delay comes from waiting for server or deserialising responses + /// </summary> public bool UseInternalStreamingSync { get; set; } = true; + public bool UseMsc4222StateAfter { + get; + set { + field = value; + if (value) { + AsyncSyncPreprocessors.Add(_msc4222EmulationSyncProcessor); + logger?.LogInformation($"Added MSC4222 emulation sync processor"); + } + else { + AsyncSyncPreprocessors.Remove(_msc4222EmulationSyncProcessor); + logger?.LogInformation($"Removed MSC4222 emulation sync processor"); + } + } + } = false; + + public List<Func<SyncResponse?, SyncResponse?>> SyncPreprocessors { get; } = [ + SimpleSyncProcessors.FillRoomIds + ]; + + public List<Func<SyncResponse?, Task<SyncResponse?>>> AsyncSyncPreprocessors { get; } = []; + public string? FilterId { get => _filterId; set { @@ -76,7 +105,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg else if (Filter is not null) _filterId = (await homeserver.UploadFilterAsync(Filter)).FilterId; else _filterId = null; - + _filterIsDirty = false; } @@ -91,7 +120,21 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!"); } - if (storageProvider is null) return await SyncAsyncInternal(cancellationToken, noDelay); + if (storageProvider is null) { + var res = await SyncAsyncInternal(cancellationToken, noDelay); + if (res is null) return null; + if (UseMsc4222StateAfter) res.Msc4222Method = SyncResponse.Msc4222SyncType.Server; + + foreach (var preprocessor in SyncPreprocessors) { + res = preprocessor(res); + } + + foreach (var preprocessor in AsyncSyncPreprocessors) { + res = await preprocessor(res); + } + + return res; + } var key = Since ?? "init"; if (await storageProvider.ObjectExistsAsync(key)) { @@ -104,8 +147,20 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg } var sync = await SyncAsyncInternal(cancellationToken, noDelay); + if (sync is null) return null; // Ditto here. - if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + if (sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + + if (UseMsc4222StateAfter) sync.Msc4222Method = SyncResponse.Msc4222SyncType.Server; + + foreach (var preprocessor in SyncPreprocessors) { + sync = preprocessor(sync); + } + + foreach (var preprocessor in AsyncSyncPreprocessors) { + sync = await preprocessor(sync); + } + return sync; } @@ -113,9 +168,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg var sw = Stopwatch.StartNew(); if (_filterIsDirty) await UpdateFilterAsync(); - var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}"; + var url = $"/_matrix/client/v3/sync?timeout={Timeout}"; + if (!string.IsNullOrWhiteSpace(SetPresence)) url += $"&set_presence={SetPresence}"; if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}"; if (_filterId is not null) url += $"&filter={_filterId}"; + if (FullState) url += "&full_state=true"; + if (UseMsc4222StateAfter) url += "&org.matrix.msc4222.use_state_after=true&use_state_after=true"; // We use both unstable and stable names for compatibility // logger?.LogInformation("SyncHelper: Calling: {}", url); @@ -128,13 +186,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg else { var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None); if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request"); - logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed); + var receivedTime = sw.Elapsed; var deserializeSw = Stopwatch.StartNew(); - // var jsonResp = await httpResp.Content.ReadFromJsonAsync<JsonObject>(cancellationToken: cancellationToken ?? CancellationToken.None); - // var resp = jsonResp.Deserialize<SyncResponse>(); resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None, jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse); - logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed); + logger?.LogInformation("Deserialized sync response: {} bytes, {} response time, {} deserialize time, {} total", httpResp.GetContentLength(), receivedTime, + deserializeSw.Elapsed, sw.Elapsed); } var timeToWait = MinimumDelay.Subtract(sw.Elapsed); @@ -153,6 +210,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg Console.WriteLine(e); logger?.LogError(e, "Failed to sync!\n{}", e.ToString()); await Task.WhenAll(ExceptionHandlers.Select(x => x.Invoke(e)).ToList()); + if (e is MatrixException { ErrorCode: MatrixException.ErrorCodes.M_UNKNOWN_TOKEN }) throw; } return null; @@ -165,11 +223,10 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg if (sync is null) continue; if (!string.IsNullOrWhiteSpace(sync.NextBatch)) Since = sync.NextBatch; yield return sync; - - + var timeToWait = MinimumDelay.Subtract(sw.Elapsed); if (timeToWait.TotalMilliseconds > 0) { - logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait); + logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait); await Task.Delay(timeToWait); } } @@ -241,9 +298,9 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg if (syncResponse.Rooms is { Join.Count: > 0 }) foreach (var updatedRoom in syncResponse.Rooms.Join) { if (updatedRoom.Value.Timeline is null) continue; - foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events ?? []) { - stateEventResponse.RoomId = updatedRoom.Key; - var tasks = TimelineEventHandlers.Select(x => x(stateEventResponse)).ToList(); + foreach (var MatrixEventResponse in updatedRoom.Value.Timeline.Events ?? []) { + MatrixEventResponse.RoomId = updatedRoom.Key; + var tasks = TimelineEventHandlers.Select(x => x(MatrixEventResponse)).ToList(); await Task.WhenAll(tasks); } } @@ -262,12 +319,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg /// <summary> /// Event fired when a timeline event is received /// </summary> - public List<Func<StateEventResponse, Task>> TimelineEventHandlers { get; } = new(); + public List<Func<MatrixEventResponse, Task>> TimelineEventHandlers { get; } = new(); /// <summary> /// Event fired when an account data event is received /// </summary> - public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new(); + public List<Func<MatrixEventResponse, Task>> AccountDataReceivedHandlers { get; } = new(); /// <summary> /// Event fired when an exception is thrown