diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ad9b760713..da6d948e1b 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import heapq
+from collections import defaultdict
from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Type, TypeVar, cast
import attr
@@ -51,8 +52,19 @@ data part are:
* The state_key of the state which has changed
* The event id of the new state
+A "state-all" row is sent whenever the "current state" in a room changes, but there are
+too many state updates for a particular room in the same update. This replaces any
+"state" rows on a per-room basis. The fields in the data part are:
+
+* The room id for the state changes
+
"""
+# Any room with more than _MAX_STATE_UPDATES_PER_ROOM will send a EventsStreamAllStateRow
+# instead of individual EventsStreamEventRow. This is predominantly useful when
+# purging large rooms.
+_MAX_STATE_UPDATES_PER_ROOM = 150
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class EventsStreamRow:
@@ -111,9 +123,17 @@ class EventsStreamCurrentStateRow(BaseEventsStreamRow):
event_id: Optional[str]
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class EventsStreamAllStateRow(BaseEventsStreamRow):
+ TypeId = "state-all"
+
+ room_id: str
+
+
_EventRows: Tuple[Type[BaseEventsStreamRow], ...] = (
EventsStreamEventRow,
EventsStreamCurrentStateRow,
+ EventsStreamAllStateRow,
)
TypeToRow = {Row.TypeId: Row for Row in _EventRows}
@@ -213,9 +233,28 @@ class EventsStream(Stream):
if stream_id <= upper_limit
)
+ # Separate out rooms that have many state updates, listeners should clear
+ # all state for those rooms.
+ state_updates_by_room = defaultdict(list)
+ for stream_id, room_id, _type, _state_key, _event_id in state_rows:
+ state_updates_by_room[room_id].append(stream_id)
+
+ state_all_rows = [
+ (stream_ids[-1], room_id)
+ for room_id, stream_ids in state_updates_by_room.items()
+ if len(stream_ids) >= _MAX_STATE_UPDATES_PER_ROOM
+ ]
+ state_all_updates: Iterable[Tuple[int, Tuple]] = (
+ (max_stream_id, (EventsStreamAllStateRow.TypeId, (room_id,)))
+ for (max_stream_id, room_id) in state_all_rows
+ )
+
+ # Any remaining state updates are sent individually.
+ state_all_rooms = {room_id for _, room_id in state_all_rows}
state_updates: Iterable[Tuple[int, Tuple]] = (
(stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
for (stream_id, *rest) in state_rows
+ if rest[0] not in state_all_rooms
)
ex_outliers_updates: Iterable[Tuple[int, Tuple]] = (
@@ -224,7 +263,11 @@ class EventsStream(Stream):
)
# we need to return a sorted list, so merge them together.
- updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))
+ updates = list(
+ heapq.merge(
+ event_updates, state_all_updates, state_updates, ex_outliers_updates
+ )
+ )
return updates, upper_limit, limited
@classmethod
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2fbd389c71..4d0470ffd9 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -23,6 +23,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces
from synapse.replication.tcp.streams import BackfillStream, CachesStream
from synapse.replication.tcp.streams.events import (
EventsStream,
+ EventsStreamAllStateRow,
EventsStreamCurrentStateRow,
EventsStreamEventRow,
EventsStreamRow,
@@ -264,6 +265,13 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
+ elif row.type == EventsStreamAllStateRow.TypeId:
+ assert isinstance(data, EventsStreamAllStateRow)
+ # Similar to the above, but the entire caches are invalidated. This is
+ # unfortunate for the membership caches, but should recover quickly.
+ self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
+ self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined]
+ self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
|