summary refs log tree commit diff
path: root/synapse/storage/databases/main/state.py
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2022-12-14 14:47:11 +0000
committerGitHub <noreply@github.com>2022-12-14 14:47:11 +0000
commitfb60cb16fe3cf26fbd947eec926cb4b24b8e9fc7 (patch)
treee7422eeab193b3d60b9d29676a0356669feb37fd /synapse/storage/databases/main/state.py
parentDelete event_push_summary_unique_index again. (#14669) (diff)
downloadsynapse-fb60cb16fe3cf26fbd947eec926cb4b24b8e9fc7.tar.xz
Faster remote room joins: stream the un-partial-stating of events over replication. [rei:frrj/streams/unpsr] (#14545)
Diffstat (limited to 'synapse/storage/databases/main/state.py')
-rw-r--r--synapse/storage/databases/main/state.py34
1 files changed, 25 insertions, 9 deletions
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index c801a93b5b..f855903c39 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -80,6 +80,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         hs: "HomeServer",
     ):
         super().__init__(database, db_conn, hs)
+        self._instance_name: str = hs.get_instance_name()
 
     async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
@@ -404,18 +405,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         context: EventContext,
     ) -> None:
         """Update the state group for a partial state event"""
-        await self.db_pool.runInteraction(
-            "update_state_for_partial_state_event",
-            self._update_state_for_partial_state_event_txn,
-            event,
-            context,
-        )
+        async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
+            await self.db_pool.runInteraction(
+                "update_state_for_partial_state_event",
+                self._update_state_for_partial_state_event_txn,
+                event,
+                context,
+                un_partial_state_event_stream_id,
+            )
 
     def _update_state_for_partial_state_event_txn(
         self,
         txn: LoggingTransaction,
         event: EventBase,
         context: EventContext,
+        un_partial_state_event_stream_id: int,
     ) -> None:
         # we shouldn't have any outliers here
         assert not event.internal_metadata.is_outlier()
@@ -436,7 +440,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         # the event may now be rejected where it was not before, or vice versa,
         # in which case we need to update the rejected flags.
-        if bool(context.rejected) != (event.rejected_reason is not None):
+        rejection_status_changed = bool(context.rejected) != (
+            event.rejected_reason is not None
+        )
+        if rejection_status_changed:
             self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
 
         self.db_pool.simple_delete_one_txn(
@@ -445,8 +452,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             keyvalues={"event_id": event.event_id},
         )
 
-        # TODO(faster_joins): need to do something about workers here
-        #   https://github.com/matrix-org/synapse/issues/12994
         txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
         txn.call_after(
             self._get_state_group_for_event.prefill,
@@ -454,6 +459,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             state_group,
         )
 
+        self.db_pool.simple_insert_txn(
+            txn,
+            "un_partial_stated_event_stream",
+            {
+                "stream_id": un_partial_state_event_stream_id,
+                "instance_name": self._instance_name,
+                "event_id": event.event_id,
+                "rejection_status_changed": rejection_status_changed,
+            },
+        )
+
 
 class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):