summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py99
1 files changed, 90 insertions, 9 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index c27ce7c5f3..fd9e1fa01c 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,7 +21,7 @@ from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id
@@ -78,6 +78,7 @@ class TransactionQueue(object):
         self.pending_edus_by_dest = edus = {}
 
         # Presence needs to be separate as we send single aggragate EDUs
+        self.pending_presence = {}
         self.pending_presence_by_dest = presence = {}
         self.pending_edus_keyed_by_dest = edus_keyed = {}
 
@@ -113,6 +114,8 @@ class TransactionQueue(object):
         self._is_processing = False
         self._last_poked_id = -1
 
+        self._processing_pending_presence = False
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -224,17 +227,95 @@ class TransactionQueue(object):
                 self._attempt_new_transaction, destination
             )
 
-    def send_presence(self, destination, states):
-        if not self.can_send_to(destination):
+    @preserve_fn
+    @defer.inlineCallbacks
+    def send_presence(self, states):
+        """Send the new presence states to the appropriate destinations.
+
+        Args:
+            states (list(UserPresenceState))
+        """
+
+        # First we queue up the new presence by user ID, so multiple presence
+        # updates in quick successtion are correctly handled
+        self.pending_presence.update({state.user_id: state for state in states})
+
+        # We then handle the new pending presence in batches, first figuring
+        # out the destinations we need to send each state to and then poking it
+        # to attempt a new transaction. We linearize this so that we don't
+        # accidentally mess up the ordering and send multiple presence updates
+        # in the wrong order
+        if self._processing_pending_presence:
             return
 
-        self.pending_presence_by_dest.setdefault(destination, {}).update({
-            state.user_id: state for state in states
-        })
+        self._processing_pending_presence = True
+        try:
+            while True:
+                states = self.pending_presence
+                self.pending_presence = {}
 
-        preserve_context_over_fn(
-            self._attempt_new_transaction, destination
-        )
+                if not states:
+                    break
+
+                yield self._process_presence_inner(states)
+        finally:
+            self._processing_pending_presence = False
+
+    @measure_func("txnqueue._process_presence")
+    @defer.inlineCallbacks
+    def _process_presence_inner(self, states):
+        """Given a list of states populate self.pending_presence_by_dest and
+        poke to send a new transaction to each destination
+
+        Args:
+            states (list(UserPresenceState))
+        """
+        # First we look up the rooms each user is in (as well as any explicit
+        # subscriptions), then for each distinct room we look up the remote
+        # hosts in those rooms.
+        room_ids_to_states = {}
+        users_to_states = {}
+        for state in states.itervalues():
+            room_ids = yield self.store.get_rooms_for_user(state.user_id)
+            for room_id in room_ids:
+                room_ids_to_states.setdefault(room_id, []).append(state)
+
+            plist = yield self.store.get_presence_list_observers_accepted(
+                state.user_id,
+            )
+            for u in plist:
+                users_to_states.setdefault(u, []).append(state)
+
+        hosts_and_states = []
+        for room_id, states in room_ids_to_states.items():
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
+
+            hosts = yield self.store.get_hosts_in_room(room_id)
+            hosts_and_states.append((hosts, local_states))
+
+        for user_id, states in users_to_states.items():
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
+
+            host = get_domain_from_id(user_id)
+            hosts_and_states.append(([host], local_states))
+
+        # And now finally queue up new transactions
+        for destinations, states in hosts_and_states:
+            for destination in destinations:
+                if not self.can_send_to(destination):
+                    continue
+
+                self.pending_presence_by_dest.setdefault(
+                    destination, {}
+                ).update({
+                    state.user_id: state for state in states
+                })
+
+                preserve_fn(self._attempt_new_transaction)(destination)
 
     def send_edu(self, destination, edu_type, content, key=None):
         edu = Edu(