Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py  2
-rw-r--r--  synapse/api/auth.py  40
-rw-r--r--  synapse/api/filtering.py  20
-rw-r--r--  synapse/app/client_reader.py  16
-rwxr-xr-x  synapse/app/homeserver.py  19
-rw-r--r--  synapse/app/synchrotron.py  5
-rwxr-xr-x  synapse/app/synctl.py  4
-rw-r--r--  synapse/config/voip.py  4
-rw-r--r--  synapse/events/snapshot.py  200
-rw-r--r--  synapse/federation/federation_server.py  11
-rw-r--r--  synapse/federation/send_queue.py  63
-rw-r--r--  synapse/federation/transaction_queue.py  57
-rw-r--r--  synapse/federation/transport/server.py  7
-rw-r--r--  synapse/federation/units.py  1
-rw-r--r--  synapse/groups/attestations.py  6
-rw-r--r--  synapse/handlers/__init__.py  4
-rw-r--r--  synapse/handlers/_base.py  3
-rw-r--r--  synapse/handlers/appservice.py  5
-rw-r--r--  synapse/handlers/federation.py  293
-rw-r--r--  synapse/handlers/initial_sync.py  38
-rw-r--r--  synapse/handlers/message.py  304
-rw-r--r--  synapse/handlers/pagination.py  265
-rw-r--r--  synapse/handlers/profile.py  9
-rw-r--r--  synapse/handlers/room.py  42
-rw-r--r--  synapse/handlers/room_member.py  13
-rw-r--r--  synapse/handlers/search.py  2
-rw-r--r--  synapse/handlers/sync.py  173
-rw-r--r--  synapse/http/client.py  6
-rw-r--r--  synapse/http/request_metrics.py  9
-rw-r--r--  synapse/http/site.py  5
-rw-r--r--  synapse/metrics/background_process_metrics.py  185
-rw-r--r--  synapse/notifier.py  2
-rw-r--r--  synapse/push/bulk_push_rule_evaluator.py  7
-rw-r--r--  synapse/replication/http/send_event.py  7
-rw-r--r--  synapse/replication/tcp/client.py  2
-rw-r--r--  synapse/replication/tcp/resource.py  14
-rw-r--r--  synapse/rest/__init__.py  43
-rw-r--r--  synapse/rest/client/v1/admin.py  130
-rw-r--r--  synapse/rest/client/v1/room.py  38
-rw-r--r--  synapse/rest/client/v1_only/__init__.py  3
-rw-r--r--  synapse/rest/client/v1_only/base.py  39
-rw-r--r--  synapse/rest/client/v1_only/register.py (renamed from synapse/rest/client/v1/register.py)  7
-rw-r--r--  synapse/rest/client/v2_alpha/register.py  2
-rw-r--r--  synapse/rest/media/v1/media_repository.py  8
-rw-r--r--  synapse/rest/media/v1/preview_url_resource.py  8
-rw-r--r--  synapse/secrets.py  41
-rw-r--r--  synapse/server.py  22
-rw-r--r--  synapse/state.py  246
-rw-r--r--  synapse/storage/__init__.py  2
-rw-r--r--  synapse/storage/_base.py  8
-rw-r--r--  synapse/storage/appservice.py  2
-rw-r--r--  synapse/storage/background_updates.py  10
-rw-r--r--  synapse/storage/client_ips.py  15
-rw-r--r--  synapse/storage/devices.py  61
-rw-r--r--  synapse/storage/end_to_end_keys.py  27
-rw-r--r--  synapse/storage/event_federation.py  24
-rw-r--r--  synapse/storage/event_push_actions.py  13
-rw-r--r--  synapse/storage/events.py  240
-rw-r--r--  synapse/storage/events_worker.py  10
-rw-r--r--  synapse/storage/push_rule.py  20
-rw-r--r--  synapse/storage/pusher.py  2
-rw-r--r--  synapse/storage/roommember.py  70
-rw-r--r--  synapse/storage/schema/delta/50/make_event_content_nullable.py  92
-rw-r--r--  synapse/storage/schema/full_schemas/16/event_edges.sql  3
-rw-r--r--  synapse/storage/schema/full_schemas/16/im.sql  7
-rw-r--r--  synapse/storage/state.py  147
-rw-r--r--  synapse/storage/stream.py  16
-rw-r--r--  synapse/storage/transactions.py  8
-rw-r--r--  synapse/util/async.py  145
-rw-r--r--  synapse/util/caches/descriptors.py  131
-rw-r--r--  synapse/util/caches/expiringcache.py  6
-rw-r--r--  synapse/util/caches/stream_change_cache.py  4
-rw-r--r--  synapse/util/distributor.py  48
-rw-r--r--  synapse/util/logcontext.py  11
-rw-r--r--  synapse/util/metrics.py  19
-rw-r--r--  synapse/visibility.py  178
76 files changed, 2411 insertions, 1338 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 3cde33c0d7..5c0f2f83aa 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,4 +17,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.32.2"
+__version__ = "0.33.0"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index bc629832d9..073229b4c4 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -65,8 +65,9 @@ class Auth(object):
 
     @defer.inlineCallbacks
     def check_from_context(self, event, context, do_sig_check=True):
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
         auth_events_ids = yield self.compute_auth_events(
-            event, context.prev_state_ids, for_verification=True,
+            event, prev_state_ids, for_verification=True,
         )
         auth_events = yield self.store.get_events(auth_events_ids)
         auth_events = {
@@ -544,7 +545,8 @@ class Auth(object):
 
     @defer.inlineCallbacks
     def add_auth_events(self, builder, context):
-        auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
 
         auth_events_entries = yield self.store.add_event_hashes(
             auth_ids
@@ -737,3 +739,37 @@ class Auth(object):
                 )
 
             return query_params[0]
+
+    @defer.inlineCallbacks
+    def check_in_room_or_world_readable(self, room_id, user_id):
+        """Checks that the user is or was in the room or the room is world
+        readable. If it isn't then an exception is raised.
+
+        Returns:
+            Deferred[tuple[str, str|None]]: Resolves to the current membership of
+            the user in the room and the membership event ID of the user. If
+            the user is not in the room and never has been, then
+            `(Membership.JOIN, None)` is returned.
+        """
+
+        try:
+            # check_user_was_in_room will return the most recent membership
+            # event for the user if:
+            #  * The user is a non-guest user, and was ever in the room
+            #  * The user is a guest user, and has joined the room
+            # else it will throw.
+            member_event = yield self.check_user_was_in_room(room_id, user_id)
+            defer.returnValue((member_event.membership, member_event.event_id))
+        except AuthError:
+            visibility = yield self.state.get_current_state(
+                room_id, EventTypes.RoomHistoryVisibility, ""
+            )
+            if (
+                visibility and
+                visibility.content["history_visibility"] == "world_readable"
+            ):
+                defer.returnValue((Membership.JOIN, None))
+                return
+            raise AuthError(
+                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+            )
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 25346baa87..186831e118 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -113,7 +113,13 @@ ROOM_EVENT_FILTER_SCHEMA = {
         },
         "contains_url": {
             "type": "boolean"
-        }
+        },
+        "lazy_load_members": {
+            "type": "boolean"
+        },
+        "include_redundant_members": {
+            "type": "boolean"
+        },
     }
 }
 
@@ -261,6 +267,12 @@ class FilterCollection(object):
     def ephemeral_limit(self):
         return self._room_ephemeral_filter.limit()
 
+    def lazy_load_members(self):
+        return self._room_state_filter.lazy_load_members()
+
+    def include_redundant_members(self):
+        return self._room_state_filter.include_redundant_members()
+
     def filter_presence(self, events):
         return self._presence_filter.filter(events)
 
@@ -417,6 +429,12 @@ class Filter(object):
     def limit(self):
         return self.filter_json.get("limit", 10)
 
+    def lazy_load_members(self):
+        return self.filter_json.get("lazy_load_members", False)
+
+    def include_redundant_members(self):
+        return self.filter_json.get("include_redundant_members", False)
+
 
 def _matches_wildcard(actual_value, filter_value):
     if filter_value.endswith("*"):
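
The two new booleans are read from the room state filter, so a client filter that opts into lazily-loaded membership carries them under room.state. A sketch of such a filter, expressed as a Python dict (both flags default to False when absent):

    sync_filter = {
        "room": {
            "state": {
                # Only send membership events for senders of returned events.
                "lazy_load_members": True,
                # Do not re-send membership events the client already has.
                "include_redundant_members": False,
            },
        },
    }
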
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index b0ea26dcb4..e2c91123db 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -31,6 +31,7 @@ from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.directory import DirectoryStore
@@ -40,7 +41,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.rest.client.v1.room import (
+    JoinedRoomMemberListRestServlet,
+    PublicRoomListRestServlet,
+    RoomEventContextServlet,
+    RoomMemberListRestServlet,
+    RoomStateRestServlet,
+)
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
@@ -52,6 +59,7 @@ logger = logging.getLogger("synapse.app.client_reader")
 
 
 class ClientReaderSlavedStore(
+    SlavedAccountDataStore,
     SlavedEventStore,
     SlavedKeyStore,
     RoomStore,
@@ -82,7 +90,13 @@ class ClientReaderServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
+
                     PublicRoomListRestServlet(self).register(resource)
+                    RoomMemberListRestServlet(self).register(resource)
+                    JoinedRoomMemberListRestServlet(self).register(resource)
+                    RoomStateRestServlet(self).register(resource)
+                    RoomEventContextServlet(self).register(resource)
+
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 14e6dca522..57b815d777 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -18,6 +18,8 @@ import logging
 import os
 import sys
 
+from six import iteritems
+
 from twisted.application import service
 from twisted.internet import defer, reactor
 from twisted.web.resource import EncodingResourceWrapper, NoResource
@@ -47,6 +49,7 @@ from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import RootRedirect
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.module_api import ModuleApi
 from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
@@ -425,6 +428,9 @@ def run(hs):
     # currently either 0 or 1
     stats_process = []
 
+    def start_phone_stats_home():
+        return run_as_background_process("phone_stats_home", phone_stats_home)
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -442,7 +448,7 @@ def run(hs):
         stats["total_nonbridged_users"] = total_nonbridged_users
 
         daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
-        for name, count in daily_user_type_results.iteritems():
+        for name, count in iteritems(daily_user_type_results):
             stats["daily_user_type_" + name] = count
 
         room_count = yield hs.get_datastore().get_room_count()
@@ -453,7 +459,7 @@ def run(hs):
         stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
         r30_results = yield hs.get_datastore().count_r30_users()
-        for name, count in r30_results.iteritems():
+        for name, count in iteritems(r30_results):
             stats["r30_users_" + name] = count
 
         daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
@@ -496,7 +502,10 @@ def run(hs):
             )
 
     def generate_user_daily_visit_stats():
-        hs.get_datastore().generate_user_daily_visits()
+        return run_as_background_process(
+            "generate_user_daily_visits",
+            hs.get_datastore().generate_user_daily_visits,
+        )
 
     # Rather than update on per session basis, batch up the requests.
     # If you increase the loop period, the accuracy of user_daily_visits
@@ -505,7 +514,7 @@ def run(hs):
 
     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+        clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
 
         # We need to defer this init for the cases that we daemonize
         # otherwise the process ID we get is that of the non-daemon process
@@ -513,7 +522,7 @@ def run(hs):
 
         # We wait 5 minutes to send the first set of stats as the server can
         # be quite busy the first few minutes
-        clock.call_later(5 * 60, phone_stats_home)
+        clock.call_later(5 * 60, start_phone_stats_home)
 
     if hs.config.daemonize and hs.config.print_pidfile:
         print (hs.config.pid_file)
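
The homeserver.py hunks above show the pattern applied throughout this changeset: the callback handed to clock.looping_call (or call_later) is wrapped in run_as_background_process so the periodic work runs in its own logcontext and is tracked by the new background-process metrics. A minimal sketch of the shape, where setup_periodic_work and do_periodic_work are hypothetical:

    from twisted.internet import defer

    from synapse.metrics.background_process_metrics import run_as_background_process


    @defer.inlineCallbacks
    def do_periodic_work(hs):
        # Placeholder body; a real job would call into hs.get_datastore() etc.
        yield defer.succeed(None)


    def setup_periodic_work(hs):
        clock = hs.get_clock()

        def start_periodic_work():
            # The wrapper gives the work its own logcontext and records it
            # under the "periodic_work" label in the background process metrics.
            return run_as_background_process("periodic_work", do_periodic_work, hs)

        # Schedule the wrapper, not the inlineCallbacks function itself.
        clock.looping_call(start_periodic_work, 60 * 60 * 1000)
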
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 26b9ec85f2..e201f18efd 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -55,7 +55,6 @@ from synapse.rest.client.v2_alpha import sync
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
-from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
@@ -81,9 +80,7 @@ class SynchrotronSlavedStore(
     RoomStore,
     BaseSlavedStore,
 ):
-    did_forget = (
-        RoomMemberStore.__dict__["did_forget"]
-    )
+    pass
 
 
 UPDATE_SYNCING_USERS_MS = 10 * 1000
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 68acc15a9a..d658f967ba 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -25,6 +25,8 @@ import subprocess
 import sys
 import time
 
+from six import iteritems
+
 import yaml
 
 SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
@@ -173,7 +175,7 @@ def main():
         os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
 
     cache_factors = config.get("synctl_cache_factors", {})
-    for cache_name, factor in cache_factors.iteritems():
+    for cache_name, factor in iteritems(cache_factors):
         os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
 
     worker_configfiles = []
diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index 3a4e16fa96..d07bd24ffd 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -30,10 +30,10 @@ class VoipConfig(Config):
         ## Turn ##
 
         # The public URIs of the TURN server to give to clients
-        turn_uris: []
+        #turn_uris: []
 
         # The shared secret used to compute passwords for the TURN server
-        turn_shared_secret: "YOUR_SHARED_SECRET"
+        #turn_shared_secret: "YOUR_SHARED_SECRET"
 
         # The Username and password if the TURN server needs them and
         # does not use a token
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index bcd9bb5946..368b5f6ae4 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -13,22 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from six import iteritems
+
 from frozendict import frozendict
 
 from twisted.internet import defer
 
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
 
 class EventContext(object):
     """
     Attributes:
-        current_state_ids (dict[(str, str), str]):
-            The current state map including the current event.
-            (type, state_key) -> event_id
-
-        prev_state_ids (dict[(str, str), str]):
-            The current state map excluding the current event.
-            (type, state_key) -> event_id
-
         state_group (int|None): state group id, if the state has been stored
             as a state group. This is usually only None if e.g. the event is
             an outlier.
@@ -45,38 +41,77 @@ class EventContext(object):
 
         prev_state_events (?): XXX: is this ever set to anything other than
             the empty list?
+
+        _current_state_ids (dict[(str, str), str]|None):
+            The current state map including the current event. None if outlier
+            or we haven't fetched the state from DB yet.
+            (type, state_key) -> event_id
+
+        _prev_state_ids (dict[(str, str), str]|None):
+            The current state map excluding the current event. None if outlier
+            or we haven't fetched the state from DB yet.
+            (type, state_key) -> event_id
+
+        _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
+            been calculated. None if we haven't started calculating yet
+
+        _event_type (str): The type of the event the context is associated with.
+            Only set when state has not been fetched yet.
+
+        _event_state_key (str|None): The state_key of the event the context is
+            associated with. Only set when state has not been fetched yet.
+
+        _prev_state_id (str|None): If the event associated with the context is
+            a state event, then `_prev_state_id` is the event_id of the state
+            that was replaced.
+            Only set when state has not been fetched yet.
     """
 
     __slots__ = [
-        "current_state_ids",
-        "prev_state_ids",
         "state_group",
         "rejected",
         "prev_group",
         "delta_ids",
         "prev_state_events",
         "app_service",
+        "_current_state_ids",
+        "_prev_state_ids",
+        "_prev_state_id",
+        "_event_type",
+        "_event_state_key",
+        "_fetching_state_deferred",
     ]
 
     def __init__(self):
+        self.prev_state_events = []
+        self.rejected = False
+        self.app_service = None
+
+    @staticmethod
+    def with_state(state_group, current_state_ids, prev_state_ids,
+                   prev_group=None, delta_ids=None):
+        context = EventContext()
+
         # The current state including the current event
-        self.current_state_ids = None
+        context._current_state_ids = current_state_ids
         # The current state excluding the current event
-        self.prev_state_ids = None
-        self.state_group = None
+        context._prev_state_ids = prev_state_ids
+        context.state_group = state_group
 
-        self.rejected = False
+        context._prev_state_id = None
+        context._event_type = None
+        context._event_state_key = None
+        context._fetching_state_deferred = defer.succeed(None)
 
         # A previously persisted state group and a delta between that
         # and this state.
-        self.prev_group = None
-        self.delta_ids = None
+        context.prev_group = prev_group
+        context.delta_ids = delta_ids
 
-        self.prev_state_events = None
-
-        self.app_service = None
+        return context
 
-    def serialize(self, event):
+    @defer.inlineCallbacks
+    def serialize(self, event, store):
         """Converts self to a type that can be serialized as JSON, and then
         deserialized by `deserialize`
 
@@ -92,11 +127,12 @@ class EventContext(object):
         # the prev_state_ids, so if we're a state event we include the event
         # id that we replaced in the state.
         if event.is_state():
-            prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
+            prev_state_ids = yield self.get_prev_state_ids(store)
+            prev_state_id = prev_state_ids.get((event.type, event.state_key))
         else:
             prev_state_id = None
 
-        return {
+        defer.returnValue({
             "prev_state_id": prev_state_id,
             "event_type": event.type,
             "event_state_key": event.state_key if event.is_state() else None,
@@ -106,10 +142,9 @@ class EventContext(object):
             "delta_ids": _encode_state_dict(self.delta_ids),
             "prev_state_events": self.prev_state_events,
             "app_service_id": self.app_service.id if self.app_service else None
-        }
+        })
 
     @staticmethod
-    @defer.inlineCallbacks
     def deserialize(store, input):
         """Converts a dict that was produced by `serialize` back into a
         EventContext.
@@ -122,32 +157,115 @@ class EventContext(object):
             EventContext
         """
         context = EventContext()
+
+        # We use the state_group and prev_state_id stuff to pull the
+        # current_state_ids out of the DB and construct prev_state_ids.
+        context._prev_state_id = input["prev_state_id"]
+        context._event_type = input["event_type"]
+        context._event_state_key = input["event_state_key"]
+
+        context._current_state_ids = None
+        context._prev_state_ids = None
+        context._fetching_state_deferred = None
+
         context.state_group = input["state_group"]
-        context.rejected = input["rejected"]
         context.prev_group = input["prev_group"]
         context.delta_ids = _decode_state_dict(input["delta_ids"])
+
+        context.rejected = input["rejected"]
         context.prev_state_events = input["prev_state_events"]
 
-        # We use the state_group and prev_state_id stuff to pull the
-        # current_state_ids out of the DB and construct prev_state_ids.
-        prev_state_id = input["prev_state_id"]
-        event_type = input["event_type"]
-        event_state_key = input["event_state_key"]
+        app_service_id = input["app_service_id"]
+        if app_service_id:
+            context.app_service = store.get_app_service_by_id(app_service_id)
+
+        return context
+
+    @defer.inlineCallbacks
+    def get_current_state_ids(self, store):
+        """Gets the current state IDs
+
+        Returns:
+            Deferred[dict[(str, str), str]|None]: Returns None if state_group
+            is None, which happens when the associated event is an outlier.
+        """
+
+        if not self._fetching_state_deferred:
+            self._fetching_state_deferred = run_in_background(
+                self._fill_out_state, store,
+            )
+
+        yield make_deferred_yieldable(self._fetching_state_deferred)
+
+        defer.returnValue(self._current_state_ids)
+
+    @defer.inlineCallbacks
+    def get_prev_state_ids(self, store):
+        """Gets the prev state IDs
+
+        Returns:
+            Deferred[dict[(str, str), str]|None]: Returns None if state_group
+            is None, which happens when the associated event is an outlier.
+        """
+
+        if not self._fetching_state_deferred:
+            self._fetching_state_deferred = run_in_background(
+                self._fill_out_state, store,
+            )
+
+        yield make_deferred_yieldable(self._fetching_state_deferred)
 
-        context.current_state_ids = yield store.get_state_ids_for_group(
-            context.state_group,
+        defer.returnValue(self._prev_state_ids)
+
+    def get_cached_current_state_ids(self):
+        """Gets the current state IDs if we have them already cached.
+
+        Returns:
+            dict[(str, str), str]|None: Returns None if we haven't cached the
+            state or if state_group is None, which happens when the associated
+            event is an outlier.
+        """
+
+        return self._current_state_ids
+
+    @defer.inlineCallbacks
+    def _fill_out_state(self, store):
+        """Called to populate the _current_state_ids and _prev_state_ids
+        attributes by loading from the database.
+        """
+        if self.state_group is None:
+            return
+
+        self._current_state_ids = yield store.get_state_ids_for_group(
+            self.state_group,
         )
-        if prev_state_id and event_state_key:
-            context.prev_state_ids = dict(context.current_state_ids)
-            context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
+        if self._prev_state_id and self._event_state_key is not None:
+            self._prev_state_ids = dict(self._current_state_ids)
+
+            key = (self._event_type, self._event_state_key)
+            self._prev_state_ids[key] = self._prev_state_id
         else:
-            context.prev_state_ids = context.current_state_ids
+            self._prev_state_ids = self._current_state_ids
 
-        app_service_id = input["app_service_id"]
-        if app_service_id:
-            context.app_service = store.get_app_service_by_id(app_service_id)
+    @defer.inlineCallbacks
+    def update_state(self, state_group, prev_state_ids, current_state_ids,
+                     prev_group, delta_ids):
+        """Replace the state in the context
+        """
+
+        # We need to make sure we wait for any ongoing fetching of state
+        # to complete so that the updated state doesn't get clobbered
+        if self._fetching_state_deferred:
+            yield make_deferred_yieldable(self._fetching_state_deferred)
+
+        self.state_group = state_group
+        self._prev_state_ids = prev_state_ids
+        self.prev_group = prev_group
+        self._current_state_ids = current_state_ids
+        self.delta_ids = delta_ids
 
-        defer.returnValue(context)
+        # We need to ensure that we've marked as having fetched the state
+        self._fetching_state_deferred = defer.succeed(None)
 
 
 def _encode_state_dict(state_dict):
@@ -159,7 +277,7 @@ def _encode_state_dict(state_dict):
 
     return [
         (etype, state_key, v)
-        for (etype, state_key), v in state_dict.iteritems()
+        for (etype, state_key), v in iteritems(state_dict)
     ]
 
 
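For orientation, a minimal sketch of the reworked EventContext API (the function and values below are illustrative): contexts created via with_state() carry their state maps up front, while deserialized contexts fetch them lazily from the store, which is why both accessors return Deferreds.

    from twisted.internet import defer

    from synapse.events.snapshot import EventContext


    @defer.inlineCallbacks
    def example(store):
        # Contexts built with with_state() already hold their state maps, so
        # the accessors below resolve immediately; a context that came through
        # deserialize() populates them from `store` on first use instead.
        context = EventContext.with_state(
            state_group=42,  # illustrative values throughout
            current_state_ids={("m.room.member", "@alice:example.com"): "$member:ev"},
            prev_state_ids={},
        )

        current_state_ids = yield context.get_current_state_ids(store)
        prev_state_ids = yield context.get_prev_state_ids(store)
        defer.returnValue((current_state_ids, prev_state_ids))
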
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 48f26db67c..657935d1ac 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -24,6 +24,7 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
+from twisted.python import failure
 
 from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
@@ -186,8 +187,12 @@ class FederationServer(FederationBase):
                     logger.warn("Error handling PDU %s: %s", event_id, e)
                     pdu_results[event_id] = {"error": str(e)}
                 except Exception as e:
+                    f = failure.Failure()
                     pdu_results[event_id] = {"error": str(e)}
-                    logger.exception("Failed to handle PDU %s", event_id)
+                    logger.error(
+                        "Failed to handle PDU %s: %s",
+                        event_id, f.getTraceback().rstrip(),
+                    )
 
         yield async.concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(),
@@ -202,10 +207,6 @@ class FederationServer(FederationBase):
                     edu.content
                 )
 
-        pdu_failures = getattr(transaction, "pdu_failures", [])
-        for failure in pdu_failures:
-            logger.info("Got failure %r", failure)
-
         response = {
             "pdus": pdu_results,
         }
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 5157c3860d..0bb468385d 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -62,8 +62,6 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = SortedDict()  # stream position -> (destination, Failure)
-
         self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
@@ -79,7 +77,7 @@ class FederationRemoteSendQueue(object):
 
         for queue_name in [
             "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
-            "edus", "failures", "device_messages", "pos_time",
+            "edus", "device_messages", "pos_time",
         ]:
             register(queue_name, getattr(self, queue_name))
 
@@ -149,12 +147,6 @@ class FederationRemoteSendQueue(object):
             for key in keys[:i]:
                 del self.edus[key]
 
-            # Delete things out of failure map
-            keys = self.failures.keys()
-            i = self.failures.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.failures[key]
-
             # Delete things out of device map
             keys = self.device_messages.keys()
             i = self.device_messages.bisect_left(position_to_delete)
@@ -204,13 +196,6 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
-    def send_failure(self, failure, destination):
-        """As per TransactionQueue"""
-        pos = self._next_pos()
-
-        self.failures[pos] = (destination, str(failure))
-        self.notifier.on_new_replication_data()
-
     def send_device_messages(self, destination):
         """As per TransactionQueue"""
         pos = self._next_pos()
@@ -285,17 +270,6 @@ class FederationRemoteSendQueue(object):
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
-        # Fetch changed failures
-        i = self.failures.bisect_right(from_token)
-        j = self.failures.bisect_right(to_token) + 1
-        failures = self.failures.items()[i:j]
-
-        for (pos, (destination, failure)) in failures:
-            rows.append((pos, FailureRow(
-                destination=destination,
-                failure=failure,
-            )))
-
         # Fetch changed device messages
         i = self.device_messages.bisect_right(from_token)
         j = self.device_messages.bisect_right(to_token) + 1
@@ -417,34 +391,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", (
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
-    "destination",  # str
-    "failure",
-))):
-    """Streams failures to a remote server. Failures are issued when there was
-    something wrong with a transaction the remote sent us, e.g. it included
-    an event that was invalid.
-    """
-
-    TypeId = "f"
-
-    @staticmethod
-    def from_data(data):
-        return FailureRow(
-            destination=data["destination"],
-            failure=data["failure"],
-        )
-
-    def to_data(self):
-        return {
-            "destination": self.destination,
-            "failure": self.failure,
-        }
-
-    def add_to_buffer(self, buff):
-        buff.failures.setdefault(self.destination, []).append(self.failure)
-
-
 class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
     "destination",  # str
 ))):
@@ -471,7 +417,6 @@ TypeToRow = {
         PresenceRow,
         KeyedEduRow,
         EduRow,
-        FailureRow,
         DeviceRow,
     )
 }
@@ -481,7 +426,6 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
     "presence",  # list(UserPresenceState)
     "keyed_edus",  # dict of destination -> { key -> Edu }
     "edus",  # dict of destination -> [Edu]
-    "failures",  # dict of destination -> [failures]
     "device_destinations",  # set of destinations
 ))
 
@@ -503,7 +447,6 @@ def process_rows_for_federation(transaction_queue, rows):
         presence=[],
         keyed_edus={},
         edus={},
-        failures={},
         device_destinations=set(),
     )
 
@@ -532,9 +475,5 @@ def process_rows_for_federation(transaction_queue, rows):
                 edu.destination, edu.edu_type, edu.content, key=None,
             )
 
-    for destination, failure_list in iteritems(buff.failures):
-        for failure in failure_list:
-            transaction_queue.send_failure(destination, failure)
-
     for destination in buff.device_destinations:
         transaction_queue.send_device_messages(destination)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5a956ecfb3..78f9d40a3a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -30,7 +30,8 @@ from synapse.metrics import (
     sent_edus_counter,
     sent_transactions_counter,
 )
-from synapse.util import PreserveLoggingContext, logcontext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
@@ -115,9 +116,6 @@ class TransactionQueue(object):
             ),
         )
 
-        # destination -> list of tuple(failure, deferred)
-        self.pending_failures_by_dest = {}
-
         # destination -> stream_id of last successfully sent to-device message.
         # NB: may be a long or an int.
         self.last_device_stream_id_by_dest = {}
@@ -165,10 +163,11 @@ class TransactionQueue(object):
         if self._is_processing:
             return
 
-        # fire off a processing loop in the background. It's likely it will
-        # outlast the current request, so run it in the sentinel logcontext.
-        with PreserveLoggingContext():
-            self._process_event_queue_loop()
+        # fire off a processing loop in the background
+        run_as_background_process(
+            "process_event_queue_for_federation",
+            self._process_event_queue_loop,
+        )
 
     @defer.inlineCallbacks
     def _process_event_queue_loop(self):
@@ -380,19 +379,6 @@ class TransactionQueue(object):
 
         self._attempt_new_transaction(destination)
 
-    def send_failure(self, failure, destination):
-        if destination == self.server_name or destination == "localhost":
-            return
-
-        if not self.can_send_to(destination):
-            return
-
-        self.pending_failures_by_dest.setdefault(
-            destination, []
-        ).append(failure)
-
-        self._attempt_new_transaction(destination)
-
     def send_device_messages(self, destination):
         if destination == self.server_name or destination == "localhost":
             return
@@ -432,14 +418,11 @@ class TransactionQueue(object):
 
         logger.debug("TX [%s] Starting transaction loop", destination)
 
-        # Drop the logcontext before starting the transaction. It doesn't
-        # really make sense to log all the outbound transactions against
-        # whatever path led us to this point: that's pretty arbitrary really.
-        #
-        # (this also means we can fire off _perform_transaction without
-        # yielding)
-        with logcontext.PreserveLoggingContext():
-            self._transaction_transmission_loop(destination)
+        run_as_background_process(
+            "federation_transaction_transmission_loop",
+            self._transaction_transmission_loop,
+            destination,
+        )
 
     @defer.inlineCallbacks
     def _transaction_transmission_loop(self, destination):
@@ -470,7 +453,6 @@ class TransactionQueue(object):
                 pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
                 pending_edus = self.pending_edus_by_dest.pop(destination, [])
                 pending_presence = self.pending_presence_by_dest.pop(destination, {})
-                pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
                 pending_edus.extend(
                     self.pending_edus_keyed_by_dest.pop(destination, {}).values()
@@ -498,7 +480,7 @@ class TransactionQueue(object):
                     logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                                  destination, len(pending_pdus))
 
-                if not pending_pdus and not pending_edus and not pending_failures:
+                if not pending_pdus and not pending_edus:
                     logger.debug("TX [%s] Nothing to send", destination)
                     self.last_device_stream_id_by_dest[destination] = (
                         device_stream_id
@@ -508,7 +490,7 @@ class TransactionQueue(object):
                 # END CRITICAL SECTION
 
                 success = yield self._send_new_transaction(
-                    destination, pending_pdus, pending_edus, pending_failures,
+                    destination, pending_pdus, pending_edus,
                 )
                 if success:
                     sent_transactions_counter.inc()
@@ -585,14 +567,12 @@ class TransactionQueue(object):
 
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
-    def _send_new_transaction(self, destination, pending_pdus, pending_edus,
-                              pending_failures):
+    def _send_new_transaction(self, destination, pending_pdus, pending_edus):
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[1])
         pdus = [x[0] for x in pending_pdus]
         edus = pending_edus
-        failures = [x.get_dict() for x in pending_failures]
 
         success = True
 
@@ -602,11 +582,10 @@ class TransactionQueue(object):
 
         logger.debug(
             "TX [%s] {%s} Attempting new transaction"
-            " (pdus: %d, edus: %d, failures: %d)",
+            " (pdus: %d, edus: %d)",
             destination, txn_id,
             len(pdus),
             len(edus),
-            len(failures)
         )
 
         logger.debug("TX [%s] Persisting transaction...", destination)
@@ -618,7 +597,6 @@ class TransactionQueue(object):
             destination=destination,
             pdus=pdus,
             edus=edus,
-            pdu_failures=failures,
         )
 
         self._next_txn_id += 1
@@ -628,12 +606,11 @@ class TransactionQueue(object):
         logger.debug("TX [%s] Persisted transaction", destination)
         logger.info(
             "TX [%s] {%s} Sending transaction [%s],"
-            " (PDUs: %d, EDUs: %d, failures: %d)",
+            " (PDUs: %d, EDUs: %d)",
             destination, txn_id,
             transaction.transaction_id,
             len(pdus),
             len(edus),
-            len(failures),
         )
 
         # Actually send the transaction
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index c9beca27c2..3b5ea9515a 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -283,11 +283,10 @@ class FederationSendServlet(BaseFederationServlet):
             )
 
             logger.info(
-                "Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)",
+                "Received txn %s from %s. (PDUs: %d, EDUs: %d)",
                 transaction_id, origin,
                 len(transaction_data.get("pdus", [])),
                 len(transaction_data.get("edus", [])),
-                len(transaction_data.get("failures", [])),
             )
 
             # We should ideally be getting this from the security layer.
@@ -404,10 +403,10 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
 
 
 class FederationSendLeaveServlet(BaseFederationServlet):
-    PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
+    PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
 
     @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, room_id, txid):
+    def on_PUT(self, origin, content, query, room_id, event_id):
         content = yield self.handler.on_send_leave_request(origin, content)
         defer.returnValue((200, content))
 
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index bb1b3b13f7..c5ab14314e 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -73,7 +73,6 @@ class Transaction(JsonEncodedObject):
         "previous_ids",
         "pdus",
         "edus",
-        "pdu_failures",
     ]
 
     internal_keys = [
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index 47452700a8..b04f4234ca 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -43,6 +43,7 @@ from signedjson.sign import sign_json
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
 from synapse.util.logcontext import run_in_background
 
@@ -129,7 +130,7 @@ class GroupAttestionRenewer(object):
         self.attestations = hs.get_groups_attestation_signing()
 
         self._renew_attestations_loop = self.clock.looping_call(
-            self._renew_attestations, 30 * 60 * 1000,
+            self._start_renew_attestations, 30 * 60 * 1000,
         )
 
     @defer.inlineCallbacks
@@ -151,6 +152,9 @@ class GroupAttestionRenewer(object):
 
         defer.returnValue({})
 
+    def _start_renew_attestations(self):
+        return run_as_background_process("renew_attestations", self._renew_attestations)
+
     @defer.inlineCallbacks
     def _renew_attestations(self):
         """Called periodically to check if we need to update any of our attestations
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 4b9923d8c0..413425fed1 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,9 +17,7 @@ from .admin import AdminHandler
 from .directory import DirectoryHandler
 from .federation import FederationHandler
 from .identity import IdentityHandler
-from .message import MessageHandler
 from .register import RegistrationHandler
-from .room import RoomContextHandler
 from .search import SearchHandler
 
 
@@ -44,10 +42,8 @@ class Handlers(object):
 
     def __init__(self, hs):
         self.registration_handler = RegistrationHandler(hs)
-        self.message_handler = MessageHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
-        self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index b6a8b3aa3b..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -112,8 +112,9 @@ class BaseHandler(object):
             guest_access = event.content.get("guest_access", "forbidden")
             if guest_access != "can_join":
                 if context:
+                    current_state_ids = yield context.get_current_state_ids(self.store)
                     current_state = yield self.store.get_events(
-                        list(context.current_state_ids.values())
+                        list(current_state_ids.values())
                     )
                 else:
                     current_state = yield self.state_handler.get_current_state(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ec9fe01a5a..ee41aed69e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
 
 import synapse
 from synapse.api.constants import EventTypes
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.metrics import Measure
 
@@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
                             yield self._check_user_exists(event.state_key)
 
                         if not self.started_scheduler:
-                            self.scheduler.start().addErrback(log_failure)
+                            def start_scheduler():
+                                return self.scheduler.start().addErrback(log_failure)
+                            run_as_background_process("as_scheduler", start_scheduler)
                             self.started_scheduler = True
 
                         # Fork off pushes to these services
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d3ecebd29f..91d8def08b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,8 +21,8 @@ import logging
 import sys
 
 import six
-from six import iteritems
-from six.moves import http_client
+from six import iteritems, itervalues
+from six.moves import http_client, zip
 
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
@@ -43,7 +43,6 @@ from synapse.crypto.event_signing import (
     add_hashes_and_signatures,
     compute_event_signature,
 )
-from synapse.events.utils import prune_event
 from synapse.events.validator import EventValidator
 from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
@@ -52,8 +51,8 @@ from synapse.util.async import Linearizer
 from synapse.util.distributor import user_joined_room
 from synapse.util.frozenutils import unfreeze
 from synapse.util.logutils import log_function
-from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination
+from synapse.visibility import filter_events_for_server
 
 from ._base import BaseHandler
 
@@ -77,7 +76,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
-        self.replication_layer = hs.get_federation_client()
+        self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -256,7 +255,7 @@ class FederationHandler(BaseHandler):
                     # know about
                     for p in prevs - seen:
                         state, got_auth_chain = (
-                            yield self.replication_layer.get_state_for_room(
+                            yield self.federation_client.get_state_for_room(
                                 origin, pdu.room_id, p
                             )
                         )
@@ -339,7 +338,7 @@ class FederationHandler(BaseHandler):
         #
         # see https://github.com/matrix-org/synapse/pull/1744
 
-        missing_events = yield self.replication_layer.get_missing_events(
+        missing_events = yield self.federation_client.get_missing_events(
             origin,
             pdu.room_id,
             earliest_events_ids=list(latest),
@@ -487,7 +486,10 @@ class FederationHandler(BaseHandler):
                 # joined the room. Don't bother if the user is just
                 # changing their profile info.
                 newly_joined = True
-                prev_state_id = context.prev_state_ids.get(
+
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+                prev_state_id = prev_state_ids.get(
                     (event.type, event.state_key)
                 )
                 if prev_state_id:
@@ -501,137 +503,6 @@ class FederationHandler(BaseHandler):
                     user = UserID.from_string(event.state_key)
                     yield user_joined_room(self.distributor, user, event.room_id)
 
-    @measure_func("_filter_events_for_server")
-    @defer.inlineCallbacks
-    def _filter_events_for_server(self, server_name, room_id, events):
-        """Filter the given events for the given server, redacting those the
-        server can't see.
-
-        Assumes the server is currently in the room.
-
-        Returns
-            list[FrozenEvent]
-        """
-        # First lets check to see if all the events have a history visibility
-        # of "shared" or "world_readable". If thats the case then we don't
-        # need to check membership (as we know the server is in the room).
-        event_to_state_ids = yield self.store.get_state_ids_for_events(
-            frozenset(e.event_id for e in events),
-            types=(
-                (EventTypes.RoomHistoryVisibility, ""),
-            )
-        )
-
-        visibility_ids = set()
-        for sids in event_to_state_ids.itervalues():
-            hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
-            if hist:
-                visibility_ids.add(hist)
-
-        # If we failed to find any history visibility events then the default
-        # is "shared" visiblity.
-        if not visibility_ids:
-            defer.returnValue(events)
-
-        event_map = yield self.store.get_events(visibility_ids)
-        all_open = all(
-            e.content.get("history_visibility") in (None, "shared", "world_readable")
-            for e in event_map.itervalues()
-        )
-
-        if all_open:
-            defer.returnValue(events)
-
-        # Ok, so we're dealing with events that have non-trivial visibility
-        # rules, so we need to also get the memberships of the room.
-
-        event_to_state_ids = yield self.store.get_state_ids_for_events(
-            frozenset(e.event_id for e in events),
-            types=(
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, None),
-            )
-        )
-
-        # We only want to pull out member events that correspond to the
-        # server's domain.
-
-        def check_match(id):
-            try:
-                return server_name == get_domain_from_id(id)
-            except Exception:
-                return False
-
-        # Parses mapping `event_id -> (type, state_key) -> state event_id`
-        # to get all state ids that we're interested in.
-        event_map = yield self.store.get_events([
-            e_id
-            for key_to_eid in list(event_to_state_ids.values())
-            for key, e_id in key_to_eid.items()
-            if key[0] != EventTypes.Member or check_match(key[1])
-        ])
-
-        event_to_state = {
-            e_id: {
-                key: event_map[inner_e_id]
-                for key, inner_e_id in key_to_eid.iteritems()
-                if inner_e_id in event_map
-            }
-            for e_id, key_to_eid in event_to_state_ids.iteritems()
-        }
-
-        erased_senders = yield self.store.are_users_erased(
-            e.sender for e in events,
-        )
-
-        def redact_disallowed(event, state):
-            # if the sender has been gdpr17ed, always return a redacted
-            # copy of the event.
-            if erased_senders[event.sender]:
-                logger.info(
-                    "Sender of %s has been erased, redacting",
-                    event.event_id,
-                )
-                return prune_event(event)
-
-            if not state:
-                return event
-
-            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
-            if history:
-                visibility = history.content.get("history_visibility", "shared")
-                if visibility in ["invited", "joined"]:
-                    # We now loop through all state events looking for
-                    # membership states for the requesting server to determine
-                    # if the server is either in the room or has been invited
-                    # into the room.
-                    for ev in state.itervalues():
-                        if ev.type != EventTypes.Member:
-                            continue
-                        try:
-                            domain = get_domain_from_id(ev.state_key)
-                        except Exception:
-                            continue
-
-                        if domain != server_name:
-                            continue
-
-                        memtype = ev.membership
-                        if memtype == Membership.JOIN:
-                            return event
-                        elif memtype == Membership.INVITE:
-                            if visibility == "invited":
-                                return event
-                    else:
-                        return prune_event(event)
-
-            return event
-
-        defer.returnValue([
-            redact_disallowed(e, event_to_state[e.event_id])
-            for e in events
-        ])
-
     @log_function
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit, extremities):
@@ -651,7 +522,7 @@ class FederationHandler(BaseHandler):
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
 
-        events = yield self.replication_layer.backfill(
+        events = yield self.federation_client.backfill(
             dest,
             room_id,
             limit=limit,
@@ -699,7 +570,7 @@ class FederationHandler(BaseHandler):
         state_events = {}
         events_to_state = {}
         for e_id in edges:
-            state, auth = yield self.replication_layer.get_state_for_room(
+            state, auth = yield self.federation_client.get_state_for_room(
                 destination=dest,
                 room_id=room_id,
                 event_id=e_id
@@ -741,7 +612,7 @@ class FederationHandler(BaseHandler):
                 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                     [
                         logcontext.run_in_background(
-                            self.replication_layer.get_pdu,
+                            self.federation_client.get_pdu,
                             [dest],
                             event_id,
                             outlier=True,
@@ -863,7 +734,7 @@ class FederationHandler(BaseHandler):
             """
             joined_users = [
                 (state_key, int(event.depth))
-                for (e_type, state_key), event in state.iteritems()
+                for (e_type, state_key), event in iteritems(state)
                 if e_type == EventTypes.Member
                 and event.membership == Membership.JOIN
             ]
@@ -880,7 +751,7 @@ class FederationHandler(BaseHandler):
                 except Exception:
                     pass
 
-            return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+            return sorted(joined_domains.items(), key=lambda d: d[1])
 
         curr_domains = get_domains_from_state(curr_state)
 
@@ -943,7 +814,7 @@ class FederationHandler(BaseHandler):
         tried_domains = set(likely_domains)
         tried_domains.add(self.server_name)
 
-        event_ids = list(extremities.iterkeys())
+        event_ids = list(extremities.keys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
         resolve = logcontext.preserve_fn(
@@ -959,15 +830,15 @@ class FederationHandler(BaseHandler):
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = yield self.store.get_events(
-            [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+            [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
             get_prev_content=False
         )
         states = {
             key: {
                 k: state_map[e_id]
-                for k, e_id in state_dict.iteritems()
+                for k, e_id in iteritems(state_dict)
                 if e_id in state_map
-            } for key, state_dict in states.iteritems()
+            } for key, state_dict in iteritems(states)
         }
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -1022,7 +893,7 @@ class FederationHandler(BaseHandler):
 
         Invites must be signed by the invitee's server before distribution.
         """
-        pdu = yield self.replication_layer.send_invite(
+        pdu = yield self.federation_client.send_invite(
             destination=target_host,
             room_id=event.room_id,
             event_id=event.event_id,
@@ -1038,16 +909,6 @@ class FederationHandler(BaseHandler):
             [auth_id for auth_id, _ in event.auth_events],
             include_given=True
         )
-
-        for event in auth:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         defer.returnValue([e for e in auth])
 
     @log_function
@@ -1094,7 +955,7 @@ class FederationHandler(BaseHandler):
                 target_hosts.insert(0, origin)
             except ValueError:
                 pass
-            ret = yield self.replication_layer.send_join(target_hosts, event)
+            ret = yield self.federation_client.send_join(target_hosts, event)
 
             origin = ret["origin"]
             state = ret["state"]
@@ -1248,10 +1109,12 @@ class FederationHandler(BaseHandler):
                 user = UserID.from_string(event.state_key)
                 yield user_joined_room(self.distributor, user, event.room_id)
 
-        state_ids = list(context.prev_state_ids.values())
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        state_ids = list(prev_state_ids.values())
         auth_chain = yield self.store.get_auth_chain(state_ids)
 
-        state = yield self.store.get_events(list(context.prev_state_ids.values()))
+        state = yield self.store.get_events(list(prev_state_ids.values()))
 
         defer.returnValue({
             "state": list(state.values()),
@@ -1348,7 +1211,7 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        yield self.replication_layer.send_leave(
+        yield self.federation_client.send_leave(
             target_hosts,
             event
         )
@@ -1371,7 +1234,7 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
                                content={},):
-        origin, pdu = yield self.replication_layer.make_membership_event(
+        origin, pdu = yield self.federation_client.make_membership_event(
             target_hosts,
             room_id,
             user_id,
@@ -1416,7 +1279,7 @@ class FederationHandler(BaseHandler):
     @log_function
     def on_make_leave_request(self, room_id, user_id):
         """ We've received a /make_leave/ request, so we create a partial
-        join event for the room and return that. We do *not* persist or
+        leave event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
         builder = self.event_builder_factory.new({
@@ -1503,18 +1366,6 @@ class FederationHandler(BaseHandler):
                     del results[(event.type, event.state_key)]
 
             res = list(results.values())
-            for event in res:
-                # We sign these again because there was a bug where we
-                # incorrectly signed things the first time round
-                if self.is_mine_id(event.event_id):
-                    event.signatures.update(
-                        compute_event_signature(
-                            event,
-                            self.hs.hostname,
-                            self.hs.config.signing_key[0]
-                        )
-                    )
-
             defer.returnValue(res)
         else:
             defer.returnValue([])
@@ -1558,7 +1409,7 @@ class FederationHandler(BaseHandler):
             limit
         )
 
-        events = yield self._filter_events_for_server(origin, room_id, events)
+        events = yield filter_events_for_server(self.store, origin, events)
 
         defer.returnValue(events)
 
@@ -1586,18 +1437,6 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if self.is_mine_id(event.event_id):
-                # FIXME: This is a temporary work around where we occasionally
-                # return events slightly differently than when they were
-                # originally signed
-                event.signatures.update(
-                    compute_event_signature(
-                        event,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
-                    )
-                )
-
             in_room = yield self.auth.check_host_in_room(
                 event.room_id,
                 origin
@@ -1605,8 +1444,8 @@ class FederationHandler(BaseHandler):
             if not in_room:
                 raise AuthError(403, "Host not in room.")
 
-            events = yield self._filter_events_for_server(
-                origin, event.room_id, [event]
+            events = yield filter_events_for_server(
+                self.store, origin, [event],
             )
             event = events[0]
             defer.returnValue(event)
@@ -1681,7 +1520,7 @@ class FederationHandler(BaseHandler):
         yield self.store.persist_events(
             [
                 (ev_info["event"], context)
-                for ev_info, context in itertools.izip(event_infos, contexts)
+                for ev_info, context in zip(event_infos, contexts)
             ],
             backfilled=backfilled,
         )
@@ -1728,7 +1567,7 @@ class FederationHandler(BaseHandler):
                     missing_auth_events.add(e_id)
 
         for e_id in missing_auth_events:
-            m_ev = yield self.replication_layer.get_pdu(
+            m_ev = yield self.federation_client.get_pdu(
                 [origin],
                 e_id,
                 outlier=True,
@@ -1801,8 +1640,9 @@ class FederationHandler(BaseHandler):
         )
 
         if not auth_events:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -1862,15 +1702,6 @@ class FederationHandler(BaseHandler):
             local_auth_chain, remote_auth_chain
         )
 
-        for event in ret["auth_chain"]:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         logger.debug("on_query_auth returning: %s", ret)
 
         defer.returnValue(ret)
@@ -1896,8 +1727,8 @@ class FederationHandler(BaseHandler):
             min_depth=min_depth,
         )
 
-        missing_events = yield self._filter_events_for_server(
-            origin, room_id, missing_events,
+        missing_events = yield filter_events_for_server(
+            self.store, origin, missing_events,
         )
 
         defer.returnValue(missing_events)
@@ -1946,7 +1777,7 @@ class FederationHandler(BaseHandler):
             logger.info("Missing auth: %s", missing_auth)
             # If we don't have all the auth events, we need to get them.
             try:
-                remote_auth_chain = yield self.replication_layer.get_event_auth(
+                remote_auth_chain = yield self.federation_client.get_event_auth(
                     origin, event.room_id, event.event_id
                 )
 
@@ -2051,9 +1882,10 @@ class FederationHandler(BaseHandler):
                         break
 
             if do_resolution:
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
                 # 1. Get what we think is the auth chain.
                 auth_ids = yield self.auth.compute_auth_events(
-                    event, context.prev_state_ids
+                    event, prev_state_ids
                 )
                 local_auth_chain = yield self.store.get_auth_chain(
                     auth_ids, include_given=True
@@ -2061,7 +1893,7 @@ class FederationHandler(BaseHandler):
 
                 try:
                     # 2. Get remote difference.
-                    result = yield self.replication_layer.query_auth(
+                    result = yield self.federation_client.query_auth(
                         origin,
                         event.room_id,
                         event.event_id,
@@ -2143,21 +1975,34 @@ class FederationHandler(BaseHandler):
             k: a.event_id for k, a in iteritems(auth_events)
             if k != event_key
         }
-        context.current_state_ids = dict(context.current_state_ids)
-        context.current_state_ids.update(state_updates)
-        if context.delta_ids is not None:
-            context.delta_ids = dict(context.delta_ids)
-            context.delta_ids.update(state_updates)
-        context.prev_state_ids = dict(context.prev_state_ids)
-        context.prev_state_ids.update({
+        current_state_ids = yield context.get_current_state_ids(self.store)
+        current_state_ids = dict(current_state_ids)
+
+        current_state_ids.update(state_updates)
+
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_state_ids = dict(prev_state_ids)
+
+        prev_state_ids.update({
             k: a.event_id for k, a in iteritems(auth_events)
         })
-        context.state_group = yield self.store.store_state_group(
+
+        # create a new state group as a delta from the existing one.
+        prev_group = context.state_group
+        state_group = yield self.store.store_state_group(
             event.event_id,
             event.room_id,
-            prev_group=context.prev_group,
-            delta_ids=context.delta_ids,
-            current_state_ids=context.current_state_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
+            current_state_ids=current_state_ids,
+        )
+
+        yield context.update_state(
+            state_group=state_group,
+            current_state_ids=current_state_ids,
+            prev_state_ids=prev_state_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
         )
 
     @defer.inlineCallbacks
@@ -2347,7 +2192,7 @@ class FederationHandler(BaseHandler):
             yield member_handler.send_membership_event(None, event, context)
         else:
             destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
-            yield self.replication_layer.forward_third_party_invite(
+            yield self.federation_client.forward_third_party_invite(
                 destinations,
                 room_id,
                 event_dict,
@@ -2397,7 +2242,8 @@ class FederationHandler(BaseHandler):
             event.content["third_party_invite"]["signed"]["token"]
         )
         original_invite = None
-        original_invite_id = context.prev_state_ids.get(key)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        original_invite_id = prev_state_ids.get(key)
         if original_invite_id:
             original_invite = yield self.store.get_event(
                 original_invite_id, allow_none=True
@@ -2439,7 +2285,8 @@ class FederationHandler(BaseHandler):
         signed = event.content["third_party_invite"]["signed"]
         token = signed["token"]
 
-        invite_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        invite_event_id = prev_state_ids.get(
             (EventTypes.ThirdPartyInvite, token,)
         )
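
The federation.py hunks above share one refactoring: callers stop reading context.prev_state_ids / context.current_state_ids as plain attributes and instead yield on context.get_prev_state_ids(self.store) / context.get_current_state_ids(self.store), and _update_context_for_auth_events goes through context.update_state() rather than mutating the context in place. A minimal illustrative sketch of that lazy-loading shape (a toy class and a made-up store method, not the real synapse.events.snapshot.EventContext):

    from twisted.internet import defer


    class LazyContext(object):
        """Toy stand-in for an event context whose state map is fetched on demand."""

        def __init__(self, state_group):
            self.state_group = state_group
            self._prev_state_ids = None  # (event_type, state_key) -> event_id

        @defer.inlineCallbacks
        def get_prev_state_ids(self, store):
            # only hit the store the first time; cache the result afterwards
            if self._prev_state_ids is None:
                # fetch_state_ids_for_group is a hypothetical method, for illustration only
                self._prev_state_ids = yield store.fetch_state_ids_for_group(
                    self.state_group
                )
            defer.returnValue(self._prev_state_ids)


    class InMemoryStore(object):
        """Trivial fake store so the sketch can run on its own."""

        def __init__(self, groups):
            self._groups = groups

        def fetch_state_ids_for_group(self, state_group):
            # an already-fired Deferred, mimicking an async DB call
            return defer.succeed(dict(self._groups[state_group]))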
 
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fb11716eb8..40e7580a61 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
             try:
                 if event.membership == Membership.JOIN:
                     room_end_token = now_token.room_key
-                    deferred_room_state = self.state_handler.get_current_state(
-                        event.room_id
+                    deferred_room_state = run_in_background(
+                        self.state_handler.get_current_state,
+                        event.room_id,
                     )
                 elif event.membership == Membership.LEAVE:
                     room_end_token = "s%d" % (event.stream_ordering,)
-                    deferred_room_state = self.store.get_state_for_events(
-                        [event.event_id], None
+                    deferred_room_state = run_in_background(
+                        self.store.get_state_for_events,
+                        [event.event_id], None,
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -387,19 +389,21 @@ class InitialSyncHandler(BaseHandler):
                 receipts = []
             defer.returnValue(receipts)
 
-        presence, receipts, (messages, token) = yield defer.gatherResults(
-            [
-                run_in_background(get_presence),
-                run_in_background(get_receipts),
-                run_in_background(
-                    self.store.get_recent_events_for_room,
-                    room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
-                )
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        presence, receipts, (messages, token) = yield make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(get_presence),
+                    run_in_background(get_receipts),
+                    run_in_background(
+                        self.store.get_recent_events_for_room,
+                        room_id,
+                        limit=limit,
+                        end_token=now_token.room_key,
+                    )
+                ],
+                consumeErrors=True,
+            ).addErrback(unwrapFirstError),
+        )
 
         messages = yield filter_events_for_client(
             self.store, user_id, messages, is_peeking=is_peeking,
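
The change above keeps the three lookups (presence, receipts, recent events) running concurrently via run_in_background(), but now wraps the surrounding defer.gatherResults() in make_deferred_yieldable(), which is Synapse's idiom for waiting on work started in other log contexts without losing the request's own context. The concurrency part in isolation, as a plain-Twisted sketch without the logcontext helpers:

    from twisted.internet import defer, task


    @defer.inlineCallbacks
    def fetch_room_snapshot(reactor):
        """Run three slow lookups at the same time and wait for all of them."""

        def slow(value, delay):
            # stand-in for a store/handler call that returns a Deferred
            return task.deferLater(reactor, delay, lambda: value)

        presence, receipts, (messages, token) = yield defer.gatherResults(
            [
                slow([], 0.01),                       # get_presence()
                slow([], 0.02),                       # get_receipts()
                slow((["event"], "s1_token"), 0.03),  # get_recent_events_for_room()
            ],
            consumeErrors=True,
        )
        defer.returnValue((presence, receipts, messages, token))
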
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a39b852ceb..39d7724778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
 from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
 
 from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
@@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import send_event_to_master
-from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Limiter, ReadWriteLock
+from synapse.types import RoomAlias, UserID
+from synapse.util.async import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.stringutils import random_string
-from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
 
-class PurgeStatus(object):
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-
-    Attributes:
-        status (int): Tracks whether this request has completed. One of
-            STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+    """Contains some read only APIs to get state about a room
     """
 
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    def __init__(self):
-        self.status = PurgeStatus.STATUS_ACTIVE
-
-    def asdict(self):
-        return {
-            "status": PurgeStatus.STATUS_TEXT[self.status]
-        }
-
-
-class MessageHandler(BaseHandler):
-
     def __init__(self, hs):
-        super(MessageHandler, self).__init__(hs)
-        self.hs = hs
-        self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
         self.clock = hs.get_clock()
-
-        self.pagination_lock = ReadWriteLock()
-        self._purges_in_progress_by_room = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id = {}
-
-    def start_purge_history(self, room_id, token,
-                            delete_local_events=False):
-        """Start off a history purge on a room.
-
-        Args:
-            room_id (str): The room to purge from
-
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            str: unique ID for this purge transaction.
-        """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400,
-                "History purge already in progress for %s" % (room_id, ),
-            )
-
-        purge_id = random_string(16)
-
-        # we log the purge_id here so that it can be tied back to the
-        # request id in the log lines.
-        logger.info("[purge] starting purge_id %s", purge_id)
-
-        self._purges_by_id[purge_id] = PurgeStatus()
-        run_in_background(
-            self._purge_history,
-            purge_id, room_id, token, delete_local_events,
-        )
-        return purge_id
-
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token,
-                       delete_local_events):
-        """Carry out a history purge on a room.
-
-        Args:
-            purge_id (str): The id for this purge
-            room_id (str): The room to purge from
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            Deferred
-        """
-        self._purges_in_progress_by_room.add(room_id)
-        try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.store.purge_history(
-                    room_id, token, delete_local_events,
-                )
-            logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
-        except Exception:
-            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-            # remove the purge from the list 24 hours after it completes
-            def clear_purge():
-                del self._purges_by_id[purge_id]
-            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
-
-    def get_purge_status(self, purge_id):
-        """Get the current status of an active purge
-
-        Args:
-            purge_id (str): purge_id returned by start_purge_history
-
-        Returns:
-            PurgeStatus|None
-        """
-        return self._purges_by_id.get(purge_id)
-
-    @defer.inlineCallbacks
-    def get_messages(self, requester, room_id=None, pagin_config=None,
-                     as_client_event=True, event_filter=None):
-        """Get messages in a room.
-
-        Args:
-            requester (Requester): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-                config rules to apply, if any.
-            as_client_event (bool): True to get events in client-server format.
-            event_filter (Filter): Filter to apply to results or None
-        Returns:
-            dict: Pagination API results
-        """
-        user_id = requester.user.to_string()
-
-        if pagin_config.from_token:
-            room_token = pagin_config.from_token.room_key
-        else:
-            pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token_for_room(
-                    room_id=room_id
-                )
-            )
-            room_token = pagin_config.from_token.room_key
-
-        room_token = RoomStreamToken.parse(room_token)
-
-        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
-            "room_key", str(room_token)
-        )
-
-        source_config = pagin_config.get_source_config("room")
-
-        with (yield self.pagination_lock.read(room_id)):
-            membership, member_event_id = yield self._check_in_room_or_world_readable(
-                room_id, user_id
-            )
-
-            if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
-
-                if membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-                    leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
-                    )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
-
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
-
-            events, next_key = yield self.store.paginate_room_events(
-                room_id=room_id,
-                from_key=source_config.from_key,
-                to_key=source_config.to_key,
-                direction=source_config.direction,
-                limit=source_config.limit,
-                event_filter=event_filter,
-            )
-
-            next_token = pagin_config.from_token.copy_and_replace(
-                "room_key", next_key
-            )
-
-        if not events:
-            defer.returnValue({
-                "chunk": [],
-                "start": pagin_config.from_token.to_string(),
-                "end": next_token.to_string(),
-            })
-
-        if event_filter:
-            events = event_filter.filter(events)
-
-        events = yield filter_events_for_client(
-            self.store,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
-
-        time_now = self.clock.time_msec()
-
-        chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
-            "start": pagin_config.from_token.to_string(),
-            "end": next_token.to_string(),
-        }
-
-        defer.returnValue(chunk)
+        self.state = hs.get_state_handler()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
@@ -286,12 +64,12 @@ class MessageHandler(BaseHandler):
         Raises:
             SynapseError if something went wrong.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )
 
         if membership == Membership.JOIN:
-            data = yield self.state_handler.get_current_state(
+            data = yield self.state.get_current_state(
                 room_id, event_type, state_key
             )
         elif membership == Membership.LEAVE:
@@ -304,31 +82,6 @@ class MessageHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def _check_in_room_or_world_readable(self, room_id, user_id):
-        try:
-            # check_user_was_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            defer.returnValue((member_event.membership, member_event.event_id))
-            return
-        except AuthError:
-            visibility = yield self.state_handler.get_current_state(
-                room_id, EventTypes.RoomHistoryVisibility, ""
-            )
-            if (
-                visibility and
-                visibility.content["history_visibility"] == "world_readable"
-            ):
-                defer.returnValue((Membership.JOIN, None))
-                return
-            raise AuthError(
-                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
-            )
-
-    @defer.inlineCallbacks
     def get_state_events(self, user_id, room_id, is_guest=False):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
@@ -340,12 +93,12 @@ class MessageHandler(BaseHandler):
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )
 
         if membership == Membership.JOIN:
-            room_state = yield self.state_handler.get_current_state(room_id)
+            room_state = yield self.state.get_current_state(room_id)
         elif membership == Membership.LEAVE:
             room_state = yield self.store.get_state_for_events(
                 [membership_event_id], None
@@ -373,7 +126,7 @@ class MessageHandler(BaseHandler):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self._check_in_room_or_world_readable(
+            membership, _ = yield self.auth.check_in_room_or_world_readable(
                 room_id, user_id
             )
             if membership != Membership.JOIN:
@@ -427,7 +180,7 @@ class EventCreationHandler(object):
 
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5)
+        self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
 
         self.action_generator = hs.get_action_generator()
 
@@ -630,7 +383,8 @@ class EventCreationHandler(object):
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_event_id = prev_state_ids.get((event.type, event.state_key))
         prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
             return
@@ -752,8 +506,8 @@ class EventCreationHandler(object):
         event = builder.build()
 
         logger.debug(
-            "Created event %s with state: %s",
-            event.event_id, context.prev_state_ids,
+            "Created event %s",
+            event.event_id,
         )
 
         defer.returnValue(
@@ -806,8 +560,9 @@ class EventCreationHandler(object):
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
                 yield send_event_to_master(
-                    self.hs.get_clock(),
-                    self.http_client,
+                    clock=self.hs.get_clock(),
+                    store=self.store,
+                    client=self.http_client,
                     host=self.config.worker_replication_host,
                     port=self.config.worker_replication_http_port,
                     requester=requester,
@@ -884,9 +639,11 @@ class EventCreationHandler(object):
                         e.sender == event.sender
                     )
 
+                current_state_ids = yield context.get_current_state_ids(self.store)
+
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in iteritems(context.current_state_ids)
+                    for k, e_id in iteritems(current_state_ids)
                     if k[0] in self.hs.config.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -922,8 +679,9 @@ class EventCreationHandler(object):
                     )
 
         if event.type == EventTypes.Redaction:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -943,11 +701,13 @@ class EventCreationHandler(object):
                         "You don't have permission to redact events"
                     )
 
-        if event.type == EventTypes.Create and context.prev_state_ids:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
+        if event.type == EventTypes.Create:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            if prev_state_ids:
+                raise AuthError(
+                    403,
+                    "Changing the room create event is forbidden",
+                )
 
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
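
One hunk above replaces the old Limiter with Linearizer(max_count=5, name="room_event_creation_limit"), so at most five event-creation attempts per room run concurrently and later callers queue behind them. A toy per-key limiter built on Twisted's DeferredSemaphore shows the idea; this is not the real synapse.util.async.Linearizer, which also deals with log contexts:

    from twisted.internet import defer


    class KeyedLimiter(object):
        """Allow at most `max_count` concurrent holders per key (toy version)."""

        def __init__(self, max_count):
            self._max_count = max_count
            self._semaphores = {}  # key -> DeferredSemaphore

        def _sem(self, key):
            return self._semaphores.setdefault(
                key, defer.DeferredSemaphore(self._max_count)
            )

        @defer.inlineCallbacks
        def run(self, key, func):
            # block until one of the `max_count` slots for this key is free
            yield self._sem(key).acquire()
            try:
                result = yield defer.maybeDeferred(func)
            finally:
                self._sem(key).release()
            defer.returnValue(result)
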
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
new file mode 100644
index 0000000000..b2849783ed
--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,265 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
+class PaginationHandler(object):
+    """Handles pagination and purge history requests.
+
+    These are in the same handler because we need to block clients from
+    paginating during a purge.
+    """
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+        self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
+
+    def start_purge_history(self, room_id, token,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
+            )
+
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, token, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, token,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, token, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
+    @defer.inlineCallbacks
+    def get_messages(self, requester, room_id=None, pagin_config=None,
+                     as_client_event=True, event_filter=None):
+        """Get messages in a room.
+
+        Args:
+            requester (Requester): The user requesting messages.
+            room_id (str): The room they want messages from.
+            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+                config rules to apply, if any.
+            as_client_event (bool): True to get events in client-server format.
+            event_filter (Filter): Filter to apply to results or None
+        Returns:
+            dict: Pagination API results
+        """
+        user_id = requester.user.to_string()
+
+        if pagin_config.from_token:
+            room_token = pagin_config.from_token.room_key
+        else:
+            pagin_config.from_token = (
+                yield self.hs.get_event_sources().get_current_token_for_room(
+                    room_id=room_id
+                )
+            )
+            room_token = pagin_config.from_token.room_key
+
+        room_token = RoomStreamToken.parse(room_token)
+
+        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+            "room_key", str(room_token)
+        )
+
+        source_config = pagin_config.get_source_config("room")
+
+        with (yield self.pagination_lock.read(room_id)):
+            membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+                room_id, user_id
+            )
+
+            if source_config.direction == 'b':
+                # if we're going backwards, we might need to backfill. This
+                # requires that we have a topo token.
+                if room_token.topological:
+                    max_topo = room_token.topological
+                else:
+                    max_topo = yield self.store.get_max_topological_token(
+                        room_id, room_token.stream
+                    )
+
+                if membership == Membership.LEAVE:
+                    # If they have left the room then clamp the token to be before
+                    # they left the room, to save the effort of loading from the
+                    # database.
+                    leave_token = yield self.store.get_topological_token_for_event(
+                        member_event_id
+                    )
+                    leave_token = RoomStreamToken.parse(leave_token)
+                    if leave_token.topological < max_topo:
+                        source_config.from_key = str(leave_token)
+
+                yield self.hs.get_handlers().federation_handler.maybe_backfill(
+                    room_id, max_topo
+                )
+
+            events, next_key = yield self.store.paginate_room_events(
+                room_id=room_id,
+                from_key=source_config.from_key,
+                to_key=source_config.to_key,
+                direction=source_config.direction,
+                limit=source_config.limit,
+                event_filter=event_filter,
+            )
+
+            next_token = pagin_config.from_token.copy_and_replace(
+                "room_key", next_key
+            )
+
+        if not events:
+            defer.returnValue({
+                "chunk": [],
+                "start": pagin_config.from_token.to_string(),
+                "end": next_token.to_string(),
+            })
+
+        if event_filter:
+            events = event_filter.filter(events)
+
+        events = yield filter_events_for_client(
+            self.store,
+            user_id,
+            events,
+            is_peeking=(member_event_id is None),
+        )
+
+        time_now = self.clock.time_msec()
+
+        chunk = {
+            "chunk": [
+                serialize_event(e, time_now, as_client_event)
+                for e in events
+            ],
+            "start": pagin_config.from_token.to_string(),
+            "end": next_token.to_string(),
+        }
+
+        defer.returnValue(chunk)
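
The new PaginationHandler serialises purges against pagination with a per-room ReadWriteLock: get_messages takes the read lock while _purge_history takes the write lock, so clients can paginate freely except while that room's history is being deleted. A rough sketch of driving the API defined above, calling start_purge_history() and then polling get_purge_status(); the one-second polling loop is an assumption, the real admin API leaves polling to the caller:

    from twisted.internet import defer, task

    from synapse.handlers.pagination import PurgeStatus


    @defer.inlineCallbacks
    def purge_and_wait(reactor, pagination_handler, room_id, topological_token):
        """Illustrative only: kick off a purge and wait for it to finish."""
        purge_id = pagination_handler.start_purge_history(
            room_id, topological_token, delete_local_events=False,
        )

        while True:
            status = pagination_handler.get_purge_status(purge_id)
            if status is None or status.status != PurgeStatus.STATUS_ACTIVE:
                break
            # poll once a second until the purge completes or fails
            yield task.deferLater(reactor, 1.0, lambda: None)

        defer.returnValue(status)
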
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 859f6d2b2e..cb5c6d587e 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -18,6 +18,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import AuthError, CodeMessageException, SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, get_domain_from_id
 
 from ._base import BaseHandler
@@ -41,7 +42,7 @@ class ProfileHandler(BaseHandler):
 
         if hs.config.worker_app is None:
             self.clock.looping_call(
-                self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+                self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
             )
 
     @defer.inlineCallbacks
@@ -254,6 +255,12 @@ class ProfileHandler(BaseHandler):
                     room_id, str(e.message)
                 )
 
+    def _start_update_remote_profile_cache(self):
+        return run_as_background_process(
+            "Update remote profile", self._update_remote_profile_cache,
+        )
+
+    @defer.inlineCallbacks
     def _update_remote_profile_cache(self):
         """Called periodically to check profiles of remote users we haven't
         checked in a while.
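
The looping call in profile.py now fires a thin _start_update_remote_profile_cache() shim which hands the real coroutine to run_as_background_process(), so the periodic work runs under its own background-process metrics and log context rather than whatever context the reactor invokes it in. The same shim shape fits any looping_call target; a hedged sketch with made-up class, method, and process names:

    from twisted.internet import defer

    from synapse.metrics.background_process_metrics import run_as_background_process


    class StaleEntryPruner(object):
        """Made-up periodic job showing the looping_call + background-process shim."""

        def __init__(self, hs, interval_ms=60 * 1000):
            self.clock = hs.get_clock()
            self.store = hs.get_datastore()
            # the looping call fires the shim, never the inlineCallbacks method itself
            self.clock.looping_call(self._start_prune, interval_ms)

        def _start_prune(self):
            return run_as_background_process("prune_stale_entries", self._prune)

        @defer.inlineCallbacks
        def _prune(self):
            # placeholder body; real work would yield on store calls here
            yield defer.succeed(None)
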
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f67512078b..7b7804d9b2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 """Contains functions for performing events on rooms."""
+import itertools
 import logging
 import math
 import string
@@ -24,7 +25,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
-from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID
+from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
 from synapse.visibility import filter_events_for_client
 
@@ -395,9 +396,13 @@ class RoomCreationHandler(BaseHandler):
             )
 
 
-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
     @defer.inlineCallbacks
-    def get_event_context(self, user, room_id, event_id, limit):
+    def get_event_context(self, user, room_id, event_id, limit, event_filter):
         """Retrieves events, pagination tokens and state around a given event
         in a room.
 
@@ -407,6 +412,8 @@ class RoomContextHandler(BaseHandler):
             event_id (str)
             limit (int): The maximum number of events to return in total
                 (excluding state).
+            event_filter (Filter|None): the filter to apply to the events returned
+                (excluding the target event_id)
 
         Returns:
             dict, or None if the event isn't found
@@ -414,8 +421,6 @@ class RoomContextHandler(BaseHandler):
         before_limit = math.floor(limit / 2.)
         after_limit = limit - before_limit
 
-        now_token = yield self.hs.get_event_sources().get_current_token()
-
         users = yield self.store.get_users_in_room(room_id)
         is_peeking = user.to_string() not in users
 
@@ -441,7 +446,7 @@ class RoomContextHandler(BaseHandler):
             )
 
         results = yield self.store.get_events_around(
-            room_id, event_id, before_limit, after_limit
+            room_id, event_id, before_limit, after_limit, event_filter
         )
 
         results["events_before"] = yield filter_evts(results["events_before"])
@@ -453,16 +458,35 @@ class RoomContextHandler(BaseHandler):
         else:
             last_event_id = event_id
 
+        types = None
+        filtered_types = None
+        if event_filter and event_filter.lazy_load_members():
+            members = set(ev.sender for ev in itertools.chain(
+                results["events_before"],
+                (results["event"],),
+                results["events_after"],
+            ))
+            filtered_types = [EventTypes.Member]
+            types = [(EventTypes.Member, member) for member in members]
+
+        # XXX: why do we return the state as of the last event rather than the
+        # first? Shouldn't we be consistent with /sync?
+        # https://github.com/matrix-org/matrix-doc/issues/687
+
         state = yield self.store.get_state_for_events(
-            [last_event_id], None
+            [last_event_id], types, filtered_types=filtered_types,
         )
         results["state"] = list(state[last_event_id].values())
 
-        results["start"] = now_token.copy_and_replace(
+        # We use a dummy token here as we only care about the room portion of
+        # the token, which we replace.
+        token = StreamToken.START
+
+        results["start"] = token.copy_and_replace(
             "room_key", results["start"]
         ).to_string()
 
-        results["end"] = now_token.copy_and_replace(
+        results["end"] = token.copy_and_replace(
             "room_key", results["end"]
         ).to_string()
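
With lazy_load_members set on the /context filter, the handler above only fetches m.room.member state for users who actually sent one of the returned events, while filtered_types=[EventTypes.Member] leaves all non-member state untouched. The selection step on its own, runnable with dummy events:

    import itertools
    from collections import namedtuple

    # minimal stand-in for an event: only `sender` matters here
    Event = namedtuple("Event", ["sender"])

    MEMBER = "m.room.member"


    def member_state_filter(events_before, event, events_after):
        """Build (types, filtered_types) for a lazy-loaded /context request."""
        members = set(
            ev.sender
            for ev in itertools.chain(events_before, (event,), events_after)
        )
        # only filter membership events; every other state type passes through
        return [(MEMBER, m) for m in members], [MEMBER]


    types, filtered_types = member_state_filter(
        [Event("@alice:example.org")],
        Event("@bob:example.org"),
        [],
    )
    # types now names exactly @alice's and @bob's membership events (order unspecified)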
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 00f2e279bc..0d4a3f4677 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -201,7 +201,9 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, target.to_string()),
             None
         )
@@ -496,9 +498,10 @@ class RoomMemberHandler(object):
         if prev_event is not None:
             return
 
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
         if event.membership == Membership.JOIN:
             if requester.is_guest:
-                guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+                guest_can_join = yield self._can_guest_join(prev_state_ids)
                 if not guest_can_join:
                     # This should be an auth check, but guests are a local concept,
                     # so don't really fit into the general auth process.
@@ -517,7 +520,7 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, event.state_key),
             None
         )
@@ -705,6 +708,10 @@ class RoomMemberHandler(object):
             inviter_display_name = member_event.content.get("displayname", "")
             inviter_avatar_url = member_event.content.get("avatar_url", "")
 
+        # if user has no display name, default to their MXID
+        if not inviter_display_name:
+            inviter_display_name = user.to_string()
+
         canonical_room_alias = ""
         canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
         if canonical_alias_event:
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 69ae9731d5..c464adbd0b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -287,7 +287,7 @@ class SearchHandler(BaseHandler):
             contexts = {}
             for event in allowed_events:
                 res = yield self.store.get_events_around(
-                    event.room_id, event.event_id, before_limit, after_limit
+                    event.room_id, event.event_id, before_limit, after_limit,
                 )
 
                 logger.info(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c24e35362a..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.types import RoomStreamToken
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
@@ -32,6 +35,14 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
 
 SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
@@ -181,6 +192,12 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
+        # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+        self.lazy_loaded_members_cache = ExpiringCache(
+            "lazy_loaded_members_cache", self.clock,
+            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+        )
+
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
@@ -416,29 +433,44 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event):
+    def get_state_after_event(self, event, types=None, filtered_types=None):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events of the given type are returned.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+        state_ids = yield self.store.get_state_ids_for_event(
+            event.event_id, types, filtered_types=filtered_types,
+        )
         if event.is_state():
             state_ids = state_ids.copy()
             state_ids[(event.type, event.state_key)] = event.event_id
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position):
+    def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events of the given type are returned.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
@@ -453,7 +485,9 @@ class SyncHandler(object):
 
         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(last_event)
+            state = yield self.get_state_after_event(
+                last_event, types, filtered_types=filtered_types,
+            )
 
         else:
             # no events in this room - so presumably no state
@@ -485,59 +519,129 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
+
+            types = None
+            filtered_types = None
+
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+            include_redundant_members = (
+                sync_config.filter_collection.include_redundant_members()
+            )
+
+            if lazy_load_members:
+                # We only request state for the members needed to display the
+                # timeline:
+
+                types = [
+                    (EventTypes.Member, state_key)
+                    for state_key in set(
+                        event.sender  # FIXME: we also care about invite targets etc.
+                        for event in batch.events
+                    )
+                ]
+
+                # only apply the filtering to room members
+                filtered_types = [EventTypes.Member]
+
+            timeline_state = {
+                (event.type, event.state_key): event.event_id
+                for event in batch.events if event.is_state()
+            }
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id
+                        batch.events[-1].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id
+                        batch.events[0].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
+
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token
+                        room_id, stream_position=now_token, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = current_state_ids
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
                     previous={},
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token
+                    room_id, stream_position=since_token, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id
+                    batch.events[-1].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id
+                    batch.events[0].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
+                if lazy_load_members:
+                    if types:
+                        state_ids = yield self.store.get_state_ids_for_event(
+                            batch.events[0].event_id, types=types,
+                            filtered_types=filtered_types,
+                        )
+
+            if lazy_load_members and not include_redundant_members:
+                cache_key = (sync_config.user.to_string(), sync_config.device_id)
+                cache = self.lazy_loaded_members_cache.get(cache_key)
+                if cache is None:
+                    logger.debug("creating LruCache for %r", cache_key)
+                    cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+                    self.lazy_loaded_members_cache[cache_key] = cache
+                else:
+                    logger.debug("found LruCache for %r", cache_key)
+
+                # if it's a new sync sequence, then assume the client has had
+                # amnesia and doesn't want any recent lazy-loaded members
+                # de-duplicated.
+                if since_token is None:
+                    logger.debug("clearing LruCache for %r", cache_key)
+                    cache.clear()
+                else:
+                    # only send members which aren't in our LruCache (either
+                    # because they're new to this client or have been pushed out
+                    # of the cache)
+                    logger.debug("filtering state from %r...", state_ids)
+                    state_ids = {
+                        t: event_id
+                        for t, event_id in state_ids.iteritems()
+                        if cache.get(t[1]) != event_id
+                    }
+                    logger.debug("...to %r", state_ids)
+
+                # add any member IDs we are about to send into our LruCache
+                for t, event_id in itertools.chain(
+                    state_ids.items(),
+                    timeline_state.items(),
+                ):
+                    if t[0] == EventTypes.Member:
+                        cache.set(t[1], event_id)
 
         state = {}
         if state_ids:
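As an aside for reviewers: the de-duplication above keys a small per-(user, device) LruCache on the member's state_key and remembers the event_id last sent for it. A minimal sketch of that filtering step (not part of the patch; user IDs and event IDs are made up):

    from synapse.util.caches.lrucache import LruCache

    # hypothetical per-(user, device) cache of membership events already sent
    cache = LruCache(100)
    cache.set("@alice:example.org", "$m1")

    state_ids = {
        ("m.room.member", "@alice:example.org"): "$m1",
        ("m.room.member", "@bob:example.org"): "$m2",
    }

    # drop members whose current membership event the client has already seen
    state_ids = {
        t: event_id for t, event_id in state_ids.items()
        if cache.get(t[1]) != event_id
    }
    # only @bob:example.org's membership ("$m2") is left to send

If @alice's membership later changes to a new event_id, the cache comparison fails and the updated member event is sent again.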
@@ -1448,7 +1552,9 @@ def _action_has_highlight(actions):
     return False
 
 
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+    timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
     """Works out what state to include in a sync response.
 
     Args:
@@ -1457,6 +1563,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         previous (dict): state at the end of the previous sync (or empty dict
             if this is an initial sync)
         current (dict): state at the end of the timeline
+        lazy_load_members (bool): whether to return members from timeline_start
+            or not.  Assumes that timeline_start has already been filtered to
+            include only the members the client needs to know about.
 
     Returns:
         dict
@@ -1472,9 +1581,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
     }
 
     c_ids = set(e for e in current.values())
-    tc_ids = set(e for e in timeline_contains.values())
-    p_ids = set(e for e in previous.values())
     ts_ids = set(e for e in timeline_start.values())
+    p_ids = set(e for e in previous.values())
+    tc_ids = set(e for e in timeline_contains.values())
+
+    # If we are lazy-loading room members, we explicitly add the membership events
+    # for the senders in the timeline into the state block returned by /sync,
+    # as we may not have sent them to the client before.  We find these membership
+    # events in timeline_start, which has already been filtered down to just the
+    # membership events for the senders in the timeline.
+    # In practice, we do this by removing them from the p_ids set, which is the
+    # set of relevant state we know we have already sent to the client.
+    # see https://github.com/matrix-org/synapse/pull/2970
+    #            /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+    if lazy_load_members:
+        p_ids.difference_update(
+            e for t, e in timeline_start.iteritems()
+            if t[0] == EventTypes.Member
+        )
 
     state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
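To illustrate the effect of the lazy_load_members branch above, here is a minimal, self-contained sketch (not part of the patch; event IDs are made up) of the set arithmetic _calculate_state now performs:

    # (event type, state key) -> event_id
    timeline_contains = {}  # no state events in the timeline itself
    timeline_start = {
        ("m.room.member", "@alice:example.org"): "$m1",
        ("m.room.name", ""): "$n1",
    }
    previous = dict(timeline_start)  # everything was already sent last sync
    current = dict(timeline_start)

    c_ids = set(current.values())
    ts_ids = set(timeline_start.values())
    p_ids = set(previous.values())
    tc_ids = set(timeline_contains.values())

    # with lazy_load_members, membership events in timeline_start are removed
    # from p_ids, so they are re-sent even though the client saw them before
    p_ids.difference_update(
        e for t, e in timeline_start.items() if t[0] == "m.room.member"
    )

    state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
    # -> {"$m1"}: the member event is included again; "$n1" is not re-sent

The per-device cache described in the earlier hunk then strips out members the client has genuinely already received, unless include_redundant_members is set.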
 
diff --git a/synapse/http/client.py b/synapse/http/client.py
index d6a0d75b2b..25b6307884 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -26,9 +26,11 @@ from OpenSSL.SSL import VERIFY_NONE
 from twisted.internet import defer, protocol, reactor, ssl, task
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent
-from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
 from twisted.web.client import (
+    Agent,
+    BrowserLikeRedirectAgent,
+    ContentDecoderAgent,
+    FileBodyProducer as TwistedFileBodyProducer,
     GzipDecoder,
     HTTPConnectionPool,
     PartialDownloadError,
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index f24b4b949c..588e280571 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -38,7 +38,8 @@ outgoing_responses_counter = Counter(
 )
 
 response_timer = Histogram(
-    "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
+    "synapse_http_server_response_time_seconds", "sec",
+    ["method", "servlet", "tag", "code"],
 )
 
 response_ru_utime = Counter(
@@ -171,11 +172,13 @@ class RequestMetrics(object):
                 )
                 return
 
-        outgoing_responses_counter.labels(request.method, str(request.code)).inc()
+        response_code = str(request.code)
+
+        outgoing_responses_counter.labels(request.method, response_code).inc()
 
         response_count.labels(request.method, self.name, tag).inc()
 
-        response_timer.labels(request.method, self.name, tag).observe(
+        response_timer.labels(request.method, self.name, tag, response_code).observe(
             time_sec - self.start
         )
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 41dd974cea..5fd30a4c2c 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -42,9 +42,10 @@ class SynapseRequest(Request):
     which is handling the request, and returns a context manager.
 
     """
-    def __init__(self, site, *args, **kw):
-        Request.__init__(self, *args, **kw)
+    def __init__(self, site, channel, *args, **kw):
+        Request.__init__(self, channel, *args, **kw)
         self.site = site
+        self._channel = channel
         self.authenticated_entity = None
         self.start_time = 0
 
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
new file mode 100644
index 0000000000..ce678d5f75
--- /dev/null
+++ b/synapse/metrics/background_process_metrics.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+
+from twisted.internet import defer
+
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+
+_background_process_start_count = Counter(
+    "synapse_background_process_start_count",
+    "Number of background processes started",
+    ["name"],
+)
+
+# we set registry=None in all of these to stop them getting registered with
+# the default registry. Instead we collect them all via the _Collector below,
+# which ensures that we can update them before they are collected.
+#
+_background_process_ru_utime = Counter(
+    "synapse_background_process_ru_utime_seconds",
+    "User CPU time used by background processes, in seconds",
+    ["name"],
+    registry=None,
+)
+
+_background_process_ru_stime = Counter(
+    "synapse_background_process_ru_stime_seconds",
+    "System CPU time used by background processes, in seconds",
+    ["name"],
+    registry=None,
+)
+
+_background_process_db_txn_count = Counter(
+    "synapse_background_process_db_txn_count",
+    "Number of database transactions done by background processes",
+    ["name"],
+    registry=None,
+)
+
+_background_process_db_txn_duration = Counter(
+    "synapse_background_process_db_txn_duration_seconds",
+    ("Seconds spent by background processes waiting for database "
+     "transactions, excluding scheduling time"),
+    ["name"],
+    registry=None,
+)
+
+_background_process_db_sched_duration = Counter(
+    "synapse_background_process_db_sched_duration_seconds",
+    "Seconds spent by background processes waiting for database connections",
+    ["name"],
+    registry=None,
+)
+
+# map from description to a counter, so that we can name our logcontexts
+# incrementally. (It actually duplicates _background_process_start_count, but
+# it's much simpler to do so than to try to combine them.)
+_background_process_counts = dict()  # type: dict[str, int]
+
+# map from description to the currently running background processes.
+#
+# it's kept as a dict of sets rather than a big set so that we can keep track
+# of process descriptions that no longer have any active processes.
+_background_processes = dict()  # type: dict[str, set[_BackgroundProcess]]
+
+
+class _Collector(object):
+    """A custom metrics collector for the background process metrics.
+
+    Ensures that all of the metrics are up-to-date with any in-flight processes
+    before they are returned.
+    """
+    def collect(self):
+        background_process_in_flight_count = GaugeMetricFamily(
+            "synapse_background_process_in_flight_count",
+            "Number of background processes in flight",
+            labels=["name"],
+        )
+
+        for desc, processes in six.iteritems(_background_processes):
+            background_process_in_flight_count.add_metric(
+                (desc,), len(processes),
+            )
+            for process in processes:
+                process.update_metrics()
+
+        yield background_process_in_flight_count
+
+        # now we need to run collect() over each of the static Counters, and
+        # yield each metric they return.
+        for m in (
+                _background_process_ru_utime,
+                _background_process_ru_stime,
+                _background_process_db_txn_count,
+                _background_process_db_txn_duration,
+                _background_process_db_sched_duration,
+        ):
+            for r in m.collect():
+                yield r
+
+
+REGISTRY.register(_Collector())
+
+
+class _BackgroundProcess(object):
+    def __init__(self, desc, ctx):
+        self.desc = desc
+        self._context = ctx
+        self._reported_stats = None
+
+    def update_metrics(self):
+        """Updates the metrics with values from this process."""
+        new_stats = self._context.get_resource_usage()
+        if self._reported_stats is None:
+            diff = new_stats
+        else:
+            diff = new_stats - self._reported_stats
+        self._reported_stats = new_stats
+
+        _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
+        _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
+        _background_process_db_txn_count.labels(self.desc).inc(
+            diff.db_txn_count,
+        )
+        _background_process_db_txn_duration.labels(self.desc).inc(
+            diff.db_txn_duration_sec,
+        )
+        _background_process_db_sched_duration.labels(self.desc).inc(
+            diff.db_sched_duration_sec,
+        )
+
+
+def run_as_background_process(desc, func, *args, **kwargs):
+    """Run the given function in its own logcontext, with resource metrics
+
+    This should be used to wrap processes which are fired off to run in the
+    background, instead of being associated with a particular request.
+
+    It returns a Deferred which completes when the function completes, but it doesn't
+    follow the synapse logcontext rules, which makes it appropriate for passing to
+    clock.looping_call and friends (or for firing-and-forgetting in the middle of a
+    normal synapse inlineCallbacks function).
+
+    Args:
+        desc (str): a description for this background process type
+        func: a function, which may return a Deferred
+        args: positional args for func
+        kwargs: keyword args for func
+
+    Returns: Deferred which returns the result of func, but note that it does not
+        follow the synapse logcontext rules.
+    """
+    @defer.inlineCallbacks
+    def run():
+        count = _background_process_counts.get(desc, 0)
+        _background_process_counts[desc] = count + 1
+        _background_process_start_count.labels(desc).inc()
+
+        with LoggingContext(desc) as context:
+            context.request = "%s-%i" % (desc, count)
+            proc = _BackgroundProcess(desc, context)
+            _background_processes.setdefault(desc, set()).add(proc)
+            try:
+                yield func(*args, **kwargs)
+            finally:
+                proc.update_metrics()
+                _background_processes[desc].remove(proc)
+
+    with PreserveLoggingContext():
+        return run()
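For context, a minimal usage sketch of the new helper (the names here are hypothetical; the real call sites appear later in this diff in MediaRepository, PreviewUrlResource and ReplicationStreamer): wrap the periodic work in a small starter function and hand that to the clock, so the work runs in its own logcontext and is counted in the new metrics.

    from twisted.internet import defer

    from synapse.metrics.background_process_metrics import run_as_background_process

    @defer.inlineCallbacks
    def _prune_old_entries(store):
        # hypothetical periodic clean-up job
        yield store.cleanup_old_entries()

    def _start_prune_old_entries(store):
        # returns a Deferred which deliberately does not follow the logcontext
        # rules, which is what makes it suitable for clock.looping_call
        return run_as_background_process(
            "prune_old_entries", _prune_old_entries, store,
        )

A call site would then do something like self.clock.looping_call(self._start_prune_old_entries, 60 * 1000), mirroring the update_recently_accessed_media pattern further down in this diff.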
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 51cbd66f06..e650c3e494 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -274,7 +274,7 @@ class Notifier(object):
             logger.exception("Error notifying application services of event")
 
     def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
-        """ Used to inform listeners that something has happend event wise.
+        """ Used to inform listeners that something has happened event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index bb181d94ee..1d14d3639c 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object):
 
     @defer.inlineCallbacks
     def _get_power_levels_and_sender_level(self, event, context):
-        pl_event_id = context.prev_state_ids.get(POWER_KEY)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        pl_event_id = prev_state_ids.get(POWER_KEY)
         if pl_event_id:
             # fastpath: if there's a power level event, that's all we need, and
             # not having a power level event is an extreme edge case
@@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object):
             auth_events = {POWER_KEY: pl_event}
         else:
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=False,
+                event, prev_state_ids, for_verification=False,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -304,7 +305,7 @@ class RulesForRoom(object):
 
                 push_rules_delta_state_cache_metric.inc_hits()
             else:
-                current_state_ids = context.current_state_ids
+                current_state_ids = yield context.get_current_state_ids(self.store)
                 push_rules_delta_state_cache_metric.inc_misses()
 
             push_rules_state_size_counter.inc(len(current_state_ids))
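The change above reflects the wider EventContext refactor in this commit (see synapse/events/snapshot.py in the diffstat): state maps are no longer plain attributes but are fetched via deferreds. A minimal sketch of the new calling convention, with a hypothetical caller:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def _inspect_context(context, store):
        # both accessors return Deferreds resolving to dicts of
        # (event type, state key) -> event_id
        prev_state_ids = yield context.get_prev_state_ids(store)
        current_state_ids = yield context.get_current_state_ids(store)
        defer.returnValue((len(prev_state_ids), len(current_state_ids)))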
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 2eede54792..5227bc333d 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
 
 
 @defer.inlineCallbacks
-def send_event_to_master(clock, client, host, port, requester, event, context,
+def send_event_to_master(clock, store, client, host, port, requester, event, context,
                          ratelimit, extra_users):
     """Send event to be handled on the master
 
     Args:
         clock (synapse.util.Clock)
+        store (DataStore)
         client (SimpleHttpClient)
         host (str): host of master
         port (int): port on master listening for HTTP replication
@@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
         host, port, event.event_id,
     )
 
+    serialized_context = yield context.serialize(event, store)
+
     payload = {
         "event": event.get_pdu_json(),
         "internal_metadata": event.internal_metadata.get_dict(),
         "rejected_reason": event.rejected_reason,
-        "context": context.serialize(event),
+        "context": serialized_context,
         "requester": requester.serialize(),
         "ratelimit": ratelimit,
         "extra_users": [u.to_string() for u in extra_users],
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e592ab57bf..970e94313e 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -192,7 +192,7 @@ class ReplicationClientHandler(object):
         """Returns a deferred that is resolved when we receive a SYNC command
         with given data.
 
-        Used by tests.
+        [Not currently] used by tests.
         """
         return self.awaiting_syncs.setdefault(data, defer.Deferred())
 
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 611fb66e1d..fd59f1595f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -25,6 +25,7 @@ from twisted.internet import defer
 from twisted.internet.protocol import Factory
 
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.metrics import Measure, measure_func
 
 from .protocol import ServerReplicationStreamProtocol
@@ -117,7 +118,6 @@ class ReplicationStreamer(object):
         for conn in self.connections:
             conn.send_error("server shutting down")
 
-    @defer.inlineCallbacks
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.
@@ -132,14 +132,16 @@ class ReplicationStreamer(object):
                 stream.discard_updates_and_advance()
             return
 
-        # If we're in the process of checking for new updates, mark that fact
-        # and return
+        self.pending_updates = True
+
         if self.is_looping:
-            logger.debug("Noitifier poke loop already running")
-            self.pending_updates = True
+            logger.debug("Notifier poke loop already running")
             return
 
-        self.pending_updates = True
+        run_as_background_process("replication_notifier", self._run_notifier_loop)
+
+    @defer.inlineCallbacks
+    def _run_notifier_loop(self):
         self.is_looping = True
 
         try:
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 75c2a4ec8e..3418f06fd6 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,13 +14,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from six import PY3
+
 from synapse.http.server import JsonResource
 from synapse.rest.client import versions
-from synapse.rest.client.v1 import admin, directory, events, initial_sync
-from synapse.rest.client.v1 import login as v1_login
-from synapse.rest.client.v1 import logout, presence, profile, push_rule, pusher
-from synapse.rest.client.v1 import register as v1_register
-from synapse.rest.client.v1 import room, voip
+from synapse.rest.client.v1 import (
+    admin,
+    directory,
+    events,
+    initial_sync,
+    login as v1_login,
+    logout,
+    presence,
+    profile,
+    push_rule,
+    pusher,
+    room,
+    voip,
+)
 from synapse.rest.client.v2_alpha import (
     account,
     account_data,
@@ -42,6 +54,11 @@ from synapse.rest.client.v2_alpha import (
     user_directory,
 )
 
+if not PY3:
+    from synapse.rest.client.v1_only import (
+        register as v1_register,
+    )
+
 
 class ClientRestResource(JsonResource):
     """A resource for version 1 of the matrix client API."""
@@ -54,14 +71,22 @@ class ClientRestResource(JsonResource):
     def register_servlets(client_resource, hs):
         versions.register_servlets(client_resource)
 
-        # "v1"
-        room.register_servlets(hs, client_resource)
+        if not PY3:
+            # "v1" (Python 2 only)
+            v1_register.register_servlets(hs, client_resource)
+
+        # Deprecated in r0
+        initial_sync.register_servlets(hs, client_resource)
+        room.register_deprecated_servlets(hs, client_resource)
+
+        # Partially deprecated in r0
         events.register_servlets(hs, client_resource)
-        v1_register.register_servlets(hs, client_resource)
+
+        # "v1" + "r0"
+        room.register_servlets(hs, client_resource)
         v1_login.register_servlets(hs, client_resource)
         profile.register_servlets(hs, client_resource)
         presence.register_servlets(hs, client_resource)
-        initial_sync.register_servlets(hs, client_resource)
         directory.register_servlets(hs, client_resource)
         voip.register_servlets(hs, client_resource)
         admin.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 2dc50e582b..99f6c6e3c3 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import hashlib
+import hmac
 import logging
 
 from six.moves import http_client
@@ -63,6 +65,125 @@ class UsersRestServlet(ClientV1RestServlet):
         defer.returnValue((200, ret))
 
 
+class UserRegisterServlet(ClientV1RestServlet):
+    """
+    Attributes:
+        NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
+        nonces (dict[str, int]): The nonces that we will accept. A dict of
+            nonce to the time it was generated, in integer seconds.
+    """
+    PATTERNS = client_path_patterns("/admin/register")
+    NONCE_TIMEOUT = 60
+
+    def __init__(self, hs):
+        super(UserRegisterServlet, self).__init__(hs)
+        self.handlers = hs.get_handlers()
+        self.reactor = hs.get_reactor()
+        self.nonces = {}
+        self.hs = hs
+
+    def _clear_old_nonces(self):
+        """
+        Clear out old nonces that are older than NONCE_TIMEOUT.
+        """
+        now = int(self.reactor.seconds())
+
+        for k, v in list(self.nonces.items()):
+            if now - v > self.NONCE_TIMEOUT:
+                del self.nonces[k]
+
+    def on_GET(self, request):
+        """
+        Generate a new nonce.
+        """
+        self._clear_old_nonces()
+
+        nonce = self.hs.get_secrets().token_hex(64)
+        self.nonces[nonce] = int(self.reactor.seconds())
+        return (200, {"nonce": nonce.encode('ascii')})
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        self._clear_old_nonces()
+
+        if not self.hs.config.registration_shared_secret:
+            raise SynapseError(400, "Shared secret registration is not enabled")
+
+        body = parse_json_object_from_request(request)
+
+        if "nonce" not in body:
+            raise SynapseError(
+                400, "nonce must be specified", errcode=Codes.BAD_JSON,
+            )
+
+        nonce = body["nonce"]
+
+        if nonce not in self.nonces:
+            raise SynapseError(
+                400, "unrecognised nonce",
+            )
+
+        # Delete the nonce, so it can't be reused, even if it's invalid
+        del self.nonces[nonce]
+
+        if "username" not in body:
+            raise SynapseError(
+                400, "username must be specified", errcode=Codes.BAD_JSON,
+            )
+        else:
+            if (not isinstance(body['username'], str) or len(body['username']) > 512):
+                raise SynapseError(400, "Invalid username")
+
+            username = body["username"].encode("utf-8")
+            if b"\x00" in username:
+                raise SynapseError(400, "Invalid username")
+
+        if "password" not in body:
+            raise SynapseError(
+                400, "password must be specified", errcode=Codes.BAD_JSON,
+            )
+        else:
+            if (not isinstance(body['password'], str) or len(body['password']) > 512):
+                raise SynapseError(400, "Invalid password")
+
+            password = body["password"].encode("utf-8")
+            if b"\x00" in password:
+                raise SynapseError(400, "Invalid password")
+
+        admin = body.get("admin", None)
+        got_mac = body["mac"]
+
+        want_mac = hmac.new(
+            key=self.hs.config.registration_shared_secret.encode(),
+            digestmod=hashlib.sha1,
+        )
+        want_mac.update(nonce)
+        want_mac.update(b"\x00")
+        want_mac.update(username)
+        want_mac.update(b"\x00")
+        want_mac.update(password)
+        want_mac.update(b"\x00")
+        want_mac.update(b"admin" if admin else b"notadmin")
+        want_mac = want_mac.hexdigest()
+
+        if not hmac.compare_digest(want_mac, got_mac):
+            raise SynapseError(
+                403, "HMAC incorrect",
+            )
+
+        # Reuse the parts of RegisterRestServlet to reduce code duplication
+        from synapse.rest.client.v2_alpha.register import RegisterRestServlet
+        register = RegisterRestServlet(self.hs)
+
+        (user_id, _) = yield register.registration_handler.register(
+            localpart=username.lower(), password=password, admin=bool(admin),
+            generate_token=False,
+        )
+
+        result = yield register._create_registration_details(user_id, body)
+        defer.returnValue((200, result))
+
+
 class WhoisRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
 
@@ -123,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
             hs (synapse.server.HomeServer)
         """
         super(PurgeHistoryRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.pagination_handler = hs.get_pagination_handler()
         self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
@@ -198,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                 errcode=Codes.BAD_JSON,
             )
 
-        purge_id = yield self.handlers.message_handler.start_purge_history(
+        purge_id = yield self.pagination_handler.start_purge_history(
             room_id, token,
             delete_local_events=delete_local_events,
         )
@@ -220,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
             hs (synapse.server.HomeServer)
         """
         super(PurgeHistoryStatusRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.pagination_handler = hs.get_pagination_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, purge_id):
@@ -230,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
         if not is_admin:
             raise AuthError(403, "You are not a server admin")
 
-        purge_status = self.handlers.message_handler.get_purge_status(purge_id)
+        purge_status = self.pagination_handler.get_purge_status(purge_id)
         if purge_status is None:
             raise NotFoundError("purge id '%s' not found" % purge_id)
 
@@ -614,3 +735,4 @@ def register_servlets(hs, http_server):
     ShutdownRoomRestServlet(hs).register(http_server)
     QuarantineMediaInRoom(hs).register(http_server)
     ListMediaInRoom(hs).register(http_server)
+    UserRegisterServlet(hs).register(http_server)
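For anyone scripting against the new endpoint, here is a sketch of the client-side MAC calculation that mirrors the want_mac computation in UserRegisterServlet.on_POST above (the helper name is made up; GET a nonce from /admin/register first, then POST it back together with this MAC):

    import hashlib
    import hmac

    def admin_register_mac(shared_secret, nonce, username, password, admin=False):
        # HMAC-SHA1 over nonce, username, password and the admin flag,
        # joined by NUL bytes and keyed on the registration shared secret
        mac = hmac.new(key=shared_secret.encode(), digestmod=hashlib.sha1)
        mac.update(nonce.encode())
        mac.update(b"\x00")
        mac.update(username.encode("utf-8"))
        mac.update(b"\x00")
        mac.update(password.encode("utf-8"))
        mac.update(b"\x00")
        mac.update(b"admin" if admin else b"notadmin")
        return mac.hexdigest()

The resulting hexdigest goes in the "mac" field of the JSON body, alongside nonce, username, password and (optionally) admin.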
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 3d62447854..13c331550b 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
         self.handlers = hs.get_handlers()
         self.event_creation_hander = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
+        self.message_handler = hs.get_message_handler()
 
     def register(self, http_server):
         # /room/$roomid/state/$eventtype
@@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
         format = parse_string(request, "format", default="content",
                               allowed_values=["content", "event"])
 
-        msg_handler = self.handlers.message_handler
+        msg_handler = self.message_handler
         data = yield msg_handler.get_room_data(
             user_id=requester.user.to_string(),
             room_id=room_id,
@@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomMemberListRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.message_handler = hs.get_message_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
         # TODO support Pagination stream API (limit/tokens)
         requester = yield self.auth.get_user_by_req(request)
-        handler = self.handlers.message_handler
-        events = yield handler.get_state_events(
+        events = yield self.message_handler.get_state_events(
             room_id=room_id,
             user_id=requester.user.to_string(),
         )
@@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(JoinedRoomMemberListRestServlet, self).__init__(hs)
-        self.message_handler = hs.get_handlers().message_handler
+        self.message_handler = hs.get_message_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
@@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomMessageListRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.pagination_handler = hs.get_pagination_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
@@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
             event_filter = Filter(json.loads(filter_json))
         else:
             event_filter = None
-        handler = self.handlers.message_handler
-        msgs = yield handler.get_messages(
+        msgs = yield self.pagination_handler.get_messages(
             room_id=room_id,
             requester=requester,
             pagin_config=pagination_config,
@@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomStateRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.message_handler = hs.get_message_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
-        handler = self.handlers.message_handler
         # Get all the current state for this room
-        events = yield handler.get_state_events(
+        events = yield self.message_handler.get_state_events(
             room_id=room_id,
             user_id=requester.user.to_string(),
             is_guest=requester.is_guest,
@@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(RoomEventContextServlet, self).__init__(hs)
         self.clock = hs.get_clock()
-        self.handlers = hs.get_handlers()
+        self.room_context_handler = hs.get_room_context_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, room_id, event_id):
@@ -533,11 +531,20 @@ class RoomEventContextServlet(ClientV1RestServlet):
 
         limit = parse_integer(request, "limit", default=10)
 
-        results = yield self.handlers.room_context_handler.get_event_context(
+        # picking the API shape for symmetry with /messages
+        filter_bytes = parse_string(request, "filter")
+        if filter_bytes:
+            filter_json = urlparse.unquote(filter_bytes).decode("UTF-8")
+            event_filter = Filter(json.loads(filter_json))
+        else:
+            event_filter = None
+
+        results = yield self.room_context_handler.get_event_context(
             requester.user,
             room_id,
             event_id,
             limit,
+            event_filter,
         )
 
         if not results:
@@ -832,10 +839,13 @@ def register_servlets(hs, http_server):
     RoomSendEventRestServlet(hs).register(http_server)
     PublicRoomListRestServlet(hs).register(http_server)
     RoomStateRestServlet(hs).register(http_server)
-    RoomInitialSyncRestServlet(hs).register(http_server)
     RoomRedactEventRestServlet(hs).register(http_server)
     RoomTypingRestServlet(hs).register(http_server)
     SearchRestServlet(hs).register(http_server)
     JoinedRoomsRestServlet(hs).register(http_server)
     RoomEventServlet(hs).register(http_server)
     RoomEventContextServlet(hs).register(http_server)
+
+
+def register_deprecated_servlets(hs, http_server):
+    RoomInitialSyncRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1_only/__init__.py b/synapse/rest/client/v1_only/__init__.py
new file mode 100644
index 0000000000..936f902ace
--- /dev/null
+++ b/synapse/rest/client/v1_only/__init__.py
@@ -0,0 +1,3 @@
+"""
+REST APIs that are only used in v1 (the legacy API).
+"""
diff --git a/synapse/rest/client/v1_only/base.py b/synapse/rest/client/v1_only/base.py
new file mode 100644
index 0000000000..9d4db7437c
--- /dev/null
+++ b/synapse/rest/client/v1_only/base.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""This module contains base REST classes for constructing client v1 servlets.
+"""
+
+import re
+
+from synapse.api.urls import CLIENT_PREFIX
+
+
+def v1_only_client_path_patterns(path_regex, include_in_unstable=True):
+    """Creates a regex compiled client path with the correct client path
+    prefix.
+
+    Args:
+        path_regex (str): The regex string to match. This should NOT have a ^
+        as this will be prefixed.
+    Returns:
+        list of SRE_Pattern
+    """
+    patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)]
+    if include_in_unstable:
+        unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable")
+        patterns.append(re.compile("^" + unstable_prefix + path_regex))
+    return patterns
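As a quick illustration of what the helper above produces (assuming the usual CLIENT_PREFIX of /_matrix/client/api/v1):

    from synapse.rest.client.v1_only.base import v1_only_client_path_patterns

    patterns = v1_only_client_path_patterns("/register$")
    # -> two compiled regexes, matching
    #    ^/_matrix/client/api/v1/register$  and
    #    ^/_matrix/client/unstable/register$
    # pass include_in_unstable=False to drop the second one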
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1_only/register.py
index 25a143af8d..3439c3c6d4 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1_only/register.py
@@ -24,9 +24,10 @@ import synapse.util.stringutils as stringutils
 from synapse.api.constants import LoginType
 from synapse.api.errors import Codes, SynapseError
 from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
+from synapse.rest.client.v1.base import ClientV1RestServlet
 from synapse.types import create_requester
 
-from .base import ClientV1RestServlet, client_path_patterns
+from .base import v1_only_client_path_patterns
 
 logger = logging.getLogger(__name__)
 
@@ -49,7 +50,7 @@ class RegisterRestServlet(ClientV1RestServlet):
     handler doesn't have a concept of multi-stages or sessions.
     """
 
-    PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False)
+    PATTERNS = v1_only_client_path_patterns("/register$", include_in_unstable=False)
 
     def __init__(self, hs):
         """
@@ -379,7 +380,7 @@ class CreateUserRestServlet(ClientV1RestServlet):
     """Handles user creation via a server-to-server interface
     """
 
-    PATTERNS = client_path_patterns("/createUser$", releases=())
+    PATTERNS = v1_only_client_path_patterns("/createUser$")
 
     def __init__(self, hs):
         super(CreateUserRestServlet, self).__init__(hs)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 1918897f68..d6cf915d86 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -643,7 +643,7 @@ class RegisterRestServlet(RestServlet):
     @defer.inlineCallbacks
     def _do_guest_registration(self, params):
         if not self.hs.config.allow_guest_access:
-            defer.returnValue((403, "Guest access is disabled"))
+            raise SynapseError(403, "Guest access is disabled")
         user_id, _ = yield self.registration_handler.register(
             generate_token=False,
             make_guest=True
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 30242c525a..174ad20123 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async import Linearizer
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
@@ -100,10 +101,15 @@ class MediaRepository(object):
         )
 
         self.clock.looping_call(
-            self._update_recently_accessed,
+            self._start_update_recently_accessed,
             UPDATE_RECENTLY_ACCESSED_TS,
         )
 
+    def _start_update_recently_accessed(self):
+        return run_as_background_process(
+            "update_recently_accessed_media", self._update_recently_accessed,
+        )
+
     @defer.inlineCallbacks
     def _update_recently_accessed(self):
         remote_media = self.recently_accessed_remotes
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index b70b15c4c2..27aa0def2f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -41,6 +41,7 @@ from synapse.http.server import (
     wrap_json_request_handler,
 )
 from synapse.http.servlet import parse_integer, parse_string
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -81,7 +82,7 @@ class PreviewUrlResource(Resource):
         self._cache.start()
 
         self._cleaner_loop = self.clock.looping_call(
-            self._expire_url_cache_data, 10 * 1000
+            self._start_expire_url_cache_data, 10 * 1000,
         )
 
     def render_OPTIONS(self, request):
@@ -371,6 +372,11 @@ class PreviewUrlResource(Resource):
             "etag": headers["ETag"][0] if "ETag" in headers else None,
         })
 
+    def _start_expire_url_cache_data(self):
+        return run_as_background_process(
+            "expire_url_cache_data", self._expire_url_cache_data,
+        )
+
     @defer.inlineCallbacks
     def _expire_url_cache_data(self):
         """Clean up expired url cache content, media and thumbnails.
diff --git a/synapse/secrets.py b/synapse/secrets.py
new file mode 100644
index 0000000000..f05e9ea535
--- /dev/null
+++ b/synapse/secrets.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Injectable secrets module for Synapse.
+
+See https://docs.python.org/3/library/secrets.html#module-secrets for the API
+used in Python 3.6, and the API emulated in Python 2.7.
+"""
+
+import sys
+
+# secrets is available since python 3.6
+if sys.version_info[0:2] >= (3, 6):
+    import secrets
+
+    def Secrets():
+        return secrets
+
+else:
+    import os
+    import binascii
+
+    class Secrets(object):
+        def token_bytes(self, nbytes=32):
+            return os.urandom(nbytes)
+
+        def token_hex(self, nbytes=32):
+            return binascii.hexlify(self.token_bytes(nbytes))
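A small usage sketch of the shim above: on Python 3.6+ Secrets() simply hands back the stdlib secrets module, while on Python 2 the fallback class provides the same two methods. This is how the nonce in the new /admin/register servlet is generated, via hs.get_secrets() (wired up in synapse/server.py below).

    from synapse.secrets import Secrets

    secrets = Secrets()
    nonce = secrets.token_hex(64)   # 64 random bytes as a 128-character hex string
    raw = secrets.token_bytes(32)   # 32 random bytes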
diff --git a/synapse/server.py b/synapse/server.py
index 92bea96c5c..140be9ebe8 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -52,12 +52,13 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.groups_local import GroupsLocalHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
-from synapse.handlers.message import EventCreationHandler
+from synapse.handlers.message import EventCreationHandler, MessageHandler
+from synapse.handlers.pagination import PaginationHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.profile import ProfileHandler
 from synapse.handlers.read_marker import ReadMarkerHandler
 from synapse.handlers.receipts import ReceiptsHandler
-from synapse.handlers.room import RoomCreationHandler
+from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
 from synapse.handlers.room_list import RoomListHandler
 from synapse.handlers.room_member import RoomMemberMasterHandler
 from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
@@ -74,6 +75,7 @@ from synapse.rest.media.v1.media_repository import (
     MediaRepository,
     MediaRepositoryResource,
 )
+from synapse.secrets import Secrets
 from synapse.server_notices.server_notices_manager import ServerNoticesManager
 from synapse.server_notices.server_notices_sender import ServerNoticesSender
 from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender
@@ -158,11 +160,15 @@ class HomeServer(object):
         'groups_server_handler',
         'groups_attestation_signing',
         'groups_attestation_renewer',
+        'secrets',
         'spam_checker',
         'room_member_handler',
         'federation_registry',
         'server_notices_manager',
         'server_notices_sender',
+        'message_handler',
+        'pagination_handler',
+        'room_context_handler',
     ]
 
     def __init__(self, hostname, reactor=None, **kwargs):
@@ -405,6 +411,9 @@ class HomeServer(object):
     def build_groups_attestation_renewer(self):
         return GroupAttestionRenewer(self)
 
+    def build_secrets(self):
+        return Secrets()
+
     def build_spam_checker(self):
         return SpamChecker(self)
 
@@ -426,6 +435,15 @@ class HomeServer(object):
             return WorkerServerNoticesSender(self)
         return ServerNoticesSender(self)
 
+    def build_message_handler(self):
+        return MessageHandler(self)
+
+    def build_pagination_handler(self):
+        return PaginationHandler(self)
+
+    def build_room_context_handler(self):
+        return RoomContextHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
diff --git a/synapse/state.py b/synapse/state.py
index 15a593d41c..033f55d967 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -18,7 +18,7 @@ import hashlib
 import logging
 from collections import namedtuple
 
-from six import iteritems, itervalues
+from six import iteritems, iterkeys, itervalues
 
 from frozendict import frozendict
 
@@ -203,25 +203,27 @@ class StateHandler(object):
             # If this is an outlier, then we know it shouldn't have any current
             # state. Certainly store.get_current_state won't return any, and
             # persisting the event won't store the state group.
-            context = EventContext()
             if old_state:
-                context.prev_state_ids = {
+                prev_state_ids = {
                     (s.type, s.state_key): s.event_id for s in old_state
                 }
                 if event.is_state():
-                    context.current_state_ids = dict(context.prev_state_ids)
+                    current_state_ids = dict(prev_state_ids)
                     key = (event.type, event.state_key)
-                    context.current_state_ids[key] = event.event_id
+                    current_state_ids[key] = event.event_id
                 else:
-                    context.current_state_ids = context.prev_state_ids
+                    current_state_ids = prev_state_ids
             else:
-                context.current_state_ids = {}
-                context.prev_state_ids = {}
-            context.prev_state_events = []
+                current_state_ids = {}
+                prev_state_ids = {}
 
             # We don't store state for outliers, so we don't generate a state
-            # froup for it.
-            context.state_group = None
+            # group for it.
+            context = EventContext.with_state(
+                state_group=None,
+                current_state_ids=current_state_ids,
+                prev_state_ids=prev_state_ids,
+            )
 
             defer.returnValue(context)
 
@@ -230,31 +232,35 @@ class StateHandler(object):
             # Let's just correctly fill out the context and create a
             # new state group for it.
 
-            context = EventContext()
-            context.prev_state_ids = {
+            prev_state_ids = {
                 (s.type, s.state_key): s.event_id for s in old_state
             }
 
             if event.is_state():
                 key = (event.type, event.state_key)
-                if key in context.prev_state_ids:
-                    replaces = context.prev_state_ids[key]
+                if key in prev_state_ids:
+                    replaces = prev_state_ids[key]
                     if replaces != event.event_id:  # Paranoia check
                         event.unsigned["replaces_state"] = replaces
-                context.current_state_ids = dict(context.prev_state_ids)
-                context.current_state_ids[key] = event.event_id
+                current_state_ids = dict(prev_state_ids)
+                current_state_ids[key] = event.event_id
             else:
-                context.current_state_ids = context.prev_state_ids
+                current_state_ids = prev_state_ids
 
-            context.state_group = yield self.store.store_state_group(
+            state_group = yield self.store.store_state_group(
                 event.event_id,
                 event.room_id,
                 prev_group=None,
                 delta_ids=None,
-                current_state_ids=context.current_state_ids,
+                current_state_ids=current_state_ids,
+            )
+
+            context = EventContext.with_state(
+                state_group=state_group,
+                current_state_ids=current_state_ids,
+                prev_state_ids=prev_state_ids,
             )
 
-            context.prev_state_events = []
             defer.returnValue(context)
 
         logger.debug("calling resolve_state_groups from compute_event_context")
@@ -262,47 +268,47 @@ class StateHandler(object):
             event.room_id, [e for e, _ in event.prev_events],
         )
 
-        curr_state = entry.state
+        prev_state_ids = entry.state
+        prev_group = None
+        delta_ids = None
 
-        context = EventContext()
-        context.prev_state_ids = curr_state
         if event.is_state():
             # If this is a state event then we need to create a new state
             # group for the state after this event.
 
             key = (event.type, event.state_key)
-            if key in context.prev_state_ids:
-                replaces = context.prev_state_ids[key]
+            if key in prev_state_ids:
+                replaces = prev_state_ids[key]
                 event.unsigned["replaces_state"] = replaces
 
-            context.current_state_ids = dict(context.prev_state_ids)
-            context.current_state_ids[key] = event.event_id
+            current_state_ids = dict(prev_state_ids)
+            current_state_ids[key] = event.event_id
 
             if entry.state_group:
                 # If the state at the event has a state group assigned then
                 # we can use that as the prev group
-                context.prev_group = entry.state_group
-                context.delta_ids = {
+                prev_group = entry.state_group
+                delta_ids = {
                     key: event.event_id
                 }
             elif entry.prev_group:
                 # If the state at the event only has a prev group, then we can
                 # use that as a prev group too.
-                context.prev_group = entry.prev_group
-                context.delta_ids = dict(entry.delta_ids)
-                context.delta_ids[key] = event.event_id
+                prev_group = entry.prev_group
+                delta_ids = dict(entry.delta_ids)
+                delta_ids[key] = event.event_id
 
-            context.state_group = yield self.store.store_state_group(
+            state_group = yield self.store.store_state_group(
                 event.event_id,
                 event.room_id,
-                prev_group=context.prev_group,
-                delta_ids=context.delta_ids,
-                current_state_ids=context.current_state_ids,
+                prev_group=prev_group,
+                delta_ids=delta_ids,
+                current_state_ids=current_state_ids,
             )
         else:
-            context.current_state_ids = context.prev_state_ids
-            context.prev_group = entry.prev_group
-            context.delta_ids = entry.delta_ids
+            current_state_ids = prev_state_ids
+            prev_group = entry.prev_group
+            delta_ids = entry.delta_ids
 
             if entry.state_group is None:
                 entry.state_group = yield self.store.store_state_group(
@@ -310,13 +316,20 @@ class StateHandler(object):
                     event.room_id,
                     prev_group=entry.prev_group,
                     delta_ids=entry.delta_ids,
-                    current_state_ids=context.current_state_ids,
+                    current_state_ids=current_state_ids,
                 )
                 entry.state_id = entry.state_group
 
-            context.state_group = entry.state_group
+            state_group = entry.state_group
+
+        context = EventContext.with_state(
+            state_group=state_group,
+            current_state_ids=current_state_ids,
+            prev_state_ids=prev_state_ids,
+            prev_group=prev_group,
+            delta_ids=delta_ids,
+        )
 
-        context.prev_state_events = []
         defer.returnValue(context)
 
     @defer.inlineCallbacks
@@ -458,69 +471,39 @@ class StateResolutionHandler(object):
                 "Resolving state for %s with %d groups", room_id, len(state_groups_ids)
             )
 
-            # build a map from state key to the event_ids which set that state.
-            # dict[(str, str), set[str])
-            state = {}
+            # start by assuming we won't have any conflicted state, and build up the new
+            # state map by iterating through the state groups. If we discover a conflict,
+            # we give up and instead use `resolve_events_with_factory`.
+            #
+            # XXX: is this actually worthwhile, or should we just let
+            # resolve_events_with_factory do it?
+            new_state = {}
+            conflicted_state = False
             for st in itervalues(state_groups_ids):
                 for key, e_id in iteritems(st):
-                    state.setdefault(key, set()).add(e_id)
-
-            # build a map from state key to the event_ids which set that state,
-            # including only those where there are state keys in conflict.
-            conflicted_state = {
-                k: list(v)
-                for k, v in iteritems(state)
-                if len(v) > 1
-            }
+                    if key in new_state:
+                        conflicted_state = True
+                        break
+                    new_state[key] = e_id
+                if conflicted_state:
+                    break
 
             if conflicted_state:
                 logger.info("Resolving conflicted state for %r", room_id)
                 with Measure(self.clock, "state._resolve_events"):
                     new_state = yield resolve_events_with_factory(
-                        list(state_groups_ids.values()),
+                        list(itervalues(state_groups_ids)),
                         event_map=event_map,
                         state_map_factory=state_map_factory,
                     )
-            else:
-                new_state = {
-                    key: e_ids.pop() for key, e_ids in iteritems(state)
-                }
 
-            with Measure(self.clock, "state.create_group_ids"):
-                # if the new state matches any of the input state groups, we can
-                # use that state group again. Otherwise we will generate a state_id
-                # which will be used as a cache key for future resolutions, but
-                # not get persisted.
-                state_group = None
-                new_state_event_ids = frozenset(itervalues(new_state))
-                for sg, events in iteritems(state_groups_ids):
-                    if new_state_event_ids == frozenset(e_id for e_id in events):
-                        state_group = sg
-                        break
+            # if the new state matches any of the input state groups, we can
+            # use that state group again. Otherwise we will generate a state_id
+            # which will be used as a cache key for future resolutions, but
+            # not get persisted.
 
-                # TODO: We want to create a state group for this set of events, to
-                # increase cache hits, but we need to make sure that it doesn't
-                # end up as a prev_group without being added to the database
-
-                prev_group = None
-                delta_ids = None
-                for old_group, old_ids in iteritems(state_groups_ids):
-                    if not set(new_state) - set(old_ids):
-                        n_delta_ids = {
-                            k: v
-                            for k, v in iteritems(new_state)
-                            if old_ids.get(k) != v
-                        }
-                        if not delta_ids or len(n_delta_ids) < len(delta_ids):
-                            prev_group = old_group
-                            delta_ids = n_delta_ids
-
-            cache = _StateCacheEntry(
-                state=new_state,
-                state_group=state_group,
-                prev_group=prev_group,
-                delta_ids=delta_ids,
-            )
+            with Measure(self.clock, "state.create_group_ids"):
+                cache = _make_state_cache_entry(new_state, state_groups_ids)
 
             if self._state_cache is not None:
                 self._state_cache[group_names] = cache
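
The quick conflict check added above is deliberately pessimistic: any (type, state_key) seen in more than one input state group sends us down the full-resolution path, even if both groups agree on the event id. A stand-alone sketch of that behaviour, with made-up state maps (not Synapse code):

def quick_resolve(state_maps):
    # Build the merged state map, bailing out as soon as any key appears in
    # more than one of the input maps.
    new_state = {}
    for state in state_maps:
        for key, event_id in state.items():
            if key in new_state:
                return None  # conflict: caller falls back to resolve_events_with_factory
            new_state[key] = event_id
    return new_state


disjoint = [
    {("m.room.create", ""): "$create"},
    {("m.room.member", "@alice:hs"): "$join"},
]
overlapping = disjoint + [{("m.room.create", ""): "$create"}]

print(quick_resolve(disjoint))      # merged map: no conflicts
print(quick_resolve(overlapping))   # None: repeated key, resolve properly
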
@@ -528,6 +511,70 @@ class StateResolutionHandler(object):
             defer.returnValue(cache)
 
 
+def _make_state_cache_entry(
+    new_state,
+    state_groups_ids,
+):
+    """Given a resolved state, and a set of input state groups, pick one to base
+    a new state group on (if any), and return an appropriately-constructed
+    _StateCacheEntry.
+
+    Args:
+        new_state (dict[(str, str), str]): resolved state map (mapping from
+            (type, state_key) to event_id)
+
+        state_groups_ids (dict[int, dict[(str, str), str]]):
+            map from state group id to the state in that state group
+            (where 'state' is a map from state key to event id)
+
+    Returns:
+        _StateCacheEntry
+    """
+    # if the new state matches any of the input state groups, we can
+    # use that state group again. Otherwise we will generate a state_id
+    # which will be used as a cache key for future resolutions, but
+    # not get persisted.
+
+    # first look for exact matches
+    new_state_event_ids = set(itervalues(new_state))
+    for sg, state in iteritems(state_groups_ids):
+        if len(new_state_event_ids) != len(state):
+            continue
+
+        old_state_event_ids = set(itervalues(state))
+        if new_state_event_ids == old_state_event_ids:
+            # got an exact match.
+            return _StateCacheEntry(
+                state=new_state,
+                state_group=sg,
+            )
+
+    # TODO: We want to create a state group for this set of events, to
+    # increase cache hits, but we need to make sure that it doesn't
+    # end up as a prev_group without being added to the database
+
+    # failing that, look for the closest match.
+    prev_group = None
+    delta_ids = None
+
+    for old_group, old_state in iteritems(state_groups_ids):
+        n_delta_ids = {
+            k: v
+            for k, v in iteritems(new_state)
+            if old_state.get(k) != v
+        }
+        if not delta_ids or len(n_delta_ids) < len(delta_ids):
+            prev_group = old_group
+            delta_ids = n_delta_ids
+
+    return _StateCacheEntry(
+        state=new_state,
+        state_group=None,
+        prev_group=prev_group,
+        delta_ids=delta_ids,
+    )
+
+
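
For reference, the selection logic in `_make_state_cache_entry` can be exercised in isolation. A minimal sketch, using a namedtuple in place of `_StateCacheEntry` and invented state maps (illustrative only, not the real class):

from collections import namedtuple

CacheEntry = namedtuple("CacheEntry", ["state", "state_group", "prev_group", "delta_ids"])


def make_cache_entry(new_state, state_groups_ids):
    new_ids = set(new_state.values())

    # exact match: reuse an existing state group outright
    for sg, state in state_groups_ids.items():
        if len(new_ids) == len(state) and new_ids == set(state.values()):
            return CacheEntry(new_state, sg, None, None)

    # otherwise base ourselves on the input group needing the smallest delta
    prev_group, delta_ids = None, None
    for sg, state in state_groups_ids.items():
        candidate = {k: v for k, v in new_state.items() if state.get(k) != v}
        if delta_ids is None or len(candidate) < len(delta_ids):
            prev_group, delta_ids = sg, candidate
    return CacheEntry(new_state, None, prev_group, delta_ids)


groups = {
    1: {("m.room.member", "@alice:hs"): "$join"},
    2: {("m.room.create", ""): "$create", ("m.room.name", ""): "$old_name"},
}
resolved = {("m.room.create", ""): "$create", ("m.room.name", ""): "$new_name"}
print(make_cache_entry(resolved, groups))
# state_group=None, prev_group=2, delta_ids={('m.room.name', ''): '$new_name'}
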
 def _ordered_events(events):
     def key_func(e):
         return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest()
@@ -569,7 +616,7 @@ def _seperate(state_sets):
     with them in different state sets.
 
     Args:
-        state_sets(list[dict[(str, str), str]]):
+        state_sets(iterable[dict[(str, str), str]]):
             List of dicts of (type, state_key) -> event_id, which are the
             different state groups to resolve.
 
@@ -583,10 +630,11 @@ def _seperate(state_sets):
             conflicted_state is a dict mapping (type, state_key) to a set of
             event ids for conflicted state keys.
     """
-    unconflicted_state = dict(state_sets[0])
+    state_set_iterator = iter(state_sets)
+    unconflicted_state = dict(next(state_set_iterator))
     conflicted_state = {}
 
-    for state_set in state_sets[1:]:
+    for state_set in state_set_iterator:
         for key, value in iteritems(state_set):
             # Check if there is an unconflicted entry for the state key.
             unconflicted_value = unconflicted_state.get(key)
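
The switch to an explicit iterator lets `_seperate` accept any iterable of state maps (for example a dict-values view or a generator) rather than requiring an indexable list. A small illustration with toy state maps:

state_groups = {
    1: {("m.room.create", ""): "$create"},
    2: {("m.room.create", ""): "$create", ("m.room.name", ""): "$name"},
}

values = state_groups.values()   # a view: values[0] or values[1:] would raise TypeError
it = iter(values)
first = dict(next(it))           # seed the unconflicted map from the first set
rest = list(it)                  # the remaining sets, however they were produced
print(first)
print(rest)
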
@@ -647,7 +695,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
         for event_id in event_ids
     )
     if event_map is not None:
-        needed_events -= set(event_map.iterkeys())
+        needed_events -= set(iterkeys(event_map))
 
     logger.info("Asking for %d conflicted events", len(needed_events))
 
@@ -668,7 +716,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
     new_needed_events = set(itervalues(auth_events))
     new_needed_events -= needed_events
     if event_map is not None:
-        new_needed_events -= set(event_map.iterkeys())
+        new_needed_events -= set(iterkeys(event_map))
 
     logger.info("Asking for %d auth events", len(new_needed_events))
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ba88a54979..3bd63cd195 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -66,6 +66,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
                 ApplicationServiceStore,
+                EventsStore,
                 EventFederationStore,
                 MediaRepositoryStore,
                 RejectionsStore,
@@ -73,7 +74,6 @@ class DataStore(RoomMemberStore, RoomStore,
                 PusherStore,
                 PushRuleStore,
                 ApplicationServiceTransactionStore,
-                EventsStore,
                 ReceiptsStore,
                 EndToEndKeyStore,
                 SearchStore,
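
This reordering is forced by the change to synapse/storage/events.py further down: `EventsStore` now inherits from `EventFederationStore`, and Python's C3 linearisation requires a subclass to appear before its base class in any list of bases. A toy illustration:

class Base:            # stands in for EventFederationStore
    pass


class Derived(Base):   # stands in for EventsStore, which now subclasses it
    pass


class Good(Derived, Base):   # subclass listed before its base: fine
    pass


try:
    class Bad(Base, Derived):  # the old ordering
        pass
except TypeError as e:
    print(e)  # Cannot create a consistent method resolution order (MRO) ...
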
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 98dde77431..44f37b4c1e 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -311,6 +311,12 @@ class SQLBaseStore(object):
         after_callbacks = []
         exception_callbacks = []
 
+        if LoggingContext.current_context() == LoggingContext.sentinel:
+            logger.warn(
+                "Starting db txn '%s' from sentinel context",
+                desc,
+            )
+
         try:
             result = yield self.runWithConnection(
                 self._new_transaction,
@@ -344,7 +350,7 @@ class SQLBaseStore(object):
         parent_context = LoggingContext.current_context()
         if parent_context == LoggingContext.sentinel:
             logger.warn(
-                "Running db txn from sentinel context: metrics will be lost",
+                "Starting db connection from sentinel context: metrics will be lost",
             )
             parent_context = None
 
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 9f12b360bc..31248d5e06 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 
 from ._base import SQLBaseStore
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index dc9eca7d15..5fe1ca2de7 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -19,6 +19,8 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
+
 from . import engines
 from ._base import SQLBaseStore
 
@@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_handlers = {}
         self._all_done = False
 
-    @defer.inlineCallbacks
     def start_doing_background_updates(self):
-        logger.info("Starting background schema updates")
+        run_as_background_process(
+            "background_updates", self._run_background_updates,
+        )
 
+    @defer.inlineCallbacks
+    def _run_background_updates(self):
+        logger.info("Starting background schema updates")
         while True:
             yield self.hs.get_clock().sleep(
                 self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
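
The same shape recurs throughout this change set: the public entry point stays a plain function and hands the real coroutine to `run_as_background_process`, so work done outside any request is tracked under its own name instead of being attributed (or lost) to whichever logcontext happened to be active. A sketch of the pattern; the store class, clock and loop body are invented for illustration:

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process


class ExampleCleanupStore(object):
    def __init__(self, clock):
        self._clock = clock

    def start_doing_cleanups(self):
        # non-yielding entry point: fire the loop off under its own logcontext
        run_as_background_process("example_cleanups", self._cleanup_loop)

    @defer.inlineCallbacks
    def _cleanup_loop(self):
        while True:
            yield self._clock.sleep(60)
            # ... one round of cleanup would go here ...
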
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index b78eda3413..b8cefd43d6 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -19,6 +19,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import CACHE_SIZE_FACTOR
 
 from . import background_updates
@@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         self._batch_row_update[key] = (user_agent, device_id, now)
 
     def _update_client_ips_batch(self):
-        to_update = self._batch_row_update
-        self._batch_row_update = {}
-        return self.runInteraction(
-            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+        def update():
+            to_update = self._batch_row_update
+            self._batch_row_update = {}
+            return self.runInteraction(
+                "_update_client_ips_batch", self._update_client_ips_batch_txn,
+                to_update,
+            )
+
+        return run_as_background_process(
+            "update_client_ips", update,
         )
 
     def _update_client_ips_batch_txn(self, txn, to_update):
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index ec68e39f1e..c0943ecf91 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
 from ._base import Cache, SQLBaseStore
@@ -248,17 +249,31 @@ class DeviceStore(SQLBaseStore):
 
     def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id,
                                                    content, stream_id):
-        self._simple_upsert_txn(
-            txn,
-            table="device_lists_remote_cache",
-            keyvalues={
-                "user_id": user_id,
-                "device_id": device_id,
-            },
-            values={
-                "content": json.dumps(content),
-            }
-        )
+        if content.get("deleted"):
+            self._simple_delete_txn(
+                txn,
+                table="device_lists_remote_cache",
+                keyvalues={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                },
+            )
+
+            txn.call_after(
+                self.device_id_exists_cache.invalidate, (user_id, device_id,)
+            )
+        else:
+            self._simple_upsert_txn(
+                txn,
+                table="device_lists_remote_cache",
+                keyvalues={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                },
+                values={
+                    "content": json.dumps(content),
+                }
+            )
 
         txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
         txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
@@ -366,7 +381,7 @@ class DeviceStore(SQLBaseStore):
             now_stream_id = max(stream_id for stream_id in itervalues(query_map))
 
         devices = self._get_e2e_device_keys_txn(
-            txn, query_map.keys(), include_all_devices=True
+            txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True
         )
 
         prev_sent_id_sql = """
@@ -393,12 +408,15 @@ class DeviceStore(SQLBaseStore):
 
                 prev_id = stream_id
 
-                key_json = device.get("key_json", None)
-                if key_json:
-                    result["keys"] = json.loads(key_json)
-                device_display_name = device.get("device_display_name", None)
-                if device_display_name:
-                    result["device_display_name"] = device_display_name
+                if device is not None:
+                    key_json = device.get("key_json", None)
+                    if key_json:
+                        result["keys"] = json.loads(key_json)
+                    device_display_name = device.get("device_display_name", None)
+                    if device_display_name:
+                        result["device_display_name"] = device_display_name
+                else:
+                    result["deleted"] = True
 
                 results.append(result)
 
@@ -694,6 +712,9 @@ class DeviceStore(SQLBaseStore):
 
             logger.info("Pruned %d device list outbound pokes", txn.rowcount)
 
-        return self.runInteraction(
-            "_prune_old_outbound_device_pokes", _prune_txn
+        return run_as_background_process(
+            "prune_old_outbound_device_pokes",
+            self.runInteraction,
+            "_prune_old_outbound_device_pokes",
+            _prune_txn,
         )
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 7ae5c65482..523b4360c3 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -64,12 +64,18 @@ class EndToEndKeyStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_e2e_device_keys(self, query_list, include_all_devices=False):
+    def get_e2e_device_keys(
+        self, query_list, include_all_devices=False,
+        include_deleted_devices=False,
+    ):
         """Fetch a list of device keys.
         Args:
             query_list(list): List of pairs of user_ids and device_ids.
             include_all_devices (bool): whether to include entries for devices
                 that don't have device keys
+            include_deleted_devices (bool): whether to include null entries for
+                devices which no longer exist (but were in the query_list).
+                This option only takes effect if include_all_devices is true.
         Returns:
             Dict mapping from user-id to dict mapping from device_id to
             dict containing "key_json", "device_display_name".
@@ -79,7 +85,7 @@ class EndToEndKeyStore(SQLBaseStore):
 
         results = yield self.runInteraction(
             "get_e2e_device_keys", self._get_e2e_device_keys_txn,
-            query_list, include_all_devices,
+            query_list, include_all_devices, include_deleted_devices,
         )
 
         for user_id, device_keys in iteritems(results):
@@ -88,10 +94,19 @@ class EndToEndKeyStore(SQLBaseStore):
 
         defer.returnValue(results)
 
-    def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices):
+    def _get_e2e_device_keys_txn(
+        self, txn, query_list, include_all_devices=False,
+        include_deleted_devices=False,
+    ):
         query_clauses = []
         query_params = []
 
+        if include_all_devices is False:
+            include_deleted_devices = False
+
+        if include_deleted_devices:
+            deleted_devices = set(query_list)
+
         for (user_id, device_id) in query_list:
             query_clause = "user_id = ?"
             query_params.append(user_id)
@@ -119,8 +134,14 @@ class EndToEndKeyStore(SQLBaseStore):
 
         result = {}
         for row in rows:
+            if include_deleted_devices:
+                deleted_devices.remove((row["user_id"], row["device_id"]))
             result.setdefault(row["user_id"], {})[row["device_id"]] = row
 
+        if include_deleted_devices:
+            for user_id, device_id in deleted_devices:
+                result.setdefault(user_id, {})[device_id] = None
+
         return result
 
     @defer.inlineCallbacks
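
The `include_deleted_devices` bookkeeping amounts to: start from the full query list, cross off every (user_id, device_id) a row comes back for, and give whatever remains a `None` entry, which the caller in devices.py then reports as `"deleted": True`. A stand-alone sketch with made-up rows (a simplification of the real transaction):

def collate_device_keys(query_list, rows, include_deleted_devices=True):
    # rows are dicts with user_id / device_id keys, as the real query returns
    deleted_devices = set(query_list) if include_deleted_devices else set()

    result = {}
    for row in rows:
        deleted_devices.discard((row["user_id"], row["device_id"]))
        result.setdefault(row["user_id"], {})[row["device_id"]] = row

    for user_id, device_id in deleted_devices:
        result.setdefault(user_id, {})[device_id] = None
    return result


query = [("@alice:hs", "DEV1"), ("@alice:hs", "DEV2")]
rows = [{"user_id": "@alice:hs", "device_id": "DEV1", "key_json": "{}"}]
print(collate_device_keys(query, rows))
# {'@alice:hs': {'DEV1': {...}, 'DEV2': None}}
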
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 8d366d1b91..8bd35df119 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -23,8 +23,9 @@ from unpaddedbase64 import encode_base64
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
 from synapse.util.caches.descriptors import cached
 
@@ -113,9 +114,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
         sql = (
             "SELECT b.event_id, MAX(e.depth) FROM events as e"
             " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+            " ON g.event_id = e.event_id"
             " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+            " ON g.prev_event_id = b.event_id"
             " WHERE b.room_id = ? AND g.is_state is ?"
             " GROUP BY b.event_id"
         )
@@ -329,8 +330,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             "SELECT depth, prev_event_id FROM event_edges"
             " INNER JOIN events"
             " ON prev_event_id = events.event_id"
-            " AND event_edges.room_id = events.room_id"
-            " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+            " WHERE event_edges.event_id = ?"
             " AND event_edges.is_state = ?"
             " LIMIT ?"
         )
@@ -364,7 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
             txn.execute(
                 query,
-                (room_id, event_id, False, limit - len(event_results))
+                (event_id, False, limit - len(event_results))
             )
 
             for row in txn:
@@ -401,7 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
 
         query = (
             "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? AND is_state = ? "
+            "WHERE event_id = ? AND is_state = ? "
             "LIMIT ?"
         )
 
@@ -410,7 +410,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             for event_id in front:
                 txn.execute(
                     query,
-                    (room_id, event_id, False, limit - len(event_results))
+                    (event_id, False, limit - len(event_results))
                 )
 
                 for e_id, in txn:
@@ -446,7 +446,7 @@ class EventFederationStore(EventFederationWorkerStore):
         )
 
         hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
         )
 
     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@@ -548,9 +548,11 @@ class EventFederationStore(EventFederationWorkerStore):
                 sql,
                 (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
             )
-        return self.runInteraction(
+        return run_as_background_process(
+            "delete_old_forward_extrem_cache",
+            self.runInteraction,
             "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
+            _delete_old_forward_extrem_cache_txn,
         )
 
     def clean_room_for_join(self, room_id):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 29b511ae5e..6840320641 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,6 +22,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction, SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
@@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "Error removing push actions after event persistence failure",
             )
 
-    @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
+        return run_as_background_process(
+            "event_push_action_stream_orderings",
+            self.runInteraction,
             "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
+            self._find_stream_orderings_for_times_txn,
         )
 
     def _find_stream_orderings_for_times_txn(self, txn):
@@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
 
         self._doing_notif_rotation = False
         self._rotate_notif_loop = self._clock.looping_call(
-            self._rotate_notifs, 30 * 60 * 1000
+            self._start_rotate_notifs, 30 * 60 * 1000,
         )
 
     def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
         """, (room_id, user_id, stream_ordering))
 
+    def _start_rotate_notifs(self):
+        return run_as_background_process("rotate_notifs", self._rotate_notifs)
+
     @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2aaab0d02c..c98e524ba1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ import logging
 from collections import OrderedDict, deque, namedtuple
 from functools import wraps
 
-from six import iteritems, itervalues
+from six import iteritems
 from six.moves import range
 
 from canonicaljson import json
@@ -33,6 +33,9 @@ from synapse.api.errors import SynapseError
 # these are only included to make the type annotations work
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
@@ -141,25 +144,22 @@ class _EventPeristenceQueue(object):
             try:
                 queue = self._get_drainining_queue(room_id)
                 for item in queue:
-                    # handle_queue_loop runs in the sentinel logcontext, so
-                    # there is no need to preserve_fn when running the
-                    # callbacks on the deferred.
                     try:
                         ret = yield per_item_callback(item)
-                        item.deferred.callback(ret)
                     except Exception:
-                        item.deferred.errback()
+                        with PreserveLoggingContext():
+                            item.deferred.errback()
+                    else:
+                        with PreserveLoggingContext():
+                            item.deferred.callback(ret)
             finally:
                 queue = self._event_persist_queues.pop(room_id, None)
                 if queue:
                     self._event_persist_queues[room_id] = queue
                 self._currently_persisting_rooms.discard(room_id)
 
-        # set handle_queue_loop off on the background. We don't want to
-        # attribute work done in it to the current request, so we drop the
-        # logcontext altogether.
-        with PreserveLoggingContext():
-            handle_queue_loop()
+        # set handle_queue_loop off in the background
+        run_as_background_process("persist_events", handle_queue_loop)
 
     def _get_drainining_queue(self, room_id):
         queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -195,7 +195,9 @@ def _retry_on_integrity_error(func):
     return f
 
 
-class EventsStore(EventsWorkerStore):
+# inherits from EventFederationStore so that we can call _update_backward_extremities
+# and _handle_mult_prev_events (though arguably those could both be moved in here)
+class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
@@ -345,11 +347,14 @@ class EventsStore(EventsWorkerStore):
                 new_forward_extremeties = {}
 
                 # map room_id->(type,state_key)->event_id tracking the full
-                # state in each room after adding these events
+                # state in each room after adding these events.
+                # This is simply used to prefill the get_current_state_ids
+                # cache
                 current_state_for_room = {}
 
-                # map room_id->(to_delete, to_insert) where each entry is
-                # a map (type,key)->event_id giving the state delta in each
+                # map room_id->(to_delete, to_insert) where to_delete is a list
+                # of type/state keys to remove from current state, and to_insert
+                # is a map (type,key)->event_id giving the state delta in each
                 # room
                 state_delta_for_room = {}
 
@@ -419,19 +424,40 @@ class EventsStore(EventsWorkerStore):
                             logger.info(
                                 "Calculating state delta for room %s", room_id,
                             )
-                            current_state = yield self._get_new_state_after_events(
-                                room_id,
-                                ev_ctx_rm,
-                                latest_event_ids,
-                                new_latest_event_ids,
-                            )
+                            with Measure(
+                                self._clock,
+                                "persist_events.get_new_state_after_events",
+                            ):
+                                res = yield self._get_new_state_after_events(
+                                    room_id,
+                                    ev_ctx_rm,
+                                    latest_event_ids,
+                                    new_latest_event_ids,
+                                )
+                                current_state, delta_ids = res
+
+                            # If either is not None then there has been a change,
+                            # and we need to work out the delta (or use the one
+                            # given)
+                            if delta_ids is not None:
+                                # If there is a delta we know that we've
+                                # only added or replaced state, never
+                                # removed keys entirely.
+                                state_delta_for_room[room_id] = ([], delta_ids)
+                            elif current_state is not None:
+                                with Measure(
+                                    self._clock,
+                                    "persist_events.calculate_state_delta",
+                                ):
+                                    delta = yield self._calculate_state_delta(
+                                        room_id, current_state,
+                                    )
+                                state_delta_for_room[room_id] = delta
+
+                            # If we have the current_state then let's prefill
+                            # the cache with it.
                             if current_state is not None:
                                 current_state_for_room[room_id] = current_state
-                                delta = yield self._calculate_state_delta(
-                                    room_id, current_state,
-                                )
-                                if delta is not None:
-                                    state_delta_for_room[room_id] = delta
 
                 yield self.runInteraction(
                     "persist_events",
@@ -498,7 +524,6 @@ class EventsStore(EventsWorkerStore):
             iterable=list(new_latest_event_ids),
             retcols=["prev_event_id"],
             keyvalues={
-                "room_id": room_id,
                 "is_state": False,
             },
             desc="_calculate_new_extremeties",
@@ -530,9 +555,15 @@ class EventsStore(EventsWorkerStore):
                 the new forward extremities for the room.
 
         Returns:
-            Deferred[dict[(str,str), str]|None]:
-                None if there are no changes to the room state, or
-                a dict of (type, state_key) -> event_id].
+            Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
+            Returns a tuple of two state maps, the first being the full new current
+            state and the second being the delta to the existing current state.
+            If both are None then there has been no change.
+
+            If there has been a change then we only return the delta if it's
+            already been calculated. Conversely, if we do know the delta then
+            the new current state is only returned if we've already calculated
+            it.
         """
 
         if not new_latest_event_ids:
@@ -540,18 +571,32 @@ class EventsStore(EventsWorkerStore):
 
         # map from state_group to ((type, key) -> event_id) state map
         state_groups_map = {}
+
+        # Map from (prev state group, new state group) -> delta state dict
+        state_group_deltas = {}
+
         for ev, ctx in events_context:
             if ctx.state_group is None:
-                # I don't think this can happen, but let's double-check
-                raise Exception(
-                    "Context for new extremity event %s has no state "
-                    "group" % (ev.event_id, ),
-                )
+                # This should only happen for outlier events.
+                if not ev.internal_metadata.is_outlier():
+                    raise Exception(
+                        "Context for new event %s has no state "
+                        "group" % (ev.event_id, ),
+                    )
+                continue
 
             if ctx.state_group in state_groups_map:
                 continue
 
-            state_groups_map[ctx.state_group] = ctx.current_state_ids
+            # We're only interested in pulling out state that has already
+            # been cached in the context. We'll pull stuff out of the DB later
+            # if necessary.
+            current_state_ids = ctx.get_cached_current_state_ids()
+            if current_state_ids is not None:
+                state_groups_map[ctx.state_group] = current_state_ids
+
+            if ctx.prev_group:
+                state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
 
         # We need to map the event_ids to their state groups. First, let's
         # check if the event is one we're persisting, in which case we can
@@ -566,7 +611,7 @@ class EventsStore(EventsWorkerStore):
         for event_id in new_latest_event_ids:
             # First search in the list of new events we're adding.
             for ev, ctx in events_context:
-                if event_id == ev.event_id:
+                if event_id == ev.event_id and ctx.state_group is not None:
                     event_id_to_state_group[event_id] = ctx.state_group
                     break
             else:
@@ -594,7 +639,26 @@ class EventsStore(EventsWorkerStore):
         # If the old and new groups are the same then we don't need to do
         # anything.
         if old_state_groups == new_state_groups:
-            return
+            defer.returnValue((None, None))
+
+        if len(new_state_groups) == 1 and len(old_state_groups) == 1:
+            # If we're going from one state group to another, let's check if
+            # we have a delta for that transition. If we do then we can just
+            # return that.
+
+            new_state_group = next(iter(new_state_groups))
+            old_state_group = next(iter(old_state_groups))
+
+            delta_ids = state_group_deltas.get(
+                (old_state_group, new_state_group,), None
+            )
+            if delta_ids is not None:
+                # We have a delta from the existing to new current state,
+                # so let's just return that. If we happen to already have
+                # the current state in memory then let's also return that,
+                # but it doesn't matter if we don't.
+                new_state = state_groups_map.get(new_state_group)
+                defer.returnValue((new_state, delta_ids))
 
         # Now that we have calculated new_state_groups we need to get
         # their state IDs so we can resolve to a single state set.
@@ -606,7 +670,7 @@ class EventsStore(EventsWorkerStore):
         if len(new_state_groups) == 1:
             # If there is only one state group, then we know what the current
             # state is.
-            defer.returnValue(state_groups_map[new_state_groups.pop()])
+            defer.returnValue((state_groups_map[new_state_groups.pop()], None))
 
         # Ok, we need to defer to the state handler to resolve our state sets.
 
@@ -625,7 +689,7 @@ class EventsStore(EventsWorkerStore):
             room_id, state_groups, events_map, get_events
         )
 
-        defer.returnValue(res.state)
+        defer.returnValue((res.state, None))
 
     @defer.inlineCallbacks
     def _calculate_state_delta(self, room_id, current_state):
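
The `(prev_group, new_group) -> delta_ids` map collected above enables a cheap short-circuit in `_get_new_state_after_events`: when exactly one old extremity group is replaced by exactly one new group and the event context already carries the delta between them, nothing needs resolving. A simplified, self-contained illustration of that lookup:

NEEDS_RESOLUTION = object()


def state_change_for(old_state_groups, new_state_groups, state_group_deltas):
    # Returns (current_state, delta_ids): (None, None) means no change at all,
    # a delta means "apply this on top of the existing current state".
    if old_state_groups == new_state_groups:
        return None, None

    if len(old_state_groups) == 1 and len(new_state_groups) == 1:
        old_group = next(iter(old_state_groups))
        new_group = next(iter(new_state_groups))
        delta_ids = state_group_deltas.get((old_group, new_group))
        if delta_ids is not None:
            return None, delta_ids

    return NEEDS_RESOLUTION, None  # fetch full state and (maybe) resolve it


deltas = {(1, 2): {("m.room.name", ""): "$new_name"}}
print(state_change_for({1}, {1}, deltas))  # (None, None)
print(state_change_for({1}, {2}, deltas))  # cached delta, no resolution needed
print(state_change_for({1}, {3}, deltas))  # falls through to full resolution
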
@@ -634,28 +698,20 @@ class EventsStore(EventsWorkerStore):
         Assumes that we are only persisting events for one room at a time.
 
         Returns:
-            2-tuple (to_delete, to_insert) where both are state dicts,
-            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
-            first be deleted from current_state_events, `to_insert` are entries
-            to insert.
+            tuple[list, dict] (to_delete, to_insert): where `to_delete` is a
+            list of type/state_key pairs to remove from current_state_events
+            and `to_insert` is a map of updates to apply to current_state_events.
         """
         existing_state = yield self.get_current_state_ids(room_id)
 
-        existing_events = set(itervalues(existing_state))
-        new_events = set(ev_id for ev_id in itervalues(current_state))
-        changed_events = existing_events ^ new_events
-
-        if not changed_events:
-            return
+        to_delete = [
+            key for key in existing_state
+            if key not in current_state
+        ]
 
-        to_delete = {
-            key: ev_id for key, ev_id in iteritems(existing_state)
-            if ev_id in changed_events
-        }
-        events_to_insert = (new_events - existing_events)
         to_insert = {
             key: ev_id for key, ev_id in iteritems(current_state)
-            if ev_id in events_to_insert
+            if ev_id != existing_state.get(key)
         }
 
         defer.returnValue((to_delete, to_insert))
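
The new shape of `_calculate_state_delta` is easy to check by hand: `to_delete` lists the (type, state_key) pairs that disappear entirely, while `to_insert` maps every key whose event id changed (or is brand new) to its new event id. A quick worked example with invented event ids:

existing_state = {
    ("m.room.name", ""): "$old_name",
    ("m.room.member", "@bob:hs"): "$bob_join",
}
current_state = {
    ("m.room.name", ""): "$new_name",               # changed
    ("m.room.member", "@alice:hs"): "$alice_join",  # new
}

to_delete = [key for key in existing_state if key not in current_state]
to_insert = {
    key: ev_id for key, ev_id in current_state.items()
    if ev_id != existing_state.get(key)
}

print(to_delete)  # [('m.room.member', '@bob:hs')]
print(to_insert)  # the renamed room plus alice's new membership
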
@@ -678,10 +734,10 @@ class EventsStore(EventsWorkerStore):
             delete_existing (bool): True to purge existing table rows for the
                 events from the database. This is useful when retrying due to
                 IntegrityError.
-            state_delta_for_room (dict[str, (list[str], list[str])]):
+            state_delta_for_room (dict[str, (list, dict)]):
                 The current-state delta for each room. For each room, a tuple
-                (to_delete, to_insert), being a list of event ids to be removed
-                from the current state, and a list of event ids to be added to
+                (to_delete, to_insert), being a list of type/state keys to be
+                removed from the current state, and a state set to be added to
                 the current state.
             new_forward_extremeties (dict[str, list[str]]):
                 The new forward extremities for each room. For each room, a
@@ -759,9 +815,46 @@ class EventsStore(EventsWorkerStore):
     def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
         for room_id, current_state_tuple in iteritems(state_delta_by_room):
                 to_delete, to_insert = current_state_tuple
+
+                # First we add entries to the current_state_delta_stream. We
+                # do this before updating the current_state_events table so
+                # that we can use it to calculate the `prev_event_id`. (This
+                # saves us from having to pull out the existing state
+                # unnecessarily).
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, ?, ?, ?, (
+                        SELECT event_id FROM current_state_events
+                        WHERE room_id = ? AND type = ? AND state_key = ?
+                    )
+                """
+                txn.executemany(sql, (
+                    (
+                        max_stream_order, room_id, etype, state_key, None,
+                        room_id, etype, state_key,
+                    )
+                    for etype, state_key in to_delete
+                    # We sanity check that we're deleting rather than updating
+                    if (etype, state_key) not in to_insert
+                ))
+                txn.executemany(sql, (
+                    (
+                        max_stream_order, room_id, etype, state_key, ev_id,
+                        room_id, etype, state_key,
+                    )
+                    for (etype, state_key), ev_id in iteritems(to_insert)
+                ))
+
+                # Now we actually update the current_state_events table
+
                 txn.executemany(
-                    "DELETE FROM current_state_events WHERE event_id = ?",
-                    [(ev_id,) for ev_id in itervalues(to_delete)],
+                    "DELETE FROM current_state_events"
+                    " WHERE room_id = ? AND type = ? AND state_key = ?",
+                    (
+                        (room_id, etype, state_key)
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
                 )
 
                 self._simple_insert_many_txn(
@@ -778,25 +871,6 @@ class EventsStore(EventsWorkerStore):
                     ],
                 )
 
-                state_deltas = {key: None for key in to_delete}
-                state_deltas.update(to_insert)
-
-                self._simple_insert_many_txn(
-                    txn,
-                    table="current_state_delta_stream",
-                    values=[
-                        {
-                            "stream_id": max_stream_order,
-                            "room_id": room_id,
-                            "type": key[0],
-                            "state_key": key[1],
-                            "event_id": ev_id,
-                            "prev_event_id": to_delete.get(key, None),
-                        }
-                        for key, ev_id in iteritems(state_deltas)
-                    ]
-                )
-
                 txn.call_after(
                     self._curr_state_delta_stream_cache.entity_has_changed,
                     room_id, max_stream_order,
@@ -810,7 +884,8 @@ class EventsStore(EventsWorkerStore):
                 # and which we have added, then we invalidate the caches for all
                 # those users.
                 members_changed = set(
-                    state_key for ev_type, state_key in state_deltas
+                    state_key
+                    for ev_type, state_key in itertools.chain(to_delete, to_insert)
                     if ev_type == EventTypes.Member
                 )
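
The INSERT ... SELECT above leans on a correlated sub-select to record the previous event id in the same statement, which is why the delta rows have to be written before current_state_events is touched. A toy reproduction of the trick using sqlite3 and an invented two-table schema:

import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE current_state (key TEXT PRIMARY KEY, event_id TEXT)")
cur.execute(
    "CREATE TABLE delta_stream "
    "(stream_id INT, key TEXT, event_id TEXT, prev_event_id TEXT)"
)
cur.execute("INSERT INTO current_state VALUES ('m.room.name/', '$old_name')")

sql = """
    INSERT INTO delta_stream (stream_id, key, event_id, prev_event_id)
    SELECT ?, ?, ?, (SELECT event_id FROM current_state WHERE key = ?)
"""
to_insert = {"m.room.name/": "$new_name", "m.room.topic/": "$topic"}
cur.executemany(sql, (
    (42, key, ev_id, key) for key, ev_id in to_insert.items()
))

print(cur.execute("SELECT * FROM delta_stream").fetchall())
# [(42, 'm.room.name/', '$new_name', '$old_name'),
#  (42, 'm.room.topic/', '$topic', None)]
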
 
@@ -1066,7 +1141,7 @@ class EventsStore(EventsWorkerStore):
         ):
             txn.executemany(
                 "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
-                [(ev.event_id,) for ev, _ in events_and_contexts]
+                [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
             )
 
     def _store_event_txn(self, txn, events_and_contexts):
@@ -1117,7 +1192,6 @@ class EventsStore(EventsWorkerStore):
                     "type": event.type,
                     "processed": True,
                     "outlier": event.internal_metadata.is_outlier(),
-                    "content": encode_json(event.content).decode("UTF-8"),
                     "origin_server_ts": int(event.origin_server_ts),
                     "received_ts": self._clock.time_msec(),
                     "sender": event.sender,
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 67433606c6..f28239a808 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -25,6 +25,7 @@ from synapse.events import EventBase  # noqa: F401
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import (
     LoggingContext,
     PreserveLoggingContext,
@@ -322,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore):
                 should_start = False
 
         if should_start:
-            with PreserveLoggingContext():
-                self.runWithConnection(
-                    self._do_fetch
-                )
+            run_as_background_process(
+                "fetch_events",
+                self.runWithConnection,
+                self._do_fetch,
+            )
 
         logger.debug("Loading %d events", len(events))
         with PreserveLoggingContext():
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index be655d287b..6a5028961d 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -21,7 +21,6 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
 from synapse.push.baserules import list_with_base_rules
 from synapse.storage.appservice import ApplicationServiceWorkerStore
 from synapse.storage.pusher import PusherWorkerStore
@@ -186,6 +185,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
 
         defer.returnValue(results)
 
+    @defer.inlineCallbacks
     def bulk_get_push_rules_for_room(self, event, context):
         state_group = context.state_group
         if not state_group:
@@ -195,9 +195,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
             # To do this we set the state_group to a new object as object() != object()
             state_group = object()
 
-        return self._bulk_get_push_rules_for_room(
-            event.room_id, state_group, context.current_state_ids, event=event
+        current_state_ids = yield context.get_current_state_ids(self)
+        result = yield self._bulk_get_push_rules_for_room(
+            event.room_id, state_group, current_state_ids, event=event
         )
+        defer.returnValue(result)
 
     @cachedInlineCallbacks(num_args=2, cache_context=True)
     def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids,
@@ -247,18 +249,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
             if uid in local_users_in_room:
                 user_ids.add(uid)
 
-        forgotten = yield self.who_forgot_in_room(
-            event.room_id, on_invalidate=cache_context.invalidate,
-        )
-
-        for row in forgotten:
-            user_id = row["user_id"]
-            event_id = row["event_id"]
-
-            mem_id = current_state_ids.get((EventTypes.Member, user_id), None)
-            if event_id == mem_id:
-                user_ids.discard(user_id)
-
         rules_by_user = yield self.bulk_get_push_rules(
             user_ids, on_invalidate=cache_context.invalidate,
         )
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index cc273a57b2..8443bd4c1b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
             )
 
             if newly_inserted:
-                self.runInteraction(
+                yield self.runInteraction(
                     "add_pusher",
                     self._invalidate_cache_and_stream,
                     self.get_if_user_has_pusher, (user_id,)
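
The added `yield` matters because a Deferred that is never yielded is simply dropped: the caller carries on before the cache invalidation has completed, and any failure inside it is never surfaced to the request. A minimal, runnable illustration with plain Deferreds (no Synapse code involved):

from twisted.internet import defer


def flaky_txn():
    # stands in for a runInteraction(...) call whose Deferred fails
    return defer.fail(RuntimeError("db went away"))


@defer.inlineCallbacks
def without_yield():
    flaky_txn()           # Deferred dropped: the error is never observed here
    defer.returnValue("done")


@defer.inlineCallbacks
def with_yield():
    yield flaky_txn()     # error propagates to the caller as expected
    defer.returnValue("done")


without_yield().addCallback(print)                            # prints "done" anyway
with_yield().addErrback(lambda f: print("caught:", f.value))  # prints the error
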
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 02a802bed9..10dce21cea 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,7 +24,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import get_domain_from_id
 from synapse.util.async import Linearizer
 from synapse.util.caches import intern_string
@@ -232,6 +232,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         defer.returnValue(user_who_share_room)
 
+    @defer.inlineCallbacks
     def get_joined_users_from_context(self, event, context):
         state_group = context.state_group
         if not state_group:
@@ -241,11 +242,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # To do this we set the state_group to a new object as object() != object()
             state_group = object()
 
-        return self._get_joined_users_from_context(
-            event.room_id, state_group, context.current_state_ids,
+        current_state_ids = yield context.get_current_state_ids(self)
+        result = yield self._get_joined_users_from_context(
+            event.room_id, state_group, current_state_ids,
             event=event,
             context=context,
         )
+        defer.returnValue(result)
 
     def get_joined_users_from_state(self, room_id, state_entry):
         state_group = state_entry.state_group
@@ -458,17 +461,29 @@ class RoomMemberWorkerStore(EventsWorkerStore):
     def _get_joined_hosts_cache(self, room_id):
         return _JoinedHostsCache(self, room_id)
 
-    @cached()
-    def who_forgot_in_room(self, room_id):
-        return self._simple_select_list(
-            table="room_memberships",
-            retcols=("user_id", "event_id"),
-            keyvalues={
-                "room_id": room_id,
-                "forgotten": 1,
-            },
-            desc="who_forgot"
-        )
+    @cachedInlineCallbacks(num_args=2)
+    def did_forget(self, user_id, room_id):
+        """Returns whether user_id has elected to discard history for room_id.
+
+        Returns False if they have since re-joined."""
+        def f(txn):
+            sql = (
+                "SELECT"
+                "  COUNT(*)"
+                " FROM"
+                "  room_memberships"
+                " WHERE"
+                "  user_id = ?"
+                " AND"
+                "  room_id = ?"
+                " AND"
+                "  forgotten = 0"
+            )
+            txn.execute(sql, (user_id, room_id))
+            rows = txn.fetchall()
+            return rows[0][0]
+        count = yield self.runInteraction("did_forget_membership", f)
+        defer.returnValue(count == 0)
 
 
 class RoomMemberStore(RoomMemberWorkerStore):
@@ -577,36 +592,11 @@ class RoomMemberStore(RoomMemberWorkerStore):
             )
             txn.execute(sql, (user_id, room_id))
 
-            txn.call_after(self.did_forget.invalidate, (user_id, room_id))
             self._invalidate_cache_and_stream(
-                txn, self.who_forgot_in_room, (room_id,)
+                txn, self.did_forget, (user_id, room_id,),
             )
         return self.runInteraction("forget_membership", f)
 
-    @cachedInlineCallbacks(num_args=2)
-    def did_forget(self, user_id, room_id):
-        """Returns whether user_id has elected to discard history for room_id.
-
-        Returns False if they have since re-joined."""
-        def f(txn):
-            sql = (
-                "SELECT"
-                "  COUNT(*)"
-                " FROM"
-                "  room_memberships"
-                " WHERE"
-                "  user_id = ?"
-                " AND"
-                "  room_id = ?"
-                " AND"
-                "  forgotten = 0"
-            )
-            txn.execute(sql, (user_id, room_id))
-            rows = txn.fetchall()
-            return rows[0][0]
-        count = yield self.runInteraction("did_forget_membership", f)
-        defer.returnValue(count == 0)
-
     @defer.inlineCallbacks
     def _background_add_membership_profile(self, progress, batch_size):
         target_min_stream_id = progress.get(
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
new file mode 100644
index 0000000000..7d27342e39
--- /dev/null
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+We want to stop populating 'event.content', so we need to make it nullable.
+
+If this has to be rolled back, then the following should populate the missing data:
+
+Postgres:
+
+    UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+    WHERE ej.event_id = events.event_id AND
+        stream_ordering < (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering LIMIT 1
+        );
+
+    UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+    WHERE ej.event_id = events.event_id AND
+        stream_ordering > (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering DESC LIMIT 1
+        );
+
+SQLite:
+
+    UPDATE events SET content=(
+        SELECT json_extract(json,'$.content') FROM event_json ej
+        WHERE ej.event_id = events.event_id
+    )
+    WHERE
+        stream_ordering < (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering LIMIT 1
+        )
+        OR stream_ordering > (
+            SELECT stream_ordering FROM events WHERE content IS NOT NULL
+            ORDER BY stream_ordering DESC LIMIT 1
+        );
+
+"""
+
+import logging
+
+from synapse.storage.engines import PostgresEngine
+
+logger = logging.getLogger(__name__)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    pass
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        cur.execute("""
+            ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
+        """)
+        return
+
+    # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
+
+    cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+    (oldsql,) = cur.fetchone()
+
+    sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
+    if sql == oldsql:
+        raise Exception("Couldn't find null constraint to drop in %s" % oldsql)
+
+    logger.info("Replacing definition of 'events' with: %s", sql)
+
+    cur.execute("PRAGMA schema_version")
+    (oldver,) = cur.fetchone()
+    cur.execute("PRAGMA writable_schema=ON")
+    cur.execute(
+        "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
+        (sql, ),
+    )
+    cur.execute("PRAGMA schema_version=%i" % (oldver+1,))
+    cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql
index 52eec88357..6b5a5a88fa 100644
--- a/synapse/storage/schema/full_schemas/16/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/16/event_edges.sql
@@ -37,7 +37,8 @@ CREATE TABLE IF NOT EXISTS event_edges(
     event_id TEXT NOT NULL,
     prev_event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    is_state BOOL NOT NULL,
+    is_state BOOL NOT NULL,  -- true if this is a prev_state edge rather than a regular
+                             -- event dag edge.
     UNIQUE (event_id, prev_event_id, room_id, is_state)
 );
 
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
index ba5346806e..5f5cb8d01d 100644
--- a/synapse/storage/schema/full_schemas/16/im.sql
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -19,7 +19,12 @@ CREATE TABLE IF NOT EXISTS events(
     event_id TEXT NOT NULL,
     type TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    content TEXT NOT NULL,
+
+    -- 'content' used to be created NOT NULL, but as of delta 50 we drop that constraint.
+    -- the hack we use to drop the constraint doesn't work for an in-memory sqlite
+    -- database, which breaks the sytests. Hence, we simply omit the constraint here.
+    content TEXT,
+
     unrecognized_keys TEXT,
     processed BOOL NOT NULL,
     outlier BOOL NOT NULL,
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 89a05c4618..b27b3ae144 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -186,7 +186,17 @@ class StateGroupWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def _get_state_groups_from_groups(self, groups, types):
-        """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
+        """Returns the state groups for a given set of groups, filtering on
+        types of state events.
+
+        Args:
+            groups(list[int]): list of state group IDs to query
+            types (Iterable[(str, str|None)]|None): list of 2-tuples of the form
+                (`type`, `state_key`), where a `state_key` of `None` matches all
+                state_keys for the `type`. If None, all types are returned.
+
+        Returns:
+            dictionary state_group -> (dict of (type, state_key) -> event id)
         """
         results = {}
 
@@ -200,8 +210,11 @@ class StateGroupWorkerStore(SQLBaseStore):
 
         defer.returnValue(results)
 
-    def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
+    def _get_state_groups_from_groups_txn(
+        self, txn, groups, types=None,
+    ):
         results = {group: {} for group in groups}
+
         if types is not None:
             types = list(set(types))  # deduplicate types list
 
@@ -239,7 +252,7 @@ class StateGroupWorkerStore(SQLBaseStore):
             # Turns out that postgres doesn't like doing a list of OR's and
             # is about 1000x slower, so we just issue a query for each specific
             # type seperately.
-            if types:
+            if types is not None:
                 clause_to_args = [
                     (
                         "AND type = ? AND state_key = ?",
@@ -278,6 +291,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                     else:
                         where_clauses.append("(type = ? AND state_key = ?)")
                         where_args.extend([typ[0], typ[1]])
+
                 where_clause = "AND (%s)" % (" OR ".join(where_clauses))
             else:
                 where_clause = ""
@@ -332,16 +346,20 @@ class StateGroupWorkerStore(SQLBaseStore):
         return results
 
     @defer.inlineCallbacks
-    def get_state_for_events(self, event_ids, types):
+    def get_state_for_events(self, event_ids, types, filtered_types=None):
         """Given a list of event_ids and type tuples, return a list of state
         dicts for each event. The state dicts will only have the type/state_keys
         that are in the `types` list.
 
         Args:
-            event_ids (list)
-            types (list): List of (type, state_key) tuples which are used to
-                filter the state fetched. `state_key` may be None, which matches
-                any `state_key`
+            event_ids (list[str])
+            types (list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all state events of the given type are returned. If `types` itself
+                is None, no filtering is applied and all state is returned.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types. Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             deferred: A list of dicts corresponding to the event_ids given.
@@ -352,7 +370,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         )
 
         groups = set(itervalues(event_to_groups))
-        group_to_state = yield self._get_state_for_groups(groups, types)
+        group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
 
         state_event_map = yield self.get_events(
             [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
@@ -371,15 +389,19 @@ class StateGroupWorkerStore(SQLBaseStore):
         defer.returnValue({event: event_to_state[event] for event in event_ids})
 
     @defer.inlineCallbacks
-    def get_state_ids_for_events(self, event_ids, types=None):
+    def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None):
         """
         Get the state dicts corresponding to a list of events
 
         Args:
             event_ids(list(str)): events whose state should be returned
-            types(list[(str, str)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. May be None, which
-                matches any key
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all state events of the given type are returned. If `types` itself
+                is None, no filtering is applied and all state is returned.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types. Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A deferred dict from event_id -> (type, state_key) -> state_event
@@ -389,7 +411,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         )
 
         groups = set(itervalues(event_to_groups))
-        group_to_state = yield self._get_state_for_groups(groups, types)
+        group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
 
         event_to_state = {
             event_id: group_to_state[group]
@@ -399,37 +421,45 @@ class StateGroupWorkerStore(SQLBaseStore):
         defer.returnValue({event: event_to_state[event] for event in event_ids})
 
     @defer.inlineCallbacks
-    def get_state_for_event(self, event_id, types=None):
+    def get_state_for_event(self, event_id, types=None, filtered_types=None):
         """
         Get the state dict corresponding to a particular event
 
         Args:
             event_id(str): event whose state should be returned
-            types(list[(str, str)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. May be None, which
-                matches any key
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all state events of the given type are returned. If `types` itself
+                is None, no filtering is applied and all state is returned.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types. Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A deferred dict from (type, state_key) -> state_event
         """
-        state_map = yield self.get_state_for_events([event_id], types)
+        state_map = yield self.get_state_for_events([event_id], types, filtered_types)
         defer.returnValue(state_map[event_id])
 
     @defer.inlineCallbacks
-    def get_state_ids_for_event(self, event_id, types=None):
+    def get_state_ids_for_event(self, event_id, types=None, filtered_types=None):
         """
         Get the state dict corresponding to a particular event
 
         Args:
             event_id(str): event whose state should be returned
-            types(list[(str, str)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. May be None, which
-                matches any key
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all state events of the given type are returned. If `types` itself
+                is None, no filtering is applied and all state is returned.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types. Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A deferred dict from (type, state_key) -> state_event
         """
-        state_map = yield self.get_state_ids_for_events([event_id], types)
+        state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types)
         defer.returnValue(state_map[event_id])
 
     @cached(max_entries=50000)
@@ -460,56 +490,73 @@ class StateGroupWorkerStore(SQLBaseStore):
 
         defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
 
-    def _get_some_state_from_cache(self, group, types):
+    def _get_some_state_from_cache(self, group, types, filtered_types=None):
         """Checks if group is in cache. See `_get_state_for_groups`
 
-        Returns 3-tuple (`state_dict`, `missing_types`, `got_all`).
-        `missing_types` is the list of types that aren't in the cache for that
-        group. `got_all` is a bool indicating if we successfully retrieved all
+        Args:
+            group(int): The state group to lookup
+            types(list[(str, str|None)]): List of 2-tuples of the form
+                (`type`, `state_key`), where a `state_key` of `None` matches all
+                state_keys for the `type`.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
+
+        Returns 2-tuple (`state_dict`, `got_all`).
+        `got_all` is a bool indicating if we successfully retrieved all the
         requested state from the cache; if False we need to query the DB for the
         missing state.
-
-        Args:
-            group: The state group to lookup
-            types (list): List of 2-tuples of the form (`type`, `state_key`),
-                where a `state_key` of `None` matches all state_keys for the
-                `type`.
         """
         is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
 
         type_to_key = {}
-        missing_types = set()
+
+        # tracks whether any of our requested types are missing from the cache
+        missing_types = False
 
         for typ, state_key in types:
             key = (typ, state_key)
-            if state_key is None:
+
+            if (
+                state_key is None or
+                (filtered_types is not None and typ not in filtered_types)
+            ):
                 type_to_key[typ] = None
-                missing_types.add(key)
+                # we mark the type as missing from the cache because
+                # when the cache was populated it might have been done with a
+                # restricted set of state_keys, so the wildcard will not work
+                # and the cache may be incomplete.
+                missing_types = True
             else:
                 if type_to_key.get(typ, object()) is not None:
                     type_to_key.setdefault(typ, set()).add(state_key)
 
                 if key not in state_dict_ids and key not in known_absent:
-                    missing_types.add(key)
+                    missing_types = True
 
         sentinel = object()
 
         def include(typ, state_key):
             valid_state_keys = type_to_key.get(typ, sentinel)
             if valid_state_keys is sentinel:
-                return False
+                return filtered_types is not None and typ not in filtered_types
             if valid_state_keys is None:
                 return True
             if state_key in valid_state_keys:
                 return True
             return False
 
-        got_all = is_all or not missing_types
+        got_all = is_all
+        if not got_all:
+            # the cache is incomplete. We may still have got all the results we need, if
+            # we don't have any wildcards in the match list.
+            if not missing_types and filtered_types is None:
+                got_all = True
 
         return {
             k: v for k, v in iteritems(state_dict_ids)
             if include(k[0], k[1])
-        }, missing_types, got_all
+        }, got_all
 
     def _get_all_state_from_cache(self, group):
         """Checks if group is in cache. See `_get_state_for_groups`
@@ -526,7 +573,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         return state_dict_ids, is_all
 
     @defer.inlineCallbacks
-    def _get_state_for_groups(self, groups, types=None):
+    def _get_state_for_groups(self, groups, types=None, filtered_types=None):
         """Gets the state at each of a list of state groups, optionally
         filtering by type/state_key
 
@@ -540,6 +587,9 @@ class StateGroupWorkerStore(SQLBaseStore):
                 Otherwise, each entry should be a `(type, state_key)` tuple to
                 include in the response. A `state_key` of None is a wildcard
                 meaning that we require all state with that type.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             Deferred[dict[int, dict[(type, state_key), EventBase]]]
@@ -551,8 +601,8 @@ class StateGroupWorkerStore(SQLBaseStore):
         missing_groups = []
         if types is not None:
             for group in set(groups):
-                state_dict_ids, _, got_all = self._get_some_state_from_cache(
-                    group, types,
+                state_dict_ids, got_all = self._get_some_state_from_cache(
+                    group, types, filtered_types
                 )
                 results[group] = state_dict_ids
 
@@ -579,13 +629,13 @@ class StateGroupWorkerStore(SQLBaseStore):
             # cache. Hence, if we are doing a wildcard lookup, populate the
             # cache fully so that we can do an efficient lookup next time.
 
-            if types and any(k is None for (t, k) in types):
+            if filtered_types or (types and any(k is None for (t, k) in types)):
                 types_to_fetch = None
             else:
                 types_to_fetch = types
 
             group_to_state_dict = yield self._get_state_groups_from_groups(
-                missing_groups, types_to_fetch,
+                missing_groups, types_to_fetch
             )
 
             for group, group_state_dict in iteritems(group_to_state_dict):
@@ -595,7 +645,10 @@ class StateGroupWorkerStore(SQLBaseStore):
                 if types:
                     for k, v in iteritems(group_state_dict):
                         (typ, _) = k
-                        if k in types or (typ, None) in types:
+                        if (
+                            (k in types or (typ, None) in types) or
+                            (filtered_types and typ not in filtered_types)
+                        ):
                             state_dict[k] = v
                 else:
                     state_dict.update(group_state_dict)
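The new `filtered_types` argument is easiest to see in use. A minimal usage sketch (illustrative only; `store`, `event_id` and `user_id` are assumed to come from the caller): `types` restricts m.room.member state to the requesting user, while `filtered_types` leaves every other event type unfiltered, which is the lazy-loading-members pattern this change is building towards.

    from twisted.internet import defer

    from synapse.api.constants import EventTypes

    @defer.inlineCallbacks
    def get_lazily_filtered_state(store, event_id, user_id):
        # Only filter m.room.member events (down to the requesting user);
        # all other types of state are returned unfiltered.
        state = yield store.get_state_for_event(
            event_id,
            types=[(EventTypes.Member, user_id)],
            filtered_types=[EventTypes.Member],
        )
        defer.returnValue(state)
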
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 66856342f0..b9f2b74ac6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -43,7 +43,7 @@ from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.engines import PostgresEngine
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -527,7 +527,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             )
 
     @defer.inlineCallbacks
-    def get_events_around(self, room_id, event_id, before_limit, after_limit):
+    def get_events_around(
+        self, room_id, event_id, before_limit, after_limit, event_filter=None,
+    ):
         """Retrieve events and pagination tokens around a given event in a
         room.
 
@@ -536,6 +538,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             event_id (str)
             before_limit (int)
             after_limit (int)
+            event_filter (Filter|None)
 
         Returns:
             dict
@@ -543,7 +546,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         results = yield self.runInteraction(
             "get_events_around", self._get_events_around_txn,
-            room_id, event_id, before_limit, after_limit
+            room_id, event_id, before_limit, after_limit, event_filter,
         )
 
         events_before = yield self._get_events(
@@ -563,7 +566,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "end": results["after"]["token"],
         })
 
-    def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+    def _get_events_around_txn(
+        self, txn, room_id, event_id, before_limit, after_limit, event_filter,
+    ):
         """Retrieves event_ids and pagination tokens around a given event in a
         room.
 
@@ -572,6 +577,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             event_id (str)
             before_limit (int)
             after_limit (int)
+            event_filter (Filter|None)
 
         Returns:
             dict
@@ -601,11 +607,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         rows, start_token = self._paginate_room_events_txn(
             txn, room_id, before_token, direction='b', limit=before_limit,
+            event_filter=event_filter,
         )
         events_before = [r.event_id for r in rows]
 
         rows, end_token = self._paginate_room_events_txn(
             txn, room_id, after_token, direction='f', limit=after_limit,
+            event_filter=event_filter,
         )
         events_after = [r.event_id for r in rows]
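A usage sketch for the new `event_filter` argument (illustrative only; it assumes a store exposing `get_events_around` as above, and that `Filter` accepts a filter definition of this shape): fetch the context around an event while dropping membership events from both sides.

    from twisted.internet import defer

    from synapse.api.filtering import Filter

    @defer.inlineCallbacks
    def get_context_without_members(store, room_id, event_id):
        # Exclude m.room.member events from the surrounding context.
        event_filter = Filter({"not_types": ["m.room.member"]})
        result = yield store.get_events_around(
            room_id, event_id,
            before_limit=5, after_limit=5,
            event_filter=event_filter,
        )
        defer.returnValue(result)
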
 
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index c3bc94f56d..428e7fa36e 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore
@@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(TransactionStore, self).__init__(db_conn, hs)
 
-        self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
+        self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
 
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
@@ -271,6 +272,11 @@ class TransactionStore(SQLBaseStore):
         txn.execute(query, (self._clock.time_msec(),))
         return self.cursor_to_dict(txn)
 
+    def _start_cleanup_transactions(self):
+        return run_as_background_process(
+            "cleanup_transactions", self._cleanup_transactions,
+        )
+
     def _cleanup_transactions(self):
         now = self._clock.time_msec()
         month_ago = now - 30 * 24 * 60 * 60 * 1000
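The same pattern in isolation, since it recurs throughout this changeset: `looping_call` fires outside of any logcontext, so the periodic work is handed to `run_as_background_process`, which gives it its own logcontext and records it in the background-process metrics. Names here (`ExampleStore`, `_prune`) are illustrative, not from the diff.

    from synapse.metrics.background_process_metrics import run_as_background_process

    class ExampleStore(object):
        def __init__(self, hs):
            self._clock = hs.get_clock()
            self._clock.looping_call(self._start_prune, 60 * 1000)

        def _start_prune(self):
            # thin wrapper so the periodic work runs as a named background process
            return run_as_background_process("example_prune", self._prune)

        def _prune(self):
            # the real (possibly Deferred-returning) work goes here
            pass
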
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 5d0fb39130..a7094e2fb4 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import collections
 import logging
 from contextlib import contextmanager
 
@@ -156,54 +157,72 @@ def concurrently_execute(func, args, limit):
 
 
 class Linearizer(object):
-    """Linearizes access to resources based on a key. Useful to ensure only one
-    thing is happening at a time on a given resource.
+    """Limits concurrent access to resources based on a key. Useful to ensure
+    only a few things happen at a time on a given resource.
 
     Example:
 
-        with (yield linearizer.queue("test_key")):
+        with (yield limiter.queue("test_key")):
             # do some work.
 
     """
-    def __init__(self, name=None, clock=None):
+    def __init__(self, name=None, max_count=1, clock=None):
+        """
+        Args:
+            max_count(int): The maximum number of concurrent accesses
+        """
         if name is None:
             self.name = id(self)
         else:
             self.name = name
-        self.key_to_defer = {}
 
         if not clock:
             from twisted.internet import reactor
             clock = Clock(reactor)
         self._clock = clock
+        self.max_count = max_count
+
+        # key_to_defer is a map from the key to a 2 element list where
+        # the first element is the number of things executing, and
+        # the second element is an OrderedDict, where the keys are deferreds for the
+        # things blocked from executing.
+        self.key_to_defer = {}
 
     @defer.inlineCallbacks
     def queue(self, key):
-        # If there is already a deferred in the queue, we pull it out so that
-        # we can wait on it later.
-        # Then we replace it with a deferred that we resolve *after* the
-        # context manager has exited.
-        # We only return the context manager after the previous deferred has
-        # resolved.
-        # This all has the net effect of creating a chain of deferreds that
-        # wait for the previous deferred before starting their work.
-        current_defer = self.key_to_defer.get(key)
+        entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
 
-        new_defer = defer.Deferred()
-        self.key_to_defer[key] = new_defer
+        # If the number of things executing is at or above the maximum, add a
+        # deferred to the list of blocked items.
+        # When one of the things currently executing finishes, it will call back
+        # this item so that it can continue executing.
+        if entry[0] >= self.max_count:
+            new_defer = defer.Deferred()
+            entry[1][new_defer] = 1
 
-        if current_defer:
             logger.info(
-                "Waiting to acquire linearizer lock %r for key %r", self.name, key
+                "Waiting to acquire linearizer lock %r for key %r", self.name, key,
             )
             try:
-                with PreserveLoggingContext():
-                    yield current_defer
-            except Exception:
-                logger.exception("Unexpected exception in Linearizer")
-
-            logger.info("Acquired linearizer lock %r for key %r", self.name,
-                        key)
+                yield make_deferred_yieldable(new_defer)
+            except Exception as e:
+                if isinstance(e, CancelledError):
+                    logger.info(
+                        "Cancelling wait for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+                else:
+                    logger.warn(
+                        "Unexpected exception waiting for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+
+                # we just have to take ourselves back out of the queue.
+                del entry[1][new_defer]
+                raise
+
+            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            entry[0] += 1
 
             # if the code holding the lock completes synchronously, then it
             # will recursively run the next claimant on the list. That can
@@ -213,15 +232,15 @@ class Linearizer(object):
             # In order to break the cycle, we add a cheeky sleep(0) here to
             # ensure that we fall back to the reactor between each iteration.
             #
-            # (There's no particular need for it to happen before we return
-            # the context manager, but it needs to happen while we hold the
-            # lock, and the context manager's exit code must be synchronous,
-            # so actually this is the only sensible place.
+            # (This needs to happen while we hold the lock, and the context manager's exit
+            # code must be synchronous, so this is the only sensible place.)
             yield self._clock.sleep(0)
 
         else:
-            logger.info("Acquired uncontended linearizer lock %r for key %r",
-                        self.name, key)
+            logger.info(
+                "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+            )
+            entry[0] += 1
 
         @contextmanager
         def _ctx_manager():
@@ -229,73 +248,15 @@ class Linearizer(object):
                 yield
             finally:
                 logger.info("Releasing linearizer lock %r for key %r", self.name, key)
-                with PreserveLoggingContext():
-                    new_defer.callback(None)
-                current_d = self.key_to_defer.get(key)
-                if current_d is new_defer:
-                    self.key_to_defer.pop(key, None)
-
-        defer.returnValue(_ctx_manager())
-
-
-class Limiter(object):
-    """Limits concurrent access to resources based on a key. Useful to ensure
-    only a few thing happen at a time on a given resource.
-
-    Example:
-
-        with (yield limiter.queue("test_key")):
-            # do some work.
-
-    """
-    def __init__(self, max_count):
-        """
-        Args:
-            max_count(int): The maximum number of concurrent access
-        """
-        self.max_count = max_count
-
-        # key_to_defer is a map from the key to a 2 element list where
-        # the first element is the number of things executing
-        # the second element is a list of deferreds for the things blocked from
-        # executing.
-        self.key_to_defer = {}
-
-    @defer.inlineCallbacks
-    def queue(self, key):
-        entry = self.key_to_defer.setdefault(key, [0, []])
-
-        # If the number of things executing is greater than the maximum
-        # then add a deferred to the list of blocked items
-        # When on of the things currently executing finishes it will callback
-        # this item so that it can continue executing.
-        if entry[0] >= self.max_count:
-            new_defer = defer.Deferred()
-            entry[1].append(new_defer)
-
-            logger.info("Waiting to acquire limiter lock for key %r", key)
-            with PreserveLoggingContext():
-                yield new_defer
-            logger.info("Acquired limiter lock for key %r", key)
-        else:
-            logger.info("Acquired uncontended limiter lock for key %r", key)
-
-        entry[0] += 1
-
-        @contextmanager
-        def _ctx_manager():
-            try:
-                yield
-            finally:
-                logger.info("Releasing limiter lock for key %r", key)
 
                 # We've finished executing so check if there are any things
                 # blocked waiting to execute and start one of them
                 entry[0] -= 1
 
                 if entry[1]:
-                    next_def = entry[1].pop(0)
+                    (next_def, _) = entry[1].popitem(last=False)
 
+                    # we need to run the next thing in the sentinel context.
                     with PreserveLoggingContext():
                         next_def.callback(None)
                 elif entry[0] == 0:
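With the old Limiter folded into Linearizer, passing `max_count` greater than one gives the old Limiter behaviour. A minimal sketch (the work inside the block is a stand-in):

    from twisted.internet import defer

    from synapse.util.async import Linearizer

    # max_count=1 (the default) keeps the one-at-a-time semantics;
    # max_count=3 allows up to three concurrent holders per key.
    linearizer = Linearizer(name="example", max_count=3)

    @defer.inlineCallbacks
    def do_work(key):
        with (yield linearizer.queue(key)):
            # at most three callers run this block at once for a given key
            yield defer.succeed(None)  # stand-in for the real work
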
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index f8a07df6b8..861c24809c 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -473,105 +473,101 @@ class CacheListDescriptor(_CacheDescriptorBase):
 
         @functools.wraps(self.orig)
         def wrapped(*args, **kwargs):
-            # If we're passed a cache_context then we'll want to call its invalidate()
-            # whenever we are invalidated
+            # If we're passed a cache_context then we'll want to call its
+            # invalidate() whenever we are invalidated
             invalidate_callback = kwargs.pop("on_invalidate", None)
 
             arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
             keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names]
             list_args = arg_dict[self.list_name]
 
-            # cached is a dict arg -> deferred, where deferred results in a
-            # 2-tuple (`arg`, `result`)
             results = {}
-            cached_defers = {}
-            missing = []
+
+            def update_results_dict(res, arg):
+                results[arg] = res
+
+            # list of deferreds to wait for
+            cached_defers = []
+
+            missing = set()
 
             # If the cache takes a single arg then that is used as the key,
             # otherwise a tuple is used.
             if num_args == 1:
-                def cache_get(arg):
-                    return cache.get(arg, callback=invalidate_callback)
+                def arg_to_cache_key(arg):
+                    return arg
             else:
-                key = list(keyargs)
+                keylist = list(keyargs)
 
-                def cache_get(arg):
-                    key[self.list_pos] = arg
-                    return cache.get(tuple(key), callback=invalidate_callback)
+                def arg_to_cache_key(arg):
+                    keylist[self.list_pos] = arg
+                    return tuple(keylist)
 
             for arg in list_args:
                 try:
-                    res = cache_get(arg)
-
+                    res = cache.get(arg_to_cache_key(arg),
+                                    callback=invalidate_callback)
                     if not isinstance(res, ObservableDeferred):
                         results[arg] = res
                     elif not res.has_succeeded():
                         res = res.observe()
-                        res.addCallback(lambda r, arg: (arg, r), arg)
-                        cached_defers[arg] = res
+                        res.addCallback(update_results_dict, arg)
+                        cached_defers.append(res)
                     else:
                         results[arg] = res.get_result()
                 except KeyError:
-                    missing.append(arg)
+                    missing.add(arg)
 
             if missing:
+                # we need an observable deferred for each entry in the list,
+                # which we put in the cache. Each deferred resolves with the
+                # relevant result for that key.
+                deferreds_map = {}
+                for arg in missing:
+                    deferred = defer.Deferred()
+                    deferreds_map[arg] = deferred
+                    key = arg_to_cache_key(arg)
+                    observable = ObservableDeferred(deferred)
+                    cache.set(key, observable, callback=invalidate_callback)
+
+                def complete_all(res):
+                    # the wrapped function has completed. It returns a dict.
+                    # We can now resolve the observable deferreds in
+                    # the cache and update our own result map.
+                    for e in missing:
+                        val = res.get(e, None)
+                        deferreds_map[e].callback(val)
+                        results[e] = val
+
+                def errback(f):
+                    # the wrapped function has failed. Invalidate any cache
+                    # entries we're supposed to be populating, and fail
+                    # their deferreds.
+                    for e in missing:
+                        key = arg_to_cache_key(e)
+                        cache.invalidate(key)
+                        deferreds_map[e].errback(f)
+
+                    # return the failure, to propagate to our caller.
+                    return f
+
                 args_to_call = dict(arg_dict)
-                args_to_call[self.list_name] = missing
+                args_to_call[self.list_name] = list(missing)
 
-                ret_d = defer.maybeDeferred(
+                cached_defers.append(defer.maybeDeferred(
                     logcontext.preserve_fn(self.function_to_call),
                     **args_to_call
-                )
-
-                ret_d = ObservableDeferred(ret_d)
-
-                # We need to create deferreds for each arg in the list so that
-                # we can insert the new deferred into the cache.
-                for arg in missing:
-                    observer = ret_d.observe()
-                    observer.addCallback(lambda r, arg: r.get(arg, None), arg)
-
-                    observer = ObservableDeferred(observer)
-
-                    if num_args == 1:
-                        cache.set(
-                            arg, observer,
-                            callback=invalidate_callback
-                        )
-
-                        def invalidate(f, key):
-                            cache.invalidate(key)
-                            return f
-                        observer.addErrback(invalidate, arg)
-                    else:
-                        key = list(keyargs)
-                        key[self.list_pos] = arg
-                        cache.set(
-                            tuple(key), observer,
-                            callback=invalidate_callback
-                        )
-
-                        def invalidate(f, key):
-                            cache.invalidate(key)
-                            return f
-                        observer.addErrback(invalidate, tuple(key))
-
-                    res = observer.observe()
-                    res.addCallback(lambda r, arg: (arg, r), arg)
-
-                    cached_defers[arg] = res
+                ).addCallbacks(complete_all, errback))
 
             if cached_defers:
-                def update_results_dict(res):
-                    results.update(res)
-                    return results
-
-                return logcontext.make_deferred_yieldable(defer.gatherResults(
-                    list(cached_defers.values()),
+                d = defer.gatherResults(
+                    cached_defers,
                     consumeErrors=True,
-                ).addCallback(update_results_dict).addErrback(
+                ).addCallbacks(
+                    lambda _: results,
                     unwrapFirstError
-                ))
+                )
+                return logcontext.make_deferred_yieldable(d)
             else:
                 return results
 
@@ -625,7 +621,8 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal
     cache.
 
     Args:
-        cache (Cache): The underlying cache to use.
+        cached_method_name (str): The name of the single-item lookup method.
+            This is only used to find the cache to use.
         list_name (str): The name of the argument that is the list to use to
             do batch lookups in the cache.
         num_args (int): Number of arguments to use as the key in the cache
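A sketch of how `cachedList` is typically paired with a `cached` single-item lookup; the store and its `_load_one`/`_load_many` helpers are hypothetical. The batch method must return a dict keyed by the items in the list; any item missing from the dict is cached as None.

    from twisted.internet import defer

    from synapse.util.caches.descriptors import cached, cachedList

    class ExampleStore(object):
        @cached()
        def get_widget(self, widget_id):
            return self._load_one(widget_id)

        @cachedList(cached_method_name="get_widget", list_name="widget_ids",
                    num_args=1, inlineCallbacks=True)
        def get_widgets(self, widget_ids):
            rows = yield self._load_many(widget_ids)
            defer.returnValue({row["id"]: row for row in rows})
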
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 4abca91f6d..ce85b2ae11 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,6 +16,7 @@
 import logging
 from collections import OrderedDict
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import register_cache
 
 logger = logging.getLogger(__name__)
@@ -63,7 +64,10 @@ class ExpiringCache(object):
             return
 
         def f():
-            self._prune_cache()
+            return run_as_background_process(
+                "prune_cache_%s" % self._cache_name,
+                self._prune_cache,
+            )
 
         self._clock.looping_call(f, self._expiry_ms / 2)
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 258655349b..f2bde74dc5 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -74,12 +74,14 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            result = {
+            changed_entities = {
                 self._cache[k] for k in self._cache.islice(
                     start=self._cache.bisect_right(stream_pos),
                 )
             }
 
+            result = changed_entities.intersection(entities)
+
             self.metrics.inc_hits()
         else:
             result = set(entities)
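The effect of the fix, as a small sketch: `get_entities_changed` now only reports entities that were actually asked about, rather than every entity that changed after the given stream position.

    from synapse.util.caches.stream_change_cache import StreamChangeCache

    cache = StreamChangeCache("ExampleCache", current_stream_pos=0)
    cache.entity_has_changed("@alice:example.com", 2)
    cache.entity_has_changed("@bob:example.com", 3)

    # Previously this could also report @bob; with the intersection it is
    # limited to the entities supplied by the caller.
    cache.get_entities_changed(["@alice:example.com"], stream_pos=1)
    # -> {"@alice:example.com"}
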
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 734331caaa..194da87639 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -17,20 +17,18 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
 
 
 def user_left_room(distributor, user, room_id):
-    with PreserveLoggingContext():
-        distributor.fire("user_left_room", user=user, room_id=room_id)
+    distributor.fire("user_left_room", user=user, room_id=room_id)
 
 
 def user_joined_room(distributor, user, room_id):
-    with PreserveLoggingContext():
-        distributor.fire("user_joined_room", user=user, room_id=room_id)
+    distributor.fire("user_joined_room", user=user, room_id=room_id)
 
 
 class Distributor(object):
@@ -44,9 +42,7 @@ class Distributor(object):
       model will do for today.
     """
 
-    def __init__(self, suppress_failures=True):
-        self.suppress_failures = suppress_failures
-
+    def __init__(self):
         self.signals = {}
         self.pre_registration = {}
 
@@ -56,7 +52,6 @@ class Distributor(object):
 
         self.signals[name] = Signal(
             name,
-            suppress_failures=self.suppress_failures,
         )
 
         if name in self.pre_registration:
@@ -75,10 +70,18 @@ class Distributor(object):
             self.pre_registration[name].append(observer)
 
     def fire(self, name, *args, **kwargs):
+        """Dispatches the given signal to the registered observers.
+
+        Runs the observers as a background process. Does not return a deferred.
+        """
         if name not in self.signals:
             raise KeyError("%r does not have a signal named %s" % (self, name))
 
-        return self.signals[name].fire(*args, **kwargs)
+        run_as_background_process(
+            name,
+            self.signals[name].fire,
+            *args, **kwargs
+        )
 
 
 class Signal(object):
@@ -91,9 +94,8 @@ class Signal(object):
     method into all of the observers.
     """
 
-    def __init__(self, name, suppress_failures):
+    def __init__(self, name):
         self.name = name
-        self.suppress_failures = suppress_failures
         self.observers = []
 
     def observe(self, observer):
@@ -103,7 +105,6 @@ class Signal(object):
         Each observer callable may return a Deferred."""
         self.observers.append(observer)
 
-    @defer.inlineCallbacks
     def fire(self, *args, **kwargs):
         """Invokes every callable in the observer list, passing in the args and
         kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -121,22 +122,17 @@ class Signal(object):
                         failure.type,
                         failure.value,
                         failure.getTracebackObject()))
-                if not self.suppress_failures:
-                    return failure
 
             return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
 
-        with PreserveLoggingContext():
-            deferreds = [
-                do(observer)
-                for observer in self.observers
-            ]
-
-            res = yield defer.gatherResults(
-                deferreds, consumeErrors=True
-            ).addErrback(unwrapFirstError)
+        deferreds = [
+            run_in_background(do, o)
+            for o in self.observers
+        ]
 
-        defer.returnValue(res)
+        return make_deferred_yieldable(defer.gatherResults(
+            deferreds, consumeErrors=True,
+        ))
 
     def __repr__(self):
         return "<Signal name=%r>" % (self.name,)
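A minimal sketch of the Distributor after this change (the signal name and observer are illustrative): observers are registered as before, but `fire()` now just kicks them off as a background process and returns None, so callers must not expect a deferred back.

    from synapse.util.distributor import Distributor

    distributor = Distributor()
    distributor.declare("user_joined_room")

    def on_user_joined(user, room_id):
        print("%s joined %s" % (user, room_id))

    distributor.observe("user_joined_room", on_user_joined)

    # Returns None; the observers run in a background process.
    distributor.fire(
        "user_joined_room",
        user="@alice:example.com", room_id="!room:example.com",
    )
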
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index f6c7175f74..8dcae50b39 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -99,6 +99,17 @@ class ContextResourceUsage(object):
         self.db_sched_duration_sec = 0
         self.evt_db_fetch_count = 0
 
+    def __repr__(self):
+        return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
+                "db_txn_count='%r', db_txn_duration_sec='%r', "
+                "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % (
+                    self.ru_stime,
+                    self.ru_utime,
+                    self.db_txn_count,
+                    self.db_txn_duration_sec,
+                    self.db_sched_duration_sec,
+                    self.evt_db_fetch_count,)
+
     def __iadd__(self, other):
         """Add another ContextResourceUsage's stats to this one's.
 
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 6ba7107896..97f1267380 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -104,12 +104,19 @@ class Measure(object):
             logger.warn("Expected context. (%r)", self.name)
             return
 
-        usage = context.get_resource_usage() - self.start_usage
-        block_ru_utime.labels(self.name).inc(usage.ru_utime)
-        block_ru_stime.labels(self.name).inc(usage.ru_stime)
-        block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
-        block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
-        block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
+        current = context.get_resource_usage()
+        usage = current - self.start_usage
+        try:
+            block_ru_utime.labels(self.name).inc(usage.ru_utime)
+            block_ru_stime.labels(self.name).inc(usage.ru_stime)
+            block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
+            block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
+            block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
+        except ValueError:
+            logger.warn(
+                "Failed to save metrics! OLD: %r, NEW: %r",
+                self.start_usage, current
+            )
 
         if self.created_context:
             self.start_context.__exit__(exc_type, exc_val, exc_tb)
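For context, `Measure` is used as a context manager around a block of work; with this change a bad usage delta (for instance if the counters appear to go backwards) is logged rather than raised out of `__exit__`. A minimal usage sketch, with the work inside the block as a stand-in:

    from twisted.internet import defer

    from synapse.util.metrics import Measure

    @defer.inlineCallbacks
    def do_expensive_thing(clock):
        with Measure(clock, "do_expensive_thing"):
            # resource usage inside this block is attributed to the
            # "do_expensive_thing" label
            yield defer.succeed(None)  # stand-in for the real work
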
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 015c2bab37..d4680863d3 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -12,15 +12,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import itertools
+
 import logging
 import operator
 
+from six import iteritems, itervalues
+from six.moves import map
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.events.utils import prune_event
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.types import get_domain_from_id
 
 logger = logging.getLogger(__name__)
 
@@ -72,19 +75,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
         types=types,
     )
 
-    forgotten = yield make_deferred_yieldable(defer.gatherResults([
-        defer.maybeDeferred(
-            preserve_fn(store.who_forgot_in_room),
-            room_id,
-        )
-        for room_id in frozenset(e.room_id for e in events)
-    ], consumeErrors=True))
-
-    # Set of membership event_ids that have been forgotten
-    event_id_forgotten = frozenset(
-        row["event_id"] for rows in forgotten for row in rows
-    )
-
     ignore_dict_content = yield store.get_global_account_data_by_type_for_user(
         "m.ignored_user_list", user_id,
     )
@@ -173,10 +163,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
         if membership is None:
             membership_event = state.get((EventTypes.Member, user_id), None)
             if membership_event:
-                # XXX why do we do this?
-                # https://github.com/matrix-org/synapse/issues/3350
-                if membership_event.event_id not in event_id_forgotten:
-                    membership = membership_event.membership
+                membership = membership_event.membership
 
         # if the user was a member of the room at the time of the event,
         # they can see it.
@@ -218,10 +205,161 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
         return event
 
     # check each event: gives an iterable[None|EventBase]
-    filtered_events = itertools.imap(allowed, events)
+    filtered_events = map(allowed, events)
 
     # remove the None entries
     filtered_events = filter(operator.truth, filtered_events)
 
     # we turn it into a list before returning it.
     defer.returnValue(list(filtered_events))
+
+
+@defer.inlineCallbacks
+def filter_events_for_server(store, server_name, events):
+    # Whatever else we do, we need to check for senders which have requested
+    # erasure of their data.
+    erased_senders = yield store.are_users_erased(
+        (e.sender for e in events),
+    )
+
+    def redact_disallowed(event, state):
+        # if the sender has been gdpr17ed, always return a redacted
+        # copy of the event.
+        if erased_senders[event.sender]:
+            logger.info(
+                "Sender of %s has been erased, redacting",
+                event.event_id,
+            )
+            return prune_event(event)
+
+        # state will be None if we decided we didn't need to filter by
+        # room membership.
+        if not state:
+            return event
+
+        history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
+        if history:
+            visibility = history.content.get("history_visibility", "shared")
+            if visibility in ["invited", "joined"]:
+                # We now loop through all state events looking for
+                # membership states for the requesting server to determine
+                # if the server is either in the room or has been invited
+                # into the room.
+                for ev in itervalues(state):
+                    if ev.type != EventTypes.Member:
+                        continue
+                    try:
+                        domain = get_domain_from_id(ev.state_key)
+                    except Exception:
+                        continue
+
+                    if domain != server_name:
+                        continue
+
+                    memtype = ev.membership
+                    if memtype == Membership.JOIN:
+                        return event
+                    elif memtype == Membership.INVITE:
+                        if visibility == "invited":
+                            return event
+                else:
+                    # server has no users in the room: redact
+                    return prune_event(event)
+
+        return event
+
+    # Next, let's check to see if all the events have a history visibility
+    # of "shared" or "world_readable". If that's the case then we don't
+    # need to check membership (as we know the server is in the room).
+    event_to_state_ids = yield store.get_state_ids_for_events(
+        frozenset(e.event_id for e in events),
+        types=(
+            (EventTypes.RoomHistoryVisibility, ""),
+        )
+    )
+
+    visibility_ids = set()
+    for sids in itervalues(event_to_state_ids):
+        hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
+        if hist:
+            visibility_ids.add(hist)
+
+    # If we failed to find any history visibility events then the default
+    # is "shared" visibility.
+    if not visibility_ids:
+        all_open = True
+    else:
+        event_map = yield store.get_events(visibility_ids)
+        all_open = all(
+            e.content.get("history_visibility") in (None, "shared", "world_readable")
+            for e in itervalues(event_map)
+        )
+
+    if all_open:
+        # all the history_visibility state affecting these events is open, so
+        # we don't need to filter by membership state. We *do* need to check
+        # for user erasure, though.
+        if erased_senders:
+            events = [
+                redact_disallowed(e, None)
+                for e in events
+            ]
+
+        defer.returnValue(events)
+
+    # Ok, so we're dealing with events that have non-trivial visibility
+    # rules, so we need to also get the memberships of the room.
+
+    # first, for each event we're wanting to return, get the event_ids
+    # of the history vis and membership state at those events.
+    event_to_state_ids = yield store.get_state_ids_for_events(
+        frozenset(e.event_id for e in events),
+        types=(
+            (EventTypes.RoomHistoryVisibility, ""),
+            (EventTypes.Member, None),
+        )
+    )
+
+    # We only want to pull out member events that correspond to the
+    # server's domain.
+    #
+    # event_to_state_ids contains lots of duplicates, so it turns out to be
+    # cheaper to build a complete set of unique
+    # ((type, state_key), event_id) tuples, and then filter out the ones we
+    # don't want.
+    #
+    state_key_to_event_id_set = {
+        e
+        for key_to_eid in itervalues(event_to_state_ids)
+        for e in key_to_eid.items()
+    }
+
+    def include(typ, state_key):
+        if typ != EventTypes.Member:
+            return True
+
+        # we avoid using get_domain_from_id here for efficiency.
+        idx = state_key.find(":")
+        if idx == -1:
+            return False
+        return state_key[idx + 1:] == server_name
+
+    event_map = yield store.get_events([
+        e_id
+        for key, e_id in state_key_to_event_id_set
+        if include(key[0], key[1])
+    ])
+
+    event_to_state = {
+        e_id: {
+            key: event_map[inner_e_id]
+            for key, inner_e_id in iteritems(key_to_eid)
+            if inner_e_id in event_map
+        }
+        for e_id, key_to_eid in iteritems(event_to_state_ids)
+    }
+
+    defer.returnValue([
+        redact_disallowed(e, event_to_state[e.event_id])
+        for e in events
+    ])
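
Finally, a usage sketch for the new `filter_events_for_server` (illustrative; `store`, `destination` and `events` come from the caller): it returns the same list of events, with anything the remote server should not see, including events from erased senders, replaced by a redacted copy.

    from twisted.internet import defer

    from synapse.visibility import filter_events_for_server

    @defer.inlineCallbacks
    def events_for_destination(store, destination, events):
        # events the destination may not see come back pruned, not omitted
        to_send = yield filter_events_for_server(store, destination, events)
        defer.returnValue(to_send)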