Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/__init__.py | 46
-rw-r--r--  synapse/storage/_base.py | 10
-rw-r--r--  synapse/storage/background_updates.py | 2
-rw-r--r--  synapse/storage/devices.py | 12
-rw-r--r--  synapse/storage/e2e_room_keys.py | 28
-rw-r--r--  synapse/storage/engines/postgres.py | 8
-rw-r--r--  synapse/storage/engines/sqlite.py | 2
-rw-r--r--  synapse/storage/event_federation.py | 28
-rw-r--r--  synapse/storage/event_push_actions.py | 4
-rw-r--r--  synapse/storage/events.py | 71
-rw-r--r--  synapse/storage/events_bg_updates.py | 16
-rw-r--r--  synapse/storage/events_worker.py | 10
-rw-r--r--  synapse/storage/group_server.py | 8
-rw-r--r--  synapse/storage/keys.py | 2
-rw-r--r--  synapse/storage/media_repository.py | 22
-rw-r--r--  synapse/storage/monthly_active_users.py | 6
-rw-r--r--  synapse/storage/prepare_database.py | 15
-rw-r--r--  synapse/storage/profile.py | 2
-rw-r--r--  synapse/storage/push_rule.py | 30
-rw-r--r--  synapse/storage/pusher.py | 36
-rw-r--r--  synapse/storage/receipts.py | 2
-rw-r--r--  synapse/storage/registration.py | 227
-rw-r--r--  synapse/storage/relations.py | 6
-rw-r--r--  synapse/storage/roommember.py | 8
-rw-r--r--  synapse/storage/schema/delta/20/pushers.py | 22
-rw-r--r--  synapse/storage/schema/delta/30/as_users.py | 17
-rw-r--r--  synapse/storage/schema/delta/31/pushers.py | 24
-rw-r--r--  synapse/storage/schema/delta/33/remote_media_ts.py | 2
-rw-r--r--  synapse/storage/schema/delta/47/state_group_seq.py | 5
-rw-r--r--  synapse/storage/schema/delta/48/group_unique_indexes.py | 14
-rw-r--r--  synapse/storage/schema/delta/50/make_event_content_nullable.py | 12
-rw-r--r--  synapse/storage/schema/delta/55/users_alter_deactivated.sql | 19
-rw-r--r--  synapse/storage/search.py | 28
-rw-r--r--  synapse/storage/stats.py | 34
-rw-r--r--  synapse/storage/stream.py | 32
35 files changed, 513 insertions, 297 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 71316f7d09..6b0ca80087 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -279,23 +279,35 @@ class DataStore(
         """
         Counts the number of users who used this homeserver in the last 24 hours.
         """
+        yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
+        return self.runInteraction("count_daily_users", self._count_users, yesterday)
 
-        def _count_users(txn):
-            yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-
-            sql = """
-                SELECT COALESCE(count(*), 0) FROM (
-                    SELECT user_id FROM user_ips
-                    WHERE last_seen > ?
-                    GROUP BY user_id
-                ) u
-            """
-
-            txn.execute(sql, (yesterday,))
-            count, = txn.fetchone()
-            return count
+    def count_monthly_users(self):
+        """
+        Counts the number of users who used this homeserver in the last 30 days.
+        Note this method is intended for phonehome metrics only and is different
+        from the MAU figure in synapse.storage.monthly_active_users, which,
+        amongst other things, includes a 3-day grace period before a user counts.
+        """
+        thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+        return self.runInteraction(
+            "count_monthly_users", self._count_users, thirty_days_ago
+        )
 
-        return self.runInteraction("count_users", _count_users)
+    def _count_users(self, txn, time_from):
+        """
+        Returns the number of users seen since the given timestamp (ms).
+        """
+        sql = """
+            SELECT COALESCE(count(*), 0) FROM (
+                SELECT user_id FROM user_ips
+                WHERE last_seen > ?
+                GROUP BY user_id
+            ) u
+        """
+        txn.execute(sql, (time_from,))
+        count, = txn.fetchone()
+        return count
 
     def count_r30_users(self):
         """
@@ -347,7 +359,7 @@ class DataStore(
             txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
 
             for row in txn:
-                if row[0] == 'unknown':
+                if row[0] == "unknown":
                     pass
                 results[row[0]] = row[1]
 
@@ -374,7 +386,7 @@ class DataStore(
             txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
 
             count, = txn.fetchone()
-            results['all'] = count
+            results["all"] = count
 
             return results
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d8054cd69d..aae43d0f99 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -307,12 +307,12 @@ class SQLBaseStore(object):
 
         def select_users_with_no_expiration_date_txn(txn):
             """Retrieves the list of registered users with no expiration date from the
-            database.
+            database, filtering out deactivated users.
             """
             sql = (
                 "SELECT users.name FROM users"
                 " LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
-                " WHERE account_validity.user_id is NULL;"
+                " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
             )
             txn.execute(sql, [])
 
@@ -320,9 +320,7 @@ class SQLBaseStore(object):
             if res:
                 for user in res:
                     self.set_expiration_date_for_user_txn(
-                        txn,
-                        user["name"],
-                        use_delta=True,
+                        txn, user["name"], use_delta=True
                     )
 
         yield self.runInteraction(
@@ -1675,7 +1673,7 @@ def db_to_json(db_content):
     # Decode it to a Unicode string before feeding it to json.loads, so we
     # consistently get a Unicode-containing object out.
     if isinstance(db_content, (bytes, bytearray)):
-        db_content = db_content.decode('utf8')
+        db_content = db_content.decode("utf8")
 
     try:
         return json.loads(db_content)
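
Note: db_to_json (touched above) decodes bytes before parsing so callers consistently get str-keyed objects on both Python 2 and 3. A reduced sketch of that behaviour (the error handling wrapped around json.loads in the real helper is omitted here):

    import json

    def db_to_json(db_content):
        # Decode bytes/bytearray to str so json.loads behaves the same everywhere.
        if isinstance(db_content, (bytes, bytearray)):
            db_content = db_content.decode("utf8")
        return json.loads(db_content)

    print(db_to_json(b'{"ok": true}'))  # {'ok': True}
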
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b8b8273f73..50f913a414 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -169,7 +169,7 @@ class BackgroundUpdateStore(SQLBaseStore):
             in_flight = set(update["update_name"] for update in updates)
             for update in updates:
                 if update["depends_on"] not in in_flight:
-                    self._background_update_queue.append(update['update_name'])
+                    self._background_update_queue.append(update["update_name"])
 
         if not self._background_update_queue:
             # no work left to do
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d102e07372..3413a46675 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -149,9 +149,7 @@ class DeviceWorkerStore(SQLBaseStore):
             defer.returnValue((stream_id_cutoff, []))
 
         results = yield self._get_device_update_edus_by_remote(
-            destination,
-            from_stream_id,
-            query_map,
+            destination, from_stream_id, query_map
         )
 
         defer.returnValue((now_stream_id, results))
@@ -182,9 +180,7 @@ class DeviceWorkerStore(SQLBaseStore):
         return list(txn)
 
     @defer.inlineCallbacks
-    def _get_device_update_edus_by_remote(
-        self, destination, from_stream_id, query_map,
-    ):
+    def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_map):
         """Returns a list of device update EDUs as well as E2EE keys
 
         Args:
@@ -210,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore):
             # The prev_id for the first row is always the last row before
             # `from_stream_id`
             prev_id = yield self._get_last_device_update_for_remote_user(
-                destination, user_id, from_stream_id,
+                destination, user_id, from_stream_id
             )
             for device_id, device in iteritems(user_devices):
                 stream_id = query_map[(user_id, device_id)]
@@ -238,7 +234,7 @@ class DeviceWorkerStore(SQLBaseStore):
         defer.returnValue(results)
 
     def _get_last_device_update_for_remote_user(
-        self, destination, user_id, from_stream_id,
+        self, destination, user_id, from_stream_id
     ):
         def f(txn):
             prev_sent_id_sql = """
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index 521936e3b0..f40ef2ab64 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -87,10 +87,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             },
             values={
                 "version": version,
-                "first_message_index": room_key['first_message_index'],
-                "forwarded_count": room_key['forwarded_count'],
-                "is_verified": room_key['is_verified'],
-                "session_data": json.dumps(room_key['session_data']),
+                "first_message_index": room_key["first_message_index"],
+                "forwarded_count": room_key["forwarded_count"],
+                "is_verified": room_key["is_verified"],
+                "session_data": json.dumps(room_key["session_data"]),
             },
             lock=False,
         )
@@ -118,13 +118,13 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         try:
             version = int(version)
         except ValueError:
-            defer.returnValue({'rooms': {}})
+            defer.returnValue({"rooms": {}})
 
         keyvalues = {"user_id": user_id, "version": version}
         if room_id:
-            keyvalues['room_id'] = room_id
+            keyvalues["room_id"] = room_id
             if session_id:
-                keyvalues['session_id'] = session_id
+                keyvalues["session_id"] = session_id
 
         rows = yield self._simple_select_list(
             table="e2e_room_keys",
@@ -141,10 +141,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             desc="get_e2e_room_keys",
         )
 
-        sessions = {'rooms': {}}
+        sessions = {"rooms": {}}
         for row in rows:
-            room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}})
-            room_entry['sessions'][row['session_id']] = {
+            room_entry = sessions["rooms"].setdefault(row["room_id"], {"sessions": {}})
+            room_entry["sessions"][row["session_id"]] = {
                 "first_message_index": row["first_message_index"],
                 "forwarded_count": row["forwarded_count"],
                 "is_verified": row["is_verified"],
@@ -174,9 +174,9 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         keyvalues = {"user_id": user_id, "version": int(version)}
         if room_id:
-            keyvalues['room_id'] = room_id
+            keyvalues["room_id"] = room_id
             if session_id:
-                keyvalues['session_id'] = session_id
+                keyvalues["session_id"] = session_id
 
         yield self._simple_delete(
             table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys"
@@ -191,7 +191,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         )
         row = txn.fetchone()
         if not row:
-            raise StoreError(404, 'No current backup version')
+            raise StoreError(404, "No current backup version")
         return row[0]
 
     def get_e2e_room_keys_version_info(self, user_id, version=None):
@@ -255,7 +255,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             )
             current_version = txn.fetchone()[0]
             if current_version is None:
-                current_version = '0'
+                current_version = "0"
 
             new_version = str(int(current_version) + 1)
 
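
Note: the version-bump hunk above stores backup versions as strings but increments them numerically, treating a missing version as "0". Reduced to a one-liner:

    def next_backup_version(current_version):
        # Versions live in the DB as strings; None means no backup exists yet.
        return str(int(current_version or "0") + 1)

    print(next_backup_version(None), next_backup_version("41"))  # 1 42
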
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 1b97ee74e3..289b6bc281 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -45,6 +45,10 @@ class PostgresEngine(object):
         # together. For example, version 8.1.5 will be returned as 80105
         self._version = db_conn.server_version
 
+        # Are we on a supported PostgreSQL version?
+        if self._version < 90500:
+            raise RuntimeError("Synapse requires PostgreSQL 9.5 or above.")
+
         db_conn.set_isolation_level(
             self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
         )
@@ -64,9 +68,9 @@ class PostgresEngine(object):
     @property
     def can_native_upsert(self):
         """
-        Can we use native UPSERTs? This requires PostgreSQL 9.5+.
+        Can we use native UPSERTs?
         """
-        return self._version >= 90500
+        return True
 
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
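
Note: with the hard version check added in __init__, can_native_upsert can return True unconditionally, since every supported PostgreSQL (9.5+) understands INSERT ... ON CONFLICT. For reference, a sketch of the upsert shape this flag gates, run here against sqlite (which also supports the syntax from 3.24 on); the example_kv table is invented for the demo:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE example_kv (key TEXT PRIMARY KEY, value TEXT)")
    upsert = """
        INSERT INTO example_kv (key, value) VALUES (?, ?)
        ON CONFLICT (key) DO UPDATE SET value = excluded.value
    """
    conn.execute(upsert, ("k", "v1"))
    conn.execute(upsert, ("k", "v2"))  # updates in place instead of failing
    print(conn.execute("SELECT value FROM example_kv").fetchone())  # ('v2',)
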
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 933bcf42c2..e9b9caa49a 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -85,7 +85,7 @@ class Sqlite3Engine(object):
 
 def _parse_match_info(buf):
     bufsize = len(buf)
-    return [struct.unpack('@I', buf[i : i + 4])[0] for i in range(0, bufsize, 4)]
+    return [struct.unpack("@I", buf[i : i + 4])[0] for i in range(0, bufsize, 4)]
 
 
 def _rank(raw_match_info):
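
Note: for context on the reformatted line, SQLite's matchinfo() blob is a sequence of native-endian 32-bit unsigned integers, which _parse_match_info unpacks four bytes at a time. A quick round-trip check:

    import struct

    def parse_match_info(buf):
        # matchinfo() packs native-endian 32-bit unsigned ints, 4 bytes each.
        return [struct.unpack("@I", buf[i : i + 4])[0] for i in range(0, len(buf), 4)]

    print(parse_match_info(struct.pack("@3I", 1, 2, 3)))  # [1, 2, 3]
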
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 09e39c2c28..cb4478342f 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -190,6 +190,34 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             room_id,
         )
 
+    def get_rooms_with_many_extremities(self, min_count, limit):
+        """Get the top rooms by forward extremity count.
+
+        Args:
+            min_count (int): Only rooms with more than this many extremities
+                are returned.
+            limit (int): The maximum number of rooms to return.
+
+        Returns:
+            Deferred[list]: At most `limit` room IDs that have more than
+            `min_count` extremities, sorted by descending extremity count.
+        """
+
+        def _get_rooms_with_many_extremities_txn(txn):
+            sql = """
+                SELECT room_id FROM event_forward_extremities
+                GROUP BY room_id
+                HAVING count(*) > ?
+                ORDER BY count(*) DESC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (min_count, limit))
+            return [room_id for room_id, in txn]
+
+        return self.runInteraction(
+            "get_rooms_with_many_extremities", _get_rooms_with_many_extremities_txn
+        )
+
     @cached(max_entries=5000, iterable=True)
     def get_latest_event_ids_in_room(self, room_id):
         return self._simple_select_onecol(
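
Note: a hedged usage sketch for the new get_rooms_with_many_extremities query; report_busy_rooms is a hypothetical caller, and store stands for any object mixing in EventFederationWorkerStore:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def report_busy_rooms(store):  # hypothetical, for illustration only
        # The five rooms holding more than 10 forward extremities, worst first.
        room_ids = yield store.get_rooms_with_many_extremities(min_count=10, limit=5)
        for room_id in room_ids:
            print("room with many forward extremities:", room_id)
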
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index a729f3e067..eca77069fd 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -277,7 +277,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         # contain results from the first query, correctly ordered, followed
         # by results from the second query, but we want them all ordered
         # by stream_ordering, oldest first.
-        notifs.sort(key=lambda r: r['stream_ordering'])
+        notifs.sort(key=lambda r: r["stream_ordering"])
 
         # Take only up to the limit. We have to stop at the limit because
         # one of the subqueries may have hit the limit.
@@ -379,7 +379,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         # contain results from the first query, correctly ordered, followed
         # by results from the second query, but we want them all ordered
         # by received_ts (most recent first)
-        notifs.sort(key=lambda r: -(r['received_ts'] or 0))
+        notifs.sort(key=lambda r: -(r["received_ts"] or 0))
 
         # Now return the first `limit`
         defer.returnValue(notifs[:limit])
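
Note: the sort key touched above, -(r["received_ts"] or 0), maps a NULL received_ts to 0 so rows without a timestamp sort after everything else when ordering most recent first. A toy check:

    notifs = [{"received_ts": 300}, {"received_ts": None}, {"received_ts": 500}]
    # `or 0` turns None into 0; negating then yields newest-first, Nones last.
    notifs.sort(key=lambda r: -(r["received_ts"] or 0))
    print([n["received_ts"] for n in notifs])  # [500, 300, None]
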
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..fefba39ea1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,14 +17,14 @@
 
 import itertools
 import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
 from functools import wraps
 
 from six import iteritems, text_type
 from six.moves import range
 
 from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics import BucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.state import StateResolutionStore
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +74,21 @@ state_delta_reuse_delta_counter = Counter(
     "synapse_storage_events_state_delta_reuse_delta", ""
 )
 
+# The number of forward extremities for each new event.
+forward_extremities_counter = Histogram(
+    "synapse_storage_events_forward_extremities_persisted",
+    "Number of forward extremities for each new event",
+    buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
+# The number of stale forward extremities for each new event. Stale extremities
+# are those that were in the previous set of extremities as well as the new.
+stale_forward_extremities_counter = Histogram(
+    "synapse_storage_events_stale_forward_extremities_persisted",
+    "Number of unchanged forward extremities for each new event",
+    buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
 
 def encode_json(json_object):
     """
@@ -220,13 +236,39 @@ class EventsStore(
     EventsWorkerStore,
     BackgroundUpdateStore,
 ):
-
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
 
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
+        # Collect metrics on the number of forward extremities that exist.
+        # A Counter mapping "number of extremities" to the number of rooms
+        # that currently have that many.
+        self._current_forward_extremities_amount = c_counter()
+
+        BucketCollector(
+            "synapse_forward_extremities",
+            lambda: self._current_forward_extremities_amount,
+            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
+        )
+
+        # Re-read the forward extremity counts every 60 minutes
+        hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+    @defer.inlineCallbacks
+    def _read_forward_extremities(self):
+        def fetch(txn):
+            txn.execute(
+                """
+                SELECT count(*) c FROM event_forward_extremities
+                GROUP BY room_id
+                """
+            )
+            return txn.fetchall()
+
+        res = yield self.runInteraction("read_forward_extremities", fetch)
+        self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """
@@ -514,6 +556,8 @@ class EventsStore(
             and not event.internal_metadata.is_soft_failed()
         ]
 
+        latest_event_ids = set(latest_event_ids)
+
         # start with the existing forward extremities
         result = set(latest_event_ids)
 
@@ -537,6 +581,13 @@ class EventsStore(
         )
         result.difference_update(existing_prevs)
 
+        # We only update metrics for events that change forward extremities
+        # (e.g. we ignore backfill/outliers/etc)
+        if result != latest_event_ids:
+            forward_extremities_counter.observe(len(result))
+            stale = latest_event_ids & result
+            stale_forward_extremities_counter.observe(len(stale))
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -568,17 +619,11 @@ class EventsStore(
             )
 
             txn.execute(sql, batch)
-            results.extend(
-                r[0]
-                for r in txn
-                if not json.loads(r[1]).get("soft_failed")
-            )
+            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_events_which_are_prevs",
-                _get_events_which_are_prevs_txn,
-                chunk,
+                "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
             )
 
         defer.returnValue(results)
@@ -640,9 +685,7 @@ class EventsStore(
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_prevs_before_rejected",
-                _get_prevs_before_rejected_txn,
-                chunk,
+                "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
             )
 
         defer.returnValue(existing_prevs)
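
Note: the new metrics are prometheus_client Histograms with explicit buckets; each observe() increments every bucket whose upper bound is at least the observed value. A standalone sketch (the metric name here is invented for the demo, not one Synapse exports):

    from prometheus_client import Histogram, generate_latest

    demo_extremities = Histogram(
        "demo_forward_extremities_persisted",
        "Number of forward extremities for each new event (demo)",
        buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
    )

    demo_extremities.observe(4)  # lands in the le="5" bucket and all above it
    print(generate_latest().decode("ascii"))
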
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
index 75c1935bf3..1ce21d190c 100644
--- a/synapse/storage/events_bg_updates.py
+++ b/synapse/storage/events_bg_updates.py
@@ -64,8 +64,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         )
 
         self.register_background_update_handler(
-            self.DELETE_SOFT_FAILED_EXTREMITIES,
-            self._cleanup_extremities_bg_update,
+            self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
         )
 
     @defer.inlineCallbacks
@@ -269,7 +268,8 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                 LEFT JOIN events USING (event_id)
                 LEFT JOIN event_json USING (event_id)
                 LEFT JOIN rejections USING (event_id)
-                """, (batch_size,)
+                """,
+                (batch_size,),
             )
 
             for prev_event_id, event_id, metadata, rejected, outlier in txn:
@@ -364,13 +364,12 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                     column="event_id",
                     iterable=to_delete,
                     keyvalues={},
-                    retcols=("room_id",)
+                    retcols=("room_id",),
                 )
                 room_ids = set(row["room_id"] for row in rows)
                 for room_id in room_ids:
                     txn.call_after(
-                        self.get_latest_event_ids_in_room.invalidate,
-                        (room_id,)
+                        self.get_latest_event_ids_in_room.invalidate, (room_id,)
                     )
 
             self._simple_delete_many_txn(
@@ -384,7 +383,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             return len(original_set)
 
         num_handled = yield self.runInteraction(
-            "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
+            "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn
         )
 
         if not num_handled:
@@ -394,8 +393,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                 txn.execute("DROP TABLE _extremities_to_check")
 
             yield self.runInteraction(
-                "_cleanup_extremities_bg_update_drop_table",
-                _drop_table_txn,
+                "_cleanup_extremities_bg_update_drop_table", _drop_table_txn
             )
 
         defer.returnValue(num_handled)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index cc7df5cf14..6d680d405a 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -27,7 +27,6 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import NotFoundError
 from synapse.api.room_versions import EventFormatVersions
 from synapse.events import FrozenEvent, event_type_from_format_version  # noqa: F401
-# these are only included to make the type annotations work
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -111,8 +110,7 @@ class EventsWorkerStore(SQLBaseStore):
             return ts
 
         return self.runInteraction(
-            "get_approximate_received_ts",
-            _get_approximate_received_ts_txn,
+            "get_approximate_received_ts", _get_approximate_received_ts_txn
         )
 
     @defer.inlineCallbacks
@@ -677,7 +675,8 @@ class EventsWorkerStore(SQLBaseStore):
         """
         return self.runInteraction(
             "get_total_state_event_counts",
-            self._get_total_state_event_counts_txn, room_id
+            self._get_total_state_event_counts_txn,
+            room_id,
         )
 
     def _get_current_state_event_counts_txn(self, txn, room_id):
@@ -701,7 +700,8 @@ class EventsWorkerStore(SQLBaseStore):
         """
         return self.runInteraction(
             "get_current_state_event_counts",
-            self._get_current_state_event_counts_txn, room_id
+            self._get_current_state_event_counts_txn,
+            room_id,
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index dce6a43ac1..73e6fc6de2 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -1179,11 +1179,7 @@ class GroupServerStore(SQLBaseStore):
 
             for table in tables:
                 self._simple_delete_txn(
-                    txn,
-                    table=table,
-                    keyvalues={"group_id": group_id},
+                    txn, table=table, keyvalues={"group_id": group_id}
                 )
 
-        return self.runInteraction(
-            "delete_group", _delete_group_txn
-        )
+        return self.runInteraction("delete_group", _delete_group_txn)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index e3655ad8d7..e72f89e446 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -131,7 +131,7 @@ class KeyStore(SQLBaseStore):
         def _invalidate(res):
             f = self._get_server_verify_key.invalidate
             for i in invalidations:
-                f((i, ))
+                f((i,))
             return res
 
         return self.runInteraction(
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 3ecf47e7a7..6b1238ce4a 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -22,11 +22,11 @@ class MediaRepositoryStore(BackgroundUpdateStore):
         super(MediaRepositoryStore, self).__init__(db_conn, hs)
 
         self.register_background_index_update(
-            update_name='local_media_repository_url_idx',
-            index_name='local_media_repository_url_idx',
-            table='local_media_repository',
-            columns=['created_ts'],
-            where_clause='url_cache IS NOT NULL',
+            update_name="local_media_repository_url_idx",
+            index_name="local_media_repository_url_idx",
+            table="local_media_repository",
+            columns=["created_ts"],
+            where_clause="url_cache IS NOT NULL",
         )
 
     def get_local_media(self, media_id):
@@ -108,12 +108,12 @@ class MediaRepositoryStore(BackgroundUpdateStore):
             return dict(
                 zip(
                     (
-                        'response_code',
-                        'etag',
-                        'expires_ts',
-                        'og',
-                        'media_id',
-                        'download_ts',
+                        "response_code",
+                        "etag",
+                        "expires_ts",
+                        "og",
+                        "media_id",
+                        "download_ts",
                     ),
                     row,
                 )
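
Note: the requoted tuple above feeds dict(zip(...)), turning the bare row returned by the URL-cache query into a keyed dict. In isolation, with made-up values:

    row = (200, "etag123", 1560000000000, "{}", "abc", 1559990000000)
    columns = ("response_code", "etag", "expires_ts", "og", "media_id", "download_ts")
    print(dict(zip(columns, row))["etag"])  # etag123
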
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 8aa8abc470..081564360f 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -86,11 +86,11 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             if len(self.reserved_users) > 0:
                 # questionmarks is a hack to overcome sqlite not supporting
                 # tuples in 'WHERE IN %s'
-                questionmarks = '?' * len(self.reserved_users)
+                questionmarks = "?" * len(self.reserved_users)
 
                 query_args.extend(self.reserved_users)
                 sql = base_sql + """ AND user_id NOT IN ({})""".format(
-                    ','.join(questionmarks)
+                    ",".join(questionmarks)
                 )
             else:
                 sql = base_sql
@@ -124,7 +124,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
                 if len(self.reserved_users) > 0:
                     query_args.extend(self.reserved_users)
                     sql = base_sql + """ AND user_id NOT IN ({})""".format(
-                        ','.join(questionmarks)
+                        ",".join(questionmarks)
                     )
                 else:
                     sql = base_sql
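
Note: the questionmarks trick above exists because sqlite cannot bind a tuple to a single `IN ?` parameter, so the query builds one placeholder per reserved user. In isolation:

    reserved_users = ["@a:hs", "@b:hs", "@c:hs"]
    questionmarks = "?" * len(reserved_users)  # "???"
    sql = "SELECT count(*) FROM monthly_active_users WHERE user_id NOT IN ({})".format(
        ",".join(questionmarks)
    )
    print(sql)  # ... WHERE user_id NOT IN (?,?,?)
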
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index f2c1bed487..7c4e1dc7ec 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -133,7 +133,7 @@ def _setup_new_database(cur, database_engine):
             if ver <= SCHEMA_VERSION:
                 valid_dirs.append((ver, abs_path))
         else:
-            logger.warn("Unexpected entry in 'full_schemas': %s", filename)
+            logger.debug("Ignoring entry '%s' in 'full_schemas'", filename)
 
     if not valid_dirs:
         raise PrepareDatabaseException(
@@ -146,9 +146,10 @@ def _setup_new_database(cur, database_engine):
 
     directory_entries = os.listdir(sql_dir)
 
-    for filename in sorted(fnmatch.filter(directory_entries, "*.sql") + fnmatch.filter(
-        directory_entries, "*.sql." + specific
-    )):
+    for filename in sorted(
+        fnmatch.filter(directory_entries, "*.sql")
+        + fnmatch.filter(directory_entries, "*.sql." + specific)
+    ):
         sql_loc = os.path.join(sql_dir, filename)
         logger.debug("Applying schema %s", sql_loc)
         executescript(cur, sql_loc)
@@ -313,7 +314,7 @@ def _apply_module_schemas(txn, database_engine, config):
             application config
     """
     for (mod, _config) in config.password_providers:
-        if not hasattr(mod, 'get_db_schema_files'):
+        if not hasattr(mod, "get_db_schema_files"):
             continue
         modname = ".".join((mod.__module__, mod.__name__))
         _apply_module_schema_files(
@@ -343,7 +344,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
             continue
 
         root_name, ext = os.path.splitext(name)
-        if ext != '.sql':
+        if ext != ".sql":
             raise PrepareDatabaseException(
                 "only .sql files are currently supported for module schemas"
             )
@@ -407,7 +408,7 @@ def get_statements(f):
 
 
 def executescript(txn, schema_path):
-    with open(schema_path, 'r') as f:
+    with open(schema_path, "r") as f:
         for statement in get_statements(f):
             txn.execute(statement)
 
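
Note: the reflowed loop in _setup_new_database applies plain .sql files and engine-specific .sql.<engine> variants in one lexicographically sorted pass. A quick demonstration of the resulting order, with file names invented for the demo:

    import fnmatch

    entries = ["18.sql", "18.sql.postgres", "README", "17.sql"]
    specific = "postgres"  # or "sqlite", depending on the database engine
    for filename in sorted(
        fnmatch.filter(entries, "*.sql")
        + fnmatch.filter(entries, "*.sql." + specific)
    ):
        print(filename)  # 17.sql, 18.sql, 18.sql.postgres
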
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index aeec2f57c4..0ff392bdb4 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -41,7 +41,7 @@ class ProfileWorkerStore(SQLBaseStore):
 
         defer.returnValue(
             ProfileInfo(
-                avatar_url=profile['avatar_url'], display_name=profile['displayname']
+                avatar_url=profile["avatar_url"], display_name=profile["displayname"]
             )
         )
 
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9e406baafa..98cec8c82b 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -46,12 +46,12 @@ def _load_rules(rawrules, enabled_map):
     rules = list(list_with_base_rules(ruleslist))
 
     for i, rule in enumerate(rules):
-        rule_id = rule['rule_id']
+        rule_id = rule["rule_id"]
         if rule_id in enabled_map:
-            if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+            if rule.get("enabled", True) != bool(enabled_map[rule_id]):
                 # Rules are cached across users.
                 rule = dict(rule)
-                rule['enabled'] = bool(enabled_map[rule_id])
+                rule["enabled"] = bool(enabled_map[rule_id])
                 rules[i] = rule
 
     return rules
@@ -126,12 +126,12 @@ class PushRulesWorkerStore(
     def get_push_rules_enabled_for_user(self, user_id):
         results = yield self._simple_select_list(
             table="push_rules_enable",
-            keyvalues={'user_name': user_id},
+            keyvalues={"user_name": user_id},
             retcols=("user_name", "rule_id", "enabled"),
             desc="get_push_rules_enabled_for_user",
         )
         defer.returnValue(
-            {r['rule_id']: False if r['enabled'] == 0 else True for r in results}
+            {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}
         )
 
     def have_push_rules_changed_for_user(self, user_id, last_id):
@@ -175,7 +175,7 @@ class PushRulesWorkerStore(
         rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
 
         for row in rows:
-            results.setdefault(row['user_name'], []).append(row)
+            results.setdefault(row["user_name"], []).append(row)
 
         enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
 
@@ -194,7 +194,7 @@ class PushRulesWorkerStore(
             rule (Dict): A push rule.
         """
         # Create new rule id
-        rule_id_scope = '/'.join(rule["rule_id"].split('/')[:-1])
+        rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1])
         new_rule_id = rule_id_scope + "/" + new_room_id
 
         # Change room id in each condition
@@ -334,8 +334,8 @@ class PushRulesWorkerStore(
             desc="bulk_get_push_rules_enabled",
         )
         for row in rows:
-            enabled = bool(row['enabled'])
-            results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
+            enabled = bool(row["enabled"])
+            results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
         defer.returnValue(results)
 
 
@@ -568,7 +568,7 @@ class PushRuleStore(PushRulesWorkerStore):
 
         def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
             self._simple_delete_one_txn(
-                txn, "push_rules", {'user_name': user_id, 'rule_id': rule_id}
+                txn, "push_rules", {"user_name": user_id, "rule_id": rule_id}
             )
 
             self._insert_push_rules_update_txn(
@@ -605,9 +605,9 @@ class PushRuleStore(PushRulesWorkerStore):
         self._simple_upsert_txn(
             txn,
             "push_rules_enable",
-            {'user_name': user_id, 'rule_id': rule_id},
-            {'enabled': 1 if enabled else 0},
-            {'id': new_id},
+            {"user_name": user_id, "rule_id": rule_id},
+            {"enabled": 1 if enabled else 0},
+            {"id": new_id},
         )
 
         self._insert_push_rules_update_txn(
@@ -645,8 +645,8 @@ class PushRuleStore(PushRulesWorkerStore):
                 self._simple_update_one_txn(
                     txn,
                     "push_rules",
-                    {'user_name': user_id, 'rule_id': rule_id},
-                    {'actions': actions_json},
+                    {"user_name": user_id, "rule_id": rule_id},
+                    {"actions": actions_json},
                 )
 
             self._insert_push_rules_update_txn(
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 1567e1df48..cfe0a94330 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -37,24 +37,24 @@ else:
 class PusherWorkerStore(SQLBaseStore):
     def _decode_pushers_rows(self, rows):
         for r in rows:
-            dataJson = r['data']
-            r['data'] = None
+            dataJson = r["data"]
+            r["data"] = None
             try:
                 if isinstance(dataJson, db_binary_type):
                     dataJson = str(dataJson).decode("UTF8")
 
-                r['data'] = json.loads(dataJson)
+                r["data"] = json.loads(dataJson)
             except Exception as e:
                 logger.warn(
                     "Invalid JSON in data for pusher %d: %s, %s",
-                    r['id'],
+                    r["id"],
                     dataJson,
                     e.args[0],
                 )
                 pass
 
-            if isinstance(r['pushkey'], db_binary_type):
-                r['pushkey'] = str(r['pushkey']).decode("UTF8")
+            if isinstance(r["pushkey"], db_binary_type):
+                r["pushkey"] = str(r["pushkey"]).decode("UTF8")
 
         return rows
 
@@ -195,15 +195,15 @@ class PusherWorkerStore(SQLBaseStore):
     )
     def get_if_users_have_pushers(self, user_ids):
         rows = yield self._simple_select_many_batch(
-            table='pushers',
-            column='user_name',
+            table="pushers",
+            column="user_name",
             iterable=user_ids,
-            retcols=['user_name'],
-            desc='get_if_users_have_pushers',
+            retcols=["user_name"],
+            desc="get_if_users_have_pushers",
         )
 
         result = {user_id: False for user_id in user_ids}
-        result.update({r['user_name']: True for r in rows})
+        result.update({r["user_name"]: True for r in rows})
 
         defer.returnValue(result)
 
@@ -299,8 +299,8 @@ class PusherStore(PusherWorkerStore):
     ):
         yield self._simple_update_one(
             "pushers",
-            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_stream_ordering': last_stream_ordering},
+            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            {"last_stream_ordering": last_stream_ordering},
             desc="update_pusher_last_stream_ordering",
         )
 
@@ -310,10 +310,10 @@ class PusherStore(PusherWorkerStore):
     ):
         yield self._simple_update_one(
             "pushers",
-            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
+            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
             {
-                'last_stream_ordering': last_stream_ordering,
-                'last_success': last_success,
+                "last_stream_ordering": last_stream_ordering,
+                "last_success": last_success,
             },
             desc="update_pusher_last_stream_ordering_and_success",
         )
@@ -322,8 +322,8 @@ class PusherStore(PusherWorkerStore):
     def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
         yield self._simple_update_one(
             "pushers",
-            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'failing_since': failing_since},
+            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            {"failing_since": failing_since},
             desc="update_pusher_failing_since",
         )
 
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index a1647e50a1..b477da12b1 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
     @cachedInlineCallbacks()
     def get_users_with_read_receipts_in_room(self, room_id):
         receipts = yield self.get_receipts_for_room(room_id, "m.read")
-        defer.returnValue(set(r['user_id'] for r in receipts))
+        defer.returnValue(set(r["user_id"] for r in receipts))
 
     @cached(num_args=2)
     def get_receipts_for_room(self, room_id, receipt_type):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 1dd1182e82..983ce13291 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import re
 
 from six import iterkeys
@@ -31,6 +32,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
 
+logger = logging.getLogger(__name__)
+
 
 class RegistrationWorkerStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
@@ -113,8 +116,9 @@ class RegistrationWorkerStore(SQLBaseStore):
         defer.returnValue(res)
 
     @defer.inlineCallbacks
-    def set_account_validity_for_user(self, user_id, expiration_ts, email_sent,
-                                      renewal_token=None):
+    def set_account_validity_for_user(
+        self, user_id, expiration_ts, email_sent, renewal_token=None
+    ):
         """Updates the account validity properties of the given account, with the
         given values.
 
@@ -128,6 +132,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             renewal_token (str): Renewal token the user can use to extend the validity
                 of their account. Defaults to no token.
         """
+
         def set_account_validity_for_user_txn(txn):
             self._simple_update_txn(
                 txn=txn,
@@ -140,12 +145,11 @@ class RegistrationWorkerStore(SQLBaseStore):
                 },
             )
             self._invalidate_cache_and_stream(
-                txn, self.get_expiration_ts_for_user, (user_id,),
+                txn, self.get_expiration_ts_for_user, (user_id,)
             )
 
         yield self.runInteraction(
-            "set_account_validity_for_user",
-            set_account_validity_for_user_txn,
+            "set_account_validity_for_user", set_account_validity_for_user_txn
         )
 
     @defer.inlineCallbacks
@@ -214,6 +218,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns:
             Deferred: Resolves to a list[dict[user_id (str), expiration_ts_ms (int)]]
         """
+
         def select_users_txn(txn, now_ms, renew_at):
             sql = (
                 "SELECT user_id, expiration_ts_ms FROM account_validity"
@@ -226,7 +231,8 @@ class RegistrationWorkerStore(SQLBaseStore):
         res = yield self.runInteraction(
             "get_users_expiring_soon",
             select_users_txn,
-            self.clock.time_msec(), self.config.account_validity.renew_at,
+            self.clock.time_msec(),
+            self.config.account_validity.renew_at,
         )
 
         defer.returnValue(res)
@@ -249,6 +255,20 @@ class RegistrationWorkerStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
+    def delete_account_validity_for_user(self, user_id):
+        """Deletes the entry for the given user in the account validity table, removing
+        their expiration date and renewal token.
+
+        Args:
+            user_id (str): ID of the user to remove from the account validity table.
+        """
+        yield self._simple_delete_one(
+            table="account_validity",
+            keyvalues={"user_id": user_id},
+            desc="delete_account_validity_for_user",
+        )
+
+    @defer.inlineCallbacks
     def is_server_admin(self, user):
         res = yield self._simple_select_one_onecol(
             table="users",
@@ -352,7 +372,7 @@ class RegistrationWorkerStore(SQLBaseStore):
                     WHERE creation_ts > ?
                 ) AS t GROUP BY user_type
             """
-            results = {'native': 0, 'guest': 0, 'bridged': 0}
+            results = {"native": 0, "guest": 0, "bridged": 0}
             txn.execute(sql, (yesterday,))
             for row in txn:
                 results[row[0]] = row[1]
@@ -418,7 +438,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             {"medium": medium, "address": address},
             ["guest_access_token"],
             True,
-            'get_3pid_guest_access_token',
+            "get_3pid_guest_access_token",
         )
         if ret:
             defer.returnValue(ret["guest_access_token"])
@@ -455,11 +475,11 @@ class RegistrationWorkerStore(SQLBaseStore):
             txn,
             "user_threepids",
             {"medium": medium, "address": address},
-            ['user_id'],
+            ["user_id"],
             True,
         )
         if ret:
-            return ret['user_id']
+            return ret["user_id"]
         return None
 
     @defer.inlineCallbacks
@@ -475,8 +495,8 @@ class RegistrationWorkerStore(SQLBaseStore):
         ret = yield self._simple_select_list(
             "user_threepids",
             {"user_id": user_id},
-            ['medium', 'address', 'validated_at', 'added_at'],
-            'user_get_threepids',
+            ["medium", "address", "validated_at", "added_at"],
+            "user_get_threepids",
         )
         defer.returnValue(ret)
 
@@ -555,11 +575,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
         return self._simple_select_onecol(
             table="user_threepid_id_server",
-            keyvalues={
-                "user_id": user_id,
-                "medium": medium,
-                "address": address,
-            },
+            keyvalues={"user_id": user_id, "medium": medium, "address": address},
             retcol="id_server",
             desc="get_id_servers_user_bound",
         )
@@ -595,15 +611,80 @@ class RegistrationStore(
         self.register_noop_background_update("refresh_tokens_device_index")
 
         self.register_background_update_handler(
-            "user_threepids_grandfather", self._bg_user_threepids_grandfather,
+            "user_threepids_grandfather", self._bg_user_threepids_grandfather
+        )
+
+        self.register_background_update_handler(
+            "users_set_deactivated_flag", self._background_update_set_deactivated_flag
         )
 
         # Create a background job for culling expired 3PID validity tokens
         hs.get_clock().looping_call(
-            self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS,
+            self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
         )
 
     @defer.inlineCallbacks
+    def _background_update_set_deactivated_flag(self, progress, batch_size):
+        """Scans for users who appear deactivated (no password hash, no access
+        tokens and no threepids) and sets their 'deactivated' flag to 1.
+        """
+
+        last_user = progress.get("user_id", "")
+
+        def _background_update_set_deactivated_flag_txn(txn):
+            txn.execute(
+                """
+                SELECT
+                    users.name,
+                    COUNT(access_tokens.token) AS count_tokens,
+                    COUNT(user_threepids.address) AS count_threepids
+                FROM users
+                    LEFT JOIN access_tokens ON (access_tokens.user_id = users.name)
+                    LEFT JOIN user_threepids ON (user_threepids.user_id = users.name)
+                WHERE (users.password_hash IS NULL OR users.password_hash = '')
+                AND (users.appservice_id IS NULL OR users.appservice_id = '')
+                AND users.is_guest = 0
+                AND users.name > ?
+                GROUP BY users.name
+                ORDER BY users.name ASC
+                LIMIT ?;
+                """,
+                (last_user, batch_size),
+            )
+
+            rows = self.cursor_to_dict(txn)
+
+            if not rows:
+                return True
+
+            rows_processed_nb = 0
+
+            for user in rows:
+                if not user["count_tokens"] and not user["count_threepids"]:
+                    self.set_user_deactivated_status_txn(txn, user["name"], True)
+                    rows_processed_nb += 1
+
+            logger.info("Marked %d rows as deactivated", rows_processed_nb)
+
+            self._background_update_progress_txn(
+                txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
+            )
+
+            # A batch smaller than batch_size means we've reached the end.
+            return batch_size > len(rows)
+
+        end = yield self.runInteraction(
+            "users_set_deactivated_flag", _background_update_set_deactivated_flag_txn
+        )
+
+        if end:
+            yield self._end_background_update("users_set_deactivated_flag")
+
+        defer.returnValue(batch_size)
+
+    @defer.inlineCallbacks
     def add_access_token_to_user(self, user_id, token, device_id=None):
         """Adds an access token for the given user.
 
@@ -768,7 +849,7 @@ class RegistrationStore(
 
         def user_set_password_hash_txn(txn):
             self._simple_update_one_txn(
-                txn, 'users', {'name': user_id}, {'password_hash': password_hash}
+                txn, "users", {"name": user_id}, {"password_hash": password_hash}
             )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
@@ -789,9 +870,9 @@ class RegistrationStore(
         def f(txn):
             self._simple_update_one_txn(
                 txn,
-                table='users',
-                keyvalues={'name': user_id},
-                updatevalues={'consent_version': consent_version},
+                table="users",
+                keyvalues={"name": user_id},
+                updatevalues={"consent_version": consent_version},
             )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
@@ -813,9 +894,9 @@ class RegistrationStore(
         def f(txn):
             self._simple_update_one_txn(
                 txn,
-                table='users',
-                keyvalues={'name': user_id},
-                updatevalues={'consent_server_notice_sent': consent_version},
+                table="users",
+                keyvalues={"name": user_id},
+                updatevalues={"consent_server_notice_sent": consent_version},
             )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
@@ -985,7 +1066,7 @@ class RegistrationStore(
 
         if id_servers:
             yield self.runInteraction(
-                "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn,
+                "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
             )
 
         yield self._end_background_update("user_threepids_grandfather")
@@ -993,12 +1074,7 @@ class RegistrationStore(
         defer.returnValue(1)
 
     def get_threepid_validation_session(
-        self,
-        medium,
-        client_secret,
-        address=None,
-        sid=None,
-        validated=True,
+        self, medium, client_secret, address=None, sid=None, validated=True
     ):
         """Gets a session_id and last_send_attempt (if available) for a
         client_secret/medium/(address|session_id) combo
@@ -1018,23 +1094,22 @@ class RegistrationStore(
                 latest session_id and send_attempt count for this 3PID.
                 Otherwise None if there hasn't been a previous attempt
         """
-        keyvalues = {
-            "medium": medium,
-            "client_secret": client_secret,
-        }
+        keyvalues = {"medium": medium, "client_secret": client_secret}
         if address:
             keyvalues["address"] = address
         if sid:
             keyvalues["session_id"] = sid
 
-        assert(address or sid)
+        assert address or sid
 
         def get_threepid_validation_session_txn(txn):
             sql = """
                 SELECT address, session_id, medium, client_secret,
                 last_send_attempt, validated_at
                 FROM threepid_validation_session WHERE %s
-                """ % (" AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),)
+                """ % (
+                " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+            )
 
             if validated is not None:
                 sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
@@ -1049,17 +1124,10 @@ class RegistrationStore(
             return rows[0]
 
         return self.runInteraction(
-            "get_threepid_validation_session",
-            get_threepid_validation_session_txn,
+            "get_threepid_validation_session", get_threepid_validation_session_txn
         )
 
-    def validate_threepid_session(
-        self,
-        session_id,
-        client_secret,
-        token,
-        current_ts,
-    ):
+    def validate_threepid_session(self, session_id, client_secret, token, current_ts):
         """Attempt to validate a threepid session using a token
 
         Args:
@@ -1091,7 +1159,7 @@ class RegistrationStore(
 
             if retrieved_client_secret != client_secret:
                 raise ThreepidValidationError(
-                    400, "This client_secret does not match the provided session_id",
+                    400, "This client_secret does not match the provided session_id"
                 )
 
             row = self._simple_select_one_txn(
@@ -1104,7 +1172,7 @@ class RegistrationStore(
 
             if not row:
                 raise ThreepidValidationError(
-                    400, "Validation token not found or has expired",
+                    400, "Validation token not found or has expired"
                 )
             expires = row["expires"]
             next_link = row["next_link"]
@@ -1115,7 +1183,7 @@ class RegistrationStore(
 
             if expires <= current_ts:
                 raise ThreepidValidationError(
-                    400, "This token has expired. Please request a new one",
+                    400, "This token has expired. Please request a new one"
                 )
 
             # Looks good. Validate the session
@@ -1130,8 +1198,7 @@ class RegistrationStore(
 
         # Return next_link if it exists
         return self.runInteraction(
-            "validate_threepid_session_txn",
-            validate_threepid_session_txn,
+            "validate_threepid_session_txn", validate_threepid_session_txn
         )
 
     def upsert_threepid_validation_session(
@@ -1198,6 +1265,7 @@ class RegistrationStore(
             token_expires (int): The timestamp for which after the token
                 will no longer be valid
         """
+
         def start_or_continue_validation_session_txn(txn):
             # Create or update a validation session
             self._simple_upsert_txn(
@@ -1231,6 +1299,7 @@ class RegistrationStore(
 
     def cull_expired_threepid_validation_tokens(self):
         """Remove threepid validation tokens with expiry dates that have passed"""
+
         def cull_expired_threepid_validation_tokens_txn(txn, ts):
             sql = """
             DELETE FROM threepid_validation_token WHERE
@@ -1252,6 +1321,7 @@ class RegistrationStore(
         Args:
             session_id (str): The ID of the session to delete
         """
+
         def delete_threepid_session_txn(txn):
             self._simple_delete_txn(
                 txn,
@@ -1265,6 +1335,53 @@ class RegistrationStore(
             )
 
         return self.runInteraction(
-            "delete_threepid_session",
-            delete_threepid_session_txn,
+            "delete_threepid_session", delete_threepid_session_txn
+        )
+
+    def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+        self._simple_update_one_txn(
+            txn=txn,
+            table="users",
+            keyvalues={"name": user_id},
+            updatevalues={"deactivated": 1 if deactivated else 0},
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_user_deactivated_status, (user_id,)
         )
+
+    @defer.inlineCallbacks
+    def set_user_deactivated_status(self, user_id, deactivated):
+        """Set the `deactivated` property for the provided user to the provided value.
+
+        Args:
+            user_id (str): The ID of the user to set the status for.
+            deactivated (bool): The value to set for `deactivated`.
+        """
+
+        yield self.runInteraction(
+            "set_user_deactivated_status",
+            self.set_user_deactivated_status_txn,
+            user_id,
+            deactivated,
+        )
+
+    @cachedInlineCallbacks()
+    def get_user_deactivated_status(self, user_id):
+        """Retrieve the value for the `deactivated` property for the provided user.
+
+        Args:
+            user_id (str): The ID of the user to retrieve the status for.
+
+        Returns:
+            defer.Deferred(bool): The requested value.
+        """
+
+        res = yield self._simple_select_one_onecol(
+            table="users",
+            keyvalues={"name": user_id},
+            retcol="deactivated",
+            desc="get_user_deactivated_status",
+        )
+
+        # Convert the integer into a boolean.
+        defer.returnValue(res == 1)
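The new accessors store the flag as a 0/1 integer (matching the SMALLINT column added by the delta further down) and convert back to bool on read, while every write invalidates the cached read path. A rough standalone sketch of that pattern, using a plain dict in place of Synapse's cachedInlineCallbacks machinery:

# Toy "users" table and a dict standing in for the method cache.
USERS = {"@alice:example.com": {"deactivated": 0}}
_cache = {}

def set_user_deactivated_status(user_id, deactivated):
    # Store the boolean as an integer, as the SMALLINT column expects.
    USERS[user_id]["deactivated"] = 1 if deactivated else 0
    # Invalidate any cached read so the next lookup sees the new value.
    _cache.pop(user_id, None)

def get_user_deactivated_status(user_id):
    if user_id not in _cache:
        # Convert the stored integer back into a boolean.
        _cache[user_id] = USERS[user_id]["deactivated"] == 1
    return _cache[user_id]

assert get_user_deactivated_status("@alice:example.com") is False
set_user_deactivated_status("@alice:example.com", True)
assert get_user_deactivated_status("@alice:example.com") is True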
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 4c83800cca..1b01934c19 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -468,9 +468,5 @@ class RelationsStore(RelationsWorkerStore):
         """
 
         self._simple_delete_txn(
-            txn,
-            table="event_relations",
-            keyvalues={
-                "event_id": redacted_event_id,
-            }
+            txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
         )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7617913326..8004aeb909 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -420,7 +420,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 table="room_memberships",
                 column="event_id",
                 iterable=missing_member_event_ids,
-                retcols=('user_id', 'display_name', 'avatar_url'),
+                retcols=("user_id", "display_name", "avatar_url"),
                 keyvalues={"membership": Membership.JOIN},
                 batch_size=500,
                 desc="_get_joined_users_from_context",
@@ -448,7 +448,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
     @cachedInlineCallbacks(max_entries=10000)
     def is_host_joined(self, room_id, host):
-        if '%' in host or '_' in host:
+        if "%" in host or "_" in host:
             raise Exception("Invalid host name")
 
         sql = """
@@ -490,7 +490,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             Deferred: Resolves to True if the host is/was in the room, otherwise
             False.
         """
-        if '%' in host or '_' in host:
+        if "%" in host or "_" in host:
             raise Exception("Invalid host name")
 
         sql = """
@@ -723,7 +723,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
                 room_id = row["room_id"]
                 try:
                     event_json = json.loads(row["json"])
-                    content = event_json['content']
+                    content = event_json["content"]
                 except Exception:
                     continue
 
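The '%' / '_' rejection in is_host_joined and was_host_joined above exists because the host is embedded in a SQL LIKE pattern, and both characters are LIKE wildcards. A hedged illustration of the kind of suffix match being protected (the actual query text is elided in the hunks, so the pattern below is an assumption):

def make_host_like_pattern(host):
    # '%' and '_' are LIKE wildcards; a host containing either could match
    # unintended rows, so reject it outright, as the store methods do.
    if "%" in host or "_" in host:
        raise Exception("Invalid host name")
    # Assumed shape: match any user ID on this server, e.g. '@user:example.com'.
    return "%:" + host

assert make_host_like_pattern("example.com") == "%:example.com"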
diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py
index 147496a38b..3edfcfd783 100644
--- a/synapse/storage/schema/delta/20/pushers.py
+++ b/synapse/storage/schema/delta/20/pushers.py
@@ -29,7 +29,8 @@ logger = logging.getLogger(__name__)
 
 def run_create(cur, database_engine, *args, **kwargs):
     logger.info("Porting pushers table...")
-    cur.execute("""
+    cur.execute(
+        """
         CREATE TABLE IF NOT EXISTS pushers2 (
           id BIGINT PRIMARY KEY,
           user_name TEXT NOT NULL,
@@ -48,27 +49,34 @@ def run_create(cur, database_engine, *args, **kwargs):
           failing_since BIGINT,
           UNIQUE (app_id, pushkey, user_name)
         )
-    """)
-    cur.execute("""SELECT
+    """
+    )
+    cur.execute(
+        """SELECT
         id, user_name, access_token, profile_tag, kind,
         app_id, app_display_name, device_display_name,
         pushkey, ts, lang, data, last_token, last_success,
         failing_since
         FROM pushers
-    """)
+    """
+    )
     count = 0
     for row in cur.fetchall():
         row = list(row)
         row[8] = bytes(row[8]).decode("utf-8")
         row[11] = bytes(row[11]).decode("utf-8")
-        cur.execute(database_engine.convert_param_style("""
+        cur.execute(
+            database_engine.convert_param_style(
+                """
             INSERT into pushers2 (
             id, user_name, access_token, profile_tag, kind,
             app_id, app_display_name, device_display_name,
             pushkey, ts, lang, data, last_token, last_success,
             failing_since
-            ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
-            row
+            ) values (%s)"""
+                % (",".join(["?" for _ in range(len(row))]))
+            ),
+            row,
         )
         count += 1
     cur.execute("DROP TABLE pushers")
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index ef7ec34346..9b95411fb6 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -40,9 +40,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
         logger.warning("Could not get app_service_config_files from config")
         pass
 
-    appservices = load_appservices(
-        config.server_name, config_files
-    )
+    appservices = load_appservices(config.server_name, config_files)
 
     owned = {}
 
@@ -53,20 +51,19 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
                 if user_id in owned.keys():
                     logger.error(
                         "user_id %s was owned by more than one application"
-                        " service (IDs %s and %s); assigning arbitrarily to %s" %
-                        (user_id, owned[user_id], appservice.id, owned[user_id])
+                        " service (IDs %s and %s); assigning arbitrarily to %s"
+                        % (user_id, owned[user_id], appservice.id, owned[user_id])
                     )
                 owned.setdefault(appservice.id, []).append(user_id)
 
     for as_id, user_ids in owned.items():
         n = 100
-        user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
+        user_chunks = (user_ids[i : i + 100] for i in range(0, len(user_ids), n))
         for chunk in user_chunks:
             cur.execute(
                 database_engine.convert_param_style(
-                    "UPDATE users SET appservice_id = ? WHERE name IN (%s)" % (
-                        ",".join("?" for _ in chunk),
-                    )
+                    "UPDATE users SET appservice_id = ? WHERE name IN (%s)"
+                    % (",".join("?" for _ in chunk),)
                 ),
-                [as_id] + chunk
+                [as_id] + chunk,
             )
diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py
index 93367fa09e..9bb504aad5 100644
--- a/synapse/storage/schema/delta/31/pushers.py
+++ b/synapse/storage/schema/delta/31/pushers.py
@@ -24,12 +24,13 @@ logger = logging.getLogger(__name__)
 
 
 def token_to_stream_ordering(token):
-    return int(token[1:].split('_')[0])
+    return int(token[1:].split("_")[0])
 
 
 def run_create(cur, database_engine, *args, **kwargs):
     logger.info("Porting pushers table, delta 31...")
-    cur.execute("""
+    cur.execute(
+        """
         CREATE TABLE IF NOT EXISTS pushers2 (
           id BIGINT PRIMARY KEY,
           user_name TEXT NOT NULL,
@@ -48,26 +49,33 @@ def run_create(cur, database_engine, *args, **kwargs):
           failing_since BIGINT,
           UNIQUE (app_id, pushkey, user_name)
         )
-    """)
-    cur.execute("""SELECT
+    """
+    )
+    cur.execute(
+        """SELECT
         id, user_name, access_token, profile_tag, kind,
         app_id, app_display_name, device_display_name,
         pushkey, ts, lang, data, last_token, last_success,
         failing_since
         FROM pushers
-    """)
+    """
+    )
     count = 0
     for row in cur.fetchall():
         row = list(row)
         row[12] = token_to_stream_ordering(row[12])
-        cur.execute(database_engine.convert_param_style("""
+        cur.execute(
+            database_engine.convert_param_style(
+                """
             INSERT into pushers2 (
             id, user_name, access_token, profile_tag, kind,
             app_id, app_display_name, device_display_name,
             pushkey, ts, lang, data, last_stream_ordering, last_success,
             failing_since
-            ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
-            row
+            ) values (%s)"""
+                % (",".join(["?" for _ in range(len(row))]))
+            ),
+            row,
         )
         count += 1
     cur.execute("DROP TABLE pushers")
diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py
index 9754d3ccfb..a26057dfb6 100644
--- a/synapse/storage/schema/delta/33/remote_media_ts.py
+++ b/synapse/storage/schema/delta/33/remote_media_ts.py
@@ -26,5 +26,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
         database_engine.convert_param_style(
             "UPDATE remote_media_cache SET last_access_ts = ?"
         ),
-        (int(time.time() * 1000),)
+        (int(time.time() * 1000),),
     )
diff --git a/synapse/storage/schema/delta/47/state_group_seq.py b/synapse/storage/schema/delta/47/state_group_seq.py
index f6766501d2..9fd1ccf6f7 100644
--- a/synapse/storage/schema/delta/47/state_group_seq.py
+++ b/synapse/storage/schema/delta/47/state_group_seq.py
@@ -27,10 +27,7 @@ def run_create(cur, database_engine, *args, **kwargs):
         else:
             start_val = row[0] + 1
 
-        cur.execute(
-            "CREATE SEQUENCE state_group_id_seq START WITH %s",
-            (start_val, ),
-        )
+        cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,))
 
 
 def run_upgrade(*args, **kwargs):
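The surrounding hunk seeds the new sequence one past the highest existing state group ID so freshly allocated IDs cannot collide; the branch not shown here presumably starts at 1 for an empty table. In miniature:

def next_sequence_start(max_existing_id):
    # Mirrors `start_val = row[0] + 1` above; the empty-table default of 1
    # is an assumption, since that branch is outside the hunk.
    return 1 if max_existing_id is None else max_existing_id + 1

assert next_sequence_start(None) == 1
assert next_sequence_start(41) == 42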
diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py
index 2233af87d7..49f5f2c003 100644
--- a/synapse/storage/schema/delta/48/group_unique_indexes.py
+++ b/synapse/storage/schema/delta/48/group_unique_indexes.py
@@ -38,16 +38,22 @@ def run_create(cur, database_engine, *args, **kwargs):
     rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
 
     # remove duplicates from group_users & group_invites tables
-    cur.execute("""
+    cur.execute(
+        """
         DELETE FROM group_users WHERE %s NOT IN (
            SELECT min(%s) FROM group_users GROUP BY group_id, user_id
         );
-    """ % (rowid, rowid))
-    cur.execute("""
+    """
+        % (rowid, rowid)
+    )
+    cur.execute(
+        """
         DELETE FROM group_invites WHERE %s NOT IN (
            SELECT min(%s) FROM group_invites GROUP BY group_id, user_id
         );
-    """ % (rowid, rowid))
+    """
+        % (rowid, rowid)
+    )
 
     for statement in get_statements(FIX_INDEXES.splitlines()):
         cur.execute(statement)
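The deduplication relies on the engine's physical row identifier (ctid on Postgres, rowid on SQLite): keep the minimum row ID per (group_id, user_id) pair and delete everything else. A runnable SQLite demonstration of the same DELETE:

import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE group_users (group_id TEXT, user_id TEXT)")
cur.executemany(
    "INSERT INTO group_users VALUES (?, ?)",
    [("g1", "@a:hs"), ("g1", "@a:hs"), ("g1", "@b:hs")],
)

# Same shape as the delta's statement, with rowid for SQLite.
cur.execute(
    """
    DELETE FROM group_users WHERE rowid NOT IN (
       SELECT min(rowid) FROM group_users GROUP BY group_id, user_id
    );
"""
)
cur.execute("SELECT count(*) FROM group_users")
assert cur.fetchone() == (2,)  # the duplicate row is gone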
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
index 6dd467b6c5..b1684a8441 100644
--- a/synapse/storage/schema/delta/50/make_event_content_nullable.py
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -65,14 +65,18 @@ def run_create(cur, database_engine, *args, **kwargs):
 
 def run_upgrade(cur, database_engine, *args, **kwargs):
     if isinstance(database_engine, PostgresEngine):
-        cur.execute("""
+        cur.execute(
+            """
             ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
-        """)
+        """
+        )
         return
 
     # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
 
-    cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+    cur.execute(
+        "SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'"
+    )
     (oldsql,) = cur.fetchone()
 
     sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
@@ -86,7 +90,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
     cur.execute("PRAGMA writable_schema=ON")
     cur.execute(
         "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
-        (sql, ),
+        (sql,),
     )
     cur.execute("PRAGMA schema_version=%i" % (oldver + 1,))
     cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
new file mode 100644
index 0000000000..dabdde489b
--- /dev/null
+++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
@@ -0,0 +1,19 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('users_set_deactivated_flag', '{}');
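The INSERT queues a named background update, so the potentially slow backfill of the new column runs incrementally after startup rather than inside the schema migration itself. A hedged sketch of the wiring on the Python side; the stub below only mirrors the registration shape, not the real users_set_deactivated_flag handler:

class ExampleStore:
    """Stand-in for a BackgroundUpdateStore subclass."""
    def __init__(self):
        self.handlers = {}

    def register_background_update_handler(self, name, handler):
        self.handlers[name] = handler

def _users_set_deactivated_flag(progress, batch_size):
    # Would page through `users` in batches of batch_size, populating
    # `deactivated`, and return the number of rows processed (assumed).
    return 0

store = ExampleStore()
store.register_background_update_handler(
    "users_set_deactivated_flag", _users_set_deactivated_flag
)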
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index ff49eaae02..f3b1cec933 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -31,8 +31,8 @@ from .background_updates import BackgroundUpdateStore
 logger = logging.getLogger(__name__)
 
 SearchEntry = namedtuple(
-    'SearchEntry',
-    ['key', 'value', 'event_id', 'room_id', 'stream_ordering', 'origin_server_ts'],
+    "SearchEntry",
+    ["key", "value", "event_id", "room_id", "stream_ordering", "origin_server_ts"],
 )
 
 
@@ -216,7 +216,7 @@ class SearchStore(BackgroundUpdateStore):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
-        have_added_index = progress['have_added_indexes']
+        have_added_index = progress["have_added_indexes"]
 
         if not have_added_index:
 
@@ -341,29 +341,7 @@ class SearchStore(BackgroundUpdateStore):
                 for entry in entries
             )
 
-            # inserts to a GIN index are normally batched up into a pending
-            # list, and then all committed together once the list gets to a
-            # certain size. The trouble with that is that postgres (pre-9.5)
-            # uses work_mem to determine the length of the list, and work_mem
-            # is typically very large.
-            #
-            # We therefore reduce work_mem while we do the insert.
-            #
-            # (postgres 9.5 uses the separate gin_pending_list_limit setting,
-            # so doesn't suffer the same problem, but changing work_mem will
-            # be harmless)
-            #
-            # Note that we don't need to worry about restoring it on
-            # exception, because exceptions will cause the transaction to be
-            # rolled back, including the effects of the SET command.
-            #
-            # Also: we use SET rather than SET LOCAL because there's lots of
-            # other stuff going on in this transaction, which want to have the
-            # normal work_mem setting.
-
-            txn.execute("SET work_mem='256kB'")
             txn.executemany(sql, args)
-            txn.execute("RESET work_mem")
 
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = (
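The comment block deleted in this hunk documented a workaround for pre-9.5 Postgres, where the GIN pending list was sized by work_mem: shrink work_mem around the batched insert, relying on transaction rollback to undo the SET on error. For reference, the removed bracketing as a self-contained helper (a DB-API style cursor is assumed):

def insert_with_reduced_work_mem(txn, sql, args):
    # Pre-9.5 Postgres sized the GIN pending list by work_mem, so the old
    # code shrank it for the insert; SET rather than SET LOCAL was used so
    # the rest of the transaction kept the normal setting, and exceptions
    # roll the SET back along with everything else.
    txn.execute("SET work_mem='256kB'")
    txn.executemany(sql, args)
    txn.execute("RESET work_mem")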
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index ff266b09b0..1cec84ee2e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -71,7 +71,8 @@ class StatsStore(StateDeltasStore):
         # Get all the rooms that we want to process.
         def _make_staging_area(txn):
             # Create the temporary tables
-            stmts = get_statements("""
+            stmts = get_statements(
+                """
                -- We just recreate the table; we'll be reinserting the
                -- correct entries again later anyway.
                 DROP TABLE IF EXISTS {temp}_rooms;
@@ -85,7 +86,10 @@ class StatsStore(StateDeltasStore):
                     ON {temp}_rooms(events);
                 CREATE INDEX {temp}_rooms_id
                     ON {temp}_rooms(room_id);
-            """.format(temp=TEMP_TABLE).splitlines())
+            """.format(
+                    temp=TEMP_TABLE
+                ).splitlines()
+            )
 
             for statement in stmts:
                 txn.execute(statement)
@@ -105,7 +109,9 @@ class StatsStore(StateDeltasStore):
                 LEFT JOIN room_stats_earliest_token AS t USING (room_id)
                 WHERE t.room_id IS NULL
                 GROUP BY c.room_id
-            """ % (TEMP_TABLE,)
+            """ % (
+                TEMP_TABLE,
+            )
             txn.execute(sql)
 
         new_pos = yield self.get_max_stream_id_in_current_state_deltas()
@@ -184,7 +190,8 @@ class StatsStore(StateDeltasStore):
 
         logger.info(
             "Processing the next %d rooms of %d remaining",
-            len(rooms_to_work_on), progress["remaining"],
+            len(rooms_to_work_on),
+            progress["remaining"],
         )
 
         # Number of state events we've processed by going through each room
@@ -204,10 +211,17 @@ class StatsStore(StateDeltasStore):
             avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
             canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
 
-            state_events = yield self.get_events([
-                join_rules_id, history_visibility_id, encryption_id, name_id,
-                topic_id, avatar_id, canonical_alias_id,
-            ])
+            state_events = yield self.get_events(
+                [
+                    join_rules_id,
+                    history_visibility_id,
+                    encryption_id,
+                    name_id,
+                    topic_id,
+                    avatar_id,
+                    canonical_alias_id,
+                ]
+            )
 
             def _get_or_none(event_id, arg):
                 event = state_events.get(event_id)
@@ -271,7 +285,7 @@ class StatsStore(StateDeltasStore):
 
                 # We've finished a room. Delete it from the table.
                 self._simple_delete_one_txn(
-                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id},
+                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
                 )
 
             yield self.runInteraction("update_room_stats", _fetch_data)
@@ -338,7 +352,7 @@ class StatsStore(StateDeltasStore):
             "name",
             "topic",
             "avatar",
-            "canonical_alias"
+            "canonical_alias",
         ):
             field = fields.get(col)
             if field and "\0" in field:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 6f7f65d96b..d9482a3843 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -65,7 +65,7 @@ _EventDictReturn = namedtuple(
 
 
 def generate_pagination_where_clause(
-    direction, column_names, from_token, to_token, engine,
+    direction, column_names, from_token, to_token, engine
 ):
     """Creates an SQL expression to bound the columns by the pagination
     tokens.
@@ -153,7 +153,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine):
         str
     """
 
-    assert(bound in (">", "<", ">=", "<="))
+    assert bound in (">", "<", ">=", "<=")
 
     name1, name2 = column_names
     val1, val2 = values
@@ -169,11 +169,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine):
         # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
         # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
        # use the latter form when running against postgres.
-        return "((%d,%d) %s (%s,%s))" % (
-            val1, val2,
-            bound,
-            name1, name2,
-        )
+        return "((%d,%d) %s (%s,%s))" % (val1, val2, bound, name1, name2)
 
     # We want to generate queries of e.g. the form:
     #
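The Postgres branch above emits a row-value comparison because ((x,y) < (a,b)) can be satisfied directly from a multicolumn index, whereas the expanded OR form often cannot. A quick sketch of the two equivalent bounds the function chooses between:

def make_bound(bound, name1, name2, val1, val2, postgres):
    if postgres:
        # Row-value form: index-friendly on multicolumn indexes.
        return "((%d,%d) %s (%s,%s))" % (val1, val2, bound, name1, name2)
    # Logically equivalent expansion: strict compare on the first column,
    # tie broken by the second.
    return "((%s %s %d) OR (%s = %d AND %s %s %d))" % (
        name1, bound.rstrip("="), val1, name1, val1, name2, bound, val2
    )

assert make_bound("<", "topological_ordering", "stream_ordering", 10, 20, True) == (
    "((10,20) < (topological_ordering,stream_ordering))"
)
assert make_bound("<=", "x", "y", 1, 2, False) == "((x < 1) OR (x = 1 AND y <= 2))"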
@@ -276,7 +272,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_events_stream_for_rooms(
-        self, room_ids, from_key, to_key, limit=0, order='DESC'
+        self, room_ids, from_key, to_key, limit=0, order="DESC"
     ):
         """Get new room events in stream ordering since `from_key`.
 
@@ -346,7 +342,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_events_stream_for_room(
-        self, room_id, from_key, to_key, limit=0, order='DESC'
+        self, room_id, from_key, to_key, limit=0, order="DESC"
     ):
 
         """Get new room events in stream ordering since `from_key`.
@@ -395,8 +391,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
-        ret = yield self.get_events_as_list([
-            r.event_id for r in rows], get_prev_content=True,
+        ret = yield self.get_events_as_list(
+            [r.event_id for r in rows], get_prev_content=True
         )
 
         self._set_before_and_after(ret, rows, topo_order=from_id is None)
@@ -446,7 +442,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
         ret = yield self.get_events_as_list(
-            [r.event_id for r in rows], get_prev_content=True,
+            [r.event_id for r in rows], get_prev_content=True
         )
 
         self._set_before_and_after(ret, rows, topo_order=False)
@@ -725,7 +721,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             txn,
             room_id,
             before_token,
-            direction='b',
+            direction="b",
             limit=before_limit,
             event_filter=event_filter,
         )
@@ -735,7 +731,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             txn,
             room_id,
             after_token,
-            direction='f',
+            direction="f",
             limit=after_limit,
             event_filter=event_filter,
         )
@@ -816,7 +812,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         room_id,
         from_token,
         to_token=None,
-        direction='b',
+        direction="b",
         limit=-1,
         event_filter=None,
     ):
@@ -846,7 +842,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
         args = [False, room_id]
-        if direction == 'b':
+        if direction == "b":
             order = "DESC"
         else:
             order = "ASC"
@@ -882,7 +878,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         if rows:
             topo = rows[-1].topological_ordering
             toke = rows[-1].stream_ordering
-            if direction == 'b':
+            if direction == "b":
                 # Tokens are positions between events.
                 # This token points *after* the last event in the chunk.
                 # We need it to point to the event before it in the chunk
@@ -898,7 +894,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def paginate_room_events(
-        self, room_id, from_key, to_key=None, direction='b', limit=-1, event_filter=None
+        self, room_id, from_key, to_key=None, direction="b", limit=-1, event_filter=None
     ):
         """Returns list of events before or after a given token.