summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/4770.misc1
-rw-r--r--synapse/federation/send_queue.py14
-rw-r--r--synapse/federation/transaction_queue.py31
-rw-r--r--synapse/handlers/presence.py6
-rw-r--r--synapse/handlers/receipts.py2
-rw-r--r--synapse/handlers/typing.py2
6 files changed, 37 insertions, 19 deletions
diff --git a/changelog.d/4770.misc b/changelog.d/4770.misc
new file mode 100644
index 0000000000..144d819958
--- /dev/null
+++ b/changelog.d/4770.misc
@@ -0,0 +1 @@
+Optimise EDU transmission for the federation_sender worker.
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 6f5995735a..b7d0b25781 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -159,8 +159,12 @@ class FederationRemoteSendQueue(object):
         # stream.
         pass
 
-    def send_edu(self, destination, edu_type, content, key=None):
+    def build_and_send_edu(self, destination, edu_type, content, key=None):
         """As per TransactionQueue"""
+        if destination == self.server_name:
+            logger.info("Not sending EDU to ourselves")
+            return
+
         pos = self._next_pos()
 
         edu = Edu(
@@ -465,15 +469,11 @@ def process_rows_for_federation(transaction_queue, rows):
 
     for destination, edu_map in iteritems(buff.keyed_edus):
         for key, edu in edu_map.items():
-            transaction_queue.send_edu(
-                edu.destination, edu.edu_type, edu.content, key=key,
-            )
+            transaction_queue.send_edu(edu, key)
 
     for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
-            transaction_queue.send_edu(
-                edu.destination, edu.edu_type, edu.content, key=None,
-            )
+            transaction_queue.send_edu(edu, None)
 
     for destination in buff.device_destinations:
         transaction_queue.send_device_messages(destination)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 30941f5ad6..e5e42c647d 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -361,7 +361,19 @@ class TransactionQueue(object):
 
                 self._attempt_new_transaction(destination)
 
-    def send_edu(self, destination, edu_type, content, key=None):
+    def build_and_send_edu(self, destination, edu_type, content, key=None):
+        """Construct an Edu object, and queue it for sending
+
+        Args:
+            destination (str): name of server to send to
+            edu_type (str): type of EDU to send
+            content (dict): content of EDU
+            key (Any|None): clobbering key for this edu
+        """
+        if destination == self.server_name:
+            logger.info("Not sending EDU to ourselves")
+            return
+
         edu = Edu(
             origin=self.server_name,
             destination=destination,
@@ -369,18 +381,23 @@ class TransactionQueue(object):
             content=content,
         )
 
-        if destination == self.server_name:
-            logger.info("Not sending EDU to ourselves")
-            return
+        self.send_edu(edu, key)
+
+    def send_edu(self, edu, key):
+        """Queue an EDU for sending
 
+        Args:
+            edu (Edu): edu to send
+            key (Any|None): clobbering key for this edu
+        """
         if key:
             self.pending_edus_keyed_by_dest.setdefault(
-                destination, {}
+                edu.destination, {}
             )[(edu.edu_type, key)] = edu
         else:
-            self.pending_edus_by_dest.setdefault(destination, []).append(edu)
+            self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu)
 
-        self._attempt_new_transaction(destination)
+        self._attempt_new_transaction(edu.destination)
 
     def send_device_messages(self, destination):
         if destination == self.server_name:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index ba3856674d..37e87fc054 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -816,7 +816,7 @@ class PresenceHandler(object):
         if self.is_mine(observed_user):
             yield self.invite_presence(observed_user, observer_user)
         else:
-            yield self.federation.send_edu(
+            yield self.federation.build_and_send_edu(
                 destination=observed_user.domain,
                 edu_type="m.presence_invite",
                 content={
@@ -836,7 +836,7 @@ class PresenceHandler(object):
         if self.is_mine(observer_user):
             yield self.accept_presence(observed_user, observer_user)
         else:
-            self.federation.send_edu(
+            self.federation.build_and_send_edu(
                 destination=observer_user.domain,
                 edu_type="m.presence_accept",
                 content={
@@ -848,7 +848,7 @@ class PresenceHandler(object):
             state_dict = yield self.get_state(observed_user, as_event=False)
             state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
 
-            self.federation.send_edu(
+            self.federation.build_and_send_edu(
                 destination=observer_user.domain,
                 edu_type="m.presence",
                 content={
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 696469732c..8b2d03a756 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -148,7 +148,7 @@ class ReceiptsHandler(BaseHandler):
             logger.debug("Sending receipt to: %r", remotedomains)
 
             for domain in remotedomains:
-                self.federation.send_edu(
+                self.federation.build_and_send_edu(
                     destination=domain,
                     edu_type="m.receipt",
                     content={
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a61bbf9392..39df960c31 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -231,7 +231,7 @@ class TypingHandler(object):
             for domain in set(get_domain_from_id(u) for u in users):
                 if domain != self.server_name:
                     logger.debug("sending typing update to %s", domain)
-                    self.federation.send_edu(
+                    self.federation.build_and_send_edu(
                         destination=domain,
                         edu_type="m.typing",
                         content={