summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py10
-rw-r--r--synapse/storage/_base.py6
-rw-r--r--synapse/storage/background_updates.py2
-rw-r--r--synapse/storage/devices.py12
-rw-r--r--synapse/storage/e2e_room_keys.py28
-rw-r--r--synapse/storage/engines/sqlite.py2
-rw-r--r--synapse/storage/event_federation.py28
-rw-r--r--synapse/storage/event_push_actions.py4
-rw-r--r--synapse/storage/events.py28
-rw-r--r--synapse/storage/events_bg_updates.py16
-rw-r--r--synapse/storage/events_worker.py10
-rw-r--r--synapse/storage/group_server.py8
-rw-r--r--synapse/storage/keys.py2
-rw-r--r--synapse/storage/media_repository.py22
-rw-r--r--synapse/storage/monthly_active_users.py6
-rw-r--r--synapse/storage/prepare_database.py15
-rw-r--r--synapse/storage/profile.py2
-rw-r--r--synapse/storage/push_rule.py30
-rw-r--r--synapse/storage/pusher.py36
-rw-r--r--synapse/storage/receipts.py2
-rw-r--r--synapse/storage/registration.py109
-rw-r--r--synapse/storage/relations.py6
-rw-r--r--synapse/storage/roommember.py8
-rw-r--r--synapse/storage/schema/delta/20/pushers.py22
-rw-r--r--synapse/storage/schema/delta/30/as_users.py17
-rw-r--r--synapse/storage/schema/delta/31/pushers.py24
-rw-r--r--synapse/storage/schema/delta/33/remote_media_ts.py2
-rw-r--r--synapse/storage/schema/delta/47/state_group_seq.py5
-rw-r--r--synapse/storage/schema/delta/48/group_unique_indexes.py14
-rw-r--r--synapse/storage/schema/delta/50/make_event_content_nullable.py12
-rw-r--r--synapse/storage/search.py6
-rw-r--r--synapse/storage/stats.py34
-rw-r--r--synapse/storage/stream.py32
33 files changed, 306 insertions, 254 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0ca6f6121f..6b0ca80087 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -280,7 +280,7 @@ class DataStore(
         Counts the number of users who used this homeserver in the last 24 hours.
         """
         yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-        return self.runInteraction("count_daily_users", self._count_users, yesterday,)
+        return self.runInteraction("count_daily_users", self._count_users, yesterday)
 
     def count_monthly_users(self):
         """
@@ -291,9 +291,7 @@ class DataStore(
         """
         thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
         return self.runInteraction(
-            "count_monthly_users",
-            self._count_users,
-            thirty_days_ago,
+            "count_monthly_users", self._count_users, thirty_days_ago
         )
 
     def _count_users(self, txn, time_from):
@@ -361,7 +359,7 @@ class DataStore(
             txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
 
             for row in txn:
-                if row[0] == 'unknown':
+                if row[0] == "unknown":
                     pass
                 results[row[0]] = row[1]
 
@@ -388,7 +386,7 @@ class DataStore(
             txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
 
             count, = txn.fetchone()
-            results['all'] = count
+            results["all"] = count
 
             return results
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 910f6ee9de..cbd6568c41 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -311,9 +311,7 @@ class SQLBaseStore(object):
             if res:
                 for user in res:
                     self.set_expiration_date_for_user_txn(
-                        txn,
-                        user["name"],
-                        use_delta=True,
+                        txn, user["name"], use_delta=True
                     )
 
         yield self.runInteraction(
@@ -1662,7 +1660,7 @@ def db_to_json(db_content):
     # Decode it to a Unicode string before feeding it to json.loads, so we
     # consistenty get a Unicode-containing object out.
     if isinstance(db_content, (bytes, bytearray)):
-        db_content = db_content.decode('utf8')
+        db_content = db_content.decode("utf8")
 
     try:
         return json.loads(db_content)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b8b8273f73..50f913a414 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -169,7 +169,7 @@ class BackgroundUpdateStore(SQLBaseStore):
             in_flight = set(update["update_name"] for update in updates)
             for update in updates:
                 if update["depends_on"] not in in_flight:
-                    self._background_update_queue.append(update['update_name'])
+                    self._background_update_queue.append(update["update_name"])
 
         if not self._background_update_queue:
             # no work left to do
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d102e07372..3413a46675 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -149,9 +149,7 @@ class DeviceWorkerStore(SQLBaseStore):
             defer.returnValue((stream_id_cutoff, []))
 
         results = yield self._get_device_update_edus_by_remote(
-            destination,
-            from_stream_id,
-            query_map,
+            destination, from_stream_id, query_map
         )
 
         defer.returnValue((now_stream_id, results))
@@ -182,9 +180,7 @@ class DeviceWorkerStore(SQLBaseStore):
         return list(txn)
 
     @defer.inlineCallbacks
-    def _get_device_update_edus_by_remote(
-        self, destination, from_stream_id, query_map,
-    ):
+    def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_map):
         """Returns a list of device update EDUs as well as E2EE keys
 
         Args:
@@ -210,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore):
             # The prev_id for the first row is always the last row before
             # `from_stream_id`
             prev_id = yield self._get_last_device_update_for_remote_user(
-                destination, user_id, from_stream_id,
+                destination, user_id, from_stream_id
             )
             for device_id, device in iteritems(user_devices):
                 stream_id = query_map[(user_id, device_id)]
@@ -238,7 +234,7 @@ class DeviceWorkerStore(SQLBaseStore):
         defer.returnValue(results)
 
     def _get_last_device_update_for_remote_user(
-        self, destination, user_id, from_stream_id,
+        self, destination, user_id, from_stream_id
     ):
         def f(txn):
             prev_sent_id_sql = """
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index 521936e3b0..f40ef2ab64 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -87,10 +87,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             },
             values={
                 "version": version,
-                "first_message_index": room_key['first_message_index'],
-                "forwarded_count": room_key['forwarded_count'],
-                "is_verified": room_key['is_verified'],
-                "session_data": json.dumps(room_key['session_data']),
+                "first_message_index": room_key["first_message_index"],
+                "forwarded_count": room_key["forwarded_count"],
+                "is_verified": room_key["is_verified"],
+                "session_data": json.dumps(room_key["session_data"]),
             },
             lock=False,
         )
@@ -118,13 +118,13 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         try:
             version = int(version)
         except ValueError:
-            defer.returnValue({'rooms': {}})
+            defer.returnValue({"rooms": {}})
 
         keyvalues = {"user_id": user_id, "version": version}
         if room_id:
-            keyvalues['room_id'] = room_id
+            keyvalues["room_id"] = room_id
             if session_id:
-                keyvalues['session_id'] = session_id
+                keyvalues["session_id"] = session_id
 
         rows = yield self._simple_select_list(
             table="e2e_room_keys",
@@ -141,10 +141,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             desc="get_e2e_room_keys",
         )
 
-        sessions = {'rooms': {}}
+        sessions = {"rooms": {}}
         for row in rows:
-            room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}})
-            room_entry['sessions'][row['session_id']] = {
+            room_entry = sessions["rooms"].setdefault(row["room_id"], {"sessions": {}})
+            room_entry["sessions"][row["session_id"]] = {
                 "first_message_index": row["first_message_index"],
                 "forwarded_count": row["forwarded_count"],
                 "is_verified": row["is_verified"],
@@ -174,9 +174,9 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         keyvalues = {"user_id": user_id, "version": int(version)}
         if room_id:
-            keyvalues['room_id'] = room_id
+            keyvalues["room_id"] = room_id
             if session_id:
-                keyvalues['session_id'] = session_id
+                keyvalues["session_id"] = session_id
 
         yield self._simple_delete(
             table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys"
@@ -191,7 +191,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         )
         row = txn.fetchone()
         if not row:
-            raise StoreError(404, 'No current backup version')
+            raise StoreError(404, "No current backup version")
         return row[0]
 
     def get_e2e_room_keys_version_info(self, user_id, version=None):
@@ -255,7 +255,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             )
             current_version = txn.fetchone()[0]
             if current_version is None:
-                current_version = '0'
+                current_version = "0"
 
             new_version = str(int(current_version) + 1)
 
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 933bcf42c2..e9b9caa49a 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -85,7 +85,7 @@ class Sqlite3Engine(object):
 
 def _parse_match_info(buf):
     bufsize = len(buf)
-    return [struct.unpack('@I', buf[i : i + 4])[0] for i in range(0, bufsize, 4)]
+    return [struct.unpack("@I", buf[i : i + 4])[0] for i in range(0, bufsize, 4)]
 
 
 def _rank(raw_match_info):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 09e39c2c28..cb4478342f 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -190,6 +190,34 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             room_id,
         )
 
+    def get_rooms_with_many_extremities(self, min_count, limit):
+        """Get the top rooms with at least N extremities.
+
+        Args:
+            min_count (int): The minimum number of extremities
+            limit (int): The maximum number of rooms to return.
+
+        Returns:
+            Deferred[list]: At most `limit` room IDs that have at least
+            `min_count` extremities, sorted by extremity count.
+        """
+
+        def _get_rooms_with_many_extremities_txn(txn):
+            sql = """
+                SELECT room_id FROM event_forward_extremities
+                GROUP BY room_id
+                HAVING count(*) > ?
+                ORDER BY count(*) DESC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (min_count, limit))
+            return [room_id for room_id, in txn]
+
+        return self.runInteraction(
+            "get_rooms_with_many_extremities", _get_rooms_with_many_extremities_txn
+        )
+
     @cached(max_entries=5000, iterable=True)
     def get_latest_event_ids_in_room(self, room_id):
         return self._simple_select_onecol(
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index a729f3e067..eca77069fd 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -277,7 +277,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         # contain results from the first query, correctly ordered, followed
         # by results from the second query, but we want them all ordered
         # by stream_ordering, oldest first.
-        notifs.sort(key=lambda r: r['stream_ordering'])
+        notifs.sort(key=lambda r: r["stream_ordering"])
 
         # Take only up to the limit. We have to stop at the limit because
         # one of the subqueries may have hit the limit.
@@ -379,7 +379,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         # contain results from the first query, correctly ordered, followed
         # by results from the second query, but we want them all ordered
         # by received_ts (most recent first)
-        notifs.sort(key=lambda r: -(r['received_ts'] or 0))
+        notifs.sort(key=lambda r: -(r["received_ts"] or 0))
 
         # Now return the first `limit`
         defer.returnValue(notifs[:limit])
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f631fb1733..fefba39ea1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -24,7 +24,7 @@ from six import iteritems, text_type
 from six.moves import range
 
 from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 
@@ -74,6 +74,21 @@ state_delta_reuse_delta_counter = Counter(
     "synapse_storage_events_state_delta_reuse_delta", ""
 )
 
+# The number of forward extremities for each new event.
+forward_extremities_counter = Histogram(
+    "synapse_storage_events_forward_extremities_persisted",
+    "Number of forward extremities for each new event",
+    buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
+# The number of stale forward extremities for each new event. Stale extremities
+# are those that were in the previous set of extremities as well as the new.
+stale_forward_extremities_counter = Histogram(
+    "synapse_storage_events_stale_forward_extremities_persisted",
+    "Number of unchanged forward extremities for each new event",
+    buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
 
 def encode_json(json_object):
     """
@@ -234,7 +249,7 @@ class EventsStore(
         BucketCollector(
             "synapse_forward_extremities",
             lambda: self._current_forward_extremities_amount,
-            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
+            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
         )
 
         # Read the extrems every 60 minutes
@@ -541,6 +556,8 @@ class EventsStore(
             and not event.internal_metadata.is_soft_failed()
         ]
 
+        latest_event_ids = set(latest_event_ids)
+
         # start with the existing forward extremities
         result = set(latest_event_ids)
 
@@ -564,6 +581,13 @@ class EventsStore(
         )
         result.difference_update(existing_prevs)
 
+        # We only update metrics for events that change forward extremities
+        # (e.g. we ignore backfill/outliers/etc)
+        if result != latest_event_ids:
+            forward_extremities_counter.observe(len(result))
+            stale = latest_event_ids & result
+            stale_forward_extremities_counter.observe(len(stale))
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
index 75c1935bf3..1ce21d190c 100644
--- a/synapse/storage/events_bg_updates.py
+++ b/synapse/storage/events_bg_updates.py
@@ -64,8 +64,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         )
 
         self.register_background_update_handler(
-            self.DELETE_SOFT_FAILED_EXTREMITIES,
-            self._cleanup_extremities_bg_update,
+            self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
         )
 
     @defer.inlineCallbacks
@@ -269,7 +268,8 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                 LEFT JOIN events USING (event_id)
                 LEFT JOIN event_json USING (event_id)
                 LEFT JOIN rejections USING (event_id)
-                """, (batch_size,)
+                """,
+                (batch_size,),
             )
 
             for prev_event_id, event_id, metadata, rejected, outlier in txn:
@@ -364,13 +364,12 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                     column="event_id",
                     iterable=to_delete,
                     keyvalues={},
-                    retcols=("room_id",)
+                    retcols=("room_id",),
                 )
                 room_ids = set(row["room_id"] for row in rows)
                 for room_id in room_ids:
                     txn.call_after(
-                        self.get_latest_event_ids_in_room.invalidate,
-                        (room_id,)
+                        self.get_latest_event_ids_in_room.invalidate, (room_id,)
                     )
 
             self._simple_delete_many_txn(
@@ -384,7 +383,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             return len(original_set)
 
         num_handled = yield self.runInteraction(
-            "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
+            "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn
         )
 
         if not num_handled:
@@ -394,8 +393,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                 txn.execute("DROP TABLE _extremities_to_check")
 
             yield self.runInteraction(
-                "_cleanup_extremities_bg_update_drop_table",
-                _drop_table_txn,
+                "_cleanup_extremities_bg_update_drop_table", _drop_table_txn
             )
 
         defer.returnValue(num_handled)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index cc7df5cf14..6d680d405a 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -27,7 +27,6 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import NotFoundError
 from synapse.api.room_versions import EventFormatVersions
 from synapse.events import FrozenEvent, event_type_from_format_version  # noqa: F401
-# these are only included to make the type annotations work
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -111,8 +110,7 @@ class EventsWorkerStore(SQLBaseStore):
             return ts
 
         return self.runInteraction(
-            "get_approximate_received_ts",
-            _get_approximate_received_ts_txn,
+            "get_approximate_received_ts", _get_approximate_received_ts_txn
         )
 
     @defer.inlineCallbacks
@@ -677,7 +675,8 @@ class EventsWorkerStore(SQLBaseStore):
         """
         return self.runInteraction(
             "get_total_state_event_counts",
-            self._get_total_state_event_counts_txn, room_id
+            self._get_total_state_event_counts_txn,
+            room_id,
         )
 
     def _get_current_state_event_counts_txn(self, txn, room_id):
@@ -701,7 +700,8 @@ class EventsWorkerStore(SQLBaseStore):
         """
         return self.runInteraction(
             "get_current_state_event_counts",
-            self._get_current_state_event_counts_txn, room_id
+            self._get_current_state_event_counts_txn,
+            room_id,
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index dce6a43ac1..73e6fc6de2 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -1179,11 +1179,7 @@ class GroupServerStore(SQLBaseStore):
 
             for table in tables:
                 self._simple_delete_txn(
-                    txn,
-                    table=table,
-                    keyvalues={"group_id": group_id},
+                    txn, table=table, keyvalues={"group_id": group_id}
                 )
 
-        return self.runInteraction(
-            "delete_group", _delete_group_txn
-        )
+        return self.runInteraction("delete_group", _delete_group_txn)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index e3655ad8d7..e72f89e446 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -131,7 +131,7 @@ class KeyStore(SQLBaseStore):
         def _invalidate(res):
             f = self._get_server_verify_key.invalidate
             for i in invalidations:
-                f((i, ))
+                f((i,))
             return res
 
         return self.runInteraction(
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 3ecf47e7a7..6b1238ce4a 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -22,11 +22,11 @@ class MediaRepositoryStore(BackgroundUpdateStore):
         super(MediaRepositoryStore, self).__init__(db_conn, hs)
 
         self.register_background_index_update(
-            update_name='local_media_repository_url_idx',
-            index_name='local_media_repository_url_idx',
-            table='local_media_repository',
-            columns=['created_ts'],
-            where_clause='url_cache IS NOT NULL',
+            update_name="local_media_repository_url_idx",
+            index_name="local_media_repository_url_idx",
+            table="local_media_repository",
+            columns=["created_ts"],
+            where_clause="url_cache IS NOT NULL",
         )
 
     def get_local_media(self, media_id):
@@ -108,12 +108,12 @@ class MediaRepositoryStore(BackgroundUpdateStore):
             return dict(
                 zip(
                     (
-                        'response_code',
-                        'etag',
-                        'expires_ts',
-                        'og',
-                        'media_id',
-                        'download_ts',
+                        "response_code",
+                        "etag",
+                        "expires_ts",
+                        "og",
+                        "media_id",
+                        "download_ts",
                     ),
                     row,
                 )
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 8aa8abc470..081564360f 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -86,11 +86,11 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             if len(self.reserved_users) > 0:
                 # questionmarks is a hack to overcome sqlite not supporting
                 # tuples in 'WHERE IN %s'
-                questionmarks = '?' * len(self.reserved_users)
+                questionmarks = "?" * len(self.reserved_users)
 
                 query_args.extend(self.reserved_users)
                 sql = base_sql + """ AND user_id NOT IN ({})""".format(
-                    ','.join(questionmarks)
+                    ",".join(questionmarks)
                 )
             else:
                 sql = base_sql
@@ -124,7 +124,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
                 if len(self.reserved_users) > 0:
                     query_args.extend(self.reserved_users)
                     sql = base_sql + """ AND user_id NOT IN ({})""".format(
-                        ','.join(questionmarks)
+                        ",".join(questionmarks)
                     )
                 else:
                     sql = base_sql
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index f2c1bed487..7c4e1dc7ec 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -133,7 +133,7 @@ def _setup_new_database(cur, database_engine):
             if ver <= SCHEMA_VERSION:
                 valid_dirs.append((ver, abs_path))
         else:
-            logger.warn("Unexpected entry in 'full_schemas': %s", filename)
+            logger.debug("Ignoring entry '%s' in 'full_schemas'", filename)
 
     if not valid_dirs:
         raise PrepareDatabaseException(
@@ -146,9 +146,10 @@ def _setup_new_database(cur, database_engine):
 
     directory_entries = os.listdir(sql_dir)
 
-    for filename in sorted(fnmatch.filter(directory_entries, "*.sql") + fnmatch.filter(
-        directory_entries, "*.sql." + specific
-    )):
+    for filename in sorted(
+        fnmatch.filter(directory_entries, "*.sql")
+        + fnmatch.filter(directory_entries, "*.sql." + specific)
+    ):
         sql_loc = os.path.join(sql_dir, filename)
         logger.debug("Applying schema %s", sql_loc)
         executescript(cur, sql_loc)
@@ -313,7 +314,7 @@ def _apply_module_schemas(txn, database_engine, config):
             application config
     """
     for (mod, _config) in config.password_providers:
-        if not hasattr(mod, 'get_db_schema_files'):
+        if not hasattr(mod, "get_db_schema_files"):
             continue
         modname = ".".join((mod.__module__, mod.__name__))
         _apply_module_schema_files(
@@ -343,7 +344,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
             continue
 
         root_name, ext = os.path.splitext(name)
-        if ext != '.sql':
+        if ext != ".sql":
             raise PrepareDatabaseException(
                 "only .sql files are currently supported for module schemas"
             )
@@ -407,7 +408,7 @@ def get_statements(f):
 
 
 def executescript(txn, schema_path):
-    with open(schema_path, 'r') as f:
+    with open(schema_path, "r") as f:
         for statement in get_statements(f):
             txn.execute(statement)
 
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index aeec2f57c4..0ff392bdb4 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -41,7 +41,7 @@ class ProfileWorkerStore(SQLBaseStore):
 
         defer.returnValue(
             ProfileInfo(
-                avatar_url=profile['avatar_url'], display_name=profile['displayname']
+                avatar_url=profile["avatar_url"], display_name=profile["displayname"]
             )
         )
 
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9e406baafa..98cec8c82b 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -46,12 +46,12 @@ def _load_rules(rawrules, enabled_map):
     rules = list(list_with_base_rules(ruleslist))
 
     for i, rule in enumerate(rules):
-        rule_id = rule['rule_id']
+        rule_id = rule["rule_id"]
         if rule_id in enabled_map:
-            if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+            if rule.get("enabled", True) != bool(enabled_map[rule_id]):
                 # Rules are cached across users.
                 rule = dict(rule)
-                rule['enabled'] = bool(enabled_map[rule_id])
+                rule["enabled"] = bool(enabled_map[rule_id])
                 rules[i] = rule
 
     return rules
@@ -126,12 +126,12 @@ class PushRulesWorkerStore(
     def get_push_rules_enabled_for_user(self, user_id):
         results = yield self._simple_select_list(
             table="push_rules_enable",
-            keyvalues={'user_name': user_id},
+            keyvalues={"user_name": user_id},
             retcols=("user_name", "rule_id", "enabled"),
             desc="get_push_rules_enabled_for_user",
         )
         defer.returnValue(
-            {r['rule_id']: False if r['enabled'] == 0 else True for r in results}
+            {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}
         )
 
     def have_push_rules_changed_for_user(self, user_id, last_id):
@@ -175,7 +175,7 @@ class PushRulesWorkerStore(
         rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
 
         for row in rows:
-            results.setdefault(row['user_name'], []).append(row)
+            results.setdefault(row["user_name"], []).append(row)
 
         enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
 
@@ -194,7 +194,7 @@ class PushRulesWorkerStore(
             rule (Dict): A push rule.
         """
         # Create new rule id
-        rule_id_scope = '/'.join(rule["rule_id"].split('/')[:-1])
+        rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1])
         new_rule_id = rule_id_scope + "/" + new_room_id
 
         # Change room id in each condition
@@ -334,8 +334,8 @@ class PushRulesWorkerStore(
             desc="bulk_get_push_rules_enabled",
         )
         for row in rows:
-            enabled = bool(row['enabled'])
-            results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
+            enabled = bool(row["enabled"])
+            results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
         defer.returnValue(results)
 
 
@@ -568,7 +568,7 @@ class PushRuleStore(PushRulesWorkerStore):
 
         def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
             self._simple_delete_one_txn(
-                txn, "push_rules", {'user_name': user_id, 'rule_id': rule_id}
+                txn, "push_rules", {"user_name": user_id, "rule_id": rule_id}
             )
 
             self._insert_push_rules_update_txn(
@@ -605,9 +605,9 @@ class PushRuleStore(PushRulesWorkerStore):
         self._simple_upsert_txn(
             txn,
             "push_rules_enable",
-            {'user_name': user_id, 'rule_id': rule_id},
-            {'enabled': 1 if enabled else 0},
-            {'id': new_id},
+            {"user_name": user_id, "rule_id": rule_id},
+            {"enabled": 1 if enabled else 0},
+            {"id": new_id},
         )
 
         self._insert_push_rules_update_txn(
@@ -645,8 +645,8 @@ class PushRuleStore(PushRulesWorkerStore):
                 self._simple_update_one_txn(
                     txn,
                     "push_rules",
-                    {'user_name': user_id, 'rule_id': rule_id},
-                    {'actions': actions_json},
+                    {"user_name": user_id, "rule_id": rule_id},
+                    {"actions": actions_json},
                 )
 
             self._insert_push_rules_update_txn(
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 1567e1df48..cfe0a94330 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -37,24 +37,24 @@ else:
 class PusherWorkerStore(SQLBaseStore):
     def _decode_pushers_rows(self, rows):
         for r in rows:
-            dataJson = r['data']
-            r['data'] = None
+            dataJson = r["data"]
+            r["data"] = None
             try:
                 if isinstance(dataJson, db_binary_type):
                     dataJson = str(dataJson).decode("UTF8")
 
-                r['data'] = json.loads(dataJson)
+                r["data"] = json.loads(dataJson)
             except Exception as e:
                 logger.warn(
                     "Invalid JSON in data for pusher %d: %s, %s",
-                    r['id'],
+                    r["id"],
                     dataJson,
                     e.args[0],
                 )
                 pass
 
-            if isinstance(r['pushkey'], db_binary_type):
-                r['pushkey'] = str(r['pushkey']).decode("UTF8")
+            if isinstance(r["pushkey"], db_binary_type):
+                r["pushkey"] = str(r["pushkey"]).decode("UTF8")
 
         return rows
 
@@ -195,15 +195,15 @@ class PusherWorkerStore(SQLBaseStore):
     )
     def get_if_users_have_pushers(self, user_ids):
         rows = yield self._simple_select_many_batch(
-            table='pushers',
-            column='user_name',
+            table="pushers",
+            column="user_name",
             iterable=user_ids,
-            retcols=['user_name'],
-            desc='get_if_users_have_pushers',
+            retcols=["user_name"],
+            desc="get_if_users_have_pushers",
         )
 
         result = {user_id: False for user_id in user_ids}
-        result.update({r['user_name']: True for r in rows})
+        result.update({r["user_name"]: True for r in rows})
 
         defer.returnValue(result)
 
@@ -299,8 +299,8 @@ class PusherStore(PusherWorkerStore):
     ):
         yield self._simple_update_one(
             "pushers",
-            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_stream_ordering': last_stream_ordering},
+            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            {"last_stream_ordering": last_stream_ordering},
             desc="update_pusher_last_stream_ordering",
         )
 
@@ -310,10 +310,10 @@ class PusherStore(PusherWorkerStore):
     ):
         yield self._simple_update_one(
             "pushers",
-            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
+            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
             {
-                'last_stream_ordering': last_stream_ordering,
-                'last_success': last_success,
+                "last_stream_ordering": last_stream_ordering,
+                "last_success": last_success,
             },
             desc="update_pusher_last_stream_ordering_and_success",
         )
@@ -322,8 +322,8 @@ class PusherStore(PusherWorkerStore):
     def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
         yield self._simple_update_one(
             "pushers",
-            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'failing_since': failing_since},
+            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+            {"failing_since": failing_since},
             desc="update_pusher_failing_since",
         )
 
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index a1647e50a1..b477da12b1 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
     @cachedInlineCallbacks()
     def get_users_with_read_receipts_in_room(self, room_id):
         receipts = yield self.get_receipts_for_room(room_id, "m.read")
-        defer.returnValue(set(r['user_id'] for r in receipts))
+        defer.returnValue(set(r["user_id"] for r in receipts))
 
     @cached(num_args=2)
     def get_receipts_for_room(self, room_id, receipt_type):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index d36917e4d6..983ce13291 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -116,8 +116,9 @@ class RegistrationWorkerStore(SQLBaseStore):
         defer.returnValue(res)
 
     @defer.inlineCallbacks
-    def set_account_validity_for_user(self, user_id, expiration_ts, email_sent,
-                                      renewal_token=None):
+    def set_account_validity_for_user(
+        self, user_id, expiration_ts, email_sent, renewal_token=None
+    ):
         """Updates the account validity properties of the given account, with the
         given values.
 
@@ -131,6 +132,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             renewal_token (str): Renewal token the user can use to extend the validity
                 of their account. Defaults to no token.
         """
+
         def set_account_validity_for_user_txn(txn):
             self._simple_update_txn(
                 txn=txn,
@@ -143,12 +145,11 @@ class RegistrationWorkerStore(SQLBaseStore):
                 },
             )
             self._invalidate_cache_and_stream(
-                txn, self.get_expiration_ts_for_user, (user_id,),
+                txn, self.get_expiration_ts_for_user, (user_id,)
             )
 
         yield self.runInteraction(
-            "set_account_validity_for_user",
-            set_account_validity_for_user_txn,
+            "set_account_validity_for_user", set_account_validity_for_user_txn
         )
 
     @defer.inlineCallbacks
@@ -217,6 +218,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns:
             Deferred: Resolves to a list[dict[user_id (str), expiration_ts_ms (int)]]
         """
+
         def select_users_txn(txn, now_ms, renew_at):
             sql = (
                 "SELECT user_id, expiration_ts_ms FROM account_validity"
@@ -229,7 +231,8 @@ class RegistrationWorkerStore(SQLBaseStore):
         res = yield self.runInteraction(
             "get_users_expiring_soon",
             select_users_txn,
-            self.clock.time_msec(), self.config.account_validity.renew_at,
+            self.clock.time_msec(),
+            self.config.account_validity.renew_at,
         )
 
         defer.returnValue(res)
@@ -369,7 +372,7 @@ class RegistrationWorkerStore(SQLBaseStore):
                     WHERE creation_ts > ?
                 ) AS t GROUP BY user_type
             """
-            results = {'native': 0, 'guest': 0, 'bridged': 0}
+            results = {"native": 0, "guest": 0, "bridged": 0}
             txn.execute(sql, (yesterday,))
             for row in txn:
                 results[row[0]] = row[1]
@@ -435,7 +438,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             {"medium": medium, "address": address},
             ["guest_access_token"],
             True,
-            'get_3pid_guest_access_token',
+            "get_3pid_guest_access_token",
         )
         if ret:
             defer.returnValue(ret["guest_access_token"])
@@ -472,11 +475,11 @@ class RegistrationWorkerStore(SQLBaseStore):
             txn,
             "user_threepids",
             {"medium": medium, "address": address},
-            ['user_id'],
+            ["user_id"],
             True,
         )
         if ret:
-            return ret['user_id']
+            return ret["user_id"]
         return None
 
     @defer.inlineCallbacks
@@ -492,8 +495,8 @@ class RegistrationWorkerStore(SQLBaseStore):
         ret = yield self._simple_select_list(
             "user_threepids",
             {"user_id": user_id},
-            ['medium', 'address', 'validated_at', 'added_at'],
-            'user_get_threepids',
+            ["medium", "address", "validated_at", "added_at"],
+            "user_get_threepids",
         )
         defer.returnValue(ret)
 
@@ -572,11 +575,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
         return self._simple_select_onecol(
             table="user_threepid_id_server",
-            keyvalues={
-                "user_id": user_id,
-                "medium": medium,
-                "address": address,
-            },
+            keyvalues={"user_id": user_id, "medium": medium, "address": address},
             retcol="id_server",
             desc="get_id_servers_user_bound",
         )
@@ -612,16 +611,16 @@ class RegistrationStore(
         self.register_noop_background_update("refresh_tokens_device_index")
 
         self.register_background_update_handler(
-            "user_threepids_grandfather", self._bg_user_threepids_grandfather,
+            "user_threepids_grandfather", self._bg_user_threepids_grandfather
         )
 
         self.register_background_update_handler(
-            "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag,
+            "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag
         )
 
         # Create a background job for culling expired 3PID validity tokens
         hs.get_clock().looping_call(
-            self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS,
+            self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
         )
 
     @defer.inlineCallbacks
@@ -662,7 +661,7 @@ class RegistrationStore(
 
             for user in rows:
                 if not user["count_tokens"] and not user["count_threepids"]:
-                    self.set_user_deactivated_status_txn(txn, user["user_id"], True)
+                    self.set_user_deactivated_status_txn(txn, user["name"], True)
                     rows_processed_nb += 1
 
             logger.info("Marked %d rows as deactivated", rows_processed_nb)
@@ -677,8 +676,7 @@ class RegistrationStore(
                 return False
 
         end = yield self.runInteraction(
-            "users_set_deactivated_flag",
-            _backgroud_update_set_deactivated_flag_txn,
+            "users_set_deactivated_flag", _backgroud_update_set_deactivated_flag_txn
         )
 
         if end:
@@ -851,7 +849,7 @@ class RegistrationStore(
 
         def user_set_password_hash_txn(txn):
             self._simple_update_one_txn(
-                txn, 'users', {'name': user_id}, {'password_hash': password_hash}
+                txn, "users", {"name": user_id}, {"password_hash": password_hash}
             )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
@@ -872,9 +870,9 @@ class RegistrationStore(
         def f(txn):
             self._simple_update_one_txn(
                 txn,
-                table='users',
-                keyvalues={'name': user_id},
-                updatevalues={'consent_version': consent_version},
+                table="users",
+                keyvalues={"name": user_id},
+                updatevalues={"consent_version": consent_version},
             )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
@@ -896,9 +894,9 @@ class RegistrationStore(
         def f(txn):
             self._simple_update_one_txn(
                 txn,
-                table='users',
-                keyvalues={'name': user_id},
-                updatevalues={'consent_server_notice_sent': consent_version},
+                table="users",
+                keyvalues={"name": user_id},
+                updatevalues={"consent_server_notice_sent": consent_version},
             )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
@@ -1068,7 +1066,7 @@ class RegistrationStore(
 
         if id_servers:
             yield self.runInteraction(
-                "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn,
+                "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
             )
 
         yield self._end_background_update("user_threepids_grandfather")
@@ -1076,12 +1074,7 @@ class RegistrationStore(
         defer.returnValue(1)
 
     def get_threepid_validation_session(
-        self,
-        medium,
-        client_secret,
-        address=None,
-        sid=None,
-        validated=True,
+        self, medium, client_secret, address=None, sid=None, validated=True
     ):
         """Gets a session_id and last_send_attempt (if available) for a
         client_secret/medium/(address|session_id) combo
@@ -1101,23 +1094,22 @@ class RegistrationStore(
                 latest session_id and send_attempt count for this 3PID.
                 Otherwise None if there hasn't been a previous attempt
         """
-        keyvalues = {
-            "medium": medium,
-            "client_secret": client_secret,
-        }
+        keyvalues = {"medium": medium, "client_secret": client_secret}
         if address:
             keyvalues["address"] = address
         if sid:
             keyvalues["session_id"] = sid
 
-        assert(address or sid)
+        assert address or sid
 
         def get_threepid_validation_session_txn(txn):
             sql = """
                 SELECT address, session_id, medium, client_secret,
                 last_send_attempt, validated_at
                 FROM threepid_validation_session WHERE %s
-                """ % (" AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),)
+                """ % (
+                " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+            )
 
             if validated is not None:
                 sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
@@ -1132,17 +1124,10 @@ class RegistrationStore(
             return rows[0]
 
         return self.runInteraction(
-            "get_threepid_validation_session",
-            get_threepid_validation_session_txn,
+            "get_threepid_validation_session", get_threepid_validation_session_txn
         )
 
-    def validate_threepid_session(
-        self,
-        session_id,
-        client_secret,
-        token,
-        current_ts,
-    ):
+    def validate_threepid_session(self, session_id, client_secret, token, current_ts):
         """Attempt to validate a threepid session using a token
 
         Args:
@@ -1174,7 +1159,7 @@ class RegistrationStore(
 
             if retrieved_client_secret != client_secret:
                 raise ThreepidValidationError(
-                    400, "This client_secret does not match the provided session_id",
+                    400, "This client_secret does not match the provided session_id"
                 )
 
             row = self._simple_select_one_txn(
@@ -1187,7 +1172,7 @@ class RegistrationStore(
 
             if not row:
                 raise ThreepidValidationError(
-                    400, "Validation token not found or has expired",
+                    400, "Validation token not found or has expired"
                 )
             expires = row["expires"]
             next_link = row["next_link"]
@@ -1198,7 +1183,7 @@ class RegistrationStore(
 
             if expires <= current_ts:
                 raise ThreepidValidationError(
-                    400, "This token has expired. Please request a new one",
+                    400, "This token has expired. Please request a new one"
                 )
 
             # Looks good. Validate the session
@@ -1213,8 +1198,7 @@ class RegistrationStore(
 
         # Return next_link if it exists
         return self.runInteraction(
-            "validate_threepid_session_txn",
-            validate_threepid_session_txn,
+            "validate_threepid_session_txn", validate_threepid_session_txn
         )
 
     def upsert_threepid_validation_session(
@@ -1281,6 +1265,7 @@ class RegistrationStore(
             token_expires (int): The timestamp for which after the token
                 will no longer be valid
         """
+
         def start_or_continue_validation_session_txn(txn):
             # Create or update a validation session
             self._simple_upsert_txn(
@@ -1314,6 +1299,7 @@ class RegistrationStore(
 
     def cull_expired_threepid_validation_tokens(self):
         """Remove threepid validation tokens with expiry dates that have passed"""
+
         def cull_expired_threepid_validation_tokens_txn(txn, ts):
             sql = """
             DELETE FROM threepid_validation_token WHERE
@@ -1335,6 +1321,7 @@ class RegistrationStore(
         Args:
             session_id (str): The ID of the session to delete
         """
+
         def delete_threepid_session_txn(txn):
             self._simple_delete_txn(
                 txn,
@@ -1348,8 +1335,7 @@ class RegistrationStore(
             )
 
         return self.runInteraction(
-            "delete_threepid_session",
-            delete_threepid_session_txn,
+            "delete_threepid_session", delete_threepid_session_txn
         )
 
     def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
@@ -1360,7 +1346,7 @@ class RegistrationStore(
             updatevalues={"deactivated": 1 if deactivated else 0},
         )
         self._invalidate_cache_and_stream(
-            txn, self.get_user_deactivated_status, (user_id,),
+            txn, self.get_user_deactivated_status, (user_id,)
         )
 
     @defer.inlineCallbacks
@@ -1375,7 +1361,8 @@ class RegistrationStore(
         yield self.runInteraction(
             "set_user_deactivated_status",
             self.set_user_deactivated_status_txn,
-            user_id, deactivated,
+            user_id,
+            deactivated,
         )
 
     @cachedInlineCallbacks()
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 4c83800cca..1b01934c19 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -468,9 +468,5 @@ class RelationsStore(RelationsWorkerStore):
         """
 
         self._simple_delete_txn(
-            txn,
-            table="event_relations",
-            keyvalues={
-                "event_id": redacted_event_id,
-            }
+            txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
         )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7617913326..8004aeb909 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -420,7 +420,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 table="room_memberships",
                 column="event_id",
                 iterable=missing_member_event_ids,
-                retcols=('user_id', 'display_name', 'avatar_url'),
+                retcols=("user_id", "display_name", "avatar_url"),
                 keyvalues={"membership": Membership.JOIN},
                 batch_size=500,
                 desc="_get_joined_users_from_context",
@@ -448,7 +448,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
     @cachedInlineCallbacks(max_entries=10000)
     def is_host_joined(self, room_id, host):
-        if '%' in host or '_' in host:
+        if "%" in host or "_" in host:
             raise Exception("Invalid host name")
 
         sql = """
@@ -490,7 +490,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             Deferred: Resolves to True if the host is/was in the room, otherwise
             False.
         """
-        if '%' in host or '_' in host:
+        if "%" in host or "_" in host:
             raise Exception("Invalid host name")
 
         sql = """
@@ -723,7 +723,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
                 room_id = row["room_id"]
                 try:
                     event_json = json.loads(row["json"])
-                    content = event_json['content']
+                    content = event_json["content"]
                 except Exception:
                     continue
 
diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py
index 147496a38b..3edfcfd783 100644
--- a/synapse/storage/schema/delta/20/pushers.py
+++ b/synapse/storage/schema/delta/20/pushers.py
@@ -29,7 +29,8 @@ logger = logging.getLogger(__name__)
 
 def run_create(cur, database_engine, *args, **kwargs):
     logger.info("Porting pushers table...")
-    cur.execute("""
+    cur.execute(
+        """
         CREATE TABLE IF NOT EXISTS pushers2 (
           id BIGINT PRIMARY KEY,
           user_name TEXT NOT NULL,
@@ -48,27 +49,34 @@ def run_create(cur, database_engine, *args, **kwargs):
           failing_since BIGINT,
           UNIQUE (app_id, pushkey, user_name)
         )
-    """)
-    cur.execute("""SELECT
+    """
+    )
+    cur.execute(
+        """SELECT
         id, user_name, access_token, profile_tag, kind,
         app_id, app_display_name, device_display_name,
         pushkey, ts, lang, data, last_token, last_success,
         failing_since
         FROM pushers
-    """)
+    """
+    )
     count = 0
     for row in cur.fetchall():
         row = list(row)
         row[8] = bytes(row[8]).decode("utf-8")
         row[11] = bytes(row[11]).decode("utf-8")
-        cur.execute(database_engine.convert_param_style("""
+        cur.execute(
+            database_engine.convert_param_style(
+                """
             INSERT into pushers2 (
             id, user_name, access_token, profile_tag, kind,
             app_id, app_display_name, device_display_name,
             pushkey, ts, lang, data, last_token, last_success,
             failing_since
-            ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
-            row
+            ) values (%s)"""
+                % (",".join(["?" for _ in range(len(row))]))
+            ),
+            row,
         )
         count += 1
     cur.execute("DROP TABLE pushers")
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index ef7ec34346..9b95411fb6 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -40,9 +40,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
         logger.warning("Could not get app_service_config_files from config")
         pass
 
-    appservices = load_appservices(
-        config.server_name, config_files
-    )
+    appservices = load_appservices(config.server_name, config_files)
 
     owned = {}
 
@@ -53,20 +51,19 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
                 if user_id in owned.keys():
                     logger.error(
                         "user_id %s was owned by more than one application"
-                        " service (IDs %s and %s); assigning arbitrarily to %s" %
-                        (user_id, owned[user_id], appservice.id, owned[user_id])
+                        " service (IDs %s and %s); assigning arbitrarily to %s"
+                        % (user_id, owned[user_id], appservice.id, owned[user_id])
                     )
                 owned.setdefault(appservice.id, []).append(user_id)
 
     for as_id, user_ids in owned.items():
         n = 100
-        user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
+        user_chunks = (user_ids[i : i + 100] for i in range(0, len(user_ids), n))
         for chunk in user_chunks:
             cur.execute(
                 database_engine.convert_param_style(
-                    "UPDATE users SET appservice_id = ? WHERE name IN (%s)" % (
-                        ",".join("?" for _ in chunk),
-                    )
+                    "UPDATE users SET appservice_id = ? WHERE name IN (%s)"
+                    % (",".join("?" for _ in chunk),)
                 ),
-                [as_id] + chunk
+                [as_id] + chunk,
             )
diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py
index 93367fa09e..9bb504aad5 100644
--- a/synapse/storage/schema/delta/31/pushers.py
+++ b/synapse/storage/schema/delta/31/pushers.py
@@ -24,12 +24,13 @@ logger = logging.getLogger(__name__)
 
 
 def token_to_stream_ordering(token):
-    return int(token[1:].split('_')[0])
+    return int(token[1:].split("_")[0])
 
 
 def run_create(cur, database_engine, *args, **kwargs):
     logger.info("Porting pushers table, delta 31...")
-    cur.execute("""
+    cur.execute(
+        """
         CREATE TABLE IF NOT EXISTS pushers2 (
           id BIGINT PRIMARY KEY,
           user_name TEXT NOT NULL,
@@ -48,26 +49,33 @@ def run_create(cur, database_engine, *args, **kwargs):
           failing_since BIGINT,
           UNIQUE (app_id, pushkey, user_name)
         )
-    """)
-    cur.execute("""SELECT
+    """
+    )
+    cur.execute(
+        """SELECT
         id, user_name, access_token, profile_tag, kind,
         app_id, app_display_name, device_display_name,
         pushkey, ts, lang, data, last_token, last_success,
         failing_since
         FROM pushers
-    """)
+    """
+    )
     count = 0
     for row in cur.fetchall():
         row = list(row)
         row[12] = token_to_stream_ordering(row[12])
-        cur.execute(database_engine.convert_param_style("""
+        cur.execute(
+            database_engine.convert_param_style(
+                """
             INSERT into pushers2 (
             id, user_name, access_token, profile_tag, kind,
             app_id, app_display_name, device_display_name,
             pushkey, ts, lang, data, last_stream_ordering, last_success,
             failing_since
-            ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
-            row
+            ) values (%s)"""
+                % (",".join(["?" for _ in range(len(row))]))
+            ),
+            row,
         )
         count += 1
     cur.execute("DROP TABLE pushers")
diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py
index 9754d3ccfb..a26057dfb6 100644
--- a/synapse/storage/schema/delta/33/remote_media_ts.py
+++ b/synapse/storage/schema/delta/33/remote_media_ts.py
@@ -26,5 +26,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
         database_engine.convert_param_style(
             "UPDATE remote_media_cache SET last_access_ts = ?"
         ),
-        (int(time.time() * 1000),)
+        (int(time.time() * 1000),),
     )
diff --git a/synapse/storage/schema/delta/47/state_group_seq.py b/synapse/storage/schema/delta/47/state_group_seq.py
index f6766501d2..9fd1ccf6f7 100644
--- a/synapse/storage/schema/delta/47/state_group_seq.py
+++ b/synapse/storage/schema/delta/47/state_group_seq.py
@@ -27,10 +27,7 @@ def run_create(cur, database_engine, *args, **kwargs):
         else:
             start_val = row[0] + 1
 
-        cur.execute(
-            "CREATE SEQUENCE state_group_id_seq START WITH %s",
-            (start_val, ),
-        )
+        cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,))
 
 
 def run_upgrade(*args, **kwargs):
diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py
index 2233af87d7..49f5f2c003 100644
--- a/synapse/storage/schema/delta/48/group_unique_indexes.py
+++ b/synapse/storage/schema/delta/48/group_unique_indexes.py
@@ -38,16 +38,22 @@ def run_create(cur, database_engine, *args, **kwargs):
     rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
 
     # remove duplicates from group_users & group_invites tables
-    cur.execute("""
+    cur.execute(
+        """
         DELETE FROM group_users WHERE %s NOT IN (
            SELECT min(%s) FROM group_users GROUP BY group_id, user_id
         );
-    """ % (rowid, rowid))
-    cur.execute("""
+    """
+        % (rowid, rowid)
+    )
+    cur.execute(
+        """
         DELETE FROM group_invites WHERE %s NOT IN (
            SELECT min(%s) FROM group_invites GROUP BY group_id, user_id
         );
-    """ % (rowid, rowid))
+    """
+        % (rowid, rowid)
+    )
 
     for statement in get_statements(FIX_INDEXES.splitlines()):
         cur.execute(statement)
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
index 6dd467b6c5..b1684a8441 100644
--- a/synapse/storage/schema/delta/50/make_event_content_nullable.py
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -65,14 +65,18 @@ def run_create(cur, database_engine, *args, **kwargs):
 
 def run_upgrade(cur, database_engine, *args, **kwargs):
     if isinstance(database_engine, PostgresEngine):
-        cur.execute("""
+        cur.execute(
+            """
             ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
-        """)
+        """
+        )
         return
 
     # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
 
-    cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+    cur.execute(
+        "SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'"
+    )
     (oldsql,) = cur.fetchone()
 
     sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
@@ -86,7 +90,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
     cur.execute("PRAGMA writable_schema=ON")
     cur.execute(
         "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
-        (sql, ),
+        (sql,),
     )
     cur.execute("PRAGMA schema_version=%i" % (oldver + 1,))
     cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 10a27c207a..f3b1cec933 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -31,8 +31,8 @@ from .background_updates import BackgroundUpdateStore
 logger = logging.getLogger(__name__)
 
 SearchEntry = namedtuple(
-    'SearchEntry',
-    ['key', 'value', 'event_id', 'room_id', 'stream_ordering', 'origin_server_ts'],
+    "SearchEntry",
+    ["key", "value", "event_id", "room_id", "stream_ordering", "origin_server_ts"],
 )
 
 
@@ -216,7 +216,7 @@ class SearchStore(BackgroundUpdateStore):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
-        have_added_index = progress['have_added_indexes']
+        have_added_index = progress["have_added_indexes"]
 
         if not have_added_index:
 
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index ff266b09b0..1cec84ee2e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -71,7 +71,8 @@ class StatsStore(StateDeltasStore):
         # Get all the rooms that we want to process.
         def _make_staging_area(txn):
             # Create the temporary tables
-            stmts = get_statements("""
+            stmts = get_statements(
+                """
                 -- We just recreate the table, we'll be reinserting the
                 -- correct entries again later anyway.
                 DROP TABLE IF EXISTS {temp}_rooms;
@@ -85,7 +86,10 @@ class StatsStore(StateDeltasStore):
                     ON {temp}_rooms(events);
                 CREATE INDEX {temp}_rooms_id
                     ON {temp}_rooms(room_id);
-            """.format(temp=TEMP_TABLE).splitlines())
+            """.format(
+                    temp=TEMP_TABLE
+                ).splitlines()
+            )
 
             for statement in stmts:
                 txn.execute(statement)
@@ -105,7 +109,9 @@ class StatsStore(StateDeltasStore):
                 LEFT JOIN room_stats_earliest_token AS t USING (room_id)
                 WHERE t.room_id IS NULL
                 GROUP BY c.room_id
-            """ % (TEMP_TABLE,)
+            """ % (
+                TEMP_TABLE,
+            )
             txn.execute(sql)
 
         new_pos = yield self.get_max_stream_id_in_current_state_deltas()
@@ -184,7 +190,8 @@ class StatsStore(StateDeltasStore):
 
         logger.info(
             "Processing the next %d rooms of %d remaining",
-            len(rooms_to_work_on), progress["remaining"],
+            len(rooms_to_work_on),
+            progress["remaining"],
         )
 
         # Number of state events we've processed by going through each room
@@ -204,10 +211,17 @@ class StatsStore(StateDeltasStore):
             avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
             canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
 
-            state_events = yield self.get_events([
-                join_rules_id, history_visibility_id, encryption_id, name_id,
-                topic_id, avatar_id, canonical_alias_id,
-            ])
+            state_events = yield self.get_events(
+                [
+                    join_rules_id,
+                    history_visibility_id,
+                    encryption_id,
+                    name_id,
+                    topic_id,
+                    avatar_id,
+                    canonical_alias_id,
+                ]
+            )
 
             def _get_or_none(event_id, arg):
                 event = state_events.get(event_id)
@@ -271,7 +285,7 @@ class StatsStore(StateDeltasStore):
 
                 # We've finished a room. Delete it from the table.
                 self._simple_delete_one_txn(
-                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id},
+                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
                 )
 
             yield self.runInteraction("update_room_stats", _fetch_data)
@@ -338,7 +352,7 @@ class StatsStore(StateDeltasStore):
             "name",
             "topic",
             "avatar",
-            "canonical_alias"
+            "canonical_alias",
         ):
             field = fields.get(col)
             if field and "\0" in field:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 6f7f65d96b..d9482a3843 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -65,7 +65,7 @@ _EventDictReturn = namedtuple(
 
 
 def generate_pagination_where_clause(
-    direction, column_names, from_token, to_token, engine,
+    direction, column_names, from_token, to_token, engine
 ):
     """Creates an SQL expression to bound the columns by the pagination
     tokens.
@@ -153,7 +153,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine):
         str
     """
 
-    assert(bound in (">", "<", ">=", "<="))
+    assert bound in (">", "<", ">=", "<=")
 
     name1, name2 = column_names
     val1, val2 = values
@@ -169,11 +169,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine):
         # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
         # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
         # use the later form when running against postgres.
-        return "((%d,%d) %s (%s,%s))" % (
-            val1, val2,
-            bound,
-            name1, name2,
-        )
+        return "((%d,%d) %s (%s,%s))" % (val1, val2, bound, name1, name2)
 
     # We want to generate queries of e.g. the form:
     #
@@ -276,7 +272,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_events_stream_for_rooms(
-        self, room_ids, from_key, to_key, limit=0, order='DESC'
+        self, room_ids, from_key, to_key, limit=0, order="DESC"
     ):
         """Get new room events in stream ordering since `from_key`.
 
@@ -346,7 +342,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_events_stream_for_room(
-        self, room_id, from_key, to_key, limit=0, order='DESC'
+        self, room_id, from_key, to_key, limit=0, order="DESC"
     ):
 
         """Get new room events in stream ordering since `from_key`.
@@ -395,8 +391,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
-        ret = yield self.get_events_as_list([
-            r.event_id for r in rows], get_prev_content=True,
+        ret = yield self.get_events_as_list(
+            [r.event_id for r in rows], get_prev_content=True
         )
 
         self._set_before_and_after(ret, rows, topo_order=from_id is None)
@@ -446,7 +442,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
         ret = yield self.get_events_as_list(
-            [r.event_id for r in rows], get_prev_content=True,
+            [r.event_id for r in rows], get_prev_content=True
         )
 
         self._set_before_and_after(ret, rows, topo_order=False)
@@ -725,7 +721,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             txn,
             room_id,
             before_token,
-            direction='b',
+            direction="b",
             limit=before_limit,
             event_filter=event_filter,
         )
@@ -735,7 +731,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             txn,
             room_id,
             after_token,
-            direction='f',
+            direction="f",
             limit=after_limit,
             event_filter=event_filter,
         )
@@ -816,7 +812,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         room_id,
         from_token,
         to_token=None,
-        direction='b',
+        direction="b",
         limit=-1,
         event_filter=None,
     ):
@@ -846,7 +842,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
         args = [False, room_id]
-        if direction == 'b':
+        if direction == "b":
             order = "DESC"
         else:
             order = "ASC"
@@ -882,7 +878,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         if rows:
             topo = rows[-1].topological_ordering
             toke = rows[-1].stream_ordering
-            if direction == 'b':
+            if direction == "b":
                 # Tokens are positions between events.
                 # This token points *after* the last event in the chunk.
                 # We need it to point to the event before it in the chunk
@@ -898,7 +894,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def paginate_room_events(
-        self, room_id, from_key, to_key=None, direction='b', limit=-1, event_filter=None
+        self, room_id, from_key, to_key=None, direction="b", limit=-1, event_filter=None
     ):
         """Returns list of events before or after a given token.