summary refs log tree commit diff
path: root/synapse/storage/databases/main/transactions.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-10-07 11:27:56 -0400
committerGitHub <noreply@github.com>2020-10-07 11:27:56 -0400
commite4f72ddc44367d0cd53e6cfc5ba310b6f55319b6 (patch)
tree76bdb1d3a71e35aee33af0e8c2d3704476591340 /synapse/storage/databases/main/transactions.py
parentInclude the configured log level in phone home stats. (#8477) (diff)
downloadsynapse-e4f72ddc44367d0cd53e6cfc5ba310b6f55319b6.tar.xz
Move additional tasks to the background worker (#8458)
Diffstat (limited to 'synapse/storage/databases/main/transactions.py')
-rw-r--r--synapse/storage/databases/main/transactions.py42
1 files changed, 22 insertions, 20 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 97aed1500e..7d46090267 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -19,7 +19,7 @@ from typing import Iterable, List, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@@ -43,15 +43,33 @@ _UpdateTransactionRow = namedtuple(
 SENTINEL = object()
 
 
-class TransactionStore(SQLBaseStore):
+class TransactionWorkerStore(SQLBaseStore):
+    def __init__(self, database: DatabasePool, db_conn, hs):
+        super().__init__(database, db_conn, hs)
+
+        if hs.config.run_background_tasks:
+            self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
+
+    @wrap_as_background_process("cleanup_transactions")
+    async def _cleanup_transactions(self) -> None:
+        now = self._clock.time_msec()
+        month_ago = now - 30 * 24 * 60 * 60 * 1000
+
+        def _cleanup_transactions_txn(txn):
+            txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
+
+        await self.db_pool.runInteraction(
+            "_cleanup_transactions", _cleanup_transactions_txn
+        )
+
+
+class TransactionStore(TransactionWorkerStore):
     """A collection of queries for handling PDUs.
     """
 
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
 
-        self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
-
         self._destination_retry_cache = ExpiringCache(
             cache_name="get_destination_retry_timings",
             clock=self._clock,
@@ -266,22 +284,6 @@ class TransactionStore(SQLBaseStore):
                 },
             )
 
-    def _start_cleanup_transactions(self):
-        return run_as_background_process(
-            "cleanup_transactions", self._cleanup_transactions
-        )
-
-    async def _cleanup_transactions(self) -> None:
-        now = self._clock.time_msec()
-        month_ago = now - 30 * 24 * 60 * 60 * 1000
-
-        def _cleanup_transactions_txn(txn):
-            txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
-
-        await self.db_pool.runInteraction(
-            "_cleanup_transactions", _cleanup_transactions_txn
-        )
-
     async def store_destination_rooms_entries(
         self, destinations: Iterable[str], room_id: str, stream_ordering: int,
     ) -> None: