Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--  synapse/storage/databases/main/events_worker.py  88
-rw-r--r--  synapse/storage/databases/main/state.py          34
2 files changed, 113 insertions(+), 9 deletions(-)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 318fd7dc71..e19b16064b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -70,6 +70,7 @@ from synapse.storage.database import (
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import (
+    AbstractStreamIdGenerator,
     AbstractStreamIdTracker,
     MultiWriterIdGenerator,
     StreamIdGenerator,
@@ -292,6 +293,93 @@ class EventsWorkerStore(SQLBaseStore):
             id_column="chain_id",
         )
 
+        self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator
+
+        if isinstance(database.engine, PostgresEngine):
+            self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="un_partial_stated_event_stream",
+                instance_name=hs.get_instance_name(),
+                tables=[
+                    ("un_partial_stated_event_stream", "instance_name", "stream_id")
+                ],
+                sequence_name="un_partial_stated_event_stream_sequence",
+                # TODO(faster_joins, multiple writers) Support multiple writers.
+                writers=["master"],
+            )
+        else:
+            self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
+                db_conn, "un_partial_stated_event_stream", "stream_id"
+            )
+
+    def get_un_partial_stated_events_token(self) -> int:
+        # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
+        #     writers because workers that rarely write will hold all
+        #     readers up.
+        return self._un_partial_stated_events_stream_id_gen.get_current_token()
+
+    async def get_un_partial_stated_events_from_stream(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+        """Get updates for the un-partial-stated events replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exist
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+            The updates are a list of 2-tuples of stream ID and the row
+            data, where the row data is an (event_id,
+            rejection_status_changed) pair.
+        """
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def get_un_partial_stated_events_from_stream_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+            sql = """
+                SELECT stream_id, event_id, rejection_status_changed
+                FROM un_partial_stated_event_stream
+                WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, instance_name, limit))
+            updates = [
+                (
+                    row[0],
+                    (
+                        row[1],
+                        bool(row[2]),
+                    ),
+                )
+                for row in txn
+            ]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_events_from_stream",
+            get_un_partial_stated_events_from_stream_txn,
+        )
+
     def process_replication_rows(
         self,
         stream_name: str,
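
The fetcher added above follows the standard Synapse replication-stream contract: last_id is exclusive, current_id is inclusive, and the limited flag tells the caller that more rows remain between the tokens. A minimal sketch of how a consumer might drain the stream, assuming only the two methods added in this diff and the single "master" writer configured above (the drain helper itself is hypothetical, not part of this change):

async def drain_un_partial_stated_events(store) -> None:
    # Hypothetical consumer loop; `store` is an EventsWorkerStore with
    # the token getter and fetcher added in this diff.
    last_id = 0  # assumption: stream IDs start above 0
    while True:
        current_id = store.get_un_partial_stated_events_token()
        if last_id == current_id:
            break  # caught up
        updates, upto_token, limited = (
            await store.get_un_partial_stated_events_from_stream(
                "master", last_id, current_id, limit=100
            )
        )
        for stream_id, (event_id, rejection_status_changed) in updates:
            ...  # react to the now fully-stated event
        # upto_token equals current_id unless the limit was hit, in which
        # case it is the stream ID of the last row actually returned.
        last_id = upto_token
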
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index c801a93b5b..f855903c39 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -80,6 +80,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         hs: "HomeServer",
     ):
         super().__init__(database, db_conn, hs)
+        self._instance_name: str = hs.get_instance_name()
 
     async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
@@ -404,18 +405,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         context: EventContext,
     ) -> None:
         """Update the state group for a partial state event"""
-        await self.db_pool.runInteraction(
-            "update_state_for_partial_state_event",
-            self._update_state_for_partial_state_event_txn,
-            event,
-            context,
-        )
+        async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
+            await self.db_pool.runInteraction(
+                "update_state_for_partial_state_event",
+                self._update_state_for_partial_state_event_txn,
+                event,
+                context,
+                un_partial_state_event_stream_id,
+            )
 
     def _update_state_for_partial_state_event_txn(
         self,
         txn: LoggingTransaction,
         event: EventBase,
         context: EventContext,
+        un_partial_state_event_stream_id: int,
     ) -> None:
         # we shouldn't have any outliers here
         assert not event.internal_metadata.is_outlier()
@@ -436,7 +440,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         # the event may now be rejected where it was not before, or vice versa,
         # in which case we need to update the rejected flags.
-        if bool(context.rejected) != (event.rejected_reason is not None):
+        rejection_status_changed = bool(context.rejected) != (
+            event.rejected_reason is not None
+        )
+        if rejection_status_changed:
             self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
 
         self.db_pool.simple_delete_one_txn(
@@ -445,8 +452,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             keyvalues={"event_id": event.event_id},
         )
 
-        # TODO(faster_joins): need to do something about workers here
-        #   https://github.com/matrix-org/synapse/issues/12994
         txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
         txn.call_after(
             self._get_state_group_for_event.prefill,
@@ -454,6 +459,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             state_group,
         )
 
+        self.db_pool.simple_insert_txn(
+            txn,
+            "un_partial_stated_event_stream",
+            {
+                "stream_id": un_partial_state_event_stream_id,
+                "instance_name": self._instance_name,
+                "event_id": event.event_id,
+                "rejection_status_changed": rejection_status_changed,
+            },
+        )
+
 
 class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
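
The insert above runs inside the same transaction that deletes the event's partial_state_events row, so readers can never observe a stream row for an event that is still partial-stated. A matching reader-side handler is not part of this diff (it belongs with the replication stream wiring); as a rough sketch of the shape it might take, mirroring the writer-side txn.call_after invalidation, where the handler name and the _invalidate_local_get_event_cache helper are assumptions rather than code from this change:

def on_un_partial_stated_event_row(
    store,
    instance_name: str,
    stream_id: int,
    event_id: str,
    rejection_status_changed: bool,
) -> None:
    # Hypothetical handler: mirror the cache invalidation done on the
    # writer in _update_state_for_partial_state_event_txn.
    store.is_partial_state_event.invalidate((event_id,))
    if rejection_status_changed:
        # The event's rejection flag flipped, so any locally cached copy
        # of the event itself is stale as well (assumed helper).
        store._invalidate_local_get_event_cache(event_id)
    # Advance the local position so get_un_partial_stated_events_token()
    # reflects this row on the worker as well.
    store._un_partial_stated_events_stream_id_gen.advance(
        instance_name, stream_id
    )
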