summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
author: Eric Eastwood <erice@element.io>, 2022-09-13 16:26:53 -0500
committer: Eric Eastwood <erice@element.io>, 2022-09-13 16:26:53 -0500
commit: 19c6f6ecc90ebd36d764e51f8e859247721b2af5 (patch)
tree: c35cdfafebf773cf8c283743f1d04a91f837a9df /synapse
parent: Remove linting from CI for now (diff)
parent: Make sequence `cache_invalidation_stream_seq` begin at `2` (#13766) (diff)
download: synapse-19c6f6ecc90ebd36d764e51f8e859247721b2af5.tar.xz
Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	synapse/storage/schema/__init__.py
Diffstat (limited to 'synapse')
-rwxr-xr-x synapse/_scripts/synapse_port_db.py 2
-rw-r--r-- synapse/storage/databases/main/receipts.py 74
-rw-r--r-- synapse/storage/schema/__init__.py 1
-rw-r--r-- synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql 19
-rw-r--r-- synapse/storage/schema/main/delta/72/08begin_cache_invalidation_seq_at_2.sql.postgres 23
-rw-r--r-- synapse/storage/schema/state/delta/30/state_stream.sql 4
6 files changed, 122 insertions, 1 deletions
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 543bba27c2..30983c47fb 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -67,6 +67,7 @@ from synapse.storage.databases.main.media_repository import (
 )
 from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
 from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
     find_max_generated_user_id_localpart,
@@ -203,6 +204,7 @@ class Store(
     PushRuleStore,
     PusherWorkerStore,
     PresenceBackgroundUpdateStore,
+    ReceiptsBackgroundUpdateStore,
 ):
     def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3838409519..719a12b0ae 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -675,6 +675,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             values={
                 "stream_id": stream_id,
                 "event_id": event_id,
+                "event_stream_ordering": stream_ordering,
                 "data": json_encoder.encode(data),
             },
             # receipts_linearized has a unique constraint on
@@ -830,5 +831,76 @@ class ReceiptsWorkerStore(SQLBaseStore):
         )
 
 
-class ReceiptsStore(ReceiptsWorkerStore):
+class ReceiptsBackgroundUpdateStore(SQLBaseStore):
+    """Backfills `receipts_linearized.event_stream_ordering` for existing receipts."""
+
+    POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+            self._populate_receipt_event_stream_ordering,
+        )
+
+    async def _populate_receipt_event_stream_ordering(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Fill in one `batch_size`-wide range of `stream_id`s, resuming from
+        `progress["stream_id"]`; ends the update once `max_stream_id` is passed.
+        """
+        def _populate_receipt_event_stream_ordering_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+            """Update one batch; return True when no further batches are needed."""
+            if "max_stream_id" in progress:
+                max_stream_id = progress["max_stream_id"]
+            else:
+                txn.execute("SELECT max(stream_id) FROM receipts_linearized")
+                res = txn.fetchone()
+                if res is None or res[0] is None:
+                    return True  # no max stream_id to work towards: nothing to do
+                max_stream_id = res[0]
+
+            start = progress.get("stream_id", 0)
+            stop = start + batch_size
+            sql = """
+                UPDATE receipts_linearized
+                SET event_stream_ordering = (
+                    SELECT stream_ordering
+                    FROM events
+                    WHERE event_id = receipts_linearized.event_id
+                )
+                WHERE stream_id >= ? AND stream_id < ?
+            """
+            txn.execute(sql, (start, stop))
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+                {
+                    "stream_id": stop,
+                    "max_stream_id": max_stream_id,
+                },
+            )
+            return stop > max_stream_id
+
+        finished = await self.db_pool.runInteraction(
+            "_populate_receipt_event_stream_ordering_txn",
+            _populate_receipt_event_stream_ordering_txn,
+        )
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
+            )
+        return batch_size
+
+
+class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
     pass
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index b0458643e6..2a85db2e60 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -76,6 +76,7 @@ Changes in SCHEMA_VERSION = 72:
     - event_edges.(room_id, is_state) are no longer written to.
     - Tables related to groups are dropped.
     - Unused column application_services_state.last_txn is dropped
+    - Cache invalidation stream id sequence now begins at 2 to match the code's expectations.
     - Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room`
       from `opentracing_context` to generalized `tracing_context`.
 """
diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
new file mode 100644
index 0000000000..2a822f4509
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
@@ -0,0 +1,19 @@
+/* Copyright 2022 Beeper
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Nullable on purpose: existing rows stay NULL until the background update fills them.
+ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT;
+-- Queue the backfill; update_name must match the handler name used in receipts.py.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('populate_event_stream_ordering', '{}');
diff --git a/synapse/storage/schema/main/delta/72/08begin_cache_invalidation_seq_at_2.sql.postgres b/synapse/storage/schema/main/delta/72/08begin_cache_invalidation_seq_at_2.sql.postgres
new file mode 100644
index 0000000000..69931fe971
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/08begin_cache_invalidation_seq_at_2.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+
+-- The sequence needs to begin at 2 because a bunch of code assumes that
+-- get_next_id_txn will return values >= 2, cf this comment:
+-- https://github.com/matrix-org/synapse/blob/b93bd95e8ab64d27ae26841020f62ee61272a5f2/synapse/storage/util/id_generators.py#L344
+
+SELECT setval('cache_invalidation_stream_seq', (
+    SELECT COALESCE(MAX(last_value), 1) FROM cache_invalidation_stream_seq
+));
diff --git a/synapse/storage/schema/state/delta/30/state_stream.sql b/synapse/storage/schema/state/delta/30/state_stream.sql
index e85699e82e..bdaf8b02d5 100644
--- a/synapse/storage/schema/state/delta/30/state_stream.sql
+++ b/synapse/storage/schema/state/delta/30/state_stream.sql
@@ -26,6 +26,10 @@
  * (event, state) pair, we can use that stream_ordering to identify when
  * the new state was assigned for the event.
  */
+
+/* NB: This table belongs to the `main` logical database; it should not be present
+ * in `state`.
+ */
 CREATE TABLE IF NOT EXISTS ex_outlier_stream(
     event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
     event_id TEXT NOT NULL,