summary refs log tree commit diff
path: root/synapse/federation/sender/transaction_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender/transaction_manager.py')
-rw-r--r--synapse/federation/sender/transaction_manager.py33
1 files changed, 16 insertions, 17 deletions
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py

index c7f6cb3d73..c84072ab73 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py
@@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Tuple - -from canonicaljson import json +from typing import TYPE_CHECKING, List from synapse.api.errors import HttpResponseException from synapse.events import EventBase @@ -28,6 +26,7 @@ from synapse.logging.opentracing import ( tags, whitelisted_homeserver, ) +from synapse.util import json_decoder from synapse.util.metrics import measure_func if TYPE_CHECKING: @@ -36,7 +35,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class TransactionManager(object): +class TransactionManager: """Helper class which handles building and sending transactions shared between PerDestinationQueue objects @@ -54,11 +53,17 @@ class TransactionManager(object): @measure_func("_send_new_transaction") async def send_new_transaction( - self, - destination: str, - pending_pdus: List[Tuple[EventBase, int]], - pending_edus: List[Edu], - ): + self, destination: str, pdus: List[EventBase], edus: List[Edu], + ) -> bool: + """ + Args: + destination: The destination to send to (e.g. 'example.org') + pdus: In-order list of PDUs to send + edus: List of EDUs to send + + Returns: + True iff the transaction was successful + """ # Make a transaction-sending opentracing span. This span follows on from # all the edus in that transaction. This needs to be done since there is @@ -68,20 +73,14 @@ class TransactionManager(object): span_contexts = [] keep_destination = whitelisted_homeserver(destination) - for edu in pending_edus: + for edu in edus: context = edu.get_context() if context: - span_contexts.append(extract_text_map(json.loads(context))) + span_contexts.append(extract_text_map(json_decoder.decode(context))) if keep_destination: edu.strip_context() with start_active_span_follows_from("send_transaction", span_contexts): - - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[1]) - pdus = [x[0] for x in pending_pdus] - edus = pending_edus - success = True logger.debug("TX [%s] _attempt_new_transaction", destination)