about summary refs log tree commit diff
path: root/LibMatrix/Helpers
diff options
context:
space:
mode:
authorRory& <root@rory.gay>2024-08-23 02:55:07 +0200
committerRory& <root@rory.gay>2024-08-23 02:55:07 +0200
commitf50ed7ccc4347907d3c5ec6b68e1b84c4e0a7a0e (patch)
treed77d1d1f30e0ea01051561d8caaadeed2fdcf439 /LibMatrix/Helpers
parentMinor cleanup (diff)
downloadLibMatrix-f50ed7ccc4347907d3c5ec6b68e1b84c4e0a7a0e.tar.xz
Synapse admin API stuff, a mass of other changes
Diffstat (limited to 'LibMatrix/Helpers')
-rw-r--r--LibMatrix/Helpers/SyncHelper.cs9
-rw-r--r--LibMatrix/Helpers/SyncStateResolver.cs100
2 files changed, 94 insertions, 15 deletions
diff --git a/LibMatrix/Helpers/SyncHelper.cs b/LibMatrix/Helpers/SyncHelper.cs
index 07c3bb0..7d5364b 100644
--- a/LibMatrix/Helpers/SyncHelper.cs
+++ b/LibMatrix/Helpers/SyncHelper.cs
@@ -52,6 +52,12 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
 
     public TimeSpan MinimumDelay { get; set; } = new(0);
 
+    public async Task<int> GetUnoptimisedStoreCount() {
+        if (storageProvider is null) return -1;
+        var keys = await storageProvider.GetAllKeysAsync();
+        return keys.Count(x => !x.StartsWith("old/")) - 1;
+    }
+
     private async Task UpdateFilterAsync() {
         if (!string.IsNullOrWhiteSpace(NamedFilterName)) {
             _filterId = await homeserver.NamedCaches.FilterCache.GetOrSetValueAsync(NamedFilterName);
@@ -101,7 +107,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
         if (!string.IsNullOrWhiteSpace(Since)) url += $"&since={Since}";
         if (_filterId is not null) url += $"&filter={_filterId}";
 
-        logger?.LogInformation("SyncHelper: Calling: {}", url);
+        // logger?.LogInformation("SyncHelper: Calling: {}", url);
 
         try {
             var httpResp = await homeserver.ClientHttpClient.GetAsync(url, cancellationToken ?? CancellationToken.None);
@@ -111,6 +117,7 @@ public class SyncHelper(AuthenticatedHomeserverGeneric homeserver, ILogger? logg
             var resp = await httpResp.Content.ReadFromJsonAsync(cancellationToken: cancellationToken ?? CancellationToken.None,
                 jsonTypeInfo: SyncResponseSerializerContext.Default.SyncResponse);
             logger?.LogInformation("Deserialized sync response: {} bytes, {} elapsed, {} total", httpResp.GetContentLength(), deserializeSw.Elapsed, sw.Elapsed);
+
             var timeToWait = MinimumDelay.Subtract(sw.Elapsed);
             if (timeToWait.TotalMilliseconds > 0)
                 await Task.Delay(timeToWait);
diff --git a/LibMatrix/Helpers/SyncStateResolver.cs b/LibMatrix/Helpers/SyncStateResolver.cs
index e2dbdee..e9c5938 100644
--- a/LibMatrix/Helpers/SyncStateResolver.cs
+++ b/LibMatrix/Helpers/SyncStateResolver.cs
@@ -1,3 +1,5 @@
+using System.Collections.Frozen;
+using System.Diagnostics;
 using ArcaneLibs.Extensions;
 using LibMatrix.Extensions;
 using LibMatrix.Filters;
@@ -26,16 +28,10 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
         _syncHelper.SetPresence = SetPresence;
         _syncHelper.Filter = Filter;
         _syncHelper.FullState = FullState;
-        // run sync or grab from storage if available
-        // var sync = storageProvider != null && await storageProvider.ObjectExistsAsync(Since ?? "init")
-        //     ? await storageProvider.LoadObjectAsync<SyncResponse>(Since ?? "init")
-        //     : await _syncHelper.SyncAsync(cancellationToken);
+
         var sync = await _syncHelper.SyncAsync(cancellationToken);
         if (sync is null) return await ContinueAsync(cancellationToken);
 
-        // if (storageProvider != null && !await storageProvider.ObjectExistsAsync(Since ?? "init"))
-            // await storageProvider.SaveObjectAsync(Since ?? "init", sync);
-
         if (MergedState is null) MergedState = sync;
         else MergedState = MergeSyncs(MergedState, sync);
         Since = sync.NextBatch;
@@ -45,22 +41,98 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
 
     public async Task OptimiseStore() {
         if (storageProvider is null) return;
+        if (!await storageProvider.ObjectExistsAsync("init")) return;
+
+        Console.Write("Optimising sync store...");
+        var initLoadTask = storageProvider.LoadObjectAsync<SyncResponse>("init");
+        var keys = (await storageProvider.GetAllKeysAsync()).ToFrozenSet();
+        var count = keys.Count(x => !x.StartsWith("old/")) - 1;
+        Console.WriteLine($"Found {count} entries to optimise.");
 
-        var keys = await storageProvider.GetAllKeysAsync();
-        var count = keys.Count - 2;
-        var merged = await storageProvider.LoadObjectAsync<SyncResponse>("init");
+        var merged = await initLoadTask;
         if (merged is null) return;
+        if (!keys.Contains(merged.NextBatch)) {
+            Console.WriteLine("Next response after initial sync is not present, not checkpointing!");
+            return;
+        }
+
+        // We back up old entries
+        var oldPath = $"old/{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
+        await storageProvider.MoveObjectAsync("init", $"{oldPath}/init");
+
+        var moveTasks = new List<Task>();
 
         while (keys.Contains(merged.NextBatch)) {
+            Console.Write($"Merging {merged.NextBatch}, {--count} remaining... ");
+            var sw = Stopwatch.StartNew();
+            var swt = Stopwatch.StartNew();
             var next = await storageProvider.LoadObjectAsync<SyncResponse>(merged.NextBatch);
-            if (next is null) break;
+            Console.Write($"Load {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+            if (next is null || merged.NextBatch == next.NextBatch) break;
+
+            Console.Write($"Check {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+            // back up old entry
+            moveTasks.Add(storageProvider.MoveObjectAsync(merged.NextBatch, $"{oldPath}/{merged.NextBatch}"));
+            Console.Write($"Move {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+
             merged = MergeSyncs(merged, next);
-            Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining...");
+            Console.Write($"Merge {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+            Console.WriteLine($"Total {swt.Elapsed.TotalMilliseconds}ms");
+            // Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining...");
+        }
+
+        await storageProvider.SaveObjectAsync("init", merged);
+        await Task.WhenAll(moveTasks);
+    }
+
+    public async Task UnrollOptimisedStore() {
+        if (storageProvider is null) return;
+        Console.WriteLine("WARNING: Unrolling sync store!");
+    }
+
+    public async Task SquashOptimisedStore(int targetCountPerCheckpoint) {
+        Console.Write($"Balancing optimised store to {targetCountPerCheckpoint} per checkpoint...");
+        var checkpoints = await GetCheckpointMap();
+        if (checkpoints is null) return;
+
+        Console.WriteLine(
+            $" Stats: {checkpoints.Count} checkpoints with [{checkpoints.Min(x => x.Value.Count)} < ~{checkpoints.Average(x => x.Value.Count)} < {checkpoints.Max(x => x.Value.Count)}] entries");
+        Console.WriteLine($"Found {checkpoints?.Count ?? 0} checkpoints.");
+    }
+
+    public async Task dev() {
+        var keys = (await storageProvider?.GetAllKeysAsync()).ToFrozenSet();
+        var times = new Dictionary<long, List<string>>();
+        var values = keys.Select(async x => Task.Run(async () => (x, await storageProvider?.LoadObjectAsync<SyncResponse>(x)))).ToAsyncEnumerable();
+        await foreach (var task in values) {
+            var (key, data) = await task;
+            if (data is null) continue;
+            var derivTime = data.GetDerivedSyncTime();
+            if (!times.ContainsKey(derivTime)) times[derivTime] = new();
+            times[derivTime].Add(key);
         }
 
-        await storageProvider.SaveObjectAsync("merged", merged);
+        foreach (var (time, ckeys) in times.OrderBy(x => x.Key)) {
+            Console.WriteLine($"{time}: {ckeys.Count} keys");
+        }
+    }
+
+    private async Task<Dictionary<ulong, List<string>>?> GetCheckpointMap() {
+        if (storageProvider is null) return null;
+        var keys = (await storageProvider.GetAllKeysAsync()).ToFrozenSet();
+        var map = new Dictionary<ulong, List<string>>();
+        foreach (var key in keys) {
+            if (!key.StartsWith("old/")) continue;
+            var parts = key.Split('/');
+            if (parts.Length < 3) continue;
+            // if (!map.ContainsKey(parts[1])) map[parts[1]] = new();
+            // map[parts[1]].Add(parts[2]);
+            if (!ulong.TryParse(parts[1], out var checkpoint)) continue;
+            if (!map.ContainsKey(checkpoint)) map[checkpoint] = new();
+            map[checkpoint].Add(parts[2]);
+        }
 
-        Environment.Exit(0);
+        return map.OrderBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value);
     }
 
     private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync) {