1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
|
using System.Diagnostics;
using System.Net.Http.Json;
using ArcaneLibs.Extensions;
using LibMatrix.Filters;
using LibMatrix.Homeservers;
using LibMatrix.Interfaces.Services;
using LibMatrix.Responses;
using Microsoft.Extensions.Logging;
namespace LibMatrix.Helpers;
public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logger = null, IStorageProvider? storageProvider = null) {
private SyncFilter? _filter;
private string? _namedFilterName;
private bool _filterIsDirty = false;
private string? _filterId = null;
public string? Since { get; set; }
public int Timeout { get; set; } = 30000;
public string? SetPresence { get; set; } = "online";
public string? FilterId {
get => _filterId;
set {
_filterId = value;
_namedFilterName = null;
_filter = null;
}
}
public string? NamedFilterName {
get => _namedFilterName;
set {
_namedFilterName = value;
_filterIsDirty = true;
_filterId = null;
}
}
public SyncFilter? Filter {
get => _filter;
set {
_filter = value;
_filterIsDirty = true;
_filterId = null;
}
}
public bool FullState { get; set; }
public bool IsInitialSync { get; set; } = true;
public TimeSpan MinimumDelay { get; set; } = new(0);
public async Task<int> GetUnoptimisedStoreCount() {
if (storageProvider is null) return -1;
var keys = await storageProvider.GetAllKeysAsync();
return keys.Count(x => !x.StartsWith("old/")) - 1;
}
private async Task UpdateFilterAsync() {
if (!string.IsNullOrWhiteSpace(NamedFilterName)) {
_filterId = await homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(NamedFilterName);
if (_filterId is null)
if (logger is null) Console.WriteLine($"Failed to get filter ID for named filter {NamedFilterName}");
else logger.LogWarning("Failed to get filter ID for named filter {NamedFilterName}", NamedFilterName);
}
else if (Filter is not null)
_filterId = (await homeserver.UploadFilterAsync(Filter)).FilterId;
else _filterId = null;
}
public async Task<SyncResponse?> SyncAsync(CancellationToken? cancellationToken = null) {
if (homeserver is null) {
Console.WriteLine("Null passed as homeserver for SyncHelper!");
throw new ArgumentNullException(nameof(homeserver), "Null passed as homeserver for SyncHelper!");
}
if (homeserver.ClientHttpClient is null) {
Console.WriteLine("Homeserver for SyncHelper is not properly configured!");
throw new ArgumentNullException(nameof(homeserver.ClientHttpClient), "Null passed as homeserver for SyncHelper!");
}
if (storageProvider is null) return await SyncAsyncInternal(cancellationToken);
var key = Since ?? "init";
if (await storageProvider.ObjectExistsAsync(key)) {
var cached = await storageProvider.LoadObjectAsync<SyncResponse>(key);
// We explicitly check that NextBatch doesn't match since to prevent infinite loops...
if (cached is not null && cached.NextBatch != Since) {
logger?.LogInformation("SyncHelper: Using cached sync response for {}", key);
return cached;
}
}
var sync = await SyncAsyncInternal(cancellationToken);
// Ditto here.
if (sync is not null && sync.NextBatch != Since) await storageProvider.SaveObjectAsync(key, sync);
return sync;
}
private async Task<SyncResponse?> SyncAsyncInternal(CancellationToken? cancellationToken = null) {
var sw = Stopwatch.StartNew();
if (_filterIsDirty) await UpdateFilterAsync();
var url = $"/_matrix/client/v3/sync?timeout={Timeout}&set_presence={SetPresence}&full_state={(FullState ? "true" : "false")}";
if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}";
if (_filterId is not null) url += $"&filter={_filterId}";
// logger?.LogInformation("SyncHelper: Calling: {}", url);
try {
var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
if (httpResp is null) throw new NullReferenceException("Failed to send HTTP request");
logger?.LogInformation("Got sync response: {} bytes, {} elapsed", httpResp.GetContentLength(), sw.Elapsed);
var deserializeSw = Stopwatch.StartNew();
var resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None,
jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed);
var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
if (timeToWait.TotalMilliseconds > 0)
await Task.Delay(timeToWait);
return resp;
}
catch (TaskCanceledException) {
Console.WriteLine("Sync cancelled!");
logger?.LogWarning("Sync cancelled due to TaskCanceledException!");
}
catch (Exception e) {
Console.WriteLine(e);
logger?.LogError(e, "Failed to sync!\n{}", e.ToString());
}
return null;
}
public async IAsyncEnumerable<SyncResponse> EnumerateSyncAsync(CancellationToken? cancellationToken = null) {
while (!cancellationToken?.IsCancellationRequested ?? true) {
var sync = await SyncAsync(cancellationToken);
if (sync is null) continue;
if (!string.IsNullOrWhiteSpace(sync.NextBatch)) Since = sync.NextBatch;
yield return sync;
}
}
public async Task RunSyncLoopAsync(bool skipInitialSyncEvents = true, CancellationToken? cancellationToken = null) {
// var sw = Stopwatch.StartNew();
var emptyInitialSyncCount = 0;
var syncCount = 0;
var oldTimeout = Timeout;
Timeout = 0;
await foreach (var sync in EnumerateSyncAsync(cancellationToken)) {
syncCount++;
if (sync is {
AccountData: null or {
Events: null or { Count: 0 }
},
Rooms: null or {
Invite: null or { Count: 0 },
Join: null or { Count: 0 },
Leave: null or { Count: 0 }
},
Presence: null or {
Events: null or { Count: 0 }
},
DeviceLists: null or {
Changed: null or { Count: 0 },
Left: null or { Count: 0 }
},
ToDevice: null or {
Events: null or { Count: 0 }
}
}) {
emptyInitialSyncCount++;
if (emptyInitialSyncCount >= 2) {
IsInitialSync = false;
Timeout = oldTimeout;
}
}
else if (syncCount > 15 && IsInitialSync)
Console.WriteLine(sync.ToJson(ignoreNull: true, indent: true));
await RunSyncLoopCallbacksAsync(sync, IsInitialSync && skipInitialSyncEvents);
}
}
private async Task RunSyncLoopCallbacksAsync(SyncResponse syncResponse, bool isInitialSync) {
var tasks = SyncReceivedHandlers.Select(x => x(syncResponse)).ToList();
await Task.WhenAll(tasks);
if (syncResponse.AccountData is { Events.Count: > 0 })
foreach (var accountDataEvent in syncResponse.AccountData.Events) {
tasks = AccountDataReceivedHandlers.Select(x => x(accountDataEvent)).ToList();
await Task.WhenAll(tasks);
}
await RunSyncLoopRoomCallbacksAsync(syncResponse, isInitialSync);
}
private async Task RunSyncLoopRoomCallbacksAsync(SyncResponse syncResponse, bool isInitialSync) {
if (syncResponse.Rooms is { Invite.Count: > 0 })
foreach (var roomInvite in syncResponse.Rooms.Invite) {
var tasks = InviteReceivedHandlers.Select(x => x(roomInvite)).ToList();
await Task.WhenAll(tasks);
}
if (isInitialSync) return;
if (syncResponse.Rooms is { Join.Count: > 0 })
foreach (var updatedRoom in syncResponse.Rooms.Join) {
if (updatedRoom.Value.Timeline is null) continue;
foreach (var stateEventResponse in updatedRoom.Value.Timeline.Events) {
stateEventResponse.RoomId = updatedRoom.Key;
var tasks = TimelineEventHandlers.Select(x => x(stateEventResponse)).ToList();
await Task.WhenAll(tasks);
}
}
}
/// <summary>
/// Event fired when a sync response is received
/// </summary>
public List<Func<SyncResponse, Task>> SyncReceivedHandlers { get; } = new();
/// <summary>
/// Event fired when a room invite is received
/// </summary>
public List<Func<KeyValuePair<string, SyncResponse.RoomsDataStructure.InvitedRoomDataStructure>, Task>> InviteReceivedHandlers { get; } = new();
/// <summary>
/// Event fired when a timeline event is received
/// </summary>
public List<Func<StateEventResponse, Task>> TimelineEventHandlers { get; } = new();
/// <summary>
/// Event fired when an account data event is received
/// </summary>
public List<Func<StateEventResponse, Task>> AccountDataReceivedHandlers { get; } = new();
private void Log(string message) {
if (logger is null) Console.WriteLine(message);
else logger.LogInformation(message);
}
}
|