about summary refs log tree commit diff
path: root/LibMatrix/Helpers/SyncHelper.cs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--LibMatrix/Helpers/SyncHelper.cs160
1 files changed, 140 insertions, 20 deletions
diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs

index 1833bd0..c8e2928 100644 --- a/LibMatrix/Helpers/SyncHelper.cs +++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,22 +1,56 @@ using System.Diagnostics; using System.Net.Http.Json; +using System.Reflection; +using System.Text.Json; +using ArcaneLibs.Collections; +using System.Text.Json.Nodes; using ArcaneLibs.Extensions; using LibMatrix.Filters; +using LibMatrix.Helpers.SyncProcessors; using LibMatrix.Homeservers; +using LibMatrix.Interfaces.Services; using LibMatrix.Responses; using Microsoft.Extensions.Logging; namespace LibMatrix.Helpers; -public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) { +public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) { + private readonly Func<SyncResponse?, Task<SyncResponse?>> _msc4222EmulationSyncProcessor = new Msc4222EmulationSyncProcessor(homeserver, logger).EmulateMsc4222; + private SyncFilter? _filter; private string? _namedFilterName; - private bool _filterIsDirty = false; - private string? _filterId = null; + private bool _filterIsDirty; + private string? _filterId; public string? Since { get; set; } public int Timeout { get; set; } = 30000; - public string? SetPresence { get; set; } = "online"; + public string? SetPresence { get; set; } + + /// <summary> + /// Disabling this uses a technically slower code path, useful for checking whether delay comes from waiting for server or deserialising responses + /// </summary> + public bool UseInternalStreamingSync { get; set; } = true; + + public bool UseMsc4222StateAfter { + get; + set { + field = value; + if (value) { + AsyncSyncPreprocessors.Add(_msc4222EmulationSyncProcessor); + logger?.LogInformation($"Added MSC4222 emulation sync processor"); + } + else { + AsyncSyncPreprocessors.Remove(_msc4222EmulationSyncProcessor); + logger?.LogInformation($"Removed MSC4222 emulation sync processor"); + } + } + } = false; + + public List<Func<SyncResponse?, SyncResponse?>> SyncPreprocessors { get; } = [ + SimpleSyncProcessors.FillRoomIds + ]; + + public List<Func<SyncResponse?, Task<SyncResponse?>>> AsyncSyncPreprocessors { get; } = []; public string? FilterId { get => _filterId; @@ -42,16 +76,26 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg _filter = value; _filterIsDirty = true; _filterId = null; + _namedFilterName = null; } } + /// <summary> + /// Always include all rooms, and their full state according to passed filter + /// </summary> public bool FullState { get; set; } public bool IsInitialSync { get; set; } = true; public TimeSpan MinimumDelay { get; set; } = new(0); - private async Task updateFilterAsync() { + public async Task<int> GetUnoptimisedStoreCount() { + if (storageProvider is null) return -1; + var keys = await storageProvider.GetAllKeysAsync(); + return keys.Count(static x => !x.StartsWith("old/")) - 1; + } + + private async Task UpdateFilterAsync() { if (!string.IsNullOrWhiteSpace(NamedFilterName)) { _filterId = await homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(NamedFilterName); if (_filterId is null) @@ -61,9 +105,11 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg else if (Filter is not null) _filterId = (await homeserver.UploadFilterAsync(Filter)).FilterId; else _filterId = null; + + _filterIsDirty = false; } - public async Task<SyncResponse?> SyncAsync(CancellationToken? cancellationToken = null) { + public async Task<SyncResponse?> SyncAsync(CancellationToken? cancellationToken = null, bool noDelay = false) { if (homeserver is null) { Console.WriteLine("Null passed as homeserver for SyncHelper!"); throw new ArgumentNullException(nameof(homeserver), "Null passed as homeserver for SyncHelper!"); @@ -74,26 +120,86 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!"); } + if (storageProvider is null) { + var res = await SyncAsyncInternal(cancellationToken, noDelay); + if (res is null) return null; + if (UseMsc4222StateAfter) res.Msc4222Method = SyncResponse.Msc4222SyncType.Server; + + foreach (var preprocessor in SyncPreprocessors) { + res = preprocessor(res); + } + + foreach (var preprocessor in AsyncSyncPreprocessors) { + res = await preprocessor(res); + } + + return res; + } + + var key = Since ?? "init"; + if (await storageProvider.ObjectExistsAsync(key)) { + var cached = await storageProvider.LoadObjectAsync<SyncResponse>(key); + // We explicitly check that NextBatch doesn't match since to prevent infinite loops... + if (cached is not null && cached.NextBatch != Since) { + logger?.LogInformation("SyncHelper: Using cached sync response for {}", key); + return cached; + } + } + + var sync = await SyncAsyncInternal(cancellationToken, noDelay); + if (sync is null) return null; + // Ditto here. + if (sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + + if (UseMsc4222StateAfter) sync.Msc4222Method = SyncResponse.Msc4222SyncType.Server; + + foreach (var preprocessor in SyncPreprocessors) { + sync = preprocessor(sync); + } + + foreach (var preprocessor in AsyncSyncPreprocessors) { + sync = await preprocessor(sync); + } + + return sync; + } + + private async Task<SyncResponse?> SyncAsyncInternal(CancellationToken? cancellationToken = null, bool noDelay = false) { var sw = Stopwatch.StartNew(); - if (_filterIsDirty) await updateFilterAsync(); + if (_filterIsDirty) await UpdateFilterAsync(); - var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}"; + var url = $"/_matrix/client/v3/sync?timeout={Timeout}"; + if (!string.IsNullOrWhiteSpace(SetPresence)) url += $"&set_presence={SetPresence}"; if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}"; if (_filterId is not null) url += $"&filter={_filterId}"; + if (FullState) url += "&full_state=true"; + if (UseMsc4222StateAfter) url += "&org.matrix.msc4222.use_state_after=true&use_state_after=true"; // We use both unstable and stable names for compatibility - logger?.LogInformation("SyncHelper: Calling: {}", url); + // logger?.LogInformation("SyncHelper: Calling: {}", url); try { - var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None); - if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request"); - logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.Content.Headers.ContentLength ?? -1, sw.Elapsed); - var deserializeSw = Stopwatch.StartNew(); - var resp = await httpResp.Content.ReadFromJsonAsync<SyncResponse>(cancellationToken: cancellationToken ?? CancellationToken.None, - jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse); - logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.Content.Headers.ContentLength ?? -1, deserializeSw.Elapsed, sw.Elapsed); + SyncResponse? resp; + if (UseInternalStreamingSync) { + resp = await homeserver.ClientHttpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None); + logger?.LogInformation("Got sync response: ~{} bytes, {} elapsed", resp.ToJson(false, true, true).Length, sw.Elapsed); + } + else { + var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None); + if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request"); + var receivedTime = sw.Elapsed; + var deserializeSw = Stopwatch.StartNew(); + resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None, + jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse); + logger?.LogInformation("Deserialized sync response: {} bytes, {} response time, {} deserialize time, {} total", httpResp.GetContentLength(), receivedTime, + deserializeSw.Elapsed, sw.Elapsed); + } + var timeToWait = MinimumDelay.Subtract(sw.Elapsed); - if (timeToWait.TotalMilliseconds > 0) + if (!noDelay && timeToWait.TotalMilliseconds > 0) { + logger?.LogWarning("SyncAsyncInternal: Waiting {delay}", timeToWait); await Task.Delay(timeToWait); + } + return resp; } catch (TaskCanceledException) { @@ -103,6 +209,8 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg catch (Exception e) { Console.WriteLine(e); logger?.LogError(e, "Failed to sync!\n{}", e.ToString()); + await Task.WhenAll(ExceptionHandlers.Select(x => x.Invoke(e)).ToList()); + if (e is MatrixException { ErrorCode: MatrixException.ErrorCodes.M_UNKNOWN_TOKEN }) throw; } return null; @@ -110,10 +218,17 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public async IAsyncEnumerable<SyncResponse> EnumerateSyncAsync(CancellationToken? cancellationToken = null) { while (!cancellationToken?.IsCancellationRequested ?? true) { - var sync = await SyncAsync(cancellationToken); + var sw = Stopwatch.StartNew(); + var sync = await SyncAsync(cancellationToken, noDelay: true); if (sync is null) continue; if (!string.IsNullOrWhiteSpace(sync.NextBatch)) Since = sync.NextBatch; yield return sync; + + var timeToWait = MinimumDelay.Subtract(sw.Elapsed); + if (timeToWait.TotalMilliseconds > 0) { + logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait); + await Task.Delay(timeToWait); + } } } @@ -183,7 +298,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg if (syncResponse.Rooms is { Join.Count: > 0 }) foreach (var updatedRoom in syncResponse.Rooms.Join) { if (updatedRoom.Value.Timeline is null) continue; - foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events) { + foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events ?? []) { stateEventResponse.RoomId = updatedRoom.Key; var tasks = TimelineEventHandlers.Select(x => x(stateEventResponse)).ToList(); await Task.WhenAll(tasks); @@ -210,7 +325,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg /// Event fired when an account data event is received /// </summary> public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new(); - + + /// <summary> + /// Event fired when an exception is thrown + /// </summary> + public List<Func<Exception, Task>> ExceptionHandlers { get; } = new(); + private void Log(string message) { if (logger is null) Console.WriteLine(message); else logger.LogInformation(message);