summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/send_queue.py69
-rw-r--r--synapse/federation/transaction_queue.py86
2 files changed, 110 insertions, 45 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index b952e59518..93e5acebc1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -53,18 +53,19 @@ class FederationRemoteSendQueue(object):
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
 
-        self.presence_map = {}
-        self.presence_changed = sorteddict()
+        self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
+        self.presence_changed = sorteddict()  # Stream position -> user_id
 
-        self.keyed_edu = {}
-        self.keyed_edu_changed = sorteddict()
+        self.keyed_edu = {}  # (destination, key) -> EDU
+        self.keyed_edu_changed = sorteddict()  # stream position -> (destination, key)
 
-        self.edus = sorteddict()
+        self.edus = sorteddict()  # stream position -> Edu
 
-        self.failures = sorteddict()
+        self.failures = sorteddict()  # stream position -> (destination, Failure)
 
-        self.device_messages = sorteddict()
+        self.device_messages = sorteddict()  # stream position -> destination
 
         self.pos = 1
         self.pos_time = sorteddict()
@@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object):
                 del self.presence_changed[key]
 
             user_ids = set(
-                user_id for uids in self.presence_changed.values() for _, user_id in uids
+                user_id
+                for uids in self.presence_changed.itervalues()
+                for user_id in uids
             )
 
             to_del = [
@@ -187,18 +190,20 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
-    def send_presence(self, destination, states):
-        """As per TransactionQueue"""
+    def send_presence(self, states):
+        """As per TransactionQueue
+
+        Args:
+            states (list(UserPresenceState))
+        """
         pos = self._next_pos()
 
-        self.presence_map.update({
-            state.user_id: state
-            for state in states
-        })
+        # We only want to send presence for our own users, so lets always just
+        # filter here just in case.
+        local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
 
-        self.presence_changed[pos] = [
-            (destination, state.user_id) for state in states
-        ]
+        self.presence_map.update({state.user_id: state for state in local_states})
+        self.presence_changed[pos] = [state.user_id for state in local_states]
 
         self.notifier.on_new_replication_data()
 
@@ -251,15 +256,14 @@ class FederationRemoteSendQueue(object):
         keys = self.presence_changed.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        dest_user_ids = set(
-            (pos, dest_user_id)
+        dest_user_ids = [
+            (pos, user_id)
             for pos in keys[i:j]
-            for dest_user_id in self.presence_changed[pos]
-        )
+            for user_id in self.presence_changed[pos]
+        ]
 
-        for (key, (dest, user_id)) in dest_user_ids:
+        for (key, user_id) in dest_user_ids:
             rows.append((key, PresenceRow(
-                destination=dest,
                 state=self.presence_map[user_id],
             )))
 
@@ -357,7 +361,6 @@ class BaseFederationRow(object):
 
 
 class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
-    "destination",  # str
     "state",  # UserPresenceState
 ))):
     TypeId = "p"
@@ -365,18 +368,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
     @staticmethod
     def from_data(data):
         return PresenceRow(
-            destination=data["destination"],
-            state=UserPresenceState.from_dict(data["state"])
+            state=UserPresenceState.from_dict(data)
         )
 
     def to_data(self):
-        return {
-            "destination": self.destination,
-            "state": self.state.as_dict()
-        }
+        return self.state.as_dict()
 
     def add_to_buffer(self, buff):
-        buff.presence.setdefault(self.destination, []).append(self.state)
+        buff.presence.append(self.state)
 
 
 class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
@@ -487,7 +486,7 @@ TypeToRow = {
 
 
 ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
-    "presence",  # dict of destination -> [UserPresenceState]
+    "presence",  # list(UserPresenceState)
     "keyed_edus",  # dict of destination -> { key -> Edu }
     "edus",  # dict of destination -> [Edu]
     "failures",  # dict of destination -> [failures]
@@ -509,7 +508,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence={},
+        presence=[],
         keyed_edus={},
         edus={},
         failures={},
@@ -526,8 +525,8 @@ def process_rows_for_federation(transaction_queue, rows):
         parsed_row = RowType.from_data(row.data)
         parsed_row.add_to_buffer(buff)
 
-    for destination, states in buff.presence.iteritems():
-        transaction_queue.send_presence(destination, states)
+    if buff.presence:
+        transaction_queue.send_presence(buff.presence)
 
     for destination, edu_map in buff.keyed_edus.iteritems():
         for key, edu in edu_map.items():
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 0e43891647..dee387eb7f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,11 +21,11 @@ from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id
-from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
 import synapse.metrics
 
 import logging
@@ -79,8 +79,18 @@ class TransactionQueue(object):
         # destination -> list of tuple(edu, deferred)
         self.pending_edus_by_dest = edus = {}
 
-        # Presence needs to be separate as we send single aggragate EDUs
+        # Map of user_id -> UserPresenceState for all the pending presence
+        # to be sent out by user_id. Entries here get processed and put in
+        # pending_presence_by_dest
+        self.pending_presence = {}
+
+        # Map of destination -> user_id -> UserPresenceState of pending presence
+        # to be sent to each destinations
         self.pending_presence_by_dest = presence = {}
+
+        # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
+        # based on their key (e.g. typing events by room_id)
+        # Map of destination -> (edu_type, key) -> Edu
         self.pending_edus_keyed_by_dest = edus_keyed = {}
 
         metrics.register_callback(
@@ -115,6 +125,8 @@ class TransactionQueue(object):
         self._is_processing = False
         self._last_poked_id = -1
 
+        self._processing_pending_presence = False
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -226,17 +238,71 @@ class TransactionQueue(object):
                 self._attempt_new_transaction, destination
             )
 
-    def send_presence(self, destination, states):
-        if not self.can_send_to(destination):
-            return
+    @preserve_fn  # the caller should not yield on this
+    @defer.inlineCallbacks
+    def send_presence(self, states):
+        """Send the new presence states to the appropriate destinations.
 
-        self.pending_presence_by_dest.setdefault(destination, {}).update({
+        This actually queues up the presence states ready for sending and
+        triggers a background task to process them and send out the transactions.
+
+        Args:
+            states (list(UserPresenceState))
+        """
+
+        # First we queue up the new presence by user ID, so multiple presence
+        # updates in quick successtion are correctly handled
+        # We only want to send presence for our own users, so lets always just
+        # filter here just in case.
+        self.pending_presence.update({
             state.user_id: state for state in states
+            if self.is_mine_id(state.user_id)
         })
 
-        preserve_context_over_fn(
-            self._attempt_new_transaction, destination
-        )
+        # We then handle the new pending presence in batches, first figuring
+        # out the destinations we need to send each state to and then poking it
+        # to attempt a new transaction. We linearize this so that we don't
+        # accidentally mess up the ordering and send multiple presence updates
+        # in the wrong order
+        if self._processing_pending_presence:
+            return
+
+        self._processing_pending_presence = True
+        try:
+            while True:
+                states_map = self.pending_presence
+                self.pending_presence = {}
+
+                if not states_map:
+                    break
+
+                yield self._process_presence_inner(states_map.values())
+        finally:
+            self._processing_pending_presence = False
+
+    @measure_func("txnqueue._process_presence")
+    @defer.inlineCallbacks
+    def _process_presence_inner(self, states):
+        """Given a list of states populate self.pending_presence_by_dest and
+        poke to send a new transaction to each destination
+
+        Args:
+            states (list(UserPresenceState))
+        """
+        hosts_and_states = yield get_interested_remotes(self.store, states)
+
+        for destinations, states in hosts_and_states:
+            for destination in destinations:
+                if not self.can_send_to(destination):
+                    continue
+
+                self.pending_presence_by_dest.setdefault(
+                    destination, {}
+                ).update({
+                    state.user_id: state for state in states
+                })
+
+                preserve_fn(self._attempt_new_transaction)(destination)
 
     def send_edu(self, destination, edu_type, content, key=None):
         edu = Edu(