diff --git a/changelog.d/14546.misc b/changelog.d/14546.misc
new file mode 100644
index 0000000000..60b6761a51
--- /dev/null
+++ b/changelog.d/14546.misc
@@ -0,0 +1 @@
+Faster remote room joins: stream the un-partial-stating of events over replication.
\ No newline at end of file
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index b4dad47b45..658d89210d 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -36,6 +36,7 @@ from synapse.replication.tcp.streams import (
TagAccountDataStream,
ToDeviceStream,
TypingStream,
+ UnPartialStatedEventStream,
UnPartialStatedRoomStream,
)
from synapse.replication.tcp.streams.events import (
@@ -43,7 +44,10 @@ from synapse.replication.tcp.streams.events import (
EventsStreamEventRow,
EventsStreamRow,
)
-from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow
+from synapse.replication.tcp.streams.partial_state import (
+ UnPartialStatedEventStreamRow,
+ UnPartialStatedRoomStreamRow,
+)
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure
@@ -247,6 +251,14 @@ class ReplicationDataHandler:
self._state_storage_controller.notify_room_un_partial_stated(
row.room_id
)
+ elif stream_name == UnPartialStatedEventStream.NAME:
+ for row in rows:
+ assert isinstance(row, UnPartialStatedEventStreamRow)
+
+ # Wake up any tasks waiting for the event to be un-partial-stated.
+ self._state_storage_controller.notify_event_un_partial_stated(
+ row.event_id
+ )
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e19b16064b..761b15a815 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -59,8 +59,9 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
-from synapse.replication.tcp.streams import BackfillStream
+from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream
from synapse.replication.tcp.streams.events import EventsStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -391,6 +392,16 @@ class EventsWorkerStore(SQLBaseStore):
self._stream_id_gen.advance(instance_name, token)
elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(instance_name, -token)
+ elif stream_name == UnPartialStatedEventStream.NAME:
+ for row in rows:
+ assert isinstance(row, UnPartialStatedEventStreamRow)
+
+ self.is_partial_state_event.invalidate((row.event_id,))
+
+ if row.rejection_status_changed:
+ # If the partial-stated event became rejected or unrejected
+ # when it wasn't before, we need to invalidate this cache.
+ self._invalidate_local_get_event_cache(row.event_id)
super().process_replication_rows(stream_name, instance_name, token, rows)
@@ -2380,6 +2391,9 @@ class EventsWorkerStore(SQLBaseStore):
This can happen, for example, when resyncing state during a faster join.
+ It is the caller's responsibility to ensure that other workers are
+ sent a notification so that they call `_invalidate_local_get_event_cache()`.
+
Args:
txn:
event_id: ID of event to update
@@ -2418,14 +2432,3 @@ class EventsWorkerStore(SQLBaseStore):
)
self.invalidate_get_event_cache_after_txn(txn, event_id)
-
- # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
- # call '_send_invalidation_to_replication', but we actually need the other
- # end to call _invalidate_local_get_event_cache() rather than (just)
- # _get_event_cache.invalidate().
- #
- # One solution might be to (somehow) get the workers to call
- # _invalidate_caches_for_event() (though that will invalidate more than
- # strictly necessary).
- #
- # https://github.com/matrix-org/synapse/issues/12994
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index f855903c39..f32cbb2dec 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -14,7 +14,7 @@
# limitations under the License.
import collections.abc
import logging
-from typing import TYPE_CHECKING, Collection, Dict, Iterable, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Set, Tuple
import attr
@@ -24,6 +24,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.opentracing import trace
+from synapse.replication.tcp.streams import UnPartialStatedEventStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -82,6 +84,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
super().__init__(database, db_conn, hs)
self._instance_name: str = hs.get_instance_name()
+ def process_replication_rows(
+ self,
+ stream_name: str,
+ instance_name: str,
+ token: int,
+ rows: Iterable[Any],
+ ) -> None:
+ if stream_name == UnPartialStatedEventStream.NAME:
+ for row in rows:
+ assert isinstance(row, UnPartialStatedEventStreamRow)
+ self._get_state_group_for_event.invalidate((row.event_id,))
+
+ super().process_replication_rows(stream_name, instance_name, token, rows)
+
async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
Raises:
|