Diffstat (limited to 'synapse/storage/databases/main')
 synapse/storage/databases/main/events.py            |  23
 synapse/storage/databases/main/events_bg_updates.py | 104
 synapse/storage/databases/main/events_worker.py     |   8
 3 files changed, 124 insertions(+), 11 deletions(-)
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1536937b67..b6cce0a7cc 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1147,11 +1147,15 @@ class PersistEventsStore:
                 # been inserted into room_memberships.
                 txn.execute_batch(
                     """INSERT INTO current_state_events
-                        (room_id, type, state_key, event_id, membership)
-                    VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+                        (room_id, type, state_key, event_id, membership, event_stream_ordering)
+                    VALUES (
+                        ?, ?, ?, ?,
+                        (SELECT membership FROM room_memberships WHERE event_id = ?),
+                        (SELECT stream_ordering FROM events WHERE event_id = ?)
+                    )
                     """,
                     [
-                        (room_id, key[0], key[1], ev_id, ev_id)
+                        (room_id, key[0], key[1], ev_id, ev_id, ev_id)
                         for key, ev_id in to_insert.items()
                     ],
                 )
@@ -1178,11 +1182,15 @@ class PersistEventsStore:
             if to_insert:
                 txn.execute_batch(
                     """INSERT INTO local_current_membership
-                        (room_id, user_id, event_id, membership)
-                    VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+                        (room_id, user_id, event_id, membership, event_stream_ordering)
+                    VALUES (
+                        ?, ?, ?,
+                        (SELECT membership FROM room_memberships WHERE event_id = ?),
+                        (SELECT stream_ordering FROM events WHERE event_id = ?)
+                    )
                     """,
                     [
-                        (room_id, key[1], ev_id, ev_id)
+                        (room_id, key[1], ev_id, ev_id, ev_id)
                         for key, ev_id in to_insert.items()
                         if key[0] == EventTypes.Member and self.is_mine_id(key[1])
                     ],
@@ -1790,6 +1798,7 @@ class PersistEventsStore:
             table="room_memberships",
             keys=(
                 "event_id",
+                "event_stream_ordering",
                 "user_id",
                 "sender",
                 "room_id",
@@ -1800,6 +1809,7 @@ class PersistEventsStore:
             values=[
                 (
                     event.event_id,
+                    event.internal_metadata.stream_ordering,
                     event.state_key,
                     event.user_id,
                     event.room_id,
@@ -1832,6 +1842,7 @@ class PersistEventsStore:
                     keyvalues={"room_id": event.room_id, "user_id": event.state_key},
                     values={
                         "event_id": event.event_id,
+                        "event_stream_ordering": event.internal_metadata.stream_ordering,
                         "membership": event.membership,
                     },
                 )
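
Note on the events.py hunks above: the write path now fills in event_stream_ordering itself, binding each event ID three times so the batched INSERT can copy membership and stream_ordering from sibling tables via correlated subqueries in a single statement. The sketch below is a self-contained illustration of that pattern using plain sqlite3 and made-up table contents, not Synapse's DatabasePool; executemany stands in for Synapse's execute_batch helper.

    # Hypothetical, minimal schema; names and values are illustrative only.
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE events (event_id TEXT PRIMARY KEY, stream_ordering INTEGER);
        CREATE TABLE room_memberships (event_id TEXT PRIMARY KEY, membership TEXT);
        CREATE TABLE current_state_events (
            room_id TEXT, type TEXT, state_key TEXT,
            event_id TEXT, membership TEXT, event_stream_ordering INTEGER
        );
        INSERT INTO events VALUES ('$ev1', 42);
        INSERT INTO room_memberships VALUES ('$ev1', 'join');
        """
    )
    # Each parameter row binds the same event ID three times: once for the
    # event_id column, once per correlated subquery.
    conn.executemany(
        """
        INSERT INTO current_state_events
            (room_id, type, state_key, event_id, membership, event_stream_ordering)
        VALUES (
            ?, ?, ?, ?,
            (SELECT membership FROM room_memberships WHERE event_id = ?),
            (SELECT stream_ordering FROM events WHERE event_id = ?)
        )
        """,
        [("!room:example", "m.room.member", "@user:example", "$ev1", "$ev1", "$ev1")],
    )
    print(conn.execute(
        "SELECT membership, event_stream_ordering FROM current_state_events"
    ).fetchone())  # -> ('join', 42)
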
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index b9d3c36d60..0e81d38cca 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, Tuple, ca
 
 import attr
 
-from synapse.api.constants import EventContentFields, RelationTypes
+from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import make_event_from_dict
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -71,6 +71,10 @@ class _BackgroundUpdates:
 
     EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
 
+    POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = (
+        "populate_membership_event_stream_ordering"
+    )
+
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _CalculateChainCover:
@@ -100,6 +104,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         super().__init__(database, db_conn, hs)
 
         self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
+            self._populate_membership_event_stream_ordering,
+        )
+        self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
             self._background_reindex_origin_server_ts,
         )
@@ -1498,3 +1506,97 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             )
 
         return batch_size
+
+    async def _populate_membership_event_stream_ordering(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        def _populate_membership_event_stream_ordering(
+            txn: LoggingTransaction,
+        ) -> bool:
+
+            if "max_stream_ordering" in progress:
+                max_stream_ordering = progress["max_stream_ordering"]
+            else:
+                txn.execute("SELECT max(stream_ordering) FROM events")
+                res = txn.fetchone()
+                if res is None or res[0] is None:
+                    return True
+                else:
+                    max_stream_ordering = res[0]
+
+            start = progress.get("stream_ordering", 0)
+            stop = start + batch_size
+
+            sql = f"""
+                SELECT room_id, event_id, stream_ordering
+                FROM events
+                WHERE
+                    type = '{EventTypes.Member}'
+                    AND stream_ordering >= ?
+                    AND stream_ordering < ?
+            """
+            txn.execute(sql, (start, stop))
+
+            rows: List[Tuple[str, str, int]] = cast(
+                List[Tuple[str, str, int]], txn.fetchall()
+            )
+
+            event_ids: List[Tuple[str]] = []
+            event_stream_orderings: List[Tuple[int]] = []
+
+            for _, event_id, event_stream_ordering in rows:
+                event_ids.append((event_id,))
+                event_stream_orderings.append((event_stream_ordering,))
+
+            self.db_pool.simple_update_many_txn(
+                txn,
+                table="current_state_events",
+                key_names=("event_id",),
+                key_values=event_ids,
+                value_names=("event_stream_ordering",),
+                value_values=event_stream_orderings,
+            )
+
+            self.db_pool.simple_update_many_txn(
+                txn,
+                table="room_memberships",
+                key_names=("event_id",),
+                key_values=event_ids,
+                value_names=("event_stream_ordering",),
+                value_values=event_stream_orderings,
+            )
+
+            # NOTE: local_current_membership has no index on event_id, so it is
+            # the room_id filter here that limits the number of rows scanned.
+            for room_id, event_id, event_stream_ordering in rows:
+                txn.execute(
+                    """
+                        UPDATE local_current_membership
+                        SET event_stream_ordering = ?
+                        WHERE room_id = ? AND event_id = ?
+                    """,
+                    (event_stream_ordering, room_id, event_id),
+                )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
+                {
+                    "stream_ordering": stop,
+                    "max_stream_ordering": max_stream_ordering,
+                },
+            )
+
+            return stop > max_stream_ordering
+
+        finished = await self.db_pool.runInteraction(
+            "_populate_membership_event_stream_ordering",
+            _populate_membership_event_stream_ordering,
+        )
+
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING
+            )
+
+        return batch_size
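
Note on the background update above: it backfills rows persisted before this change by walking events in fixed-size stream_ordering windows, selecting m.room.member events, copying their stream_ordering into current_state_events, room_memberships and local_current_membership, and recording its position in the background-update progress JSON. The standalone sketch below shows just the windowing and progress logic, assuming a plain sqlite3 connection and only the room_memberships table for brevity; the helper name and table shapes are illustrative, not Synapse's API.

    import sqlite3
    from typing import Dict

    def run_one_batch(conn: sqlite3.Connection, progress: Dict[str, int], batch_size: int) -> bool:
        """Backfill one window of stream orderings; return True once finished."""
        if "max_stream_ordering" not in progress:
            (max_so,) = conn.execute("SELECT max(stream_ordering) FROM events").fetchone()
            if max_so is None:
                return True  # no events at all, nothing to backfill
            progress["max_stream_ordering"] = max_so

        start = progress.get("stream_ordering", 0)
        stop = start + batch_size

        rows = conn.execute(
            "SELECT event_id, stream_ordering FROM events"
            " WHERE type = 'm.room.member'"
            " AND stream_ordering >= ? AND stream_ordering < ?",
            (start, stop),
        ).fetchall()

        conn.executemany(
            "UPDATE room_memberships SET event_stream_ordering = ? WHERE event_id = ?",
            [(so, ev_id) for ev_id, so in rows],
        )

        # Persist the window boundary so the next call resumes where this one stopped.
        progress["stream_ordering"] = stop
        return stop > progress["max_stream_ordering"]

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        "CREATE TABLE events (event_id TEXT, type TEXT, stream_ordering INTEGER);"
        "CREATE TABLE room_memberships (event_id TEXT, event_stream_ordering INTEGER);"
        "INSERT INTO events VALUES ('$m1', 'm.room.member', 7);"
        "INSERT INTO room_memberships VALUES ('$m1', NULL);"
    )
    progress: Dict[str, int] = {}
    while not run_one_batch(conn, progress, batch_size=100):
        pass

The real handler runs this inside db_pool.runInteraction and persists the progress dict with _background_update_progress_txn, but the window arithmetic is the same.
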
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d7d08369ca..6d0ef10258 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1779,7 +1779,7 @@ class EventsWorkerStore(SQLBaseStore):
             txn: LoggingTransaction,
         ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
             sql = (
-                "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
+                "SELECT out.event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
                 " e.outlier"
                 " FROM events AS e"
@@ -1791,10 +1791,10 @@ class EventsWorkerStore(SQLBaseStore):
                 " LEFT JOIN event_relations USING (event_id)"
                 " LEFT JOIN room_memberships USING (event_id)"
                 " LEFT JOIN rejections USING (event_id)"
-                " WHERE ? < event_stream_ordering"
-                " AND event_stream_ordering <= ?"
+                " WHERE ? < out.event_stream_ordering"
+                " AND out.event_stream_ordering <= ?"
                 " AND out.instance_name = ?"
-                " ORDER BY event_stream_ordering ASC"
+                " ORDER BY out.event_stream_ordering ASC"
             )
 
             txn.execute(sql, (last_id, current_id, instance_name))
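
Note on the events_worker.py hunk above: the query change only qualifies event_stream_ordering with the existing out alias, because room_memberships (joined via USING earlier in the statement) now also carries a column of that name and the bare reference would otherwise be ambiguous. Below is a tiny hypothetical reproduction of the failure, with generic table names standing in for the real join; Postgres reports a similar "column reference is ambiguous" error.

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        "CREATE TABLE stream_rows (event_id TEXT, event_stream_ordering INTEGER);"
        "CREATE TABLE room_memberships (event_id TEXT, event_stream_ordering INTEGER);"
    )
    try:
        conn.execute(
            "SELECT event_stream_ordering FROM stream_rows"
            " LEFT JOIN room_memberships USING (event_id)"
        )
    except sqlite3.OperationalError as exc:
        print(exc)  # ambiguous column name: event_stream_ordering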