summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-04-12 11:07:13 +0100
committerGitHub <noreply@github.com>2017-04-12 11:07:13 +0100
commit247c736b9b0b9b6f3280a3d8a48e1ad2dbfd3bc6 (patch)
treec3d446fba8df5b497d09f47c0702392240c55043 /synapse/handlers
parentMerge pull request #2121 from matrix-org/paul/sent-transactions-metric (diff)
parentComment (diff)
downloadsynapse-247c736b9b0b9b6f3280a3d8a48e1ad2dbfd3bc6.tar.xz
Merge pull request #2115 from matrix-org/erikj/dedupe_federation_repl
Reduce federation replication traffic
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/presence.py143
1 files changed, 76 insertions, 67 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9ed5af3cb4..f3707afcd0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -318,11 +318,7 @@ class PresenceHandler(object):
             if to_federation_ping:
                 federation_presence_out_counter.inc_by(len(to_federation_ping))
 
-                _, _, hosts_to_states = yield self._get_interested_parties(
-                    to_federation_ping.values()
-                )
-
-                self._push_to_remotes(hosts_to_states)
+                self._push_to_remotes(to_federation_ping.values())
 
     def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -615,88 +611,39 @@ class PresenceHandler(object):
         defer.returnValue(states)
 
     @defer.inlineCallbacks
-    def _get_interested_parties(self, states, calculate_remote_hosts=True):
-        """Given a list of states return which entities (rooms, users, servers)
-        are interested in the given states.
-
-        Returns:
-            3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
-            with each item being a dict of `entity_name` -> `[UserPresenceState]`
-        """
-        room_ids_to_states = {}
-        users_to_states = {}
-        for state in states:
-            room_ids = yield self.store.get_rooms_for_user(state.user_id)
-            for room_id in room_ids:
-                room_ids_to_states.setdefault(room_id, []).append(state)
-
-            plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
-            for u in plist:
-                users_to_states.setdefault(u, []).append(state)
-
-            # Always notify self
-            users_to_states.setdefault(state.user_id, []).append(state)
-
-        hosts_to_states = {}
-        if calculate_remote_hosts:
-            for room_id, states in room_ids_to_states.items():
-                local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-                if not local_states:
-                    continue
-
-                hosts = yield self.store.get_hosts_in_room(room_id)
-
-                for host in hosts:
-                    hosts_to_states.setdefault(host, []).extend(local_states)
-
-        for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
-
-            host = get_domain_from_id(user_id)
-            hosts_to_states.setdefault(host, []).extend(local_states)
-
-        # TODO: de-dup hosts_to_states, as a single host might have multiple
-        # of same presence
-
-        defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
-
-    @defer.inlineCallbacks
     def _persist_and_notify(self, states):
         """Persist states in the database, poke the notifier and send to
         interested remote servers
         """
         stream_id, max_token = yield self.store.update_presence(states)
 
-        parties = yield self._get_interested_parties(states)
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        parties = yield get_interested_parties(self.store, states)
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-        self._push_to_remotes(hosts_to_states)
+        self._push_to_remotes(states)
 
     @defer.inlineCallbacks
     def notify_for_states(self, state, stream_id):
-        parties = yield self._get_interested_parties([state])
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        parties = yield get_interested_parties(self.store, [state])
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-    def _push_to_remotes(self, hosts_to_states):
+    def _push_to_remotes(self, states):
         """Sends state updates to remote servers.
 
         Args:
-            hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+            states (list(UserPresenceState))
         """
-        for host, states in hosts_to_states.items():
-            self.federation.send_presence(host, states)
+        self.federation.send_presence(states)
 
     @defer.inlineCallbacks
     def incoming_presence(self, origin, content):
@@ -837,14 +784,13 @@ class PresenceHandler(object):
         if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
-            hosts = set(get_domain_from_id(u) for u in user_ids)
-            self._push_to_remotes({host: (state,) for host in hosts})
+            self._push_to_remotes([state])
         else:
             user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
-            self._push_to_remotes({user.domain: states.values()})
+            self._push_to_remotes(states.values())
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
@@ -1344,3 +1290,66 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
         persist_and_notify = True
 
     return new_state, persist_and_notify, federation_ping
+
+
+@defer.inlineCallbacks
+def get_interested_parties(store, states):
+    """Given a list of states return which entities (rooms, users)
+    are interested in the given states.
+
+    Args:
+        states (list(UserPresenceState))
+
+    Returns:
+        2-tuple: `(room_ids_to_states, users_to_states)`,
+        with each item being a dict of `entity_name` -> `[UserPresenceState]`
+    """
+    room_ids_to_states = {}
+    users_to_states = {}
+    for state in states:
+        room_ids = yield store.get_rooms_for_user(state.user_id)
+        for room_id in room_ids:
+            room_ids_to_states.setdefault(room_id, []).append(state)
+
+        plist = yield store.get_presence_list_observers_accepted(state.user_id)
+        for u in plist:
+            users_to_states.setdefault(u, []).append(state)
+
+        # Always notify self
+        users_to_states.setdefault(state.user_id, []).append(state)
+
+    defer.returnValue((room_ids_to_states, users_to_states))
+
+
+@defer.inlineCallbacks
+def get_interested_remotes(store, states):
+    """Given a list of presence states figure out which remote servers
+    should be sent which.
+
+    All the presence states should be for local users only.
+
+    Args:
+        store (DataStore)
+        states (list(UserPresenceState))
+
+    Returns:
+        Deferred list of ([destinations], [UserPresenceState]), where for
+        each row the list of UserPresenceState should be sent to each
+        destination
+    """
+    hosts_and_states = []
+
+    # First we look up the rooms each user is in (as well as any explicit
+    # subscriptions), then for each distinct room we look up the remote
+    # hosts in those rooms.
+    room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
+
+    for room_id, states in room_ids_to_states.iteritems():
+        hosts = yield store.get_hosts_in_room(room_id)
+        hosts_and_states.append((hosts, states))
+
+    for user_id, states in users_to_states.iteritems():
+        host = get_domain_from_id(user_id)
+        hosts_and_states.append(([host], states))
+
+    defer.returnValue(hosts_and_states)