summary refs log tree commit diff
path: root/synapse/storage/databases/main/events_bg_updates.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events_bg_updates.py')
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py46
1 files changed, 19 insertions, 27 deletions
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 35a0e09e3c..e53c6373a8 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventContentFields
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
@@ -94,8 +92,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             where_clause="NOT have_censored",
         )
 
-    @defer.inlineCallbacks
-    def _background_reindex_fields_sender(self, progress, batch_size):
+    async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
@@ -155,19 +152,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             return len(rows)
 
-        result = yield self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
         )
 
         if not result:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
             )
 
         return result
 
-    @defer.inlineCallbacks
-    def _background_reindex_origin_server_ts(self, progress, batch_size):
+    async def _background_reindex_origin_server_ts(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
@@ -234,19 +230,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             return len(rows_to_update)
 
-        result = yield self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
         )
 
         if not result:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 self.EVENT_ORIGIN_SERVER_TS_NAME
             )
 
         return result
 
-    @defer.inlineCallbacks
-    def _cleanup_extremities_bg_update(self, progress, batch_size):
+    async def _cleanup_extremities_bg_update(self, progress, batch_size):
         """Background update to clean out extremities that should have been
         deleted previously.
 
@@ -414,26 +409,25 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             return len(original_set)
 
-        num_handled = yield self.db_pool.runInteraction(
+        num_handled = await self.db_pool.runInteraction(
             "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn
         )
 
         if not num_handled:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 self.DELETE_SOFT_FAILED_EXTREMITIES
             )
 
             def _drop_table_txn(txn):
                 txn.execute("DROP TABLE _extremities_to_check")
 
-            yield self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                 "_cleanup_extremities_bg_update_drop_table", _drop_table_txn
             )
 
         return num_handled
 
-    @defer.inlineCallbacks
-    def _redactions_received_ts(self, progress, batch_size):
+    async def _redactions_received_ts(self, progress, batch_size):
         """Handles filling out the `received_ts` column in redactions.
         """
         last_event_id = progress.get("last_event_id", "")
@@ -480,17 +474,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             return len(rows)
 
-        count = yield self.db_pool.runInteraction(
+        count = await self.db_pool.runInteraction(
             "_redactions_received_ts", _redactions_received_ts_txn
         )
 
         if not count:
-            yield self.db_pool.updates._end_background_update("redactions_received_ts")
+            await self.db_pool.updates._end_background_update("redactions_received_ts")
 
         return count
 
-    @defer.inlineCallbacks
-    def _event_fix_redactions_bytes(self, progress, batch_size):
+    async def _event_fix_redactions_bytes(self, progress, batch_size):
         """Undoes hex encoded censored redacted event JSON.
         """
 
@@ -511,16 +504,15 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             txn.execute("DROP INDEX redactions_censored_redacts")
 
-        yield self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn
         )
 
-        yield self.db_pool.updates._end_background_update("event_fix_redactions_bytes")
+        await self.db_pool.updates._end_background_update("event_fix_redactions_bytes")
 
         return 1
 
-    @defer.inlineCallbacks
-    def _event_store_labels(self, progress, batch_size):
+    async def _event_store_labels(self, progress, batch_size):
         """Background update handler which will store labels for existing events."""
         last_event_id = progress.get("last_event_id", "")
 
@@ -575,11 +567,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             return nbrows
 
-        num_rows = yield self.db_pool.runInteraction(
+        num_rows = await self.db_pool.runInteraction(
             desc="event_store_labels", func=_event_store_labels_txn
         )
 
         if not num_rows:
-            yield self.db_pool.updates._end_background_update("event_store_labels")
+            await self.db_pool.updates._end_background_update("event_store_labels")
 
         return num_rows