summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-04-11 15:19:26 +0100
committerErik Johnston <erik@matrix.org>2017-04-11 15:19:26 +0100
commit6308ac45b08ff5bf7259f09a2a767212f3f97860 (patch)
treef291978fef36204653a5fa4e06157d81c8056b10
parentComments (diff)
downloadsynapse-6308ac45b08ff5bf7259f09a2a767212f3f97860.tar.xz
Move get_interested_remotes back to presence handler
-rw-r--r--synapse/federation/transaction_queue.py41
-rw-r--r--synapse/handlers/presence.py52
2 files changed, 55 insertions, 38 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index eb361d904c..08ceda31a6 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -25,7 +25,7 @@ from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id
-from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
 import synapse.metrics
 
 import logging
@@ -251,7 +251,10 @@ class TransactionQueue(object):
 
         # First we queue up the new presence by user ID, so multiple presence
         # updates in quick successtion are correctly handled
-        self.pending_presence.update({state.user_id: state for state in states})
+        self.pending_presence.update({
+            state.user_id: state for state in states
+            if self.is_mine_id(state.user_id)
+        })
 
         # We then handle the new pending presence in batches, first figuring
         # out the destinations we need to send each state to and then poking it
@@ -283,40 +286,8 @@ class TransactionQueue(object):
         Args:
             states (list(UserPresenceState))
         """
-        # First we look up the rooms each user is in (as well as any explicit
-        # subscriptions), then for each distinct room we look up the remote
-        # hosts in those rooms.
-        room_ids_to_states = {}
-        users_to_states = {}
-        for state in states.itervalues():
-            room_ids = yield self.store.get_rooms_for_user(state.user_id)
-            for room_id in room_ids:
-                room_ids_to_states.setdefault(room_id, []).append(state)
-
-            plist = yield self.store.get_presence_list_observers_accepted(
-                state.user_id,
-            )
-            for u in plist:
-                users_to_states.setdefault(u, []).append(state)
-
-        hosts_and_states = []
-        for room_id, states in room_ids_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
-
-            hosts = yield self.store.get_hosts_in_room(room_id)
-            hosts_and_states.append((hosts, local_states))
-
-        for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
-
-            host = get_domain_from_id(user_id)
-            hosts_and_states.append(([host], local_states))
+        hosts_and_states = yield get_interested_remotes(self.store, states)
 
-        # And now finally queue up new transactions
         for destinations, states in hosts_and_states:
             for destination in destinations:
                 if not self.can_send_to(destination):
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index c1c0dd4d3d..685373ff28 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -633,9 +633,6 @@ class PresenceHandler(object):
             # Always notify self
             users_to_states.setdefault(state.user_id, []).append(state)
 
-        # TODO: de-dup hosts_to_states, as a single host might have multiple
-        # of same presence
-
         defer.returnValue((room_ids_to_states, users_to_states))
 
     @defer.inlineCallbacks
@@ -1318,3 +1315,52 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
         persist_and_notify = True
 
     return new_state, persist_and_notify, federation_ping
+
+@defer.inlineCallbacks
+def get_interested_remotes(store, states):
+    """Given a list of presence states figure out which remote servers
+    should be sent which.
+
+    All the presence states should be for local users only.
+
+    Args:
+        store (DataStore)
+        states (list(UserPresenceState))
+
+    Returns:
+        Deferred list of ([destinations], [UserPresenceState]), where for
+        each row the list of UserPresenceState should be sent to each
+        destination
+    """
+    # First we look up the rooms each user is in (as well as any explicit
+    # subscriptions), then for each distinct room we look up the remote
+    # hosts in those rooms.
+    room_ids_to_states = {}
+    users_to_states = {}
+    for state in states.itervalues():
+        room_ids = yield store.get_rooms_for_user(state.user_id)
+        for room_id in room_ids:
+            room_ids_to_states.setdefault(room_id, []).append(state)
+
+        plist = yield store.get_presence_list_observers_accepted(
+            state.user_id,
+        )
+        for u in plist:
+            users_to_states.setdefault(u, []).append(state)
+
+    hosts_and_states = []
+    for room_id, states in room_ids_to_states.items():
+        if not local_states:
+            continue
+
+        hosts = yield store.get_hosts_in_room(room_id)
+        hosts_and_states.append((hosts, local_states))
+
+    for user_id, states in users_to_states.items():
+        if not local_states:
+            continue
+
+        host = get_domain_from_id(user_id)
+        hosts_and_states.append(([host], local_states))
+
+    defer.returnValue(hosts_and_states)