summary refs log tree commit diff
path: root/MxApiExtensions/Controllers/Client/SyncController.cs
diff options
context:
space:
mode:
Diffstat (limited to 'MxApiExtensions/Controllers/Client/SyncController.cs')
-rw-r--r--MxApiExtensions/Controllers/Client/SyncController.cs54
1 files changed, 33 insertions, 21 deletions
diff --git a/MxApiExtensions/Controllers/Client/SyncController.cs b/MxApiExtensions/Controllers/Client/SyncController.cs

index 7f9ed1d..615502b 100644 --- a/MxApiExtensions/Controllers/Client/SyncController.cs +++ b/MxApiExtensions/Controllers/Client/SyncController.cs
@@ -28,12 +28,14 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi private UserContextService.UserContext userContext; private Stopwatch _syncElapsed = Stopwatch.StartNew(); private static SemaphoreSlim _semaphoreSlim = new(1, 1); + public static List<Task> TrackedTasks { get; set; } = new(); [HttpGet("/_matrix/client/{_}/sync")] public async Task Sync(string _, [FromQuery] string? since, [FromQuery] int timeout = 1000) { // temporary variables bool startedNewTask = false; Task? preloadTask = null; + TrackedTasks.RemoveAll(x => x.Status == TaskStatus.RanToCompletion); // get user context based on authentication userContext = await userContextService.GetCurrentUserContext(); @@ -50,12 +52,12 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi //prevent duplicate initialisation await _semaphoreSlim.WaitAsync(); - + //if we don't have a sync state for this user... if (userContext.SyncState is null) { logger.LogInformation("Started tracking sync state for {} on {} ({})", userContext.Homeserver.WhoAmI.UserId, userContext.Homeserver.ServerName, userContext.Homeserver.AccessToken); - + //create a new sync state userContext.SyncState = new SyncState { Homeserver = userContext.Homeserver, @@ -68,7 +70,7 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi }) }; startedNewTask = true; - + //if this is an initial sync, and the user has enabled this, preload data if (string.IsNullOrWhiteSpace(since) && userContext.UserConfiguration.InitialSyncPreload.Enable) { logger.LogInformation("Sync data preload for {} on {} ({}) starting", userContext.Homeserver.WhoAmI.UserId, userContext.Homeserver.ServerName, @@ -102,11 +104,13 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi //await scope-local tasks in order to prevent disposal if (preloadTask is not null) { + TrackedTasks.Add(preloadTask); await preloadTask; preloadTask.Dispose(); } if (startedNewTask && userContext.SyncState?.NextSyncResponse is not null) { + TrackedTasks.Add(userContext.SyncState.NextSyncResponse); var resp = await userContext.SyncState.NextSyncResponse; var sr = await resp.Content.ReadFromJsonAsync<JsonObject>(); if (sr!.ContainsKey("error")) throw sr.Deserialize<MatrixException>()!; @@ -123,7 +127,7 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi do { if (userContext.SyncState is null) throw new NullReferenceException("syncState is null!"); // if (userContext.SyncState.NextSyncResponse is null) throw new NullReferenceException("NextSyncResponse is null"); - + //check if upstream has responded, if so, return upstream response // if (userContext.SyncState.NextSyncResponse is { IsCompleted: true } syncResponse) { // var resp = await syncResponse; @@ -146,7 +150,7 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi } // await Task.Delay(Math.Clamp(timeout, 25, 250)); //wait 25-250ms between checks - await Task.Delay(Math.Clamp(userContextService.SessionCount * 10 ,25, 500)); + await Task.Delay(Math.Clamp(userContextService.SessionCount * 10, 25, 500)); } while (_syncElapsed.ElapsedMilliseconds < timeout + 500); //... while we haven't gone >500ms over expected timeout //we didn't get a response, send a bogus response @@ -155,10 +159,11 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi new()); } - - private async Task EnqueuePreloadData(SyncState syncState) { - await EnqueuePreloadAccountData(syncState); - await EnqueuePreloadRooms(syncState); + private static async Task EnqueuePreloadData(SyncState syncState) { + new Thread(async () => { + await EnqueuePreloadAccountData(syncState); + await EnqueuePreloadRooms(syncState); + }).Start(); } private static List<string> CommonAccountDataKeys = new() { @@ -179,8 +184,9 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi "m.secret_storage.default_key", "gay.rory.mxapiextensions.userconfig" }; + //enqueue common account data - private async Task EnqueuePreloadAccountData(SyncState syncState) { + private static async Task EnqueuePreloadAccountData(SyncState syncState) { var syncMsg = new SyncResponse() { AccountData = new() { Events = new() @@ -193,22 +199,23 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi RawContent = await syncState.Homeserver.GetAccountDataAsync<JsonObject>(key) }); } - catch {} + catch { } } + syncState.SyncQueue.Enqueue(syncMsg); } - private async Task EnqueuePreloadRooms(SyncState syncState) { + private static async Task EnqueuePreloadRooms(SyncState syncState) { //get the users's rooms var rooms = await syncState.Homeserver.GetJoinedRooms(); - + //get the user's DM rooms var mDirectContent = await syncState.Homeserver.GetAccountDataAsync<Dictionary<string, List<string>>>("m.direct"); var dmRooms = mDirectContent.SelectMany(pair => pair.Value); //get our own homeserver's server_name var ownHs = syncState.Homeserver.WhoAmI!.UserId!.Split(':')[1]; - + //order rooms by expected state size, since large rooms take a long time to return rooms = rooms.OrderBy(x => { if (dmRooms.Contains(x.RoomId)) return -1; @@ -217,20 +224,25 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi if (HomeserverWeightEstimation.EstimatedSize.ContainsKey(parts[1])) return HomeserverWeightEstimation.EstimatedSize[parts[1]] + parts[0].Length; return 5000; }).ToList(); - + + foreach (var room in rooms) { + new Thread(async () => await EnqueueRoomData(syncState, room)).Start(); + } + //start all fetch tasks - var roomDataTasks = rooms.Select(room => EnqueueRoomData(syncState, room)).ToList(); - logger.LogInformation("Preloading data for {} rooms on {} ({})", roomDataTasks.Count, syncState.Homeserver.ServerName, syncState.Homeserver.AccessToken); + // var roomDataTasks = rooms.Select(room => EnqueueRoomData(syncState, room)).ToList(); + // logger.LogInformation("Preloading data for {} rooms on {} ({})", roomDataTasks.Count, syncState.Homeserver.ServerName, syncState.Homeserver.AccessToken); //wait for all of them to finish - await Task.WhenAll(roomDataTasks); + // TrackedTasks.AddRange(roomDataTasks); + // await Task.WhenAll(roomDataTasks); } private static readonly SemaphoreSlim _roomDataSemaphore = new(4, 4); - private async Task EnqueueRoomData(SyncState syncState, GenericRoom room) { + private static async Task EnqueueRoomData(SyncState syncState, GenericRoom room) { //limit concurrent requests, to not overload upstream - await _roomDataSemaphore.WaitAsync(); + // await _roomDataSemaphore.WaitAsync(); //get the room's state var roomState = room.GetFullStateAsync(); //get the room's timeline, reversed @@ -277,6 +289,6 @@ public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfi //finally, actually put the response in queue syncState.SyncQueue.Enqueue(syncResponse); - _roomDataSemaphore.Release(); + // _roomDataSemaphore.Release(); } } \ No newline at end of file