summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-04-10 16:48:30 +0100
committerErik Johnston <erik@matrix.org>2017-04-10 16:48:30 +0100
commit29574fd5b3537cc272a4d792669b8d5be2a92b6f (patch)
tree18753fe16297d23997f6f001740e5e9470033b49 /synapse/federation/send_queue.py
parentTypo (diff)
downloadsynapse-29574fd5b3537cc272a4d792669b8d5be2a92b6f.tar.xz
Reduce federation presence replication traffic
This is mainly done by moving the calculation of where to send presence
updates from the presence handler to the transaction queue, so we only
need to send the presence event (and not the destinations) across the
replication connection. Before we were duplicating by sending the full
state across once per destination.
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py47
1 files changed, 20 insertions, 27 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 748548bbe2..a12c18f4df 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object):
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}
         self.presence_changed = sorteddict()
@@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object):
                 del self.presence_changed[key]
 
             user_ids = set(
-                user_id for uids in self.presence_changed.values() for _, user_id in uids
+                user_id
+                for uids in self.presence_changed.itervalues()
+                for user_id in uids
             )
 
             to_del = [
@@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
-    def send_presence(self, destination, states):
+    def send_presence(self, states):
         """As per TransactionQueue"""
         pos = self._next_pos()
 
-        self.presence_map.update({
-            state.user_id: state
-            for state in states
-        })
+        local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
 
-        self.presence_changed[pos] = [
-            (destination, state.user_id) for state in states
-        ]
+        self.presence_map.update({state.user_id: state for state in local_states})
+        self.presence_changed[pos] = [state.user_id for state in local_states]
 
         self.notifier.on_new_replication_data()
 
@@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object):
         keys = self.presence_changed.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        dest_user_ids = set(
-            (pos, dest_user_id)
+        dest_user_ids = [
+            (pos, user_id)
             for pos in keys[i:j]
-            for dest_user_id in self.presence_changed[pos]
-        )
+            for user_id in self.presence_changed[pos]
+        ]
 
-        for (key, (dest, user_id)) in dest_user_ids:
+        for (key, user_id) in dest_user_ids:
             rows.append((key, PresenceRow(
-                destination=dest,
                 state=self.presence_map[user_id],
             )))
 
@@ -354,7 +352,6 @@ class BaseFederationRow(object):
 
 
 class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
-    "destination",  # str
     "state",  # UserPresenceState
 ))):
     TypeId = "p"
@@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
     @staticmethod
     def from_data(data):
         return PresenceRow(
-            destination=data["destination"],
-            state=UserPresenceState.from_dict(data["state"])
+            state=UserPresenceState.from_dict(data)
         )
 
     def to_data(self):
-        return {
-            "destination": self.destination,
-            "state": self.state.as_dict()
-        }
+        return self.state.as_dict()
 
     def add_to_buffer(self, buff):
-        buff.presence.setdefault(self.destination, []).append(self.state)
+        buff.presence.append(self.state)
 
 
 class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
@@ -469,7 +462,7 @@ TypeToRow = {
 
 
 ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
-    "presence",  # dict of destination -> [UserPresenceState]
+    "presence",  # list(UserPresenceState)
     "keyed_edus",  # dict of destination -> { key -> Edu }
     "edus",  # dict of destination -> [Edu]
     "failures",  # dict of destination -> [failures]
@@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence={},
+        presence=[],
         keyed_edus={},
         edus={},
         failures={},
@@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows):
         parsed_row = RowType.from_data(row.data)
         parsed_row.add_to_buffer(buff)
 
-    for destination, states in buff.presence.iteritems():
-        transaction_queue.send_presence(destination, states)
+    if buff.presence:
+        transaction_queue.send_presence(buff.presence)
 
     for destination, edu_map in buff.keyed_edus.iteritems():
         for key, edu in edu_map.items():