diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index 1833bd0..c8e2928 100644
--- a/LibMatrix/Helpers/SyncHelper.cs
+++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,22 +1,56 @@
using System.Diagnostics;
using System.Net.Http.Json;
+using System.Reflection;
+using System.Text.Json;
+using ArcaneLibs.Collections;
+using System.Text.Json.Nodes;
using ArcaneLibs.Extensions;
using LibMatrix.Filters;
+using LibMatrix.Helpers.SyncProcessors;
using LibMatrix.Homeservers;
+using LibMatrix.Interfaces.Services;
using LibMatrix.Responses;
using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
-public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) {
+public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) {
+ private readonly Func<SyncResponse?, Task<SyncResponse?>> _msc4222EmulationSyncProcessor = new Msc4222EmulationSyncProcessor(homeserver, logger).EmulateMsc4222;
+
private SyncFilter? _filter;
private string? _namedFilterName;
- private bool _filterIsDirty = false;
- private string? _filterId = null;
+ private bool _filterIsDirty;
+ private string? _filterId;
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
- public string? SetPresence { get; set; } = "online";
+ public string? SetPresence { get; set; }
+
+ /// <summary>
+ /// Disabling this uses a technically slower code path, useful for checking whether delay comes from waiting for server or deserialising responses
+ /// </summary>
+ public bool UseInternalStreamingSync { get; set; } = true;
+
+ public bool UseMsc4222StateAfter {
+ get;
+ set {
+ field = value;
+ if (value) {
+ AsyncSyncPreprocessors.Add(_msc4222EmulationSyncProcessor);
+ logger?.LogInformation($"Added MSC4222 emulation sync processor");
+ }
+ else {
+ AsyncSyncPreprocessors.Remove(_msc4222EmulationSyncProcessor);
+ logger?.LogInformation($"Removed MSC4222 emulation sync processor");
+ }
+ }
+ } = false;
+
+ public List<Func<SyncResponse?, SyncResponse?>> SyncPreprocessors { get; } = [
+ SimpleSyncProcessors.FillRoomIds
+ ];
+
+ public List<Func<SyncResponse?, Task<SyncResponse?>>> AsyncSyncPreprocessors { get; } = [];
public string? FilterId {
get => _filterId;
@@ -42,16 +76,26 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
_filter = value;
_filterIsDirty = true;
_filterId = null;
+ _namedFilterName = null;
}
}
+ /// <summary>
+ /// Always include all rooms, and their full state according to passed filter
+ /// </summary>
public bool FullState { get; set; }
public bool IsInitialSync { get; set; } = true;
public TimeSpan MinimumDelay { get; set; } = new(0);
- private async Task updateFilterAsync() {
+ public async Task<int> GetUnoptimisedStoreCount() {
+ if (storageProvider is null) return -1;
+ var keys = await storageProvider.GetAllKeysAsync();
+ return keys.Count(static x => !x.StartsWith("old/")) - 1;
+ }
+
+ private async Task UpdateFilterAsync() {
if (!string.IsNullOrWhiteSpace(NamedFilterName)) {
_filterId = await homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(NamedFilterName);
if (_filterId is null)
@@ -61,9 +105,11 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
else if (Filter is not null)
_filterId = (await homeserver.UploadFilterAsync(Filter)).FilterId;
else _filterId = null;
+
+ _filterIsDirty = false;
}
- public async Task<SyncResponse?> SyncAsync(CancellationToken? cancellationToken = null) {
+ public async Task<SyncResponse?> SyncAsync(CancellationToken? cancellationToken = null, bool noDelay = false) {
if (homeserver is null) {
Console.WriteLine("Null passed as homeserver for SyncHelper!");
throw new ArgumentNullException(nameof(homeserver), "Null passed as homeserver for SyncHelper!");
@@ -74,26 +120,86 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!");
}
+ if (storageProvider is null) {
+ var res = await SyncAsyncInternal(cancellationToken, noDelay);
+ if (res is null) return null;
+ if (UseMsc4222StateAfter) res.Msc4222Method = SyncResponse.Msc4222SyncType.Server;
+
+ foreach (var preprocessor in SyncPreprocessors) {
+ res = preprocessor(res);
+ }
+
+ foreach (var preprocessor in AsyncSyncPreprocessors) {
+ res = await preprocessor(res);
+ }
+
+ return res;
+ }
+
+ var key = Since ?? "init";
+ if (await storageProvider.ObjectExistsAsync(key)) {
+ var cached = await storageProvider.LoadObjectAsync<SyncResponse>(key);
+ // We explicitly check that NextBatch doesn't match since to prevent infinite loops...
+ if (cached is not null && cached.NextBatch != Since) {
+ logger?.LogInformation("SyncHelper: Using cached sync response for {}", key);
+ return cached;
+ }
+ }
+
+ var sync = await SyncAsyncInternal(cancellationToken, noDelay);
+ if (sync is null) return null;
+ // Ditto here.
+ if (sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync);
+
+ if (UseMsc4222StateAfter) sync.Msc4222Method = SyncResponse.Msc4222SyncType.Server;
+
+ foreach (var preprocessor in SyncPreprocessors) {
+ sync = preprocessor(sync);
+ }
+
+ foreach (var preprocessor in AsyncSyncPreprocessors) {
+ sync = await preprocessor(sync);
+ }
+
+ return sync;
+ }
+
+ private async Task<SyncResponse?> SyncAsyncInternal(CancellationToken? cancellationToken = null, bool noDelay = false) {
var sw = Stopwatch.StartNew();
- if (_filterIsDirty) await updateFilterAsync();
+ if (_filterIsDirty) await UpdateFilterAsync();
- var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}";
+ var url = $"/_matrix/client/v3/sync?timeout={Timeout}";
+ if (!string.IsNullOrWhiteSpace(SetPresence)) url += $"&set_presence={SetPresence}";
if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}";
if (_filterId is not null) url += $"&filter={_filterId}";
+ if (FullState) url += "&full_state=true";
+ if (UseMsc4222StateAfter) url += "&org.matrix.msc4222.use_state_after=true&use_state_after=true"; // We use both unstable and stable names for compatibility
- logger?.LogInformation("SyncHelper: Calling: {}", url);
+ // logger?.LogInformation("SyncHelper: Calling: {}", url);
try {
- var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
- if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
- logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.Content.Headers.ContentLength ?? -1, sw.Elapsed);
- var deserializeSw = Stopwatch.StartNew();
- var resp = await httpResp.Content.ReadFromJsonAsync<SyncResponse>(cancellationToken: cancellationToken ?? CancellationToken.None,
- jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
- logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.Content.Headers.ContentLength ?? -1, deserializeSw.Elapsed, sw.Elapsed);
+ SyncResponse? resp;
+ if (UseInternalStreamingSync) {
+ resp = await homeserver.ClientHttpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None);
+ logger?.LogInformation("Got sync response: ~{} bytes, {} elapsed", resp.ToJson(false, true, true).Length, sw.Elapsed);
+ }
+ else {
+ var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
+ if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
+ var receivedTime = sw.Elapsed;
+ var deserializeSw = Stopwatch.StartNew();
+ resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None,
+ jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
+ logger?.LogInformation("Deserialized sync response: {} bytes, {} response time, {} deserialize time, {} total", httpResp.GetContentLength(), receivedTime,
+ deserializeSw.Elapsed, sw.Elapsed);
+ }
+
var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
- if (timeToWait.TotalMilliseconds > 0)
+ if (!noDelay && timeToWait.TotalMilliseconds > 0) {
+ logger?.LogWarning("SyncAsyncInternal: Waiting {delay}", timeToWait);
await Task.Delay(timeToWait);
+ }
+
return resp;
}
catch (TaskCanceledException) {
@@ -103,6 +209,8 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
catch (Exception e) {
Console.WriteLine(e);
logger?.LogError(e, "Failed to sync!\n{}", e.ToString());
+ await Task.WhenAll(ExceptionHandlers.Select(x => x.Invoke(e)).ToList());
+ if (e is MatrixException { ErrorCode: MatrixException.ErrorCodes.M_UNKNOWN_TOKEN }) throw;
}
return null;
@@ -110,10 +218,17 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
public async IAsyncEnumerable<SyncResponse> EnumerateSyncAsync(CancellationToken? cancellationToken = null) {
while (!cancellationToken?.IsCancellationRequested ?? true) {
- var sync = await SyncAsync(cancellationToken);
+ var sw = Stopwatch.StartNew();
+ var sync = await SyncAsync(cancellationToken, noDelay: true);
if (sync is null) continue;
if (!string.IsNullOrWhiteSpace(sync.NextBatch)) Since = sync.NextBatch;
yield return sync;
+
+ var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
+ if (timeToWait.TotalMilliseconds > 0) {
+ logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait);
+ await Task.Delay(timeToWait);
+ }
}
}
@@ -183,7 +298,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
if (syncResponse.Rooms is { Join.Count: > 0 })
foreach (var updatedRoom in syncResponse.Rooms.Join) {
if (updatedRoom.Value.Timeline is null) continue;
- foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events) {
+ foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events ?? []) {
stateEventResponse.RoomId = updatedRoom.Key;
var tasks = TimelineEventHandlers.Select(x => x(stateEventResponse)).ToList();
await Task.WhenAll(tasks);
@@ -210,7 +325,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
/// Event fired when an account data event is received
/// </summary>
public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new();
-
+
+ /// <summary>
+ /// Event fired when an exception is thrown
+ /// </summary>
+ public List<Func<Exception, Task>> ExceptionHandlers { get; } = new();
+
private void Log(string message) {
if (logger is null) Console.WriteLine(message);
else logger.LogInformation(message);
|