diff options
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r-- | synapse/federation/sender/__init__.py | 70 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 7 | ||||
-rw-r--r-- | synapse/federation/sender/transaction_manager.py | 38 |
3 files changed, 57 insertions, 58 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 4f0f939102..766c5a37cd 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -44,8 +44,8 @@ sent_pdus_destination_dist_count = Counter( ) sent_pdus_destination_dist_total = Counter( - "synapse_federation_client_sent_pdu_destinations:total", "" - "Total number of PDUs queued for sending across all destinations", + "synapse_federation_client_sent_pdu_destinations:total", + "" "Total number of PDUs queued for sending across all destinations", ) @@ -63,14 +63,15 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) # map from destination to PerDestinationQueue - self._per_destination_queues = {} # type: dict[str, PerDestinationQueue] + self._per_destination_queues = {} # type: dict[str, PerDestinationQueue] LaterGauge( "synapse_federation_transaction_queue_pending_destinations", "", [], lambda: sum( - 1 for d in self._per_destination_queues.values() + 1 + for d in self._per_destination_queues.values() if d.transmission_loop_running ), ) @@ -108,8 +109,9 @@ class FederationSender(object): # awaiting a call to flush_read_receipts_for_room. The presence of an entry # here for a given room means that we are rate-limiting RR flushes to that room, # and that there is a pending call to _flush_rrs_for_room in the system. - self._queues_awaiting_rr_flush_by_room = { - } # type: dict[str, set[PerDestinationQueue]] + self._queues_awaiting_rr_flush_by_room = ( + {} + ) # type: dict[str, set[PerDestinationQueue]] self._rr_txn_interval_per_room_ms = ( 1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second @@ -141,8 +143,7 @@ class FederationSender(object): # fire off a processing loop in the background run_as_background_process( - "process_event_queue_for_federation", - self._process_event_queue_loop, + "process_event_queue_for_federation", self._process_event_queue_loop ) @defer.inlineCallbacks @@ -152,7 +153,7 @@ class FederationSender(object): while True: last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( - last_token, self._last_poked_id, limit=100, + last_token, self._last_poked_id, limit=100 ) logger.debug("Handling %s -> %s", last_token, next_token) @@ -168,6 +169,9 @@ class FederationSender(object): if not is_mine and send_on_behalf_of is None: return + if not event.internal_metadata.should_proactively_send(): + return + try: # Get the state from before the event. # We need to make sure that this is the state from before @@ -176,7 +180,7 @@ class FederationSender(object): # banned then it won't receive the event because it won't # be in the room after the ban. destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=event.prev_event_ids(), + event.room_id, latest_event_ids=event.prev_event_ids() ) except Exception: logger.exception( @@ -206,37 +210,40 @@ class FederationSender(object): for event in events: events_by_room.setdefault(event.room_id, []).append(event) - yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.run_in_background(handle_room_events, evs) - for evs in itervalues(events_by_room) - ], - consumeErrors=True - )) - - yield self.store.update_federation_out_pos( - "events", next_token + yield logcontext.make_deferred_yieldable( + defer.gatherResults( + [ + logcontext.run_in_background(handle_room_events, evs) + for evs in itervalues(events_by_room) + ], + consumeErrors=True, + ) ) + yield self.store.update_federation_out_pos("events", next_token) + if events: now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) synapse.metrics.event_processing_lag.labels( - "federation_sender").set(now - ts) + "federation_sender" + ).set(now - ts) synapse.metrics.event_processing_last_ts.labels( - "federation_sender").set(ts) + "federation_sender" + ).set(ts) events_processed_counter.inc(len(events)) - event_processing_loop_room_count.labels( - "federation_sender" - ).inc(len(events_by_room)) + event_processing_loop_room_count.labels("federation_sender").inc( + len(events_by_room) + ) event_processing_loop_counter.labels("federation_sender").inc() synapse.metrics.event_processing_positions.labels( - "federation_sender").set(next_token) + "federation_sender" + ).set(next_token) finally: self._is_processing = False @@ -309,9 +316,7 @@ class FederationSender(object): if not domains: return - queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get( - room_id - ) + queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id) # if there is no flush yet scheduled, we will send out these receipts with # immediate flushes, and schedule the next flush for this room. @@ -374,10 +379,9 @@ class FederationSender(object): # updates in quick succession are correctly handled. # We only want to send presence for our own users, so lets always just # filter here just in case. - self.pending_presence.update({ - state.user_id: state for state in states - if self.is_mine_id(state.user_id) - }) + self.pending_presence.update( + {state.user_id: state for state in states if self.is_mine_id(state.user_id)} + ) # We then handle the new pending presence in batches, first figuring # out the destinations we need to send each state to and then poking it diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 22a2735405..9aab12c0d3 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -360,7 +360,7 @@ class PerDestinationQueue(object): # Retrieve list of new device updates to send to the destination now_stream_id, results = yield self._store.get_devices_by_remote( - self._destination, last_device_list, limit=limit, + self._destination, last_device_list, limit=limit ) edus = [ Edu( @@ -381,10 +381,7 @@ class PerDestinationQueue(object): last_device_stream_id = self._last_device_stream_id to_device_stream_id = self._store.get_to_device_stream_token() contents, stream_id = yield self._store.get_new_device_msgs_for_remote( - self._destination, - last_device_stream_id, - to_device_stream_id, - limit, + self._destination, last_device_stream_id, to_device_stream_id, limit ) edus = [ Edu( diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 35e6b8ff5b..c987bb9a0d 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -29,9 +29,10 @@ class TransactionManager(object): shared between PerDestinationQueue objects """ + def __init__(self, hs): self._server_name = hs.hostname - self.clock = hs.get_clock() # nb must be called this for @measure_func + self.clock = hs.get_clock() # nb must be called this for @measure_func self._store = hs.get_datastore() self._transaction_actions = TransactionActions(self._store) self._transport_layer = hs.get_federation_transport_client() @@ -55,9 +56,9 @@ class TransactionManager(object): txn_id = str(self._next_txn_id) logger.debug( - "TX [%s] {%s} Attempting new transaction" - " (pdus: %d, edus: %d)", - destination, txn_id, + "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", + destination, + txn_id, len(pdus), len(edus), ) @@ -79,9 +80,9 @@ class TransactionManager(object): logger.debug("TX [%s] Persisted transaction", destination) logger.info( - "TX [%s] {%s} Sending transaction [%s]," - " (PDUs: %d, EDUs: %d)", - destination, txn_id, + "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", + destination, + txn_id, transaction.transaction_id, len(pdus), len(edus), @@ -112,20 +113,12 @@ class TransactionManager(object): response = e.response if e.code in (401, 404, 429) or 500 <= e.code: - logger.info( - "TX [%s] {%s} got %d response", - destination, txn_id, code - ) + logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) raise e - logger.info( - "TX [%s] {%s} got %d response", - destination, txn_id, code - ) + logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) - yield self._transaction_actions.delivered( - transaction, code, response - ) + yield self._transaction_actions.delivered(transaction, code, response) logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id) @@ -134,13 +127,18 @@ class TransactionManager(object): if "error" in r: logger.warn( "TX [%s] {%s} Remote returned error for %s: %s", - destination, txn_id, e_id, r, + destination, + txn_id, + e_id, + r, ) else: for p in pdus: logger.warn( "TX [%s] {%s} Failed to send event %s", - destination, txn_id, p.event_id, + destination, + txn_id, + p.event_id, ) success = False |