summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py2
-rw-r--r--synapse/handlers/directory.py36
-rw-r--r--synapse/handlers/events.py12
-rw-r--r--synapse/handlers/federation.py319
-rw-r--r--synapse/handlers/message.py4
5 files changed, 271 insertions, 102 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index d53cd3df3e..15adc9dc2c 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -78,7 +78,7 @@ class BaseHandler(object):
 
         if not suppress_auth:
             logger.debug("Authing...")
-            self.auth.check(event, raises=True)
+            self.auth.check(event, auth_events=event.old_state_events)
             logger.debug("Authed")
         else:
             logger.debug("Suppressed auth.")
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index af4e7d49c8..3b37e49e6f 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,7 +17,7 @@
 from twisted.internet import defer
 from ._base import BaseHandler
 
-from synapse.api.errors import SynapseError
+from synapse.api.errors import SynapseError, Codes, CodeMessageException
 from synapse.api.events.room import RoomAliasesEvent
 
 import logging
@@ -84,22 +84,32 @@ class DirectoryHandler(BaseHandler):
                 room_id = result.room_id
                 servers = result.servers
         else:
-            result = yield self.federation.make_query(
-                destination=room_alias.domain,
-                query_type="directory",
-                args={
-                    "room_alias": room_alias.to_string(),
-                },
-                retry_on_dns_fail=False,
-            )
+            try:
+                result = yield self.federation.make_query(
+                    destination=room_alias.domain,
+                    query_type="directory",
+                    args={
+                        "room_alias": room_alias.to_string(),
+                    },
+                    retry_on_dns_fail=False,
+                )
+            except CodeMessageException as e:
+                logging.warn("Error retrieving alias")
+                if e.code == 404:
+                    result = None
+                else:
+                    raise
 
             if result and "room_id" in result and "servers" in result:
                 room_id = result["room_id"]
                 servers = result["servers"]
 
         if not room_id:
-            defer.returnValue({})
-            return
+            raise SynapseError(
+                404,
+                "Room alias %r not found" % (room_alias.to_string(),),
+                Codes.NOT_FOUND
+            )
 
         extra_servers = yield self.store.get_joined_hosts_for_room(room_id)
         servers = list(set(extra_servers) | set(servers))
@@ -129,7 +139,9 @@ class DirectoryHandler(BaseHandler):
             })
         else:
             raise SynapseError(
-                404, "Room alias \"%s\" not found" % (room_alias,)
+                404,
+                "Room alias %r not found" % (room_alias.to_string(),),
+                Codes.NOT_FOUND
             )
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d59221a4fb..02202692d4 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler):
             if auth_user not in self._streams_per_user:
                 self._streams_per_user[auth_user] = 0
                 if auth_user in self._stop_timer_per_user:
-                    self.clock.cancel_call_later(
-                        self._stop_timer_per_user.pop(auth_user))
+                    try:
+                        self.clock.cancel_call_later(
+                            self._stop_timer_per_user.pop(auth_user)
+                        )
+                    except:
+                        logger.exception("Failed to cancel event timer")
                 else:
                     yield self.distributor.fire(
                         "started_user_eventstream", auth_user
@@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler):
                     logger.debug(
                         "_later stopped_user_eventstream %s", auth_user
                     )
+
+                    self._stop_timer_per_user.pop(auth_user, None)
+
                     yield self.distributor.fire(
                         "stopped_user_eventstream", auth_user
                     )
-                    del self._stop_timer_per_user[auth_user]
 
                 logger.debug("Scheduling _later: for %s", auth_user)
                 self._stop_timer_per_user[auth_user] = (
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2e8b8a1f9a..925eb5376e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,13 +18,16 @@
 from ._base import BaseHandler
 
 from synapse.api.events.utils import prune_event
-from synapse.api.errors import AuthError, FederationError, SynapseError
-from synapse.api.events.room import RoomMemberEvent
+from synapse.api.errors import (
+    AuthError, FederationError, SynapseError, StoreError,
+)
+from synapse.api.events.room import RoomMemberEvent, RoomCreateEvent
 from synapse.api.constants import Membership
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import (
-    compute_event_signature, check_event_content_hash
+    compute_event_signature, check_event_content_hash,
+    add_hashes_and_signatures,
 )
 from syutil.jsonutil import encode_canonical_json
 
@@ -98,7 +101,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, pdu, backfilled, state=None):
+    def on_receive_pdu(self, origin, pdu, backfilled, state=None):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
         do auth checks and put it through the StateHandler.
         """
@@ -109,7 +112,7 @@ class FederationHandler(BaseHandler):
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if event.room_id in self.room_queues:
-            self.room_queues[event.room_id].append(pdu)
+            self.room_queues[event.room_id].append((pdu, origin))
             return
 
         logger.debug("Processing event: %s", event.event_id)
@@ -141,15 +144,62 @@ class FederationHandler(BaseHandler):
             )
             event = redacted_event
 
-        is_new_state = yield self.state_handler.annotate_event_with_state(
-            event,
-            old_state=state
+        logger.debug("Event: %s", event)
+
+        # FIXME (erikj): Awful hack to make the case where we are not currently
+        # in the room work
+        current_state = None
+        is_in_room = yield self.auth.check_host_in_room(
+            event.room_id,
+            self.server_name
         )
+        if not is_in_room and not event.outlier:
+            logger.debug("Got event for room we're not in.")
+
+            replication_layer = self.replication_layer
+            auth_chain = yield replication_layer.get_event_auth(
+                origin,
+                context=event.room_id,
+                event_id=event.event_id,
+            )
 
-        logger.debug("Event: %s", event)
+            for e in auth_chain:
+                e.outlier = True
+                try:
+                    yield self._handle_new_event(e, fetch_missing=False)
+                except:
+                    logger.exception(
+                        "Failed to parse auth event %s",
+                        e.event_id,
+                    )
+
+            if not state:
+                state = yield replication_layer.get_state_for_context(
+                    origin,
+                    context=event.room_id,
+                    event_id=event.event_id,
+                )
+
+            current_state = state
+
+        if state:
+            for e in state:
+                e.outlier = True
+                try:
+                    yield self._handle_new_event(e)
+                except:
+                    logger.exception(
+                        "Failed to parse state event %s",
+                        e.event_id,
+                    )
 
         try:
-            self.auth.check(event, raises=True)
+            yield self._handle_new_event(
+                event,
+                state=state,
+                backfilled=backfilled,
+                current_state=current_state,
+            )
         except AuthError as e:
             raise FederationError(
                 "ERROR",
@@ -158,43 +208,17 @@ class FederationHandler(BaseHandler):
                 affected=event.event_id,
             )
 
-        is_new_state = is_new_state and not backfilled
-
-        # TODO: Implement something in federation that allows us to
-        # respond to PDU.
-
-        yield self.store.persist_event(
-            event,
-            backfilled,
-            is_new_state=is_new_state
-        )
-
         room = yield self.store.get_room(event.room_id)
 
         if not room:
-            # Huh, let's try and get the current state
             try:
-                yield self.replication_layer.get_state_for_context(
-                    event.origin, event.room_id, event.event_id,
-                )
-
-                hosts = yield self.store.get_joined_hosts_for_room(
-                    event.room_id
-                )
-                if self.hs.hostname in hosts:
-                    try:
-                        yield self.store.store_room(
-                            room_id=event.room_id,
-                            room_creator_user_id="",
-                            is_public=False,
-                        )
-                    except:
-                        pass
-            except:
-                logger.exception(
-                    "Failed to get current state for room %s",
-                    event.room_id
+                yield self.store.store_room(
+                    room_id=event.room_id,
+                    room_creator_user_id="",
+                    is_public=False,
                 )
+            except StoreError:
+                logger.exception("Failed to store room.")
 
         if not backfilled:
             extra_users = []
@@ -255,11 +279,23 @@ class FederationHandler(BaseHandler):
             pdu=event
         )
 
+
+
         defer.returnValue(pdu)
 
     @defer.inlineCallbacks
     def on_event_auth(self, event_id):
         auth = yield self.store.get_auth_chain(event_id)
+
+        for event in auth:
+            event.signatures.update(
+                compute_event_signature(
+                    event,
+                    self.hs.hostname,
+                    self.hs.config.signing_key[0]
+                )
+            )
+
         defer.returnValue([e for e in auth])
 
     @log_function
@@ -276,6 +312,8 @@ class FederationHandler(BaseHandler):
         We suspend processing of any received events from this room until we
         have finished processing the join.
         """
+        logger.debug("Joining %s to %s", joinee, room_id)
+
         pdu = yield self.replication_layer.make_join(
             target_host,
             room_id,
@@ -298,19 +336,29 @@ class FederationHandler(BaseHandler):
 
         try:
             event.event_id = self.event_factory.create_event_id()
+            event.origin = self.hs.hostname
             event.content = content
 
-            state = yield self.replication_layer.send_join(
+            if not hasattr(event, "signatures"):
+                event.signatures = {}
+
+            add_hashes_and_signatures(
+                event,
+                self.hs.hostname,
+                self.hs.config.signing_key[0],
+            )
+
+            ret = yield self.replication_layer.send_join(
                 target_host,
                 event
             )
 
-            logger.debug("do_invite_join state: %s", state)
+            state = ret["state"]
+            auth_chain = ret["auth_chain"]
+            auth_chain.sort(key=lambda e: e.depth)
 
-            yield self.state_handler.annotate_event_with_state(
-                event,
-                old_state=state
-            )
+            logger.debug("do_invite_join auth_chain: %s", auth_chain)
+            logger.debug("do_invite_join state: %s", state)
 
             logger.debug("do_invite_join event: %s", event)
 
@@ -324,34 +372,50 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
+            for e in auth_chain:
+                e.outlier = True
+                try:
+                    yield self._handle_new_event(e, fetch_missing=False)
+                except:
+                    logger.exception(
+                        "Failed to parse auth event %s",
+                        e.event_id,
+                    )
+
             for e in state:
                 # FIXME: Auth these.
                 e.outlier = True
+                try:
+                    yield self._handle_new_event(
+                        e,
+                        fetch_missing=True
+                    )
+                except:
+                    logger.exception(
+                        "Failed to parse state event %s",
+                        e.event_id,
+                    )
 
-                yield self.state_handler.annotate_event_with_state(
-                    e,
-                )
-
-                yield self.store.persist_event(
-                    e,
-                    backfilled=False,
-                    is_new_state=True
-                )
-
-            yield self.store.persist_event(
+            yield self._handle_new_event(
                 event,
-                backfilled=False,
-                is_new_state=True
+                state=state,
+                current_state=state,
             )
+
+            yield self.notifier.on_new_room_event(
+                event, extra_users=[joinee]
+            )
+
+            logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
             del self.room_queues[room_id]
 
-            for p in room_queue:
+            for p, origin in room_queue:
                 try:
-                    yield self.on_receive_pdu(p, backfilled=False)
+                    self.on_receive_pdu(origin, p, backfilled=False)
                 except:
-                    pass
+                    logger.exception("Couldn't handle pdu")
 
         defer.returnValue(True)
 
@@ -375,7 +439,7 @@ class FederationHandler(BaseHandler):
 
         yield self.state_handler.annotate_event_with_state(event)
         yield self.auth.add_auth_events(event)
-        self.auth.check(event, raises=True)
+        self.auth.check(event, auth_events=event.old_state_events)
 
         pdu = event
 
@@ -391,17 +455,7 @@ class FederationHandler(BaseHandler):
 
         event.outlier = False
 
-        state_handler = self.state_handler
-        is_new_state = yield state_handler.annotate_event_with_state(event)
-        self.auth.check(event, raises=True)
-
-        # FIXME (erikj):  All this is duplicated above :(
-
-        yield self.store.persist_event(
-            event,
-            backfilled=False,
-            is_new_state=is_new_state
-        )
+        yield self._handle_new_event(event)
 
         extra_users = []
         if event.type == RoomMemberEvent.TYPE:
@@ -414,7 +468,7 @@ class FederationHandler(BaseHandler):
         )
 
         if event.type == RoomMemberEvent.TYPE:
-            if event.membership == Membership.JOIN:
+            if event.content["membership"] == Membership.JOIN:
                 user = self.hs.parse_userid(event.state_key)
                 yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
@@ -508,7 +562,17 @@ class FederationHandler(BaseHandler):
                 else:
                     del results[(event.type, event.state_key)]
 
-            defer.returnValue(results.values())
+            res = results.values()
+            for event in res:
+                event.signatures.update(
+                    compute_event_signature(
+                        event,
+                        self.hs.hostname,
+                        self.hs.config.signing_key[0]
+                    )
+                )
+
+            defer.returnValue(res)
         else:
             defer.returnValue([])
 
@@ -529,7 +593,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def get_persisted_pdu(self, origin, event_id):
+    def get_persisted_pdu(self, origin, event_id, do_auth=True):
         """ Get a PDU from the database with given origin and id.
 
         Returns:
@@ -541,12 +605,24 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            in_room = yield self.auth.check_host_in_room(
-                event.room_id,
-                origin
+            # FIXME: This is a temporary work around where we occasionally
+            # return events slightly differently than when they were
+            # originally signed
+            event.signatures.update(
+                compute_event_signature(
+                    event,
+                    self.hs.hostname,
+                    self.hs.config.signing_key[0]
+                )
             )
-            if not in_room:
-                raise AuthError(403, "Host not in room.")
+
+            if do_auth:
+                in_room = yield self.auth.check_host_in_room(
+                    event.room_id,
+                    origin
+                )
+                if not in_room:
+                    raise AuthError(403, "Host not in room.")
 
             defer.returnValue(event)
         else:
@@ -564,3 +640,78 @@ class FederationHandler(BaseHandler):
         )
         while waiters:
             waiters.pop().callback(None)
+
+    @defer.inlineCallbacks
+    def _handle_new_event(self, event, state=None, backfilled=False,
+                          current_state=None, fetch_missing=True):
+        is_new_state = yield self.state_handler.annotate_event_with_state(
+            event,
+            old_state=state
+        )
+
+        if event.old_state_events:
+            known_ids = set(
+                [s.event_id for s in event.old_state_events.values()]
+            )
+            for e_id, _ in event.auth_events:
+                if e_id not in known_ids:
+                    e = yield self.store.get_event(
+                        e_id,
+                        allow_none=True,
+                    )
+
+                    if not e:
+                        # TODO: Do some conflict res to make sure that we're
+                        # not the ones who are wrong.
+                        logger.info(
+                            "Rejecting %s as %s not in %s",
+                            event.event_id, e_id, known_ids,
+                        )
+                        raise AuthError(403, "Auth events are stale")
+
+            auth_events = event.old_state_events
+        else:
+            # We need to get the auth events from somewhere.
+
+            # TODO: Don't just hit the DBs?
+
+            auth_events = {}
+            for e_id, _ in event.auth_events:
+                e = yield self.store.get_event(
+                    e_id,
+                    allow_none=True,
+                )
+
+                if not e:
+                    e = yield self.replication_layer.get_pdu(
+                        event.origin, e_id, outlier=True
+                    )
+
+                    if e and fetch_missing:
+                        try:
+                            yield self.on_receive_pdu(event.origin, e, False)
+                        except:
+                            logger.exception(
+                                "Failed to parse auth event %s",
+                                e_id,
+                            )
+
+                if not e:
+                    logger.warn("Can't find auth event %s.", e_id)
+
+                auth_events[(e.type, e.state_key)] = e
+
+            if event.type == RoomMemberEvent.TYPE and not event.auth_events:
+                if len(event.prev_events) == 1:
+                    c = yield self.store.get_event(event.prev_events[0][0])
+                    if c.type == RoomCreateEvent.TYPE:
+                        auth_events[(c.type, c.state_key)] = c
+
+        self.auth.check(event, auth_events=auth_events)
+
+        yield self.store.persist_event(
+            event,
+            backfilled=backfilled,
+            is_new_state=(is_new_state and not backfilled),
+            current_state=current_state,
+        )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 06a4e173f6..42dc4d46f3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,7 +243,7 @@ class MessageHandler(BaseHandler):
         public_room_ids = [r["room_id"] for r in public_rooms]
 
         limit = pagin_config.limit
-        if not limit:
+        if limit is None:
             limit = 10
 
         for event in room_list:
@@ -306,7 +306,7 @@ class MessageHandler(BaseHandler):
         auth_user = self.hs.parse_userid(user_id)
 
         # TODO: These concurrently
-        state_tuples = yield self.store.get_current_state(room_id)
+        state_tuples = yield self.state_handler.get_current_state(room_id)
         state = [self.hs.serialize_event(x) for x in state_tuples]
 
         member_event = (yield self.store.get_room_member(