summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-07-24 11:46:09 +0100
committerRichard van der Hoff <richard@matrix.org>2018-07-24 11:46:09 +0100
commit30bfed5aa50317a7d05677c8de250d7d925938ac (patch)
tree256f995fef7eed858fff71920f706b866e7ac5e1
parentMerge remote-tracking branch 'origin/develop' into rav/remove_who_forgot_in_room (diff)
parentMerge pull request #3591 from matrix-org/rav/logcontext_fixes (diff)
downloadsynapse-30bfed5aa50317a7d05677c8de250d7d925938ac.tar.xz
Merge remote-tracking branch 'origin/develop' into rav/remove_who_forgot_in_room
-rw-r--r--changelog.d/3555.feature1
-rw-r--r--changelog.d/3584.misc1
-rw-r--r--changelog.d/3590.misc1
-rw-r--r--changelog.d/3591.misc1
-rw-r--r--docs/workers.rst4
-rw-r--r--synapse/api/auth.py34
-rw-r--r--synapse/app/client_reader.py14
-rw-r--r--synapse/events/snapshot.py14
-rw-r--r--synapse/handlers/__init__.py4
-rw-r--r--synapse/handlers/appservice.py5
-rw-r--r--synapse/handlers/initial_sync.py38
-rw-r--r--synapse/handlers/message.py271
-rw-r--r--synapse/handlers/pagination.py265
-rw-r--r--synapse/handlers/room.py6
-rw-r--r--synapse/rest/client/v1/admin.py8
-rw-r--r--synapse/rest/client/v1/room.py24
-rw-r--r--synapse/server.py17
-rw-r--r--synapse/storage/events.py44
-rw-r--r--synapse/storage/pusher.py2
19 files changed, 437 insertions, 317 deletions
diff --git a/changelog.d/3555.feature b/changelog.d/3555.feature
new file mode 100644
index 0000000000..ea4a85e0ae
--- /dev/null
+++ b/changelog.d/3555.feature
@@ -0,0 +1 @@
+Add support for client_reader to handle more APIs
diff --git a/changelog.d/3584.misc b/changelog.d/3584.misc
new file mode 100644
index 0000000000..2374dc0c44
--- /dev/null
+++ b/changelog.d/3584.misc
@@ -0,0 +1 @@
+Lazily load state on master process when using workers to reduce DB consumption
diff --git a/changelog.d/3590.misc b/changelog.d/3590.misc
new file mode 100644
index 0000000000..0f1688fd0f
--- /dev/null
+++ b/changelog.d/3590.misc
@@ -0,0 +1 @@
+Add some measure blocks to persist_events
diff --git a/changelog.d/3591.misc b/changelog.d/3591.misc
new file mode 100644
index 0000000000..f0137766a0
--- /dev/null
+++ b/changelog.d/3591.misc
@@ -0,0 +1 @@
+Fix some random logcontext leaks.
\ No newline at end of file
diff --git a/docs/workers.rst b/docs/workers.rst
index 1d521b9ec5..c5b37c3ded 100644
--- a/docs/workers.rst
+++ b/docs/workers.rst
@@ -206,6 +206,10 @@ Handles client API endpoints. It can handle REST endpoints matching the
 following regular expressions::
 
     ^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
 
 ``synapse.app.user_dir``
 ~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 535bdb449d..073229b4c4 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -739,3 +739,37 @@ class Auth(object):
                 )
 
             return query_params[0]
+
+    @defer.inlineCallbacks
+    def check_in_room_or_world_readable(self, room_id, user_id):
+        """Checks that the user is or was in the room or the room is world
+        readable. If it isn't then an exception is raised.
+
+        Returns:
+            Deferred[tuple[str, str|None]]: Resolves to the current membership of
+            the user in the room and the membership event ID of the user. If
+            the user is not in the room and never has been, then
+            `(Membership.JOIN, None)` is returned.
+        """
+
+        try:
+            # check_user_was_in_room will return the most recent membership
+            # event for the user if:
+            #  * The user is a non-guest user, and was ever in the room
+            #  * The user is a guest user, and has joined the room
+            # else it will throw.
+            member_event = yield self.check_user_was_in_room(room_id, user_id)
+            defer.returnValue((member_event.membership, member_event.event_id))
+        except AuthError:
+            visibility = yield self.state.get_current_state(
+                room_id, EventTypes.RoomHistoryVisibility, ""
+            )
+            if (
+                visibility and
+                visibility.content["history_visibility"] == "world_readable"
+            ):
+                defer.returnValue((Membership.JOIN, None))
+                return
+            raise AuthError(
+                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+            )
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index b0ea26dcb4..398bb36602 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -40,7 +40,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.rest.client.v1.room import (
+    JoinedRoomMemberListRestServlet,
+    PublicRoomListRestServlet,
+    RoomEventContextServlet,
+    RoomMemberListRestServlet,
+    RoomStateRestServlet,
+)
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
@@ -82,7 +88,13 @@ class ClientReaderServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
+
                     PublicRoomListRestServlet(self).register(resource)
+                    RoomMemberListRestServlet(self).register(resource)
+                    JoinedRoomMemberListRestServlet(self).register(resource)
+                    RoomStateRestServlet(self).register(resource)
+                    RoomEventContextServlet(self).register(resource)
+
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index a59064b416..189212b0fa 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -163,6 +163,9 @@ class EventContext(object):
         context._prev_state_id = input["prev_state_id"]
         context._event_type = input["event_type"]
         context._event_state_key = input["event_state_key"]
+
+        context._current_state_ids = None
+        context._prev_state_ids = None
         context._fetching_state_deferred = None
 
         context.state_group = input["state_group"]
@@ -214,6 +217,17 @@ class EventContext(object):
 
         defer.returnValue(self._prev_state_ids)
 
+    def get_cached_current_state_ids(self):
+        """Gets the current state IDs if we have them already cached.
+
+        Returns:
+            dict[(str, str), str]|None: Returns None if we haven't cached the
+            state or if state_group is None, which happens when the associated
+            event is an outlier.
+        """
+
+        return self._current_state_ids
+
     @defer.inlineCallbacks
     def _fill_out_state(self, store):
         """Called to populate the _current_state_ids and _prev_state_ids
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 4b9923d8c0..413425fed1 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,9 +17,7 @@ from .admin import AdminHandler
 from .directory import DirectoryHandler
 from .federation import FederationHandler
 from .identity import IdentityHandler
-from .message import MessageHandler
 from .register import RegistrationHandler
-from .room import RoomContextHandler
 from .search import SearchHandler
 
 
@@ -44,10 +42,8 @@ class Handlers(object):
 
     def __init__(self, hs):
         self.registration_handler = RegistrationHandler(hs)
-        self.message_handler = MessageHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
-        self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ec9fe01a5a..ee41aed69e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
 
 import synapse
 from synapse.api.constants import EventTypes
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.metrics import Measure
 
@@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
                             yield self._check_user_exists(event.state_key)
 
                         if not self.started_scheduler:
-                            self.scheduler.start().addErrback(log_failure)
+                            def start_scheduler():
+                                return self.scheduler.start().addErrback(log_failure)
+                            run_as_background_process("as_scheduler", start_scheduler)
                             self.started_scheduler = True
 
                         # Fork off pushes to these services
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fb11716eb8..40e7580a61 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
             try:
                 if event.membership == Membership.JOIN:
                     room_end_token = now_token.room_key
-                    deferred_room_state = self.state_handler.get_current_state(
-                        event.room_id
+                    deferred_room_state = run_in_background(
+                        self.state_handler.get_current_state,
+                        event.room_id,
                     )
                 elif event.membership == Membership.LEAVE:
                     room_end_token = "s%d" % (event.stream_ordering,)
-                    deferred_room_state = self.store.get_state_for_events(
-                        [event.event_id], None
+                    deferred_room_state = run_in_background(
+                        self.store.get_state_for_events,
+                        [event.event_id], None,
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -387,19 +389,21 @@ class InitialSyncHandler(BaseHandler):
                 receipts = []
             defer.returnValue(receipts)
 
-        presence, receipts, (messages, token) = yield defer.gatherResults(
-            [
-                run_in_background(get_presence),
-                run_in_background(get_receipts),
-                run_in_background(
-                    self.store.get_recent_events_for_room,
-                    room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
-                )
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        presence, receipts, (messages, token) = yield make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(get_presence),
+                    run_in_background(get_receipts),
+                    run_in_background(
+                        self.store.get_recent_events_for_room,
+                        room_id,
+                        limit=limit,
+                        end_token=now_token.room_key,
+                    )
+                ],
+                consumeErrors=True,
+            ).addErrback(unwrapFirstError),
+        )
 
         messages = yield filter_events_for_client(
             self.store, user_id, messages, is_peeking=is_peeking,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7571975c22..39d7724778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
 from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
 
 from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
@@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import send_event_to_master
-from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Linearizer, ReadWriteLock
+from synapse.types import RoomAlias, UserID
+from synapse.util.async import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.stringutils import random_string
-from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
 
-class PurgeStatus(object):
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-
-    Attributes:
-        status (int): Tracks whether this request has completed. One of
-            STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+    """Contains some read only APIs to get state about a room
     """
 
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    def __init__(self):
-        self.status = PurgeStatus.STATUS_ACTIVE
-
-    def asdict(self):
-        return {
-            "status": PurgeStatus.STATUS_TEXT[self.status]
-        }
-
-
-class MessageHandler(BaseHandler):
-
     def __init__(self, hs):
-        super(MessageHandler, self).__init__(hs)
-        self.hs = hs
-        self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
         self.clock = hs.get_clock()
-
-        self.pagination_lock = ReadWriteLock()
-        self._purges_in_progress_by_room = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id = {}
-
-    def start_purge_history(self, room_id, token,
-                            delete_local_events=False):
-        """Start off a history purge on a room.
-
-        Args:
-            room_id (str): The room to purge from
-
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            str: unique ID for this purge transaction.
-        """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400,
-                "History purge already in progress for %s" % (room_id, ),
-            )
-
-        purge_id = random_string(16)
-
-        # we log the purge_id here so that it can be tied back to the
-        # request id in the log lines.
-        logger.info("[purge] starting purge_id %s", purge_id)
-
-        self._purges_by_id[purge_id] = PurgeStatus()
-        run_in_background(
-            self._purge_history,
-            purge_id, room_id, token, delete_local_events,
-        )
-        return purge_id
-
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token,
-                       delete_local_events):
-        """Carry out a history purge on a room.
-
-        Args:
-            purge_id (str): The id for this purge
-            room_id (str): The room to purge from
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            Deferred
-        """
-        self._purges_in_progress_by_room.add(room_id)
-        try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.store.purge_history(
-                    room_id, token, delete_local_events,
-                )
-            logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
-        except Exception:
-            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-            # remove the purge from the list 24 hours after it completes
-            def clear_purge():
-                del self._purges_by_id[purge_id]
-            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
-
-    def get_purge_status(self, purge_id):
-        """Get the current status of an active purge
-
-        Args:
-            purge_id (str): purge_id returned by start_purge_history
-
-        Returns:
-            PurgeStatus|None
-        """
-        return self._purges_by_id.get(purge_id)
-
-    @defer.inlineCallbacks
-    def get_messages(self, requester, room_id=None, pagin_config=None,
-                     as_client_event=True, event_filter=None):
-        """Get messages in a room.
-
-        Args:
-            requester (Requester): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-                config rules to apply, if any.
-            as_client_event (bool): True to get events in client-server format.
-            event_filter (Filter): Filter to apply to results or None
-        Returns:
-            dict: Pagination API results
-        """
-        user_id = requester.user.to_string()
-
-        if pagin_config.from_token:
-            room_token = pagin_config.from_token.room_key
-        else:
-            pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token_for_room(
-                    room_id=room_id
-                )
-            )
-            room_token = pagin_config.from_token.room_key
-
-        room_token = RoomStreamToken.parse(room_token)
-
-        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
-            "room_key", str(room_token)
-        )
-
-        source_config = pagin_config.get_source_config("room")
-
-        with (yield self.pagination_lock.read(room_id)):
-            membership, member_event_id = yield self._check_in_room_or_world_readable(
-                room_id, user_id
-            )
-
-            if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
-
-                if membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-                    leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
-                    )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
-
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
-
-            events, next_key = yield self.store.paginate_room_events(
-                room_id=room_id,
-                from_key=source_config.from_key,
-                to_key=source_config.to_key,
-                direction=source_config.direction,
-                limit=source_config.limit,
-                event_filter=event_filter,
-            )
-
-            next_token = pagin_config.from_token.copy_and_replace(
-                "room_key", next_key
-            )
-
-        if not events:
-            defer.returnValue({
-                "chunk": [],
-                "start": pagin_config.from_token.to_string(),
-                "end": next_token.to_string(),
-            })
-
-        if event_filter:
-            events = event_filter.filter(events)
-
-        events = yield filter_events_for_client(
-            self.store,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
-
-        time_now = self.clock.time_msec()
-
-        chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
-            "start": pagin_config.from_token.to_string(),
-            "end": next_token.to_string(),
-        }
-
-        defer.returnValue(chunk)
+        self.state = hs.get_state_handler()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
@@ -286,12 +64,12 @@ class MessageHandler(BaseHandler):
         Raises:
             SynapseError if something went wrong.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )
 
         if membership == Membership.JOIN:
-            data = yield self.state_handler.get_current_state(
+            data = yield self.state.get_current_state(
                 room_id, event_type, state_key
             )
         elif membership == Membership.LEAVE:
@@ -304,31 +82,6 @@ class MessageHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def _check_in_room_or_world_readable(self, room_id, user_id):
-        try:
-            # check_user_was_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            defer.returnValue((member_event.membership, member_event.event_id))
-            return
-        except AuthError:
-            visibility = yield self.state_handler.get_current_state(
-                room_id, EventTypes.RoomHistoryVisibility, ""
-            )
-            if (
-                visibility and
-                visibility.content["history_visibility"] == "world_readable"
-            ):
-                defer.returnValue((Membership.JOIN, None))
-                return
-            raise AuthError(
-                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
-            )
-
-    @defer.inlineCallbacks
     def get_state_events(self, user_id, room_id, is_guest=False):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
@@ -340,12 +93,12 @@ class MessageHandler(BaseHandler):
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )
 
         if membership == Membership.JOIN:
-            room_state = yield self.state_handler.get_current_state(room_id)
+            room_state = yield self.state.get_current_state(room_id)
         elif membership == Membership.LEAVE:
             room_state = yield self.store.get_state_for_events(
                 [membership_event_id], None
@@ -373,7 +126,7 @@ class MessageHandler(BaseHandler):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self._check_in_room_or_world_readable(
+            membership, _ = yield self.auth.check_in_room_or_world_readable(
                 room_id, user_id
             )
             if membership != Membership.JOIN:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
new file mode 100644
index 0000000000..b2849783ed
--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,265 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
+class PaginationHandler(object):
+    """Handles pagination and purge history requests.
+
+    These are in the same handler due to the fact we need to block clients
+    paginating during a purge.
+    """
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+        self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
+
+    def start_purge_history(self, room_id, token,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
+            )
+
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, token, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, token,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, token, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
+    @defer.inlineCallbacks
+    def get_messages(self, requester, room_id=None, pagin_config=None,
+                     as_client_event=True, event_filter=None):
+        """Get messages in a room.
+
+        Args:
+            requester (Requester): The user requesting messages.
+            room_id (str): The room they want messages from.
+            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+                config rules to apply, if any.
+            as_client_event (bool): True to get events in client-server format.
+            event_filter (Filter): Filter to apply to results or None
+        Returns:
+            dict: Pagination API results
+        """
+        user_id = requester.user.to_string()
+
+        if pagin_config.from_token:
+            room_token = pagin_config.from_token.room_key
+        else:
+            pagin_config.from_token = (
+                yield self.hs.get_event_sources().get_current_token_for_room(
+                    room_id=room_id
+                )
+            )
+            room_token = pagin_config.from_token.room_key
+
+        room_token = RoomStreamToken.parse(room_token)
+
+        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+            "room_key", str(room_token)
+        )
+
+        source_config = pagin_config.get_source_config("room")
+
+        with (yield self.pagination_lock.read(room_id)):
+            membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+                room_id, user_id
+            )
+
+            if source_config.direction == 'b':
+                # if we're going backwards, we might need to backfill. This
+                # requires that we have a topo token.
+                if room_token.topological:
+                    max_topo = room_token.topological
+                else:
+                    max_topo = yield self.store.get_max_topological_token(
+                        room_id, room_token.stream
+                    )
+
+                if membership == Membership.LEAVE:
+                    # If they have left the room then clamp the token to be before
+                    # they left the room, to save the effort of loading from the
+                    # database.
+                    leave_token = yield self.store.get_topological_token_for_event(
+                        member_event_id
+                    )
+                    leave_token = RoomStreamToken.parse(leave_token)
+                    if leave_token.topological < max_topo:
+                        source_config.from_key = str(leave_token)
+
+                yield self.hs.get_handlers().federation_handler.maybe_backfill(
+                    room_id, max_topo
+                )
+
+            events, next_key = yield self.store.paginate_room_events(
+                room_id=room_id,
+                from_key=source_config.from_key,
+                to_key=source_config.to_key,
+                direction=source_config.direction,
+                limit=source_config.limit,
+                event_filter=event_filter,
+            )
+
+            next_token = pagin_config.from_token.copy_and_replace(
+                "room_key", next_key
+            )
+
+        if not events:
+            defer.returnValue({
+                "chunk": [],
+                "start": pagin_config.from_token.to_string(),
+                "end": next_token.to_string(),
+            })
+
+        if event_filter:
+            events = event_filter.filter(events)
+
+        events = yield filter_events_for_client(
+            self.store,
+            user_id,
+            events,
+            is_peeking=(member_event_id is None),
+        )
+
+        time_now = self.clock.time_msec()
+
+        chunk = {
+            "chunk": [
+                serialize_event(e, time_now, as_client_event)
+                for e in events
+            ],
+            "start": pagin_config.from_token.to_string(),
+            "end": next_token.to_string(),
+        }
+
+        defer.returnValue(chunk)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f67512078b..6150b7e226 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler):
             )
 
 
-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
     @defer.inlineCallbacks
     def get_event_context(self, user, room_id, event_id, limit):
         """Retrieves events, pagination tokens and state around a given event
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 9e9c175970..99f6c6e3c3 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -244,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
             hs (synapse.server.HomeServer)
         """
         super(PurgeHistoryRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.pagination_handler = hs.get_pagination_handler()
         self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
@@ -319,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                 errcode=Codes.BAD_JSON,
             )
 
-        purge_id = yield self.handlers.message_handler.start_purge_history(
+        purge_id = yield self.pagination_handler.start_purge_history(
             room_id, token,
             delete_local_events=delete_local_events,
         )
@@ -341,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
             hs (synapse.server.HomeServer)
         """
         super(PurgeHistoryStatusRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.pagination_handler = hs.get_pagination_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, purge_id):
@@ -351,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
         if not is_admin:
             raise AuthError(403, "You are not a server admin")
 
-        purge_status = self.handlers.message_handler.get_purge_status(purge_id)
+        purge_status = self.pagination_handler.get_purge_status(purge_id)
         if purge_status is None:
             raise NotFoundError("purge id '%s' not found" % purge_id)
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index b9512a2b61..b7bd878c90 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
         self.handlers = hs.get_handlers()
         self.event_creation_hander = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
+        self.message_handler = hs.get_message_handler()
 
     def register(self, http_server):
         # /room/$roomid/state/$eventtype
@@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
         format = parse_string(request, "format", default="content",
                               allowed_values=["content", "event"])
 
-        msg_handler = self.handlers.message_handler
+        msg_handler = self.message_handler
         data = yield msg_handler.get_room_data(
             user_id=requester.user.to_string(),
             room_id=room_id,
@@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomMemberListRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.message_handler = hs.get_message_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
         # TODO support Pagination stream API (limit/tokens)
         requester = yield self.auth.get_user_by_req(request)
-        handler = self.handlers.message_handler
-        events = yield handler.get_state_events(
+        events = yield self.message_handler.get_state_events(
             room_id=room_id,
             user_id=requester.user.to_string(),
         )
@@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(JoinedRoomMemberListRestServlet, self).__init__(hs)
-        self.message_handler = hs.get_handlers().message_handler
+        self.message_handler = hs.get_message_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
@@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomMessageListRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.pagination_handler = hs.get_pagination_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
@@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
             event_filter = Filter(json.loads(filter_json))
         else:
             event_filter = None
-        handler = self.handlers.message_handler
-        msgs = yield handler.get_messages(
+        msgs = yield self.pagination_handler.get_messages(
             room_id=room_id,
             requester=requester,
             pagin_config=pagination_config,
@@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomStateRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.message_handler = hs.get_message_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
-        handler = self.handlers.message_handler
         # Get all the current state for this room
-        events = yield handler.get_state_events(
+        events = yield self.message_handler.get_state_events(
             room_id=room_id,
             user_id=requester.user.to_string(),
             is_guest=requester.is_guest,
@@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(RoomEventContextServlet, self).__init__(hs)
         self.clock = hs.get_clock()
-        self.handlers = hs.get_handlers()
+        self.room_context_handler = hs.get_room_context_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id, event_id):
@@ -533,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
 
         limit = parse_integer(request, "limit", default=10)
 
-        results = yield self.handlers.room_context_handler.get_event_context(
+        results = yield self.room_context_handler.get_event_context(
             requester.user,
             room_id,
             event_id,
diff --git a/synapse/server.py b/synapse/server.py
index fd4f992258..140be9ebe8 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -52,12 +52,13 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.groups_local import GroupsLocalHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
-from synapse.handlers.message import EventCreationHandler
+from synapse.handlers.message import EventCreationHandler, MessageHandler
+from synapse.handlers.pagination import PaginationHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.profile import ProfileHandler
 from synapse.handlers.read_marker import ReadMarkerHandler
 from synapse.handlers.receipts import ReceiptsHandler
-from synapse.handlers.room import RoomCreationHandler
+from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
 from synapse.handlers.room_list import RoomListHandler
 from synapse.handlers.room_member import RoomMemberMasterHandler
 from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
@@ -165,6 +166,9 @@ class HomeServer(object):
         'federation_registry',
         'server_notices_manager',
         'server_notices_sender',
+        'message_handler',
+        'pagination_handler',
+        'room_context_handler',
     ]
 
     def __init__(self, hostname, reactor=None, **kwargs):
@@ -431,6 +435,15 @@ class HomeServer(object):
             return WorkerServerNoticesSender(self)
         return ServerNoticesSender(self)
 
+    def build_message_handler(self):
+        return MessageHandler(self)
+
+    def build_pagination_handler(self):
+        return PaginationHandler(self)
+
+    def build_room_context_handler(self):
+        return RoomContextHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bf4f3ee92a..4d0706f23d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 
@@ -147,7 +147,8 @@ class _EventPeristenceQueue(object):
                     # callbacks on the deferred.
                     try:
                         ret = yield per_item_callback(item)
-                        item.deferred.callback(ret)
+                        with PreserveLoggingContext():
+                            item.deferred.callback(ret)
                     except Exception:
                         item.deferred.errback()
             finally:
@@ -417,19 +418,29 @@ class EventsStore(EventsWorkerStore):
                             logger.info(
                                 "Calculating state delta for room %s", room_id,
                             )
-                            current_state = yield self._get_new_state_after_events(
-                                room_id,
-                                ev_ctx_rm,
-                                latest_event_ids,
-                                new_latest_event_ids,
-                            )
+
+                            with Measure(
+                                    self._clock,
+                                    "persist_events.get_new_state_after_events",
+                            ):
+                                current_state = yield self._get_new_state_after_events(
+                                    room_id,
+                                    ev_ctx_rm,
+                                    latest_event_ids,
+                                    new_latest_event_ids,
+                                )
+
                             if current_state is not None:
                                 current_state_for_room[room_id] = current_state
-                                delta = yield self._calculate_state_delta(
-                                    room_id, current_state,
-                                )
-                                if delta is not None:
-                                    state_delta_for_room[room_id] = delta
+                                with Measure(
+                                        self._clock,
+                                        "persist_events.calculate_state_delta",
+                                ):
+                                    delta = yield self._calculate_state_delta(
+                                        room_id, current_state,
+                                    )
+                                    if delta is not None:
+                                        state_delta_for_room[room_id] = delta
 
                 yield self.runInteraction(
                     "persist_events",
@@ -549,7 +560,12 @@ class EventsStore(EventsWorkerStore):
             if ctx.state_group in state_groups_map:
                 continue
 
-            state_groups_map[ctx.state_group] = yield ctx.get_current_state_ids(self)
+            # We're only interested in pulling out state that has already
+            # been cached in the context. We'll pull stuff out of the DB later
+            # if necessary.
+            current_state_ids = ctx.get_cached_current_state_ids()
+            if current_state_ids is not None:
+                state_groups_map[ctx.state_group] = current_state_ids
 
         # We need to map the event_ids to their state groups. First, let's
         # check if the event is one we're persisting, in which case we can
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index cc273a57b2..8443bd4c1b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
             )
 
             if newly_inserted:
-                self.runInteraction(
+                yield self.runInteraction(
                     "add_pusher",
                     self._invalidate_cache_and_stream,
                     self.get_if_user_has_pusher, (user_id,)