summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py438
-rw-r--r--synapse/storage/background_updates.py14
-rw-r--r--synapse/storage/data_stores/main/__init__.py79
-rw-r--r--synapse/storage/data_stores/main/account_data.py24
-rw-r--r--synapse/storage/data_stores/main/appservice.py10
-rw-r--r--synapse/storage/data_stores/main/cache.py131
-rw-r--r--synapse/storage/data_stores/main/client_ips.py10
-rw-r--r--synapse/storage/data_stores/main/deviceinbox.py23
-rw-r--r--synapse/storage/data_stores/main/devices.py60
-rw-r--r--synapse/storage/data_stores/main/directory.py12
-rw-r--r--synapse/storage/data_stores/main/e2e_room_keys.py238
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py49
-rw-r--r--synapse/storage/data_stores/main/event_federation.py16
-rw-r--r--synapse/storage/data_stores/main/event_push_actions.py12
-rw-r--r--synapse/storage/data_stores/main/events.py183
-rw-r--r--synapse/storage/data_stores/main/events_bg_updates.py51
-rw-r--r--synapse/storage/data_stores/main/events_worker.py53
-rw-r--r--synapse/storage/data_stores/main/filtering.py4
-rw-r--r--synapse/storage/data_stores/main/group_server.py128
-rw-r--r--synapse/storage/data_stores/main/keys.py6
-rw-r--r--synapse/storage/data_stores/main/media_repository.py30
-rw-r--r--synapse/storage/data_stores/main/monthly_active_users.py6
-rw-r--r--synapse/storage/data_stores/main/openid.py2
-rw-r--r--synapse/storage/data_stores/main/presence.py8
-rw-r--r--synapse/storage/data_stores/main/profile.py24
-rw-r--r--synapse/storage/data_stores/main/push_rule.py24
-rw-r--r--synapse/storage/data_stores/main/pusher.py26
-rw-r--r--synapse/storage/data_stores/main/receipts.py18
-rw-r--r--synapse/storage/data_stores/main/registration.py187
-rw-r--r--synapse/storage/data_stores/main/rejections.py4
-rw-r--r--synapse/storage/data_stores/main/relations.py4
-rw-r--r--synapse/storage/data_stores/main/room.py291
-rw-r--r--synapse/storage/data_stores/main/roommember.py31
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql21
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql17
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/room_retention.sql33
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/signing_keys.sql3
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/signing_keys_nonunique_signatures.sql22
-rw-r--r--synapse/storage/data_stores/main/search.py8
-rw-r--r--synapse/storage/data_stores/main/signatures.py2
-rw-r--r--synapse/storage/data_stores/main/state.py36
-rw-r--r--synapse/storage/data_stores/main/state_deltas.py2
-rw-r--r--synapse/storage/data_stores/main/stats.py30
-rw-r--r--synapse/storage/data_stores/main/stream.py19
-rw-r--r--synapse/storage/data_stores/main/tags.py10
-rw-r--r--synapse/storage/data_stores/main/transactions.py12
-rw-r--r--synapse/storage/data_stores/main/user_directory.py56
-rw-r--r--synapse/storage/data_stores/main/user_erasure_store.py4
-rw-r--r--synapse/storage/prepare_database.py2
49 files changed, 1564 insertions, 909 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1a2b7ebe25..0d7c7dff27 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -14,11 +14,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import itertools
 import logging
 import random
 import sys
-import threading
 import time
 from typing import Iterable, Tuple
 
@@ -35,8 +33,6 @@ from synapse.logging.context import LoggingContext, make_deferred_yieldable
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id
-from synapse.util import batch_iter
-from synapse.util.caches.descriptors import Cache
 from synapse.util.stringutils import exception_to_unicode
 
 # import a function which will return a monotonic time, in seconds
@@ -79,10 +75,6 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
     "event_search": "event_search_event_id_idx",
 }
 
-# This is a special cache name we use to batch multiple invalidations of caches
-# based on the current state when notifying workers over replication.
-_CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
-
 
 class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
@@ -237,23 +229,11 @@ class SQLBaseStore(object):
         #   to watch it
         self._txn_perf_counters = PerformanceCounters()
 
-        self._get_event_cache = Cache(
-            "*getEvent*", keylen=3, max_entries=hs.config.event_cache_size
-        )
-
-        self._event_fetch_lock = threading.Condition()
-        self._event_fetch_list = []
-        self._event_fetch_ongoing = 0
-
-        self._pending_ds = []
-
         self.database_engine = hs.database_engine
 
         # A set of tables that are not safe to use native upserts in.
         self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys())
 
-        self._account_validity = self.hs.config.account_validity
-
         # We add the user_directory_search table to the blacklist on SQLite
         # because the existing search table does not have an index, making it
         # unsafe to use native upserts.
@@ -272,14 +252,6 @@ class SQLBaseStore(object):
 
         self.rand = random.SystemRandom()
 
-        if self._account_validity.enabled:
-            self._clock.call_later(
-                0.0,
-                run_as_background_process,
-                "account_validity_set_expiration_dates",
-                self._set_expiration_date_when_missing,
-            )
-
     @defer.inlineCallbacks
     def _check_safe_to_upsert(self):
         """
@@ -290,7 +262,7 @@ class SQLBaseStore(object):
 
         If the background updates have not completed, wait 15 sec and check again.
         """
-        updates = yield self._simple_select_list(
+        updates = yield self.simple_select_list(
             "background_updates",
             keyvalues=None,
             retcols=["update_name"],
@@ -312,65 +284,6 @@ class SQLBaseStore(object):
                 self._check_safe_to_upsert,
             )
 
-    @defer.inlineCallbacks
-    def _set_expiration_date_when_missing(self):
-        """
-        Retrieves the list of registered users that don't have an expiration date, and
-        adds an expiration date for each of them.
-        """
-
-        def select_users_with_no_expiration_date_txn(txn):
-            """Retrieves the list of registered users with no expiration date from the
-            database, filtering out deactivated users.
-            """
-            sql = (
-                "SELECT users.name FROM users"
-                " LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
-                " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
-            )
-            txn.execute(sql, [])
-
-            res = self.cursor_to_dict(txn)
-            if res:
-                for user in res:
-                    self.set_expiration_date_for_user_txn(
-                        txn, user["name"], use_delta=True
-                    )
-
-        yield self.runInteraction(
-            "get_users_with_no_expiration_date",
-            select_users_with_no_expiration_date_txn,
-        )
-
-    def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False):
-        """Sets an expiration date to the account with the given user ID.
-
-        Args:
-             user_id (str): User ID to set an expiration date for.
-             use_delta (bool): If set to False, the expiration date for the user will be
-                now + validity period. If set to True, this expiration date will be a
-                random value in the [now + period - d ; now + period] range, d being a
-                delta equal to 10% of the validity period.
-        """
-        now_ms = self._clock.time_msec()
-        expiration_ts = now_ms + self._account_validity.period
-
-        if use_delta:
-            expiration_ts = self.rand.randrange(
-                expiration_ts - self._account_validity.startup_job_max_delta,
-                expiration_ts,
-            )
-
-        self._simple_insert_txn(
-            txn,
-            "account_validity",
-            values={
-                "user_id": user_id,
-                "expiration_ts_ms": expiration_ts,
-                "email_sent": False,
-            },
-        )
-
     def start_profiling(self):
         self._previous_loop_ts = monotonic_time()
 
@@ -394,7 +307,7 @@ class SQLBaseStore(object):
 
         self._clock.looping_call(loop, 10000)
 
-    def _new_transaction(
+    def new_transaction(
         self, conn, desc, after_callbacks, exception_callbacks, func, *args, **kwargs
     ):
         start = monotonic_time()
@@ -412,16 +325,15 @@ class SQLBaseStore(object):
             i = 0
             N = 5
             while True:
+                cursor = LoggingTransaction(
+                    conn.cursor(),
+                    name,
+                    self.database_engine,
+                    after_callbacks,
+                    exception_callbacks,
+                )
                 try:
-                    txn = conn.cursor()
-                    txn = LoggingTransaction(
-                        txn,
-                        name,
-                        self.database_engine,
-                        after_callbacks,
-                        exception_callbacks,
-                    )
-                    r = func(txn, *args, **kwargs)
+                    r = func(cursor, *args, **kwargs)
                     conn.commit()
                     return r
                 except self.database_engine.module.OperationalError as e:
@@ -459,6 +371,40 @@ class SQLBaseStore(object):
                                 )
                             continue
                     raise
+                finally:
+                    # we're either about to retry with a new cursor, or we're about to
+                    # release the connection. Once we release the connection, it could
+                    # get used for another query, which might do a conn.rollback().
+                    #
+                    # In the latter case, even though that probably wouldn't affect the
+                    # results of this transaction, python's sqlite will reset all
+                    # statements on the connection [1], which will make our cursor
+                    # invalid [2].
+                    #
+                    # In any case, continuing to read rows after commit()ing seems
+                    # dubious from the PoV of ACID transactional semantics
+                    # (sqlite explicitly says that once you commit, you may see rows
+                    # from subsequent updates.)
+                    #
+                    # In psycopg2, cursors are essentially a client-side fabrication -
+                    # all the data is transferred to the client side when the statement
+                    # finishes executing - so in theory we could go on streaming results
+                    # from the cursor, but attempting to do so would make us
+                    # incompatible with sqlite, so let's make sure we're not doing that
+                    # by closing the cursor.
+                    #
+                    # (*named* cursors in psycopg2 are different and are proper server-
+                    # side things, but (a) we don't use them and (b) they are implicitly
+                    # closed by ending the transaction anyway.)
+                    #
+                    # In short, if we haven't finished with the cursor yet, that's a
+                    # problem waiting to bite us.
+                    #
+                    # TL;DR: we're done with the cursor, so we can close it.
+                    #
+                    # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
+                    # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
+                    cursor.close()
         except Exception as e:
             logger.debug("[TXN FAIL] {%s} %s", name, e)
             raise
@@ -498,7 +444,7 @@ class SQLBaseStore(object):
 
         try:
             result = yield self.runWithConnection(
-                self._new_transaction,
+                self.new_transaction,
                 desc,
                 after_callbacks,
                 exception_callbacks,
@@ -570,7 +516,7 @@ class SQLBaseStore(object):
         results = list(dict(zip(col_headers, row)) for row in cursor)
         return results
 
-    def _execute(self, desc, decoder, query, *args):
+    def execute(self, desc, decoder, query, *args):
         """Runs a single query for a result set.
 
         Args:
@@ -595,7 +541,7 @@ class SQLBaseStore(object):
     # no complex WHERE clauses, just a dict of values for columns.
 
     @defer.inlineCallbacks
-    def _simple_insert(self, table, values, or_ignore=False, desc="_simple_insert"):
+    def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"):
         """Executes an INSERT query on the named table.
 
         Args:
@@ -611,7 +557,7 @@ class SQLBaseStore(object):
             `or_ignore` is True
         """
         try:
-            yield self.runInteraction(desc, self._simple_insert_txn, table, values)
+            yield self.runInteraction(desc, self.simple_insert_txn, table, values)
         except self.database_engine.module.IntegrityError:
             # We have to do or_ignore flag at this layer, since we can't reuse
             # a cursor after we receive an error from the db.
@@ -621,7 +567,7 @@ class SQLBaseStore(object):
         return True
 
     @staticmethod
-    def _simple_insert_txn(txn, table, values):
+    def simple_insert_txn(txn, table, values):
         keys, vals = zip(*values.items())
 
         sql = "INSERT INTO %s (%s) VALUES(%s)" % (
@@ -632,11 +578,11 @@ class SQLBaseStore(object):
 
         txn.execute(sql, vals)
 
-    def _simple_insert_many(self, table, values, desc):
-        return self.runInteraction(desc, self._simple_insert_many_txn, table, values)
+    def simple_insert_many(self, table, values, desc):
+        return self.runInteraction(desc, self.simple_insert_many_txn, table, values)
 
     @staticmethod
-    def _simple_insert_many_txn(txn, table, values):
+    def simple_insert_many_txn(txn, table, values):
         if not values:
             return
 
@@ -665,13 +611,13 @@ class SQLBaseStore(object):
         txn.executemany(sql, vals)
 
     @defer.inlineCallbacks
-    def _simple_upsert(
+    def simple_upsert(
         self,
         table,
         keyvalues,
         values,
         insertion_values={},
-        desc="_simple_upsert",
+        desc="simple_upsert",
         lock=True,
     ):
         """
@@ -703,7 +649,7 @@ class SQLBaseStore(object):
             try:
                 result = yield self.runInteraction(
                     desc,
-                    self._simple_upsert_txn,
+                    self.simple_upsert_txn,
                     table,
                     keyvalues,
                     values,
@@ -723,7 +669,7 @@ class SQLBaseStore(object):
                     "IntegrityError when upserting into %s; retrying: %s", table, e
                 )
 
-    def _simple_upsert_txn(
+    def simple_upsert_txn(
         self, txn, table, keyvalues, values, insertion_values={}, lock=True
     ):
         """
@@ -747,11 +693,11 @@ class SQLBaseStore(object):
             self.database_engine.can_native_upsert
             and table not in self._unsafe_to_upsert_tables
         ):
-            return self._simple_upsert_txn_native_upsert(
+            return self.simple_upsert_txn_native_upsert(
                 txn, table, keyvalues, values, insertion_values=insertion_values
             )
         else:
-            return self._simple_upsert_txn_emulated(
+            return self.simple_upsert_txn_emulated(
                 txn,
                 table,
                 keyvalues,
@@ -760,7 +706,7 @@ class SQLBaseStore(object):
                 lock=lock,
             )
 
-    def _simple_upsert_txn_emulated(
+    def simple_upsert_txn_emulated(
         self, txn, table, keyvalues, values, insertion_values={}, lock=True
     ):
         """
@@ -829,7 +775,7 @@ class SQLBaseStore(object):
         # successfully inserted
         return True
 
-    def _simple_upsert_txn_native_upsert(
+    def simple_upsert_txn_native_upsert(
         self, txn, table, keyvalues, values, insertion_values={}
     ):
         """
@@ -854,7 +800,7 @@ class SQLBaseStore(object):
             allvalues.update(values)
             latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
 
-        sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % (
+        sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
             table,
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues),
@@ -863,7 +809,7 @@ class SQLBaseStore(object):
         )
         txn.execute(sql, list(allvalues.values()))
 
-    def _simple_upsert_many_txn(
+    def simple_upsert_many_txn(
         self, txn, table, key_names, key_values, value_names, value_values
     ):
         """
@@ -883,15 +829,15 @@ class SQLBaseStore(object):
             self.database_engine.can_native_upsert
             and table not in self._unsafe_to_upsert_tables
         ):
-            return self._simple_upsert_many_txn_native_upsert(
+            return self.simple_upsert_many_txn_native_upsert(
                 txn, table, key_names, key_values, value_names, value_values
             )
         else:
-            return self._simple_upsert_many_txn_emulated(
+            return self.simple_upsert_many_txn_emulated(
                 txn, table, key_names, key_values, value_names, value_values
             )
 
-    def _simple_upsert_many_txn_emulated(
+    def simple_upsert_many_txn_emulated(
         self, txn, table, key_names, key_values, value_names, value_values
     ):
         """
@@ -916,9 +862,9 @@ class SQLBaseStore(object):
             _keys = {x: y for x, y in zip(key_names, keyv)}
             _vals = {x: y for x, y in zip(value_names, valv)}
 
-            self._simple_upsert_txn_emulated(txn, table, _keys, _vals)
+            self.simple_upsert_txn_emulated(txn, table, _keys, _vals)
 
-    def _simple_upsert_many_txn_native_upsert(
+    def simple_upsert_many_txn_native_upsert(
         self, txn, table, key_names, key_values, value_names, value_values
     ):
         """
@@ -963,8 +909,8 @@ class SQLBaseStore(object):
 
         return txn.execute_batch(sql, args)
 
-    def _simple_select_one(
-        self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"
+    def simple_select_one(
+        self, table, keyvalues, retcols, allow_none=False, desc="simple_select_one"
     ):
         """Executes a SELECT query on the named table, which is expected to
         return a single row, returning multiple columns from it.
@@ -978,16 +924,16 @@ class SQLBaseStore(object):
               statement returns no rows
         """
         return self.runInteraction(
-            desc, self._simple_select_one_txn, table, keyvalues, retcols, allow_none
+            desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none
         )
 
-    def _simple_select_one_onecol(
+    def simple_select_one_onecol(
         self,
         table,
         keyvalues,
         retcol,
         allow_none=False,
-        desc="_simple_select_one_onecol",
+        desc="simple_select_one_onecol",
     ):
         """Executes a SELECT query on the named table, which is expected to
         return a single row, returning a single column from it.
@@ -999,7 +945,7 @@ class SQLBaseStore(object):
         """
         return self.runInteraction(
             desc,
-            self._simple_select_one_onecol_txn,
+            self.simple_select_one_onecol_txn,
             table,
             keyvalues,
             retcol,
@@ -1007,10 +953,10 @@ class SQLBaseStore(object):
         )
 
     @classmethod
-    def _simple_select_one_onecol_txn(
+    def simple_select_one_onecol_txn(
         cls, txn, table, keyvalues, retcol, allow_none=False
     ):
-        ret = cls._simple_select_onecol_txn(
+        ret = cls.simple_select_onecol_txn(
             txn, table=table, keyvalues=keyvalues, retcol=retcol
         )
 
@@ -1023,7 +969,7 @@ class SQLBaseStore(object):
                 raise StoreError(404, "No row found")
 
     @staticmethod
-    def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
+    def simple_select_onecol_txn(txn, table, keyvalues, retcol):
         sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table}
 
         if keyvalues:
@@ -1034,8 +980,8 @@ class SQLBaseStore(object):
 
         return [r[0] for r in txn]
 
-    def _simple_select_onecol(
-        self, table, keyvalues, retcol, desc="_simple_select_onecol"
+    def simple_select_onecol(
+        self, table, keyvalues, retcol, desc="simple_select_onecol"
     ):
         """Executes a SELECT query on the named table, which returns a list
         comprising of the values of the named column from the selected rows.
@@ -1049,12 +995,10 @@ class SQLBaseStore(object):
             Deferred: Results in a list
         """
         return self.runInteraction(
-            desc, self._simple_select_onecol_txn, table, keyvalues, retcol
+            desc, self.simple_select_onecol_txn, table, keyvalues, retcol
         )
 
-    def _simple_select_list(
-        self, table, keyvalues, retcols, desc="_simple_select_list"
-    ):
+    def simple_select_list(self, table, keyvalues, retcols, desc="simple_select_list"):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
@@ -1068,11 +1012,11 @@ class SQLBaseStore(object):
             defer.Deferred: resolves to list[dict[str, Any]]
         """
         return self.runInteraction(
-            desc, self._simple_select_list_txn, table, keyvalues, retcols
+            desc, self.simple_select_list_txn, table, keyvalues, retcols
         )
 
     @classmethod
-    def _simple_select_list_txn(cls, txn, table, keyvalues, retcols):
+    def simple_select_list_txn(cls, txn, table, keyvalues, retcols):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
@@ -1098,14 +1042,14 @@ class SQLBaseStore(object):
         return cls.cursor_to_dict(txn)
 
     @defer.inlineCallbacks
-    def _simple_select_many_batch(
+    def simple_select_many_batch(
         self,
         table,
         column,
         iterable,
         retcols,
         keyvalues={},
-        desc="_simple_select_many_batch",
+        desc="simple_select_many_batch",
         batch_size=100,
     ):
         """Executes a SELECT query on the named table, which may return zero or
@@ -1134,7 +1078,7 @@ class SQLBaseStore(object):
         for chunk in chunks:
             rows = yield self.runInteraction(
                 desc,
-                self._simple_select_many_txn,
+                self.simple_select_many_txn,
                 table,
                 column,
                 chunk,
@@ -1147,7 +1091,7 @@ class SQLBaseStore(object):
         return results
 
     @classmethod
-    def _simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols):
+    def simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
@@ -1180,13 +1124,13 @@ class SQLBaseStore(object):
         txn.execute(sql, values)
         return cls.cursor_to_dict(txn)
 
-    def _simple_update(self, table, keyvalues, updatevalues, desc):
+    def simple_update(self, table, keyvalues, updatevalues, desc):
         return self.runInteraction(
-            desc, self._simple_update_txn, table, keyvalues, updatevalues
+            desc, self.simple_update_txn, table, keyvalues, updatevalues
         )
 
     @staticmethod
-    def _simple_update_txn(txn, table, keyvalues, updatevalues):
+    def simple_update_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
             where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
         else:
@@ -1202,8 +1146,8 @@ class SQLBaseStore(object):
 
         return txn.rowcount
 
-    def _simple_update_one(
-        self, table, keyvalues, updatevalues, desc="_simple_update_one"
+    def simple_update_one(
+        self, table, keyvalues, updatevalues, desc="simple_update_one"
     ):
         """Executes an UPDATE query on the named table, setting new values for
         columns in a row matching the key values.
@@ -1223,12 +1167,12 @@ class SQLBaseStore(object):
         the update column in the 'keyvalues' dict as well.
         """
         return self.runInteraction(
-            desc, self._simple_update_one_txn, table, keyvalues, updatevalues
+            desc, self.simple_update_one_txn, table, keyvalues, updatevalues
         )
 
     @classmethod
-    def _simple_update_one_txn(cls, txn, table, keyvalues, updatevalues):
-        rowcount = cls._simple_update_txn(txn, table, keyvalues, updatevalues)
+    def simple_update_one_txn(cls, txn, table, keyvalues, updatevalues):
+        rowcount = cls.simple_update_txn(txn, table, keyvalues, updatevalues)
 
         if rowcount == 0:
             raise StoreError(404, "No row found (%s)" % (table,))
@@ -1236,7 +1180,7 @@ class SQLBaseStore(object):
             raise StoreError(500, "More than one row matched (%s)" % (table,))
 
     @staticmethod
-    def _simple_select_one_txn(txn, table, keyvalues, retcols, allow_none=False):
+    def simple_select_one_txn(txn, table, keyvalues, retcols, allow_none=False):
         select_sql = "SELECT %s FROM %s WHERE %s" % (
             ", ".join(retcols),
             table,
@@ -1255,7 +1199,7 @@ class SQLBaseStore(object):
 
         return dict(zip(retcols, row))
 
-    def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"):
+    def simple_delete_one(self, table, keyvalues, desc="simple_delete_one"):
         """Executes a DELETE query on the named table, expecting to delete a
         single row.
 
@@ -1263,10 +1207,10 @@ class SQLBaseStore(object):
             table : string giving the table name
             keyvalues : dict of column names and values to select the row with
         """
-        return self.runInteraction(desc, self._simple_delete_one_txn, table, keyvalues)
+        return self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues)
 
     @staticmethod
-    def _simple_delete_one_txn(txn, table, keyvalues):
+    def simple_delete_one_txn(txn, table, keyvalues):
         """Executes a DELETE query on the named table, expecting to delete a
         single row.
 
@@ -1285,11 +1229,11 @@ class SQLBaseStore(object):
         if txn.rowcount > 1:
             raise StoreError(500, "More than one row matched (%s)" % (table,))
 
-    def _simple_delete(self, table, keyvalues, desc):
-        return self.runInteraction(desc, self._simple_delete_txn, table, keyvalues)
+    def simple_delete(self, table, keyvalues, desc):
+        return self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
 
     @staticmethod
-    def _simple_delete_txn(txn, table, keyvalues):
+    def simple_delete_txn(txn, table, keyvalues):
         sql = "DELETE FROM %s WHERE %s" % (
             table,
             " AND ".join("%s = ?" % (k,) for k in keyvalues),
@@ -1298,13 +1242,13 @@ class SQLBaseStore(object):
         txn.execute(sql, list(keyvalues.values()))
         return txn.rowcount
 
-    def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
+    def simple_delete_many(self, table, column, iterable, keyvalues, desc):
         return self.runInteraction(
-            desc, self._simple_delete_many_txn, table, column, iterable, keyvalues
+            desc, self.simple_delete_many_txn, table, column, iterable, keyvalues
         )
 
     @staticmethod
-    def _simple_delete_many_txn(txn, table, column, iterable, keyvalues):
+    def simple_delete_many_txn(txn, table, column, iterable, keyvalues):
         """Executes a DELETE query on the named table.
 
         Filters rows by if value of `column` is in `iterable`.
@@ -1337,7 +1281,7 @@ class SQLBaseStore(object):
 
         return txn.rowcount
 
-    def _get_cache_dict(
+    def get_cache_dict(
         self, db_conn, table, entity_column, stream_column, max_value, limit=100000
     ):
         # Fetch a mapping of room_id -> max stream position for "recent" rooms.
@@ -1370,47 +1314,6 @@ class SQLBaseStore(object):
 
         return cache, min_val
 
-    def _invalidate_cache_and_stream(self, txn, cache_func, keys):
-        """Invalidates the cache and adds it to the cache stream so slaves
-        will know to invalidate their caches.
-
-        This should only be used to invalidate caches where slaves won't
-        otherwise know from other replication streams that the cache should
-        be invalidated.
-        """
-        txn.call_after(cache_func.invalidate, keys)
-        self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
-
-    def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
-        """Special case invalidation of caches based on current state.
-
-        We special case this so that we can batch the cache invalidations into a
-        single replication poke.
-
-        Args:
-            txn
-            room_id (str): Room where state changed
-            members_changed (iterable[str]): The user_ids of members that have changed
-        """
-        txn.call_after(self._invalidate_state_caches, room_id, members_changed)
-
-        if members_changed:
-            # We need to be careful that the size of the `members_changed` list
-            # isn't so large that it causes problems sending over replication, so we
-            # send them in chunks.
-            # Max line length is 16K, and max user ID length is 255, so 50 should
-            # be safe.
-            for chunk in batch_iter(members_changed, 50):
-                keys = itertools.chain([room_id], chunk)
-                self._send_invalidation_to_replication(
-                    txn, _CURRENT_STATE_CACHE_NAME, keys
-                )
-        else:
-            # if no members changed, we still need to invalidate the other caches.
-            self._send_invalidation_to_replication(
-                txn, _CURRENT_STATE_CACHE_NAME, [room_id]
-            )
-
     def _invalidate_state_caches(self, room_id, members_changed):
         """Invalidates caches that are based on the current state, but does
         not stream invalidations down replication.
@@ -1444,73 +1347,17 @@ class SQLBaseStore(object):
             # which is fine.
             pass
 
-    def _send_invalidation_to_replication(self, txn, cache_name, keys):
-        """Notifies replication that given cache has been invalidated.
-
-        Note that this does *not* invalidate the cache locally.
-
-        Args:
-            txn
-            cache_name (str)
-            keys (iterable[str])
-        """
-
-        if isinstance(self.database_engine, PostgresEngine):
-            # get_next() returns a context manager which is designed to wrap
-            # the transaction. However, we want to only get an ID when we want
-            # to use it, here, so we need to call __enter__ manually, and have
-            # __exit__ called after the transaction finishes.
-            ctx = self._cache_id_gen.get_next()
-            stream_id = ctx.__enter__()
-            txn.call_on_exception(ctx.__exit__, None, None, None)
-            txn.call_after(ctx.__exit__, None, None, None)
-            txn.call_after(self.hs.get_notifier().on_new_replication_data)
-
-            self._simple_insert_txn(
-                txn,
-                table="cache_invalidation_stream",
-                values={
-                    "stream_id": stream_id,
-                    "cache_func": cache_name,
-                    "keys": list(keys),
-                    "invalidation_ts": self.clock.time_msec(),
-                },
-            )
-
-    def get_all_updated_caches(self, last_id, current_id, limit):
-        if last_id == current_id:
-            return defer.succeed([])
-
-        def get_all_updated_caches_txn(txn):
-            # We purposefully don't bound by the current token, as we want to
-            # send across cache invalidations as quickly as possible. Cache
-            # invalidations are idempotent, so duplicates are fine.
-            sql = (
-                "SELECT stream_id, cache_func, keys, invalidation_ts"
-                " FROM cache_invalidation_stream"
-                " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
-            )
-            txn.execute(sql, (last_id, limit))
-            return txn.fetchall()
-
-        return self.runInteraction("get_all_updated_caches", get_all_updated_caches_txn)
-
-    def get_cache_stream_token(self):
-        if self._cache_id_gen:
-            return self._cache_id_gen.get_current_token()
-        else:
-            return 0
-
-    def _simple_select_list_paginate(
+    def simple_select_list_paginate(
         self,
         table,
-        keyvalues,
         orderby,
         start,
         limit,
         retcols,
+        filters=None,
+        keyvalues=None,
         order_direction="ASC",
-        desc="_simple_select_list_paginate",
+        desc="simple_select_list_paginate",
     ):
         """
         Executes a SELECT query on the named table with start and limit,
@@ -1519,6 +1366,9 @@ class SQLBaseStore(object):
 
         Args:
             table (str): the table name
+            filters (dict[str, T] | None):
+                column names and values to filter the rows with, or None to not
+                apply a WHERE ? LIKE ? clause.
             keyvalues (dict[str, T] | None):
                 column names and values to select the rows with, or None to not
                 apply a WHERE clause.
@@ -1532,26 +1382,28 @@ class SQLBaseStore(object):
         """
         return self.runInteraction(
             desc,
-            self._simple_select_list_paginate_txn,
+            self.simple_select_list_paginate_txn,
             table,
-            keyvalues,
             orderby,
             start,
             limit,
             retcols,
+            filters=filters,
+            keyvalues=keyvalues,
             order_direction=order_direction,
         )
 
     @classmethod
-    def _simple_select_list_paginate_txn(
+    def simple_select_list_paginate_txn(
         cls,
         txn,
         table,
-        keyvalues,
         orderby,
         start,
         limit,
         retcols,
+        filters=None,
+        keyvalues=None,
         order_direction="ASC",
     ):
         """
@@ -1559,16 +1411,23 @@ class SQLBaseStore(object):
         of row numbers, which may return zero or number of rows from start to limit,
         returning the result as a list of dicts.
 
+        Use `filters` to search attributes using SQL wildcards and/or `keyvalues` to
+        select attributes with exact matches. All constraints are joined together
+        using 'AND'.
+
         Args:
             txn : Transaction object
             table (str): the table name
-            keyvalues (dict[str, T] | None):
-                column names and values to select the rows with, or None to not
-                apply a WHERE clause.
             orderby (str): Column to order the results by.
             start (int): Index to begin the query at.
             limit (int): Number of results to return.
             retcols (iterable[str]): the names of the columns to return
+            filters (dict[str, T] | None):
+                column names and values to filter the rows with, or None to not
+                apply a WHERE ? LIKE ? clause.
+            keyvalues (dict[str, T] | None):
+                column names and values to select the rows with, or None to not
+                apply a WHERE clause.
             order_direction (str): Whether the results should be ordered "ASC" or "DESC".
         Returns:
             defer.Deferred: resolves to list[dict[str, Any]]
@@ -1576,10 +1435,15 @@ class SQLBaseStore(object):
         if order_direction not in ["ASC", "DESC"]:
             raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
 
+        where_clause = "WHERE " if filters or keyvalues else ""
+        arg_list = []
+        if filters:
+            where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
+            arg_list += list(filters.values())
+        where_clause += " AND " if filters and keyvalues else ""
         if keyvalues:
-            where_clause = "WHERE " + " AND ".join("%s = ?" % (k,) for k in keyvalues)
-        else:
-            where_clause = ""
+            where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
+            arg_list += list(keyvalues.values())
 
         sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
             ", ".join(retcols),
@@ -1588,25 +1452,11 @@ class SQLBaseStore(object):
             orderby,
             order_direction,
         )
-        txn.execute(sql, list(keyvalues.values()) + [limit, start])
+        txn.execute(sql, arg_list + [limit, start])
 
         return cls.cursor_to_dict(txn)
 
-    def get_user_count_txn(self, txn):
-        """Get a total number of registered users in the users list.
-
-        Args:
-            txn : Transaction object
-        Returns:
-            int : number of users
-        """
-        sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;"
-        txn.execute(sql_count)
-        return txn.fetchone()[0]
-
-    def _simple_search_list(
-        self, table, term, col, retcols, desc="_simple_search_list"
-    ):
+    def simple_search_list(self, table, term, col, retcols, desc="simple_search_list"):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
@@ -1621,11 +1471,11 @@ class SQLBaseStore(object):
         """
 
         return self.runInteraction(
-            desc, self._simple_search_list_txn, table, term, col, retcols
+            desc, self.simple_search_list_txn, table, term, col, retcols
         )
 
     @classmethod
-    def _simple_search_list_txn(cls, txn, table, term, col, retcols):
+    def simple_search_list_txn(cls, txn, table, term, col, retcols):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
@@ -1648,14 +1498,6 @@ class SQLBaseStore(object):
 
         return cls.cursor_to_dict(txn)
 
-    @property
-    def database_engine_name(self):
-        return self.database_engine.module.__name__
-
-    def get_server_version(self):
-        """Returns a string describing the server version number"""
-        return self.database_engine.server_version
-
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 37d469ffd7..06955a0537 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -139,7 +139,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         # otherwise, check if there are updates to be run. This is important,
         # as we may be running on a worker which doesn't perform the bg updates
         # itself, but still wants to wait for them to happen.
-        updates = yield self._simple_select_onecol(
+        updates = yield self.simple_select_onecol(
             "background_updates",
             keyvalues=None,
             retcol="1",
@@ -161,7 +161,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         if update_name in self._background_update_queue:
             return False
 
-        update_exists = await self._simple_select_one_onecol(
+        update_exists = await self.simple_select_one_onecol(
             "background_updates",
             keyvalues={"update_name": update_name},
             retcol="1",
@@ -184,7 +184,7 @@ class BackgroundUpdateStore(SQLBaseStore):
             no more work to do.
         """
         if not self._background_update_queue:
-            updates = yield self._simple_select_list(
+            updates = yield self.simple_select_list(
                 "background_updates",
                 keyvalues=None,
                 retcols=("update_name", "depends_on"),
@@ -226,7 +226,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         else:
             batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
 
-        progress_json = yield self._simple_select_one_onecol(
+        progress_json = yield self.simple_select_one_onecol(
             "background_updates",
             keyvalues={"update_name": update_name},
             retcol="progress_json",
@@ -413,7 +413,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_queue = []
         progress_json = json.dumps(progress)
 
-        return self._simple_insert(
+        return self.simple_insert(
             "background_updates",
             {"update_name": update_name, "progress_json": progress_json},
         )
@@ -429,7 +429,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_queue = [
             name for name in self._background_update_queue if name != update_name
         ]
-        return self._simple_delete_one(
+        return self.simple_delete_one(
             "background_updates", keyvalues={"update_name": update_name}
         )
 
@@ -444,7 +444,7 @@ class BackgroundUpdateStore(SQLBaseStore):
 
         progress_json = json.dumps(progress)
 
-        self._simple_update_one_txn(
+        self.simple_update_one_txn(
             txn,
             "background_updates",
             keyvalues={"update_name": update_name},
diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py
index 10c940df1e..3720ff3088 100644
--- a/synapse/storage/data_stores/main/__init__.py
+++ b/synapse/storage/data_stores/main/__init__.py
@@ -19,8 +19,6 @@ import calendar
 import logging
 import time
 
-from twisted.internet import defer
-
 from synapse.api.constants import PresenceState
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
@@ -32,6 +30,7 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from .account_data import AccountDataStore
 from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
+from .cache import CacheInvalidationStore
 from .client_ips import ClientIpStore
 from .deviceinbox import DeviceInboxStore
 from .devices import DeviceStore
@@ -110,6 +109,7 @@ class DataStore(
     MonthlyActiveUsersStore,
     StatsStore,
     RelationsStore,
+    CacheInvalidationStore,
 ):
     def __init__(self, db_conn, hs):
         self.hs = hs
@@ -171,7 +171,7 @@ class DataStore(
 
         self._presence_on_startup = self._get_active_presence(db_conn)
 
-        presence_cache_prefill, min_presence_val = self._get_cache_dict(
+        presence_cache_prefill, min_presence_val = self.get_cache_dict(
             db_conn,
             "presence_stream",
             entity_column="user_id",
@@ -185,7 +185,7 @@ class DataStore(
         )
 
         max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
-        device_inbox_prefill, min_device_inbox_id = self._get_cache_dict(
+        device_inbox_prefill, min_device_inbox_id = self.get_cache_dict(
             db_conn,
             "device_inbox",
             entity_column="user_id",
@@ -200,7 +200,7 @@ class DataStore(
         )
         # The federation outbox and the local device inbox uses the same
         # stream_id generator.
-        device_outbox_prefill, min_device_outbox_id = self._get_cache_dict(
+        device_outbox_prefill, min_device_outbox_id = self.get_cache_dict(
             db_conn,
             "device_federation_outbox",
             entity_column="destination",
@@ -226,7 +226,7 @@ class DataStore(
         )
 
         events_max = self._stream_id_gen.get_current_token()
-        curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
+        curr_state_delta_prefill, min_curr_state_delta_id = self.get_cache_dict(
             db_conn,
             "current_state_delta_stream",
             entity_column="room_id",
@@ -240,7 +240,7 @@ class DataStore(
             prefilled_cache=curr_state_delta_prefill,
         )
 
-        _group_updates_prefill, min_group_updates_id = self._get_cache_dict(
+        _group_updates_prefill, min_group_updates_id = self.get_cache_dict(
             db_conn,
             "local_group_updates",
             entity_column="user_id",
@@ -474,45 +474,68 @@ class DataStore(
         )
 
     def get_users(self):
-        """Function to reterive a list of users in users table.
+        """Function to retrieve a list of users in users table.
 
         Args:
         Returns:
             defer.Deferred: resolves to list[dict[str, Any]]
         """
-        return self._simple_select_list(
+        return self.simple_select_list(
             table="users",
             keyvalues={},
-            retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
+            retcols=[
+                "name",
+                "password_hash",
+                "is_guest",
+                "admin",
+                "user_type",
+                "deactivated",
+            ],
             desc="get_users",
         )
 
-    @defer.inlineCallbacks
-    def get_users_paginate(self, order, start, limit):
-        """Function to reterive a paginated list of users from
-        users list. This will return a json object, which contains
-        list of users and the total number of users in users table.
+    def get_users_paginate(
+        self, start, limit, name=None, guests=True, deactivated=False
+    ):
+        """Function to retrieve a paginated list of users from
+        users list. This will return a json list of users.
 
         Args:
-            order (str): column name to order the select by this column
             start (int): start number to begin the query from
-            limit (int): number of rows to reterive
+            limit (int): number of rows to retrieve
+            name (string): filter for user names
+            guests (bool): whether to in include guest users
+            deactivated (bool): whether to include deactivated users
         Returns:
-            defer.Deferred: resolves to json object {list[dict[str, Any]], count}
+            defer.Deferred: resolves to list[dict[str, Any]]
         """
-        users = yield self.runInteraction(
-            "get_users_paginate",
-            self._simple_select_list_paginate_txn,
+        name_filter = {}
+        if name:
+            name_filter["name"] = "%" + name + "%"
+
+        attr_filter = {}
+        if not guests:
+            attr_filter["is_guest"] = False
+        if not deactivated:
+            attr_filter["deactivated"] = False
+
+        return self.simple_select_list_paginate(
+            desc="get_users_paginate",
             table="users",
-            keyvalues={"is_guest": False},
-            orderby=order,
+            orderby="name",
             start=start,
             limit=limit,
-            retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
+            filters=name_filter,
+            keyvalues=attr_filter,
+            retcols=[
+                "name",
+                "password_hash",
+                "is_guest",
+                "admin",
+                "user_type",
+                "deactivated",
+            ],
         )
-        count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn)
-        retval = {"users": users, "total": count}
-        return retval
 
     def search_users(self, term):
         """Function to search users list for one or more users with
@@ -524,7 +547,7 @@ class DataStore(
         Returns:
             defer.Deferred: resolves to list[dict[str, Any]]
         """
-        return self._simple_search_list(
+        return self.simple_search_list(
             table="users",
             term=term,
             col="name",
diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py
index 6afbfc0d74..b0d22faf3f 100644
--- a/synapse/storage/data_stores/main/account_data.py
+++ b/synapse/storage/data_stores/main/account_data.py
@@ -67,7 +67,7 @@ class AccountDataWorkerStore(SQLBaseStore):
         """
 
         def get_account_data_for_user_txn(txn):
-            rows = self._simple_select_list_txn(
+            rows = self.simple_select_list_txn(
                 txn,
                 "account_data",
                 {"user_id": user_id},
@@ -78,7 +78,7 @@ class AccountDataWorkerStore(SQLBaseStore):
                 row["account_data_type"]: json.loads(row["content"]) for row in rows
             }
 
-            rows = self._simple_select_list_txn(
+            rows = self.simple_select_list_txn(
                 txn,
                 "room_account_data",
                 {"user_id": user_id},
@@ -102,7 +102,7 @@ class AccountDataWorkerStore(SQLBaseStore):
         Returns:
             Deferred: A dict
         """
-        result = yield self._simple_select_one_onecol(
+        result = yield self.simple_select_one_onecol(
             table="account_data",
             keyvalues={"user_id": user_id, "account_data_type": data_type},
             retcol="content",
@@ -127,7 +127,7 @@ class AccountDataWorkerStore(SQLBaseStore):
         """
 
         def get_account_data_for_room_txn(txn):
-            rows = self._simple_select_list_txn(
+            rows = self.simple_select_list_txn(
                 txn,
                 "room_account_data",
                 {"user_id": user_id, "room_id": room_id},
@@ -156,7 +156,7 @@ class AccountDataWorkerStore(SQLBaseStore):
         """
 
         def get_account_data_for_room_and_type_txn(txn):
-            content_json = self._simple_select_one_onecol_txn(
+            content_json = self.simple_select_one_onecol_txn(
                 txn,
                 table="room_account_data",
                 keyvalues={
@@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore):
             current_id(int): The position to fetch up to.
         Returns:
             A deferred pair of lists of tuples of stream_id int, user_id string,
-            room_id string, type string, and content string.
+            room_id string, and type string.
         """
         if last_room_id == current_id and last_global_id == current_id:
             return defer.succeed(([], []))
 
         def get_updated_account_data_txn(txn):
             sql = (
-                "SELECT stream_id, user_id, account_data_type, content"
+                "SELECT stream_id, user_id, account_data_type"
                 " FROM account_data WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC LIMIT ?"
             )
@@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             global_results = txn.fetchall()
 
             sql = (
-                "SELECT stream_id, user_id, room_id, account_data_type, content"
+                "SELECT stream_id, user_id, room_id, account_data_type"
                 " FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC LIMIT ?"
             )
@@ -300,9 +300,9 @@ class AccountDataStore(AccountDataWorkerStore):
 
         with self._account_data_id_gen.get_next() as next_id:
             # no need to lock here as room_account_data has a unique constraint
-            # on (user_id, room_id, account_data_type) so _simple_upsert will
+            # on (user_id, room_id, account_data_type) so simple_upsert will
             # retry if there is a conflict.
-            yield self._simple_upsert(
+            yield self.simple_upsert(
                 desc="add_room_account_data",
                 table="room_account_data",
                 keyvalues={
@@ -346,9 +346,9 @@ class AccountDataStore(AccountDataWorkerStore):
 
         with self._account_data_id_gen.get_next() as next_id:
             # no need to lock here as account_data has a unique constraint on
-            # (user_id, account_data_type) so _simple_upsert will retry if
+            # (user_id, account_data_type) so simple_upsert will retry if
             # there is a conflict.
-            yield self._simple_upsert(
+            yield self.simple_upsert(
                 desc="add_user_account_data",
                 table="account_data",
                 keyvalues={"user_id": user_id, "account_data_type": account_data_type},
diff --git a/synapse/storage/data_stores/main/appservice.py b/synapse/storage/data_stores/main/appservice.py
index 81babf2029..6b82fd392a 100644
--- a/synapse/storage/data_stores/main/appservice.py
+++ b/synapse/storage/data_stores/main/appservice.py
@@ -133,7 +133,7 @@ class ApplicationServiceTransactionWorkerStore(
             A Deferred which resolves to a list of ApplicationServices, which
             may be empty.
         """
-        results = yield self._simple_select_list(
+        results = yield self.simple_select_list(
             "application_services_state", dict(state=state), ["as_id"]
         )
         # NB: This assumes this class is linked with ApplicationServiceStore
@@ -155,7 +155,7 @@ class ApplicationServiceTransactionWorkerStore(
         Returns:
             A Deferred which resolves to ApplicationServiceState.
         """
-        result = yield self._simple_select_one(
+        result = yield self.simple_select_one(
             "application_services_state",
             dict(as_id=service.id),
             ["state"],
@@ -175,7 +175,7 @@ class ApplicationServiceTransactionWorkerStore(
         Returns:
             A Deferred which resolves when the state was set successfully.
         """
-        return self._simple_upsert(
+        return self.simple_upsert(
             "application_services_state", dict(as_id=service.id), dict(state=state)
         )
 
@@ -249,7 +249,7 @@ class ApplicationServiceTransactionWorkerStore(
                 )
 
             # Set current txn_id for AS to 'txn_id'
-            self._simple_upsert_txn(
+            self.simple_upsert_txn(
                 txn,
                 "application_services_state",
                 dict(as_id=service.id),
@@ -257,7 +257,7 @@ class ApplicationServiceTransactionWorkerStore(
             )
 
             # Delete txn
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id)
             )
 
diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py
new file mode 100644
index 0000000000..de3256049d
--- /dev/null
+++ b/synapse/storage/data_stores/main/cache.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import itertools
+import logging
+
+from twisted.internet import defer
+
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
+from synapse.util import batch_iter
+
+logger = logging.getLogger(__name__)
+
+
+# This is a special cache name we use to batch multiple invalidations of caches
+# based on the current state when notifying workers over replication.
+CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
+
+
+class CacheInvalidationStore(SQLBaseStore):
+    def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+        """Invalidates the cache and adds it to the cache stream so slaves
+        will know to invalidate their caches.
+
+        This should only be used to invalidate caches where slaves won't
+        otherwise know from other replication streams that the cache should
+        be invalidated.
+        """
+        txn.call_after(cache_func.invalidate, keys)
+        self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
+
+    def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
+        """Special case invalidation of caches based on current state.
+
+        We special case this so that we can batch the cache invalidations into a
+        single replication poke.
+
+        Args:
+            txn
+            room_id (str): Room where state changed
+            members_changed (iterable[str]): The user_ids of members that have changed
+        """
+        txn.call_after(self._invalidate_state_caches, room_id, members_changed)
+
+        if members_changed:
+            # We need to be careful that the size of the `members_changed` list
+            # isn't so large that it causes problems sending over replication, so we
+            # send them in chunks.
+            # Max line length is 16K, and max user ID length is 255, so 50 should
+            # be safe.
+            for chunk in batch_iter(members_changed, 50):
+                keys = itertools.chain([room_id], chunk)
+                self._send_invalidation_to_replication(
+                    txn, CURRENT_STATE_CACHE_NAME, keys
+                )
+        else:
+            # if no members changed, we still need to invalidate the other caches.
+            self._send_invalidation_to_replication(
+                txn, CURRENT_STATE_CACHE_NAME, [room_id]
+            )
+
+    def _send_invalidation_to_replication(self, txn, cache_name, keys):
+        """Notifies replication that given cache has been invalidated.
+
+        Note that this does *not* invalidate the cache locally.
+
+        Args:
+            txn
+            cache_name (str)
+            keys (iterable[str])
+        """
+
+        if isinstance(self.database_engine, PostgresEngine):
+            # get_next() returns a context manager which is designed to wrap
+            # the transaction. However, we want to only get an ID when we want
+            # to use it, here, so we need to call __enter__ manually, and have
+            # __exit__ called after the transaction finishes.
+            ctx = self._cache_id_gen.get_next()
+            stream_id = ctx.__enter__()
+            txn.call_on_exception(ctx.__exit__, None, None, None)
+            txn.call_after(ctx.__exit__, None, None, None)
+            txn.call_after(self.hs.get_notifier().on_new_replication_data)
+
+            self.simple_insert_txn(
+                txn,
+                table="cache_invalidation_stream",
+                values={
+                    "stream_id": stream_id,
+                    "cache_func": cache_name,
+                    "keys": list(keys),
+                    "invalidation_ts": self.clock.time_msec(),
+                },
+            )
+
+    def get_all_updated_caches(self, last_id, current_id, limit):
+        if last_id == current_id:
+            return defer.succeed([])
+
+        def get_all_updated_caches_txn(txn):
+            # We purposefully don't bound by the current token, as we want to
+            # send across cache invalidations as quickly as possible. Cache
+            # invalidations are idempotent, so duplicates are fine.
+            sql = (
+                "SELECT stream_id, cache_func, keys, invalidation_ts"
+                " FROM cache_invalidation_stream"
+                " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, limit))
+            return txn.fetchall()
+
+        return self.runInteraction("get_all_updated_caches", get_all_updated_caches_txn)
+
+    def get_cache_stream_token(self):
+        if self._cache_id_gen:
+            return self._cache_id_gen.get_current_token()
+        else:
+            return 0
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 706c6a1f3f..66522a04b7 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -21,8 +21,8 @@ from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage import background_updates
-from synapse.storage._base import Cache
 from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches.descriptors import Cache
 
 logger = logging.getLogger(__name__)
 
@@ -431,7 +431,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
 
             try:
-                self._simple_upsert_txn(
+                self.simple_upsert_txn(
                     txn,
                     table="user_ips",
                     keyvalues={
@@ -450,7 +450,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
                 # Technically an access token might not be associated with
                 # a device so we need to check.
                 if device_id:
-                    self._simple_upsert_txn(
+                    self.simple_upsert_txn(
                         txn,
                         table="devices",
                         keyvalues={"user_id": user_id, "device_id": device_id},
@@ -483,7 +483,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
         if device_id is not None:
             keyvalues["device_id"] = device_id
 
-        res = yield self._simple_select_list(
+        res = yield self.simple_select_list(
             table="devices",
             keyvalues=keyvalues,
             retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
@@ -516,7 +516,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
                 user_agent, _, last_seen = self._batch_row_update[key]
                 results[(access_token, ip)] = (user_agent, last_seen)
 
-        rows = yield self._simple_select_list(
+        rows = yield self.simple_select_list(
             table="user_ips",
             keyvalues={"user_id": user_id},
             retcols=["access_token", "ip", "user_agent", "last_seen"],
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index f04aad0743..206d39134d 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -314,7 +314,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             # Check if we've already inserted a matching message_id for that
             # origin. This can happen if the origin doesn't receive our
             # acknowledgement from the first time we received the message.
-            already_inserted = self._simple_select_one_txn(
+            already_inserted = self.simple_select_one_txn(
                 txn,
                 table="device_federation_inbox",
                 keyvalues={"origin": origin, "message_id": message_id},
@@ -326,7 +326,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
 
             # Add an entry for this message_id so that we know we've processed
             # it.
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="device_federation_inbox",
                 values={
@@ -358,8 +358,21 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
     def _add_messages_to_local_device_inbox_txn(
         self, txn, stream_id, messages_by_user_then_device
     ):
-        sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?"
-        txn.execute(sql, (stream_id, stream_id))
+        # Compatible method of performing an upsert
+        sql = "SELECT stream_id FROM device_max_stream_id"
+
+        txn.execute(sql)
+        rows = txn.fetchone()
+        if rows:
+            db_stream_id = rows[0]
+            if db_stream_id < stream_id:
+                # Insert the new stream_id
+                sql = "UPDATE device_max_stream_id SET stream_id = ?"
+        else:
+            # No rows, perform an insert
+            sql = "INSERT INTO device_max_stream_id (stream_id) VALUES (?)"
+
+        txn.execute(sql, (stream_id,))
 
         local_by_user_then_device = {}
         for user_id, messages_by_device in messages_by_user_then_device.items():
@@ -367,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             devices = list(messages_by_device.keys())
             if len(devices) == 1 and devices[0] == "*":
                 # Handle wildcard device_ids.
-                sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
+                sql = "SELECT device_id FROM devices WHERE user_id = ?"
                 txn.execute(sql, (user_id,))
                 message_json = json.dumps(messages_by_device["*"])
                 for row in txn:
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 71f62036c0..727c582121 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -30,16 +30,16 @@ from synapse.logging.opentracing import (
     whitelisted_homeserver,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import (
-    Cache,
-    SQLBaseStore,
-    db_to_json,
-    make_in_list_sql_clause,
-)
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.types import get_verify_key_from_cross_signing_key
 from synapse.util import batch_iter
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.caches.descriptors import (
+    Cache,
+    cached,
+    cachedInlineCallbacks,
+    cachedList,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -61,7 +61,7 @@ class DeviceWorkerStore(SQLBaseStore):
         Raises:
             StoreError: if the device is not found
         """
-        return self._simple_select_one(
+        return self.simple_select_one(
             table="devices",
             keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
             retcols=("user_id", "device_id", "display_name"),
@@ -80,7 +80,7 @@ class DeviceWorkerStore(SQLBaseStore):
             containing "device_id", "user_id" and "display_name" for each
             device.
         """
-        devices = yield self._simple_select_list(
+        devices = yield self.simple_select_list(
             table="devices",
             keyvalues={"user_id": user_id, "hidden": False},
             retcols=("user_id", "device_id", "display_name"),
@@ -414,7 +414,7 @@ class DeviceWorkerStore(SQLBaseStore):
             from_user_id,
             stream_id,
         )
-        self._simple_insert_txn(
+        self.simple_insert_txn(
             txn,
             "user_signature_stream",
             values={
@@ -466,7 +466,7 @@ class DeviceWorkerStore(SQLBaseStore):
 
     @cachedInlineCallbacks(num_args=2, tree=True)
     def _get_cached_user_device(self, user_id, device_id):
-        content = yield self._simple_select_one_onecol(
+        content = yield self.simple_select_one_onecol(
             table="device_lists_remote_cache",
             keyvalues={"user_id": user_id, "device_id": device_id},
             retcol="content",
@@ -476,7 +476,7 @@ class DeviceWorkerStore(SQLBaseStore):
 
     @cachedInlineCallbacks()
     def _get_cached_devices_for_user(self, user_id):
-        devices = yield self._simple_select_list(
+        devices = yield self.simple_select_list(
             table="device_lists_remote_cache",
             keyvalues={"user_id": user_id},
             retcols=("device_id", "content"),
@@ -584,7 +584,7 @@ class DeviceWorkerStore(SQLBaseStore):
                 SELECT DISTINCT user_ids FROM user_signature_stream
                 WHERE from_user_id = ? AND stream_id > ?
             """
-            rows = yield self._execute(
+            rows = yield self.execute(
                 "get_users_whose_signatures_changed", None, sql, user_id, from_key
             )
             return set(user for row in rows for user in json.loads(row[0]))
@@ -605,7 +605,7 @@ class DeviceWorkerStore(SQLBaseStore):
             WHERE ? < stream_id AND stream_id <= ?
             GROUP BY user_id, destination
         """
-        return self._execute(
+        return self.execute(
             "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key
         )
 
@@ -614,7 +614,7 @@ class DeviceWorkerStore(SQLBaseStore):
         """Get the last stream_id we got for a user. May be None if we haven't
         got any information for them.
         """
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="device_lists_remote_extremeties",
             keyvalues={"user_id": user_id},
             retcol="stream_id",
@@ -628,7 +628,7 @@ class DeviceWorkerStore(SQLBaseStore):
         inlineCallbacks=True,
     )
     def get_device_list_last_stream_id_for_remotes(self, user_ids):
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="device_lists_remote_extremeties",
             column="user_id",
             iterable=user_ids,
@@ -722,7 +722,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             return False
 
         try:
-            inserted = yield self._simple_insert(
+            inserted = yield self.simple_insert(
                 "devices",
                 values={
                     "user_id": user_id,
@@ -736,7 +736,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             if not inserted:
                 # if the device already exists, check if it's a real device, or
                 # if the device ID is reserved by something else
-                hidden = yield self._simple_select_one_onecol(
+                hidden = yield self.simple_select_one_onecol(
                     "devices",
                     keyvalues={"user_id": user_id, "device_id": device_id},
                     retcol="hidden",
@@ -771,7 +771,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         Returns:
             defer.Deferred
         """
-        yield self._simple_delete_one(
+        yield self.simple_delete_one(
             table="devices",
             keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
             desc="delete_device",
@@ -789,7 +789,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         Returns:
             defer.Deferred
         """
-        yield self._simple_delete_many(
+        yield self.simple_delete_many(
             table="devices",
             column="device_id",
             iterable=device_ids,
@@ -818,7 +818,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             updates["display_name"] = new_display_name
         if not updates:
             return defer.succeed(None)
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="devices",
             keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
             updatevalues=updates,
@@ -829,7 +829,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
     def mark_remote_user_device_list_as_unsubscribed(self, user_id):
         """Mark that we no longer track device lists for remote user.
         """
-        yield self._simple_delete(
+        yield self.simple_delete(
             table="device_lists_remote_extremeties",
             keyvalues={"user_id": user_id},
             desc="mark_remote_user_device_list_as_unsubscribed",
@@ -866,7 +866,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         self, txn, user_id, device_id, content, stream_id
     ):
         if content.get("deleted"):
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="device_lists_remote_cache",
                 keyvalues={"user_id": user_id, "device_id": device_id},
@@ -874,7 +874,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
             txn.call_after(self.device_id_exists_cache.invalidate, (user_id, device_id))
         else:
-            self._simple_upsert_txn(
+            self.simple_upsert_txn(
                 txn,
                 table="device_lists_remote_cache",
                 keyvalues={"user_id": user_id, "device_id": device_id},
@@ -890,7 +890,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
         )
 
-        self._simple_upsert_txn(
+        self.simple_upsert_txn(
             txn,
             table="device_lists_remote_extremeties",
             keyvalues={"user_id": user_id},
@@ -923,11 +923,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         )
 
     def _update_remote_device_list_cache_txn(self, txn, user_id, devices, stream_id):
-        self._simple_delete_txn(
+        self.simple_delete_txn(
             txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id}
         )
 
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="device_lists_remote_cache",
             values=[
@@ -946,7 +946,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
         )
 
-        self._simple_upsert_txn(
+        self.simple_upsert_txn(
             txn,
             table="device_lists_remote_extremeties",
             keyvalues={"user_id": user_id},
@@ -995,7 +995,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             [(user_id, device_id, stream_id) for device_id in device_ids],
         )
 
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="device_lists_stream",
             values=[
@@ -1006,7 +1006,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
         context = get_active_span_text_map()
 
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="device_lists_outbound_pokes",
             values=[
diff --git a/synapse/storage/data_stores/main/directory.py b/synapse/storage/data_stores/main/directory.py
index 297966d9f4..d332f8a409 100644
--- a/synapse/storage/data_stores/main/directory.py
+++ b/synapse/storage/data_stores/main/directory.py
@@ -36,7 +36,7 @@ class DirectoryWorkerStore(SQLBaseStore):
             Deferred: results in namedtuple with keys "room_id" and
             "servers" or None if no association can be found
         """
-        room_id = yield self._simple_select_one_onecol(
+        room_id = yield self.simple_select_one_onecol(
             "room_aliases",
             {"room_alias": room_alias.to_string()},
             "room_id",
@@ -47,7 +47,7 @@ class DirectoryWorkerStore(SQLBaseStore):
         if not room_id:
             return None
 
-        servers = yield self._simple_select_onecol(
+        servers = yield self.simple_select_onecol(
             "room_alias_servers",
             {"room_alias": room_alias.to_string()},
             "server",
@@ -60,7 +60,7 @@ class DirectoryWorkerStore(SQLBaseStore):
         return RoomAliasMapping(room_id, room_alias.to_string(), servers)
 
     def get_room_alias_creator(self, room_alias):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="room_aliases",
             keyvalues={"room_alias": room_alias},
             retcol="creator",
@@ -69,7 +69,7 @@ class DirectoryWorkerStore(SQLBaseStore):
 
     @cached(max_entries=5000)
     def get_aliases_for_room(self, room_id):
-        return self._simple_select_onecol(
+        return self.simple_select_onecol(
             "room_aliases",
             {"room_id": room_id},
             "room_alias",
@@ -93,7 +93,7 @@ class DirectoryStore(DirectoryWorkerStore):
         """
 
         def alias_txn(txn):
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 "room_aliases",
                 {
@@ -103,7 +103,7 @@ class DirectoryStore(DirectoryWorkerStore):
                 },
             )
 
-            self._simple_insert_many_txn(
+            self.simple_insert_many_txn(
                 txn,
                 table="room_alias_servers",
                 values=[
diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py
index 1cbbae5b63..df89eda337 100644
--- a/synapse/storage/data_stores/main/e2e_room_keys.py
+++ b/synapse/storage/data_stores/main/e2e_room_keys.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -24,49 +25,8 @@ from synapse.storage._base import SQLBaseStore
 
 class EndToEndRoomKeyStore(SQLBaseStore):
     @defer.inlineCallbacks
-    def get_e2e_room_key(self, user_id, version, room_id, session_id):
-        """Get the encrypted E2E room key for a given session from a given
-        backup version of room_keys.  We only store the 'best' room key for a given
-        session at a given time, as determined by the handler.
-
-        Args:
-            user_id(str): the user whose backup we're querying
-            version(str): the version ID of the backup for the set of keys we're querying
-            room_id(str): the ID of the room whose keys we're querying.
-                This is a bit redundant as it's implied by the session_id, but
-                we include for consistency with the rest of the API.
-            session_id(str): the session whose room_key we're querying.
-
-        Returns:
-            A deferred dict giving the session_data and message metadata for
-            this room key.
-        """
-
-        row = yield self._simple_select_one(
-            table="e2e_room_keys",
-            keyvalues={
-                "user_id": user_id,
-                "version": version,
-                "room_id": room_id,
-                "session_id": session_id,
-            },
-            retcols=(
-                "first_message_index",
-                "forwarded_count",
-                "is_verified",
-                "session_data",
-            ),
-            desc="get_e2e_room_key",
-        )
-
-        row["session_data"] = json.loads(row["session_data"])
-
-        return row
-
-    @defer.inlineCallbacks
-    def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
-        """Replaces or inserts the encrypted E2E room key for a given session in
-        a given backup
+    def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
+        """Replaces the encrypted E2E room key for a given session in a given backup
 
         Args:
             user_id(str): the user whose backup we're setting
@@ -78,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             StoreError
         """
 
-        yield self._simple_upsert(
+        yield self.simple_update_one(
             table="e2e_room_keys",
             keyvalues={
                 "user_id": user_id,
@@ -86,21 +46,51 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 "room_id": room_id,
                 "session_id": session_id,
             },
-            values={
+            updatevalues={
                 "first_message_index": room_key["first_message_index"],
                 "forwarded_count": room_key["forwarded_count"],
                 "is_verified": room_key["is_verified"],
                 "session_data": json.dumps(room_key["session_data"]),
             },
-            lock=False,
+            desc="update_e2e_room_key",
         )
-        log_kv(
-            {
-                "message": "Set room key",
-                "room_id": room_id,
-                "session_id": session_id,
-                "room_key": room_key,
-            }
+
+    @defer.inlineCallbacks
+    def add_e2e_room_keys(self, user_id, version, room_keys):
+        """Bulk add room keys to a given backup.
+
+        Args:
+            user_id (str): the user whose backup we're adding to
+            version (str): the version ID of the backup for the set of keys we're adding to
+            room_keys (iterable[(str, str, dict)]): the keys to add, in the form
+                (roomID, sessionID, keyData)
+        """
+
+        values = []
+        for (room_id, session_id, room_key) in room_keys:
+            values.append(
+                {
+                    "user_id": user_id,
+                    "version": version,
+                    "room_id": room_id,
+                    "session_id": session_id,
+                    "first_message_index": room_key["first_message_index"],
+                    "forwarded_count": room_key["forwarded_count"],
+                    "is_verified": room_key["is_verified"],
+                    "session_data": json.dumps(room_key["session_data"]),
+                }
+            )
+            log_kv(
+                {
+                    "message": "Set room key",
+                    "room_id": room_id,
+                    "session_id": session_id,
+                    "room_key": room_key,
+                }
+            )
+
+        yield self.simple_insert_many(
+            table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
         )
 
     @trace
@@ -110,11 +100,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         room, or a given session.
 
         Args:
-            user_id(str): the user whose backup we're querying
-            version(str): the version ID of the backup for the set of keys we're querying
-            room_id(str): Optional. the ID of the room whose keys we're querying, if any.
+            user_id (str): the user whose backup we're querying
+            version (str): the version ID of the backup for the set of keys we're querying
+            room_id (str): Optional. the ID of the room whose keys we're querying, if any.
                 If not specified, we return the keys for all the rooms in the backup.
-            session_id(str): Optional. the session whose room_key we're querying, if any.
+            session_id (str): Optional. the session whose room_key we're querying, if any.
                 If specified, we also require the room_id to be specified.
                 If not specified, we return all the keys in this version of
                 the backup (or for the specified room)
@@ -135,7 +125,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             if session_id:
                 keyvalues["session_id"] = session_id
 
-        rows = yield self._simple_select_list(
+        rows = yield self.simple_select_list(
             table="e2e_room_keys",
             keyvalues=keyvalues,
             retcols=(
@@ -162,6 +152,95 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         return sessions
 
+    def get_e2e_room_keys_multi(self, user_id, version, room_keys):
+        """Get multiple room keys at a time.  The difference between this function and
+        get_e2e_room_keys is that this function can be used to retrieve
+        multiple specific keys at a time, whereas get_e2e_room_keys is used for
+        getting all the keys in a backup version, all the keys for a room, or a
+        specific key.
+
+        Args:
+            user_id (str): the user whose backup we're querying
+            version (str): the version ID of the backup we're querying about
+            room_keys (dict[str, dict[str, iterable[str]]]): a map from
+                room ID -> {"session": [session ids]} indicating the session IDs
+                that we want to query
+
+        Returns:
+           Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
+        """
+
+        return self.runInteraction(
+            "get_e2e_room_keys_multi",
+            self._get_e2e_room_keys_multi_txn,
+            user_id,
+            version,
+            room_keys,
+        )
+
+    @staticmethod
+    def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys):
+        if not room_keys:
+            return {}
+
+        where_clauses = []
+        params = [user_id, version]
+        for room_id, room in room_keys.items():
+            sessions = list(room["sessions"])
+            if not sessions:
+                continue
+            params.append(room_id)
+            params.extend(sessions)
+            where_clauses.append(
+                "(room_id = ? AND session_id IN (%s))"
+                % (",".join(["?" for _ in sessions]),)
+            )
+
+        # check if we're actually querying something
+        if not where_clauses:
+            return {}
+
+        sql = """
+        SELECT room_id, session_id, first_message_index, forwarded_count,
+               is_verified, session_data
+        FROM e2e_room_keys
+        WHERE user_id = ? AND version = ? AND (%s)
+        """ % (
+            " OR ".join(where_clauses)
+        )
+
+        txn.execute(sql, params)
+
+        ret = {}
+
+        for row in txn:
+            room_id = row[0]
+            session_id = row[1]
+            ret.setdefault(room_id, {})
+            ret[room_id][session_id] = {
+                "first_message_index": row[2],
+                "forwarded_count": row[3],
+                "is_verified": row[4],
+                "session_data": json.loads(row[5]),
+            }
+
+        return ret
+
+    def count_e2e_room_keys(self, user_id, version):
+        """Get the number of keys in a backup version.
+
+        Args:
+            user_id (str): the user whose backup we're querying
+            version (str): the version ID of the backup we're querying about
+        """
+
+        return self.simple_select_one_onecol(
+            table="e2e_room_keys",
+            keyvalues={"user_id": user_id, "version": version},
+            retcol="COUNT(*)",
+            desc="count_e2e_room_keys",
+        )
+
     @trace
     @defer.inlineCallbacks
     def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
@@ -188,7 +267,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             if session_id:
                 keyvalues["session_id"] = session_id
 
-        yield self._simple_delete(
+        yield self.simple_delete(
             table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys"
         )
 
@@ -219,6 +298,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 version(str)
                 algorithm(str)
                 auth_data(object): opaque dict supplied by the client
+                etag(int): tag of the keys in the backup
         """
 
         def _get_e2e_room_keys_version_info_txn(txn):
@@ -232,14 +312,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                     # it isn't there.
                     raise StoreError(404, "No row found")
 
-            result = self._simple_select_one_txn(
+            result = self.simple_select_one_txn(
                 txn,
                 table="e2e_room_keys_versions",
                 keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
-                retcols=("version", "algorithm", "auth_data"),
+                retcols=("version", "algorithm", "auth_data", "etag"),
             )
             result["auth_data"] = json.loads(result["auth_data"])
             result["version"] = str(result["version"])
+            if result["etag"] is None:
+                result["etag"] = 0
             return result
 
         return self.runInteraction(
@@ -270,7 +352,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
             new_version = str(int(current_version) + 1)
 
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="e2e_room_keys_versions",
                 values={
@@ -288,21 +370,33 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         )
 
     @trace
-    def update_e2e_room_keys_version(self, user_id, version, info):
+    def update_e2e_room_keys_version(
+        self, user_id, version, info=None, version_etag=None
+    ):
         """Update a given backup version
 
         Args:
             user_id(str): the user whose backup version we're updating
             version(str): the version ID of the backup version we're updating
-            info(dict): the new backup version info to store
+            info (dict): the new backup version info to store.  If None, then
+                the backup version info is not updated
+            version_etag (Optional[int]): etag of the keys in the backup.  If
+                None, then the etag is not updated
         """
+        updatevalues = {}
 
-        return self._simple_update(
-            table="e2e_room_keys_versions",
-            keyvalues={"user_id": user_id, "version": version},
-            updatevalues={"auth_data": json.dumps(info["auth_data"])},
-            desc="update_e2e_room_keys_version",
-        )
+        if info is not None and "auth_data" in info:
+            updatevalues["auth_data"] = json.dumps(info["auth_data"])
+        if version_etag is not None:
+            updatevalues["etag"] = version_etag
+
+        if updatevalues:
+            return self.simple_update(
+                table="e2e_room_keys_versions",
+                keyvalues={"user_id": user_id, "version": version},
+                updatevalues=updatevalues,
+                desc="update_e2e_room_keys_version",
+            )
 
     @trace
     def delete_e2e_room_keys_version(self, user_id, version=None):
@@ -326,13 +420,13 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             else:
                 this_version = version
 
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="e2e_room_keys",
                 keyvalues={"user_id": user_id, "version": this_version},
             )
 
-            return self._simple_update_one_txn(
+            return self.simple_update_one_txn(
                 txn,
                 table="e2e_room_keys_versions",
                 keyvalues={"user_id": user_id, "version": this_version},
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 073412a78d..08bcdc4725 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -138,20 +138,35 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 result.setdefault(user_id, {})[device_id] = None
 
         # get signatures on the device
-        signature_sql = (
-            "SELECT * " "  FROM e2e_cross_signing_signatures " " WHERE %s"
-        ) % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
+        signature_sql = ("SELECT *  FROM e2e_cross_signing_signatures WHERE %s") % (
+            " OR ".join("(" + q + ")" for q in signature_query_clauses)
+        )
 
         txn.execute(signature_sql, signature_query_params)
         rows = self.cursor_to_dict(txn)
 
+        # add each cross-signing signature to the correct device in the result dict.
         for row in rows:
+            signing_user_id = row["user_id"]
+            signing_key_id = row["key_id"]
             target_user_id = row["target_user_id"]
             target_device_id = row["target_device_id"]
-            if target_user_id in result and target_device_id in result[target_user_id]:
-                result[target_user_id][target_device_id].setdefault(
-                    "signatures", {}
-                ).setdefault(row["user_id"], {})[row["key_id"]] = row["signature"]
+            signature = row["signature"]
+
+            target_user_result = result.get(target_user_id)
+            if not target_user_result:
+                continue
+
+            target_device_result = target_user_result.get(target_device_id)
+            if not target_device_result:
+                # note that target_device_result will be None for deleted devices.
+                continue
+
+            target_device_signatures = target_device_result.setdefault("signatures", {})
+            signing_user_signatures = target_device_signatures.setdefault(
+                signing_user_id, {}
+            )
+            signing_user_signatures[signing_key_id] = signature
 
         log_kv(result)
         return result
@@ -171,7 +186,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             key_id) to json string for key
         """
 
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="e2e_one_time_keys_json",
             column="key_id",
             iterable=key_ids,
@@ -204,7 +219,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             # a unique constraint. If there is a race of two calls to
             # `add_e2e_one_time_keys` then they'll conflict and we will only
             # insert one set.
-            self._simple_insert_many_txn(
+            self.simple_insert_many_txn(
                 txn,
                 table="e2e_one_time_keys_json",
                 values=[
@@ -335,7 +350,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             WHERE ? < stream_id AND stream_id <= ?
             GROUP BY user_id
         """
-        return self._execute(
+        return self.execute(
             "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key
         )
 
@@ -352,7 +367,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             set_tag("time_now", time_now)
             set_tag("device_keys", device_keys)
 
-            old_key_json = self._simple_select_one_onecol_txn(
+            old_key_json = self.simple_select_one_onecol_txn(
                 txn,
                 table="e2e_device_keys_json",
                 keyvalues={"user_id": user_id, "device_id": device_id},
@@ -368,7 +383,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 log_kv({"Message": "Device key already stored."})
                 return False
 
-            self._simple_upsert_txn(
+            self.simple_upsert_txn(
                 txn,
                 table="e2e_device_keys_json",
                 keyvalues={"user_id": user_id, "device_id": device_id},
@@ -427,12 +442,12 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                     "user_id": user_id,
                 }
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="e2e_device_keys_json",
                 keyvalues={"user_id": user_id, "device_id": device_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="e2e_one_time_keys_json",
                 keyvalues={"user_id": user_id, "device_id": device_id},
@@ -477,7 +492,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         # The "keys" property must only have one entry, which will be the public
         # key, so we just grab the first value in there
         pubkey = next(iter(key["keys"].values()))
-        self._simple_insert_txn(
+        self.simple_insert_txn(
             txn,
             "devices",
             values={
@@ -490,7 +505,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
 
         # and finally, store the key itself
         with self._cross_signing_id_gen.get_next() as stream_id:
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 "e2e_cross_signing_keys",
                 values={
@@ -524,7 +539,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             user_id (str): the user who made the signatures
             signatures (iterable[SignatureListItem]): signatures to add
         """
-        return self._simple_insert_many(
+        return self.simple_insert_many(
             "e2e_cross_signing_signatures",
             [
                 {
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 90bef0cd2c..051ac7a8cb 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -126,7 +126,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         Returns
             Deferred[int]
         """
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="events",
             column="event_id",
             iterable=event_ids,
@@ -140,7 +140,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             return max(row["depth"] for row in rows)
 
     def _get_oldest_events_in_room_txn(self, txn, room_id):
-        return self._simple_select_onecol_txn(
+        return self.simple_select_onecol_txn(
             txn,
             table="event_backward_extremities",
             keyvalues={"room_id": room_id},
@@ -235,7 +235,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
     @cached(max_entries=5000, iterable=True)
     def get_latest_event_ids_in_room(self, room_id):
-        return self._simple_select_onecol(
+        return self.simple_select_onecol(
             table="event_forward_extremities",
             keyvalues={"room_id": room_id},
             retcol="event_id",
@@ -271,7 +271,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         )
 
     def _get_min_depth_interaction(self, txn, room_id):
-        min_depth = self._simple_select_one_onecol_txn(
+        min_depth = self.simple_select_one_onecol_txn(
             txn,
             table="room_depth",
             keyvalues={"room_id": room_id},
@@ -383,7 +383,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         queue = PriorityQueue()
 
         for event_id in event_list:
-            depth = self._simple_select_one_onecol_txn(
+            depth = self.simple_select_one_onecol_txn(
                 txn,
                 table="events",
                 keyvalues={"event_id": event_id, "room_id": room_id},
@@ -468,7 +468,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         Returns:
             Deferred[list[str]]
         """
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="event_edges",
             column="prev_event_id",
             iterable=event_ids,
@@ -508,7 +508,7 @@ class EventFederationStore(EventFederationWorkerStore):
         if min_depth and depth >= min_depth:
             return
 
-        self._simple_upsert_txn(
+        self.simple_upsert_txn(
             txn,
             table="room_depth",
             keyvalues={"room_id": room_id},
@@ -520,7 +520,7 @@ class EventFederationStore(EventFederationWorkerStore):
         For the given event, update the event edges table and forward and
         backward extremities tables.
         """
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="event_edges",
             values=[
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 04ce21ac66..0a37847cfd 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -441,7 +441,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             )
 
         def _add_push_actions_to_staging_txn(txn):
-            # We don't use _simple_insert_many here to avoid the overhead
+            # We don't use simple_insert_many here to avoid the overhead
             # of generating lists of dicts.
 
             sql = """
@@ -472,7 +472,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         """
 
         try:
-            res = yield self._simple_delete(
+            res = yield self.simple_delete(
                 table="event_push_actions_staging",
                 keyvalues={"event_id": event_id},
                 desc="remove_push_actions_from_staging",
@@ -677,7 +677,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             )
 
         for event, _ in events_and_contexts:
-            user_ids = self._simple_select_onecol_txn(
+            user_ids = self.simple_select_onecol_txn(
                 txn,
                 table="event_push_actions_staging",
                 keyvalues={"event_id": event.event_id},
@@ -844,7 +844,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         the archiving process has caught up or not.
         """
 
-        old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
+        old_rotate_stream_ordering = self.simple_select_one_onecol_txn(
             txn,
             table="event_push_summary_stream_ordering",
             keyvalues={},
@@ -880,7 +880,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         return caught_up
 
     def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
-        old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
+        old_rotate_stream_ordering = self.simple_select_one_onecol_txn(
             txn,
             table="event_push_summary_stream_ordering",
             keyvalues={},
@@ -912,7 +912,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         # If the `old.user_id` above is NULL then we know there isn't already an
         # entry in the table, so we simply insert it. Otherwise we update the
         # existing table.
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="event_push_summary",
             values=[
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 878f7568a6..98ae69e996 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -130,6 +130,8 @@ class EventsStore(
         if self.hs.config.redaction_retention_period is not None:
             hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
 
+        self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
+
     @defer.inlineCallbacks
     def _read_forward_extremities(self):
         def fetch(txn):
@@ -430,7 +432,7 @@ class EventsStore(
         # event's auth chain, but its easier for now just to store them (and
         # it doesn't take much storage compared to storing the entire event
         # anyway).
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="event_auth",
             values=[
@@ -578,12 +580,12 @@ class EventsStore(
         self, txn, new_forward_extremities, max_stream_order
     ):
         for room_id, new_extrem in iteritems(new_forward_extremities):
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
             )
             txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
 
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="event_forward_extremities",
             values=[
@@ -596,7 +598,7 @@ class EventsStore(
         # new stream_ordering to new forward extremeties in the room.
         # This allows us to later efficiently look up the forward extremeties
         # for a room before a given stream_ordering
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="stream_ordering_to_exterm",
             values=[
@@ -713,16 +715,14 @@ class EventsStore(
 
                 metadata_json = encode_json(event.internal_metadata.get_dict())
 
-                sql = (
-                    "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
-                )
+                sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
                 txn.execute(sql, (metadata_json, event.event_id))
 
                 # Add an entry to the ex_outlier_stream table to replicate the
                 # change in outlier status to our workers.
                 stream_order = event.internal_metadata.stream_ordering
                 state_group_id = context.state_group
-                self._simple_insert_txn(
+                self.simple_insert_txn(
                     txn,
                     table="ex_outlier_stream",
                     values={
@@ -732,7 +732,7 @@ class EventsStore(
                     },
                 )
 
-                sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
+                sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
                 txn.execute(sql, (False, event.event_id))
 
                 # Update the event_backward_extremities table now that this
@@ -794,7 +794,7 @@ class EventsStore(
             d.pop("redacted_because", None)
             return d
 
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="event_json",
             values=[
@@ -811,7 +811,7 @@ class EventsStore(
             ],
         )
 
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="events",
             values=[
@@ -841,7 +841,7 @@ class EventsStore(
                 # If we're persisting an unredacted event we go and ensure
                 # that we mark any redactions that reference this event as
                 # requiring censoring.
-                self._simple_update_txn(
+                self.simple_update_txn(
                     txn,
                     table="redactions",
                     keyvalues={"redacts": event.event_id},
@@ -929,6 +929,9 @@ class EventsStore(
             elif event.type == EventTypes.Redaction:
                 # Insert into the redactions table.
                 self._store_redaction(txn, event)
+            elif event.type == EventTypes.Retention:
+                # Update the room_retention table.
+                self._store_retention_policy_for_room_txn(txn, event)
 
             self._handle_event_relations(txn, event)
 
@@ -939,6 +942,12 @@ class EventsStore(
                     txn, event.event_id, labels, event.room_id, event.depth
                 )
 
+            if self._ephemeral_messages_enabled:
+                # If there's an expiry timestamp on the event, store it.
+                expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
+                if isinstance(expiry_ts, int) and not event.is_state():
+                    self._insert_event_expiry_txn(txn, event.event_id, expiry_ts)
+
         # Insert into the room_memberships table.
         self._store_room_members_txn(
             txn,
@@ -974,7 +983,7 @@ class EventsStore(
 
             state_values.append(vals)
 
-        self._simple_insert_many_txn(txn, table="state_events", values=state_values)
+        self.simple_insert_many_txn(txn, table="state_events", values=state_values)
 
         # Prefill the event cache
         self._add_to_cache(txn, events_and_contexts)
@@ -1023,7 +1032,7 @@ class EventsStore(
         # invalidate the cache for the redacted event
         txn.call_after(self._invalidate_get_event_cache, event.redacts)
 
-        self._simple_insert_txn(
+        self.simple_insert_txn(
             txn,
             table="redactions",
             values={
@@ -1068,9 +1077,7 @@ class EventsStore(
             LIMIT ?
         """
 
-        rows = yield self._execute(
-            "_censor_redactions_fetch", None, sql, before_ts, 100
-        )
+        rows = yield self.execute("_censor_redactions_fetch", None, sql, before_ts, 100)
 
         updates = []
 
@@ -1100,14 +1107,9 @@ class EventsStore(
         def _update_censor_txn(txn):
             for redaction_id, event_id, pruned_json in updates:
                 if pruned_json:
-                    self._simple_update_one_txn(
-                        txn,
-                        table="event_json",
-                        keyvalues={"event_id": event_id},
-                        updatevalues={"json": pruned_json},
-                    )
+                    self._censor_event_txn(txn, event_id, pruned_json)
 
-                self._simple_update_one_txn(
+                self.simple_update_one_txn(
                     txn,
                     table="redactions",
                     keyvalues={"event_id": redaction_id},
@@ -1116,6 +1118,22 @@ class EventsStore(
 
         yield self.runInteraction("_update_censor_txn", _update_censor_txn)
 
+    def _censor_event_txn(self, txn, event_id, pruned_json):
+        """Censor an event by replacing its JSON in the event_json table with the
+        provided pruned JSON.
+
+        Args:
+            txn (LoggingTransaction): The database transaction.
+            event_id (str): The ID of the event to censor.
+            pruned_json (str): The pruned JSON
+        """
+        self.simple_update_one_txn(
+            txn,
+            table="event_json",
+            keyvalues={"event_id": event_id},
+            updatevalues={"json": pruned_json},
+        )
+
     @defer.inlineCallbacks
     def count_daily_messages(self):
         """
@@ -1479,7 +1497,7 @@ class EventsStore(
 
         # We do joins against events_to_purge for e.g. calculating state
         # groups to purge, etc., so lets make an index.
-        txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
+        txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
 
         txn.execute("SELECT event_id, should_delete FROM events_to_purge")
         event_rows = txn.fetchall()
@@ -1760,7 +1778,7 @@ class EventsStore(
             "[purge] found %i state groups to delete", len(state_groups_to_delete)
         )
 
-        rows = self._simple_select_many_txn(
+        rows = self.simple_select_many_txn(
             txn,
             table="state_group_edges",
             column="prev_state_group",
@@ -1787,15 +1805,15 @@ class EventsStore(
             curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
             curr_state = curr_state[sg]
 
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn, table="state_groups_state", keyvalues={"state_group": sg}
             )
 
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn, table="state_group_edges", keyvalues={"state_group": sg}
             )
 
-            self._simple_insert_many_txn(
+            self.simple_insert_many_txn(
                 txn,
                 table="state_groups_state",
                 values=[
@@ -1832,7 +1850,7 @@ class EventsStore(
             state group.
         """
 
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="state_group_edges",
             column="prev_state_group",
             iterable=state_groups,
@@ -1862,7 +1880,7 @@ class EventsStore(
         # first we have to delete the state groups states
         logger.info("[purge] removing %s from state_groups_state", room_id)
 
-        self._simple_delete_many_txn(
+        self.simple_delete_many_txn(
             txn,
             table="state_groups_state",
             column="state_group",
@@ -1873,7 +1891,7 @@ class EventsStore(
         # ... and the state group edges
         logger.info("[purge] removing %s from state_group_edges", room_id)
 
-        self._simple_delete_many_txn(
+        self.simple_delete_many_txn(
             txn,
             table="state_group_edges",
             column="state_group",
@@ -1884,7 +1902,7 @@ class EventsStore(
         # ... and the state groups
         logger.info("[purge] removing %s from state_groups", room_id)
 
-        self._simple_delete_many_txn(
+        self.simple_delete_many_txn(
             txn,
             table="state_groups",
             column="id",
@@ -1901,7 +1919,7 @@ class EventsStore(
 
     @cachedInlineCallbacks(max_entries=5000)
     def _get_event_ordering(self, event_id):
-        res = yield self._simple_select_one(
+        res = yield self.simple_select_one(
             table="events",
             retcols=["topological_ordering", "stream_ordering"],
             keyvalues={"event_id": event_id},
@@ -1942,7 +1960,7 @@ class EventsStore(
             room_id (str): The ID of the room the event was sent to.
             topological_ordering (int): The position of the event in the room's topology.
         """
-        return self._simple_insert_many_txn(
+        return self.simple_insert_many_txn(
             txn=txn,
             table="event_labels",
             values=[
@@ -1956,6 +1974,101 @@ class EventsStore(
             ],
         )
 
+    def _insert_event_expiry_txn(self, txn, event_id, expiry_ts):
+        """Save the expiry timestamp associated with a given event ID.
+
+        Args:
+            txn (LoggingTransaction): The database transaction to use.
+            event_id (str): The event ID the expiry timestamp is associated with.
+            expiry_ts (int): The timestamp at which to expire (delete) the event.
+        """
+        return self.simple_insert_txn(
+            txn=txn,
+            table="event_expiry",
+            values={"event_id": event_id, "expiry_ts": expiry_ts},
+        )
+
+    @defer.inlineCallbacks
+    def expire_event(self, event_id):
+        """Retrieve and expire an event that has expired, and delete its associated
+        expiry timestamp. If the event can't be retrieved, delete its associated
+        timestamp so we don't try to expire it again in the future.
+
+        Args:
+             event_id (str): The ID of the event to delete.
+        """
+        # Try to retrieve the event's content from the database or the event cache.
+        event = yield self.get_event(event_id)
+
+        def delete_expired_event_txn(txn):
+            # Delete the expiry timestamp associated with this event from the database.
+            self._delete_event_expiry_txn(txn, event_id)
+
+            if not event:
+                # If we can't find the event, log a warning and delete the expiry date
+                # from the database so that we don't try to expire it again in the
+                # future.
+                logger.warning(
+                    "Can't expire event %s because we don't have it.", event_id
+                )
+                return
+
+            # Prune the event's dict then convert it to JSON.
+            pruned_json = encode_json(prune_event_dict(event.get_dict()))
+
+            # Update the event_json table to replace the event's JSON with the pruned
+            # JSON.
+            self._censor_event_txn(txn, event.event_id, pruned_json)
+
+            # We need to invalidate the event cache entry for this event because we
+            # changed its content in the database. We can't call
+            # self._invalidate_cache_and_stream because self.get_event_cache isn't of the
+            # right type.
+            txn.call_after(self._get_event_cache.invalidate, (event.event_id,))
+            # Send that invalidation to replication so that other workers also invalidate
+            # the event cache.
+            self._send_invalidation_to_replication(
+                txn, "_get_event_cache", (event.event_id,)
+            )
+
+        yield self.runInteraction("delete_expired_event", delete_expired_event_txn)
+
+    def _delete_event_expiry_txn(self, txn, event_id):
+        """Delete the expiry timestamp associated with an event ID without deleting the
+        actual event.
+
+        Args:
+            txn (LoggingTransaction): The transaction to use to perform the deletion.
+            event_id (str): The event ID to delete the associated expiry timestamp of.
+        """
+        return self.simple_delete_txn(
+            txn=txn, table="event_expiry", keyvalues={"event_id": event_id}
+        )
+
+    def get_next_event_to_expire(self):
+        """Retrieve the entry with the lowest expiry timestamp in the event_expiry
+        table, or None if there's no more event to expire.
+
+        Returns: Deferred[Optional[Tuple[str, int]]]
+            A tuple containing the event ID as its first element and an expiry timestamp
+            as its second one, if there's at least one row in the event_expiry table.
+            None otherwise.
+        """
+
+        def get_next_event_to_expire_txn(txn):
+            txn.execute(
+                """
+                SELECT event_id, expiry_ts FROM event_expiry
+                ORDER BY expiry_ts ASC LIMIT 1
+                """
+            )
+
+            return txn.fetchone()
+
+        return self.runInteraction(
+            desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
+        )
+
 
 AllNewEventsResult = namedtuple(
     "AllNewEventsResult",
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 0ed59ef48e..37dfc8c871 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -189,7 +189,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
 
             chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
             for chunk in chunks:
-                ev_rows = self._simple_select_many_txn(
+                ev_rows = self.simple_select_many_txn(
                     txn,
                     table="event_json",
                     column="event_id",
@@ -366,7 +366,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
 
             to_delete.intersection_update(original_set)
 
-            deleted = self._simple_delete_many_txn(
+            deleted = self.simple_delete_many_txn(
                 txn=txn,
                 table="event_forward_extremities",
                 column="event_id",
@@ -382,7 +382,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
 
             if deleted:
                 # We now need to invalidate the caches of these rooms
-                rows = self._simple_select_many_txn(
+                rows = self.simple_select_many_txn(
                     txn,
                     table="events",
                     column="event_id",
@@ -396,7 +396,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                         self.get_latest_event_ids_in_room.invalidate, (room_id,)
                     )
 
-            self._simple_delete_many_txn(
+            self.simple_delete_many_txn(
                 txn=txn,
                 table="_extremities_to_check",
                 column="event_id",
@@ -530,24 +530,31 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             nbrows = 0
             last_row_event_id = ""
             for (event_id, event_json_raw) in results:
-                event_json = json.loads(event_json_raw)
-
-                self._simple_insert_many_txn(
-                    txn=txn,
-                    table="event_labels",
-                    values=[
-                        {
-                            "event_id": event_id,
-                            "label": label,
-                            "room_id": event_json["room_id"],
-                            "topological_ordering": event_json["depth"],
-                        }
-                        for label in event_json["content"].get(
-                            EventContentFields.LABELS, []
-                        )
-                        if isinstance(label, str)
-                    ],
-                )
+                try:
+                    event_json = json.loads(event_json_raw)
+
+                    self.simple_insert_many_txn(
+                        txn=txn,
+                        table="event_labels",
+                        values=[
+                            {
+                                "event_id": event_id,
+                                "label": label,
+                                "room_id": event_json["room_id"],
+                                "topological_ordering": event_json["depth"],
+                            }
+                            for label in event_json["content"].get(
+                                EventContentFields.LABELS, []
+                            )
+                            if isinstance(label, str)
+                        ],
+                    )
+                except Exception as e:
+                    logger.warning(
+                        "Unable to load event %s (no labels will be imported): %s",
+                        event_id,
+                        e,
+                    )
 
                 nbrows += 1
                 last_row_event_id = event_id
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 4c4b76bd93..6a08a746b6 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -17,6 +17,7 @@ from __future__ import division
 
 import itertools
 import logging
+import threading
 from collections import namedtuple
 
 from canonicaljson import json
@@ -34,6 +35,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.types import get_domain_from_id
 from synapse.util import batch_iter
+from synapse.util.caches.descriptors import Cache
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -53,6 +55,17 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 
 class EventsWorkerStore(SQLBaseStore):
+    def __init__(self, db_conn, hs):
+        super(EventsWorkerStore, self).__init__(db_conn, hs)
+
+        self._get_event_cache = Cache(
+            "*getEvent*", keylen=3, max_entries=hs.config.event_cache_size
+        )
+
+        self._event_fetch_lock = threading.Condition()
+        self._event_fetch_list = []
+        self._event_fetch_ongoing = 0
+
     def get_received_ts(self, event_id):
         """Get received_ts (when it was persisted) for the event.
 
@@ -65,7 +78,7 @@ class EventsWorkerStore(SQLBaseStore):
             Deferred[int|None]: Timestamp in milliseconds, or None for events
             that were persisted before received_ts was implemented.
         """
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="events",
             keyvalues={"event_id": event_id},
             retcol="received_ts",
@@ -439,7 +452,7 @@ class EventsWorkerStore(SQLBaseStore):
                     event_id for events, _ in event_list for event_id in events
                 )
 
-                row_dict = self._new_transaction(
+                row_dict = self.new_transaction(
                     conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch
                 )
 
@@ -732,7 +745,7 @@ class EventsWorkerStore(SQLBaseStore):
         """Given a list of event ids, check if we have already processed and
         stored them as non outliers.
         """
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="events",
             retcols=("event_id",),
             column="event_id",
@@ -770,40 +783,6 @@ class EventsWorkerStore(SQLBaseStore):
             yield self.runInteraction("have_seen_events", have_seen_events_txn, chunk)
         return results
 
-    def get_seen_events_with_rejections(self, event_ids):
-        """Given a list of event ids, check if we rejected them.
-
-        Args:
-            event_ids (list[str])
-
-        Returns:
-            Deferred[dict[str, str|None):
-                Has an entry for each event id we already have seen. Maps to
-                the rejected reason string if we rejected the event, else maps
-                to None.
-        """
-        if not event_ids:
-            return defer.succeed({})
-
-        def f(txn):
-            sql = (
-                "SELECT e.event_id, reason FROM events as e "
-                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
-                "WHERE e.event_id = ?"
-            )
-
-            res = {}
-            for event_id in event_ids:
-                txn.execute(sql, (event_id,))
-                row = txn.fetchone()
-                if row:
-                    _, rejected = row
-                    res[event_id] = rejected
-
-            return res
-
-        return self.runInteraction("get_seen_events_with_rejections", f)
-
     def _get_total_state_event_counts_txn(self, txn, room_id):
         """
         See get_total_state_event_counts.
diff --git a/synapse/storage/data_stores/main/filtering.py b/synapse/storage/data_stores/main/filtering.py
index a2a2a67927..17ef7b9354 100644
--- a/synapse/storage/data_stores/main/filtering.py
+++ b/synapse/storage/data_stores/main/filtering.py
@@ -30,7 +30,7 @@ class FilteringStore(SQLBaseStore):
         except ValueError:
             raise SynapseError(400, "Invalid filter ID", Codes.INVALID_PARAM)
 
-        def_json = yield self._simple_select_one_onecol(
+        def_json = yield self.simple_select_one_onecol(
             table="user_filters",
             keyvalues={"user_id": user_localpart, "filter_id": filter_id},
             retcol="filter_json",
@@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
             if filter_id_response is not None:
                 return filter_id_response[0]
 
-            sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?"
+            sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
             txn.execute(sql, (user_localpart,))
             max_id = txn.fetchone()[0]
             if max_id is None:
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index 5ded539af8..9e1d12bcb7 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -35,7 +35,7 @@ class GroupServerStore(SQLBaseStore):
          * "invite"
          * "open"
         """
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="groups",
             keyvalues={"group_id": group_id},
             updatevalues={"join_policy": join_policy},
@@ -43,7 +43,7 @@ class GroupServerStore(SQLBaseStore):
         )
 
     def get_group(self, group_id):
-        return self._simple_select_one(
+        return self.simple_select_one(
             table="groups",
             keyvalues={"group_id": group_id},
             retcols=(
@@ -65,7 +65,7 @@ class GroupServerStore(SQLBaseStore):
         if not include_private:
             keyvalues["is_public"] = True
 
-        return self._simple_select_list(
+        return self.simple_select_list(
             table="group_users",
             keyvalues=keyvalues,
             retcols=("user_id", "is_public", "is_admin"),
@@ -75,7 +75,7 @@ class GroupServerStore(SQLBaseStore):
     def get_invited_users_in_group(self, group_id):
         # TODO: Pagination
 
-        return self._simple_select_onecol(
+        return self.simple_select_onecol(
             table="group_invites",
             keyvalues={"group_id": group_id},
             retcol="user_id",
@@ -89,7 +89,7 @@ class GroupServerStore(SQLBaseStore):
         if not include_private:
             keyvalues["is_public"] = True
 
-        return self._simple_select_list(
+        return self.simple_select_list(
             table="group_rooms",
             keyvalues=keyvalues,
             retcols=("room_id", "is_public"),
@@ -180,7 +180,7 @@ class GroupServerStore(SQLBaseStore):
                 an order of 1 will put the room first. Otherwise, the room gets
                 added to the end.
         """
-        room_in_group = self._simple_select_one_onecol_txn(
+        room_in_group = self.simple_select_one_onecol_txn(
             txn,
             table="group_rooms",
             keyvalues={"group_id": group_id, "room_id": room_id},
@@ -193,7 +193,7 @@ class GroupServerStore(SQLBaseStore):
         if category_id is None:
             category_id = _DEFAULT_CATEGORY_ID
         else:
-            cat_exists = self._simple_select_one_onecol_txn(
+            cat_exists = self.simple_select_one_onecol_txn(
                 txn,
                 table="group_room_categories",
                 keyvalues={"group_id": group_id, "category_id": category_id},
@@ -204,7 +204,7 @@ class GroupServerStore(SQLBaseStore):
                 raise SynapseError(400, "Category doesn't exist")
 
             # TODO: Check category is part of summary already
-            cat_exists = self._simple_select_one_onecol_txn(
+            cat_exists = self.simple_select_one_onecol_txn(
                 txn,
                 table="group_summary_room_categories",
                 keyvalues={"group_id": group_id, "category_id": category_id},
@@ -224,7 +224,7 @@ class GroupServerStore(SQLBaseStore):
                     (group_id, category_id, group_id, category_id),
                 )
 
-        existing = self._simple_select_one_txn(
+        existing = self.simple_select_one_txn(
             txn,
             table="group_summary_rooms",
             keyvalues={
@@ -257,7 +257,7 @@ class GroupServerStore(SQLBaseStore):
                 to_update["room_order"] = order
             if is_public is not None:
                 to_update["is_public"] = is_public
-            self._simple_update_txn(
+            self.simple_update_txn(
                 txn,
                 table="group_summary_rooms",
                 keyvalues={
@@ -271,7 +271,7 @@ class GroupServerStore(SQLBaseStore):
             if is_public is None:
                 is_public = True
 
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="group_summary_rooms",
                 values={
@@ -287,7 +287,7 @@ class GroupServerStore(SQLBaseStore):
         if category_id is None:
             category_id = _DEFAULT_CATEGORY_ID
 
-        return self._simple_delete(
+        return self.simple_delete(
             table="group_summary_rooms",
             keyvalues={
                 "group_id": group_id,
@@ -299,7 +299,7 @@ class GroupServerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_group_categories(self, group_id):
-        rows = yield self._simple_select_list(
+        rows = yield self.simple_select_list(
             table="group_room_categories",
             keyvalues={"group_id": group_id},
             retcols=("category_id", "is_public", "profile"),
@@ -316,7 +316,7 @@ class GroupServerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_group_category(self, group_id, category_id):
-        category = yield self._simple_select_one(
+        category = yield self.simple_select_one(
             table="group_room_categories",
             keyvalues={"group_id": group_id, "category_id": category_id},
             retcols=("is_public", "profile"),
@@ -343,7 +343,7 @@ class GroupServerStore(SQLBaseStore):
         else:
             update_values["is_public"] = is_public
 
-        return self._simple_upsert(
+        return self.simple_upsert(
             table="group_room_categories",
             keyvalues={"group_id": group_id, "category_id": category_id},
             values=update_values,
@@ -352,7 +352,7 @@ class GroupServerStore(SQLBaseStore):
         )
 
     def remove_group_category(self, group_id, category_id):
-        return self._simple_delete(
+        return self.simple_delete(
             table="group_room_categories",
             keyvalues={"group_id": group_id, "category_id": category_id},
             desc="remove_group_category",
@@ -360,7 +360,7 @@ class GroupServerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_group_roles(self, group_id):
-        rows = yield self._simple_select_list(
+        rows = yield self.simple_select_list(
             table="group_roles",
             keyvalues={"group_id": group_id},
             retcols=("role_id", "is_public", "profile"),
@@ -377,7 +377,7 @@ class GroupServerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_group_role(self, group_id, role_id):
-        role = yield self._simple_select_one(
+        role = yield self.simple_select_one(
             table="group_roles",
             keyvalues={"group_id": group_id, "role_id": role_id},
             retcols=("is_public", "profile"),
@@ -404,7 +404,7 @@ class GroupServerStore(SQLBaseStore):
         else:
             update_values["is_public"] = is_public
 
-        return self._simple_upsert(
+        return self.simple_upsert(
             table="group_roles",
             keyvalues={"group_id": group_id, "role_id": role_id},
             values=update_values,
@@ -413,7 +413,7 @@ class GroupServerStore(SQLBaseStore):
         )
 
     def remove_group_role(self, group_id, role_id):
-        return self._simple_delete(
+        return self.simple_delete(
             table="group_roles",
             keyvalues={"group_id": group_id, "role_id": role_id},
             desc="remove_group_role",
@@ -444,7 +444,7 @@ class GroupServerStore(SQLBaseStore):
                 an order of 1 will put the user first. Otherwise, the user gets
                 added to the end.
         """
-        user_in_group = self._simple_select_one_onecol_txn(
+        user_in_group = self.simple_select_one_onecol_txn(
             txn,
             table="group_users",
             keyvalues={"group_id": group_id, "user_id": user_id},
@@ -457,7 +457,7 @@ class GroupServerStore(SQLBaseStore):
         if role_id is None:
             role_id = _DEFAULT_ROLE_ID
         else:
-            role_exists = self._simple_select_one_onecol_txn(
+            role_exists = self.simple_select_one_onecol_txn(
                 txn,
                 table="group_roles",
                 keyvalues={"group_id": group_id, "role_id": role_id},
@@ -468,7 +468,7 @@ class GroupServerStore(SQLBaseStore):
                 raise SynapseError(400, "Role doesn't exist")
 
             # TODO: Check role is part of the summary already
-            role_exists = self._simple_select_one_onecol_txn(
+            role_exists = self.simple_select_one_onecol_txn(
                 txn,
                 table="group_summary_roles",
                 keyvalues={"group_id": group_id, "role_id": role_id},
@@ -488,7 +488,7 @@ class GroupServerStore(SQLBaseStore):
                     (group_id, role_id, group_id, role_id),
                 )
 
-        existing = self._simple_select_one_txn(
+        existing = self.simple_select_one_txn(
             txn,
             table="group_summary_users",
             keyvalues={"group_id": group_id, "user_id": user_id, "role_id": role_id},
@@ -517,7 +517,7 @@ class GroupServerStore(SQLBaseStore):
                 to_update["user_order"] = order
             if is_public is not None:
                 to_update["is_public"] = is_public
-            self._simple_update_txn(
+            self.simple_update_txn(
                 txn,
                 table="group_summary_users",
                 keyvalues={
@@ -531,7 +531,7 @@ class GroupServerStore(SQLBaseStore):
             if is_public is None:
                 is_public = True
 
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="group_summary_users",
                 values={
@@ -547,7 +547,7 @@ class GroupServerStore(SQLBaseStore):
         if role_id is None:
             role_id = _DEFAULT_ROLE_ID
 
-        return self._simple_delete(
+        return self.simple_delete(
             table="group_summary_users",
             keyvalues={"group_id": group_id, "role_id": role_id, "user_id": user_id},
             desc="remove_user_from_summary",
@@ -561,7 +561,7 @@ class GroupServerStore(SQLBaseStore):
             Deferred[list[str]]: A twisted.Deferred containing a list of group ids
                 containing this room
         """
-        return self._simple_select_onecol(
+        return self.simple_select_onecol(
             table="group_rooms",
             keyvalues={"room_id": room_id},
             retcol="group_id",
@@ -630,7 +630,7 @@ class GroupServerStore(SQLBaseStore):
         )
 
     def is_user_in_group(self, user_id, group_id):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="group_users",
             keyvalues={"group_id": group_id, "user_id": user_id},
             retcol="user_id",
@@ -639,7 +639,7 @@ class GroupServerStore(SQLBaseStore):
         ).addCallback(lambda r: bool(r))
 
     def is_user_admin_in_group(self, group_id, user_id):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="group_users",
             keyvalues={"group_id": group_id, "user_id": user_id},
             retcol="is_admin",
@@ -650,7 +650,7 @@ class GroupServerStore(SQLBaseStore):
     def add_group_invite(self, group_id, user_id):
         """Record that the group server has invited a user
         """
-        return self._simple_insert(
+        return self.simple_insert(
             table="group_invites",
             values={"group_id": group_id, "user_id": user_id},
             desc="add_group_invite",
@@ -659,7 +659,7 @@ class GroupServerStore(SQLBaseStore):
     def is_user_invited_to_local_group(self, group_id, user_id):
         """Has the group server invited a user?
         """
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="group_invites",
             keyvalues={"group_id": group_id, "user_id": user_id},
             retcol="user_id",
@@ -682,7 +682,7 @@ class GroupServerStore(SQLBaseStore):
         """
 
         def _get_users_membership_in_group_txn(txn):
-            row = self._simple_select_one_txn(
+            row = self.simple_select_one_txn(
                 txn,
                 table="group_users",
                 keyvalues={"group_id": group_id, "user_id": user_id},
@@ -697,7 +697,7 @@ class GroupServerStore(SQLBaseStore):
                     "is_privileged": row["is_admin"],
                 }
 
-            row = self._simple_select_one_onecol_txn(
+            row = self.simple_select_one_onecol_txn(
                 txn,
                 table="group_invites",
                 keyvalues={"group_id": group_id, "user_id": user_id},
@@ -738,7 +738,7 @@ class GroupServerStore(SQLBaseStore):
         """
 
         def _add_user_to_group_txn(txn):
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="group_users",
                 values={
@@ -749,14 +749,14 @@ class GroupServerStore(SQLBaseStore):
                 },
             )
 
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="group_invites",
                 keyvalues={"group_id": group_id, "user_id": user_id},
             )
 
             if local_attestation:
-                self._simple_insert_txn(
+                self.simple_insert_txn(
                     txn,
                     table="group_attestations_renewals",
                     values={
@@ -766,7 +766,7 @@ class GroupServerStore(SQLBaseStore):
                     },
                 )
             if remote_attestation:
-                self._simple_insert_txn(
+                self.simple_insert_txn(
                     txn,
                     table="group_attestations_remote",
                     values={
@@ -781,27 +781,27 @@ class GroupServerStore(SQLBaseStore):
 
     def remove_user_from_group(self, group_id, user_id):
         def _remove_user_from_group_txn(txn):
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="group_users",
                 keyvalues={"group_id": group_id, "user_id": user_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="group_invites",
                 keyvalues={"group_id": group_id, "user_id": user_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="group_attestations_renewals",
                 keyvalues={"group_id": group_id, "user_id": user_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="group_attestations_remote",
                 keyvalues={"group_id": group_id, "user_id": user_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="group_summary_users",
                 keyvalues={"group_id": group_id, "user_id": user_id},
@@ -812,14 +812,14 @@ class GroupServerStore(SQLBaseStore):
         )
 
     def add_room_to_group(self, group_id, room_id, is_public):
-        return self._simple_insert(
+        return self.simple_insert(
             table="group_rooms",
             values={"group_id": group_id, "room_id": room_id, "is_public": is_public},
             desc="add_room_to_group",
         )
 
     def update_room_in_group_visibility(self, group_id, room_id, is_public):
-        return self._simple_update(
+        return self.simple_update(
             table="group_rooms",
             keyvalues={"group_id": group_id, "room_id": room_id},
             updatevalues={"is_public": is_public},
@@ -828,13 +828,13 @@ class GroupServerStore(SQLBaseStore):
 
     def remove_room_from_group(self, group_id, room_id):
         def _remove_room_from_group_txn(txn):
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="group_rooms",
                 keyvalues={"group_id": group_id, "room_id": room_id},
             )
 
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="group_summary_rooms",
                 keyvalues={"group_id": group_id, "room_id": room_id},
@@ -847,7 +847,7 @@ class GroupServerStore(SQLBaseStore):
     def get_publicised_groups_for_user(self, user_id):
         """Get all groups a user is publicising
         """
-        return self._simple_select_onecol(
+        return self.simple_select_onecol(
             table="local_group_membership",
             keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
             retcol="group_id",
@@ -857,7 +857,7 @@ class GroupServerStore(SQLBaseStore):
     def update_group_publicity(self, group_id, user_id, publicise):
         """Update whether the user is publicising their membership of the group
         """
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="local_group_membership",
             keyvalues={"group_id": group_id, "user_id": user_id},
             updatevalues={"is_publicised": publicise},
@@ -893,12 +893,12 @@ class GroupServerStore(SQLBaseStore):
 
         def _register_user_group_membership_txn(txn, next_id):
             # TODO: Upsert?
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="local_group_membership",
                 keyvalues={"group_id": group_id, "user_id": user_id},
             )
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="local_group_membership",
                 values={
@@ -911,7 +911,7 @@ class GroupServerStore(SQLBaseStore):
                 },
             )
 
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="local_group_updates",
                 values={
@@ -930,7 +930,7 @@ class GroupServerStore(SQLBaseStore):
 
             if membership == "join":
                 if local_attestation:
-                    self._simple_insert_txn(
+                    self.simple_insert_txn(
                         txn,
                         table="group_attestations_renewals",
                         values={
@@ -940,7 +940,7 @@ class GroupServerStore(SQLBaseStore):
                         },
                     )
                 if remote_attestation:
-                    self._simple_insert_txn(
+                    self.simple_insert_txn(
                         txn,
                         table="group_attestations_remote",
                         values={
@@ -951,12 +951,12 @@ class GroupServerStore(SQLBaseStore):
                         },
                     )
             else:
-                self._simple_delete_txn(
+                self.simple_delete_txn(
                     txn,
                     table="group_attestations_renewals",
                     keyvalues={"group_id": group_id, "user_id": user_id},
                 )
-                self._simple_delete_txn(
+                self.simple_delete_txn(
                     txn,
                     table="group_attestations_remote",
                     keyvalues={"group_id": group_id, "user_id": user_id},
@@ -976,7 +976,7 @@ class GroupServerStore(SQLBaseStore):
     def create_group(
         self, group_id, user_id, name, avatar_url, short_description, long_description
     ):
-        yield self._simple_insert(
+        yield self.simple_insert(
             table="groups",
             values={
                 "group_id": group_id,
@@ -991,7 +991,7 @@ class GroupServerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def update_group_profile(self, group_id, profile):
-        yield self._simple_update_one(
+        yield self.simple_update_one(
             table="groups",
             keyvalues={"group_id": group_id},
             updatevalues=profile,
@@ -1017,7 +1017,7 @@ class GroupServerStore(SQLBaseStore):
     def update_attestation_renewal(self, group_id, user_id, attestation):
         """Update an attestation that we have renewed
         """
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="group_attestations_renewals",
             keyvalues={"group_id": group_id, "user_id": user_id},
             updatevalues={"valid_until_ms": attestation["valid_until_ms"]},
@@ -1027,7 +1027,7 @@ class GroupServerStore(SQLBaseStore):
     def update_remote_attestion(self, group_id, user_id, attestation):
         """Update an attestation that a remote has renewed
         """
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="group_attestations_remote",
             keyvalues={"group_id": group_id, "user_id": user_id},
             updatevalues={
@@ -1046,7 +1046,7 @@ class GroupServerStore(SQLBaseStore):
             group_id (str)
             user_id (str)
         """
-        return self._simple_delete(
+        return self.simple_delete(
             table="group_attestations_renewals",
             keyvalues={"group_id": group_id, "user_id": user_id},
             desc="remove_attestation_renewal",
@@ -1057,7 +1057,7 @@ class GroupServerStore(SQLBaseStore):
         """Get the attestation that proves the remote agrees that the user is
         in the group.
         """
-        row = yield self._simple_select_one(
+        row = yield self.simple_select_one(
             table="group_attestations_remote",
             keyvalues={"group_id": group_id, "user_id": user_id},
             retcols=("valid_until_ms", "attestation_json"),
@@ -1072,7 +1072,7 @@ class GroupServerStore(SQLBaseStore):
         return None
 
     def get_joined_groups(self, user_id):
-        return self._simple_select_onecol(
+        return self.simple_select_onecol(
             table="local_group_membership",
             keyvalues={"user_id": user_id, "membership": "join"},
             retcol="group_id",
@@ -1188,7 +1188,7 @@ class GroupServerStore(SQLBaseStore):
             ]
 
             for table in tables:
-                self._simple_delete_txn(
+                self.simple_delete_txn(
                     txn, table=table, keyvalues={"group_id": group_id}
                 )
 
diff --git a/synapse/storage/data_stores/main/keys.py b/synapse/storage/data_stores/main/keys.py
index ebc7db3ed6..c7150432b3 100644
--- a/synapse/storage/data_stores/main/keys.py
+++ b/synapse/storage/data_stores/main/keys.py
@@ -129,7 +129,7 @@ class KeyStore(SQLBaseStore):
 
         return self.runInteraction(
             "store_server_verify_keys",
-            self._simple_upsert_many_txn,
+            self.simple_upsert_many_txn,
             table="server_signature_keys",
             key_names=("server_name", "key_id"),
             key_values=key_values,
@@ -157,7 +157,7 @@ class KeyStore(SQLBaseStore):
             ts_valid_until_ms (int): The time when this json stops being valid.
             key_json (bytes): The encoded JSON.
         """
-        return self._simple_upsert(
+        return self.simple_upsert(
             table="server_keys_json",
             keyvalues={
                 "server_name": server_name,
@@ -196,7 +196,7 @@ class KeyStore(SQLBaseStore):
                     keyvalues["key_id"] = key_id
                 if from_server is not None:
                     keyvalues["from_server"] = from_server
-                rows = self._simple_select_list_txn(
+                rows = self.simple_select_list_txn(
                     txn,
                     "server_keys_json",
                     keyvalues=keyvalues,
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 84b5f3ad5e..0cb9446f96 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -39,7 +39,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         Returns:
             None if the media_id doesn't exist.
         """
-        return self._simple_select_one(
+        return self.simple_select_one(
             "local_media_repository",
             {"media_id": media_id},
             (
@@ -64,7 +64,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         user_id,
         url_cache=None,
     ):
-        return self._simple_insert(
+        return self.simple_insert(
             "local_media_repository",
             {
                 "media_id": media_id,
@@ -129,7 +129,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
     def store_url_cache(
         self, url, response_code, etag, expires_ts, og, media_id, download_ts
     ):
-        return self._simple_insert(
+        return self.simple_insert(
             "local_media_repository_url_cache",
             {
                 "url": url,
@@ -144,7 +144,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         )
 
     def get_local_media_thumbnails(self, media_id):
-        return self._simple_select_list(
+        return self.simple_select_list(
             "local_media_repository_thumbnails",
             {"media_id": media_id},
             (
@@ -166,7 +166,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         thumbnail_method,
         thumbnail_length,
     ):
-        return self._simple_insert(
+        return self.simple_insert(
             "local_media_repository_thumbnails",
             {
                 "media_id": media_id,
@@ -180,7 +180,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         )
 
     def get_cached_remote_media(self, origin, media_id):
-        return self._simple_select_one(
+        return self.simple_select_one(
             "remote_media_cache",
             {"media_origin": origin, "media_id": media_id},
             (
@@ -205,7 +205,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         upload_name,
         filesystem_id,
     ):
-        return self._simple_insert(
+        return self.simple_insert(
             "remote_media_cache",
             {
                 "media_origin": origin,
@@ -253,7 +253,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         return self.runInteraction("update_cached_last_access_time", update_cache_txn)
 
     def get_remote_media_thumbnails(self, origin, media_id):
-        return self._simple_select_list(
+        return self.simple_select_list(
             "remote_media_cache_thumbnails",
             {"media_origin": origin, "media_id": media_id},
             (
@@ -278,7 +278,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         thumbnail_method,
         thumbnail_length,
     ):
-        return self._simple_insert(
+        return self.simple_insert(
             "remote_media_cache_thumbnails",
             {
                 "media_origin": origin,
@@ -300,18 +300,18 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             " WHERE last_access_ts < ?"
         )
 
-        return self._execute(
+        return self.execute(
             "get_remote_media_before", self.cursor_to_dict, sql, before_ts
         )
 
     def delete_remote_media(self, media_origin, media_id):
         def delete_remote_media_txn(txn):
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 "remote_media_cache",
                 keyvalues={"media_origin": media_origin, "media_id": media_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 "remote_media_cache_thumbnails",
                 keyvalues={"media_origin": media_origin, "media_id": media_id},
@@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         if len(media_ids) == 0:
             return
 
-        sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?"
+        sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
 
         def _delete_url_cache_txn(txn):
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
@@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             return
 
         def _delete_url_cache_media_txn(txn):
-            sql = "DELETE FROM local_media_repository" " WHERE media_id = ?"
+            sql = "DELETE FROM local_media_repository WHERE media_id = ?"
 
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
 
-            sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?"
+            sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
 
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
 
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index b41c3d317a..b8fc28f97b 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -32,7 +32,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
         self._clock = hs.get_clock()
         self.hs = hs
         # Do not add more reserved users than the total allowable number
-        self._new_transaction(
+        self.new_transaction(
             dbconn,
             "initialise_mau_threepids",
             [],
@@ -261,7 +261,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
         # never be a big table and alternative approaches (batching multiple
         # upserts into a single txn) introduced a lot of extra complexity.
         # See https://github.com/matrix-org/synapse/issues/3854 for more
-        is_insert = self._simple_upsert_txn(
+        is_insert = self.simple_upsert_txn(
             txn,
             table="monthly_active_users",
             keyvalues={"user_id": user_id},
@@ -281,7 +281,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
 
         """
 
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="monthly_active_users",
             keyvalues={"user_id": user_id},
             retcol="timestamp",
diff --git a/synapse/storage/data_stores/main/openid.py b/synapse/storage/data_stores/main/openid.py
index 79b40044d9..650e49750e 100644
--- a/synapse/storage/data_stores/main/openid.py
+++ b/synapse/storage/data_stores/main/openid.py
@@ -3,7 +3,7 @@ from synapse.storage._base import SQLBaseStore
 
 class OpenIdStore(SQLBaseStore):
     def insert_open_id_token(self, token, ts_valid_until_ms, user_id):
-        return self._simple_insert(
+        return self.simple_insert(
             table="open_id_tokens",
             values={
                 "token": token,
diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index 523ed6575e..a5e121efd1 100644
--- a/synapse/storage/data_stores/main/presence.py
+++ b/synapse/storage/data_stores/main/presence.py
@@ -46,7 +46,7 @@ class PresenceStore(SQLBaseStore):
             txn.call_after(self._get_presence_for_user.invalidate, (state.user_id,))
 
         # Actually insert new rows
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="presence_stream",
             values=[
@@ -103,7 +103,7 @@ class PresenceStore(SQLBaseStore):
         inlineCallbacks=True,
     )
     def get_presence_for_users(self, user_ids):
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="presence_stream",
             column="user_id",
             iterable=user_ids,
@@ -129,7 +129,7 @@ class PresenceStore(SQLBaseStore):
         return self._presence_id_gen.get_current_token()
 
     def allow_presence_visible(self, observed_localpart, observer_userid):
-        return self._simple_insert(
+        return self.simple_insert(
             table="presence_allow_inbound",
             values={
                 "observed_user_id": observed_localpart,
@@ -140,7 +140,7 @@ class PresenceStore(SQLBaseStore):
         )
 
     def disallow_presence_visible(self, observed_localpart, observer_userid):
-        return self._simple_delete_one(
+        return self.simple_delete_one(
             table="presence_allow_inbound",
             keyvalues={
                 "observed_user_id": observed_localpart,
diff --git a/synapse/storage/data_stores/main/profile.py b/synapse/storage/data_stores/main/profile.py
index e4e8a1c1d6..c8b5b60301 100644
--- a/synapse/storage/data_stores/main/profile.py
+++ b/synapse/storage/data_stores/main/profile.py
@@ -24,7 +24,7 @@ class ProfileWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_profileinfo(self, user_localpart):
         try:
-            profile = yield self._simple_select_one(
+            profile = yield self.simple_select_one(
                 table="profiles",
                 keyvalues={"user_id": user_localpart},
                 retcols=("displayname", "avatar_url"),
@@ -42,7 +42,7 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     def get_profile_displayname(self, user_localpart):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="profiles",
             keyvalues={"user_id": user_localpart},
             retcol="displayname",
@@ -50,7 +50,7 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     def get_profile_avatar_url(self, user_localpart):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="profiles",
             keyvalues={"user_id": user_localpart},
             retcol="avatar_url",
@@ -58,7 +58,7 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     def get_from_remote_profile_cache(self, user_id):
-        return self._simple_select_one(
+        return self.simple_select_one(
             table="remote_profile_cache",
             keyvalues={"user_id": user_id},
             retcols=("displayname", "avatar_url"),
@@ -67,12 +67,12 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     def create_profile(self, user_localpart):
-        return self._simple_insert(
+        return self.simple_insert(
             table="profiles", values={"user_id": user_localpart}, desc="create_profile"
         )
 
     def set_profile_displayname(self, user_localpart, new_displayname):
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="profiles",
             keyvalues={"user_id": user_localpart},
             updatevalues={"displayname": new_displayname},
@@ -80,7 +80,7 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     def set_profile_avatar_url(self, user_localpart, new_avatar_url):
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="profiles",
             keyvalues={"user_id": user_localpart},
             updatevalues={"avatar_url": new_avatar_url},
@@ -95,7 +95,7 @@ class ProfileStore(ProfileWorkerStore):
         This should only be called when `is_subscribed_remote_profile_for_user`
         would return true for the user.
         """
-        return self._simple_upsert(
+        return self.simple_upsert(
             table="remote_profile_cache",
             keyvalues={"user_id": user_id},
             values={
@@ -107,7 +107,7 @@ class ProfileStore(ProfileWorkerStore):
         )
 
     def update_remote_profile_cache(self, user_id, displayname, avatar_url):
-        return self._simple_update(
+        return self.simple_update(
             table="remote_profile_cache",
             keyvalues={"user_id": user_id},
             values={
@@ -125,7 +125,7 @@ class ProfileStore(ProfileWorkerStore):
         """
         subscribed = yield self.is_subscribed_remote_profile_for_user(user_id)
         if not subscribed:
-            yield self._simple_delete(
+            yield self.simple_delete(
                 table="remote_profile_cache",
                 keyvalues={"user_id": user_id},
                 desc="delete_remote_profile_cache",
@@ -155,7 +155,7 @@ class ProfileStore(ProfileWorkerStore):
     def is_subscribed_remote_profile_for_user(self, user_id):
         """Check whether we are interested in a remote user's profile.
         """
-        res = yield self._simple_select_one_onecol(
+        res = yield self.simple_select_one_onecol(
             table="group_users",
             keyvalues={"user_id": user_id},
             retcol="user_id",
@@ -166,7 +166,7 @@ class ProfileStore(ProfileWorkerStore):
         if res:
             return True
 
-        res = yield self._simple_select_one_onecol(
+        res = yield self.simple_select_one_onecol(
             table="group_invites",
             keyvalues={"user_id": user_id},
             retcol="user_id",
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index b520062d84..75bd499bcd 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -75,7 +75,7 @@ class PushRulesWorkerStore(
     def __init__(self, db_conn, hs):
         super(PushRulesWorkerStore, self).__init__(db_conn, hs)
 
-        push_rules_prefill, push_rules_id = self._get_cache_dict(
+        push_rules_prefill, push_rules_id = self.get_cache_dict(
             db_conn,
             "push_rules_stream",
             entity_column="user_id",
@@ -100,7 +100,7 @@ class PushRulesWorkerStore(
 
     @cachedInlineCallbacks(max_entries=5000)
     def get_push_rules_for_user(self, user_id):
-        rows = yield self._simple_select_list(
+        rows = yield self.simple_select_list(
             table="push_rules",
             keyvalues={"user_name": user_id},
             retcols=(
@@ -124,7 +124,7 @@ class PushRulesWorkerStore(
 
     @cachedInlineCallbacks(max_entries=5000)
     def get_push_rules_enabled_for_user(self, user_id):
-        results = yield self._simple_select_list(
+        results = yield self.simple_select_list(
             table="push_rules_enable",
             keyvalues={"user_name": user_id},
             retcols=("user_name", "rule_id", "enabled"),
@@ -162,7 +162,7 @@ class PushRulesWorkerStore(
 
         results = {user_id: [] for user_id in user_ids}
 
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="push_rules",
             column="user_name",
             iterable=user_ids,
@@ -320,7 +320,7 @@ class PushRulesWorkerStore(
 
         results = {user_id: {} for user_id in user_ids}
 
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="push_rules_enable",
             column="user_name",
             iterable=user_ids,
@@ -395,7 +395,7 @@ class PushRuleStore(PushRulesWorkerStore):
 
         relative_to_rule = before or after
 
-        res = self._simple_select_one_txn(
+        res = self.simple_select_one_txn(
             txn,
             table="push_rules",
             keyvalues={"user_name": user_id, "rule_id": relative_to_rule},
@@ -499,7 +499,7 @@ class PushRuleStore(PushRulesWorkerStore):
         actions_json,
         update_stream=True,
     ):
-        """Specialised version of _simple_upsert_txn that picks a push_rule_id
+        """Specialised version of simple_upsert_txn that picks a push_rule_id
         using the _push_rule_id_gen if it needs to insert the rule. It assumes
         that the "push_rules" table is locked"""
 
@@ -518,7 +518,7 @@ class PushRuleStore(PushRulesWorkerStore):
             # We didn't update a row with the given rule_id so insert one
             push_rule_id = self._push_rule_id_gen.get_next()
 
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="push_rules",
                 values={
@@ -561,7 +561,7 @@ class PushRuleStore(PushRulesWorkerStore):
         """
 
         def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
-            self._simple_delete_one_txn(
+            self.simple_delete_one_txn(
                 txn, "push_rules", {"user_name": user_id, "rule_id": rule_id}
             )
 
@@ -596,7 +596,7 @@ class PushRuleStore(PushRulesWorkerStore):
         self, txn, stream_id, event_stream_ordering, user_id, rule_id, enabled
     ):
         new_id = self._push_rules_enable_id_gen.get_next()
-        self._simple_upsert_txn(
+        self.simple_upsert_txn(
             txn,
             "push_rules_enable",
             {"user_name": user_id, "rule_id": rule_id},
@@ -636,7 +636,7 @@ class PushRuleStore(PushRulesWorkerStore):
                     update_stream=False,
                 )
             else:
-                self._simple_update_one_txn(
+                self.simple_update_one_txn(
                     txn,
                     "push_rules",
                     {"user_name": user_id, "rule_id": rule_id},
@@ -675,7 +675,7 @@ class PushRuleStore(PushRulesWorkerStore):
         if data is not None:
             values.update(data)
 
-        self._simple_insert_txn(txn, "push_rules_stream", values=values)
+        self.simple_insert_txn(txn, "push_rules_stream", values=values)
 
         txn.call_after(self.get_push_rules_for_user.invalidate, (user_id,))
         txn.call_after(self.get_push_rules_enabled_for_user.invalidate, (user_id,))
diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py
index d76861cdc0..d5a169872b 100644
--- a/synapse/storage/data_stores/main/pusher.py
+++ b/synapse/storage/data_stores/main/pusher.py
@@ -59,7 +59,7 @@ class PusherWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def user_has_pusher(self, user_id):
-        ret = yield self._simple_select_one_onecol(
+        ret = yield self.simple_select_one_onecol(
             "pushers", {"user_name": user_id}, "id", allow_none=True
         )
         return ret is not None
@@ -72,7 +72,7 @@ class PusherWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_pushers_by(self, keyvalues):
-        ret = yield self._simple_select_list(
+        ret = yield self.simple_select_list(
             "pushers",
             keyvalues,
             [
@@ -193,7 +193,7 @@ class PusherWorkerStore(SQLBaseStore):
         inlineCallbacks=True,
     )
     def get_if_users_have_pushers(self, user_ids):
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="pushers",
             column="user_name",
             iterable=user_ids,
@@ -229,8 +229,8 @@ class PusherStore(PusherWorkerStore):
     ):
         with self._pushers_id_gen.get_next() as stream_id:
             # no need to lock because `pushers` has a unique key on
-            # (app_id, pushkey, user_name) so _simple_upsert will retry
-            yield self._simple_upsert(
+            # (app_id, pushkey, user_name) so simple_upsert will retry
+            yield self.simple_upsert(
                 table="pushers",
                 keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
                 values={
@@ -269,7 +269,7 @@ class PusherStore(PusherWorkerStore):
                 txn, self.get_if_user_has_pusher, (user_id,)
             )
 
-            self._simple_delete_one_txn(
+            self.simple_delete_one_txn(
                 txn,
                 "pushers",
                 {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
@@ -278,7 +278,7 @@ class PusherStore(PusherWorkerStore):
             # it's possible for us to end up with duplicate rows for
             # (app_id, pushkey, user_id) at different stream_ids, but that
             # doesn't really matter.
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="deleted_pushers",
                 values={
@@ -296,7 +296,7 @@ class PusherStore(PusherWorkerStore):
     def update_pusher_last_stream_ordering(
         self, app_id, pushkey, user_id, last_stream_ordering
     ):
-        yield self._simple_update_one(
+        yield self.simple_update_one(
             "pushers",
             {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
             {"last_stream_ordering": last_stream_ordering},
@@ -319,7 +319,7 @@ class PusherStore(PusherWorkerStore):
         Returns:
             Deferred[bool]: True if the pusher still exists; False if it has been deleted.
         """
-        updated = yield self._simple_update(
+        updated = yield self.simple_update(
             table="pushers",
             keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
             updatevalues={
@@ -333,7 +333,7 @@ class PusherStore(PusherWorkerStore):
 
     @defer.inlineCallbacks
     def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
-        yield self._simple_update(
+        yield self.simple_update(
             table="pushers",
             keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
             updatevalues={"failing_since": failing_since},
@@ -342,7 +342,7 @@ class PusherStore(PusherWorkerStore):
 
     @defer.inlineCallbacks
     def get_throttle_params_by_room(self, pusher_id):
-        res = yield self._simple_select_list(
+        res = yield self.simple_select_list(
             "pusher_throttle",
             {"pusher": pusher_id},
             ["room_id", "last_sent_ts", "throttle_ms"],
@@ -361,8 +361,8 @@ class PusherStore(PusherWorkerStore):
     @defer.inlineCallbacks
     def set_throttle_params(self, pusher_id, room_id, params):
         # no need to lock because `pusher_throttle` has a primary key on
-        # (pusher, room_id) so _simple_upsert will retry
-        yield self._simple_upsert(
+        # (pusher, room_id) so simple_upsert will retry
+        yield self.simple_upsert(
             "pusher_throttle",
             {"pusher": pusher_id, "room_id": room_id},
             params,
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0c24430f28..380f388e30 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -61,7 +61,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
     @cached(num_args=2)
     def get_receipts_for_room(self, room_id, receipt_type):
-        return self._simple_select_list(
+        return self.simple_select_list(
             table="receipts_linearized",
             keyvalues={"room_id": room_id, "receipt_type": receipt_type},
             retcols=("user_id", "event_id"),
@@ -70,7 +70,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
     @cached(num_args=3)
     def get_last_receipt_event_id_for_user(self, user_id, room_id, receipt_type):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="receipts_linearized",
             keyvalues={
                 "room_id": room_id,
@@ -84,7 +84,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
     @cachedInlineCallbacks(num_args=2)
     def get_receipts_for_user(self, user_id, receipt_type):
-        rows = yield self._simple_select_list(
+        rows = yield self.simple_select_list(
             table="receipts_linearized",
             keyvalues={"user_id": user_id, "receipt_type": receipt_type},
             retcols=("room_id", "event_id"),
@@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 args.append(limit)
             txn.execute(sql, args)
 
-            return (r[0:5] + (json.loads(r[5]),) for r in txn)
+            return list(r[0:5] + (json.loads(r[5]),) for r in txn)
 
         return self.runInteraction(
             "get_all_updated_receipts", get_all_updated_receipts_txn
@@ -335,7 +335,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             otherwise, the rx timestamp of the event that the RR corresponds to
                 (or 0 if the event is unknown)
         """
-        res = self._simple_select_one_txn(
+        res = self.simple_select_one_txn(
             txn,
             table="events",
             retcols=["stream_ordering", "received_ts"],
@@ -388,7 +388,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             (user_id, room_id, receipt_type),
         )
 
-        self._simple_delete_txn(
+        self.simple_delete_txn(
             txn,
             table="receipts_linearized",
             keyvalues={
@@ -398,7 +398,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             },
         )
 
-        self._simple_insert_txn(
+        self.simple_insert_txn(
             txn,
             table="receipts_linearized",
             values={
@@ -514,7 +514,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             self._get_linearized_receipts_for_room.invalidate_many, (room_id,)
         )
 
-        self._simple_delete_txn(
+        self.simple_delete_txn(
             txn,
             table="receipts_graph",
             keyvalues={
@@ -523,7 +523,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
                 "user_id": user_id,
             },
         )
-        self._simple_insert_txn(
+        self.simple_insert_txn(
             txn,
             table="receipts_graph",
             values={
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index ee1b2b2bbf..debc6706f5 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -19,7 +19,6 @@ import logging
 import re
 
 from six import iterkeys
-from six.moves import range
 
 from twisted.internet import defer
 from twisted.internet.defer import Deferred
@@ -46,7 +45,7 @@ class RegistrationWorkerStore(SQLBaseStore):
 
     @cached()
     def get_user_by_id(self, user_id):
-        return self._simple_select_one(
+        return self.simple_select_one(
             table="users",
             keyvalues={"name": user_id},
             retcols=[
@@ -110,7 +109,7 @@ class RegistrationWorkerStore(SQLBaseStore):
                 otherwise int representation of the timestamp (as a number of
                 milliseconds since epoch).
         """
-        res = yield self._simple_select_one_onecol(
+        res = yield self.simple_select_one_onecol(
             table="account_validity",
             keyvalues={"user_id": user_id},
             retcol="expiration_ts_ms",
@@ -138,7 +137,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
 
         def set_account_validity_for_user_txn(txn):
-            self._simple_update_txn(
+            self.simple_update_txn(
                 txn=txn,
                 table="account_validity",
                 keyvalues={"user_id": user_id},
@@ -168,7 +167,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Raises:
             StoreError: The provided token is already set for another user.
         """
-        yield self._simple_update_one(
+        yield self.simple_update_one(
             table="account_validity",
             keyvalues={"user_id": user_id},
             updatevalues={"renewal_token": renewal_token},
@@ -185,7 +184,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns:
             defer.Deferred[str]: The ID of the user to which the token belongs.
         """
-        res = yield self._simple_select_one_onecol(
+        res = yield self.simple_select_one_onecol(
             table="account_validity",
             keyvalues={"renewal_token": renewal_token},
             retcol="user_id",
@@ -204,7 +203,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns:
             defer.Deferred[str]: The renewal token associated with this user ID.
         """
-        res = yield self._simple_select_one_onecol(
+        res = yield self.simple_select_one_onecol(
             table="account_validity",
             keyvalues={"user_id": user_id},
             retcol="renewal_token",
@@ -251,7 +250,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             email_sent (bool): Flag which indicates whether a renewal email has been sent
                 to this user.
         """
-        yield self._simple_update_one(
+        yield self.simple_update_one(
             table="account_validity",
             keyvalues={"user_id": user_id},
             updatevalues={"email_sent": email_sent},
@@ -266,7 +265,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Args:
             user_id (str): ID of the user to remove from the account validity table.
         """
-        yield self._simple_delete_one(
+        yield self.simple_delete_one(
             table="account_validity",
             keyvalues={"user_id": user_id},
             desc="delete_account_validity_for_user",
@@ -282,7 +281,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns (bool):
             true iff the user is a server admin, false otherwise.
         """
-        res = yield self._simple_select_one_onecol(
+        res = yield self.simple_select_one_onecol(
             table="users",
             keyvalues={"name": user.to_string()},
             retcol="admin",
@@ -300,7 +299,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             admin (bool): true iff the user is to be a server admin,
                 false otherwise.
         """
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="users",
             keyvalues={"name": user.to_string()},
             updatevalues={"admin": 1 if admin else 0},
@@ -352,7 +351,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         return res
 
     def is_real_user_txn(self, txn, user_id):
-        res = self._simple_select_one_onecol_txn(
+        res = self.simple_select_one_onecol_txn(
             txn=txn,
             table="users",
             keyvalues={"name": user_id},
@@ -362,7 +361,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         return res is None
 
     def is_support_user_txn(self, txn, user_id):
-        res = self._simple_select_one_onecol_txn(
+        res = self.simple_select_one_onecol_txn(
             txn=txn,
             table="users",
             keyvalues={"name": user_id},
@@ -377,9 +376,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
 
         def f(txn):
-            sql = (
-                "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)"
-            )
+            sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)"
             txn.execute(sql, (user_id,))
             return dict(txn)
 
@@ -397,7 +394,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns:
             str|None: the mxid of the user, or None if they are not known
         """
-        return await self._simple_select_one_onecol(
+        return await self.simple_select_one_onecol(
             table="user_external_ids",
             keyvalues={"auth_provider": auth_provider, "external_id": external_id},
             retcol="user_id",
@@ -484,12 +481,8 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
         Gets the localpart of the next generated user ID.
 
-        Generated user IDs are integers, and we aim for them to be as small as
-        we can. Unfortunately, it's possible some of them are already taken by
-        existing users, and there may be gaps in the already taken range. This
-        function returns the start of the first allocatable gap. This is to
-        avoid the case of ID 1000 being pre-allocated and starting at 1001 while
-        0-999 are available.
+        Generated user IDs are integers, so we find the largest integer user ID
+        already taken and return that plus one.
         """
 
         def _find_next_generated_user_id(txn):
@@ -499,15 +492,14 @@ class RegistrationWorkerStore(SQLBaseStore):
 
             regex = re.compile(r"^@(\d+):")
 
-            found = set()
+            max_found = 0
 
             for (user_id,) in txn:
                 match = regex.search(user_id)
                 if match:
-                    found.add(int(match.group(1)))
-            for i in range(len(found) + 1):
-                if i not in found:
-                    return i
+                    max_found = max(int(match.group(1)), max_found)
+
+            return max_found + 1
 
         return (
             (
@@ -544,7 +536,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns:
             str|None: user id or None if no user id/threepid mapping exists
         """
-        ret = self._simple_select_one_txn(
+        ret = self.simple_select_one_txn(
             txn,
             "user_threepids",
             {"medium": medium, "address": address},
@@ -557,7 +549,7 @@ class RegistrationWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
-        yield self._simple_upsert(
+        yield self.simple_upsert(
             "user_threepids",
             {"medium": medium, "address": address},
             {"user_id": user_id, "validated_at": validated_at, "added_at": added_at},
@@ -565,7 +557,7 @@ class RegistrationWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def user_get_threepids(self, user_id):
-        ret = yield self._simple_select_list(
+        ret = yield self.simple_select_list(
             "user_threepids",
             {"user_id": user_id},
             ["medium", "address", "validated_at", "added_at"],
@@ -574,9 +566,22 @@ class RegistrationWorkerStore(SQLBaseStore):
         return ret
 
     def user_delete_threepid(self, user_id, medium, address):
-        return self._simple_delete(
+        return self.simple_delete(
             "user_threepids",
             keyvalues={"user_id": user_id, "medium": medium, "address": address},
+            desc="user_delete_threepid",
+        )
+
+    def user_delete_threepids(self, user_id: str):
+        """Delete all threepid this user has bound
+
+        Args:
+             user_id: The user id to delete all threepids of
+
+        """
+        return self.simple_delete(
+            "user_threepids",
+            keyvalues={"user_id": user_id},
             desc="user_delete_threepids",
         )
 
@@ -596,7 +601,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
         # We need to use an upsert, in case they user had already bound the
         # threepid
-        return self._simple_upsert(
+        return self.simple_upsert(
             table="user_threepid_id_server",
             keyvalues={
                 "user_id": user_id,
@@ -622,7 +627,7 @@ class RegistrationWorkerStore(SQLBaseStore):
                 medium (str): The medium of the threepid (e.g "email")
                 address (str): The address of the threepid (e.g "bob@example.com")
         """
-        return self._simple_select_list(
+        return self.simple_select_list(
             table="user_threepid_id_server",
             keyvalues={"user_id": user_id},
             retcols=["medium", "address"],
@@ -643,7 +648,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns:
             Deferred
         """
-        return self._simple_delete(
+        return self.simple_delete(
             table="user_threepid_id_server",
             keyvalues={
                 "user_id": user_id,
@@ -666,7 +671,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns:
             Deferred[list[str]]: Resolves to a list of identity servers
         """
-        return self._simple_select_onecol(
+        return self.simple_select_onecol(
             table="user_threepid_id_server",
             keyvalues={"user_id": user_id, "medium": medium, "address": address},
             retcol="id_server",
@@ -684,7 +689,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             defer.Deferred(bool): The requested value.
         """
 
-        res = yield self._simple_select_one_onecol(
+        res = yield self.simple_select_one_onecol(
             table="users",
             keyvalues={"name": user_id},
             retcol="deactivated",
@@ -771,12 +776,12 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
 
         def delete_threepid_session_txn(txn):
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="threepid_validation_token",
                 keyvalues={"session_id": session_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="threepid_validation_session",
                 keyvalues={"session_id": session_id},
@@ -921,6 +926,14 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
         self._account_validity = hs.config.account_validity
 
+        if self._account_validity.enabled:
+            self._clock.call_later(
+                0.0,
+                run_as_background_process,
+                "account_validity_set_expiration_dates",
+                self._set_expiration_date_when_missing,
+            )
+
         # Create a background job for culling expired 3PID validity tokens
         def start_cull():
             # run as a background process to make sure that the database transactions
@@ -948,7 +961,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         """
         next_id = self._access_tokens_id_gen.get_next()
 
-        yield self._simple_insert(
+        yield self.simple_insert(
             "access_tokens",
             {
                 "id": next_id,
@@ -1024,7 +1037,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
                 # Ensure that the guest user actually exists
                 # ``allow_none=False`` makes this raise an exception
                 # if the row isn't in the database.
-                self._simple_select_one_txn(
+                self.simple_select_one_txn(
                     txn,
                     "users",
                     keyvalues={"name": user_id, "is_guest": 1},
@@ -1032,7 +1045,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
                     allow_none=False,
                 )
 
-                self._simple_update_one_txn(
+                self.simple_update_one_txn(
                     txn,
                     "users",
                     keyvalues={"name": user_id, "is_guest": 1},
@@ -1046,7 +1059,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
                     },
                 )
             else:
-                self._simple_insert_txn(
+                self.simple_insert_txn(
                     txn,
                     "users",
                     values={
@@ -1101,7 +1114,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
             external_id: id on that system
             user_id: complete mxid that it is mapped to
         """
-        return self._simple_insert(
+        return self.simple_insert(
             table="user_external_ids",
             values={
                 "auth_provider": auth_provider,
@@ -1119,7 +1132,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         """
 
         def user_set_password_hash_txn(txn):
-            self._simple_update_one_txn(
+            self.simple_update_one_txn(
                 txn, "users", {"name": user_id}, {"password_hash": password_hash}
             )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@@ -1139,7 +1152,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         """
 
         def f(txn):
-            self._simple_update_one_txn(
+            self.simple_update_one_txn(
                 txn,
                 table="users",
                 keyvalues={"name": user_id},
@@ -1163,7 +1176,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         """
 
         def f(txn):
-            self._simple_update_one_txn(
+            self.simple_update_one_txn(
                 txn,
                 table="users",
                 keyvalues={"name": user_id},
@@ -1221,7 +1234,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
     def delete_access_token(self, access_token):
         def f(txn):
-            self._simple_delete_one_txn(
+            self.simple_delete_one_txn(
                 txn, table="access_tokens", keyvalues={"token": access_token}
             )
 
@@ -1233,7 +1246,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
     @cachedInlineCallbacks()
     def is_guest(self, user_id):
-        res = yield self._simple_select_one_onecol(
+        res = yield self.simple_select_one_onecol(
             table="users",
             keyvalues={"name": user_id},
             retcol="is_guest",
@@ -1248,7 +1261,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         Adds a user to the table of users who need to be parted from all the rooms they're
         in
         """
-        return self._simple_insert(
+        return self.simple_insert(
             "users_pending_deactivation",
             values={"user_id": user_id},
             desc="add_user_pending_deactivation",
@@ -1261,7 +1274,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         """
         # XXX: This should be simple_delete_one but we failed to put a unique index on
         # the table, so somehow duplicate entries have ended up in it.
-        return self._simple_delete(
+        return self.simple_delete(
             "users_pending_deactivation",
             keyvalues={"user_id": user_id},
             desc="del_user_pending_deactivation",
@@ -1272,7 +1285,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         Gets one user from the table of users waiting to be parted from all the rooms
         they're in.
         """
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             "users_pending_deactivation",
             keyvalues={},
             retcol="user_id",
@@ -1302,7 +1315,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
         # Insert everything into a transaction in order to run atomically
         def validate_threepid_session_txn(txn):
-            row = self._simple_select_one_txn(
+            row = self.simple_select_one_txn(
                 txn,
                 table="threepid_validation_session",
                 keyvalues={"session_id": session_id},
@@ -1320,7 +1333,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
                     400, "This client_secret does not match the provided session_id"
                 )
 
-            row = self._simple_select_one_txn(
+            row = self.simple_select_one_txn(
                 txn,
                 table="threepid_validation_token",
                 keyvalues={"session_id": session_id, "token": token},
@@ -1345,7 +1358,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
                 )
 
             # Looks good. Validate the session
-            self._simple_update_txn(
+            self.simple_update_txn(
                 txn,
                 table="threepid_validation_session",
                 keyvalues={"session_id": session_id},
@@ -1388,7 +1401,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         if validated_at:
             insertion_values["validated_at"] = validated_at
 
-        return self._simple_upsert(
+        return self.simple_upsert(
             table="threepid_validation_session",
             keyvalues={"session_id": session_id},
             values={"last_send_attempt": send_attempt},
@@ -1426,7 +1439,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
         def start_or_continue_validation_session_txn(txn):
             # Create or update a validation session
-            self._simple_upsert_txn(
+            self.simple_upsert_txn(
                 txn,
                 table="threepid_validation_session",
                 keyvalues={"session_id": session_id},
@@ -1439,7 +1452,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
             )
 
             # Create a new validation token with this session ID
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="threepid_validation_token",
                 values={
@@ -1488,7 +1501,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         )
 
     def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
-        self._simple_update_one_txn(
+        self.simple_update_one_txn(
             txn=txn,
             table="users",
             keyvalues={"name": user_id},
@@ -1497,3 +1510,59 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         self._invalidate_cache_and_stream(
             txn, self.get_user_deactivated_status, (user_id,)
         )
+
+    @defer.inlineCallbacks
+    def _set_expiration_date_when_missing(self):
+        """
+        Retrieves the list of registered users that don't have an expiration date, and
+        adds an expiration date for each of them.
+        """
+
+        def select_users_with_no_expiration_date_txn(txn):
+            """Retrieves the list of registered users with no expiration date from the
+            database, filtering out deactivated users.
+            """
+            sql = (
+                "SELECT users.name FROM users"
+                " LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
+                " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
+            )
+            txn.execute(sql, [])
+
+            res = self.cursor_to_dict(txn)
+            if res:
+                for user in res:
+                    self.set_expiration_date_for_user_txn(
+                        txn, user["name"], use_delta=True
+                    )
+
+        yield self.runInteraction(
+            "get_users_with_no_expiration_date",
+            select_users_with_no_expiration_date_txn,
+        )
+
+    def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False):
+        """Sets an expiration date to the account with the given user ID.
+
+        Args:
+             user_id (str): User ID to set an expiration date for.
+             use_delta (bool): If set to False, the expiration date for the user will be
+                now + validity period. If set to True, this expiration date will be a
+                random value in the [now + period - d ; now + period] range, d being a
+                delta equal to 10% of the validity period.
+        """
+        now_ms = self._clock.time_msec()
+        expiration_ts = now_ms + self._account_validity.period
+
+        if use_delta:
+            expiration_ts = self.rand.randrange(
+                expiration_ts - self._account_validity.startup_job_max_delta,
+                expiration_ts,
+            )
+
+        self.simple_upsert_txn(
+            txn,
+            "account_validity",
+            keyvalues={"user_id": user_id},
+            values={"expiration_ts_ms": expiration_ts, "email_sent": False},
+        )
diff --git a/synapse/storage/data_stores/main/rejections.py b/synapse/storage/data_stores/main/rejections.py
index 7d5de0ea2e..f81f9279a1 100644
--- a/synapse/storage/data_stores/main/rejections.py
+++ b/synapse/storage/data_stores/main/rejections.py
@@ -22,7 +22,7 @@ logger = logging.getLogger(__name__)
 
 class RejectionsStore(SQLBaseStore):
     def _store_rejections_txn(self, txn, event_id, reason):
-        self._simple_insert_txn(
+        self.simple_insert_txn(
             txn,
             table="rejections",
             values={
@@ -33,7 +33,7 @@ class RejectionsStore(SQLBaseStore):
         )
 
     def get_rejection_reason(self, event_id):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="rejections",
             retcol="reason",
             keyvalues={"event_id": event_id},
diff --git a/synapse/storage/data_stores/main/relations.py b/synapse/storage/data_stores/main/relations.py
index 858f65582b..aa5e10538b 100644
--- a/synapse/storage/data_stores/main/relations.py
+++ b/synapse/storage/data_stores/main/relations.py
@@ -352,7 +352,7 @@ class RelationsStore(RelationsWorkerStore):
 
         aggregation_key = relation.get("key")
 
-        self._simple_insert_txn(
+        self.simple_insert_txn(
             txn,
             table="event_relations",
             values={
@@ -380,6 +380,6 @@ class RelationsStore(RelationsWorkerStore):
             redacted_event_id (str): The event that was redacted.
         """
 
-        self._simple_delete_txn(
+        self.simple_delete_txn(
             txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
         )
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 67bb1b6f60..f309e3640c 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -19,12 +19,16 @@ import logging
 import re
 from typing import Optional, Tuple
 
+from six import integer_types
+
 from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
+from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.data_stores.main.search import SearchStore
 from synapse.types import ThirdPartyInstanceID
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -50,7 +54,7 @@ class RoomWorkerStore(SQLBaseStore):
         Returns:
             A dict containing the room information, or None if the room is unknown.
         """
-        return self._simple_select_one(
+        return self.simple_select_one(
             table="rooms",
             keyvalues={"room_id": room_id},
             retcols=("room_id", "is_public", "creator"),
@@ -59,7 +63,7 @@ class RoomWorkerStore(SQLBaseStore):
         )
 
     def get_public_room_ids(self):
-        return self._simple_select_onecol(
+        return self.simple_select_onecol(
             table="rooms",
             keyvalues={"is_public": True},
             retcol="room_id",
@@ -263,7 +267,7 @@ class RoomWorkerStore(SQLBaseStore):
 
     @cached(max_entries=10000)
     def is_room_blocked(self, room_id):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="blocked_rooms",
             keyvalues={"room_id": room_id},
             retcol="1",
@@ -284,7 +288,7 @@ class RoomWorkerStore(SQLBaseStore):
             of RatelimitOverride are None or 0 then ratelimitng has been
             disabled for that user entirely.
         """
-        row = yield self._simple_select_one(
+        row = yield self.simple_select_one(
             table="ratelimit_override",
             keyvalues={"user_id": user_id},
             retcols=("messages_per_second", "burst_count"),
@@ -300,8 +304,148 @@ class RoomWorkerStore(SQLBaseStore):
         else:
             return None
 
+    @cachedInlineCallbacks()
+    def get_retention_policy_for_room(self, room_id):
+        """Get the retention policy for a given room.
+
+        If no retention policy has been found for this room, returns a policy defined
+        by the configured default policy (which has None as both the 'min_lifetime' and
+        the 'max_lifetime' if no default policy has been defined in the server's
+        configuration).
+
+        Args:
+            room_id (str): The ID of the room to get the retention policy of.
+
+        Returns:
+            dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
+        """
+
+        def get_retention_policy_for_room_txn(txn):
+            txn.execute(
+                """
+                SELECT min_lifetime, max_lifetime FROM room_retention
+                INNER JOIN current_state_events USING (event_id, room_id)
+                WHERE room_id = ?;
+                """,
+                (room_id,),
+            )
+
+            return self.cursor_to_dict(txn)
+
+        ret = yield self.runInteraction(
+            "get_retention_policy_for_room", get_retention_policy_for_room_txn,
+        )
+
+        # If we don't know this room ID, ret will be None, in this case return the default
+        # policy.
+        if not ret:
+            defer.returnValue(
+                {
+                    "min_lifetime": self.config.retention_default_min_lifetime,
+                    "max_lifetime": self.config.retention_default_max_lifetime,
+                }
+            )
+
+        row = ret[0]
+
+        # If one of the room's policy's attributes isn't defined, use the matching
+        # attribute from the default policy.
+        # The default values will be None if no default policy has been defined, or if one
+        # of the attributes is missing from the default policy.
+        if row["min_lifetime"] is None:
+            row["min_lifetime"] = self.config.retention_default_min_lifetime
+
+        if row["max_lifetime"] is None:
+            row["max_lifetime"] = self.config.retention_default_max_lifetime
+
+        defer.returnValue(row)
+
+
+class RoomBackgroundUpdateStore(BackgroundUpdateStore):
+    def __init__(self, db_conn, hs):
+        super(RoomBackgroundUpdateStore, self).__init__(db_conn, hs)
+
+        self.config = hs.config
+
+        self.register_background_update_handler(
+            "insert_room_retention", self._background_insert_retention,
+        )
+
+    @defer.inlineCallbacks
+    def _background_insert_retention(self, progress, batch_size):
+        """Retrieves a list of all rooms within a range and inserts an entry for each of
+        them into the room_retention table.
+        NULLs the property's columns if missing from the retention event in the room's
+        state (or NULLs all of them if there's no retention event in the room's state),
+        so that we fall back to the server's retention policy.
+        """
+
+        last_room = progress.get("room_id", "")
+
+        def _background_insert_retention_txn(txn):
+            txn.execute(
+                """
+                SELECT state.room_id, state.event_id, events.json
+                FROM current_state_events as state
+                LEFT JOIN event_json AS events ON (state.event_id = events.event_id)
+                WHERE state.room_id > ? AND state.type = '%s'
+                ORDER BY state.room_id ASC
+                LIMIT ?;
+                """
+                % EventTypes.Retention,
+                (last_room, batch_size),
+            )
+
+            rows = self.cursor_to_dict(txn)
+
+            if not rows:
+                return True
+
+            for row in rows:
+                if not row["json"]:
+                    retention_policy = {}
+                else:
+                    ev = json.loads(row["json"])
+                    retention_policy = json.dumps(ev["content"])
+
+                self.simple_insert_txn(
+                    txn=txn,
+                    table="room_retention",
+                    values={
+                        "room_id": row["room_id"],
+                        "event_id": row["event_id"],
+                        "min_lifetime": retention_policy.get("min_lifetime"),
+                        "max_lifetime": retention_policy.get("max_lifetime"),
+                    },
+                )
+
+            logger.info("Inserted %d rows into room_retention", len(rows))
+
+            self._background_update_progress_txn(
+                txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
+            )
+
+            if batch_size > len(rows):
+                return True
+            else:
+                return False
+
+        end = yield self.runInteraction(
+            "insert_room_retention", _background_insert_retention_txn,
+        )
+
+        if end:
+            yield self._end_background_update("insert_room_retention")
+
+        defer.returnValue(batch_size)
+
+
+class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
+    def __init__(self, db_conn, hs):
+        super(RoomStore, self).__init__(db_conn, hs)
+
+        self.config = hs.config
 
-class RoomStore(RoomWorkerStore, SearchStore):
     @defer.inlineCallbacks
     def store_room(self, room_id, room_creator_user_id, is_public):
         """Stores a room.
@@ -317,7 +461,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         try:
 
             def store_room_txn(txn, next_id):
-                self._simple_insert_txn(
+                self.simple_insert_txn(
                     txn,
                     "rooms",
                     {
@@ -327,7 +471,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                     },
                 )
                 if is_public:
-                    self._simple_insert_txn(
+                    self.simple_insert_txn(
                         txn,
                         table="public_room_list_stream",
                         values={
@@ -346,14 +490,14 @@ class RoomStore(RoomWorkerStore, SearchStore):
     @defer.inlineCallbacks
     def set_room_is_public(self, room_id, is_public):
         def set_room_is_public_txn(txn, next_id):
-            self._simple_update_one_txn(
+            self.simple_update_one_txn(
                 txn,
                 table="rooms",
                 keyvalues={"room_id": room_id},
                 updatevalues={"is_public": is_public},
             )
 
-            entries = self._simple_select_list_txn(
+            entries = self.simple_select_list_txn(
                 txn,
                 table="public_room_list_stream",
                 keyvalues={
@@ -371,7 +515,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 add_to_stream = bool(entries[-1]["visibility"]) != is_public
 
             if add_to_stream:
-                self._simple_insert_txn(
+                self.simple_insert_txn(
                     txn,
                     table="public_room_list_stream",
                     values={
@@ -411,7 +555,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         def set_room_is_public_appservice_txn(txn, next_id):
             if is_public:
                 try:
-                    self._simple_insert_txn(
+                    self.simple_insert_txn(
                         txn,
                         table="appservice_room_list",
                         values={
@@ -424,7 +568,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                     # We've already inserted, nothing to do.
                     return
             else:
-                self._simple_delete_txn(
+                self.simple_delete_txn(
                     txn,
                     table="appservice_room_list",
                     keyvalues={
@@ -434,7 +578,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                     },
                 )
 
-            entries = self._simple_select_list_txn(
+            entries = self.simple_select_list_txn(
                 txn,
                 table="public_room_list_stream",
                 keyvalues={
@@ -452,7 +596,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 add_to_stream = bool(entries[-1]["visibility"]) != is_public
 
             if add_to_stream:
-                self._simple_insert_txn(
+                self.simple_insert_txn(
                     txn,
                     table="public_room_list_stream",
                     values={
@@ -502,11 +646,40 @@ class RoomStore(RoomWorkerStore, SearchStore):
                 txn, event, "content.body", event.content["body"]
             )
 
+    def _store_retention_policy_for_room_txn(self, txn, event):
+        if hasattr(event, "content") and (
+            "min_lifetime" in event.content or "max_lifetime" in event.content
+        ):
+            if (
+                "min_lifetime" in event.content
+                and not isinstance(event.content.get("min_lifetime"), integer_types)
+            ) or (
+                "max_lifetime" in event.content
+                and not isinstance(event.content.get("max_lifetime"), integer_types)
+            ):
+                # Ignore the event if one of the value isn't an integer.
+                return
+
+            self.simple_insert_txn(
+                txn=txn,
+                table="room_retention",
+                values={
+                    "room_id": event.room_id,
+                    "event_id": event.event_id,
+                    "min_lifetime": event.content.get("min_lifetime"),
+                    "max_lifetime": event.content.get("max_lifetime"),
+                },
+            )
+
+            self._invalidate_cache_and_stream(
+                txn, self.get_retention_policy_for_room, (event.room_id,)
+            )
+
     def add_event_report(
         self, room_id, event_id, user_id, reason, content, received_ts
     ):
         next_id = self._event_reports_id_gen.get_next()
-        return self._simple_insert(
+        return self.simple_insert(
             table="event_reports",
             values={
                 "id": next_id,
@@ -552,7 +725,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         Returns:
             Deferred
         """
-        yield self._simple_upsert(
+        yield self.simple_upsert(
             table="blocked_rooms",
             keyvalues={"room_id": room_id},
             values={},
@@ -683,3 +856,89 @@ class RoomStore(RoomWorkerStore, SearchStore):
                             remote_media_mxcs.append((hostname, media_id))
 
         return local_media_mxcs, remote_media_mxcs
+
+    @defer.inlineCallbacks
+    def get_rooms_for_retention_period_in_range(
+        self, min_ms, max_ms, include_null=False
+    ):
+        """Retrieves all of the rooms within the given retention range.
+
+        Optionally includes the rooms which don't have a retention policy.
+
+        Args:
+            min_ms (int|None): Duration in milliseconds that define the lower limit of
+                the range to handle (exclusive). If None, doesn't set a lower limit.
+            max_ms (int|None): Duration in milliseconds that define the upper limit of
+                the range to handle (inclusive). If None, doesn't set an upper limit.
+            include_null (bool): Whether to include rooms which retention policy is NULL
+                in the returned set.
+
+        Returns:
+            dict[str, dict]: The rooms within this range, along with their retention
+                policy. The key is "room_id", and maps to a dict describing the retention
+                policy associated with this room ID. The keys for this nested dict are
+                "min_lifetime" (int|None), and "max_lifetime" (int|None).
+        """
+
+        def get_rooms_for_retention_period_in_range_txn(txn):
+            range_conditions = []
+            args = []
+
+            if min_ms is not None:
+                range_conditions.append("max_lifetime > ?")
+                args.append(min_ms)
+
+            if max_ms is not None:
+                range_conditions.append("max_lifetime <= ?")
+                args.append(max_ms)
+
+            # Do a first query which will retrieve the rooms that have a retention policy
+            # in their current state.
+            sql = """
+                SELECT room_id, min_lifetime, max_lifetime FROM room_retention
+                INNER JOIN current_state_events USING (event_id, room_id)
+                """
+
+            if len(range_conditions):
+                sql += " WHERE (" + " AND ".join(range_conditions) + ")"
+
+                if include_null:
+                    sql += " OR max_lifetime IS NULL"
+
+            txn.execute(sql, args)
+
+            rows = self.cursor_to_dict(txn)
+            rooms_dict = {}
+
+            for row in rows:
+                rooms_dict[row["room_id"]] = {
+                    "min_lifetime": row["min_lifetime"],
+                    "max_lifetime": row["max_lifetime"],
+                }
+
+            if include_null:
+                # If required, do a second query that retrieves all of the rooms we know
+                # of so we can handle rooms with no retention policy.
+                sql = "SELECT DISTINCT room_id FROM current_state_events"
+
+                txn.execute(sql)
+
+                rows = self.cursor_to_dict(txn)
+
+                # If a room isn't already in the dict (i.e. it doesn't have a retention
+                # policy in its state), add it with a null policy.
+                for row in rows:
+                    if row["room_id"] not in rooms_dict:
+                        rooms_dict[row["room_id"]] = {
+                            "min_lifetime": None,
+                            "max_lifetime": None,
+                        }
+
+            return rooms_dict
+
+        rooms = yield self.runInteraction(
+            "get_rooms_for_retention_period_in_range",
+            get_rooms_for_retention_period_in_range_txn,
+        )
+
+        defer.returnValue(rooms)
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 2af24a20b7..fe2428a281 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import logging
+from typing import Iterable, List
 
 from six import iteritems, itervalues
 
@@ -127,7 +128,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         membership column is up to date
         """
 
-        pending_update = self._simple_select_one_txn(
+        pending_update = self.simple_select_one_txn(
             txn,
             table="background_updates",
             keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
@@ -602,7 +603,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             to `user_id` and ProfileInfo (or None if not join event).
         """
 
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="room_memberships",
             column="event_id",
             iterable=event_ids,
@@ -642,7 +643,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # the returned user actually has the correct domain.
         like_clause = "%:" + host
 
-        rows = yield self._execute("is_host_joined", None, sql, room_id, like_clause)
+        rows = yield self.execute("is_host_joined", None, sql, room_id, like_clause)
 
         if not rows:
             return False
@@ -682,7 +683,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # the returned user actually has the correct domain.
         like_clause = "%:" + host
 
-        rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause)
+        rows = yield self.execute("was_host_joined", None, sql, room_id, like_clause)
 
         if not rows:
             return False
@@ -804,7 +805,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             Deferred[set[str]]: Set of room IDs.
         """
 
-        room_ids = yield self._simple_select_onecol(
+        room_ids = yield self.simple_select_onecol(
             table="room_memberships",
             keyvalues={"membership": Membership.JOIN, "user_id": user_id},
             retcol="room_id",
@@ -813,6 +814,22 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return set(room_ids)
 
+    def get_membership_from_event_ids(
+        self, member_event_ids: Iterable[str]
+    ) -> List[dict]:
+        """Get user_id and membership of a set of event IDs.
+        """
+
+        return self.simple_select_many_batch(
+            table="room_memberships",
+            column="event_id",
+            iterable=member_event_ids,
+            retcols=("user_id", "membership", "event_id"),
+            keyvalues={},
+            batch_size=500,
+            desc="get_membership_from_event_ids",
+        )
+
 
 class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
     def __init__(self, db_conn, hs):
@@ -973,7 +990,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
     def _store_room_members_txn(self, txn, events, backfilled):
         """Store a room member in the database.
         """
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="room_memberships",
             values=[
@@ -1011,7 +1028,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
             is_mine = self.hs.is_mine_id(event.state_key)
             if is_new_state and is_mine:
                 if event.membership == Membership.INVITE:
-                    self._simple_insert_txn(
+                    self.simple_insert_txn(
                         txn,
                         table="local_invites",
                         values={
diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql
new file mode 100644
index 0000000000..81a36a8b1d
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/event_expiry.sql
@@ -0,0 +1,21 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_expiry (
+    event_id TEXT PRIMARY KEY,
+    expiry_ts BIGINT NOT NULL
+);
+
+CREATE INDEX event_expiry_expiry_ts_idx ON event_expiry(expiry_ts);
diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
new file mode 100644
index 0000000000..7d70dd071e
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- store the current etag of backup version
+ALTER TABLE e2e_room_keys_versions ADD COLUMN etag BIGINT;
diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql b/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql
new file mode 100644
index 0000000000..ee6cdf7a14
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql
@@ -0,0 +1,33 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Tracks the retention policy of a room.
+-- A NULL max_lifetime or min_lifetime means that the matching property is not defined in
+-- the room's retention policy state event.
+-- If a room doesn't have a retention policy state event in its state, both max_lifetime
+-- and min_lifetime are NULL.
+CREATE TABLE IF NOT EXISTS room_retention(
+    room_id TEXT,
+    event_id TEXT,
+    min_lifetime BIGINT,
+    max_lifetime BIGINT,
+
+    PRIMARY KEY(room_id, event_id)
+);
+
+CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime);
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('insert_room_retention', '{}');
diff --git a/synapse/storage/data_stores/main/schema/delta/56/signing_keys.sql b/synapse/storage/data_stores/main/schema/delta/56/signing_keys.sql
index 27a96123e3..5c5fffcafb 100644
--- a/synapse/storage/data_stores/main/schema/delta/56/signing_keys.sql
+++ b/synapse/storage/data_stores/main/schema/delta/56/signing_keys.sql
@@ -40,7 +40,8 @@ CREATE TABLE IF NOT EXISTS e2e_cross_signing_signatures (
     signature TEXT NOT NULL
 );
 
-CREATE UNIQUE INDEX e2e_cross_signing_signatures_idx ON e2e_cross_signing_signatures(user_id, target_user_id, target_device_id);
+-- replaced by the index created in signing_keys_nonunique_signatures.sql
+-- CREATE UNIQUE INDEX e2e_cross_signing_signatures_idx ON e2e_cross_signing_signatures(user_id, target_user_id, target_device_id);
 
 -- stream of user signature updates
 CREATE TABLE IF NOT EXISTS user_signature_stream (
diff --git a/synapse/storage/data_stores/main/schema/delta/56/signing_keys_nonunique_signatures.sql b/synapse/storage/data_stores/main/schema/delta/56/signing_keys_nonunique_signatures.sql
new file mode 100644
index 0000000000..0aa90ebf0c
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/signing_keys_nonunique_signatures.sql
@@ -0,0 +1,22 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* The cross-signing signatures index should not be a unique index, because a
+ * user may upload multiple signatures for the same target user. The previous
+ * index was unique, so delete it if it's there and create a new non-unique
+ * index. */
+
+DROP INDEX IF EXISTS e2e_cross_signing_signatures_idx; CREATE INDEX IF NOT
+EXISTS e2e_cross_signing_signatures2_idx ON e2e_cross_signing_signatures(user_id, target_user_id, target_device_id);
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index d1d7c6863d..f735cf095c 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -441,7 +441,7 @@ class SearchStore(SearchBackgroundUpdateStore):
         # entire table from the database.
         sql += " ORDER BY rank DESC LIMIT 500"
 
-        results = yield self._execute("search_msgs", self.cursor_to_dict, sql, *args)
+        results = yield self.execute("search_msgs", self.cursor_to_dict, sql, *args)
 
         results = list(filter(lambda row: row["room_id"] in room_ids, results))
 
@@ -455,7 +455,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         count_sql += " GROUP BY room_id"
 
-        count_results = yield self._execute(
+        count_results = yield self.execute(
             "search_rooms_count", self.cursor_to_dict, count_sql, *count_args
         )
 
@@ -586,7 +586,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         args.append(limit)
 
-        results = yield self._execute("search_rooms", self.cursor_to_dict, sql, *args)
+        results = yield self.execute("search_rooms", self.cursor_to_dict, sql, *args)
 
         results = list(filter(lambda row: row["room_id"] in room_ids, results))
 
@@ -600,7 +600,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         count_sql += " GROUP BY room_id"
 
-        count_results = yield self._execute(
+        count_results = yield self.execute(
             "search_rooms_count", self.cursor_to_dict, count_sql, *count_args
         )
 
diff --git a/synapse/storage/data_stores/main/signatures.py b/synapse/storage/data_stores/main/signatures.py
index 556191b76f..f3da29ce14 100644
--- a/synapse/storage/data_stores/main/signatures.py
+++ b/synapse/storage/data_stores/main/signatures.py
@@ -98,4 +98,4 @@ class SignatureStore(SignatureWorkerStore):
                 }
             )
 
-        self._simple_insert_many_txn(txn, table="event_reference_hashes", values=vals)
+        self.simple_insert_many_txn(txn, table="event_reference_hashes", values=vals)
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 6a90daea31..2b33ec1a35 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -89,7 +89,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
             count = 0
 
             while next_group:
-                next_group = self._simple_select_one_onecol_txn(
+                next_group = self.simple_select_one_onecol_txn(
                     txn,
                     table="state_group_edges",
                     keyvalues={"state_group": next_group},
@@ -192,7 +192,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
                     ):
                         break
 
-                    next_group = self._simple_select_one_onecol_txn(
+                    next_group = self.simple_select_one_onecol_txn(
                         txn,
                         table="state_group_edges",
                         keyvalues={"state_group": next_group},
@@ -431,7 +431,7 @@ class StateGroupWorkerStore(
         """
 
         def _get_state_group_delta_txn(txn):
-            prev_group = self._simple_select_one_onecol_txn(
+            prev_group = self.simple_select_one_onecol_txn(
                 txn,
                 table="state_group_edges",
                 keyvalues={"state_group": state_group},
@@ -442,7 +442,7 @@ class StateGroupWorkerStore(
             if not prev_group:
                 return _GetStateGroupDelta(None, None)
 
-            delta_ids = self._simple_select_list_txn(
+            delta_ids = self.simple_select_list_txn(
                 txn,
                 table="state_groups_state",
                 keyvalues={"state_group": state_group},
@@ -644,7 +644,7 @@ class StateGroupWorkerStore(
 
     @cached(max_entries=50000)
     def _get_state_group_for_event(self, event_id):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="event_to_state_groups",
             keyvalues={"event_id": event_id},
             retcol="state_group",
@@ -661,7 +661,7 @@ class StateGroupWorkerStore(
     def _get_state_group_for_events(self, event_ids):
         """Returns mapping event_id -> state_group
         """
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="event_to_state_groups",
             column="event_id",
             iterable=event_ids,
@@ -902,7 +902,7 @@ class StateGroupWorkerStore(
 
             state_group = self.database_engine.get_next_state_group_id(txn)
 
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="state_groups",
                 values={"id": state_group, "room_id": room_id, "event_id": event_id},
@@ -911,7 +911,7 @@ class StateGroupWorkerStore(
             # We persist as a delta if we can, while also ensuring the chain
             # of deltas isn't tooo long, as otherwise read performance degrades.
             if prev_group:
-                is_in_db = self._simple_select_one_onecol_txn(
+                is_in_db = self.simple_select_one_onecol_txn(
                     txn,
                     table="state_groups",
                     keyvalues={"id": prev_group},
@@ -926,13 +926,13 @@ class StateGroupWorkerStore(
 
                 potential_hops = self._count_state_group_hops_txn(txn, prev_group)
             if prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
-                self._simple_insert_txn(
+                self.simple_insert_txn(
                     txn,
                     table="state_group_edges",
                     values={"state_group": state_group, "prev_state_group": prev_group},
                 )
 
-                self._simple_insert_many_txn(
+                self.simple_insert_many_txn(
                     txn,
                     table="state_groups_state",
                     values=[
@@ -947,7 +947,7 @@ class StateGroupWorkerStore(
                     ],
                 )
             else:
-                self._simple_insert_many_txn(
+                self.simple_insert_many_txn(
                     txn,
                     table="state_groups_state",
                     values=[
@@ -1007,7 +1007,7 @@ class StateGroupWorkerStore(
             referenced.
         """
 
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="event_to_state_groups",
             column="state_group",
             iterable=state_groups,
@@ -1065,7 +1065,7 @@ class StateBackgroundUpdateStore(
         batch_size = max(1, int(batch_size / BATCH_SIZE_SCALE_FACTOR))
 
         if max_group is None:
-            rows = yield self._execute(
+            rows = yield self.execute(
                 "_background_deduplicate_state",
                 None,
                 "SELECT coalesce(max(id), 0) FROM state_groups",
@@ -1135,13 +1135,13 @@ class StateBackgroundUpdateStore(
                             if prev_state.get(key, None) != value
                         }
 
-                        self._simple_delete_txn(
+                        self.simple_delete_txn(
                             txn,
                             table="state_group_edges",
                             keyvalues={"state_group": state_group},
                         )
 
-                        self._simple_insert_txn(
+                        self.simple_insert_txn(
                             txn,
                             table="state_group_edges",
                             values={
@@ -1150,13 +1150,13 @@ class StateBackgroundUpdateStore(
                             },
                         )
 
-                        self._simple_delete_txn(
+                        self.simple_delete_txn(
                             txn,
                             table="state_groups_state",
                             keyvalues={"state_group": state_group},
                         )
 
-                        self._simple_insert_many_txn(
+                        self.simple_insert_many_txn(
                             txn,
                             table="state_groups_state",
                             values=[
@@ -1263,7 +1263,7 @@ class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore):
 
             state_groups[event.event_id] = context.state_group
 
-        self._simple_insert_many_txn(
+        self.simple_insert_many_txn(
             txn,
             table="event_to_state_groups",
             values=[
diff --git a/synapse/storage/data_stores/main/state_deltas.py b/synapse/storage/data_stores/main/state_deltas.py
index 28f33ec18f..03b908026b 100644
--- a/synapse/storage/data_stores/main/state_deltas.py
+++ b/synapse/storage/data_stores/main/state_deltas.py
@@ -105,7 +105,7 @@ class StateDeltasStore(SQLBaseStore):
         )
 
     def _get_max_stream_id_in_current_state_deltas_txn(self, txn):
-        return self._simple_select_one_onecol_txn(
+        return self.simple_select_one_onecol_txn(
             txn,
             table="current_state_delta_stream",
             keyvalues={},
diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py
index 45b3de7d56..b306478824 100644
--- a/synapse/storage/data_stores/main/stats.py
+++ b/synapse/storage/data_stores/main/stats.py
@@ -186,7 +186,7 @@ class StatsStore(StateDeltasStore):
         """
         Returns the stats processor positions.
         """
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="stats_incremental_position",
             keyvalues={},
             retcol="stream_id",
@@ -215,7 +215,7 @@ class StatsStore(StateDeltasStore):
             if field and "\0" in field:
                 fields[col] = None
 
-        return self._simple_upsert(
+        return self.simple_upsert(
             table="room_stats_state",
             keyvalues={"room_id": room_id},
             values=fields,
@@ -257,14 +257,14 @@ class StatsStore(StateDeltasStore):
             ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
         )
 
-        slice_list = self._simple_select_list_paginate_txn(
+        slice_list = self.simple_select_list_paginate_txn(
             txn,
             table + "_historical",
-            {id_col: stats_id},
             "end_ts",
             start,
             size,
             retcols=selected_columns + ["bucket_size", "end_ts"],
+            keyvalues={id_col: stats_id},
             order_direction="DESC",
         )
 
@@ -282,7 +282,7 @@ class StatsStore(StateDeltasStore):
                 "name", "topic", "canonical_alias", "avatar", "join_rules",
                 "history_visibility"
         """
-        return self._simple_select_one(
+        return self.simple_select_one(
             "room_stats_state",
             {"room_id": room_id},
             retcols=(
@@ -308,7 +308,7 @@ class StatsStore(StateDeltasStore):
         """
         table, id_col = TYPE_TO_TABLE[stats_type]
 
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             "%s_current" % (table,),
             keyvalues={id_col: id},
             retcol="completed_delta_stream_id",
@@ -344,7 +344,7 @@ class StatsStore(StateDeltasStore):
                         complete_with_stream_id=stream_id,
                     )
 
-            self._simple_update_one_txn(
+            self.simple_update_one_txn(
                 txn,
                 table="stats_incremental_position",
                 keyvalues={},
@@ -517,17 +517,17 @@ class StatsStore(StateDeltasStore):
         else:
             self.database_engine.lock_table(txn, table)
             retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
-            current_row = self._simple_select_one_txn(
+            current_row = self.simple_select_one_txn(
                 txn, table, keyvalues, retcols, allow_none=True
             )
             if current_row is None:
                 merged_dict = {**keyvalues, **absolutes, **additive_relatives}
-                self._simple_insert_txn(txn, table, merged_dict)
+                self.simple_insert_txn(txn, table, merged_dict)
             else:
                 for (key, val) in additive_relatives.items():
                     current_row[key] += val
                 current_row.update(absolutes)
-                self._simple_update_one_txn(txn, table, keyvalues, current_row)
+                self.simple_update_one_txn(txn, table, keyvalues, current_row)
 
     def _upsert_copy_from_table_with_additive_relatives_txn(
         self,
@@ -614,11 +614,11 @@ class StatsStore(StateDeltasStore):
             txn.execute(sql, qargs)
         else:
             self.database_engine.lock_table(txn, into_table)
-            src_row = self._simple_select_one_txn(
+            src_row = self.simple_select_one_txn(
                 txn, src_table, keyvalues, copy_columns
             )
             all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
-            dest_current_row = self._simple_select_one_txn(
+            dest_current_row = self.simple_select_one_txn(
                 txn,
                 into_table,
                 keyvalues=all_dest_keyvalues,
@@ -634,11 +634,11 @@ class StatsStore(StateDeltasStore):
                     **src_row,
                     **additive_relatives,
                 }
-                self._simple_insert_txn(txn, into_table, merged_dict)
+                self.simple_insert_txn(txn, into_table, merged_dict)
             else:
                 for (key, val) in additive_relatives.items():
                     src_row[key] = dest_current_row[key] + val
-                self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
+                self.simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
 
     def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
         """Fetches the counts of events in the given range of stream IDs.
@@ -735,7 +735,7 @@ class StatsStore(StateDeltasStore):
         def _fetch_current_state_stats(txn):
             pos = self.get_room_max_stream_ordering()
 
-            rows = self._simple_select_many_txn(
+            rows = self.simple_select_many_txn(
                 txn,
                 table="current_state_events",
                 column="type",
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 8780fdd989..60487c4559 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -1,5 +1,8 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -252,7 +255,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         super(StreamWorkerStore, self).__init__(db_conn, hs)
 
         events_max = self.get_room_max_stream_ordering()
-        event_cache_prefill, min_event_val = self._get_cache_dict(
+        event_cache_prefill, min_event_val = self.get_cache_dict(
             db_conn,
             "events",
             entity_column="room_id",
@@ -573,7 +576,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         Returns:
             A deferred "s%d" stream token.
         """
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering"
         ).addCallback(lambda row: "s%d" % (row,))
 
@@ -586,7 +589,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         Returns:
             A deferred "t%d-%d" topological token.
         """
-        return self._simple_select_one(
+        return self.simple_select_one(
             table="events",
             keyvalues={"event_id": event_id},
             retcols=("stream_ordering", "topological_ordering"),
@@ -610,13 +613,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "SELECT coalesce(max(topological_ordering), 0) FROM events"
             " WHERE room_id = ? AND stream_ordering < ?"
         )
-        return self._execute(
+        return self.execute(
             "get_max_topological_token", None, sql, room_id, stream_key
         ).addCallback(lambda r: r[0][0] if r else 0)
 
     def _get_max_topological_txn(self, txn, room_id):
         txn.execute(
-            "SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?",
+            "SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
             (room_id,),
         )
 
@@ -706,7 +709,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             dict
         """
 
-        results = self._simple_select_one_txn(
+        results = self.simple_select_one_txn(
             txn,
             "events",
             keyvalues={"event_id": event_id, "room_id": room_id},
@@ -794,7 +797,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         return upper_bound, events
 
     def get_federation_out_pos(self, typ):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="federation_stream_position",
             retcol="stream_id",
             keyvalues={"type": typ},
@@ -802,7 +805,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
     def update_federation_out_pos(self, typ, stream_id):
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="federation_stream_position",
             keyvalues={"type": typ},
             updatevalues={"stream_id": stream_id},
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 10d1887f75..85012403be 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -41,7 +41,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             tag strings to tag content.
         """
 
-        deferred = self._simple_select_list(
+        deferred = self.simple_select_list(
             "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"]
         )
 
@@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
         )
 
         def get_tag_content(txn, tag_ids):
-            sql = (
-                "SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?"
-            )
+            sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
             results = []
             for stream_id, user_id, room_id in tag_ids:
                 txn.execute(sql, (user_id, room_id))
@@ -155,7 +153,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
         Returns:
             A deferred list of string tags.
         """
-        return self._simple_select_list(
+        return self.simple_select_list(
             table="room_tags",
             keyvalues={"user_id": user_id, "room_id": room_id},
             retcols=("tag", "content"),
@@ -180,7 +178,7 @@ class TagsStore(TagsWorkerStore):
         content_json = json.dumps(content)
 
         def add_tag_txn(txn, next_id):
-            self._simple_upsert_txn(
+            self.simple_upsert_txn(
                 txn,
                 table="room_tags",
                 keyvalues={"user_id": user_id, "room_id": room_id, "tag": tag},
diff --git a/synapse/storage/data_stores/main/transactions.py b/synapse/storage/data_stores/main/transactions.py
index 01b1be5e14..c162f3ea16 100644
--- a/synapse/storage/data_stores/main/transactions.py
+++ b/synapse/storage/data_stores/main/transactions.py
@@ -85,7 +85,7 @@ class TransactionStore(SQLBaseStore):
         )
 
     def _get_received_txn_response(self, txn, transaction_id, origin):
-        result = self._simple_select_one_txn(
+        result = self.simple_select_one_txn(
             txn,
             table="received_transactions",
             keyvalues={"transaction_id": transaction_id, "origin": origin},
@@ -119,7 +119,7 @@ class TransactionStore(SQLBaseStore):
             response_json (str)
         """
 
-        return self._simple_insert(
+        return self.simple_insert(
             table="received_transactions",
             values={
                 "transaction_id": transaction_id,
@@ -160,7 +160,7 @@ class TransactionStore(SQLBaseStore):
         return result
 
     def _get_destination_retry_timings(self, txn, destination):
-        result = self._simple_select_one_txn(
+        result = self.simple_select_one_txn(
             txn,
             table="destinations",
             keyvalues={"destination": destination},
@@ -227,7 +227,7 @@ class TransactionStore(SQLBaseStore):
         # We need to be careful here as the data may have changed from under us
         # due to a worker setting the timings.
 
-        prev_row = self._simple_select_one_txn(
+        prev_row = self.simple_select_one_txn(
             txn,
             table="destinations",
             keyvalues={"destination": destination},
@@ -236,7 +236,7 @@ class TransactionStore(SQLBaseStore):
         )
 
         if not prev_row:
-            self._simple_insert_txn(
+            self.simple_insert_txn(
                 txn,
                 table="destinations",
                 values={
@@ -247,7 +247,7 @@ class TransactionStore(SQLBaseStore):
                 },
             )
         elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
-            self._simple_update_one_txn(
+            self.simple_update_one_txn(
                 txn,
                 "destinations",
                 keyvalues={"destination": destination},
diff --git a/synapse/storage/data_stores/main/user_directory.py b/synapse/storage/data_stores/main/user_directory.py
index 652abe0e6a..1a85aabbfb 100644
--- a/synapse/storage/data_stores/main/user_directory.py
+++ b/synapse/storage/data_stores/main/user_directory.py
@@ -85,7 +85,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
             """
             txn.execute(sql)
             rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
-            self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
+            self.simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
             del rooms
 
             # If search all users is on, get all the users we want to add.
@@ -100,13 +100,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
                 txn.execute("SELECT name FROM users")
                 users = [{"user_id": x[0]} for x in txn.fetchall()]
 
-                self._simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
+                self.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
 
         new_pos = yield self.get_max_stream_id_in_current_state_deltas()
         yield self.runInteraction(
             "populate_user_directory_temp_build", _make_staging_area
         )
-        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
+        yield self.simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
 
         yield self._end_background_update("populate_user_directory_createtables")
         return 1
@@ -116,7 +116,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
         """
         Update the user directory stream position, then clean up the old tables.
         """
-        position = yield self._simple_select_one_onecol(
+        position = yield self.simple_select_one_onecol(
             TEMP_TABLE + "_position", None, "position"
         )
         yield self.update_user_directory_stream_pos(position)
@@ -243,7 +243,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
                         to_insert.clear()
 
             # We've finished a room. Delete it from the table.
-            yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
+            yield self.simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
             # Update the remaining counter.
             progress["remaining"] -= 1
             yield self.runInteraction(
@@ -312,7 +312,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
             )
 
             # We've finished processing a user. Delete it from the table.
-            yield self._simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id})
+            yield self.simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id})
             # Update the remaining counter.
             progress["remaining"] -= 1
             yield self.runInteraction(
@@ -361,7 +361,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
         """
 
         def _update_profile_in_user_dir_txn(txn):
-            new_entry = self._simple_upsert_txn(
+            new_entry = self.simple_upsert_txn(
                 txn,
                 table="user_directory",
                 keyvalues={"user_id": user_id},
@@ -435,7 +435,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
                         )
             elif isinstance(self.database_engine, Sqlite3Engine):
                 value = "%s %s" % (user_id, display_name) if display_name else user_id
-                self._simple_upsert_txn(
+                self.simple_upsert_txn(
                     txn,
                     table="user_directory_search",
                     keyvalues={"user_id": user_id},
@@ -462,7 +462,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
         """
 
         def _add_users_who_share_room_txn(txn):
-            self._simple_upsert_many_txn(
+            self.simple_upsert_many_txn(
                 txn,
                 table="users_who_share_private_rooms",
                 key_names=["user_id", "other_user_id", "room_id"],
@@ -489,7 +489,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
 
         def _add_users_in_public_rooms_txn(txn):
 
-            self._simple_upsert_many_txn(
+            self.simple_upsert_many_txn(
                 txn,
                 table="users_in_public_rooms",
                 key_names=["user_id", "room_id"],
@@ -519,7 +519,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
 
     @cached()
     def get_user_in_directory(self, user_id):
-        return self._simple_select_one(
+        return self.simple_select_one(
             table="user_directory",
             keyvalues={"user_id": user_id},
             retcols=("display_name", "avatar_url"),
@@ -528,7 +528,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
         )
 
     def update_user_directory_stream_pos(self, stream_id):
-        return self._simple_update_one(
+        return self.simple_update_one(
             table="user_directory_stream_pos",
             keyvalues={},
             updatevalues={"stream_id": stream_id},
@@ -547,21 +547,21 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
 
     def remove_from_user_dir(self, user_id):
         def _remove_from_user_dir_txn(txn):
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn, table="user_directory", keyvalues={"user_id": user_id}
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn, table="user_directory_search", keyvalues={"user_id": user_id}
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="users_who_share_private_rooms",
                 keyvalues={"user_id": user_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="users_who_share_private_rooms",
                 keyvalues={"other_user_id": user_id},
@@ -575,14 +575,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
         """Get all user_ids that are in the room directory because they're
         in the given room_id
         """
-        user_ids_share_pub = yield self._simple_select_onecol(
+        user_ids_share_pub = yield self.simple_select_onecol(
             table="users_in_public_rooms",
             keyvalues={"room_id": room_id},
             retcol="user_id",
             desc="get_users_in_dir_due_to_room",
         )
 
-        user_ids_share_priv = yield self._simple_select_onecol(
+        user_ids_share_priv = yield self.simple_select_onecol(
             table="users_who_share_private_rooms",
             keyvalues={"room_id": room_id},
             retcol="other_user_id",
@@ -605,17 +605,17 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
         """
 
         def _remove_user_who_share_room_txn(txn):
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="users_who_share_private_rooms",
                 keyvalues={"user_id": user_id, "room_id": room_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="users_who_share_private_rooms",
                 keyvalues={"other_user_id": user_id, "room_id": room_id},
             )
-            self._simple_delete_txn(
+            self.simple_delete_txn(
                 txn,
                 table="users_in_public_rooms",
                 keyvalues={"user_id": user_id, "room_id": room_id},
@@ -636,14 +636,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
         Returns:
             list: user_id
         """
-        rows = yield self._simple_select_onecol(
+        rows = yield self.simple_select_onecol(
             table="users_who_share_private_rooms",
             keyvalues={"user_id": user_id},
             retcol="room_id",
             desc="get_rooms_user_is_in",
         )
 
-        pub_rows = yield self._simple_select_onecol(
+        pub_rows = yield self.simple_select_onecol(
             table="users_in_public_rooms",
             keyvalues={"user_id": user_id},
             retcol="room_id",
@@ -674,14 +674,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
             ) f2 USING (room_id)
         """
 
-        rows = yield self._execute(
+        rows = yield self.execute(
             "get_rooms_in_common_for_users", None, sql, user_id, other_user_id
         )
 
         return [room_id for room_id, in rows]
 
     def get_user_directory_stream_pos(self):
-        return self._simple_select_one_onecol(
+        return self.simple_select_one_onecol(
             table="user_directory_stream_pos",
             keyvalues={},
             retcol="stream_id",
@@ -786,9 +786,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
 
-        results = yield self._execute(
-            "search_user_dir", self.cursor_to_dict, sql, *args
-        )
+        results = yield self.execute("search_user_dir", self.cursor_to_dict, sql, *args)
 
         limited = len(results) > limit
 
diff --git a/synapse/storage/data_stores/main/user_erasure_store.py b/synapse/storage/data_stores/main/user_erasure_store.py
index aa4f0da5f0..37860af070 100644
--- a/synapse/storage/data_stores/main/user_erasure_store.py
+++ b/synapse/storage/data_stores/main/user_erasure_store.py
@@ -31,7 +31,7 @@ class UserErasureWorkerStore(SQLBaseStore):
         Returns:
             Deferred[bool]: True if the user has requested erasure
         """
-        return self._simple_select_onecol(
+        return self.simple_select_onecol(
             table="erased_users",
             keyvalues={"user_id": user_id},
             retcol="1",
@@ -56,7 +56,7 @@ class UserErasureWorkerStore(SQLBaseStore):
         # iterate it multiple times, and (b) avoiding duplicates.
         user_ids = tuple(set(user_ids))
 
-        rows = yield self._simple_select_many_batch(
+        rows = yield self.simple_select_many_batch(
             table="erased_users",
             column="user_id",
             iterable=user_ids,
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 2e7753820e..731e1c9d9c 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
         # Mark as done.
         cur.execute(
             database_engine.convert_param_style(
-                "INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)"
+                "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
             ),
             (modname, name),
         )