summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py7
-rw-r--r--synapse/federation/sender/per_destination_queue.py17
-rw-r--r--synapse/federation/sender/transaction_manager.py26
3 files changed, 22 insertions, 28 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4662008bfd..5276c1734f 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -108,8 +108,6 @@ class FederationSender(object):
             ),
         )
 
-        self._order = 1
-
         self._is_processing = False
         self._last_poked_id = -1
 
@@ -272,9 +270,6 @@ class FederationSender(object):
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
 
-        order = self._order
-        self._order += 1
-
         destinations = set(destinations)
         destinations.discard(self.server_name)
         logger.debug("Sending to: %s", str(destinations))
@@ -286,7 +281,7 @@ class FederationSender(object):
         sent_pdus_destination_dist_count.inc()
 
         for destination in destinations:
-            self._get_per_destination_queue(destination).send_pdu(pdu, order)
+            self._get_per_destination_queue(destination).send_pdu(pdu)
 
     async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """Send a RR to any other servers in the room
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c09ffcaf4c..f1534d431d 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -92,8 +92,8 @@ class PerDestinationQueue(object):
         self._destination = destination
         self.transmission_loop_running = False
 
-        # a list of tuples of (pending pdu, order)
-        self._pending_pdus = []  # type: List[Tuple[EventBase, int]]
+        # a list of pending PDUs
+        self._pending_pdus = []  # type: List[EventBase]
 
         # XXX this is never actually used: see
         # https://github.com/matrix-org/synapse/issues/7549
@@ -132,14 +132,13 @@ class PerDestinationQueue(object):
             + len(self._pending_edus_keyed)
         )
 
-    def send_pdu(self, pdu: EventBase, order: int) -> None:
+    def send_pdu(self, pdu: EventBase) -> None:
         """Add a PDU to the queue, and start the transmission loop if necessary
 
         Args:
             pdu: pdu to send
-            order
         """
-        self._pending_pdus.append((pdu, order))
+        self._pending_pdus.append(pdu)
         self.attempt_new_transaction()
 
     def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@@ -185,7 +184,7 @@ class PerDestinationQueue(object):
         returns immediately. Otherwise kicks off the process of sending a
         transaction in the background.
         """
-        # list of (pending_pdu, deferred, order)
+
         if self.transmission_loop_running:
             # XXX: this can get stuck on by a never-ending
             # request at which point pending_pdus just keeps growing.
@@ -210,7 +209,7 @@ class PerDestinationQueue(object):
         )
 
     async def _transaction_transmission_loop(self) -> None:
-        pending_pdus = []  # type: List[Tuple[EventBase, int]]
+        pending_pdus = []  # type: List[EventBase]
         try:
             self.transmission_loop_running = True
 
@@ -373,13 +372,13 @@ class PerDestinationQueue(object):
                 "TX [%s] Failed to send transaction: %s", self._destination, e
             )
 
-            for p, _ in pending_pdus:
+            for p in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
         except Exception:
             logger.exception("TX [%s] Failed to send transaction", self._destination)
-            for p, _ in pending_pdus:
+            for p in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 9bd534a313..0ebc70d57d 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, List, Tuple
+from typing import TYPE_CHECKING, List
 
 from synapse.api.errors import HttpResponseException
 from synapse.events import EventBase
@@ -53,11 +53,17 @@ class TransactionManager(object):
 
     @measure_func("_send_new_transaction")
     async def send_new_transaction(
-        self,
-        destination: str,
-        pending_pdus: List[Tuple[EventBase, int]],
-        pending_edus: List[Edu],
-    ):
+        self, destination: str, pdus: List[EventBase], edus: List[Edu],
+    ) -> bool:
+        """
+        Args:
+            destination: The destination to send to (e.g. 'example.org')
+            pdus: In-order list of PDUs to send
+            edus: List of EDUs to send
+
+        Returns:
+            True iff the transaction was successful
+        """
 
         # Make a transaction-sending opentracing span. This span follows on from
         # all the edus in that transaction. This needs to be done since there is
@@ -67,7 +73,7 @@ class TransactionManager(object):
         span_contexts = []
         keep_destination = whitelisted_homeserver(destination)
 
-        for edu in pending_edus:
+        for edu in edus:
             context = edu.get_context()
             if context:
                 span_contexts.append(extract_text_map(json_decoder.decode(context)))
@@ -75,12 +81,6 @@ class TransactionManager(object):
                 edu.strip_context()
 
         with start_active_span_follows_from("send_transaction", span_contexts):
-
-            # Sort based on the order field
-            pending_pdus.sort(key=lambda t: t[1])
-            pdus = [x[0] for x in pending_pdus]
-            edus = pending_edus
-
             success = True
 
             logger.debug("TX [%s] _attempt_new_transaction", destination)