summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/appservice.py62
-rw-r--r--synapse/storage/schema/__init__.py5
-rw-r--r--synapse/storage/schema/main/delta/69/01as_txn_seq.py44
3 files changed, 67 insertions, 44 deletions
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 55e1ab099d..eb32c34a85 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -29,6 +29,8 @@ from synapse.storage._base import db_to_json
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
+from synapse.storage.types import Cursor
+from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import DeviceListUpdates, JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import _CacheContext, cached
@@ -72,6 +74,22 @@ class ApplicationServiceWorkerStore(RoomMemberWorkerStore):
         )
         self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
 
+        def get_max_as_txn_id(txn: Cursor) -> int:
+            logger.warning("Falling back to slow query, you should port to postgres")
+            txn.execute(
+                "SELECT COALESCE(max(txn_id), 0) FROM application_services_txns"
+            )
+            return txn.fetchone()[0]  # type: ignore
+
+        self._as_txn_seq_gen = build_sequence_generator(
+            db_conn,
+            database.engine,
+            get_max_as_txn_id,
+            "application_services_txn_id_seq",
+            table="application_services_txns",
+            id_column="txn_id",
+        )
+
         super().__init__(database, db_conn, hs)
 
     def get_app_services(self):
@@ -239,21 +257,7 @@ class ApplicationServiceTransactionWorkerStore(
         """
 
         def _create_appservice_txn(txn):
-            # work out new txn id (highest txn id for this service += 1)
-            # The highest id may be the last one sent (in which case it is last_txn)
-            # or it may be the highest in the txns list (which are waiting to be/are
-            # being sent)
-            last_txn_id = self._get_last_txn(txn, service.id)
-
-            txn.execute(
-                "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
-                (service.id,),
-            )
-            highest_txn_id = txn.fetchone()[0]
-            if highest_txn_id is None:
-                highest_txn_id = 0
-
-            new_txn_id = max(highest_txn_id, last_txn_id) + 1
+            new_txn_id = self._as_txn_seq_gen.get_next_id_txn(txn)
 
             # Insert new txn into txn table
             event_ids = json_encoder.encode([e.event_id for e in events])
@@ -286,25 +290,8 @@ class ApplicationServiceTransactionWorkerStore(
             txn_id: The transaction ID being completed.
             service: The application service which was sent this transaction.
         """
-        txn_id = int(txn_id)
 
         def _complete_appservice_txn(txn):
-            # Debugging query: Make sure the txn being completed is EXACTLY +1 from
-            # what was there before. If it isn't, we've got problems (e.g. the AS
-            # has probably missed some events), so whine loudly but still continue,
-            # since it shouldn't fail completion of the transaction.
-            last_txn_id = self._get_last_txn(txn, service.id)
-            if (last_txn_id + 1) != txn_id:
-                logger.error(
-                    "appservice: Completing a transaction which has an ID > 1 from "
-                    "the last ID sent to this AS. We've either dropped events or "
-                    "sent it to the AS out of order. FIX ME. last_txn=%s "
-                    "completing_txn=%s service_id=%s",
-                    last_txn_id,
-                    txn_id,
-                    service.id,
-                )
-
             # Set current txn_id for AS to 'txn_id'
             self.db_pool.simple_upsert_txn(
                 txn,
@@ -376,17 +363,6 @@ class ApplicationServiceTransactionWorkerStore(
             device_list_summary=DeviceListUpdates(),
         )
 
-    def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
-        txn.execute(
-            "SELECT last_txn FROM application_services_state WHERE as_id=?",
-            (service_id,),
-        )
-        last_txn_id = txn.fetchone()
-        if last_txn_id is None or last_txn_id[0] is None:  # no row exists
-            return 0
-        else:
-            return int(last_txn_id[0])  # select 'last_txn' col
-
     async def set_appservice_last_pos(self, pos: int) -> None:
         def set_appservice_last_pos_txn(txn):
             txn.execute(
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 7b21c1b96d..ea900e0f3d 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 68  # remember to update the list below when updating
+SCHEMA_VERSION = 69  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -58,6 +58,9 @@ Changes in SCHEMA_VERSION = 68:
     - event_reference_hashes is no longer read.
     - `events` has `state_key` and `rejection_reason` columns, which are populated for
       new events.
+
+Changes in SCHEMA_VERSION = 69:
+    - Use sequence to generate future `application_services_txns.txn_id`s
 """
 
 
diff --git a/synapse/storage/schema/main/delta/69/01as_txn_seq.py b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
new file mode 100644
index 0000000000..24bd4b391e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
@@ -0,0 +1,44 @@
+# Copyright 2022 Beeper
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+Adds a postgres SEQUENCE for generating application service transaction IDs.
+"""
+
+from synapse.storage.engines import PostgresEngine
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        # If we already have some AS TXNs we want to start from the current
+        # maximum value. There are two potential places this is stored - the
+        # actual TXNs themselves *and* the AS state table. At time of migration
+        # it is possible the TXNs table is empty so we must include the AS state
+        # last_txn as a potential option, and pick the maximum.
+
+        cur.execute("SELECT COALESCE(max(txn_id), 0) FROM application_services_txns")
+        row = cur.fetchone()
+        txn_max = row[0]
+
+        cur.execute("SELECT COALESCE(max(last_txn), 0) FROM application_services_state")
+        row = cur.fetchone()
+        last_txn_max = row[0]
+
+        start_val = max(last_txn_max, txn_max) + 1
+
+        cur.execute(
+            "CREATE SEQUENCE application_services_txn_id_seq START WITH %s",
+            (start_val,),
+        )