diff options
author | Erik Johnston <erik@matrix.org> | 2021-06-11 13:08:30 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2021-06-11 15:20:54 +0100 |
commit | 5e0b4719ea6650596470f2d3bff91a19096067b8 (patch) | |
tree | 9040d0e1e5f2c3101a271c21cc164aef1041e88a /synapse/handlers | |
parent | Fix bug when running presence off master (#10149) (diff) | |
download | synapse-5e0b4719ea6650596470f2d3bff91a19096067b8.tar.xz |
Fix sending presence over federation when using workers (#10163)
When using a federation sender we'd send out all local presence updates over federation even when they shouldn't be. Fixes #10153.
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/presence.py | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f5a049d754..79508580ac 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -495,9 +495,6 @@ class WorkerPresenceHandler(BasePresenceHandler): users=users_to_states.keys(), ) - # If this is a federation sender, notify about presence updates. - await self.maybe_send_presence_to_interested_destinations(states) - async def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: list ): @@ -519,11 +516,27 @@ class WorkerPresenceHandler(BasePresenceHandler): for row in rows ] - for state in states: - self.user_to_current_state[state.user_id] = state + # The list of states to notify sync streams and remote servers about. + # This is calculated by comparing the old and new states for each user + # using `should_notify(..)`. + # + # Note that this is necessary as the presence writer will periodically + # flush presence state changes that should not be notified about to the + # DB, and so will be sent over the replication stream. + state_to_notify = [] + + for new_state in states: + old_state = self.user_to_current_state.get(new_state.user_id) + self.user_to_current_state[new_state.user_id] = new_state + + if not old_state or should_notify(old_state, new_state): + state_to_notify.append(new_state) stream_id = token - await self.notify_from_replication(states, stream_id) + await self.notify_from_replication(state_to_notify, stream_id) + + # If this is a federation sender, notify about presence updates. + await self.maybe_send_presence_to_interested_destinations(state_to_notify) def get_currently_syncing_users_for_replication(self) -> Iterable[str]: return [ |