diff options
author | Nick Mills-Barrett <nick@beeper.com> | 2022-09-13 08:16:37 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-13 08:16:37 +0100 |
commit | cdbb6412327b542e0dead792717fe58253291131 (patch) | |
tree | f683529f5175445e87b7a04b677bd6ff1a224dac /synapse/storage | |
parent | Fix GHA skippable syntax (#13778) (diff) | |
download | synapse-cdbb6412327b542e0dead792717fe58253291131.tar.xz |
Add receipts event stream ordering (#13703)
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/receipts.py | 74 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql | 19 |
2 files changed, 92 insertions, 1 deletions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 3838409519..719a12b0ae 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -675,6 +675,7 @@ class ReceiptsWorkerStore(SQLBaseStore): values={ "stream_id": stream_id, "event_id": event_id, + "event_stream_ordering": stream_ordering, "data": json_encoder.encode(data), }, # receipts_linearized has a unique constraint on @@ -830,5 +831,76 @@ class ReceiptsWorkerStore(SQLBaseStore): ) -class ReceiptsStore(ReceiptsWorkerStore): +class ReceiptsBackgroundUpdateStore(SQLBaseStore): + POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering" + + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_update_handler( + self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING, + self._populate_receipt_event_stream_ordering, + ) + + async def _populate_receipt_event_stream_ordering( + self, progress: JsonDict, batch_size: int + ) -> int: + def _populate_receipt_event_stream_ordering_txn( + txn: LoggingTransaction, + ) -> bool: + + if "max_stream_id" in progress: + max_stream_id = progress["max_stream_id"] + else: + txn.execute("SELECT max(stream_id) FROM receipts_linearized") + res = txn.fetchone() + if res is None or res[0] is None: + return True + else: + max_stream_id = res[0] + + start = progress.get("stream_id", 0) + stop = start + batch_size + + sql = """ + UPDATE receipts_linearized + SET event_stream_ordering = ( + SELECT stream_ordering + FROM events + WHERE event_id = receipts_linearized.event_id + ) + WHERE stream_id >= ? AND stream_id < ? + """ + txn.execute(sql, (start, stop)) + + self.db_pool.updates._background_update_progress_txn( + txn, + self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING, + { + "stream_id": stop, + "max_stream_id": max_stream_id, + }, + ) + + return stop > max_stream_id + + finished = await self.db_pool.runInteraction( + "_remove_devices_from_device_inbox_txn", + _populate_receipt_event_stream_ordering_txn, + ) + + if finished: + await self.db_pool.updates._end_background_update( + self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING + ) + + return batch_size + + +class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore): pass diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql new file mode 100644 index 0000000000..2a822f4509 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql @@ -0,0 +1,19 @@ +/* Copyright 2022 Beeper + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('populate_event_stream_ordering', '{}'); |