diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index abbf541..ebe653c 100644
--- a/LibMatrix/Helpers/SyncHelper.cs
+++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -1,10 +1,12 @@
using System.Diagnostics;
using System.Net.Http.Json;
+using System.Reflection;
using System.Text.Json;
using ArcaneLibs.Collections;
using System.Text.Json.Nodes;
using ArcaneLibs.Extensions;
using LibMatrix.Filters;
+using LibMatrix.Helpers.SyncProcessors;
using LibMatrix.Homeservers;
using LibMatrix.Interfaces.Services;
using LibMatrix.Responses;
@@ -13,6 +15,8 @@ using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) {
+ private readonly Func<SyncResponse?, Task<SyncResponse?>> _msc4222EmulationSyncProcessor = new Msc4222EmulationSyncProcessor(homeserver, logger).EmulateMsc4222;
+
private SyncFilter? _filter;
private string? _namedFilterName;
private bool _filterIsDirty;
@@ -20,9 +24,34 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
- public string? SetPresence { get; set; } = "online";
+ public string? SetPresence { get; set; }
+
+ /// <summary>
+ /// Disabling this uses a technically slower code path, useful for checking whether delay comes from waiting for server or deserialising responses
+ /// </summary>
public bool UseInternalStreamingSync { get; set; } = true;
+ public bool UseMsc4222StateAfter {
+ get;
+ set {
+ field = value;
+ if (value) {
+ AsyncSyncPreprocessors.Add(_msc4222EmulationSyncProcessor);
+ logger?.LogInformation($"Added MSC4222 emulation sync processor");
+ }
+ else {
+ AsyncSyncPreprocessors.Remove(_msc4222EmulationSyncProcessor);
+ logger?.LogInformation($"Removed MSC4222 emulation sync processor");
+ }
+ }
+ } = false;
+
+ public List<Func<SyncResponse?, SyncResponse?>> SyncPreprocessors { get; } = [
+ SimpleSyncProcessors.FillRoomIds
+ ];
+
+ public List<Func<SyncResponse?, Task<SyncResponse?>>> AsyncSyncPreprocessors { get; } = [];
+
public string? FilterId {
get => _filterId;
set {
@@ -76,7 +105,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
else if (Filter is not null)
_filterId = (await homeserver.UploadFilterAsync(Filter)).FilterId;
else _filterId = null;
-
+
_filterIsDirty = false;
}
@@ -91,7 +120,21 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!");
}
- if (storageProvider is null) return await SyncAsyncInternal(cancellationToken, noDelay);
+ if (storageProvider is null) {
+ var res = await SyncAsyncInternal(cancellationToken, noDelay);
+ if (res is null) return null;
+ if (UseMsc4222StateAfter) res.Msc4222Method = SyncResponse.Msc4222SyncType.Server;
+
+ foreach (var preprocessor in SyncPreprocessors) {
+ res = preprocessor(res);
+ }
+
+ foreach (var preprocessor in AsyncSyncPreprocessors) {
+ res = await preprocessor(res);
+ }
+
+ return res;
+ }
var key = Since ?? "init";
if (await storageProvider.ObjectExistsAsync(key)) {
@@ -104,8 +147,20 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
}
var sync = await SyncAsyncInternal(cancellationToken, noDelay);
+ if (sync is null) return null;
// Ditto here.
- if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync);
+ if (sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync);
+
+ if (UseMsc4222StateAfter) sync.Msc4222Method = SyncResponse.Msc4222SyncType.Server;
+
+ foreach (var preprocessor in SyncPreprocessors) {
+ sync = preprocessor(sync);
+ }
+
+ foreach (var preprocessor in AsyncSyncPreprocessors) {
+ sync = await preprocessor(sync);
+ }
+
return sync;
}
@@ -113,9 +168,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
var sw = Stopwatch.StartNew();
if (_filterIsDirty) await UpdateFilterAsync();
- var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}";
+ var url = $"/_matrix/client/v3/sync?timeout={Timeout}";
+ if (!string.IsNullOrWhiteSpace(SetPresence)) url += $"&set_presence={SetPresence}";
if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}";
if (_filterId is not null) url += $"&filter={_filterId}";
+ if (FullState) url += "&full_state=true";
+ if (UseMsc4222StateAfter) url += "&org.matrix.msc4222.use_state_after=true&use_state_after=true"; // We use both unstable and stable names for compatibility
// logger?.LogInformation("SyncHelper: Calling: {}", url);
@@ -128,13 +186,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
else {
var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
- logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed);
+ var receivedTime = sw.Elapsed;
var deserializeSw = Stopwatch.StartNew();
- // var jsonResp = await httpResp.Content.ReadFromJsonAsync<JsonObject>(cancellationToken: cancellationToken ?? CancellationToken.None);
- // var resp = jsonResp.Deserialize<SyncResponse>();
resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None,
jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
- logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed);
+ logger?.LogInformation("Deserialized sync response: {} bytes, {} response time, {} deserialize time, {} total", httpResp.GetContentLength(), receivedTime,
+ deserializeSw.Elapsed, sw.Elapsed);
}
var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
@@ -153,6 +210,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
Console.WriteLine(e);
logger?.LogError(e, "Failed to sync!\n{}", e.ToString());
await Task.WhenAll(ExceptionHandlers.Select(x => x.Invoke(e)).ToList());
+ if (e is MatrixException { ErrorCode: MatrixException.ErrorCodes.M_UNKNOWN_TOKEN }) throw;
}
return null;
@@ -165,11 +223,10 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
if (sync is null) continue;
if (!string.IsNullOrWhiteSpace(sync.NextBatch)) Since = sync.NextBatch;
yield return sync;
-
-
+
var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
if (timeToWait.TotalMilliseconds > 0) {
- logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait);
+ logger?.LogWarning("EnumerateSyncAsync: Waiting {delay}", timeToWait);
await Task.Delay(timeToWait);
}
}
@@ -241,9 +298,9 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
if (syncResponse.Rooms is { Join.Count: > 0 })
foreach (var updatedRoom in syncResponse.Rooms.Join) {
if (updatedRoom.Value.Timeline is null) continue;
- foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events ?? []) {
- stateEventResponse.RoomId = updatedRoom.Key;
- var tasks = TimelineEventHandlers.Select(x => x(stateEventResponse)).ToList();
+ foreach (var MatrixEventResponse in updatedRoom.Value.Timeline.Events ?? []) {
+ MatrixEventResponse.RoomId = updatedRoom.Key;
+ var tasks = TimelineEventHandlers.Select(x => x(MatrixEventResponse)).ToList();
await Task.WhenAll(tasks);
}
}
@@ -262,12 +319,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
/// <summary>
/// Event fired when a timeline event is received
/// </summary>
- public List<Func<StateEventResponse, Task>> TimelineEventHandlers { get; } = new();
+ public List<Func<MatrixEventResponse, Task>> TimelineEventHandlers { get; } = new();
/// <summary>
/// Event fired when an account data event is received
/// </summary>
- public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new();
+ public List<Func<MatrixEventResponse, Task>> AccountDataReceivedHandlers { get; } = new();
/// <summary>
/// Event fired when an exception is thrown
|