summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2022-09-21 23:32:56 -0500
committerEric Eastwood <erice@element.io>2022-09-21 23:32:56 -0500
commitb23b3e4b296908224f998bf234453690d73d8ba2 (patch)
tree79ca5e4c5b3e9b7ea7776625a29f3be5a889df0c
parentFix `have_seen_event` cache not being invalidated (diff)
downloadsynapse-b23b3e4b296908224f998bf234453690d73d8ba2.tar.xz
Calculate the stream_ordering from newest -> oldest (in the correct order) and persist in the oldest -> newest to get the least missing prev_event fetch thrashing
-rw-r--r--docker/complement/conf/workers-shared-extra.yaml.j22
-rw-r--r--synapse/handlers/federation_event.py33
-rw-r--r--synapse/storage/databases/main/events.py4
3 files changed, 36 insertions, 3 deletions
diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2
index 9e554a865e..5a53782da9 100644
--- a/docker/complement/conf/workers-shared-extra.yaml.j2
+++ b/docker/complement/conf/workers-shared-extra.yaml.j2
@@ -31,7 +31,7 @@ federation_ip_range_blacklist: []
 # Disable server rate-limiting
 rc_federation:
   window_size: 1000
-  sleep_limit: 10
+  sleep_limit: 99999
   sleep_delay: 500
   reject_limit: 99999
   concurrent: 3
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index c87925aa51..40fbcce53a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -137,6 +137,7 @@ class FederationEventHandler:
     """
 
     def __init__(self, hs: "HomeServer"):
+        self.hs = hs
         self._store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
@@ -644,9 +645,39 @@ class FederationEventHandler:
                         f"room {ev.room_id}, when we were backfilling in {room_id}"
                     )
 
+            # foo
+            #
+            # We expect the events from the `/backfill`response to start from
+            # `?v` and include events that preceded it (so the list will be
+            # newest -> oldest). This is at-most a convention between Synapse
+            # servers as the order is not specced.
+            #
+            # Reverse the list of events
+            reverse_chronological_events = events
+            chronological_events = reverse_chronological_events[::-1]
+
+            from synapse.storage.util.id_generators import AbstractStreamIdGenerator
+
+            # This should only exist on instances that are configured to write
+            assert (
+                self._instance_name in self.hs.config.worker.writers.events
+            ), "Can only instantiate xxxfoobarbaz on master"
+
+            # Since we have been configured to write, we ought to have id generators,
+            # rather than id trackers.
+            assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator)
+            stream_ordering_manager = self._store._backfill_id_gen.get_next_mult(
+                len(reverse_chronological_events)
+            )
+            async with stream_ordering_manager as stream_orderings:
+                for event, stream in zip(
+                    reverse_chronological_events, stream_orderings
+                ):
+                    event.internal_metadata.stream_ordering = stream
+
             await self._process_pulled_events(
                 dest,
-                events,
+                chronological_events,
                 backfilled=True,
             )
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 5932668f2f..9f88ea8c7f 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -209,7 +209,9 @@ class PersistEventsStore:
 
         async with stream_ordering_manager as stream_orderings:
             for (event, _), stream in zip(events_and_contexts, stream_orderings):
-                event.internal_metadata.stream_ordering = stream
+                # foo
+                if event.internal_metadata.stream_ordering is None:
+                    event.internal_metadata.stream_ordering = stream
 
             await self.db_pool.runInteraction(
                 "persist_events",