diff options
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r-- | synapse/federation/transaction_queue.py | 93 |
1 files changed, 44 insertions, 49 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 78f9d40a3a..099ace28c1 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.metrics import ( LaterGauge, + event_processing_loop_counter, + event_processing_loop_room_count, events_processed_counter, sent_edus_counter, sent_transactions_counter, @@ -56,6 +58,7 @@ class TransactionQueue(object): """ def __init__(self, hs): + self.hs = hs self.server_name = hs.hostname self.store = hs.get_datastore() @@ -134,26 +137,6 @@ class TransactionQueue(object): self._processing_pending_presence = False - def can_send_to(self, destination): - """Can we send messages to the given server? - - We can't send messages to ourselves. If we are running on localhost - then we can only federation with other servers running on localhost. - Otherwise we only federate with servers on a public domain. - - Args: - destination(str): The server we are possibly trying to send to. - Returns: - bool: True if we can send to the server. - """ - - if destination == self.server_name: - return False - if self.server_name.startswith("localhost"): - return destination.startswith("localhost") - else: - return not destination.startswith("localhost") - def notify_new_events(self, current_id): """This gets called when we have some new events we might want to send out to other servers. @@ -200,9 +183,7 @@ class TransactionQueue(object): # banned then it won't receive the event because it won't # be in the room after the ban. destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=[ - prev_id for prev_id, _ in event.prev_events - ], + event.room_id, latest_event_ids=event.prev_event_ids(), ) except Exception: logger.exception( @@ -253,7 +234,13 @@ class TransactionQueue(object): synapse.metrics.event_processing_last_ts.labels( "federation_sender").set(ts) - events_processed_counter.inc(len(events)) + events_processed_counter.inc(len(events)) + + event_processing_loop_room_count.labels( + "federation_sender" + ).inc(len(events_by_room)) + + event_processing_loop_counter.labels("federation_sender").inc() synapse.metrics.event_processing_positions.labels( "federation_sender").set(next_token) @@ -270,10 +257,7 @@ class TransactionQueue(object): self._order += 1 destinations = set(destinations) - destinations = set( - dest for dest in destinations if self.can_send_to(dest) - ) - + destinations.discard(self.server_name) logger.debug("Sending to: %s", str(destinations)) if not destinations: @@ -300,6 +284,9 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ + if not self.hs.config.use_presence: + # No-op if presence is disabled. + return # First we queue up the new presence by user ID, so multiple presence # updates in quick successtion are correctly handled @@ -346,7 +333,7 @@ class TransactionQueue(object): for destinations, states in hosts_and_states: for destination in destinations: - if not self.can_send_to(destination): + if destination == self.server_name: continue self.pending_presence_by_dest.setdefault( @@ -365,7 +352,8 @@ class TransactionQueue(object): content=content, ) - if not self.can_send_to(destination): + if destination == self.server_name: + logger.info("Not sending EDU to ourselves") return sent_edus_counter.inc() @@ -380,10 +368,8 @@ class TransactionQueue(object): self._attempt_new_transaction(destination) def send_device_messages(self, destination): - if destination == self.server_name or destination == "localhost": - return - - if not self.can_send_to(destination): + if destination == self.server_name: + logger.info("Not sending device update to ourselves") return self._attempt_new_transaction(destination) @@ -451,7 +437,19 @@ class TransactionQueue(object): # pending_transactions flag. pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + + # We can only include at most 50 PDUs per transactions + pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:] + if leftover_pdus: + self.pending_pdus_by_dest[destination] = leftover_pdus + pending_edus = self.pending_edus_by_dest.pop(destination, []) + + # We can only include at most 100 EDUs per transactions + pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:] + if leftover_edus: + self.pending_edus_by_dest[destination] = leftover_edus + pending_presence = self.pending_presence_by_dest.pop(destination, {}) pending_edus.extend( @@ -633,14 +631,6 @@ class TransactionQueue(object): transaction, json_data_cb ) code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) except HttpResponseException as e: code = e.code response = e.response @@ -657,19 +647,24 @@ class TransactionQueue(object): destination, txn_id, code ) - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( transaction, code, response ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id) - if code != 200: + if code == 200: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "TX [%s] {%s} Remote returned error for %s: %s", + destination, txn_id, e_id, r, + ) + else: for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination + logger.warn( + "TX [%s] {%s} Failed to send event %s", + destination, txn_id, p.event_id, ) success = False |