summary refs log tree commit diff
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@beeper.com>2022-09-13 08:16:37 +0100
committerGitHub <noreply@github.com>2022-09-13 08:16:37 +0100
commitcdbb6412327b542e0dead792717fe58253291131 (patch)
treef683529f5175445e87b7a04b677bd6ff1a224dac
parentFix GHA skippable syntax (#13778) (diff)
downloadsynapse-cdbb6412327b542e0dead792717fe58253291131.tar.xz
Add receipts event stream ordering (#13703)
-rw-r--r--changelog.d/13703.misc1
-rwxr-xr-xsynapse/_scripts/synapse_port_db.py2
-rw-r--r--synapse/storage/databases/main/receipts.py74
-rw-r--r--synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql19
4 files changed, 95 insertions, 1 deletions
diff --git a/changelog.d/13703.misc b/changelog.d/13703.misc
new file mode 100644
index 0000000000..685a29b17d
--- /dev/null
+++ b/changelog.d/13703.misc
@@ -0,0 +1 @@
+Add & populate `event_stream_ordering` column on receipts table for future optimisation of push action processing. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 543bba27c2..30983c47fb 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -67,6 +67,7 @@ from synapse.storage.databases.main.media_repository import (
 )
 from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
 from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
     find_max_generated_user_id_localpart,
@@ -203,6 +204,7 @@ class Store(
     PushRuleStore,
     PusherWorkerStore,
     PresenceBackgroundUpdateStore,
+    ReceiptsBackgroundUpdateStore,
 ):
     def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3838409519..719a12b0ae 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -675,6 +675,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             values={
                 "stream_id": stream_id,
                 "event_id": event_id,
+                "event_stream_ordering": stream_ordering,
                 "data": json_encoder.encode(data),
             },
             # receipts_linearized has a unique constraint on
@@ -830,5 +831,76 @@ class ReceiptsWorkerStore(SQLBaseStore):
         )
 
 
-class ReceiptsStore(ReceiptsWorkerStore):
+class ReceiptsBackgroundUpdateStore(SQLBaseStore):
+    POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+            self._populate_receipt_event_stream_ordering,
+        )
+
+    async def _populate_receipt_event_stream_ordering(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        def _populate_receipt_event_stream_ordering_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+
+            if "max_stream_id" in progress:
+                max_stream_id = progress["max_stream_id"]
+            else:
+                txn.execute("SELECT max(stream_id) FROM receipts_linearized")
+                res = txn.fetchone()
+                if res is None or res[0] is None:
+                    return True
+                else:
+                    max_stream_id = res[0]
+
+            start = progress.get("stream_id", 0)
+            stop = start + batch_size
+
+            sql = """
+                UPDATE receipts_linearized
+                SET event_stream_ordering = (
+                    SELECT stream_ordering
+                    FROM events
+                    WHERE event_id = receipts_linearized.event_id
+                )
+                WHERE stream_id >= ? AND stream_id < ?
+            """
+            txn.execute(sql, (start, stop))
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+                {
+                    "stream_id": stop,
+                    "max_stream_id": max_stream_id,
+                },
+            )
+
+            return stop > max_stream_id
+
+        finished = await self.db_pool.runInteraction(
+            "_remove_devices_from_device_inbox_txn",
+            _populate_receipt_event_stream_ordering_txn,
+        )
+
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
+            )
+
+        return batch_size
+
+
+class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
     pass
diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
new file mode 100644
index 0000000000..2a822f4509
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
@@ -0,0 +1,19 @@
+/* Copyright 2022 Beeper
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('populate_event_stream_ordering', '{}');