diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 053cf66b28..2a5f1a007d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -803,17 +803,25 @@ class PresenceHandler(object):
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
- deltas = yield self.store.get_current_state_deltas(self._event_pos)
- if not deltas:
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+ if self._event_pos == room_max_stream_ordering:
return
+ logger.debug(
+ "Processing presence stats %s->%s",
+ self._event_pos,
+ room_max_stream_ordering,
+ )
+ max_pos, deltas = yield self.store.get_current_state_deltas(
+ self._event_pos, room_max_stream_ordering
+ )
yield self._handle_state_delta(deltas)
- self._event_pos = deltas[-1]["stream_id"]
+ self._event_pos = max_pos
# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("presence").set(
- self._event_pos
+ max_pos
)
@defer.inlineCallbacks
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 1edc657f8a..380e2fad5e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -203,23 +203,11 @@ class RoomMemberHandler(object):
prev_member_event = yield self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
- yield self._user_joined_room(target, room_id)
-
- # Copy over direct message status and room tags if this is a join
- # on an upgraded room
-
- # Check if this is an upgraded room
- predecessor = yield self.store.get_room_predecessor(room_id)
-
- if predecessor:
- # It is an upgraded room. Copy over old tags
- yield self.copy_room_tags_and_direct_to_room(
- predecessor["room_id"], room_id, user_id
- )
- # Copy over push rules
- yield self.store.copy_push_rules_from_room_to_room_for_user(
- predecessor["room_id"], room_id, user_id
+ # Copy over user state if we're joining an upgraded room
+ yield self.copy_user_state_if_room_upgrade(
+ room_id, requester.user.to_string()
)
+ yield self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -463,10 +451,16 @@ class RoomMemberHandler(object):
if requester.is_guest:
content["kind"] = "guest"
- ret = yield self._remote_join(
+ remote_join_response = yield self._remote_join(
requester, remote_room_hosts, room_id, target, content
)
- return ret
+
+ # Copy over user state if this is a join on an remote upgraded room
+ yield self.copy_user_state_if_room_upgrade(
+ room_id, requester.user.to_string()
+ )
+
+ return remote_join_response
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
@@ -504,6 +498,38 @@ class RoomMemberHandler(object):
return res
@defer.inlineCallbacks
+ def copy_user_state_if_room_upgrade(self, new_room_id, user_id):
+ """Copy user-specific information when they join a new room if that new room is the
+ result of a room upgrade
+
+ Args:
+ new_room_id (str): The ID of the room the user is joining
+ user_id (str): The ID of the user
+
+ Returns:
+ Deferred
+ """
+ # Check if the new room is an upgraded room
+ predecessor = yield self.store.get_room_predecessor(new_room_id)
+ if not predecessor:
+ return
+
+ logger.debug(
+ "Found predecessor for %s: %s. Copying over room tags and push " "rules",
+ new_room_id,
+ predecessor,
+ )
+
+ # It is an upgraded room. Copy over old tags
+ yield self.copy_room_tags_and_direct_to_room(
+ predecessor["room_id"], new_room_id, user_id
+ )
+ # Copy over push rules
+ yield self.store.copy_push_rules_from_room_to_room_for_user(
+ predecessor["room_id"], new_room_id, user_id
+ )
+
+ @defer.inlineCallbacks
def send_membership_event(self, requester, event, context, ratelimit=True):
"""
Change the membership status of a user in a room.
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index c62b113115..466daf9202 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -87,21 +87,23 @@ class StatsHandler(StateDeltasHandler):
# Be sure to read the max stream_ordering *before* checking if there are any outstanding
# deltas, since there is otherwise a chance that we could miss updates which arrive
# after we check the deltas.
- room_max_stream_ordering = yield self.store.get_room_max_stream_ordering()
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
if self.pos == room_max_stream_ordering:
break
- deltas = yield self.store.get_current_state_deltas(self.pos)
+ logger.debug(
+ "Processing room stats %s->%s", self.pos, room_max_stream_ordering
+ )
+ max_pos, deltas = yield self.store.get_current_state_deltas(
+ self.pos, room_max_stream_ordering
+ )
if deltas:
logger.debug("Handling %d state deltas", len(deltas))
room_deltas, user_deltas = yield self._handle_deltas(deltas)
-
- max_pos = deltas[-1]["stream_id"]
else:
room_deltas = {}
user_deltas = {}
- max_pos = room_max_stream_ordering
# Then count deltas for total_events and total_event_bytes.
room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index e53669e40d..624f05ab5b 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -138,21 +138,28 @@ class UserDirectoryHandler(StateDeltasHandler):
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "user_dir_delta"):
- deltas = yield self.store.get_current_state_deltas(self.pos)
- if not deltas:
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+ if self.pos == room_max_stream_ordering:
return
+ logger.debug(
+ "Processing user stats %s->%s", self.pos, room_max_stream_ordering
+ )
+ max_pos, deltas = yield self.store.get_current_state_deltas(
+ self.pos, room_max_stream_ordering
+ )
+
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
- self.pos = deltas[-1]["stream_id"]
+ self.pos = max_pos
# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("user_dir").set(
- self.pos
+ max_pos
)
- yield self.store.update_user_directory_stream_pos(self.pos)
+ yield self.store.update_user_directory_stream_pos(max_pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
|