summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py70
-rw-r--r--synapse/federation/sender/per_destination_queue.py7
-rw-r--r--synapse/federation/sender/transaction_manager.py38
3 files changed, 57 insertions, 58 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4f0f939102..766c5a37cd 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -44,8 +44,8 @@ sent_pdus_destination_dist_count = Counter(
 )
 
 sent_pdus_destination_dist_total = Counter(
-    "synapse_federation_client_sent_pdu_destinations:total", ""
-    "Total number of PDUs queued for sending across all destinations",
+    "synapse_federation_client_sent_pdu_destinations:total",
+    "" "Total number of PDUs queued for sending across all destinations",
 )
 
 
@@ -63,14 +63,15 @@ class FederationSender(object):
         self._transaction_manager = TransactionManager(hs)
 
         # map from destination to PerDestinationQueue
-        self._per_destination_queues = {}   # type: dict[str, PerDestinationQueue]
+        self._per_destination_queues = {}  # type: dict[str, PerDestinationQueue]
 
         LaterGauge(
             "synapse_federation_transaction_queue_pending_destinations",
             "",
             [],
             lambda: sum(
-                1 for d in self._per_destination_queues.values()
+                1
+                for d in self._per_destination_queues.values()
                 if d.transmission_loop_running
             ),
         )
@@ -108,8 +109,9 @@ class FederationSender(object):
         # awaiting a call to flush_read_receipts_for_room. The presence of an entry
         # here for a given room means that we are rate-limiting RR flushes to that room,
         # and that there is a pending call to _flush_rrs_for_room in the system.
-        self._queues_awaiting_rr_flush_by_room = {
-        }   # type: dict[str, set[PerDestinationQueue]]
+        self._queues_awaiting_rr_flush_by_room = (
+            {}
+        )  # type: dict[str, set[PerDestinationQueue]]
 
         self._rr_txn_interval_per_room_ms = (
             1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second
@@ -141,8 +143,7 @@ class FederationSender(object):
 
         # fire off a processing loop in the background
         run_as_background_process(
-            "process_event_queue_for_federation",
-            self._process_event_queue_loop,
+            "process_event_queue_for_federation", self._process_event_queue_loop
         )
 
     @defer.inlineCallbacks
@@ -152,7 +153,7 @@ class FederationSender(object):
             while True:
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=100,
+                    last_token, self._last_poked_id, limit=100
                 )
 
                 logger.debug("Handling %s -> %s", last_token, next_token)
@@ -168,6 +169,9 @@ class FederationSender(object):
                     if not is_mine and send_on_behalf_of is None:
                         return
 
+                    if not event.internal_metadata.should_proactively_send():
+                        return
+
                     try:
                         # Get the state from before the event.
                         # We need to make sure that this is the state from before
@@ -176,7 +180,7 @@ class FederationSender(object):
                         # banned then it won't receive the event because it won't
                         # be in the room after the ban.
                         destinations = yield self.state.get_current_hosts_in_room(
-                            event.room_id, latest_event_ids=event.prev_event_ids(),
+                            event.room_id, latest_event_ids=event.prev_event_ids()
                         )
                     except Exception:
                         logger.exception(
@@ -206,37 +210,40 @@ class FederationSender(object):
                 for event in events:
                     events_by_room.setdefault(event.room_id, []).append(event)
 
-                yield logcontext.make_deferred_yieldable(defer.gatherResults(
-                    [
-                        logcontext.run_in_background(handle_room_events, evs)
-                        for evs in itervalues(events_by_room)
-                    ],
-                    consumeErrors=True
-                ))
-
-                yield self.store.update_federation_out_pos(
-                    "events", next_token
+                yield logcontext.make_deferred_yieldable(
+                    defer.gatherResults(
+                        [
+                            logcontext.run_in_background(handle_room_events, evs)
+                            for evs in itervalues(events_by_room)
+                        ],
+                        consumeErrors=True,
+                    )
                 )
 
+                yield self.store.update_federation_out_pos("events", next_token)
+
                 if events:
                     now = self.clock.time_msec()
                     ts = yield self.store.get_received_ts(events[-1].event_id)
 
                     synapse.metrics.event_processing_lag.labels(
-                        "federation_sender").set(now - ts)
+                        "federation_sender"
+                    ).set(now - ts)
                     synapse.metrics.event_processing_last_ts.labels(
-                        "federation_sender").set(ts)
+                        "federation_sender"
+                    ).set(ts)
 
                     events_processed_counter.inc(len(events))
 
-                    event_processing_loop_room_count.labels(
-                        "federation_sender"
-                    ).inc(len(events_by_room))
+                    event_processing_loop_room_count.labels("federation_sender").inc(
+                        len(events_by_room)
+                    )
 
                 event_processing_loop_counter.labels("federation_sender").inc()
 
                 synapse.metrics.event_processing_positions.labels(
-                    "federation_sender").set(next_token)
+                    "federation_sender"
+                ).set(next_token)
 
         finally:
             self._is_processing = False
@@ -309,9 +316,7 @@ class FederationSender(object):
         if not domains:
             return
 
-        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(
-            room_id
-        )
+        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)
 
         # if there is no flush yet scheduled, we will send out these receipts with
         # immediate flushes, and schedule the next flush for this room.
@@ -374,10 +379,9 @@ class FederationSender(object):
         # updates in quick succession are correctly handled.
         # We only want to send presence for our own users, so lets always just
         # filter here just in case.
-        self.pending_presence.update({
-            state.user_id: state for state in states
-            if self.is_mine_id(state.user_id)
-        })
+        self.pending_presence.update(
+            {state.user_id: state for state in states if self.is_mine_id(state.user_id)}
+        )
 
         # We then handle the new pending presence in batches, first figuring
         # out the destinations we need to send each state to and then poking it
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 22a2735405..9aab12c0d3 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -360,7 +360,7 @@ class PerDestinationQueue(object):
 
         # Retrieve list of new device updates to send to the destination
         now_stream_id, results = yield self._store.get_devices_by_remote(
-            self._destination, last_device_list, limit=limit,
+            self._destination, last_device_list, limit=limit
         )
         edus = [
             Edu(
@@ -381,10 +381,7 @@ class PerDestinationQueue(object):
         last_device_stream_id = self._last_device_stream_id
         to_device_stream_id = self._store.get_to_device_stream_token()
         contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
-            self._destination,
-            last_device_stream_id,
-            to_device_stream_id,
-            limit,
+            self._destination, last_device_stream_id, to_device_stream_id, limit
         )
         edus = [
             Edu(
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 35e6b8ff5b..c987bb9a0d 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -29,9 +29,10 @@ class TransactionManager(object):
 
     shared between PerDestinationQueue objects
     """
+
     def __init__(self, hs):
         self._server_name = hs.hostname
-        self.clock = hs.get_clock()   # nb must be called this for @measure_func
+        self.clock = hs.get_clock()  # nb must be called this for @measure_func
         self._store = hs.get_datastore()
         self._transaction_actions = TransactionActions(self._store)
         self._transport_layer = hs.get_federation_transport_client()
@@ -55,9 +56,9 @@ class TransactionManager(object):
         txn_id = str(self._next_txn_id)
 
         logger.debug(
-            "TX [%s] {%s} Attempting new transaction"
-            " (pdus: %d, edus: %d)",
-            destination, txn_id,
+            "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+            destination,
+            txn_id,
             len(pdus),
             len(edus),
         )
@@ -79,9 +80,9 @@ class TransactionManager(object):
 
         logger.debug("TX [%s] Persisted transaction", destination)
         logger.info(
-            "TX [%s] {%s} Sending transaction [%s],"
-            " (PDUs: %d, EDUs: %d)",
-            destination, txn_id,
+            "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+            destination,
+            txn_id,
             transaction.transaction_id,
             len(pdus),
             len(edus),
@@ -112,20 +113,12 @@ class TransactionManager(object):
             response = e.response
 
             if e.code in (401, 404, 429) or 500 <= e.code:
-                logger.info(
-                    "TX [%s] {%s} got %d response",
-                    destination, txn_id, code
-                )
+                logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
                 raise e
 
-        logger.info(
-            "TX [%s] {%s} got %d response",
-            destination, txn_id, code
-        )
+        logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
 
-        yield self._transaction_actions.delivered(
-            transaction, code, response
-        )
+        yield self._transaction_actions.delivered(transaction, code, response)
 
         logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
 
@@ -134,13 +127,18 @@ class TransactionManager(object):
                 if "error" in r:
                     logger.warn(
                         "TX [%s] {%s} Remote returned error for %s: %s",
-                        destination, txn_id, e_id, r,
+                        destination,
+                        txn_id,
+                        e_id,
+                        r,
                     )
         else:
             for p in pdus:
                 logger.warn(
                     "TX [%s] {%s} Failed to send event %s",
-                    destination, txn_id, p.event_id,
+                    destination,
+                    txn_id,
+                    p.event_id,
                 )
             success = False