summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--CHANGES.rst8
-rw-r--r--VERSION2
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py10
-rw-r--r--synapse/federation/replication.py73
-rw-r--r--synapse/handlers/events.py12
-rw-r--r--synapse/handlers/federation.py141
-rw-r--r--synapse/storage/stream.py2
-rw-r--r--tests/handlers/test_federation.py6
9 files changed, 190 insertions, 66 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index 207f1e4d74..6779a36f72 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,11 @@
+Changes in synapse 0.5.3 (2014-11-27)
+=====================================
+
+ * Fix bug that caused joining a remote room to fail if a single event was not
+   signed correctly.
+ * Fix bug which caused servers to continuously try and fetch events from other
+   servers.
+
 Changes in synapse 0.5.2 (2014-11-26)
 =====================================
 
diff --git a/VERSION b/VERSION
index cb0c939a93..be14282b7f 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.5.2
+0.5.3
diff --git a/synapse/__init__.py b/synapse/__init__.py
index d5c2f25484..c99cd96537 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a synapse home server.
 """
 
-__version__ = "0.5.2"
+__version__ = "0.5.3"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index fb911e51a6..2b0475543d 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -202,7 +202,10 @@ class Auth(object):
 
             # Invites are valid iff caller is in the room and target isn't.
             if not caller_in_room:  # caller isn't joined
-                raise AuthError(403, "You are not in room %s." % event.room_id)
+                raise AuthError(
+                    403,
+                    "%s not in room %s." % (event.user_id, event.room_id,)
+                )
             elif target_in_room:  # the target is already in the room.
                 raise AuthError(403, "%s is already in the room." %
                                      target_user_id)
@@ -225,7 +228,10 @@ class Auth(object):
             # TODO (erikj): Implement kicks.
 
             if not caller_in_room:  # trying to leave a room you aren't joined
-                raise AuthError(403, "You are not in room %s." % event.room_id)
+                raise AuthError(
+                    403,
+                    "%s not in room %s." % (target_user_id, event.room_id,)
+                )
             elif target_user_id != event.user_id:
                 if kick_level:
                     kick_level = int(kick_level)
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 6bfb30b42d..312d69fcaa 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -283,6 +283,22 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
+    def get_event_auth(self, destination, context, event_id):
+        res = yield self.transport_layer.get_event_auth(
+            destination, context, event_id,
+        )
+
+        auth_chain = [
+            self.event_from_pdu_json(p, outlier=True)
+            for p in res["auth_chain"]
+        ]
+
+        auth_chain.sort(key=lambda e: e.depth)
+
+        defer.returnValue(auth_chain)
+
+    @defer.inlineCallbacks
+    @log_function
     def on_backfill_request(self, origin, context, versions, limit):
         pdus = yield self.handler.on_backfill_request(
             origin, context, versions, limit
@@ -549,34 +565,34 @@ class ReplicationLayer(object):
         state = None
 
         # We need to make sure we have all the auth events.
-        for e_id, _ in pdu.auth_events:
-            exists = yield self._get_persisted_pdu(
-                origin,
-                e_id,
-                do_auth=False
-            )
-
-            if not exists:
-                try:
-                    logger.debug(
-                        "_handle_new_pdu fetch missing auth event %s from %s",
-                        e_id,
-                        origin,
-                    )
-
-                    yield self.get_pdu(
-                        origin,
-                        event_id=e_id,
-                        outlier=True,
-                    )
-
-                    logger.debug("Processed pdu %s", e_id)
-                except:
-                    logger.warn(
-                        "Failed to get auth event %s from %s",
-                        e_id,
-                        origin
-                    )
+        # for e_id, _ in pdu.auth_events:
+        #     exists = yield self._get_persisted_pdu(
+        #         origin,
+        #         e_id,
+        #         do_auth=False
+        #     )
+        #
+        #     if not exists:
+        #         try:
+        #             logger.debug(
+        #                 "_handle_new_pdu fetch missing auth event %s from %s",
+        #                 e_id,
+        #                 origin,
+        #             )
+        #
+        #             yield self.get_pdu(
+        #                 origin,
+        #                 event_id=e_id,
+        #                 outlier=True,
+        #             )
+        #
+        #             logger.debug("Processed pdu %s", e_id)
+        #         except:
+        #             logger.warn(
+        #                 "Failed to get auth event %s from %s",
+        #                 e_id,
+        #                 origin
+        #             )
 
         # Get missing pdus if necessary.
         if not pdu.outlier:
@@ -626,6 +642,7 @@ class ReplicationLayer(object):
 
         if not backfilled:
             ret = yield self.handler.on_receive_pdu(
+                origin,
                 pdu,
                 backfilled=backfilled,
                 state=state,
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d59221a4fb..02202692d4 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler):
             if auth_user not in self._streams_per_user:
                 self._streams_per_user[auth_user] = 0
                 if auth_user in self._stop_timer_per_user:
-                    self.clock.cancel_call_later(
-                        self._stop_timer_per_user.pop(auth_user))
+                    try:
+                        self.clock.cancel_call_later(
+                            self._stop_timer_per_user.pop(auth_user)
+                        )
+                    except:
+                        logger.exception("Failed to cancel event timer")
                 else:
                     yield self.distributor.fire(
                         "started_user_eventstream", auth_user
@@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler):
                     logger.debug(
                         "_later stopped_user_eventstream %s", auth_user
                     )
+
+                    self._stop_timer_per_user.pop(auth_user, None)
+
                     yield self.distributor.fire(
                         "stopped_user_eventstream", auth_user
                     )
-                    del self._stop_timer_per_user[auth_user]
 
                 logger.debug("Scheduling _later: for %s", auth_user)
                 self._stop_timer_per_user[auth_user] = (
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index fcef602055..925eb5376e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -101,7 +101,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, pdu, backfilled, state=None):
+    def on_receive_pdu(self, origin, pdu, backfilled, state=None):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
         do auth checks and put it through the StateHandler.
         """
@@ -112,7 +112,7 @@ class FederationHandler(BaseHandler):
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if event.room_id in self.room_queues:
-            self.room_queues[event.room_id].append(pdu)
+            self.room_queues[event.room_id].append((pdu, origin))
             return
 
         logger.debug("Processing event: %s", event.event_id)
@@ -149,14 +149,49 @@ class FederationHandler(BaseHandler):
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
         current_state = None
-        if state:
-            is_in_room = yield self.auth.check_host_in_room(
-                event.room_id,
-                self.server_name
+        is_in_room = yield self.auth.check_host_in_room(
+            event.room_id,
+            self.server_name
+        )
+        if not is_in_room and not event.outlier:
+            logger.debug("Got event for room we're not in.")
+
+            replication_layer = self.replication_layer
+            auth_chain = yield replication_layer.get_event_auth(
+                origin,
+                context=event.room_id,
+                event_id=event.event_id,
             )
-            if not is_in_room:
-                logger.debug("Got event for room we're not in.")
-                current_state = state
+
+            for e in auth_chain:
+                e.outlier = True
+                try:
+                    yield self._handle_new_event(e, fetch_missing=False)
+                except:
+                    logger.exception(
+                        "Failed to parse auth event %s",
+                        e.event_id,
+                    )
+
+            if not state:
+                state = yield replication_layer.get_state_for_context(
+                    origin,
+                    context=event.room_id,
+                    event_id=event.event_id,
+                )
+
+            current_state = state
+
+        if state:
+            for e in state:
+                e.outlier = True
+                try:
+                    yield self._handle_new_event(e)
+                except:
+                    logger.exception(
+                        "Failed to parse state event %s",
+                        e.event_id,
+                    )
 
         try:
             yield self._handle_new_event(
@@ -251,6 +286,16 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def on_event_auth(self, event_id):
         auth = yield self.store.get_auth_chain(event_id)
+
+        for event in auth:
+            event.signatures.update(
+                compute_event_signature(
+                    event,
+                    self.hs.hostname,
+                    self.hs.config.signing_key[0]
+                )
+            )
+
         defer.returnValue([e for e in auth])
 
     @log_function
@@ -310,6 +355,7 @@ class FederationHandler(BaseHandler):
 
             state = ret["state"]
             auth_chain = ret["auth_chain"]
+            auth_chain.sort(key=lambda e: e.depth)
 
             logger.debug("do_invite_join auth_chain: %s", auth_chain)
             logger.debug("do_invite_join state: %s", state)
@@ -328,23 +374,32 @@ class FederationHandler(BaseHandler):
 
             for e in auth_chain:
                 e.outlier = True
-                yield self._handle_new_event(e)
-                yield self.notifier.on_new_room_event(
-                    e, extra_users=[joinee]
-                )
+                try:
+                    yield self._handle_new_event(e, fetch_missing=False)
+                except:
+                    logger.exception(
+                        "Failed to parse auth event %s",
+                        e.event_id,
+                    )
 
             for e in state:
                 # FIXME: Auth these.
                 e.outlier = True
-                yield self._handle_new_event(e)
-                yield self.notifier.on_new_room_event(
-                    e, extra_users=[joinee]
-                )
+                try:
+                    yield self._handle_new_event(
+                        e,
+                        fetch_missing=True
+                    )
+                except:
+                    logger.exception(
+                        "Failed to parse state event %s",
+                        e.event_id,
+                    )
 
             yield self._handle_new_event(
                 event,
                 state=state,
-                current_state=state
+                current_state=state,
             )
 
             yield self.notifier.on_new_room_event(
@@ -356,9 +411,9 @@ class FederationHandler(BaseHandler):
             room_queue = self.room_queues[room_id]
             del self.room_queues[room_id]
 
-            for p in room_queue:
+            for p, origin in room_queue:
                 try:
-                    self.on_receive_pdu(p, backfilled=False)
+                    self.on_receive_pdu(origin, p, backfilled=False)
                 except:
                     logger.exception("Couldn't handle pdu")
 
@@ -507,7 +562,17 @@ class FederationHandler(BaseHandler):
                 else:
                     del results[(event.type, event.state_key)]
 
-            defer.returnValue(results.values())
+            res = results.values()
+            for event in res:
+                event.signatures.update(
+                    compute_event_signature(
+                        event,
+                        self.hs.hostname,
+                        self.hs.config.signing_key[0]
+                    )
+                )
+
+            defer.returnValue(res)
         else:
             defer.returnValue([])
 
@@ -540,6 +605,17 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
+            # FIXME: This is a temporary work around where we occasionally
+            # return events slightly differently than when they were
+            # originally signed
+            event.signatures.update(
+                compute_event_signature(
+                    event,
+                    self.hs.hostname,
+                    self.hs.config.signing_key[0]
+                )
+            )
+
             if do_auth:
                 in_room = yield self.auth.check_host_in_room(
                     event.room_id,
@@ -567,11 +643,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _handle_new_event(self, event, state=None, backfilled=False,
-                          current_state=None):
-        if state:
-            for s in state:
-                yield self._handle_new_event(s)
-
+                          current_state=None, fetch_missing=True):
         is_new_state = yield self.state_handler.annotate_event_with_state(
             event,
             old_state=state
@@ -611,11 +683,22 @@ class FederationHandler(BaseHandler):
                 )
 
                 if not e:
-                    raise AuthError(
-                        403,
-                        "Can't find auth event %s." % (e_id, )
+                    e = yield self.replication_layer.get_pdu(
+                        event.origin, e_id, outlier=True
                     )
 
+                    if e and fetch_missing:
+                        try:
+                            yield self.on_receive_pdu(event.origin, e, False)
+                        except:
+                            logger.exception(
+                                "Failed to parse auth event %s",
+                                e_id,
+                            )
+
+                if not e:
+                    logger.warn("Can't find auth event %s.", e_id)
+
                 auth_events[(e.type, e.state_key)] = e
 
             if event.type == RoomMemberEvent.TYPE and not event.auth_events:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index b84735e61c..3405cb365e 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT *, (%(redacted)s) AS redacted FROM events "
-            "WHERE room_id = ? AND stream_ordering <= ? "
+            "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
             "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
         ) % {
             "redacted": del_sql,
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 98cfbe50b3..33016c16ef 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -42,6 +42,7 @@ class FederationTestCase(unittest.TestCase):
 
         self.auth = NonCallableMock(spec_set=[
             "check",
+            "check_host_in_room",
         ])
 
         self.hostname = "test"
@@ -89,13 +90,16 @@ class FederationTestCase(unittest.TestCase):
 
         self.datastore.persist_event.return_value = defer.succeed(None)
         self.datastore.get_room.return_value = defer.succeed(True)
+        self.auth.check_host_in_room.return_value = defer.succeed(True)
 
         def annotate(ev, old_state=None):
             ev.old_state_events = []
             return defer.succeed(False)
         self.state_handler.annotate_event_with_state.side_effect = annotate
 
-        yield self.handlers.federation_handler.on_receive_pdu(pdu, False)
+        yield self.handlers.federation_handler.on_receive_pdu(
+            "fo", pdu, False
+        )
 
         self.datastore.persist_event.assert_called_once_with(
             ANY, is_new_state=False, backfilled=False, current_state=None