summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py34
1 files changed, 20 insertions, 14 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 46bcf8b081..080aca3d71 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -68,7 +68,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
-        self.replication_layer = hs.get_replication_layer()
+        self.replication_layer = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -78,8 +78,6 @@ class FederationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
 
-        self.replication_layer.set_handler(self)
-
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@@ -1447,16 +1445,24 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
-        if not event.internal_metadata.is_outlier() and not backfilled:
-            yield self.action_generator.handle_push_actions_for_event(
-                event, context
-            )
+        try:
+            if not event.internal_metadata.is_outlier() and not backfilled:
+                yield self.action_generator.handle_push_actions_for_event(
+                    event, context
+                )
 
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event,
-            context=context,
-            backfilled=backfilled,
-        )
+            event_stream_id, max_stream_id = yield self.store.persist_event(
+                event,
+                context=context,
+                backfilled=backfilled,
+            )
+        except:  # noqa: E722, as we reraise the exception this is fine.
+            # Ensure that we actually remove the entries in the push actions
+            # staging area
+            logcontext.preserve_fn(
+                self.store.remove_push_actions_from_staging
+            )(event.event_id)
+            raise
 
         if not backfilled:
             # this intentionally does not yield: we don't care about the result
@@ -2145,7 +2151,7 @@ class FederationHandler(BaseHandler):
                 raise e
 
             yield self._check_signature(event, context)
-            member_handler = self.hs.get_handlers().room_member_handler
+            member_handler = self.hs.get_room_member_handler()
             yield member_handler.send_membership_event(None, event, context)
         else:
             destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2189,7 +2195,7 @@ class FederationHandler(BaseHandler):
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
 
-        member_handler = self.hs.get_handlers().room_member_handler
+        member_handler = self.hs.get_room_member_handler()
         yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks