summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py56
-rw-r--r--synapse/storage/deviceinbox.py14
-rw-r--r--synapse/storage/devices.py14
-rw-r--r--synapse/storage/engines/postgres.py13
-rw-r--r--synapse/storage/engines/sqlite.py6
-rw-r--r--synapse/storage/event_federation.py9
-rw-r--r--synapse/storage/events.py44
-rw-r--r--synapse/storage/events_bg_updates.py55
-rw-r--r--synapse/storage/events_worker.py28
-rw-r--r--synapse/storage/filtering.py4
-rw-r--r--synapse/storage/monthly_active_users.py101
-rw-r--r--synapse/storage/presence.py14
-rw-r--r--synapse/storage/pusher.py2
-rw-r--r--synapse/storage/receipts.py53
-rw-r--r--synapse/storage/roommember.py18
-rw-r--r--synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres17
-rw-r--r--synapse/storage/schema/delta/56/unique_user_filter_index.py58
-rw-r--r--synapse/storage/search.py13
-rw-r--r--synapse/storage/state_deltas.py38
-rw-r--r--synapse/storage/user_erasure_store.py18
20 files changed, 377 insertions, 198 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index abe16334ec..f5906fcd54 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -20,6 +20,7 @@ import random
 import sys
 import threading
 import time
+from typing import Iterable, Tuple
 
 from six import PY2, iteritems, iterkeys, itervalues
 from six.moves import builtins, intern, range
@@ -30,7 +31,7 @@ from prometheus_client import Histogram
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
-from synapse.logging.context import LoggingContext, PreserveLoggingContext
+from synapse.logging.context import LoggingContext, make_deferred_yieldable
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id
@@ -550,8 +551,9 @@ class SQLBaseStore(object):
 
                 return func(conn, *args, **kwargs)
 
-        with PreserveLoggingContext():
-            result = yield self._db_pool.runWithConnection(inner_func, *args, **kwargs)
+        result = yield make_deferred_yieldable(
+            self._db_pool.runWithConnection(inner_func, *args, **kwargs)
+        )
 
         return result
 
@@ -1162,19 +1164,18 @@ class SQLBaseStore(object):
         if not iterable:
             return []
 
-        sql = "SELECT %s FROM %s" % (", ".join(retcols), table)
-
-        clauses = []
-        values = []
-        clauses.append("%s IN (%s)" % (column, ",".join("?" for _ in iterable)))
-        values.extend(iterable)
+        clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
+        clauses = [clause]
 
         for key, value in iteritems(keyvalues):
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
-        if clauses:
-            sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
+        sql = "SELECT %s FROM %s WHERE %s" % (
+            ", ".join(retcols),
+            table,
+            " AND ".join(clauses),
+        )
 
         txn.execute(sql, values)
         return cls.cursor_to_dict(txn)
@@ -1323,10 +1324,8 @@ class SQLBaseStore(object):
 
         sql = "DELETE FROM %s" % table
 
-        clauses = []
-        values = []
-        clauses.append("%s IN (%s)" % (column, ",".join("?" for _ in iterable)))
-        values.extend(iterable)
+        clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
+        clauses = [clause]
 
         for key, value in iteritems(keyvalues):
             clauses.append("%s = ?" % (key,))
@@ -1693,3 +1692,30 @@ def db_to_json(db_content):
     except Exception:
         logging.warning("Tried to decode '%r' as JSON and failed", db_content)
         raise
+
+
+def make_in_list_sql_clause(
+    database_engine, column: str, iterable: Iterable
+) -> Tuple[str, Iterable]:
+    """Returns an SQL clause that checks the given column is in the iterable.
+
+    On SQLite this expands to `column IN (?, ?, ...)`, whereas on Postgres
+    it expands to `column = ANY(?)`. While both DBs support the `IN` form,
+    using the `ANY` form on postgres means that it views queries with
+    different length iterables as the same, helping the query stats.
+
+    Args:
+        database_engine
+        column: Name of the column
+        iterable: The values to check the column against.
+
+    Returns:
+        A tuple of SQL query and the args
+    """
+
+    if database_engine.supports_using_any_list:
+        # This should hopefully be faster, but also makes postgres query
+        # stats easier to understand.
+        return "%s = ANY(?)" % (column,), [list(iterable)]
+    else:
+        return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 70bc2bb2cc..f04aad0743 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -20,7 +20,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.expiringcache import ExpiringCache
 
@@ -378,15 +378,15 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             else:
                 if not devices:
                     continue
-                sql = (
-                    "SELECT device_id FROM devices"
-                    " WHERE user_id = ? AND device_id IN ("
-                    + ",".join("?" * len(devices))
-                    + ")"
+
+                clause, args = make_in_list_sql_clause(
+                    txn.database_engine, "device_id", devices
                 )
+                sql = "SELECT device_id FROM devices WHERE user_id = ? AND " + clause
+
                 # TODO: Maybe this needs to be done in batches if there are
                 # too many local devices for a given user.
-                txn.execute(sql, [user_id] + devices)
+                txn.execute(sql, [user_id] + list(args))
                 for row in txn:
                     # Only insert into the local inbox if the device exists on
                     # this server
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 111bfb3d64..ac5239e50a 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -28,7 +28,12 @@ from synapse.logging.opentracing import (
     whitelisted_homeserver,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import Cache, SQLBaseStore, db_to_json
+from synapse.storage._base import (
+    Cache,
+    SQLBaseStore,
+    db_to_json,
+    make_in_list_sql_clause,
+)
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util import batch_iter
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
@@ -448,11 +453,14 @@ class DeviceWorkerStore(SQLBaseStore):
             sql = """
                 SELECT DISTINCT user_id FROM device_lists_stream
                 WHERE stream_id > ?
-                AND user_id IN (%s)
+                AND
             """
 
             for chunk in batch_iter(to_check, 100):
-                txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk)
+                clause, args = make_in_list_sql_clause(
+                    txn.database_engine, "user_id", chunk
+                )
+                txn.execute(sql + clause, (from_key,) + tuple(args))
                 changes.update(user_id for user_id, in txn)
 
             return changes
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 601617b21e..b7c4eda338 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -22,6 +22,13 @@ class PostgresEngine(object):
     def __init__(self, database_module, database_config):
         self.module = database_module
         self.module.extensions.register_type(self.module.extensions.UNICODE)
+
+        # Disables passing `bytes` to txn.execute, c.f. #6186. If you do
+        # actually want to use bytes than wrap it in `bytearray`.
+        def _disable_bytes_adapter(_):
+            raise Exception("Passing bytes to DB is disabled.")
+
+        self.module.extensions.register_adapter(bytes, _disable_bytes_adapter)
         self.synchronous_commit = database_config.get("synchronous_commit", True)
         self._version = None  # unknown as yet
 
@@ -79,6 +86,12 @@ class PostgresEngine(object):
         """
         return True
 
+    @property
+    def supports_using_any_list(self):
+        """Do we support using `a = ANY(?)` and passing a list
+        """
+        return True
+
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
             # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index ac92109366..ddad17dc5a 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -46,6 +46,12 @@ class Sqlite3Engine(object):
         """
         return self.module.sqlite_version_info >= (3, 15, 0)
 
+    @property
+    def supports_using_any_list(self):
+        """Do we support using `a = ANY(?)` and passing a list
+        """
+        return False
+
     def check_database(self, txn):
         pass
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index f5e8c39262..47cc10d32a 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
 from synapse.util.caches.descriptors import cached
@@ -68,7 +68,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         else:
             results = set()
 
-        base_sql = "SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
+        base_sql = "SELECT auth_id FROM event_auth WHERE "
 
         front = set(event_ids)
         while front:
@@ -76,7 +76,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             front_list = list(front)
             chunks = [front_list[x : x + 100] for x in range(0, len(front), 100)]
             for chunk in chunks:
-                txn.execute(base_sql % (",".join(["?"] * len(chunk)),), chunk)
+                clause, args = make_in_list_sql_clause(
+                    txn.database_engine, "event_id", chunk
+                )
+                txn.execute(base_sql + clause, list(args))
                 new_front.update([r[0] for r in txn])
 
             new_front -= results
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bb6ff0595a..ee49ef235d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -39,6 +39,7 @@ from synapse.logging.utils import log_function
 from synapse.metrics import BucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.state import StateResolutionStore
+from synapse.storage._base import make_in_list_sql_clause
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
@@ -641,14 +642,16 @@ class EventsStore(
                 LEFT JOIN rejections USING (event_id)
                 LEFT JOIN event_json USING (event_id)
             WHERE
-                prev_event_id IN (%s)
-                AND NOT events.outlier
+                NOT events.outlier
                 AND rejections.event_id IS NULL
-            """ % (
-                ",".join("?" for _ in batch),
+                AND
+            """
+
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "prev_event_id", batch
             )
 
-            txn.execute(sql, batch)
+            txn.execute(sql + clause, args)
             results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
 
         for chunk in batch_iter(event_ids, 100):
@@ -695,13 +698,15 @@ class EventsStore(
                     LEFT JOIN rejections USING (event_id)
                     LEFT JOIN event_json USING (event_id)
                 WHERE
-                    event_id IN (%s)
-                    AND NOT events.outlier
-                """ % (
-                    ",".join("?" for _ in to_recursively_check),
+                    NOT events.outlier
+                    AND
+                """
+
+                clause, args = make_in_list_sql_clause(
+                    self.database_engine, "event_id", to_recursively_check
                 )
 
-                txn.execute(sql, to_recursively_check)
+                txn.execute(sql + clause, args)
                 to_recursively_check = []
 
                 for event_id, prev_event_id, metadata, rejected in txn:
@@ -1543,10 +1548,14 @@ class EventsStore(
                 " FROM events as e"
                 " LEFT JOIN rejections as rej USING (event_id)"
                 " LEFT JOIN redactions as r ON e.event_id = r.redacts"
-                " WHERE e.event_id IN (%s)"
-            ) % (",".join(["?"] * len(ev_map)),)
+                " WHERE "
+            )
+
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "e.event_id", list(ev_map)
+            )
 
-            txn.execute(sql, list(ev_map))
+            txn.execute(sql + clause, args)
             rows = self.cursor_to_dict(txn)
             for row in rows:
                 event = ev_map[row["event_id"]]
@@ -2249,11 +2258,12 @@ class EventsStore(
             sql = """
                 SELECT DISTINCT state_group FROM event_to_state_groups
                 LEFT JOIN events_to_purge AS ep USING (event_id)
-                WHERE state_group IN (%s) AND ep.event_id IS NULL
-            """ % (
-                ",".join("?" for _ in current_search),
+                WHERE ep.event_id IS NULL AND
+            """
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "state_group", current_search
             )
-            txn.execute(sql, list(current_search))
+            txn.execute(sql + clause, list(args))
 
             referenced = set(sg for sg, in txn)
             referenced_groups |= referenced
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
index 5717baf48c..31ea6f917f 100644
--- a/synapse/storage/events_bg_updates.py
+++ b/synapse/storage/events_bg_updates.py
@@ -21,6 +21,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.storage._base import make_in_list_sql_clause
 from synapse.storage.background_updates import BackgroundUpdateStore
 
 logger = logging.getLogger(__name__)
@@ -71,6 +72,19 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             "redactions_received_ts", self._redactions_received_ts
         )
 
+        # This index gets deleted in `event_fix_redactions_bytes` update
+        self.register_background_index_update(
+            "event_fix_redactions_bytes_create_index",
+            index_name="redactions_censored_redacts",
+            table="redactions",
+            columns=["redacts"],
+            where_clause="have_censored",
+        )
+
+        self.register_background_update_handler(
+            "event_fix_redactions_bytes", self._event_fix_redactions_bytes
+        )
+
     @defer.inlineCallbacks
     def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
@@ -312,12 +326,13 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                     INNER JOIN event_json USING (event_id)
                     LEFT JOIN rejections USING (event_id)
                     WHERE
-                        prev_event_id IN (%s)
-                        AND NOT events.outlier
-                """ % (
-                    ",".join("?" for _ in to_check),
+                        NOT events.outlier
+                        AND
+                """
+                clause, args = make_in_list_sql_clause(
+                    self.database_engine, "prev_event_id", to_check
                 )
-                txn.execute(sql, to_check)
+                txn.execute(sql + clause, list(args))
 
                 for prev_event_id, event_id, metadata, rejected in txn:
                     if event_id in graph:
@@ -458,3 +473,33 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             yield self._end_background_update("redactions_received_ts")
 
         return count
+
+    @defer.inlineCallbacks
+    def _event_fix_redactions_bytes(self, progress, batch_size):
+        """Undoes hex encoded censored redacted event JSON.
+        """
+
+        def _event_fix_redactions_bytes_txn(txn):
+            # This update is quite fast due to new index.
+            txn.execute(
+                """
+                UPDATE event_json
+                SET
+                    json = convert_from(json::bytea, 'utf8')
+                FROM redactions
+                WHERE
+                    redactions.have_censored
+                    AND event_json.event_id = redactions.redacts
+                    AND json NOT LIKE '{%';
+                """
+            )
+
+            txn.execute("DROP INDEX redactions_censored_redacts")
+
+        yield self.runInteraction(
+            "_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn
+        )
+
+        yield self._end_background_update("event_fix_redactions_bytes")
+
+        return 1
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 57ce0304e9..4c4b76bd93 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -31,12 +31,11 @@ from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.types import get_domain_from_id
 from synapse.util import batch_iter
 from synapse.util.metrics import Measure
 
-from ._base import SQLBaseStore
-
 logger = logging.getLogger(__name__)
 
 
@@ -623,10 +622,14 @@ class EventsWorkerStore(SQLBaseStore):
                 " rej.reason "
                 " FROM event_json as e"
                 " LEFT JOIN rejections as rej USING (event_id)"
-                " WHERE e.event_id IN (%s)"
-            ) % (",".join(["?"] * len(evs)),)
+                " WHERE "
+            )
 
-            txn.execute(sql, evs)
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "e.event_id", evs
+            )
+
+            txn.execute(sql + clause, args)
 
             for row in txn:
                 event_id = row[0]
@@ -640,11 +643,11 @@ class EventsWorkerStore(SQLBaseStore):
                 }
 
             # check for redactions
-            redactions_sql = (
-                "SELECT event_id, redacts FROM redactions WHERE redacts IN (%s)"
-            ) % (",".join(["?"] * len(evs)),)
+            redactions_sql = "SELECT event_id, redacts FROM redactions WHERE "
+
+            clause, args = make_in_list_sql_clause(txn.database_engine, "redacts", evs)
 
-            txn.execute(redactions_sql, evs)
+            txn.execute(redactions_sql + clause, args)
 
             for (redacter, redacted) in txn:
                 d = event_dict.get(redacted)
@@ -753,10 +756,11 @@ class EventsWorkerStore(SQLBaseStore):
         results = set()
 
         def have_seen_events_txn(txn, chunk):
-            sql = "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" % (
-                ",".join("?" * len(chunk)),
+            sql = "SELECT event_id FROM events as e WHERE "
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "e.event_id", chunk
             )
-            txn.execute(sql, chunk)
+            txn.execute(sql + clause, args)
             for (event_id,) in txn:
                 results.add(event_id)
 
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 23b48f6cea..7c2a7da836 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -51,7 +51,7 @@ class FilteringStore(SQLBaseStore):
                 "SELECT filter_id FROM user_filters "
                 "WHERE user_id = ? AND filter_json = ?"
             )
-            txn.execute(sql, (user_localpart, def_json))
+            txn.execute(sql, (user_localpart, bytearray(def_json)))
             filter_id_response = txn.fetchone()
             if filter_id_response is not None:
                 return filter_id_response[0]
@@ -68,7 +68,7 @@ class FilteringStore(SQLBaseStore):
                 "INSERT INTO user_filters (user_id, filter_id, filter_json)"
                 "VALUES(?, ?, ?)"
             )
-            txn.execute(sql, (user_localpart, filter_id, def_json))
+            txn.execute(sql, (user_localpart, filter_id, bytearray(def_json)))
 
             return filter_id
 
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 752e9788a2..3803604be7 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -32,7 +32,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
         super(MonthlyActiveUsersStore, self).__init__(None, hs)
         self._clock = hs.get_clock()
         self.hs = hs
-        self.reserved_users = ()
         # Do not add more reserved users than the total allowable number
         self._new_transaction(
             dbconn,
@@ -51,7 +50,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             txn (cursor):
             threepids (list[dict]): List of threepid dicts to reserve
         """
-        reserved_user_list = []
 
         for tp in threepids:
             user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
@@ -60,10 +58,8 @@ class MonthlyActiveUsersStore(SQLBaseStore):
                 is_support = self.is_support_user_txn(txn, user_id)
                 if not is_support:
                     self.upsert_monthly_active_user_txn(txn, user_id)
-                    reserved_user_list.append(user_id)
             else:
                 logger.warning("mau limit reserved threepid %s not found in db" % tp)
-        self.reserved_users = tuple(reserved_user_list)
 
     @defer.inlineCallbacks
     def reap_monthly_active_users(self):
@@ -74,8 +70,11 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             Deferred[]
         """
 
-        def _reap_users(txn):
-            # Purge stale users
+        def _reap_users(txn, reserved_users):
+            """
+            Args:
+                reserved_users (tuple): reserved users to preserve
+            """
 
             thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
             query_args = [thirty_days_ago]
@@ -83,20 +82,19 @@ class MonthlyActiveUsersStore(SQLBaseStore):
 
             # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
             # when len(reserved_users) == 0. Works fine on sqlite.
-            if len(self.reserved_users) > 0:
+            if len(reserved_users) > 0:
                 # questionmarks is a hack to overcome sqlite not supporting
                 # tuples in 'WHERE IN %s'
-                questionmarks = "?" * len(self.reserved_users)
+                question_marks = ",".join("?" * len(reserved_users))
 
-                query_args.extend(self.reserved_users)
-                sql = base_sql + """ AND user_id NOT IN ({})""".format(
-                    ",".join(questionmarks)
-                )
+                query_args.extend(reserved_users)
+                sql = base_sql + " AND user_id NOT IN ({})".format(question_marks)
             else:
                 sql = base_sql
 
             txn.execute(sql, query_args)
 
+            max_mau_value = self.hs.config.max_mau_value
             if self.hs.config.limit_usage_by_mau:
                 # If MAU user count still exceeds the MAU threshold, then delete on
                 # a least recently active basis.
@@ -106,31 +104,52 @@ class MonthlyActiveUsersStore(SQLBaseStore):
                 # While Postgres does not require 'LIMIT', but also does not support
                 # negative LIMIT values. So there is no way to write it that both can
                 # support
-                safe_guard = self.hs.config.max_mau_value - len(self.reserved_users)
-                # Must be greater than zero for postgres
-                safe_guard = safe_guard if safe_guard > 0 else 0
-                query_args = [safe_guard]
-
-                base_sql = """
-                    DELETE FROM monthly_active_users
-                    WHERE user_id NOT IN (
-                        SELECT user_id FROM monthly_active_users
-                        ORDER BY timestamp DESC
-                        LIMIT ?
+                if len(reserved_users) == 0:
+                    sql = """
+                        DELETE FROM monthly_active_users
+                        WHERE user_id NOT IN (
+                            SELECT user_id FROM monthly_active_users
+                            ORDER BY timestamp DESC
+                            LIMIT ?
                         )
-                    """
+                        """
+                    txn.execute(sql, (max_mau_value,))
                 # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
                 # when len(reserved_users) == 0. Works fine on sqlite.
-                if len(self.reserved_users) > 0:
-                    query_args.extend(self.reserved_users)
-                    sql = base_sql + """ AND user_id NOT IN ({})""".format(
-                        ",".join(questionmarks)
-                    )
                 else:
-                    sql = base_sql
-                txn.execute(sql, query_args)
+                    # Must be >= 0 for postgres
+                    num_of_non_reserved_users_to_remove = max(
+                        max_mau_value - len(reserved_users), 0
+                    )
+
+                    # It is important to filter reserved users twice to guard
+                    # against the case where the reserved user is present in the
+                    # SELECT, meaning that a legitmate mau is deleted.
+                    sql = """
+                        DELETE FROM monthly_active_users
+                        WHERE user_id NOT IN (
+                            SELECT user_id FROM monthly_active_users
+                            WHERE user_id NOT IN ({})
+                            ORDER BY timestamp DESC
+                            LIMIT ?
+                        )
+                        AND user_id NOT IN ({})
+                    """.format(
+                        question_marks, question_marks
+                    )
+
+                    query_args = [
+                        *reserved_users,
+                        num_of_non_reserved_users_to_remove,
+                        *reserved_users,
+                    ]
 
-        yield self.runInteraction("reap_monthly_active_users", _reap_users)
+                    txn.execute(sql, query_args)
+
+        reserved_users = yield self.get_registered_reserved_users()
+        yield self.runInteraction(
+            "reap_monthly_active_users", _reap_users, reserved_users
+        )
         # It seems poor to invalidate the whole cache, Postgres supports
         # 'Returning' which would allow me to invalidate only the
         # specific users, but sqlite has no way to do this and instead
@@ -159,21 +178,25 @@ class MonthlyActiveUsersStore(SQLBaseStore):
         return self.runInteraction("count_users", _count_users)
 
     @defer.inlineCallbacks
-    def get_registered_reserved_users_count(self):
-        """Of the reserved threepids defined in config, how many are associated
+    def get_registered_reserved_users(self):
+        """Of the reserved threepids defined in config, which are associated
         with registered users?
 
         Returns:
-            Defered[int]: Number of real reserved users
+            Defered[list]: Real reserved users
         """
-        count = 0
-        for tp in self.hs.config.mau_limits_reserved_threepids:
+        users = []
+
+        for tp in self.hs.config.mau_limits_reserved_threepids[
+            : self.hs.config.max_mau_value
+        ]:
             user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
                 tp["medium"], tp["address"]
             )
             if user_id:
-                count = count + 1
-        return count
+                users.append(user_id)
+
+        return users
 
     @defer.inlineCallbacks
     def upsert_monthly_active_user(self, user_id):
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 5db6f2d84a..3a641f538b 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -18,11 +18,10 @@ from collections import namedtuple
 from twisted.internet import defer
 
 from synapse.api.constants import PresenceState
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.util import batch_iter
 from synapse.util.caches.descriptors import cached, cachedList
 
-from ._base import SQLBaseStore
-
 
 class UserPresenceState(
     namedtuple(
@@ -119,14 +118,13 @@ class PresenceStore(SQLBaseStore):
         )
 
         # Delete old rows to stop database from getting really big
-        sql = (
-            "DELETE FROM presence_stream WHERE" " stream_id < ?" " AND user_id IN (%s)"
-        )
+        sql = "DELETE FROM presence_stream WHERE stream_id < ? AND "
 
         for states in batch_iter(presence_states, 50):
-            args = [stream_id]
-            args.extend(s.user_id for s in states)
-            txn.execute(sql % (",".join("?" for _ in states),), args)
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "user_id", [s.user_id for s in states]
+            )
+            txn.execute(sql + clause, [stream_id] + list(args))
 
     def get_all_presence_updates(self, last_id, current_id):
         if last_id == current_id:
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 3e0e834a62..b12e80440a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -241,7 +241,7 @@ class PusherStore(PusherWorkerStore):
                     "device_display_name": device_display_name,
                     "ts": pushkey_ts,
                     "lang": lang,
-                    "data": encode_canonical_json(data),
+                    "data": bytearray(encode_canonical_json(data)),
                     "last_stream_ordering": last_stream_ordering,
                     "profile_tag": profile_tag,
                     "id": stream_id,
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 290ddb30e8..0c24430f28 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -21,12 +21,11 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-from ._base import SQLBaseStore
-from .util.id_generators import StreamIdGenerator
-
 logger = logging.getLogger(__name__)
 
 
@@ -217,24 +216,26 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
         def f(txn):
             if from_key:
-                sql = (
-                    "SELECT * FROM receipts_linearized WHERE"
-                    " room_id IN (%s) AND stream_id > ? AND stream_id <= ?"
-                ) % (",".join(["?"] * len(room_ids)))
-                args = list(room_ids)
-                args.extend([from_key, to_key])
+                sql = """
+                    SELECT * FROM receipts_linearized WHERE
+                    stream_id > ? AND stream_id <= ? AND
+                """
+                clause, args = make_in_list_sql_clause(
+                    self.database_engine, "room_id", room_ids
+                )
 
-                txn.execute(sql, args)
+                txn.execute(sql + clause, [from_key, to_key] + list(args))
             else:
-                sql = (
-                    "SELECT * FROM receipts_linearized WHERE"
-                    " room_id IN (%s) AND stream_id <= ?"
-                ) % (",".join(["?"] * len(room_ids)))
+                sql = """
+                    SELECT * FROM receipts_linearized WHERE
+                    stream_id <= ? AND
+                """
 
-                args = list(room_ids)
-                args.append(to_key)
+                clause, args = make_in_list_sql_clause(
+                    self.database_engine, "room_id", room_ids
+                )
 
-                txn.execute(sql, args)
+                txn.execute(sql + clause, [to_key] + list(args))
 
             return self.cursor_to_dict(txn)
 
@@ -433,13 +434,19 @@ class ReceiptsStore(ReceiptsWorkerStore):
             # we need to points in graph -> linearized form.
             # TODO: Make this better.
             def graph_to_linear(txn):
-                query = (
-                    "SELECT event_id WHERE room_id = ? AND stream_ordering IN ("
-                    " SELECT max(stream_ordering) WHERE event_id IN (%s)"
-                    ")"
-                ) % (",".join(["?"] * len(event_ids)))
+                clause, args = make_in_list_sql_clause(
+                    self.database_engine, "event_id", event_ids
+                )
+
+                sql = """
+                    SELECT event_id WHERE room_id = ? AND stream_ordering IN (
+                        SELECT max(stream_ordering) WHERE %s
+                    )
+                """ % (
+                    clause,
+                )
 
-                txn.execute(query, [room_id] + event_ids)
+                txn.execute(sql, [room_id] + list(args))
                 rows = txn.fetchall()
                 if rows:
                     return rows[0][0]
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 4e606a8380..ff63487823 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -26,7 +26,7 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import LoggingTransaction
+from synapse.storage._base import LoggingTransaction, make_in_list_sql_clause
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.engines import Sqlite3Engine
 from synapse.storage.events_worker import EventsWorkerStore
@@ -372,6 +372,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         results = []
         if membership_list:
             if self._current_state_events_membership_up_to_date:
+                clause, args = make_in_list_sql_clause(
+                    self.database_engine, "c.membership", membership_list
+                )
                 sql = """
                     SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering
                     FROM current_state_events AS c
@@ -379,11 +382,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                     WHERE
                         c.type = 'm.room.member'
                         AND state_key = ?
-                        AND c.membership IN (%s)
+                        AND %s
                 """ % (
-                    ",".join("?" * len(membership_list))
+                    clause,
                 )
             else:
+                clause, args = make_in_list_sql_clause(
+                    self.database_engine, "m.membership", membership_list
+                )
                 sql = """
                     SELECT room_id, e.sender, m.membership, event_id, e.stream_ordering
                     FROM current_state_events AS c
@@ -392,12 +398,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                     WHERE
                         c.type = 'm.room.member'
                         AND state_key = ?
-                        AND m.membership IN (%s)
+                        AND %s
                 """ % (
-                    ",".join("?" * len(membership_list))
+                    clause,
                 )
 
-            txn.execute(sql, (user_id, *membership_list))
+            txn.execute(sql, (user_id, *args))
             results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)]
 
         if do_invite:
diff --git a/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres
index f7bcc5e2f2..67471f3ef5 100644
--- a/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres
+++ b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres
@@ -15,12 +15,11 @@
 
 
 -- There was a bug where we may have updated censored redactions as bytes,
--- which can (somehow) cause json to be inserted hex encoded. This goes and
--- undoes any such hex encoded JSON.
-UPDATE event_json SET json = convert_from(json::bytea, 'utf8')
-WHERE event_id IN (
-  SELECT event_json.event_id
-  FROM event_json
-  INNER JOIN redactions ON (event_json.event_id = redacts)
-  WHERE have_censored AND json NOT LIKE '{%'
-);
+-- which can (somehow) cause json to be inserted hex encoded. These updates go
+-- and undoes any such hex encoded JSON.
+
+INSERT into background_updates (update_name, progress_json)
+  VALUES ('event_fix_redactions_bytes_create_index', '{}');
+
+INSERT into background_updates (update_name, progress_json, depends_on)
+  VALUES ('event_fix_redactions_bytes', '{}', 'event_fix_redactions_bytes_create_index');
diff --git a/synapse/storage/schema/delta/56/unique_user_filter_index.py b/synapse/storage/schema/delta/56/unique_user_filter_index.py
index 60031f23ca..1de8b54961 100644
--- a/synapse/storage/schema/delta/56/unique_user_filter_index.py
+++ b/synapse/storage/schema/delta/56/unique_user_filter_index.py
@@ -5,42 +5,48 @@ from synapse.storage.engines import PostgresEngine
 logger = logging.getLogger(__name__)
 
 
+"""
+This migration updates the user_filters table as follows:
+
+ - drops any (user_id, filter_id) duplicates
+ - makes the columns NON-NULLable
+ - turns the index into a UNIQUE index
+"""
+
+
 def run_upgrade(cur, database_engine, *args, **kwargs):
+    pass
+
+
+def run_create(cur, database_engine, *args, **kwargs):
     if isinstance(database_engine, PostgresEngine):
         select_clause = """
-        CREATE TEMPORARY TABLE user_filters_migration AS
             SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json
-            FROM user_filters;
+            FROM user_filters
         """
     else:
         select_clause = """
-        CREATE TEMPORARY TABLE user_filters_migration AS
-            SELECT * FROM user_filters GROUP BY user_id, filter_id;
+            SELECT * FROM user_filters GROUP BY user_id, filter_id
         """
-    sql = (
-        """
-        BEGIN;
-            %s
-            DROP INDEX user_filters_by_user_id_filter_id;
-            DELETE FROM user_filters;
-            ALTER TABLE user_filters
-               ALTER COLUMN user_id SET NOT NULL,
-               ALTER COLUMN filter_id SET NOT NULL,
-               ALTER COLUMN filter_json SET NOT NULL;
-            INSERT INTO user_filters(user_id, filter_id, filter_json)
-                SELECT * FROM user_filters_migration;
-            DROP TABLE user_filters_migration;
-            CREATE UNIQUE INDEX user_filters_by_user_id_filter_id_unique
-                ON user_filters(user_id, filter_id);
-        END;
-    """
-        % select_clause
+    sql = """
+            DROP TABLE IF EXISTS user_filters_migration;
+            DROP INDEX IF EXISTS user_filters_unique;
+            CREATE TABLE user_filters_migration (
+                user_id TEXT NOT NULL,
+                filter_id BIGINT NOT NULL,
+                filter_json BYTEA NOT NULL
+            );
+            INSERT INTO user_filters_migration (user_id, filter_id, filter_json)
+                %s;
+            CREATE UNIQUE INDEX user_filters_unique ON user_filters_migration
+                (user_id, filter_id);
+            DROP TABLE user_filters;
+            ALTER TABLE user_filters_migration RENAME TO user_filters;
+        """ % (
+        select_clause,
     )
+
     if isinstance(database_engine, PostgresEngine):
         cur.execute(sql)
     else:
         cur.executescript(sql)
-
-
-def run_create(cur, database_engine, *args, **kwargs):
-    pass
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 6ba4190f1a..7695bf09fc 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -24,6 +24,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.storage._base import make_in_list_sql_clause
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 from .background_updates import BackgroundUpdateStore
@@ -385,8 +386,10 @@ class SearchStore(SearchBackgroundUpdateStore):
         # Make sure we don't explode because the person is in too many rooms.
         # We filter the results below regardless.
         if len(room_ids) < 500:
-            clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),))
-            args.extend(room_ids)
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", room_ids
+            )
+            clauses = [clause]
 
         local_clauses = []
         for key in keys:
@@ -492,8 +495,10 @@ class SearchStore(SearchBackgroundUpdateStore):
         # Make sure we don't explode because the person is in too many rooms.
         # We filter the results below regardless.
         if len(room_ids) < 500:
-            clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),))
-            args.extend(room_ids)
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", room_ids
+            )
+            clauses = [clause]
 
         local_clauses = []
         for key in keys:
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
index 5fdb442104..28f33ec18f 100644
--- a/synapse/storage/state_deltas.py
+++ b/synapse/storage/state_deltas.py
@@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
 
 
 class StateDeltasStore(SQLBaseStore):
-    def get_current_state_deltas(self, prev_stream_id):
+    def get_current_state_deltas(self, prev_stream_id: int, max_stream_id: int):
         """Fetch a list of room state changes since the given stream id
 
         Each entry in the result contains the following fields:
@@ -36,15 +36,27 @@ class StateDeltasStore(SQLBaseStore):
 
         Args:
             prev_stream_id (int): point to get changes since (exclusive)
+            max_stream_id (int): the point that we know has been correctly persisted
+               - ie, an upper limit to return changes from.
 
         Returns:
-            Deferred[list[dict]]: results
+            Deferred[tuple[int, list[dict]]: A tuple consisting of:
+               - the stream id which these results go up to
+               - list of current_state_delta_stream rows. If it is empty, we are
+                 up to date.
         """
         prev_stream_id = int(prev_stream_id)
+
+        # check we're not going backwards
+        assert prev_stream_id <= max_stream_id
+
         if not self._curr_state_delta_stream_cache.has_any_entity_changed(
             prev_stream_id
         ):
-            return []
+            # if the CSDs haven't changed between prev_stream_id and now, we
+            # know for certain that they haven't changed between prev_stream_id and
+            # max_stream_id.
+            return max_stream_id, []
 
         def get_current_state_deltas_txn(txn):
             # First we calculate the max stream id that will give us less than
@@ -54,21 +66,29 @@ class StateDeltasStore(SQLBaseStore):
             sql = """
                 SELECT stream_id, count(*)
                 FROM current_state_delta_stream
-                WHERE stream_id > ?
+                WHERE stream_id > ? AND stream_id <= ?
                 GROUP BY stream_id
                 ORDER BY stream_id ASC
                 LIMIT 100
             """
-            txn.execute(sql, (prev_stream_id,))
+            txn.execute(sql, (prev_stream_id, max_stream_id))
 
             total = 0
-            max_stream_id = prev_stream_id
-            for max_stream_id, count in txn:
+
+            for stream_id, count in txn:
                 total += count
                 if total > 100:
                     # We arbitarily limit to 100 entries to ensure we don't
                     # select toooo many.
+                    logger.debug(
+                        "Clipping current_state_delta_stream rows to stream_id %i",
+                        stream_id,
+                    )
+                    clipped_stream_id = stream_id
                     break
+            else:
+                # if there's no problem, we may as well go right up to the max_stream_id
+                clipped_stream_id = max_stream_id
 
             # Now actually get the deltas
             sql = """
@@ -77,8 +97,8 @@ class StateDeltasStore(SQLBaseStore):
                 WHERE ? < stream_id AND stream_id <= ?
                 ORDER BY stream_id ASC
             """
-            txn.execute(sql, (prev_stream_id, max_stream_id))
-            return self.cursor_to_dict(txn)
+            txn.execute(sql, (prev_stream_id, clipped_stream_id))
+            return clipped_stream_id, self.cursor_to_dict(txn)
 
         return self.runInteraction(
             "get_current_state_deltas", get_current_state_deltas_txn
diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py
index 05cabc2282..aa4f0da5f0 100644
--- a/synapse/storage/user_erasure_store.py
+++ b/synapse/storage/user_erasure_store.py
@@ -56,15 +56,15 @@ class UserErasureWorkerStore(SQLBaseStore):
         # iterate it multiple times, and (b) avoiding duplicates.
         user_ids = tuple(set(user_ids))
 
-        def _get_erased_users(txn):
-            txn.execute(
-                "SELECT user_id FROM erased_users WHERE user_id IN (%s)"
-                % (",".join("?" * len(user_ids))),
-                user_ids,
-            )
-            return set(r[0] for r in txn)
-
-        erased_users = yield self.runInteraction("are_users_erased", _get_erased_users)
+        rows = yield self._simple_select_many_batch(
+            table="erased_users",
+            column="user_id",
+            iterable=user_ids,
+            retcols=("user_id",),
+            desc="are_users_erased",
+        )
+        erased_users = set(row["user_id"] for row in rows)
+
         res = dict((u, u in erased_users) for u in user_ids)
         return res