summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-04-10 16:48:30 +0100
committerErik Johnston <erik@matrix.org>2017-04-10 16:48:30 +0100
commit29574fd5b3537cc272a4d792669b8d5be2a92b6f (patch)
tree18753fe16297d23997f6f001740e5e9470033b49 /synapse/handlers/presence.py
parentTypo (diff)
downloadsynapse-29574fd5b3537cc272a4d792669b8d5be2a92b6f.tar.xz
Reduce federation presence replication traffic
This is mainly done by moving the calculation of where to send presence
updates from the presence handler to the transaction queue, so we only
need to send the presence event (and not the destinations) across the
replication connection. Before we were duplicating by sending the full
state across once per destination.
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py54
1 files changed, 14 insertions, 40 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9ed5af3cb4..c1c0dd4d3d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -318,11 +318,7 @@ class PresenceHandler(object):
             if to_federation_ping:
                 federation_presence_out_counter.inc_by(len(to_federation_ping))
 
-                _, _, hosts_to_states = yield self._get_interested_parties(
-                    to_federation_ping.values()
-                )
-
-                self._push_to_remotes(hosts_to_states)
+                self._push_to_remotes(to_federation_ping.values())
 
     def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -615,12 +611,12 @@ class PresenceHandler(object):
         defer.returnValue(states)
 
     @defer.inlineCallbacks
-    def _get_interested_parties(self, states, calculate_remote_hosts=True):
+    def _get_interested_parties(self, states):
         """Given a list of states return which entities (rooms, users, servers)
         are interested in the given states.
 
         Returns:
-            3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
+            2-tuple: `(room_ids_to_states, users_to_states)`,
             with each item being a dict of `entity_name` -> `[UserPresenceState]`
         """
         room_ids_to_states = {}
@@ -637,30 +633,10 @@ class PresenceHandler(object):
             # Always notify self
             users_to_states.setdefault(state.user_id, []).append(state)
 
-        hosts_to_states = {}
-        if calculate_remote_hosts:
-            for room_id, states in room_ids_to_states.items():
-                local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-                if not local_states:
-                    continue
-
-                hosts = yield self.store.get_hosts_in_room(room_id)
-
-                for host in hosts:
-                    hosts_to_states.setdefault(host, []).extend(local_states)
-
-        for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
-
-            host = get_domain_from_id(user_id)
-            hosts_to_states.setdefault(host, []).extend(local_states)
-
         # TODO: de-dup hosts_to_states, as a single host might have multiple
         # of same presence
 
-        defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
+        defer.returnValue((room_ids_to_states, users_to_states))
 
     @defer.inlineCallbacks
     def _persist_and_notify(self, states):
@@ -670,33 +646,32 @@ class PresenceHandler(object):
         stream_id, max_token = yield self.store.update_presence(states)
 
         parties = yield self._get_interested_parties(states)
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-        self._push_to_remotes(hosts_to_states)
+        self._push_to_remotes(states)
 
     @defer.inlineCallbacks
     def notify_for_states(self, state, stream_id):
         parties = yield self._get_interested_parties([state])
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-    def _push_to_remotes(self, hosts_to_states):
+    def _push_to_remotes(self, states):
         """Sends state updates to remote servers.
 
         Args:
-            hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+            hosts_to_states (list): list(state)
         """
-        for host, states in hosts_to_states.items():
-            self.federation.send_presence(host, states)
+        self.federation.send_presence(states)
 
     @defer.inlineCallbacks
     def incoming_presence(self, origin, content):
@@ -837,14 +812,13 @@ class PresenceHandler(object):
         if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
-            hosts = set(get_domain_from_id(u) for u in user_ids)
-            self._push_to_remotes({host: (state,) for host in hosts})
+            self._push_to_remotes([state])
         else:
             user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
-            self._push_to_remotes({user.domain: states.values()})
+            self._push_to_remotes(states.values())
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):