Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/databases/main/transactions.py  |  49
1 file changed, 25 insertions(+), 24 deletions(-)
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 54b41513ee..6c299cafa5 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -13,9 +13,8 @@
 # limitations under the License.
 
 import logging
-from collections import namedtuple
 from enum import Enum
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, cast
 
 import attr
 from canonicaljson import encode_canonical_json
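The import hunk trades the now-unused namedtuple for typing.cast, which the final hunk below uses to narrow txn.fetchone(). A cursor's fetchone() can return None once the result set is exhausted, so it is typed Optional; a SELECT COUNT(*) always produces exactly one row, which makes the narrowing safe. A standalone sketch of the idea (plain sqlite3, outside Synapse):

    import sqlite3
    from typing import Tuple, cast

    conn = sqlite3.connect(":memory:")
    cur = conn.execute("SELECT COUNT(*) FROM sqlite_master")
    # fetchone() may in general be None (no more rows); COUNT(*) always yields
    # one row, so cast() records that fact for the type checker. cast() is an
    # identity function at runtime, so there is no extra branch.
    count = cast(Tuple[int], cur.fetchone())[0]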
@@ -39,16 +38,6 @@ db_binary_type = memoryview
 logger = logging.getLogger(__name__)
 
 
-_TransactionRow = namedtuple(
-    "_TransactionRow",
-    ("id", "transaction_id", "destination", "ts", "response_code", "response_json"),
-)
-
-_UpdateTransactionRow = namedtuple(
-    "_TransactionRow", ("response_code", "response_json")
-)
-
-
 class DestinationSortOrder(Enum):
     """Enum to define the sorting method used when returning destinations."""
 
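Both namedtuples appear to have been dead code, which is presumably why they are deleted outright rather than converted (note that _UpdateTransactionRow even reused the typename "_TransactionRow", a pre-existing slip the deletion makes moot). Had a row type still been needed, the attr import the file already carries suggests what it would become; a hypothetical sketch, with field types guessed from the surrounding code:

    import attr

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class _UpdateTransactionRow:
        response_code: int    # HTTP status stored for the transaction
        response_json: bytes  # canonical-JSON-encoded response body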
@@ -91,7 +80,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         now = self._clock.time_msec()
         month_ago = now - 30 * 24 * 60 * 60 * 1000
 
-        def _cleanup_transactions_txn(txn):
+        def _cleanup_transactions_txn(txn: LoggingTransaction) -> None:
             txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
 
         await self.db_pool.runInteraction(
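Typing the interaction callback pays off beyond this one function: runInteraction is generic in its callback's return type, so annotating the txn parameter lets mypy check the body while the declared return type flows through to the awaited result. A sketch in the style of the surrounding store methods (hypothetical method, not part of this diff):

    async def count_received(self) -> int:
        def _count_txn(txn: LoggingTransaction) -> int:
            txn.execute("SELECT COUNT(*) FROM received_transactions")
            row = txn.fetchone()
            assert row is not None  # COUNT(*) always yields exactly one row
            return row[0]

        # The awaited result is typed int because _count_txn returns int.
        return await self.db_pool.runInteraction("count_received", _count_txn)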
@@ -121,7 +110,9 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             origin,
         )
 
-    def _get_received_txn_response(self, txn, transaction_id, origin):
+    def _get_received_txn_response(
+        self, txn: LoggingTransaction, transaction_id: str, origin: str
+    ) -> Optional[Tuple[int, JsonDict]]:
         result = self.db_pool.simple_select_one_txn(
             txn,
             table="received_transactions",
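The new Optional[Tuple[int, JsonDict]] return type makes the cache-miss case explicit: simple_select_one_txn yields None when no matching row exists (assuming it is invoked with allow_none=True, as a lookup like this usually is), so callers have to branch. A hypothetical caller sketch:

    result = self._get_received_txn_response(txn, transaction_id, origin)
    if result is None:
        pass  # transaction not seen before; process it normally
    else:
        response_code, response_json = result  # (int, JsonDict)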
@@ -196,7 +187,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         return result
 
     def _get_destination_retry_timings(
-        self, txn, destination: str
+        self, txn: LoggingTransaction, destination: str
     ) -> Optional[DestinationRetryTimings]:
         result = self.db_pool.simple_select_one_txn(
             txn,
@@ -231,7 +222,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         """
 
         if self.database_engine.can_native_upsert:
-            return await self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                 "set_destination_retry_timings",
                 self._set_destination_retry_timings_native,
                 destination,
@@ -241,7 +232,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
                 db_autocommit=True,  # Safe as it's a single upsert
             )
         else:
-            return await self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                 "set_destination_retry_timings",
                 self._set_destination_retry_timings_emulated,
                 destination,
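Dropping return from both branches (and from the simple_upsert call further down) matches the enclosing method's -> None annotation: the interactions themselves produce None, so `return await ...` returned nothing and merely read as if a value were being propagated. A minimal standalone sketch of the preferred shape:

    import asyncio
    from typing import List

    async def record(log: List[str]) -> None:
        log.append("sent")

    async def caller(log: List[str]) -> None:
        # "await", not "return await": run for the side effect; returning
        # record()'s None result would only suggest a value where there is none.
        await record(log)

    asyncio.run(caller([]))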
@@ -251,8 +242,13 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             )
 
     def _set_destination_retry_timings_native(
-        self, txn, destination, failure_ts, retry_last_ts, retry_interval
-    ):
+        self,
+        txn: LoggingTransaction,
+        destination: str,
+        failure_ts: Optional[int],
+        retry_last_ts: int,
+        retry_interval: int,
+    ) -> None:
         assert self.database_engine.can_native_upsert
 
         # Upsert retry time interval if retry_interval is zero (i.e. we're
@@ -282,8 +278,13 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         )
 
     def _set_destination_retry_timings_emulated(
-        self, txn, destination, failure_ts, retry_last_ts, retry_interval
-    ):
+        self,
+        txn: LoggingTransaction,
+        destination: str,
+        failure_ts: Optional[int],
+        retry_last_ts: int,
+        retry_interval: int,
+    ) -> None:
         self.database_engine.lock_table(txn, "destinations")
 
         # We need to be careful here as the data may have changed from under us
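Without native upsert support the store has to take a table lock and perform a read-modify-write inside the transaction, hence lock_table above. A simplified sketch of that shape using the pool's generic helpers (hypothetical; the real method also reconciles the new retry values against whatever the existing row holds):

    self.database_engine.lock_table(txn, "destinations")
    prev = self.db_pool.simple_select_one_txn(
        txn,
        table="destinations",
        keyvalues={"destination": destination},
        retcols=("retry_interval",),
        allow_none=True,
    )
    if prev is None:
        # No existing row: plain insert.
        self.db_pool.simple_insert_txn(
            txn,
            table="destinations",
            values={
                "destination": destination,
                "failure_ts": failure_ts,
                "retry_last_ts": retry_last_ts,
                "retry_interval": retry_interval,
            },
        )
    else:
        # Row exists: overwrite the retry columns under the lock.
        self.db_pool.simple_update_one_txn(
            txn,
            table="destinations",
            keyvalues={"destination": destination},
            updatevalues={
                "failure_ts": failure_ts,
                "retry_last_ts": retry_last_ts,
                "retry_interval": retry_interval,
            },
        )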
@@ -393,7 +394,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             last_successful_stream_ordering: the stream_ordering of the most
                 recent successfully-sent PDU
         """
-        return await self.db_pool.simple_upsert(
+        await self.db_pool.simple_upsert(
             "destinations",
             keyvalues={"destination": destination},
             values={"last_successful_stream_ordering": last_successful_stream_ordering},
@@ -534,7 +535,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             else:
                 order = "ASC"
 
-            args = []
+            args: List[object] = []
             where_statement = ""
             if destination:
                 args.extend(["%" + destination.lower() + "%"])
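Annotating args up front matters twice over: mypy needs a type for an empty list, and inferring List[str] from the first extend() would reject the integer limit/offset parameters the list is combined with later in the function. List[object] is the honest type for a DB-API parameter list that mixes strings and ints. A standalone sketch of the failure the annotation avoids:

    from typing import List

    inferred = ["%alice%"]   # mypy infers List[str]
    # inferred.append(100)   # error: int is not assignable to str

    args: List[object] = ["%alice%"]
    args.append(100)         # fine: str and int are both objects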
@@ -543,7 +544,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             sql_base = f"FROM destinations {where_statement} "
             sql = f"SELECT COUNT(*) as total_destinations {sql_base}"
             txn.execute(sql, args)
-            count = txn.fetchone()[0]
+            count = cast(Tuple[int], txn.fetchone())[0]
 
             sql = f"""
                 SELECT destination, retry_last_ts, retry_interval, failure_ts,