Fix sending presence over federation when using workers (#10163)
When using a federation sender we'd send out all local presence updates over
federation even when they shouldn't be.
Fixes #10153.
2 files changed, 20 insertions, 6 deletions
diff --git a/changelog.d/10163.bugfix b/changelog.d/10163.bugfix
new file mode 100644
index 0000000000..7ccde66743
--- /dev/null
+++ b/changelog.d/10163.bugfix
@@ -0,0 +1 @@
+Fix a bug when using federation sender worker where it would send out more presence updates than necessary, leading to high resource usage. Broke in v1.33.0.
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index f5a049d754..79508580ac 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -495,9 +495,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
users=users_to_states.keys(),
)
- # If this is a federation sender, notify about presence updates.
- await self.maybe_send_presence_to_interested_destinations(states)
-
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
):
@@ -519,11 +516,27 @@ class WorkerPresenceHandler(BasePresenceHandler):
for row in rows
]
- for state in states:
- self.user_to_current_state[state.user_id] = state
+ # The list of states to notify sync streams and remote servers about.
+ # This is calculated by comparing the old and new states for each user
+ # using `should_notify(..)`.
+ #
+ # Note that this is necessary as the presence writer will periodically
+ # flush presence state changes that should not be notified about to the
+ # DB, and so will be sent over the replication stream.
+ state_to_notify = []
+
+ for new_state in states:
+ old_state = self.user_to_current_state.get(new_state.user_id)
+ self.user_to_current_state[new_state.user_id] = new_state
+
+ if not old_state or should_notify(old_state, new_state):
+ state_to_notify.append(new_state)
stream_id = token
- await self.notify_from_replication(states, stream_id)
+ await self.notify_from_replication(state_to_notify, stream_id)
+
+ # If this is a federation sender, notify about presence updates.
+ await self.maybe_send_presence_to_interested_destinations(state_to_notify)
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
|