diff options
author | Matthew Hodgson <matthew@matrix.org> | 2018-11-03 12:14:24 +0000 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2018-11-03 12:14:24 +0000 |
commit | 6e7488ce1166ffb1f70777b4db2e9008f3303a04 (patch) | |
tree | 4cd62ccc22a7eb9630b875e18e05cf4017f62218 /synapse/federation/transaction_queue.py | |
parent | Merge pull request #4047 from matrix-org/michaelkaye/dinsic_allow_user_direct... (diff) | |
parent | Merge branch 'release-v0.33.8' (diff) | |
download | synapse-6e7488ce1166ffb1f70777b4db2e9008f3303a04.tar.xz |
merge master into dinsic, again...
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r-- | synapse/federation/transaction_queue.py | 75 |
1 files changed, 30 insertions, 45 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 94d7423d01..3fdd63be95 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -137,26 +137,6 @@ class TransactionQueue(object): self._processing_pending_presence = False - def can_send_to(self, destination): - """Can we send messages to the given server? - - We can't send messages to ourselves. If we are running on localhost - then we can only federation with other servers running on localhost. - Otherwise we only federate with servers on a public domain. - - Args: - destination(str): The server we are possibly trying to send to. - Returns: - bool: True if we can send to the server. - """ - - if destination == self.server_name: - return False - if self.server_name.startswith("localhost"): - return destination.startswith("localhost") - else: - return not destination.startswith("localhost") - def notify_new_events(self, current_id): """This gets called when we have some new events we might want to send out to other servers. @@ -279,10 +259,7 @@ class TransactionQueue(object): self._order += 1 destinations = set(destinations) - destinations = set( - dest for dest in destinations if self.can_send_to(dest) - ) - + destinations.discard(self.server_name) logger.debug("Sending to: %s", str(destinations)) if not destinations: @@ -358,7 +335,7 @@ class TransactionQueue(object): for destinations, states in hosts_and_states: for destination in destinations: - if not self.can_send_to(destination): + if destination == self.server_name: continue self.pending_presence_by_dest.setdefault( @@ -377,7 +354,8 @@ class TransactionQueue(object): content=content, ) - if not self.can_send_to(destination): + if destination == self.server_name: + logger.info("Not sending EDU to ourselves") return sent_edus_counter.inc() @@ -392,10 +370,8 @@ class TransactionQueue(object): self._attempt_new_transaction(destination) def send_device_messages(self, destination): - if destination == self.server_name or destination == "localhost": - return - - if not self.can_send_to(destination): + if destination == self.server_name: + logger.info("Not sending device update to ourselves") return self._attempt_new_transaction(destination) @@ -463,7 +439,19 @@ class TransactionQueue(object): # pending_transactions flag. pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + + # We can only include at most 50 PDUs per transactions + pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:] + if leftover_pdus: + self.pending_pdus_by_dest[destination] = leftover_pdus + pending_edus = self.pending_edus_by_dest.pop(destination, []) + + # We can only include at most 100 EDUs per transactions + pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] + if leftover_edus: + self.pending_edus_by_dest[destination] = leftover_edus + pending_presence = self.pending_presence_by_dest.pop(destination, {}) pending_edus.extend( @@ -645,14 +633,6 @@ class TransactionQueue(object): transaction, json_data_cb ) code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) except HttpResponseException as e: code = e.code response = e.response @@ -669,19 +649,24 @@ class TransactionQueue(object): destination, txn_id, code ) - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( transaction, code, response ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id) - if code != 200: + if code == 200: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "TX [%s] {%s} Remote returned error for %s: %s", + destination, txn_id, e_id, r, + ) + else: for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination + logger.warn( + "TX [%s] {%s} Failed to send event %s", + destination, txn_id, p.event_id, ) success = False |