summary refs log tree commit diff
path: root/MxApiExtensions/Controllers/Client/SyncController.cs
diff options
context:
space:
mode:
Diffstat (limited to 'MxApiExtensions/Controllers/Client/SyncController.cs')
-rw-r--r--MxApiExtensions/Controllers/Client/SyncController.cs243
1 files changed, 243 insertions, 0 deletions
diff --git a/MxApiExtensions/Controllers/Client/SyncController.cs b/MxApiExtensions/Controllers/Client/SyncController.cs
new file mode 100644

index 0000000..2944c3b --- /dev/null +++ b/MxApiExtensions/Controllers/Client/SyncController.cs
@@ -0,0 +1,243 @@ +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Text.Json; +using System.Text.Json.Nodes; +using System.Text.Json.Serialization; +using System.Web; +using ArcaneLibs; +using LibMatrix; +using LibMatrix.EventTypes.Spec.State; +using LibMatrix.Helpers; +using LibMatrix.Homeservers; +using LibMatrix.Responses; +using LibMatrix.RoomTypes; +using Microsoft.AspNetCore.Mvc; +using MxApiExtensions.Classes; +using MxApiExtensions.Classes.LibMatrix; +using MxApiExtensions.Extensions; +using MxApiExtensions.Services; + +namespace MxApiExtensions.Controllers; + +[ApiController] +[Route("/")] +public class SyncController(ILogger<SyncController> logger, MxApiExtensionsConfiguration config, AuthenticationService auth, AuthenticatedHomeserverProviderService hsProvider) + : ControllerBase { + public static readonly ConcurrentDictionary<string, SyncState> _syncStates = new(); + + private static SemaphoreSlim _semaphoreSlim = new(1, 1); + private Stopwatch _syncElapsed = Stopwatch.StartNew(); + + [HttpGet("/_matrix/client/{_}/sync")] + public async Task Sync(string _, [FromQuery] string? since, [FromQuery] int timeout = 1000) { + Task? preloadTask = null; + AuthenticatedHomeserverGeneric? hs = null; + try { + hs = await hsProvider.GetHomeserver(); + } + catch (Exception e) { + Console.WriteLine(e); + } + + var qs = HttpUtility.ParseQueryString(Request.QueryString.Value!); + qs.Remove("access_token"); + if (since == "null") qs.Remove("since"); + + if (!config.FastInitialSync.Enabled) { + logger.LogInformation("Starting sync for {} on {} ({})", hs.WhoAmI.UserId, hs.ServerName, hs.AccessToken); + var result = await hs.ClientHttpClient.GetAsync($"{Request.Path}?{qs}"); + await Response.WriteHttpResponse(result); + return; + } + + await _semaphoreSlim.WaitAsync(); + var syncState = _syncStates.GetOrAdd($"{hs.WhoAmI.UserId}/{hs.WhoAmI.DeviceId}/{hs.ServerName}:{hs.AccessToken}", _ => { + logger.LogInformation("Started tracking sync state for {} on {} ({})", hs.WhoAmI.UserId, hs.ServerName, hs.AccessToken); + var ss = new SyncState { + IsInitialSync = string.IsNullOrWhiteSpace(since), + Homeserver = hs + }; + if (ss.IsInitialSync) { + preloadTask = EnqueuePreloadData(ss); + } + + logger.LogInformation("Starting sync for {} on {} ({})", hs.WhoAmI.UserId, hs.ServerName, hs.AccessToken); + + ss.NextSyncResponseStartedAt = DateTime.Now; + ss.NextSyncResponse = Task.Delay(15_000); + ss.NextSyncResponse.ContinueWith(async x => { + logger.LogInformation("Sync for {} on {} ({}) starting", hs.WhoAmI.UserId, hs.ServerName, hs.AccessToken); + ss.NextSyncResponse = hs.ClientHttpClient.GetAsync($"/_matrix/client/v3/sync?{qs}"); + (ss.NextSyncResponse as Task<HttpResponseMessage>).ContinueWith(async x => EnqueueSyncResponse(ss, await x)); + }); + return ss; + }); + _semaphoreSlim.Release(); + + if (syncState.SyncQueue.Count > 0) { + logger.LogInformation("Sync for {} on {} ({}) has {} queued results", hs.WhoAmI.UserId, hs.ServerName, hs.AccessToken, syncState.SyncQueue.Count); + syncState.SyncQueue.TryDequeue(out var result); + while (result is null) + syncState.SyncQueue.TryDequeue(out result); + Response.StatusCode = StatusCodes.Status200OK; + Response.ContentType = "application/json"; + await Response.StartAsync(); + result.NextBatch ??= since ?? syncState.NextBatch; + await JsonSerializer.SerializeAsync(Response.Body, result, new JsonSerializerOptions { + WriteIndented = true, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }); + await Response.CompleteAsync(); + return; + } + + var newTimeout = Math.Clamp(timeout, 0, syncState.IsInitialSync ? syncState.SyncQueue.Count >= 2 ? 0 : 250 : timeout); + logger.LogInformation("Sync for {} on {} ({}) is still running, waiting for {}ms, {} elapsed", hs.WhoAmI.UserId, hs.ServerName, hs.AccessToken, newTimeout, + DateTime.Now.Subtract(syncState.NextSyncResponseStartedAt)); + + try { + if (syncState.NextSyncResponse is not null) + await syncState.NextSyncResponse.WaitAsync(TimeSpan.FromMilliseconds(newTimeout)); + else { + syncState.NextSyncResponse = hs.ClientHttpClient.GetAsync($"/_matrix/client/v3/sync?{qs}"); + (syncState.NextSyncResponse as Task<HttpResponseMessage>).ContinueWith(async x => EnqueueSyncResponse(syncState, await x)); + // await Task.Delay(250); + } + } + catch (TimeoutException) { } + + // if (_syncElapsed.ElapsedMilliseconds > timeout) + if(syncState.NextSyncResponse?.IsCompleted == false) + syncState.SendStatusMessage( + $"M={Util.BytesToString(Process.GetCurrentProcess().WorkingSet64)} TE={DateTime.Now.Subtract(syncState.NextSyncResponseStartedAt)} S={syncState.NextSyncResponse?.Status} QL={syncState.SyncQueue.Count}"); + Response.StatusCode = StatusCodes.Status200OK; + Response.ContentType = "application/json"; + await Response.StartAsync(); + var response = syncState.SyncQueue.FirstOrDefault(); + if (response is null) + response = new(); + response.NextBatch ??= since ?? syncState.NextBatch; + await JsonSerializer.SerializeAsync(Response.Body, response, new JsonSerializerOptions { + WriteIndented = true, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }); + await Response.CompleteAsync(); + + Response.Body.Close(); + if (preloadTask is not null) + await preloadTask; + } + + private async Task EnqueuePreloadData(SyncState syncState) { + var rooms = await syncState.Homeserver.GetJoinedRooms(); + var dm_rooms = (await syncState.Homeserver.GetAccountDataAsync<Dictionary<string, List<string>>>("m.direct")).Aggregate(new List<string>(), (list, entry) => { + list.AddRange(entry.Value); + return list; + }); + + var ownHs = syncState.Homeserver.WhoAmI.UserId.Split(':')[1]; + rooms = rooms.OrderBy(x => { + if (dm_rooms.Contains(x.RoomId)) return -1; + var parts = x.RoomId.Split(':'); + if (parts[1] == ownHs) return 200; + if (HomeserverWeightEstimation.EstimatedSize.ContainsKey(parts[1])) return HomeserverWeightEstimation.EstimatedSize[parts[1]] + parts[0].Length; + return 5000; + }).ToList(); + var roomDataTasks = rooms.Select(room => EnqueueRoomData(syncState, room)).ToList(); + logger.LogInformation("Preloading data for {} rooms on {} ({})", roomDataTasks.Count, syncState.Homeserver.ServerName, syncState.Homeserver.AccessToken); + + await Task.WhenAll(roomDataTasks); + } + + private SemaphoreSlim _roomDataSemaphore = new(32, 32); + + private async Task EnqueueRoomData(SyncState syncState, GenericRoom room) { + await _roomDataSemaphore.WaitAsync(); + var roomState = room.GetFullStateAsync(); + var timeline = await room.GetMessagesAsync(limit: 100, dir: "b"); + timeline.Chunk.Reverse(); + var SyncResponse = new SyncResponse { + Rooms = new() { + Join = new() { + { + room.RoomId, + new SyncResponse.RoomsDataStructure.JoinedRoomDataStructure { + AccountData = new() { + Events = new() + }, + Ephemeral = new() { + Events = new() + }, + State = new() { + Events = timeline.State + }, + UnreadNotifications = new() { + HighlightCount = 0, + NotificationCount = 0 + }, + Timeline = new() { + Events = timeline.Chunk, + Limited = false, + PrevBatch = timeline.Start + }, + Summary = new() { + Heroes = new(), + InvitedMemberCount = 0, + JoinedMemberCount = 1 + } + } + } + } + }, + Presence = new() { + Events = new() { + await GetStatusMessage(syncState, $"{DateTime.Now.Subtract(syncState.NextSyncResponseStartedAt)} {syncState.NextSyncResponse.Status} {room.RoomId}") + } + }, + NextBatch = "" + }; + + await foreach (var stateEvent in roomState) { + SyncResponse.Rooms.Join[room.RoomId].State.Events.Add(stateEvent); + } + + var joinRoom = SyncResponse.Rooms.Join[room.RoomId]; + joinRoom.Summary.Heroes.AddRange(joinRoom.State.Events + .Where(x => + x.Type == "m.room.member" + && x.StateKey != syncState.Homeserver.WhoAmI.UserId + && (x.TypedContent as RoomMemberEventContent).Membership == "join" + ) + .Select(x => x.StateKey)); + joinRoom.Summary.JoinedMemberCount = joinRoom.Summary.Heroes.Count; + + syncState.SyncQueue.Enqueue(SyncResponse); + _roomDataSemaphore.Release(); + } + + private async Task<StateEventResponse> GetStatusMessage(SyncState syncState, string message) { + return new StateEventResponse { + TypedContent = new PresenceEventContent { + DisplayName = "MxApiExtensions", + Presence = "online", + StatusMessage = message, + // AvatarUrl = (await syncState.Homeserver.GetProfile(syncState.Homeserver.WhoAmI.UserId)).AvatarUrl + AvatarUrl = "" + }, + Type = "m.presence", + StateKey = syncState.Homeserver.WhoAmI.UserId, + Sender = syncState.Homeserver.WhoAmI.UserId, + EventId = Guid.NewGuid().ToString(), + OriginServerTs = 0 + }; + } + + private async Task EnqueueSyncResponse(SyncState ss, HttpResponseMessage task) { + var sr = await task.Content.ReadFromJsonAsync<JsonObject>(); + if (sr.ContainsKey("error")) throw sr.Deserialize<MatrixException>()!; + ss.NextBatch = sr["next_batch"].GetValue<string>(); + ss.IsInitialSync = false; + ss.SyncQueue.Enqueue(sr.Deserialize<SyncResponse>()); + ss.NextSyncResponse = null; + } +} \ No newline at end of file