about summary refs log tree commit diff
path: root/LibMatrix/Helpers/SyncHelper.cs
diff options
context:
space:
mode:
authorRory& <root@rory.gay>2025-03-09 18:34:53 +0100
committerRory& <root@rory.gay>2025-03-09 18:34:53 +0100
commit9659093fcd9745f7030418998ca1cf886ff820b3 (patch)
treee7223357f7deb85738c195fa5bbca30d9e4cc548 /LibMatrix/Helpers/SyncHelper.cs
parentWell known resolver work, synapse admin work (diff)
parentFix merge conflicts (diff)
downloadLibMatrix-9659093fcd9745f7030418998ca1cf886ff820b3.tar.xz
Merge remote-tracking branch 'origin/dev/moderationclient-changes'
Diffstat (limited to 'LibMatrix/Helpers/SyncHelper.cs')
-rw-r--r--LibMatrix/Helpers/SyncHelper.cs64
1 files changed, 53 insertions, 11 deletions
diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs

index 05bfb47..adcc714 100644 --- a/LibMatrix/Helpers/SyncHelper.cs +++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,15 +1,18 @@ using System.Diagnostics; +using System.Net.Http.Json; using System.Text.Json; using ArcaneLibs.Collections; +using System.Text.Json.Nodes; using ArcaneLibs.Extensions; using LibMatrix.Filters; using LibMatrix.Homeservers; +using LibMatrix.Interfaces.Services; using LibMatrix.Responses; using Microsoft.Extensions.Logging; namespace LibMatrix.Helpers; -public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) { +public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) { private SyncFilter? _filter; private string? _namedFilterName; private bool _filterIsDirty; @@ -18,6 +21,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public string? Since { get; set; } public int Timeout { get; set; } = 30000; public string? SetPresence { get; set; } = "online"; + public bool UseInternalStreamingSync { get; set; } = true; public string? FilterId { get => _filterId; @@ -53,6 +57,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg public TimeSpan MinimumDelay { get; set; } = new(0); + public async Task<int> GetUnoptimisedStoreCount() { + if (storageProvider is null) return -1; + var keys = await storageProvider.GetAllKeysAsync(); + return keys.Count(x => !x.StartsWith("old/")) - 1; + } + private async Task UpdateFilterAsync() { if (!string.IsNullOrWhiteSpace(NamedFilterName)) { _filterId = await homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(NamedFilterName); @@ -76,6 +86,25 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!"); } + if (storageProvider is null) return await SyncAsyncInternal(cancellationToken); + + var key = Since ?? "init"; + if (await storageProvider.ObjectExistsAsync(key)) { + var cached = await storageProvider.LoadObjectAsync<SyncResponse>(key); + // We explicitly check that NextBatch doesn't match since to prevent infinite loops... + if (cached is not null && cached.NextBatch != Since) { + logger?.LogInformation("SyncHelper: Using cached sync response for {}", key); + return cached; + } + } + + var sync = await SyncAsyncInternal(cancellationToken); + // Ditto here. + if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync); + return sync; + } + + private async Task<SyncResponse?> SyncAsyncInternal(CancellationToken? cancellationToken = null) { var sw = Stopwatch.StartNew(); if (_filterIsDirty) await UpdateFilterAsync(); @@ -86,15 +115,23 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg // logger?.LogInformation("SyncHelper: Calling: {}", url); try { - var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None); - if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request"); - logger?.LogTrace("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed); - var deserializeSw = Stopwatch.StartNew(); - var stream = await httpResp.Content.ReadAsStreamAsync(); - await using var seekableStream = new SeekableStream(stream); - var resp = await JsonSerializer.DeserializeAsync<SyncResponse>(seekableStream, cancellationToken: cancellationToken ?? CancellationToken.None, - jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse); - logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", seekableStream.Position, deserializeSw.Elapsed, sw.Elapsed); + SyncResponse? resp = null; + if (UseInternalStreamingSync) { + resp = await homeserver.ClientHttpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None); + logger?.LogInformation("Got sync response: ~{} bytes, {} elapsed", resp.ToJson(false, true, true).Length, sw.Elapsed); + } + else { + var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None); + if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request"); + logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed); + var deserializeSw = Stopwatch.StartNew(); + // var jsonResp = await httpResp.Content.ReadFromJsonAsync<JsonObject>(cancellationToken: cancellationToken ?? CancellationToken.None); + // var resp = jsonResp.Deserialize<SyncResponse>(); + resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None, + jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse); + logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed); + } + var timeToWait = MinimumDelay.Subtract(sw.Elapsed); if (timeToWait.TotalMilliseconds > 0) await Task.Delay(timeToWait); @@ -214,4 +251,9 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg /// Event fired when an account data event is received /// </summary> public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new(); -} \ No newline at end of file + + private void Log(string message) { + if (logger is null) Console.WriteLine(message); + else logger.LogInformation(message); + } +}