summary refs log tree commit diff
path: root/synapse/federation/sender/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender/__init__.py')
-rw-r--r--synapse/federation/sender/__init__.py70
1 files changed, 37 insertions, 33 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4f0f939102..766c5a37cd 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -44,8 +44,8 @@ sent_pdus_destination_dist_count = Counter(
 )
 
 sent_pdus_destination_dist_total = Counter(
-    "synapse_federation_client_sent_pdu_destinations:total", ""
-    "Total number of PDUs queued for sending across all destinations",
+    "synapse_federation_client_sent_pdu_destinations:total",
+    "" "Total number of PDUs queued for sending across all destinations",
 )
 
 
@@ -63,14 +63,15 @@ class FederationSender(object):
         self._transaction_manager = TransactionManager(hs)
 
         # map from destination to PerDestinationQueue
-        self._per_destination_queues = {}   # type: dict[str, PerDestinationQueue]
+        self._per_destination_queues = {}  # type: dict[str, PerDestinationQueue]
 
         LaterGauge(
             "synapse_federation_transaction_queue_pending_destinations",
             "",
             [],
             lambda: sum(
-                1 for d in self._per_destination_queues.values()
+                1
+                for d in self._per_destination_queues.values()
                 if d.transmission_loop_running
             ),
         )
@@ -108,8 +109,9 @@ class FederationSender(object):
         # awaiting a call to flush_read_receipts_for_room. The presence of an entry
         # here for a given room means that we are rate-limiting RR flushes to that room,
         # and that there is a pending call to _flush_rrs_for_room in the system.
-        self._queues_awaiting_rr_flush_by_room = {
-        }   # type: dict[str, set[PerDestinationQueue]]
+        self._queues_awaiting_rr_flush_by_room = (
+            {}
+        )  # type: dict[str, set[PerDestinationQueue]]
 
         self._rr_txn_interval_per_room_ms = (
             1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second
@@ -141,8 +143,7 @@ class FederationSender(object):
 
         # fire off a processing loop in the background
         run_as_background_process(
-            "process_event_queue_for_federation",
-            self._process_event_queue_loop,
+            "process_event_queue_for_federation", self._process_event_queue_loop
         )
 
     @defer.inlineCallbacks
@@ -152,7 +153,7 @@ class FederationSender(object):
             while True:
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=100,
+                    last_token, self._last_poked_id, limit=100
                 )
 
                 logger.debug("Handling %s -> %s", last_token, next_token)
@@ -168,6 +169,9 @@ class FederationSender(object):
                     if not is_mine and send_on_behalf_of is None:
                         return
 
+                    if not event.internal_metadata.should_proactively_send():
+                        return
+
                     try:
                         # Get the state from before the event.
                         # We need to make sure that this is the state from before
@@ -176,7 +180,7 @@ class FederationSender(object):
                         # banned then it won't receive the event because it won't
                         # be in the room after the ban.
                         destinations = yield self.state.get_current_hosts_in_room(
-                            event.room_id, latest_event_ids=event.prev_event_ids(),
+                            event.room_id, latest_event_ids=event.prev_event_ids()
                         )
                     except Exception:
                         logger.exception(
@@ -206,37 +210,40 @@ class FederationSender(object):
                 for event in events:
                     events_by_room.setdefault(event.room_id, []).append(event)
 
-                yield logcontext.make_deferred_yieldable(defer.gatherResults(
-                    [
-                        logcontext.run_in_background(handle_room_events, evs)
-                        for evs in itervalues(events_by_room)
-                    ],
-                    consumeErrors=True
-                ))
-
-                yield self.store.update_federation_out_pos(
-                    "events", next_token
+                yield logcontext.make_deferred_yieldable(
+                    defer.gatherResults(
+                        [
+                            logcontext.run_in_background(handle_room_events, evs)
+                            for evs in itervalues(events_by_room)
+                        ],
+                        consumeErrors=True,
+                    )
                 )
 
+                yield self.store.update_federation_out_pos("events", next_token)
+
                 if events:
                     now = self.clock.time_msec()
                     ts = yield self.store.get_received_ts(events[-1].event_id)
 
                     synapse.metrics.event_processing_lag.labels(
-                        "federation_sender").set(now - ts)
+                        "federation_sender"
+                    ).set(now - ts)
                     synapse.metrics.event_processing_last_ts.labels(
-                        "federation_sender").set(ts)
+                        "federation_sender"
+                    ).set(ts)
 
                     events_processed_counter.inc(len(events))
 
-                    event_processing_loop_room_count.labels(
-                        "federation_sender"
-                    ).inc(len(events_by_room))
+                    event_processing_loop_room_count.labels("federation_sender").inc(
+                        len(events_by_room)
+                    )
 
                 event_processing_loop_counter.labels("federation_sender").inc()
 
                 synapse.metrics.event_processing_positions.labels(
-                    "federation_sender").set(next_token)
+                    "federation_sender"
+                ).set(next_token)
 
         finally:
             self._is_processing = False
@@ -309,9 +316,7 @@ class FederationSender(object):
         if not domains:
             return
 
-        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(
-            room_id
-        )
+        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)
 
         # if there is no flush yet scheduled, we will send out these receipts with
         # immediate flushes, and schedule the next flush for this room.
@@ -374,10 +379,9 @@ class FederationSender(object):
         # updates in quick succession are correctly handled.
         # We only want to send presence for our own users, so lets always just
         # filter here just in case.
-        self.pending_presence.update({
-            state.user_id: state for state in states
-            if self.is_mine_id(state.user_id)
-        })
+        self.pending_presence.update(
+            {state.user_id: state for state in states if self.is_mine_id(state.user_id)}
+        )
 
         # We then handle the new pending presence in batches, first figuring
         # out the destinations we need to send each state to and then poking it