summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py32
-rw-r--r--synapse/storage/_base.py26
-rw-r--r--synapse/storage/appservice.py82
-rw-r--r--synapse/storage/directory.py50
-rw-r--r--synapse/storage/event_federation.py264
-rw-r--r--synapse/storage/event_push_actions.py208
-rw-r--r--synapse/storage/events.py11
-rw-r--r--synapse/storage/profile.py50
-rw-r--r--synapse/storage/registration.py118
-rw-r--r--synapse/storage/room.py269
-rw-r--r--synapse/storage/roommember.py27
-rw-r--r--synapse/storage/signatures.py12
-rw-r--r--synapse/storage/state.py34
-rw-r--r--synapse/storage/stream.py310
14 files changed, 769 insertions, 724 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0f136f8a06..de00cae447 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,7 +20,6 @@ from synapse.storage.devices import DeviceStore
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
 )
-from ._base import LoggingTransaction
 from .directory import DirectoryStore
 from .events import EventsStore
 from .presence import PresenceStore, UserPresenceState
@@ -141,22 +140,6 @@ class DataStore(RoomMemberStore, RoomStore,
         else:
             self._cache_id_gen = None
 
-        events_max = self._stream_id_gen.get_current_token()
-        event_cache_prefill, min_event_val = self._get_cache_dict(
-            db_conn, "events",
-            entity_column="room_id",
-            stream_column="stream_ordering",
-            max_value=events_max,
-        )
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache", min_event_val,
-            prefilled_cache=event_cache_prefill,
-        )
-
-        self._membership_stream_cache = StreamChangeCache(
-            "MembershipStreamChangeCache", events_max,
-        )
-
         self._presence_on_startup = self._get_active_presence(db_conn)
 
         presence_cache_prefill, min_presence_val = self._get_cache_dict(
@@ -204,6 +187,7 @@ class DataStore(RoomMemberStore, RoomStore,
             "DeviceListFederationStreamChangeCache", device_list_max,
         )
 
+        events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
             db_conn, "current_state_delta_stream",
             entity_column="room_id",
@@ -228,20 +212,6 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=_group_updates_prefill,
         )
 
-        cur = LoggingTransaction(
-            db_conn.cursor(),
-            name="_find_stream_orderings_for_times_txn",
-            database_engine=self.database_engine,
-            after_callbacks=[],
-            final_callbacks=[],
-        )
-        self._find_stream_orderings_for_times_txn(cur)
-        cur.close()
-
-        self.find_stream_orderings_looping_call = self._clock.looping_call(
-            self._find_stream_orderings_for_times, 10 * 60 * 1000
-        )
-
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 68125006eb..2fbebd4907 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -48,16 +48,16 @@ class LoggingTransaction(object):
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
     __slots__ = [
-        "txn", "name", "database_engine", "after_callbacks", "final_callbacks",
+        "txn", "name", "database_engine", "after_callbacks", "exception_callbacks",
     ]
 
     def __init__(self, txn, name, database_engine, after_callbacks,
-                 final_callbacks):
+                 exception_callbacks):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
         object.__setattr__(self, "database_engine", database_engine)
         object.__setattr__(self, "after_callbacks", after_callbacks)
-        object.__setattr__(self, "final_callbacks", final_callbacks)
+        object.__setattr__(self, "exception_callbacks", exception_callbacks)
 
     def call_after(self, callback, *args, **kwargs):
         """Call the given callback on the main twisted thread after the
@@ -66,8 +66,8 @@ class LoggingTransaction(object):
         """
         self.after_callbacks.append((callback, args, kwargs))
 
-    def call_finally(self, callback, *args, **kwargs):
-        self.final_callbacks.append((callback, args, kwargs))
+    def call_on_exception(self, callback, *args, **kwargs):
+        self.exception_callbacks.append((callback, args, kwargs))
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -215,7 +215,7 @@ class SQLBaseStore(object):
 
         self._clock.looping_call(loop, 10000)
 
-    def _new_transaction(self, conn, desc, after_callbacks, final_callbacks,
+    def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
                          logging_context, func, *args, **kwargs):
         start = time.time() * 1000
         txn_id = self._TXN_ID
@@ -236,7 +236,7 @@ class SQLBaseStore(object):
                     txn = conn.cursor()
                     txn = LoggingTransaction(
                         txn, name, self.database_engine, after_callbacks,
-                        final_callbacks,
+                        exception_callbacks,
                     )
                     r = func(txn, *args, **kwargs)
                     conn.commit()
@@ -308,11 +308,11 @@ class SQLBaseStore(object):
         current_context = LoggingContext.current_context()
 
         after_callbacks = []
-        final_callbacks = []
+        exception_callbacks = []
 
         def inner_func(conn, *args, **kwargs):
             return self._new_transaction(
-                conn, desc, after_callbacks, final_callbacks, current_context,
+                conn, desc, after_callbacks, exception_callbacks, current_context,
                 func, *args, **kwargs
             )
 
@@ -321,9 +321,10 @@ class SQLBaseStore(object):
 
             for after_callback, after_args, after_kwargs in after_callbacks:
                 after_callback(*after_args, **after_kwargs)
-        finally:
-            for after_callback, after_args, after_kwargs in final_callbacks:
+        except:  # noqa: E722, as we reraise the exception this is fine.
+            for after_callback, after_args, after_kwargs in exception_callbacks:
                 after_callback(*after_args, **after_kwargs)
+            raise
 
         defer.returnValue(result)
 
@@ -1000,7 +1001,8 @@ class SQLBaseStore(object):
             # __exit__ called after the transaction finishes.
             ctx = self._cache_id_gen.get_next()
             stream_id = ctx.__enter__()
-            txn.call_finally(ctx.__exit__, None, None, None)
+            txn.call_on_exception(ctx.__exit__, None, None, None)
+            txn.call_after(ctx.__exit__, None, None, None)
             txn.call_after(self.hs.get_notifier().on_new_replication_data)
 
             self._simple_insert_txn(
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 90fb51d43c..12ea8a158c 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -18,11 +18,9 @@ import re
 import simplejson as json
 from twisted.internet import defer
 
-from synapse.api.constants import Membership
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
 from synapse.storage.events import EventsWorkerStore
-from synapse.storage.roommember import RoomsForUser
 from ._base import SQLBaseStore
 
 
@@ -115,81 +113,11 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
 
 
 class ApplicationServiceStore(ApplicationServiceWorkerStore):
-
-    def __init__(self, db_conn, hs):
-        super(ApplicationServiceStore, self).__init__(db_conn, hs)
-        self.hostname = hs.hostname
-
-    def get_app_service_rooms(self, service):
-        """Get a list of RoomsForUser for this application service.
-
-        Application services may be "interested" in lots of rooms depending on
-        the room ID, the room aliases, or the members in the room. This function
-        takes all of these into account and returns a list of RoomsForUser which
-        represent the entire list of room IDs that this application service
-        wants to know about.
-
-        Args:
-            service: The application service to get a room list for.
-        Returns:
-            A list of RoomsForUser.
-        """
-        return self.runInteraction(
-            "get_app_service_rooms",
-            self._get_app_service_rooms_txn,
-            service,
-        )
-
-    def _get_app_service_rooms_txn(self, txn, service):
-        # get all rooms matching the room ID regex.
-        room_entries = self._simple_select_list_txn(
-            txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
-        )
-        matching_room_list = set([
-            r["room_id"] for r in room_entries if
-            service.is_interested_in_room(r["room_id"])
-        ])
-
-        # resolve room IDs for matching room alias regex.
-        room_alias_mappings = self._simple_select_list_txn(
-            txn=txn, table="room_aliases", keyvalues=None,
-            retcols=["room_id", "room_alias"]
-        )
-        matching_room_list |= set([
-            r["room_id"] for r in room_alias_mappings if
-            service.is_interested_in_alias(r["room_alias"])
-        ])
-
-        # get all rooms for every user for this AS. This is scoped to users on
-        # this HS only.
-        user_list = self._simple_select_list_txn(
-            txn=txn, table="users", keyvalues=None, retcols=["name"]
-        )
-        user_list = [
-            u["name"] for u in user_list if
-            service.is_interested_in_user(u["name"])
-        ]
-        rooms_for_user_matching_user_id = set()  # RoomsForUser list
-        for user_id in user_list:
-            # FIXME: This assumes this store is linked with RoomMemberStore :(
-            rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
-                txn=txn,
-                user_id=user_id,
-                membership_list=[Membership.JOIN]
-            )
-            rooms_for_user_matching_user_id |= set(rooms_for_user)
-
-        # make RoomsForUser tuples for room ids and aliases which are not in the
-        # main rooms_for_user_list - e.g. they are rooms which do not have AS
-        # registered users in it.
-        known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id]
-        missing_rooms_for_user = [
-            RoomsForUser(r, service.sender, "join") for r in
-            matching_room_list if r not in known_room_ids
-        ]
-        rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
-
-        return rooms_for_user_matching_user_id
+    # This is currently empty due to there not being any AS storage functions
+    # that can't be run on the workers. Since this may change in future, and
+    # to keep consistency with the other stores, we keep this empty class for
+    # now.
+    pass
 
 
 class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 79e7c540ad..d0c0059757 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -29,8 +29,7 @@ RoomAliasMapping = namedtuple(
 )
 
 
-class DirectoryStore(SQLBaseStore):
-
+class DirectoryWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_association_from_room_alias(self, room_alias):
         """ Get's the room_id and server list for a given room_alias
@@ -69,6 +68,28 @@ class DirectoryStore(SQLBaseStore):
             RoomAliasMapping(room_id, room_alias.to_string(), servers)
         )
 
+    def get_room_alias_creator(self, room_alias):
+        return self._simple_select_one_onecol(
+            table="room_aliases",
+            keyvalues={
+                "room_alias": room_alias,
+            },
+            retcol="creator",
+            desc="get_room_alias_creator",
+            allow_none=True
+        )
+
+    @cached(max_entries=5000)
+    def get_aliases_for_room(self, room_id):
+        return self._simple_select_onecol(
+            "room_aliases",
+            {"room_id": room_id},
+            "room_alias",
+            desc="get_aliases_for_room",
+        )
+
+
+class DirectoryStore(DirectoryWorkerStore):
     @defer.inlineCallbacks
     def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
         """ Creates an associatin between  a room alias and room_id/servers
@@ -116,17 +137,6 @@ class DirectoryStore(SQLBaseStore):
             )
         defer.returnValue(ret)
 
-    def get_room_alias_creator(self, room_alias):
-        return self._simple_select_one_onecol(
-            table="room_aliases",
-            keyvalues={
-                "room_alias": room_alias,
-            },
-            retcol="creator",
-            desc="get_room_alias_creator",
-            allow_none=True
-        )
-
     @defer.inlineCallbacks
     def delete_room_alias(self, room_alias):
         room_id = yield self.runInteraction(
@@ -135,7 +145,6 @@ class DirectoryStore(SQLBaseStore):
             room_alias,
         )
 
-        self.get_aliases_for_room.invalidate((room_id,))
         defer.returnValue(room_id)
 
     def _delete_room_alias_txn(self, txn, room_alias):
@@ -160,17 +169,12 @@ class DirectoryStore(SQLBaseStore):
             (room_alias.to_string(),)
         )
 
-        return room_id
-
-    @cached(max_entries=5000)
-    def get_aliases_for_room(self, room_id):
-        return self._simple_select_onecol(
-            "room_aliases",
-            {"room_id": room_id},
-            "room_alias",
-            desc="get_aliases_for_room",
+        self._invalidate_cache_and_stream(
+            txn, self.get_aliases_for_room, (room_id,)
         )
 
+        return room_id
+
     def update_aliases_for_room(self, old_room_id, new_room_id, creator):
         def _update_aliases_for_room_txn(txn):
             sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?"
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 55a05c59d5..00ee82d300 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -15,7 +15,10 @@
 
 from twisted.internet import defer
 
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
+
 from synapse.api.errors import StoreError
 from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
@@ -27,30 +30,8 @@ from Queue import PriorityQueue, Empty
 logger = logging.getLogger(__name__)
 
 
-class EventFederationStore(SQLBaseStore):
-    """ Responsible for storing and serving up the various graphs associated
-    with an event. Including the main event graph and the auth chains for an
-    event.
-
-    Also has methods for getting the front (latest) and back (oldest) edges
-    of the event graphs. These are used to generate the parents for new events
-    and backfilling from another server respectively.
-    """
-
-    EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
-
-    def __init__(self, db_conn, hs):
-        super(EventFederationStore, self).__init__(db_conn, hs)
-
-        self.register_background_update_handler(
-            self.EVENT_AUTH_STATE_ONLY,
-            self._background_delete_non_state_event_auth,
-        )
-
-        hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
-        )
-
+class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
+                                 SQLBaseStore):
     def get_auth_chain(self, event_ids, include_given=False):
         """Get auth events for given event_ids. The events *must* be state events.
 
@@ -228,88 +209,6 @@ class EventFederationStore(SQLBaseStore):
 
         return int(min_depth) if min_depth is not None else None
 
-    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
-        min_depth = self._get_min_depth_interaction(txn, room_id)
-
-        if min_depth and depth >= min_depth:
-            return
-
-        self._simple_upsert_txn(
-            txn,
-            table="room_depth",
-            keyvalues={
-                "room_id": room_id,
-            },
-            values={
-                "min_depth": depth,
-            },
-        )
-
-    def _handle_mult_prev_events(self, txn, events):
-        """
-        For the given event, update the event edges table and forward and
-        backward extremities tables.
-        """
-        self._simple_insert_many_txn(
-            txn,
-            table="event_edges",
-            values=[
-                {
-                    "event_id": ev.event_id,
-                    "prev_event_id": e_id,
-                    "room_id": ev.room_id,
-                    "is_state": False,
-                }
-                for ev in events
-                for e_id, _ in ev.prev_events
-            ],
-        )
-
-        self._update_backward_extremeties(txn, events)
-
-    def _update_backward_extremeties(self, txn, events):
-        """Updates the event_backward_extremities tables based on the new/updated
-        events being persisted.
-
-        This is called for new events *and* for events that were outliers, but
-        are now being persisted as non-outliers.
-
-        Forward extremities are handled when we first start persisting the events.
-        """
-        events_by_room = {}
-        for ev in events:
-            events_by_room.setdefault(ev.room_id, []).append(ev)
-
-        query = (
-            "INSERT INTO event_backward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-            " )"
-            " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
-            " )"
-        )
-
-        txn.executemany(query, [
-            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
-            for ev in events for e_id, _ in ev.prev_events
-            if not ev.internal_metadata.is_outlier()
-        ])
-
-        query = (
-            "DELETE FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
-        )
-        txn.executemany(
-            query,
-            [
-                (ev.event_id, ev.room_id) for ev in events
-                if not ev.internal_metadata.is_outlier()
-            ]
-        )
-
     def get_forward_extremeties_for_room(self, room_id, stream_ordering):
         """For a given room_id and stream_ordering, return the forward
         extremeties of the room at that point in "time".
@@ -371,28 +270,6 @@ class EventFederationStore(SQLBaseStore):
             get_forward_extremeties_for_room_txn
         )
 
-    def _delete_old_forward_extrem_cache(self):
-        def _delete_old_forward_extrem_cache_txn(txn):
-            # Delete entries older than a month, while making sure we don't delete
-            # the only entries for a room.
-            sql = ("""
-                DELETE FROM stream_ordering_to_exterm
-                WHERE
-                room_id IN (
-                    SELECT room_id
-                    FROM stream_ordering_to_exterm
-                    WHERE stream_ordering > ?
-                ) AND stream_ordering < ?
-            """)
-            txn.execute(
-                sql,
-                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
-            )
-        return self.runInteraction(
-            "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
-        )
-
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
         including) the events in event_list. Return a list of max size `limit`
@@ -522,6 +399,135 @@ class EventFederationStore(SQLBaseStore):
 
         return event_results
 
+
+class EventFederationStore(EventFederationWorkerStore):
+    """ Responsible for storing and serving up the various graphs associated
+    with an event. Including the main event graph and the auth chains for an
+    event.
+
+    Also has methods for getting the front (latest) and back (oldest) edges
+    of the event graphs. These are used to generate the parents for new events
+    and backfilling from another server respectively.
+    """
+
+    EVENT_AUTH_STATE_ONLY = "event_auth_state_only"
+
+    def __init__(self, db_conn, hs):
+        super(EventFederationStore, self).__init__(db_conn, hs)
+
+        self.register_background_update_handler(
+            self.EVENT_AUTH_STATE_ONLY,
+            self._background_delete_non_state_event_auth,
+        )
+
+        hs.get_clock().looping_call(
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+        )
+
+    def _update_min_depth_for_room_txn(self, txn, room_id, depth):
+        min_depth = self._get_min_depth_interaction(txn, room_id)
+
+        if min_depth and depth >= min_depth:
+            return
+
+        self._simple_upsert_txn(
+            txn,
+            table="room_depth",
+            keyvalues={
+                "room_id": room_id,
+            },
+            values={
+                "min_depth": depth,
+            },
+        )
+
+    def _handle_mult_prev_events(self, txn, events):
+        """
+        For the given event, update the event edges table and forward and
+        backward extremities tables.
+        """
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
+                    "event_id": ev.event_id,
+                    "prev_event_id": e_id,
+                    "room_id": ev.room_id,
+                    "is_state": False,
+                }
+                for ev in events
+                for e_id, _ in ev.prev_events
+            ],
+        )
+
+        self._update_backward_extremeties(txn, events)
+
+    def _update_backward_extremeties(self, txn, events):
+        """Updates the event_backward_extremities tables based on the new/updated
+        events being persisted.
+
+        This is called for new events *and* for events that were outliers, but
+        are now being persisted as non-outliers.
+
+        Forward extremities are handled when we first start persisting the events.
+        """
+        events_by_room = {}
+        for ev in events:
+            events_by_room.setdefault(ev.room_id, []).append(ev)
+
+        query = (
+            "INSERT INTO event_backward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+            " )"
+            " AND NOT EXISTS ("
+            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            " AND outlier = ?"
+            " )"
+        )
+
+        txn.executemany(query, [
+            (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+            for ev in events for e_id, _ in ev.prev_events
+            if not ev.internal_metadata.is_outlier()
+        ])
+
+        query = (
+            "DELETE FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+        )
+        txn.executemany(
+            query,
+            [
+                (ev.event_id, ev.room_id) for ev in events
+                if not ev.internal_metadata.is_outlier()
+            ]
+        )
+
+    def _delete_old_forward_extrem_cache(self):
+        def _delete_old_forward_extrem_cache_txn(txn):
+            # Delete entries older than a month, while making sure we don't delete
+            # the only entries for a room.
+            sql = ("""
+                DELETE FROM stream_ordering_to_exterm
+                WHERE
+                room_id IN (
+                    SELECT room_id
+                    FROM stream_ordering_to_exterm
+                    WHERE stream_ordering > ?
+                ) AND stream_ordering < ?
+            """)
+            txn.execute(
+                sql,
+                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
+            )
+        return self.runInteraction(
+            "_delete_old_forward_extrem_cache",
+            _delete_old_forward_extrem_cache_txn
+        )
+
     def clean_room_for_join(self, room_id):
         return self.runInteraction(
             "clean_room_for_join",
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index e8cd8fc8ec..0a906a3966 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, LoggingTransaction
 from twisted.internet import defer
 from synapse.util.async import sleep
 from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -64,6 +64,27 @@ def _deserialize_action(actions, is_highlight):
 
 
 class EventPushActionsWorkerStore(SQLBaseStore):
+    def __init__(self, db_conn, hs):
+        super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)
+
+        # These get correctly set by _find_stream_orderings_for_times_txn
+        self.stream_ordering_month_ago = None
+        self.stream_ordering_day_ago = None
+
+        cur = LoggingTransaction(
+            db_conn.cursor(),
+            name="_find_stream_orderings_for_times_txn",
+            database_engine=self.database_engine,
+            after_callbacks=[],
+            exception_callbacks=[],
+        )
+        self._find_stream_orderings_for_times_txn(cur)
+        cur.close()
+
+        self.find_stream_orderings_looping_call = self._clock.looping_call(
+            self._find_stream_orderings_for_times, 10 * 60 * 1000
+        )
+
     @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
     def get_unread_event_push_actions_by_room_for_user(
             self, room_id, user_id, last_read_event_id
@@ -443,6 +464,128 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             desc="remove_push_actions_from_staging",
         )
 
+    @defer.inlineCallbacks
+    def _find_stream_orderings_for_times(self):
+        yield self.runInteraction(
+            "_find_stream_orderings_for_times",
+            self._find_stream_orderings_for_times_txn
+        )
+
+    def _find_stream_orderings_for_times_txn(self, txn):
+        logger.info("Searching for stream ordering 1 month ago")
+        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 month ago: it's %d",
+            self.stream_ordering_month_ago
+        )
+        logger.info("Searching for stream ordering 1 day ago")
+        self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 day ago: it's %d",
+            self.stream_ordering_day_ago
+        )
+
+    def find_first_stream_ordering_after_ts(self, ts):
+        """Gets the stream ordering corresponding to a given timestamp.
+
+        Specifically, finds the stream_ordering of the first event that was
+        received on or after the timestamp. This is done by a binary search on
+        the events table, since there is no index on received_ts, so is
+        relatively slow.
+
+        Args:
+            ts (int): timestamp in millis
+
+        Returns:
+            Deferred[int]: stream ordering of the first event received on/after
+                the timestamp
+        """
+        return self.runInteraction(
+            "_find_first_stream_ordering_after_ts_txn",
+            self._find_first_stream_ordering_after_ts_txn,
+            ts,
+        )
+
+    @staticmethod
+    def _find_first_stream_ordering_after_ts_txn(txn, ts):
+        """
+        Find the stream_ordering of the first event that was received on or
+        after a given timestamp. This is relatively slow as there is no index
+        on received_ts but we can then use this to delete push actions before
+        this.
+
+        received_ts must necessarily be in the same order as stream_ordering
+        and stream_ordering is indexed, so we manually binary search using
+        stream_ordering
+
+        Args:
+            txn (twisted.enterprise.adbapi.Transaction):
+            ts (int): timestamp to search for
+
+        Returns:
+            int: stream ordering
+        """
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        max_stream_ordering = txn.fetchone()[0]
+
+        if max_stream_ordering is None:
+            return 0
+
+        # We want the first stream_ordering in which received_ts is greater
+        # than or equal to ts. Call this point X.
+        #
+        # We maintain the invariants:
+        #
+        #   range_start <= X <= range_end
+        #
+        range_start = 0
+        range_end = max_stream_ordering + 1
+
+        # Given a stream_ordering, look up the timestamp at that
+        # stream_ordering.
+        #
+        # The array may be sparse (we may be missing some stream_orderings).
+        # We treat the gaps as the same as having the same value as the
+        # preceding entry, because we will pick the lowest stream_ordering
+        # which satisfies our requirement of received_ts >= ts.
+        #
+        # For example, if our array of events indexed by stream_ordering is
+        # [10, <none>, 20], we should treat this as being equivalent to
+        # [10, 10, 20].
+        #
+        sql = (
+            "SELECT received_ts FROM events"
+            " WHERE stream_ordering <= ?"
+            " ORDER BY stream_ordering DESC"
+            " LIMIT 1"
+        )
+
+        while range_end - range_start > 0:
+            middle = (range_end + range_start) // 2
+            txn.execute(sql, (middle,))
+            row = txn.fetchone()
+            if row is None:
+                # no rows with stream_ordering<=middle
+                range_start = middle + 1
+                continue
+
+            middle_ts = row[0]
+            if ts > middle_ts:
+                # we got a timestamp lower than the one we were looking for.
+                # definitely need to look higher: X > middle.
+                range_start = middle + 1
+            else:
+                # we got a timestamp higher than (or the same as) the one we
+                # were looking for. We aren't yet sure about the point we
+                # looked up, but we can be sure that X <= middle.
+                range_end = middle
+
+        return range_end
+
 
 class EventPushActionsStore(EventPushActionsWorkerStore):
     EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -654,69 +797,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         """, (room_id, user_id, stream_ordering))
 
     @defer.inlineCallbacks
-    def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
-            "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
-        )
-
-    def _find_stream_orderings_for_times_txn(self, txn):
-        logger.info("Searching for stream ordering 1 month ago")
-        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
-            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
-        )
-        logger.info(
-            "Found stream ordering 1 month ago: it's %d",
-            self.stream_ordering_month_ago
-        )
-        logger.info("Searching for stream ordering 1 day ago")
-        self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
-            txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
-        )
-        logger.info(
-            "Found stream ordering 1 day ago: it's %d",
-            self.stream_ordering_day_ago
-        )
-
-    def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
-        """
-        Find the stream_ordering of the first event that was received after
-        a given timestamp. This is relatively slow as there is no index on
-        received_ts but we can then use this to delete push actions before
-        this.
-
-        received_ts must necessarily be in the same order as stream_ordering
-        and stream_ordering is indexed, so we manually binary search using
-        stream_ordering
-        """
-        txn.execute("SELECT MAX(stream_ordering) FROM events")
-        max_stream_ordering = txn.fetchone()[0]
-
-        if max_stream_ordering is None:
-            return 0
-
-        range_start = 0
-        range_end = max_stream_ordering
-
-        sql = (
-            "SELECT received_ts FROM events"
-            " WHERE stream_ordering > ?"
-            " ORDER BY stream_ordering"
-            " LIMIT 1"
-        )
-
-        while range_end - range_start > 1:
-            middle = int((range_end + range_start) / 2)
-            txn.execute(sql, (middle,))
-            middle_ts = txn.fetchone()[0]
-            if ts > middle_ts:
-                range_start = middle
-            else:
-                range_end = middle
-
-        return range_end
-
-    @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
             return
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d5a2bf5254..8392e13b52 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -283,10 +283,11 @@ class EventsStore(EventsWorkerStore):
     def _maybe_start_persisting(self, room_id):
         @defer.inlineCallbacks
         def persisting_queue(item):
-            yield self._persist_events(
-                item.events_and_contexts,
-                backfilled=item.backfilled,
-            )
+            with Measure(self._clock, "persist_events"):
+                yield self._persist_events(
+                    item.events_and_contexts,
+                    backfilled=item.backfilled,
+                )
 
         self._event_persist_queue.handle_queue(room_id, persisting_queue)
 
@@ -754,7 +755,7 @@ class EventsStore(EventsWorkerStore):
 
                 for member in members_changed:
                     self._invalidate_cache_and_stream(
-                        txn, self.get_rooms_for_user, (member,)
+                        txn, self.get_rooms_for_user_with_stream_ordering, (member,)
                     )
 
                 for host in set(get_domain_from_id(u) for u in members_changed):
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index ec02e73bc2..8612bd5ecc 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -21,14 +21,7 @@ from synapse.api.errors import StoreError
 from ._base import SQLBaseStore
 
 
-class ProfileStore(SQLBaseStore):
-    def create_profile(self, user_localpart):
-        return self._simple_insert(
-            table="profiles",
-            values={"user_id": user_localpart},
-            desc="create_profile",
-        )
-
+class ProfileWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_profileinfo(self, user_localpart):
         try:
@@ -61,14 +54,6 @@ class ProfileStore(SQLBaseStore):
             desc="get_profile_displayname",
         )
 
-    def set_profile_displayname(self, user_localpart, new_displayname):
-        return self._simple_update_one(
-            table="profiles",
-            keyvalues={"user_id": user_localpart},
-            updatevalues={"displayname": new_displayname},
-            desc="set_profile_displayname",
-        )
-
     def get_profile_avatar_url(self, user_localpart):
         return self._simple_select_one_onecol(
             table="profiles",
@@ -77,14 +62,6 @@ class ProfileStore(SQLBaseStore):
             desc="get_profile_avatar_url",
         )
 
-    def set_profile_avatar_url(self, user_localpart, new_avatar_url):
-        return self._simple_update_one(
-            table="profiles",
-            keyvalues={"user_id": user_localpart},
-            updatevalues={"avatar_url": new_avatar_url},
-            desc="set_profile_avatar_url",
-        )
-
     def get_from_remote_profile_cache(self, user_id):
         return self._simple_select_one(
             table="remote_profile_cache",
@@ -94,6 +71,31 @@ class ProfileStore(SQLBaseStore):
             desc="get_from_remote_profile_cache",
         )
 
+
+class ProfileStore(ProfileWorkerStore):
+    def create_profile(self, user_localpart):
+        return self._simple_insert(
+            table="profiles",
+            values={"user_id": user_localpart},
+            desc="create_profile",
+        )
+
+    def set_profile_displayname(self, user_localpart, new_displayname):
+        return self._simple_update_one(
+            table="profiles",
+            keyvalues={"user_id": user_localpart},
+            updatevalues={"displayname": new_displayname},
+            desc="set_profile_displayname",
+        )
+
+    def set_profile_avatar_url(self, user_localpart, new_avatar_url):
+        return self._simple_update_one(
+            table="profiles",
+            keyvalues={"user_id": user_localpart},
+            updatevalues={"avatar_url": new_avatar_url},
+            desc="set_profile_avatar_url",
+        )
+
     def add_remote_profile_cache(self, user_id, displayname, avatar_url):
         """Ensure we are caching the remote user's profiles.
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 95f75d6df1..d809b2ba46 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -19,10 +19,70 @@ from twisted.internet import defer
 
 from synapse.api.errors import StoreError, Codes
 from synapse.storage import background_updates
+from synapse.storage._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 
-class RegistrationStore(background_updates.BackgroundUpdateStore):
+class RegistrationWorkerStore(SQLBaseStore):
+    @cached()
+    def get_user_by_id(self, user_id):
+        return self._simple_select_one(
+            table="users",
+            keyvalues={
+                "name": user_id,
+            },
+            retcols=["name", "password_hash", "is_guest"],
+            allow_none=True,
+            desc="get_user_by_id",
+        )
+
+    @cached()
+    def get_user_by_access_token(self, token):
+        """Get a user from the given access token.
+
+        Args:
+            token (str): The access token of a user.
+        Returns:
+            defer.Deferred: None, if the token did not match, otherwise dict
+                including the keys `name`, `is_guest`, `device_id`, `token_id`.
+        """
+        return self.runInteraction(
+            "get_user_by_access_token",
+            self._query_for_auth,
+            token
+        )
+
+    @defer.inlineCallbacks
+    def is_server_admin(self, user):
+        res = yield self._simple_select_one_onecol(
+            table="users",
+            keyvalues={"name": user.to_string()},
+            retcol="admin",
+            allow_none=True,
+            desc="is_server_admin",
+        )
+
+        defer.returnValue(res if res else False)
+
+    def _query_for_auth(self, txn, token):
+        sql = (
+            "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
+            " access_tokens.device_id"
+            " FROM users"
+            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
+            " WHERE token = ?"
+        )
+
+        txn.execute(sql, (token,))
+        rows = self.cursor_to_dict(txn)
+        if rows:
+            return rows[0]
+
+        return None
+
+
+class RegistrationStore(RegistrationWorkerStore,
+                        background_updates.BackgroundUpdateStore):
 
     def __init__(self, db_conn, hs):
         super(RegistrationStore, self).__init__(db_conn, hs)
@@ -187,18 +247,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
         )
         txn.call_after(self.is_guest.invalidate, (user_id,))
 
-    @cached()
-    def get_user_by_id(self, user_id):
-        return self._simple_select_one(
-            table="users",
-            keyvalues={
-                "name": user_id,
-            },
-            retcols=["name", "password_hash", "is_guest"],
-            allow_none=True,
-            desc="get_user_by_id",
-        )
-
     def get_users_by_id_case_insensitive(self, user_id):
         """Gets users that match user_id case insensitively.
         Returns a mapping of user_id -> password_hash.
@@ -304,34 +352,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
 
         return self.runInteraction("delete_access_token", f)
 
-    @cached()
-    def get_user_by_access_token(self, token):
-        """Get a user from the given access token.
-
-        Args:
-            token (str): The access token of a user.
-        Returns:
-            defer.Deferred: None, if the token did not match, otherwise dict
-                including the keys `name`, `is_guest`, `device_id`, `token_id`.
-        """
-        return self.runInteraction(
-            "get_user_by_access_token",
-            self._query_for_auth,
-            token
-        )
-
-    @defer.inlineCallbacks
-    def is_server_admin(self, user):
-        res = yield self._simple_select_one_onecol(
-            table="users",
-            keyvalues={"name": user.to_string()},
-            retcol="admin",
-            allow_none=True,
-            desc="is_server_admin",
-        )
-
-        defer.returnValue(res if res else False)
-
     @cachedInlineCallbacks()
     def is_guest(self, user_id):
         res = yield self._simple_select_one_onecol(
@@ -344,22 +364,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
 
         defer.returnValue(res if res else False)
 
-    def _query_for_auth(self, txn, token):
-        sql = (
-            "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
-            " access_tokens.device_id"
-            " FROM users"
-            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
-            " WHERE token = ?"
-        )
-
-        txn.execute(sql, (token,))
-        rows = self.cursor_to_dict(txn)
-        if rows:
-            return rows[0]
-
-        return None
-
     @defer.inlineCallbacks
     def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
         yield self._simple_upsert("user_threepids", {
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index fff6652e05..34ed84ea22 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.storage._base import SQLBaseStore
 from synapse.storage.search import SearchStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
@@ -38,7 +39,138 @@ RatelimitOverride = collections.namedtuple(
 )
 
 
-class RoomStore(SearchStore):
+class RoomWorkerStore(SQLBaseStore):
+    def get_public_room_ids(self):
+        return self._simple_select_onecol(
+            table="rooms",
+            keyvalues={
+                "is_public": True,
+            },
+            retcol="room_id",
+            desc="get_public_room_ids",
+        )
+
+    @cached(num_args=2, max_entries=100)
+    def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
+        """Get pulbic rooms for a particular list, or across all lists.
+
+        Args:
+            stream_id (int)
+            network_tuple (ThirdPartyInstanceID): The list to use (None, None)
+                means the main list, None means all lsits.
+        """
+        return self.runInteraction(
+            "get_public_room_ids_at_stream_id",
+            self.get_public_room_ids_at_stream_id_txn,
+            stream_id, network_tuple=network_tuple
+        )
+
+    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
+                                             network_tuple):
+        return {
+            rm
+            for rm, vis in self.get_published_at_stream_id_txn(
+                txn, stream_id, network_tuple=network_tuple
+            ).items()
+            if vis
+        }
+
+    def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
+        if network_tuple:
+            # We want to get from a particular list. No aggregation required.
+
+            sql = ("""
+                SELECT room_id, visibility FROM public_room_list_stream
+                INNER JOIN (
+                    SELECT room_id, max(stream_id) AS stream_id
+                    FROM public_room_list_stream
+                    WHERE stream_id <= ? %s
+                    GROUP BY room_id
+                ) grouped USING (room_id, stream_id)
+            """)
+
+            if network_tuple.appservice_id is not None:
+                txn.execute(
+                    sql % ("AND appservice_id = ? AND network_id = ?",),
+                    (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
+                )
+            else:
+                txn.execute(
+                    sql % ("AND appservice_id IS NULL",),
+                    (stream_id,)
+                )
+            return dict(txn)
+        else:
+            # We want to get from all lists, so we need to aggregate the results
+
+            logger.info("Executing full list")
+
+            sql = ("""
+                SELECT room_id, visibility
+                FROM public_room_list_stream
+                INNER JOIN (
+                    SELECT
+                        room_id, max(stream_id) AS stream_id, appservice_id,
+                        network_id
+                    FROM public_room_list_stream
+                    WHERE stream_id <= ?
+                    GROUP BY room_id, appservice_id, network_id
+                ) grouped USING (room_id, stream_id)
+            """)
+
+            txn.execute(
+                sql,
+                (stream_id,)
+            )
+
+            results = {}
+            # A room is visible if its visible on any list.
+            for room_id, visibility in txn:
+                results[room_id] = bool(visibility) or results.get(room_id, False)
+
+            return results
+
+    def get_public_room_changes(self, prev_stream_id, new_stream_id,
+                                network_tuple):
+        def get_public_room_changes_txn(txn):
+            then_rooms = self.get_public_room_ids_at_stream_id_txn(
+                txn, prev_stream_id, network_tuple
+            )
+
+            now_rooms_dict = self.get_published_at_stream_id_txn(
+                txn, new_stream_id, network_tuple
+            )
+
+            now_rooms_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if vis
+            )
+            now_rooms_not_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if not vis
+            )
+
+            newly_visible = now_rooms_visible - then_rooms
+            newly_unpublished = now_rooms_not_visible & then_rooms
+
+            return newly_visible, newly_unpublished
+
+        return self.runInteraction(
+            "get_public_room_changes", get_public_room_changes_txn
+        )
+
+    @cached(max_entries=10000)
+    def is_room_blocked(self, room_id):
+        return self._simple_select_one_onecol(
+            table="blocked_rooms",
+            keyvalues={
+                "room_id": room_id,
+            },
+            retcol="1",
+            allow_none=True,
+            desc="is_room_blocked",
+        )
+
+
+class RoomStore(RoomWorkerStore, SearchStore):
 
     @defer.inlineCallbacks
     def store_room(self, room_id, room_creator_user_id, is_public):
@@ -225,16 +357,6 @@ class RoomStore(SearchStore):
             )
         self.hs.get_notifier().on_new_replication_data()
 
-    def get_public_room_ids(self):
-        return self._simple_select_onecol(
-            table="rooms",
-            keyvalues={
-                "is_public": True,
-            },
-            retcol="room_id",
-            desc="get_public_room_ids",
-        )
-
     def get_room_count(self):
         """Retrieve a list of all rooms
         """
@@ -326,113 +448,6 @@ class RoomStore(SearchStore):
     def get_current_public_room_stream_id(self):
         return self._public_room_id_gen.get_current_token()
 
-    @cached(num_args=2, max_entries=100)
-    def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
-        """Get pulbic rooms for a particular list, or across all lists.
-
-        Args:
-            stream_id (int)
-            network_tuple (ThirdPartyInstanceID): The list to use (None, None)
-                means the main list, None means all lsits.
-        """
-        return self.runInteraction(
-            "get_public_room_ids_at_stream_id",
-            self.get_public_room_ids_at_stream_id_txn,
-            stream_id, network_tuple=network_tuple
-        )
-
-    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
-                                             network_tuple):
-        return {
-            rm
-            for rm, vis in self.get_published_at_stream_id_txn(
-                txn, stream_id, network_tuple=network_tuple
-            ).items()
-            if vis
-        }
-
-    def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
-        if network_tuple:
-            # We want to get from a particular list. No aggregation required.
-
-            sql = ("""
-                SELECT room_id, visibility FROM public_room_list_stream
-                INNER JOIN (
-                    SELECT room_id, max(stream_id) AS stream_id
-                    FROM public_room_list_stream
-                    WHERE stream_id <= ? %s
-                    GROUP BY room_id
-                ) grouped USING (room_id, stream_id)
-            """)
-
-            if network_tuple.appservice_id is not None:
-                txn.execute(
-                    sql % ("AND appservice_id = ? AND network_id = ?",),
-                    (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
-                )
-            else:
-                txn.execute(
-                    sql % ("AND appservice_id IS NULL",),
-                    (stream_id,)
-                )
-            return dict(txn)
-        else:
-            # We want to get from all lists, so we need to aggregate the results
-
-            logger.info("Executing full list")
-
-            sql = ("""
-                SELECT room_id, visibility
-                FROM public_room_list_stream
-                INNER JOIN (
-                    SELECT
-                        room_id, max(stream_id) AS stream_id, appservice_id,
-                        network_id
-                    FROM public_room_list_stream
-                    WHERE stream_id <= ?
-                    GROUP BY room_id, appservice_id, network_id
-                ) grouped USING (room_id, stream_id)
-            """)
-
-            txn.execute(
-                sql,
-                (stream_id,)
-            )
-
-            results = {}
-            # A room is visible if its visible on any list.
-            for room_id, visibility in txn:
-                results[room_id] = bool(visibility) or results.get(room_id, False)
-
-            return results
-
-    def get_public_room_changes(self, prev_stream_id, new_stream_id,
-                                network_tuple):
-        def get_public_room_changes_txn(txn):
-            then_rooms = self.get_public_room_ids_at_stream_id_txn(
-                txn, prev_stream_id, network_tuple
-            )
-
-            now_rooms_dict = self.get_published_at_stream_id_txn(
-                txn, new_stream_id, network_tuple
-            )
-
-            now_rooms_visible = set(
-                rm for rm, vis in now_rooms_dict.items() if vis
-            )
-            now_rooms_not_visible = set(
-                rm for rm, vis in now_rooms_dict.items() if not vis
-            )
-
-            newly_visible = now_rooms_visible - then_rooms
-            newly_unpublished = now_rooms_not_visible & then_rooms
-
-            return newly_visible, newly_unpublished
-
-        return self.runInteraction(
-            "get_public_room_changes", get_public_room_changes_txn
-        )
-
     def get_all_new_public_rooms(self, prev_id, current_id, limit):
         def get_all_new_public_rooms(txn):
             sql = ("""
@@ -482,18 +497,6 @@ class RoomStore(SearchStore):
         else:
             defer.returnValue(None)
 
-    @cached(max_entries=10000)
-    def is_room_blocked(self, room_id):
-        return self._simple_select_one_onecol(
-            table="blocked_rooms",
-            keyvalues={
-                "room_id": room_id,
-            },
-            retcol="1",
-            allow_none=True,
-            desc="is_room_blocked",
-        )
-
     @defer.inlineCallbacks
     def block_room(self, room_id, user_id):
         yield self._simple_insert(
@@ -504,7 +507,11 @@ class RoomStore(SearchStore):
             },
             desc="block_room",
         )
-        self.is_room_blocked.invalidate((room_id,))
+        yield self.runInteraction(
+            "block_room_invalidation",
+            self._invalidate_cache_and_stream,
+            self.is_room_blocked, (room_id,),
+        )
 
     def get_media_mxcs_in_room(self, room_id):
         """Retrieves all the local and remote media MXC URIs in a given room
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index c097b36372..76820a1da1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -38,6 +38,11 @@ RoomsForUser = namedtuple(
     ("room_id", "sender", "membership", "event_id", "stream_ordering")
 )
 
+GetRoomsForUserWithStreamOrdering = namedtuple(
+    "_GetRoomsForUserWithStreamOrdering",
+    ("room_id", "stream_ordering",)
+)
+
 
 # We store this using a namedtuple so that we save about 3x space over using a
 # dict.
@@ -181,12 +186,32 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         return results
 
     @cachedInlineCallbacks(max_entries=500000, iterable=True)
-    def get_rooms_for_user(self, user_id):
+    def get_rooms_for_user_with_stream_ordering(self, user_id):
         """Returns a set of room_ids the user is currently joined to
+
+        Args:
+            user_id (str)
+
+        Returns:
+            Deferred[frozenset[GetRoomsForUserWithStreamOrdering]]: Returns
+            the rooms the user is in currently, along with the stream ordering
+            of the most recent join for that user and room.
         """
         rooms = yield self.get_rooms_for_user_where_membership_is(
             user_id, membership_list=[Membership.JOIN],
         )
+        defer.returnValue(frozenset(
+            GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
+            for r in rooms
+        ))
+
+    @defer.inlineCallbacks
+    def get_rooms_for_user(self, user_id, on_invalidate=None):
+        """Returns a set of room_ids the user is currently joined to
+        """
+        rooms = yield self.get_rooms_for_user_with_stream_ordering(
+            user_id, on_invalidate=on_invalidate,
+        )
         defer.returnValue(frozenset(r.room_id for r in rooms))
 
     @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 67d5d9969a..9e6eaaa532 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -22,12 +22,12 @@ from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.util.caches.descriptors import cached, cachedList
 
 
-class SignatureStore(SQLBaseStore):
-    """Persistence for event signatures and hashes"""
-
+class SignatureWorkerStore(SQLBaseStore):
     @cached()
     def get_event_reference_hash(self, event_id):
-        return self._get_event_reference_hashes_txn(event_id)
+        # This is a dummy function to allow get_event_reference_hashes
+        # to use its cache
+        raise NotImplementedError()
 
     @cachedList(cached_method_name="get_event_reference_hash",
                 list_name="event_ids", num_args=1)
@@ -74,6 +74,10 @@ class SignatureStore(SQLBaseStore):
         txn.execute(query, (event_id, ))
         return {k: v for k, v in txn}
 
+
+class SignatureStore(SignatureWorkerStore):
+    """Persistence for event signatures and hashes"""
+
     def _store_event_reference_hashes_txn(self, txn, events):
         """Store a hash for a PDU
         Args:
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 121fb6b1d6..1123f5010b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -240,6 +240,9 @@ class StateGroupWorkerStore(SQLBaseStore):
                     (
                         "AND type = ? AND state_key = ?",
                         (etype, state_key)
+                    ) if state_key is not None else (
+                        "AND type = ?",
+                        (etype,)
                     )
                     for etype, state_key in types
                 ]
@@ -259,10 +262,19 @@ class StateGroupWorkerStore(SQLBaseStore):
                         key = (typ, state_key)
                         results[group][key] = event_id
         else:
+            where_args = []
+            where_clauses = []
+            wildcard_types = False
             if types is not None:
-                where_clause = "AND (%s)" % (
-                    " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
-                )
+                for typ in types:
+                    if typ[1] is None:
+                        where_clauses.append("(type = ?)")
+                        where_args.extend(typ[0])
+                        wildcard_types = True
+                    else:
+                        where_clauses.append("(type = ? AND state_key = ?)")
+                        where_args.extend([typ[0], typ[1]])
+                where_clause = "AND (%s)" % (" OR ".join(where_clauses))
             else:
                 where_clause = ""
 
@@ -279,7 +291,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                     # after we finish deduping state, which requires this func)
                     args = [next_group]
                     if types:
-                        args.extend(i for typ in types for i in typ)
+                        args.extend(where_args)
 
                     txn.execute(
                         "SELECT type, state_key, event_id FROM state_groups_state"
@@ -292,9 +304,17 @@ class StateGroupWorkerStore(SQLBaseStore):
                         if (typ, state_key) not in results[group]
                     )
 
-                    # If the lengths match then we must have all the types,
-                    # so no need to go walk further down the tree.
-                    if types is not None and len(results[group]) == len(types):
+                    # If the number of entries in the (type,state_key)->event_id dict
+                    # matches the number of (type,state_keys) types we were searching
+                    # for, then we must have found them all, so no need to go walk
+                    # further down the tree... UNLESS our types filter contained
+                    # wildcards (i.e. Nones) in which case we have to do an exhaustive
+                    # search
+                    if (
+                        types is not None and
+                        not wildcard_types and
+                        len(results[group]) == len(types)
+                    ):
                         break
 
                     next_group = self._simple_select_one_onecol_txn(
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 52bdce5be2..2956c3b3e0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,13 +35,16 @@ what sort order was used:
 
 from twisted.internet import defer
 
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+
 from synapse.util.caches.descriptors import cached
-from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
+import abc
 import logging
 
 
@@ -143,81 +146,41 @@ def filter_to_clause(event_filter):
     return " AND ".join(clauses), args
 
 
-class StreamStore(SQLBaseStore):
-    @defer.inlineCallbacks
-    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
-        # NB this lives here instead of appservice.py so we can reuse the
-        # 'private' StreamToken class in this file.
-        if limit:
-            limit = max(limit, MAX_STREAM_SIZE)
-        else:
-            limit = MAX_STREAM_SIZE
-
-        # From and to keys should be integers from ordering.
-        from_id = RoomStreamToken.parse_stream_token(from_key)
-        to_id = RoomStreamToken.parse_stream_token(to_key)
-
-        if from_key == to_key:
-            defer.returnValue(([], to_key))
-            return
-
-        # select all the events between from/to with a sensible limit
-        sql = (
-            "SELECT e.event_id, e.room_id, e.type, s.state_key, "
-            "e.stream_ordering FROM events AS e "
-            "LEFT JOIN state_events as s ON "
-            "e.event_id = s.event_id "
-            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
-            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
-        ) % {
-            "limit": limit
-        }
-
-        def f(txn):
-            # pull out all the events between the tokens
-            txn.execute(sql, (from_id.stream, to_id.stream,))
-            rows = self.cursor_to_dict(txn)
-
-            # Logic:
-            #  - We want ALL events which match the AS room_id regex
-            #  - We want ALL events which match the rooms represented by the AS
-            #    room_alias regex
-            #  - We want ALL events for rooms that AS users have joined.
-            # This is currently supported via get_app_service_rooms (which is
-            # used for the Notifier listener rooms). We can't reasonably make a
-            # SQL query for these room IDs, so we'll pull all the events between
-            # from/to and filter in python.
-            rooms_for_as = self._get_app_service_rooms_txn(txn, service)
-            room_ids_for_as = [r.room_id for r in rooms_for_as]
-
-            def app_service_interested(row):
-                if row["room_id"] in room_ids_for_as:
-                    return True
-
-                if row["type"] == EventTypes.Member:
-                    if service.is_interested_in_user(row.get("state_key")):
-                        return True
-                return False
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
+    """This is an abstract base class where subclasses must implement
+    `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
+    which can be called in the initializer.
+    """
 
-            return [r for r in rows if app_service_interested(r)]
+    __metaclass__ = abc.ABCMeta
 
-        rows = yield self.runInteraction("get_appservice_room_stream", f)
+    def __init__(self, db_conn, hs):
+        super(StreamWorkerStore, self).__init__(db_conn, hs)
 
-        ret = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
+        events_max = self.get_room_max_stream_ordering()
+        event_cache_prefill, min_event_val = self._get_cache_dict(
+            db_conn, "events",
+            entity_column="room_id",
+            stream_column="stream_ordering",
+            max_value=events_max,
+        )
+        self._events_stream_cache = StreamChangeCache(
+            "EventsRoomStreamChangeCache", min_event_val,
+            prefilled_cache=event_cache_prefill,
+        )
+        self._membership_stream_cache = StreamChangeCache(
+            "MembershipStreamChangeCache", events_max,
         )
 
-        self._set_before_and_after(ret, rows, topo_order=from_id is None)
+        self._stream_order_on_start = self.get_room_max_stream_ordering()
 
-        if rows:
-            key = "s%d" % max(r["stream_ordering"] for r in rows)
-        else:
-            # Assume we didn't get anything because there was nothing to
-            # get.
-            key = to_key
+    @abc.abstractmethod
+    def get_room_max_stream_ordering(self):
+        raise NotImplementedError()
 
-        defer.returnValue((ret, key))
+    @abc.abstractmethod
+    def get_room_min_stream_ordering(self):
+        raise NotImplementedError()
 
     @defer.inlineCallbacks
     def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
@@ -381,88 +344,6 @@ class StreamStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1, event_filter=None):
-        # Tokens really represent positions between elements, but we use
-        # the convention of pointing to the event before the gap. Hence
-        # we have a bit of asymmetry when it comes to equalities.
-        args = [False, room_id]
-        if direction == 'b':
-            order = "DESC"
-            bounds = upper_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
-            )
-            if to_key:
-                bounds = "%s AND %s" % (bounds, lower_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
-                ))
-        else:
-            order = "ASC"
-            bounds = lower_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
-            )
-            if to_key:
-                bounds = "%s AND %s" % (bounds, upper_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
-                ))
-
-        filter_clause, filter_args = filter_to_clause(event_filter)
-
-        if filter_clause:
-            bounds += " AND " + filter_clause
-            args.extend(filter_args)
-
-        if int(limit) > 0:
-            args.append(int(limit))
-            limit_str = " LIMIT ?"
-        else:
-            limit_str = ""
-
-        sql = (
-            "SELECT * FROM events"
-            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
-            " ORDER BY topological_ordering %(order)s,"
-            " stream_ordering %(order)s %(limit)s"
-        ) % {
-            "bounds": bounds,
-            "order": order,
-            "limit": limit_str
-        }
-
-        def f(txn):
-            txn.execute(sql, args)
-
-            rows = self.cursor_to_dict(txn)
-
-            if rows:
-                topo = rows[-1]["topological_ordering"]
-                toke = rows[-1]["stream_ordering"]
-                if direction == 'b':
-                    # Tokens are positions between events.
-                    # This token points *after* the last event in the chunk.
-                    # We need it to point to the event before it in the chunk
-                    # when we are going backwards so we subtract one from the
-                    # stream part.
-                    toke -= 1
-                next_token = str(RoomStreamToken(topo, toke))
-            else:
-                # TODO (erikj): We should work out what to do here instead.
-                next_token = to_key if to_key else from_key
-
-            return rows, next_token,
-
-        rows, token = yield self.runInteraction("paginate_room_events", f)
-
-        events = yield self._get_events(
-            [r["event_id"] for r in rows],
-            get_prev_content=True
-        )
-
-        self._set_before_and_after(events, rows)
-
-        defer.returnValue((events, token))
-
-    @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
         rows, token = yield self.get_recent_event_ids_for_room(
             room_id, limit, end_token, from_token
@@ -534,6 +415,33 @@ class StreamStore(SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
+    def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
+        """Gets details of the first event in a room at or after a stream ordering
+
+        Args:
+            room_id (str):
+            stream_ordering (int):
+
+        Returns:
+            Deferred[(int, int, str)]:
+                (stream ordering, topological ordering, event_id)
+        """
+        def _f(txn):
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering >= ?"
+                " AND NOT outlier"
+                " ORDER BY stream_ordering"
+                " LIMIT 1"
+            )
+            txn.execute(sql, (room_id, stream_ordering, ))
+            return txn.fetchone()
+
+        return self.runInteraction(
+            "get_room_event_after_stream_ordering", _f,
+        )
+
     @defer.inlineCallbacks
     def get_room_events_max_id(self, room_id=None):
         """Returns the current token for rooms stream.
@@ -542,7 +450,7 @@ class StreamStore(SQLBaseStore):
         `room_id` causes it to return the current room specific topological
         token.
         """
-        token = yield self._stream_id_gen.get_current_token()
+        token = yield self.get_room_max_stream_ordering()
         if room_id is None:
             defer.returnValue("s%d" % (token,))
         else:
@@ -552,12 +460,6 @@ class StreamStore(SQLBaseStore):
             )
             defer.returnValue("t%d-%d" % (topo, token))
 
-    def get_room_max_stream_ordering(self):
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
-
     def get_stream_token_for_event(self, event_id):
         """The stream token for an event
         Args:
@@ -832,3 +734,93 @@ class StreamStore(SQLBaseStore):
 
     def has_room_changed_since(self, room_id, stream_id):
         return self._events_stream_cache.has_entity_changed(room_id, stream_id)
+
+
+class StreamStore(StreamWorkerStore):
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
+
+    @defer.inlineCallbacks
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1, event_filter=None):
+        # Tokens really represent positions between elements, but we use
+        # the convention of pointing to the event before the gap. Hence
+        # we have a bit of asymmetry when it comes to equalities.
+        args = [False, room_id]
+        if direction == 'b':
+            order = "DESC"
+            bounds = upper_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
+            if to_key:
+                bounds = "%s AND %s" % (bounds, lower_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
+        else:
+            order = "ASC"
+            bounds = lower_bound(
+                RoomStreamToken.parse(from_key), self.database_engine
+            )
+            if to_key:
+                bounds = "%s AND %s" % (bounds, upper_bound(
+                    RoomStreamToken.parse(to_key), self.database_engine
+                ))
+
+        filter_clause, filter_args = filter_to_clause(event_filter)
+
+        if filter_clause:
+            bounds += " AND " + filter_clause
+            args.extend(filter_args)
+
+        if int(limit) > 0:
+            args.append(int(limit))
+            limit_str = " LIMIT ?"
+        else:
+            limit_str = ""
+
+        sql = (
+            "SELECT * FROM events"
+            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s %(limit)s"
+        ) % {
+            "bounds": bounds,
+            "order": order,
+            "limit": limit_str
+        }
+
+        def f(txn):
+            txn.execute(sql, args)
+
+            rows = self.cursor_to_dict(txn)
+
+            if rows:
+                topo = rows[-1]["topological_ordering"]
+                toke = rows[-1]["stream_ordering"]
+                if direction == 'b':
+                    # Tokens are positions between events.
+                    # This token points *after* the last event in the chunk.
+                    # We need it to point to the event before it in the chunk
+                    # when we are going backwards so we subtract one from the
+                    # stream part.
+                    toke -= 1
+                next_token = str(RoomStreamToken(topo, toke))
+            else:
+                # TODO (erikj): We should work out what to do here instead.
+                next_token = to_key if to_key else from_key
+
+            return rows, next_token,
+
+        rows, token = yield self.runInteraction("paginate_room_events", f)
+
+        events = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
+
+        self._set_before_and_after(events, rows)
+
+        defer.returnValue((events, token))