summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py92
1 files changed, 55 insertions, 37 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac70730885..080aca3d71 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -22,6 +23,7 @@ from ._base import BaseHandler
 
 from synapse.api.errors import (
     AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+    FederationDeniedError,
 )
 from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
@@ -66,7 +68,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
-        self.replication_layer = hs.get_replication_layer()
+        self.replication_layer = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -74,8 +76,7 @@ class FederationHandler(BaseHandler):
         self.is_mine_id = hs.is_mine_id
         self.pusher_pool = hs.get_pusherpool()
         self.spam_checker = hs.get_spam_checker()
-
-        self.replication_layer.set_handler(self)
+        self.event_creation_handler = hs.get_event_creation_handler()
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
@@ -782,6 +783,9 @@ class FederationHandler(BaseHandler):
                 except NotRetryingDestination as e:
                     logger.info(e.message)
                     continue
+                except FederationDeniedError as e:
+                    logger.info(e)
+                    continue
                 except Exception as e:
                     logger.exception(
                         "Failed to backfill from %s because %s",
@@ -804,13 +808,12 @@ class FederationHandler(BaseHandler):
         event_ids = list(extremities.keys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
+        resolve = logcontext.preserve_fn(
+            self.state_handler.resolve_state_groups_for_events
+        )
         states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
-                    room_id, [e]
-                )
-                for e in event_ids
-            ], consumeErrors=True,
+            [resolve(room_id, [e]) for e in event_ids],
+            consumeErrors=True,
         ))
         states = dict(zip(event_ids, [s.state for s in states]))
 
@@ -1004,8 +1007,7 @@ class FederationHandler(BaseHandler):
         })
 
         try:
-            message_handler = self.hs.get_handlers().message_handler
-            event, context = yield message_handler._create_new_client_event(
+            event, context = yield self.event_creation_handler.create_new_client_event(
                 builder=builder,
             )
         except AuthError as e:
@@ -1245,8 +1247,7 @@ class FederationHandler(BaseHandler):
             "state_key": user_id,
         })
 
-        message_handler = self.hs.get_handlers().message_handler
-        event, context = yield message_handler._create_new_client_event(
+        event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
         )
 
@@ -1444,16 +1445,24 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
-        if not event.internal_metadata.is_outlier() and not backfilled:
-            yield self.action_generator.handle_push_actions_for_event(
-                event, context
-            )
+        try:
+            if not event.internal_metadata.is_outlier() and not backfilled:
+                yield self.action_generator.handle_push_actions_for_event(
+                    event, context
+                )
 
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event,
-            context=context,
-            backfilled=backfilled,
-        )
+            event_stream_id, max_stream_id = yield self.store.persist_event(
+                event,
+                context=context,
+                backfilled=backfilled,
+            )
+        except:  # noqa: E722, as we reraise the exception this is fine.
+            # Ensure that we actually remove the entries in the push actions
+            # staging area
+            logcontext.preserve_fn(
+                self.store.remove_push_actions_from_staging
+            )(event.event_id)
+            raise
 
         if not backfilled:
             # this intentionally does not yield: we don't care about the result
@@ -1828,8 +1837,8 @@ class FederationHandler(BaseHandler):
                 current_state = set(e.event_id for e in auth_events.values())
                 different_auth = event_auth_events - current_state
 
-                self._update_context_for_auth_events(
-                    context, auth_events, event_key,
+                yield self._update_context_for_auth_events(
+                    event, context, auth_events, event_key,
                 )
 
         if different_auth and not event.internal_metadata.is_outlier():
@@ -1910,8 +1919,8 @@ class FederationHandler(BaseHandler):
                 # 4. Look at rejects and their proofs.
                 # TODO.
 
-                self._update_context_for_auth_events(
-                    context, auth_events, event_key,
+                yield self._update_context_for_auth_events(
+                    event, context, auth_events, event_key,
                 )
 
         try:
@@ -1920,11 +1929,15 @@ class FederationHandler(BaseHandler):
             logger.warn("Failed auth resolution for %r because %s", event, e)
             raise e
 
-    def _update_context_for_auth_events(self, context, auth_events,
+    @defer.inlineCallbacks
+    def _update_context_for_auth_events(self, event, context, auth_events,
                                         event_key):
-        """Update the state_ids in an event context after auth event resolution
+        """Update the state_ids in an event context after auth event resolution,
+        storing the changes as a new state group.
 
         Args:
+            event (Event): The event we're handling the context for
+
             context (synapse.events.snapshot.EventContext): event context
                 to be updated
 
@@ -1947,7 +1960,13 @@ class FederationHandler(BaseHandler):
         context.prev_state_ids.update({
             k: a.event_id for k, a in auth_events.iteritems()
         })
-        context.state_group = self.store.get_next_state_group()
+        context.state_group = yield self.store.store_state_group(
+            event.event_id,
+            event.room_id,
+            prev_group=context.prev_group,
+            delta_ids=context.delta_ids,
+            current_state_ids=context.current_state_ids,
+        )
 
     @defer.inlineCallbacks
     def construct_auth_difference(self, local_auth, remote_auth):
@@ -2117,8 +2136,7 @@ class FederationHandler(BaseHandler):
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
             builder = self.event_builder_factory.new(event_dict)
             EventValidator().validate_new(builder)
-            message_handler = self.hs.get_handlers().message_handler
-            event, context = yield message_handler._create_new_client_event(
+            event, context = yield self.event_creation_handler.create_new_client_event(
                 builder=builder
             )
 
@@ -2133,7 +2151,7 @@ class FederationHandler(BaseHandler):
                 raise e
 
             yield self._check_signature(event, context)
-            member_handler = self.hs.get_handlers().room_member_handler
+            member_handler = self.hs.get_room_member_handler()
             yield member_handler.send_membership_event(None, event, context)
         else:
             destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2156,8 +2174,7 @@ class FederationHandler(BaseHandler):
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        message_handler = self.hs.get_handlers().message_handler
-        event, context = yield message_handler._create_new_client_event(
+        event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
         )
 
@@ -2178,7 +2195,7 @@ class FederationHandler(BaseHandler):
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
 
-        member_handler = self.hs.get_handlers().room_member_handler
+        member_handler = self.hs.get_room_member_handler()
         yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks
@@ -2207,8 +2224,9 @@ class FederationHandler(BaseHandler):
 
         builder = self.event_builder_factory.new(event_dict)
         EventValidator().validate_new(builder)
-        message_handler = self.hs.get_handlers().message_handler
-        event, context = yield message_handler._create_new_client_event(builder=builder)
+        event, context = yield self.event_creation_handler.create_new_client_event(
+            builder=builder,
+        )
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks