summary refs log tree commit diff
path: root/synapse/storage/databases/main/appservice.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/appservice.py')
-rw-r--r--synapse/storage/databases/main/appservice.py28
1 files changed, 18 insertions, 10 deletions
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index eb32c34a85..fa732edcca 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 import logging
 import re
-from typing import TYPE_CHECKING, List, Optional, Pattern, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Pattern, Tuple
 
 from synapse.appservice import (
     ApplicationService,
@@ -26,7 +26,11 @@ from synapse.appservice import (
 from synapse.config.appservice import load_appservices
 from synapse.events import EventBase
 from synapse.storage._base import db_to_json
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingDatabaseConnection,
+    LoggingTransaction,
+)
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.types import Cursor
@@ -92,7 +96,7 @@ class ApplicationServiceWorkerStore(RoomMemberWorkerStore):
 
         super().__init__(database, db_conn, hs)
 
-    def get_app_services(self):
+    def get_app_services(self) -> List[ApplicationService]:
         return self.services_cache
 
     def get_if_app_services_interested_in_user(self, user_id: str) -> bool:
@@ -256,7 +260,7 @@ class ApplicationServiceTransactionWorkerStore(
             A new transaction.
         """
 
-        def _create_appservice_txn(txn):
+        def _create_appservice_txn(txn: LoggingTransaction) -> AppServiceTransaction:
             new_txn_id = self._as_txn_seq_gen.get_next_id_txn(txn)
 
             # Insert new txn into txn table
@@ -291,7 +295,7 @@ class ApplicationServiceTransactionWorkerStore(
             service: The application service which was sent this transaction.
         """
 
-        def _complete_appservice_txn(txn):
+        def _complete_appservice_txn(txn: LoggingTransaction) -> None:
             # Set current txn_id for AS to 'txn_id'
             self.db_pool.simple_upsert_txn(
                 txn,
@@ -322,7 +326,9 @@ class ApplicationServiceTransactionWorkerStore(
             An AppServiceTransaction or None.
         """
 
-        def _get_oldest_unsent_txn(txn):
+        def _get_oldest_unsent_txn(
+            txn: LoggingTransaction,
+        ) -> Optional[Dict[str, Any]]:
             # Monotonically increasing txn ids, so just select the smallest
             # one in the txns table (we delete them when they are sent)
             txn.execute(
@@ -364,7 +370,7 @@ class ApplicationServiceTransactionWorkerStore(
         )
 
     async def set_appservice_last_pos(self, pos: int) -> None:
-        def set_appservice_last_pos_txn(txn):
+        def set_appservice_last_pos_txn(txn: LoggingTransaction) -> None:
             txn.execute(
                 "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
             )
@@ -378,7 +384,9 @@ class ApplicationServiceTransactionWorkerStore(
     ) -> Tuple[int, List[EventBase]]:
         """Get all new events for an appservice"""
 
-        def get_new_events_for_appservice_txn(txn):
+        def get_new_events_for_appservice_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[int, List[str]]:
             sql = (
                 "SELECT e.stream_ordering, e.event_id"
                 " FROM events AS e"
@@ -416,7 +424,7 @@ class ApplicationServiceTransactionWorkerStore(
                 % (type,)
             )
 
-        def get_type_stream_id_for_appservice_txn(txn):
+        def get_type_stream_id_for_appservice_txn(txn: LoggingTransaction) -> int:
             stream_id_type = "%s_stream_id" % type
             txn.execute(
                 # We do NOT want to escape `stream_id_type`.
@@ -444,7 +452,7 @@ class ApplicationServiceTransactionWorkerStore(
                 % (stream_type,)
             )
 
-        def set_appservice_stream_type_pos_txn(txn):
+        def set_appservice_stream_type_pos_txn(txn: LoggingTransaction) -> None:
             stream_id_type = "%s_stream_id" % stream_type
             txn.execute(
                 "UPDATE application_services_state SET %s = ? WHERE as_id=?"