summary refs log tree commit diff
path: root/MxApiExtensions/Controllers/SyncController.cs
diff options
context:
space:
mode:
authorTheArcaneBrony <myrainbowdash949@gmail.com>2023-09-04 02:18:47 +0200
committerTheArcaneBrony <myrainbowdash949@gmail.com>2023-09-04 02:18:47 +0200
commit7e40421d0eaee613be5b807502eb25fafebde5b1 (patch)
treeaf84e4c39fd0c3cfa05e1b66fb566d5cda59607f /MxApiExtensions/Controllers/SyncController.cs
parentCode cleanup (diff)
downloadMxApiExtensions-7e40421d0eaee613be5b807502eb25fafebde5b1.tar.xz
Added a lot of utilities
Diffstat (limited to 'MxApiExtensions/Controllers/SyncController.cs')
-rw-r--r--MxApiExtensions/Controllers/SyncController.cs280
1 files changed, 223 insertions, 57 deletions
diff --git a/MxApiExtensions/Controllers/SyncController.cs b/MxApiExtensions/Controllers/SyncController.cs

index d883377..382d670 100644 --- a/MxApiExtensions/Controllers/SyncController.cs +++ b/MxApiExtensions/Controllers/SyncController.cs
@@ -1,5 +1,19 @@ +using System.Collections.Concurrent; using System.Net.Http.Headers; +using System.Text.Encodings.Web; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Web; +using LibMatrix; +using LibMatrix.Helpers; +using LibMatrix.Homeservers; +using LibMatrix.Responses; +using LibMatrix.RoomTypes; +using LibMatrix.StateEventTypes.Spec; using Microsoft.AspNetCore.Mvc; +using MxApiExtensions.Classes; +using MxApiExtensions.Extensions; +using MxApiExtensions.Services; namespace MxApiExtensions.Controllers; @@ -7,103 +21,255 @@ namespace MxApiExtensions.Controllers; [Route("/")] public class SyncController : ControllerBase { private readonly ILogger<SyncController> _logger; - private readonly CacheConfiguration _config; - private readonly Auth _auth; + private readonly MxApiExtensionsConfiguration _config; + private readonly AuthenticationService _auth; + private readonly AuthenticatedHomeserverProviderService _hs; - public SyncController(ILogger<SyncController> logger, CacheConfiguration config, Auth auth) { + private static readonly ConcurrentDictionary<string, SyncState> _syncStates = new(); + + public SyncController(ILogger<SyncController> logger, MxApiExtensionsConfiguration config, AuthenticationService auth, AuthenticatedHomeserverProviderService hs) { _logger = logger; _config = config; _auth = auth; + _hs = hs; } [HttpGet("/_matrix/client/v3/sync")] - public async Task Sync([FromQuery] string? since, [FromQuery] string? access_token) { + public async Task Sync([FromQuery] string? since, [FromQuery] int timeout = 1000) { + Task? preloadTask = null; + AuthenticatedHomeserverGeneric? hs = null; try { - access_token ??= _auth.GetToken(); - var mxid = _auth.GetUserId(); - var cacheFile = GetFilePath(mxid, since); + hs = await _hs.GetHomeserver(); + } + catch (Exception e) { + Console.WriteLine(); + } + var qs = HttpUtility.ParseQueryString(Request.QueryString.Value!); + qs.Remove("access_token"); - if (!await TrySendCached(cacheFile)) { - using var hc = new HttpClient(); - hc.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", access_token); - hc.Timeout = TimeSpan.FromMinutes(10); - //remove access_token from query string - Request.QueryString = new QueryString( - Request.QueryString.Value - .Replace("&access_token", "access_token") - .Replace($"access_token={access_token}", "") - ); + if (!_config.FastInitialSync.Enabled) { + _logger.LogInformation("Starting sync for {} on {} ({})", hs.WhoAmI.UserId, hs.HomeServerDomain, hs.AccessToken); + var result = await hs._httpClient.GetAsync($"{Request.Path}?{qs}"); + await Response.WriteHttpResponse(result); + return; + } - var resp = hc.GetAsync($"{_config.Homeserver}{Request.Path}{Request.QueryString}").Result; - // var resp = await hs._httpClient.GetAsync($"/_matrix/client/v3/sync?since={since}"); + try { + var syncState = _syncStates.GetOrAdd(hs.AccessToken, _ => { + _logger.LogInformation("Started tracking sync state for {} on {} ({})", hs.WhoAmI.UserId, hs.HomeServerDomain, hs.AccessToken); + return new SyncState { + IsInitialSync = string.IsNullOrWhiteSpace(since), + Homeserver = hs + }; + }); + + if (syncState.NextSyncResult is null) { + _logger.LogInformation("Starting sync for {} on {} ({})", hs.WhoAmI.UserId, hs.HomeServerDomain, hs.AccessToken); - if (resp.Content is null) { - throw new MatrixException { - ErrorCode = "M_UNKNOWN", - Error = "No content in response" - }; + if (syncState.IsInitialSync) { + preloadTask = EnqueuePreloadData(syncState); } - Response.StatusCode = (int)resp.StatusCode; + syncState.NextSyncResultStartedAt = DateTime.Now; + syncState.NextSyncResult = Task.Delay(30_000); + syncState.NextSyncResult.ContinueWith(x => { + _logger.LogInformation("Sync for {} on {} ({}) starting", hs.WhoAmI.UserId, hs.HomeServerDomain, hs.AccessToken); + syncState.NextSyncResult = hs._httpClient.GetAsync($"{Request.Path}?{qs}"); + }); + } + + if (syncState.SyncQueue.Count > 0) { + _logger.LogInformation("Sync for {} on {} ({}) has {} queued results", hs.WhoAmI.UserId, hs.HomeServerDomain, hs.AccessToken, syncState.SyncQueue.Count); + syncState.SyncQueue.TryDequeue(out var result); + + Response.StatusCode = StatusCodes.Status200OK; Response.ContentType = "application/json"; await Response.StartAsync(); - await using var stream = await resp.Content.ReadAsStreamAsync(); - await using var target = System.IO.File.OpenWrite(cacheFile); - var buffer = new byte[1]; + await JsonSerializer.SerializeAsync(Response.Body, result, new JsonSerializerOptions { + WriteIndented = true, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }); + await Response.CompleteAsync(); + return; + } - int bytesRead; - while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0) { - await Response.Body.WriteAsync(buffer.AsMemory(0, bytesRead)); - target.Write(buffer, 0, bytesRead); - } + timeout = Math.Clamp(timeout, 0, 100); + _logger.LogInformation("Sync for {} on {} ({}) is still running, waiting for {}ms, {} elapsed", hs.WhoAmI.UserId, hs.HomeServerDomain, hs.AccessToken, timeout, + DateTime.Now.Subtract(syncState.NextSyncResultStartedAt)); - await target.FlushAsync(); - await Response.CompleteAsync(); + try { + await syncState.NextSyncResult.WaitAsync(TimeSpan.FromMilliseconds(timeout)); + } + catch { } + + if (syncState.NextSyncResult is Task<HttpResponseMessage> { IsCompleted: true } response) { + _logger.LogInformation("Sync for {} on {} ({}) completed", hs.WhoAmI.UserId, hs.HomeServerDomain, hs.AccessToken); + var resp = await response; + await Response.WriteHttpResponse(resp); + return; } + + // await Task.Delay(timeout); + _logger.LogInformation("Sync for {} on {} ({}): sending bogus response", hs.WhoAmI.UserId, hs.HomeServerDomain, hs.AccessToken); + Response.StatusCode = StatusCodes.Status200OK; + Response.ContentType = "application/json"; + await Response.StartAsync(); + var syncResult = new SyncResult { + // NextBatch = "MxApiExtensions::Next" + Random.Shared.NextInt64(), + NextBatch = since ?? "", + Presence = new() { + Events = new() { + await GetStatusMessage(syncState, $"{DateTime.Now.Subtract(syncState.NextSyncResultStartedAt)} {syncState.NextSyncResult.Status}") + } + }, + Rooms = new() { + Invite = new(), + Join = new() + } + }; + await JsonSerializer.SerializeAsync(Response.Body, syncResult, new JsonSerializerOptions { + WriteIndented = true, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull + }); + await Response.CompleteAsync(); } - catch (MatrixException e) { + catch (MxApiMatrixException e) { + _logger.LogError(e, "Error while syncing for {} on {} ({})", _hs.GetHomeserver().Result.WhoAmI.UserId, + _hs.GetHomeserver().Result.HomeServerDomain, _hs.GetHomeserver().Result.AccessToken); + Response.StatusCode = StatusCodes.Status500InternalServerError; Response.ContentType = "application/json"; await Response.WriteAsJsonAsync(e.GetAsJson()); await Response.CompleteAsync(); } + catch (Exception e) { + //catch SSL connection errors and retry + if (e.InnerException is HttpRequestException && e.InnerException.Message.Contains("The SSL connection could not be established")) { + _logger.LogWarning("Caught SSL connection error, retrying sync for {} on {} ({})", _hs.GetHomeserver().Result.WhoAmI.UserId, + _hs.GetHomeserver().Result.HomeServerDomain, _hs.GetHomeserver().Result.AccessToken); + await Sync(since, timeout); + return; + } + + _logger.LogError(e, "Error while syncing for {} on {} ({})", _hs.GetHomeserver().Result.WhoAmI.UserId, + _hs.GetHomeserver().Result.HomeServerDomain, _hs.GetHomeserver().Result.AccessToken); + Response.StatusCode = StatusCodes.Status500InternalServerError; Response.ContentType = "text/plain"; await Response.WriteAsync(e.ToString()); await Response.CompleteAsync(); } + + Response.Body.Close(); + if (preloadTask is not null) + await preloadTask; } - private async Task<bool> TrySendCached(string cacheFile) { - if (!System.IO.File.Exists(cacheFile)) return false; + private async Task EnqueuePreloadData(SyncState syncState) { + var rooms = await syncState.Homeserver.GetJoinedRooms(); + var dm_rooms = (await syncState.Homeserver.GetAccountData<Dictionary<string, List<string>>>("m.direct")).Aggregate(new List<string>(), (list, entry) => { + list.AddRange(entry.Value); + return list; + }); + + var ownHs = syncState.Homeserver.WhoAmI.UserId.Split(':')[1]; + rooms = rooms.OrderBy(x => { + if (dm_rooms.Contains(x.RoomId)) return -1; + var parts = x.RoomId.Split(':'); + if (parts[1] == ownHs) return 200; + if (HomeserverWeightEstimation.EstimatedSize.ContainsKey(parts[1])) return HomeserverWeightEstimation.EstimatedSize[parts[1]] + parts[0].Length; + return 5000; + }).ToList(); + var roomDataTasks = rooms.Select(room => EnqueueRoomData(syncState, room)).ToList(); + _logger.LogInformation("Preloading data for {} rooms on {} ({})", roomDataTasks.Count, syncState.Homeserver.HomeServerDomain, syncState.Homeserver.AccessToken); - Response.StatusCode = 200; - Response.ContentType = "application/json"; - await Response.StartAsync(); - await using var stream = System.IO.File.OpenRead(cacheFile); - await stream.CopyToAsync(Response.Body); - await Response.CompleteAsync(); - return true; + await Task.WhenAll(roomDataTasks); } -#region Cache management + private SemaphoreSlim _roomDataSemaphore = new(4, 4); - public string GetFilePath(string mxid, string since) { - var cacheDir = Path.Join("cache", mxid); - Directory.CreateDirectory(cacheDir); - var cacheFile = Path.Join(cacheDir, $"sync-{since}.json"); - if (!Path.GetFullPath(cacheFile).StartsWith(Path.GetFullPath(cacheDir))) { - throw new MatrixException { - ErrorCode = "M_UNKNOWN", - Error = "[Rory&::MxSyncCache] Cache file path is not in cache directory" - }; + private async Task EnqueueRoomData(SyncState syncState, GenericRoom room) { + await _roomDataSemaphore.WaitAsync(); + var roomState = room.GetFullStateAsync(); + var timeline = await room.GetMessagesAsync(limit: 100, dir: "b"); + timeline.Chunk.Reverse(); + var syncResult = new SyncResult { + Rooms = new() { + Join = new() { + { + room.RoomId, + new SyncResult.RoomsDataStructure.JoinedRoomDataStructure() { + AccountData = new() { + Events = new() + }, + Ephemeral = new() { + Events = new() + }, + State = new() { + Events = timeline.State + }, + UnreadNotifications = new() { + HighlightCount = 0, + NotificationCount = 0 + }, + Timeline = new() { + Events = timeline.Chunk, + Limited = false, + PrevBatch = timeline.Start + }, + Summary = new() { + Heroes = new(), + InvitedMemberCount = 0, + JoinedMemberCount = 1 + } + } + } + } + }, + Presence = new() { + Events = new() { + await GetStatusMessage(syncState, $"{DateTime.Now.Subtract(syncState.NextSyncResultStartedAt)} {syncState.NextSyncResult.Status} {room.RoomId}") + } + }, + NextBatch = "" + }; + + await foreach (var stateEvent in roomState) { + syncResult.Rooms.Join[room.RoomId].State.Events.Add(stateEvent); } - return cacheFile; + var joinRoom = syncResult.Rooms.Join[room.RoomId]; + joinRoom.Summary.Heroes.AddRange(joinRoom.State.Events + .Where(x => + x.Type == "m.room.member" + && x.StateKey != syncState.Homeserver.WhoAmI.UserId + && (x.TypedContent as RoomMemberEventData).Membership == "join" + ) + .Select(x => x.StateKey)); + joinRoom.Summary.JoinedMemberCount = joinRoom.Summary.Heroes.Count; + + syncState.SyncQueue.Enqueue(syncResult); + _roomDataSemaphore.Release(); } -#endregion + private async Task<StateEventResponse> GetStatusMessage(SyncState syncState, string message) { + return new StateEventResponse() { + TypedContent = new PresenceStateEventData() { + DisplayName = "MxApiExtensions", + Presence = "online", + StatusMessage = message, + // AvatarUrl = (await syncState.Homeserver.GetProfile(syncState.Homeserver.WhoAmI.UserId)).AvatarUrl + AvatarUrl = "" + }, + Type = "m.presence", + StateKey = syncState.Homeserver.WhoAmI.UserId, + Sender = syncState.Homeserver.WhoAmI.UserId, + UserId = syncState.Homeserver.WhoAmI.UserId, + EventId = Guid.NewGuid().ToString(), + OriginServerTs = 0 + }; + } }