summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2020-09-04 11:02:10 +0100
committerBrendan Abolivier <babolivier@matrix.org>2020-09-04 11:02:10 +0100
commitcc23d81a74caead82fa97bddd535b29bb9e1df56 (patch)
tree7013f8ae7aaac8867313ac9e86ac90f93abbbff8 /synapse/federation
parentMerge branch 'develop' into matrix-org-hotfixes (diff)
parentRevert "Add experimental support for sharding event persister. (#8170)" (#8242) (diff)
downloadsynapse-cc23d81a74caead82fa97bddd535b29bb9e1df56.tar.xz
Merge branch 'develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/sender/__init__.py7
-rw-r--r--synapse/federation/sender/per_destination_queue.py17
-rw-r--r--synapse/federation/sender/transaction_manager.py26
3 files changed, 22 insertions, 28 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py

index 399b764840..88474492e5 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -108,8 +108,6 @@ class FederationSender(object): ), ) - self._order = 1 - self._is_processing = False self._last_poked_id = -1 @@ -290,9 +288,6 @@ class FederationSender(object): # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. - order = self._order - self._order += 1 - destinations = set(destinations) destinations.discard(self.server_name) logger.debug("Sending to: %s", str(destinations)) @@ -304,7 +299,7 @@ class FederationSender(object): sent_pdus_destination_dist_count.inc() for destination in destinations: - self._get_per_destination_queue(destination).send_pdu(pdu, order) + self._get_per_destination_queue(destination).send_pdu(pdu) async def send_read_receipt(self, receipt: ReadReceipt) -> None: """Send a RR to any other servers in the room diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 374c5610b4..c624a1cfdf 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -95,8 +95,8 @@ class PerDestinationQueue(object): self._destination = destination self.transmission_loop_running = False - # a list of tuples of (pending pdu, order) - self._pending_pdus = [] # type: List[Tuple[EventBase, int]] + # a list of pending PDUs + self._pending_pdus = [] # type: List[EventBase] # XXX this is never actually used: see # https://github.com/matrix-org/synapse/issues/7549 @@ -135,14 +135,13 @@ class PerDestinationQueue(object): + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu: EventBase, order: int) -> None: + def send_pdu(self, pdu: EventBase) -> None: """Add a PDU to the queue, and start the transmission loop if necessary Args: pdu: pdu to send - order """ - self._pending_pdus.append((pdu, order)) + self._pending_pdus.append(pdu) self.attempt_new_transaction() def send_presence(self, states: Iterable[UserPresenceState]) -> None: @@ -188,7 +187,7 @@ class PerDestinationQueue(object): returns immediately. Otherwise kicks off the process of sending a transaction in the background. """ - # list of (pending_pdu, deferred, order) + if self.transmission_loop_running: # XXX: this can get stuck on by a never-ending # request at which point pending_pdus just keeps growing. @@ -213,7 +212,7 @@ class PerDestinationQueue(object): ) async def _transaction_transmission_loop(self) -> None: - pending_pdus = [] # type: List[Tuple[EventBase, int]] + pending_pdus = [] # type: List[EventBase] try: self.transmission_loop_running = True @@ -388,13 +387,13 @@ class PerDestinationQueue(object): "TX [%s] Failed to send transaction: %s", self._destination, e ) - for p, _ in pending_pdus: + for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) except Exception: logger.exception("TX [%s] Failed to send transaction", self._destination) - for p, _ in pending_pdus: + for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index ca768db3e9..031d80593d 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py
@@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Tuple +from typing import TYPE_CHECKING, List from synapse.api.errors import HttpResponseException from synapse.events import EventBase @@ -57,11 +57,17 @@ class TransactionManager(object): @measure_func("_send_new_transaction") async def send_new_transaction( - self, - destination: str, - pending_pdus: List[Tuple[EventBase, int]], - pending_edus: List[Edu], - ): + self, destination: str, pdus: List[EventBase], edus: List[Edu], + ) -> bool: + """ + Args: + destination: The destination to send to (e.g. 'example.org') + pdus: In-order list of PDUs to send + edus: List of EDUs to send + + Returns: + True iff the transaction was successful + """ # Make a transaction-sending opentracing span. This span follows on from # all the edus in that transaction. This needs to be done since there is @@ -71,7 +77,7 @@ class TransactionManager(object): span_contexts = [] keep_destination = whitelisted_homeserver(destination) - for edu in pending_edus: + for edu in edus: context = edu.get_context() if context: span_contexts.append(extract_text_map(json_decoder.decode(context))) @@ -79,12 +85,6 @@ class TransactionManager(object): edu.strip_context() with start_active_span_follows_from("send_transaction", span_contexts): - - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[1]) - pdus = [x[0] for x in pending_pdus] - edus = pending_edus - success = True logger.debug("TX [%s] _attempt_new_transaction", destination)