diff --git a/changelog.d/13703.misc b/changelog.d/13703.misc
new file mode 100644
index 0000000000..685a29b17d
--- /dev/null
+++ b/changelog.d/13703.misc
@@ -0,0 +1 @@
+Add & populate `event_stream_ordering` column on receipts table for future optimisation of push action processing. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 543bba27c2..30983c47fb 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -67,6 +67,7 @@ from synapse.storage.databases.main.media_repository import (
)
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
find_max_generated_user_id_localpart,
@@ -203,6 +204,7 @@ class Store(
PushRuleStore,
PusherWorkerStore,
PresenceBackgroundUpdateStore,
+ ReceiptsBackgroundUpdateStore,
):
def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3838409519..719a12b0ae 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -675,6 +675,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
values={
"stream_id": stream_id,
"event_id": event_id,
+ "event_stream_ordering": stream_ordering,
"data": json_encoder.encode(data),
},
# receipts_linearized has a unique constraint on
@@ -830,5 +831,76 @@ class ReceiptsWorkerStore(SQLBaseStore):
)
-class ReceiptsStore(ReceiptsWorkerStore):
+class ReceiptsBackgroundUpdateStore(SQLBaseStore):
+ POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
+
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
+ super().__init__(database, db_conn, hs)
+
+ self.db_pool.updates.register_background_update_handler(
+ self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+ self._populate_receipt_event_stream_ordering,
+ )
+
+ async def _populate_receipt_event_stream_ordering(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ def _populate_receipt_event_stream_ordering_txn(
+ txn: LoggingTransaction,
+ ) -> bool:
+
+ if "max_stream_id" in progress:
+ max_stream_id = progress["max_stream_id"]
+ else:
+ txn.execute("SELECT max(stream_id) FROM receipts_linearized")
+ res = txn.fetchone()
+ if res is None or res[0] is None:
+ return True
+ else:
+ max_stream_id = res[0]
+
+ start = progress.get("stream_id", 0)
+ stop = start + batch_size
+
+ sql = """
+ UPDATE receipts_linearized
+ SET event_stream_ordering = (
+ SELECT stream_ordering
+ FROM events
+ WHERE event_id = receipts_linearized.event_id
+ )
+ WHERE stream_id >= ? AND stream_id < ?
+ """
+ txn.execute(sql, (start, stop))
+
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+ {
+ "stream_id": stop,
+ "max_stream_id": max_stream_id,
+ },
+ )
+
+ return stop > max_stream_id
+
+ finished = await self.db_pool.runInteraction(
+ "_remove_devices_from_device_inbox_txn",
+ _populate_receipt_event_stream_ordering_txn,
+ )
+
+ if finished:
+ await self.db_pool.updates._end_background_update(
+ self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
+ )
+
+ return batch_size
+
+
+class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
pass
diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
new file mode 100644
index 0000000000..2a822f4509
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
@@ -0,0 +1,19 @@
+/* Copyright 2022 Beeper
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('populate_event_stream_ordering', '{}');
|