diff options
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r-- | synapse/federation/sender/__init__.py | 7 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 17 | ||||
-rw-r--r-- | synapse/federation/sender/transaction_manager.py | 26 |
3 files changed, 22 insertions, 28 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 4662008bfd..5276c1734f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -108,8 +108,6 @@ class FederationSender(object): ), ) - self._order = 1 - self._is_processing = False self._last_poked_id = -1 @@ -272,9 +270,6 @@ class FederationSender(object): # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. - order = self._order - self._order += 1 - destinations = set(destinations) destinations.discard(self.server_name) logger.debug("Sending to: %s", str(destinations)) @@ -286,7 +281,7 @@ class FederationSender(object): sent_pdus_destination_dist_count.inc() for destination in destinations: - self._get_per_destination_queue(destination).send_pdu(pdu, order) + self._get_per_destination_queue(destination).send_pdu(pdu) async def send_read_receipt(self, receipt: ReadReceipt) -> None: """Send a RR to any other servers in the room diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index c09ffcaf4c..f1534d431d 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -92,8 +92,8 @@ class PerDestinationQueue(object): self._destination = destination self.transmission_loop_running = False - # a list of tuples of (pending pdu, order) - self._pending_pdus = [] # type: List[Tuple[EventBase, int]] + # a list of pending PDUs + self._pending_pdus = [] # type: List[EventBase] # XXX this is never actually used: see # https://github.com/matrix-org/synapse/issues/7549 @@ -132,14 +132,13 @@ class PerDestinationQueue(object): + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu: EventBase, order: int) -> None: + def send_pdu(self, pdu: EventBase) -> None: """Add a PDU to the queue, and start the transmission loop if necessary Args: pdu: pdu to send - order """ - self._pending_pdus.append((pdu, order)) + self._pending_pdus.append(pdu) self.attempt_new_transaction() def send_presence(self, states: Iterable[UserPresenceState]) -> None: @@ -185,7 +184,7 @@ class PerDestinationQueue(object): returns immediately. Otherwise kicks off the process of sending a transaction in the background. """ - # list of (pending_pdu, deferred, order) + if self.transmission_loop_running: # XXX: this can get stuck on by a never-ending # request at which point pending_pdus just keeps growing. @@ -210,7 +209,7 @@ class PerDestinationQueue(object): ) async def _transaction_transmission_loop(self) -> None: - pending_pdus = [] # type: List[Tuple[EventBase, int]] + pending_pdus = [] # type: List[EventBase] try: self.transmission_loop_running = True @@ -373,13 +372,13 @@ class PerDestinationQueue(object): "TX [%s] Failed to send transaction: %s", self._destination, e ) - for p, _ in pending_pdus: + for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) except Exception: logger.exception("TX [%s] Failed to send transaction", self._destination) - for p, _ in pending_pdus: + for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 9bd534a313..0ebc70d57d 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Tuple +from typing import TYPE_CHECKING, List from synapse.api.errors import HttpResponseException from synapse.events import EventBase @@ -53,11 +53,17 @@ class TransactionManager(object): @measure_func("_send_new_transaction") async def send_new_transaction( - self, - destination: str, - pending_pdus: List[Tuple[EventBase, int]], - pending_edus: List[Edu], - ): + self, destination: str, pdus: List[EventBase], edus: List[Edu], + ) -> bool: + """ + Args: + destination: The destination to send to (e.g. 'example.org') + pdus: In-order list of PDUs to send + edus: List of EDUs to send + + Returns: + True iff the transaction was successful + """ # Make a transaction-sending opentracing span. This span follows on from # all the edus in that transaction. This needs to be done since there is @@ -67,7 +73,7 @@ class TransactionManager(object): span_contexts = [] keep_destination = whitelisted_homeserver(destination) - for edu in pending_edus: + for edu in edus: context = edu.get_context() if context: span_contexts.append(extract_text_map(json_decoder.decode(context))) @@ -75,12 +81,6 @@ class TransactionManager(object): edu.strip_context() with start_active_span_follows_from("send_transaction", span_contexts): - - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[1]) - pdus = [x[0] for x in pending_pdus] - edus = pending_edus - success = True logger.debug("TX [%s] _attempt_new_transaction", destination) |