Don't process events we've already processed. Remember to process state events
1 files changed, 22 insertions, 0 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index caf6a1f8eb..acbb53d6c5 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -250,6 +250,7 @@ class FederationHandler(BaseHandler):
# For each edge get the current state.
auth_events = {}
+ state_events = {}
events_to_state = {}
for e_id in edges:
state, auth = yield self.replication_layer.get_state_for_room(
@@ -258,8 +259,13 @@ class FederationHandler(BaseHandler):
event_id=e_id
)
auth_events.update({a.event_id: a for a in auth})
+ state_events.update({s.event_id: s for s in state})
events_to_state[e_id] = state
+ seen_events = yield self.store.have_events(
+ set(auth_events.keys()) | set(state_events.keys())
+ )
+
yield defer.gatherResults(
[
self._handle_new_event(
@@ -270,6 +276,22 @@ class FederationHandler(BaseHandler):
}
)
for a in auth_events.values()
+ if a.event_id not in seen_events
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
+
+ yield defer.gatherResults(
+ [
+ self._handle_new_event(
+ dest, s,
+ auth_events={
+ (e.type, e.state_key): e for e in auth_events
+ if e.event_id in [a_id for a_id, _ in s.auth_events]
+ }
+ )
+ for s in state_events.values()
+ if s.event_id not in seen_events
],
consumeErrors=True,
).addErrback(unwrapFirstError)
|