diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2022-02-15 14:33:28 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-15 14:33:28 +0000 |
commit | bab2394aa9b514f51feb2c378a39d92143e9cec8 (patch) | |
tree | 110eff1181964669364c4a896e8b69916f38756d /synapse/handlers/federation_event.py | |
parent | Track cache invalidations (#12000) (diff) | |
download | synapse-bab2394aa9b514f51feb2c378a39d92143e9cec8.tar.xz |
`_auth_and_persist_outliers`: drop events we have already seen (#11994)
We already have two copies of this code, in 2/3 of the callers of `_auth_and_persist_outliers`. Before I add a third, let's push it down.
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/federation_event.py | 44 |
1 files changed, 20 insertions, 24 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9edc7369d6..6dc27a38f3 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -419,8 +419,6 @@ class FederationEventHandler: Raises: SynapseError if the response is in some way invalid. """ - event_map = {e.event_id: e for e in itertools.chain(auth_events, state)} - create_event = None for e in auth_events: if (e.type, e.state_key) == (EventTypes.Create, ""): @@ -439,11 +437,6 @@ class FederationEventHandler: if room_version.identifier != room_version_id: raise SynapseError(400, "Room version mismatch") - # filter out any events we have already seen - seen_remotes = await self._store.have_seen_events(room_id, event_map.keys()) - for s in seen_remotes: - event_map.pop(s, None) - # persist the auth chain and state events. # # any invalid events here will be marked as rejected, and we'll carry on. @@ -455,7 +448,9 @@ class FederationEventHandler: # signatures right now doesn't mean that we will *never* be able to, so it # is premature to reject them. # - await self._auth_and_persist_outliers(room_id, event_map.values()) + await self._auth_and_persist_outliers( + room_id, itertools.chain(auth_events, state) + ) # and now persist the join event itself. logger.info("Peristing join-via-remote %s", event) @@ -1245,6 +1240,16 @@ class FederationEventHandler: """ event_map = {event.event_id: event for event in events} + # filter out any events we have already seen. This might happen because + # the events were eagerly pushed to us (eg, during a room join), or because + # another thread has raced against us since we decided to request the event. + # + # This is just an optimisation, so it doesn't need to be watertight - the event + # persister does another round of deduplication. + seen_remotes = await self._store.have_seen_events(room_id, event_map.keys()) + for s in seen_remotes: + event_map.pop(s, None) + # XXX: it might be possible to kick this process off in parallel with fetching # the events. while event_map: @@ -1717,31 +1722,22 @@ class FederationEventHandler: event_id: the event for which we are lacking auth events """ try: - remote_event_map = { - e.event_id: e - for e in await self._federation_client.get_event_auth( - destination, room_id, event_id - ) - } + remote_events = await self._federation_client.get_event_auth( + destination, room_id, event_id + ) + except RequestSendFailed as e1: # The other side isn't around or doesn't implement the # endpoint, so lets just bail out. logger.info("Failed to get event auth from remote: %s", e1) return - logger.info("/event_auth returned %i events", len(remote_event_map)) + logger.info("/event_auth returned %i events", len(remote_events)) # `event` may be returned, but we should not yet process it. - remote_event_map.pop(event_id, None) - - # nor should we reprocess any events we have already seen. - seen_remotes = await self._store.have_seen_events( - room_id, remote_event_map.keys() - ) - for s in seen_remotes: - remote_event_map.pop(s, None) + remote_auth_events = (e for e in remote_events if e.event_id != event_id) - await self._auth_and_persist_outliers(room_id, remote_event_map.values()) + await self._auth_and_persist_outliers(room_id, remote_auth_events) async def _update_context_for_auth_events( self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] |