summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/12209.misc1
-rw-r--r--docs/upgrade.md13
-rw-r--r--synapse/storage/databases/main/appservice.py62
-rw-r--r--synapse/storage/schema/__init__.py5
-rw-r--r--synapse/storage/schema/main/delta/69/01as_txn_seq.py44
-rw-r--r--tests/storage/test_appservice.py71
6 files changed, 83 insertions, 113 deletions
diff --git a/changelog.d/12209.misc b/changelog.d/12209.misc
new file mode 100644
index 0000000000..d145b5eb04
--- /dev/null
+++ b/changelog.d/12209.misc
@@ -0,0 +1 @@
+Switch to using a sequence to generate AS transaction IDs. Contributed by Nick Beeper. If running synapse with a dedicated appservice worker, this MUST be stopped before upgrading the main process and database.
diff --git a/docs/upgrade.md b/docs/upgrade.md
index 062e823333..f6d226526a 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -85,6 +85,19 @@ process, for example:
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     ```
 
+# Upgrading to v1.57.0
+
+## Changes to database schema for application services
+
+Synapse v1.57.0 includes a [change](https://github.com/matrix-org/synapse/pull/12209) to the
+way transaction IDs are managed for application services. If your deployment uses a dedicated
+worker for application service traffic, **it must be stopped** when the database is upgraded
+(which normally happens when the main process is upgraded), to ensure the change is made safely
+without any risk of reusing transaction IDs.
+
+Deployments which do not use separate worker processes can be upgraded as normal. Similarly,
+deployments where no applciation services are in use can be upgraded as normal.
+
 # Upgrading to v1.56.0
 
 ## Groups/communities feature has been deprecated
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 55e1ab099d..eb32c34a85 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -29,6 +29,8 @@ from synapse.storage._base import db_to_json
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
+from synapse.storage.types import Cursor
+from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import DeviceListUpdates, JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import _CacheContext, cached
@@ -72,6 +74,22 @@ class ApplicationServiceWorkerStore(RoomMemberWorkerStore):
         )
         self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
 
+        def get_max_as_txn_id(txn: Cursor) -> int:
+            logger.warning("Falling back to slow query, you should port to postgres")
+            txn.execute(
+                "SELECT COALESCE(max(txn_id), 0) FROM application_services_txns"
+            )
+            return txn.fetchone()[0]  # type: ignore
+
+        self._as_txn_seq_gen = build_sequence_generator(
+            db_conn,
+            database.engine,
+            get_max_as_txn_id,
+            "application_services_txn_id_seq",
+            table="application_services_txns",
+            id_column="txn_id",
+        )
+
         super().__init__(database, db_conn, hs)
 
     def get_app_services(self):
@@ -239,21 +257,7 @@ class ApplicationServiceTransactionWorkerStore(
         """
 
         def _create_appservice_txn(txn):
-            # work out new txn id (highest txn id for this service += 1)
-            # The highest id may be the last one sent (in which case it is last_txn)
-            # or it may be the highest in the txns list (which are waiting to be/are
-            # being sent)
-            last_txn_id = self._get_last_txn(txn, service.id)
-
-            txn.execute(
-                "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
-                (service.id,),
-            )
-            highest_txn_id = txn.fetchone()[0]
-            if highest_txn_id is None:
-                highest_txn_id = 0
-
-            new_txn_id = max(highest_txn_id, last_txn_id) + 1
+            new_txn_id = self._as_txn_seq_gen.get_next_id_txn(txn)
 
             # Insert new txn into txn table
             event_ids = json_encoder.encode([e.event_id for e in events])
@@ -286,25 +290,8 @@ class ApplicationServiceTransactionWorkerStore(
             txn_id: The transaction ID being completed.
             service: The application service which was sent this transaction.
         """
-        txn_id = int(txn_id)
 
         def _complete_appservice_txn(txn):
-            # Debugging query: Make sure the txn being completed is EXACTLY +1 from
-            # what was there before. If it isn't, we've got problems (e.g. the AS
-            # has probably missed some events), so whine loudly but still continue,
-            # since it shouldn't fail completion of the transaction.
-            last_txn_id = self._get_last_txn(txn, service.id)
-            if (last_txn_id + 1) != txn_id:
-                logger.error(
-                    "appservice: Completing a transaction which has an ID > 1 from "
-                    "the last ID sent to this AS. We've either dropped events or "
-                    "sent it to the AS out of order. FIX ME. last_txn=%s "
-                    "completing_txn=%s service_id=%s",
-                    last_txn_id,
-                    txn_id,
-                    service.id,
-                )
-
             # Set current txn_id for AS to 'txn_id'
             self.db_pool.simple_upsert_txn(
                 txn,
@@ -376,17 +363,6 @@ class ApplicationServiceTransactionWorkerStore(
             device_list_summary=DeviceListUpdates(),
         )
 
-    def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
-        txn.execute(
-            "SELECT last_txn FROM application_services_state WHERE as_id=?",
-            (service_id,),
-        )
-        last_txn_id = txn.fetchone()
-        if last_txn_id is None or last_txn_id[0] is None:  # no row exists
-            return 0
-        else:
-            return int(last_txn_id[0])  # select 'last_txn' col
-
     async def set_appservice_last_pos(self, pos: int) -> None:
         def set_appservice_last_pos_txn(txn):
             txn.execute(
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 7b21c1b96d..ea900e0f3d 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 68  # remember to update the list below when updating
+SCHEMA_VERSION = 69  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -58,6 +58,9 @@ Changes in SCHEMA_VERSION = 68:
     - event_reference_hashes is no longer read.
     - `events` has `state_key` and `rejection_reason` columns, which are populated for
       new events.
+
+Changes in SCHEMA_VERSION = 69:
+    - Use sequence to generate future `application_services_txns.txn_id`s
 """
 
 
diff --git a/synapse/storage/schema/main/delta/69/01as_txn_seq.py b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
new file mode 100644
index 0000000000..24bd4b391e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
@@ -0,0 +1,44 @@
+# Copyright 2022 Beeper
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+Adds a postgres SEQUENCE for generating application service transaction IDs.
+"""
+
+from synapse.storage.engines import PostgresEngine
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        # If we already have some AS TXNs we want to start from the current
+        # maximum value. There are two potential places this is stored - the
+        # actual TXNs themselves *and* the AS state table. At time of migration
+        # it is possible the TXNs table is empty so we must include the AS state
+        # last_txn as a potential option, and pick the maximum.
+
+        cur.execute("SELECT COALESCE(max(txn_id), 0) FROM application_services_txns")
+        row = cur.fetchone()
+        txn_max = row[0]
+
+        cur.execute("SELECT COALESCE(max(last_txn), 0) FROM application_services_state")
+        row = cur.fetchone()
+        last_txn_max = row[0]
+
+        start_val = max(last_txn_max, txn_max) + 1
+
+        cur.execute(
+            "CREATE SEQUENCE application_services_txn_id_seq START WITH %s",
+            (start_val,),
+        )
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index 97de9a59e0..08078d38e2 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -169,15 +169,6 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
             (as_id, txn_id, json.dumps([e.event_id for e in events])),
         )
 
-    def _set_last_txn(self, as_id, txn_id):
-        return self.db_pool.runOperation(
-            self.engine.convert_param_style(
-                "INSERT INTO application_services_state(as_id, last_txn, state) "
-                "VALUES(?,?,?)"
-            ),
-            (as_id, txn_id, ApplicationServiceState.UP.value),
-        )
-
     def test_get_appservice_state_none(
         self,
     ) -> None:
@@ -277,64 +268,6 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
         self.assertEqual(txn.events, events)
         self.assertEqual(txn.service, service)
 
-    def test_create_appservice_txn_older_last_txn(
-        self,
-    ) -> None:
-        service = Mock(id=self.as_list[0]["id"])
-        events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
-        self.get_success(self._set_last_txn(service.id, 9643))  # AS is falling behind
-        self.get_success(self._insert_txn(service.id, 9644, events))
-        self.get_success(self._insert_txn(service.id, 9645, events))
-        txn = self.get_success(
-            self.store.create_appservice_txn(
-                service, events, [], [], {}, {}, DeviceListUpdates()
-            )
-        )
-        self.assertEqual(txn.id, 9646)
-        self.assertEqual(txn.events, events)
-        self.assertEqual(txn.service, service)
-
-    def test_create_appservice_txn_up_to_date_last_txn(
-        self,
-    ) -> None:
-        service = Mock(id=self.as_list[0]["id"])
-        events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
-        self.get_success(self._set_last_txn(service.id, 9643))
-        txn = self.get_success(
-            self.store.create_appservice_txn(
-                service, events, [], [], {}, {}, DeviceListUpdates()
-            )
-        )
-        self.assertEqual(txn.id, 9644)
-        self.assertEqual(txn.events, events)
-        self.assertEqual(txn.service, service)
-
-    def test_create_appservice_txn_up_fuzzing(
-        self,
-    ) -> None:
-        service = Mock(id=self.as_list[0]["id"])
-        events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
-        self.get_success(self._set_last_txn(service.id, 9643))
-
-        # dump in rows with higher IDs to make sure the queries aren't wrong.
-        self.get_success(self._set_last_txn(self.as_list[1]["id"], 119643))
-        self.get_success(self._set_last_txn(self.as_list[2]["id"], 9))
-        self.get_success(self._set_last_txn(self.as_list[3]["id"], 9643))
-        self.get_success(self._insert_txn(self.as_list[1]["id"], 119644, events))
-        self.get_success(self._insert_txn(self.as_list[1]["id"], 119645, events))
-        self.get_success(self._insert_txn(self.as_list[1]["id"], 119646, events))
-        self.get_success(self._insert_txn(self.as_list[2]["id"], 10, events))
-        self.get_success(self._insert_txn(self.as_list[3]["id"], 9643, events))
-
-        txn = self.get_success(
-            self.store.create_appservice_txn(
-                service, events, [], [], {}, {}, DeviceListUpdates()
-            )
-        )
-        self.assertEqual(txn.id, 9644)
-        self.assertEqual(txn.events, events)
-        self.assertEqual(txn.service, service)
-
     def test_complete_appservice_txn_first_txn(
         self,
     ) -> None:
@@ -368,13 +301,13 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
         )
         self.assertEqual(0, len(res))
 
-    def test_complete_appservice_txn_existing_in_state_table(
+    def test_complete_appservice_txn_updates_last_txn_state(
         self,
     ) -> None:
         service = Mock(id=self.as_list[0]["id"])
         events = [Mock(event_id="e1"), Mock(event_id="e2")]
         txn_id = 5
-        self.get_success(self._set_last_txn(service.id, 4))
+        self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP)
         self.get_success(self._insert_txn(service.id, txn_id, events))
         self.get_success(
             self.store.complete_appservice_txn(txn_id=txn_id, service=service)