summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2014-12-10 16:14:17 +0000
committerMark Haines <mark.haines@matrix.org>2014-12-10 16:14:17 +0000
commit61fc37e467bafe8b1178ec35daf0655049b3cc73 (patch)
treefd47305db5a3d6d8f08514d0c1620628b35495a2 /synapse
parentimport Image as PIL.Image. (diff)
parentpoint the entry_point for synapse-homeserver at the right method (diff)
downloadsynapse-61fc37e467bafe8b1178ec35daf0655049b3cc73.tar.xz
Merge branch 'develop' into media_repository
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py148
-rw-r--r--synapse/api/events/__init__.py1
-rw-r--r--synapse/config/logger.py6
-rw-r--r--synapse/config/server.py7
-rw-r--r--synapse/federation/replication.py97
-rw-r--r--synapse/handlers/_base.py2
-rw-r--r--synapse/handlers/events.py12
-rw-r--r--synapse/handlers/federation.py306
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/presence.py15
-rw-r--r--synapse/handlers/room.py6
-rw-r--r--synapse/rest/room.py6
-rw-r--r--synapse/state.py2
-rw-r--r--synapse/storage/__init__.py59
-rw-r--r--synapse/storage/roommember.py4
-rw-r--r--synapse/storage/schema/delta/v8.sql34
-rw-r--r--synapse/storage/schema/event_signatures.sql2
-rw-r--r--synapse/storage/state.py2
-rw-r--r--synapse/storage/stream.py2
-rw-r--r--synapse/streams/config.py6
21 files changed, 512 insertions, 209 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 14564e735e..723e15d506 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a synapse home server.
 """
 
-__version__ = "0.5.0"
+__version__ = "0.5.4"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index c4597c1757..2b0475543d 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -38,79 +38,66 @@ class Auth(object):
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
 
-    def check(self, event, raises=False):
+    def check(self, event, auth_events):
         """ Checks if this event is correctly authed.
 
         Returns:
             True if the auth checks pass.
-        Raises:
-            AuthError if there was a problem authorising this event. This will
-            be raised only if raises=True.
         """
         try:
-            if hasattr(event, "room_id"):
-                if event.old_state_events is None:
-                    # Oh, we don't know what the state of the room was, so we
-                    # are trusting that this is allowed (at least for now)
-                    logger.warn("Trusting event: %s", event.event_id)
-                    return True
-
-                if hasattr(event, "outlier") and event.outlier is True:
-                    # TODO (erikj): Auth for outliers is done differently.
-                    return True
+            if not hasattr(event, "room_id"):
+                raise AuthError(500, "Event has no room_id: %s" % event)
+            if auth_events is None:
+                # Oh, we don't know what the state of the room was, so we
+                # are trusting that this is allowed (at least for now)
+                logger.warn("Trusting event: %s", event.event_id)
+                return True
 
-                if event.type == RoomCreateEvent.TYPE:
-                    # FIXME
-                    return True
+            if event.type == RoomCreateEvent.TYPE:
+                # FIXME
+                return True
 
-                # FIXME: Temp hack
-                if event.type == RoomAliasesEvent.TYPE:
-                    return True
+            # FIXME: Temp hack
+            if event.type == RoomAliasesEvent.TYPE:
+                return True
 
-                if event.type == RoomMemberEvent.TYPE:
-                    allowed = self.is_membership_change_allowed(event)
-                    if allowed:
-                        logger.debug("Allowing! %s", event)
-                    else:
-                        logger.debug("Denying! %s", event)
-                    return allowed
+            if event.type == RoomMemberEvent.TYPE:
+                allowed = self.is_membership_change_allowed(
+                    event, auth_events
+                )
+                if allowed:
+                    logger.debug("Allowing! %s", event)
+                else:
+                    logger.debug("Denying! %s", event)
+                return allowed
 
-                self.check_event_sender_in_room(event)
-                self._can_send_event(event)
+            self.check_event_sender_in_room(event, auth_events)
+            self._can_send_event(event, auth_events)
 
-                if event.type == RoomPowerLevelsEvent.TYPE:
-                    self._check_power_levels(event)
+            if event.type == RoomPowerLevelsEvent.TYPE:
+                self._check_power_levels(event, auth_events)
 
-                if event.type == RoomRedactionEvent.TYPE:
-                    self._check_redaction(event)
+            if event.type == RoomRedactionEvent.TYPE:
+                self._check_redaction(event, auth_events)
 
-                logger.debug("Allowing! %s", event)
-                return True
-            else:
-                raise AuthError(500, "Unknown event: %s" % event)
+            logger.debug("Allowing! %s", event)
         except AuthError as e:
             logger.info(
                 "Event auth check failed on event %s with msg: %s",
                 event, e.msg
             )
             logger.info("Denying! %s", event)
-            if raises:
-                raise
-
-        return False
+            raise
 
     @defer.inlineCallbacks
     def check_joined_room(self, room_id, user_id):
-        try:
-            member = yield self.store.get_room_member(
-                room_id=room_id,
-                user_id=user_id
-            )
-            self._check_joined_room(member, user_id, room_id)
-            defer.returnValue(member)
-        except AttributeError:
-            pass
-        defer.returnValue(None)
+        member = yield self.state.get_current_state(
+            room_id=room_id,
+            event_type=RoomMemberEvent.TYPE,
+            state_key=user_id
+        )
+        self._check_joined_room(member, user_id, room_id)
+        defer.returnValue(member)
 
     @defer.inlineCallbacks
     def check_host_in_room(self, room_id, host):
@@ -130,9 +117,9 @@ class Auth(object):
 
         defer.returnValue(False)
 
-    def check_event_sender_in_room(self, event):
+    def check_event_sender_in_room(self, event, auth_events):
         key = (RoomMemberEvent.TYPE, event.user_id, )
-        member_event = event.state_events.get(key)
+        member_event = auth_events.get(key)
 
         return self._check_joined_room(
             member_event,
@@ -147,14 +134,14 @@ class Auth(object):
             ))
 
     @log_function
-    def is_membership_change_allowed(self, event):
+    def is_membership_change_allowed(self, event, auth_events):
         membership = event.content["membership"]
 
         # Check if this is the room creator joining:
         if len(event.prev_events) == 1 and Membership.JOIN == membership:
             # Get room creation event:
             key = (RoomCreateEvent.TYPE, "", )
-            create = event.old_state_events.get(key)
+            create = auth_events.get(key)
             if create and event.prev_events[0][0] == create.event_id:
                 if create.content["creator"] == event.state_key:
                     return True
@@ -163,19 +150,19 @@ class Auth(object):
 
         # get info about the caller
         key = (RoomMemberEvent.TYPE, event.user_id, )
-        caller = event.old_state_events.get(key)
+        caller = auth_events.get(key)
 
         caller_in_room = caller and caller.membership == Membership.JOIN
         caller_invited = caller and caller.membership == Membership.INVITE
 
         # get info about the target
         key = (RoomMemberEvent.TYPE, target_user_id, )
-        target = event.old_state_events.get(key)
+        target = auth_events.get(key)
 
         target_in_room = target and target.membership == Membership.JOIN
 
         key = (RoomJoinRulesEvent.TYPE, "", )
-        join_rule_event = event.old_state_events.get(key)
+        join_rule_event = auth_events.get(key)
         if join_rule_event:
             join_rule = join_rule_event.content.get(
                 "join_rule", JoinRules.INVITE
@@ -186,11 +173,13 @@ class Auth(object):
         user_level = self._get_power_level_from_event_state(
             event,
             event.user_id,
+            auth_events,
         )
 
         ban_level, kick_level, redact_level = (
             self._get_ops_level_from_event_state(
-                event
+                event,
+                auth_events,
             )
         )
 
@@ -213,7 +202,10 @@ class Auth(object):
 
             # Invites are valid iff caller is in the room and target isn't.
             if not caller_in_room:  # caller isn't joined
-                raise AuthError(403, "You are not in room %s." % event.room_id)
+                raise AuthError(
+                    403,
+                    "%s not in room %s." % (event.user_id, event.room_id,)
+                )
             elif target_in_room:  # the target is already in the room.
                 raise AuthError(403, "%s is already in the room." %
                                      target_user_id)
@@ -236,7 +228,10 @@ class Auth(object):
             # TODO (erikj): Implement kicks.
 
             if not caller_in_room:  # trying to leave a room you aren't joined
-                raise AuthError(403, "You are not in room %s." % event.room_id)
+                raise AuthError(
+                    403,
+                    "%s not in room %s." % (target_user_id, event.room_id,)
+                )
             elif target_user_id != event.user_id:
                 if kick_level:
                     kick_level = int(kick_level)
@@ -260,9 +255,9 @@ class Auth(object):
 
         return True
 
-    def _get_power_level_from_event_state(self, event, user_id):
+    def _get_power_level_from_event_state(self, event, user_id, auth_events):
         key = (RoomPowerLevelsEvent.TYPE, "", )
-        power_level_event = event.old_state_events.get(key)
+        power_level_event = auth_events.get(key)
         level = None
         if power_level_event:
             level = power_level_event.content.get("users", {}).get(user_id)
@@ -270,16 +265,16 @@ class Auth(object):
                 level = power_level_event.content.get("users_default", 0)
         else:
             key = (RoomCreateEvent.TYPE, "", )
-            create_event = event.old_state_events.get(key)
+            create_event = auth_events.get(key)
             if (create_event is not None and
                     create_event.content["creator"] == user_id):
                 return 100
 
         return level
 
-    def _get_ops_level_from_event_state(self, event):
+    def _get_ops_level_from_event_state(self, event, auth_events):
         key = (RoomPowerLevelsEvent.TYPE, "", )
-        power_level_event = event.old_state_events.get(key)
+        power_level_event = auth_events.get(key)
 
         if power_level_event:
             return (
@@ -375,6 +370,11 @@ class Auth(object):
         key = (RoomMemberEvent.TYPE, event.user_id, )
         member_event = event.old_state_events.get(key)
 
+        key = (RoomCreateEvent.TYPE, "", )
+        create_event = event.old_state_events.get(key)
+        if create_event:
+            auth_events.append(create_event.event_id)
+
         if join_rule_event:
             join_rule = join_rule_event.content.get("join_rule")
             is_public = join_rule == JoinRules.PUBLIC if join_rule else False
@@ -406,9 +406,9 @@ class Auth(object):
         event.auth_events = zip(auth_events, hashes)
 
     @log_function
-    def _can_send_event(self, event):
+    def _can_send_event(self, event, auth_events):
         key = (RoomPowerLevelsEvent.TYPE, "", )
-        send_level_event = event.old_state_events.get(key)
+        send_level_event = auth_events.get(key)
         send_level = None
         if send_level_event:
             send_level = send_level_event.content.get("events", {}).get(
@@ -432,6 +432,7 @@ class Auth(object):
         user_level = self._get_power_level_from_event_state(
             event,
             event.user_id,
+            auth_events,
         )
 
         if user_level:
@@ -468,14 +469,16 @@ class Auth(object):
 
         return True
 
-    def _check_redaction(self, event):
+    def _check_redaction(self, event, auth_events):
         user_level = self._get_power_level_from_event_state(
             event,
             event.user_id,
+            auth_events,
         )
 
         _, _, redact_level = self._get_ops_level_from_event_state(
-            event
+            event,
+            auth_events,
         )
 
         if user_level < redact_level:
@@ -484,7 +487,7 @@ class Auth(object):
                 "You don't have permission to redact events"
             )
 
-    def _check_power_levels(self, event):
+    def _check_power_levels(self, event, auth_events):
         user_list = event.content.get("users", {})
         # Validate users
         for k, v in user_list.items():
@@ -499,7 +502,7 @@ class Auth(object):
                 raise SynapseError(400, "Not a valid power level: %s" % (v,))
 
         key = (event.type, event.state_key, )
-        current_state = event.old_state_events.get(key)
+        current_state = auth_events.get(key)
 
         if not current_state:
             return
@@ -507,6 +510,7 @@ class Auth(object):
         user_level = self._get_power_level_from_event_state(
             event,
             event.user_id,
+            auth_events,
         )
 
         # Check other levels:
diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index 8a35b4cb7d..22939d011a 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -125,6 +125,7 @@ class SynapseEvent(JsonEncodedObject):
         pdu_json.pop("outlier", None)
         pdu_json.pop("replaces_state", None)
         pdu_json.pop("redacted", None)
+        pdu_json.pop("prev_content", None)
         state_hash = pdu_json.pop("state_hash", None)
         if state_hash is not None:
             pdu_json.setdefault("unsigned", {})["state_hash"] = state_hash
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 8566296433..089d906fa5 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -52,12 +52,18 @@ class LoggingConfig(Config):
         if self.log_config is None:
 
             level = logging.INFO
+            level_for_storage = logging.INFO
             if self.verbosity:
                 level = logging.DEBUG
+                if self.verbosity > 1:
+                    level_for_storage = logging.DEBUG
 
             # FIXME: we need a logging.WARN for a -q quiet option
             logger = logging.getLogger('')
             logger.setLevel(level)
+
+            logging.getLogger('synapse.storage').setLevel(level_for_storage)
+
             formatter = logging.Formatter(log_format)
             if self.log_file:
                 handler = logging.FileHandler(self.log_file)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 814a4c349b..f8a0844b8c 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -35,8 +35,11 @@ class ServerConfig(Config):
         if not args.content_addr:
             host = args.server_name
             if ':' not in host:
-                host = "%s:%d" % (host, args.bind_port)
-            args.content_addr = "https://%s" % (host,)
+                host = "%s:%d" % (host, args.unsecure_port)
+            else:
+                host = host.split(':')[0]
+                host = "%s:%d" % (host, args.unsecure_port)
+            args.content_addr = "http://%s" % (host,)
 
         self.content_addr = args.content_addr
 
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index fa2463d4a3..01f87fe423 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -283,6 +283,22 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
+    def get_event_auth(self, destination, context, event_id):
+        res = yield self.transport_layer.get_event_auth(
+            destination, context, event_id,
+        )
+
+        auth_chain = [
+            self.event_from_pdu_json(p, outlier=True)
+            for p in res["auth_chain"]
+        ]
+
+        auth_chain.sort(key=lambda e: e.depth)
+
+        defer.returnValue(auth_chain)
+
+    @defer.inlineCallbacks
+    @log_function
     def on_backfill_request(self, origin, context, versions, limit):
         pdus = yield self.handler.on_backfill_request(
             origin, context, versions, limit
@@ -481,11 +497,17 @@ class ReplicationLayer(object):
         # FIXME: We probably want to do something with the auth_chain given
         # to us
 
-        # auth_chain = [
-        #    Pdu(outlier=True, **p) for p in content.get("auth_chain", [])
-        # ]
+        auth_chain = [
+            self.event_from_pdu_json(p, outlier=True)
+            for p in content.get("auth_chain", [])
+        ]
+
+        auth_chain.sort(key=lambda e: e.depth)
 
-        defer.returnValue(state)
+        defer.returnValue({
+            "state": state,
+            "auth_chain": auth_chain,
+        })
 
     @defer.inlineCallbacks
     def send_invite(self, destination, context, event_id, pdu):
@@ -543,20 +565,34 @@ class ReplicationLayer(object):
         state = None
 
         # We need to make sure we have all the auth events.
-        for e_id, _ in pdu.auth_events:
-            exists = yield self._get_persisted_pdu(
-                origin,
-                e_id,
-                do_auth=False
-            )
-
-            if not exists:
-                yield self.get_pdu(
-                    origin,
-                    event_id=e_id,
-                    outlier=True,
-                )
-                logger.debug("Processed pdu %s", e_id)
+        # for e_id, _ in pdu.auth_events:
+        #     exists = yield self._get_persisted_pdu(
+        #         origin,
+        #         e_id,
+        #         do_auth=False
+        #     )
+        #
+        #     if not exists:
+        #         try:
+        #             logger.debug(
+        #                 "_handle_new_pdu fetch missing auth event %s from %s",
+        #                 e_id,
+        #                 origin,
+        #             )
+        #
+        #             yield self.get_pdu(
+        #                 origin,
+        #                 event_id=e_id,
+        #                 outlier=True,
+        #             )
+        #
+        #             logger.debug("Processed pdu %s", e_id)
+        #         except:
+        #             logger.warn(
+        #                 "Failed to get auth event %s from %s",
+        #                 e_id,
+        #                 origin
+        #             )
 
         # Get missing pdus if necessary.
         if not pdu.outlier:
@@ -565,6 +601,11 @@ class ReplicationLayer(object):
                 pdu.room_id
             )
 
+            logger.debug(
+                "_handle_new_pdu min_depth for %s: %d",
+                pdu.room_id, min_depth
+            )
+
             if min_depth and pdu.depth > min_depth:
                 for event_id, hashes in pdu.prev_events:
                     exists = yield self._get_persisted_pdu(
@@ -574,11 +615,14 @@ class ReplicationLayer(object):
                     )
 
                     if not exists:
-                        logger.debug("Requesting pdu %s", event_id)
+                        logger.debug(
+                            "_handle_new_pdu requesting pdu %s",
+                            event_id
+                        )
 
                         try:
                             yield self.get_pdu(
-                                pdu.origin,
+                                origin,
                                 event_id=event_id,
                             )
                             logger.debug("Processed pdu %s", event_id)
@@ -588,12 +632,17 @@ class ReplicationLayer(object):
             else:
                 # We need to get the state at this event, since we have reached
                 # a backward extremity edge.
+                logger.debug(
+                    "_handle_new_pdu getting state for %s",
+                    pdu.room_id
+                )
                 state = yield self.get_state_for_context(
                     origin, pdu.room_id, pdu.event_id,
                 )
 
         if not backfilled:
             ret = yield self.handler.on_receive_pdu(
+                origin,
                 pdu,
                 backfilled=backfilled,
                 state=state,
@@ -804,7 +853,10 @@ class _TransactionQueue(object):
 
                 # Ensures we don't continue until all callbacks on that
                 # deferred have fired
-                yield deferred
+                try:
+                    yield deferred
+                except:
+                    pass
 
             logger.debug("TX [%s] Yielded to callbacks", destination)
 
@@ -816,7 +868,8 @@ class _TransactionQueue(object):
             logger.exception(e)
 
             for deferred in deferreds:
-                deferred.errback(e)
+                if not deferred.called:
+                    deferred.errback(e)
 
         finally:
             # We want to be *very* sure we delete this after we stop processing
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index d53cd3df3e..15adc9dc2c 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -78,7 +78,7 @@ class BaseHandler(object):
 
         if not suppress_auth:
             logger.debug("Authing...")
-            self.auth.check(event, raises=True)
+            self.auth.check(event, auth_events=event.old_state_events)
             logger.debug("Authed")
         else:
             logger.debug("Suppressed auth.")
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d59221a4fb..02202692d4 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler):
             if auth_user not in self._streams_per_user:
                 self._streams_per_user[auth_user] = 0
                 if auth_user in self._stop_timer_per_user:
-                    self.clock.cancel_call_later(
-                        self._stop_timer_per_user.pop(auth_user))
+                    try:
+                        self.clock.cancel_call_later(
+                            self._stop_timer_per_user.pop(auth_user)
+                        )
+                    except:
+                        logger.exception("Failed to cancel event timer")
                 else:
                     yield self.distributor.fire(
                         "started_user_eventstream", auth_user
@@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler):
                     logger.debug(
                         "_later stopped_user_eventstream %s", auth_user
                     )
+
+                    self._stop_timer_per_user.pop(auth_user, None)
+
                     yield self.distributor.fire(
                         "stopped_user_eventstream", auth_user
                     )
-                    del self._stop_timer_per_user[auth_user]
 
                 logger.debug("Scheduling _later: for %s", auth_user)
                 self._stop_timer_per_user[auth_user] = (
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f601de4488..925eb5376e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,13 +18,16 @@
 from ._base import BaseHandler
 
 from synapse.api.events.utils import prune_event
-from synapse.api.errors import AuthError, FederationError, SynapseError
-from synapse.api.events.room import RoomMemberEvent
+from synapse.api.errors import (
+    AuthError, FederationError, SynapseError, StoreError,
+)
+from synapse.api.events.room import RoomMemberEvent, RoomCreateEvent
 from synapse.api.constants import Membership
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import (
-    compute_event_signature, check_event_content_hash
+    compute_event_signature, check_event_content_hash,
+    add_hashes_and_signatures,
 )
 from syutil.jsonutil import encode_canonical_json
 
@@ -98,7 +101,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, pdu, backfilled, state=None):
+    def on_receive_pdu(self, origin, pdu, backfilled, state=None):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
         do auth checks and put it through the StateHandler.
         """
@@ -109,7 +112,7 @@ class FederationHandler(BaseHandler):
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if event.room_id in self.room_queues:
-            self.room_queues[event.room_id].append(pdu)
+            self.room_queues[event.room_id].append((pdu, origin))
             return
 
         logger.debug("Processing event: %s", event.event_id)
@@ -141,15 +144,62 @@ class FederationHandler(BaseHandler):
             )
             event = redacted_event
 
-        is_new_state = yield self.state_handler.annotate_event_with_state(
-            event,
-            old_state=state
+        logger.debug("Event: %s", event)
+
+        # FIXME (erikj): Awful hack to make the case where we are not currently
+        # in the room work
+        current_state = None
+        is_in_room = yield self.auth.check_host_in_room(
+            event.room_id,
+            self.server_name
         )
+        if not is_in_room and not event.outlier:
+            logger.debug("Got event for room we're not in.")
+
+            replication_layer = self.replication_layer
+            auth_chain = yield replication_layer.get_event_auth(
+                origin,
+                context=event.room_id,
+                event_id=event.event_id,
+            )
 
-        logger.debug("Event: %s", event)
+            for e in auth_chain:
+                e.outlier = True
+                try:
+                    yield self._handle_new_event(e, fetch_missing=False)
+                except:
+                    logger.exception(
+                        "Failed to parse auth event %s",
+                        e.event_id,
+                    )
+
+            if not state:
+                state = yield replication_layer.get_state_for_context(
+                    origin,
+                    context=event.room_id,
+                    event_id=event.event_id,
+                )
+
+            current_state = state
+
+        if state:
+            for e in state:
+                e.outlier = True
+                try:
+                    yield self._handle_new_event(e)
+                except:
+                    logger.exception(
+                        "Failed to parse state event %s",
+                        e.event_id,
+                    )
 
         try:
-            self.auth.check(event, raises=True)
+            yield self._handle_new_event(
+                event,
+                state=state,
+                backfilled=backfilled,
+                current_state=current_state,
+            )
         except AuthError as e:
             raise FederationError(
                 "ERROR",
@@ -158,43 +208,17 @@ class FederationHandler(BaseHandler):
                 affected=event.event_id,
             )
 
-        is_new_state = is_new_state and not backfilled
-
-        # TODO: Implement something in federation that allows us to
-        # respond to PDU.
-
-        yield self.store.persist_event(
-            event,
-            backfilled,
-            is_new_state=is_new_state
-        )
-
         room = yield self.store.get_room(event.room_id)
 
         if not room:
-            # Huh, let's try and get the current state
             try:
-                yield self.replication_layer.get_state_for_context(
-                    event.origin, event.room_id, event.event_id,
-                )
-
-                hosts = yield self.store.get_joined_hosts_for_room(
-                    event.room_id
-                )
-                if self.hs.hostname in hosts:
-                    try:
-                        yield self.store.store_room(
-                            room_id=event.room_id,
-                            room_creator_user_id="",
-                            is_public=False,
-                        )
-                    except:
-                        pass
-            except:
-                logger.exception(
-                    "Failed to get current state for room %s",
-                    event.room_id
+                yield self.store.store_room(
+                    room_id=event.room_id,
+                    room_creator_user_id="",
+                    is_public=False,
                 )
+            except StoreError:
+                logger.exception("Failed to store room.")
 
         if not backfilled:
             extra_users = []
@@ -255,11 +279,23 @@ class FederationHandler(BaseHandler):
             pdu=event
         )
 
+
+
         defer.returnValue(pdu)
 
     @defer.inlineCallbacks
     def on_event_auth(self, event_id):
         auth = yield self.store.get_auth_chain(event_id)
+
+        for event in auth:
+            event.signatures.update(
+                compute_event_signature(
+                    event,
+                    self.hs.hostname,
+                    self.hs.config.signing_key[0]
+                )
+            )
+
         defer.returnValue([e for e in auth])
 
     @log_function
@@ -276,6 +312,8 @@ class FederationHandler(BaseHandler):
         We suspend processing of any received events from this room until we
         have finished processing the join.
         """
+        logger.debug("Joining %s to %s", joinee, room_id)
+
         pdu = yield self.replication_layer.make_join(
             target_host,
             room_id,
@@ -298,19 +336,29 @@ class FederationHandler(BaseHandler):
 
         try:
             event.event_id = self.event_factory.create_event_id()
+            event.origin = self.hs.hostname
             event.content = content
 
-            state = yield self.replication_layer.send_join(
+            if not hasattr(event, "signatures"):
+                event.signatures = {}
+
+            add_hashes_and_signatures(
+                event,
+                self.hs.hostname,
+                self.hs.config.signing_key[0],
+            )
+
+            ret = yield self.replication_layer.send_join(
                 target_host,
                 event
             )
 
-            logger.debug("do_invite_join state: %s", state)
+            state = ret["state"]
+            auth_chain = ret["auth_chain"]
+            auth_chain.sort(key=lambda e: e.depth)
 
-            yield self.state_handler.annotate_event_with_state(
-                event,
-                old_state=state
-            )
+            logger.debug("do_invite_join auth_chain: %s", auth_chain)
+            logger.debug("do_invite_join state: %s", state)
 
             logger.debug("do_invite_join event: %s", event)
 
@@ -324,34 +372,50 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
+            for e in auth_chain:
+                e.outlier = True
+                try:
+                    yield self._handle_new_event(e, fetch_missing=False)
+                except:
+                    logger.exception(
+                        "Failed to parse auth event %s",
+                        e.event_id,
+                    )
+
             for e in state:
                 # FIXME: Auth these.
                 e.outlier = True
+                try:
+                    yield self._handle_new_event(
+                        e,
+                        fetch_missing=True
+                    )
+                except:
+                    logger.exception(
+                        "Failed to parse state event %s",
+                        e.event_id,
+                    )
 
-                yield self.state_handler.annotate_event_with_state(
-                    e,
-                )
-
-                yield self.store.persist_event(
-                    e,
-                    backfilled=False,
-                    is_new_state=True
-                )
-
-            yield self.store.persist_event(
+            yield self._handle_new_event(
                 event,
-                backfilled=False,
-                is_new_state=True
+                state=state,
+                current_state=state,
             )
+
+            yield self.notifier.on_new_room_event(
+                event, extra_users=[joinee]
+            )
+
+            logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
             del self.room_queues[room_id]
 
-            for p in room_queue:
+            for p, origin in room_queue:
                 try:
-                    yield self.on_receive_pdu(p, backfilled=False)
+                    self.on_receive_pdu(origin, p, backfilled=False)
                 except:
-                    pass
+                    logger.exception("Couldn't handle pdu")
 
         defer.returnValue(True)
 
@@ -375,7 +439,7 @@ class FederationHandler(BaseHandler):
 
         yield self.state_handler.annotate_event_with_state(event)
         yield self.auth.add_auth_events(event)
-        self.auth.check(event, raises=True)
+        self.auth.check(event, auth_events=event.old_state_events)
 
         pdu = event
 
@@ -391,17 +455,7 @@ class FederationHandler(BaseHandler):
 
         event.outlier = False
 
-        state_handler = self.state_handler
-        is_new_state = yield state_handler.annotate_event_with_state(event)
-        self.auth.check(event, raises=True)
-
-        # FIXME (erikj):  All this is duplicated above :(
-
-        yield self.store.persist_event(
-            event,
-            backfilled=False,
-            is_new_state=is_new_state
-        )
+        yield self._handle_new_event(event)
 
         extra_users = []
         if event.type == RoomMemberEvent.TYPE:
@@ -414,7 +468,7 @@ class FederationHandler(BaseHandler):
         )
 
         if event.type == RoomMemberEvent.TYPE:
-            if event.membership == Membership.JOIN:
+            if event.content["membership"] == Membership.JOIN:
                 user = self.hs.parse_userid(event.state_key)
                 yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
@@ -508,7 +562,17 @@ class FederationHandler(BaseHandler):
                 else:
                     del results[(event.type, event.state_key)]
 
-            defer.returnValue(results.values())
+            res = results.values()
+            for event in res:
+                event.signatures.update(
+                    compute_event_signature(
+                        event,
+                        self.hs.hostname,
+                        self.hs.config.signing_key[0]
+                    )
+                )
+
+            defer.returnValue(res)
         else:
             defer.returnValue([])
 
@@ -541,6 +605,17 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
+            # FIXME: This is a temporary work around where we occasionally
+            # return events slightly differently than when they were
+            # originally signed
+            event.signatures.update(
+                compute_event_signature(
+                    event,
+                    self.hs.hostname,
+                    self.hs.config.signing_key[0]
+                )
+            )
+
             if do_auth:
                 in_room = yield self.auth.check_host_in_room(
                     event.room_id,
@@ -565,3 +640,78 @@ class FederationHandler(BaseHandler):
         )
         while waiters:
             waiters.pop().callback(None)
+
+    @defer.inlineCallbacks
+    def _handle_new_event(self, event, state=None, backfilled=False,
+                          current_state=None, fetch_missing=True):
+        is_new_state = yield self.state_handler.annotate_event_with_state(
+            event,
+            old_state=state
+        )
+
+        if event.old_state_events:
+            known_ids = set(
+                [s.event_id for s in event.old_state_events.values()]
+            )
+            for e_id, _ in event.auth_events:
+                if e_id not in known_ids:
+                    e = yield self.store.get_event(
+                        e_id,
+                        allow_none=True,
+                    )
+
+                    if not e:
+                        # TODO: Do some conflict res to make sure that we're
+                        # not the ones who are wrong.
+                        logger.info(
+                            "Rejecting %s as %s not in %s",
+                            event.event_id, e_id, known_ids,
+                        )
+                        raise AuthError(403, "Auth events are stale")
+
+            auth_events = event.old_state_events
+        else:
+            # We need to get the auth events from somewhere.
+
+            # TODO: Don't just hit the DBs?
+
+            auth_events = {}
+            for e_id, _ in event.auth_events:
+                e = yield self.store.get_event(
+                    e_id,
+                    allow_none=True,
+                )
+
+                if not e:
+                    e = yield self.replication_layer.get_pdu(
+                        event.origin, e_id, outlier=True
+                    )
+
+                    if e and fetch_missing:
+                        try:
+                            yield self.on_receive_pdu(event.origin, e, False)
+                        except:
+                            logger.exception(
+                                "Failed to parse auth event %s",
+                                e_id,
+                            )
+
+                if not e:
+                    logger.warn("Can't find auth event %s.", e_id)
+
+                auth_events[(e.type, e.state_key)] = e
+
+            if event.type == RoomMemberEvent.TYPE and not event.auth_events:
+                if len(event.prev_events) == 1:
+                    c = yield self.store.get_event(event.prev_events[0][0])
+                    if c.type == RoomCreateEvent.TYPE:
+                        auth_events[(c.type, c.state_key)] = c
+
+        self.auth.check(event, auth_events=auth_events)
+
+        yield self.store.persist_event(
+            event,
+            backfilled=backfilled,
+            is_new_state=(is_new_state and not backfilled),
+            current_state=current_state,
+        )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ae0fc43ca2..42dc4d46f3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -306,7 +306,7 @@ class MessageHandler(BaseHandler):
         auth_user = self.hs.parse_userid(user_id)
 
         # TODO: These concurrently
-        state_tuples = yield self.store.get_current_state(room_id)
+        state_tuples = yield self.state_handler.get_current_state(room_id)
         state = [self.hs.serialize_event(x) for x in state_tuples]
 
         member_event = (yield self.store.get_room_member(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b55d589daf..84a039489f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -651,12 +651,13 @@ class PresenceHandler(BaseHandler):
             logger.debug("Incoming presence update from %s", user)
 
             observers = set(self._remote_recvmap.get(user, set()))
+            if observers:
+                logger.debug(" | %d interested local observers %r", len(observers), observers)
 
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
-
-            if not observers and not room_ids:
-                continue
+            if room_ids:
+                logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
 
             state = dict(push)
             del state["user_id"]
@@ -678,6 +679,10 @@ class PresenceHandler(BaseHandler):
             self._user_cachemap_latest_serial += 1
             statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
+            if not observers and not room_ids:
+                logger.debug(" | no interested observers or room IDs")
+                continue
+
             self.push_update_to_clients(
                 observed_user=user,
                 users_to_push=observers,
@@ -799,6 +804,7 @@ class PresenceEventSource(object):
             )
 
     @defer.inlineCallbacks
+    @log_function
     def get_new_events_for_user(self, user, from_key, limit):
         from_key = int(from_key)
 
@@ -811,7 +817,8 @@ class PresenceEventSource(object):
         # TODO(paul): use a DeferredList ? How to limit concurrency.
         for observed_user in cachemap.keys():
             cached = cachemap[observed_user]
-            if not (from_key < cached.serial):
+
+            if cached.serial <= from_key:
                 continue
 
             if (yield self.is_visible(observer_user, observed_user)):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 88955160c5..a000b44036 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -519,7 +519,11 @@ class RoomMemberHandler(BaseHandler):
             user_id=user.to_string(), membership_list=membership_list
         )
 
-        defer.returnValue([r.room_id for r in rooms])
+        # For some reason the list of events contains duplicates
+        # TODO(paul): work out why because I really don't think it should
+        room_ids = set(r.room_id for r in rooms)
+
+        defer.returnValue(room_ids)
 
     @defer.inlineCallbacks
     def _do_local_membership_update(self, event, membership, snapshot,
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index 4f6d039b61..3147d7a60b 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -148,7 +148,7 @@ class RoomStateEventRestServlet(RestServlet):
         content = _parse_json(request)
 
         event = self.event_factory.create_event(
-            etype=urllib.unquote(event_type),
+            etype=event_type,  # already urldecoded
             content=content,
             room_id=urllib.unquote(room_id),
             user_id=user.to_string(),
@@ -327,7 +327,9 @@ class RoomMessageListRestServlet(RestServlet):
     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
         user = yield self.auth.get_user_by_req(request)
-        pagination_config = PaginationConfig.from_request(request)
+        pagination_config = PaginationConfig.from_request(request,
+            default_limit=10,
+        )
         with_feedback = "feedback" in request.args
         handler = self.handlers.message_handler
         msgs = yield handler.get_messages(
diff --git a/synapse/state.py b/synapse/state.py
index 1c999e4d79..430665f7ba 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -82,7 +82,7 @@ class StateHandler(object):
         if hasattr(event, "outlier") and event.outlier:
             event.state_group = None
             event.old_state_events = None
-            event.state_events = {}
+            event.state_events = None
             defer.returnValue(False)
             return
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f6811a8117..e4bdad98bb 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -69,7 +69,7 @@ SCHEMAS = [
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 7
+SCHEMA_VERSION = 8
 
 
 class _RollbackButIsFineException(Exception):
@@ -97,7 +97,8 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, backfilled=False, is_new_state=True):
+    def persist_event(self, event, backfilled=False, is_new_state=True,
+                      current_state=None):
         stream_ordering = None
         if backfilled:
             if not self.min_token_deferred.called:
@@ -113,6 +114,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 backfilled=backfilled,
                 stream_ordering=stream_ordering,
                 is_new_state=is_new_state,
+                current_state=current_state,
             )
         except _RollbackButIsFineException:
             pass
@@ -141,7 +143,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @log_function
     def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
-                           is_new_state=True):
+                           is_new_state=True, current_state=None):
         if event.type == RoomMemberEvent.TYPE:
             self._store_room_member_txn(txn, event)
         elif event.type == FeedbackEvent.TYPE:
@@ -210,8 +212,27 @@ class DataStore(RoomMemberStore, RoomStore,
 
         self._store_state_groups_txn(txn, event)
 
+        if current_state:
+            txn.execute(
+                "DELETE FROM current_state_events WHERE room_id = ?",
+                (event.room_id,)
+            )
+
+            for s in current_state:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": s.event_id,
+                        "room_id": s.room_id,
+                        "type": s.type,
+                        "state_key": s.state_key,
+                    },
+                    or_replace=True,
+                )
+
         is_state = hasattr(event, "state_key") and event.state_key is not None
-        if is_new_state and is_state:
+        if is_state:
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
@@ -229,17 +250,18 @@ class DataStore(RoomMemberStore, RoomStore,
                 or_replace=True,
             )
 
-            self._simple_insert_txn(
-                txn,
-                "current_state_events",
-                {
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "type": event.type,
-                    "state_key": event.state_key,
-                },
-                or_replace=True,
-            )
+            if is_new_state:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    or_replace=True,
+                )
 
             for e_id, h in event.prev_state:
                 self._simple_insert_txn(
@@ -316,7 +338,12 @@ class DataStore(RoomMemberStore, RoomStore,
             txn, event.event_id, ref_alg, ref_hash_bytes
         )
 
-        self._update_min_depth_for_room_txn(txn, event.room_id, event.depth)
+        if not outlier:
+            self._update_min_depth_for_room_txn(
+                txn,
+                event.room_id,
+                event.depth
+            )
 
     def _store_redaction(self, txn, event):
         txn.execute(
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index c37df59d45..05b275663e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -218,7 +218,9 @@ class RoomMemberStore(SQLBaseStore):
             "ON m.event_id = c.event_id "
             "WHERE m.membership = 'join' "
             "AND (%(clause)s) "
-            "GROUP BY m.room_id HAVING COUNT(m.room_id) = ?"
+            # TODO(paul): We've got duplicate rows in the database somewhere
+            #   so we have to DISTINCT m.user_id here
+            "GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?"
         ) % {"clause": user_list_clause}
 
         args = list(user_id_list)
diff --git a/synapse/storage/schema/delta/v8.sql b/synapse/storage/schema/delta/v8.sql
new file mode 100644
index 0000000000..daf6646ed5
--- /dev/null
+++ b/synapse/storage/schema/delta/v8.sql
@@ -0,0 +1,34 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE TABLE IF NOT EXISTS event_signatures_2 (
+    event_id TEXT,
+    signature_name TEXT,
+    key_id TEXT,
+    signature BLOB,
+    CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
+);
+
+INSERT INTO event_signatures_2 (event_id, signature_name, key_id, signature)
+SELECT event_id, signature_name, key_id, signature FROM event_signatures;
+
+DROP TABLE event_signatures;
+ALTER TABLE event_signatures_2 RENAME TO event_signatures;
+
+CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
+    event_id
+);
+
+PRAGMA user_version = 8;
\ No newline at end of file
diff --git a/synapse/storage/schema/event_signatures.sql b/synapse/storage/schema/event_signatures.sql
index 4efa8a3e63..b6b56b47a2 100644
--- a/synapse/storage/schema/event_signatures.sql
+++ b/synapse/storage/schema/event_signatures.sql
@@ -42,7 +42,7 @@ CREATE TABLE IF NOT EXISTS event_signatures (
     signature_name TEXT,
     key_id TEXT,
     signature BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, key_id)
+    CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
 );
 
 CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 55ea567793..e0f44b3e59 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -87,7 +87,7 @@ class StateStore(SQLBaseStore):
         )
 
     def _store_state_groups_txn(self, txn, event):
-        if not event.state_events:
+        if event.state_events is None:
             return
 
         state_group = event.state_group
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index b84735e61c..3405cb365e 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT *, (%(redacted)s) AS redacted FROM events "
-            "WHERE room_id = ? AND stream_ordering <= ? "
+            "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
             "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
         ) % {
             "redacted": del_sql,
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 0317e78c08..2114c940e7 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -47,7 +47,8 @@ class PaginationConfig(object):
         self.limit = int(limit) if limit is not None else None
 
     @classmethod
-    def from_request(cls, request, raise_invalid_params=True):
+    def from_request(cls, request, raise_invalid_params=True,
+                     default_limit=None):
         def get_param(name, default=None):
             lst = request.args.get(name, [])
             if len(lst) > 1:
@@ -84,6 +85,9 @@ class PaginationConfig(object):
         if limit is not None and not limit.isdigit():
             raise SynapseError(400, "'limit' parameter must be an integer.")
 
+        if limit is None:
+            limit = default_limit
+
         try:
             return PaginationConfig(from_tok, to_tok, direction, limit)
         except: