diff options
author | Eric Eastwood <erice@element.io> | 2022-09-21 23:32:56 -0500 |
---|---|---|
committer | Eric Eastwood <erice@element.io> | 2022-09-21 23:32:56 -0500 |
commit | b23b3e4b296908224f998bf234453690d73d8ba2 (patch) | |
tree | 79ca5e4c5b3e9b7ea7776625a29f3be5a889df0c | |
parent | Fix `have_seen_event` cache not being invalidated (diff) | |
download | synapse-b23b3e4b296908224f998bf234453690d73d8ba2.tar.xz |
Calculate the stream_ordering from newest -> oldest (in the correct order) and persist in the oldest -> newest to get the least missing prev_event fetch thrashing
-rw-r--r-- | docker/complement/conf/workers-shared-extra.yaml.j2 | 2 | ||||
-rw-r--r-- | synapse/handlers/federation_event.py | 33 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 4 |
3 files changed, 36 insertions, 3 deletions
diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index 9e554a865e..5a53782da9 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -31,7 +31,7 @@ federation_ip_range_blacklist: [] # Disable server rate-limiting rc_federation: window_size: 1000 - sleep_limit: 10 + sleep_limit: 99999 sleep_delay: 500 reject_limit: 99999 concurrent: 3 diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index c87925aa51..40fbcce53a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -137,6 +137,7 @@ class FederationEventHandler: """ def __init__(self, hs: "HomeServer"): + self.hs = hs self._store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state @@ -644,9 +645,39 @@ class FederationEventHandler: f"room {ev.room_id}, when we were backfilling in {room_id}" ) + # foo + # + # We expect the events from the `/backfill`response to start from + # `?v` and include events that preceded it (so the list will be + # newest -> oldest). This is at-most a convention between Synapse + # servers as the order is not specced. + # + # Reverse the list of events + reverse_chronological_events = events + chronological_events = reverse_chronological_events[::-1] + + from synapse.storage.util.id_generators import AbstractStreamIdGenerator + + # This should only exist on instances that are configured to write + assert ( + self._instance_name in self.hs.config.worker.writers.events + ), "Can only instantiate xxxfoobarbaz on master" + + # Since we have been configured to write, we ought to have id generators, + # rather than id trackers. + assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator) + stream_ordering_manager = self._store._backfill_id_gen.get_next_mult( + len(reverse_chronological_events) + ) + async with stream_ordering_manager as stream_orderings: + for event, stream in zip( + reverse_chronological_events, stream_orderings + ): + event.internal_metadata.stream_ordering = stream + await self._process_pulled_events( dest, - events, + chronological_events, backfilled=True, ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 5932668f2f..9f88ea8c7f 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -209,7 +209,9 @@ class PersistEventsStore: async with stream_ordering_manager as stream_orderings: for (event, _), stream in zip(events_and_contexts, stream_orderings): - event.internal_metadata.stream_ordering = stream + # foo + if event.internal_metadata.stream_ordering is None: + event.internal_metadata.stream_ordering = stream await self.db_pool.runInteraction( "persist_events", |