Diffstat (limited to 'LibMatrix/Helpers/SyncStateResolver.cs')
-rw-r--r-- | LibMatrix/Helpers/SyncStateResolver.cs | 100
1 file changed, 86 insertions, 14 deletions
diff --git a/LibMatrix/Helpers/SyncStateResolver.cs b/LibMatrix/Helpers/SyncStateResolver.cs
index e2dbdee..e9c5938 100644
--- a/LibMatrix/Helpers/SyncStateResolver.cs
+++ b/LibMatrix/Helpers/SyncStateResolver.cs
@@ -1,3 +1,5 @@
+using System.Collections.Frozen;
+using System.Diagnostics;
 using ArcaneLibs.Extensions;
 using LibMatrix.Extensions;
 using LibMatrix.Filters;
@@ -26,16 +28,10 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
         _syncHelper.SetPresence = SetPresence;
         _syncHelper.Filter = Filter;
         _syncHelper.FullState = FullState;
-        // run sync or grab from storage if available
-        // var sync = storageProvider != null && await storageProvider.ObjectExistsAsync(Since ?? "init")
-        //     ? await storageProvider.LoadObjectAsync<SyncResponse>(Since ?? "init")
-        //     : await _syncHelper.SyncAsync(cancellationToken);
+
         var sync = await _syncHelper.SyncAsync(cancellationToken);
         if (sync is null) return await ContinueAsync(cancellationToken);
 
-        // if (storageProvider != null && !await storageProvider.ObjectExistsAsync(Since ?? "init"))
-        //     await storageProvider.SaveObjectAsync(Since ?? "init", sync);
-
         if (MergedState is null) MergedState = sync;
         else MergedState = MergeSyncs(MergedState, sync);
         Since = sync.NextBatch;
@@ -45,22 +41,98 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
 
     public async Task OptimiseStore() {
         if (storageProvider is null) return;
+        if (!await storageProvider.ObjectExistsAsync("init")) return;
+
+        Console.Write("Optimising sync store...");
+        var initLoadTask = storageProvider.LoadObjectAsync<SyncResponse>("init");
+        var keys = (await storageProvider.GetAllKeysAsync()).ToFrozenSet();
+        var count = keys.Count(x => !x.StartsWith("old/")) - 1;
+        Console.WriteLine($"Found {count} entries to optimise.");
 
-        var keys = await storageProvider.GetAllKeysAsync();
-        var count = keys.Count - 2;
-        var merged = await storageProvider.LoadObjectAsync<SyncResponse>("init");
+        var merged = await initLoadTask;
         if (merged is null) return;
 
+        if (!keys.Contains(merged.NextBatch)) {
+            Console.WriteLine("Next response after initial sync is not present, not checkpointing!");
+            return;
+        }
+
+        // We back up old entries
+        var oldPath = $"old/{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
+        await storageProvider.MoveObjectAsync("init", $"{oldPath}/init");
+
+        var moveTasks = new List<Task>();
         while (keys.Contains(merged.NextBatch)) {
+            Console.Write($"Merging {merged.NextBatch}, {--count} remaining... ");
+            var sw = Stopwatch.StartNew();
+            var swt = Stopwatch.StartNew();
             var next = await storageProvider.LoadObjectAsync<SyncResponse>(merged.NextBatch);
-            if (next is null) break;
+            Console.Write($"Load {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+            if (next is null || merged.NextBatch == next.NextBatch) break;
+
+            Console.Write($"Check {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+            // back up old entry
+            moveTasks.Add(storageProvider.MoveObjectAsync(merged.NextBatch, $"{oldPath}/{merged.NextBatch}"));
+            Console.Write($"Move {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+
             merged = MergeSyncs(merged, next);
-            Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining...");
+            Console.Write($"Merge {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+            Console.WriteLine($"Total {swt.Elapsed.TotalMilliseconds}ms");
+            // Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining...");
+        }
+
+        await storageProvider.SaveObjectAsync("init", merged);
+        await Task.WhenAll(moveTasks);
+    }
+
+    public async Task UnrollOptimisedStore() {
+        if (storageProvider is null) return;
+        Console.WriteLine("WARNING: Unrolling sync store!");
+    }
+
+    public async Task SquashOptimisedStore(int targetCountPerCheckpoint) {
+        Console.Write($"Balancing optimised store to {targetCountPerCheckpoint} per checkpoint...");
+        var checkpoints = await GetCheckpointMap();
+        if (checkpoints is null) return;
+
+        Console.WriteLine(
+            $" Stats: {checkpoints.Count} checkpoints with [{checkpoints.Min(x => x.Value.Count)} < ~{checkpoints.Average(x => x.Value.Count)} < {checkpoints.Max(x => x.Value.Count)}] entries");
+        Console.WriteLine($"Found {checkpoints?.Count ?? 0} checkpoints.");
+    }
+
+    public async Task dev() {
+        var keys = (await storageProvider?.GetAllKeysAsync()).ToFrozenSet();
+        var times = new Dictionary<long, List<string>>();
+        var values = keys.Select(async x => Task.Run(async () => (x, await storageProvider?.LoadObjectAsync<SyncResponse>(x)))).ToAsyncEnumerable();
+        await foreach (var task in values) {
+            var (key, data) = await task;
+            if (data is null) continue;
+            var derivTime = data.GetDerivedSyncTime();
+            if (!times.ContainsKey(derivTime)) times[derivTime] = new();
+            times[derivTime].Add(key);
         }
 
-        await storageProvider.SaveObjectAsync("merged", merged);
+        foreach (var (time, ckeys) in times.OrderBy(x => x.Key)) {
+            Console.WriteLine($"{time}: {ckeys.Count} keys");
+        }
+    }
+
+    private async Task<Dictionary<ulong, List<string>>?> GetCheckpointMap() {
+        if (storageProvider is null) return null;
+        var keys = (await storageProvider.GetAllKeysAsync()).ToFrozenSet();
+        var map = new Dictionary<ulong, List<string>>();
+        foreach (var key in keys) {
+            if (!key.StartsWith("old/")) continue;
+            var parts = key.Split('/');
+            if (parts.Length < 3) continue;
+            // if (!map.ContainsKey(parts[1])) map[parts[1]] = new();
+            // map[parts[1]].Add(parts[2]);
+            if (!ulong.TryParse(parts[1], out var checkpoint)) continue;
+            if (!map.ContainsKey(checkpoint)) map[checkpoint] = new();
+            map[checkpoint].Add(parts[2]);
        }
 
-        Environment.Exit(0);
+        return map.OrderBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value);
     }
 
     private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync) {