summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/streams/events.py45
1 files changed, 44 insertions, 1 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ad9b760713..da6d948e1b 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import heapq
+from collections import defaultdict
 from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Type, TypeVar, cast
 
 import attr
@@ -51,8 +52,19 @@ data part are:
  * The state_key of the state which has changed
  * The event id of the new state
 
+A "state-all" row is sent whenever the "current state" in a room changes, but there are
+too many state updates for a particular room in the same update. This replaces any
+"state" rows on a per-room basis. The fields in the data part are:
+
+* The room id for the state changes
+
 """
 
+# Any room with more than _MAX_STATE_UPDATES_PER_ROOM will send a EventsStreamAllStateRow
+# instead of individual EventsStreamEventRow. This is predominantly useful when
+# purging large rooms.
+_MAX_STATE_UPDATES_PER_ROOM = 150
+
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class EventsStreamRow:
@@ -111,9 +123,17 @@ class EventsStreamCurrentStateRow(BaseEventsStreamRow):
     event_id: Optional[str]
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class EventsStreamAllStateRow(BaseEventsStreamRow):
+    TypeId = "state-all"
+
+    room_id: str
+
+
 _EventRows: Tuple[Type[BaseEventsStreamRow], ...] = (
     EventsStreamEventRow,
     EventsStreamCurrentStateRow,
+    EventsStreamAllStateRow,
 )
 
 TypeToRow = {Row.TypeId: Row for Row in _EventRows}
@@ -213,9 +233,28 @@ class EventsStream(Stream):
             if stream_id <= upper_limit
         )
 
+        # Separate out rooms that have many state updates, listeners should clear
+        # all state for those rooms.
+        state_updates_by_room = defaultdict(list)
+        for stream_id, room_id, _type, _state_key, _event_id in state_rows:
+            state_updates_by_room[room_id].append(stream_id)
+
+        state_all_rows = [
+            (stream_ids[-1], room_id)
+            for room_id, stream_ids in state_updates_by_room.items()
+            if len(stream_ids) >= _MAX_STATE_UPDATES_PER_ROOM
+        ]
+        state_all_updates: Iterable[Tuple[int, Tuple]] = (
+            (max_stream_id, (EventsStreamAllStateRow.TypeId, (room_id,)))
+            for (max_stream_id, room_id) in state_all_rows
+        )
+
+        # Any remaining state updates are sent individually.
+        state_all_rooms = {room_id for _, room_id in state_all_rows}
         state_updates: Iterable[Tuple[int, Tuple]] = (
             (stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
             for (stream_id, *rest) in state_rows
+            if rest[0] not in state_all_rooms
         )
 
         ex_outliers_updates: Iterable[Tuple[int, Tuple]] = (
@@ -224,7 +263,11 @@ class EventsStream(Stream):
         )
 
         # we need to return a sorted list, so merge them together.
-        updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))
+        updates = list(
+            heapq.merge(
+                event_updates, state_all_updates, state_updates, ex_outliers_updates
+            )
+        )
         return updates, upper_limit, limited
 
     @classmethod