diff --git a/MxApiExtensions/Controllers/Client/SyncController.cs b/MxApiExtensions/Controllers/Client/SyncController.cs
index 7f9ed1d..615502b 100644
--- a/MxApiExtensions/Controllers/Client/SyncController.cs
+++ b/MxApiExtensions/Controllers/Client/SyncController.cs
@@ -28,12 +28,14 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
private UserContextService.UserContext userContext;
private Stopwatch _syncElapsed = Stopwatch.StartNew();
private static SemaphoreSlim _semaphoreSlim = new(1, 1);
+ public static List<Task> TrackedTasks { get; set; } = new();
[HttpGet("/_matrix/client/{_}/sync")]
public async Task Sync(string _, [FromQuery] string? since, [FromQuery] int timeout = 1000) {
// temporary variables
bool startedNewTask = false;
Task? preloadTask = null;
+ TrackedTasks.RemoveAll(x => x.Status == TaskStatus.RanToCompletion);
// get user context based on authentication
userContext = await userContextService.GetCurrentUserContext();
@@ -50,12 +52,12 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
//prevent duplicate initialisation
await _semaphoreSlim.WaitAsync();
-
+
//if we don't have a sync state for this user...
if (userContext.SyncState is null) {
logger.LogInformation("Started tracking sync state for {} on {} ({})", userContext.Homeserver.WhoAmI.UserId, userContext.Homeserver.ServerName,
userContext.Homeserver.AccessToken);
-
+
//create a new sync state
userContext.SyncState = new SyncState {
Homeserver = userContext.Homeserver,
@@ -68,7 +70,7 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
})
};
startedNewTask = true;
-
+
//if this is an initial sync, and the user has enabled this, preload data
if (string.IsNullOrWhiteSpace(since) && userContext.UserConfiguration.InitialSyncPreload.Enable) {
logger.LogInformation("Sync data preload for {} on {} ({}) starting", userContext.Homeserver.WhoAmI.UserId, userContext.Homeserver.ServerName,
@@ -102,11 +104,13 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
//await scope-local tasks in order to prevent disposal
if (preloadTask is not null) {
+ TrackedTasks.Add(preloadTask);
await preloadTask;
preloadTask.Dispose();
}
if (startedNewTask && userContext.SyncState?.NextSyncResponse is not null) {
+ TrackedTasks.Add(userContext.SyncState.NextSyncResponse);
var resp = await userContext.SyncState.NextSyncResponse;
var sr = await resp.Content.ReadFromJsonAsync<JsonObject>();
if (sr!.ContainsKey("error")) throw sr.Deserialize<MatrixException>()!;
@@ -123,7 +127,7 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
do {
if (userContext.SyncState is null) throw new NullReferenceException("syncState is null!");
// if (userContext.SyncState.NextSyncResponse is null) throw new NullReferenceException("NextSyncResponse is null");
-
+
//check if upstream has responded, if so, return upstream response
// if (userContext.SyncState.NextSyncResponse is { IsCompleted: true } syncResponse) {
// var resp = await syncResponse;
@@ -146,7 +150,7 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
}
// await Task.Delay(Math.Clamp(timeout, 25, 250)); //wait 25-250ms between checks
- await Task.Delay(Math.Clamp(userContextService.SessionCount * 10 ,25, 500));
+ await Task.Delay(Math.Clamp(userContextService.SessionCount * 10, 25, 500));
} while (_syncElapsed.ElapsedMilliseconds < timeout + 500); //... while we haven't gone >500ms over expected timeout
//we didn't get a response, send a bogus response
@@ -155,10 +159,11 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
new());
}
-
- private async Task EnqueuePreloadData(SyncState syncState) {
- await EnqueuePreloadAccountData(syncState);
- await EnqueuePreloadRooms(syncState);
+ private static async Task EnqueuePreloadData(SyncState syncState) {
+ new Thread(async () => {
+ await EnqueuePreloadAccountData(syncState);
+ await EnqueuePreloadRooms(syncState);
+ }).Start();
}
private static List<string> CommonAccountDataKeys = new() {
@@ -179,8 +184,9 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
"m.secret_storage.default_key",
"gay.rory.mxapiextensions.userconfig"
};
+
//enqueue common account data
- private async Task EnqueuePreloadAccountData(SyncState syncState) {
+ private static async Task EnqueuePreloadAccountData(SyncState syncState) {
var syncMsg = new SyncResponse() {
AccountData = new() {
Events = new()
@@ -193,22 +199,23 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
RawContent = await syncState.Homeserver.GetAccountDataAsync<JsonObject>(key)
});
}
- catch {}
+ catch { }
}
+
syncState.SyncQueue.Enqueue(syncMsg);
}
- private async Task EnqueuePreloadRooms(SyncState syncState) {
+ private static async Task EnqueuePreloadRooms(SyncState syncState) {
//get the users's rooms
var rooms = await syncState.Homeserver.GetJoinedRooms();
-
+
//get the user's DM rooms
var mDirectContent = await syncState.Homeserver.GetAccountDataAsync<Dictionary<string, List<string>>>("m.direct");
var dmRooms = mDirectContent.SelectMany(pair => pair.Value);
//get our own homeserver's server_name
var ownHs = syncState.Homeserver.WhoAmI!.UserId!.Split(':')[1];
-
+
//order rooms by expected state size, since large rooms take a long time to return
rooms = rooms.OrderBy(x => {
if (dmRooms.Contains(x.RoomId)) return -1;
@@ -217,20 +224,25 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
if (HomeserverWeightEstimation.EstimatedSize.ContainsKey(parts[1])) return HomeserverWeightEstimation.EstimatedSize[parts[1]] + parts[0].Length;
return 5000;
}).ToList();
-
+
+ foreach (var room in rooms) {
+ new Thread(async () => await EnqueueRoomData(syncState, room)).Start();
+ }
+
//start all fetch tasks
- var roomDataTasks = rooms.Select(room => EnqueueRoomData(syncState, room)).ToList();
- logger.LogInformation("Preloading data for {} rooms on {} ({})", roomDataTasks.Count, syncState.Homeserver.ServerName, syncState.Homeserver.AccessToken);
+ // var roomDataTasks = rooms.Select(room => EnqueueRoomData(syncState, room)).ToList();
+ // logger.LogInformation("Preloading data for {} rooms on {} ({})", roomDataTasks.Count, syncState.Homeserver.ServerName, syncState.Homeserver.AccessToken);
//wait for all of them to finish
- await Task.WhenAll(roomDataTasks);
+ // TrackedTasks.AddRange(roomDataTasks);
+ // await Task.WhenAll(roomDataTasks);
}
private static readonly SemaphoreSlim _roomDataSemaphore = new(4, 4);
- private async Task EnqueueRoomData(SyncState syncState, GenericRoom room) {
+ private static async Task EnqueueRoomData(SyncState syncState, GenericRoom room) {
//limit concurrent requests, to not overload upstream
- await _roomDataSemaphore.WaitAsync();
+ // await _roomDataSemaphore.WaitAsync();
//get the room's state
var roomState = room.GetFullStateAsync();
//get the room's timeline, reversed
@@ -277,6 +289,6 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi
//finally, actually put the response in queue
syncState.SyncQueue.Enqueue(syncResponse);
- _roomDataSemaphore.Release();
+ // _roomDataSemaphore.Release();
}
}
\ No newline at end of file
|