diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index 05bfb47..adcc714 100644
--- a/LibMatrix/Helpers/SyncHelper.cs
+++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,15 +1,18 @@
using System.Diagnostics;
+using System.Net.Http.Json;
using System.Text.Json;
using ArcaneLibs.Collections;
+using System.Text.Json.Nodes;
using ArcaneLibs.Extensions;
using LibMatrix.Filters;
using LibMatrix.Homeservers;
+using LibMatrix.Interfaces.Services;
using LibMatrix.Responses;
using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
-public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) {
+public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) {
private SyncFilter? _filter;
private string? _namedFilterName;
private bool _filterIsDirty;
@@ -18,6 +21,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
public string? SetPresence { get; set; } = "online";
+ public bool UseInternalStreamingSync { get; set; } = true;
public string? FilterId {
get => _filterId;
@@ -53,6 +57,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
public TimeSpan MinimumDelay { get; set; } = new(0);
+ public async Task<int> GetUnoptimisedStoreCount() {
+ if (storageProvider is null) return -1;
+ var keys = await storageProvider.GetAllKeysAsync();
+ return keys.Count(x => !x.StartsWith("old/")) - 1;
+ }
+
private async Task UpdateFilterAsync() {
if (!string.IsNullOrWhiteSpace(NamedFilterName)) {
_filterId = await homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(NamedFilterName);
@@ -76,6 +86,25 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!");
}
+ if (storageProvider is null) return await SyncAsyncInternal(cancellationToken);
+
+ var key = Since ?? "init";
+ if (await storageProvider.ObjectExistsAsync(key)) {
+ var cached = await storageProvider.LoadObjectAsync<SyncResponse>(key);
+ // We explicitly check that NextBatch doesn't match since to prevent infinite loops...
+ if (cached is not null && cached.NextBatch != Since) {
+ logger?.LogInformation("SyncHelper: Using cached sync response for {}", key);
+ return cached;
+ }
+ }
+
+ var sync = await SyncAsyncInternal(cancellationToken);
+ // Ditto here.
+ if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync);
+ return sync;
+ }
+
+ private async Task<SyncResponse?> SyncAsyncInternal(CancellationToken? cancellationToken = null) {
var sw = Stopwatch.StartNew();
if (_filterIsDirty) await UpdateFilterAsync();
@@ -86,15 +115,23 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
// logger?.LogInformation("SyncHelper: Calling: {}", url);
try {
- var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
- if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
- logger?.LogTrace("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed);
- var deserializeSw = Stopwatch.StartNew();
- var stream = await httpResp.Content.ReadAsStreamAsync();
- await using var seekableStream = new SeekableStream(stream);
- var resp = await JsonSerializer.DeserializeAsync<SyncResponse>(seekableStream, cancellationToken: cancellationToken ?? CancellationToken.None,
- jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
- logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", seekableStream.Position, deserializeSw.Elapsed, sw.Elapsed);
+ SyncResponse? resp = null;
+ if (UseInternalStreamingSync) {
+ resp = await homeserver.ClientHttpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None);
+ logger?.LogInformation("Got sync response: ~{} bytes, {} elapsed", resp.ToJson(false, true, true).Length, sw.Elapsed);
+ }
+ else {
+ var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
+ if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
+ logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed);
+ var deserializeSw = Stopwatch.StartNew();
+ // var jsonResp = await httpResp.Content.ReadFromJsonAsync<JsonObject>(cancellationToken: cancellationToken ?? CancellationToken.None);
+ // var resp = jsonResp.Deserialize<SyncResponse>();
+ resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None,
+ jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
+ logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed);
+ }
+
var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
if (timeToWait.TotalMilliseconds > 0)
await Task.Delay(timeToWait);
@@ -214,4 +251,9 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
/// Event fired when an account data event is received
/// </summary>
public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new();
-}
\ No newline at end of file
+
+ private void Log(string message) {
+ if (logger is null) Console.WriteLine(message);
+ else logger.LogInformation(message);
+ }
+}
|