summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py162
-rw-r--r--synapse/handlers/directory.py28
-rw-r--r--synapse/handlers/federation.py225
-rw-r--r--synapse/handlers/message.py36
-rw-r--r--synapse/handlers/presence.py38
-rw-r--r--synapse/handlers/profile.py35
-rw-r--r--synapse/handlers/register.py6
-rw-r--r--synapse/handlers/room.py185
-rw-r--r--synapse/handlers/typing.py4
9 files changed, 417 insertions, 302 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 15adc9dc2c..6b5f6e7cd8 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -15,11 +15,12 @@
 
 from twisted.internet import defer
 
-from synapse.api.errors import LimitExceededError
+from synapse.api.errors import LimitExceededError, SynapseError
 from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import add_hashes_and_signatures
-from synapse.api.events.room import RoomMemberEvent
-from synapse.api.constants import Membership
+from synapse.api.constants import Membership, EventTypes
+
+from synapse.events.snapshot import EventContext
 
 import logging
 
@@ -44,6 +45,8 @@ class BaseHandler(object):
         self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
 
+        self.event_builder_factory = hs.get_event_builder_factory()
+
     def ratelimit(self, user_id):
         time_now = self.clock.time()
         allowed, time_allowed = self.ratelimiter.send_message(
@@ -57,62 +60,151 @@ class BaseHandler(object):
             )
 
     @defer.inlineCallbacks
-    def _on_new_room_event(self, event, snapshot, extra_destinations=[],
-                           extra_users=[], suppress_auth=False,
-                           do_invite_host=None):
+    def _create_new_client_event(self, builder):
         yield run_on_reactor()
 
-        snapshot.fill_out_prev_events(event)
+        context = EventContext()
+
+        latest_ret = yield self.store.get_latest_events_in_room(
+            builder.room_id,
+        )
+
+        if latest_ret:
+            depth = max([d for _, _, d in latest_ret]) + 1
+        else:
+            depth = 1
+
+        prev_events = [(e, h) for e, h, _ in latest_ret]
+
+        builder.prev_events = prev_events
+        builder.depth = depth
 
-        yield self.state_handler.annotate_event_with_state(event)
+        state_handler = self.state_handler
+        ret = yield state_handler.annotate_context_with_state(
+            builder,
+            context,
+        )
+        prev_state = ret
 
-        yield self.auth.add_auth_events(event)
+        if builder.is_state():
+            builder.prev_state = prev_state
 
-        logger.debug("Signing event...")
+        yield self.auth.add_auth_events(builder, context)
 
         add_hashes_and_signatures(
-            event, self.server_name, self.signing_key
+            builder, self.server_name, self.signing_key
         )
 
-        logger.debug("Signed event.")
+        event = builder.build()
+
+        defer.returnValue(
+            (event, context,)
+        )
+
+    @defer.inlineCallbacks
+    def handle_new_client_event(self, event, context, extra_destinations=[],
+                                extra_users=[], suppress_auth=False):
+        yield run_on_reactor()
+
+        # We now need to go and hit out to wherever we need to hit out to.
 
         if not suppress_auth:
-            logger.debug("Authing...")
-            self.auth.check(event, auth_events=event.old_state_events)
-            logger.debug("Authed")
-        else:
-            logger.debug("Suppressed auth.")
+            self.auth.check(event, auth_events=context.auth_events)
 
-        if do_invite_host:
-            federation_handler = self.hs.get_handlers().federation_handler
-            invite_event = yield federation_handler.send_invite(
-                do_invite_host,
-                event
-            )
+        yield self.store.persist_event(event, context=context)
 
-            # FIXME: We need to check if the remote changed anything else
-            event.signatures = invite_event.signatures
+        federation_handler = self.hs.get_handlers().federation_handler
 
-        yield self.store.persist_event(event)
+        if event.type == EventTypes.Member:
+            if event.content["membership"] == Membership.INVITE:
+                invitee = self.hs.parse_userid(event.state_key)
+                if not self.hs.is_mine(invitee):
+                    returned_invite = yield federation_handler.send_invite(
+                        invitee.domain,
+                        event,
+                    )
+                    event.signatures.update(
+                        returned_invite.signatures
+                    )
 
         destinations = set(extra_destinations)
-        # Send a PDU to all hosts who have joined the room.
-
-        for k, s in event.state_events.items():
+        for k, s in context.current_state.items():
             try:
-                if k[0] == RoomMemberEvent.TYPE:
+                if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
                         destinations.add(
                             self.hs.parse_userid(s.state_key).domain
                         )
-            except:
+            except SynapseError:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
                 )
 
-        event.destinations = list(destinations)
-
         yield self.notifier.on_new_room_event(event, extra_users=extra_users)
 
-        federation_handler = self.hs.get_handlers().federation_handler
-        yield federation_handler.handle_new_event(event, snapshot)
+        yield federation_handler.handle_new_event(
+            event,
+            None,
+            destinations=destinations,
+        )
+
+    # @defer.inlineCallbacks
+    # def _on_new_room_event(self, event, snapshot, extra_destinations=[],
+    #                        extra_users=[], suppress_auth=False,
+    #                        do_invite_host=None):
+    #     yield run_on_reactor()
+    #
+    #     snapshot.fill_out_prev_events(event)
+    #
+    #     yield self.state_handler.annotate_event_with_state(event)
+    #
+    #     yield self.auth.add_auth_events(event)
+    #
+    #     logger.debug("Signing event...")
+    #
+    #     add_hashes_and_signatures(
+    #         event, self.server_name, self.signing_key
+    #     )
+    #
+    #     logger.debug("Signed event.")
+    #
+    #     if not suppress_auth:
+    #         logger.debug("Authing...")
+    #         self.auth.check(event, auth_events=event.old_state_events)
+    #         logger.debug("Authed")
+    #     else:
+    #         logger.debug("Suppressed auth.")
+    #
+    #     if do_invite_host:
+    #         federation_handler = self.hs.get_handlers().federation_handler
+    #         invite_event = yield federation_handler.send_invite(
+    #             do_invite_host,
+    #             event
+    #         )
+    #
+    #         # FIXME: We need to check if the remote changed anything else
+    #         event.signatures = invite_event.signatures
+    #
+    #     yield self.store.persist_event(event)
+    #
+    #     destinations = set(extra_destinations)
+    #     # Send a PDU to all hosts who have joined the room.
+    #
+    #     for k, s in event.state_events.items():
+    #         try:
+    #             if k[0] == RoomMemberEvent.TYPE:
+    #                 if s.content["membership"] == Membership.JOIN:
+    #                     destinations.add(
+    #                         self.hs.parse_userid(s.state_key).domain
+    #                     )
+    #         except:
+    #             logger.warn(
+    #                 "Failed to get destination from event %s", s.event_id
+    #             )
+    #
+    #     event.destinations = list(destinations)
+    #
+    #     yield self.notifier.on_new_room_event(event, extra_users=extra_users)
+    #
+    #     federation_handler = self.hs.get_handlers().federation_handler
+    #     yield federation_handler.handle_new_event(event, snapshot)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 3b37e49e6f..76fb897f20 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -40,7 +40,7 @@ class DirectoryHandler(BaseHandler):
 
         # TODO(erikj): Do auth.
 
-        if not room_alias.is_mine:
+        if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room alias must be local")
             # TODO(erikj): Change this.
 
@@ -64,7 +64,7 @@ class DirectoryHandler(BaseHandler):
     def delete_association(self, user_id, room_alias):
         # TODO Check if server admin
 
-        if not room_alias.is_mine:
+        if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room alias must be local")
 
         room_id = yield self.store.delete_room_alias(room_alias)
@@ -75,7 +75,7 @@ class DirectoryHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_association(self, room_alias):
         room_id = None
-        if room_alias.is_mine:
+        if self.hs.is_mine(room_alias):
             result = yield self.store.get_association_from_room_alias(
                 room_alias
             )
@@ -123,7 +123,7 @@ class DirectoryHandler(BaseHandler):
     @defer.inlineCallbacks
     def on_directory_query(self, args):
         room_alias = self.hs.parse_roomalias(args["room_alias"])
-        if not room_alias.is_mine:
+        if not self.hs.is_mine(room_alias):
             raise SynapseError(
                 400, "Room Alias is not hosted on this Home Server"
             )
@@ -148,16 +148,12 @@ class DirectoryHandler(BaseHandler):
     def send_room_alias_update_event(self, user_id, room_id):
         aliases = yield self.store.get_aliases_for_room(room_id)
 
-        event = self.event_factory.create_event(
-            etype=RoomAliasesEvent.TYPE,
-            state_key=self.hs.hostname,
-            room_id=room_id,
-            user_id=user_id,
-            content={"aliases": aliases},
-        )
-
-        snapshot = yield self.store.snapshot_room(event)
+        msg_handler = self.hs.get_handlers().message_handler
+        yield msg_handler.handle_event({
+            "type": RoomAliasesEvent.TYPE,
+            "state_key": self.hs.hostname,
+            "room_id": room_id,
+            "sender": user_id,
+            "content": {"aliases": aliases},
+        })
 
-        yield self._on_new_room_event(
-            event, snapshot, extra_users=[user_id], suppress_auth=True
-        )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cfb5029774..e5deb8a9ef 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -17,7 +17,8 @@
 
 from ._base import BaseHandler
 
-from synapse.api.events.utils import prune_event
+from synapse.events.snapshot import EventContext
+from synapse.events.utils import prune_event
 from synapse.api.errors import (
     AuthError, FederationError, SynapseError, StoreError,
 )
@@ -76,7 +77,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def handle_new_event(self, event, snapshot):
+    def handle_new_event(self, event, snapshot, destinations):
         """ Takes in an event from the client to server side, that has already
         been authed and handled by the state module, and sends it to any
         remote home servers that may be interested.
@@ -92,12 +93,7 @@ class FederationHandler(BaseHandler):
 
         yield run_on_reactor()
 
-        pdu = event
-
-        if not hasattr(pdu, "destinations") or not pdu.destinations:
-            pdu.destinations = []
-
-        yield self.replication_layer.send_pdu(pdu)
+        yield self.replication_layer.send_pdu(event, destinations)
 
     @log_function
     @defer.inlineCallbacks
@@ -153,7 +149,7 @@ class FederationHandler(BaseHandler):
             event.room_id,
             self.server_name
         )
-        if not is_in_room and not event.outlier:
+        if not is_in_room and not event.internal_metadata.outlier:
             logger.debug("Got event for room we're not in.")
 
             replication_layer = self.replication_layer
@@ -164,7 +160,7 @@ class FederationHandler(BaseHandler):
             )
 
             for e in auth_chain:
-                e.outlier = True
+                e.internal_metadata.outlier = True
                 try:
                     yield self._handle_new_event(e, fetch_missing=False)
                 except:
@@ -184,7 +180,7 @@ class FederationHandler(BaseHandler):
 
         if state:
             for e in state:
-                e.outlier = True
+                e.internal_metadata.outlier = True
                 try:
                     yield self._handle_new_event(e)
                 except:
@@ -265,11 +261,18 @@ class FederationHandler(BaseHandler):
             event = pdu
 
             # FIXME (erikj): Not sure this actually works :/
-            yield self.state_handler.annotate_event_with_state(event)
+            context = EventContext()
+            yield self.state_handler.annotate_context_with_state(event, context)
 
-            events.append(event)
+            events.append(
+                (event, context)
+            )
 
-            yield self.store.persist_event(event, backfilled=True)
+            yield self.store.persist_event(
+                event,
+                context=context,
+                backfilled=True
+            )
 
         defer.returnValue(events)
 
@@ -286,8 +289,6 @@ class FederationHandler(BaseHandler):
             pdu=event
         )
 
-
-
         defer.returnValue(pdu)
 
     @defer.inlineCallbacks
@@ -337,27 +338,33 @@ class FederationHandler(BaseHandler):
         assert(event.state_key == joinee)
         assert(event.room_id == room_id)
 
-        event.outlier = False
+        event.internal_metadata.outlier = False
 
         self.room_queues[room_id] = []
 
+        builder = self.event_builder_factory.new(
+            event.get_pdu_json()
+        )
+
         try:
-            event.event_id = self.event_factory.create_event_id()
-            event.origin = self.hs.hostname
-            event.content = content
+            builder.event_id = self.event_factory.create_event_id()
+            builder.origin = self.hs.hostname
+            builder.content = content
 
             if not hasattr(event, "signatures"):
-                event.signatures = {}
+                builder.signatures = {}
 
             add_hashes_and_signatures(
-                event,
+                builder,
                 self.hs.hostname,
                 self.hs.config.signing_key[0],
             )
 
+            new_event = builder.build()
+
             ret = yield self.replication_layer.send_join(
                 target_host,
-                event
+                new_event
             )
 
             state = ret["state"]
@@ -367,7 +374,7 @@ class FederationHandler(BaseHandler):
             logger.debug("do_invite_join auth_chain: %s", auth_chain)
             logger.debug("do_invite_join state: %s", state)
 
-            logger.debug("do_invite_join event: %s", event)
+            logger.debug("do_invite_join event: %s", new_event)
 
             try:
                 yield self.store.store_room(
@@ -380,7 +387,7 @@ class FederationHandler(BaseHandler):
                 pass
 
             for e in auth_chain:
-                e.outlier = True
+                e.internal_metadata.outlier = True
                 try:
                     yield self._handle_new_event(e, fetch_missing=False)
                 except:
@@ -391,7 +398,7 @@ class FederationHandler(BaseHandler):
 
             for e in state:
                 # FIXME: Auth these.
-                e.outlier = True
+                e.internal_metadata.outlier = True
                 try:
                     yield self._handle_new_event(
                         e,
@@ -404,13 +411,13 @@ class FederationHandler(BaseHandler):
                     )
 
             yield self._handle_new_event(
-                event,
+                new_event,
                 state=state,
                 current_state=state,
             )
 
             yield self.notifier.on_new_room_event(
-                event, extra_users=[joinee]
+                new_event, extra_users=[joinee]
             )
 
             logger.debug("Finished joining %s to %s", joinee, room_id)
@@ -428,25 +435,24 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_make_join_request(self, context, user_id):
+    def on_make_join_request(self, room_id, user_id):
         """ We've received a /make_join/ request, so we create a partial
         join event for the room and return that. We don *not* persist or
         process it until the other server has signed it and sent it back.
         """
-        event = self.event_factory.create_event(
-            etype=RoomMemberEvent.TYPE,
-            content={"membership": Membership.JOIN},
-            room_id=context,
-            user_id=user_id,
-            state_key=user_id,
-        )
+        builder = self.event_builder_factory.new({
+            "type": RoomMemberEvent.TYPE,
+            "content": {"membership": Membership.JOIN},
+            "room_id": room_id,
+            "sender": user_id,
+            "state_key": user_id,
+        })
 
-        snapshot = yield self.store.snapshot_room(event)
-        snapshot.fill_out_prev_events(event)
+        event, context = yield self._create_new_client_event(
+            builder=builder,
+        )
 
-        yield self.state_handler.annotate_event_with_state(event)
-        yield self.auth.add_auth_events(event)
-        self.auth.check(event, auth_events=event.old_state_events)
+        self.auth.check(event, auth_events=context.auth_events)
 
         pdu = event
 
@@ -460,9 +466,21 @@ class FederationHandler(BaseHandler):
         """
         event = pdu
 
-        event.outlier = False
+        logger.debug(
+            "on_send_join_request: Got event: %s, signatures: %s",
+            event.event_id,
+            event.signatures,
+        )
+
+        event.internal_metadata.outlier = False
 
-        yield self._handle_new_event(event)
+        context = yield self._handle_new_event(event)
+
+        logger.debug(
+            "on_send_join_request: After _handle_new_event: %s, sigs: %s",
+            event.event_id,
+            event.signatures,
+        )
 
         extra_users = []
         if event.type == RoomMemberEvent.TYPE:
@@ -485,7 +503,7 @@ class FederationHandler(BaseHandler):
 
         destinations = set()
 
-        for k, s in event.state_events.items():
+        for k, s in context.current_state.items():
             try:
                 if k[0] == RoomMemberEvent.TYPE:
                     if s.content["membership"] == Membership.JOIN:
@@ -497,14 +515,18 @@ class FederationHandler(BaseHandler):
                     "Failed to get destination from event %s", s.event_id
                 )
 
-        new_pdu.destinations = list(destinations)
+        logger.debug(
+            "on_send_join_request: Sending event: %s, signatures: %s",
+            event.event_id,
+            event.signatures,
+        )
 
-        yield self.replication_layer.send_pdu(new_pdu)
+        yield self.replication_layer.send_pdu(new_pdu, destinations)
 
         auth_chain = yield self.store.get_auth_chain(event.event_id)
 
         defer.returnValue({
-            "state": event.state_events.values(),
+            "state": context.current_state.values(),
             "auth_chain": auth_chain,
         })
 
@@ -516,7 +538,9 @@ class FederationHandler(BaseHandler):
         """
         event = pdu
 
-        event.outlier = True
+        context = EventContext()
+
+        event.internal_metadata.outlier = True
 
         event.signatures.update(
             compute_event_signature(
@@ -526,10 +550,11 @@ class FederationHandler(BaseHandler):
             )
         )
 
-        yield self.state_handler.annotate_event_with_state(event)
+        yield self.state_handler.annotate_context_with_state(event, context)
 
         yield self.store.persist_event(
             event,
+            context=context,
             backfilled=False,
         )
 
@@ -651,74 +676,80 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _handle_new_event(self, event, state=None, backfilled=False,
                           current_state=None, fetch_missing=True):
-        is_new_state = yield self.state_handler.annotate_event_with_state(
+        context = EventContext()
+
+        logger.debug(
+            "_handle_new_event: Before annotate: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
+
+        yield self.state_handler.annotate_context_with_state(
             event,
+            context,
             old_state=state
         )
 
-        if event.old_state_events:
-            known_ids = set(
-                [s.event_id for s in event.old_state_events.values()]
-            )
-            for e_id, _ in event.auth_events:
-                if e_id not in known_ids:
-                    e = yield self.store.get_event(
-                        e_id,
-                        allow_none=True,
-                    )
-
-                    if not e:
-                        # TODO: Do some conflict res to make sure that we're
-                        # not the ones who are wrong.
-                        logger.info(
-                            "Rejecting %s as %s not in %s",
-                            event.event_id, e_id, known_ids,
-                        )
-                        raise AuthError(403, "Auth events are stale")
-
-            auth_events = event.old_state_events
-        else:
-            # We need to get the auth events from somewhere.
+        logger.debug(
+            "_handle_new_event: Before auth fetch: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
 
-            # TODO: Don't just hit the DBs?
+        is_new_state = not event.internal_metadata.is_outlier()
 
-            auth_events = {}
-            for e_id, _ in event.auth_events:
+        known_ids = set(
+            [s.event_id for s in context.auth_events.values()]
+        )
+        for e_id, _ in event.auth_events:
+            if e_id not in known_ids:
                 e = yield self.store.get_event(
-                    e_id,
-                    allow_none=True,
+                    e_id, allow_none=True,
                 )
 
                 if not e:
-                    e = yield self.replication_layer.get_pdu(
-                        event.origin, e_id, outlier=True
+                    # TODO: Do some conflict res to make sure that we're
+                    # not the ones who are wrong.
+                    logger.info(
+                        "Rejecting %s as %s not in db or %s",
+                        event.event_id, e_id, known_ids,
                     )
+                    raise AuthError(403, "Auth events are stale")
 
-                    if e and fetch_missing:
-                        try:
-                            yield self.on_receive_pdu(event.origin, e, False)
-                        except:
-                            logger.exception(
-                                "Failed to parse auth event %s",
-                                e_id,
-                            )
+                context.auth_events[(e.type, e.state_key)] = e
 
-                if not e:
-                    logger.warn("Can't find auth event %s.", e_id)
+        logger.debug(
+            "_handle_new_event: Before hack: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
+
+        if event.type == RoomMemberEvent.TYPE and not event.auth_events:
+            if len(event.prev_events) == 1:
+                c = yield self.store.get_event(event.prev_events[0][0])
+                if c.type == RoomCreateEvent.TYPE:
+                    context.auth_events[(c.type, c.state_key)] = c
 
-                auth_events[(e.type, e.state_key)] = e
+        logger.debug(
+            "_handle_new_event: Before auth check: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
 
-            if event.type == RoomMemberEvent.TYPE and not event.auth_events:
-                if len(event.prev_events) == 1:
-                    c = yield self.store.get_event(event.prev_events[0][0])
-                    if c.type == RoomCreateEvent.TYPE:
-                        auth_events[(c.type, c.state_key)] = c
+        self.auth.check(event, auth_events=context.auth_events)
 
-        self.auth.check(event, auth_events=auth_events)
+        logger.debug(
+            "_handle_new_event: Before persist_event: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
 
         yield self.store.persist_event(
             event,
+            context=context,
             backfilled=backfilled,
             is_new_state=(is_new_state and not backfilled),
             current_state=current_state,
         )
+
+        logger.debug(
+            "_handle_new_event: After persist_event: %s, sigs: %s",
+            event.event_id, event.signatures,
+        )
+
+        defer.returnValue(context)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 42dc4d46f3..13fa0be7b4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
 
 from twisted.internet import defer
 
-from synapse.api.constants import Membership
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import RoomError
 from synapse.streams.config import PaginationConfig
 from synapse.util.logcontext import PreserveLoggingContext
@@ -79,7 +79,7 @@ class MessageHandler(BaseHandler):
         self.ratelimit(event.user_id)
         # TODO(paul): Why does 'event' not have a 'user' object?
         user = self.hs.parse_userid(event.user_id)
-        assert user.is_mine, "User must be our own: %s" % (user,)
+        assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
 
         snapshot = yield self.store.snapshot_room(event)
 
@@ -134,6 +134,38 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
+    def handle_event(self, event_dict):
+        builder = self.event_builder_factory.new(event_dict)
+
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            if membership == Membership.JOIN:
+                joinee = self.hs.parse_userid(builder.state_key)
+                # If event doesn't include a display name, add one.
+                yield self.distributor.fire(
+                    "collect_presencelike_data",
+                    joinee,
+                    builder.content
+                )
+
+        event, context = yield self._create_new_client_event(
+            builder=builder,
+        )
+
+        # TODO: self.validator.validate(event)
+
+        if event.type == EventTypes.Member:
+            member_handler = self.hs.get_handlers().room_member_handler
+            yield member_handler.change_membership(event, context)
+        else:
+            yield self.handle_new_client_event(
+                event=event,
+                context=context,
+            )
+
+        defer.returnValue(event)
+
+    @defer.inlineCallbacks
     def store_room_data(self, event=None):
         """ Stores data for a room.
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 84a039489f..f3abfecdee 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -147,7 +147,7 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def is_presence_visible(self, observer_user, observed_user):
-        assert(observed_user.is_mine)
+        assert(self.hs.is_mine(observed_user))
 
         if observer_user == observed_user:
             defer.returnValue(True)
@@ -165,7 +165,7 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_state(self, target_user, auth_user, as_event=False):
-        if target_user.is_mine:
+        if self.hs.is_mine(target_user):
             visible = yield self.is_presence_visible(
                 observer_user=auth_user,
                 observed_user=target_user
@@ -212,7 +212,7 @@ class PresenceHandler(BaseHandler):
         # TODO (erikj): Turn this back on. Why did we end up sending EDUs
         # everywhere?
 
-        if not target_user.is_mine:
+        if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         if target_user != auth_user:
@@ -291,7 +291,7 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def user_joined_room(self, user, room_id):
-        if user.is_mine:
+        if self.hs.is_mine(user):
             statuscache = self._get_or_make_usercache(user)
 
             # No actual update but we need to bump the serial anyway for the
@@ -309,7 +309,7 @@ class PresenceHandler(BaseHandler):
         rm_handler = self.homeserver.get_handlers().room_member_handler
         curr_users = yield rm_handler.get_room_members(room_id)
 
-        for local_user in [c for c in curr_users if c.is_mine]:
+        for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
             self.push_update_to_local_and_remote(
                 observed_user=local_user,
                 users_to_push=[user],
@@ -318,14 +318,14 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def send_invite(self, observer_user, observed_user):
-        if not observer_user.is_mine:
+        if not self.hs.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         yield self.store.add_presence_list_pending(
             observer_user.localpart, observed_user.to_string()
         )
 
-        if observed_user.is_mine:
+        if self.hs.is_mine(observed_user):
             yield self.invite_presence(observed_user, observer_user)
         else:
             yield self.federation.send_edu(
@@ -339,7 +339,7 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _should_accept_invite(self, observed_user, observer_user):
-        if not observed_user.is_mine:
+        if not self.hs.is_mine(observed_user):
             defer.returnValue(False)
 
         row = yield self.store.has_presence_state(observed_user.localpart)
@@ -359,7 +359,7 @@ class PresenceHandler(BaseHandler):
                 observed_user.localpart, observer_user.to_string()
             )
 
-        if observer_user.is_mine:
+        if self.hs.is_mine(observer_user):
             if accept:
                 yield self.accept_presence(observed_user, observer_user)
             else:
@@ -396,7 +396,7 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def drop(self, observed_user, observer_user):
-        if not observer_user.is_mine:
+        if not self.hs.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         yield self.store.del_presence_list(
@@ -410,7 +410,7 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
-        if not observer_user.is_mine:
+        if not self.hs.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         presence = yield self.store.get_presence_list(
@@ -465,7 +465,7 @@ class PresenceHandler(BaseHandler):
         )
 
         for target_user in target_users:
-            if target_user.is_mine:
+            if self.hs.is_mine(target_user):
                 self._start_polling_local(user, target_user)
 
                 # We want to tell the person that just came online
@@ -477,7 +477,7 @@ class PresenceHandler(BaseHandler):
                 )
 
         deferreds = []
-        remote_users = [u for u in target_users if not u.is_mine]
+        remote_users = [u for u in target_users if not self.hs.is_mine(u)]
         remoteusers_by_domain = partition(remote_users, lambda u: u.domain)
         # Only poll for people in our get_presence_list
         for domain in remoteusers_by_domain:
@@ -520,7 +520,7 @@ class PresenceHandler(BaseHandler):
     def stop_polling_presence(self, user, target_user=None):
         logger.debug("Stop polling for presence from %s", user)
 
-        if not target_user or target_user.is_mine:
+        if not target_user or self.hs.is_mine(target_user):
             self._stop_polling_local(user, target_user=target_user)
 
         deferreds = []
@@ -579,7 +579,7 @@ class PresenceHandler(BaseHandler):
     @defer.inlineCallbacks
     @log_function
     def push_presence(self, user, statuscache):
-        assert(user.is_mine)
+        assert(self.hs.is_mine(user))
 
         logger.debug("Pushing presence update from %s", user)
 
@@ -696,7 +696,7 @@ class PresenceHandler(BaseHandler):
         for poll in content.get("poll", []):
             user = self.hs.parse_userid(poll)
 
-            if not user.is_mine:
+            if not self.hs.is_mine(user):
                 continue
 
             # TODO(paul) permissions checks
@@ -711,7 +711,7 @@ class PresenceHandler(BaseHandler):
         for unpoll in content.get("unpoll", []):
             user = self.hs.parse_userid(unpoll)
 
-            if not user.is_mine:
+            if not self.hs.is_mine(user):
                 continue
 
             if user in self._remote_sendmap:
@@ -730,7 +730,7 @@ class PresenceHandler(BaseHandler):
 
         localusers, remoteusers = partitionbool(
             users_to_push,
-            lambda u: u.is_mine
+            lambda u: self.hs.is_mine(u)
         )
 
         localusers = set(localusers)
@@ -788,7 +788,7 @@ class PresenceEventSource(object):
                 [u.to_string() for u in observer_user, observed_user])):
             defer.returnValue(True)
 
-        if observed_user.is_mine:
+        if self.hs.is_mine(observed_user):
             pushmap = presence._local_pushmap
 
             defer.returnValue(
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 814b3b68fe..18fd0914e2 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -51,7 +51,7 @@ class ProfileHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_displayname(self, target_user):
-        if target_user.is_mine:
+        if self.hs.is_mine(target_user):
             displayname = yield self.store.get_profile_displayname(
                 target_user.localpart
             )
@@ -81,7 +81,7 @@ class ProfileHandler(BaseHandler):
     def set_displayname(self, target_user, auth_user, new_displayname):
         """target_user is the user whose displayname is to be changed;
         auth_user is the user attempting to make this change."""
-        if not target_user.is_mine:
+        if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         if target_user != auth_user:
@@ -101,7 +101,7 @@ class ProfileHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_avatar_url(self, target_user):
-        if target_user.is_mine:
+        if self.hs.is_mine(target_user):
             avatar_url = yield self.store.get_profile_avatar_url(
                 target_user.localpart
             )
@@ -130,7 +130,7 @@ class ProfileHandler(BaseHandler):
     def set_avatar_url(self, target_user, auth_user, new_avatar_url):
         """target_user is the user whose avatar_url is to be changed;
         auth_user is the user attempting to make this change."""
-        if not target_user.is_mine:
+        if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         if target_user != auth_user:
@@ -150,7 +150,7 @@ class ProfileHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def collect_presencelike_data(self, user, state):
-        if not user.is_mine:
+        if not self.hs.is_mine(user):
             defer.returnValue(None)
 
         with PreserveLoggingContext():
@@ -170,7 +170,7 @@ class ProfileHandler(BaseHandler):
     @defer.inlineCallbacks
     def on_profile_query(self, args):
         user = self.hs.parse_userid(args["user_id"])
-        if not user.is_mine:
+        if not self.hs.is_mine(user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         just_field = args.get("field", None)
@@ -191,7 +191,7 @@ class ProfileHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _update_join_states(self, user):
-        if not user.is_mine:
+        if not self.hs.is_mine(user):
             return
 
         joins = yield self.store.get_rooms_for_user_where_membership_is(
@@ -200,8 +200,6 @@ class ProfileHandler(BaseHandler):
         )
 
         for j in joins:
-            snapshot = yield self.store.snapshot_room(j)
-
             content = {
                 "membership": j.content["membership"],
             }
@@ -210,14 +208,11 @@ class ProfileHandler(BaseHandler):
                 "collect_presencelike_data", user, content
             )
 
-            new_event = self.event_factory.create_event(
-                etype=j.type,
-                room_id=j.room_id,
-                state_key=j.state_key,
-                content=content,
-                user_id=j.state_key,
-            )
-
-            yield self._on_new_room_event(
-                new_event, snapshot, suppress_auth=True
-            )
+            msg_handler = self.hs.get_handlers().message_handler
+            yield msg_handler.handle_event({
+                "type": j.type,
+                "room_id": j.room_id,
+                "state_key": j.state_key,
+                "content": content,
+                "sender": j.state_key,
+            })
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 48c326ebf0..15d8716455 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -22,6 +22,7 @@ from synapse.api.errors import (
 )
 from ._base import BaseHandler
 import synapse.util.stringutils as stringutils
+from synapse.util.async import run_on_reactor
 from synapse.http.client import SimpleHttpClient
 from synapse.http.client import CaptchaServerHttpClient
 
@@ -54,12 +55,13 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
+        yield run_on_reactor()
         password_hash = None
         if password:
             password_hash = bcrypt.hashpw(password, bcrypt.gensalt())
 
         if localpart:
-            user = UserID(localpart, self.hs.hostname, True)
+            user = UserID(localpart, self.hs.hostname)
             user_id = user.to_string()
 
             token = self._generate_token(user_id)
@@ -78,7 +80,7 @@ class RegistrationHandler(BaseHandler):
             while not user_id and not token:
                 try:
                     localpart = self._generate_user_id()
-                    user = UserID(localpart, self.hs.hostname, True)
+                    user = UserID(localpart, self.hs.hostname)
                     user_id = user.to_string()
 
                     token = self._generate_token(user_id)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a000b44036..93732a9c87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -52,9 +52,9 @@ class RoomCreationHandler(BaseHandler):
         self.ratelimit(user_id)
 
         if "room_alias_name" in config:
-            room_alias = RoomAlias.create_local(
+            room_alias = RoomAlias.create(
                 config["room_alias_name"],
-                self.hs
+                self.hs.hostname,
             )
             mapping = yield self.store.get_association_from_room_alias(
                 room_alias
@@ -77,7 +77,7 @@ class RoomCreationHandler(BaseHandler):
         if room_id:
             # Ensure room_id is the correct type
             room_id_obj = RoomID.from_string(room_id, self.hs)
-            if not room_id_obj.is_mine:
+            if not self.hs.is_mine(room_id_obj):
                 raise SynapseError(400, "Room id must be local")
 
             yield self.store.store_room(
@@ -93,7 +93,10 @@ class RoomCreationHandler(BaseHandler):
             while attempts < 5:
                 try:
                     random_string = stringutils.random_string(18)
-                    gen_room_id = RoomID.create_local(random_string, self.hs)
+                    gen_room_id = RoomID.create(
+                        random_string,
+                        self.hs.hostname,
+                    )
                     yield self.store.store_room(
                         room_id=gen_room_id.to_string(),
                         room_creator_user_id=user_id,
@@ -120,59 +123,37 @@ class RoomCreationHandler(BaseHandler):
             user, room_id, is_public=is_public
         )
 
-        room_member_handler = self.hs.get_handlers().room_member_handler
-
-        @defer.inlineCallbacks
-        def handle_event(event):
-            snapshot = yield self.store.snapshot_room(event)
-
-            logger.debug("Event: %s", event)
-
-            if event.type == RoomMemberEvent.TYPE:
-                yield room_member_handler.change_membership(
-                    event,
-                    do_auth=True
-                )
-            else:
-                yield self._on_new_room_event(
-                    event, snapshot, extra_users=[user], suppress_auth=True
-                )
+        msg_handler = self.hs.get_handlers().message_handler
 
         for event in creation_events:
-            yield handle_event(event)
+            yield msg_handler.handle_event(event)
 
         if "name" in config:
             name = config["name"]
-            name_event = self.event_factory.create_event(
-                etype=RoomNameEvent.TYPE,
-                room_id=room_id,
-                user_id=user_id,
-                content={"name": name},
-            )
-
-            yield handle_event(name_event)
+            yield msg_handler.handle_event({
+                "type": RoomNameEvent.TYPE,
+                "room_id": room_id,
+                "sender": user_id,
+                "content": {"name": name},
+            })
 
         if "topic" in config:
             topic = config["topic"]
-            topic_event = self.event_factory.create_event(
-                etype=RoomTopicEvent.TYPE,
-                room_id=room_id,
-                user_id=user_id,
-                content={"topic": topic},
-            )
+            yield msg_handler.handle_event({
+                "type": RoomTopicEvent.TYPE,
+                "room_id": room_id,
+                "sender": user_id,
+                "content": {"topic": topic},
+            })
 
-            yield handle_event(topic_event)
-
-        content = {"membership": Membership.INVITE}
         for invitee in invite_list:
-            invite_event = self.event_factory.create_event(
-                etype=RoomMemberEvent.TYPE,
-                state_key=invitee,
-                room_id=room_id,
-                user_id=user_id,
-                content=content
-            )
-            yield handle_event(invite_event)
+            yield msg_handler.handle_event({
+                "type": RoomMemberEvent.TYPE,
+                "state_key": invitee,
+                "room_id": room_id,
+                "user_id": user_id,
+                "content": {"membership": Membership.INVITE},
+            })
 
         result = {"room_id": room_id}
 
@@ -189,31 +170,35 @@ class RoomCreationHandler(BaseHandler):
 
         event_keys = {
             "room_id": room_id,
-            "user_id": creator_id,
+            "sender": creator_id,
+            "state_key": "",
         }
 
-        def create(etype, **content):
-            return self.event_factory.create_event(
-                etype=etype,
-                content=content,
-                **event_keys
-            )
+        def create(etype, content, **kwargs):
+            e = {
+                "type": etype,
+                "content": content,
+            }
+
+            e.update(event_keys)
+            e.update(kwargs)
+
+            return e
 
         creation_event = create(
             etype=RoomCreateEvent.TYPE,
-            creator=creator.to_string(),
+            content={"creator": creator.to_string()},
         )
 
-        join_event = self.event_factory.create_event(
+        join_event = create(
             etype=RoomMemberEvent.TYPE,
             state_key=creator_id,
             content={
                 "membership": Membership.JOIN,
             },
-            **event_keys
         )
 
-        power_levels_event = self.event_factory.create_event(
+        power_levels_event = create(
             etype=RoomPowerLevelsEvent.TYPE,
             content={
                 "users": {
@@ -230,13 +215,12 @@ class RoomCreationHandler(BaseHandler):
                 "kick": 50,
                 "redact": 50
             },
-            **event_keys
         )
 
         join_rule = JoinRules.PUBLIC if is_public else JoinRules.INVITE
         join_rules_event = create(
             etype=RoomJoinRulesEvent.TYPE,
-            join_rule=join_rule,
+            content={"join_rule": join_rule},
         )
 
         return [
@@ -287,7 +271,7 @@ class RoomMemberHandler(BaseHandler):
             if ignore_user is not None and member == ignore_user:
                 continue
 
-            if member.is_mine:
+            if self.hs.is_mine(member):
                 if localusers is not None:
                     localusers.add(member)
             else:
@@ -348,7 +332,7 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue(member)
 
     @defer.inlineCallbacks
-    def change_membership(self, event=None, do_auth=True):
+    def change_membership(self, event, context, do_auth=True):
         """ Change the membership status of a user in a room.
 
         Args:
@@ -358,9 +342,7 @@ class RoomMemberHandler(BaseHandler):
         """
         target_user_id = event.state_key
 
-        snapshot = yield self.store.snapshot_room(event)
-
-        ## TODO(markjh): get prev state from snapshot.
+        # TODO(markjh): get prev state from snapshot.
         prev_state = yield self.store.get_room_member(
             target_user_id, event.room_id
         )
@@ -371,10 +353,11 @@ class RoomMemberHandler(BaseHandler):
         # if this HS is not currently in the room, i.e. we have to do the
         # invite/join dance.
         if event.membership == Membership.JOIN:
-            yield self._do_join(event, snapshot, do_auth=do_auth)
+            yield self._do_join(event, context, do_auth=do_auth)
         else:
             # This is not a JOIN, so we can handle it normally.
 
+            # FIXME: This isn't idempotency.
             if prev_state and prev_state.membership == event.membership:
                 # double same action, treat this event as a NOOP.
                 defer.returnValue({})
@@ -383,7 +366,7 @@ class RoomMemberHandler(BaseHandler):
             yield self._do_local_membership_update(
                 event,
                 membership=event.content["membership"],
-                snapshot=snapshot,
+                context=context,
                 do_auth=do_auth,
             )
 
@@ -405,32 +388,26 @@ class RoomMemberHandler(BaseHandler):
         host = hosts[0]
 
         content.update({"membership": Membership.JOIN})
-        new_event = self.event_factory.create_event(
-            etype=RoomMemberEvent.TYPE,
-            state_key=joinee.to_string(),
-            room_id=room_id,
-            user_id=joinee.to_string(),
-            membership=Membership.JOIN,
-            content=content,
-        )
-
-        snapshot = yield self.store.snapshot_room(new_event)
+        builder = self.event_builder_factory.new({
+            "type": RoomMemberEvent.TYPE,
+            "state_key": joinee.to_string(),
+            "room_id": room_id,
+            "sender": joinee.to_string(),
+            "membership": Membership.JOIN,
+            "content": content,
+        })
+        event, context = yield self._create_new_client_event(builder)
 
-        yield self._do_join(new_event, snapshot, room_host=host, do_auth=True)
+        yield self._do_join(event, context, room_host=host, do_auth=True)
 
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
-    def _do_join(self, event, snapshot, room_host=None, do_auth=True):
+    def _do_join(self, event, context, room_host=None, do_auth=True):
         joinee = self.hs.parse_userid(event.state_key)
         # room_id = RoomID.from_string(event.room_id, self.hs)
         room_id = event.room_id
 
-        # If event doesn't include a display name, add one.
-        yield self.distributor.fire(
-            "collect_presencelike_data", joinee, event.content
-        )
-
         # XXX: We don't do an auth check if we are doing an invite
         # join dance for now, since we're kinda implicitly checking
         # that we are allowed to join when we decide whether or not we
@@ -452,31 +429,29 @@ class RoomMemberHandler(BaseHandler):
             )
 
             if prev_state and prev_state.membership == Membership.INVITE:
-                room = yield self.store.get_room(room_id)
-                inviter = UserID.from_string(
-                    prev_state.user_id, self.hs
-                )
+                inviter = UserID.from_string(prev_state.user_id)
 
-                should_do_dance = not inviter.is_mine and not room
+                should_do_dance = not self.hs.is_mine(inviter)
                 room_host = inviter.domain
             else:
                 should_do_dance = False
 
-        have_joined = False
         if should_do_dance:
             handler = self.hs.get_handlers().federation_handler
-            have_joined = yield handler.do_invite_join(
-                room_host, room_id, event.user_id, event.content, snapshot
+            yield handler.do_invite_join(
+                room_host,
+                room_id,
+                event.user_id,
+                event.get_dict()["content"],  # FIXME To get a non-frozen dict
+                context
             )
-
-        # We want to do the _do_update inside the room lock.
-        if not have_joined:
+        else:
             logger.debug("Doing normal join")
 
             yield self._do_local_membership_update(
                 event,
                 membership=event.content["membership"],
-                snapshot=snapshot,
+                context=context,
                 do_auth=do_auth,
             )
 
@@ -504,7 +479,7 @@ class RoomMemberHandler(BaseHandler):
                 prev_state.sender, self.hs
             )
 
-            is_remote_invite_join = not inviter.is_mine and not room
+            is_remote_invite_join = not self.hs.is_mine(inviter) and not room
             room_host = inviter.domain
         else:
             is_remote_invite_join = False
@@ -526,25 +501,17 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue(room_ids)
 
     @defer.inlineCallbacks
-    def _do_local_membership_update(self, event, membership, snapshot,
+    def _do_local_membership_update(self, event, membership, context,
                                     do_auth):
         yield run_on_reactor()
 
-        # If we're inviting someone, then we should also send it to that
-        # HS.
-        target_user_id = event.state_key
-        target_user = self.hs.parse_userid(target_user_id)
-        if membership == Membership.INVITE and not target_user.is_mine:
-            do_invite_host = target_user.domain
-        else:
-            do_invite_host = None
+        target_user = self.hs.parse_userid(event.state_key)
 
-        yield self._on_new_room_event(
+        yield self.handle_new_client_event(
             event,
-            snapshot,
+            context,
             extra_users=[target_user],
             suppress_auth=(not do_auth),
-            do_invite_host=do_invite_host,
         )
 
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d88a53242c..be67fb2fc2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -47,7 +47,7 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def started_typing(self, target_user, auth_user, room_id, timeout):
-        if not target_user.is_mine:
+        if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         if target_user != auth_user:
@@ -72,7 +72,7 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def stopped_typing(self, target_user, auth_user, room_id):
-        if not target_user.is_mine:
+        if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         if target_user != auth_user: