summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-04-10 16:48:30 +0100
committerErik Johnston <erik@matrix.org>2017-04-10 16:48:30 +0100
commit29574fd5b3537cc272a4d792669b8d5be2a92b6f (patch)
tree18753fe16297d23997f6f001740e5e9470033b49 /synapse/federation/transaction_queue.py
parentTypo (diff)
downloadsynapse-29574fd5b3537cc272a4d792669b8d5be2a92b6f.tar.xz
Reduce federation presence replication traffic
This is mainly done by moving the calculation of where to send presence
updates from the presence handler to the transaction queue, so we only
need to send the presence event (and not the destinations) across the
replication connection. Before we were duplicating by sending the full
state across once per destination.
Diffstat (limited to '')
-rw-r--r--synapse/federation/transaction_queue.py99
1 files changed, 90 insertions, 9 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index c27ce7c5f3..fd9e1fa01c 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,7 +21,7 @@ from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id
@@ -78,6 +78,7 @@ class TransactionQueue(object):
         self.pending_edus_by_dest = edus = {}
 
         # Presence needs to be separate as we send single aggragate EDUs
+        self.pending_presence = {}
         self.pending_presence_by_dest = presence = {}
         self.pending_edus_keyed_by_dest = edus_keyed = {}
 
@@ -113,6 +114,8 @@ class TransactionQueue(object):
         self._is_processing = False
         self._last_poked_id = -1
 
+        self._processing_pending_presence = False
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -224,17 +227,95 @@ class TransactionQueue(object):
                 self._attempt_new_transaction, destination
             )
 
-    def send_presence(self, destination, states):
-        if not self.can_send_to(destination):
+    @preserve_fn
+    @defer.inlineCallbacks
+    def send_presence(self, states):
+        """Send the new presence states to the appropriate destinations.
+
+        Args:
+            states (list(UserPresenceState))
+        """
+
+        # First we queue up the new presence by user ID, so multiple presence
+        # updates in quick successtion are correctly handled
+        self.pending_presence.update({state.user_id: state for state in states})
+
+        # We then handle the new pending presence in batches, first figuring
+        # out the destinations we need to send each state to and then poking it
+        # to attempt a new transaction. We linearize this so that we don't
+        # accidentally mess up the ordering and send multiple presence updates
+        # in the wrong order
+        if self._processing_pending_presence:
             return
 
-        self.pending_presence_by_dest.setdefault(destination, {}).update({
-            state.user_id: state for state in states
-        })
+        self._processing_pending_presence = True
+        try:
+            while True:
+                states = self.pending_presence
+                self.pending_presence = {}
 
-        preserve_context_over_fn(
-            self._attempt_new_transaction, destination
-        )
+                if not states:
+                    break
+
+                yield self._process_presence_inner(states)
+        finally:
+            self._processing_pending_presence = False
+
+    @measure_func("txnqueue._process_presence")
+    @defer.inlineCallbacks
+    def _process_presence_inner(self, states):
+        """Given a list of states populate self.pending_presence_by_dest and
+        poke to send a new transaction to each destination
+
+        Args:
+            states (list(UserPresenceState))
+        """
+        # First we look up the rooms each user is in (as well as any explicit
+        # subscriptions), then for each distinct room we look up the remote
+        # hosts in those rooms.
+        room_ids_to_states = {}
+        users_to_states = {}
+        for state in states.itervalues():
+            room_ids = yield self.store.get_rooms_for_user(state.user_id)
+            for room_id in room_ids:
+                room_ids_to_states.setdefault(room_id, []).append(state)
+
+            plist = yield self.store.get_presence_list_observers_accepted(
+                state.user_id,
+            )
+            for u in plist:
+                users_to_states.setdefault(u, []).append(state)
+
+        hosts_and_states = []
+        for room_id, states in room_ids_to_states.items():
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
+
+            hosts = yield self.store.get_hosts_in_room(room_id)
+            hosts_and_states.append((hosts, local_states))
+
+        for user_id, states in users_to_states.items():
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
+
+            host = get_domain_from_id(user_id)
+            hosts_and_states.append(([host], local_states))
+
+        # And now finally queue up new transactions
+        for destinations, states in hosts_and_states:
+            for destination in destinations:
+                if not self.can_send_to(destination):
+                    continue
+
+                self.pending_presence_by_dest.setdefault(
+                    destination, {}
+                ).update({
+                    state.user_id: state for state in states
+                })
+
+                preserve_fn(self._attempt_new_transaction)(destination)
 
     def send_edu(self, destination, edu_type, content, key=None):
         edu = Edu(