diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 94a65ac65f..fc1d8c88a7 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -62,12 +62,12 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
- "synapse_federation_client_sent_pdu_destinations:count",
+ "synapse_federation_client_sent_pdu_destinations_count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
- "synapse_federation_client_sent_pdu_destinations:total",
+ "synapse_federation_client_sent_pdu_destinations",
"Total number of PDUs queued for sending across all destinations",
)
@@ -353,21 +353,25 @@ class FederationSender(AbstractFederationSender):
last_token = await self.store.get_federation_out_pos("events")
(
next_token,
- events,
event_to_received_ts,
- ) = await self.store.get_all_new_events_stream(
+ ) = await self.store.get_all_new_event_ids_stream(
last_token, self._last_poked_id, limit=100
)
+ event_ids = event_to_received_ts.keys()
+ event_entries = await self.store.get_unredacted_events_from_cache_or_db(
+ event_ids
+ )
+
logger.debug(
"Handling %i -> %i: %i events to send (current id %i)",
last_token,
next_token,
- len(events),
+ len(event_entries),
self._last_poked_id,
)
- if not events and next_token >= self._last_poked_id:
+ if not event_entries and next_token >= self._last_poked_id:
logger.debug("All events processed")
break
@@ -430,7 +434,23 @@ class FederationSender(AbstractFederationSender):
# If there are no prev event IDs then the state is empty
# and so no remote servers in the room
destinations = set()
- else:
+
+ if destinations is None:
+ # During partial join we use the set of servers that we got
+ # when beginning the join. It's still possible that we send
+ # events to servers that left the room in the meantime, but
+ # we consider that an acceptable risk since it is only our own
+ # events that we leak and not other server's ones.
+ partial_state_destinations = (
+ await self.store.get_partial_state_servers_at_join(
+ event.room_id
+ )
+ )
+
+ if len(partial_state_destinations) > 0:
+ destinations = partial_state_destinations
+
+ if destinations is None:
# We check the external cache for the destinations, which is
# stored per state group.
@@ -441,6 +461,19 @@ class FederationSender(AbstractFederationSender):
destinations = await self._external_cache.get(
"get_joined_hosts", str(sg)
)
+ if destinations is None:
+ # Add logging to help track down #13444
+ logger.info(
+ "Unexpectedly did not have cached destinations for %s / %s",
+ sg,
+ event.event_id,
+ )
+ else:
+ # Add logging to help track down #13444
+ logger.info(
+ "Unexpectedly did not have cached prev group for %s",
+ event.event_id,
+ )
if destinations is None:
try:
@@ -495,8 +528,14 @@ class FederationSender(AbstractFederationSender):
await handle_event(event)
events_by_room: Dict[str, List[EventBase]] = {}
- for event in events:
- events_by_room.setdefault(event.room_id, []).append(event)
+
+ for event_id in event_ids:
+ # `event_entries` is unsorted, so we have to iterate over `event_ids`
+ # to ensure the events are in the right order
+ event_cache = event_entries.get(event_id)
+ if event_cache:
+ event = event_cache.event
+ events_by_room.setdefault(event.room_id, []).append(event)
await make_deferred_yieldable(
defer.gatherResults(
@@ -511,9 +550,9 @@ class FederationSender(AbstractFederationSender):
logger.debug("Successfully handled up to %i", next_token)
await self.store.update_federation_out_pos("events", next_token)
- if events:
+ if event_entries:
now = self.clock.time_msec()
- ts = event_to_received_ts[events[-1].event_id]
+ ts = max(t for t in event_to_received_ts.values() if t)
assert ts is not None
synapse.metrics.event_processing_lag.labels(
@@ -523,7 +562,7 @@ class FederationSender(AbstractFederationSender):
"federation_sender"
).set(ts)
- events_processed_counter.inc(len(events))
+ events_processed_counter.inc(len(event_entries))
event_processing_loop_room_count.labels("federation_sender").inc(
len(events_by_room)
|