summary refs log tree commit diff
path: root/synapse/storage/databases/main/state.py
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2022-12-19 14:57:51 +0000
committerGitHub <noreply@github.com>2022-12-19 14:57:51 +0000
commit2888d7ec83b33b3ce848d9219c921ffe0b88ffbf (patch)
tree28a37ca0169578045e5735ffdf47db83c5ca9265 /synapse/storage/databases/main/state.py
parentBump sentry-sdk from 1.11.1 to 1.12.0 (#14701) (diff)
downloadsynapse-2888d7ec83b33b3ce848d9219c921ffe0b88ffbf.tar.xz
Faster remote room joins: invalidate caches and unblock requests when receiving un-partial-stated event notifications over replication. [rei:frrj/streams/unpsr] (#14546)
Diffstat (limited to 'synapse/storage/databases/main/state.py')
-rw-r--r--synapse/storage/databases/main/state.py18
1 files changed, 17 insertions, 1 deletions
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py

index f855903c39..f32cbb2dec 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py
@@ -14,7 +14,7 @@ # limitations under the License. import collections.abc import logging -from typing import TYPE_CHECKING, Collection, Dict, Iterable, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Set, Tuple import attr @@ -24,6 +24,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.opentracing import trace +from synapse.replication.tcp.streams import UnPartialStatedEventStream +from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -82,6 +84,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): super().__init__(database, db_conn, hs) self._instance_name: str = hs.get_instance_name() + def process_replication_rows( + self, + stream_name: str, + instance_name: str, + token: int, + rows: Iterable[Any], + ) -> None: + if stream_name == UnPartialStatedEventStream.NAME: + for row in rows: + assert isinstance(row, UnPartialStatedEventStreamRow) + self._get_state_group_for_event.invalidate((row.event_id,)) + + super().process_replication_rows(stream_name, instance_name, token, rows) + async def get_room_version(self, room_id: str) -> RoomVersion: """Get the room_version of a given room Raises: