diff --git a/LibMatrix/Helpers/SyncStateResolver.cs b/LibMatrix/Helpers/SyncStateResolver.cs
index e2dbdee..e9c5938 100644
--- a/LibMatrix/Helpers/SyncStateResolver.cs
+++ b/LibMatrix/Helpers/SyncStateResolver.cs
@@ -1,3 +1,5 @@
+using System.Collections.Frozen;
+using System.Diagnostics;
using ArcaneLibs.Extensions;
using LibMatrix.Extensions;
using LibMatrix.Filters;
@@ -26,16 +28,10 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
_syncHelper.SetPresence = SetPresence;
_syncHelper.Filter = Filter;
_syncHelper.FullState = FullState;
- // run sync or grab from storage if available
- // var sync = storageProvider != null && await storageProvider.ObjectExistsAsync(Since ?? "init")
- // ? await storageProvider.LoadObjectAsync<SyncResponse>(Since ?? "init")
- // : await _syncHelper.SyncAsync(cancellationToken);
+
var sync = await _syncHelper.SyncAsync(cancellationToken);
if (sync is null) return await ContinueAsync(cancellationToken);
- // if (storageProvider != null && !await storageProvider.ObjectExistsAsync(Since ?? "init"))
- // await storageProvider.SaveObjectAsync(Since ?? "init", sync);
-
if (MergedState is null) MergedState = sync;
else MergedState = MergeSyncs(MergedState, sync);
Since = sync.NextBatch;
@@ -45,22 +41,98 @@ public class SyncStateResolver(AuthenticatedHomeserverGeneric homeserver, ILogge
public async Task OptimiseStore() {
if (storageProvider is null) return;
+ if (!await storageProvider.ObjectExistsAsync("init")) return;
+
+ Console.Write("Optimising sync store...");
+ var initLoadTask = storageProvider.LoadObjectAsync<SyncResponse>("init");
+ var keys = (await storageProvider.GetAllKeysAsync()).ToFrozenSet();
+ var count = keys.Count(x => !x.StartsWith("old/")) - 1;
+ Console.WriteLine($"Found {count} entries to optimise.");
- var keys = await storageProvider.GetAllKeysAsync();
- var count = keys.Count - 2;
- var merged = await storageProvider.LoadObjectAsync<SyncResponse>("init");
+ var merged = await initLoadTask;
if (merged is null) return;
+ if (!keys.Contains(merged.NextBatch)) {
+ Console.WriteLine("Next response after initial sync is not present, not checkpointing!");
+ return;
+ }
+
+ // We back up old entries
+ var oldPath = $"old/{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
+ await storageProvider.MoveObjectAsync("init", $"{oldPath}/init");
+
+ var moveTasks = new List<Task>();
while (keys.Contains(merged.NextBatch)) {
+ Console.Write($"Merging {merged.NextBatch}, {--count} remaining... ");
+ var sw = Stopwatch.StartNew();
+ var swt = Stopwatch.StartNew();
var next = await storageProvider.LoadObjectAsync<SyncResponse>(merged.NextBatch);
- if (next is null) break;
+ Console.Write($"Load {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+ if (next is null || merged.NextBatch == next.NextBatch) break;
+
+ Console.Write($"Check {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+ // back up old entry
+ moveTasks.Add(storageProvider.MoveObjectAsync(merged.NextBatch, $"{oldPath}/{merged.NextBatch}"));
+ Console.Write($"Move {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+
merged = MergeSyncs(merged, next);
- Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining...");
+ Console.Write($"Merge {sw.GetElapsedAndRestart().TotalMilliseconds}ms... ");
+ Console.WriteLine($"Total {swt.Elapsed.TotalMilliseconds}ms");
+ // Console.WriteLine($"Merged {merged.NextBatch}, {--count} remaining...");
+ }
+
+ await storageProvider.SaveObjectAsync("init", merged);
+ await Task.WhenAll(moveTasks);
+ }
+
+ public async Task UnrollOptimisedStore() {
+ if (storageProvider is null) return;
+ Console.WriteLine("WARNING: Unrolling sync store!");
+ }
+
+ public async Task SquashOptimisedStore(int targetCountPerCheckpoint) {
+ Console.Write($"Balancing optimised store to {targetCountPerCheckpoint} per checkpoint...");
+ var checkpoints = await GetCheckpointMap();
+ if (checkpoints is null) return;
+
+ Console.WriteLine(
+ $" Stats: {checkpoints.Count} checkpoints with [{checkpoints.Min(x => x.Value.Count)} < ~{checkpoints.Average(x => x.Value.Count)} < {checkpoints.Max(x => x.Value.Count)}] entries");
+ Console.WriteLine($"Found {checkpoints?.Count ?? 0} checkpoints.");
+ }
+
+ public async Task dev() {
+ var keys = (await storageProvider?.GetAllKeysAsync()).ToFrozenSet();
+ var times = new Dictionary<long, List<string>>();
+ var values = keys.Select(async x => Task.Run(async () => (x, await storageProvider?.LoadObjectAsync<SyncResponse>(x)))).ToAsyncEnumerable();
+ await foreach (var task in values) {
+ var (key, data) = await task;
+ if (data is null) continue;
+ var derivTime = data.GetDerivedSyncTime();
+ if (!times.ContainsKey(derivTime)) times[derivTime] = new();
+ times[derivTime].Add(key);
}
- await storageProvider.SaveObjectAsync("merged", merged);
+ foreach (var (time, ckeys) in times.OrderBy(x => x.Key)) {
+ Console.WriteLine($"{time}: {ckeys.Count} keys");
+ }
+ }
+
+ private async Task<Dictionary<ulong, List<string>>?> GetCheckpointMap() {
+ if (storageProvider is null) return null;
+ var keys = (await storageProvider.GetAllKeysAsync()).ToFrozenSet();
+ var map = new Dictionary<ulong, List<string>>();
+ foreach (var key in keys) {
+ if (!key.StartsWith("old/")) continue;
+ var parts = key.Split('/');
+ if (parts.Length < 3) continue;
+ // if (!map.ContainsKey(parts[1])) map[parts[1]] = new();
+ // map[parts[1]].Add(parts[2]);
+ if (!ulong.TryParse(parts[1], out var checkpoint)) continue;
+ if (!map.ContainsKey(checkpoint)) map[checkpoint] = new();
+ map[checkpoint].Add(parts[2]);
+ }
- Environment.Exit(0);
+ return map.OrderBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value);
}
private SyncResponse MergeSyncs(SyncResponse oldSync, SyncResponse newSync) {
|