summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py137
1 files changed, 100 insertions, 37 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index adafd06b24..d95e0b23b1 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -26,7 +26,7 @@ from synapse.api.errors import (
 from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.util.frozenutils import unfreeze
@@ -40,6 +40,7 @@ from synapse.events.utils import prune_event
 from synapse.util.retryutils import NotRetryingDestination
 
 from synapse.push.action_generator import ActionGenerator
+from synapse.util.distributor import user_joined_room
 
 from twisted.internet import defer
 
@@ -49,10 +50,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def user_joined_room(distributor, user, room_id):
-    return distributor.fire("user_joined_room", user, room_id)
-
-
 class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
@@ -283,7 +280,14 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit, extremities=[]):
         """ Trigger a backfill request to `dest` for the given `room_id`
+
+        This will attempt to get more events from the remote. This may return
+        be successfull and still return no events if the other side has no new
+        events to offer.
         """
+        if dest == self.server_name:
+            raise SynapseError(400, "Can't backfill from self.")
+
         if not extremities:
             extremities = yield self.store.get_oldest_events_in_room(room_id)
 
@@ -294,6 +298,16 @@ class FederationHandler(BaseHandler):
             extremities=extremities,
         )
 
+        # Don't bother processing events we already have.
+        seen_events = yield self.store.have_events_in_timeline(
+            set(e.event_id for e in events)
+        )
+
+        events = [e for e in events if e.event_id not in seen_events]
+
+        if not events:
+            defer.returnValue([])
+
         event_map = {e.event_id: e for e in events}
 
         event_ids = set(e.event_id for e in events)
@@ -353,6 +367,7 @@ class FederationHandler(BaseHandler):
         for a in auth_events.values():
             if a.event_id in seen_events:
                 continue
+            a.internal_metadata.outlier = True
             ev_infos.append({
                 "event": a,
                 "auth_events": {
@@ -373,20 +388,23 @@ class FederationHandler(BaseHandler):
                 }
             })
 
+        yield self._handle_new_events(
+            dest, ev_infos,
+            backfilled=True,
+        )
+
         events.sort(key=lambda e: e.depth)
 
         for event in events:
             if event in events_to_state:
                 continue
 
-            ev_infos.append({
-                "event": event,
-            })
-
-        yield self._handle_new_events(
-            dest, ev_infos,
-            backfilled=True,
-        )
+            # We store these one at a time since each event depends on the
+            # previous to work out the state.
+            # TODO: We can probably do something more clever here.
+            yield self._handle_new_event(
+                dest, event
+            )
 
         defer.returnValue(events)
 
@@ -450,7 +468,7 @@ class FederationHandler(BaseHandler):
 
         likely_domains = [
             domain for domain, depth in curr_domains
-            if domain is not self.server_name
+            if domain != self.server_name
         ]
 
         @defer.inlineCallbacks
@@ -458,11 +476,15 @@ class FederationHandler(BaseHandler):
             # TODO: Should we try multiple of these at a time?
             for dom in domains:
                 try:
-                    events = yield self.backfill(
+                    yield self.backfill(
                         dom, room_id,
                         limit=100,
                         extremities=[e for e in extremities.keys()]
                     )
+                    # If this succeeded then we probably already have the
+                    # appropriate stuff.
+                    # TODO: We can probably do something more intelligent here.
+                    defer.returnValue(True)
                 except SynapseError as e:
                     logger.info(
                         "Failed to backfill from %s because %s",
@@ -488,8 +510,6 @@ class FederationHandler(BaseHandler):
                     )
                     continue
 
-                if events:
-                    defer.returnValue(True)
             defer.returnValue(False)
 
         success = yield try_backfill(likely_domains)
@@ -661,9 +681,13 @@ class FederationHandler(BaseHandler):
             "state_key": user_id,
         })
 
-        event, context = yield self._create_new_client_event(
-            builder=builder,
-        )
+        try:
+            event, context = yield self._create_new_client_event(
+                builder=builder,
+            )
+        except AuthError as e:
+            logger.warn("Failed to create join %r because %s", event, e)
+            raise e
 
         self.auth.check(event, auth_events=context.current_state)
 
@@ -784,13 +808,19 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
-        origin, event = yield self._make_and_verify_event(
-            target_hosts,
-            room_id,
-            user_id,
-            "leave"
-        )
-        signed_event = self._sign_event(event)
+        try:
+            origin, event = yield self._make_and_verify_event(
+                target_hosts,
+                room_id,
+                user_id,
+                "leave"
+            )
+            signed_event = self._sign_event(event)
+        except SynapseError:
+            raise
+        except CodeMessageException as e:
+            logger.warn("Failed to reject invite: %s", e)
+            raise SynapseError(500, "Failed to reject invite")
 
         # Try the host we successfully got a response to /make_join/
         # request first.
@@ -800,10 +830,16 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        yield self.replication_layer.send_leave(
-            target_hosts,
-            signed_event
-        )
+        try:
+            yield self.replication_layer.send_leave(
+                target_hosts,
+                signed_event
+            )
+        except SynapseError:
+            raise
+        except CodeMessageException as e:
+            logger.warn("Failed to reject invite: %s", e)
+            raise SynapseError(500, "Failed to reject invite")
 
         context = yield self.state_handler.compute_event_context(event)
 
@@ -883,7 +919,11 @@ class FederationHandler(BaseHandler):
             builder=builder,
         )
 
-        self.auth.check(event, auth_events=context.current_state)
+        try:
+            self.auth.check(event, auth_events=context.current_state)
+        except AuthError as e:
+            logger.warn("Failed to create new leave %r because %s", event, e)
+            raise e
 
         defer.returnValue(event)
 
@@ -1064,7 +1104,8 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_event(self, origin, event, state=None, auth_events=None):
+    def _handle_new_event(self, origin, event, state=None, auth_events=None,
+                          backfilled=False):
         context = yield self._prep_event(
             origin, event,
             state=state,
@@ -1080,12 +1121,24 @@ class FederationHandler(BaseHandler):
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
+            backfilled=backfilled,
+        )
+
+        # this intentionally does not yield: we don't care about the result
+        # and don't need to wait for it.
+        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+            event_stream_id, max_stream_id
         )
 
         defer.returnValue((context, event_stream_id, max_stream_id))
 
     @defer.inlineCallbacks
     def _handle_new_events(self, origin, event_infos, backfilled=False):
+        """Creates the appropriate contexts and persists events. The events
+        should not depend on one another, e.g. this should be used to persist
+        a bunch of outliers, but not a chunk of individual events that depend
+        on each other for state calculations.
+        """
         contexts = yield defer.gatherResults(
             [
                 self._prep_event(
@@ -1467,8 +1520,9 @@ class FederationHandler(BaseHandler):
 
         try:
             self.auth.check(event, auth_events=auth_events)
-        except AuthError:
-            raise
+        except AuthError as e:
+            logger.warn("Failed auth resolution for %r because %s", event, e)
+            raise e
 
     @defer.inlineCallbacks
     def construct_auth_difference(self, local_auth, remote_auth):
@@ -1644,7 +1698,12 @@ class FederationHandler(BaseHandler):
                 event_dict, event, context
             )
 
-            self.auth.check(event, context.current_state)
+            try:
+                self.auth.check(event, context.current_state)
+            except AuthError as e:
+                logger.warn("Denying new third party invite %r because %s", event, e)
+                raise e
+
             yield self._check_signature(event, auth_events=context.current_state)
             member_handler = self.hs.get_handlers().room_member_handler
             yield member_handler.send_membership_event(None, event, context)
@@ -1669,7 +1728,11 @@ class FederationHandler(BaseHandler):
             event_dict, event, context
         )
 
-        self.auth.check(event, auth_events=context.current_state)
+        try:
+            self.auth.check(event, auth_events=context.current_state)
+        except AuthError as e:
+            logger.warn("Denying third party invite %r because %s", event, e)
+            raise e
         yield self._check_signature(event, auth_events=context.current_state)
 
         returned_invite = yield self.send_invite(origin, event)