summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-05-04 17:59:31 +0100
committerErik Johnston <erik@matrix.org>2021-05-04 17:59:31 +0100
commit24965fc0736292b1c60a26e85d9ffb295af39346 (patch)
treebd99481156eefb11f7be2e40da640ec1ed26262f
parentMerge branch 'erikj/fix_presence_joined' into erikj/test_send (diff)
parentUse lists instead of sets where appropriate (diff)
downloadsynapse-24965fc0736292b1c60a26e85d9ffb295af39346.tar.xz
Merge branch 'erikj/efficient_presence_join' into erikj/test_send
-rw-r--r--synapse/handlers/presence.py28
1 files changed, 19 insertions, 9 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e9e0f1338f..6fd1f34289 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1183,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler):
                 max_pos, deltas = await self.store.get_current_state_deltas(
                     self._event_pos, room_max_stream_ordering
                 )
-                await self._handle_state_delta(deltas)
+
+                # We may get multiple deltas for different rooms, but we want to
+                # handle them on a room by room basis, so we batch them up by
+                # room.
+                deltas_by_room: Dict[str, List[JsonDict]] = {}
+                for delta in deltas:
+                    deltas_by_room.setdefault(delta["room_id"], []).append(delta)
+
+                for room_id, deltas_for_room in deltas_by_room.items():
+                    await self._handle_state_delta(room_id, deltas_for_room)
 
                 self._event_pos = max_pos
 
@@ -1192,9 +1201,9 @@ class PresenceHandler(BasePresenceHandler):
                     max_pos
                 )
 
-    async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
-        """Process current state deltas to find new joins that need to be
-        handled.
+    async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
+        """Process current state deltas for the room to find new joins that need
+        to be handled.
         """
 
         # Sets of newly joined users. Note that if the local server is
@@ -1203,9 +1212,10 @@ class PresenceHandler(BasePresenceHandler):
         newly_joined_users = set()
 
         for delta in deltas:
+            assert room_id == delta["room_id"]
+
             typ = delta["type"]
             state_key = delta["state_key"]
-            room_id = delta["room_id"]
             event_id = delta["event_id"]
             prev_event_id = delta["prev_event_id"]
 
@@ -1256,11 +1266,11 @@ class PresenceHandler(BasePresenceHandler):
 
         # Construct sets for all the local users and remote hosts that were
         # already in the room
-        prev_local_users = set()
+        prev_local_users = []
         prev_remote_hosts = set()
         for user_id in prev_users:
             if self.is_mine_id(user_id):
-                prev_local_users.add(user_id)
+                prev_local_users.append(user_id)
             else:
                 prev_remote_hosts.add(get_domain_from_id(user_id))
 
@@ -1268,11 +1278,11 @@ class PresenceHandler(BasePresenceHandler):
         # that were *not* already in the room. Care needs to be taken with the
         # calculating the remote hosts, as a host may have already been in the
         # room even if there is a newly joined user from that host.
-        newly_joined_local_users = set()
+        newly_joined_local_users = []
         newly_joined_remote_hosts = set()
         for user_id in newly_joined_users:
             if self.is_mine_id(user_id):
-                newly_joined_local_users.add(user_id)
+                newly_joined_local_users.append(user_id)
             else:
                 host = get_domain_from_id(user_id)
                 if host not in prev_remote_hosts: