summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/__init__.py2
-rw-r--r--synapse/handlers/_base.py2
-rw-r--r--synapse/handlers/auth.py6
-rw-r--r--synapse/handlers/federation.py4
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/room.py13
-rw-r--r--synapse/handlers/room_member.py45
-rw-r--r--synapse/handlers/sync.py101
-rw-r--r--synapse/python_dependencies.py2
-rw-r--r--synapse/replication/slave/storage/directory.py8
-rw-r--r--synapse/replication/slave/storage/events.py110
-rw-r--r--synapse/replication/slave/storage/registration.py18
-rw-r--r--synapse/replication/slave/storage/room.py21
-rw-r--r--synapse/rest/client/v1/admin.py65
-rw-r--r--synapse/rest/client/v1/room.py19
-rw-r--r--synapse/rest/client/v2_alpha/register.py2
-rw-r--r--synapse/server.py5
-rw-r--r--synapse/storage/__init__.py32
-rw-r--r--synapse/storage/_base.py26
-rw-r--r--synapse/storage/appservice.py82
-rw-r--r--synapse/storage/directory.py50
-rw-r--r--synapse/storage/event_federation.py264
-rw-r--r--synapse/storage/event_push_actions.py208
-rw-r--r--synapse/storage/events.py2
-rw-r--r--synapse/storage/registration.py118
-rw-r--r--synapse/storage/room.py239
-rw-r--r--synapse/storage/roommember.py27
-rw-r--r--synapse/storage/signatures.py12
-rw-r--r--synapse/storage/stream.py310
-rw-r--r--synapse/util/logcontext.py20
31 files changed, 927 insertions, 902 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 53213cdccf..8f8fd82eb0 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,6 @@ from .register import RegistrationHandler
 from .room import (
     RoomCreationHandler, RoomContextHandler,
 )
-from .room_member import RoomMemberHandler
 from .message import MessageHandler
 from .federation import FederationHandler
 from .directory import DirectoryHandler
@@ -49,7 +48,6 @@ class Handlers(object):
         self.registration_handler = RegistrationHandler(hs)
         self.message_handler = MessageHandler(hs)
         self.room_creation_handler = RoomCreationHandler(hs)
-        self.room_member_handler = RoomMemberHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index faa5609c0c..e089e66fde 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -158,7 +158,7 @@ class BaseHandler(object):
                 # homeserver.
                 requester = synapse.types.create_requester(
                     target_user, is_guest=True)
-                handler = self.hs.get_handlers().room_member_handler
+                handler = self.hs.get_room_member_handler()
                 yield handler.update_membership(
                     requester,
                     target_user,
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 258cc345dc..a5365c4fe4 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -863,8 +863,10 @@ class AuthHandler(BaseHandler):
         """
 
         def _do_validate_hash():
-            return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
-                                 stored_hash.encode('utf8')) == stored_hash
+            return bcrypt.checkpw(
+                password.encode('utf8') + self.hs.config.password_pepper,
+                stored_hash.encode('utf8')
+            )
 
         if stored_hash:
             return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8832ba58bc..520612683e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2153,7 +2153,7 @@ class FederationHandler(BaseHandler):
                 raise e
 
             yield self._check_signature(event, context)
-            member_handler = self.hs.get_handlers().room_member_handler
+            member_handler = self.hs.get_room_member_handler()
             yield member_handler.send_membership_event(None, event, context)
         else:
             destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2197,7 +2197,7 @@ class FederationHandler(BaseHandler):
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
 
-        member_handler = self.hs.get_handlers().room_member_handler
+        member_handler = self.hs.get_room_member_handler()
         yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7d28c2745c..dd00d8a86c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -52,16 +52,12 @@ class MessageHandler(BaseHandler):
         self.pagination_lock = ReadWriteLock()
 
     @defer.inlineCallbacks
-    def purge_history(self, room_id, event_id, delete_local_events=False):
-        event = yield self.store.get_event(event_id)
-
-        if event.room_id != room_id:
-            raise SynapseError(400, "Event is for wrong room.")
-
-        depth = event.depth
-
+    def purge_history(self, room_id, topological_ordering,
+                      delete_local_events=False):
         with (yield self.pagination_lock.write(room_id)):
-            yield self.store.purge_history(room_id, depth, delete_local_events)
+            yield self.store.purge_history(
+                room_id, topological_ordering, delete_local_events,
+            )
 
     @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9800e24453..c9c2879038 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -233,7 +233,7 @@ class ProfileHandler(BaseHandler):
         )
 
         for room_id in room_ids:
-            handler = self.hs.get_handlers().room_member_handler
+            handler = self.hs.get_room_member_handler()
             try:
                 # Assume the target_user isn't a guest,
                 # because we don't let guests set profile or avatar data.
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6ab020bf41..8df8fcbbad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -165,7 +165,7 @@ class RoomCreationHandler(BaseHandler):
 
         creation_content = config.get("creation_content", {})
 
-        room_member_handler = self.hs.get_handlers().room_member_handler
+        room_member_handler = self.hs.get_room_member_handler()
 
         yield self._send_events_for_new_room(
             requester,
@@ -224,7 +224,7 @@ class RoomCreationHandler(BaseHandler):
             id_server = invite_3pid["id_server"]
             address = invite_3pid["address"]
             medium = invite_3pid["medium"]
-            yield self.hs.get_handlers().room_member_handler.do_3pid_invite(
+            yield self.hs.get_room_member_handler().do_3pid_invite(
                 room_id,
                 requester.user,
                 medium,
@@ -475,12 +475,9 @@ class RoomEventSource(object):
             user.to_string()
         )
         if app_service:
-            events, end_key = yield self.store.get_appservice_room_stream(
-                service=app_service,
-                from_key=from_key,
-                to_key=to_key,
-                limit=limit,
-            )
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
         else:
             room_events = yield self.store.get_membership_changes_for_user(
                 user.to_string(), from_key, to_key
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 37dc5e99ab..ed3b97730d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -30,24 +30,32 @@ from synapse.api.errors import AuthError, SynapseError, Codes
 from synapse.types import UserID, RoomID
 from synapse.util.async import Linearizer
 from synapse.util.distributor import user_left_room, user_joined_room
-from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
 id_server_scheme = "https://"
 
 
-class RoomMemberHandler(BaseHandler):
+class RoomMemberHandler(object):
     # TODO(paul): This handler currently contains a messy conflation of
     #   low-level API that works on UserID objects and so on, and REST-level
     #   API that takes ID strings and returns pagination chunks. These concerns
     #   ought to be separated out a lot better.
 
     def __init__(self, hs):
-        super(RoomMemberHandler, self).__init__(hs)
-
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+        self.state_handler = hs.get_state_handler()
+        self.config = hs.config
+        self.simple_http_client = hs.get_simple_http_client()
+
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.directory_handler = hs.get_handlers().directory_handler
+        self.registration_handler = hs.get_handlers().registration_handler
         self.profile_handler = hs.get_profile_handler()
         self.event_creation_hander = hs.get_event_creation_handler()
+        self.replication_layer = hs.get_replication_layer()
 
         self.member_linearizer = Linearizer(name="member")
 
@@ -138,7 +146,7 @@ class RoomMemberHandler(BaseHandler):
         # join dance for now, since we're kinda implicitly checking
         # that we are allowed to join when we decide whether or not we
         # need to do the invite/join dance.
-        yield self.hs.get_handlers().federation_handler.do_invite_join(
+        yield self.federation_handler.do_invite_join(
             remote_room_hosts,
             room_id,
             user.to_string(),
@@ -204,8 +212,7 @@ class RoomMemberHandler(BaseHandler):
         # if this is a join with a 3pid signature, we may need to turn a 3pid
         # invite into a normal invite before we can handle the join.
         if third_party_signed is not None:
-            replication = self.hs.get_replication_layer()
-            yield replication.exchange_third_party_invite(
+            yield self.replication_layer.exchange_third_party_invite(
                 third_party_signed["sender"],
                 target.to_string(),
                 room_id,
@@ -226,7 +233,7 @@ class RoomMemberHandler(BaseHandler):
                 requester.user,
             )
             if not is_requester_admin:
-                if self.hs.config.block_non_admin_invites:
+                if self.config.block_non_admin_invites:
                     logger.info(
                         "Blocking invite: user is not admin and non-admin "
                         "invites disabled"
@@ -321,7 +328,7 @@ class RoomMemberHandler(BaseHandler):
                 else:
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
-                    fed_handler = self.hs.get_handlers().federation_handler
+                    fed_handler = self.federation_handler
                     try:
                         ret = yield fed_handler.do_remotely_reject_invite(
                             remote_room_hosts,
@@ -477,7 +484,7 @@ class RoomMemberHandler(BaseHandler):
         Raises:
             SynapseError if room alias could not be found.
         """
-        directory_handler = self.hs.get_handlers().directory_handler
+        directory_handler = self.directory_handler
         mapping = yield directory_handler.get_association(room_alias)
 
         if not mapping:
@@ -508,7 +515,7 @@ class RoomMemberHandler(BaseHandler):
             requester,
             txn_id
     ):
-        if self.hs.config.block_non_admin_invites:
+        if self.config.block_non_admin_invites:
             is_requester_admin = yield self.auth.is_server_admin(
                 requester.user,
             )
@@ -555,7 +562,7 @@ class RoomMemberHandler(BaseHandler):
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
         try:
-            data = yield self.hs.get_simple_http_client().get_json(
+            data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
                 {
                     "medium": medium,
@@ -566,7 +573,7 @@ class RoomMemberHandler(BaseHandler):
             if "mxid" in data:
                 if "signatures" not in data:
                     raise AuthError(401, "No signatures on 3pid binding")
-                self.verify_any_signature(data, id_server)
+                yield self.verify_any_signature(data, id_server)
                 defer.returnValue(data["mxid"])
 
         except IOError as e:
@@ -578,7 +585,7 @@ class RoomMemberHandler(BaseHandler):
         if server_hostname not in data["signatures"]:
             raise AuthError(401, "No signature from server %s" % (server_hostname,))
         for key_name, signature in data["signatures"][server_hostname].items():
-            key_data = yield self.hs.get_simple_http_client().get_json(
+            key_data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/pubkey/%s" %
                 (id_server_scheme, server_hostname, key_name,),
             )
@@ -603,7 +610,7 @@ class RoomMemberHandler(BaseHandler):
             user,
             txn_id
     ):
-        room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+        room_state = yield self.state_handler.get_current_state(room_id)
 
         inviter_display_name = ""
         inviter_avatar_url = ""
@@ -727,15 +734,15 @@ class RoomMemberHandler(BaseHandler):
             "sender_avatar_url": inviter_avatar_url,
         }
 
-        if self.hs.config.invite_3pid_guest:
-            registration_handler = self.hs.get_handlers().registration_handler
+        if self.config.invite_3pid_guest:
+            registration_handler = self.registration_handler
             guest_access_token = yield registration_handler.guest_access_token_for(
                 medium=medium,
                 address=address,
                 inviter_user_id=inviter_user_id,
             )
 
-            guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
+            guest_user_info = yield self.auth.get_user_by_access_token(
                 guest_access_token
             )
 
@@ -744,7 +751,7 @@ class RoomMemberHandler(BaseHandler):
                 "guest_user_id": guest_user_info["user"].to_string(),
             })
 
-        data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+        data = yield self.simple_http_client.post_urlencoded_get_json(
             is_url,
             invite_config
         )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b12988f3c9..0f713ce038 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -235,10 +235,10 @@ class SyncHandler(object):
         defer.returnValue(rules)
 
     @defer.inlineCallbacks
-    def ephemeral_by_room(self, sync_config, now_token, since_token=None):
+    def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
         """Get the ephemeral events for each room the user is in
         Args:
-            sync_config (SyncConfig): The flags, filters and user for the sync.
+            sync_result_builder(SyncResultBuilder)
             now_token (StreamToken): Where the server is currently up to.
             since_token (StreamToken): Where the server was when the client
                 last synced.
@@ -248,10 +248,12 @@ class SyncHandler(object):
             typing events for that room.
         """
 
+        sync_config = sync_result_builder.sync_config
+
         with Measure(self.clock, "ephemeral_by_room"):
             typing_key = since_token.typing_key if since_token else "0"
 
-            room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+            room_ids = sync_result_builder.joined_room_ids
 
             typing_source = self.event_sources.sources["typing"]
             typing, typing_key = yield typing_source.get_new_events(
@@ -565,10 +567,22 @@ class SyncHandler(object):
         # Always use the `now_token` in `SyncResultBuilder`
         now_token = yield self.event_sources.get_current_token()
 
+        user_id = sync_config.user.to_string()
+        app_service = self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
+        else:
+            joined_room_ids = yield self.get_rooms_for_user_at(
+                user_id, now_token.room_stream_id,
+            )
+
         sync_result_builder = SyncResultBuilder(
             sync_config, full_state,
             since_token=since_token,
             now_token=now_token,
+            joined_room_ids=joined_room_ids,
         )
 
         account_data_by_room = yield self._generate_sync_entry_for_account_data(
@@ -603,7 +617,6 @@ class SyncHandler(object):
         device_id = sync_config.device_id
         one_time_key_counts = {}
         if device_id:
-            user_id = sync_config.user.to_string()
             one_time_key_counts = yield self.store.count_e2e_one_time_keys(
                 user_id, device_id
             )
@@ -891,7 +904,7 @@ class SyncHandler(object):
             ephemeral_by_room = {}
         else:
             now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-                sync_result_builder.sync_config,
+                sync_result_builder,
                 now_token=sync_result_builder.now_token,
                 since_token=sync_result_builder.since_token,
             )
@@ -996,15 +1009,8 @@ class SyncHandler(object):
         if rooms_changed:
             defer.returnValue(True)
 
-        app_service = self.store.get_app_service_by_user_id(user_id)
-        if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
-        else:
-            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
-
         stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
-        for room_id in joined_room_ids:
+        for room_id in sync_result_builder.joined_room_ids:
             if self.store.has_room_changed_since(room_id, stream_id):
                 defer.returnValue(True)
         defer.returnValue(False)
@@ -1028,13 +1034,6 @@ class SyncHandler(object):
 
         assert since_token
 
-        app_service = self.store.get_app_service_by_user_id(user_id)
-        if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
-        else:
-            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
-
         # Get a list of membership change events that have happened.
         rooms_changed = yield self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
@@ -1057,7 +1056,7 @@ class SyncHandler(object):
             # we do send down the room, and with full state, where necessary
 
             old_state_ids = None
-            if room_id in joined_room_ids and non_joins:
+            if room_id in sync_result_builder.joined_room_ids and non_joins:
                 # Always include if the user (re)joined the room, especially
                 # important so that device list changes are calculated correctly.
                 # If there are non join member events, but we are still in the room,
@@ -1067,7 +1066,7 @@ class SyncHandler(object):
                 # User is in the room so we don't need to do the invite/leave checks
                 continue
 
-            if room_id in joined_room_ids or has_join:
+            if room_id in sync_result_builder.joined_room_ids or has_join:
                 old_state_ids = yield self.get_state_at(room_id, since_token)
                 old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
                 old_mem_ev = None
@@ -1079,7 +1078,7 @@ class SyncHandler(object):
                     newly_joined_rooms.append(room_id)
 
             # If user is in the room then we don't need to do the invite/leave checks
-            if room_id in joined_room_ids:
+            if room_id in sync_result_builder.joined_room_ids:
                 continue
 
             if not non_joins:
@@ -1146,7 +1145,7 @@ class SyncHandler(object):
 
         # Get all events for rooms we're currently joined to.
         room_to_events = yield self.store.get_room_events_stream_for_rooms(
-            room_ids=joined_room_ids,
+            room_ids=sync_result_builder.joined_room_ids,
             from_key=since_token.room_key,
             to_key=now_token.room_key,
             limit=timeline_limit + 1,
@@ -1154,7 +1153,7 @@ class SyncHandler(object):
 
         # We loop through all room ids, even if there are no new events, in case
         # there are non room events taht we need to notify about.
-        for room_id in joined_room_ids:
+        for room_id in sync_result_builder.joined_room_ids:
             room_entry = room_to_events.get(room_id, None)
 
             if room_entry:
@@ -1362,6 +1361,54 @@ class SyncHandler(object):
         else:
             raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
+    @defer.inlineCallbacks
+    def get_rooms_for_user_at(self, user_id, stream_ordering):
+        """Get set of joined rooms for a user at the given stream ordering.
+
+        The stream ordering *must* be recent, otherwise this may throw an
+        exception if older than a month. (This function is called with the
+        current token, which should be perfectly fine).
+
+        Args:
+            user_id (str)
+            stream_ordering (int)
+
+        ReturnValue:
+            Deferred[frozenset[str]]: Set of room_ids the user is in at given
+            stream_ordering.
+        """
+        joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(
+            user_id,
+        )
+
+        joined_room_ids = set()
+
+        # We need to check that the stream ordering of the join for each room
+        # is before the stream_ordering asked for. This might not be the case
+        # if the user joins a room between us getting the current token and
+        # calling `get_rooms_for_user_with_stream_ordering`.
+        # If the membership's stream ordering is after the given stream
+        # ordering, we need to go and work out if the user was in the room
+        # before.
+        for room_id, membership_stream_ordering in joined_rooms:
+            if membership_stream_ordering <= stream_ordering:
+                joined_room_ids.add(room_id)
+                continue
+
+            logger.info("User joined room after current token: %s", room_id)
+
+            extrems = yield self.store.get_forward_extremeties_for_room(
+                room_id, stream_ordering,
+            )
+            users_in_room = yield self.state.get_current_user_in_room(
+                room_id, extrems,
+            )
+            if user_id in users_in_room:
+                joined_room_ids.add(room_id)
+
+        joined_room_ids = frozenset(joined_room_ids)
+        defer.returnValue(joined_room_ids)
+
 
 def _action_has_highlight(actions):
     for action in actions:
@@ -1411,7 +1458,8 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
 
 class SyncResultBuilder(object):
     "Used to help build up a new SyncResult for a user"
-    def __init__(self, sync_config, full_state, since_token, now_token):
+    def __init__(self, sync_config, full_state, since_token, now_token,
+                 joined_room_ids):
         """
         Args:
             sync_config(SyncConfig)
@@ -1423,6 +1471,7 @@ class SyncResultBuilder(object):
         self.full_state = full_state
         self.since_token = since_token
         self.now_token = now_token
+        self.joined_room_ids = joined_room_ids
 
         self.presence = []
         self.account_data = []
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 5d65b5fd6e..91179ce532 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -31,7 +31,7 @@ REQUIREMENTS = {
     "pyyaml": ["yaml"],
     "pyasn1": ["pyasn1"],
     "daemonize": ["daemonize"],
-    "bcrypt": ["bcrypt"],
+    "bcrypt": ["bcrypt>=3.1.0"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
     "ujson": ["ujson"],
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 7301d885f2..6deecd3963 100644
--- a/synapse/replication/slave/storage/directory.py
+++ b/synapse/replication/slave/storage/directory.py
@@ -14,10 +14,8 @@
 # limitations under the License.
 
 from ._base import BaseSlavedStore
-from synapse.storage.directory import DirectoryStore
+from synapse.storage.directory import DirectoryWorkerStore
 
 
-class DirectoryStore(BaseSlavedStore):
-    get_aliases_for_room = DirectoryStore.__dict__[
-        "get_aliases_for_room"
-    ]
+class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index de0b26f437..b1f64ef0d8 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,15 +16,13 @@
 import logging
 
 from synapse.api.constants import EventTypes
-from synapse.storage import DataStore
-from synapse.storage.event_federation import EventFederationStore
+from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
-from synapse.storage.stream import StreamStore
-from synapse.storage.signatures import SignatureStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.stream import StreamWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
@@ -40,107 +38,33 @@ logger = logging.getLogger(__name__)
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
-                       EventsWorkerStore, StateGroupWorkerStore,
+class SlavedEventStore(EventFederationWorkerStore,
+                       RoomMemberWorkerStore,
+                       EventPushActionsWorkerStore,
+                       StreamWorkerStore,
+                       EventsWorkerStore,
+                       StateGroupWorkerStore,
+                       SignatureWorkerStore,
                        BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedEventStore, self).__init__(db_conn, hs)
         self._stream_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering",
         )
         self._backfill_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering", step=-1
         )
-        events_max = self._stream_id_gen.get_current_token()
-        event_cache_prefill, min_event_val = self._get_cache_dict(
-            db_conn, "events",
-            entity_column="room_id",
-            stream_column="stream_ordering",
-            max_value=events_max,
-        )
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache", min_event_val,
-            prefilled_cache=event_cache_prefill,
-        )
-        self._membership_stream_cache = StreamChangeCache(
-            "MembershipStreamChangeCache", events_max,
-        )
 
-        self.stream_ordering_month_ago = 0
-        self._stream_order_on_start = self.get_room_max_stream_ordering()
+        super(SlavedEventStore, self).__init__(db_conn, hs)
 
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
-    get_latest_event_ids_in_room = EventFederationStore.__dict__[
-        "get_latest_event_ids_in_room"
-    ]
-
-    get_recent_event_ids_for_room = (
-        StreamStore.__dict__["get_recent_event_ids_for_room"]
-    )
-    has_room_changed_since = DataStore.has_room_changed_since.__func__
-
-    get_membership_changes_for_user = (
-        DataStore.get_membership_changes_for_user.__func__
-    )
-    get_room_events_max_id = DataStore.get_room_events_max_id.__func__
-    get_room_events_stream_for_room = (
-        DataStore.get_room_events_stream_for_room.__func__
-    )
-    get_events_around = DataStore.get_events_around.__func__
-
-    get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
-    get_room_events_stream_for_rooms = (
-        DataStore.get_room_events_stream_for_rooms.__func__
-    )
-    get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
-
-    _set_before_and_after = staticmethod(DataStore._set_before_and_after)
-
-    _get_events_around_txn = DataStore._get_events_around_txn.__func__
-
-    get_backfill_events = DataStore.get_backfill_events.__func__
-    _get_backfill_events = DataStore._get_backfill_events.__func__
-    get_missing_events = DataStore.get_missing_events.__func__
-    _get_missing_events = DataStore._get_missing_events.__func__
-
-    get_auth_chain = DataStore.get_auth_chain.__func__
-    get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
-    _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
-
-    get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
-
-    get_forward_extremeties_for_room = (
-        DataStore.get_forward_extremeties_for_room.__func__
-    )
-    _get_forward_extremeties_for_room = (
-        EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
-    )
-
-    get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
-
-    get_federation_out_pos = DataStore.get_federation_out_pos.__func__
-    update_federation_out_pos = DataStore.update_federation_out_pos.__func__
-
-    get_latest_event_ids_and_hashes_in_room = (
-        DataStore.get_latest_event_ids_and_hashes_in_room.__func__
-    )
-    _get_latest_event_ids_and_hashes_in_room = (
-        DataStore._get_latest_event_ids_and_hashes_in_room.__func__
-    )
-    _get_event_reference_hashes_txn = (
-        DataStore._get_event_reference_hashes_txn.__func__
-    )
-    add_event_hashes = (
-        DataStore.add_event_hashes.__func__
-    )
-    get_event_reference_hashes = (
-        SignatureStore.__dict__["get_event_reference_hashes"]
-    )
-    get_event_reference_hash = (
-        SignatureStore.__dict__["get_event_reference_hash"]
-    )
+
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index e27c7332d2..7323bf0f1e 100644
--- a/synapse/replication/slave/storage/registration.py
+++ b/synapse/replication/slave/storage/registration.py
@@ -14,20 +14,8 @@
 # limitations under the License.
 
 from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.registration import RegistrationStore
+from synapse.storage.registration import RegistrationWorkerStore
 
 
-class SlavedRegistrationStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedRegistrationStore, self).__init__(db_conn, hs)
-
-    # TODO: use the cached version and invalidate deleted tokens
-    get_user_by_access_token = RegistrationStore.__dict__[
-        "get_user_by_access_token"
-    ]
-
-    _query_for_auth = DataStore._query_for_auth.__func__
-    get_user_by_id = RegistrationStore.__dict__[
-        "get_user_by_id"
-    ]
+class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index f510384033..5ae1670157 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -14,32 +14,19 @@
 # limitations under the License.
 
 from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.room import RoomStore
+from synapse.storage.room import RoomWorkerStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 
-class RoomStore(BaseSlavedStore):
+class RoomStore(RoomWorkerStore, BaseSlavedStore):
     def __init__(self, db_conn, hs):
         super(RoomStore, self).__init__(db_conn, hs)
         self._public_room_id_gen = SlavedIdTracker(
             db_conn, "public_room_list_stream", "stream_id"
         )
 
-    get_public_room_ids = DataStore.get_public_room_ids.__func__
-    get_current_public_room_stream_id = (
-        DataStore.get_current_public_room_stream_id.__func__
-    )
-    get_public_room_ids_at_stream_id = (
-        RoomStore.__dict__["get_public_room_ids_at_stream_id"]
-    )
-    get_public_room_ids_at_stream_id_txn = (
-        DataStore.get_public_room_ids_at_stream_id_txn.__func__
-    )
-    get_published_at_stream_id_txn = (
-        DataStore.get_published_at_stream_id_txn.__func__
-    )
-    get_public_room_changes = DataStore.get_public_room_changes.__func__
+    def get_current_public_room_stream_id(self):
+        return self._public_room_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(RoomStore, self).stream_positions()
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 6073cc6fa2..dcf6215dad 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -17,7 +17,7 @@
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
-from synapse.api.errors import AuthError, SynapseError
+from synapse.api.errors import AuthError, SynapseError, Codes
 from synapse.types import UserID, create_requester
 from synapse.http.servlet import parse_json_object_from_request
 
@@ -114,12 +114,18 @@ class PurgeMediaCacheRestServlet(ClientV1RestServlet):
 
 class PurgeHistoryRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns(
-        "/admin/purge_history/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
+        "/admin/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
     )
 
     def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer)
+        """
         super(PurgeHistoryRestServlet, self).__init__(hs)
         self.handlers = hs.get_handlers()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
     def on_POST(self, request, room_id, event_id):
@@ -133,8 +139,54 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
 
         delete_local_events = bool(body.get("delete_local_events", False))
 
+        # establish the topological ordering we should keep events from. The
+        # user can provide an event_id in the URL or the request body, or can
+        # provide a timestamp in the request body.
+        if event_id is None:
+            event_id = body.get('purge_up_to_event_id')
+
+        if event_id is not None:
+            event = yield self.store.get_event(event_id)
+
+            if event.room_id != room_id:
+                raise SynapseError(400, "Event is for wrong room.")
+
+            depth = event.depth
+            logger.info(
+                "[purge] purging up to depth %i (event_id %s)",
+                depth, event_id,
+            )
+        elif 'purge_up_to_ts' in body:
+            ts = body['purge_up_to_ts']
+            if not isinstance(ts, int):
+                raise SynapseError(
+                    400, "purge_up_to_ts must be an int",
+                    errcode=Codes.BAD_JSON,
+                )
+
+            stream_ordering = (
+                yield self.store.find_first_stream_ordering_after_ts(ts)
+            )
+
+            (_, depth, _) = (
+                yield self.store.get_room_event_after_stream_ordering(
+                    room_id, stream_ordering,
+                )
+            )
+            logger.info(
+                "[purge] purging up to depth %i (received_ts %i => "
+                "stream_ordering %i)",
+                depth, ts, stream_ordering,
+            )
+        else:
+            raise SynapseError(
+                400,
+                "must specify purge_up_to_event_id or purge_up_to_ts",
+                errcode=Codes.BAD_JSON,
+            )
+
         yield self.handlers.message_handler.purge_history(
-            room_id, event_id,
+            room_id, depth,
             delete_local_events=delete_local_events,
         )
 
@@ -180,6 +232,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
         self.handlers = hs.get_handlers()
         self.state = hs.get_state_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.room_member_handler = hs.get_room_member_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, room_id):
@@ -238,7 +291,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
             logger.info("Kicking %r from %r...", user_id, room_id)
 
             target_requester = create_requester(user_id)
-            yield self.handlers.room_member_handler.update_membership(
+            yield self.room_member_handler.update_membership(
                 requester=target_requester,
                 target=target_requester.user,
                 room_id=room_id,
@@ -247,9 +300,9 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
                 ratelimit=False
             )
 
-            yield self.handlers.room_member_handler.forget(target_requester.user, room_id)
+            yield self.room_member_handler.forget(target_requester.user, room_id)
 
-            yield self.handlers.room_member_handler.update_membership(
+            yield self.room_member_handler.update_membership(
                 requester=target_requester,
                 target=target_requester.user,
                 room_id=new_room_id,
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 817fd47842..9d745174c7 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -84,6 +84,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
         super(RoomStateEventRestServlet, self).__init__(hs)
         self.handlers = hs.get_handlers()
         self.event_creation_hander = hs.get_event_creation_handler()
+        self.room_member_handler = hs.get_room_member_handler()
 
     def register(self, http_server):
         # /room/$roomid/state/$eventtype
@@ -156,7 +157,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
 
         if event_type == EventTypes.Member:
             membership = content.get("membership", None)
-            event = yield self.handlers.room_member_handler.update_membership(
+            event = yield self.room_member_handler.update_membership(
                 requester,
                 target=UserID.from_string(state_key),
                 room_id=room_id,
@@ -229,7 +230,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
 class JoinRoomAliasServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(JoinRoomAliasServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.room_member_handler = hs.get_room_member_handler()
 
     def register(self, http_server):
         # /join/$room_identifier[/$txn_id]
@@ -257,7 +258,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
             except Exception:
                 remote_room_hosts = None
         elif RoomAlias.is_valid(room_identifier):
-            handler = self.handlers.room_member_handler
+            handler = self.room_member_handler
             room_alias = RoomAlias.from_string(room_identifier)
             room_id, remote_room_hosts = yield handler.lookup_room_alias(room_alias)
             room_id = room_id.to_string()
@@ -266,7 +267,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
                 room_identifier,
             ))
 
-        yield self.handlers.room_member_handler.update_membership(
+        yield self.room_member_handler.update_membership(
             requester=requester,
             target=requester.user,
             room_id=room_id,
@@ -562,7 +563,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
 class RoomForgetRestServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(RoomForgetRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.room_member_handler = hs.get_room_member_handler()
 
     def register(self, http_server):
         PATTERNS = ("/rooms/(?P<room_id>[^/]*)/forget")
@@ -575,7 +576,7 @@ class RoomForgetRestServlet(ClientV1RestServlet):
             allow_guest=False,
         )
 
-        yield self.handlers.room_member_handler.forget(
+        yield self.room_member_handler.forget(
             user=requester.user,
             room_id=room_id,
         )
@@ -593,7 +594,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomMembershipRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.room_member_handler = hs.get_room_member_handler()
 
     def register(self, http_server):
         # /rooms/$roomid/[invite|join|leave]
@@ -622,7 +623,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
             content = {}
 
         if membership_action == "invite" and self._has_3pid_invite_keys(content):
-            yield self.handlers.room_member_handler.do_3pid_invite(
+            yield self.room_member_handler.do_3pid_invite(
                 room_id,
                 requester.user,
                 content["medium"],
@@ -644,7 +645,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
         if 'reason' in content and membership_action in ['kick', 'ban']:
             event_content = {'reason': content['reason']}
 
-        yield self.handlers.room_member_handler.update_membership(
+        yield self.room_member_handler.update_membership(
             requester=requester,
             target=target,
             room_id=room_id,
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index c6f4680a76..0ba62bddc1 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -183,7 +183,7 @@ class RegisterRestServlet(RestServlet):
         self.auth_handler = hs.get_auth_handler()
         self.registration_handler = hs.get_handlers().registration_handler
         self.identity_handler = hs.get_handlers().identity_handler
-        self.room_member_handler = hs.get_handlers().room_member_handler
+        self.room_member_handler = hs.get_room_member_handler()
         self.device_handler = hs.get_device_handler()
         self.macaroon_gen = hs.get_macaroon_generator()
 
diff --git a/synapse/server.py b/synapse/server.py
index fbd602d40e..5b6effbe31 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -45,6 +45,7 @@ from synapse.handlers.device import DeviceHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.room_list import RoomListHandler
+from synapse.handlers.room_member import RoomMemberHandler
 from synapse.handlers.set_password import SetPasswordHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
@@ -145,6 +146,7 @@ class HomeServer(object):
         'groups_attestation_signing',
         'groups_attestation_renewer',
         'spam_checker',
+        'room_member_handler',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -382,6 +384,9 @@ class HomeServer(object):
     def build_spam_checker(self):
         return SpamChecker(self)
 
+    def build_room_member_handler(self):
+        return RoomMemberHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0f136f8a06..de00cae447 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,7 +20,6 @@ from synapse.storage.devices import DeviceStore
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
 )
-from ._base import LoggingTransaction
 from .directory import DirectoryStore
 from .events import EventsStore
 from .presence import PresenceStore, UserPresenceState
@@ -141,22 +140,6 @@ class DataStore(RoomMemberStore, RoomStore,
         else:
             self._cache_id_gen = None
 
-        events_max = self._stream_id_gen.get_current_token()
-        event_cache_prefill, min_event_val = self._get_cache_dict(
-            db_conn, "events",
-            entity_column="room_id",
-            stream_column="stream_ordering",
-            max_value=events_max,
-        )
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache", min_event_val,
-            prefilled_cache=event_cache_prefill,
-        )
-
-        self._membership_stream_cache = StreamChangeCache(
-            "MembershipStreamChangeCache", events_max,
-        )
-
         self._presence_on_startup = self._get_active_presence(db_conn)
 
         presence_cache_prefill, min_presence_val = self._get_cache_dict(
@@ -204,6 +187,7 @@ class DataStore(RoomMemberStore, RoomStore,
             "DeviceListFederationStreamChangeCache", device_list_max,
         )
 
+        events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
             db_conn, "current_state_delta_stream",
             entity_column="room_id",
@@ -228,20 +212,6 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=_group_updates_prefill,
         )
 
-        cur = LoggingTransaction(
-            db_conn.cursor(),
-            name="_find_stream_orderings_for_times_txn",
-            database_engine=self.database_engine,
-            after_callbacks=[],
-            final_callbacks=[],
-        )
-        self._find_stream_orderings_for_times_txn(cur)
-        cur.close()
-
-        self.find_stream_orderings_looping_call = self._clock.looping_call(
-            self._find_stream_orderings_for_times, 10 * 60 * 1000
-        )
-
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 68125006eb..2fbebd4907 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -48,16 +48,16 @@ class LoggingTransaction(object):
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
     __slots__ = [
-        "txn", "name", "database_engine", "after_callbacks", "final_callbacks",
+        "txn", "name", "database_engine", "after_callbacks", "exception_callbacks",
     ]
 
     def __init__(self, txn, name, database_engine, after_callbacks,
-                 final_callbacks):
+                 exception_callbacks):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
         object.__setattr__(self, "database_engine", database_engine)
         object.__setattr__(self, "after_callbacks", after_callbacks)
-        object.__setattr__(self, "final_callbacks", final_callbacks)
+        object.__setattr__(self, "exception_callbacks", exception_callbacks)
 
     def call_after(self, callback, *args, **kwargs):
         """Call the given callback on the main twisted thread after the
@@ -66,8 +66,8 @@ class LoggingTransaction(object):
         """
         self.after_callbacks.append((callback, args, kwargs))
 
-    def call_finally(self, callback, *args, **kwargs):
-        self.final_callbacks.append((callback, args, kwargs))
+    def call_on_exception(self, callback, *args, **kwargs):
+        self.exception_callbacks.append((callback, args, kwargs))
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -215,7 +215,7 @@ class SQLBaseStore(object):
 
         self._clock.looping_call(loop, 10000)
 
-    def _new_transaction(self, conn, desc, after_callbacks, final_callbacks,
+    def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
                          logging_context, func, *args, **kwargs):
         start = time.time() * 1000
         txn_id = self._TXN_ID
@@ -236,7 +236,7 @@ class SQLBaseStore(object):
                     txn = conn.cursor()
                     txn = LoggingTransaction(
                         txn, name, self.database_engine, after_callbacks,
-                        final_callbacks,
+                        exception_callbacks,
                     )
                     r = func(txn, *args, **kwargs)
                     conn.commit()
@@ -308,11 +308,11 @@ class SQLBaseStore(object):
         current_context = LoggingContext.current_context()
 
         after_callbacks = []
-        final_callbacks = []
+        exception_callbacks = []
 
         def inner_func(conn, *args, **kwargs):
             return self._new_transaction(
-                conn, desc, after_callbacks, final_callbacks, current_context,
+                conn, desc, after_callbacks, exception_callbacks, current_context,
                 func, *args, **kwargs
             )
 
@@ -321,9 +321,10 @@ class SQLBaseStore(object):
 
             for after_callback, after_args, after_kwargs in after_callbacks:
                 after_callback(*after_args, **after_kwargs)
-        finally:
-            for after_callback, after_args, after_kwargs in final_callbacks:
+        except:  # noqa: E722, as we reraise the exception this is fine.
+            for after_callback, after_args, after_kwargs in exception_callbacks:
                 after_callback(*after_args, **after_kwargs)
+            raise
 
         defer.returnValue(result)
 
@@ -1000,7 +1001,8 @@ class SQLBaseStore(object):
             # __exit__ called after the transaction finishes.
             ctx = self._cache_id_gen.get_next()
             stream_id = ctx.__enter__()
-            txn.call_finally(ctx.__exit__, None, None, None)
+            txn.call_on_exception(ctx.__exit__, None, None, None)
+            txn.call_after(ctx.__exit__, None, None, None)
             txn.call_after(self.hs.get_notifier().on_new_replication_data)
 
             self._simple_insert_txn(
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 90fb51d43c..12ea8a158c 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -18,11 +18,9 @@ import re
 import simplejson as json
 from twisted.internet import defer
 
-from synapse.api.constants import Membership
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
 from synapse.storage.events import EventsWorkerStore
-from synapse.storage.roommember import RoomsForUser
 from ._base import SQLBaseStore
 
 
@@ -115,81 +113,11 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
 
 
 class ApplicationServiceStore(ApplicationServiceWorkerStore):
-
-    def __init__(self, db_conn, hs):
-        super(ApplicationServiceStore, self).__init__(db_conn, hs)
-        self.hostname = hs.hostname
-
-    def get_app_service_rooms(self, service):
-        """Get a list of RoomsForUser for this application service.
-
-        Application services may be "interested" in lots of rooms depending on
-        the room ID, the room aliases, or the members in the room. This function
-        takes all of these into account and returns a list of RoomsForUser which
-        represent the entire list of room IDs that this application service
-        wants to know about.
-
-        Args:
-            service: The application service to get a room list for.
-        Returns:
-            A list of RoomsForUser.
-        """
-        return self.runInteraction(
-            "get_app_service_rooms",
-            self._get_app_service_rooms_txn,
-            service,
-        )
-
-    def _get_app_service_rooms_txn(self, txn, service):
-        # get all rooms matching the room ID regex.
-        room_entries = self._simple_select_list_txn(
-            txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
-        )
-        matching_room_list = set([
-            r["room_id"] for r in room_entries if
-            service.is_interested_in_room(r["room_id"])
-        ])
-
-        # resolve room IDs for matching room alias regex.
-        room_alias_mappings = self._simple_select_list_txn(
-            txn=txn, table="room_aliases", keyvalues=None,
-            retcols=["room_id", "room_alias"]
-        )
-        matching_room_list |= set([
-            r["room_id"] for r in room_alias_mappings if
-            service.is_interested_in_alias(r["room_alias"])
-        ])
-
-        # get all rooms for every user for this AS. This is scoped to users on
-        # this HS only.
-        user_list = self._simple_select_list_txn(
-            txn=txn, table="users", keyvalues=None, retcols=["name"]
-        )
-        user_list = [
-            u["name"] for u in user_list if
-            service.is_interested_in_user(u["name"])
-        ]
-        rooms_for_user_matching_user_id = set()  # RoomsForUser list
-        for user_id in user_list:
-            # FIXME: This assumes this store is linked with RoomMemberStore :(
-            rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
-                txn=txn,
-                user_id=user_id,
-                membership_list=[Membership.JOIN]
-            )
-            rooms_for_user_matching_user_id |= set(rooms_for_user)
-
-        # make RoomsForUser tuples for room ids and aliases which are not in the
-        # main rooms_for_user_list - e.g. they are rooms which do not have AS
-        # registered users in it.
-        known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id]
-        missing_rooms_for_user = [
-            RoomsForUser(r, service.sender, "join") for r in
-            matching_room_list if r not in known_room_ids
-        ]
-        rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
-
-        return rooms_for_user_matching_user_id
+    # This is currently empty due to there not being any AS storage functions
+    # that can't be run on the workers. Since this may change in future, and
+    # to keep consistency with the other stores, we keep this empty class for
+    # now.
+    pass
 
 
 class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 79e7c540ad..d0c0059757 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -29,8 +29,7 @@ RoomAliasMapping = namedtuple(
 )
 
 
-class DirectoryStore(SQLBaseStore):
-
+class DirectoryWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_association_from_room_alias(self, room_alias):
         """ Get's the room_id and server list for a given room_alias
@@ -69,6 +68,28 @@ class DirectoryStore(SQLBaseStore):
             RoomAliasMapping(room_id, room_alias.to_string(), servers)
         )
 
+    def get_room_alias_creator(self, room_alias):
+        return self._simple_select_one_onecol(
+            table="room_aliases",
+            keyvalues={
+                "room_alias": room_alias,
+            },
+            retcol="creator",
+            desc="get_room_alias_creator",
+            allow_none=True
+        )
+
+    @cached(max_entries=5000)
+    def get_aliases_for_room(self, room_id):
+        return self._simple_select_onecol(
+            "room_aliases",
+            {"room_id": room_id},
+            "room_alias",
+            desc="get_aliases_for_room",
+        )
+
+
+class DirectoryStore(DirectoryWorkerStore):
     @defer.inlineCallbacks
     def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
         """ Creates an associatin between  a room alias and room_id/servers
@@ -116,17 +137,6 @@ class DirectoryStore(SQLBaseStore):
             )
         defer.returnValue(ret)
 
-    def get_room_alias_creator(self, room_alias):
-        return self._simple_select_one_onecol(
-            table="room_aliases",
-            keyvalues={
-                "room_alias": room_alias,
-            },
-            retcol="creator",
-            desc="get_room_alias_creator",
-            allow_none=True
-        )
-
     @defer.inlineCallbacks
     def delete_room_alias(self, room_alias):
         room_id = yield self.runInteraction(
@@ -135,7 +145,6 @@ class DirectoryStore(SQLBaseStore):
             room_alias,
         )
 
-        self.get_aliases_for_room.invalidate((room_id,))
         defer.returnValue(room_id)
 
     def _delete_room_alias_txn(self, txn, room_alias):
@@ -160,17 +169,12 @@ class DirectoryStore(SQLBaseStore):
             (room_alias.to_string(),)
         )
 
-        return room_id
-
-    @cached(max_entries=5000)
-    def get_aliases_for_room(self, room_id):
-        return self._simple_select_onecol(
-            "room_aliases",
-            {"room_id": room_id},
-            "room_alias",
-            desc="get_aliases_for_room",
+        self._invalidate_cache_and_stream(
+            txn, self.get_aliases_for_room, (room_id,)
         )
 
+        return room_id
+
     def update_aliases_for_room(self, old_room_id, new_room_id, creator):
         def _update_aliases_for_room_txn(txn):
             sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?"
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 55a05c59d5..00ee82d300 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -15,7 +15,10 @@
 
 from twisted.internet import defer
 
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
+
 from synapse.api.errors import StoreError
 from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
@@ -27,30 +30,8 @@ from Queue import PriorityQueue, Empty
 logger = logging.getLogger(__name__)
 
 
-class EventFederationStore(SQLBaseStore):
-    """ Responsible for storing and serving up the various graphs associated
-    with an event. Including the main event graph and the auth chains for an
-    event.
-
-    Also has methods for getting the front (latest) and back (oldest) edges
-    of the event graphs. These are used to generate the parents for new events
-    and backfilling from another server respectively.
-    """
-
-    EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
-
-    def __init__(self, db_conn, hs):
-        super(EventFederationStore, self).__init__(db_conn, hs)
-
-        self.register_background_update_handler(
-            self.EVENT_AUTH_STATE_ONLY,
-            self._background_delete_non_state_event_auth,
-        )
-
-        hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
-        )
-
+class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
+                                 SQLBaseStore):
     def get_auth_chain(self, event_ids, include_given=False):
         """Get auth events for given event_ids. The events *must* be state events.
 
@@ -228,88 +209,6 @@ class EventFederationStore(SQLBaseStore):
 
         return int(min_depth) if min_depth is not None else None
 
-    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
-        min_depth = self._get_min_depth_interaction(txn, room_id)
-
-        if min_depth and depth >= min_depth:
-            return
-
-        self._simple_upsert_txn(
-            txn,
-            table="room_depth",
-            keyvalues={
-                "room_id": room_id,
-            },
-            values={
-                "min_depth": depth,
-            },
-        )
-
-    def _handle_mult_prev_events(self, txn, events):
-        """
-        For the given event, update the event edges table and forward and
-        backward extremities tables.
-        """
-        self._simple_insert_many_txn(
-            txn,
-            table="event_edges",
-            values=[
-                {
-                    "event_id": ev.event_id,
-                    "prev_event_id": e_id,
-                    "room_id": ev.room_id,
-                    "is_state": False,
-                }
-                for ev in events
-                for e_id, _ in ev.prev_events
-            ],
-        )
-
-        self._update_backward_extremeties(txn, events)
-
-    def _update_backward_extremeties(self, txn, events):
-        """Updates the event_backward_extremities tables based on the new/updated
-        events being persisted.
-
-        This is called for new events *and* for events that were outliers, but
-        are now being persisted as non-outliers.
-
-        Forward extremities are handled when we first start persisting the events.
-        """
-        events_by_room = {}
-        for ev in events:
-            events_by_room.setdefault(ev.room_id, []).append(ev)
-
-        query = (
-            "INSERT INTO event_backward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-            " )"
-            " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
-            " )"
-        )
-
-        txn.executemany(query, [
-            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
-            for ev in events for e_id, _ in ev.prev_events
-            if not ev.internal_metadata.is_outlier()
-        ])
-
-        query = (
-            "DELETE FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-        )
-        txn.executemany(
-            query,
-            [
-                (ev.event_id, ev.room_id) for ev in events
-                if not ev.internal_metadata.is_outlier()
-            ]
-        )
-
     def get_forward_extremeties_for_room(self, room_id, stream_ordering):
         """For a given room_id and stream_ordering, return the forward
         extremeties of the room at that point in "time".
@@ -371,28 +270,6 @@ class EventFederationStore(SQLBaseStore):
             get_forward_extremeties_for_room_txn
         )
 
-    def _delete_old_forward_extrem_cache(self):
-        def _delete_old_forward_extrem_cache_txn(txn):
-            # Delete entries older than a month, while making sure we don't delete
-            # the only entries for a room.
-            sql = ("""
-                DELETE FROM stream_ordering_to_exterm
-                WHERE
-                room_id IN (
-                    SELECT room_id
-                    FROM stream_ordering_to_exterm
-                    WHERE stream_ordering > ?
-                ) AND stream_ordering < ?
-            """)
-            txn.execute(
-                sql,
-                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
-            )
-        return self.runInteraction(
-            "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
-        )
-
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
         including) the events in event_list. Return a list of max size `limit`
@@ -522,6 +399,135 @@ class EventFederationStore(SQLBaseStore):
 
         return event_results
 
+
+class EventFederationStore(EventFederationWorkerStore):
+    """ Responsible for storing and serving up the various graphs associated
+    with an event. Including the main event graph and the auth chains for an
+    event.
+
+    Also has methods for getting the front (latest) and back (oldest) edges
+    of the event graphs. These are used to generate the parents for new events
+    and backfilling from another server respectively.
+    """
+
+    EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
+
+    def __init__(self, db_conn, hs):
+        super(EventFederationStore, self).__init__(db_conn, hs)
+
+        self.register_background_update_handler(
+            self.EVENT_AUTH_STATE_ONLY,
+            self._background_delete_non_state_event_auth,
+        )
+
+        hs.get_clock().looping_call(
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+        )
+
+    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
+        min_depth = self._get_min_depth_interaction(txn, room_id)
+
+        if min_depth and depth >= min_depth:
+            return
+
+        self._simple_upsert_txn(
+            txn,
+            table="room_depth",
+            keyvalues={
+                "room_id": room_id,
+            },
+            values={
+                "min_depth": depth,
+            },
+        )
+
+    def _handle_mult_prev_events(self, txn, events):
+        """
+        For the given event, update the event edges table and forward and
+        backward extremities tables.
+        """
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
+                    "event_id": ev.event_id,
+                    "prev_event_id": e_id,
+                    "room_id": ev.room_id,
+                    "is_state": False,
+                }
+                for ev in events
+                for e_id, _ in ev.prev_events
+            ],
+        )
+
+        self._update_backward_extremeties(txn, events)
+
+    def _update_backward_extremeties(self, txn, events):
+        """Updates the event_backward_extremities tables based on the new/updated
+        events being persisted.
+
+        This is called for new events *and* for events that were outliers, but
+        are now being persisted as non-outliers.
+
+        Forward extremities are handled when we first start persisting the events.
+        """
+        events_by_room = {}
+        for ev in events:
+            events_by_room.setdefault(ev.room_id, []).append(ev)
+
+        query = (
+            "INSERT INTO event_backward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+            " )"
+            " AND NOT EXISTS ("
+            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            " AND outlier = ?"
+            " )"
+        )
+
+        txn.executemany(query, [
+            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+            for ev in events for e_id, _ in ev.prev_events
+            if not ev.internal_metadata.is_outlier()
+        ])
+
+        query = (
+            "DELETE FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+        )
+        txn.executemany(
+            query,
+            [
+                (ev.event_id, ev.room_id) for ev in events
+                if not ev.internal_metadata.is_outlier()
+            ]
+        )
+
+    def _delete_old_forward_extrem_cache(self):
+        def _delete_old_forward_extrem_cache_txn(txn):
+            # Delete entries older than a month, while making sure we don't delete
+            # the only entries for a room.
+            sql = ("""
+                DELETE FROM stream_ordering_to_exterm
+                WHERE
+                room_id IN (
+                    SELECT room_id
+                    FROM stream_ordering_to_exterm
+                    WHERE stream_ordering > ?
+                ) AND stream_ordering < ?
+            """)
+            txn.execute(
+                sql,
+                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
+            )
+        return self.runInteraction(
+            "_delete_old_forward_extrem_cache",
+            _delete_old_forward_extrem_cache_txn
+        )
+
     def clean_room_for_join(self, room_id):
         return self.runInteraction(
             "clean_room_for_join",
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 6454045c2d..01f8339825 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, LoggingTransaction
 from twisted.internet import defer
 from synapse.util.async import sleep
 from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -64,6 +64,27 @@ def _deserialize_action(actions, is_highlight):
 
 
 class EventPushActionsWorkerStore(SQLBaseStore):
+    def __init__(self, db_conn, hs):
+        super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)
+
+        # These get correctly set by _find_stream_orderings_for_times_txn
+        self.stream_ordering_month_ago = None
+        self.stream_ordering_day_ago = None
+
+        cur = LoggingTransaction(
+            db_conn.cursor(),
+            name="_find_stream_orderings_for_times_txn",
+            database_engine=self.database_engine,
+            after_callbacks=[],
+            exception_callbacks=[],
+        )
+        self._find_stream_orderings_for_times_txn(cur)
+        cur.close()
+
+        self.find_stream_orderings_looping_call = self._clock.looping_call(
+            self._find_stream_orderings_for_times, 10 * 60 * 1000
+        )
+
     @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
     def get_unread_event_push_actions_by_room_for_user(
             self, room_id, user_id, last_read_event_id
@@ -443,6 +464,128 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             desc="remove_push_actions_from_staging",
         )
 
+    @defer.inlineCallbacks
+    def _find_stream_orderings_for_times(self):
+        yield self.runInteraction(
+            "_find_stream_orderings_for_times",
+            self._find_stream_orderings_for_times_txn
+        )
+
+    def _find_stream_orderings_for_times_txn(self, txn):
+        logger.info("Searching for stream ordering 1 month ago")
+        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 month ago: it's %d",
+            self.stream_ordering_month_ago
+        )
+        logger.info("Searching for stream ordering 1 day ago")
+        self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 day ago: it's %d",
+            self.stream_ordering_day_ago
+        )
+
+    def find_first_stream_ordering_after_ts(self, ts):
+        """Gets the stream ordering corresponding to a given timestamp.
+
+        Specifically, finds the stream_ordering of the first event that was
+        received on or after the timestamp. This is done by a binary search on
+        the events table, since there is no index on received_ts, so is
+        relatively slow.
+
+        Args:
+            ts (int): timestamp in millis
+
+        Returns:
+            Deferred[int]: stream ordering of the first event received on/after
+                the timestamp
+        """
+        return self.runInteraction(
+            "_find_first_stream_ordering_after_ts_txn",
+            self._find_first_stream_ordering_after_ts_txn,
+            ts,
+        )
+
+    @staticmethod
+    def _find_first_stream_ordering_after_ts_txn(txn, ts):
+        """
+        Find the stream_ordering of the first event that was received on or
+        after a given timestamp. This is relatively slow as there is no index
+        on received_ts but we can then use this to delete push actions before
+        this.
+
+        received_ts must necessarily be in the same order as stream_ordering
+        and stream_ordering is indexed, so we manually binary search using
+        stream_ordering
+
+        Args:
+            txn (twisted.enterprise.adbapi.Transaction):
+            ts (int): timestamp to search for
+
+        Returns:
+            int: stream ordering
+        """
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        max_stream_ordering = txn.fetchone()[0]
+
+        if max_stream_ordering is None:
+            return 0
+
+        # We want the first stream_ordering in which received_ts is greater
+        # than or equal to ts. Call this point X.
+        #
+        # We maintain the invariants:
+        #
+        #   range_start <= X <= range_end
+        #
+        range_start = 0
+        range_end = max_stream_ordering + 1
+
+        # Given a stream_ordering, look up the timestamp at that
+        # stream_ordering.
+        #
+        # The array may be sparse (we may be missing some stream_orderings).
+        # We treat the gaps as the same as having the same value as the
+        # preceding entry, because we will pick the lowest stream_ordering
+        # which satisfies our requirement of received_ts >= ts.
+        #
+        # For example, if our array of events indexed by stream_ordering is
+        # [10, <none>, 20], we should treat this as being equivalent to
+        # [10, 10, 20].
+        #
+        sql = (
+            "SELECT received_ts FROM events"
+            " WHERE stream_ordering <= ?"
+            " ORDER BY stream_ordering DESC"
+            " LIMIT 1"
+        )
+
+        while range_end - range_start > 0:
+            middle = (range_end + range_start) // 2
+            txn.execute(sql, (middle,))
+            row = txn.fetchone()
+            if row is None:
+                # no rows with stream_ordering<=middle
+                range_start = middle + 1
+                continue
+
+            middle_ts = row[0]
+            if ts > middle_ts:
+                # we got a timestamp lower than the one we were looking for.
+                # definitely need to look higher: X > middle.
+                range_start = middle + 1
+            else:
+                # we got a timestamp higher than (or the same as) the one we
+                # were looking for. We aren't yet sure about the point we
+                # looked up, but we can be sure that X <= middle.
+                range_end = middle
+
+        return range_end
+
 
 class EventPushActionsStore(EventPushActionsWorkerStore):
     EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -651,69 +794,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         """, (room_id, user_id, stream_ordering))
 
     @defer.inlineCallbacks
-    def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
-            "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
-        )
-
-    def _find_stream_orderings_for_times_txn(self, txn):
-        logger.info("Searching for stream ordering 1 month ago")
-        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
-            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
-        )
-        logger.info(
-            "Found stream ordering 1 month ago: it's %d",
-            self.stream_ordering_month_ago
-        )
-        logger.info("Searching for stream ordering 1 day ago")
-        self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
-            txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
-        )
-        logger.info(
-            "Found stream ordering 1 day ago: it's %d",
-            self.stream_ordering_day_ago
-        )
-
-    def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
-        """
-        Find the stream_ordering of the first event that was received after
-        a given timestamp. This is relatively slow as there is no index on
-        received_ts but we can then use this to delete push actions before
-        this.
-
-        received_ts must necessarily be in the same order as stream_ordering
-        and stream_ordering is indexed, so we manually binary search using
-        stream_ordering
-        """
-        txn.execute("SELECT MAX(stream_ordering) FROM events")
-        max_stream_ordering = txn.fetchone()[0]
-
-        if max_stream_ordering is None:
-            return 0
-
-        range_start = 0
-        range_end = max_stream_ordering
-
-        sql = (
-            "SELECT received_ts FROM events"
-            " WHERE stream_ordering > ?"
-            " ORDER BY stream_ordering"
-            " LIMIT 1"
-        )
-
-        while range_end - range_start > 1:
-            middle = int((range_end + range_start) / 2)
-            txn.execute(sql, (middle,))
-            middle_ts = txn.fetchone()[0]
-            if ts > middle_ts:
-                range_start = middle
-            else:
-                range_end = middle
-
-        return range_end
-
-    @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
             return
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 057b1be4d5..826fad307e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -754,7 +754,7 @@ class EventsStore(EventsWorkerStore):
 
                 for member in members_changed:
                     self._invalidate_cache_and_stream(
-                        txn, self.get_rooms_for_user, (member,)
+                        txn, self.get_rooms_for_user_with_stream_ordering, (member,)
                     )
 
                 for host in set(get_domain_from_id(u) for u in members_changed):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 95f75d6df1..d809b2ba46 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -19,10 +19,70 @@ from twisted.internet import defer
 
 from synapse.api.errors import StoreError, Codes
 from synapse.storage import background_updates
+from synapse.storage._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 
-class RegistrationStore(background_updates.BackgroundUpdateStore):
+class RegistrationWorkerStore(SQLBaseStore):
+    @cached()
+    def get_user_by_id(self, user_id):
+        return self._simple_select_one(
+            table="users",
+            keyvalues={
+                "name": user_id,
+            },
+            retcols=["name", "password_hash", "is_guest"],
+            allow_none=True,
+            desc="get_user_by_id",
+        )
+
+    @cached()
+    def get_user_by_access_token(self, token):
+        """Get a user from the given access token.
+
+        Args:
+            token (str): The access token of a user.
+        Returns:
+            defer.Deferred: None, if the token did not match, otherwise dict
+                including the keys `name`, `is_guest`, `device_id`, `token_id`.
+        """
+        return self.runInteraction(
+            "get_user_by_access_token",
+            self._query_for_auth,
+            token
+        )
+
+    @defer.inlineCallbacks
+    def is_server_admin(self, user):
+        res = yield self._simple_select_one_onecol(
+            table="users",
+            keyvalues={"name": user.to_string()},
+            retcol="admin",
+            allow_none=True,
+            desc="is_server_admin",
+        )
+
+        defer.returnValue(res if res else False)
+
+    def _query_for_auth(self, txn, token):
+        sql = (
+            "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
+            " access_tokens.device_id"
+            " FROM users"
+            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
+            " WHERE token = ?"
+        )
+
+        txn.execute(sql, (token,))
+        rows = self.cursor_to_dict(txn)
+        if rows:
+            return rows[0]
+
+        return None
+
+
+class RegistrationStore(RegistrationWorkerStore,
+                        background_updates.BackgroundUpdateStore):
 
     def __init__(self, db_conn, hs):
         super(RegistrationStore, self).__init__(db_conn, hs)
@@ -187,18 +247,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
         )
         txn.call_after(self.is_guest.invalidate, (user_id,))
 
-    @cached()
-    def get_user_by_id(self, user_id):
-        return self._simple_select_one(
-            table="users",
-            keyvalues={
-                "name": user_id,
-            },
-            retcols=["name", "password_hash", "is_guest"],
-            allow_none=True,
-            desc="get_user_by_id",
-        )
-
     def get_users_by_id_case_insensitive(self, user_id):
         """Gets users that match user_id case insensitively.
         Returns a mapping of user_id -> password_hash.
@@ -304,34 +352,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
 
         return self.runInteraction("delete_access_token", f)
 
-    @cached()
-    def get_user_by_access_token(self, token):
-        """Get a user from the given access token.
-
-        Args:
-            token (str): The access token of a user.
-        Returns:
-            defer.Deferred: None, if the token did not match, otherwise dict
-                including the keys `name`, `is_guest`, `device_id`, `token_id`.
-        """
-        return self.runInteraction(
-            "get_user_by_access_token",
-            self._query_for_auth,
-            token
-        )
-
-    @defer.inlineCallbacks
-    def is_server_admin(self, user):
-        res = yield self._simple_select_one_onecol(
-            table="users",
-            keyvalues={"name": user.to_string()},
-            retcol="admin",
-            allow_none=True,
-            desc="is_server_admin",
-        )
-
-        defer.returnValue(res if res else False)
-
     @cachedInlineCallbacks()
     def is_guest(self, user_id):
         res = yield self._simple_select_one_onecol(
@@ -344,22 +364,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
 
         defer.returnValue(res if res else False)
 
-    def _query_for_auth(self, txn, token):
-        sql = (
-            "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
-            " access_tokens.device_id"
-            " FROM users"
-            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
-            " WHERE token = ?"
-        )
-
-        txn.execute(sql, (token,))
-        rows = self.cursor_to_dict(txn)
-        if rows:
-            return rows[0]
-
-        return None
-
     @defer.inlineCallbacks
     def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
         yield self._simple_upsert("user_threepids", {
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index fff6652e05..7f2c08d7a6 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.storage._base import SQLBaseStore
 from synapse.storage.search import SearchStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
@@ -38,7 +39,126 @@ RatelimitOverride = collections.namedtuple(
 )
 
 
-class RoomStore(SearchStore):
+class RoomWorkerStore(SQLBaseStore):
+    def get_public_room_ids(self):
+        return self._simple_select_onecol(
+            table="rooms",
+            keyvalues={
+                "is_public": True,
+            },
+            retcol="room_id",
+            desc="get_public_room_ids",
+        )
+
+    @cached(num_args=2, max_entries=100)
+    def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
+        """Get pulbic rooms for a particular list, or across all lists.
+
+        Args:
+            stream_id (int)
+            network_tuple (ThirdPartyInstanceID): The list to use (None, None)
+                means the main list, None means all lsits.
+        """
+        return self.runInteraction(
+            "get_public_room_ids_at_stream_id",
+            self.get_public_room_ids_at_stream_id_txn,
+            stream_id, network_tuple=network_tuple
+        )
+
+    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
+                                             network_tuple):
+        return {
+            rm
+            for rm, vis in self.get_published_at_stream_id_txn(
+                txn, stream_id, network_tuple=network_tuple
+            ).items()
+            if vis
+        }
+
+    def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
+        if network_tuple:
+            # We want to get from a particular list. No aggregation required.
+
+            sql = ("""
+                SELECT room_id, visibility FROM public_room_list_stream
+                INNER JOIN (
+                    SELECT room_id, max(stream_id) AS stream_id
+                    FROM public_room_list_stream
+                    WHERE stream_id <= ? %s
+                    GROUP BY room_id
+                ) grouped USING (room_id, stream_id)
+            """)
+
+            if network_tuple.appservice_id is not None:
+                txn.execute(
+                    sql % ("AND appservice_id = ? AND network_id = ?",),
+                    (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
+                )
+            else:
+                txn.execute(
+                    sql % ("AND appservice_id IS NULL",),
+                    (stream_id,)
+                )
+            return dict(txn)
+        else:
+            # We want to get from all lists, so we need to aggregate the results
+
+            logger.info("Executing full list")
+
+            sql = ("""
+                SELECT room_id, visibility
+                FROM public_room_list_stream
+                INNER JOIN (
+                    SELECT
+                        room_id, max(stream_id) AS stream_id, appservice_id,
+                        network_id
+                    FROM public_room_list_stream
+                    WHERE stream_id <= ?
+                    GROUP BY room_id, appservice_id, network_id
+                ) grouped USING (room_id, stream_id)
+            """)
+
+            txn.execute(
+                sql,
+                (stream_id,)
+            )
+
+            results = {}
+            # A room is visible if its visible on any list.
+            for room_id, visibility in txn:
+                results[room_id] = bool(visibility) or results.get(room_id, False)
+
+            return results
+
+    def get_public_room_changes(self, prev_stream_id, new_stream_id,
+                                network_tuple):
+        def get_public_room_changes_txn(txn):
+            then_rooms = self.get_public_room_ids_at_stream_id_txn(
+                txn, prev_stream_id, network_tuple
+            )
+
+            now_rooms_dict = self.get_published_at_stream_id_txn(
+                txn, new_stream_id, network_tuple
+            )
+
+            now_rooms_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if vis
+            )
+            now_rooms_not_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if not vis
+            )
+
+            newly_visible = now_rooms_visible - then_rooms
+            newly_unpublished = now_rooms_not_visible & then_rooms
+
+            return newly_visible, newly_unpublished
+
+        return self.runInteraction(
+            "get_public_room_changes", get_public_room_changes_txn
+        )
+
+
+class RoomStore(RoomWorkerStore, SearchStore):
 
     @defer.inlineCallbacks
     def store_room(self, room_id, room_creator_user_id, is_public):
@@ -225,16 +345,6 @@ class RoomStore(SearchStore):
             )
         self.hs.get_notifier().on_new_replication_data()
 
-    def get_public_room_ids(self):
-        return self._simple_select_onecol(
-            table="rooms",
-            keyvalues={
-                "is_public": True,
-            },
-            retcol="room_id",
-            desc="get_public_room_ids",
-        )
-
     def get_room_count(self):
         """Retrieve a list of all rooms
         """
@@ -326,113 +436,6 @@ class RoomStore(SearchStore):
     def get_current_public_room_stream_id(self):
         return self._public_room_id_gen.get_current_token()
 
-    @cached(num_args=2, max_entries=100)
-    def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
-        """Get pulbic rooms for a particular list, or across all lists.
-
-        Args:
-            stream_id (int)
-            network_tuple (ThirdPartyInstanceID): The list to use (None, None)
-                means the main list, None means all lsits.
-        """
-        return self.runInteraction(
-            "get_public_room_ids_at_stream_id",
-            self.get_public_room_ids_at_stream_id_txn,
-            stream_id, network_tuple=network_tuple
-        )
-
-    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
-                                             network_tuple):
-        return {
-            rm
-            for rm, vis in self.get_published_at_stream_id_txn(
-                txn, stream_id, network_tuple=network_tuple
-            ).items()
-            if vis
-        }
-
-    def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
-        if network_tuple:
-            # We want to get from a particular list. No aggregation required.
-
-            sql = ("""
-                SELECT room_id, visibility FROM public_room_list_stream
-                INNER JOIN (
-                    SELECT room_id, max(stream_id) AS stream_id
-                    FROM public_room_list_stream
-                    WHERE stream_id <= ? %s
-                    GROUP BY room_id
-                ) grouped USING (room_id, stream_id)
-            """)
-
-            if network_tuple.appservice_id is not None:
-                txn.execute(
-                    sql % ("AND appservice_id = ? AND network_id = ?",),
-                    (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
-                )
-            else:
-                txn.execute(
-                    sql % ("AND appservice_id IS NULL",),
-                    (stream_id,)
-                )
-            return dict(txn)
-        else:
-            # We want to get from all lists, so we need to aggregate the results
-
-            logger.info("Executing full list")
-
-            sql = ("""
-                SELECT room_id, visibility
-                FROM public_room_list_stream
-                INNER JOIN (
-                    SELECT
-                        room_id, max(stream_id) AS stream_id, appservice_id,
-                        network_id
-                    FROM public_room_list_stream
-                    WHERE stream_id <= ?
-                    GROUP BY room_id, appservice_id, network_id
-                ) grouped USING (room_id, stream_id)
-            """)
-
-            txn.execute(
-                sql,
-                (stream_id,)
-            )
-
-            results = {}
-            # A room is visible if its visible on any list.
-            for room_id, visibility in txn:
-                results[room_id] = bool(visibility) or results.get(room_id, False)
-
-            return results
-
-    def get_public_room_changes(self, prev_stream_id, new_stream_id,
-                                network_tuple):
-        def get_public_room_changes_txn(txn):
-            then_rooms = self.get_public_room_ids_at_stream_id_txn(
-                txn, prev_stream_id, network_tuple
-            )
-
-            now_rooms_dict = self.get_published_at_stream_id_txn(
-                txn, new_stream_id, network_tuple
-            )
-
-            now_rooms_visible = set(
-                rm for rm, vis in now_rooms_dict.items() if vis
-            )
-            now_rooms_not_visible = set(
-                rm for rm, vis in now_rooms_dict.items() if not vis
-            )
-
-            newly_visible = now_rooms_visible - then_rooms
-            newly_unpublished = now_rooms_not_visible & then_rooms
-
-            return newly_visible, newly_unpublished
-
-        return self.runInteraction(
-            "get_public_room_changes", get_public_room_changes_txn
-        )
-
     def get_all_new_public_rooms(self, prev_id, current_id, limit):
         def get_all_new_public_rooms(txn):
             sql = ("""
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index d79877dac7..52e19e16b0 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -38,6 +38,11 @@ RoomsForUser = namedtuple(
     ("room_id", "sender", "membership", "event_id", "stream_ordering")
 )
 
+GetRoomsForUserWithStreamOrdering = namedtuple(
+    "_GetRoomsForUserWithStreamOrdering",
+    ("room_id", "stream_ordering",)
+)
+
 
 # We store this using a namedtuple so that we save about 3x space over using a
 # dict.
@@ -181,12 +186,32 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         return results
 
     @cachedInlineCallbacks(max_entries=500000, iterable=True)
-    def get_rooms_for_user(self, user_id):
+    def get_rooms_for_user_with_stream_ordering(self, user_id):
         """Returns a set of room_ids the user is currently joined to
+
+        Args:
+            user_id (str)
+
+        Returns:
+            Deferred[frozenset[GetRoomsForUserWithStreamOrdering]]: Returns
+            the rooms the user is in currently, along with the stream ordering
+            of the most recent join for that user and room.
         """
         rooms = yield self.get_rooms_for_user_where_membership_is(
             user_id, membership_list=[Membership.JOIN],
         )
+        defer.returnValue(frozenset(
+            GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
+            for r in rooms
+        ))
+
+    @defer.inlineCallbacks
+    def get_rooms_for_user(self, user_id, on_invalidate=None):
+        """Returns a set of room_ids the user is currently joined to
+        """
+        rooms = yield self.get_rooms_for_user_with_stream_ordering(
+            user_id, on_invalidate=on_invalidate,
+        )
         defer.returnValue(frozenset(r.room_id for r in rooms))
 
     @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 67d5d9969a..9e6eaaa532 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -22,12 +22,12 @@ from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.util.caches.descriptors import cached, cachedList
 
 
-class SignatureStore(SQLBaseStore):
-    """Persistence for event signatures and hashes"""
-
+class SignatureWorkerStore(SQLBaseStore):
     @cached()
     def get_event_reference_hash(self, event_id):
-        return self._get_event_reference_hashes_txn(event_id)
+        # This is a dummy function to allow get_event_reference_hashes
+        # to use its cache
+        raise NotImplementedError()
 
     @cachedList(cached_method_name="get_event_reference_hash",
                 list_name="event_ids", num_args=1)
@@ -74,6 +74,10 @@ class SignatureStore(SQLBaseStore):
         txn.execute(query, (event_id, ))
         return {k: v for k, v in txn}
 
+
+class SignatureStore(SignatureWorkerStore):
+    """Persistence for event signatures and hashes"""
+
     def _store_event_reference_hashes_txn(self, txn, events):
         """Store a hash for a PDU
         Args:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 52bdce5be2..2956c3b3e0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,13 +35,16 @@ what sort order was used:
 
 from twisted.internet import defer
 
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+
 from synapse.util.caches.descriptors import cached
-from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
+import abc
 import logging
 
 
@@ -143,81 +146,41 @@ def filter_to_clause(event_filter):
     return " AND ".join(clauses), args
 
 
-class StreamStore(SQLBaseStore):
-    @defer.inlineCallbacks
-    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
-        # NB this lives here instead of appservice.py so we can reuse the
-        # 'private' StreamToken class in this file.
-        if limit:
-            limit = max(limit, MAX_STREAM_SIZE)
-        else:
-            limit = MAX_STREAM_SIZE
-
-        # From and to keys should be integers from ordering.
-        from_id = RoomStreamToken.parse_stream_token(from_key)
-        to_id = RoomStreamToken.parse_stream_token(to_key)
-
-        if from_key == to_key:
-            defer.returnValue(([], to_key))
-            return
-
-        # select all the events between from/to with a sensible limit
-        sql = (
-            "SELECT e.event_id, e.room_id, e.type, s.state_key, "
-            "e.stream_ordering FROM events AS e "
-            "LEFT JOIN state_events as s ON "
-            "e.event_id = s.event_id "
-            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
-            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
-        ) % {
-            "limit": limit
-        }
-
-        def f(txn):
-            # pull out all the events between the tokens
-            txn.execute(sql, (from_id.stream, to_id.stream,))
-            rows = self.cursor_to_dict(txn)
-
-            # Logic:
-            #  - We want ALL events which match the AS room_id regex
-            #  - We want ALL events which match the rooms represented by the AS
-            #    room_alias regex
-            #  - We want ALL events for rooms that AS users have joined.
-            # This is currently supported via get_app_service_rooms (which is
-            # used for the Notifier listener rooms). We can't reasonably make a
-            # SQL query for these room IDs, so we'll pull all the events between
-            # from/to and filter in python.
-            rooms_for_as = self._get_app_service_rooms_txn(txn, service)
-            room_ids_for_as = [r.room_id for r in rooms_for_as]
-
-            def app_service_interested(row):
-                if row["room_id"] in room_ids_for_as:
-                    return True
-
-                if row["type"] == EventTypes.Member:
-                    if service.is_interested_in_user(row.get("state_key")):
-                        return True
-                return False
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
+    """This is an abstract base class where subclasses must implement
+    `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
+    which can be called in the initializer.
+    """
 
-            return [r for r in rows if app_service_interested(r)]
+    __metaclass__ = abc.ABCMeta
 
-        rows = yield self.runInteraction("get_appservice_room_stream", f)
+    def __init__(self, db_conn, hs):
+        super(StreamWorkerStore, self).__init__(db_conn, hs)
 
-        ret = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
+        events_max = self.get_room_max_stream_ordering()
+        event_cache_prefill, min_event_val = self._get_cache_dict(
+            db_conn, "events",
+            entity_column="room_id",
+            stream_column="stream_ordering",
+            max_value=events_max,
+        )
+        self._events_stream_cache = StreamChangeCache(
+            "EventsRoomStreamChangeCache", min_event_val,
+            prefilled_cache=event_cache_prefill,
+        )
+        self._membership_stream_cache = StreamChangeCache(
+            "MembershipStreamChangeCache", events_max,
         )
 
-        self._set_before_and_after(ret, rows, topo_order=from_id is None)
+        self._stream_order_on_start = self.get_room_max_stream_ordering()
 
-        if rows:
-            key = "s%d" % max(r["stream_ordering"] for r in rows)
-        else:
-            # Assume we didn't get anything because there was nothing to
-            # get.
-            key = to_key
+    @abc.abstractmethod
+    def get_room_max_stream_ordering(self):
+        raise NotImplementedError()
 
-        defer.returnValue((ret, key))
+    @abc.abstractmethod
+    def get_room_min_stream_ordering(self):
+        raise NotImplementedError()
 
     @defer.inlineCallbacks
     def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
@@ -381,88 +344,6 @@ class StreamStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1, event_filter=None):
-        # Tokens really represent positions between elements, but we use
-        # the convention of pointing to the event before the gap. Hence
-        # we have a bit of asymmetry when it comes to equalities.
-        args = [False, room_id]
-        if direction == 'b':
-            order = "DESC"
-            bounds = upper_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
-            )
-            if to_key:
-                bounds = "%s AND %s" % (bounds, lower_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
-                ))
-        else:
-            order = "ASC"
-            bounds = lower_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
-            )
-            if to_key:
-                bounds = "%s AND %s" % (bounds, upper_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
-                ))
-
-        filter_clause, filter_args = filter_to_clause(event_filter)
-
-        if filter_clause:
-            bounds += " AND " + filter_clause
-            args.extend(filter_args)
-
-        if int(limit) > 0:
-            args.append(int(limit))
-            limit_str = " LIMIT ?"
-        else:
-            limit_str = ""
-
-        sql = (
-            "SELECT * FROM events"
-            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
-            " ORDER BY topological_ordering %(order)s,"
-            " stream_ordering %(order)s %(limit)s"
-        ) % {
-            "bounds": bounds,
-            "order": order,
-            "limit": limit_str
-        }
-
-        def f(txn):
-            txn.execute(sql, args)
-
-            rows = self.cursor_to_dict(txn)
-
-            if rows:
-                topo = rows[-1]["topological_ordering"]
-                toke = rows[-1]["stream_ordering"]
-                if direction == 'b':
-                    # Tokens are positions between events.
-                    # This token points *after* the last event in the chunk.
-                    # We need it to point to the event before it in the chunk
-                    # when we are going backwards so we subtract one from the
-                    # stream part.
-                    toke -= 1
-                next_token = str(RoomStreamToken(topo, toke))
-            else:
-                # TODO (erikj): We should work out what to do here instead.
-                next_token = to_key if to_key else from_key
-
-            return rows, next_token,
-
-        rows, token = yield self.runInteraction("paginate_room_events", f)
-
-        events = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
-        )
-
-        self._set_before_and_after(events, rows)
-
-        defer.returnValue((events, token))
-
-    @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
         rows, token = yield self.get_recent_event_ids_for_room(
             room_id, limit, end_token, from_token
@@ -534,6 +415,33 @@ class StreamStore(SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
+    def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
+        """Gets details of the first event in a room at or after a stream ordering
+
+        Args:
+            room_id (str):
+            stream_ordering (int):
+
+        Returns:
+            Deferred[(int, int, str)]:
+                (stream ordering, topological ordering, event_id)
+        """
+        def _f(txn):
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering >= ?"
+                " AND NOT outlier"
+                " ORDER BY stream_ordering"
+                " LIMIT 1"
+            )
+            txn.execute(sql, (room_id, stream_ordering, ))
+            return txn.fetchone()
+
+        return self.runInteraction(
+            "get_room_event_after_stream_ordering", _f,
+        )
+
     @defer.inlineCallbacks
     def get_room_events_max_id(self, room_id=None):
         """Returns the current token for rooms stream.
@@ -542,7 +450,7 @@ class StreamStore(SQLBaseStore):
         `room_id` causes it to return the current room specific topological
         token.
         """
-        token = yield self._stream_id_gen.get_current_token()
+        token = yield self.get_room_max_stream_ordering()
         if room_id is None:
             defer.returnValue("s%d" % (token,))
         else:
@@ -552,12 +460,6 @@ class StreamStore(SQLBaseStore):
             )
             defer.returnValue("t%d-%d" % (topo, token))
 
-    def get_room_max_stream_ordering(self):
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
-
     def get_stream_token_for_event(self, event_id):
         """The stream token for an event
         Args:
@@ -832,3 +734,93 @@ class StreamStore(SQLBaseStore):
 
     def has_room_changed_since(self, room_id, stream_id):
         return self._events_stream_cache.has_entity_changed(room_id, stream_id)
+
+
+class StreamStore(StreamWorkerStore):
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
+
+    @defer.inlineCallbacks
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1, event_filter=None):
+        # Tokens really represent positions between elements, but we use
+        # the convention of pointing to the event before the gap. Hence
+        # we have a bit of asymmetry when it comes to equalities.
+        args = [False, room_id]
+        if direction == 'b':
+            order = "DESC"
+            bounds = upper_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
+            if to_key:
+                bounds = "%s AND %s" % (bounds, lower_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
+        else:
+            order = "ASC"
+            bounds = lower_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
+            if to_key:
+                bounds = "%s AND %s" % (bounds, upper_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
+
+        filter_clause, filter_args = filter_to_clause(event_filter)
+
+        if filter_clause:
+            bounds += " AND " + filter_clause
+            args.extend(filter_args)
+
+        if int(limit) > 0:
+            args.append(int(limit))
+            limit_str = " LIMIT ?"
+        else:
+            limit_str = ""
+
+        sql = (
+            "SELECT * FROM events"
+            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s %(limit)s"
+        ) % {
+            "bounds": bounds,
+            "order": order,
+            "limit": limit_str
+        }
+
+        def f(txn):
+            txn.execute(sql, args)
+
+            rows = self.cursor_to_dict(txn)
+
+            if rows:
+                topo = rows[-1]["topological_ordering"]
+                toke = rows[-1]["stream_ordering"]
+                if direction == 'b':
+                    # Tokens are positions between events.
+                    # This token points *after* the last event in the chunk.
+                    # We need it to point to the event before it in the chunk
+                    # when we are going backwards so we subtract one from the
+                    # stream part.
+                    toke -= 1
+                next_token = str(RoomStreamToken(topo, toke))
+            else:
+                # TODO (erikj): We should work out what to do here instead.
+                next_token = to_key if to_key else from_key
+
+            return rows, next_token,
+
+        rows, token = yield self.runInteraction("paginate_room_events", f)
+
+        events = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
+
+        self._set_before_and_after(events, rows)
+
+        defer.returnValue((events, token))
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 94fa7cac98..a8dea15c1b 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -299,10 +299,6 @@ def preserve_fn(f):
     Useful for wrapping functions that return a deferred which you don't yield
     on.
     """
-    def reset_context(result):
-        LoggingContext.set_current_context(LoggingContext.sentinel)
-        return result
-
     def g(*args, **kwargs):
         current = LoggingContext.current_context()
         res = f(*args, **kwargs)
@@ -323,12 +319,11 @@ def preserve_fn(f):
             # which is supposed to have a single entry and exit point. But
             # by spawning off another deferred, we are effectively
             # adding a new exit point.)
-            res.addBoth(reset_context)
+            res.addBoth(_set_context_cb, LoggingContext.sentinel)
         return res
     return g
 
 
-@defer.inlineCallbacks
 def make_deferred_yieldable(deferred):
     """Given a deferred, make it follow the Synapse logcontext rules:
 
@@ -342,9 +337,16 @@ def make_deferred_yieldable(deferred):
 
     (This is more-or-less the opposite operation to preserve_fn.)
     """
-    with PreserveLoggingContext():
-        r = yield deferred
-    defer.returnValue(r)
+    if isinstance(deferred, defer.Deferred) and not deferred.called:
+        prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
+        deferred.addBoth(_set_context_cb, prev_context)
+    return deferred
+
+
+def _set_context_cb(result, context):
+    """A callback function which just sets the logging context"""
+    LoggingContext.set_current_context(context)
+    return result
 
 
 # modules to ignore in `logcontext_tracer`