1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
using System.Diagnostics;
using ArcaneLibs.Extensions;
using LibMatrix.Filters;
using LibMatrix.Homeservers;
using LibMatrix.Responses;
using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null) {
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
public string? SetPresence { get; set; } = "online";
public SyncFilter? Filter { get; set; }
public bool FullState { get; set; } = false;
public bool IsInitialSync { get; set; } = true;
public async Task<SyncResponse?> SyncAsync(CancellationToken? cancellationToken = null) {
var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}";
if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}";
if (Filter is not null) url += $"&filter={Filter.ToJson(ignoreNull: true, indent: false)}";
// Console.WriteLine("Calling: " + url);
logger?.LogInformation("SyncHelper: Calling: {}", url);
try {
return await homeserver._httpClient.GetFromJsonAsync<SyncResponse>(url, cancellationToken: cancellationToken ?? CancellationToken.None);
}
catch (TaskCanceledException) {
Console.WriteLine("Sync cancelled!");
logger?.LogWarning("Sync cancelled due to TaskCanceledException!");
}
catch (Exception e) {
Console.WriteLine(e);
logger?.LogError(e, "Failed to sync!\n{}", e.ToString());
}
return null;
}
public async IAsyncEnumerable<SyncResponse> EnumerateSyncAsync(CancellationToken? cancellationToken = null) {
while (!cancellationToken?.IsCancellationRequested ?? true) {
var sync = await SyncAsync(cancellationToken);
if (sync is null) continue;
Since = string.IsNullOrWhiteSpace(sync?.NextBatch) ? Since : sync.NextBatch;
yield return sync;
}
}
public async Task RunSyncLoopAsync(bool skipInitialSyncEvents = true, CancellationToken? cancellationToken = null) {
var sw = Stopwatch.StartNew();
int emptyInitialSyncCount = 0;
var oldTimeout = Timeout;
Timeout = 0;
await foreach (var sync in EnumerateSyncAsync(cancellationToken)) {
logger?.LogInformation("Got sync response: {} bytes, {} elapsed", sync?.ToJson(ignoreNull: true, indent: false).Length ?? -1, sw.Elapsed);
if (sync?.ToJson(ignoreNull: true, indent: false).Length < 250) {
emptyInitialSyncCount++;
if (emptyInitialSyncCount > 5) {
IsInitialSync = false;
Timeout = oldTimeout;
}
}
await RunSyncLoopCallbacksAsync(sync, IsInitialSync && skipInitialSyncEvents);
}
}
private async Task RunSyncLoopCallbacksAsync(SyncResponse syncResponse, bool isInitialSync) {
var tasks = SyncReceivedHandlers.Select(x => x(syncResponse)).ToList();
await Task.WhenAll(tasks);
if (syncResponse.AccountData is { Events: { Count: > 0 } }) {
foreach (var accountDataEvent in syncResponse.AccountData.Events) {
tasks = AccountDataReceivedHandlers.Select(x => x(accountDataEvent)).ToList();
await Task.WhenAll(tasks);
}
}
await RunSyncLoopRoomCallbacksAsync(syncResponse, isInitialSync);
}
private async Task RunSyncLoopRoomCallbacksAsync(SyncResponse syncResponse, bool isInitialSync) {
if (syncResponse.Rooms is { Invite.Count: > 0 }) {
foreach (var roomInvite in syncResponse.Rooms.Invite) {
var tasks = InviteReceivedHandlers.Select(x => x(roomInvite)).ToList();
await Task.WhenAll(tasks);
}
}
if (isInitialSync) return;
if (syncResponse.Rooms is { Join.Count: > 0 }) {
foreach (var updatedRoom in syncResponse.Rooms.Join) {
if (updatedRoom.Value.Timeline is null) continue;
foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events) {
stateEventResponse.RoomId = updatedRoom.Key;
var tasks = TimelineEventHandlers.Select(x => x(stateEventResponse)).ToList();
await Task.WhenAll(tasks);
}
}
}
}
/// <summary>
/// Event fired when a sync response is received
/// </summary>
public List<Func<SyncResponse, Task>> SyncReceivedHandlers { get; } = new();
/// <summary>
/// Event fired when a room invite is received
/// </summary>
public List<Func<KeyValuePair<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>, Task>> InviteReceivedHandlers { get; } = new();
/// <summary>
/// Event fired when a timeline event is received
/// </summary>
public List<Func<StateEventResponse, Task>> TimelineEventHandlers { get; } = new();
/// <summary>
/// Event fired when an account data event is received
/// </summary>
public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new();
}
|