summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/__init__.py2
-rw-r--r--synapse/handlers/_base.py2
-rw-r--r--synapse/handlers/federation.py4
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/handlers/room_member.py43
-rw-r--r--synapse/replication/slave/storage/events.py54
-rw-r--r--synapse/replication/slave/storage/room.py21
-rw-r--r--synapse/rest/client/v1/admin.py7
-rw-r--r--synapse/rest/client/v1/room.py19
-rw-r--r--synapse/rest/client/v2_alpha/register.py2
-rw-r--r--synapse/server.py5
-rw-r--r--synapse/storage/__init__.py32
-rw-r--r--synapse/storage/event_push_actions.py149
-rw-r--r--synapse/storage/room.py239
-rw-r--r--synapse/storage/stream.py357
-rw-r--r--synapse/util/logcontext.py20
17 files changed, 480 insertions, 482 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 53213cdccf..8f8fd82eb0 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,6 @@ from .register import RegistrationHandler
 from .room import (
     RoomCreationHandler, RoomContextHandler,
 )
-from .room_member import RoomMemberHandler
 from .message import MessageHandler
 from .federation import FederationHandler
 from .directory import DirectoryHandler
@@ -49,7 +48,6 @@ class Handlers(object):
         self.registration_handler = RegistrationHandler(hs)
         self.message_handler = MessageHandler(hs)
         self.room_creation_handler = RoomCreationHandler(hs)
-        self.room_member_handler = RoomMemberHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index faa5609c0c..e089e66fde 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -158,7 +158,7 @@ class BaseHandler(object):
                 # homeserver.
                 requester = synapse.types.create_requester(
                     target_user, is_guest=True)
-                handler = self.hs.get_handlers().room_member_handler
+                handler = self.hs.get_room_member_handler()
                 yield handler.update_membership(
                     requester,
                     target_user,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8832ba58bc..520612683e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2153,7 +2153,7 @@ class FederationHandler(BaseHandler):
                 raise e
 
             yield self._check_signature(event, context)
-            member_handler = self.hs.get_handlers().room_member_handler
+            member_handler = self.hs.get_room_member_handler()
             yield member_handler.send_membership_event(None, event, context)
         else:
             destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2197,7 +2197,7 @@ class FederationHandler(BaseHandler):
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
 
-        member_handler = self.hs.get_handlers().room_member_handler
+        member_handler = self.hs.get_room_member_handler()
         yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9800e24453..c9c2879038 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -233,7 +233,7 @@ class ProfileHandler(BaseHandler):
         )
 
         for room_id in room_ids:
-            handler = self.hs.get_handlers().room_member_handler
+            handler = self.hs.get_room_member_handler()
             try:
                 # Assume the target_user isn't a guest,
                 # because we don't let guests set profile or avatar data.
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6ab020bf41..6c425828c1 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -165,7 +165,7 @@ class RoomCreationHandler(BaseHandler):
 
         creation_content = config.get("creation_content", {})
 
-        room_member_handler = self.hs.get_handlers().room_member_handler
+        room_member_handler = self.hs.get_room_member_handler()
 
         yield self._send_events_for_new_room(
             requester,
@@ -224,7 +224,7 @@ class RoomCreationHandler(BaseHandler):
             id_server = invite_3pid["id_server"]
             address = invite_3pid["address"]
             medium = invite_3pid["medium"]
-            yield self.hs.get_handlers().room_member_handler.do_3pid_invite(
+            yield self.hs.get_room_member_handler().do_3pid_invite(
                 room_id,
                 requester.user,
                 medium,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 37dc5e99ab..7ecdf87246 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -30,24 +30,32 @@ from synapse.api.errors import AuthError, SynapseError, Codes
 from synapse.types import UserID, RoomID
 from synapse.util.async import Linearizer
 from synapse.util.distributor import user_left_room, user_joined_room
-from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
 id_server_scheme = "https://"
 
 
-class RoomMemberHandler(BaseHandler):
+class RoomMemberHandler(object):
     # TODO(paul): This handler currently contains a messy conflation of
     #   low-level API that works on UserID objects and so on, and REST-level
     #   API that takes ID strings and returns pagination chunks. These concerns
     #   ought to be separated out a lot better.
 
     def __init__(self, hs):
-        super(RoomMemberHandler, self).__init__(hs)
-
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+        self.state_handler = hs.get_state_handler()
+        self.config = hs.config
+        self.simple_http_client = hs.get_simple_http_client()
+
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.directory_handler = hs.get_handlers().directory_handler
+        self.registration_handler = hs.get_handlers().registration_handler
         self.profile_handler = hs.get_profile_handler()
         self.event_creation_hander = hs.get_event_creation_handler()
+        self.replication_layer = hs.get_replication_layer()
 
         self.member_linearizer = Linearizer(name="member")
 
@@ -138,7 +146,7 @@ class RoomMemberHandler(BaseHandler):
         # join dance for now, since we're kinda implicitly checking
         # that we are allowed to join when we decide whether or not we
         # need to do the invite/join dance.
-        yield self.hs.get_handlers().federation_handler.do_invite_join(
+        yield self.federation_handler.do_invite_join(
             remote_room_hosts,
             room_id,
             user.to_string(),
@@ -204,8 +212,7 @@ class RoomMemberHandler(BaseHandler):
         # if this is a join with a 3pid signature, we may need to turn a 3pid
         # invite into a normal invite before we can handle the join.
         if third_party_signed is not None:
-            replication = self.hs.get_replication_layer()
-            yield replication.exchange_third_party_invite(
+            yield self.replication_layer.exchange_third_party_invite(
                 third_party_signed["sender"],
                 target.to_string(),
                 room_id,
@@ -226,7 +233,7 @@ class RoomMemberHandler(BaseHandler):
                 requester.user,
             )
             if not is_requester_admin:
-                if self.hs.config.block_non_admin_invites:
+                if self.config.block_non_admin_invites:
                     logger.info(
                         "Blocking invite: user is not admin and non-admin "
                         "invites disabled"
@@ -321,7 +328,7 @@ class RoomMemberHandler(BaseHandler):
                 else:
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
-                    fed_handler = self.hs.get_handlers().federation_handler
+                    fed_handler = self.federation_handler
                     try:
                         ret = yield fed_handler.do_remotely_reject_invite(
                             remote_room_hosts,
@@ -477,7 +484,7 @@ class RoomMemberHandler(BaseHandler):
         Raises:
             SynapseError if room alias could not be found.
         """
-        directory_handler = self.hs.get_handlers().directory_handler
+        directory_handler = self.directory_handler
         mapping = yield directory_handler.get_association(room_alias)
 
         if not mapping:
@@ -508,7 +515,7 @@ class RoomMemberHandler(BaseHandler):
             requester,
             txn_id
     ):
-        if self.hs.config.block_non_admin_invites:
+        if self.config.block_non_admin_invites:
             is_requester_admin = yield self.auth.is_server_admin(
                 requester.user,
             )
@@ -555,7 +562,7 @@ class RoomMemberHandler(BaseHandler):
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
         try:
-            data = yield self.hs.get_simple_http_client().get_json(
+            data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
                 {
                     "medium": medium,
@@ -578,7 +585,7 @@ class RoomMemberHandler(BaseHandler):
         if server_hostname not in data["signatures"]:
             raise AuthError(401, "No signature from server %s" % (server_hostname,))
         for key_name, signature in data["signatures"][server_hostname].items():
-            key_data = yield self.hs.get_simple_http_client().get_json(
+            key_data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/pubkey/%s" %
                 (id_server_scheme, server_hostname, key_name,),
             )
@@ -603,7 +610,7 @@ class RoomMemberHandler(BaseHandler):
             user,
             txn_id
     ):
-        room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+        room_state = yield self.state_handler.get_current_state(room_id)
 
         inviter_display_name = ""
         inviter_avatar_url = ""
@@ -727,15 +734,15 @@ class RoomMemberHandler(BaseHandler):
             "sender_avatar_url": inviter_avatar_url,
         }
 
-        if self.hs.config.invite_3pid_guest:
-            registration_handler = self.hs.get_handlers().registration_handler
+        if self.config.invite_3pid_guest:
+            registration_handler = self.registration_handler
             guest_access_token = yield registration_handler.guest_access_token_for(
                 medium=medium,
                 address=address,
                 inviter_user_id=inviter_user_id,
             )
 
-            guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
+            guest_user_info = yield self.auth.get_user_by_access_token(
                 guest_access_token
             )
 
@@ -744,7 +751,7 @@ class RoomMemberHandler(BaseHandler):
                 "guest_user_id": guest_user_info["user"].to_string(),
             })
 
-        data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+        data = yield self.simple_http_client.post_urlencoded_get_json(
             is_url,
             invite_config
         )
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index de0b26f437..ec634c1bf9 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -22,9 +22,8 @@ from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
-from synapse.storage.stream import StreamStore
+from synapse.storage.stream import StreamWorkerStore
 from synapse.storage.signatures import SignatureStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
@@ -41,34 +40,18 @@ logger = logging.getLogger(__name__)
 
 
 class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
-                       EventsWorkerStore, StateGroupWorkerStore,
+                       StreamWorkerStore, EventsWorkerStore, StateGroupWorkerStore,
                        BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedEventStore, self).__init__(db_conn, hs)
         self._stream_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering",
         )
         self._backfill_id_gen = SlavedIdTracker(
             db_conn, "events", "stream_ordering", step=-1
         )
-        events_max = self._stream_id_gen.get_current_token()
-        event_cache_prefill, min_event_val = self._get_cache_dict(
-            db_conn, "events",
-            entity_column="room_id",
-            stream_column="stream_ordering",
-            max_value=events_max,
-        )
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache", min_event_val,
-            prefilled_cache=event_cache_prefill,
-        )
-        self._membership_stream_cache = StreamChangeCache(
-            "MembershipStreamChangeCache", events_max,
-        )
 
-        self.stream_ordering_month_ago = 0
-        self._stream_order_on_start = self.get_room_max_stream_ordering()
+        super(SlavedEventStore, self).__init__(db_conn, hs)
 
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
@@ -76,30 +59,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
         "get_latest_event_ids_in_room"
     ]
 
-    get_recent_event_ids_for_room = (
-        StreamStore.__dict__["get_recent_event_ids_for_room"]
-    )
-    has_room_changed_since = DataStore.has_room_changed_since.__func__
-
-    get_membership_changes_for_user = (
-        DataStore.get_membership_changes_for_user.__func__
-    )
-    get_room_events_max_id = DataStore.get_room_events_max_id.__func__
-    get_room_events_stream_for_room = (
-        DataStore.get_room_events_stream_for_room.__func__
-    )
-    get_events_around = DataStore.get_events_around.__func__
-
-    get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
-    get_room_events_stream_for_rooms = (
-        DataStore.get_room_events_stream_for_rooms.__func__
-    )
-    get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
-
-    _set_before_and_after = staticmethod(DataStore._set_before_and_after)
-
-    _get_events_around_txn = DataStore._get_events_around_txn.__func__
-
     get_backfill_events = DataStore.get_backfill_events.__func__
     _get_backfill_events = DataStore._get_backfill_events.__func__
     get_missing_events = DataStore.get_missing_events.__func__
@@ -120,8 +79,11 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
 
     get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
 
-    get_federation_out_pos = DataStore.get_federation_out_pos.__func__
-    update_federation_out_pos = DataStore.update_federation_out_pos.__func__
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
 
     get_latest_event_ids_and_hashes_in_room = (
         DataStore.get_latest_event_ids_and_hashes_in_room.__func__
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index f510384033..5ae1670157 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -14,32 +14,19 @@
 # limitations under the License.
 
 from ._base import BaseSlavedStore
-from synapse.storage import DataStore
-from synapse.storage.room import RoomStore
+from synapse.storage.room import RoomWorkerStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 
-class RoomStore(BaseSlavedStore):
+class RoomStore(RoomWorkerStore, BaseSlavedStore):
     def __init__(self, db_conn, hs):
         super(RoomStore, self).__init__(db_conn, hs)
         self._public_room_id_gen = SlavedIdTracker(
             db_conn, "public_room_list_stream", "stream_id"
         )
 
-    get_public_room_ids = DataStore.get_public_room_ids.__func__
-    get_current_public_room_stream_id = (
-        DataStore.get_current_public_room_stream_id.__func__
-    )
-    get_public_room_ids_at_stream_id = (
-        RoomStore.__dict__["get_public_room_ids_at_stream_id"]
-    )
-    get_public_room_ids_at_stream_id_txn = (
-        DataStore.get_public_room_ids_at_stream_id_txn.__func__
-    )
-    get_published_at_stream_id_txn = (
-        DataStore.get_published_at_stream_id_txn.__func__
-    )
-    get_public_room_changes = DataStore.get_public_room_changes.__func__
+    def get_current_public_room_stream_id(self):
+        return self._public_room_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(RoomStore, self).stream_positions()
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 6073cc6fa2..3917eee42d 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -180,6 +180,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
         self.handlers = hs.get_handlers()
         self.state = hs.get_state_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.room_member_handler = hs.get_room_member_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, room_id):
@@ -238,7 +239,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
             logger.info("Kicking %r from %r...", user_id, room_id)
 
             target_requester = create_requester(user_id)
-            yield self.handlers.room_member_handler.update_membership(
+            yield self.room_member_handler.update_membership(
                 requester=target_requester,
                 target=target_requester.user,
                 room_id=room_id,
@@ -247,9 +248,9 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
                 ratelimit=False
             )
 
-            yield self.handlers.room_member_handler.forget(target_requester.user, room_id)
+            yield self.room_member_handler.forget(target_requester.user, room_id)
 
-            yield self.handlers.room_member_handler.update_membership(
+            yield self.room_member_handler.update_membership(
                 requester=target_requester,
                 target=target_requester.user,
                 room_id=new_room_id,
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 817fd47842..9d745174c7 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -84,6 +84,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
         super(RoomStateEventRestServlet, self).__init__(hs)
         self.handlers = hs.get_handlers()
         self.event_creation_hander = hs.get_event_creation_handler()
+        self.room_member_handler = hs.get_room_member_handler()
 
     def register(self, http_server):
         # /room/$roomid/state/$eventtype
@@ -156,7 +157,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
 
         if event_type == EventTypes.Member:
             membership = content.get("membership", None)
-            event = yield self.handlers.room_member_handler.update_membership(
+            event = yield self.room_member_handler.update_membership(
                 requester,
                 target=UserID.from_string(state_key),
                 room_id=room_id,
@@ -229,7 +230,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
 class JoinRoomAliasServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(JoinRoomAliasServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.room_member_handler = hs.get_room_member_handler()
 
     def register(self, http_server):
         # /join/$room_identifier[/$txn_id]
@@ -257,7 +258,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
             except Exception:
                 remote_room_hosts = None
         elif RoomAlias.is_valid(room_identifier):
-            handler = self.handlers.room_member_handler
+            handler = self.room_member_handler
             room_alias = RoomAlias.from_string(room_identifier)
             room_id, remote_room_hosts = yield handler.lookup_room_alias(room_alias)
             room_id = room_id.to_string()
@@ -266,7 +267,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
                 room_identifier,
             ))
 
-        yield self.handlers.room_member_handler.update_membership(
+        yield self.room_member_handler.update_membership(
             requester=requester,
             target=requester.user,
             room_id=room_id,
@@ -562,7 +563,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
 class RoomForgetRestServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(RoomForgetRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.room_member_handler = hs.get_room_member_handler()
 
     def register(self, http_server):
         PATTERNS = ("/rooms/(?P<room_id>[^/]*)/forget")
@@ -575,7 +576,7 @@ class RoomForgetRestServlet(ClientV1RestServlet):
             allow_guest=False,
         )
 
-        yield self.handlers.room_member_handler.forget(
+        yield self.room_member_handler.forget(
             user=requester.user,
             room_id=room_id,
         )
@@ -593,7 +594,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomMembershipRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.room_member_handler = hs.get_room_member_handler()
 
     def register(self, http_server):
         # /rooms/$roomid/[invite|join|leave]
@@ -622,7 +623,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
             content = {}
 
         if membership_action == "invite" and self._has_3pid_invite_keys(content):
-            yield self.handlers.room_member_handler.do_3pid_invite(
+            yield self.room_member_handler.do_3pid_invite(
                 room_id,
                 requester.user,
                 content["medium"],
@@ -644,7 +645,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
         if 'reason' in content and membership_action in ['kick', 'ban']:
             event_content = {'reason': content['reason']}
 
-        yield self.handlers.room_member_handler.update_membership(
+        yield self.room_member_handler.update_membership(
             requester=requester,
             target=target,
             room_id=room_id,
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index c6f4680a76..0ba62bddc1 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -183,7 +183,7 @@ class RegisterRestServlet(RestServlet):
         self.auth_handler = hs.get_auth_handler()
         self.registration_handler = hs.get_handlers().registration_handler
         self.identity_handler = hs.get_handlers().identity_handler
-        self.room_member_handler = hs.get_handlers().room_member_handler
+        self.room_member_handler = hs.get_room_member_handler()
         self.device_handler = hs.get_device_handler()
         self.macaroon_gen = hs.get_macaroon_generator()
 
diff --git a/synapse/server.py b/synapse/server.py
index fbd602d40e..5b6effbe31 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -45,6 +45,7 @@ from synapse.handlers.device import DeviceHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.room_list import RoomListHandler
+from synapse.handlers.room_member import RoomMemberHandler
 from synapse.handlers.set_password import SetPasswordHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
@@ -145,6 +146,7 @@ class HomeServer(object):
         'groups_attestation_signing',
         'groups_attestation_renewer',
         'spam_checker',
+        'room_member_handler',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -382,6 +384,9 @@ class HomeServer(object):
     def build_spam_checker(self):
         return SpamChecker(self)
 
+    def build_room_member_handler(self):
+        return RoomMemberHandler(self)
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0f136f8a06..de00cae447 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,7 +20,6 @@ from synapse.storage.devices import DeviceStore
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
 )
-from ._base import LoggingTransaction
 from .directory import DirectoryStore
 from .events import EventsStore
 from .presence import PresenceStore, UserPresenceState
@@ -141,22 +140,6 @@ class DataStore(RoomMemberStore, RoomStore,
         else:
             self._cache_id_gen = None
 
-        events_max = self._stream_id_gen.get_current_token()
-        event_cache_prefill, min_event_val = self._get_cache_dict(
-            db_conn, "events",
-            entity_column="room_id",
-            stream_column="stream_ordering",
-            max_value=events_max,
-        )
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache", min_event_val,
-            prefilled_cache=event_cache_prefill,
-        )
-
-        self._membership_stream_cache = StreamChangeCache(
-            "MembershipStreamChangeCache", events_max,
-        )
-
         self._presence_on_startup = self._get_active_presence(db_conn)
 
         presence_cache_prefill, min_presence_val = self._get_cache_dict(
@@ -204,6 +187,7 @@ class DataStore(RoomMemberStore, RoomStore,
             "DeviceListFederationStreamChangeCache", device_list_max,
         )
 
+        events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
             db_conn, "current_state_delta_stream",
             entity_column="room_id",
@@ -228,20 +212,6 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=_group_updates_prefill,
         )
 
-        cur = LoggingTransaction(
-            db_conn.cursor(),
-            name="_find_stream_orderings_for_times_txn",
-            database_engine=self.database_engine,
-            after_callbacks=[],
-            final_callbacks=[],
-        )
-        self._find_stream_orderings_for_times_txn(cur)
-        cur.close()
-
-        self.find_stream_orderings_looping_call = self._clock.looping_call(
-            self._find_stream_orderings_for_times, 10 * 60 * 1000
-        )
-
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 6454045c2d..7164293568 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, LoggingTransaction
 from twisted.internet import defer
 from synapse.util.async import sleep
 from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -64,6 +64,27 @@ def _deserialize_action(actions, is_highlight):
 
 
 class EventPushActionsWorkerStore(SQLBaseStore):
+    def __init__(self, db_conn, hs):
+        super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)
+
+        # These get correctly set by _find_stream_orderings_for_times_txn
+        self.stream_ordering_month_ago = None
+        self.stream_ordering_day_ago = None
+
+        cur = LoggingTransaction(
+            db_conn.cursor(),
+            name="_find_stream_orderings_for_times_txn",
+            database_engine=self.database_engine,
+            after_callbacks=[],
+            final_callbacks=[],
+        )
+        self._find_stream_orderings_for_times_txn(cur)
+        cur.close()
+
+        self.find_stream_orderings_looping_call = self._clock.looping_call(
+            self._find_stream_orderings_for_times, 10 * 60 * 1000
+        )
+
     @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
     def get_unread_event_push_actions_by_room_for_user(
             self, room_id, user_id, last_read_event_id
@@ -443,6 +464,69 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             desc="remove_push_actions_from_staging",
         )
 
+    @defer.inlineCallbacks
+    def _find_stream_orderings_for_times(self):
+        yield self.runInteraction(
+            "_find_stream_orderings_for_times",
+            self._find_stream_orderings_for_times_txn
+        )
+
+    def _find_stream_orderings_for_times_txn(self, txn):
+        logger.info("Searching for stream ordering 1 month ago")
+        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 month ago: it's %d",
+            self.stream_ordering_month_ago
+        )
+        logger.info("Searching for stream ordering 1 day ago")
+        self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 day ago: it's %d",
+            self.stream_ordering_day_ago
+        )
+
+    def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+        """
+        Find the stream_ordering of the first event that was received after
+        a given timestamp. This is relatively slow as there is no index on
+        received_ts but we can then use this to delete push actions before
+        this.
+
+        received_ts must necessarily be in the same order as stream_ordering
+        and stream_ordering is indexed, so we manually binary search using
+        stream_ordering
+        """
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        max_stream_ordering = txn.fetchone()[0]
+
+        if max_stream_ordering is None:
+            return 0
+
+        range_start = 0
+        range_end = max_stream_ordering
+
+        sql = (
+            "SELECT received_ts FROM events"
+            " WHERE stream_ordering > ?"
+            " ORDER BY stream_ordering"
+            " LIMIT 1"
+        )
+
+        while range_end - range_start > 1:
+            middle = int((range_end + range_start) / 2)
+            txn.execute(sql, (middle,))
+            middle_ts = txn.fetchone()[0]
+            if ts > middle_ts:
+                range_start = middle
+            else:
+                range_end = middle
+
+        return range_end
+
 
 class EventPushActionsStore(EventPushActionsWorkerStore):
     EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -651,69 +735,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         """, (room_id, user_id, stream_ordering))
 
     @defer.inlineCallbacks
-    def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
-            "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
-        )
-
-    def _find_stream_orderings_for_times_txn(self, txn):
-        logger.info("Searching for stream ordering 1 month ago")
-        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
-            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
-        )
-        logger.info(
-            "Found stream ordering 1 month ago: it's %d",
-            self.stream_ordering_month_ago
-        )
-        logger.info("Searching for stream ordering 1 day ago")
-        self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
-            txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
-        )
-        logger.info(
-            "Found stream ordering 1 day ago: it's %d",
-            self.stream_ordering_day_ago
-        )
-
-    def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
-        """
-        Find the stream_ordering of the first event that was received after
-        a given timestamp. This is relatively slow as there is no index on
-        received_ts but we can then use this to delete push actions before
-        this.
-
-        received_ts must necessarily be in the same order as stream_ordering
-        and stream_ordering is indexed, so we manually binary search using
-        stream_ordering
-        """
-        txn.execute("SELECT MAX(stream_ordering) FROM events")
-        max_stream_ordering = txn.fetchone()[0]
-
-        if max_stream_ordering is None:
-            return 0
-
-        range_start = 0
-        range_end = max_stream_ordering
-
-        sql = (
-            "SELECT received_ts FROM events"
-            " WHERE stream_ordering > ?"
-            " ORDER BY stream_ordering"
-            " LIMIT 1"
-        )
-
-        while range_end - range_start > 1:
-            middle = int((range_end + range_start) / 2)
-            txn.execute(sql, (middle,))
-            middle_ts = txn.fetchone()[0]
-            if ts > middle_ts:
-                range_start = middle
-            else:
-                range_end = middle
-
-        return range_end
-
-    @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
             return
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index fff6652e05..7f2c08d7a6 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.storage._base import SQLBaseStore
 from synapse.storage.search import SearchStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
@@ -38,7 +39,126 @@ RatelimitOverride = collections.namedtuple(
 )
 
 
-class RoomStore(SearchStore):
+class RoomWorkerStore(SQLBaseStore):
+    def get_public_room_ids(self):
+        return self._simple_select_onecol(
+            table="rooms",
+            keyvalues={
+                "is_public": True,
+            },
+            retcol="room_id",
+            desc="get_public_room_ids",
+        )
+
+    @cached(num_args=2, max_entries=100)
+    def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
+        """Get pulbic rooms for a particular list, or across all lists.
+
+        Args:
+            stream_id (int)
+            network_tuple (ThirdPartyInstanceID): The list to use (None, None)
+                means the main list, None means all lsits.
+        """
+        return self.runInteraction(
+            "get_public_room_ids_at_stream_id",
+            self.get_public_room_ids_at_stream_id_txn,
+            stream_id, network_tuple=network_tuple
+        )
+
+    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
+                                             network_tuple):
+        return {
+            rm
+            for rm, vis in self.get_published_at_stream_id_txn(
+                txn, stream_id, network_tuple=network_tuple
+            ).items()
+            if vis
+        }
+
+    def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
+        if network_tuple:
+            # We want to get from a particular list. No aggregation required.
+
+            sql = ("""
+                SELECT room_id, visibility FROM public_room_list_stream
+                INNER JOIN (
+                    SELECT room_id, max(stream_id) AS stream_id
+                    FROM public_room_list_stream
+                    WHERE stream_id <= ? %s
+                    GROUP BY room_id
+                ) grouped USING (room_id, stream_id)
+            """)
+
+            if network_tuple.appservice_id is not None:
+                txn.execute(
+                    sql % ("AND appservice_id = ? AND network_id = ?",),
+                    (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
+                )
+            else:
+                txn.execute(
+                    sql % ("AND appservice_id IS NULL",),
+                    (stream_id,)
+                )
+            return dict(txn)
+        else:
+            # We want to get from all lists, so we need to aggregate the results
+
+            logger.info("Executing full list")
+
+            sql = ("""
+                SELECT room_id, visibility
+                FROM public_room_list_stream
+                INNER JOIN (
+                    SELECT
+                        room_id, max(stream_id) AS stream_id, appservice_id,
+                        network_id
+                    FROM public_room_list_stream
+                    WHERE stream_id <= ?
+                    GROUP BY room_id, appservice_id, network_id
+                ) grouped USING (room_id, stream_id)
+            """)
+
+            txn.execute(
+                sql,
+                (stream_id,)
+            )
+
+            results = {}
+            # A room is visible if its visible on any list.
+            for room_id, visibility in txn:
+                results[room_id] = bool(visibility) or results.get(room_id, False)
+
+            return results
+
+    def get_public_room_changes(self, prev_stream_id, new_stream_id,
+                                network_tuple):
+        def get_public_room_changes_txn(txn):
+            then_rooms = self.get_public_room_ids_at_stream_id_txn(
+                txn, prev_stream_id, network_tuple
+            )
+
+            now_rooms_dict = self.get_published_at_stream_id_txn(
+                txn, new_stream_id, network_tuple
+            )
+
+            now_rooms_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if vis
+            )
+            now_rooms_not_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if not vis
+            )
+
+            newly_visible = now_rooms_visible - then_rooms
+            newly_unpublished = now_rooms_not_visible & then_rooms
+
+            return newly_visible, newly_unpublished
+
+        return self.runInteraction(
+            "get_public_room_changes", get_public_room_changes_txn
+        )
+
+
+class RoomStore(RoomWorkerStore, SearchStore):
 
     @defer.inlineCallbacks
     def store_room(self, room_id, room_creator_user_id, is_public):
@@ -225,16 +345,6 @@ class RoomStore(SearchStore):
             )
         self.hs.get_notifier().on_new_replication_data()
 
-    def get_public_room_ids(self):
-        return self._simple_select_onecol(
-            table="rooms",
-            keyvalues={
-                "is_public": True,
-            },
-            retcol="room_id",
-            desc="get_public_room_ids",
-        )
-
     def get_room_count(self):
         """Retrieve a list of all rooms
         """
@@ -326,113 +436,6 @@ class RoomStore(SearchStore):
     def get_current_public_room_stream_id(self):
         return self._public_room_id_gen.get_current_token()
 
-    @cached(num_args=2, max_entries=100)
-    def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
-        """Get pulbic rooms for a particular list, or across all lists.
-
-        Args:
-            stream_id (int)
-            network_tuple (ThirdPartyInstanceID): The list to use (None, None)
-                means the main list, None means all lsits.
-        """
-        return self.runInteraction(
-            "get_public_room_ids_at_stream_id",
-            self.get_public_room_ids_at_stream_id_txn,
-            stream_id, network_tuple=network_tuple
-        )
-
-    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
-                                             network_tuple):
-        return {
-            rm
-            for rm, vis in self.get_published_at_stream_id_txn(
-                txn, stream_id, network_tuple=network_tuple
-            ).items()
-            if vis
-        }
-
-    def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
-        if network_tuple:
-            # We want to get from a particular list. No aggregation required.
-
-            sql = ("""
-                SELECT room_id, visibility FROM public_room_list_stream
-                INNER JOIN (
-                    SELECT room_id, max(stream_id) AS stream_id
-                    FROM public_room_list_stream
-                    WHERE stream_id <= ? %s
-                    GROUP BY room_id
-                ) grouped USING (room_id, stream_id)
-            """)
-
-            if network_tuple.appservice_id is not None:
-                txn.execute(
-                    sql % ("AND appservice_id = ? AND network_id = ?",),
-                    (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
-                )
-            else:
-                txn.execute(
-                    sql % ("AND appservice_id IS NULL",),
-                    (stream_id,)
-                )
-            return dict(txn)
-        else:
-            # We want to get from all lists, so we need to aggregate the results
-
-            logger.info("Executing full list")
-
-            sql = ("""
-                SELECT room_id, visibility
-                FROM public_room_list_stream
-                INNER JOIN (
-                    SELECT
-                        room_id, max(stream_id) AS stream_id, appservice_id,
-                        network_id
-                    FROM public_room_list_stream
-                    WHERE stream_id <= ?
-                    GROUP BY room_id, appservice_id, network_id
-                ) grouped USING (room_id, stream_id)
-            """)
-
-            txn.execute(
-                sql,
-                (stream_id,)
-            )
-
-            results = {}
-            # A room is visible if its visible on any list.
-            for room_id, visibility in txn:
-                results[room_id] = bool(visibility) or results.get(room_id, False)
-
-            return results
-
-    def get_public_room_changes(self, prev_stream_id, new_stream_id,
-                                network_tuple):
-        def get_public_room_changes_txn(txn):
-            then_rooms = self.get_public_room_ids_at_stream_id_txn(
-                txn, prev_stream_id, network_tuple
-            )
-
-            now_rooms_dict = self.get_published_at_stream_id_txn(
-                txn, new_stream_id, network_tuple
-            )
-
-            now_rooms_visible = set(
-                rm for rm, vis in now_rooms_dict.items() if vis
-            )
-            now_rooms_not_visible = set(
-                rm for rm, vis in now_rooms_dict.items() if not vis
-            )
-
-            newly_visible = now_rooms_visible - then_rooms
-            newly_unpublished = now_rooms_not_visible & then_rooms
-
-            return newly_visible, newly_unpublished
-
-        return self.runInteraction(
-            "get_public_room_changes", get_public_room_changes_txn
-        )
-
     def get_all_new_public_rooms(self, prev_id, current_id, limit):
         def get_all_new_public_rooms(txn):
             sql = ("""
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 52bdce5be2..a2527d2a36 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,13 +35,17 @@ what sort order was used:
 
 from twisted.internet import defer
 
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+
 from synapse.util.caches.descriptors import cached
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
+import abc
 import logging
 
 
@@ -143,81 +147,41 @@ def filter_to_clause(event_filter):
     return " AND ".join(clauses), args
 
 
-class StreamStore(SQLBaseStore):
-    @defer.inlineCallbacks
-    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
-        # NB this lives here instead of appservice.py so we can reuse the
-        # 'private' StreamToken class in this file.
-        if limit:
-            limit = max(limit, MAX_STREAM_SIZE)
-        else:
-            limit = MAX_STREAM_SIZE
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
+    """This is an abstract base class where subclasses must implement
+    `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
+    which can be called in the initializer.
+    """
 
-        # From and to keys should be integers from ordering.
-        from_id = RoomStreamToken.parse_stream_token(from_key)
-        to_id = RoomStreamToken.parse_stream_token(to_key)
+    __metaclass__ = abc.ABCMeta
 
-        if from_key == to_key:
-            defer.returnValue(([], to_key))
-            return
+    def __init__(self, db_conn, hs):
+        super(StreamWorkerStore, self).__init__(db_conn, hs)
 
-        # select all the events between from/to with a sensible limit
-        sql = (
-            "SELECT e.event_id, e.room_id, e.type, s.state_key, "
-            "e.stream_ordering FROM events AS e "
-            "LEFT JOIN state_events as s ON "
-            "e.event_id = s.event_id "
-            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
-            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
-        ) % {
-            "limit": limit
-        }
-
-        def f(txn):
-            # pull out all the events between the tokens
-            txn.execute(sql, (from_id.stream, to_id.stream,))
-            rows = self.cursor_to_dict(txn)
-
-            # Logic:
-            #  - We want ALL events which match the AS room_id regex
-            #  - We want ALL events which match the rooms represented by the AS
-            #    room_alias regex
-            #  - We want ALL events for rooms that AS users have joined.
-            # This is currently supported via get_app_service_rooms (which is
-            # used for the Notifier listener rooms). We can't reasonably make a
-            # SQL query for these room IDs, so we'll pull all the events between
-            # from/to and filter in python.
-            rooms_for_as = self._get_app_service_rooms_txn(txn, service)
-            room_ids_for_as = [r.room_id for r in rooms_for_as]
-
-            def app_service_interested(row):
-                if row["room_id"] in room_ids_for_as:
-                    return True
-
-                if row["type"] == EventTypes.Member:
-                    if service.is_interested_in_user(row.get("state_key")):
-                        return True
-                return False
-
-            return [r for r in rows if app_service_interested(r)]
-
-        rows = yield self.runInteraction("get_appservice_room_stream", f)
-
-        ret = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
+        events_max = self.get_room_max_stream_ordering()
+        event_cache_prefill, min_event_val = self._get_cache_dict(
+            db_conn, "events",
+            entity_column="room_id",
+            stream_column="stream_ordering",
+            max_value=events_max,
+        )
+        self._events_stream_cache = StreamChangeCache(
+            "EventsRoomStreamChangeCache", min_event_val,
+            prefilled_cache=event_cache_prefill,
+        )
+        self._membership_stream_cache = StreamChangeCache(
+            "MembershipStreamChangeCache", events_max,
         )
 
-        self._set_before_and_after(ret, rows, topo_order=from_id is None)
+        self._stream_order_on_start = self.get_room_max_stream_ordering()
 
-        if rows:
-            key = "s%d" % max(r["stream_ordering"] for r in rows)
-        else:
-            # Assume we didn't get anything because there was nothing to
-            # get.
-            key = to_key
+    @abc.abstractmethod
+    def get_room_max_stream_ordering(self):
+        raise NotImplementedError()
 
-        defer.returnValue((ret, key))
+    @abc.abstractmethod
+    def get_room_min_stream_ordering(self):
+        raise NotImplementedError()
 
     @defer.inlineCallbacks
     def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
@@ -381,88 +345,6 @@ class StreamStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1, event_filter=None):
-        # Tokens really represent positions between elements, but we use
-        # the convention of pointing to the event before the gap. Hence
-        # we have a bit of asymmetry when it comes to equalities.
-        args = [False, room_id]
-        if direction == 'b':
-            order = "DESC"
-            bounds = upper_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
-            )
-            if to_key:
-                bounds = "%s AND %s" % (bounds, lower_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
-                ))
-        else:
-            order = "ASC"
-            bounds = lower_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
-            )
-            if to_key:
-                bounds = "%s AND %s" % (bounds, upper_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
-                ))
-
-        filter_clause, filter_args = filter_to_clause(event_filter)
-
-        if filter_clause:
-            bounds += " AND " + filter_clause
-            args.extend(filter_args)
-
-        if int(limit) > 0:
-            args.append(int(limit))
-            limit_str = " LIMIT ?"
-        else:
-            limit_str = ""
-
-        sql = (
-            "SELECT * FROM events"
-            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
-            " ORDER BY topological_ordering %(order)s,"
-            " stream_ordering %(order)s %(limit)s"
-        ) % {
-            "bounds": bounds,
-            "order": order,
-            "limit": limit_str
-        }
-
-        def f(txn):
-            txn.execute(sql, args)
-
-            rows = self.cursor_to_dict(txn)
-
-            if rows:
-                topo = rows[-1]["topological_ordering"]
-                toke = rows[-1]["stream_ordering"]
-                if direction == 'b':
-                    # Tokens are positions between events.
-                    # This token points *after* the last event in the chunk.
-                    # We need it to point to the event before it in the chunk
-                    # when we are going backwards so we subtract one from the
-                    # stream part.
-                    toke -= 1
-                next_token = str(RoomStreamToken(topo, toke))
-            else:
-                # TODO (erikj): We should work out what to do here instead.
-                next_token = to_key if to_key else from_key
-
-            return rows, next_token,
-
-        rows, token = yield self.runInteraction("paginate_room_events", f)
-
-        events = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
-        )
-
-        self._set_before_and_after(events, rows)
-
-        defer.returnValue((events, token))
-
-    @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
         rows, token = yield self.get_recent_event_ids_for_room(
             room_id, limit, end_token, from_token
@@ -542,7 +424,7 @@ class StreamStore(SQLBaseStore):
         `room_id` causes it to return the current room specific topological
         token.
         """
-        token = yield self._stream_id_gen.get_current_token()
+        token = yield self.get_room_max_stream_ordering()
         if room_id is None:
             defer.returnValue("s%d" % (token,))
         else:
@@ -552,12 +434,6 @@ class StreamStore(SQLBaseStore):
             )
             defer.returnValue("t%d-%d" % (topo, token))
 
-    def get_room_max_stream_ordering(self):
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
-
     def get_stream_token_for_event(self, event_id):
         """The stream token for an event
         Args:
@@ -832,3 +708,168 @@ class StreamStore(SQLBaseStore):
 
     def has_room_changed_since(self, room_id, stream_id):
         return self._events_stream_cache.has_entity_changed(room_id, stream_id)
+
+
+class StreamStore(StreamWorkerStore):
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
+
+    @defer.inlineCallbacks
+    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
+        # NB this lives here instead of appservice.py so we can reuse the
+        # 'private' StreamToken class in this file.
+        if limit:
+            limit = max(limit, MAX_STREAM_SIZE)
+        else:
+            limit = MAX_STREAM_SIZE
+
+        # From and to keys should be integers from ordering.
+        from_id = RoomStreamToken.parse_stream_token(from_key)
+        to_id = RoomStreamToken.parse_stream_token(to_key)
+
+        if from_key == to_key:
+            defer.returnValue(([], to_key))
+            return
+
+        # select all the events between from/to with a sensible limit
+        sql = (
+            "SELECT e.event_id, e.room_id, e.type, s.state_key, "
+            "e.stream_ordering FROM events AS e "
+            "LEFT JOIN state_events as s ON "
+            "e.event_id = s.event_id "
+            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
+            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+        ) % {
+            "limit": limit
+        }
+
+        def f(txn):
+            # pull out all the events between the tokens
+            txn.execute(sql, (from_id.stream, to_id.stream,))
+            rows = self.cursor_to_dict(txn)
+
+            # Logic:
+            #  - We want ALL events which match the AS room_id regex
+            #  - We want ALL events which match the rooms represented by the AS
+            #    room_alias regex
+            #  - We want ALL events for rooms that AS users have joined.
+            # This is currently supported via get_app_service_rooms (which is
+            # used for the Notifier listener rooms). We can't reasonably make a
+            # SQL query for these room IDs, so we'll pull all the events between
+            # from/to and filter in python.
+            rooms_for_as = self._get_app_service_rooms_txn(txn, service)
+            room_ids_for_as = [r.room_id for r in rooms_for_as]
+
+            def app_service_interested(row):
+                if row["room_id"] in room_ids_for_as:
+                    return True
+
+                if row["type"] == EventTypes.Member:
+                    if service.is_interested_in_user(row.get("state_key")):
+                        return True
+                return False
+
+            return [r for r in rows if app_service_interested(r)]
+
+        rows = yield self.runInteraction("get_appservice_room_stream", f)
+
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
+
+        self._set_before_and_after(ret, rows, topo_order=from_id is None)
+
+        if rows:
+            key = "s%d" % max(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to
+            # get.
+            key = to_key
+
+        defer.returnValue((ret, key))
+
+    @defer.inlineCallbacks
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1, event_filter=None):
+        # Tokens really represent positions between elements, but we use
+        # the convention of pointing to the event before the gap. Hence
+        # we have a bit of asymmetry when it comes to equalities.
+        args = [False, room_id]
+        if direction == 'b':
+            order = "DESC"
+            bounds = upper_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
+            if to_key:
+                bounds = "%s AND %s" % (bounds, lower_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
+        else:
+            order = "ASC"
+            bounds = lower_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
+            if to_key:
+                bounds = "%s AND %s" % (bounds, upper_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
+
+        filter_clause, filter_args = filter_to_clause(event_filter)
+
+        if filter_clause:
+            bounds += " AND " + filter_clause
+            args.extend(filter_args)
+
+        if int(limit) > 0:
+            args.append(int(limit))
+            limit_str = " LIMIT ?"
+        else:
+            limit_str = ""
+
+        sql = (
+            "SELECT * FROM events"
+            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s %(limit)s"
+        ) % {
+            "bounds": bounds,
+            "order": order,
+            "limit": limit_str
+        }
+
+        def f(txn):
+            txn.execute(sql, args)
+
+            rows = self.cursor_to_dict(txn)
+
+            if rows:
+                topo = rows[-1]["topological_ordering"]
+                toke = rows[-1]["stream_ordering"]
+                if direction == 'b':
+                    # Tokens are positions between events.
+                    # This token points *after* the last event in the chunk.
+                    # We need it to point to the event before it in the chunk
+                    # when we are going backwards so we subtract one from the
+                    # stream part.
+                    toke -= 1
+                next_token = str(RoomStreamToken(topo, toke))
+            else:
+                # TODO (erikj): We should work out what to do here instead.
+                next_token = to_key if to_key else from_key
+
+            return rows, next_token,
+
+        rows, token = yield self.runInteraction("paginate_room_events", f)
+
+        events = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
+
+        self._set_before_and_after(events, rows)
+
+        defer.returnValue((events, token))
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 94fa7cac98..a8dea15c1b 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -299,10 +299,6 @@ def preserve_fn(f):
     Useful for wrapping functions that return a deferred which you don't yield
     on.
     """
-    def reset_context(result):
-        LoggingContext.set_current_context(LoggingContext.sentinel)
-        return result
-
     def g(*args, **kwargs):
         current = LoggingContext.current_context()
         res = f(*args, **kwargs)
@@ -323,12 +319,11 @@ def preserve_fn(f):
             # which is supposed to have a single entry and exit point. But
             # by spawning off another deferred, we are effectively
             # adding a new exit point.)
-            res.addBoth(reset_context)
+            res.addBoth(_set_context_cb, LoggingContext.sentinel)
         return res
     return g
 
 
-@defer.inlineCallbacks
 def make_deferred_yieldable(deferred):
     """Given a deferred, make it follow the Synapse logcontext rules:
 
@@ -342,9 +337,16 @@ def make_deferred_yieldable(deferred):
 
     (This is more-or-less the opposite operation to preserve_fn.)
     """
-    with PreserveLoggingContext():
-        r = yield deferred
-    defer.returnValue(r)
+    if isinstance(deferred, defer.Deferred) and not deferred.called:
+        prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
+        deferred.addBoth(_set_context_cb, prev_context)
+    return deferred
+
+
+def _set_context_cb(result, context):
+    """A callback function which just sets the logging context"""
+    LoggingContext.set_current_context(context)
+    return result
 
 
 # modules to ignore in `logcontext_tracer`