summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py29
1 files changed, 20 insertions, 9 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index a6cb3ba58f..774ecd81b6 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -353,21 +353,25 @@ class FederationSender(AbstractFederationSender):
                 last_token = await self.store.get_federation_out_pos("events")
                 (
                     next_token,
-                    events,
                     event_to_received_ts,
-                ) = await self.store.get_all_new_events_stream(
+                ) = await self.store.get_all_new_event_ids_stream(
                     last_token, self._last_poked_id, limit=100
                 )
 
+                event_ids = event_to_received_ts.keys()
+                event_entries = await self.store.get_unredacted_events_from_cache_or_db(
+                    event_ids
+                )
+
                 logger.debug(
                     "Handling %i -> %i: %i events to send (current id %i)",
                     last_token,
                     next_token,
-                    len(events),
+                    len(event_entries),
                     self._last_poked_id,
                 )
 
-                if not events and next_token >= self._last_poked_id:
+                if not event_entries and next_token >= self._last_poked_id:
                     logger.debug("All events processed")
                     break
 
@@ -508,8 +512,14 @@ class FederationSender(AbstractFederationSender):
                             await handle_event(event)
 
                 events_by_room: Dict[str, List[EventBase]] = {}
-                for event in events:
-                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                for event_id in event_ids:
+                    # `event_entries` is unsorted, so we have to iterate over `event_ids`
+                    # to ensure the events are in the right order
+                    event_cache = event_entries.get(event_id)
+                    if event_cache:
+                        event = event_cache.event
+                        events_by_room.setdefault(event.room_id, []).append(event)
 
                 await make_deferred_yieldable(
                     defer.gatherResults(
@@ -524,9 +534,10 @@ class FederationSender(AbstractFederationSender):
                 logger.debug("Successfully handled up to %i", next_token)
                 await self.store.update_federation_out_pos("events", next_token)
 
-                if events:
+                if event_entries:
                     now = self.clock.time_msec()
-                    ts = event_to_received_ts[events[-1].event_id]
+                    last_id = next(reversed(event_ids))
+                    ts = event_to_received_ts[last_id]
                     assert ts is not None
 
                     synapse.metrics.event_processing_lag.labels(
@@ -536,7 +547,7 @@ class FederationSender(AbstractFederationSender):
                         "federation_sender"
                     ).set(ts)
 
-                    events_processed_counter.inc(len(events))
+                    events_processed_counter.inc(len(event_entries))
 
                     event_processing_loop_room_count.labels("federation_sender").inc(
                         len(events_by_room)