diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index a6cb3ba58f..30ebd62883 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -353,21 +353,25 @@ class FederationSender(AbstractFederationSender):
last_token = await self.store.get_federation_out_pos("events")
(
next_token,
- events,
event_to_received_ts,
- ) = await self.store.get_all_new_events_stream(
+ ) = await self.store.get_all_new_event_ids_stream(
last_token, self._last_poked_id, limit=100
)
+ event_ids = event_to_received_ts.keys()
+ event_entries = await self.store.get_unredacted_events_from_cache_or_db(
+ event_ids
+ )
+
logger.debug(
"Handling %i -> %i: %i events to send (current id %i)",
last_token,
next_token,
- len(events),
+ len(event_entries),
self._last_poked_id,
)
- if not events and next_token >= self._last_poked_id:
+ if not event_entries and next_token >= self._last_poked_id:
logger.debug("All events processed")
break
@@ -430,7 +434,23 @@ class FederationSender(AbstractFederationSender):
# If there are no prev event IDs then the state is empty
# and so no remote servers in the room
destinations = set()
- else:
+
+ if destinations is None:
+ # During partial join we use the set of servers that we got
+ # when beginning the join. It's still possible that we send
+ # events to servers that left the room in the meantime, but
+ # we consider that an acceptable risk since it is only our own
+ # events that we leak and not other server's ones.
+ partial_state_destinations = (
+ await self.store.get_partial_state_servers_at_join(
+ event.room_id
+ )
+ )
+
+ if len(partial_state_destinations) > 0:
+ destinations = partial_state_destinations
+
+ if destinations is None:
# We check the external cache for the destinations, which is
# stored per state group.
@@ -508,8 +528,14 @@ class FederationSender(AbstractFederationSender):
await handle_event(event)
events_by_room: Dict[str, List[EventBase]] = {}
- for event in events:
- events_by_room.setdefault(event.room_id, []).append(event)
+
+ for event_id in event_ids:
+ # `event_entries` is unsorted, so we have to iterate over `event_ids`
+ # to ensure the events are in the right order
+ event_cache = event_entries.get(event_id)
+ if event_cache:
+ event = event_cache.event
+ events_by_room.setdefault(event.room_id, []).append(event)
await make_deferred_yieldable(
defer.gatherResults(
@@ -524,9 +550,9 @@ class FederationSender(AbstractFederationSender):
logger.debug("Successfully handled up to %i", next_token)
await self.store.update_federation_out_pos("events", next_token)
- if events:
+ if event_entries:
now = self.clock.time_msec()
- ts = event_to_received_ts[events[-1].event_id]
+ ts = max(t for t in event_to_received_ts.values() if t)
assert ts is not None
synapse.metrics.event_processing_lag.labels(
@@ -536,7 +562,7 @@ class FederationSender(AbstractFederationSender):
"federation_sender"
).set(ts)
- events_processed_counter.inc(len(events))
+ events_processed_counter.inc(len(event_entries))
event_processing_loop_room_count.labels("federation_sender").inc(
len(events_by_room)
@@ -621,7 +647,7 @@ class FederationSender(AbstractFederationSender):
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
- domains_set = await self._storage_controllers.state.get_current_hosts_in_room(
+ domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
domains = [
|