diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index eb361d904c..08ceda31a6 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -25,7 +25,7 @@ from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id
-from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
import logging
@@ -251,7 +251,10 @@ class TransactionQueue(object):
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
- self.pending_presence.update({state.user_id: state for state in states})
+ self.pending_presence.update({
+ state.user_id: state for state in states
+ if self.is_mine_id(state.user_id)
+ })
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
@@ -283,40 +286,8 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
- # First we look up the rooms each user is in (as well as any explicit
- # subscriptions), then for each distinct room we look up the remote
- # hosts in those rooms.
- room_ids_to_states = {}
- users_to_states = {}
- for state in states.itervalues():
- room_ids = yield self.store.get_rooms_for_user(state.user_id)
- for room_id in room_ids:
- room_ids_to_states.setdefault(room_id, []).append(state)
-
- plist = yield self.store.get_presence_list_observers_accepted(
- state.user_id,
- )
- for u in plist:
- users_to_states.setdefault(u, []).append(state)
-
- hosts_and_states = []
- for room_id, states in room_ids_to_states.items():
- local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- if not local_states:
- continue
-
- hosts = yield self.store.get_hosts_in_room(room_id)
- hosts_and_states.append((hosts, local_states))
-
- for user_id, states in users_to_states.items():
- local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- if not local_states:
- continue
-
- host = get_domain_from_id(user_id)
- hosts_and_states.append(([host], local_states))
+ hosts_and_states = yield get_interested_remotes(self.store, states)
- # And now finally queue up new transactions
for destinations, states in hosts_and_states:
for destination in destinations:
if not self.can_send_to(destination):
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index c1c0dd4d3d..685373ff28 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -633,9 +633,6 @@ class PresenceHandler(object):
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)
- # TODO: de-dup hosts_to_states, as a single host might have multiple
- # of same presence
-
defer.returnValue((room_ids_to_states, users_to_states))
@defer.inlineCallbacks
@@ -1318,3 +1315,52 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
persist_and_notify = True
return new_state, persist_and_notify, federation_ping
+
+@defer.inlineCallbacks
+def get_interested_remotes(store, states):
+ """Given a list of presence states figure out which remote servers
+ should be sent which.
+
+ All the presence states should be for local users only.
+
+ Args:
+ store (DataStore)
+ states (list(UserPresenceState))
+
+ Returns:
+ Deferred list of ([destinations], [UserPresenceState]), where for
+ each row the list of UserPresenceState should be sent to each
+ destination
+ """
+ # First we look up the rooms each user is in (as well as any explicit
+ # subscriptions), then for each distinct room we look up the remote
+ # hosts in those rooms.
+ room_ids_to_states = {}
+ users_to_states = {}
+ for state in states.itervalues():
+ room_ids = yield store.get_rooms_for_user(state.user_id)
+ for room_id in room_ids:
+ room_ids_to_states.setdefault(room_id, []).append(state)
+
+ plist = yield store.get_presence_list_observers_accepted(
+ state.user_id,
+ )
+ for u in plist:
+ users_to_states.setdefault(u, []).append(state)
+
+ hosts_and_states = []
+ for room_id, states in room_ids_to_states.items():
+ if not local_states:
+ continue
+
+ hosts = yield store.get_hosts_in_room(room_id)
+ hosts_and_states.append((hosts, local_states))
+
+ for user_id, states in users_to_states.items():
+ if not local_states:
+ continue
+
+ host = get_domain_from_id(user_id)
+ hosts_and_states.append(([host], local_states))
+
+ defer.returnValue(hosts_and_states)
|