diff options
author | reivilibre <oliverw@matrix.org> | 2022-12-14 14:47:11 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-14 14:47:11 +0000 |
commit | fb60cb16fe3cf26fbd947eec926cb4b24b8e9fc7 (patch) | |
tree | e7422eeab193b3d60b9d29676a0356669feb37fd /synapse/storage/databases/main/state.py | |
parent | Delete event_push_summary_unique_index again. (#14669) (diff) | |
download | synapse-fb60cb16fe3cf26fbd947eec926cb4b24b8e9fc7.tar.xz |
Faster remote room joins: stream the un-partial-stating of events over replication. [rei:frrj/streams/unpsr] (#14545)
Diffstat (limited to 'synapse/storage/databases/main/state.py')
-rw-r--r-- | synapse/storage/databases/main/state.py | 34 |
1 files changed, 25 insertions, 9 deletions
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index c801a93b5b..f855903c39 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -80,6 +80,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self._instance_name: str = hs.get_instance_name() async def get_room_version(self, room_id: str) -> RoomVersion: """Get the room_version of a given room @@ -404,18 +405,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): context: EventContext, ) -> None: """Update the state group for a partial state event""" - await self.db_pool.runInteraction( - "update_state_for_partial_state_event", - self._update_state_for_partial_state_event_txn, - event, - context, - ) + async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id: + await self.db_pool.runInteraction( + "update_state_for_partial_state_event", + self._update_state_for_partial_state_event_txn, + event, + context, + un_partial_state_event_stream_id, + ) def _update_state_for_partial_state_event_txn( self, txn: LoggingTransaction, event: EventBase, context: EventContext, + un_partial_state_event_stream_id: int, ) -> None: # we shouldn't have any outliers here assert not event.internal_metadata.is_outlier() @@ -436,7 +440,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # the event may now be rejected where it was not before, or vice versa, # in which case we need to update the rejected flags. - if bool(context.rejected) != (event.rejected_reason is not None): + rejection_status_changed = bool(context.rejected) != ( + event.rejected_reason is not None + ) + if rejection_status_changed: self.mark_event_rejected_txn(txn, event.event_id, context.rejected) self.db_pool.simple_delete_one_txn( @@ -445,8 +452,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): keyvalues={"event_id": event.event_id}, ) - # TODO(faster_joins): need to do something about workers here - # https://github.com/matrix-org/synapse/issues/12994 txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,)) txn.call_after( self._get_state_group_for_event.prefill, @@ -454,6 +459,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): state_group, ) + self.db_pool.simple_insert_txn( + txn, + "un_partial_stated_event_stream", + { + "stream_id": un_partial_state_event_stream_id, + "instance_name": self._instance_name, + "event_id": event.event_id, + "rejection_status_changed": rejection_status_changed, + }, + ) + class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): |