summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-10-10 11:51:50 +0100
committerErik Johnston <erik@matrix.org>2019-10-10 11:51:50 +0100
commit9970f955ce1ab36d21f7824d9f1382ab1df778e5 (patch)
tree94f824e633ac9b9fab18171a67a6b94aec2839a9 /synapse/handlers
parentNewsfile (diff)
parentFix races in room stats (and other) updates. (#6187) (diff)
downloadsynapse-9970f955ce1ab36d21f7824d9f1382ab1df778e5.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/patch_inner
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/presence.py16
-rw-r--r--synapse/handlers/room_member.py62
-rw-r--r--synapse/handlers/stats.py12
-rw-r--r--synapse/handlers/user_directory.py17
4 files changed, 75 insertions, 32 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 053cf66b28..2a5f1a007d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -803,17 +803,25 @@ class PresenceHandler(object):
         # Loop round handling deltas until we're up to date
         while True:
             with Measure(self.clock, "presence_delta"):
-                deltas = yield self.store.get_current_state_deltas(self._event_pos)
-                if not deltas:
+                room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+                if self._event_pos == room_max_stream_ordering:
                     return
 
+                logger.debug(
+                    "Processing presence stats %s->%s",
+                    self._event_pos,
+                    room_max_stream_ordering,
+                )
+                max_pos, deltas = yield self.store.get_current_state_deltas(
+                    self._event_pos, room_max_stream_ordering
+                )
                 yield self._handle_state_delta(deltas)
 
-                self._event_pos = deltas[-1]["stream_id"]
+                self._event_pos = max_pos
 
                 # Expose current event processing position to prometheus
                 synapse.metrics.event_processing_positions.labels("presence").set(
-                    self._event_pos
+                    max_pos
                 )
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 1edc657f8a..380e2fad5e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -203,23 +203,11 @@ class RoomMemberHandler(object):
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 newly_joined = prev_member_event.membership != Membership.JOIN
             if newly_joined:
-                yield self._user_joined_room(target, room_id)
-
-            # Copy over direct message status and room tags if this is a join
-            # on an upgraded room
-
-            # Check if this is an upgraded room
-            predecessor = yield self.store.get_room_predecessor(room_id)
-
-            if predecessor:
-                # It is an upgraded room. Copy over old tags
-                yield self.copy_room_tags_and_direct_to_room(
-                    predecessor["room_id"], room_id, user_id
-                )
-                # Copy over push rules
-                yield self.store.copy_push_rules_from_room_to_room_for_user(
-                    predecessor["room_id"], room_id, user_id
+                # Copy over user state if we're joining an upgraded room
+                yield self.copy_user_state_if_room_upgrade(
+                    room_id, requester.user.to_string()
                 )
+                yield self._user_joined_room(target, room_id)
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -463,10 +451,16 @@ class RoomMemberHandler(object):
                 if requester.is_guest:
                     content["kind"] = "guest"
 
-                ret = yield self._remote_join(
+                remote_join_response = yield self._remote_join(
                     requester, remote_room_hosts, room_id, target, content
                 )
-                return ret
+
+                # Copy over user state if this is a join on an remote upgraded room
+                yield self.copy_user_state_if_room_upgrade(
+                    room_id, requester.user.to_string()
+                )
+
+                return remote_join_response
 
         elif effective_membership_state == Membership.LEAVE:
             if not is_host_in_room:
@@ -504,6 +498,38 @@ class RoomMemberHandler(object):
         return res
 
     @defer.inlineCallbacks
+    def copy_user_state_if_room_upgrade(self, new_room_id, user_id):
+        """Copy user-specific information when they join a new room if that new room is the
+        result of a room upgrade
+
+        Args:
+            new_room_id (str): The ID of the room the user is joining
+            user_id (str): The ID of the user
+
+        Returns:
+            Deferred
+        """
+        # Check if the new room is an upgraded room
+        predecessor = yield self.store.get_room_predecessor(new_room_id)
+        if not predecessor:
+            return
+
+        logger.debug(
+            "Found predecessor for %s: %s. Copying over room tags and push " "rules",
+            new_room_id,
+            predecessor,
+        )
+
+        # It is an upgraded room. Copy over old tags
+        yield self.copy_room_tags_and_direct_to_room(
+            predecessor["room_id"], new_room_id, user_id
+        )
+        # Copy over push rules
+        yield self.store.copy_push_rules_from_room_to_room_for_user(
+            predecessor["room_id"], new_room_id, user_id
+        )
+
+    @defer.inlineCallbacks
     def send_membership_event(self, requester, event, context, ratelimit=True):
         """
         Change the membership status of a user in a room.
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index c62b113115..466daf9202 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -87,21 +87,23 @@ class StatsHandler(StateDeltasHandler):
             # Be sure to read the max stream_ordering *before* checking if there are any outstanding
             # deltas, since there is otherwise a chance that we could miss updates which arrive
             # after we check the deltas.
-            room_max_stream_ordering = yield self.store.get_room_max_stream_ordering()
+            room_max_stream_ordering = self.store.get_room_max_stream_ordering()
             if self.pos == room_max_stream_ordering:
                 break
 
-            deltas = yield self.store.get_current_state_deltas(self.pos)
+            logger.debug(
+                "Processing room stats %s->%s", self.pos, room_max_stream_ordering
+            )
+            max_pos, deltas = yield self.store.get_current_state_deltas(
+                self.pos, room_max_stream_ordering
+            )
 
             if deltas:
                 logger.debug("Handling %d state deltas", len(deltas))
                 room_deltas, user_deltas = yield self._handle_deltas(deltas)
-
-                max_pos = deltas[-1]["stream_id"]
             else:
                 room_deltas = {}
                 user_deltas = {}
-                max_pos = room_max_stream_ordering
 
             # Then count deltas for total_events and total_event_bytes.
             room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index e53669e40d..624f05ab5b 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -138,21 +138,28 @@ class UserDirectoryHandler(StateDeltasHandler):
         # Loop round handling deltas until we're up to date
         while True:
             with Measure(self.clock, "user_dir_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
-                if not deltas:
+                room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+                if self.pos == room_max_stream_ordering:
                     return
 
+                logger.debug(
+                    "Processing user stats %s->%s", self.pos, room_max_stream_ordering
+                )
+                max_pos, deltas = yield self.store.get_current_state_deltas(
+                    self.pos, room_max_stream_ordering
+                )
+
                 logger.info("Handling %d state deltas", len(deltas))
                 yield self._handle_deltas(deltas)
 
-                self.pos = deltas[-1]["stream_id"]
+                self.pos = max_pos
 
                 # Expose current event processing position to prometheus
                 synapse.metrics.event_processing_positions.labels("user_dir").set(
-                    self.pos
+                    max_pos
                 )
 
-                yield self.store.update_user_directory_stream_pos(self.pos)
+                yield self.store.update_user_directory_stream_pos(max_pos)
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):