diff options
author | reivilibre <oliverw@matrix.org> | 2022-12-19 14:57:51 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-19 14:57:51 +0000 |
commit | 2888d7ec83b33b3ce848d9219c921ffe0b88ffbf (patch) | |
tree | 28a37ca0169578045e5735ffdf47db83c5ca9265 /synapse/replication | |
parent | Bump sentry-sdk from 1.11.1 to 1.12.0 (#14701) (diff) | |
download | synapse-2888d7ec83b33b3ce848d9219c921ffe0b88ffbf.tar.xz |
Faster remote room joins: invalidate caches and unblock requests when receiving un-partial-stated event notifications over replication. [rei:frrj/streams/unpsr] (#14546)
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/tcp/client.py | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index b4dad47b45..658d89210d 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -36,6 +36,7 @@ from synapse.replication.tcp.streams import ( TagAccountDataStream, ToDeviceStream, TypingStream, + UnPartialStatedEventStream, UnPartialStatedRoomStream, ) from synapse.replication.tcp.streams.events import ( @@ -43,7 +44,10 @@ from synapse.replication.tcp.streams.events import ( EventsStreamEventRow, EventsStreamRow, ) -from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow +from synapse.replication.tcp.streams.partial_state import ( + UnPartialStatedEventStreamRow, + UnPartialStatedRoomStreamRow, +) from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure @@ -247,6 +251,14 @@ class ReplicationDataHandler: self._state_storage_controller.notify_room_un_partial_stated( row.room_id ) + elif stream_name == UnPartialStatedEventStream.NAME: + for row in rows: + assert isinstance(row, UnPartialStatedEventStreamRow) + + # Wake up any tasks waiting for the event to be un-partial-stated. + self._state_storage_controller.notify_event_un_partial_stated( + row.event_id + ) await self._presence_handler.process_replication_rows( stream_name, instance_name, token, rows |