summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/receipts.py74
-rw-r--r--synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql19
2 files changed, 92 insertions, 1 deletions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3838409519..719a12b0ae 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -675,6 +675,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             values={
                 "stream_id": stream_id,
                 "event_id": event_id,
+                "event_stream_ordering": stream_ordering,
                 "data": json_encoder.encode(data),
             },
             # receipts_linearized has a unique constraint on
@@ -830,5 +831,76 @@ class ReceiptsWorkerStore(SQLBaseStore):
         )
 
 
-class ReceiptsStore(ReceiptsWorkerStore):
+class ReceiptsBackgroundUpdateStore(SQLBaseStore):
+    POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+            self._populate_receipt_event_stream_ordering,
+        )
+
+    async def _populate_receipt_event_stream_ordering(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        def _populate_receipt_event_stream_ordering_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+
+            if "max_stream_id" in progress:
+                max_stream_id = progress["max_stream_id"]
+            else:
+                txn.execute("SELECT max(stream_id) FROM receipts_linearized")
+                res = txn.fetchone()
+                if res is None or res[0] is None:
+                    return True
+                else:
+                    max_stream_id = res[0]
+
+            start = progress.get("stream_id", 0)
+            stop = start + batch_size
+
+            sql = """
+                UPDATE receipts_linearized
+                SET event_stream_ordering = (
+                    SELECT stream_ordering
+                    FROM events
+                    WHERE event_id = receipts_linearized.event_id
+                )
+                WHERE stream_id >= ? AND stream_id < ?
+            """
+            txn.execute(sql, (start, stop))
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+                {
+                    "stream_id": stop,
+                    "max_stream_id": max_stream_id,
+                },
+            )
+
+            return stop > max_stream_id
+
+        finished = await self.db_pool.runInteraction(
+            "_remove_devices_from_device_inbox_txn",
+            _populate_receipt_event_stream_ordering_txn,
+        )
+
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
+            )
+
+        return batch_size
+
+
+class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
     pass
diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
new file mode 100644
index 0000000000..2a822f4509
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
@@ -0,0 +1,19 @@
+/* Copyright 2022 Beeper
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('populate_event_stream_ordering', '{}');