summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
authorMichael Telatynski <7t3chguy@gmail.com>2018-07-24 17:17:46 +0100
committerMichael Telatynski <7t3chguy@gmail.com>2018-07-24 17:17:46 +0100
commit87951d3891efb5bccedf72c12b3da0d6ab482253 (patch)
treede7d997567c66c5a4d8743c1f3b9d6b474f5cfd9 /synapse/handlers/message.py
parentif inviter_display_name == ""||None then default to inviter MXID (diff)
parentMerge pull request #3595 from matrix-org/erikj/use_deltas (diff)
downloadsynapse-87951d3891efb5bccedf72c12b3da0d6ab482253.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into t3chguy/default_inviter_display_name_3pid
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py748
1 files changed, 434 insertions, 314 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 24c9ffdb20..39d7724778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,173 +13,185 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
+
+import six
+from six import iteritems, itervalues, string_types
+
+from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
+from twisted.internet.defer import succeed
 
-from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken,
-)
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.replication.http.send_event import send_event_to_master
+from synapse.types import RoomAlias, UserID
+from synapse.util.async import Linearizer
+from synapse.util.frozenutils import frozendict_json_encoder
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
-from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
-from canonicaljson import encode_canonical_json
-
-import logging
-import random
-import ujson
-
 logger = logging.getLogger(__name__)
 
 
-class MessageHandler(BaseHandler):
+class MessageHandler(object):
+    """Contains some read only APIs to get state about a room
+    """
 
     def __init__(self, hs):
-        super(MessageHandler, self).__init__(hs)
-        self.hs = hs
-        self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
         self.clock = hs.get_clock()
-        self.validator = EventValidator()
-
-        self.pagination_lock = ReadWriteLock()
-
-        # We arbitrarily limit concurrent event creation for a room to 5.
-        # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5)
-
-        self.action_generator = hs.get_action_generator()
+        self.state = hs.get_state_handler()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
-    def purge_history(self, room_id, event_id):
-        event = yield self.store.get_event(event_id)
+    def get_room_data(self, user_id=None, room_id=None,
+                      event_type=None, state_key="", is_guest=False):
+        """ Get data from a room.
 
-        if event.room_id != room_id:
-            raise SynapseError(400, "Event is for wrong room.")
+        Args:
+            event : The room path event
+        Returns:
+            The path data content.
+        Raises:
+            SynapseError if something went wrong.
+        """
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
+            room_id, user_id
+        )
 
-        depth = event.depth
+        if membership == Membership.JOIN:
+            data = yield self.state.get_current_state(
+                room_id, event_type, state_key
+            )
+        elif membership == Membership.LEAVE:
+            key = (event_type, state_key)
+            room_state = yield self.store.get_state_for_events(
+                [membership_event_id], [key]
+            )
+            data = room_state[membership_event_id].get(key)
 
-        with (yield self.pagination_lock.write(room_id)):
-            yield self.store.delete_old_state(room_id, depth)
+        defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def get_messages(self, requester, room_id=None, pagin_config=None,
-                     as_client_event=True, event_filter=None):
-        """Get messages in a room.
+    def get_state_events(self, user_id, room_id, is_guest=False):
+        """Retrieve all state events for a given room. If the user is
+        joined to the room then return the current state. If the user has
+        left the room return the state events from when they left.
 
         Args:
-            requester (Requester): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-                config rules to apply, if any.
-            as_client_event (bool): True to get events in client-server format.
-            event_filter (Filter): Filter to apply to results or None
+            user_id(str): The user requesting state events.
+            room_id(str): The room ID to get all state events from.
         Returns:
-            dict: Pagination API results
+            A list of dicts representing state events. [{}, {}, {}]
         """
-        user_id = requester.user.to_string()
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
+            room_id, user_id
+        )
 
-        if pagin_config.from_token:
-            room_token = pagin_config.from_token.room_key
-        else:
-            pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token_for_room(
-                    room_id=room_id
-                )
+        if membership == Membership.JOIN:
+            room_state = yield self.state.get_current_state(room_id)
+        elif membership == Membership.LEAVE:
+            room_state = yield self.store.get_state_for_events(
+                [membership_event_id], None
             )
-            room_token = pagin_config.from_token.room_key
-
-        room_token = RoomStreamToken.parse(room_token)
+            room_state = room_state[membership_event_id]
 
-        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
-            "room_key", str(room_token)
+        now = self.clock.time_msec()
+        defer.returnValue(
+            [serialize_event(c, now) for c in room_state.values()]
         )
 
-        source_config = pagin_config.get_source_config("room")
+    @defer.inlineCallbacks
+    def get_joined_members(self, requester, room_id):
+        """Get all the joined members in the room and their profile information.
+
+        If the user has left the room return the state events from when they left.
 
-        with (yield self.pagination_lock.read(room_id)):
-            membership, member_event_id = yield self._check_in_room_or_world_readable(
+        Args:
+            requester(Requester): The user requesting state events.
+            room_id(str): The room ID to get all state events from.
+        Returns:
+            A dict of user_id to profile info
+        """
+        user_id = requester.user.to_string()
+        if not requester.app_service:
+            # We check AS auth after fetching the room membership, as it
+            # requires us to pull out all joined members anyway.
+            membership, _ = yield self.auth.check_in_room_or_world_readable(
                 room_id, user_id
             )
+            if membership != Membership.JOIN:
+                raise NotImplementedError(
+                    "Getting joined members after leaving is not implemented"
+                )
 
-            if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
 
-                if membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-                    leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
-                    )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
+        # If this is an AS, double check that they are allowed to see the members.
+        # This can either be because the AS user is in the room or because there
+        # is a user in the room that the AS is "interested in"
+        if requester.app_service and user_id not in users_with_profile:
+            for uid in users_with_profile:
+                if requester.app_service.is_interested_in_user(uid):
+                    break
+            else:
+                # Loop fell through, AS has no interested users in room
+                raise AuthError(403, "Appservice not in room")
 
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
+        defer.returnValue({
+            user_id: {
+                "avatar_url": profile.avatar_url,
+                "display_name": profile.display_name,
+            }
+            for user_id, profile in iteritems(users_with_profile)
+        })
 
-            events, next_key = yield self.store.paginate_room_events(
-                room_id=room_id,
-                from_key=source_config.from_key,
-                to_key=source_config.to_key,
-                direction=source_config.direction,
-                limit=source_config.limit,
-                event_filter=event_filter,
-            )
 
-            next_token = pagin_config.from_token.copy_and_replace(
-                "room_key", next_key
-            )
+class EventCreationHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self.clock = hs.get_clock()
+        self.validator = EventValidator()
+        self.profile_handler = hs.get_profile_handler()
+        self.event_builder_factory = hs.get_event_builder_factory()
+        self.server_name = hs.hostname
+        self.ratelimiter = hs.get_ratelimiter()
+        self.notifier = hs.get_notifier()
+        self.config = hs.config
 
-        if not events:
-            defer.returnValue({
-                "chunk": [],
-                "start": pagin_config.from_token.to_string(),
-                "end": next_token.to_string(),
-            })
-
-        if event_filter:
-            events = event_filter.filter(events)
-
-        events = yield filter_events_for_client(
-            self.store,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
+        self.http_client = hs.get_simple_http_client()
 
-        time_now = self.clock.time_msec()
+        # This is only used to get at ratelimit function, and maybe_kick_guest_users
+        self.base_handler = BaseHandler(hs)
 
-        chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
-            "start": pagin_config.from_token.to_string(),
-            "end": next_token.to_string(),
-        }
+        self.pusher_pool = hs.get_pusherpool()
 
-        defer.returnValue(chunk)
+        # We arbitrarily limit concurrent event creation for a room to 5.
+        # This is to stop us from diverging history *too* much.
+        self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
+
+        self.action_generator = hs.get_action_generator()
+
+        self.spam_checker = hs.get_spam_checker()
+
+        if self.config.block_events_without_consent_error is not None:
+            self._consent_uri_builder = ConsentURIBuilder(self.config)
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_event_ids=None):
+                     prev_events_and_hashes=None):
         """
         Given a dict from a client, create a new event.
 
@@ -192,50 +205,143 @@ class MessageHandler(BaseHandler):
             event_dict (dict): An entire event
             token_id (str)
             txn_id (str)
-            prev_event_ids (list): The prev event ids to use when creating the event
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
 
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        with (yield self.limiter.queue(builder.room_id)):
-            self.validator.validate_new(builder)
-
-            if builder.type == EventTypes.Member:
-                membership = builder.content.get("membership", None)
-                target = UserID.from_string(builder.state_key)
-
-                if membership in {Membership.JOIN, Membership.INVITE}:
-                    # If event doesn't include a display name, add one.
-                    profile = self.hs.get_handlers().profile_handler
-                    content = builder.content
-
-                    try:
-                        if "displayname" not in content:
-                            content["displayname"] = yield profile.get_displayname(target)
-                        if "avatar_url" not in content:
-                            content["avatar_url"] = yield profile.get_avatar_url(target)
-                    except Exception as e:
-                        logger.info(
-                            "Failed to get profile information for %r: %s",
-                            target, e
-                        )
+        self.validator.validate_new(builder)
+
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            target = UserID.from_string(builder.state_key)
+
+            if membership in {Membership.JOIN, Membership.INVITE}:
+                # If event doesn't include a display name, add one.
+                profile = self.profile_handler
+                content = builder.content
+
+                try:
+                    if "displayname" not in content:
+                        content["displayname"] = yield profile.get_displayname(target)
+                    if "avatar_url" not in content:
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                except Exception as e:
+                    logger.info(
+                        "Failed to get profile information for %r: %s",
+                        target, e
+                    )
 
-            if token_id is not None:
-                builder.internal_metadata.token_id = token_id
+        is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
+        if not is_exempt:
+            yield self.assert_accepted_privacy_policy(requester)
 
-            if txn_id is not None:
-                builder.internal_metadata.txn_id = txn_id
+        if token_id is not None:
+            builder.internal_metadata.token_id = token_id
 
-            event, context = yield self._create_new_client_event(
-                builder=builder,
-                requester=requester,
-                prev_event_ids=prev_event_ids,
-            )
+        if txn_id is not None:
+            builder.internal_metadata.txn_id = txn_id
+
+        event, context = yield self.create_new_client_event(
+            builder=builder,
+            requester=requester,
+            prev_events_and_hashes=prev_events_and_hashes,
+        )
 
         defer.returnValue((event, context))
 
+    def _is_exempt_from_privacy_policy(self, builder, requester):
+        """"Determine if an event to be sent is exempt from having to consent
+        to the privacy policy
+
+        Args:
+            builder (synapse.events.builder.EventBuilder): event being created
+            requester (Requster): user requesting this event
+
+        Returns:
+            Deferred[bool]: true if the event can be sent without the user
+                consenting
+        """
+        # the only thing the user can do is join the server notices room.
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            if membership == Membership.JOIN:
+                return self._is_server_notices_room(builder.room_id)
+            elif membership == Membership.LEAVE:
+                # the user is always allowed to leave (but not kick people)
+                return builder.state_key == requester.user.to_string()
+        return succeed(False)
+
+    @defer.inlineCallbacks
+    def _is_server_notices_room(self, room_id):
+        if self.config.server_notices_mxid is None:
+            defer.returnValue(False)
+        user_ids = yield self.store.get_users_in_room(room_id)
+        defer.returnValue(self.config.server_notices_mxid in user_ids)
+
+    @defer.inlineCallbacks
+    def assert_accepted_privacy_policy(self, requester):
+        """Check if a user has accepted the privacy policy
+
+        Called when the given user is about to do something that requires
+        privacy consent. We see if the user is exempt and otherwise check that
+        they have given consent. If they have not, a ConsentNotGiven error is
+        raised.
+
+        Args:
+            requester (synapse.types.Requester):
+                The user making the request
+
+        Returns:
+            Deferred[None]: returns normally if the user has consented or is
+                exempt
+
+        Raises:
+            ConsentNotGivenError: if the user has not given consent yet
+        """
+        if self.config.block_events_without_consent_error is None:
+            return
+
+        # exempt AS users from needing consent
+        if requester.app_service is not None:
+            return
+
+        user_id = requester.user.to_string()
+
+        # exempt the system notices user
+        if (
+            self.config.server_notices_mxid is not None and
+            user_id == self.config.server_notices_mxid
+        ):
+            return
+
+        u = yield self.store.get_user_by_id(user_id)
+        assert u is not None
+        if u["appservice_id"] is not None:
+            # users registered by an appservice are exempt
+            return
+        if u["consent_version"] == self.config.user_consent_version:
+            return
+
+        consent_uri = self._consent_uri_builder.build_user_consent_uri(
+            requester.user.localpart,
+        )
+        msg = self.config.block_events_without_consent_error % {
+            'consent_uri': consent_uri,
+        }
+        raise ConsentNotGivenError(
+            msg=msg,
+            consent_uri=consent_uri,
+        )
+
     @defer.inlineCallbacks
     def send_nonmember_event(self, requester, event, context, ratelimit=True):
         """
@@ -253,11 +359,6 @@ class MessageHandler(BaseHandler):
                 "Tried to send member event through non-member codepath"
             )
 
-        # We check here if we are currently being rate limited, so that we
-        # don't do unnecessary work. We check again just before we actually
-        # send the event.
-        yield self.ratelimit(requester, update=False)
-
         user = UserID.from_string(event.sender)
 
         assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
@@ -274,12 +375,6 @@ class MessageHandler(BaseHandler):
             ratelimit=ratelimit,
         )
 
-        if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
-            # We don't want to block sending messages on any presence code. This
-            # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(user)
-
     @defer.inlineCallbacks
     def deduplicate_state_event(self, event, context):
         """
@@ -288,7 +383,8 @@ class MessageHandler(BaseHandler):
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_event_id = prev_state_ids.get((event.type, event.state_key))
         prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
             return
@@ -313,145 +409,85 @@ class MessageHandler(BaseHandler):
 
         See self.create_event and self.send_nonmember_event.
         """
-        event, context = yield self.create_event(
-            requester,
-            event_dict,
-            token_id=requester.access_token_id,
-            txn_id=txn_id
-        )
-        yield self.send_nonmember_event(
-            requester,
-            event,
-            context,
-            ratelimit=ratelimit,
-        )
-        defer.returnValue(event)
 
-    @defer.inlineCallbacks
-    def get_room_data(self, user_id=None, room_id=None,
-                      event_type=None, state_key="", is_guest=False):
-        """ Get data from a room.
-
-        Args:
-            event : The room path event
-        Returns:
-            The path data content.
-        Raises:
-            SynapseError if something went wrong.
-        """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
-            room_id, user_id
-        )
-
-        if membership == Membership.JOIN:
-            data = yield self.state_handler.get_current_state(
-                room_id, event_type, state_key
-            )
-        elif membership == Membership.LEAVE:
-            key = (event_type, state_key)
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], [key]
+        # We limit the number of concurrent event sends in a room so that we
+        # don't fork the DAG too much. If we don't limit then we can end up in
+        # a situation where event persistence can't keep up, causing
+        # extremities to pile up, which in turn leads to state resolution
+        # taking longer.
+        with (yield self.limiter.queue(event_dict["room_id"])):
+            event, context = yield self.create_event(
+                requester,
+                event_dict,
+                token_id=requester.access_token_id,
+                txn_id=txn_id
             )
-            data = room_state[membership_event_id].get(key)
 
-        defer.returnValue(data)
+            spam_error = self.spam_checker.check_event_for_spam(event)
+            if spam_error:
+                if not isinstance(spam_error, string_types):
+                    spam_error = "Spam is not permitted here"
+                raise SynapseError(
+                    403, spam_error, Codes.FORBIDDEN
+                )
 
-    @defer.inlineCallbacks
-    def _check_in_room_or_world_readable(self, room_id, user_id):
-        try:
-            # check_user_was_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            defer.returnValue((member_event.membership, member_event.event_id))
-            return
-        except AuthError:
-            visibility = yield self.state_handler.get_current_state(
-                room_id, EventTypes.RoomHistoryVisibility, ""
-            )
-            if (
-                visibility and
-                visibility.content["history_visibility"] == "world_readable"
-            ):
-                defer.returnValue((Membership.JOIN, None))
-                return
-            raise AuthError(
-                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+            yield self.send_nonmember_event(
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
             )
+        defer.returnValue(event)
 
+    @measure_func("create_new_client_event")
     @defer.inlineCallbacks
-    def get_state_events(self, user_id, room_id, is_guest=False):
-        """Retrieve all state events for a given room. If the user is
-        joined to the room then return the current state. If the user has
-        left the room return the state events from when they left.
+    def create_new_client_event(self, builder, requester=None,
+                                prev_events_and_hashes=None):
+        """Create a new event for a local client
 
         Args:
-            user_id(str): The user requesting state events.
-            room_id(str): The room ID to get all state events from.
+            builder (EventBuilder):
+
+            requester (synapse.types.Requester|None):
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
+
         Returns:
-            A list of dicts representing state events. [{}, {}, {}]
+            Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
-            room_id, user_id
-        )
 
-        if membership == Membership.JOIN:
-            room_state = yield self.state_handler.get_current_state(room_id)
-        elif membership == Membership.LEAVE:
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], None
+        if prev_events_and_hashes is not None:
+            assert len(prev_events_and_hashes) <= 10, \
+                "Attempting to create an event with %i prev_events" % (
+                    len(prev_events_and_hashes),
             )
-            room_state = room_state[membership_event_id]
-
-        now = self.clock.time_msec()
-        defer.returnValue(
-            [serialize_event(c, now) for c in room_state.values()]
-        )
-
-    @measure_func("_create_new_client_event")
-    @defer.inlineCallbacks
-    def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
         else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
-            )
-
-            # We want to limit the max number of prev events we point to in our
-            # new event
-            if len(latest_ret) > 10:
-                # Sort by reverse depth, so we point to the most recent.
-                latest_ret.sort(key=lambda a: -a[2])
-                new_latest_ret = latest_ret[:5]
-
-                # We also randomly point to some of the older events, to make
-                # sure that we don't completely ignore the older events.
-                if latest_ret[5:]:
-                    sample_size = min(5, len(latest_ret[5:]))
-                    new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
-                latest_ret = new_latest_ret
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
+            prev_events_and_hashes = \
+                yield self.store.get_prev_events_for_room(builder.room_id)
+
+        if prev_events_and_hashes:
+            depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+            # we cap depth of generated events, to ensure that they are not
+            # rejected by other servers (and so that they can be persisted in
+            # the db)
+            depth = min(depth, MAX_DEPTH)
+        else:
+            depth = 1
 
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
+        prev_events = [
+            (event_id, prev_hashes)
+            for event_id, prev_hashes, _ in prev_events_and_hashes
+        ]
 
         builder.prev_events = prev_events
         builder.depth = depth
 
-        state_handler = self.state_handler
-
-        context = yield state_handler.compute_event_context(builder)
+        context = yield self.state.compute_event_context(builder)
         if requester:
             context.app_service = requester.app_service
 
@@ -470,8 +506,8 @@ class MessageHandler(BaseHandler):
         event = builder.build()
 
         logger.debug(
-            "Created event %s with state: %s",
-            event.event_id, context.prev_state_ids,
+            "Created event %s",
+            event.event_id,
         )
 
         defer.returnValue(
@@ -486,12 +522,21 @@ class MessageHandler(BaseHandler):
         event,
         context,
         ratelimit=True,
-        extra_users=[]
+        extra_users=[],
     ):
-        # We now need to go and hit out to wherever we need to hit out to.
+        """Processes a new event. This includes checking auth, persisting it,
+        notifying users, sending to remote servers, etc.
 
-        if ratelimit:
-            yield self.ratelimit(requester)
+        If called from a worker will hit out to the master process for final
+        processing.
+
+        Args:
+            requester (Requester)
+            event (FrozenEvent)
+            context (EventContext)
+            ratelimit (bool)
+            extra_users (list(UserID)): Any extra users to notify about event
+        """
 
         try:
             yield self.auth.check_from_context(event, context)
@@ -501,13 +546,72 @@ class MessageHandler(BaseHandler):
 
         # Ensure that we can round trip before trying to persist in db
         try:
-            dump = ujson.dumps(event.content)
-            ujson.loads(dump)
-        except:
+            dump = frozendict_json_encoder.encode(event.content)
+            json.loads(dump)
+        except Exception:
             logger.exception("Failed to encode content: %r", event.content)
             raise
 
-        yield self.maybe_kick_guest_users(event, context)
+        yield self.action_generator.handle_push_actions_for_event(
+            event, context
+        )
+
+        try:
+            # If we're a worker we need to hit out to the master.
+            if self.config.worker_app:
+                yield send_event_to_master(
+                    clock=self.hs.get_clock(),
+                    store=self.store,
+                    client=self.http_client,
+                    host=self.config.worker_replication_host,
+                    port=self.config.worker_replication_http_port,
+                    requester=requester,
+                    event=event,
+                    context=context,
+                    ratelimit=ratelimit,
+                    extra_users=extra_users,
+                )
+                return
+
+            yield self.persist_and_notify_client_event(
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
+                extra_users=extra_users,
+            )
+        except:  # noqa: E722, as we reraise the exception this is fine.
+            # Ensure that we actually remove the entries in the push actions
+            # staging area, if we calculated them.
+            tp, value, tb = sys.exc_info()
+
+            run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
+
+    @defer.inlineCallbacks
+    def persist_and_notify_client_event(
+        self,
+        requester,
+        event,
+        context,
+        ratelimit=True,
+        extra_users=[],
+    ):
+        """Called when we have fully built the event, have already
+        calculated the push actions for the event, and checked auth.
+
+        This should only be run on master.
+        """
+        assert not self.config.worker_app
+
+        if ratelimit:
+            yield self.base_handler.ratelimit(requester)
+
+        yield self.base_handler.maybe_kick_guest_users(event, context)
 
         if event.type == EventTypes.CanonicalAlias:
             # Check the alias is acually valid (at this time at least)
@@ -535,9 +639,11 @@ class MessageHandler(BaseHandler):
                         e.sender == event.sender
                     )
 
+                current_state_ids = yield context.get_current_state_ids(self.store)
+
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in context.current_state_ids.iteritems()
+                    for k, e_id in iteritems(current_state_ids)
                     if k[0] in self.hs.config.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -551,7 +657,7 @@ class MessageHandler(BaseHandler):
                         "content": e.content,
                         "sender": e.sender,
                     }
-                    for e in state_to_include.itervalues()
+                    for e in itervalues(state_to_include)
                 ]
 
                 invitee = UserID.from_string(event.state_key)
@@ -573,8 +679,9 @@ class MessageHandler(BaseHandler):
                     )
 
         if event.type == EventTypes.Redaction:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -594,15 +701,13 @@ class MessageHandler(BaseHandler):
                         "You don't have permission to redact events"
                     )
 
-        if event.type == EventTypes.Create and context.prev_state_ids:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
-
-        yield self.action_generator.handle_push_actions_for_event(
-            event, context
-        )
+        if event.type == EventTypes.Create:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            if prev_state_ids:
+                raise AuthError(
+                    403,
+                    "Changing the room create event is forbidden",
+                )
 
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
@@ -610,16 +715,31 @@ class MessageHandler(BaseHandler):
 
         # this intentionally does not yield: we don't care about the result
         # and don't need to wait for it.
-        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+        run_in_background(
+            self.pusher_pool.on_new_notifications,
             event_stream_id, max_stream_id
         )
 
-        @defer.inlineCallbacks
         def _notify():
-            yield run_on_reactor()
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
+            try:
+                self.notifier.on_new_room_event(
+                    event, event_stream_id, max_stream_id,
+                    extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room event")
+
+        run_in_background(_notify)
 
-        preserve_fn(_notify)()
+        if event.type == EventTypes.Message:
+            # We don't want to block sending messages on any presence code. This
+            # matters as sometimes presence code can take a while.
+            run_in_background(self._bump_active_time, requester.user)
+
+    @defer.inlineCallbacks
+    def _bump_active_time(self, user):
+        try:
+            presence = self.hs.get_presence_handler()
+            yield presence.bump_presence_active_time(user)
+        except Exception:
+            logger.exception("Error bumping presence active time")