diff --git a/Tests/LibMatrix.HomeserverEmulator/Controllers/SyncController.cs b/Tests/LibMatrix.HomeserverEmulator/Controllers/SyncController.cs
index afcf711..024d071 100644
--- a/Tests/LibMatrix.HomeserverEmulator/Controllers/SyncController.cs
+++ b/Tests/LibMatrix.HomeserverEmulator/Controllers/SyncController.cs
@@ -1,6 +1,8 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using ArcaneLibs.Extensions;
+using LibMatrix.EventTypes.Spec.State;
+using LibMatrix.HomeserverEmulator.Extensions;
using LibMatrix.HomeserverEmulator.Services;
using LibMatrix.Responses;
using Microsoft.AspNetCore.Mvc;
@@ -15,11 +17,6 @@ public class SyncController(ILogger<SyncController> logger, TokenService tokenSe
public async Task<SyncResponse> Sync([FromQuery] string? since = null, [FromQuery] int? timeout = 5000) {
var sw = Stopwatch.StartNew();
var token = tokenService.GetAccessToken(HttpContext);
- if (token == null)
- throw new MatrixException() {
- ErrorCode = "M_MISSING_TOKEN",
- Error = "Missing token"
- };
var user = await userStore.GetUserByToken(token);
if (user == null)
@@ -28,63 +25,67 @@ public class SyncController(ILogger<SyncController> logger, TokenService tokenSe
Error = "No such user"
};
var session = user.AccessTokens[token];
+ UserStore.User.SessionInfo.UserSyncState newSyncState = new();
+
+ SyncResponse syncResp;
+ if (string.IsNullOrWhiteSpace(since) || !session.SyncStates.ContainsKey(since))
+ syncResp = InitialSync(user, session);
+ else {
+ var syncState = session.SyncStates[since];
+ newSyncState = syncState.Clone();
- if (string.IsNullOrWhiteSpace(since))
- return InitialSync(user, session);
+ var newSyncToken = Guid.NewGuid().ToString();
+ do {
+ syncResp = IncrementalSync(user, session, syncState);
+ syncResp.NextBatch = newSyncToken;
+ } while (!await HasDataOrStall(syncResp) && sw.ElapsedMilliseconds < timeout);
- if (!session.SyncStates.TryGetValue(since, out var syncState))
- if (!cfg.UnknownSyncTokenIsInitialSync)
- throw new MatrixException() {
- ErrorCode = "M_UNKNOWN",
- Error = "Unknown sync token."
+ if (sw.ElapsedMilliseconds > timeout) {
+ logger.LogTrace("Sync timed out after {Elapsed}", sw.Elapsed);
+ return new() {
+ NextBatch = since
};
- else
- return InitialSync(user, session);
+ }
+ }
- var response = new SyncResponse() {
- NextBatch = Guid.NewGuid().ToString(),
- DeviceOneTimeKeysCount = new()
- };
+ session.SyncStates[syncResp.NextBatch] = RecalculateSyncStates(newSyncState, syncResp);
+ logger.LogTrace("Responding to sync after {totalElapsed}", sw.Elapsed);
+ return syncResp;
+ }
- session.SyncStates.Add(response.NextBatch, new() {
- RoomPositions = syncState.RoomPositions.ToDictionary(x => x.Key, x => new UserStore.User.SessionInfo.UserSyncState.SyncRoomPosition() {
- TimelinePosition = roomStore._rooms.First(y => y.RoomId == x.Key).Timeline.Count,
- AccountDataPosition = roomStore._rooms.First(y => y.RoomId == x.Key).AccountData[user.UserId].Count
- })
- });
-
- if (!string.IsNullOrWhiteSpace(since)) {
- while (sw.ElapsedMilliseconds < timeout && response.Rooms?.Join is not { Count: > 0 }) {
- await Task.Delay(100);
- var rooms = roomStore._rooms.Where(x => x.State.Any(y => y.Type == "m.room.member" && y.StateKey == user.UserId)).ToList();
- foreach (var room in rooms) {
- var roomPositions = syncState.RoomPositions[room.RoomId];
-
- response.Rooms ??= new();
- response.Rooms.Join ??= new();
- response.Rooms.Join[room.RoomId] = new() {
- Timeline = new(events: room.Timeline.Skip(roomPositions.TimelinePosition).ToList(), limited: false),
- AccountData = new(room.AccountData.GetOrCreate(user.UserId, _ => []).Skip(roomPositions.AccountDataPosition).ToList())
- };
- if (response.Rooms.Join[room.RoomId].Timeline?.Events?.Count > 0)
- response.Rooms.Join[room.RoomId].State = new(response.Rooms.Join[room.RoomId].Timeline!.Events.Where(x => x.StateKey != null).ToList());
- session.SyncStates[response.NextBatch].RoomPositions[room.RoomId] = new() {
- TimelinePosition = room.Timeline.Count,
- AccountDataPosition = room.AccountData[user.UserId].Count
- };
+ private UserStore.User.SessionInfo.UserSyncState RecalculateSyncStates(UserStore.User.SessionInfo.UserSyncState newSyncState, SyncResponse sync) {
+ logger.LogTrace("Updating sync state");
+ var syncStateRecalcSw = Stopwatch.StartNew();
+ foreach (var (roomId, roomData) in sync.Rooms?.Join ?? []) {
+ if (!newSyncState.RoomPositions.ContainsKey(roomId))
+ newSyncState.RoomPositions[roomId] = new();
+ var state = newSyncState.RoomPositions[roomId];
- if (response.Rooms.Join[room.RoomId].State?.Events?.Count == 0 &&
- response.Rooms.Join[room.RoomId].Timeline?.Events?.Count == 0 &&
- response.Rooms.Join[room.RoomId].AccountData?.Events?.Count == 0
- )
- response.Rooms.Join.Remove(room.RoomId);
- }
- }
+ state.TimelinePosition += roomData.Timeline?.Events?.Count ?? 0;
+ state.LastTimelineEventId = roomData.Timeline?.Events?.LastOrDefault()?.EventId ?? state.LastTimelineEventId;
+ state.AccountDataPosition += roomData.AccountData?.Events?.Count ?? 0;
+ state.Joined = true;
}
- return response;
+ foreach (var (roomId, _) in sync.Rooms?.Invite ?? []) {
+ if (!newSyncState.RoomPositions.ContainsKey(roomId))
+ newSyncState.RoomPositions[roomId] = new() {
+ Joined = false
+ };
+ }
+
+ foreach (var (roomId, _) in sync.Rooms?.Leave ?? []) {
+ if (newSyncState.RoomPositions.ContainsKey(roomId))
+ newSyncState.RoomPositions.Remove(roomId);
+ }
+
+ logger.LogTrace("Updated sync state in {Elapsed}", syncStateRecalcSw.Elapsed);
+
+ return newSyncState;
}
+#region Initial Sync parts
+
private SyncResponse InitialSync(UserStore.User user, UserStore.User.SessionInfo session) {
var response = new SyncResponse() {
NextBatch = Guid.NewGuid().ToString(),
@@ -98,17 +99,174 @@ public class SyncController(ILogger<SyncController> logger, TokenService tokenSe
foreach (var room in rooms) {
response.Rooms ??= new();
response.Rooms.Join ??= new();
+
response.Rooms.Join[room.RoomId] = new() {
State = new(room.State.ToList()),
Timeline = new(events: room.Timeline.ToList(), limited: false),
AccountData = new(room.AccountData.GetOrCreate(user.UserId, _ => []).ToList())
};
- session.SyncStates[response.NextBatch].RoomPositions[room.RoomId] = new() {
- TimelinePosition = room.Timeline.Count,
- AccountDataPosition = room.AccountData[user.UserId].Count
- };
}
return response;
}
+
+ private SyncResponse.RoomsDataStructure.JoinedRoomDataStructure GetInitialSyncRoomData(RoomStore.Room room, UserStore.User user) {
+ return new() {
+ State = new(room.State.ToList()),
+ Timeline = new(room.Timeline.ToList(), false),
+ AccountData = new(room.AccountData.GetOrCreate(user.UserId, _ => []).ToList())
+ };
+ }
+
+#endregion
+
+ private SyncResponse IncrementalSync(UserStore.User user, UserStore.User.SessionInfo session, UserStore.User.SessionInfo.UserSyncState syncState) {
+ return new SyncResponse {
+ Rooms = GetIncrementalSyncRooms(user, session, syncState)
+ };
+ }
+
+#region Incremental Sync parts
+
+ private SyncResponse.RoomsDataStructure GetIncrementalSyncRooms(UserStore.User user, UserStore.User.SessionInfo session, UserStore.User.SessionInfo.UserSyncState syncState) {
+ SyncResponse.RoomsDataStructure data = new() {
+ Join = [],
+ Invite = [],
+ Leave = []
+ };
+
+ // step 1: check previously synced rooms
+ foreach (var (roomId, roomPosition) in syncState.RoomPositions) {
+ var room = roomStore.GetRoomById(roomId);
+ if (room == null) {
+ // room no longer exists
+ data.Leave[roomId] = new();
+ continue;
+ }
+
+ if (roomPosition.Joined) {
+ var newTimelineEvents = room.Timeline.Skip(roomPosition.TimelinePosition).ToList();
+ var newAccountDataEvents = room.AccountData[user.UserId].Skip(roomPosition.AccountDataPosition).ToList();
+ if (newTimelineEvents.Count == 0 && newAccountDataEvents.Count == 0) continue;
+ data.Join[room.RoomId] = new() {
+ State = new(newTimelineEvents.GetCalculatedState()),
+ Timeline = new(newTimelineEvents, false)
+ };
+ }
+ }
+
+ if (data.Join.Count > 0) return data;
+
+ // step 2: check newly joined rooms
+ var untrackedRooms = roomStore._rooms.Where(r => !syncState.RoomPositions.ContainsKey(r.RoomId)).ToList();
+
+ var allJoinedRooms = roomStore.GetRoomsByMember(user.UserId).ToArray();
+ if (allJoinedRooms.Length == 0) return data;
+ var rooms = Random.Shared.GetItems(allJoinedRooms, Math.Min(allJoinedRooms.Length, 50));
+ foreach (var membership in rooms) {
+ var membershipContent = membership.TypedContent as RoomMemberEventContent ??
+ throw new InvalidOperationException("Membership event content is not RoomMemberEventContent");
+ var room = roomStore.GetRoomById(membership.RoomId!);
+ //handle leave
+ if (syncState.RoomPositions.TryGetValue(membership.RoomId!, out var syncPosition)) {
+ // logger.LogTrace("Found sync position {roomId} {value}", room.RoomId, syncPosition.ToJson(indent: false, ignoreNull: false));
+
+ if (membershipContent.Membership == "join") {
+ var newTimelineEvents = room.Timeline.Skip(syncPosition.TimelinePosition).ToList();
+ var newAccountDataEvents = room.AccountData[user.UserId].Skip(syncPosition.AccountDataPosition).ToList();
+ if (newTimelineEvents.Count == 0 && newAccountDataEvents.Count == 0) continue;
+ data.Join[room.RoomId] = new() {
+ State = new(newTimelineEvents.GetCalculatedState()),
+ Timeline = new(newTimelineEvents, false)
+ };
+ }
+ }
+ else {
+ //syncPosisition = null
+ if (membershipContent.Membership == "join") {
+ var joinData = data.Join[membership.RoomId!] = new() {
+ State = new(room.State.ToList()),
+ Timeline = new(events: room.Timeline.ToList(), limited: false),
+ AccountData = new(room.AccountData.GetOrCreate(user.UserId, _ => []).ToList())
+ };
+ }
+ }
+ }
+
+ //handle nonexistant rooms
+ foreach (var roomId in syncState.RoomPositions.Keys) {
+ if (!roomStore._rooms.Any(x => x.RoomId == roomId)) {
+ data.Leave[roomId] = new();
+ session.SyncStates[session.SyncStates.Last().Key].RoomPositions.Remove(roomId);
+ }
+ }
+
+ return data;
+ }
+
+#endregion
+
+ private async Task<bool> HasDataOrStall(SyncResponse resp) {
+ // logger.LogTrace("Checking if sync response has data: {resp}", resp.ToJson(indent: false, ignoreNull: true));
+ // if (resp.AccountData?.Events?.Count > 0) return true;
+ // if (resp.Rooms?.Invite?.Count > 0) return true;
+ // if (resp.Rooms?.Join?.Count > 0) return true;
+ // if (resp.Rooms?.Leave?.Count > 0) return true;
+ // if (resp.Presence?.Events?.Count > 0) return true;
+ // if (resp.DeviceLists?.Changed?.Count > 0) return true;
+ // if (resp.DeviceLists?.Left?.Count > 0) return true;
+ // if (resp.ToDevice?.Events?.Count > 0) return true;
+ //
+ // var hasData =
+ // resp is not {
+ // AccountData: null or {
+ // Events: null or { Count: 0 }
+ // },
+ // Rooms: null or {
+ // Invite: null or { Count: 0 },
+ // Join: null or { Count: 0 },
+ // Leave: null or { Count: 0 }
+ // },
+ // Presence: null or {
+ // Events: null or { Count: 0 }
+ // },
+ // DeviceLists: null or {
+ // Changed: null or { Count: 0 },
+ // Left: null or { Count: 0 }
+ // },
+ // ToDevice: null or {
+ // Events: null or { Count: 0 }
+ // }
+ // };
+
+ var hasData = resp is {
+ AccountData: {
+ Events: { Count: > 0 }
+ }
+ } or {
+ Presence: {
+ Events: { Count: > 0 }
+ }
+ } or {
+ DeviceLists: {
+ Changed: { Count: > 0 },
+ Left: { Count: > 0 }
+ }
+ } or {
+ ToDevice: {
+ Events: { Count: > 0 }
+ }
+ };
+
+ if (!hasData) {
+ // hasData =
+ }
+
+ if (!hasData) {
+ // logger.LogDebug($"Sync response has no data, stalling for 1000ms: {resp.ToJson(indent: false, ignoreNull: true)}");
+ await Task.Delay(10);
+ }
+
+ return hasData;
+ }
}
\ No newline at end of file
|