summary refs log tree commit diff
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2023-02-07 15:26:55 +0000
committerDavid Robertson <davidr@element.io>2023-02-07 15:26:55 +0000
commit9cd7610f86ab5051c9365dd38d1eec405a5f8ca6 (patch)
treeb29e58313a8dd1894f1dd88d856fb55e62ed5e78
parent1.77.0rc1 (diff)
downloadsynapse-9cd7610f86ab5051c9365dd38d1eec405a5f8ca6.tar.xz
Revert "Add `event_stream_ordering` column to membership state tables (#14979)"
This reverts commit 5fdc12f482c68e2cdbb78d7db5de2cfe621720d4.
-rw-r--r--synapse/storage/databases/main/events.py23
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py104
-rw-r--r--synapse/storage/databases/main/events_worker.py8
-rw-r--r--synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql21
4 files changed, 11 insertions, 145 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b6cce0a7cc..1536937b67 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1147,15 +1147,11 @@ class PersistEventsStore:
                 # been inserted into room_memberships.
                 txn.execute_batch(
                     """INSERT INTO current_state_events
-                        (room_id, type, state_key, event_id, membership, event_stream_ordering)
-                    VALUES (
-                        ?, ?, ?, ?,
-                        (SELECT membership FROM room_memberships WHERE event_id = ?),
-                        (SELECT stream_ordering FROM events WHERE event_id = ?)
-                    )
+                        (room_id, type, state_key, event_id, membership)
+                    VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
                     """,
                     [
-                        (room_id, key[0], key[1], ev_id, ev_id, ev_id)
+                        (room_id, key[0], key[1], ev_id, ev_id)
                         for key, ev_id in to_insert.items()
                     ],
                 )
@@ -1182,15 +1178,11 @@ class PersistEventsStore:
             if to_insert:
                 txn.execute_batch(
                     """INSERT INTO local_current_membership
-                        (room_id, user_id, event_id, membership, event_stream_ordering)
-                    VALUES (
-                        ?, ?, ?,
-                        (SELECT membership FROM room_memberships WHERE event_id = ?),
-                        (SELECT stream_ordering FROM events WHERE event_id = ?)
-                    )
+                        (room_id, user_id, event_id, membership)
+                    VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
                     """,
                     [
-                        (room_id, key[1], ev_id, ev_id, ev_id)
+                        (room_id, key[1], ev_id, ev_id)
                         for key, ev_id in to_insert.items()
                         if key[0] == EventTypes.Member and self.is_mine_id(key[1])
                     ],
@@ -1798,7 +1790,6 @@ class PersistEventsStore:
             table="room_memberships",
             keys=(
                 "event_id",
-                "event_stream_ordering",
                 "user_id",
                 "sender",
                 "room_id",
@@ -1809,7 +1800,6 @@ class PersistEventsStore:
             values=[
                 (
                     event.event_id,
-                    event.internal_metadata.stream_ordering,
                     event.state_key,
                     event.user_id,
                     event.room_id,
@@ -1842,7 +1832,6 @@ class PersistEventsStore:
                     keyvalues={"room_id": event.room_id, "user_id": event.state_key},
                     values={
                         "event_id": event.event_id,
-                        "event_stream_ordering": event.internal_metadata.stream_ordering,
                         "membership": event.membership,
                     },
                 )
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 0e81d38cca..b9d3c36d60 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, Tuple, ca
 
 import attr
 
-from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.constants import EventContentFields, RelationTypes
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import make_event_from_dict
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -71,10 +71,6 @@ class _BackgroundUpdates:
 
     EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
 
-    POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = (
-        "populate_membership_event_stream_ordering"
-    )
-
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _CalculateChainCover:
@@ -104,10 +100,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         super().__init__(database, db_conn, hs)
 
         self.db_pool.updates.register_background_update_handler(
-            _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
-            self._populate_membership_event_stream_ordering,
-        )
-        self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
             self._background_reindex_origin_server_ts,
         )
@@ -1506,97 +1498,3 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             )
 
         return batch_size
-
-    async def _populate_membership_event_stream_ordering(
-        self, progress: JsonDict, batch_size: int
-    ) -> int:
-        def _populate_membership_event_stream_ordering(
-            txn: LoggingTransaction,
-        ) -> bool:
-
-            if "max_stream_ordering" in progress:
-                max_stream_ordering = progress["max_stream_ordering"]
-            else:
-                txn.execute("SELECT max(stream_ordering) FROM events")
-                res = txn.fetchone()
-                if res is None or res[0] is None:
-                    return True
-                else:
-                    max_stream_ordering = res[0]
-
-            start = progress.get("stream_ordering", 0)
-            stop = start + batch_size
-
-            sql = f"""
-                SELECT room_id, event_id, stream_ordering
-                FROM events
-                WHERE
-                    type = '{EventTypes.Member}'
-                    AND stream_ordering >= ?
-                    AND stream_ordering < ?
-            """
-            txn.execute(sql, (start, stop))
-
-            rows: List[Tuple[str, str, int]] = cast(
-                List[Tuple[str, str, int]], txn.fetchall()
-            )
-
-            event_ids: List[Tuple[str]] = []
-            event_stream_orderings: List[Tuple[int]] = []
-
-            for _, event_id, event_stream_ordering in rows:
-                event_ids.append((event_id,))
-                event_stream_orderings.append((event_stream_ordering,))
-
-            self.db_pool.simple_update_many_txn(
-                txn,
-                table="current_state_events",
-                key_names=("event_id",),
-                key_values=event_ids,
-                value_names=("event_stream_ordering",),
-                value_values=event_stream_orderings,
-            )
-
-            self.db_pool.simple_update_many_txn(
-                txn,
-                table="room_memberships",
-                key_names=("event_id",),
-                key_values=event_ids,
-                value_names=("event_stream_ordering",),
-                value_values=event_stream_orderings,
-            )
-
-            # NOTE: local_current_membership has no index on event_id, so only
-            # the room ID here will reduce the query rows read.
-            for room_id, event_id, event_stream_ordering in rows:
-                txn.execute(
-                    """
-                        UPDATE local_current_membership
-                        SET event_stream_ordering = ?
-                        WHERE room_id = ? AND event_id = ?
-                    """,
-                    (event_stream_ordering, room_id, event_id),
-                )
-
-            self.db_pool.updates._background_update_progress_txn(
-                txn,
-                _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
-                {
-                    "stream_ordering": stop,
-                    "max_stream_ordering": max_stream_ordering,
-                },
-            )
-
-            return stop > max_stream_ordering
-
-        finished = await self.db_pool.runInteraction(
-            "_populate_membership_event_stream_ordering",
-            _populate_membership_event_stream_ordering,
-        )
-
-        if finished:
-            await self.db_pool.updates._end_background_update(
-                _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING
-            )
-
-        return batch_size
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 6d0ef10258..d7d08369ca 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1779,7 +1779,7 @@ class EventsWorkerStore(SQLBaseStore):
             txn: LoggingTransaction,
         ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
             sql = (
-                "SELECT out.event_stream_ordering, e.event_id, e.room_id, e.type,"
+                "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
                 " e.outlier"
                 " FROM events AS e"
@@ -1791,10 +1791,10 @@ class EventsWorkerStore(SQLBaseStore):
                 " LEFT JOIN event_relations USING (event_id)"
                 " LEFT JOIN room_memberships USING (event_id)"
                 " LEFT JOIN rejections USING (event_id)"
-                " WHERE ? < out.event_stream_ordering"
-                " AND out.event_stream_ordering <= ?"
+                " WHERE ? < event_stream_ordering"
+                " AND event_stream_ordering <= ?"
                 " AND out.instance_name = ?"
-                " ORDER BY out.event_stream_ordering ASC"
+                " ORDER BY event_stream_ordering ASC"
             )
 
             txn.execute(sql, (last_id, current_id, instance_name))
diff --git a/synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql
deleted file mode 100644
index 7c30a67fc4..0000000000
--- a/synapse/storage/schema/main/delta/73/26membership_tables_event_stream_ordering.sql
+++ /dev/null
@@ -1,21 +0,0 @@
-/* Copyright 2022 Beeper
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
-ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
-ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;
-
-INSERT INTO background_updates (update_name, progress_json) VALUES
-  ('populate_membership_event_stream_ordering', '{}');