1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
using System.Diagnostics;
using LibMatrix;
using LibMatrix.EventTypes.Spec.State.Policy;
using Microsoft.Extensions.Logging;
namespace MatrixAntiDmSpam.Core;
public class PolicyStore(ILogger<PolicyStore> logger) {
public Dictionary<string, StateEventResponse> AllPolicies { get; } = [];
#region Single policy events
/// <summary>
/// Fired when any policy event is received
/// </summary>
public List<Func<StateEventResponse, Task>> OnPolicyReceived { get; } = [];
/// <summary>
/// Fired when a new policy is added
/// </summary>
public List<Func<StateEventResponse, Task>> OnPolicyAdded { get; } = [];
/// <summary>
/// Fired when a policy is updated, without being removed.
/// </summary>
public List<Func<StateEventResponse, StateEventResponse, Task>> OnPolicyUpdated { get; } = [];
/// <summary>
/// Fired when a policy is removed.
/// </summary>
public List<Func<StateEventResponse, StateEventResponse, Task>> OnPolicyRemoved { get; } = [];
#endregion
#region Bulk policy events
/// <summary>
/// Fired when any policy event is received
/// </summary>
public List<Func<(List<StateEventResponse> NewPolicies,
List<(StateEventResponse Old, StateEventResponse New)> UpdatedPolicies,
List<(StateEventResponse Old, StateEventResponse New)> RemovedPolicies), Task>> OnPoliciesChanged { get; } = [];
#endregion
public async Task AddPoliciesAsync(IEnumerable<StateEventResponse> events) {
var policyEvents = events
.Where(evt => evt.TypedContent is PolicyRuleEventContent)
.ToList();
List<StateEventResponse> newPolicies = new();
List<(StateEventResponse Old, StateEventResponse New)> updatedPolicies = new();
List<(StateEventResponse Old, StateEventResponse New)> removedPolicies = new();
var sw = Stopwatch.StartNew();
try {
foreach (var evt in policyEvents) {
var eventKey = $"{evt.RoomId}:{evt.Type}:{evt.StateKey}";
if (evt.TypedContent is not PolicyRuleEventContent policy) continue;
if (policy.GetNormalizedRecommendation() is "m.ban") {
if (AllPolicies.TryGetValue(eventKey, out var oldContent))
updatedPolicies.Add((oldContent, evt));
else
newPolicies.Add(evt);
AllPolicies[eventKey] = evt;
}
else if (AllPolicies.Remove(eventKey, out var oldContent))
removedPolicies.Add((oldContent, evt));
}
}
catch (Exception e) {
Console.WriteLine(e);
}
logger.LogInformation("Processed {Count} policies in {Elapsed}", policyEvents.Count, sw.Elapsed);
// execute all the callbacks in parallel, as much as possible...
await Task.WhenAll(
Task.WhenAll(
policyEvents.Select(evt =>
Task.WhenAll(OnPolicyReceived.Select(callback => callback(evt)).ToList())
)
),
Task.WhenAll(
newPolicies.Select(evt =>
Task.WhenAll(OnPolicyAdded.Select(callback => callback(evt)).ToList())
)
),
Task.WhenAll(
updatedPolicies.Select(evt =>
Task.WhenAll(OnPolicyUpdated.Select(callback => callback(evt.Old, evt.New)).ToList())
)
),
Task.WhenAll(
removedPolicies.Select(evt =>
Task.WhenAll(OnPolicyRemoved.Select(callback => callback(evt.Old, evt.New)).ToList())
)
),
Task.WhenAll(OnPoliciesChanged.Select(callback => callback((newPolicies, updatedPolicies, removedPolicies))).ToList())
);
}
private async Task AddPolicyAsync(StateEventResponse evt) { }
}
|